From f4c77481e752a676992cd0d84ea6a2b7c7f896c7 Mon Sep 17 00:00:00 2001 From: Hirad Date: Wed, 23 Jul 2025 11:53:15 +0330 Subject: [PATCH] adding media related code --- src/synclean/api/media.py | 26 ++++++ src/synclean/api/users.py | 46 +++++++++- src/synclean/models/enums.py | 13 ++- src/synclean/models/media.py | 14 +++ src/synclean/models/pagination.py | 10 +- src/synclean/models/user_media.py | 116 ++++++++++++++++++------ src/synclean/models/user_media_stats.py | 43 +++++++++ src/synclean/service/user_service.py | 85 +++++++++++++++-- 8 files changed, 310 insertions(+), 43 deletions(-) create mode 100644 src/synclean/api/media.py create mode 100644 src/synclean/models/user_media_stats.py diff --git a/src/synclean/api/media.py b/src/synclean/api/media.py new file mode 100644 index 0000000..2da9f32 --- /dev/null +++ b/src/synclean/api/media.py @@ -0,0 +1,26 @@ +from urllib.parse import urlencode + +from synclean.api.synapse import SynapseApiClient +from synclean.models.media import DeleteMedia +from synclean.utils.utils import convert_datetime_to_ts, convert_megabytes_to_bytes + + +class MediaApi: + def __init__(self, client: SynapseApiClient): + self.client = client + + def delete_media_by_id(self, server_name: str, media_id: str): + response = self.client.request("DELETE", f"/v1/media/{server_name}/{media_id}") + return response["deleted_media"] + + def delete_media_by_date_size(self, datetime: str, size: float = 0.0, keep_profiles: bool = True): + timestamp = convert_datetime_to_ts(datetime) + size_in_bytes = convert_megabytes_to_bytes(size) + params = { + "before_ts": timestamp, + "size_gt": size_in_bytes, + "keep_profiles": keep_profiles + } + query_string = urlencode(params) + response = self.client.request("POST", f"/v1/media/delete", query_string) + return DeleteMedia.from_api_response(response) \ No newline at end of file diff --git a/src/synclean/api/users.py b/src/synclean/api/users.py index d5dc6de..f793abe 100644 --- a/src/synclean/api/users.py +++ b/src/synclean/api/users.py @@ -1,21 +1,24 @@ +from typing import List + from synclean.api.synapse import SynapseApiClient -from synclean.models.pagination import UserMediaPaginationParams, UserPaginationParams +from synclean.models.pagination import UserMediaStatsPaginationParams, UserPaginationParams, UserMediaPaginationParams from synclean.models.user import User, UserList -from synclean.models.user_media import UserMediaList +from synclean.models.user_media import UserMediaList, MediaItem +from synclean.models.user_media_stats import UserMediaStatsList, UserMediaStats class UserAPI: def __init__(self, client: SynapseApiClient): self.client = client - def get_users_list_by_media(self, pagination: UserMediaPaginationParams = None) -> UserMediaList: + def get_users_list_by_media(self, pagination: UserMediaStatsPaginationParams = None) -> UserMediaStatsList: if pagination is None: - pagination = UserMediaPaginationParams() + pagination = UserMediaStatsPaginationParams() params = pagination.to_query_params() response = self.client.request("GET", "/v1/statistics/users/media", params) - return UserMediaList.from_api_response(response) + return UserMediaStatsList.from_api_response(response) def get_users_list(self, pagination: UserPaginationParams) -> UserList: if pagination is None: @@ -30,6 +33,39 @@ class UserAPI: response = self.client.request("GET", f"/v2/users/{username}") return User.from_api_response(response) + def get_user_media_stats_by_id(self, username: str) -> UserMediaStats: + response = self.client.request("GET", f"/v1/statistics/users/media?search_term={username}") + response = response["users"][0] + return UserMediaStats.from_api_response(response) + def get_media_by_user_id(self, username: str, pagination: UserMediaPaginationParams) -> UserMediaList: + if pagination is None: + pagination = UserMediaPaginationParams + params = pagination.to_query_params() + response = self.client.request("GET", f"/v1/users/{username}/media", params) + return UserMediaList.from_api_response(response) + + def get_all_user_media(self, username: str, pagination: UserMediaPaginationParams) -> List[MediaItem]: + if pagination is None: + pagination = UserMediaPaginationParams + + all_media = [] + pagination.offset = 0 + + while True: + params = pagination.to_query_params() + response = self.client.request("GET", f"/v1/users/{username}/media", params) + media_page = UserMediaList.from_api_response(response) + all_media.extend(media_page.media) + # print(f"{media_page.next_token}") + if media_page.next_token == 0: + break + pagination.offset = media_page.next_token + + return all_media + + def get_media_by_type(self, user_id: str, media_type: str, pagination: UserMediaPaginationParams) -> List[MediaItem]: + all_media = self.get_all_user_media(user_id, pagination) + return [item for item in all_media if item.media_type.startswith(media_type)] \ No newline at end of file diff --git a/src/synclean/models/enums.py b/src/synclean/models/enums.py index 73b46b8..a562110 100644 --- a/src/synclean/models/enums.py +++ b/src/synclean/models/enums.py @@ -16,13 +16,24 @@ class RoomOrderBy(Enum): HISTORY_VISIBILITY = "history_visibility" STATE_EVENTS = "state_events" -class UserMediaOrderBy(Enum): +class UserMediaStatsOrderBy(Enum): """Available user ordering options for media.""" USER_ID = "user_id" DISPLAY_NAME = "display_name" MEDIA_LENGTH = "media_length" MEDIA_COUNT = "media_count" +class UserMediaOrderBy(Enum): + """Available user ordering options for media.""" + MEDIA_ID = "media_id" + UPLOAD_NAME = "upload_name" + CREATED_TS = "created_ts" + LAST_ACCESS_TS = "last_access_ts" + MEDIA_LENGTH = "media_length" + MEDIA_TYPE = "media_type" + QUARANTINED_BY = "quarantined_by" + SAFE_FROM_QUARANTINE = "safe_from_quarantine" + class UserOrderBy(Enum): """Available user ordering options.""" NAME = "name" diff --git a/src/synclean/models/media.py b/src/synclean/models/media.py index b390415..4faf9c0 100644 --- a/src/synclean/models/media.py +++ b/src/synclean/models/media.py @@ -27,3 +27,17 @@ class MediaURIs: def total_count(self) -> int: """Get total media count.""" return len(self.local) + len(self.remote) + + +@dataclass +class DeleteMedia: + deleted_media: List[str] = field(default_factory=list) + total: int = 0 + + @classmethod + def from_api_response(cls, data: dict) -> "DeleteMedia": + """Create DeleteMedia from API response.""" + return cls( + deleted_media=data.get("deleted_media", []), + total=data.get("total", 0) + ) \ No newline at end of file diff --git a/src/synclean/models/pagination.py b/src/synclean/models/pagination.py index 4dc9c8b..a67ea29 100644 --- a/src/synclean/models/pagination.py +++ b/src/synclean/models/pagination.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from typing import Optional, Dict, Any, TypeVar, Generic -from synclean.models.enums import RoomOrderBy, Direction, UserMediaOrderBy, UserOrderBy +from synclean.models.enums import RoomOrderBy, Direction, UserMediaStatsOrderBy, UserOrderBy, UserMediaOrderBy OrderByType = TypeVar("OrderByType") @@ -44,4 +44,10 @@ class UserPaginationParams(PaginationParams[UserOrderBy]): @dataclass class UserMediaPaginationParams(PaginationParams[UserMediaOrderBy]): """Pagination parameters for users based on media.""" - order_by = UserOrderBy = UserMediaOrderBy.MEDIA_LENGTH \ No newline at end of file + order_by = UserOrderBy = UserMediaOrderBy.MEDIA_LENGTH + + +@dataclass +class UserMediaStatsPaginationParams(PaginationParams[UserMediaStatsOrderBy]): + """Pagination parameters for users based on media.""" + order_by = UserOrderBy = UserMediaStatsOrderBy.MEDIA_LENGTH \ No newline at end of file diff --git a/src/synclean/models/user_media.py b/src/synclean/models/user_media.py index 90d7dbf..f3af6b8 100644 --- a/src/synclean/models/user_media.py +++ b/src/synclean/models/user_media.py @@ -1,42 +1,106 @@ from dataclasses import dataclass -from typing import List, Optional, Any, Dict +from datetime import datetime +from typing import Optional, List @dataclass -class UserMedia: - display_name: str - media_count: int +class MediaItem: + created_ts: int + media_id: str media_length: int - user_id: str - - def media_length_mb(self, precision: int = 2) -> float: - """Convert media length to MB.""" - mb = self.media_length / (1024 * 1024) - return round(mb, precision) + media_type: str + safe_from_quarantine: bool + last_access_ts: Optional[int] = None + quarantined_by: Optional[str] = None + upload_name: Optional[str] = None @classmethod - def from_api_response(cls, data: Dict[str, Any]) -> "UserMedia": - """Create UserMedia from API response.""" + def from_api_response(cls, data: dict) -> "MediaItem": + """Create a MediaItem instance from API response""" return cls( - display_name=data.get("displayname"), - media_count=data.get("media_count"), - media_length=data.get("media_length"), - user_id=data.get("user_id") + created_ts=data.get('created_ts'), + last_access_ts=data.get('last_access_ts'), + media_id=data.get('media_id'), + media_length=data.get('media_length'), + media_type=data.get('media_type'), + quarantined_by=data.get('quarantined_by'), + safe_from_quarantine=data.get('safe_from_quarantine'), + upload_name=data.get('upload_name') ) + @property + def created_datetime(self) -> datetime: + """Get the creation timestamp as a datetime object""" + return datetime.fromtimestamp(self.created_ts / 1000) -@dataclass + @property + def last_access_datetime(self) -> Optional[datetime]: + """Get the last access timestamp as a datetime object""" + if self.last_access_ts is None: + return None + return datetime.fromtimestamp(self.last_access_ts / 1000) + + @property + def is_quarantined(self) -> bool: + """Check if the media is quarantined""" + return self.quarantined_by is not None + + @property + def file_extension(self) -> Optional[str]: + """Extract the file extension from upload name""" + if not self.upload_name: + return None + return self.upload_name.split('.')[-1] if '.' in self.upload_name else None + + def is_image(self) -> bool: + """Check if the media is an image""" + return self.media_type.startswith('image/') + + def get_size_formatted(self) -> str: + """Return human-readable file size""" + size = self.media_length + for unit in ['B', 'KB', 'MB', 'GB']: + if size < 1024: + return f"{size:.1f} {unit}" + size /= 1024 + return f"{size:.1f} TB" + + +@dataclass(frozen=True) class UserMediaList: - users: List[UserMedia] + """Represent the response from the media API""" + media: List[MediaItem] + next_token: int total: int - next_token: Optional[int] = None @classmethod - def from_api_response(cls, data: Dict[str, Any]) -> "UserMediaList": - """Create UserMediaList from API response.""" - users = [UserMedia.from_api_response(user_data) for user_data in data.get("users", [])] + def from_api_response(cls, data: dict) -> "UserMediaList": + """Create a MediaApiResponse instance from API response""" + media_items = [ + MediaItem.from_api_response(item_data) for item_data in data.get('media', []) + ] return cls( - users=users, - total=data.get("total", 0), - next_token=data.get("next_token") - ) \ No newline at end of file + media=media_items, + next_token=data.get('next_token', 0), + total=data.get('total', 0) + ) + + @property + def has_more_items(self) -> bool: + """Check if there are more items to fetch""" + return len(self.media) < self.total + + @property + def image_count(self) -> int: + """Count the number of image items""" + return sum(1 for item in self.media if item.is_image()) + + @property + def quarantined_count(self) -> int: + """Count the number of quarantined items""" + return sum(1 for item in self.media if item.is_quarantined) + + @property + def video_files(self) -> List[MediaItem]: + """Get a list of video media items""" + return [item for item in self.media if item.media_type.startswith('video/')] diff --git a/src/synclean/models/user_media_stats.py b/src/synclean/models/user_media_stats.py new file mode 100644 index 0000000..5229adc --- /dev/null +++ b/src/synclean/models/user_media_stats.py @@ -0,0 +1,43 @@ +from dataclasses import dataclass +from typing import List, Optional, Any, Dict + + +@dataclass +class UserMediaStats: + display_name: str + media_count: int + media_length: int + user_id: str + + def media_length_mb(self, precision: int = 2) -> float: + """Convert media length to MB.""" + mb = self.media_length / (1024 * 1024) + return round(mb, precision) + + @classmethod + def from_api_response(cls, data: Dict[str, Any]) -> "UserMediaStats": + """Create UserMedia from API response.""" + return cls( + user_id=data.get("user_id"), + display_name=data.get("displayname"), + media_count=data.get("media_count"), + media_length=data.get("media_length") + ) + + + +@dataclass +class UserMediaStatsList: + users: List[UserMediaStats] + total: int + next_token: Optional[int] = None + + @classmethod + def from_api_response(cls, data: Dict[str, Any]) -> "UserMediaStatsList": + """Create UserMediaList from API response.""" + users = [UserMediaStats.from_api_response(user_data) for user_data in data.get("users", [])] + return cls( + users=users, + total=data.get("total", 0), + next_token=data.get("next_token") + ) \ No newline at end of file diff --git a/src/synclean/service/user_service.py b/src/synclean/service/user_service.py index 367c8c9..8a4ddbd 100644 --- a/src/synclean/service/user_service.py +++ b/src/synclean/service/user_service.py @@ -1,10 +1,11 @@ from typing import Optional, List from synclean.api.users import UserAPI -from synclean.models.enums import UserMediaOrderBy, Direction, UserOrderBy -from synclean.models.pagination import UserMediaPaginationParams, UserPaginationParams -from synclean.models.user import UserList -from synclean.models.user_media import UserMediaList +from synclean.models.enums import UserMediaStatsOrderBy, Direction, UserOrderBy, UserMediaOrderBy +from synclean.models.pagination import UserMediaStatsPaginationParams, UserPaginationParams, UserMediaPaginationParams +from synclean.models.user import UserList, User +from synclean.models.user_media import UserMediaList, MediaItem +from synclean.models.user_media_stats import UserMediaStatsList, UserMediaStats class UserService: @@ -18,11 +19,11 @@ class UserService: order_by: str, direction: str, search: Optional[str] - ) -> UserMediaList | None: - order_by_enum = UserMediaOrderBy(order_by) + ) -> UserMediaStatsList | None: + order_by_enum = UserMediaStatsOrderBy(order_by) direction_enum = Direction(direction) - pagination_params = UserMediaPaginationParams(limit, offset, order_by_enum, direction_enum, search) + pagination_params = UserMediaStatsPaginationParams(limit, offset, order_by_enum, direction_enum, search) users = self.user_api.get_users_list_by_media(pagination_params) if users: @@ -46,5 +47,71 @@ class UserService: return users return None - def get_user_details_by_id(self, user_id: str): - pass \ No newline at end of file + def get_user_details_by_id(self, user_id: str) -> User | None: + user_details = self.user_api.get_user_details_by_name(user_id) + if user_details: + return user_details + + print("No user found") + return None + + def get_user_media_stats_by_id(self, user_id: str) -> UserMediaStats | None: + if user_id.startswith('@'): + user_id = user_id[1:] + if ':' in user_id: + user_id = user_id.split(':')[0] + + user_media = self.user_api.get_user_media_stats_by_id(user_id) + if user_media: + return user_media + return None + + def get_user_media_by_id( + self, + user_id: str, + limit: int, + offset: int, + order_by: str, + direction: str + ) -> UserMediaList | None: + order_by_enum = UserMediaOrderBy(order_by) + direction_enum = Direction(direction) + + pagination = UserMediaPaginationParams(limit, offset, order_by_enum, direction_enum) + + user_media = self.user_api.get_media_by_user_id(user_id, pagination) + if user_media: + return user_media + return None + + def get_user_media_by_type( + self, + user_id: str, + limit: int, + offset: int, + order_by: str, + direction: str, + media_type: str + ) -> dict[str, List[MediaItem]] | None: + order_by_enum = UserMediaOrderBy(order_by) + direction_enum = Direction(direction) + + pagination = UserMediaPaginationParams(limit, offset, order_by_enum, direction_enum) + + user_media = self.user_api.get_all_user_media(user_id, pagination) + if user_media: + filtered_media = [media for media in user_media if media.media_type.startswith(media_type)] + if filtered_media: + total_size = sum(media.media_length for media in filtered_media) + media_by_type = { + "media": filtered_media, + "total_size": total_size, + "total_count": len(filtered_media) + } + return media_by_type + else: + return None + return None + + def delete_user_media(self, avatar: str): + pass