Compare commits

...

3 commits

Author SHA1 Message Date
Hugo van Kemenade ddc80fc5c3 Update copyright year 2022-08-29 14:34:43 +03:00
Hugo van Kemenade 09fcc776a7 Refactor exceptions into package 2022-08-29 14:30:08 +03:00
Hugo van Kemenade 620323eab0 Refactor helper functions into a utils package 2022-08-29 14:28:22 +03:00
4 changed files with 265 additions and 222 deletions

View file

@ -3,7 +3,7 @@
# A Python interface to Last.fm and Libre.fm # A Python interface to Last.fm and Libre.fm
# #
# Copyright 2008-2010 Amr Hassan # Copyright 2008-2010 Amr Hassan
# Copyright 2013-2021 hugovk # Copyright 2013-2022 hugovk
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -22,17 +22,13 @@ from __future__ import annotations
import collections import collections
import hashlib import hashlib
import html.entities
import logging import logging
import os import os
import re
import shelve import shelve
import ssl import ssl
import tempfile import tempfile
import time import time
import xml.dom from xml.dom import minidom
from urllib.parse import quote_plus
from xml.dom import Node, minidom
import httpx import httpx
@ -43,12 +39,35 @@ except ImportError:
# Python 3.7 and lower # Python 3.7 and lower
import importlib_metadata # type: ignore import importlib_metadata # type: ignore
from .exceptions import MalformedResponseError, NetworkError, PyLastError, WSError
from .utils import (
_collect_nodes,
_number,
_parse_response,
_string_output,
_unescape_htmlentity,
_unicode,
_url_safe,
cleanup_nodes,
md5,
)
__author__ = "Amr Hassan, hugovk, Mice Pápai" __author__ = "Amr Hassan, hugovk, Mice Pápai"
__copyright__ = "Copyright (C) 2008-2010 Amr Hassan, 2013-2021 hugovk, 2017 Mice Pápai" __copyright__ = "Copyright (C) 2008-2010 Amr Hassan, 2013-2022 hugovk, 2017 Mice Pápai"
__license__ = "apache2" __license__ = "apache2"
__email__ = "amr.hassan@gmail.com" __email__ = "amr.hassan@gmail.com"
__version__ = importlib_metadata.version(__name__) __version__ = importlib_metadata.version(__name__)
__all__ = [
# Exceptions
MalformedResponseError,
NetworkError,
PyLastError,
WSError,
# Utils
cleanup_nodes,
md5,
]
# 1 : This error does not exist # 1 : This error does not exist
STATUS_INVALID_SERVICE = 2 STATUS_INVALID_SERVICE = 2
@ -938,7 +957,7 @@ class _Request:
client.close() client.close()
return response_text return response_text
def execute(self, cacheable: bool = False) -> xml.dom.minidom.Document: def execute(self, cacheable: bool = False) -> minidom.Document:
"""Returns the XML DOM response of the POST Request from the server""" """Returns the XML DOM response of the POST Request from the server"""
if self.network.is_caching_enabled() and cacheable: if self.network.is_caching_enabled() and cacheable:
@ -1089,13 +1108,6 @@ Image = collections.namedtuple(
) )
def _string_output(func):
def r(*args):
return str(func(*args))
return r
class _BaseObject: class _BaseObject:
"""An abstract webservices object.""" """An abstract webservices object."""
@ -1393,81 +1405,6 @@ class _Taggable(_BaseObject):
return seq return seq
class PyLastError(Exception):
"""Generic exception raised by PyLast"""
pass
class WSError(PyLastError):
"""Exception related to the Network web service"""
def __init__(self, network, status, details) -> None:
self.status = status
self.details = details
self.network = network
@_string_output
def __str__(self) -> str:
return self.details
def get_id(self):
"""Returns the exception ID, from one of the following:
STATUS_INVALID_SERVICE = 2
STATUS_INVALID_METHOD = 3
STATUS_AUTH_FAILED = 4
STATUS_INVALID_FORMAT = 5
STATUS_INVALID_PARAMS = 6
STATUS_INVALID_RESOURCE = 7
STATUS_OPERATION_FAILED = 8
STATUS_INVALID_SK = 9
STATUS_INVALID_API_KEY = 10
STATUS_OFFLINE = 11
STATUS_SUBSCRIBERS_ONLY = 12
STATUS_TOKEN_UNAUTHORIZED = 14
STATUS_TOKEN_EXPIRED = 15
STATUS_TEMPORARILY_UNAVAILABLE = 16
STATUS_LOGIN_REQUIRED = 17
STATUS_TRIAL_EXPIRED = 18
STATUS_NOT_ENOUGH_CONTENT = 20
STATUS_NOT_ENOUGH_MEMBERS = 21
STATUS_NOT_ENOUGH_FANS = 22
STATUS_NOT_ENOUGH_NEIGHBOURS = 23
STATUS_NO_PEAK_RADIO = 24
STATUS_RADIO_NOT_FOUND = 25
STATUS_API_KEY_SUSPENDED = 26
STATUS_DEPRECATED = 27
STATUS_RATE_LIMIT_EXCEEDED = 29
"""
return self.status
class MalformedResponseError(PyLastError):
"""Exception conveying a malformed response from the music network."""
def __init__(self, network, underlying_error) -> None:
self.network = network
self.underlying_error = underlying_error
def __str__(self) -> str:
return (
f"Malformed response from {self.network.name}. "
f"Underlying error: {self.underlying_error}"
)
class NetworkError(PyLastError):
"""Exception conveying a problem in sending a request to Last.fm"""
def __init__(self, network, underlying_error) -> None:
self.network = network
self.underlying_error = underlying_error
def __str__(self) -> str:
return f"NetworkError: {self.underlying_error}"
class _Opus(_Taggable): class _Opus(_Taggable):
"""An album or track.""" """An album or track."""
@ -2720,89 +2657,6 @@ class TrackSearch(_Search):
return seq return seq
def md5(text):
"""Returns the md5 hash of a string."""
h = hashlib.md5()
h.update(_unicode(text).encode("utf-8"))
return h.hexdigest()
def _unicode(text):
if isinstance(text, bytes):
return str(text, "utf-8")
else:
return str(text)
def cleanup_nodes(doc):
"""
Remove text nodes containing only whitespace
"""
for node in doc.documentElement.childNodes:
if node.nodeType == Node.TEXT_NODE and node.nodeValue.isspace():
doc.documentElement.removeChild(node)
return doc
def _collect_nodes(
limit, sender, method_name, cacheable, params=None, stream: bool = False
):
"""
Returns a sequence of dom.Node objects about as close to limit as possible
"""
if not params:
params = sender._get_params()
def _stream_collect_nodes():
node_count = 0
page = 1
end_of_pages = False
while not end_of_pages and (not limit or (limit and node_count < limit)):
params["page"] = str(page)
tries = 1
while True:
try:
doc = sender._request(method_name, cacheable, params)
break # success
except Exception as e:
if tries >= 3:
raise PyLastError() from e
# Wait and try again
time.sleep(1)
tries += 1
doc = cleanup_nodes(doc)
# break if there are no child nodes
if not doc.documentElement.childNodes:
break
main = doc.documentElement.childNodes[0]
if main.hasAttribute("totalPages") or main.hasAttribute("totalpages"):
total_pages = _number(
main.getAttribute("totalPages") or main.getAttribute("totalpages")
)
else:
raise PyLastError("No total pages attribute")
for node in main.childNodes:
if not node.nodeType == xml.dom.Node.TEXT_NODE and (
not limit or (node_count < limit)
):
node_count += 1
yield node
end_of_pages = page >= total_pages
page += 1
return _stream_collect_nodes() if stream else list(_stream_collect_nodes())
def _extract(node, name, index: int = 0): def _extract(node, name, index: int = 0):
"""Extracts a value from the xml string""" """Extracts a value from the xml string"""
@ -2878,51 +2732,3 @@ def _extract_tracks(doc, network):
artist = _extract(node, "name", 1) artist = _extract(node, "name", 1)
seq.append(Track(artist, name, network)) seq.append(Track(artist, name, network))
return seq return seq
def _url_safe(text):
"""Does all kinds of tricks on a text to make it safe to use in a URL."""
return quote_plus(quote_plus(str(text))).lower()
def _number(string):
"""
Extracts an int from a string.
Returns a 0 if None or an empty string was passed.
"""
if not string:
return 0
else:
try:
return int(string)
except ValueError:
return float(string)
def _unescape_htmlentity(string):
mapping = html.entities.name2codepoint
for key in mapping:
string = string.replace(f"&{key};", chr(mapping[key]))
return string
def _parse_response(response: str) -> xml.dom.minidom.Document:
response = str(response).replace("opensearch:", "")
try:
doc = minidom.parseString(response)
except xml.parsers.expat.ExpatError:
# Try again. For performance, we only remove when needed in rare cases.
doc = minidom.parseString(_remove_invalid_xml_chars(response))
return doc
def _remove_invalid_xml_chars(string: str) -> str:
return re.sub(
r"[^\u0009\u000A\u000D\u0020-\uD7FF\uE000-\uFFFD\u10000-\u10FFF]+", "", string
)
# End of file

