From 7a501880c5f59fd02eb4f92dfe2174bf82c5a8bc Mon Sep 17 00:00:00 2001 From: Hirad Date: Tue, 12 Nov 2024 12:16:28 +0330 Subject: [PATCH] prevent stickers deletion --- cleansy.py | 16 ++++++++++++++++ src/data/database.py | 34 ++++++++++++++++++++++++++++++++++ store_stickers.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+) create mode 100644 store_stickers.py diff --git a/cleansy.py b/cleansy.py index 9db54a9..8330340 100644 --- a/cleansy.py +++ b/cleansy.py @@ -2,6 +2,7 @@ import argparse from src.api.synapse import Synapse from src.api.media_info import get_media_info from src.api.user_info import get_user_info +from src.data.database import StickerDatabase def get_user_media(_user_id, number=100, size=1024, order="media_length", direction="b"): @@ -11,10 +12,25 @@ def get_user_media(_user_id, number=100, size=1024, order="media_length", direct media_files = synapse.get_user_media(_user_id, limit=number, order=order, direction=direction).json() media_ids = [] full_size = 0 + + # Get the list of sticker ids + database = StickerDatabase() + sticker_ids_list = database.get_sticker_ids() + sticker_ids = [] + for s in sticker_ids_list: + sticker_ids.append(s[0]) + for media in media_files['media']: + + # Check if there is the avatar in the media if media['media_id'] == avatar_id: print(f"Avatar detected! {avatar_id}") pass + + # Check if there are any stickers in the medias + elif media['media_id'] in sticker_ids: + print(f"Sticker detected! {media['media_id']}") + pass else: get_media_info(media) full_size += media['media_length'] diff --git a/src/data/database.py b/src/data/database.py index e69de29..0fe379d 100644 --- a/src/data/database.py +++ b/src/data/database.py @@ -0,0 +1,34 @@ +import sqlite3 + + +class StickerDatabase: + def __init__(self): + self.conn = sqlite3.connect('exclude_media.db') + self.cur = self.conn.cursor() + self.create_table() + + def create_table(self): + query = """CREATE TABLE IF NOT EXISTS stickers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT, + sticker_pack TEXT, + sticker_id TEXT)""" + self.cur.execute(query) + self.conn.commit() + + def insert_sticker(self, username, stickerpack, sticker_id): + query = "SELECT 1 FROM stickers WHERE user_id=? AND sticker_pack=? AND sticker_id=?" + self.cur.execute(query, (username, stickerpack, sticker_id)) + if self.cur.fetchone() is None: + query = "INSERT INTO stickers (user_id, sticker_pack, sticker_id) VALUES (?, ?, ?)" + self.cur.execute(query, (username, stickerpack, sticker_id)) + self.conn.commit() + + def get_sticker_ids(self): + query = "SELECT sticker_id FROM stickers" + self.cur.execute(query) + return self.cur.fetchall() + + def close_conn(self): + self.conn.close() + print("Database connection closed!") diff --git a/store_stickers.py b/store_stickers.py new file mode 100644 index 0000000..2a2ecfe --- /dev/null +++ b/store_stickers.py @@ -0,0 +1,44 @@ +import os +import json +from src.data.database import StickerDatabase + + +def load_json(filename): + with open(filename, 'r') as f: + return json.load(f) + + +def get_sticker_packs(index): + sticker_pack_list = [] + for i in index['packs']: + sticker_pack_list.append(i) + return sticker_pack_list + + +def get_sticker_ids(stickerpack): + stickers_list = [] + for i in stickerpack['stickers']: + url = i['url'] + sticker_id = url.split('/')[-1] + stickers_list.append(sticker_id) + return stickers_list + + +if __name__ == '__main__': + username = input("Enter username: ") + packs_directory = input("Enter directory containing index.json file: ") + index_file = os.path.join(packs_directory, 'index.json') + sticker_db = StickerDatabase() + + index_data = load_json(index_file) + sticker_packs = get_sticker_packs(index_data) + for pack in sticker_packs: + print(f"Sticker pack: {pack}") + sticker_pack_file = os.path.join(packs_directory, pack) + sticker_pack_data = load_json(sticker_pack_file) + sticker_pack_title = sticker_pack_data['net.maunium.telegram.pack']['short_name'] + sticker_ids = get_sticker_ids(sticker_pack_data) + print(f"Sticker pack title: {sticker_pack_title}") + for sticker in sticker_ids: + sticker_db.insert_sticker(username, sticker_pack_title, sticker) + print(f"Added sticker {sticker} to the database.")