author     mia <mia@mia.jetzt>  2024-10-04 15:43:40 -0700
committer  mia <mia@mia.jetzt>  2024-10-04 15:43:40 -0700
commit     7e060e5cf2656a0a53d41ea0ff42b753316cd441
tree       2629f3d1e12d21c406974000dd195518aa5b6041
parent     bb8a48fd4d85ba4f8224c68aaaf9069d5d79dae2
she's goin
-rw-r--r--  1_graph.py     4
-rw-r--r--  2_filter.py   14
-rw-r--r--  3_archive.py  21
-rw-r--r--  4_delete.py  130
-rw-r--r--  com.py         8
-rw-r--r--  conf_mia.py   21
-rw-r--r--  conf_pain.py  13
-rwxr-xr-x  go.sh         22
-rwxr-xr-x  proxy.sh       2
9 files changed, 192 insertions, 43 deletions
diff --git a/1_graph.py b/1_graph.py
index 824d723..e2de90d 100644
--- a/1_graph.py
+++ b/1_graph.py
@@ -1,5 +1,6 @@
 import json
 import sys
+import time
 from collections import namedtuple
 from functools import cache
 from pathlib import Path
@@ -26,6 +27,7 @@ cur = conn.execute(
     [user_id],
 )
 while rows := cur.fetchmany(0xFF):
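+    # brief pause each batch to throttle the query loop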
+    time.sleep(0.0001)
     for row in rows:
         note_ids.add(row[0])
     if early_exit and len(note_ids) > early_exit:
@@ -34,6 +36,7 @@ while rows := cur.fetchmany(0xFF):
 
 @cache
 def get_note(id: str) -> Note:
+    time.sleep(0.0001)
     return Note(
         *conn.execute(
             'select "renoteId", "replyId", "userId" from note where id = %s', [id]
@@ -102,6 +105,7 @@ def traverse(tree: Tree):
 
 
 def expand(tree: Tree):
+    time.sleep(0.0001)
     for row in conn.execute(
         "select id from note_replies(%s, 1, 1000)", [tree.id]
     ).fetchall():
diff --git a/2_filter.py b/2_filter.py
index 8e77945..89311d2 100644
--- a/2_filter.py
+++ b/2_filter.py
@@ -1,19 +1,21 @@
+import time
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Callable, List
 
 import psycopg
 
-from com import FilterableNote, Visibility, eval_config, parse_graph, progressbar
-
+from com import (FilterableNote, Visibility, eval_config, parse_graph,
+                 progressbar, FilterAction)
 
 config = eval_config()
 conn: psycopg.Connection = config["connect"]()
-criteria: Callable[[FilterableNote], bool] = config["criteria"]
+criteria: Callable[[FilterableNote], FilterAction] = config["criteria"]
 
 intermediate = parse_graph()
 
 def transform(entry: dict) -> FilterableNote:
+    time.sleep(0.0001)
     note = conn.execute(
         'select "createdAt", reactions, "renoteCount", visibility from note where id = %s',
         [entry["id"]],
@@ -57,10 +59,10 @@ for entry in intermediate.values():
     transformed = transform(entry)
     if transformed is None:
         continue # we'll get to it next cycle
-    if criteria(transformed):
-        targets.append(entry["id"])
+    action = criteria(transformed)
+    if action != FilterAction.Ignore:
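+        # record the chosen action alongside the id for the later stages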
+        targets.append(f"{entry['id']} {action.value}")
     pb.increment()
 pb.finish()
 
-
 Path("filtered.list").write_text("\n".join(targets))
diff --git a/3_archive.py b/3_archive.py
index 6eef0e1..39affdd 100644
--- a/3_archive.py
+++ b/3_archive.py
@@ -1,4 +1,5 @@
 import json
+import time
 from http.client import HTTPResponse
 from pathlib import Path
 from shutil import copyfileobj
@@ -16,11 +17,13 @@ conn: psycopg.Connection = config["connect"]()
 graph = parse_graph()
 print("reading filterlist")
 filtered = Path("filtered.list").read_text().strip().splitlines()
+filtered = list(map(lambda line: line.split(' ')[0], filtered))
 
 collected_users = {}
 def collect_user(id: str):
     if id in collected_users:
         return
+    time.sleep(0.001)
     user = conn.execute('select username, host, "avatarUrl" from "user" where id = %s', [id]).fetchone()
     if user is None:
         return None
@@ -44,10 +47,11 @@ def collect_note(id: str):
     output = {}
     output["id"] = id
 
-    note = conn.execute('select text, "userId", "createdAt", "updatedAt", reactions, "renoteCount", visibility, "fileIds" from note where id = %s', [id]).fetchone()
+    time.sleep(0.001)
+    note = conn.execute('select text, "userId", "createdAt", "updatedAt", reactions, "renoteCount", visibility, "fileIds", cw from note where id = %s', [id]).fetchone()
     if note is None:
         return None
-    text, user_id, created_at, updated_at, reactions, renotes, visibility, file_ids = note
+    text, user_id, created_at, updated_at, reactions, renotes, visibility, file_ids, cw = note
     collect_user(user_id)
 
     output["text"] = text
@@ -59,6 +63,7 @@ def collect_note(id: str):
     output["reactions"] = reactions
     output["renotes"] = renotes
     output["visibility"] = Visibility.from_db(visibility).code()
+    output["cw"] = cw
 
     node = graph[id]
     replies = [collect_note(reply) for reply in node["replies"]]
@@ -68,6 +73,7 @@ def collect_note(id: str):
 
     output["attachments"] = []
     for file_id in file_ids:
+        time.sleep(0.0005)
         name, type_, comment, url = conn.execute('select name, type, comment, url from drive_file where id = %s', [file_id]).fetchone()
         attachment = {
             "id": file_id,
@@ -117,13 +123,15 @@ pb = progressbar.ProgressBar(
 )
 
 for id, note in collected_notes:
-    outfile = outdir / "note" / f"{id}.mpk.br"
+    outfile = outdir / "note" / id[:3] / f"{id[3:]}.mpk.br"
+    outfile.parent.mkdir(exist_ok=True)
     with outfile.open("wb") as f:
         f.write(brotli.compress(msgpack.dumps(note)))
     pb.increment()
 
 for id, user in collected_users.items():
-    outfile = outdir / "user" / f"{id}.mpk.br"
+    outfile = outdir / "user" / id[:2] / f"{id[2:]}.mpk.br"
+    outfile.parent.mkdir(exist_ok=True)
     with outfile.open("wb") as f:
         f.write(brotli.compress(msgpack.dumps(user)))
     pb.increment()
@@ -134,8 +142,9 @@ pb = progressbar.ProgressBar(
     len(files_to_collect),
     prefix="downloading attachments ",
 )
-for (id, url) in files_to_collect: 
-    outfile = outdir / "file" / id
+for (id, url) in files_to_collect:
+    outfile = outdir / "file" / id[:2] / id[2:]
+    outfile.parent.mkdir(exist_ok=True)
     response: HTTPResponse = urlopen(url)
     with outfile.open("wb") as f:
         copyfileobj(response, f)
diff --git a/4_delete.py b/4_delete.py
index 51e1ef3..615fbab 100644
--- a/4_delete.py
+++ b/4_delete.py
@@ -1,9 +1,11 @@
+import sys
+import time
 from pathlib import Path
 
 import httpx
 import psycopg
 
-from com import eval_config, parse_graph, progressbar
+from com import eval_config, parse_graph, progressbar, FilterAction
 
 config = eval_config()
 conn: psycopg.Connection = config["connect"]()
@@ -13,21 +15,129 @@ api: str = config["api"]
 graph = parse_graph()
 print("reading filterlist")
 filtered = Path("filtered.list").read_text().strip().splitlines()
+filtered = list(map(lambda line: line.split(' '), filtered))
 
+print("building queue")
 queue = []
 
-def enqueue(note):
+def enqueue(note, action):
     for reply in note["replies"]:
-        enqueue(graph[reply])
+        enqueue(graph[reply], action)
     for quote in note["quotes"]:
-        enqueue(graph[quote])
+        enqueue(graph[quote], action)
     if "self" in note["flags"]:
-        files = conn.execute('select "fileIds" from note where id = %s', [note["id"]]).fetchone()[0]
-        queue.append((note["id"], files))
+        queue.append((note["id"], action))
 
-for id in filtered:
-    enqueue(graph[id])
+for id, action in filtered:
+    enqueue(graph[id], FilterAction(action))
 
-print(queue)
+class CustomETA(progressbar.ETA):
+    def __init__(self, *args, **kwargs):
+        self.history = []
+        self.lastval = None
+        progressbar.ETA.__init__(self, *args, **kwargs)
 
-# client = httpx.Client()
+    def _calculate_eta(self, progress, data, value, elapsed):
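+        # average the interval across a rolling window of the last ten progress
+        # updates, then project that over the items still left in the queue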
+        if self.lastval != value:
+            self.history = [*self.history[-9:], elapsed.total_seconds()]
+            self.lastval = value
+        per_item = (self.history[-1] - self.history[0]) / len(self.history)
+        remaining = (progress.max_value - value) * per_item
+        spent = elapsed.total_seconds() - self.history[-1]
+        return max(remaining - spent, 0)
+
+pb = progressbar.ProgressBar(
+    0,
+    len(queue),
+    widgets=[
+        progressbar.Variable("message", format="{formatted_value}"),
+        " ",
+        progressbar.Percentage(),
+        " ",
+        progressbar.Bar(),
+        " ",
+        progressbar.SimpleProgress("%(value_s)s/%(max_value_s)s"),
+        " ",
+        CustomETA(),
+    ],
+    variables={"status": "work"}
+)
+pb.update(0) # force initial display
+client = httpx.Client(timeout=60)
+seeking = False
+last_req = 0
+
+for note, action in queue:
+
+    # seek through queue
+    # helps prevent rate limits on resumed deletions
+    if seeking:
+        while True:
+            resp = client.post(f"{api}/notes/show", json={
+                "i": token,
+                "noteId": note,
+            })
+            if resp.status_code == 502:
+                pb.update(message="down")
+                time.sleep(1)
+                continue
+            break
+        if resp.status_code == 404:
+            pb.increment(message="seeking")
+            continue
+        seeking = False
+
+    # wait for queue to empty
+    while True:
+        resp = client.post(f"{api}/admin/queue/stats", json={"i": token})
+        if resp.status_code == 502:
+            pb.update(message="down")
+            time.sleep(1)
+            continue
+        deliver_waiting = resp.json()["deliver"]["waiting"]
+        obliterate_waiting = resp.json()["obliterate"]["waiting"]
+        obliterate_delayed = resp.json()["obliterate"]["delayed"]
+        if deliver_waiting < 100 and obliterate_waiting + obliterate_delayed < 50000:
+            break
+        pb.update(message=f"queue ({deliver_waiting}/{obliterate_waiting + obliterate_delayed})")
+        time.sleep(10)
+
+    # prevent api rate limiting
+    req_delay = time.time() - last_req
+    if req_delay < 15:
+        pb.update(message="delaying")
+        time.sleep(15 - req_delay)
+
+    # queue new deletions
+    err = 0
+    while True:
+        resp = client.post(f"{api}/notes/delete", json={
+            "i": token,
+            "noteId": note,
+            "obliterate": action == FilterAction.Obliterate,
+        })
+        if resp.status_code == 429:
+            pb.update(status="limit")
+            time.sleep(1)
+            continue
+        elif resp.status_code == 502:
+            pb.update(status="down")
+            continue
+            time.sleep(1)
+        elif resp.status_code >= 400:
+            body = resp.json()
+            if body["error"]["code"] == "NO_SUCH_NOTE":
+                pb.increment(message="seeking")
+                seeking = True
+                break
+            err += 1
+            if err > 10:
+                raise Exception(f"{body['error']['code']}: {body['error']['message']}")
+            sys.stdout.write("\r")
+            print(f"err {body['error']['code']} {body['error']['message']} ")
+            time.sleep(30)
+        pb.increment(message="deleting")
+        last_req = time.time()
+        break
+
+pb.finish()
diff --git a/com.py b/com.py
index 4ceb849..3ebb948 100644
--- a/com.py
+++ b/com.py
@@ -25,7 +25,7 @@ class Visibility(Enum):
             case "followers": return cls.followers
             case "specified": return cls.direct
             case _: raise ValueError(f"unknown visibility `{raw}`")
-    
+
     def code(self) -> str:
         match self:
             case self.public: return "p"
@@ -76,6 +76,12 @@ class FilterableNote:
         }
 
 
+class FilterAction(Enum):
+    Ignore = 'ignore'
+    Delete = 'delete'
+    Obliterate = 'obliterate'
+
+
 def eval_config() -> dict:
     print("configuring")
     config = {}
diff --git a/conf_mia.py b/conf_mia.py
index 6496e3b..a32255f 100644
--- a/conf_mia.py
+++ b/conf_mia.py
@@ -1,18 +1,18 @@
 import math
 from datetime import UTC, datetime, timedelta
 
-from com import FilterableNote, Visibility
+from com import FilterableNote, Visibility, FilterAction
 from sec import connect, tokens
 
 user_id = "9gf2ev4ex5dflllo"
 token = tokens["mia"]
-api = "https://void.rehab/api/"
+api = "https://void.rehab/api"
 early_exit = 0xFFF
 
 now = datetime.now(UTC)
 threshold = 0.1
 
-def criteria(root: FilterableNote) -> bool:
+def criteria(root: FilterableNote) -> FilterAction:
     thread = root.thread()
     thread_self = root.thread_self()
 
@@ -24,13 +24,13 @@ def criteria(root: FilterableNote) -> bool:
         # ...and the dms are younger than two months...
         if now - most_recent_direct.when < timedelta(days=30 * 2):
             # ...do not delete the thread
-            return False
+            return FilterAction.Ignore
 
     # get the most recent post...
     others_recency = max(thread, key=lambda note: note.when)
     # ...and bail if it's too new
     if now - others_recency.when < timedelta(days=14):
-        return False
+        return FilterAction.Ignore
 
     # get my...
     most_recent_post = max(thread_self, key=lambda note: note.when) # ...most recent post...
@@ -43,4 +43,13 @@ def criteria(root: FilterableNote) -> bool:
     # ...weigh it...
     weighted_score = high_score / math.sqrt(most_recent_age.days)
     # ...and check it against a threshold
-    return weighted_score < threshold
+    if weighted_score < threshold:
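+        # anything that was ever public or unlisted gets the heavier obliterate action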
+        if any(map(
+            lambda note: note.visibility in [Visibility.public, Visibility.unlisted],
+            thread_self,
+        )):
+            return FilterAction.Obliterate
+        else:
+            return FilterAction.Delete
+    else:
+        return FilterAction.Ignore
diff --git a/conf_pain.py b/conf_pain.py
index 85e7095..9690de1 100644
--- a/conf_pain.py
+++ b/conf_pain.py
@@ -1,14 +1,17 @@
 import math
 from datetime import UTC, datetime, timedelta
 
-from com import FilterableNote
+from com import FilterableNote, FilterAction
 from sec import connect, tokens
 
 user_id = "9gszslkcdfnomssj"
 token = tokens["pain"]
-api = "https://void.rehab/api/"
+api = "https://void.rehab/api"
 
-def criteria(root: FilterableNote) -> bool:
+def criteria(root: FilterableNote) -> FilterAction:
     # if it's more than two months old, delete
-    # return (datetime.now(UTC) - root.when).days > 60
-    return (datetime.now(UTC) - root.when).days > (12 * 30)
+    latest = max(map(lambda note: note.when, root.thread_self()))
+    if (datetime.now(UTC) - latest).days > 60:
+        return FilterAction.Obliterate
+    else:
+        return FilterAction.Ignore
diff --git a/go.sh b/go.sh
index 39f3779..169d025 100755
--- a/go.sh
+++ b/go.sh
@@ -1,13 +1,19 @@
-#!/bin/sh
+#!/bin/bash
 
-set -ex
+set -e
+
+if [[ "$1" = "" ]]; then
+  echo missing name
+  exit 1
+fi
 
-test -f graph.db && rm graph.db
-test -f filtered.list && rm filtered.list
 test -d out && rm -r out
+
+set -x
 python3 1_graph.py conf_$1.py
 python3 2_filter.py conf_$1.py
-# python3 3_archive.py conf_$1.py
-# echo uploading to memorial
-# rsync -r -e 'ssh -p23' --progress out/ memorial:fediverse/$1/
-# python3 4_delete.py conf_$1.py
+python3 3_archive.py conf_$1.py
+rsync -r -e 'ssh -p23' --progress out/file/ memorial:fediverse/$1/file/
+rsync -r -e 'ssh -p23' --progress --ignore-existing out/note/ memorial:fediverse/$1/note/
+rsync -r -e 'ssh -p23' --progress out/user/ memorial:fediverse/$1/user/
+python3 4_delete.py conf_$1.py
diff --git a/proxy.sh b/proxy.sh
index 9628fab..1689ab2 100755
--- a/proxy.sh
+++ b/proxy.sh
@@ -1,2 +1,2 @@
 #!/bin/sh
-exec ssh -NL 5432:localhost:5432 vr
+exec ssh -NL 54321:localhost:5432 vr