78
src/pylast/exceptions.py Normal file
View file

@ -0,0 +1,78 @@
from __future__ import annotations
from .utils import _string_output
class PyLastError(Exception):
"""Generic exception raised by PyLast"""
pass
class WSError(PyLastError):
"""Exception related to the Network web service"""
def __init__(self, network, status, details) -> None:
self.status = status
self.details = details
self.network = network
@_string_output
def __str__(self) -> str:
return self.details
def get_id(self):
"""Returns the exception ID, from one of the following:
STATUS_INVALID_SERVICE = 2
STATUS_INVALID_METHOD = 3
STATUS_AUTH_FAILED = 4
STATUS_INVALID_FORMAT = 5
STATUS_INVALID_PARAMS = 6
STATUS_INVALID_RESOURCE = 7
STATUS_OPERATION_FAILED = 8
STATUS_INVALID_SK = 9
STATUS_INVALID_API_KEY = 10
STATUS_OFFLINE = 11
STATUS_SUBSCRIBERS_ONLY = 12
STATUS_TOKEN_UNAUTHORIZED = 14
STATUS_TOKEN_EXPIRED = 15
STATUS_TEMPORARILY_UNAVAILABLE = 16
STATUS_LOGIN_REQUIRED = 17
STATUS_TRIAL_EXPIRED = 18
STATUS_NOT_ENOUGH_CONTENT = 20
STATUS_NOT_ENOUGH_MEMBERS = 21
STATUS_NOT_ENOUGH_FANS = 22
STATUS_NOT_ENOUGH_NEIGHBOURS = 23
STATUS_NO_PEAK_RADIO = 24
STATUS_RADIO_NOT_FOUND = 25
STATUS_API_KEY_SUSPENDED = 26
STATUS_DEPRECATED = 27
STATUS_RATE_LIMIT_EXCEEDED = 29
"""
return self.status
class MalformedResponseError(PyLastError):
"""Exception conveying a malformed response from the music network."""
def __init__(self, network, underlying_error) -> None:
self.network = network
self.underlying_error = underlying_error
def __str__(self) -> str:
return (
f"Malformed response from {self.network.name}. "
f"Underlying error: {self.underlying_error}"
)
class NetworkError(PyLastError):
"""Exception conveying a problem in sending a request to Last.fm"""
def __init__(self, network, underlying_error) -> None:
self.network = network
self.underlying_error = underlying_error
def __str__(self) -> str:
return f"NetworkError: {self.underlying_error}"

