From 41ce2c6489f543f846376f24e25e50732554f2c6 Mon Sep 17 00:00:00 2001 From: Amr Hassan Date: Mon, 2 Feb 2009 14:53:42 +0000 Subject: [PATCH] 0.3a in the trunk now. --- INSTALL | 2 +- changes.txt | 78 - pylast.py | 4543 +++++++++++++++++++++------------------------ setup.py => setup | 2 + 4 files changed, 2092 insertions(+), 2533 deletions(-) delete mode 100644 changes.txt rename setup.py => setup (93%) mode change 100644 => 100755 diff --git a/INSTALL b/INSTALL index c1f5063..c0d0f8f 100644 --- a/INSTALL +++ b/INSTALL @@ -2,4 +2,4 @@ Installation Instructions ========================= 1. cd into this directory -2. Execute "python setup.py install". you'll probably need to add sudo to the beginning. +2. Execute "python setup install". you'll probably need to add sudo to the beginning. diff --git a/changes.txt b/changes.txt deleted file mode 100644 index 592a129..0000000 --- a/changes.txt +++ /dev/null @@ -1,78 +0,0 @@ -0.2.18 - * Track.getAlbum doesn't crash when the album could not be determined. - * Most of User functions now exist in the new AuthenticatedUser due to a web services limitation. - * Track.getImage is removed, it's the same as Track.getAlbum().getImage(). - -0.2.17 - * All the getTopTags and getTopTagsWithCounts return an empty sequence if failed instead of None. - * toStr() is now less crashy. - * fixed User.getPastEvents. - * new: User.compareWithUser, User.getRecommendedEvents, Library.addAlbum, Library.addArtist, Library.addTrack. - * created AlbumSearch class. - -0.2.15 - * API Breakage, changed the design of Asynchronizer.async_call. - * Added: Artist.getTopTagsWithCounts, Track.getTopTagsWithCounts and User.getTopTagsWithCounts. - * Added: Artist.getTopFansWithWeights, Track.getTopFansWithWeights. - -0.2.14 - * Changed the version numbering system. - * Fixed Authentication and MD5 with non-ASCII characters (issue #7) - * Created UserPlaylist class. - * User.getPlaylistIDs is now deprecated. - * track.addToPlaylist is now deprecated. - * User.fetchPlaylist is now deprecated. - * Created UserPlaylistCreator class. - -0.2b13 - * fixed: User.get_friends limit parameter (issue #5) - * changed: using hashlib module instead of deprecated md5 module. - -0.2b12 - * fixed: some unicode problems. - * fix: a typo in the function name Exceptionable.clear_errors() - * fixed: User.fetchPlaylist (issue #3) - * api change: User.getPlaylistsIDs is now User.getPlaylistsData, and User.getPlaylistsIDs returns a list of IDs now. - -0.2b10 - * fixed: getURL now returns the link with it's parameters double quoted as it should. - * Artist.getTopTags, Track.getTopTags, Album.getTopTags all now have a limit argument. - * added: ServiceException.getID. - * changed: Taggable.addTags posts each tag separately. - -0.2b9 - * fixed: crashes when adding a job to Asynchronizer and starting it when it had already started. - -0.2b8 - * Asynchronizer.async_call now accepts None as callback. - * moved all the tag related functions to a separate Taggable class. - * added Taggable.setTags - -0.2b7 - * Added Track.getWikiPublishedDate, Track.getSummary, Track.getContent. - * fixes - -0.2b6 - * fixes - -0.2b5 - * Moved changes to an external file "changes.txt". - * Added new webservices as: Track.getId, Track.getDuration, Track.getListenerCount, Track.getPlayCount, - Track.getAlbumName, Track.getAlbum, Track.getImage, Event.share. - * Reverted where all objects retrieve all available metadata on the server from the server, now optional. - - -0.2b4 - * Added Track.getArtistName. - * Added numerous functions to User, Making use of the new User.getInfo webservice. - * Every object now retrieves all the metadata from the webservices even the trivial ones - like the album name or artist for proper casing. Use the object's attributes (like Album.artist_name and - Album.title instead of Album.getArtistName() and Album.getTitle() if you can't afford the delay - caused by retrieving the data from a remote server. - -0.2b3 - * Added a little work-around on python's threading.Thread to make Asynchronizer objects - able to restart more than once. - -0.2b2 - * All http values are now url-encoded. illegal characters (like '&') will no longer cause a crash. diff --git a/pylast.py b/pylast.py index 4d7c5a1..7c3b8e4 100644 --- a/pylast.py +++ b/pylast.py @@ -20,25 +20,36 @@ # # http://code.google.com/p/pylast/ -__name__ = 'pyLast' -__version__ = '0.2.18' +# TODO: +# - go over all the apis from the official documentation. [done] +# - write better documentation of each function as you go. [done] +# - internal caching [done]. +# - easy proxy support [done]. +# - implement call delay. +# - better adherence to naming guidelines (http://www.python.org/doc/essays/styleguide.html) [done] +# - implement __repr__, __eq__ and __ne__ everywhere possible. [done] + +__name__ = 'pylast' +__version__ = '0.3.0a' __doc__ = 'A Python interface to the Last.fm API.' __author__ = 'Amr Hassan' __email__ = 'amr.hassan@gmail.com' -API_SERVER = 'ws.audioscrobbler.com' -API_SUBDIR = '/2.0/' + +__proxy = None +__proxy_enabled = False +__cache_dir = None +__cache_enabled = False +__last_call_time = 0 import hashlib import httplib import urllib import threading from xml.dom import minidom +import os +import time -USE_SILENT_EXCEPTIONS = True - -STATUS_OK = 'ok' -STATUS_FAILED = 'failed' STATUS_INVALID_SERVICE = 2 STATUS_INVALID_METHOD = 3 STATUS_AUTH_FAILED = 4 @@ -84,35 +95,371 @@ DOMAIN_CHINESE = 'cn.last.fm' USER_MALE = 'Male' USER_FEMALE = 'Female' -def warn(message): - print "Warning:", message - -def warn_deprecated(old, new): - warn('%s is now deprecated. Use %s instead.' %(old, new)) - -def _status2str(lastfm_status): +class _ThreadedCall(threading.Thread): + """Facilitates calling a function on another thread.""" - statuses = { - STATUS_OK: 'OK', - STATUS_FAILED: 'Failed', - STATUS_INVALID_METHOD: 'Invalid Method - No method with that name in this package.', - STATUS_TOKEN_ERROR: 'Token Error - There was an error granting the Request token.', - STATUS_INVALID_SERVICE: 'Invalid Service - This service does not exist.', - STATUS_AUTH_FAILED: 'Authentication Failed - You do not have permissions to access the service.', - STATUS_INVALID_FORMAT: "Invalid Format - This service doesn't exist in that format.", - STATUS_INVALID_PARAMS: 'Invalid Parameters - Your Request is missing a required parameter.', - STATUS_INVALID_RESOURCE: 'Invalid Resource Specified.', - STATUS_INVALID_SK: 'Invalid Session Key - Please re-authenticate.', - STATUS_INVALID_API_KEY: 'Invalid API Key - You must be granted a valid key by last.fm.', - STATUS_OFFLINE: 'Service Offline - This service is temporarily offline. Try again later.', - STATUS_SUBSCRIBERS_ONLY: 'Subscribers Only - This service is only available to paid last.fm subscribers.', - STATUS_TOKEN_UNAUTHORIZED: 'Unauthorized Token - This token has not been authorized.', - STATUS_TOKEN_EXPIRED: 'Token Expired -This token has expired.', - STATUS_INVALID_SIGNATURE: 'Invalid method signature supplied.', - } + def __init__(self, sender, funct, funct_args, callback, callback_args): + + threading.Thread.__init__(self) + + self.funct = funct + self.funct_args = funct_args + self.callback = callback + self.callback_args = callback_args + + self.sender = sender - return statuses[int(lastfm_status)] + def run(self): + + output = [] + + if self.funct: + if self.funct_args: + output = self.funct(*self.funct_args) + else: + output = self.funct() + + if self.callback: + if self.callback_args: + self.callback(self.sender, output, *self.callback_args) + else: + self.callback(self.sender, output) + +class _Request(object): + """Representing an abstract web service operation.""" + + HOST_NAME = 'ws.audioscrobbler.com' + HOST_SUBDIR = '/2.0/' + + def __init__(self, method_name, params, api_key, api_secret = None, session_key = None): + self.params = params + self.api_secret = api_secret + + self.params["api_key"] = api_key + self.params["method"] = method_name + self.params["sk"] = session_key + + if session_key: + self.sign_it() + + def sign_it(self): + """Sign this request.""" + + if not "api_sig" in self.params.keys(): + self.params['api_sig'] = self._get_signature() + + def _get_signature(self): + """Returns a 32-character hexadecimal md5 hash of the signature string.""" + + keys = self.params.keys()[:] + + keys.sort() + + string = unicode() + + for name in keys: + string += name + string += self.params[name] + + string += self.api_secret + + return md5(string.encode('utf-8')) + + def _get_cache_key(self): + """The cache key is a string of concatenated sorted names and values.""" + + keys = self.params.keys() + keys.sort() + + cache_key = str() + + for key in keys: + if key != "api_sig" and key != "api_key" and key != "sk": + cache_key += urllib.quote_plus(key) + urllib.quote_plus(urllib.quote_plus(self.params[key])) + + return cache_key + + def _is_cached(self): + """Returns True if the request is available in the cache.""" + + return os.path.exists(os.path.join(_get_cache_dir(), self._get_cache_key())) + + def _get_cached_response(self): + """Returns a file object of the cached response.""" + + if not self._is_cached(): + response = self._download_response() + + response_file = open(os.path.join(_get_cache_dir(), self._get_cache_key()), "w") + response_file.write(response) + response_file.close() + + return open(os.path.join(_get_cache_dir(), self._get_cache_key()), "r").read() + + def _download_response(self): + """Returns a response body string from the server.""" + + # Delay the call if necessary + _delay_call() + + data = [] + for name in self.params.keys(): + data.append('='.join((name, urllib.quote_plus(self.params[name].encode('utf-8'))))) + data = '&'.join(data) + + headers = { + "Content-type": "application/x-www-form-urlencoded", + 'Accept-Charset': 'utf-8', + 'User-Agent': __name__ + '/' + __version__ + } + + if is_proxy_enabled(): + conn = httplib.HTTPConnection(host = _get_proxy()[0], port = _get_proxy()[1]) + conn.request(method='POST', url="http://" + HOST_NAME + HOST_SUBDIR, + body=data, headers=headers) + else: + conn = httplib.HTTPConnection(host=self.HOST_NAME) + conn.request(method='POST', url=self.HOST_SUBDIR, body=data, headers=headers) + + response = conn.getresponse().read() + self._check_response_for_errors(response) + return response + + def execute(self, cacheable = False): + """Returns the XML DOM response of the POST Request from the server""" + + if is_caching_enabled() and cacheable: + response = self._get_cached_response() + else: + response = self._download_response() + + return minidom.parseString(response) + + def _check_response_for_errors(self, response): + """Checks the response for errors and raises one if any exists.""" + + doc = minidom.parseString(response) + e = doc.getElementsByTagName('lfm')[0] + + if e.getAttribute('status') != "ok": + e = doc.getElementsByTagName('error')[0] + status = e.getAttribute('code') + details = e.firstChild.data.strip() + raise ServiceException(status, details) + +class SessionKeyGenerator(object): + """Methods of generating a session key: + 1) Web Authentication: + a. sg = SessionKeyGenerator(API_KEY, API_SECRET) + b. url = sg.get_web_auth_url() + c. Ask the user to open the url and authorize you, and wait for it. + d. session_key = sg.get_web_auth_session_key(url) + 2) Username and Password Authentication: + a. username = raw_input("Please enter your username: ") + b. md5_password = pylast.md5(raw_input("Please enter your password: ") + c. session_key = SessionKeyGenerator(API_KEY, API_SECRET).get_session_key(username, md5_password) + + A session key's lifetime is infinie, unless the user provokes the rights of the given API Key. + """ + + def __init__(self, api_key, api_secret): + self.api_key = api_key + self.api_secret = api_secret + self.web_auth_tokens = {} + + def _get_web_auth_token(self): + """Retrieves a token from Last.fm for web authentication. + The token then has to be authorized from getAuthURL before creating session. + """ + + request = _Request('auth.getToken', dict(), self.api_key, self.api_secret) + request.sign_it() + + doc = request.execute() + + e = doc.getElementsByTagName('token')[0] + return e.firstChild.data + + def get_web_auth_url(self): + """The user must open this page, and you first, then call get_web_auth_session_key(url) after that.""" + + token = self._get_web_auth_token() + + url = 'http://www.last.fm/api/auth/?api_key=%(api)s&token=%(token)s' % \ + {'api': self.api_key, 'token': token} + + self.web_auth_tokens[url] = token + + return url + + def get_web_auth_session_key(self, url): + """Retrieves the session key of a web authorization process by its url.""" + + if url in self.web_auth_tokens.keys(): + token = self.web_auth_tokens[url] + else: + token = "" #that's gonna raise a ServiceException of an unauthorized token when the request is executed. + + request = _Request('auth.getSession', {'token': token}, self.api_key, self.api_secret) + request.sign_it() + + doc = request.execute() + + return doc.getElementsByTagName('key')[0].firstChild.data + + def get_session_key(self, username, md5_password): + """Retrieve a session key with a username and a md5 hash of the user's password.""" + + params = {"username": username, "authToken": md5(username + md5_password)} + request = _Request("auth.getMobileSession", params, self.api_key, self.api_secret) + request.sign_it() + + doc = request.execute() + + return doc.getElementsByTagName('key')[0].firstChild.data + +class _BaseObject(object): + """An abstract webservices object.""" + + def __init__(self, api_key, api_secret, session_key): + + self.api_key = api_key + self.api_secret = api_secret + self.session_key = session_key + + self.auth_data = (self.api_key, self.api_secret, self.session_key) + + def _request(self, method_name, cacheable = False, params = None): + if not params: + params = self._get_params() + + return _Request(method_name, params, *self.auth_data).execute(cacheable) + + def _get_params(): + """Returns the most common set of parameters between all objects.""" + + return dict() + +class _Taggable(object): + """Common functions for classes with tags.""" + + def __init__(self, ws_prefix): + self.ws_prefix = ws_prefix + + def add_tags(self, *tags): + """Adds one or several tags. + * *tags: Any number of tag names or Tag objects. + """ + + for tag in tags: + self._add_tag(tag) + + def _add_tag(self, tag): + """Adds one or several tags. + * tag: one tag name or a Tag object. + """ + + if isinstance(tag, Tag): + tag = tag.get_name() + + params = self._get_params() + params['tags'] = unicode(tag) + + self._request(self.ws_prefix + '.addTags', False, params) + + def _remove_tag(self, single_tag): + """Remove a user's tag from this object.""" + + if isinstance(single_tag, Tag): + single_tag = single_tag.get_name() + + params = self._get_params() + params['tag'] = unicode(single_tag) + + self._request(self.ws_prefix + '.removeTag', False, params) + + def get_tags(self): + """Returns a list of the tags set by the user to this object.""" + + # Uncacheable because it can be dynamically changed by the user. + params = self._get_params() + doc = _Request(self.ws_prefix + '.getTags', params, *self.auth_data).execute(cacheable = False) + + tag_names = _extract_all(doc, 'name') + tags = [] + for tag in tag_names: + tags.append(Tag(tag, *self.auth_data)) + + return tags + + def remove_tags(self, *tags): + """Removes one or several tags from this object. + * *tags: Any number of tag names or Tag objects. + """ + + for tag in tags: + self._remove_tag(tag) + + def clear_tags(self): + """Clears all the user-set tags. """ + + self.remove_tags(*(self.get_tags())) + + def set_tags(self, *tags): + """Sets this object's tags to only those tags. + * *tags: any number of tag names. + """ + + c_old_tags = [] + old_tags = [] + c_new_tags = [] + new_tags = [] + + to_remove = [] + to_add = [] + + tags_on_server = self.get_tags() + if tags_on_server == None: + return + + for tag in tags_on_server: + c_old_tags.append(tag.get_name().lower()) + old_tags.append(tag.get_name()) + + for tag in tags: + c_new_tags.append(tag.lower()) + new_tags.append(tag) + + for i in range(0, len(old_tags)): + if not c_old_tags[i] in c_new_tags: + to_remove.append(old_tags[i]) + + for i in range(0, len(new_tags)): + if not c_new_tags[i] in c_old_tags: + to_add.append(new_tags[i]) + + self.remove_tags(*to_remove) + self.add_tags(*to_add) + + def get_top_tags(self, limit = None): + """Returns a list of the most frequently used Tags on this object.""" + + doc = self._request(self.ws_prefix + '.getTopTags', True) + + elements = doc.getElementsByTagName('tag') + list = [] + + for element in elements: + if limit and len(list) >= limit: + break + tag_name = _extract(element, 'name') + tagcount = _extract(element, 'count') + + list.append(TopItem(Tag(tag_name, *self.auth_data), tagcount)) + + return list + class ServiceException(Exception): """Exception related to the Last.fm web service""" @@ -121,9 +468,9 @@ class ServiceException(Exception): self._details = details def __str__(self): - return "%s: %s." %(_status2str(self._lastfm_status), self._details) + return self._details - def getID(self): + def get_id(self): """Returns the exception ID, from one of the following: STATUS_INVALID_SERVICE = 2 STATUS_INVALID_METHOD = 3 @@ -142,1000 +489,259 @@ class ServiceException(Exception): return self._lastfm_status -class _ThreadedFunct(threading.Thread): - """A class used by _Asynchronizer.""" +class TopItem (object): + """A top item in a list that has a weight. Returned from functions like get_top_tracks() and get_top_artists().""" - def __init__(self, sender, funct, funct_args, callback, callback_args): - threading.Thread.__init__(self) + def __init__(self, item, weight): + object.__init__(self) - self.funct = funct - self.funct_args = funct_args - self.callback = callback - self.callback_args = callback_args - - self.sender = sender + self.item = item + self.weight = _number(weight) - def run(self): - if self.funct: - if self.funct_args: - output = self.funct(*self.funct_args) - else: - output = self.funct() - - if self.callback: - if self.callback_args: - self.callback(self.sender, output, *self.callback_args) - else: - self.callback(self.sender, output) - -class _Asynchronizer(object): - """This class helps performing asynchronous operations less painfully.""" + def __repr__(self): + return "Item: " + self.get_item().__repr__() + ", Weight: " + str(self.get_weight()) - def async_call(self, call, callback = None, call_args = None, callback_args = None): - """This is the function for setting up an asynchronous operation. - * call: The function to call asynchronously. - * callback: The function to call after the operation is complete. - * call_args: A sequence of args to be passed to call. - * callback_args: A sequence of args to be passed to callback. - """ + def get_item(self): + """Returns the item.""" - thread = _ThreadedFunct(self, call, call_args, callback, callback_args) - thread.start() - -class _Exceptionable(object): - """An abstract class that adds support for error reporting.""" + return self.item - def __init__(self, parent = None): - self.__errors = [] - self.__raising_exceptions = not USE_SILENT_EXCEPTIONS + def get_weight(self): + """Returns the weight of the itme in the list.""" - #An _Exceptionable parent to mirror all the errors to automatically. - self._parent = parent - - def last_error(self): - """Returns the last error, or None.""" - - if len(self.__errors): - return self.__errors[len(self.__errors) -1] - else: - return None - - def _report_error(self, exception): - - if self.get_raising_exceptions(): - raise exception - - self.__errors.append(exception) - - if self._parent: - self._parent._mirror_errors(self) - - def _mirror_errors(self, exceptional): - """Mirrors the errors from another Exceptional object""" - - for e in exceptional.get_all_errors(): - self._report_error(e) - - def clear_errors(self): - """Clear the error log for this object.""" - self.__errors = [] - - def get_all_errors(self): - """Return a list of exceptions raised about this object.""" - - return self.__errors - - def enable_raising_exceptions(self): - """Enable raising the exceptions about this object.""" - self.__raising_exceptions = True - - def disable_raising_exceptions(self): - """Disable raising the exceptions about this object, but still report them to the log.""" - self.__raising_exceptions = False - - def get_raising_exceptions(self): - """Get the status on raising exceptions.""" - return self.__raising_exceptions - -class _Request(_Exceptionable): - """Representing an abstract web service operation.""" - - def __init__(self, parent, method_name, api_key, params, sign_it = False, secret = None): - _Exceptionable.__init__(self, parent) - - self.method_name = method_name - self.api_key = api_key - self.params = params - self.sign_it = sign_it - self.secret = secret - - def _getSignature(self): - """Returns a 32-character hexadecimal md5 hash of the signature string.""" - - keys = self.params.keys()[:] - - keys.sort() - - string = unicode() - - for name in keys: - string += name - string += self.params[name] - - string += self.secret - - hash = hashlib.md5() - hash.update(string.encode('utf-8')) - - return hash.hexdigest() - - def execute(self): - """Returns the XML DOM response of the POST Request from the server""" - - self.params['api_key'] = self.api_key - self.params['method'] = self.method_name - if self.sign_it: - self.params['api_sig'] = self._getSignature() - - data = [] - for name in self.params.keys(): - data.append('='.join((name, urllib.quote_plus(self.params[name].encode('utf-8'))))) - - try: - conn = httplib.HTTPConnection(API_SERVER) - headers = { - "Content-type": "application/x-www-form-urlencoded", - 'Accept-Charset': 'utf-8', - 'User-Agent': __name__ + '/' + __version__ - } - conn.request('POST', API_SUBDIR, '&'.join(data), headers) - response = conn.getresponse() - except Exception, e: - self._report_error(e) - return None - - doc = minidom.parse(response) - - if self.__checkResponseStatus(doc) == STATUS_OK: - return doc - - return None - - def __checkResponseStatus(self, xml_dom): - """Checks the response for errors and raises one if any exists.""" - - doc = xml_dom - e = doc.getElementsByTagName('lfm')[0] - - if e.getAttribute('status') == STATUS_OK: - return STATUS_OK - else: - e = doc.getElementsByTagName('error')[0] - status = e.getAttribute('code') - details = e.firstChild.data.strip() - self._report_error(ServiceException(status, details)) - -class SessionGenerator(_Asynchronizer, _Exceptionable): - """Steps of authorization: - 1. Retrieve token: token = getToken() - 2. Authorize this token by openning the web page at the URL returned by getAuthURL(token) - 3. Call getSessionKey(token) to collect the session parameters. - - A session key's lifetime is infinie, unless the user provokes the rights of the given API Key. - """ - - def __init__(self, api_key, secret): - _Asynchronizer.__init__(self) - _Exceptionable.__init__(self) - - self.api_key = api_key - self.secret = secret - - - def getToken(self): - """Retrieves a token from Last.fm. - The token then has to be authorized from getAuthURL before creating session. - """ - - doc = _Request(self, 'auth.getToken', self.api_key, dict(), True, self.secret).execute() - - if not doc: - return None - - e = doc.getElementsByTagName('token')[0] - return e.firstChild.data - - - def getAuthURL(self, token): - """The user must open this page, and authorize the given token.""" - - url = 'http://www.last.fm/api/auth/?api_key=%(api)s&token=%(token)s' % \ - {'api': self.api_key, 'token': token} - return url - - def getSessionData(self, token): - """Retrieves session data for the authorized token. - getSessionData(token) --> {'name': str, 'key': str, 'subscriber': bool} - - [DEPRECATED] - Use SessionGenerator.getSessionKey and AuthenticatedUser instead. - """ - - warn_deprecated("SessionGenerator.getSessionData", "SessionGenerator.getSessionKey and AuthenticatedUser") - - params = {'token': token} - doc = _Request(self, 'auth.getSession', self.api_key, params, True, self.secret).execute() - - if not doc: - return None - - name_e = doc.getElementsByTagName('name')[0] - key_e = doc.getElementsByTagName('key')[0] - subscriber_e = doc.getElementsByTagName('subscriber')[0] - - data = {} - data['name'] = name_e.firstChild.data - data['key'] = key_e.firstChild.data - data['subscriber'] = bool(subscriber_e.firstChild.data) - - return data - - def getSessionKey(self, token): - """Retrieves the authorized session key. - """ - - params = {'token': token} - doc = _Request(self, 'auth.getSession', self.api_key, params, True, self.secret).execute() - - if not doc: - return None - - return doc.getElementsByTagName('key')[0].firstChild.data - -class _BaseObject(_Asynchronizer, _Exceptionable): - """An abstract webservices object.""" - - def __init__(self, api_key, secret, session_key): - _Asynchronizer.__init__(self) - _Exceptionable.__init__(self) - - self.api_key = api_key - self.secret = secret - self.session_key = session_key - - self.auth_data = (self.api_key, self.secret, self.session_key) - - def _extract(self, node, name, index = 0): - """Extracts a value from the xml string""" - - nodes = node.getElementsByTagName(name) - - if len(nodes): - if nodes[index].firstChild: - return nodes[index].firstChild.data.strip() - else: - return None - - def _extract_all(self, node, name, limit_count = None): - """Extracts all the values from the xml string. returning a list.""" - - list = [] - - for i in range(0, len(node.getElementsByTagName(name))): - if len(list) == limit_count: - break - - list.append(self._extract(node, name, i)) - - return list - - def _get_url_safe(self, text): - - if type(text) == type(unicode()): - text = text.encode('utf-8') - - return urllib.quote_plus(urllib.quote_plus(text)) - - def toStr(): - return "" - - def _hash(self): - return self.toStr().lower() - - def __str__(self): - return self.toStr() - -class _Cacheable(object): - """Common functions for objects that can have cached metadata""" - - def __init__(self): - - self._cached_info = None - - def _getInfo(self): - """Abstract function, should be inherited""" - - def _getCachedInfo(self, *key_names): - """Returns the cached collection of info regarding this object - If not available in cache, it will be downloaded first. - """ - - if not self._cached_info: - self._cached_info = self._getInfo() - - if not self._cached_info: - return None - - value_or_container = self._cached_info - for key in key_names: - - if not len(value_or_container): - return None - - value_or_container = value_or_container[key] - - return value_or_container + return self.weight -class _Taggable(object): - """Common functions for classes with tags.""" +class LibraryItem (object): + """An item in a User's Library. It could be an artist, an album or a track.""" - def __init__(self, ws_prefix): - self.ws_prefix = ws_prefix + def __init__(self, item, playcount, tagcount): + object.__init__(self) + + self.item = item + self.playcount = _number(playcount) + self.tagcount = _number(tagcount) - def addTags(self, *tags): - """Adds one or several tags. - * *tags: Any number of tag names or Tag objects. - """ - - for tag in tags: - self._addTag(tag) - + def __repr__(self): + return "Item: " + self.get_item().__repr__() + ", Playcount: " + str(self.get_playcount()) + ", Tagcount: " + str(self.get_tagcount()) - def _addTag(self, tag): - """Adds one or several tags. - * tag: one tag name or a Tag object. - """ + def get_item(self): + """Returns the itme.""" - if isinstance(tag, Tag): - tag = tag.getName() - - params = self._getParams() - params['tags'] = unicode(tag) - - _Request(self, self.ws_prefix + '.addTags', self.api_key, params, True, self.secret).execute() + return self.item - def _removeTag(self, single_tag): - """Remove a user's tag from this object.""" + def get_playcount(self): + """Returns the item's playcount in the Library.""" - if isinstance(single_tag, Tag): - single_tag = single_tag.getName() + return self.playcount - params = self._getParams() - params['tag'] = unicode(single_tag) + def get_tagcount(self): + """Returns the item's tagcount in the Library.""" - _Request(self, self.ws_prefix + '.removeTag', self.api_key, params, True, self.secret).execute() + return self.tagcount - def getTags(self): - """Returns a list of the user-set tags to this object.""" - - params = self._getParams() - doc = _Request(self, self.ws_prefix + '.getTags', self.api_key, params, True, self.secret).execute() - - if not doc: - return None - - tag_names = self._extract_all(doc, 'name') - tags = [] - for tag in tag_names: - tags.append(Tag(tag, *self.auth_data)) - - return tags - - def removeTags(self, *tags): - """Removes one or several tags from this object. - * *tags: Any number of tag names or Tag objects. - """ - - for tag in tags: - self._removeTag(tag) - - def clearTags(self): - """Clears all the user-set tags. """ - - self.removeTags(*(self.getTags())) - - def setTags(self, *tags): - """Sets this object's tags to only those tags. - * *tags: any number of tag names. - """ - - c_old_tags = [] - old_tags = [] - c_new_tags = [] - new_tags = [] - - to_remove = [] - to_add = [] - - tags_on_server = self.getTags() - if tags_on_server == None: - return - - for tag in tags_on_server: - c_old_tags.append(tag.getName().lower()) - old_tags.append(tag.getName()) - - for tag in tags: - c_new_tags.append(tag.lower()) - new_tags.append(tag) - - for i in range(0, len(old_tags)): - if not c_old_tags[i] in c_new_tags: - to_remove.append(old_tags[i]) - - for i in range(0, len(new_tags)): - if not c_new_tags[i] in c_old_tags: - to_add.append(new_tags[i]) - - self.removeTags(*to_remove) - self.addTags(*to_add) - -class Album(_BaseObject, _Cacheable, _Taggable): +class Album(_BaseObject, _Taggable): """A Last.fm album.""" - def __init__(self, artist_name, album_title, api_key, secret, session_key): - _BaseObject.__init__(self, api_key, secret, session_key) - _Cacheable.__init__(self) + def __init__(self, artist, title, api_key, api_secret, session_key): + """ + Create an album instance. + # Parameters: + * artist str|Artist: An artist name or an Artist object. + * title str: The album title. + """ + + _BaseObject.__init__(self, api_key, api_secret, session_key) _Taggable.__init__(self, 'album') - self.artist_name = artist_name - self.title = album_title + if isinstance(artist, Artist): + self.artist = artist + else: + self.artist = Artist(artist, *self.auth_data) - self._cached_info = None - - def _getParams(self): - return {'artist': self.artist_name, 'album': self.title, 'sk': self.session_key} + self.title = title - def _getInfo(self): - """Returns a dictionary with various metadata values.""" - - params = self._getParams() - - doc = _Request(self, 'album.getInfo', self.api_key, params).execute() - - if not doc: - return None - - data = {} - - data['name'] = self._extract(doc, 'name') - data['artist'] = self._extract(doc, 'artist') - data['id'] = self._extract(doc, 'id') - data['release_date'] = self._extract(doc, 'releasedate') - data['images'] = self._extract_all(doc, 'image') - data['listeners'] = self._extract(doc, 'listeners') - data['play_count'] = self._extract(doc, 'playcount') - - tags_element = doc.getElementsByTagName('toptags')[0] - data['top_tags'] = self._extract_all(tags_element, 'name') - - return data + def __repr__(self): + return self.get_artist().get_name().encode('utf-8') + ' - ' + self.get_title().encode('utf-8') - def getArtist(self): - """Returns the associated Artist object. """ - - return Artist(self.getArtistName(), *self.auth_data) + def __eq__(self, other): + return (self.get_title().lower() == other.get_title().lower()) and (self.get_artist().get_name().lower() == other.get_artist().get_name().lower()) - def getArtistName(self, from_server = False): - """Returns the artist name. - * from_server: If set to True, the value will be retrieved from the server. - """ - - if from_server: - return self._getCachedInfo('artist') - else: - return self.artist_name + def __ne__(self, other): + return (self.get_title().lower() != other.get_title().lower()) or (self.get_artist().get_name().lower() != other.get_artist().get_name().lower()) - def getTitle(self, from_server = False): - """Returns the album title. - * from_server: If set to True, the value will be retrieved from the server. - """ - - if from_server: - return self._getCachedInfo('name') - else: - return self.title + def _get_params(self): + return {'artist': self.get_artist().get_name(), 'album': self.get_title(), } - def getName(self, from_server = False): - """Returns the album title (alias to Album.getTitle). - * from_server: If set to True, the value will be retrieved from the server. - """ + def get_artist(self): + """Returns the associated Artist object.""" - return self.getTitle(from_server) + return self.artist - def getReleaseDate(self): + def get_title(self): + """Returns the album title.""" + + return self.title + + def get_name(self): + """Returns the album title (alias to Album.get_title).""" + + return self.get_title() + + def get_release_date(self): """Retruns the release date of the album.""" - return self._getCachedInfo('release_date') + return _extract(self._request("album.getInfo", cacheable = True), "releasedate") - def getImage(self, size = IMAGE_EXTRA_LARGE): + def get_image_url(self, size = IMAGE_EXTRA_LARGE): """Returns the associated image URL. - * size: The image size. Possible values: - o IMAGE_EXTRA_LARGE - o IMAGE_LARGE - o IMAGE_MEDIUM - o IMAGE_SMALL + # Parameters: + * size int: The image size. Possible values: + o IMAGE_EXTRA_LARGE + o IMAGE_LARGE + o IMAGE_MEDIUM + o IMAGE_SMALL """ - return self._getCachedInfo('images', size) + return _extract_all(self._request("album.getInfo", cacheable = True), 'image')[size] - def getID(self): - """Returns the Last.fm ID. """ + def get_id(self): + """Returns the Last.fm ID.""" - return self._getCachedInfo('id') + return _extract(self._request("album.getInfo", cacheable = True), "id") - def getPlayCount(self): + def get_playcount(self): """Returns the number of plays on Last.fm.""" - return int(self._getCachedInfo('play_count')) + return _number(_extract(self._request("album.getInfo", cacheable = True), "playcount")) - def getListenerCount(self): + def get_listener_count(self): """Returns the number of liteners on Last.fm.""" - return int(self._getCachedInfo('listeners')) + return _number(_extract(self._request("album.getInfo", cacheable = True), "listeners")) - def getTopTags(self, limit = None): - """Returns a list of the most-applied tags to this album. """ + def get_top_tags(self, limit = None): + """Returns a list of the most-applied tags to this album.""" - #Web services currently broken. - #TODO: add getTopTagsWithCounts + # BROKEN: Web service is currently broken. - l = [] - for tag in self._getCachedInfo('top_tags'): - if limit and len(l) >= limit: - break - - l.append(Tag(tag, *self.auth_data)) - - return l + return None - def fetchPlaylist(self): - """Returns the list of Tracks on this album. """ + def get_tracks(self): + """Returns the list of Tracks on this album.""" - uri = 'lastfm://playlist/album/%s' %self.getID() + uri = 'lastfm://playlist/album/%s' %self.get_id() - return Playlist(uri, *self.auth_data).fetch() + return XSPF(uri, *self.auth_data).get_tracks() - def getURL(self, domain_name = DOMAIN_ENGLISH): + def get_mbid(self): + """Returns the MusicBrainz id of the album.""" + + return _extract(self._request("album.getInfo", cacheable = True), "mbid") + + def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the album page on Last.fm. - * domain_name: Last.fm's language domain. Possible values: - o DOMAIN_ENGLISH - o DOMAIN_GERMAN - o DOMAIN_SPANISH - o DOMAIN_FRENCH - o DOMAIN_ITALIAN - o DOMAIN_POLISH - o DOMAIN_PORTUGUESE - o DOMAIN_SWEDISH - o DOMAIN_TURKISH - o DOMAIN_RUSSIAN - o DOMAIN_JAPANESE - o DOMAIN_CHINESE + # Parameters: + * domain_name str: Last.fm's language domain. Possible values: + o DOMAIN_ENGLISH + o DOMAIN_GERMAN + o DOMAIN_SPANISH + o DOMAIN_FRENCH + o DOMAIN_ITALIAN + o DOMAIN_POLISH + o DOMAIN_PORTUGUESE + o DOMAIN_SWEDISH + o DOMAIN_TURKISH + o DOMAIN_RUSSIAN + o DOMAIN_JAPANESE + o DOMAIN_CHINESE """ url = 'http://%(domain)s/music/%(artist)s/%(album)s' - artist = self._get_url_safe(self.getArtist().getName()) - album = self._get_url_safe(self.getTitle()) + artist = _get_url_safe(self.get_artist().get_name()) + album = _get_url_safe(self.get_title()) return url %{'domain': domain_name, 'artist': artist, 'album': album} - def toStr(self): - """Returns a string representation of the object.""" - - return self.getArtist().getName().encode('utf-8') + ' - ' + self.getTitle().encode('utf-8') - -class Track(_BaseObject, _Cacheable, _Taggable): - """A Last.fm track.""" - - def __init__(self, artist_name, title, api_key, secret, session_key): - _BaseObject.__init__(self, api_key, secret, session_key) - _Cacheable.__init__(self) - _Taggable.__init__(self, 'track') - - self.artist_name = artist_name - self.title = title - - self._cached_info = None - - def _getParams(self): - return {'sk': self.session_key, 'artist': self.artist_name, 'track': self.title} - - def _getInfo(self): - """Returns a dictionary with vairous metadata values about this track.""" - - params = self._getParams() - doc = _Request(self, 'track.getInfo', self.api_key, params).execute() - - if not doc: - return None - - data = {} - - data['id'] = self._extract(doc, 'id') - data['title'] = self._extract(doc, 'name') - data['duration'] = self._extract(doc, 'duration') - data['listener_count'] = self._extract(doc, 'listeners') - data['play_count'] = self._extract(doc, 'playcount') - data['artist_name'] = self._extract(doc, 'name', 1) - data['album_name'] = self._extract(doc, 'title') - data['images'] = self._extract_all(doc, 'image') - - tags_element = doc.getElementsByTagName('toptags')[0] - top_tags = self._extract_all(tags_element, 'name') - - data['top_tags'] = [] - - for tag in top_tags: - data['top_tags'].append(Tag(tag, *self.auth_data)) - - if len(doc.getElementsByTagName('wiki')) > 0: - wiki_element = doc.getElementsByTagName('wiki')[0] - data['wiki'] = {} - data['wiki']['published_date'] = self._extract(wiki_element, 'published') - data['wiki']['summary'] = self._extract(wiki_element, 'summary') - data['wiki']['content'] = self._extract(wiki_element, 'content') - - return data - - def getArtist(self, from_server = False): - """Returns the associated Artist object. - * from_server: If set to True, the artist name will be retrieved from the server. - """ - - return Artist(self.getArtistName(from_server), *self.auth_data) - - def getArtistName(self, from_server = False): - """Returns the name of the artist. - * from_server: If set to True, the value will be retrieved from the server. - """ - - if from_server: - return self._getCachedInfo('artist_name') - else: - return self.artist_name - - def getTitle(self, from_server = False): - """Returns the track title. - * from_server: If set to True, the value will be retrieved from the server. - """ - - if from_server: - return self._getCachedInfo('title') - else: - return self.title - - def getName(self, from_server = False): - """Returns the track title (alias to Track.getTitle). - * from_server: If set to True, the value will be retrieved from the server. - """ - - return self.getTitle(from_server) - - def getID(self): - """Returns the track id on Last.fm.""" - - return self._getCachedInfo('id') - - def getDuration(self): - """Returns the track duration.""" - - return int(self._getCachedInfo('duration')) - - def getListenerCount(self): - """Returns the listener count.""" - - return int(self._getCachedInfo('listener_count')) - - def getPlayCount(self): - """Returns the play count.""" - - return int(self._getCachedInfo('play_count')) - - def getAlbumName(self): - """Returns the name of the album.""" - - return self._getCachedInfo('album_name') - - def getAlbum(self): - """Returns the album object of this track.""" - - if self.getAlbumName(): - return Album(self.getArtistName(), self.getAlbumName(), *self.auth_data) - - def addToPlaylist(self, playlist_id): - """Adds this track to a user playlist. - * playlist_id: The unique playlist ID. - - [DEPRECATED] - Use UserPlaylist.addTrack instead. - """ - - warn_deprecated('Track.addToPlaylist', 'UserPlaylist.addTrack') - - params = self._getParams() - params['playlistID'] = unicode(playlist_id) - - _Request(self, 'playlist.addTrack', self.api_key, params, True, self.secret).execute() - - def love(self): - """Adds the track to the user's loved tracks. """ - - params = self._getParams() - _Request(self, 'track.love', self.api_key, params, True, self.secret).execute() - - def ban(self): - """Ban this track from ever playing on the radio. """ - - params = self._getParams() - _Request(self, 'track.ban', self.api_key, params, True, self.secret).execute() - - def getSimilar(self): - """Returns similar tracks for this track on Last.fm, based on listening data. """ - - params = self._getParams() - doc = _Request(self, 'track.getSimilar', self.api_key, params).execute() - - if not doc: - return None - - tracks = doc.getElementsByTagName('track') - - data = [] - for track in tracks: - extra_info = {} - - title = self._extract(track, 'name', 0) - artist = self._extract(track, 'name', 1) - - data.append(Track(artist, title, *self.auth_data)) - - return data - - def getTopFans(self, limit = None): - """Returns the top fans for this track on Last.fm. """ - - pairs = self.getTopFansWithWeights(limit) - - if not pairs: - return None - - list = [] - for p in pairs: - list.append(pairs[0]) - - return list - - def getTopFansWithWeights(self, limit = None): - """Returns the top fans for this track as a sequence of (User, weight). """ - - params = self._getParams() - doc = _Request(self, 'track.getTopFans', self.api_key, params).execute() - - if not doc: - return None - - list = [] - - elements = doc.getElementsByTagName('user') - - for element in elements: - if limit and len(list) >= limit: - break - - name = self._extract(element, 'name') - weight = self._extract(element, 'weight') - - list.append((User(name, *self.auth_data), weight)) - - return list - - def getTopTags(self, limit = None): - """Returns the top tags for this track on Last.fm, ordered by tag count.""" - - pairs = self.getTopTagsWithCounts(limit) - - if not pairs: - return [] - - list = [] - for pair in pairs: - list.append(pair[0]) - - return list - - def getTopTagsWithCounts(self, limit = None): - """Returns the top tags for this track on Last.fm, ordered by tag count as a sequence of (Tag, tag_count) tuples. """ - - params = self._getParams() - doc = _Request(self, 'track.getTopTags', self.api_key, params).execute() - - if not doc: - return [] - - list = [] - elements = doc.getElementsByTagName('tag') - - for element in elements: - if limit and len(list) >= limit: - break - - tag_name = self._extract(element, 'name') - tag_count = self._extract(element, 'count') - - list.append((Tag(tag_name, *self.auth_data), tag_count)) - - return list - - def share(self, users, message = None): - """Shares this track (sends out recommendations). - * users: A list that can contain usernames, emails, User objects, or all of them. - * message: A message to include in the recommendation message. - """ - - #last.fm currently accepts a max of 10 recipient at a time - while(len(users) > 10): - section = users[0:9] - users = users[9:] - self.share(section, message) - - nusers = [] - for user in users: - if isinstance(user, User): - nusers.append(user.getName()) - else: - nusers.append(user) - - params = self._getParams() - recipients = ','.join(nusers) - params['recipient'] = recipients - if message: params['message'] = unicode(message) - - _Request(self, 'track.share', self.api_key, params, True, self.secret).execute() - - def getURL(self, domain_name = DOMAIN_ENGLISH): - """Returns the url of the track page on Last.fm. - * domain_name: Last.fm's language domain. Possible values: - o DOMAIN_ENGLISH - o DOMAIN_GERMAN - o DOMAIN_SPANISH - o DOMAIN_FRENCH - o DOMAIN_ITALIAN - o DOMAIN_POLISH - o DOMAIN_PORTUGUESE - o DOMAIN_SWEDISH - o DOMAIN_TURKISH - o DOMAIN_RUSSIAN - o DOMAIN_JAPANESE - o DOMAIN_CHINESE - """ - url = 'http://%(domain)s/music/%(artist)s/_/%(title)s' - - artist = self._get_url_safe(self.getArtist().getName()) - title = self._get_url_safe(self.getTitle()) - - return url %{'domain': domain_name, 'artist': artist, 'title': title} - - def getWikiPublishedDate(self): - """Returns the date of publishing the wiki content.""" - - return self._getCachedInfo('wiki', 'published_date') - - def getWikiContent(self): - """Returns the full wiki content.""" - - return self._getCachedInfo('wiki', 'content') - - def getWikiSummary(self): - """Returns the wiki summary.""" - - return self._getCachedInfo('wiki', 'summary') - - def toStr(self): - """Returns a string representation of the object.""" - - return self.getArtist().getName().encode('utf-8') + ' - ' + self.getTitle().encode('utf-8') - -class Artist(_BaseObject, _Cacheable, _Taggable): +class Artist(_BaseObject, _Taggable): """A Last.fm artist.""" - def __init__(self, artist_name, api_key, secret, session_key): - _BaseObject.__init__(self, api_key, secret, session_key) - _Cacheable.__init__(self) + def __init__(self, name, api_key, api_secret, session_key): + """Create an artist object. + # Parameters: + * name str: The artist's name. + """ + + _BaseObject.__init__(self, api_key, api_secret, session_key) _Taggable.__init__(self, 'artist') - self.name = artist_name - - def _getParams(self): - return {'sk': self.session_key, 'artist': self.name} + self.name = name - def _getInfo(self): - """Get the metadata for an artist on Last.fm, Includes biography""" - - params = self._getParams() - doc = _Request(self, 'artist.getInfo', self.api_key, params).execute() - - if not doc: - return None - - data = {} - - data['name'] = self._extract(doc, 'name') - data['images'] = self._extract_all(doc, 'image', 3) - data['streamable'] = self._extract(doc, 'streamable') - data['listeners'] = self._extract(doc, 'listeners') - data['play_count'] = self._extract(doc, 'playcount') - bio = {} - bio['published'] = self._extract(doc, 'published') - bio['summary'] = self._extract(doc, 'summary') - bio['content'] = self._extract(doc, 'content') - data['bio'] = bio - - return data + def __repr__(self): + return self.get_name().encode('utf-8') - def getName(self, from_server = False): - """Returns the name of the artist. - * from_server: If set to True, the value will be retrieved from the server. - """ - - if from_server: - return self._getCachedInfo('name') - else: - return self.name + def __eq__(self, other): + return self.get_name().lower() == other.get_name().lower() - def getImage(self, size = IMAGE_LARGE): + def __ne__(self, other): + return self.get_name().lower() != other.get_name().lower() + + def _get_params(self): + return {'artist': self.get_name()} + + def get_name(self): + """Returns the name of the artist.""" + + return self.name + + def get_image_url(self, size = IMAGE_LARGE): """Returns the associated image URL. - * size: The image size. Possible values: - o IMAGE_LARGE - o IMAGE_MEDIUM - o IMAGE_SMALL + # Parameters: + * size int: The image size. Possible values: + o IMAGE_LARGE + o IMAGE_MEDIUM + o IMAGE_SMALL """ - return self._getCachedInfo('images', size) + return _extract_all(self._request("artist.getInfo", True), "image")[size] - def getPlayCount(self): - """Returns the number of plays on Last.fm. """ + def get_playcount(self): + """Returns the number of plays on Last.fm.""" - return int(self._getCachedInfo('play_count')) + return _number(_extract(self._request("artist.getInfo", True), "playcount")) - def getListenerCount(self): - """Returns the number of liteners on Last.fm. """ + def get_listener_count(self): + """Returns the number of liteners on Last.fm.""" - return int(self._getCachedInfo('listeners')) + return _number(_extract(self._request("artist.getInfo", True), "listeners")) - def getBioPublishedDate(self): - """Returns the date on which the artist's biography was published. """ + def is_streamable(self): + """Returns True if the artist is streamable.""" - return self._getCachedInfo('bio', 'published') + return bool(_number(_extract(self._request("artist.getInfo", True), "streamable"))) - def getBioSummary(self): - """Returns the summary of the artist's biography. """ + def get_bio_published_date(self): + """Returns the date on which the artist's biography was published.""" - return self._getCachedInfo('bio', 'summary') + return _extract(self._request("artist.getInfo", True), "published") - def getBioContent(self): - """Returns the content of the artist's biography. """ + def get_bio_summary(self): + """Returns the summary of the artist's biography.""" - return self._getCachedInfo('bio', 'content') + return _extract(self._request("artist.getInfo", True), "summary") - def getEvents(self): - """Returns a list of the upcoming Events for this artist. """ + def get_bio_content(self): + """Returns the content of the artist's biography.""" - params = self._getParams() - doc = _Request(self, 'artist.getEvents', self.api_key, params).execute() + return _extract(self._request("artist.getInfo", True), "content") + + def get_upcoming_events(self): + """Returns a list of the upcoming Events for this artist.""" - ids = self._extract_all(doc, 'id') + doc = self._request('artist.getEvents', True) + + ids = _extract_all(doc, 'id') events = [] for id in ids: @@ -1143,67 +749,63 @@ class Artist(_BaseObject, _Cacheable, _Taggable): return events - def getSimilar(self, limit = None): - """Returns the similar artists on Last.fm. - * limit: The limit of similar artists to retrieve. - """ + def get_similar(self, limit = None): + """Returns the similar artists on Last.fm.""" - params = self._getParams() + params = self._get_params() if limit: params['limit'] = unicode(limit) - doc = _Request(self, 'artist.getSimilar', self.api_key, params).execute() + doc = self._request('artist.getSimilar', True, params) - if not doc: - return None - - names = self._extract_all(doc, 'name') + names = _extract_all(doc, 'name') artists = [] for name in names: artists.append(Artist(name, *self.auth_data)) return artists - - def getTopAlbums(self): - """Returns a list of the top Albums by this artist on Last.fm. """ + + def get_top_albums(self): + """Retuns a list of the top albums.""" - params = self._getParams() - doc = _Request(self, 'artist.getTopAlbums', self.api_key, params).execute() - - if not doc: - return None - - names = self._extract_all(doc, 'name') - - albums = [] - for name in names: - albums.append(Album(self.getName(), name, *self.auth_data)) - - return albums - - def getTopFans(self, limit = None): - """Returns a list of the Users who listened to this artist the most. """ - - pairs = self.getTopFansWithWeights(limit) - - if not pairs: - return None + doc = self._request('artist.getTopAlbums', True) list = [] - for p in pairs: - list.append(pairs[0]) + + for node in doc.getElementsByTagName("album"): + name = _extract(node, "name") + artist = _extract(node, "name", 1) + playcount = _extract(node, "playcount") + + list.append(TopItem(Album(artist, name, *self.auth_data), playcount)) + + return list + + def get_top_tracks(self): + """Returns a list of the most played Tracks by this artist.""" + + doc = self._request("artist.getTopTracks", True) + + list = [] + for track in doc.getElementsByTagName('track'): + + title = _extract(track, "name") + artist = _extract(track, "name", 1) + playcount = _number(_extract(track, "playcount")) + + list.append( TopItem(Track(artist, title, *self.auth_data), playcount) ) return list - def getTopFansWithWeights(self, limit = None): - """Returns a list of the Users who listened to this artist the most as a sequence of (User, weight).""" + def get_top_fans(self, limit = None): + """Returns a list of the Users who played this artist the most. + # Parameters: + * limit int: Max elements. + """ - params = self._getParams() - doc = _Request(self, 'artist.getTopFans', self.api_key, params).execute() - - if not doc: - return None + params = self._get_params() + doc = self._request('artist.getTopFans', True) list = [] @@ -1213,73 +815,18 @@ class Artist(_BaseObject, _Cacheable, _Taggable): if limit and len(list) >= limit: break - name = self._extract(element, 'name') - weight = self._extract(element, 'weight') + name = _extract(element, 'name') + weight = _number(_extract(element, 'weight')) - list.append((User(name, *self.auth_data), weight)) - - return list - - def getTopTags(self, limit = None): - """Returns a list of the most frequently used Tags on this artist. """ - - pairs = self.getTopTagsWithCounts(limit) - - if not pairs: - return [] - - list = [] - for pair in pairs: - list.append(pair[0]) - - return list - - def getTopTagsWithCounts(self, limit = None): - """Returns a list of tuples (Tag, tag_count) of the most frequently used Tags on this artist. """ - - params = self._getParams() - doc = _Request(self, 'artist.getTopTags', self.api_key, params).execute() - - if not doc: - return [] - - elements = doc.getElementsByTagName('tag') - list = [] - - for element in elements: - if limit and len(list) >= limit: - break - tag_name = self._extract(element, 'name') - tag_count = self._extract(element, 'count') - - list.append((Tag(tag_name, *self.auth_data), tag_count)) - - return list - - def getTopTracks(self): - """Returns a list of the most listened to Tracks by this artist. """ - - params = self._getParams() - doc = _Request(self, 'artist.getTopTracks', self.api_key, params).execute() - - if not doc: - return None - - list = [] - - for track in doc.getElementsByTagName('track'): - t = {} - title = self._extract(track, 'name') - artist = self.getName() - - list.append(Track(artist, title, *self.auth_data)) + list.append(TopItem(User(name, *self.auth_data), weight)) return list def share(self, users, message = None): """Shares this artist (sends out recommendations). - * users: A list that can contain usernames, emails, User objects, or all of them. - * message: A message to include in the recommendation message. + # Parameters: + * users [User|str,]: A list that can contain usernames, emails, User objects, or all of them. + * message str: A message to include in the recommendation message. """ #last.fm currently accepts a max of 10 recipient at a time @@ -1291,19 +838,20 @@ class Artist(_BaseObject, _Cacheable, _Taggable): nusers = [] for user in users: if isinstance(user, User): - nusers.append(user.getName()) + nusers.append(user.get_name()) else: nusers.append(user) - params = self._getParams() + params = self._get_params() recipients = ','.join(nusers) params['recipient'] = recipients if message: params['message'] = unicode(message) - _Request(self, 'artist.share', self.api_key, params, True, self.secret).execute() + self._request('artist.share', False, params) - def getURL(self, domain_name = DOMAIN_ENGLISH): + def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the artist page on Last.fm. + # Parameters: * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH o DOMAIN_GERMAN @@ -1321,26 +869,30 @@ class Artist(_BaseObject, _Cacheable, _Taggable): url = 'http://%(domain)s/music/%(artist)s' - artist = self._get_url_safe(self.getName()) + artist = _get_url_safe(self.get_name()) return url %{'domain': domain_name, 'artist': artist} - - def toStr(self): - """Returns a string representation of the object.""" - - return self.getName().encode('utf-8') -class Event(_BaseObject, _Cacheable): + +class Event(_BaseObject): """A Last.fm event.""" - def __init__(self, event_id, api_key, secret, session_key): - _BaseObject.__init__(self, api_key, secret, session_key) - _Cacheable.__init__(self) + def __init__(self, event_id, api_key, api_secret, session_key): + _BaseObject.__init__(self, api_key, api_secret, session_key) self.id = unicode(event_id) - def _getParams(self): - return {'sk': self.session_key, 'event': self.getID()} + def __repr__(self): + return "Event #" + self.get_id() + + def __eq__(self, other): + return self.get_id() == other.get_id() + + def __ne__(self, other): + return self.get_id() != other.get_id() + + def _get_params(self): + return {'event': self.get_id()} def attend(self, attending_status): """Sets the attending status. @@ -1350,71 +902,34 @@ class Event(_BaseObject, _Cacheable): o EVENT_NOT_ATTENDING """ - params = self._getParams() + params = self._get_params() params['status'] = unicode(attending_status) - doc = _Request(self, 'event.attend', self.api_key, params, True, self.secret).execute() + doc = self._request('event.attend', False, params) - def _getInfo(self): - """Get the metadata for an event on Last.fm - Includes attendance and lineup information""" - - params = self._getParams() - - doc = _Request(self, 'event.getInfo', self.api_key, params).execute() - - if not doc: - return None - - data = {} - data['title'] = self._extract(doc, 'title') - artists = [] - for i in range(0, len(doc.getElementsByTagName('artist'))): - artists.append(self._extract(doc, 'artist', i)) - data['artists'] = artists - data['headliner'] = self._extract(doc, 'headliner') - - venue = {} - venue['name'] = self._extract(doc, 'name') - venue['city'] = self._extract(doc, 'city') - venue['country'] = self._extract(doc, 'country') - venue['street'] = self._extract(doc, 'street') - venue['postal_code'] = self._extract(doc, 'postalcode') - - geo = {} - geo['lat'] = self._extract(doc, 'geo:lat') - geo['long'] = self._extract(doc, 'geo:long') - - venue['geo'] = geo - venue['time_zone'] = self._extract(doc, 'timezone') - - data['venue'] = venue - data['description'] = self._extract(doc, 'description') - data['images'] = self._extract_all(doc, 'image') - data['attendance'] = self._extract(doc, 'attendance') - data['reviews'] = self._extract(doc, 'reviews') - - - return data - - def getID(self): + def get_id(self): """Returns the id of the event on Last.fm. """ return self.id - def getTitle(self): + def get_title(self): """Returns the title of the event. """ - return self._getCachedInfo('title') + doc = self._request("event.getInfo", True) + + return _extract(doc, "title") - def getHeadliner(self): + def get_headliner(self): """Returns the headliner of the event. """ - return self._getCachedInfo('headliner') + doc = self._request("event.getInfo", True) + + return Artist(_extract(doc, "headliner"), *self.auth_data) - def getArtists(self): + def get_artists(self): """Returns a list of the participating Artists. """ - names = self._getCachedInfo('artists') + doc = self._request("event.getInfo", True) + names = _extract_all(doc, "artist") artists = [] for name in names: @@ -1422,48 +937,31 @@ class Event(_BaseObject, _Cacheable): return artists - def getVenueName(self): - """Returns the name of the venue where the event is held. """ + def get_venue(self): + """Returns the venue where the event is held.""" - return self._getCachedInfo('venue', 'name') + doc = self._request("event.getInfo", True) + + venue_url = _extract(doc, "url") + venue_id = _number(venue_url[venue_url.rfind("/") + 1:]) + + return Venue(venue_id, *self.auth_data) - def getCityName(self): - """Returns the name of the city where the event is held. """ + def get_start_date(self): + """Returns the date when the event starts.""" - return self._getCachedInfo('venue', 'city') - - def getCountryName(self): - """Returns the name of the country where the event is held. """ + doc = self._request("event.getInfo", True) - return self._getCachedInfo('venue', 'country') - - def getPostalCode(self): - """Returns the postal code of where the event is held. """ + return _extract(doc, "startDate") - return self._getCachedInfo('venue', 'postal_code') - - def getStreetName(self): - """Returns the name of the street where the event is held. """ - - return self._getCachedInfo('venue', 'street') - - def getGeoPoint(self): - """Returns a tuple of latitude and longitude values of where the event is held. """ - - i = (self._getCachedInfo('venue', 'geo', 'lat'), self._getCachedInfo('venue', 'geo', 'long')) - return i - - def getTimeZone(self): - """Returns the timezone of where the event is held. """ - - return self._getCachedInfo('venue', 'time_zone') - - def getDescription(self): + def get_description(self): """Returns the description of the event. """ - return self._getCachedInfo('description') + doc = self._request("event.getInfo", True) + + return _extract(doc, "description") - def getImage(self, size = IMAGE_LARGE): + def get_image_url(self, size = IMAGE_LARGE): """Returns the associated image URL. * size: The image size. Possible values: o IMAGE_LARGE @@ -1471,20 +969,25 @@ class Event(_BaseObject, _Cacheable): o IMAGE_SMALL """ + doc = self._request("event.getInfo", True) - return self._getCachedInfo('images', size) + return _extract_all(doc, "image")[size] - def getAttendanceCount(self): + def get_attendance_count(self): """Returns the number of attending people. """ - return self._getCachedInfo('attendance') + doc = self._request("event.getInfo", True) + + return _number(_extract(doc, "attendance")) - def getReviewCount(self): + def get_review_count(self): """Returns the number of available reviews for this event. """ - return self._getCachedInfo('reviews') + doc = self._request("event.getInfo", True) + + return _number(_extract(doc, "reviews")) - def getURL(self, domain_name = DOMAIN_ENGLISH): + def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the event page on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH @@ -1503,7 +1006,7 @@ class Event(_BaseObject, _Cacheable): url = 'http://%(domain)s/event/%(id)s' - return url %{'domain': domain_name, 'id': self.getID()} + return url %{'domain': domain_name, 'id': self.get_id()} def share(self, users, message = None): """Shares this event (sends out recommendations). @@ -1520,97 +1023,80 @@ class Event(_BaseObject, _Cacheable): nusers = [] for user in users: if isinstance(user, User): - nusers.append(user.getName()) + nusers.append(user.get_name()) else: nusers.append(user) - params = self._getParams() + params = self._get_params() recipients = ','.join(nusers) params['recipient'] = recipients if message: params['message'] = unicode(message) - _Request(self, 'event.share', self.api_key, params, True, self.secret).execute() - - def toStr(self): - """Returns a string representation of the object.""" - - sa = "" - artists = self.getArtists() - for i in range(0, len(artists)): - if i == 0: - sa = artists[i].getName().encode('utf-8') - continue - elif i< len(artists)-1: - sa += ', ' - sa += artists[i].getName().encode('utf-8') - continue - elif i == len(artists) - 1: - sa += ' and ' - sa += artists[i].getName().encode('utf-8') - - return "%(title)s: %(artists)s at %(place)s" %{'title': self.getTitle().encode('utf-8'), 'artists': sa, 'place': self.getVenueName().encode('utf-8')} + self._request('event.share', False, params) + class Country(_BaseObject): """A country at Last.fm.""" - # TODO geo.getEvents - - def __init__(self, country_name, api_key, secret, session_key): - _BaseObject.__init__(self, api_key, secret, session_key) + def __init__(self, name, api_key, api_secret, session_key): + _BaseObject.__init__(self, api_key, api_secret, session_key) - self.name = country_name + self.name = name - def _getParams(self): - return {'country': self.name} + def __repr__(self): + return self.get_name().encode('utf-8') - def __str__(self): - return self.toStr() + def __eq__(self, other): + self.get_name().lower() == other.get_name().lower() - def getName(self): + def __ne__(self, other): + self.get_name() != other.get_name() + + def _get_params(self): + return {'country': self.get_name()} + + def _get_name_from_code(self, alpha2code): + # TODO: Have this function lookup the alpha-2 code and return the country name. + + return alpha2code + + def get_name(self): """Returns the country name. """ return self.name - def getTopArtists(self): - """Returns a tuple of the most popular Artists in the country, ordered by popularity. """ + def get_top_artists(self): + """Returns a sequence of the most played artists.""" - params = self._getParams() - doc = _Request(self, 'geo.getTopArtists', self.api_key, params).execute() + doc = self._request('geo.getTopArtists', True) - if not doc: - return None + list = [] + for node in doc.getElementsByTagName("artist"): + name = _extract(node, 'name') + playcount = _extract(node, "playcount") - names = self._extract_all(doc, 'name') + list.append(TopItem(Artist(name, *self.auth_data), playcount)) - artists = [] - for name in names: - artists.append(Artist(name, *self.auth_data)) - - return artists + return list - def getTopTracks(self, location = None): - """Returns a tuple of the most popular Tracks in the country, ordered by popularity. - * location: A metro name, to fetch the charts for (must be within the country specified). - """ + def get_top_tracks(self): + """Returns a sequence of the most played tracks""" - params = self._getParams() - if location: - params['location'] = unicode(location) - - doc = _Request(self, 'geo.getTopTracks', self.api_key, params).execute() + doc = self._request("geo.getTopTracks", True) list = [] for n in doc.getElementsByTagName('track'): - title = self._extract(n, 'name') - artist = self._extract(n, 'name', 1) + title = _extract(n, 'name') + artist = _extract(n, 'name', 1) + playcount = _number(_extract(n, "playcount")) - list.append(Track(artist, title, *self.auth_data)) + list.append( TopItem(Track(artist, title, *self.auth_data), playcount)) return list - def getURL(self, domain_name = DOMAIN_ENGLISH): + def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the event page on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH @@ -1629,120 +1115,302 @@ class Country(_BaseObject): url = 'http://%(domain)s/place/%(country_name)s' - country_name = self._get_url_safe(self.getName()) + country_name = _get_url_safe(self.get_name()) return url %{'domain': domain_name, 'country_name': country_name} - def toStr(self): - """Returns a string representation of the object.""" + +class Library(_BaseObject): + """A user's Last.fm library.""" + + def __init__(self, user, api_key, api_secret, session_key): + _BaseObject.__init__(self, api_key, api_secret, session_key) - return self.getName().encode('utf-8') - -class Group(_BaseObject): - """A Last.fm group.""" - - def __init__(self, group_name, api_key, secret, session_key): - _BaseObject.__init__(self, api_key, secret, session_key) + if isinstance(user, User): + self.user = user + else: + self.user = User(user, *self.auth_data) - self.name = group_name + self._albums_index = 0 + self._artists_index = 0 + self._tracks_index = 0 - def _getParams(self): - return {'group': self.name} + def __repr__(self): + return self.get_user().__repr__() + "'s Library" - def getName(self): - """Returns the group name. """ - return self.name + def _get_params(self): + return {'user': self.user.get_name()} - def getTopWeeklyAlbums(self, from_value = None, to_value = None): - """Returns a tuple of the most frequently listened to Albums in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList. - * from_value: The value marking the beginning of a week. - * to_value: The value marking the end of a week. + def get_user(self): + """Returns the user who owns this library.""" + + return self.user + + def add_album(self, album): + """Add an album to this library.""" + + params = self._get_params() + params["artist"] = album.get_artist.get_name() + params["album"] = album.get_name() + + self._request("library.addAlbum", False, params) + + def add_artist(self, artist): + """Add an artist to this library.""" + + params = self._get_params() + params["artist"] = artist.get_name() + + self._request("library.addArtist", False, params) + + def add_track(self, track): + """Add a track to this library.""" + + params = self._get_prams() + params["track"] = track.get_title() + + self._request("library.addTrack", False, params) + + def _get_albums_pagecount(self): + """Returns the number of album pages in this library.""" + + doc = self._request("library.get_albums", True) + + return _number(doc.getElementsByTagName("albums")[0].getAttribute("totalPages")) + + def is_end_of_albums(self): + """Returns True when the last page of albums has ben retrieved.""" + + if self._albums_index >= self._get_albums_pagecount(): + return True + else: + return False + + def _get_artists_pagecount(self): + """Returns the number of artist pages in this library.""" + + doc = self._request("library.getArtists", True) + + return _number(doc.getElementsByTagName("artists")[0].getAttribute("totalPages")) + + def is_end_of_artists(self): + """Returns True when the last page of artists has ben retrieved.""" + + if self._artists_index >= self._get_artists_pagecount(): + return True + else: + return False + + def _get_tracks_pagecount(self): + """Returns the number of track pages in this library.""" + + doc = self._request("library.getTracks", True) + + return _number(doc.getElementsByTagName("tracks")[0].getAttribute("totalPages")) + + def is_end_of_tracks(self): + """Returns True when the last page of tracks has ben retrieved.""" + + if self._tracks_index >= self._get_tracks_pagecount(): + return True + else: + return False + + def get_albums_page(self): + """Retreives the next page of albums in the Library. Returns a sequence of TopItem objects. + Use the function extract_items like extract_items(Library.get_albums_page()) to return only a sequence of + Album objects with no extra data. + + Example: + ------- + library = Library("rj", API_KEY, API_SECRET, SESSION_KEY) + + while not library.is_end_of_albums(): + print library.get_albums_page() """ - params = self._getParams() - if from_value and to_value: - params['from'] = unicode(from_value) - params['to'] = unicode(to_value) + self._albums_index += 1 - doc = _Request(self, 'group.getWeeklyAlbumChart', self.api_key, params).execute() + params = self._get_params() + params["page"] = str(self._albums_index) list = [] - - for n in doc.getElementsByTagName('album'): - artist = self._extract(n, 'artist') - name = self._extract(n, 'name') + doc = self._request("library.get_albums", True, params) + for node in doc.getElementsByTagName("album"): + name = _extract(node, "name") + artist = _extract(node, "name", 1) + playcount = _number(_extract(node, "playcount")) + tagcount = _number(_extract(node, "tagcount")) - list.append(Album(artist, name, *self.auth_data)) + list.append(LibraryItem(Album(artist, name, *self.auth_data), playcount, tagcount)) return list - def getTopWeeklyArtists(self, from_value = None, to_value = None): - """Returns a tuple of the most frequently listened to Artists in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList. - * from_value: The value marking the beginning of a week. - * to_value: The value marking the end of a week. + def get_artists_page(self): + """Retreives the next page of artists in the Library. Returns a sequence of TopItem objects. + Use the function extract_items like extract_items(Library.get_artists_page()) to return only a sequence of + Artist objects with no extra data. + + Example: + ------- + library = Library("rj", API_KEY, API_SECRET, SESSION_KEY) + + while not library.is_end_of_artists(): + print library.get_artists_page() """ + + self._artists_index += 1 - params = self._getParams() - if from_value and to_value: - params['from'] = unicode(from_value) - params['to'] = unicode(to_value) - - doc = _Request(self, 'group.getWeeklyArtistChart', self.api_key, params).execute() + params = self._get_params() + params["page"] = str(self._artists_index) list = [] - - names = self._extract_all(doc, 'name') - for name in names: - list.append(Artist(name, *self.auth_data)) + doc = self._request("library.getArtists", True, params) + for node in doc.getElementsByTagName("artist"): + name = _extract(node, "name") + + playcount = _number(_extract(node, "playcount")) + tagcount = _number(_extract(node, "tagcount")) + + list.append(LibraryItem(Artist(name, *self.auth_data), playcount, tagcount)) return list - def getTopWeeklyTracks(self, from_value = None, to_value = None): - """Returns a tuple of the most frequently listened to Tracks in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList. - * from_value: The value marking the beginning of a week. - * to_value: The value marking the end of a week. + def get_tracks_page(self): + """Retreives the next page of tracks in the Library. Returns a sequence of TopItem objects. + Use the function extract_items like extract_items(Library.get_tracks_page()) to return only a sequence of + Track objects with no extra data. + + Example: + ------- + library = Library("rj", API_KEY, API_SECRET, SESSION_KEY) + + while not library.is_end_of_tracks(): + print library.get_tracks_page() """ - params = self._getParams() - if from_value and to_value: - params['from'] = from_value - params['to'] = to_value + self._tracks_index += 1 - doc = _Request(self, 'group.getWeeklyTrackChart', self.api_key, params).execute() - - if not doc: - return None + params = self._get_params() + params["page"] = str(self._tracks_index) list = [] - for track in doc.getElementsByTagName('track'): - artist = self._extract(track, 'artist') - title = self._extract(track, 'name') + doc = self._request("library.getTracks", True, params) + for node in doc.getElementsByTagName("track"): + name = _extract(node, "name") + artist = _extract(node, "name", 1) + playcount = _number(_extract(node, "playcount")) + tagcount = _number(_extract(node, "tagcount")) - list.append(Track(artist, title, *self.auth_data)) + list.append(LibraryItem(Track(artist, name, *self.auth_data), playcount, tagcount)) return list - - def getWeeklyChartList(self): - """Returns a list of range pairs to use with the chart methods. """ - - params = self._getParams() - doc = _Request(self, 'group.getWeeklyChartList', self.api_key, params).execute() - - if not doc: - return None - - list = [] - for chart in doc.getElementsByTagName('chart'): - c = {} - c['from'] = chart.getAttribute('from') - c['to'] = chart.getAttribute('to') - list.append(c) - - return list - def getURL(self, domain_name = DOMAIN_ENGLISH): - """Returns the url of the group page on Last.fm. +class Playlist(_BaseObject): + """A Last.fm user playlist.""" + + def __init__(self, user, id, api_key, api_secret, session_key): + _BaseObject.__init__(self, api_key, api_secret, session_key) + + if isinstance(user, User): + self.user = user + else: + self.user = User(user, *self.auth_data) + + self.id = unicode(id) + + def _get_info_node(self): + """Returns the node from user.getPlaylists where this playlist's info is.""" + + doc = self._request("user.getPlaylists", True) + + for node in doc.getElementsByTagName("playlist"): + if _extract(node, "id") == str(self.get_id()): + return node + + def _get_params(self): + return {'user': self.user.get_name(), 'playlistID': self.get_id()} + + def get_id(self): + """Returns the playlist id.""" + + return self.id + + def get_user(self): + """Returns the owner user of this playlist.""" + + return self.user + + def get_tracks(self): + """Returns a list of the tracks on this user playlist.""" + + uri = u'lastfm://playlist/%s' %self.get_id() + + return XSPF(uri, *self.auth_data).get_tracks() + + def add_track(self, track): + """Adds a Track to this Playlist.""" + + params = self._get_params() + params['artist'] = track.get_artist().get_name() + params['track'] = track.get_title() + + self._request('playlist.addTrack', False, params) + + def get_title(self): + """Returns the title of this playlist.""" + + return _extract(self._get_info_node(), "title") + + def get_creation_date(self): + """Returns the creation date of this playlist.""" + + return _extract(self._get_info_node(), "date") + + def get_size(self): + """Returns the number of tracks in this playlist.""" + + return _number(_extract(self._get_info_node(), "size")) + + def get_description(self): + """Returns the description of this playlist.""" + + return _extract(self._get_info_node(), "description") + + def get_duration(self): + """Returns the duration of this playlist in milliseconds.""" + + return _number(_extract(self._get_info_node(), "duration")) + + def is_streamable(self): + """Returns True if the playlist is streamable. + For a playlist to be streamable, it needs at least 45 tracks by 15 different artists.""" + + if _extract(self._get_info_node(), "streamable") == '1': + return True + else: + return False + + def has_track(self, track): + """Checks to see if track is already in the playlist. + * track: Any Track object. + """ + + return track in self.get_tracks() + + def get_image_url(self, size = IMAGE_LARGE): + """Returns the associated image URL. + * size: The image size. Possible values: + o IMAGE_LARGE + o IMAGE_MEDIUM + o IMAGE_SMALL + """ + + return _extract(self._get_info_node(), "image")[size] + + def get_url(self, domain_name = DOMAIN_ENGLISH): + """Returns the url of the playlist on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH o DOMAIN_GERMAN @@ -1757,499 +1425,129 @@ class Group(_BaseObject): o DOMAIN_JAPANESE o DOMAIN_CHINESE """ + url = "http://%(domain)s/user/%(user)s/library/playlists/%(appendix)s" - url = 'http://%(domain)s/group/%(name)s' + english_url = _extract(self._get_info_node(), "url") + appendix = english_url[english_url.rfind("/") + 1:] - name = self._get_url_safe(self.getName()) + return url %{'domain': domain_name, 'appendix': appendix, "user": self.get_user().get_name()} - return url %{'domain': domain_name, 'name': name} - - def toStr(self): - """Returns a string representation of the object.""" - - return self.getName().encode('utf-8') - -class Library(_BaseObject): - """A user's Last.fm library.""" - - def __init__(self, username, api_key, secret, session_key): - _BaseObject.__init__(self, api_key, secret, session_key) - - self._username = username - - self._albums_playcounts = {} - self._albums_tagcounts = {} - self._artists_playcounts = {} - self._artists_tagcounts = {} - self._tracks_playcounts = {} - self._tracks_tagcounts = {} - - self._album_pages = None - self._album_perpage = None - self._artist_pages = None - self._artist_perpage = None - self._track_pages = None - self._track_perpage = None - - def _getParams(self): - return {'sk': self.session_key, 'user': self._username} - - def getUser(self): - """Returns the user who owns this library.""" - - return User(self._username, *self.auth_data) - - def _get_albums_info(self): - - params = self._getParams() - doc = _Request(self, 'library.getAlbums', self.api_key, params).execute() - - if not doc: - return None - - self._album_pages = int(doc.getElementsByTagName('albums')[0].getAttribute('totalPages')) - self._album_perpage = int(doc.getElementsByTagName('albums')[0].getAttribute('perPage')) - - def _get_artists_info(self): - - params = self._getParams() - doc = _Request(self, 'library.getArtists', self.api_key, params).execute() - - if not doc: - return None - - self._artist_pages = int(doc.getElementsByTagName('artists')[0].getAttribute('totalPages')) - self._artist_perpage = int(doc.getElementsByTagName('artists')[0].getAttribute('perPage')) - - def _get_tracks_info(self): - - params = self._getParams() - doc = _Request(self, 'library.getTracks', self.api_key, params).execute() - - if not doc: - return None - - self._track_pages = int(doc.getElementsByTagName('tracks')[0].getAttribute('totalPages')) - self._track_perpage = int(doc.getElementsByTagName('tracks')[0].getAttribute('perPage')) - - def getAlbumsPageCount(self): - """Returns the number of pages you'd get when calling getAlbums. """ - - if self._album_pages: - return self._album_pages - - self._get_albums_info() - - return self._album_pages - - def getAlbumsPerPage(self): - """Returns the number of albums per page you'd get wen calling getAlbums. """ - - if self._album_perpage: - return self._album_perpage - - self._get_albums_info() - - return self._album_perpage - - def getArtistsPageCount(self): - """Returns the number of pages you'd get when calling getArtists(). """ - - if self._artist_pages: - return self._artist_pages - - self._get_artists_info() - - return self._artist_pages - - def getArtistsPerPage(self): - """Returns the number of artists per page you'd get wen calling getArtists(). """ - - if self._artist_perpage: - return self._artist_perpage - - self._get_artists_info() - - return self._artist_perpage - - def getTracksPageCount(self): - """Returns the number of pages you'd get when calling getTracks(). """ - - if self._track_pages: - return self._track_pages - - self._get_tracks_info() - - return self._track_pages - - def getTracksPerPage(self): - """Returns the number of tracks per page you'd get wen calling getTracks. """ - - if self._track_perpage: - return self._track_perpage - - self._get_tracks_info() - - return self._track_perpage - - def getAlbums(self, limit = None, page = None): - """Returns a paginated list of all the albums in a user's library. - * limit: The number of albums to retrieve. - * page: The page to retrieve (default is the first one). - """ - - params = self._getParams() - if limit: params['limit'] = unicode(limit) - if page: params['page'] = unicode(page) - - doc = _Request(self, 'library.getAlbums', self.api_key, params).execute() - - if not doc: - return [] - - albums = doc.getElementsByTagName('album') - list = [] - - for album in albums: - artist = self._extract(album, 'name', 1) - name = self._extract(album, 'name') - - playcount = self._extract(album, 'playcount') - tagcount = self._extract(album, 'tagcount') - - a = Album(artist, name, *self.auth_data) - list.append(a) - - self._albums_playcounts[a._hash()] = playcount - self._albums_tagcounts[a._hash()] = tagcount - - return list - - def getArtists(self, limit = None, page = None): - """Returns a paginated list of all the artists in a user's library. - * limit: The number of artists to retrieve. - * page: The page to retrieve (default is the first one). - """ - - params = self._getParams() - if limit: params['limit'] = unicode(limit) - if page: params['page'] = unicode(page) - - doc = _Request(self, 'library.getArtists', self.api_key, params).execute() - - if not doc: - return [] - - artists = doc.getElementsByTagName('artist') - list = [] - - for artist in artists: - name = self._extract(artist, 'name') - - playcount = self._extract(artist, 'playcount') - tagcount = self._extract(artist, 'tagcount') - - a = Artist(name, *self.auth_data) - list.append(a) - - self._artists_playcounts[a._hash()] = playcount - self._artists_tagcounts[a._hash()] = tagcount - - return list - - def getTracks(self, limit = None, page = None): - """Returns a paginated list of all the tracks in a user's library. """ - - params = self._getParams() - if limit: params['limit'] = unicode(limit) - if page: params['page'] = unicode(page) - - doc = _Request(self, 'library.getTracks', self.api_key, params).execute() - - if not doc: - return [] - - tracks = doc.getElementsByTagName('track') - list = [] - - for track in tracks: - - title = self._extract(track, 'name') - artist = self._extract(track, 'name', 1) - - playcount = self._extract(track, 'playcount') - tagcount = self._extract(track, 'tagcount') - - t = Track(artist, title, *self.auth_data) - list.append(t) - - self._tracks_playcounts[t._hash()] = playcount - self._tracks_tagcounts[t._hash()] = tagcount - - return list - - def getAlbumPlaycount(self, album_object): - """Goes through the library until it finds the playcount of this album and returns it (could take a relatively long time). - * album_object : The Album to find. - """ - - key = album_object._hash() - if key in self._albums_playcounts.keys(): - return self._albums_playcounts[key] - - for i in range(1, self.getAlbumsPageCount() +1): - stack = self.getAlbums(page = i) - - for album in stack: - if album._hash() == album_object._hash(): - return self._albums_playcounts[album._hash()] - - def getAlbumTagcount(self, album_object): - """Goes through the library until it finds the tagcount of this album and returns it (could take a relatively long time). - * album_object : The Album to find. - """ - - key = album_object._hash() - if key in self._albums_tagcounts.keys(): - return self._albums_tagcounts[key] - - for i in range(1, self.getAlbumsPageCount() +1): - stack = self.getAlbums(page = i) - - for album in stack: - if album._hash() == album_object._hash(): - return self._albums_tagcounts[album._hash()] - - def getArtistPlaycount(self, artist_object): - """Goes through the library until it finds the playcount of this artist and returns it (could take a relatively long time). - * artist_object : The Artist to find. - """ - - key = artist_object._hash() - if key in self._artists_playcounts.keys(): - return self._artists_playcounts[key] - - for i in range(1, self.getArtistsPageCount() +1): - stack = self.getArtists(page = i) - - for artist in stack: - if artist._hash() == artist_object._hash(): - return self._artists_playcounts[artist._hash()] - - def getArtistTagcount(self, artist_object): - """Goes through the library until it finds the tagcount of this artist and returns it (could take a relatively long time). - * artist_object : The Artist to find. - """ - - key = artist_object._hash() - if key in self._artists_tagcounts.keys(): - return self._artists_tagcounts[key] - - for i in range(1, self.getArtistsPageCount() +1): - stack = self.getArtist(page = i) - - for artist in stack: - if artist._hash() == artist_object._hash(): - return self._artists_tagcounts[artist._hash()] - - def getTrackPlaycount(self, track_object): - """Goes through the library until it finds the playcount of this track and returns it (could take a relatively long time). - * track_object : The Track to find. - """ - - key = track_object._hash() - if key in self._tracks_playcounts.keys(): - return self._tracks_playcounts[key] - - for i in range(1, self.getTracksPageCount() +1): - stack = self.getTracks(page = i) - - for track in stack: - if track._hash() == track_object._hash(): - return self._tracks_playcounts[track._hash()] - - def getTrackTagcount(self, track_object): - """Goes through the library until it finds the tagcount of this track and returns it (could take a relatively long time). - * track_object : The Track to find. - """ - - key = track_object._hash() - if key in self._tracks_tagcounts.keys(): - return self._tracks_tagcounts[key] - - for i in range(1, self.getTracksPageCount() +1): - stack = self.getTracks(page = i) - - for track in stack: - if track._hash() == track_object._hash(): - return self._tracks_tagcounts[track._hash()] - - def addAlbum(self, album): - """Add an album to a user's Last.fm library. """ - - params = self._getParams() - params['artist'] = album.getArtist().getName() - params['album'] = album.getName() - - _Request(self, 'library.addAlbum', self.api_key, params, True, self.secret).execute() - - def addArtist(self, artist): - """Add an artist to a user's Last.fm library.""" - - if isinstance(artist, Artist): - artist = artist.getName() - - params = self._getParams() - params['artist'] = artist - - _Request(self, 'library.addArtist', self.api_key, params, True, self.secret).execute() - - def addTrack(self, track): - """Add a track to a user's Last.fm library.""" - - params = self._getParams() - params['artist'] = track.getArtistName() - params['track'] = track.getTitle() - - _Request(self, 'library.addTrack', self.api_key, params, True, self.secret).execute() - -class Playlist(_BaseObject): - "An abstract Last.fm playlist.""" - - def __init__(self, playlist_uri, api_key, secret, session_key): - _BaseObject.__init__(self, api_key, secret, session_key) - - self._playlist_uri = playlist_uri - - def _getParams(self): - return {'playlistURL': self._playlist_uri} - - def getPlaylistURI(self): - """Returns the Last.fm playlist URI. """ - - return self._playlist_uri - - def fetch(self): - """Returns the tracks on this playlist.""" - - params = self._getParams() - - doc = _Request(self, 'playlist.fetch', self.api_key, params).execute() - - if not doc: - return None - - data = {} - - data['title'] = self._extract(doc, 'title') - list = [] - - for n in doc.getElementsByTagName('track'): - title = self._extract(n, 'title') - artist = self._extract(n, 'creator') - - list.append(Track(artist, title, *self.auth_data)) - - return list - - def toStr(self): - """Returns a string representation of the object.""" - - return self.getPlaylistURI() class Tag(_BaseObject): """A Last.fm object tag.""" # TODO: getWeeklyArtistChart (too lazy, i'll wait for when someone requests it) - def __init__(self, tag_name, api_key, secret, session_key): - _BaseObject.__init__(self, api_key, secret, session_key) + def __init__(self, name, api_key, api_secret, session_key): + _BaseObject.__init__(self, api_key, api_secret, session_key) - self.name = tag_name + self.name = name - def _getParams(self): - return {'tag': self.name} + def _get_params(self): + return {'tag': self.get_name()} - def getName(self): + def __repr__(self): + return self.get_name().encode('utf-8') + + def __eq__(self): + return self.get_name().lower() == other.get_name().lower() + + def __ne__(self): + return self.get_name().lower() != other.get_name().lower() + + def get_name(self): """Returns the name of the tag. """ return self.name - def getSimilar(self): + def get_similar(self): """Returns the tags similar to this one, ordered by similarity. """ - params = self._getParams() - doc = _Request(self, 'tag.getSimilar', self.api_key, params).execute() - - if not doc: - return None + doc = self._request('tag.getSimilar', True) list = [] - names = self._extract_all(doc, 'name') + names = _extract_all(doc, 'name') for name in names: list.append(Tag(name, *self.auth_data)) return list - def getTopAlbums(self): - """Returns a list of the top Albums tagged by this tag, ordered by tag count. """ + def get_top_albums(self): + """Retuns a list of the top albums.""" - params = self._getParams() - doc = _Request(self, 'tag.getTopAlbums', self.api_key, params).execute() + doc = self._request('tag.getTopAlbums', True) list = [] - for n in doc.getElementsByTagName('album'): - name = self._extract(n, 'name') - artist = self._extract(n, 'name', 1) + for node in doc.getElementsByTagName("album"): + name = _extract(node, "name") + artist = _extract(node, "name", 1) + playcount = _extract(node, "playcount") - list.append(Album(artist, name, *self.auth_data)) + list.append(TopItem(Album(artist, name, *self.auth_data), playcount)) return list - - def getTopArtists(self): - """Returns a list of the top Artists tagged by this tag, ordered by tag count. """ - params = self._getParams() - doc = _Request(self, 'tag.getTopArtists', self.api_key, params).execute() + def get_top_tracks(self): + """Returns a list of the most played Tracks by this artist.""" + + doc = self._request("tag.getTopTracks", True) list = [] - - names = self._extract_all(doc, 'name') - - for name in names: - list.append(Artist(name, *self.auth_data)) - - return list - - def getTopTracks(self): - """Returns a list of the top Tracks tagged by this tag, ordered by tag count. """ - - params = self._getParams() - doc = _Request(self, 'tag.getTopTracks', self.api_key, params).execute() - - list = [] - - for n in doc.getElementsByTagName('track'): - title = self._extract(n, 'name') - artist = self._extract(n, 'name', 1) + for track in doc.getElementsByTagName('track'): - list.append(Track(artist, title, *self.auth_data)) + title = _extract(track, "name") + artist = _extract(track, "name", 1) + playcount = _number(_extract(track, "playcount")) + + list.append( TopItem(Track(artist, title, *self.auth_data), playcount) ) return list - def fetchPlaylist(self, free_tracks_only = False): - """Returns lit of the tracks tagged by this tag. - * free_tracks_only: Set to True to include only free tracks. - """ + def get_top_artists(self): + """Returns a sequence of the most played artists.""" - uri = 'lastfm://playlist/tag/%s' %self.getName() - if free_tracks_only: - uri += '/freetracks' + doc = self._request('tag.getTopArtists', True) - return Playlist(uri, *self.auth_data).fetch() - - def getURL(self, domain_name = DOMAIN_ENGLISH): + list = [] + for node in doc.getElementsByTagName("artist"): + name = _extract(node, 'name') + playcount = _extract(node, "playcount") + + list.append(TopItem(Artist(name, *self.auth_data), playcount)) + + return list + + def get_weekly_chart_dates(self): + """Returns a list of From and To tuples for the available charts.""" + + doc = self._request("tag.getWeeklyChartList", True) + + list = [] + for node in doc.getElementsByTagName("chart"): + list.append( (node.getAttribute("from"), node.getAttribute("to")) ) + + return list + + def get_weekly_artist_charts(self, from_date = None, to_date = None): + """Returns the weekly artist charts for the week starting from the from_date value to the to_date value.""" + + params = self._get_params() + if from_date and to_date: + params["from"] = from_date + params["to"] = to_date + + doc = self._request("tag.getWeeklyArtistChart", True, params) + + list = [] + for node in doc.getElementsByTagName("artist"): + item = Artist(_extract(node, "name"), *self.auth_data) + weight = _extract(node, "weight") + list.append(TopItem(item, weight)) + + return list + + def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the tag page on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH @@ -2268,14 +1566,389 @@ class Tag(_BaseObject): url = 'http://%(domain)s/tag/%(name)s' - name = self._get_url_safe(self.getName()) + name = _get_url_safe(self.get_name()) return url %{'domain': domain_name, 'name': name} - def toStr(self): - """Returns a string representation of the object.""" +class Track(_BaseObject, _Taggable): + """A Last.fm track.""" + + def __init__(self, artist, title, api_key, api_secret, session_key): + _BaseObject.__init__(self, api_key, api_secret, session_key) + _Taggable.__init__(self, 'track') - return self.getName().encode('utf-8') + if isinstance(artist, Artist): + self.artist = artist + else: + self.artist = Artist(artist, *self.auth_data) + + self.title = title + + def __repr__(self): + return self.get_artist().get_name().encode('utf-8') + ' - ' + self.get_title().encode('utf-8') + + def __eq__(self, other): + return (self.get_title().lower() == other.get_title().lower()) and (self.get_artist().get_name().lower() == other.get_artist().get_name().lower()) + + def __ne__(self, other): + return (self.get_title().lower() != other.get_title().lower()) or (self.get_artist().get_name().lower() != other.get_artist().get_name().lower()) + + def _get_params(self): + return {'artist': self.get_artist().get_name(), 'track': self.get_title()} + + def get_artist(self): + """Returns the associated Artist object.""" + + return self.artist + + def get_title(self): + """Returns the track title.""" + + return self.title + + def get_name(self): + """Returns the track title (alias to Track.get_title).""" + + return self.get_title() + + def get_id(self): + """Returns the track id on Last.fm.""" + + doc = self._request("track.getInfo", True) + + return _extract(doc, "id") + + def get_duration(self): + """Returns the track duration.""" + + doc = self._request("track.getInfo", True) + + return _number(_extract(doc, "duration")) + + def get_mbid(self): + """Returns the MusicBrainz ID of this track.""" + + doc = self._request("track.getInfo", True) + + return _extract(doc, "mbid") + + def get_listener_count(self): + """Returns the listener count.""" + + doc = self._request("track.getInfo", True) + + return _number(_extract(doc, "listeners")) + + def get_playcount(self): + """Returns the play count.""" + + doc = self._request("track.getInfo", True) + return _number(_extract(node, "playcount")) + + def is_streamable(self): + """Returns True if the track is available at Last.fm.""" + + doc = self._request("track.getInfo", True) + return _extract(node, "streamable") == "1" + + def is_fulltrack_available(self): + """Returns True if the fulltrack is available for streaming.""" + + doc = self._request("track.getInfo", True) + return doc.getElementsByTagName("streamable")[0].getAttribute("fulltrack") == "1" + + def get_album(self): + """Returns the album object of this track.""" + + doc = self._request("track.getInfo", True) + + albums = doc.getElementsByTagName("album") + + if len(albums) == 0: + return + + node = doc.getElementsByTagName("album")[0] + return Album(_extract(node, "artist"), _extract(node, "title")) + + def get_wiki_published_date(self): + """Returns the date of publishing this version of the wiki.""" + + doc = self._request("track.getInfo", True) + + if len(doc.getElementsByTagName("wiki")) == 0: + return + + node = doc.getElementsByTagName("wiki")[0] + + return _extract(node, "published") + + def get_wiki_summary(self): + """Returns the summary of the wiki.""" + + doc = self._request("track.getInfo", True) + + if len(doc.getElementsByTagName("wiki")) == 0: + return + + node = doc.getElementsByTagName("wiki")[0] + + return _extract(node, "summary") + + def get_wiki_content(self): + """Returns the content of the wiki.""" + + doc = self._request("track.getInfo", True) + + if len(doc.getElementsByTagName("wiki")) == 0: + return + + node = doc.getElementsByTagName("wiki")[0] + + return _extract(node, "content") + + def love(self): + """Adds the track to the user's loved tracks. """ + + self._request('track.love') + + def ban(self): + """Ban this track from ever playing on the radio. """ + + self._request('track.ban') + + def get_similar(self): + """Returns similar tracks for this track on Last.fm, based on listening data. """ + + doc = self._request('track.getSimilar', True) + + list = [] + for node in doc.getElementsByTagName("track"): + title = _extract(node, 'name') + artist = _extract(node, 'name', 1) + + list.append(Track(artist, title, *self.auth_data)) + + return list + + def get_top_fans(self, limit = None): + """Returns a list of the Users who played this track.""" + + doc = self._request('track.getTopFans', True) + + list = [] + + elements = doc.getElementsByTagName('user') + + for element in elements: + if limit and len(list) >= limit: + break + + name = _extract(element, 'name') + weight = _number(_extract(element, 'weight')) + + list.append(TopItem(User(name, *self.auth_data), weight)) + + return list + + def share(self, users, message = None): + """Shares this track (sends out recommendations). + * users: A list that can contain usernames, emails, User objects, or all of them. + * message: A message to include in the recommendation message. + """ + + #last.fm currently accepts a max of 10 recipient at a time + while(len(users) > 10): + section = users[0:9] + users = users[9:] + self.share(section, message) + + nusers = [] + for user in users: + if isinstance(user, User): + nusers.append(user.get_name()) + else: + nusers.append(user) + + params = self._get_params() + recipients = ','.join(nusers) + params['recipient'] = recipients + if message: params['message'] = unicode(message) + + self._request('track.share', False, params) + + def get_url(self, domain_name = DOMAIN_ENGLISH): + """Returns the url of the track page on Last.fm. + * domain_name: Last.fm's language domain. Possible values: + o DOMAIN_ENGLISH + o DOMAIN_GERMAN + o DOMAIN_SPANISH + o DOMAIN_FRENCH + o DOMAIN_ITALIAN + o DOMAIN_POLISH + o DOMAIN_PORTUGUESE + o DOMAIN_SWEDISH + o DOMAIN_TURKISH + o DOMAIN_RUSSIAN + o DOMAIN_JAPANESE + o DOMAIN_CHINESE + """ + url = 'http://%(domain)s/music/%(artist)s/_/%(title)s' + + artist = _get_url_safe(self.get_artist().get_name()) + title = _get_url_safe(self.get_title()) + + return url %{'domain': domain_name, 'artist': artist, 'title': title} + +class Group(_BaseObject): + """A Last.fm group.""" + + def __init__(self, group_name, api_key, api_secret, session_key): + _BaseObject.__init__(self, api_key, api_secret, session_key) + + self.name = group_name + + def __repr__(self): + return self.get_name().encode('utf-8') + + def __eq__(self, other): + return self.get_name().lower() == other.get_name().lower() + + def __ne__(self, other): + return self.get_name() != other.get_name() + + def _get_params(self): + return {'group': self.get_name()} + + def get_name(self): + """Returns the group name. """ + return self.name + + def get_weekly_chart_dates(self): + """Returns a list of From and To tuples for the available charts.""" + + doc = self._request("group.getWeeklyChartList", True) + + list = [] + for node in doc.getElementsByTagName("chart"): + list.append( (node.getAttribute("from"), node.getAttribute("to")) ) + + return list + + def get_weekly_artist_charts(self, from_date = None, to_date = None): + """Returns the weekly artist charts for the week starting from the from_date value to the to_date value.""" + + params = self._get_params() + if from_date and to_date: + params["from"] = from_date + params["to"] = to_date + + doc = self._request("group.getWeeklyArtistChart", True, params) + + list = [] + for node in doc.getElementsByTagName("artist"): + item = Artist(_extract(node, "name"), *self.auth_data) + weight = _extract(node, "playcount") + list.append(TopItem(item, weight)) + + return list + + def get_weekly_album_charts(self, from_date = None, to_date = None): + """Returns the weekly album charts for the week starting from the from_date value to the to_date value.""" + + params = self._get_params() + if from_date and to_date: + params["from"] = from_date + params["to"] = to_date + + doc = self._request("group.getWeeklyAlbumChart", True, params) + + list = [] + for node in doc.getElementsByTagName("album"): + item = Album(_extract(node, "artist"), _extract(node, "name"), *self.auth_data) + weight = _extract(node, "playcount") + list.append(TopItem(item, weight)) + + return list + + def get_weekly_track_charts(self, from_date = None, to_date = None): + """Returns the weekly track charts for the week starting from the from_date value to the to_date value.""" + + params = self._get_params() + if from_date and to_date: + params["from"] = from_date + params["to"] = to_date + + doc = self._request("group.getWeeklyTrackChart", True, params) + + list = [] + for node in doc.getElementsByTagName("track"): + item = Track(_extract(node, "artist"), _extract(node, "name"), *self.auth_data) + weight = _extract(node, "playcount") + list.append(TopItem(item, weight)) + + return list + + def get_url(self, domain_name = DOMAIN_ENGLISH): + """Returns the url of the group page on Last.fm. + * domain_name: Last.fm's language domain. Possible values: + o DOMAIN_ENGLISH + o DOMAIN_GERMAN + o DOMAIN_SPANISH + o DOMAIN_FRENCH + o DOMAIN_ITALIAN + o DOMAIN_POLISH + o DOMAIN_PORTUGUESE + o DOMAIN_SWEDISH + o DOMAIN_TURKISH + o DOMAIN_RUSSIAN + o DOMAIN_JAPANESE + o DOMAIN_CHINESE + """ + + url = 'http://%(domain)s/group/%(name)s' + + name = _get_url_safe(self.get_name()) + + return url %{'domain': domain_name, 'name': name} + +class XSPF(_BaseObject): + "A Last.fm XSPF playlist.""" + + def __init__(self, uri, api_key, api_secret, session_key): + _BaseObject.__init__(self, api_key, api_secret, session_key) + + self.uri = uri + + def _get_params(self): + return {'playlistURL': self.get_uri()} + + def __repr__(self): + return self.get_uri() + + def __eq__(self, other): + return self.get_uri() == other.get_uri() + + def __ne__(self, other): + return self.get_uri() != other.get_uri() + + def get_uri(self): + """Returns the Last.fm playlist URI. """ + + return self.uri + + def get_tracks(self): + """Returns the tracks on this playlist.""" + + doc = self._request('playlist.fetch', True) + + list = [] + for n in doc.getElementsByTagName('track'): + title = _extract(n, 'title') + artist = _extract(n, 'creator') + + list.append(Track(artist, title, *self.auth_data)) + + return list class User(_BaseObject): """A Last.fm user.""" @@ -2284,25 +1957,34 @@ class User(_BaseObject): _BaseObject.__init__(self, api_key, api_secret, session_key) self.name = user_name - - def _getParams(self): - return {'sk': self.session_key, "user": self.getName()} - def getName(self): + self._past_events_index = 0 + self._recommended_events_index = 0 + self._recommended_artists_index = 0 + + def __repr__(self): + return self.get_name().encode('utf-8') + + def __eq__(self, another): + return self.get_name() == another.get_name() + + def __ne__(self, another): + return self.get_name() != another.get_name() + + def _get_params(self): + return {"user": self.get_name()} + + def get_name(self): """Returns the nuser name.""" return self.name - def getEvents(self): + def get_upcoming_events(self): """Returns all the upcoming events for this user. """ - params = self._getParams() - doc = _Request(self, 'user.getEvents', self.api_key, params).execute() + doc = self._request('user.getEvents', True) - if not doc: - return None - - ids = self._extract_all(doc, 'id') + ids = _extract_all(doc, 'id') events = [] for id in ids: @@ -2310,209 +1992,136 @@ class User(_BaseObject): return events - def getFriends(self, limit = None): + def get_friends(self, limit = None): """Returns a list of the user's friends. """ - params = self._getParams() + params = self._get_params() if limit: params['limit'] = unicode(limit) - doc = _Request(self, 'user.getFriends', self.api_key, params).execute() + doc = self._request('user.getFriends', True, params) - if not doc: - return None + names = _extract_all(doc, 'name') list = [] - users = doc.getElementsByTagName('user') - - names = self._extract_all(doc, 'name') - for name in names: list.append(User(name, *self.auth_data)) return list - def getLovedTracks(self): + def get_loved_tracks(self): """Returns the last 50 tracks loved by this user. """ - params = self._getParams() - doc = _Request(self, 'user.getLovedTracks', self.api_key, params).execute() - - if not doc: - return None + doc = self._request('user.getLovedTracks', True) list = [] - for track in doc.getElementsByTagName('track'): - title = self._extract(track, 'name', 0) - artist = self._extract(track, 'name', 1) + title = _extract(track, 'name', 0) + artist = _extract(track, 'name', 1) list.append(Track(artist, title, *self.auth_data)) return list - def getNeighbours(self, limit = None): - """Returns a list of the user's friends. - * limit: A limit for how many neighbours to show. - """ + def get_neighbours(self, limit = None): + """Returns a list of the user's friends.""" - params = self._getParams() + params = self._get_params() if limit: params['limit'] = unicode(limit) - doc = _Request(self, 'user.getNeighbours', self.api_key, params).execute() - - if not doc: - return None + doc = self._request('user.getNeighbours', True, params) list = [] - names = self._extract_all(doc, 'name') + names = _extract_all(doc, 'name') for name in names: list.append(User(name, *self.auth_data)) return list - def getPastEvents(self, limit = None, page = None): + def _get_past_events_pagecount(self): + """Returns the number of pages in the past events.""" + + params = self._get_params() + params["page"] = str(self._past_events_index) + doc = self._request("user.getPastEvents", True, params) + + return _number(doc.getElementsByTagName("events")[0].getAttribute("totalPages")) + + def is_end_of_past_events(self): + """Returns True if the end of Past Events was reached.""" + + return self._past_events_index >= self._get_past_events_pagecount() + + def get_past_events_page(self, ): """Retruns a paginated list of all events a user has attended in the past. - * limit: The limit number of events to return. - * page: The page of results to return. + + Example: + -------- + + while not user.is_end_of_past_events(): + print user.get_past_events_page() + """ - params = self._getParams() - if limit: - params['limit'] = unicode(limit) - if page: - params['page'] = unicode(page) + self._past_events_index += 1 + params = self._get_params() + params["page"] = str(self._past_events_index) - doc = _Request(self, 'user.getPastEvents', self.api_key, params).execute() + doc = self._request('user.getPastEvents', True, params) - if not doc: - return None - - ids = self._extract_all(doc, 'id') list = [] - - for id in ids: + for id in _extract_all(doc, 'id'): list.append(Event(id, *self.auth_data)) return list - - def getPlaylists(self): - """Returns a list of UserPlaylists that this user owns.""" + + def get_playlists(self): + """Returns a list of Playlists that this user owns.""" - data = self.getPlaylistsData() - - if not data: - return [] - - ids = [] - for p in data: - ids.append(p['id']) + doc = self._request("user.getPlaylists", True) playlists = [] - - for id in ids: - playlists.append(UserPlaylist(self.getName(), id, *self.auth_data)) + for id in _extract_all(doc, "id"): + playlists.append(Playlist(self.get_name(), id, *self.auth_data)) return playlists - def getPlaylistsData(self): - """Returns a list of dictionaries for each playlist. """ - - params = self._getParams() - doc = _Request(self, 'user.getPlaylists', self.api_key, params).execute() - - if not doc: - return None - - list = [] - for playlist in doc.getElementsByTagName('playlist'): - p = {} - p['id'] = self._extract(playlist, 'id') - p['title'] = self._extract(playlist, 'title') - p['date'] = self._extract(playlist, 'date') - p['size'] = int(self._extract(playlist, 'size')) - p['description'] = self._extract(playlist, 'description') - p['duration'] = self._extract(playlist, 'duration') - p['streamable'] = self._extract(playlist, 'streamable') - p['images'] = self._extract_all(playlist, 'image') - p['url_appendix'] = self._extract(playlist, 'url')[19:] - - list.append(p) - - return list - - def getPlaylistIDs(self): - """Returns a list the playlists IDs this user has created. - - [DEPRECATED] - Use User.getPlaylists() instead. - """ - - warn_deprecated('User.getPlaylistIDs', 'User.getPlaylists') - - ids = [] - for i in self.getPlaylistsData(): - ids.append(i['id']) - - return ids - - def fetchPlaylist(self, playlist_id): - """Returns a list of the tracks on a playlist. - * playlist_id: A unique last.fm playlist ID, can be retrieved from getPlaylistIDs(). - - [DEPRECATED] - Use UserPlaylist.getTracks() instead. - """ - - warn_deprecated('User.fetchPlaylist', 'UserPlaylist.getTracks') - - uri = u'lastfm://playlist/%s' %unicode(playlist_id) - - return Playlist(uri, *self.auth_data).fetch() - - def getNowPlaying(self): + def get_now_playing(self): """Returns the currently playing track, or None if nothing is playing. """ - params = self._getParams() + params = self._get_params() params['limit'] = '1' list = [] - doc = _Request(self, 'user.getRecentTracks', self.api_key, params).execute() - - if not doc: - return None + doc = self._request('user.getRecentTracks', False, params) e = doc.getElementsByTagName('track')[0] if not e.hasAttribute('nowplaying'): return None - artist = self._extract(e, 'artist') - title = self._extract(e, 'name') + artist = _extract(e, 'artist') + title = _extract(e, 'name') return Track(artist, title, *self.auth_data) - def getRecentTracks(self, limit = None): + def get_recent_tracks(self, limit = None): """Returns this user's recent listened-to tracks. """ - params = self._getParams() + params = self._get_params() if limit: params['limit'] = unicode(limit) + doc = self._request('user.getRecentTracks', False, params) + list = [] - - doc = _Request(self, 'user.getRecentTracks', self.api_key, params).execute() - - if not doc: - return None - for track in doc.getElementsByTagName('track'): - title = self._extract(track, 'name') - artist = self._extract(track, 'artist') + title = _extract(track, 'name') + artist = _extract(track, 'artist') if track.hasAttribute('nowplaying'): continue #to prevent the now playing track from sneaking in here @@ -2521,8 +2130,8 @@ class User(_BaseObject): return list - def getTopAlbums(self, period = PERIOD_OVERALL): - """Returns the top albums listened to by a user. + def get_top_albums(self, period = PERIOD_OVERALL): + """Returns the top albums played by a user. * period: The period of time. Possible values: o PERIOD_OVERALL o PERIOD_3MONTHS @@ -2530,25 +2139,23 @@ class User(_BaseObject): o PERIOD_12MONTHS """ - params = self._getParams() + params = self._get_params() params['period'] = period - doc = _Request(self, 'user.getTopAlbums', self.api_key, params).execute() - - if not doc: - return None + doc = self._request('user.getTopAlbums', True, params) list = [] for album in doc.getElementsByTagName('album'): - name = self._extract(album, 'name') - artist = self._extract(album, 'name', 1) + name = _extract(album, 'name') + artist = _extract(album, 'name', 1) + playcount = _extract(album, "playcount") - list.append(Album(artist, name, *self.auth_data)) + list.append(TopItem(Album(artist, name, *self.auth_data), playcount)) return list - def getTopArtists(self, period = PERIOD_OVERALL): - """Returns the top artists listened to by a user. + def get_top_artists(self, period = PERIOD_OVERALL): + """Returns the top artists played by a user. * period: The period of time. Possible values: o PERIOD_OVERALL o PERIOD_3MONTHS @@ -2556,64 +2163,35 @@ class User(_BaseObject): o PERIOD_12MONTHS """ - params = self._getParams() + params = self._get_params() params['period'] = period - doc = _Request(self, 'user.getTopArtists', self.api_key, params).execute() - - if not doc: - return [] + doc = self._request('user.getTopArtists', True, params) list = [] - - names = self._extract_all(doc, 'name') - for name in names: - list.append(Artist(name, *self.auth_data)) - - return list - - def getTopTags(self, limit = None): - """Returns a sequence of the top tags used by this user with their counts as (Tag, tag_count). - * limit: The limit of how many tags to return. - """ - - pairs = self.getTopTagsWithCounts(limit) - if not pairs: - return [] - - list = [] - for pair in pairs: - list.append(pair[0]) - - return list - - def getTopTagsWithCounts(self, limit = None): - """Returns the top tags used by this user. - * limit: The limit of how many tags to return. - """ - - params = self._getParams() - if limit: - params['limit'] = unicode(limit) - - doc = _Request(self, 'user.getTopTags', self.api_key, params).execute() - - if not doc: - return [] - - list = [] - elements = doc.getElementsByTagName('tag') - - for element in elements: - tag_name = self._extract(element, 'name') - tag_count = self._extract(element, 'count') + for node in doc.getElementsByTagName('artist'): + name = _extract(node, 'name') + playcount = _extract(node, "playcount") - list.append((Tag(tag_name, *self.auth_data), tag_count)) + list.append(TopItem(Artist(name, *self.auth_data), playcount)) return list - def getTopTracks(self, period = PERIOD_OVERALL): - """Returns the top tracks listened to by a user. + def get_top_tags(self, limit = None): + """Returns a sequence of the top tags used by this user with their counts as (Tag, tagcount). + * limit: The limit of how many tags to return. + """ + + doc = self._request("user.getTopTags", True) + + list = [] + for node in doc.getElementsByTagName("tag"): + list.append(TopItem(Tag(_extract(node, "name"), *self.auth_data), _extract(node, "count"))) + + return list + + def get_top_tracks(self, period = PERIOD_OVERALL): + """Returns the top tracks played by a user. * period: The period of time. Possible values: o PERIOD_OVERALL o PERIOD_3MONTHS @@ -2621,143 +2199,109 @@ class User(_BaseObject): o PERIOD_12MONTHS """ - params = self._getParams() + params = self._get_params() params['period'] = period - doc = _Request(self, 'user.getTopTracks', self.api_key, params).execute() - - if not doc: - return [] + doc = self._request('user.getTopTracks', True, params) list = [] for track in doc.getElementsByTagName('track'): - title = self._extract(track, 'name') - artist = self._extract(track, 'name', 1) + name = _extract(track, 'name') + artist = _extract(track, 'name', 1) + playcount = _extract(track, "playcount") - list.append(Track(artist, title, *self.auth_data)) + list.append(TopItem(Track(artist, name, *self.auth_data), playcount)) return list - def getTopWeeklyAlbums(self, from_value = None, to_value = None): - """Returns a tuple of the most frequently listened to Albums in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList(). - * from_value: The value marking the beginning of a week. - * to_value: The value marking the end of a week. - """ + def get_weekly_chart_dates(self): + """Returns a list of From and To tuples for the available charts.""" - params = self._getParams() - if from_value and to_value: - params['from'] = from_value - params['to'] = to_value - - doc = _Request(self, 'user.getWeeklyAlbumChart', self.api_key, params).execute() - - if not doc: - return [] + doc = self._request("user.getWeeklyChartList", True) list = [] - - for n in doc.getElementsByTagName('album'): - artist = self._extract(n, 'artist') - name = self._extract(n, 'name') - - list.append(Album(artist, name, *self.auth_data)) + for node in doc.getElementsByTagName("chart"): + list.append( (node.getAttribute("from"), node.getAttribute("to")) ) return list - def getTopWeeklyArtists(self, from_value = None, to_value = None): - """Returns a tuple of the most frequently listened to Artists in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList(). - * from_value: The value marking the beginning of a week. - * to_value: The value marking the end of a week. - """ + def get_weekly_artist_charts(self, from_date = None, to_date = None): + """Returns the weekly artist charts for the week starting from the from_date value to the to_date value.""" - params = self._getParams() - if from_value and to_value: - params['from'] = from_value - params['to'] = to_value + params = self._get_params() + if from_date and to_date: + params["from"] = from_date + params["to"] = to_date - doc = _Request(self, 'user.getWeeklyArtistChart', self.api_key, params).execute() - - if not doc: - return [] + doc = self._request("user.getWeeklyArtistChart", True, params) list = [] - - names = self._extract_all(doc, 'name') - for name in names: - list.append(Artist(name, *self.auth_data)) + for node in doc.getElementsByTagName("artist"): + item = Artist(_extract(node, "name"), *self.auth_data) + weight = _extract(node, "playcount") + list.append(TopItem(item, weight)) return list - - def getTopWeeklyTracks(self, from_value = None, to_value = None): - """Returns a tuple of the most frequently listened to Tracks in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList(). - * from_value: The value marking the beginning of a week. - * to_value: The value marking the end of a week. - """ + + def get_weekly_album_charts(self, from_date = None, to_date = None): + """Returns the weekly album charts for the week starting from the from_date value to the to_date value.""" - params = self._getParams() - if from_value and to_value: - params['from'] = from_value - params['to'] = to_value + params = self._get_params() + if from_date and to_date: + params["from"] = from_date + params["to"] = to_date - doc = _Request(self, 'user.getWeeklyTrackChart', self.api_key, params).execute() - - if not doc: - return [] + doc = self._request("user.getWeeklyAlbumChart", True, params) list = [] - for track in doc.getElementsByTagName('track'): - artist = self._extract(track, 'artist') - title = self._extract(track, 'name') - - - list.append(Track(artist, title, *self.auth_data)) + for node in doc.getElementsByTagName("album"): + item = Album(_extract(node, "artist"), _extract(node, "name"), *self.auth_data) + weight = _extract(node, "playcount") + list.append(TopItem(item, weight)) return list - - def getWeeklyChartList(self): - """Returns a list of range pairs to use with the chart methods.""" + + def get_weekly_track_charts(self, from_date = None, to_date = None): + """Returns the weekly track charts for the week starting from the from_date value to the to_date value.""" - params = self._getParams() - doc = _Request(self, 'user.getWeeklyChartList', self.api_key, params).execute() + params = self._get_params() + if from_date and to_date: + params["from"] = from_date + params["to"] = to_date - if not doc: - return [] + doc = self._request("user.getWeeklyTrackChart", True, params) list = [] - for chart in doc.getElementsByTagName('chart'): - c = {} - c['from'] = chart.getAttribute('from') - c['to'] = chart.getAttribute('to') - - list.append(c) + for node in doc.getElementsByTagName("track"): + item = Track(_extract(node, "artist"), _extract(node, "name"), *self.auth_data) + weight = _extract(node, "playcount") + list.append(TopItem(item, weight)) return list - def compareWithUser(self, user, shared_artists_limit = None): - """Compare this user with another Last.fm user. Returns a sequence (tasteometer_score, (shared_artist1, shared_artist2, ...)) + def compare_with_user(self, user, shared_artists_limit = None): + """Compare this user with another Last.fm user. + Returns a sequence (tasteometer_score, (shared_artist1, shared_artist2, ...)) user: A User object or a username string/unicode object. """ if isinstance(user, User): - user = user.getName() + user = user.get_name() - params = self._getParams() + params = self._get_params() if shared_artists_limit: params['limit'] = unicode(shared_artists_limit) params['type1'] = 'user' params['type2'] = 'user' - params['value1'] = self.getName() + params['value1'] = self.get_name() params['value2'] = user - doc = _Request(self, 'tasteometer.compare', self.api_key, params).execute() + doc = _Request('tasteometer.compare', params, *self.auth_data).execute() - if not doc: - return None - - score = self._extract(doc, 'score') + score = _extract(doc, 'score') artists = doc.getElementsByTagName('artists')[0] - shared_artists_names = self._extract_all(artists, 'name') + shared_artists_names = _extract_all(artists, 'name') shared_artists_list = [] @@ -2772,25 +2316,22 @@ class User(_BaseObject): * limit: The limit of events to return. """ - params = self._getParams() + params = self._get_params() if page: params['page'] = unicode(page) if limit: params['limit'] = unicode(limit) - doc = _Request(self, 'user.getRecommendedEvents', self.api_key, params, True, self.secret).execute() + doc = _Request('user.getRecommendedEvents', params, *self.auth_data).execute() - if not doc: - return [] - - ids = self._extract_all(doc, 'id') + ids = _extract_all(doc, 'id') list = [] for id in ids: list.append(Event(id, *self.auth_data)) return list - def getURL(self, domain_name = DOMAIN_ENGLISH): + def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the user page on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH @@ -2808,81 +2349,72 @@ class User(_BaseObject): """ url = 'http://%(domain)s/user/%(name)s' - name = self._get_url_safe(self.getName()) + name = _get_url_safe(self.get_name()) return url %{'domain': domain_name, 'name': name} - def getLibrary(self): + def get_library(self): """Returns the associated Library object. """ - return Library(self.getName(), *self.auth_data) + return Library(self, *self.auth_data) - def toStr(self): - """Returns a string representation of the object.""" - - return self.getName().encode('utf-8') - -class AuthenticatedUser(User, _Cacheable): +class AuthenticatedUser(User): def __init__(self, api_key, api_secret, session_key): - User.__init__("", api_key, api_secret, session_key); - _Cacheable.__init__(self) - - self._cached_info = None + User.__init__(self, "", api_key, api_secret, session_key); - def _getParams(self): - return {'sk': self.session_key} + def _get_params(self): + return {} + + def get_name(self): + """Returns the name of the authenticated user.""" + + doc = self._request("user.getInfo", True) + + self.name = _extract(doc, "name") + return self.name - def _getInfo(self): - """Returns a dictionary with various metadata values.""" + def get_id(self): + """Returns the user id.""" - params = self._getParams() - doc = _Request(self, 'user.getInfo', self.api_key, params, True, self.secret).execute() + doc = self._request("user.getInfo", True) - if not doc: - return None - - data = {} - - data['name'] = self._extract(doc, 'name') - data['image'] = self._extract(doc, 'image') - data['language'] = self._extract(doc, 'lang') - data['country'] = self._extract(doc, 'country') - data['age'] = self._extract(doc, 'age') - data['gender'] = self._extract(doc, 'gender') - data['subscriber'] = self._extract(doc, 'subscriber') - data['play_count'] = self._extract(doc, 'playcount') - - return data + return _extract(doc, "id") - def getName(self): - """Returns the user name.""" - - return self._getCachedInfo('name') - - def getImage(self): + def get_image_url(self): """Returns the user's avatar.""" + + doc = self._request("user.getInfo", True) - return self._getCachedInfo('image') + return _extract(doc, "image") - def getLanguage(self): + def get_language(self): """Returns the language code of the language used by the user.""" - return self._getCachedInfo('language') + doc = self._request("user.getInfo", True) + + return _extract(doc, "lang") - def getCountryName(self): + def get_country(self): """Returns the name of the country of the user.""" - return self._getCachedInfo('country') + doc = self._request("user.getInfo", True) + + return Country(_extract(doc, "country"), *self.auth_data) - def getAge(self): + def get_age(self): """Returns the user's age.""" - return self._getCachedInfo('age') + doc = self._request("user.getInfo", True) + + return _number(_extract(doc, "age")) - def getGender(self): + def get_gender(self): """Returns the user's gender. Either USER_MALE or USER_FEMALE.""" - value = self._getCachedInfo('gender') + doc = self._request("user.getInfo", True) + + return _extract(doc, "gender") + if value == 'm': return USER_MALE elif value == 'f': @@ -2890,417 +2422,520 @@ class AuthenticatedUser(User, _Cacheable): return None - def isSubscriber(self): + def is_subscriber(self): """Returns whether the user is a subscriber or not. True or False.""" - value = self._getCachedInfo('subscriber') + doc = self._request("user.getInfo", True) - if value == '1': - return True - elif value == '0': - return False - - return None + return _extract(doc, "subscriber") == "1" - def getPlayCount(self): + def get_playcount(self): """Returns the user's playcount so far.""" - return int(self._getCachedInfo('play_count')) + doc = self._request("user.getInfo", True) + + return _number(_extract(doc, "playcount")) -class _Search(_BaseObject): - """An abstract class. Use one of its derivatives.""" + def _get_recommended_events_pagecount(self): + """Returns the number of pages in the past events.""" + + params = self._get_params() + params["page"] = str(self._recommended_events_index) + doc = self._request("user.getRecommendedEvents", True, params) + + return _number(doc.getElementsByTagName("events")[0].getAttribute("totalPages")) - def __init__(self, api_key, api_secret, session_key): - _BaseObject.__init__(self, api_key, api_secret, session_key) + def is_end_of_recommended_events(self): + """Returns True if the end of Past Events was reached.""" - self._limit = None - self._page = None - self._total_result_count = None - - def getLimit(self): - """Returns the limit of the Search.""" + return self._recommended_events_index >= self._get_recommended_events_pagecount() - return self._limit - - def getPage(self): - """Returns the last page retrieved.""" + def get_recommended_events_page(self, ): + """Retruns a paginated list of all events a user has attended in the past. - return self._page - - def getTotalResultCount(self): - """Returns the total count of all the results.""" + Example: + -------- - return self._total_result_count - - def getResults(self, limit = 30, page = 1): - pass - - def getFirstMatch(self): - """Returns the first match.""" + while not user.is_end_of_recommended_events(): + print user.get_recommended_events_page() - matches = self.getResults(1) - if matches: - return matches[0] - -class ArtistSearch(_Search): - """Search for an artist by artist name.""" - - def __init__(self, artist_name, api_key, api_secret, session_key): - _Search.__init__(self, api_key, api_secret, session_key) - - self._artist_name = artist_name - - def _getParams(self): - return {'sk': self.session_key, 'artist': self.getArtistName()} - - def getArtistName(self): - """Returns the artist name.""" - - return self._artist_name - - def getResults(self, limit = 30, page = 1): - """Returns the matches sorted by relevance. - * limit: Limit the number of artists returned at one time. Default (maximum) is 30. - * page: Scan into the results by specifying a page number. Defaults to first page. """ - params = self._getParams() - params['limit'] = unicode(limit) - params['page'] = unicode(page) + self._recommended_events_index += 1 + params = self._get_params() + params["page"] = str(self._recommended_events_index) - doc = _Request(self, 'artist.Search', self.api_key, params).execute() - - if not doc: - return None - - self._total_result_count = self._extract(doc, 'opensearch:totalResults') - self._page = page - self._limit = limit - - e = doc.getElementsByTagName('artistmatches')[0] - - names = self._extract_all(e, 'name') + doc = self._request('user.getRecommendedEvents', True, params) list = [] - for name in names: + for id in _extract_all(doc, 'id'): + list.append(Event(id, *self.auth_data)) + + return list + + def _get_recommended_artists_pagecount(self): + """Returns the number of pages in the past artists.""" + + params = self._get_params() + params["page"] = str(self._recommended_artists_index) + doc = self._request("user.getRecommendedArtists", True, params) + + return _number(doc.getElementsByTagName("recommendations")[0].getAttribute("totalPages")) + + def is_end_of_recommended_artists(self): + """Returns True if the end of Past Artists was reached.""" + + return self._recommended_artists_index >= self._get_recommended_artists_pagecount() + + def get_recommended_artists_page(self, ): + """Retruns a paginated list of all artists a user has attended in the past. + + Example: + -------- + + while not user.is_end_of_recommended_artists(): + print user.get_recommended_artists_page() + + """ + + self._recommended_artists_index += 1 + params = self._get_params() + params["page"] = str(self._recommended_artists_index) + + doc = self._request('user.getRecommendedArtists', True, params) + + list = [] + for name in _extract_all(doc, 'name'): list.append(Artist(name, *self.auth_data)) return list + +class _Search(_BaseObject): + """An abstract class. Use one of its derivatives.""" + + def __init__(self, ws_prefix, search_terms, api_key, api_secret, session_key): + _BaseObject.__init__(self, api_key, api_secret, session_key) + + self._ws_prefix = ws_prefix + self.search_terms = search_terms + + self._last_page_index = 0 + + def _get_params(self): + params = {} + + for key in self.search_terms.keys(): + params[key] = self.search_terms[key] + + return params + + def get_total_result_count(self): + """Returns the total count of all the results.""" + + doc = self._request(self._ws_prefix + ".search", True) + + return _extract(doc, "opensearch:totalResults") + + def _retreive_page(self, page_index): + """Returns the node of matches to be processed""" + + params = self._get_params() + params["page"] = str(page_index) + doc = self._request(self._ws_prefix + ".search", True, params) + + return doc.getElementsByTagName(self._ws_prefix + "matches")[0] + + def _retrieve_next_page(self): + self._last_page_index += 1 + return self._retreive_page(self._last_page_index) class AlbumSearch(_Search): """Search for an album by name.""" def __init__(self, album_name, api_key, api_secret, session_key): - _Search.__init__(self, api_key, api_secret, session_key) - self._album_name = album_name - - def _getParams(self): - return {'sk': self.session_key, 'album': self.getAlbumName()} + _Search.__init__(self, "album", {"album": album_name}, api_key, api_secret, session_key) + + def get_next_page(self): + """Returns the next page of results as a sequence of Album objects.""" - def getAlbumName(self): - """Returns the album name.""" - - return self._album_name - - def getResults(self, limit = 30, page = 1): - """Returns the matches sorted by relevance. - * limit: Limit the number of albums returned at one time. Default (maximum) is 30. - * page: Scan into the results by specifying a page number. Defaults to first page. - """ - - params = self._getParams() - params['limit'] = unicode(limit) - params['page'] = unicode(page) - - doc = _Request(self, 'album.search', self.api_key, params).execute() - - if not doc: - return [] - - self._total_result_count = self._extract(doc, 'opensearch:totalResults') - self._page = page - self._limit = limit - - e = doc.getElementsByTagName('albummatches')[0] - - names = self._extract_all(e, 'name') - artists = self._extract_all(e, 'artist') + master_node = self._retrieve_next_page() list = [] - for i in range(0, len(names)): - list.append(Album(artists[i], names[i], *self.auth_data)) + for node in master_node.getElementsByTagName("album"): + list.append(Album(_extract(node, "artist"), _extract(node, "name"), *self.auth_data)) return list -class UserPlaylist(_BaseObject, _Cacheable): - """A Last.fm user playlist.""" +class ArtistSearch(_Search): + """Search for an artist by artist name.""" - def __init__(self, username, playlist_id, api_key, api_secret, session_key): - _BaseObject.__init__(self, api_key, api_secret, session_key) - _Cacheable.__init__(self) - - self._username = username - self._playlist_id = unicode(playlist_id) - - self._cached_info = None + def __init__(self, artist_name, api_key, api_secret, session_key): + _Search.__init__(self, "artist", {"artist": artist_name}, api_key, api_secret, session_key) - def _getInfo(self): + def get_next_page(self): + """Returns the next page of results as a sequence of Artist objects.""" - playlists = self.getUser().getPlaylistsData() - data = None + master_node = self._retrieve_next_page() - for p in playlists: - if p['id'] == self.getPlaylistID(): - data = p + list = [] + for node in master_node.getElementsByTagName("artist"): + list.append(Artist(_extract(node, "name"), *self.auth_data)) - return data - - def _getParams(self): - return {'sk': self.session_key, 'user': self._username, 'playlistID': self._playlist_id} - - def getPlaylistID(self): - """Returns the playlist id.""" - - return self._playlist_id - - def getUser(self): - """Returns the owner user of this playlist.""" - - return User(self._username, *self.auth_data) - - def getTracks(self): - """Returns a list of the tracks on this user playlist.""" - - uri = u'lastfm://playlist/%s' %unicode(self.getPlaylistID()) - - return Playlist(uri, *self.auth_data).fetch() - - def addTrack(self, track): - """Adds a Track to this UserPlaylist. - * track: Any Track object. - """ - - params = self._getParams() - params['artist'] = track.getArtist().getName() - params['track'] = track.getTitle() - - _Request(self, 'playlist.addTrack', self.api_key, params, True, self.secret).execute() - - print self.last_error() - - def getTitle(self): - """Returns the title of this playlist.""" - - return self._getCachedInfo('title') - - def getCreationDate(self): - """Returns the creation date of this playlist.""" - - return self._getCachedInfo('date') - - def getSize(self): - """Returns the size of this playlist.""" - - return int(self._getCachedInfo('size')) - - def getDescription(self): - """Returns the description of this playlist.""" - - return self._getCachedInfo('description') - - def getDuration(self): - """Returns the duration of this playlist.""" - - return int(self._getCachedInfo('duration')) - - def isStreamable(self): - """Returns True if the playlist is streamable. - For a playlist to be streamable, it needs at least 45 tracks by 15 different artists.""" - - if self._getCachedInfo('streamable') == '1': - return True - else: - return False - - def hasTrack(self, track): - """Checks to see if track is already in the playlist. - * track: Any Track object. - """ - - tracks = self.getTracks() - - if not tracks: - return False - - has_it = False - for t in tracks: - if track._hash() == t._hash(): - has_it = True - break - - return has_it + return list - def getImage(self, size = IMAGE_LARGE): - """Returns the associated image URL. - * size: The image size. Possible values: - o IMAGE_LARGE - o IMAGE_MEDIUM - o IMAGE_SMALL - """ - - return self._getCachedInfo('images', size) - - def getURL(self, domain_name = DOMAIN_ENGLISH): - """Returns the url of the playlist on Last.fm. - * domain_name: Last.fm's language domain. Possible values: - o DOMAIN_ENGLISH - o DOMAIN_GERMAN - o DOMAIN_SPANISH - o DOMAIN_FRENCH - o DOMAIN_ITALIAN - o DOMAIN_POLISH - o DOMAIN_PORTUGUESE - o DOMAIN_SWEDISH - o DOMAIN_TURKISH - o DOMAIN_RUSSIAN - o DOMAIN_JAPANESE - o DOMAIN_CHINESE - """ - url = 'http://%(domain)s/%(appendix)s' - - return url %{'domain': domain_name, 'appendix': self._getCachedInfo('url_appendix')} - - -class UserPlaylistCreator(_BaseObject): - """Used to create playlists for the authenticated user.""" - - def __init__(self, api_key, api_secret, session_key): - _BaseObject.__init__(self, api_key, api_secret, session_key) - - def _getParams(self): - return {'sk': self.session_key} - - def create(self, title, description): - """Creates a playlist for the authenticated user and returns it. - * title: The title of the new playlist. - * description: The description of the new playlist. - """ - - params = self._getParams() - - params['title'] = unicode(title) - params['description'] = unicode(description) - - doc = _Request(self, 'playlist.create', self.api_key, params, True, self.secret).execute() - - if not doc: - return None - - id = self._extract(doc, 'id') - user = doc.getElementsByTagName('playlists')[0].getAttribute('user') - - return UserPlaylist(user, id, *self.auth_data) - - class TagSearch(_Search): """Search for a tag by tag name.""" def __init__(self, tag_name, api_key, api_secret, session_key): - _Search.__init__(self, api_key, api_secret, session_key) - self._tag_name = tag_name - - def _getParams(self): - return {'sk': self.session_key, 'tag': self.getTagName()} + _Search.__init__(self, "tag", {"tag": tag_name}, api_key, api_secret, session_key) - def getTagName(self): - """Returns the tag name.""" + def get_next_page(self): + """Returns the next page of results as a sequence of Tag objects.""" - return self._tag_name - - def getResults(self, limit = 30, page = 1): - """Returns the matches sorted by relevance. - * limit: Limit the number of artists returned at one time. Default (maximum) is 30. - * page: Scan into the results by specifying a page number. Defaults to first page. - """ - - params = self._getParams() - params['limit'] = unicode(limit) - params['page'] = unicode(page) - - doc = _Request(self, 'tag.Search', self.api_key, params).execute() - - if not doc: - return None - - self._total_result_count = self._extract(doc, 'opensearch:totalResults') - self._page = page - self._limit = limit - - e = doc.getElementsByTagName('tagmatches')[0] - - names = self._extract_all(e, 'name') + master_node = self._retrieve_next_page() list = [] - for name in names: - list.append(Tag(name, *self.auth_data)) + for node in master_node.getElementsByTagName("tag"): + list.append(Tag(_extract(node, "name"), *self.auth_data)) return list class TrackSearch(_Search): """Search for a track by track title. If you don't wanna narrow the results down - by specifying the artist name, set it to None""" + by specifying the artist name, set it to empty string.""" - def __init__(self, track_title, artist_name, api_key, api_secret, session_key): - _Search.__init__(self, api_key, api_secret, session_key) + def __init__(self, artist_name, track_title, api_key, api_secret, session_key): - self._track_title = track_title - self._artist_name = artist_name + _Search.__init__(self, "track", {"track": track_title, "artist": artist_name}, api_key, api_secret, session_key) - def _getParams(self): - params = {'sk': self.session_key, 'track': self.getTrackTitle()} - if self.getTrackArtistName(): - params['artist'] = self.getTrackArtistName() + def get_next_page(self): + """Returns the next page of results as a sequence of Track objects.""" - return params - - def getTrackTitle(self): - """Returns the track title.""" - - return self._track_title - - def getTrackArtistName(self): - """Returns the artist name.""" - - return self._artist_name - - def getResults(self, limit = 30, page = 1): - """Returns the matches sorted by relevance. - * limit: Limit the number of artists returned at one time. Default (maximum) is 30. - * page: Scan into the results by specifying a page number. Defaults to first page. - """ - - params = self._getParams() - params['limit'] = unicode(limit) - params['page'] = unicode(page) - - doc = _Request(self, 'track.Search', self.api_key, params).execute() - - if not doc: - return None - - self._total_result_count = self._extract(doc, 'opensearch:totalResults') - self._page = page - self._limit = limit - - e = doc.getElementsByTagName('trackmatches')[0] - - titles = self._extract_all(e, 'name') - artists = self._extract_all(e, 'artist') + master_node = self._retrieve_next_page() list = [] - for i in range(0, len(titles)): - list.append(Track(artists[i], titles[i], *self.auth_data)) + for node in master_node.getElementsByTagName("track"): + list.append(Track(_extract(node, "artist"), _extract(node, "name"), *self.auth_data)) return list + +class VenueSearch(_Search): + """Search for a venue by its name. If you don't wanna narrow the results down + by specifying a country, set it to empty string.""" + + def __init__(self, venue_name, country_name, api_key, api_secret, session_key): + + _Search.__init__(self, "venue", {"venue": venue_name, "country": country_name}, api_key, api_secret, session_key) + + def get_next_page(self): + """Returns the next page of results as a sequence of Track objects.""" + + master_node = self._retrieve_next_page() + + list = [] + for node in master_node.getElementsByTagName("venue"): + list.append(Venue(_extract(node, "id"), *self.auth_data)) + + return list + +class Venue(_BaseObject): + """A venue where events are held.""" + + # TODO: waiting for a venue.getInfo web service to use. + + def __init__(self, id, api_key, api_secret, session_key): + _BaseObject.__init__(self, api_key, api_secret, session_key) + + self.id = _number(id) + + def __repr__(self): + return "Venue #" + str(self.id) + + def __eq__(self, other): + return self.get_id() == other.get_id() + + def _get_params(self): + return {"venue": self.get_id()} + + def get_id(self): + """Returns the id of the venue.""" + + return self.id + + def get_upcoming_events(self): + """Returns the upcoming events in this venue.""" + + doc = self._request("venue.getEvents", True) + + list = [] + for node in doc.getElementsByTagName("event"): + list.append(Event(_extract(node, "id"), *self.auth_data)) + + return list + + def get_past_events(self): + """Returns the past events held in this venue.""" + + doc = self._request("venue.getEvents", True) + + list = [] + for node in doc.getElementsByTagName("event"): + list.append(Event(_extract(node, "id"), *self.auth_data)) + + return list + +def create_new_playlist(title, description, api_key, api_secret, session_key): + """Creates a playlist for the authenticated user and returns it. + * title: The title of the new playlist. + * description: The description of the new playlist. + """ + + params = dict() + params['title'] = unicode(title) + params['description'] = unicode(description) + + doc = _Request('playlist.create', params, api_key, api_secret, session_key).execute() + + id = doc.getElementsByTagName("id")[0].firstChild.data + user = doc.getElementsByTagName('playlists')[0].getAttribute('user') + + return Playlist(user, id, api_key, api_secret, session_key) + +def get_authenticated_user(api_key, api_secret, session_key): + """Returns the authenticated user.""" + + return AuthenticatedUser(api_key, api_secret, session_key) + +def md5(text): + """Returns the md5 hash of a string.""" + + hash = hashlib.md5() + hash.update(text.encode('utf-8')) + + return hash.hexdigest() + +def enable_proxy(host, port): + """Enable a default web proxy.""" + + global __proxy + global __proxy_enabled + + __proxy = [host, _number(port)] + __proxy_enabled = True + +def disable_proxy(): + """Disable using the web proxy.""" + + global __proxy_enabled + + __proxy_enabled = False + +def is_proxy_enabled(): + """Returns True if a web proxy is enabled.""" + + global __proxy_enabled + + return __proxy_enabled + +def _get_proxy(): + """Returns proxy details.""" + + global __proxy + + return __proxy + +def async_call(sender, call, callback = None, call_args = None, callback_args = None): + """This is the function for setting up an asynchronous operation. + * call: The function to call asynchronously. + * callback: The function to call after the operation is complete, Its prototype has to be like: + callback(sender, output[, param1, param3, ... ]) + * call_args: A sequence of args to be passed to call. + * callback_args: A sequence of args to be passed to callback. + """ + + thread = _ThreadedCall(sender, call, call_args, callback, callback_args) + thread.start() + +def enable_caching(cache_dir = None): + """Enables caching request-wide for all cachable calls. + * cache_dir: A directory path to use to store the caching data. Set to None to use a temporary directory. + """ + + global __cache_dir + global __cache_enabled + + if cache_dir == None: + import tempfile + __cache_dir = tempfile.mkdtemp() + else: + if not os.path.exists(cache_dir): + os.mkdir(cache_dir) + __cache_dir = cache_dir + + __cache_enabled = True + +def disable_caching(): + """Disables all caching features.""" + + global __cache_enabled + + __cache_enabled = False + + print "cache is disabled" + +def is_caching_enabled(): + """Returns True if caching is enabled.""" + + global __cache_enabled + + return __cache_enabled + +def _get_cache_dir(): + """Returns the directory in which cache files are saved.""" + + global __cache_dir + global __cache_enabled + + return __cache_dir + +def _extract(node, name, index = 0): + """Extracts a value from the xml string""" + + nodes = node.getElementsByTagName(name) + + if len(nodes): + if nodes[index].firstChild: + return nodes[index].firstChild.data.strip() + else: + return None + +def _extract_all(node, name, limit_count = None): + """Extracts all the values from the xml string. returning a list.""" + + list = [] + + for i in range(0, len(node.getElementsByTagName(name))): + if len(list) == limit_count: + break + + list.append(_extract(node, name, i)) + + return list + +def _get_url_safe(text): + """Does all kinds of tricks on a text to make it safe to use in a url.""" + + if type(text) == type(unicode()): + text = text.encode('utf-8') + + return urllib.quote_plus(urllib.quote_plus(text)).lower() + +def _number(string): + """Extracts an int from a string. Returns a 0 if None or an empty string was passed.""" + + if not string: + return 0 + elif string == "": + return 0 + else: + return int(string) + +def search_for_album(album_name, api_key, api_secret, session_key): + """Searches for an album by its name. Returns a AlbumSearch object. + Use get_next_page() to retreive sequences of results.""" + + return AlbumSearch(album_name, api_key, api_secret, session_key) + +def search_for_artist(artist_name, api_key, api_secret, session_key): + """Searches of an artist by its name. Returns a ArtistSearch object. + Use get_next_page() to retreive sequences of results.""" + + return ArtistSearch(artist_name, api_key, api_secret, session_key) + +def search_for_tag(tag_name, api_key, api_secret, session_key): + """Searches of a tag by its name. Returns a TagSearch object. + Use get_next_page() to retreive sequences of results.""" + + return TagSearch(tag_name, api_key, api_secret, session_key) + +def search_for_track(artist_name, track_name, api_key, api_secret, session_key): + """Searches of a track by its name and its artist. Set artist to an empty string if not available. + Returns a TrackSearch object. + Use get_next_page() to retreive sequences of results.""" + + return TrackSearch(artist_name, track_name, api_key, api_secret, session_key) + +def search_for_venue(venue_name, country_name, api_key, api_secret, session_key): + """Searches of a venue by its name and its country. Set country_name to an empty string if not available. + Returns a VenueSearch object. + Use get_next_page() to retreive sequences of results.""" + + return VenueSearch(venue_name, country_name, api_key, api_secret, session_key) + +def extract_items(topitems_or_libraryitems): + """Extracts a sequence of items from a sequence of TopItem or LibraryItem objects.""" + + list = [] + for i in topitems_or_libraryitems: + list.append(i.get_item()) + + return list + +def get_top_tags(api_key, api_secret, session_key): + """Returns a sequence of the most used Last.fm tags as a sequence of TopItem objects.""" + + doc = _Request("tag.getTopTags", dict(), api_key, api_secret, session_key).execute(True) + list = [] + for node in doc.getElementsByTagName("tag"): + tag = Tag(_extract(node, "name"), api_key, api_secret, session_key) + weight = _extract(node, "count") + + list.append(TopItem(tag, weight)) + + return list + +def get_track_by_mbid(mbid, api_key, api_secret, session_key): + """Looks up a track by its MusicBrainz ID.""" + + params = {"mbid": unicode(mbid)} + + doc = _Request("track.getInfo", params, api_key, api_secret, session_key).execute(True) + + return Track(_extract(doc, "name", 1), _extract(doc, "name"), api_key, api_secret, session_key) + +def get_artist_by_mbid(mbid, api_key, api_secret, session_key): + """Loooks up an artist by its MusicBrainz ID.""" + + params = {"mbid": unicode(mbid)} + + doc = _Request("artist.getInfo", params, api_key, api_secret, session_key).execute(True) + + return Artist(_extract(doc, "name"), api_key, api_secret, session_key) + +def get_album_by_mbid(mbid, api_key, api_secret, session_key): + """Looks up an album by its MusicBrainz ID.""" + + params = {"mbid": unicode(mbid)} + + doc = _Request("album.getInfo", params, api_key, api_secret, session_key).execute(True) + + return Album(_extract(doc, "artist"), _extract(doc, "name"), api_key, api_secret, session_key) + +def _delay_call(): + """Makes sure that web service calls are at least a second apart.""" + + global __last_call_time + + # delay time in seconds + DELAY_TIME = 1.0 + now = time.time() + + print "Last call:", __last_call_time + print "Now:", int(time.time()) + + if (now - __last_call_time) < DELAY_TIME: + time.sleep(1) + + __last_call_time = now diff --git a/setup.py b/setup old mode 100644 new mode 100755 similarity index 93% rename from setup.py rename to setup index f7dbf78..b1fd822 --- a/setup.py +++ b/setup @@ -1,3 +1,5 @@ +#!/usr/bin/env python + from distutils.core import setup import pylast