adding media related code

This commit is contained in:
Hirad 2025-07-23 11:53:15 +03:30
parent b803afe70c
commit f4c77481e7
8 changed files with 310 additions and 43 deletions

26
src/synclean/api/media.py Normal file
View file

@ -0,0 +1,26 @@
from urllib.parse import urlencode
from synclean.api.synapse import SynapseApiClient
from synclean.models.media import DeleteMedia
from synclean.utils.utils import convert_datetime_to_ts, convert_megabytes_to_bytes
class MediaApi:
def __init__(self, client: SynapseApiClient):
self.client = client
def delete_media_by_id(self, server_name: str, media_id: str):
response = self.client.request("DELETE", f"/v1/media/{server_name}/{media_id}")
return response["deleted_media"]
def delete_media_by_date_size(self, datetime: str, size: float = 0.0, keep_profiles: bool = True):
timestamp = convert_datetime_to_ts(datetime)
size_in_bytes = convert_megabytes_to_bytes(size)
params = {
"before_ts": timestamp,
"size_gt": size_in_bytes,
"keep_profiles": keep_profiles
}
query_string = urlencode(params)
response = self.client.request("POST", f"/v1/media/delete", query_string)
return DeleteMedia.from_api_response(response)

View file

@ -1,21 +1,24 @@
from typing import List
from synclean.api.synapse import SynapseApiClient from synclean.api.synapse import SynapseApiClient
from synclean.models.pagination import UserMediaPaginationParams, UserPaginationParams from synclean.models.pagination import UserMediaStatsPaginationParams, UserPaginationParams, UserMediaPaginationParams
from synclean.models.user import User, UserList from synclean.models.user import User, UserList
from synclean.models.user_media import UserMediaList from synclean.models.user_media import UserMediaList, MediaItem
from synclean.models.user_media_stats import UserMediaStatsList, UserMediaStats
class UserAPI: class UserAPI:
def __init__(self, client: SynapseApiClient): def __init__(self, client: SynapseApiClient):
self.client = client self.client = client
def get_users_list_by_media(self, pagination: UserMediaPaginationParams = None) -> UserMediaList: def get_users_list_by_media(self, pagination: UserMediaStatsPaginationParams = None) -> UserMediaStatsList:
if pagination is None: if pagination is None:
pagination = UserMediaPaginationParams() pagination = UserMediaStatsPaginationParams()
params = pagination.to_query_params() params = pagination.to_query_params()
response = self.client.request("GET", "/v1/statistics/users/media", params) response = self.client.request("GET", "/v1/statistics/users/media", params)
return UserMediaList.from_api_response(response) return UserMediaStatsList.from_api_response(response)
def get_users_list(self, pagination: UserPaginationParams) -> UserList: def get_users_list(self, pagination: UserPaginationParams) -> UserList:
if pagination is None: if pagination is None:
@ -30,6 +33,39 @@ class UserAPI:
response = self.client.request("GET", f"/v2/users/{username}") response = self.client.request("GET", f"/v2/users/{username}")
return User.from_api_response(response) return User.from_api_response(response)
def get_user_media_stats_by_id(self, username: str) -> UserMediaStats:
response = self.client.request("GET", f"/v1/statistics/users/media?search_term={username}")
response = response["users"][0]
return UserMediaStats.from_api_response(response)
def get_media_by_user_id(self, username: str, pagination: UserMediaPaginationParams) -> UserMediaList:
if pagination is None:
pagination = UserMediaPaginationParams
params = pagination.to_query_params()
response = self.client.request("GET", f"/v1/users/{username}/media", params)
return UserMediaList.from_api_response(response)
def get_all_user_media(self, username: str, pagination: UserMediaPaginationParams) -> List[MediaItem]:
if pagination is None:
pagination = UserMediaPaginationParams
all_media = []
pagination.offset = 0
while True:
params = pagination.to_query_params()
response = self.client.request("GET", f"/v1/users/{username}/media", params)
media_page = UserMediaList.from_api_response(response)
all_media.extend(media_page.media)
# print(f"{media_page.next_token}")
if media_page.next_token == 0:
break
pagination.offset = media_page.next_token
return all_media
def get_media_by_type(self, user_id: str, media_type: str, pagination: UserMediaPaginationParams) -> List[MediaItem]:
all_media = self.get_all_user_media(user_id, pagination)
return [item for item in all_media if item.media_type.startswith(media_type)]

