148 lines
5.1 KiB
Python
148 lines
5.1 KiB
Python
import argparse
|
|
from src.api.synapse import Synapse
|
|
from src.api.media_info import get_media_info
|
|
from src.api.user_info import get_user_info
|
|
from src.data.database import StickerDatabase
|
|
|
|
|
|
def get_user_media(_user_id, number=100, size=1024, order="media_length", direction="b"):
    """Collect the IDs of a user's media, skipping their avatar and stickers.

    Uses the module-level ``synapse`` client to fetch the user's profile and
    media list. Each media entry is then classified:

    * the entry whose ID matches the last path segment of the user's
      ``avatar_url`` is reported and skipped;
    * entries whose ID is returned by ``StickerDatabase.get_sticker_ids()``
      are reported and skipped;
    * every other entry is passed to ``get_media_info``, its
      ``media_length`` is added to the running total, and its ID is kept.

    A summary (number of kept media and their total size in whole MB) is
    printed before returning.

    Args:
        _user_id: User ID handed to ``synapse.get_user_info`` and
            ``synapse.get_user_media``.
        number: Forwarded to ``synapse.get_user_media`` as ``limit``.
        size: Accepted so existing callers keep working; this function does
            not use it.
        order: Forwarded to ``synapse.get_user_media`` as ``order``.
        direction: Forwarded to ``synapse.get_user_media`` as ``direction``.

    Returns:
        list: IDs of the media entries that were neither avatar nor sticker.
    """
    user_info = synapse.get_user_info(_user_id).json()

    # A user without an avatar has no usable ``avatar_url``; calling
    # ``.split`` on None would raise, so treat that case as "no avatar to
    # protect" instead of crashing.
    avatar_url = user_info.get('avatar_url')
    avatar_id = avatar_url.split('/')[-1] if avatar_url else None

    media_files = synapse.get_user_media(_user_id, limit=number, order=order, direction=direction).json()
    media_ids = []
    full_size = 0

    # Get the sticker ids. Only the first column of each row is used; a set
    # keeps the per-media membership test cheap.
    # NOTE(review): assumes the ids are hashable (e.g. strings) - confirm
    # against StickerDatabase.
    database = StickerDatabase()
    sticker_ids = {row[0] for row in database.get_sticker_ids()}

    for media in media_files['media']:
        media_id = media['media_id']

        # Check if there is the avatar in the media
        if avatar_id is not None and media_id == avatar_id:
            print(f"Avatar detected! {avatar_id}")
            continue

        # Check if there are any stickers in the medias
        if media_id in sticker_ids:
            print(f"Sticker detected! {media_id}")
            continue

        get_media_info(media)
        full_size += media['media_length']
        media_ids.append(media_id)

    print("================================================")
    print("Number of media: {}".format(len(media_ids)))
    print("Total space usage: {}MB".format(int(full_size / (1024 * 1024))))
    return media_ids
|
|
|
|
|
|
def delete_media(media_ids):
    """Delete every media ID in *media_ids* via the module-level ``synapse`` client.

    For each ID, ``synapse.delete_media_by_id`` is called and a 1-based
    counter is printed together with the decoded JSON body of the response.

    Args:
        media_ids: Iterable of media IDs to delete.
    """
    for position, target_id in enumerate(media_ids, start=1):
        response = synapse.delete_media_by_id(target_id)
        print(position, response.json())
|
|
|
|
|
|
def get_users_list(num, excluded_users=("@telegrambot:matrix.hirad.it",)):
    """Return the user IDs reported by ``synapse.get_users_media_usage``.

    Every entry of the ``users`` list in the response is passed to
    ``get_user_info`` and its ``user_id`` is collected, except for accounts
    listed in *excluded_users*, which are skipped entirely.

    Args:
        num: Forwarded to ``synapse.get_users_media_usage``.
        excluded_users: Collection (tuple, list, set, ...) of user IDs to
            leave out. Defaults to the single bot account that used to be
            hard-coded here, so existing callers see no change. Pass a
            collection, not a bare string, otherwise ``in`` performs a
            substring match.

    Returns:
        list: The ``user_id`` of every non-excluded entry, in response order.
    """
    users = []
    users_list = synapse.get_users_media_usage(num).json()
    for user in users_list['users']:
        if user['user_id'] in excluded_users:
            continue
        get_user_info(user)
        users.append(user['user_id'])
    return users
|
|
|
|
|
|
if __name__ == '__main__':
    # Command-line entry point. Two modes, which may be combined:
    #   -u USERNAME  list one user's media and (after confirmation) delete it
    #   -l           do the same for each user returned by get_users_list()
    # ``synapse`` is deliberately a module-level name: get_user_media,
    # delete_media and get_users_list look it up as a global.
    synapse = Synapse()
    parser = argparse.ArgumentParser(
        description="Clean Synapse media"
    )

    # User
    parser.add_argument("-u",
                        "--user",
                        metavar="USERNAME",
                        help="Enter username")
    parser.add_argument("-n",
                        "--number",
                        type=int,
                        default=100,
                        metavar="NUMBER",
                        help="Enter number of media to display (default: 100)")
    parser.add_argument("-s",
                        "--size",
                        type=int,
                        default=1024,
                        metavar="SIZE",
                        help="Enter size limit in KB (default: 1024)")

    # List
    parser.add_argument("-l",
                        "--list",
                        action="store_true",
                        help="Get the list of users with highest media usage")
    list_group = parser.add_argument_group("List options")
    list_group.add_argument("-e",
                            "--numbers",
                            type=int,
                            default=10,
                            metavar="NUMBER",
                            help="Enter number of users to display (default: 10)")
    parser.add_argument("-y",
                        action="store_true",
                        help="Run the script without asking for confirmation")
    parser.add_argument("-o",
                        "--order",
                        type=str,
                        default="media_length",
                        metavar="ORDER",
                        help="Enter order by (default: media_length)[media_length|created_ts]")
    parser.add_argument("-d",
                        "--dir",
                        type=str,
                        default="b",
                        metavar="DIR",
                        help="Enter direction (default: b)[f|b]")
    args = parser.parse_args()

    # Without -u or -l there is nothing to do; show the usage instead of
    # exiting silently.
    if not args.user and not args.list:
        parser.print_help()

    if args.user:
        username = args.user
        # The full user ID is built from the bare username and the
        # homeserver name exposed by the client.
        user_id = f"@{username}:{synapse.homeserver}"
        number_of_media = args.number
        size_of_media = args.size
        medias = get_user_media(
            _user_id=user_id,
            number=number_of_media,
            size=size_of_media,
            order=args.order,
            direction=args.dir
        )
        if args.y:
            delete_media(media_ids=medias)
        else:
            confirm = input("Are you sure you want to delete the files? (y/n)")
            if confirm.lower() == 'y':
                delete_media(media_ids=medias)

    if args.list:
        number_of_users = args.numbers
        list_of_users = get_users_list(number_of_users)
        for user in list_of_users:
            print(user)
            # -n/--number and -s/--size are not forwarded in list mode, so
            # get_user_media falls back to its own defaults here.
            medias = get_user_media(_user_id=user, order=args.order, direction=args.dir)
            if args.y:
                delete_media(medias)
            else:
                confirm = input("Are you sure you want to delete the files? (y/n/c) ")
                if confirm.lower() == 'y':
                    delete_media(medias)
                elif confirm.lower() == 'c':
                    # 'c' cancels the whole run. Raise SystemExit directly
                    # rather than calling the ``exit()`` helper, which is
                    # injected by the ``site`` module for interactive use
                    # and is not guaranteed to exist (e.g. ``python -S``).
                    raise SystemExit
|