initial commit

This commit is contained in:
Hirad 2024-07-07 08:17:04 +03:30
parent 5bc3d84b89
commit 604850a16b
10 changed files with 260 additions and 0 deletions

131
cleansy.py Normal file
View file

@ -0,0 +1,131 @@
import argparse
from src.api.synapse import Synapse
from src.api.media_info import get_media_info
from src.api.user_info import get_user_info
def get_user_media(_user_id, number=100, size=1024, order="media_length", direction="b"):
user_info = synapse.get_user_info(_user_id).json()
avatar_url = user_info['avatar_url']
avatar_id = avatar_url.split('/')[-1]
media_files = synapse.get_user_media(_user_id, limit=number, order=order, direction=direction).json()
media_ids = []
full_size = 0
for media in media_files['media']:
if media['media_id'] == avatar_id:
print(f"Avatar detected! {avatar_id}")
pass
else:
get_media_info(media)
full_size += media['media_length']
media_ids.append(media['media_id'])
print("================================================")
print("Number of media: {}".format(len(media_ids)))
print("Total space usage: {}MB".format(int(full_size / (1024 * 1024))))
return media_ids
def delete_media(media_ids):
for i, media_id in enumerate(media_ids):
delete = synapse.delete_media_by_id(media_id)
print(i + 1, delete.json())
def get_users_list(num):
users = []
users_list = synapse.get_users_media_usage(num).json()
for user in users_list['users']:
if user['user_id'] == "@telegrambot:matrix.hirad.it":
pass
else:
get_user_info(user)
users.append(user['user_id'])
return users
if __name__ == '__main__':
synapse = Synapse()
parser = argparse.ArgumentParser(
description="Clean Synapse media"
)
# User
parser.add_argument("-u",
"--user",
metavar="USERNAME",
help="Enter username")
parser.add_argument("-n",
"--number",
type=int,
default=100,
metavar="NUMBER",
help="Enter number of media to display (default: 100)")
parser.add_argument("-s",
"--size",
type=int,
default=1024,
metavar="SIZE",
help="Enter size limit in KB (default: 1024)")
# List
parser.add_argument("-l",
"--list",
action="store_true",
help="Get the list of users with highest media usage")
list_group = parser.add_argument_group("List options")
list_group.add_argument("-e",
"--numbers",
type=int,
default=10,
metavar="NUMBER",
help="Enter number of users to display (default: 10)")
parser.add_argument("-y",
action="store_true",
help="Run the script without asking for confirmation")
parser.add_argument("-o",
"--order",
type=str,
default="media_length",
metavar="ORDER",
help="Enter order by (default: media_length)[media_length|created_ts]")
parser.add_argument("-d",
"--dir",
type=str,
default="b",
metavar="DIR",
help="Enter direction (default: b)[f|b]")
args = parser.parse_args()
if args.user:
username = args.user
user_id = f"@{username}:{synapse.homeserver}"
number_of_media = args.number
size_of_media = args.size
medias = get_user_media(
_user_id=user_id,
number=number_of_media,
size=size_of_media,
order=args.order,
direction=args.dir
)
if args.y:
delete_media(media_ids=medias)
else:
confirm = input("Are you sure you want to delete the files? (y/n)")
if confirm.lower() == 'y':
delete_media(media_ids=medias)
if args.list:
number_of_users = args.numbers
list_of_users = get_users_list(number_of_users)
for user in list_of_users:
print(user)
medias = get_user_media(_user_id=user, order=args.order, direction=args.dir)
if args.y:
delete_media(medias)
else:
confirm = input("Are you sure you want to delete the files? (y/n/c) ")
if confirm.lower() == 'y':
delete_media(medias)
elif confirm.lower() == 'c':
exit()

0
src/__init__.py Normal file
View file

0
src/api/__init__.py Normal file
View file

16
src/api/media_info.py Normal file
View file