View file

@ -16,13 +16,24 @@ class RoomOrderBy(Enum):
HISTORY_VISIBILITY = "history_visibility" HISTORY_VISIBILITY = "history_visibility"
STATE_EVENTS = "state_events" STATE_EVENTS = "state_events"
class UserMediaOrderBy(Enum): class UserMediaStatsOrderBy(Enum):
"""Available user ordering options for media.""" """Available user ordering options for media."""
USER_ID = "user_id" USER_ID = "user_id"
DISPLAY_NAME = "display_name" DISPLAY_NAME = "display_name"
MEDIA_LENGTH = "media_length" MEDIA_LENGTH = "media_length"
MEDIA_COUNT = "media_count" MEDIA_COUNT = "media_count"
class UserMediaOrderBy(Enum):
"""Available user ordering options for media."""
MEDIA_ID = "media_id"
UPLOAD_NAME = "upload_name"
CREATED_TS = "created_ts"
LAST_ACCESS_TS = "last_access_ts"
MEDIA_LENGTH = "media_length"
MEDIA_TYPE = "media_type"
QUARANTINED_BY = "quarantined_by"
SAFE_FROM_QUARANTINE = "safe_from_quarantine"
class UserOrderBy(Enum): class UserOrderBy(Enum):
"""Available user ordering options.""" """Available user ordering options."""
NAME = "name" NAME = "name"

View file

@ -27,3 +27,17 @@ class MediaURIs:
def total_count(self) -> int: def total_count(self) -> int:
"""Get total media count.""" """Get total media count."""
return len(self.local) + len(self.remote) return len(self.local) + len(self.remote)
@dataclass
class DeleteMedia:
deleted_media: List[str] = field(default_factory=list)
total: int = 0
@classmethod
def from_api_response(cls, data: dict) -> "DeleteMedia":
"""Create DeleteMedia from API response."""
return cls(
deleted_media=data.get("deleted_media", []),
total=data.get("total", 0)
)

View file

@ -1,6 +1,6 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Dict, Any, TypeVar, Generic from typing import Optional, Dict, Any, TypeVar, Generic
from synclean.models.enums import RoomOrderBy, Direction, UserMediaOrderBy, UserOrderBy from synclean.models.enums import RoomOrderBy, Direction, UserMediaStatsOrderBy, UserOrderBy, UserMediaOrderBy
OrderByType = TypeVar("OrderByType") OrderByType = TypeVar("OrderByType")
@ -44,4 +44,10 @@ class UserPaginationParams(PaginationParams[UserOrderBy]):
@dataclass @dataclass
class UserMediaPaginationParams(PaginationParams[UserMediaOrderBy]): class UserMediaPaginationParams(PaginationParams[UserMediaOrderBy]):
"""Pagination parameters for users based on media.""" """Pagination parameters for users based on media."""
order_by = UserOrderBy = UserMediaOrderBy.MEDIA_LENGTH order_by = UserOrderBy = UserMediaOrderBy.MEDIA_LENGTH
@dataclass
class UserMediaStatsPaginationParams(PaginationParams[UserMediaStatsOrderBy]):
"""Pagination parameters for users based on media."""
order_by = UserOrderBy = UserMediaStatsOrderBy.MEDIA_LENGTH

View file

