"""Command-line helper for listing and deleting a user's media on a Synapse server.

All server access goes through the module-level ``synapse`` client, which is
created in the ``__main__`` section at the bottom of this file.
"""
import argparse
import sys

from src.api.synapse import Synapse
from src.api.media_info import get_media_info
from src.api.user_info import get_user_info
from src.data.database import StickerDatabase

# Accounts that are never offered for cleanup in list mode.
DEFAULT_EXCLUDED_USERS = ("@telegrambot:matrix.hirad.it",)


def get_user_media(_user_id, number=100, size=1024, order="media_length", direction="b"):
    """Collect the ids of a user's media that are candidates for deletion.

    The user's current avatar and any media whose id is returned by
    ``StickerDatabase.get_sticker_ids`` are skipped. Each remaining item is
    passed to ``get_media_info`` (presumably to display it -- confirm against
    that helper) and a summary of count and total size is printed.

    Args:
        _user_id: Full user id to query.
        number: Maximum number of media entries requested from the server.
        size: Currently unused.
            NOTE(review): the CLI advertises this as a "size limit in KB" but
            no filtering was ever applied; left untouched because the intended
            semantics (minimum vs. maximum) are not visible here.
        order: Sort key forwarded to the server.
        direction: Sort direction forwarded to the server.

    Returns:
        list: media ids that are neither the avatar nor a known sticker.
    """
    user_info = synapse.get_user_info(_user_id).json()
    # The avatar may be absent or null; in that case there is nothing to protect.
    avatar_url = user_info.get('avatar_url')
    avatar_id = avatar_url.split('/')[-1] if avatar_url else None

    media_files = synapse.get_user_media(
        _user_id, limit=number, order=order, direction=direction
    ).json()

    # First column of every row is the sticker id; a set keeps the per-media
    # membership test constant-time.
    database = StickerDatabase()
    sticker_ids = {row[0] for row in database.get_sticker_ids()}

    media_ids = []
    full_size = 0
    for media in media_files['media']:
        media_id = media['media_id']
        if avatar_id is not None and media_id == avatar_id:
            print(f"Avatar detected! {avatar_id}")
            continue
        if media_id in sticker_ids:
            print(f"Sticker detected! {media_id}")
            continue
        get_media_info(media)
        full_size += media['media_length']
        media_ids.append(media_id)

    print("================================================")
    print(f"Number of media: {len(media_ids)}")
    print(f"Total space usage: {int(full_size / (1024 * 1024))}MB")
    return media_ids


def delete_media(media_ids):
    """Delete every media id in *media_ids* and print each server response.

    Each output line is the 1-based position followed by the decoded JSON
    response of the delete call.
    """
    for position, media_id in enumerate(media_ids, start=1):
        response = synapse.delete_media_by_id(media_id)
        print(position, response.json())


def get_users_list(num, excluded_users=DEFAULT_EXCLUDED_USERS):
    """Return the user ids reported by the server's media-usage listing.

    Args:
        num: Forwarded to ``synapse.get_users_media_usage``.
        excluded_users: User ids that are skipped entirely.

    Returns:
        list: user ids in the order the server returned them, minus exclusions.
    """
    users = []
    users_list = synapse.get_users_media_usage(num).json()
    for user in users_list['users']:
        if user['user_id'] in excluded_users:
            continue
        get_user_info(user)
        users.append(user['user_id'])
    return users


def _confirm_and_delete(media_ids, assume_yes, prompt, allow_cancel=False):
    """Delete *media_ids*, asking for confirmation unless *assume_yes* is set.

    With *allow_cancel*, answering ``c`` terminates the whole script.
    """
    if assume_yes:
        delete_media(media_ids)
        return
    answer = input(prompt).lower()
    if answer == 'y':
        delete_media(media_ids)
    elif allow_cancel and answer == 'c':
        sys.exit()


if __name__ == '__main__':
    # Module-level client used by the functions above.
    synapse = Synapse()

    parser = argparse.ArgumentParser(
        description="Clean Synapse media"
    )

    # User
    parser.add_argument("-u", "--user", metavar="USERNAME", help="Enter username")
    parser.add_argument("-n", "--number", type=int, default=100, metavar="NUMBER",
                        help="Enter number of media to display (default: 100)")
    parser.add_argument("-s", "--size", type=int, default=1024, metavar="SIZE",
                        help="Enter size limit in KB (default: 1024)")

    # List
    parser.add_argument("-l", "--list", action="store_true",
                        help="Get the list of users with highest media usage")
    list_group = parser.add_argument_group("List options")
    list_group.add_argument("-e", "--numbers", type=int, default=10, metavar="NUMBER",
                            help="Enter number of users to display (default: 10)")

    parser.add_argument("-y", action="store_true",
                        help="Run the script without asking for confirmation")
    parser.add_argument("-o", "--order", type=str, default="media_length", metavar="ORDER",
                        help="Enter order by (default: media_length)[media_length|created_ts]")
    parser.add_argument("-d", "--dir", type=str, default="b", metavar="DIR",
                        help="Enter direction (default: b)[f|b]")

    args = parser.parse_args()

    if args.user:
        user_id = f"@{args.user}:{synapse.homeserver}"
        medias = get_user_media(
            _user_id=user_id,
            number=args.number,
            size=args.size,
            order=args.order,
            direction=args.dir,
        )
        _confirm_and_delete(
            medias, args.y, "Are you sure you want to delete the files? (y/n)"
        )

    if args.list:
        for user in get_users_list(args.numbers):
            print(user)
            medias = get_user_media(_user_id=user, order=args.order, direction=args.dir)
            _confirm_and_delete(
                medias,
                args.y,
                "Are you sure you want to delete the files? (y/n/c) ",
                allow_cancel=True,
            )