Compare commits

...

3 commits

Author SHA1 Message Date
Hugo van Kemenade ddc80fc5c3 Update copyright year 2022-08-29 14:34:43 +03:00
Hugo van Kemenade 09fcc776a7 Refactor exceptions into package 2022-08-29 14:30:08 +03:00
Hugo van Kemenade 620323eab0 Refactor helper functions into a utils package 2022-08-29 14:28:22 +03:00
4 changed files with 265 additions and 222 deletions

View file

@ -3,7 +3,7 @@
# A Python interface to Last.fm and Libre.fm
#
# Copyright 2008-2010 Amr Hassan
# Copyright 2013-2021 hugovk
# Copyright 2013-2022 hugovk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -22,17 +22,13 @@ from __future__ import annotations
import collections
import hashlib
import html.entities
import logging
import os
import re
import shelve
import ssl
import tempfile
import time
import xml.dom
from urllib.parse import quote_plus
from xml.dom import Node, minidom
from xml.dom import minidom
import httpx
@ -43,12 +39,35 @@ except ImportError:
# Python 3.7 and lower
import importlib_metadata # type: ignore
from .exceptions import MalformedResponseError, NetworkError, PyLastError, WSError
from .utils import (
_collect_nodes,
_number,
_parse_response,
_string_output,
_unescape_htmlentity,
_unicode,
_url_safe,
cleanup_nodes,
md5,
)
__author__ = "Amr Hassan, hugovk, Mice Pápai"
__copyright__ = "Copyright (C) 2008-2010 Amr Hassan, 2013-2021 hugovk, 2017 Mice Pápai"
__copyright__ = "Copyright (C) 2008-2010 Amr Hassan, 2013-2022 hugovk, 2017 Mice Pápai"
__license__ = "apache2"
__email__ = "amr.hassan@gmail.com"
__version__ = importlib_metadata.version(__name__)
__all__ = [
# Exceptions
MalformedResponseError,
NetworkError,
PyLastError,
WSError,
# Utils
cleanup_nodes,
md5,
]
# 1 : This error does not exist
STATUS_INVALID_SERVICE = 2
@ -938,7 +957,7 @@ class _Request:
client.close()
return response_text
def execute(self, cacheable: bool = False) -> xml.dom.minidom.Document:
def execute(self, cacheable: bool = False) -> minidom.Document:
"""Returns the XML DOM response of the POST Request from the server"""
if self.network.is_caching_enabled() and cacheable:
@ -1089,13 +1108,6 @@ Image = collections.namedtuple(
)
def _string_output(func):
def r(*args):
return str(func(*args))
return r
class _BaseObject:
"""An abstract webservices object."""
@ -1393,81 +1405,6 @@ class _Taggable(_BaseObject):
return seq
class PyLastError(Exception):
"""Generic exception raised by PyLast"""
pass
class WSError(PyLastError):
"""Exception related to the Network web service"""
def __init__(self, network, status, details) -> None:
self.status = status
self.details = details
self.network = network
@_string_output
def __str__(self) -> str:
return self.details
def get_id(self):
"""Returns the exception ID, from one of the following:
STATUS_INVALID_SERVICE = 2
STATUS_INVALID_METHOD = 3
STATUS_AUTH_FAILED = 4
STATUS_INVALID_FORMAT = 5
STATUS_INVALID_PARAMS = 6
STATUS_INVALID_RESOURCE = 7
STATUS_OPERATION_FAILED = 8
STATUS_INVALID_SK = 9
STATUS_INVALID_API_KEY = 10
STATUS_OFFLINE = 11
STATUS_SUBSCRIBERS_ONLY = 12
STATUS_TOKEN_UNAUTHORIZED = 14
STATUS_TOKEN_EXPIRED = 15
STATUS_TEMPORARILY_UNAVAILABLE = 16
STATUS_LOGIN_REQUIRED = 17
STATUS_TRIAL_EXPIRED = 18
STATUS_NOT_ENOUGH_CONTENT = 20
STATUS_NOT_ENOUGH_MEMBERS = 21
STATUS_NOT_ENOUGH_FANS = 22
STATUS_NOT_ENOUGH_NEIGHBOURS = 23
STATUS_NO_PEAK_RADIO = 24
STATUS_RADIO_NOT_FOUND = 25
STATUS_API_KEY_SUSPENDED = 26
STATUS_DEPRECATED = 27
STATUS_RATE_LIMIT_EXCEEDED = 29
"""
return self.status
class MalformedResponseError(PyLastError):
"""Exception conveying a malformed response from the music network."""
def __init__(self, network, underlying_error) -> None:
self.network = network
self.underlying_error = underlying_error
def __str__(self) -> str:
return (
f"Malformed response from {self.network.name}. "
f"Underlying error: {self.underlying_error}"
)
class NetworkError(PyLastError):
"""Exception conveying a problem in sending a request to Last.fm"""
def __init__(self, network, underlying_error) -> None:
self.network = network
self.underlying_error = underlying_error
def __str__(self) -> str:
return f"NetworkError: {self.underlying_error}"
class _Opus(_Taggable):
"""An album or track."""
@ -2720,89 +2657,6 @@ class TrackSearch(_Search):
return seq
def md5(text):
"""Returns the md5 hash of a string."""
h = hashlib.md5()
h.update(_unicode(text).encode("utf-8"))
return h.hexdigest()
def _unicode(text):
if isinstance(text, bytes):
return str(text, "utf-8")
else:
return str(text)
def cleanup_nodes(doc):
"""
Remove text nodes containing only whitespace
"""
for node in doc.documentElement.childNodes:
if node.nodeType == Node.TEXT_NODE and node.nodeValue.isspace():
doc.documentElement.removeChild(node)
return doc
def _collect_nodes(
limit, sender, method_name, cacheable, params=None, stream: bool = False
):
"""
Returns a sequence of dom.Node objects about as close to limit as possible
"""
if not params:
params = sender._get_params()
def _stream_collect_nodes():
node_count = 0
page = 1
end_of_pages = False
while not end_of_pages and (not limit or (limit and node_count < limit)):
params["page"] = str(page)
tries = 1
while True:
try:
doc = sender._request(method_name, cacheable, params)
break # success
except Exception as e:
if tries >= 3:
raise PyLastError() from e
# Wait and try again
time.sleep(1)
tries += 1
doc = cleanup_nodes(doc)
# break if there are no child nodes
if not doc.documentElement.childNodes:
break
main = doc.documentElement.childNodes[0]
if main.hasAttribute("totalPages") or main.hasAttribute("totalpages"):
total_pages = _number(
main.getAttribute("totalPages") or main.getAttribute("totalpages")
)
else:
raise PyLastError("No total pages attribute")
for node in main.childNodes:
if not node.nodeType == xml.dom.Node.TEXT_NODE and (
not limit or (node_count < limit)
):
node_count += 1
yield node
end_of_pages = page >= total_pages
page += 1
return _stream_collect_nodes() if stream else list(_stream_collect_nodes())
def _extract(node, name, index: int = 0):
"""Extracts a value from the xml string"""
@ -2878,51 +2732,3 @@ def _extract_tracks(doc, network):
artist = _extract(node, "name", 1)
seq.append(Track(artist, name, network))
return seq
def _url_safe(text):
"""Does all kinds of tricks on a text to make it safe to use in a URL."""
return quote_plus(quote_plus(str(text))).lower()
def _number(string):
"""
Extracts an int from a string.
Returns a 0 if None or an empty string was passed.
"""
if not string:
return 0
else:
try:
return int(string)
except ValueError:
return float(string)
def _unescape_htmlentity(string):
mapping = html.entities.name2codepoint
for key in mapping:
string = string.replace(f"&{key};", chr(mapping[key]))
return string
def _parse_response(response: str) -> xml.dom.minidom.Document:
response = str(response).replace("opensearch:", "")
try:
doc = minidom.parseString(response)
except xml.parsers.expat.ExpatError:
# Try again. For performance, we only remove when needed in rare cases.
doc = minidom.parseString(_remove_invalid_xml_chars(response))
return doc
def _remove_invalid_xml_chars(string: str) -> str:
return re.sub(
r"[^\u0009\u000A\u000D\u0020-\uD7FF\uE000-\uFFFD\u10000-\u10FFF]+", "", string
)
# End of file

