1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
import sys
import time
from pathlib import Path
import httpx
import psycopg
from com import eval_config, parse_graph, progressbar, FilterAction
config = eval_config()
conn: psycopg.Connection = config["connect"]()
token: str = config["token"]
api: str = config["api"]
graph = parse_graph()
print("reading filterlist")
filtered = Path("filtered.list").read_text().strip().splitlines()
filtered = list(map(lambda line: line.split(' '), filtered))
print("building queue")
queue = []
def enqueue(note, action):
for reply in note["replies"]:
enqueue(graph[reply], action)
for quote in note["quotes"]:
enqueue(graph[quote], action)
if "self" in note["flags"]:
queue.append((note["id"], action))
for id, action in filtered:
enqueue(graph[id], FilterAction(action))
class CustomETA(progressbar.ETA):
def __init__(self, *args, **kwargs):
self.history = []
self.lastval = None
progressbar.ETA.__init__(self, *args, **kwargs)
def _calculate_eta(self, progress, data, value, elapsed):
if self.lastval != value:
self.history = [*self.history[-9:], elapsed.total_seconds()]
self.lastval = value
per_item = (self.history[-1] - self.history[0]) / len(self.history)
remaining = (progress.max_value - value) * per_item
spent = elapsed.total_seconds() - self.history[-1]
return max(remaining - spent, 0)
pb = progressbar.ProgressBar(
0,
len(queue),
widgets=[
progressbar.Variable("message", format="{formatted_value}"),
" ",
progressbar.Percentage(),
" ",
progressbar.Bar(),
" ",
progressbar.SimpleProgress("%(value_s)s/%(max_value_s)s"),
" ",
CustomETA(),
],
variables={"status": "work"}
)
pb.update(0) # force initial display
client = httpx.Client(timeout=60)
seeking = False
last_req = 0
for note, action in queue:
# seek through queue
# helps prevent rate limits on resumed deletions
if seeking:
while True:
resp = client.post(f"{api}/notes/show", json={
"i": token,
"noteId": note,
})
if resp.status_code == 502:
pb.update(message="down")
time.sleep(1)
continue
break
if resp.status_code == 404:
pb.increment(message="seeking")
continue
seeking = False
# wait for queue to empty
while True:
resp = client.post(f"{api}/admin/queue/stats", json={"i": token})
if resp.status_code == 502:
pb.update(message="down")
time.sleep(1)
continue
deliver_waiting = resp.json()["deliver"]["waiting"]
obliterate_waiting = resp.json()["obliterate"]["waiting"]
obliterate_delayed = resp.json()["obliterate"]["delayed"]
if deliver_waiting < 100 and obliterate_waiting + obliterate_delayed< 50000:
break
pb.update(message=f"queue ({deliver_waiting}/{obliterate_waiting + obliterate_delayed})")
time.sleep(10)
# prevent api rate limiting
req_delay = time.time() - last_req
if req_delay < 15:
pb.update(message="delaying")
time.sleep(req_delay)
# queue new deletions
err = 0
while True:
resp = client.post(f"{api}/notes/delete", json={
"i": token,
"noteId": note,
"obliterate": action == FilterAction.Obliterate,
})
if resp.status_code == 429:
pb.update(status="limit")
time.sleep(1)
continue
elif resp.status_code == 502:
pb.update(status="down")
continue
time.sleep(1)
elif resp.status_code >= 400:
body = resp.json()
if body["error"]["code"] == "NO_SUCH_NOTE":
pb.increment(message="seeking")
seeking = True
break
err += 1
if err > 10:
raise Exception(f"{body['error']['code']}: {body['error']['message']}")
sys.stdout.write("\r")
print(f"err {body['error']['code']} {body['error']['message']} ")
time.sleep(30)
pb.increment(message="deleting")
last_req = time.time()
break
pb.finish()
|