prevent stickers deletion

This commit is contained in:
Hirad 2024-11-12 12:16:28 +03:30
parent fc1dbe721b
commit 7a501880c5
3 changed files with 94 additions and 0 deletions

View file

@ -2,6 +2,7 @@ import argparse
from src.api.synapse import Synapse from src.api.synapse import Synapse
from src.api.media_info import get_media_info from src.api.media_info import get_media_info
from src.api.user_info import get_user_info from src.api.user_info import get_user_info
from src.data.database import StickerDatabase
def get_user_media(_user_id, number=100, size=1024, order="media_length", direction="b"): def get_user_media(_user_id, number=100, size=1024, order="media_length", direction="b"):
@ -11,10 +12,25 @@ def get_user_media(_user_id, number=100, size=1024, order="media_length", direct
media_files = synapse.get_user_media(_user_id, limit=number, order=order, direction=direction).json() media_files = synapse.get_user_media(_user_id, limit=number, order=order, direction=direction).json()
media_ids = [] media_ids = []
full_size = 0 full_size = 0
# Get the list of sticker ids
database = StickerDatabase()
sticker_ids_list = database.get_sticker_ids()
sticker_ids = []
for s in sticker_ids_list:
sticker_ids.append(s[0])
for media in media_files['media']: for media in media_files['media']:
# Check if there is the avatar in the media
if media['media_id'] == avatar_id: if media['media_id'] == avatar_id:
print(f"Avatar detected! {avatar_id}") print(f"Avatar detected! {avatar_id}")
pass pass
# Check if there are any stickers in the medias
elif media['media_id'] in sticker_ids:
print(f"Sticker detected! {media['media_id']}")
pass
else: else:
get_media_info(media) get_media_info(media)
full_size += media['media_length'] full_size += media['media_length']

View file

@ -0,0 +1,34 @@
import sqlite3
class StickerDatabase:
def __init__(self):
self.conn = sqlite3.connect('exclude_media.db')
self.cur = self.conn.cursor()
self.create_table()
def create_table(self):
query = """CREATE TABLE IF NOT EXISTS stickers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT,
sticker_pack TEXT,
sticker_id TEXT)"""
self.cur.execute(query)
self.conn.commit()
def insert_sticker(self, username, stickerpack, sticker_id):
query = "SELECT 1 FROM stickers WHERE user_id=? AND sticker_pack=? AND sticker_id=?"
self.cur.execute(query, (username, stickerpack, sticker_id))
if self.cur.fetchone() is None:
query = "INSERT INTO stickers (user_id, sticker_pack, sticker_id) VALUES (?, ?, ?)"
self.cur.execute(query, (username, stickerpack, sticker_id))
self.conn.commit()
def get_sticker_ids(self):
query = "SELECT sticker_id FROM stickers"
self.cur.execute(query)
return self.cur.fetchall()
def close_conn(self):
self.conn.close()
print("Database connection closed!")

44
store_stickers.py Normal file
View file

@ -0,0 +1,44 @@
import os
import json
from src.data.database import StickerDatabase
def load_json(filename):
with open(filename, 'r') as f:
return json.load(f)
def get_sticker_packs(index):
sticker_pack_list = []
for i in index['packs']:
sticker_pack_list.append(i)
return sticker_pack_list
def get_sticker_ids(stickerpack):
stickers_list = []
for i in stickerpack['stickers']:
url = i['url']
sticker_id = url.split('/')[-1]
stickers_list.append(sticker_id)
return stickers_list
if __name__ == '__main__':
username = input("Enter username: ")
packs_directory = input("Enter directory containing index.json file: ")
index_file = os.path.join(packs_directory, 'index.json')
sticker_db = StickerDatabase()
index_data = load_json(index_file)
sticker_packs = get_sticker_packs(index_data)
for pack in sticker_packs:
print(f"Sticker pack: {pack}")
sticker_pack_file = os.path.join(packs_directory, pack)
sticker_pack_data = load_json(sticker_pack_file)
sticker_pack_title = sticker_pack_data['net.maunium.telegram.pack']['short_name']
sticker_ids = get_sticker_ids(sticker_pack_data)
print(f"Sticker pack title: {sticker_pack_title}")
for sticker in sticker_ids:
sticker_db.insert_sticker(username, sticker_pack_title, sticker)
print(f"Added sticker {sticker} to the database.")