78
src/pylast/exceptions.py Normal file
View file

@ -0,0 +1,78 @@
from __future__ import annotations
from .utils import _string_output
class PyLastError(Exception):
"""Generic exception raised by PyLast"""
pass
class WSError(PyLastError):
"""Exception related to the Network web service"""
def __init__(self, network, status, details) -> None:
self.status = status
self.details = details
self.network = network
@_string_output
def __str__(self) -> str:
return self.details
def get_id(self):
"""Returns the exception ID, from one of the following:
STATUS_INVALID_SERVICE = 2
STATUS_INVALID_METHOD = 3
STATUS_AUTH_FAILED = 4
STATUS_INVALID_FORMAT = 5
STATUS_INVALID_PARAMS = 6
STATUS_INVALID_RESOURCE = 7
STATUS_OPERATION_FAILED = 8
STATUS_INVALID_SK = 9
STATUS_INVALID_API_KEY = 10
STATUS_OFFLINE = 11
STATUS_SUBSCRIBERS_ONLY = 12
STATUS_TOKEN_UNAUTHORIZED = 14
STATUS_TOKEN_EXPIRED = 15
STATUS_TEMPORARILY_UNAVAILABLE = 16
STATUS_LOGIN_REQUIRED = 17
STATUS_TRIAL_EXPIRED = 18
STATUS_NOT_ENOUGH_CONTENT = 20
STATUS_NOT_ENOUGH_MEMBERS = 21
STATUS_NOT_ENOUGH_FANS = 22
STATUS_NOT_ENOUGH_NEIGHBOURS = 23
STATUS_NO_PEAK_RADIO = 24
STATUS_RADIO_NOT_FOUND = 25
STATUS_API_KEY_SUSPENDED = 26
STATUS_DEPRECATED = 27
STATUS_RATE_LIMIT_EXCEEDED = 29
"""
return self.status
class MalformedResponseError(PyLastError):
"""Exception conveying a malformed response from the music network."""
def __init__(self, network, underlying_error) -> None:
self.network = network
self.underlying_error = underlying_error
def __str__(self) -> str:
return (
f"Malformed response from {self.network.name}. "
f"Underlying error: {self.underlying_error}"
)
class NetworkError(PyLastError):
"""Exception conveying a problem in sending a request to Last.fm"""
def __init__(self, network, underlying_error) -> None:
self.network = network
self.underlying_error = underlying_error
def __str__(self) -> str:
return f"NetworkError: {self.underlying_error}"