@ -1,42 +1,106 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import List, Optional, Any, Dict from datetime import datetime
from typing import Optional, List
@dataclass @dataclass
class UserMedia: class MediaItem:
display_name: str created_ts: int
media_count: int media_id: str
media_length: int media_length: int
user_id: str media_type: str
safe_from_quarantine: bool
def media_length_mb(self, precision: int = 2) -> float: last_access_ts: Optional[int] = None
"""Convert media length to MB.""" quarantined_by: Optional[str] = None
mb = self.media_length / (1024 * 1024) upload_name: Optional[str] = None
return round(mb, precision)
@classmethod @classmethod
def from_api_response(cls, data: Dict[str, Any]) -> "UserMedia": def from_api_response(cls, data: dict) -> "MediaItem":
"""Create UserMedia from API response.""" """Create a MediaItem instance from API response"""
return cls( return cls(
display_name=data.get("displayname"), created_ts=data.get('created_ts'),
media_count=data.get("media_count"), last_access_ts=data.get('last_access_ts'),
media_length=data.get("media_length"), media_id=data.get('media_id'),
user_id=data.get("user_id") media_length=data.get('media_length'),
media_type=data.get('media_type'),
quarantined_by=data.get('quarantined_by'),
safe_from_quarantine=data.get('safe_from_quarantine'),
upload_name=data.get('upload_name')
) )
@property
def created_datetime(self) -> datetime:
"""Get the creation timestamp as a datetime object"""
return datetime.fromtimestamp(self.created_ts / 1000)
@dataclass @property
def last_access_datetime(self) -> Optional[datetime]:
"""Get the last access timestamp as a datetime object"""
if self.last_access_ts is None:
return None
return datetime.fromtimestamp(self.last_access_ts / 1000)
@property
def is_quarantined(self) -> bool:
"""Check if the media is quarantined"""
return self.quarantined_by is not None
@property
def file_extension(self) -> Optional[str]:
"""Extract the file extension from upload name"""
if not self.upload_name:
return None
return self.upload_name.split('.')[-1] if '.' in self.upload_name else None
def is_image(self) -> bool:
"""Check if the media is an image"""
return self.media_type.startswith('image/')
def get_size_formatted(self) -> str:
"""Return human-readable file size"""
size = self.media_length
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024:
return f"{size:.1f} {unit}"
size /= 1024
return f"{size:.1f} TB"
@dataclass(frozen=True)
class UserMediaList: class UserMediaList:
users: List[UserMedia] """Represent the response from the media API"""
media: List[MediaItem]
next_token: int
total: int total: int
next_token: Optional[int] = None
@classmethod @classmethod
def from_api_response(cls, data: Dict[str, Any]) -> "UserMediaList": def from_api_response(cls, data: dict) -> "UserMediaList":
"""Create UserMediaList from API response.""" """Create a MediaApiResponse instance from API response"""
users = [UserMedia.from_api_response(user_data) for user_data in data.get("users", [])] media_items = [
MediaItem.from_api_response(item_data) for item_data in data.get('media', [])
]
return cls( return cls(
users=users, media=media_items,
total=data.get("total", 0), next_token=data.get('next_token', 0),
next_token=data.get("next_token") total=data.get('total', 0)
) )
@property
def has_more_items(self) -> bool:
"""Check if there are more items to fetch"""
return len(self.media) < self.total
@property
def image_count(self) -> int:
"""Count the number of image items"""
return sum(1 for item in self.media if item.is_image())
@property
def quarantined_count(self) -> int:
"""Count the number of quarantined items"""
return sum(1 for item in self.media if item.is_quarantined)
@property
def video_files(self) -> List[MediaItem]:
"""Get a list of video media items"""
return [item for item in self.media if item.media_type.startswith('video/')]

View file

@ -0,0 +1,43 @@
from dataclasses import dataclass
from typing import List, Optional, Any, Dict
@dataclass
class UserMediaStats:
display_name: str
media_count: int
media_length: int
user_id: str
def media_length_mb(self, precision: int = 2) -> float:
"""Convert media length to MB."""
mb = self.media_length / (1024 * 1024)
return round(mb, precision)
@classmethod
def from_api_response(cls, data: Dict[str, Any]) -> "UserMediaStats":
"""Create UserMedia from API response."""
return cls(
user_id=data.get("user_id"),
display_name=data.get("displayname"),
media_count=data.get("media_count"),
media_length=data.get("media_length")
)
@dataclass
class UserMediaStatsList:
users: List[UserMediaStats]
total: int
next_token: Optional[int] = None
@classmethod
def from_api_response(cls, data: Dict[str, Any]) -> "UserMediaStatsList":
"""Create UserMediaList from API response."""
users = [UserMediaStats.from_api_response(user_data) for user_data in data.get("users", [])]
return cls(
users=users,
total=data.get("total", 0),
next_token=data.get("next_token")
)