159
src/pylast/utils.py Normal file
View file

@ -0,0 +1,159 @@
from __future__ import annotations
import hashlib
import html
import re
import time
import warnings
import xml
from urllib.parse import quote_plus
from xml.dom import Node, minidom
import pylast
def cleanup_nodes(doc: minidom.Document) -> minidom.Document:
"""
cleanup_nodes is deprecated and will be removed in pylast 6.0
"""
warnings.warn(
"cleanup_nodes is deprecated and will be removed in pylast 6.0",
DeprecationWarning,
stacklevel=2,
)
return _cleanup_nodes(doc)
def md5(text: str) -> str:
"""Returns the md5 hash of a string."""
h = hashlib.md5()
h.update(_unicode(text).encode("utf-8"))
return h.hexdigest()
def _collect_nodes(
limit, sender, method_name, cacheable, params=None, stream: bool = False
):
"""
Returns a sequence of dom.Node objects about as close to limit as possible
"""
if not params:
params = sender._get_params()
def _stream_collect_nodes():
node_count = 0
page = 1
end_of_pages = False
while not end_of_pages and (not limit or (limit and node_count < limit)):
params["page"] = str(page)
tries = 1
while True:
try:
doc = sender._request(method_name, cacheable, params)
break # success
except Exception as e:
if tries >= 3:
raise pylast.PyLastError() from e
# Wait and try again
time.sleep(1)
tries += 1
doc = _cleanup_nodes(doc)
# break if there are no child nodes
if not doc.documentElement.childNodes:
break
main = doc.documentElement.childNodes[0]
if main.hasAttribute("totalPages") or main.hasAttribute("totalpages"):
total_pages = _number(
main.getAttribute("totalPages") or main.getAttribute("totalpages")
)
else:
raise pylast.PyLastError("No total pages attribute")
for node in main.childNodes:
if not node.nodeType == xml.dom.Node.TEXT_NODE and (
not limit or (node_count < limit)
):
node_count += 1
yield node
end_of_pages = page >= total_pages
page += 1
return _stream_collect_nodes() if stream else list(_stream_collect_nodes())
def _cleanup_nodes(doc: minidom.Document) -> minidom.Document:
"""
Remove text nodes containing only whitespace
"""
for node in doc.documentElement.childNodes:
if node.nodeType == Node.TEXT_NODE and node.nodeValue.isspace():
doc.documentElement.removeChild(node)
return doc
def _number(string: str | None) -> float:
"""
Extracts an int from a string.
Returns a 0 if None or an empty string was passed.
"""
if not string:
return 0
else:
try:
return int(string)
except ValueError:
return float(string)
def _parse_response(response: str) -> xml.dom.minidom.Document:
response = str(response).replace("opensearch:", "")
try:
doc = minidom.parseString(response)
except xml.parsers.expat.ExpatError:
# Try again. For performance, we only remove when needed in rare cases.
doc = minidom.parseString(_remove_invalid_xml_chars(response))
return doc
def _remove_invalid_xml_chars(string: str) -> str:
return re.sub(
r"[^\u0009\u000A\u000D\u0020-\uD7FF\uE000-\uFFFD\u10000-\u10FFF]+", "", string
)
def _string_output(func):
def r(*args):
return str(func(*args))
return r
def _unescape_htmlentity(string: str) -> str:
mapping = html.entities.name2codepoint
for key in mapping:
string = string.replace(f"&{key};", chr(mapping[key]))
return string
def _unicode(text: bytes | str) -> str:
if isinstance(text, bytes):
return str(text, "utf-8")
else:
return str(text)
def _url_safe(text: str) -> str:
"""Does all kinds of tricks on a text to make it safe to use in a URL."""
return quote_plus(quote_plus(str(text))).lower()

View file

@ -45,7 +45,7 @@ def test_cast_and_hash(obj) -> None:
], ],
) )
def test__remove_invalid_xml_chars(test_input: str, expected: str) -> None: def test__remove_invalid_xml_chars(test_input: str, expected: str) -> None:
assert pylast._remove_invalid_xml_chars(test_input) == expected assert pylast.utils._remove_invalid_xml_chars(test_input) == expected
@pytest.mark.parametrize( @pytest.mark.parametrize(