28 lines
No EOL
1 KiB
Python
28 lines
No EOL
1 KiB
Python
from logging import Logger
|
|
from typing import Dict, Any
|
|
|
|
import requests
|
|
from synclean.models.config import SynapseConfig
|
|
from synclean.utils.exceptions import AuthenticationError, APIError
|
|
|
|
|
|
class SynapseApiClient:
|
|
"""Synapse API endpoints."""
|
|
|
|
def __init__(self, config: SynapseConfig) -> None:
|
|
self.config = config
|
|
self.logger = Logger(__name__)
|
|
self.session = requests.Session()
|
|
self.session.headers.update(config.headers)
|
|
|
|
def request(self, method: str, endpoint: str, params = None) -> Dict[str, Any]:
|
|
"""Send a request to the Synapse API."""
|
|
url = f"{self.config.admin_api_url}{endpoint}"
|
|
try:
|
|
response = self.session.request(method, url, params)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 401:
|
|
raise AuthenticationError(f"Authentication failed: {e}") from e
|
|
raise APIError(f"Network error: {e}") from e |