Every so often, maybe annually, I go through my E6 favorites and upvotes and sort of... 'review' them. Make sure that there's no accidental marks or check for posts I just don't enjoy anymore.
I figured I'd share the scripts I've been using for this. Or in case anyone happens to have a use-case for this kind of thing.
Generally my method is:
- Move favorites and upvotes into a set (E6-FavUp-List.py)
- Clear favorites and upvotes (E6-FavUp-Purge.py)
- Go through the set, re-adding favorites and upvotes (manually)
- Remove posts from set (E6-FavUp-PurgeSet.py)
E6-FavUp-List.py
Instructions:
- Run the script with python.
- If it isn't already created, apikey.txt will be made, and you will need to put your username and API key inside.
- Files will be created with a list of post ids, separated by favorites and upvotes, each id on a new line.
- Sets are limited to 10k posts, so individual files will be created for every 10k ids found. Two master files will contain the full list.
- Add lists to your desired set
- Optional: If using one set for both favorites and upvotes, put the lists into ListDiff to remove duplicates (cleaned list is: 'A ∪ B')
- Recommended: Use find and replace to switch from new-line delimited to space delimited.
Code:
import requests
from requests.auth import HTTPBasicAuth
import time
import os
TEST_MODE = False
TEST_RANGE = 30001
TEST_IDS = list(range(1, TEST_RANGE + 1))
this_folder = os.path.dirname(os.path.realpath(__file__))
apikey_file_path = os.path.join(this_folder, "apikey.txt")
output_folder = os.path.join(this_folder, "output")
os.makedirs(output_folder, exist_ok=True)
posts_url = "https://e621.net/posts.json"
headers = {"User-Agent": "E6-FavUp-List/1.2 (by DirtyDerg on e621)"}
base_params = {"limit": 200}
rate_limit_seconds = 1
last_request_time = time.time()
def rate_limit():
global last_request_time
elapsed = time.time() - last_request_time
if elapsed < rate_limit_seconds:
time.sleep(rate_limit_seconds - elapsed)
last_request_time = time.time()
try:
with open(apikey_file_path, "r", encoding="utf-8") as api_file:
api_data = api_file.read().splitlines()
except FileNotFoundError:
with open(apikey_file_path, "a", encoding="utf-8") as api_file:
api_file.write("username=\napi_key=")
print("apikey.txt created. Add your credentials and rerun.")
raise SystemExit(1)
api_username = api_data[0].split("=", 1)[1].strip().replace(" ", "_")
api_key = api_data[1].split("=", 1)[1].strip()
session = requests.Session()
auth = HTTPBasicAuth(api_username, api_key)
def get_all_ids_for_tag(tag_expr):
"""Return list of post IDs for a given tag query."""
params = dict(base_params)
params["tags"] = tag_expr
post_ids = []
lowest_id = -1
stop = False
print(f"Fetching post IDs for: {tag_expr!r}...")
while not stop:
rate_limit()
response = session.get(posts_url, headers=headers, params=params, auth=auth)
if response.status_code != 200:
print(f"ERROR {response.status_code}: {response.text}")
break
data = response.json()
posts = data.get("posts", [])
if not posts:
break
for post in posts:
pid = post["id"]
post_ids.append(pid)
if pid < lowest_id or lowest_id == -1:
lowest_id = pid
if len(posts) < params["limit"]:
stop = True
else:
params["page"] = f"b{lowest_id}"
print(f"Fetched {len(post_ids)} IDs so far...")
print(f"Retrieved {len(post_ids)} posts for {tag_expr!r}.")
return post_ids
def write_ids_to_file(ids, base_filename):
total = len(ids)
if not ids:
print(f"No IDs to write for {base_filename}.")
return
master_path = os.path.join(output_folder, f"{base_filename}_ids.txt")
write_ids_to_file_chunk(ids, master_path)
chunk_size = 10000
for i in range(0, total, chunk_size):
chunk = ids[i:i + chunk_size]
chunk_path = os.path.join(output_folder, f"{base_filename}_ids_chunk_{i // chunk_size + 1:03d}.txt")
write_ids_to_file_chunk(chunk, chunk_path)
print(f"[{i + len(chunk):>6}/{total}] Wrote chunk → {chunk_path}")
print(f"{base_filename.capitalize()} list complete ({total} IDs total).")
def write_ids_to_file_chunk(ids, filepath):
with open(filepath, "w", encoding="utf-8") as f:
f.write("\n".join(map(str, ids)))
if __name__ == "__main__":
if TEST_MODE:
print(f"TEST MODE: Using {TEST_RANGE} simulated IDs.")
favorite_ids, upvote_ids = TEST_IDS.copy(), TEST_IDS.copy()
else:
favorite_ids = get_all_ids_for_tag(f"fav:{api_username}")
upvote_ids = get_all_ids_for_tag(f"votedup:{api_username}")
write_ids_to_file(favorite_ids, "favorites")
write_ids_to_file(upvote_ids, "upvotes")
print("Done.")
E6-FavUp-Purge.py
Instructions:
- Script requires E6-FavUp-List.py to be run first, to generate the master id lists.
- After those files have been generated, Run the script with python.
- If it isn't already created, apikey.txt will be made, and you will need to put your username and API key inside.
- Optional: If you don't want to clear out a certain type, comment out the section in if __name__ == "__main__": with #
- The script will go through and remove posts from favorites first, then upvotes.
- This will take a while, removing one post per second to abide by API rate limits.
Code:
import requests
from requests.auth import HTTPBasicAuth
import time
import os
this_folder = os.path.dirname(os.path.realpath(__file__))
apikey_file_path = os.path.join(this_folder, "apikey.txt")
output_folder = os.path.join(this_folder, "output")
favorites_file = os.path.join(output_folder, "favorites_ids.txt")
upvotes_file = os.path.join(output_folder, "upvotes_ids.txt")
fav_url_fmt = "https://e621.net/favorites/{}.json"
vote_url_fmt = "https://e621.net/posts/{}/votes.json"
headers = {"User-Agent": "E6-FavUp-Purge/1.2 (by DirtyDerg on e621)"}
rate_limit_seconds = 1
last_request_time = time.time()
def rate_limit():
global last_request_time
elapsed = time.time() - last_request_time
if elapsed < rate_limit_seconds:
time.sleep(rate_limit_seconds - elapsed)
last_request_time = time.time()
def read_ids(path):
if not os.path.isfile(path):
print(f"Missing file: {path}")
return []
with open(path, "r", encoding="utf-8") as f:
text = f.read().strip()
if not text:
return []
ids = []
for token in text.split():
try:
ids.append(int(token))
except ValueError:
print(f"Invalid token in {path!r}: {token!r}")
return ids
try:
with open(apikey_file_path, "r", encoding="utf-8") as api_file:
api_data = api_file.read().splitlines()
except FileNotFoundError:
with open(apikey_file_path, "a", encoding="utf-8") as api_file:
api_file.write("username=\napi_key=")
print("apikey.txt created. Add your credentials and rerun.")
raise SystemExit(1)
api_username = api_data[0].split("=", 1)[1].strip().replace(" ", "_")
api_key = api_data[1].split("=", 1)[1].strip()
session = requests.Session()
auth = HTTPBasicAuth(api_username, api_key)
def clear_favorites(ids):
total = len(ids)
print(f"Clearing {total} favorites...")
for i, pid in enumerate(ids, start=1):
rate_limit()
response = session.delete(fav_url_fmt.format(pid), headers=headers, auth=auth)
if response.status_code in (200, 204):
print(f"[{i}/{total}] Unfavorited post {pid}")
else:
print(f"[{i}/{total}] ERROR {response.status_code}: {response.text}")
def clear_upvotes(ids):
total = len(ids)
print(f"Clearing {total} upvotes...")
for i, pid in enumerate(ids, start=1):
rate_limit()
response = session.delete(vote_url_fmt.format(pid), headers=headers, auth=auth)
if response.status_code in (200, 204):
print(f"[{i}/{total}] Cleared vote for post {pid}")
else:
print(f"[{i}/{total}] ERROR {response.status_code}: {response.text}")
if __name__ == "__main__":
# Comment with '#' to skip either favorites or upvotes clearing
favorite_ids = read_ids(favorites_file)
if favorite_ids:
clear_favorites(favorite_ids)
else:
print("No favorites found.")
upvote_ids = read_ids(upvotes_file)
if upvote_ids:
clear_upvotes(upvote_ids)
else:
print("No upvotes found.")
print("Done.")
E6-FavUp-PurgeSet.py
Instructions:
- Change SET_ID to the ID of your set (i.e. 60450, NOT the shortname)
- Set can be either private or public, but you must be the owner (maybe a maintainer, I haven't checked)
- Run the script with python.
- If it isn't already created, apikey.txt will be made, and you will need to put your username and API key inside.
- The script will go through and check if any of your favorites/upvotes exist in the set your chose.
- If a post exists in both, it will remove it from the set.
Code:
import requests
from requests.auth import HTTPBasicAuth
import time
import os
SET_ID = SET_ID # Change to set ID, not shortname
this_folder = os.path.dirname(os.path.realpath(__file__))
apikey_file_path = os.path.join(this_folder, "apikey.txt")
posts_url = "https://e621.net/posts.json"
set_url = f"https://e621.net/post_sets/{SET_ID}.json"
remove_posts_url = f"https://e621.net/post_sets/{SET_ID}/remove_posts.json"
headers = {"User-Agent": "E6-FavUp-SetClean/1.2 (by DirtyDerg on e621)"}
base_params = {"limit": 200}
rate_limit_seconds = 1
last_request_time = time.time()
def rate_limit():
global last_request_time
elapsed = time.time() - last_request_time
if elapsed < rate_limit_seconds:
time.sleep(rate_limit_seconds - elapsed)
last_request_time = time.time()
try:
with open(apikey_file_path, "r", encoding="utf-8") as api_file:
api_data = api_file.read().splitlines()
except FileNotFoundError:
with open(apikey_file_path, "a", encoding="utf-8") as api_file:
api_file.write("username=\napi_key=")
print("apikey.txt created. Add your credentials and rerun.")
raise SystemExit(1)
api_username = api_data[0].split("=", 1)[1].strip().replace(" ", "_")
api_key = api_data[1].split("=", 1)[1].strip()
session = requests.Session()
auth = HTTPBasicAuth(api_username, api_key)
def get_all_ids_for_tag(tag_expr):
params = dict(base_params)
params["tags"] = tag_expr
post_ids = []
lowest_id = -1
stop = False
print(f"Fetching post IDs for: {tag_expr!r}...")
while not stop:
rate_limit()
response = session.get(posts_url, headers=headers, params=params, auth=auth)
if response.status_code != 200:
print(f"ERROR {response.status_code}: {response.text}")
break
data = response.json()
posts = data.get("posts", [])
if not posts:
break
for post in posts:
pid = post["id"]
post_ids.append(pid)
if pid < lowest_id or lowest_id == -1:
lowest_id = pid
if len(posts) < params["limit"]:
stop = True
else:
params["page"] = f"b{lowest_id}"
print(f"Fetched {len(post_ids)} IDs so far...")
print(f"Retrieved {len(post_ids)} posts for {tag_expr!r}.")
return post_ids
def get_set_post_ids():
rate_limit()
response = session.get(set_url, headers=headers, auth=auth)
if response.status_code != 200:
print(f"ERROR retrieving set {SET_ID}: {response.status_code}")
return set()
data = response.json()
return set(data.get("post_ids", []))
def remove_overlapping_posts(user_ids, label):
set_post_ids = get_set_post_ids()
overlap = [pid for pid in user_ids if pid in set_post_ids]
total = len(overlap)
if not overlap:
print(f"[{label}] 0 posts to remove.")
return
print(f"Removing {total} {label} from set {SET_ID}...")
data = {"post_ids": overlap}
for i, pid in enumerate(overlap, start=1):
rate_limit()
response = session.post(remove_posts_url, headers=headers, json={"post_ids": [pid]}, auth=auth)
if response.status_code in (200, 201, 204):
print(f"[{i}/{total}] Removed post {pid}")
else:
print(f"[{i}/{total}] ERROR {response.status_code}: {response.text}")
print(f"[{label}] Complete: {total} posts processed.")
if __name__ == "__main__":
# Comment with '#' to skip purging either favorites or upvotes
favorite_ids = get_all_ids_for_tag(f"fav:{api_username}")
remove_overlapping_posts(favorite_ids, "favorites")
upvote_ids = get_all_ids_for_tag(f"votedup:{api_username}")
remove_overlapping_posts(upvote_ids, "upvotes")
print("Done.")