View file

@ -1,10 +1,11 @@
from typing import Optional, List from typing import Optional, List
from synclean.api.users import UserAPI from synclean.api.users import UserAPI
from synclean.models.enums import UserMediaOrderBy, Direction, UserOrderBy from synclean.models.enums import UserMediaStatsOrderBy, Direction, UserOrderBy, UserMediaOrderBy
from synclean.models.pagination import UserMediaPaginationParams, UserPaginationParams from synclean.models.pagination import UserMediaStatsPaginationParams, UserPaginationParams, UserMediaPaginationParams
from synclean.models.user import UserList from synclean.models.user import UserList, User
from synclean.models.user_media import UserMediaList from synclean.models.user_media import UserMediaList, MediaItem
from synclean.models.user_media_stats import UserMediaStatsList, UserMediaStats
class UserService: class UserService:
@ -18,11 +19,11 @@ class UserService:
order_by: str, order_by: str,
direction: str, direction: str,
search: Optional[str] search: Optional[str]
) -> UserMediaList | None: ) -> UserMediaStatsList | None:
order_by_enum = UserMediaOrderBy(order_by) order_by_enum = UserMediaStatsOrderBy(order_by)
direction_enum = Direction(direction) direction_enum = Direction(direction)
pagination_params = UserMediaPaginationParams(limit, offset, order_by_enum, direction_enum, search) pagination_params = UserMediaStatsPaginationParams(limit, offset, order_by_enum, direction_enum, search)
users = self.user_api.get_users_list_by_media(pagination_params) users = self.user_api.get_users_list_by_media(pagination_params)
if users: if users:
@ -46,5 +47,71 @@ class UserService:
return users return users
return None return None
def get_user_details_by_id(self, user_id: str): def get_user_details_by_id(self, user_id: str) -> User | None:
pass user_details = self.user_api.get_user_details_by_name(user_id)
if user_details:
return user_details
print("No user found")
return None
def get_user_media_stats_by_id(self, user_id: str) -> UserMediaStats | None:
if user_id.startswith('@'):
user_id = user_id[1:]
if ':' in user_id:
user_id = user_id.split(':')[0]
user_media = self.user_api.get_user_media_stats_by_id(user_id)
if user_media:
return user_media
return None
def get_user_media_by_id(
self,
user_id: str,
limit: int,
offset: int,
order_by: str,
direction: str
) -> UserMediaList | None:
order_by_enum = UserMediaOrderBy(order_by)
direction_enum = Direction(direction)
pagination = UserMediaPaginationParams(limit, offset, order_by_enum, direction_enum)
user_media = self.user_api.get_media_by_user_id(user_id, pagination)
if user_media:
return user_media
return None
def get_user_media_by_type(
self,
user_id: str,
limit: int,
offset: int,
order_by: str,
direction: str,
media_type: str
) -> dict[str, List[MediaItem]] | None:
order_by_enum = UserMediaOrderBy(order_by)
direction_enum = Direction(direction)
pagination = UserMediaPaginationParams(limit, offset, order_by_enum, direction_enum)
user_media = self.user_api.get_all_user_media(user_id, pagination)
if user_media:
filtered_media = [media for media in user_media if media.media_type.startswith(media_type)]
if filtered_media:
total_size = sum(media.media_length for media in filtered_media)
media_by_type = {
"media": filtered_media,
"total_size": total_size,
"total_count": len(filtered_media)
}
return media_by_type
else:
return None
return None
def delete_user_media(self, avatar: str):
pass