# root/4_delete.py
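# Walks the parsed note graph and queues a deletion for every note listed in
# filtered.list, deepest replies and quotes first. Between requests it backs
# off on the server's job-queue depth, on local available memory, and on a
# fixed per-request delay; on resume it seeks past notes that are already gone.
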
import sys
import time
from pathlib import Path

import httpx
import psutil
import psycopg

from com import FilterAction, eval_config, parse_graph, progressbar

config = eval_config()
conn: psycopg.Connection = config["connect"]()
token: str = config["token"]
api: str = config["api"]

graph = parse_graph()
print("reading filterlist")
# each line of filtered.list is "<note id> <action>"
filtered = Path("filtered.list").read_text().strip().splitlines()
filtered = [line.split(" ") for line in filtered]

print("building queue")
queue = []

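# depth-first, post-order walk of the note graph: replies and quotes are
# queued ahead of their parent, and only notes carrying the "self" flag
# end up in the deletion queue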
def enqueue(note, action):
    for reply in note["replies"]:
        enqueue(graph[reply], action)
    for quote in note["quotes"]:
        enqueue(graph[quote], action)
    if "self" in note["flags"]:
        queue.append((note["id"], action))

for note_id, action in filtered:
    enqueue(graph[note_id], FilterAction(action))

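# ETA widget that estimates remaining time from a sliding window of the last
# ten progress updates rather than the whole elapsed run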
class CustomETA(progressbar.ETA):
    def __init__(self, *args, **kwargs):
        self.history = []
        self.lastval = None
        progressbar.ETA.__init__(self, *args, **kwargs)

    def _calculate_eta(self, progress, data, value, elapsed):
        if self.lastval != value:
            self.history = [*self.history[-9:], elapsed.total_seconds()]
            self.lastval = value
        # N timestamps in the window span N - 1 completed items
        per_item = (self.history[-1] - self.history[0]) / max(len(self.history) - 1, 1)
        remaining = (progress.max_value - value) * per_item
        spent = elapsed.total_seconds() - self.history[-1]
        return max(remaining - spent, 0)

pb = progressbar.ProgressBar(
    0,
    len(queue),
    widgets=[
        progressbar.Variable("message", format="{formatted_value}"),
        " ",
        progressbar.Percentage(),
        " ",
        progressbar.Bar(),
        " ",
        progressbar.SimpleProgress("%(value_s)s/%(max_value_s)s"),
        " ",
        CustomETA(),
    ],
    variables={"status": "work"}
)
pb.update(0) # force initial display
client = httpx.Client(timeout=60)
seeking = False
last_req = 0

for note, action in queue:

    # seek through queue
    # helps prevent rate limits on resumed deletions
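    # already-deleted notes return 404 from /notes/show, so they can be
    # skipped without spending a call on the delete endpoint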
    if seeking:
        while True:
            resp = client.post(f"{api}/notes/show", json={
                "i": token,
                "noteId": note,
            })
            if resp.status_code == 502:
                pb.update(message="down")
                time.sleep(1)
                continue
            break
        if resp.status_code == 404:
            pb.increment(message="seeking")
            continue
        seeking = False

    # wait for queue to empty
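    # only queue more work while the deliver backlog is under 100 jobs
    # and the obliterate backlog is under 50,000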
    while True:
        resp = client.post(f"{api}/admin/queue/stats", json={"i": token})
        if resp.status_code == 502:
            pb.update(message="down")
            time.sleep(1)
            continue
        Path('queue-stats.dump').write_text(f"status:{resp.status_code}\nbody:\n{resp.text}")
        deliver_waiting = resp.json()["deliver"]["waiting"]
        obliterate_waiting = resp.json()["obliterate"]["waiting"]
        if deliver_waiting < 100 and obliterate_waiting < 50000:
            break
        pb.update(message=f"queue ({deliver_waiting}/{obliterate_waiting})")
        time.sleep(10)

    # make sure there's enough memory for new jobs
    while True:
        vmem = psutil.virtual_memory()
        if vmem.available > (512 * 1024 * 1024):
            break
        pb.update(message="memory")
        time.sleep(10)

    # prevent api rate limiting
    req_delay = time.time() - last_req
    if req_delay < 30:
        pb.update(message="delaying")
        time.sleep(30 - req_delay)  # wait out the rest of the 30 second spacing

    # queue new deletions
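    # notes marked FilterAction.Obliterate go through the server's obliterate
    # queue (the same backlog checked via /admin/queue/stats above)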
    err = 0
    while True:
        resp = client.post(f"{api}/notes/delete", json={
            "i": token,
            "noteId": note,
            "obliterate": action == FilterAction.Obliterate,
        })
        if resp.status_code == 429:
            pb.update(message="limit")
            time.sleep(1)
            continue
        elif resp.status_code == 502:
            pb.update(message="down")
            time.sleep(1)
            continue
        elif resp.status_code >= 400:
            body = resp.json()
            if body["error"]["code"] == "NO_SUCH_NOTE":
                pb.increment(message="seeking")
                seeking = True
                break
            elif body["error"]["code"] == "QUEUE_FULL":
                print("\nobliterate queue overflowed, exiting to save server")
                sys.exit(1)
            err += 1
            if err > 3:
                raise Exception(f"{body['error']['code']}: {body['error']['message']}")
            sys.stdout.write("\r")
            print(f"err {body['error']['code']} {body['error']['message']} ")
            time.sleep(30)
            continue  # retry this note; otherwise the error counter above never trips
        pb.increment(message="deleting")
        last_req = time.time()
        break

pb.finish()