159
src/pylast/utils.py Normal file
View file

@ -0,0 +1,159 @@
from __future__ import annotations
import hashlib
import html
import re
import time
import warnings
import xml
from urllib.parse import quote_plus
from xml.dom import Node, minidom
import pylast
def cleanup_nodes(doc: minidom.Document) -> minidom.Document:
"""
cleanup_nodes is deprecated and will be removed in pylast 6.0
"""
warnings.warn(
"cleanup_nodes is deprecated and will be removed in pylast 6.0",
DeprecationWarning,
stacklevel=2,
)
return _cleanup_nodes(doc)
def md5(text: str) -> str:
"""Returns the md5 hash of a string."""
h = hashlib.md5()
h.update(_unicode(text).encode("utf-8"))
return h.hexdigest()
def _collect_nodes(
limit, sender, method_name, cacheable, params=None, stream: bool = False
):
"""
Returns a sequence of dom.Node objects about as close to limit as possible
"""
if not params:
params = sender._get_params()
def _stream_collect_nodes():
node_count = 0
page = 1
end_of_pages = False
while not end_of_pages and (not limit or (limit and node_count < limit)):
params["page"] = str(page)
tries = 1
while True:
try:
doc = sender._request(method_name, cacheable, params)
break # success
except Exception as e:
if tries >= 3:
raise pylast.PyLastError() from e
# Wait and try again
time.sleep(1)
tries += 1
doc = _cleanup_nodes(doc)
# break if there are no child nodes
if not doc.documentElement.childNodes:
break
main = doc.documentElement.childNodes[0]
if main.hasAttribute("totalPages") or main.hasAttribute("totalpages"):
total_pages = _number(
main.getAttribute("totalPages") or main.getAttribute("totalpages")
)
else:
raise pylast.PyLastError("No total pages attribute")
for node in main.childNodes:
if not node.nodeType == xml.dom.Node.TEXT_NODE and (
not limit or (node_count < limit)
):
node_count += 1
yield node
end_of_pages = page >= total_pages
page += 1
return _stream_collect_nodes() if stream else list(_stream_collect_nodes())
def _cleanup_nodes(doc: minidom.Document) -> minidom.Document:
"""
Remove text nodes containing only whitespace
"""
for node in doc.documentElement.childNodes:
if node.nodeType == Node.TEXT_NODE and node.nodeValue.isspace():
doc.documentElement.removeChild(node)
return doc
def _number(string: str | None) -> float:
"""
Extracts an int from a string.
Returns a 0 if None or an empty string was passed.
"""
if not string:
return 0
else:
try:
return int(string)
except ValueError:
return float(string)
def _parse_response(response: str) -> xml.dom.minidom.Document:
response = str(response).replace("opensearch:", "")
try:
doc = minidom.parseString(response)
except xml.parsers.expat.ExpatError:
# Try again. For performance, we only remove when needed in rare cases.
doc = minidom.parseString(_remove_invalid_xml_chars(response))
return doc
def _remove_invalid_xml_chars(string: str) -> str:
return re.sub(
r"[^\u0009\u000A\u000D\u0020-\uD7FF\uE000-\uFFFD\u10000-\u10FFF]+", "", string
)
def _string_output(func):
def r(*args):
return str(func(*args))
return r
def _unescape_htmlentity(string: str) -> str:
mapping = html.entities.name2codepoint
for key in mapping:
string = string.replace(f"&{key};", chr(mapping[key]))
return string
def _unicode(text: bytes | str) -> str:
if isinstance(text, bytes):
return str(text, "utf-8")
else:
return str(text)
def _url_safe(text: str) -> str:
"""Does all kinds of tricks on a text to make it safe to use in a URL."""
return quote_plus(quote_plus(str(text))).lower()

View file

@ -45,7 +45,7 @@ def test_cast_and_hash(obj) -> None:
],
)
def test__remove_invalid_xml_chars(test_input: str, expected: str) -> None:
assert pylast._remove_invalid_xml_chars(test_input) == expected
assert pylast.utils._remove_invalid_xml_chars(test_input) == expected
@pytest.mark.parametrize(