@ -0,0 +1,16 @@
from datetime import datetime
def get_media_info(media):
media_id = media['media_id']
media_type = media['media_type']
media_length = media['media_length']
media_name = media['upload_name']
created_ts = media['created_ts']
media_mb_size = media_length / (1024 * 1024)
media_size = round(media_mb_size, 2)
unix_ts = datetime.fromtimestamp(created_ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
print(f"ID: {media_id}, Name: {media_name}, Type: {media_type}, Size: {media_size}MB, Created: {unix_ts}")

51
src/api/synapse.py Normal file
View file

@ -0,0 +1,51 @@
import requests
from src.config.config_parser import Config
class Synapse:
def __init__(self):
self.homeserver = None
self.access_token = None
self.header = None
self.base_url = None
self.initialize()
def initialize(self):
config = Config()
self.homeserver, self.access_token = config.get_config()
self.base_url = f"https://{self.homeserver}/_synapse/admin"
self.header = {"Authorization": f"Bearer {self.access_token}"}
def request(self, url, method):
try:
return requests.request(url=url, headers=self.header, method=method)
except Exception as e:
print(f"Error making request: {e}")
return None
def delete_media_by_id(self, media_id):
url = f"{self.base_url}/v1/media/{self.homeserver}/{media_id}"
return self.request(url, "DELETE")
def get_users_media_usage(self, limit=100, order="media_length", direction="b"):
url = f"{self.base_url}/v1/statistics/users/media?limit={limit}&order_by={order}&dir={direction}"
return self.request(url, "GET")
def search_for_user(self, search_term, limit=10, order="media_length", direction="b"):
url = f"{self.base_url}/v1/statistics/users/media?search_term={search_term}&order_by={order}&dir={direction}"
return self.request(url, "GET")
def get_rooms_list(self, room_name=None, batch=0):
if room_name:
url = f"{self.base_url}/v1/rooms?search_term={room_name}&from={batch}"
else:
url = f"{self.base_url}/v1/rooms?from={batch}"
return self.request(url, "GET")
def get_user_info(self, user_id):
url = f"{self.base_url}/v2/users/{user_id}"
return self.request(url, "GET")
def get_user_media(self, user_id, limit=100, order="media_length", direction="b"):
url = f"{self.base_url}/v1/users/{user_id}/media?limit={limit}&order_by={order}&dir={direction}"
return self.request(url, "GET")

14
src/api/user_info.py Normal file
View file

@ -0,0 +1,14 @@
def get_user_info(user):
user_id = user['user_id']
display_name = user['displayname']
media_count = user['media_count']
media_length = user['media_length']
media_mb_size = media_length / (1024 * 1024)
media_size = round(media_mb_size, 2)
output = f"Username: {display_name}, ID: {user_id}, Media Count: {media_count}, Media Size: {media_size}MB"
print(output)
print(len(output) * "-")

0
src/config/__init__.py Normal file
View file

View file

@ -0,0 +1,48 @@
import os
from configparser import ConfigParser
CONFIG_FILE_PATH = 'config.ini'
class Config:
def __init__(self):
self.config = ConfigParser()
self.config_file = 'config.ini'
self.get_config()
def check_config_exists(self):
if os.path.exists(self.config_file):
return True
else:
return False
def get_config(self):
if self.check_config_exists():
try:
self.config.read(self.config_file)
homeserver = self.config.get('synapse', 'homeserver')
access_token = self.config.get('synapse', 'access_token')
return homeserver, access_token
except Exception as e:
print(f"Error reading config file: {e}")
return None, None
else:
homeserver = input("Enter homeserver url: ")
access_token = input("Enter access token: ")
if homeserver and access_token:
self.config['synapse'] = {
'homeserver': homeserver,
'access_token': access_token
}
with open(self.config_file, 'w') as configfile:
self.config.write(configfile)
def get_excluded_users(self):
try:
self.config.read(self.config_file)
excluded_users = self.config.get('options', 'excluded_users').split(',')
return excluded_users
except Exception as e:
print(f"Error reading config file: {e}")
return []

0
src/data/__init__.py Normal file
View file

0
src/data/database.py Normal file
View file