Topic: [Scripts] Favorite/Upvote Listing and Purging

Posted under e621 Tools and Applications

Every so often, maybe annually, I go through my E6 favorites and upvotes and sort of... 'review' them: make sure there are no accidental marks and weed out posts I just don't enjoy anymore.

I figured I'd share the scripts I've been using for this, in case anyone else happens to have a use-case for this kind of thing.

Generally my method is:

  • Move favorites and upvotes into a set (E6-FavUp-List.py)
  • Clear favorites and upvotes (E6-FavUp-Purge.py)
  • Go through the set, re-adding favorites and upvotes (manually)
  • Remove posts from set (E6-FavUp-PurgeSet.py)

E6-FavUp-List.py

Instructions:

  • Run the script with python.
    • If it isn't already created, apikey.txt will be made, and you will need to put your username and API key inside.
  • Files will be created with a list of post IDs, separated by favorites and upvotes, each ID on a new line.
    • Sets are limited to 10k posts, so individual chunk files will be created for every 10k IDs found. Two master files will contain the full lists.
  • Add lists to your desired set
    • Optional: If using one set for both favorites and upvotes, put the lists into ListDiff to remove duplicates (the cleaned list is 'A ∪ B'), or use the dedupe sketch after this list.
    • Recommended: Use find and replace to switch the lists from newline-delimited to space-delimited.
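
If you'd rather skip ListDiff, here is a minimal dedupe sketch: it merges the two master lists the list script writes, drops duplicates, and prints the result space-delimited, covering both optional steps at once. Run it from the same folder as E6-FavUp-List.py:

import os

output_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), "output")

with open(os.path.join(output_folder, "favorites_ids.txt"), encoding="utf-8") as f:
    favorites = set(f.read().split())
with open(os.path.join(output_folder, "upvotes_ids.txt"), encoding="utf-8") as f:
    upvotes = set(f.read().split())

# Union of both ID lists, sorted numerically, space-delimited for pasting into a set
print(" ".join(sorted(favorites | upvotes, key=int)))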

Code:

import requests
from requests.auth import HTTPBasicAuth
import time
import os


TEST_MODE = False
TEST_RANGE = 30001
TEST_IDS = list(range(1, TEST_RANGE + 1))

this_folder = os.path.dirname(os.path.realpath(__file__))
apikey_file_path = os.path.join(this_folder, "apikey.txt")
output_folder = os.path.join(this_folder, "output")
os.makedirs(output_folder, exist_ok=True)

posts_url = "https://e621.net/posts.json"
headers = {"User-Agent": "E6-FavUp-List/1.2 (by DirtyDerg on e621)"}
base_params = {"limit": 200}

rate_limit_seconds = 1
last_request_time = time.time()


def rate_limit():
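    """Sleep just long enough to keep requests at least rate_limit_seconds apart."""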
    global last_request_time
    elapsed = time.time() - last_request_time
    if elapsed < rate_limit_seconds:
        time.sleep(rate_limit_seconds - elapsed)
    last_request_time = time.time()


try:
    with open(apikey_file_path, "r", encoding="utf-8") as api_file:
        api_data = api_file.read().splitlines()
except FileNotFoundError:
    with open(apikey_file_path, "w", encoding="utf-8") as api_file:
        api_file.write("username=\napi_key=")
    print("apikey.txt created. Add your credentials and rerun.")
    raise SystemExit(1)

# apikey.txt format: "username=<name>" on line 1, "api_key=<key>" on line 2.
# Spaces in the username become underscores so it matches e621's tag syntax.
api_username = api_data[0].split("=", 1)[1].strip().replace(" ", "_")
api_key = api_data[1].split("=", 1)[1].strip()

session = requests.Session()
auth = HTTPBasicAuth(api_username, api_key)


def get_all_ids_for_tag(tag_expr):
    """Return list of post IDs for a given tag query."""
    params = dict(base_params)
    params["tags"] = tag_expr

    post_ids = []
    lowest_id = -1
    stop = False

    print(f"Fetching post IDs for: {tag_expr!r}...")

    while not stop:
        rate_limit()
        response = session.get(posts_url, headers=headers, params=params, auth=auth)

        if response.status_code != 200:
            print(f"ERROR {response.status_code}: {response.text}")
            break

        data = response.json()
        posts = data.get("posts", [])
        if not posts:
            break

        for post in posts:
            pid = post["id"]
            post_ids.append(pid)
            if pid < lowest_id or lowest_id == -1:
                lowest_id = pid

        # A short page means we've reached the end; otherwise continue with
        # e621's "before ID" cursor (page=b<id> returns posts with IDs below <id>).
        if len(posts) < params["limit"]:
            stop = True
        else:
            params["page"] = f"b{lowest_id}"

        print(f"Fetched {len(post_ids)} IDs so far...")

    print(f"Retrieved {len(post_ids)} posts for {tag_expr!r}.")
    return post_ids


def write_ids_to_file(ids, base_filename):
    total = len(ids)
    if not ids:
        print(f"No IDs to write for {base_filename}.")
        return

    master_path = os.path.join(output_folder, f"{base_filename}_ids.txt")
    write_ids_to_file_chunk(ids, master_path)

    chunk_size = 10000
    for i in range(0, total, chunk_size):
        chunk = ids[i:i + chunk_size]
        chunk_path = os.path.join(output_folder, f"{base_filename}_ids_chunk_{i // chunk_size + 1:03d}.txt")
        write_ids_to_file_chunk(chunk, chunk_path)
        print(f"[{i + len(chunk):>6}/{total}] Wrote chunk → {chunk_path}")

    print(f"{base_filename.capitalize()} list complete ({total} IDs total).")


def write_ids_to_file_chunk(ids, filepath):
    with open(filepath, "w", encoding="utf-8") as f:
        f.write("\n".join(map(str, ids)))


if __name__ == "__main__":
    if TEST_MODE:
        print(f"TEST MODE: Using {TEST_RANGE} simulated IDs.")
        favorite_ids, upvote_ids = TEST_IDS.copy(), TEST_IDS.copy()
    else:
        favorite_ids = get_all_ids_for_tag(f"fav:{api_username}")
        upvote_ids = get_all_ids_for_tag(f"votedup:{api_username}")

    write_ids_to_file(favorite_ids, "favorites")
    write_ids_to_file(upvote_ids, "upvotes")

    print("Done.")

E6-FavUp-Purge.py

Instructions:

  • Script requires E6-FavUp-List.py to be run first, to generate the master ID lists.
  • After those files have been generated, run the script with python.
    • If it isn't already created, apikey.txt will be made, and you will need to put your username and API key inside.
    • Optional: If you don't want to clear out a certain type, comment out the section in if __name__ == "__main__": with #
  • The script will go through and remove posts from favorites first, then upvotes.
    • This will take a while: it removes one post per second to abide by the API rate limit. If you want to preview what's queued before deleting anything, see the sketch after this list.
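
A minimal preview sketch (read-only, no API calls; run it from the scripts' folder after generating the lists):

# Peek at the queued favorites without touching anything
ids = [int(t) for t in open("output/favorites_ids.txt", encoding="utf-8").read().split()]
print(f"{len(ids)} favorites queued for removal; first five: {ids[:5]}")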

Code:

import requests
from requests.auth import HTTPBasicAuth
import time
import os


this_folder = os.path.dirname(os.path.realpath(__file__))
apikey_file_path = os.path.join(this_folder, "apikey.txt")
output_folder = os.path.join(this_folder, "output")
favorites_file = os.path.join(output_folder, "favorites_ids.txt")
upvotes_file = os.path.join(output_folder, "upvotes_ids.txt")

fav_url_fmt = "https://e621.net/favorites/{}.json"
vote_url_fmt = "https://e621.net/posts/{}/votes.json"

headers = {"User-Agent": "E6-FavUp-Purge/1.2 (by DirtyDerg on e621)"}
rate_limit_seconds = 1
last_request_time = time.time()


def rate_limit():
    global last_request_time
    elapsed = time.time() - last_request_time
    if elapsed < rate_limit_seconds:
        time.sleep(rate_limit_seconds - elapsed)
    last_request_time = time.time()


def read_ids(path):
    if not os.path.isfile(path):
        print(f"Missing file: {path}")
        return []

    with open(path, "r", encoding="utf-8") as f:
        text = f.read().strip()

    if not text:
        return []

    ids = []
    for token in text.split():
        try:
            ids.append(int(token))
        except ValueError:
            print(f"Invalid token in {path!r}: {token!r}")
    return ids


try:
    with open(apikey_file_path, "r", encoding="utf-8") as api_file:
        api_data = api_file.read().splitlines()
except FileNotFoundError:
    with open(apikey_file_path, "w", encoding="utf-8") as api_file:
        api_file.write("username=\napi_key=")
    print("apikey.txt created. Add your credentials and rerun.")
    raise SystemExit(1)

api_username = api_data[0].split("=", 1)[1].strip().replace(" ", "_")
api_key = api_data[1].split("=", 1)[1].strip()

session = requests.Session()
auth = HTTPBasicAuth(api_username, api_key)


def clear_favorites(ids):
    total = len(ids)
    print(f"Clearing {total} favorites...")
    for i, pid in enumerate(ids, start=1):
        rate_limit()
        response = session.delete(fav_url_fmt.format(pid), headers=headers, auth=auth)
        if response.status_code in (200, 204):
            print(f"[{i}/{total}] Unfavorited post {pid}")
        else:
            print(f"[{i}/{total}] ERROR {response.status_code}: {response.text}")


def clear_upvotes(ids):
    total = len(ids)
    print(f"Clearing {total} upvotes...")
    for i, pid in enumerate(ids, start=1):
        rate_limit()
        response = session.delete(vote_url_fmt.format(pid), headers=headers, auth=auth)
        if response.status_code in (200, 204):
            print(f"[{i}/{total}] Cleared vote for post {pid}")
        else:
            print(f"[{i}/{total}] ERROR {response.status_code}: {response.text}")


if __name__ == "__main__":
    # Comment out either block below with '#' to skip clearing favorites or upvotes

    favorite_ids = read_ids(favorites_file)
    if favorite_ids:
        clear_favorites(favorite_ids)
    else:
        print("No favorites found.")

    upvote_ids = read_ids(upvotes_file)
    if upvote_ids:
        clear_upvotes(upvote_ids)
    else:
        print("No upvotes found.")

    print("Done.")

E6-FavUp-PurgeSet.py

Instructions:

  • Change SET_ID to the ID of your set (e.g. 60450, NOT the shortname); a quick verification snippet follows this list.
    • Set can be either private or public, but you must be the owner (maybe a maintainer works too, I haven't checked).
  • Run the script with python.
    • If it isn't already created, apikey.txt will be made, and you will need to put your username and API key inside.
  • The script will go through and check if any of your favorites/upvotes exist in the set you chose.
  • If a post exists in both, it will remove it from the set.
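
Not sure you grabbed the right ID? Here's a quick standalone check using the same /post_sets/<id>.json endpoint the script already calls; the "name" and "shortname" fields should match what the set's page shows. (Add auth=HTTPBasicAuth(username, api_key) to the request if the set is private.)

import requests

SET_ID = 0  # your numeric set ID

response = requests.get(
    f"https://e621.net/post_sets/{SET_ID}.json",
    headers={"User-Agent": "E6-FavUp-SetCheck/1.0 (by DirtyDerg on e621)"},
)
data = response.json()
print(data.get("name"), "/", data.get("shortname"))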

Code:

import requests
from requests.auth import HTTPBasicAuth
import time
import os


SET_ID = 0  # Change to your numeric set ID (e.g. 60450), not the shortname

this_folder = os.path.dirname(os.path.realpath(__file__))
apikey_file_path = os.path.join(this_folder, "apikey.txt")

posts_url = "https://e621.net/posts.json"
set_url = f"https://e621.net/post_sets/{SET_ID}.json"
remove_posts_url = f"https://e621.net/post_sets/{SET_ID}/remove_posts.json"
headers = {"User-Agent": "E6-FavUp-SetClean/1.2 (by DirtyDerg on e621)"}
base_params = {"limit": 200}

rate_limit_seconds = 1
last_request_time = time.time()


def rate_limit():
    global last_request_time
    elapsed = time.time() - last_request_time
    if elapsed < rate_limit_seconds:
        time.sleep(rate_limit_seconds - elapsed)
    last_request_time = time.time()


try:
    with open(apikey_file_path, "r", encoding="utf-8") as api_file:
        api_data = api_file.read().splitlines()
except FileNotFoundError:
    with open(apikey_file_path, "w", encoding="utf-8") as api_file:
        api_file.write("username=\napi_key=")
    print("apikey.txt created. Add your credentials and rerun.")
    raise SystemExit(1)

api_username = api_data[0].split("=", 1)[1].strip().replace(" ", "_")
api_key = api_data[1].split("=", 1)[1].strip()

session = requests.Session()
auth = HTTPBasicAuth(api_username, api_key)


def get_all_ids_for_tag(tag_expr):
    params = dict(base_params)
    params["tags"] = tag_expr

    post_ids = []
    lowest_id = -1
    stop = False

    print(f"Fetching post IDs for: {tag_expr!r}...")

    while not stop:
        rate_limit()
        response = session.get(posts_url, headers=headers, params=params, auth=auth)

        if response.status_code != 200:
            print(f"ERROR {response.status_code}: {response.text}")
            break

        data = response.json()
        posts = data.get("posts", [])
        if not posts:
            break

        for post in posts:
            pid = post["id"]
            post_ids.append(pid)
            if pid < lowest_id or lowest_id == -1:
                lowest_id = pid

        if len(posts) < params["limit"]:
            stop = True
        else:
            params["page"] = f"b{lowest_id}"

        print(f"Fetched {len(post_ids)} IDs so far...")

    print(f"Retrieved {len(post_ids)} posts for {tag_expr!r}.")
    return post_ids


def get_set_post_ids():
    rate_limit()
    response = session.get(set_url, headers=headers, auth=auth)
    if response.status_code != 200:
        print(f"ERROR retrieving set {SET_ID}: {response.status_code}")
        return set()
    data = response.json()
    return set(data.get("post_ids", []))


def remove_overlapping_posts(user_ids, label):
    set_post_ids = get_set_post_ids()
    overlap = [pid for pid in user_ids if pid in set_post_ids]
    total = len(overlap)

    if not overlap:
        print(f"[{label}] 0 posts to remove.")
        return

    print(f"Removing {total} {label} from set {SET_ID}...")
    data = {"post_ids": overlap}

    for i, pid in enumerate(overlap, start=1):
        rate_limit()
        response = session.post(remove_posts_url, headers=headers, json={"post_ids": [pid]}, auth=auth)
        if response.status_code in (200, 201, 204):
            print(f"[{i}/{total}] Removed post {pid}")
        else:
            print(f"[{i}/{total}] ERROR {response.status_code}: {response.text}")

    print(f"[{label}] Complete: {total} posts processed.")


if __name__ == "__main__":
    # Comment with '#' to skip purging either favorites or upvotes

    favorite_ids = get_all_ids_for_tag(f"fav:{api_username}")
    remove_overlapping_posts(favorite_ids, "favorites")

    upvote_ids = get_all_ids_for_tag(f"votedup:{api_username}")
    remove_overlapping_posts(upvote_ids, "upvotes")

    print("Done.")

Original page: https://e621.net/forum_topics/60955