# -*- coding: utf-8 -*- # # pylast - A Python interface to Last.fm # Copyright (C) 2008-2009 Amr Hassan # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA # # http://code.google.com/p/pylast/ __name__ = 'pylast' __version__ = '0.3.0b' __doc__ = 'A Python interface to Last.fm' __author__ = 'Amr Hassan' __email__ = 'amr.hassan@gmail.com' __proxy = None __proxy_enabled = False __cache_dir = None __cache_enabled = False __last_call_time = 0 import hashlib import httplib import urllib import threading from xml.dom import minidom import os import time from logging import * STATUS_INVALID_SERVICE = 2 STATUS_INVALID_METHOD = 3 STATUS_AUTH_FAILED = 4 STATUS_INVALID_FORMAT = 5 STATUS_INVALID_PARAMS = 6 STATUS_INVALID_RESOURCE = 7 STATUS_TOKEN_ERROR = 8 STATUS_INVALID_SK = 9 STATUS_INVALID_API_KEY = 10 STATUS_OFFLINE = 11 STATUS_SUBSCRIBERS_ONLY = 12 STATUS_INVALID_SIGNATURE = 13 STATUS_TOKEN_UNAUTHORIZED = 14 STATUS_TOKEN_EXPIRED = 15 EVENT_ATTENDING = '0' EVENT_MAYBE_ATTENDING = '1' EVENT_NOT_ATTENDING = '2' PERIOD_OVERALL = 'overall' PERIOD_3MONTHS = '3month' PERIOD_6MONTHS = '6month' PERIOD_12MONTHS = '12month' IMAGE_SMALL = 0 IMAGE_MEDIUM = 1 IMAGE_LARGE = 2 IMAGE_EXTRA_LARGE = 3 DOMAIN_ENGLISH = 'www.last.fm' DOMAIN_GERMAN = 'www.lastfm.de' DOMAIN_SPANISH = 'www.lastfm.es' DOMAIN_FRENCH = 'www.lastfm.fr' DOMAIN_ITALIAN = 'www.lastfm.it' DOMAIN_POLISH = 'www.lastfm.pl' DOMAIN_PORTUGUESE = 'www.lastfm.com.br' DOMAIN_SWEDISH = 'www.lastfm.se' DOMAIN_TURKISH = 'www.lastfm.com.tr' DOMAIN_RUSSIAN = 'www.lastfm.ru' DOMAIN_JAPANESE = 'www.lastfm.jp' DOMAIN_CHINESE = 'cn.last.fm' USER_MALE = 'Male' USER_FEMALE = 'Female' SCROBBLE_SOURCE_USER = "P" SCROBBLE_SOURCE_NON_PERSONALIZED_BROADCAST = "R" SCROBBLE_SOURCE_PERSONALIZED_BROADCAST = "E" SCROBBLE_SOURCE_LASTFM = "L" SCROBBLE_SOURCE_UNKNOWN = "U" SCROBBLE_MODE_PLAYED = "L" SCROBBLE_MODE_BANNED = "B" SCROBBLE_MODE_SKIPPED = "S" class _ThreadedCall(threading.Thread): """Facilitates calling a function on another thread.""" def __init__(self, sender, funct, funct_args, callback, callback_args): threading.Thread.__init__(self) self.funct = funct self.funct_args = funct_args self.callback = callback self.callback_args = callback_args self.sender = sender def run(self): output = [] if self.funct: if self.funct_args: output = self.funct(*self.funct_args) else: output = self.funct() if self.callback: if self.callback_args: self.callback(self.sender, output, *self.callback_args) else: self.callback(self.sender, output) class _Request(object): """Representing an abstract web service operation.""" HOST_NAME = 'ws.audioscrobbler.com' HOST_SUBDIR = '/2.0/' def __init__(self, method_name, params, api_key, api_secret, session_key = None): self.params = params self.api_secret = api_secret self.params["api_key"] = api_key self.params["method"] = method_name if session_key: self.params["sk"] = session_key self.sign_it() def sign_it(self): """Sign this request.""" if not "api_sig" in self.params.keys(): self.params['api_sig'] = self._get_signature() def _get_signature(self): """Returns a 32-character hexadecimal md5 hash of the signature string.""" keys = self.params.keys()[:] keys.sort() string = unicode() for name in keys: string += name string += self.params[name] string += self.api_secret return md5(string.encode('utf-8')) def _get_cache_key(self): """The cache key is a string of concatenated sorted names and values.""" keys = self.params.keys() keys.sort() cache_key = str() for key in keys: if key != "api_sig" and key != "api_key" and key != "sk": cache_key += urllib.quote_plus(key) + urllib.quote_plus(urllib.quote_plus(self.params[key])) return cache_key def _is_cached(self): """Returns True if the request is available in the cache.""" return os.path.exists(os.path.join(_get_cache_dir(), self._get_cache_key())) def _get_cached_response(self): """Returns a file object of the cached response.""" if not self._is_cached(): response = self._download_response() response_file = open(os.path.join(_get_cache_dir(), self._get_cache_key()), "w") response_file.write(response) response_file.close() return open(os.path.join(_get_cache_dir(), self._get_cache_key()), "r").read() def _download_response(self): """Returns a response body string from the server.""" # Delay the call if necessary _delay_call() data = [] for name in self.params.keys(): data.append('='.join((name, urllib.quote_plus(self.params[name].encode('utf-8'))))) data = '&'.join(data) headers = { "Content-type": "application/x-www-form-urlencoded", 'Accept-Charset': 'utf-8', 'User-Agent': __name__ + '/' + __version__ } if is_proxy_enabled(): conn = httplib.HTTPConnection(host = _get_proxy()[0], port = _get_proxy()[1]) conn.request(method='POST', url="http://" + self.HOST_NAME + self.HOST_SUBDIR, body=data, headers=headers) else: conn = httplib.HTTPConnection(host=self.HOST_NAME) conn.request(method='POST', url=self.HOST_SUBDIR, body=data, headers=headers) response = conn.getresponse().read() self._check_response_for_errors(response) return response def execute(self, cacheable = False): """Returns the XML DOM response of the POST Request from the server""" if is_caching_enabled() and cacheable: response = self._get_cached_response() else: response = self._download_response() return minidom.parseString(response) def _check_response_for_errors(self, response): """Checks the response for errors and raises one if any exists.""" doc = minidom.parseString(response) e = doc.getElementsByTagName('lfm')[0] if e.getAttribute('status') != "ok": e = doc.getElementsByTagName('error')[0] status = e.getAttribute('code') details = e.firstChild.data.strip() raise ServiceException(status, details) class SessionKeyGenerator(object): """Methods of generating a session key: 1) Web Authentication: a. sg = SessionKeyGenerator(API_KEY, API_SECRET) b. url = sg.get_web_auth_url() c. Ask the user to open the url and authorize you, and wait for it. d. session_key = sg.get_web_auth_session_key(url) 2) Username and Password Authentication: a. username = raw_input("Please enter your username: ") b. md5_password = pylast.md5(raw_input("Please enter your password: ") c. session_key = SessionKeyGenerator(API_KEY, API_SECRET).get_session_key(username, md5_password) A session key's lifetime is infinie, unless the user provokes the rights of the given API Key. """ def __init__(self, api_key, api_secret): self.api_key = api_key self.api_secret = api_secret self.web_auth_tokens = {} def _get_web_auth_token(self): """Retrieves a token from Last.fm for web authentication. The token then has to be authorized from getAuthURL before creating session. """ request = _Request('auth.getToken', dict(), self.api_key, self.api_secret) # default action is that a request is signed only when # a session key is provided. request.sign_it() doc = request.execute() e = doc.getElementsByTagName('token')[0] return e.firstChild.data def get_web_auth_url(self): """The user must open this page, and you first, then call get_web_auth_session_key(url) after that.""" token = self._get_web_auth_token() url = 'http://www.last.fm/api/auth/?api_key=%(api)s&token=%(token)s' % \ {'api': self.api_key, 'token': token} self.web_auth_tokens[url] = token return url def get_web_auth_session_key(self, url): """Retrieves the session key of a web authorization process by its url.""" if url in self.web_auth_tokens.keys(): token = self.web_auth_tokens[url] else: token = "" #that's gonna raise a ServiceException of an unauthorized token when the request is executed. request = _Request('auth.getSession', {'token': token}, self.api_key, self.api_secret) # default action is that a request is signed only when # a session key is provided. request.sign_it() doc = request.execute() return doc.getElementsByTagName('key')[0].firstChild.data def get_session_key(self, username, md5_password): """Retrieve a session key with a username and a md5 hash of the user's password.""" params = {"username": username, "authToken": md5(username + md5_password)} request = _Request("auth.getMobileSession", params, self.api_key, self.api_secret) # default action is that a request is signed only when # a session key is provided. request.sign_it() doc = request.execute() return _extract(doc, "key") class _BaseObject(object): """An abstract webservices object.""" def __init__(self, api_key, api_secret, session_key): self.api_key = api_key self.api_secret = api_secret self.session_key = session_key self.auth_data = (self.api_key, self.api_secret, self.session_key) def _request(self, method_name, cacheable = False, params = None): if not params: params = self._get_params() return _Request(method_name, params, *self.auth_data).execute(cacheable) def _get_params(): """Returns the most common set of parameters between all objects.""" return dict() class _Taggable(object): """Common functions for classes with tags.""" def __init__(self, ws_prefix): self.ws_prefix = ws_prefix def add_tags(self, *tags): """Adds one or several tags. * *tags: Any number of tag names or Tag objects. """ for tag in tags: self._add_tag(tag) def _add_tag(self, tag): """Adds one or several tags. * tag: one tag name or a Tag object. """ if isinstance(tag, Tag): tag = tag.get_name() params = self._get_params() params['tags'] = unicode(tag) self._request(self.ws_prefix + '.addTags', False, params) info("Tagged " + repr(self) + " as (" + repr(tag) + ")") def _remove_tag(self, single_tag): """Remove a user's tag from this object.""" if isinstance(single_tag, Tag): single_tag = single_tag.get_name() params = self._get_params() params['tag'] = unicode(single_tag) self._request(self.ws_prefix + '.removeTag', False, params) info("Removed tag (" + repr(tag) + ") from " + repr(self)) def get_tags(self): """Returns a list of the tags set by the user to this object.""" # Uncacheable because it can be dynamically changed by the user. params = self._get_params() doc = _Request(self.ws_prefix + '.getTags', params, *self.auth_data).execute(cacheable = False) tag_names = _extract_all(doc, 'name') tags = [] for tag in tag_names: tags.append(Tag(tag, *self.auth_data)) return tags def remove_tags(self, *tags): """Removes one or several tags from this object. * *tags: Any number of tag names or Tag objects. """ for tag in tags: self._remove_tag(tag) def clear_tags(self): """Clears all the user-set tags. """ self.remove_tags(*(self.get_tags())) def set_tags(self, *tags): """Sets this object's tags to only those tags. * *tags: any number of tag names. """ c_old_tags = [] old_tags = [] c_new_tags = [] new_tags = [] to_remove = [] to_add = [] tags_on_server = self.get_tags() for tag in tags_on_server: c_old_tags.append(tag.get_name().lower()) old_tags.append(tag.get_name()) for tag in tags: c_new_tags.append(tag.lower()) new_tags.append(tag) for i in range(0, len(old_tags)): if not c_old_tags[i] in c_new_tags: to_remove.append(old_tags[i]) for i in range(0, len(new_tags)): if not c_new_tags[i] in c_old_tags: to_add.append(new_tags[i]) self.remove_tags(*to_remove) self.add_tags(*to_add) def get_top_tags(self, limit = None): """Returns a list of the most frequently used Tags on this object.""" doc = self._request(self.ws_prefix + '.getTopTags', True) elements = doc.getElementsByTagName('tag') list = [] for element in elements: if limit and len(list) >= limit: break tag_name = _extract(element, 'name') tagcount = _extract(element, 'count') list.append(TopItem(Tag(tag_name, *self.auth_data), tagcount)) return list class ServiceException(Exception): """Exception related to the Last.fm web service""" def __init__(self, lastfm_status, details): self._lastfm_status = lastfm_status self._details = details def __str__(self): return self._details def get_id(self): """Returns the exception ID, from one of the following: STATUS_INVALID_SERVICE = 2 STATUS_INVALID_METHOD = 3 STATUS_AUTH_FAILED = 4 STATUS_INVALID_FORMAT = 5 STATUS_INVALID_PARAMS = 6 STATUS_INVALID_RESOURCE = 7 STATUS_TOKEN_ERROR = 8 STATUS_INVALID_SK = 9 STATUS_INVALID_API_KEY = 10 STATUS_OFFLINE = 11 STATUS_SUBSCRIBERS_ONLY = 12 STATUS_TOKEN_UNAUTHORIZED = 14 STATUS_TOKEN_EXPIRED = 15 """ return self._lastfm_status class TopItem (object): """A top item in a list that has a weight. Returned from functions like get_top_tracks() and get_top_artists().""" def __init__(self, item, weight): object.__init__(self) self.item = item self.weight = _number(weight) def __repr__(self): return "Item: " + self.get_item().__repr__() + ", Weight: " + str(self.get_weight()) def get_item(self): """Returns the item.""" return self.item def get_weight(self): """Returns the weight of the itme in the list.""" return self.weight class LibraryItem (object): """An item in a User's Library. It could be an artist, an album or a track.""" def __init__(self, item, playcount, tagcount): object.__init__(self) self.item = item self.playcount = _number(playcount) self.tagcount = _number(tagcount) def __repr__(self): return "Item: " + self.get_item().__repr__() + ", Playcount: " + str(self.get_playcount()) + ", Tagcount: " + str(self.get_tagcount()) def get_item(self): """Returns the itme.""" return self.item def get_playcount(self): """Returns the item's playcount in the Library.""" return self.playcount def get_tagcount(self): """Returns the item's tagcount in the Library.""" return self.tagcount class Album(_BaseObject, _Taggable): """A Last.fm album.""" def __init__(self, artist, title, api_key, api_secret, session_key): """ Create an album instance. # Parameters: * artist str|Artist: An artist name or an Artist object. * title str: The album title. """ _BaseObject.__init__(self, api_key, api_secret, session_key) _Taggable.__init__(self, 'album') if isinstance(artist, Artist): self.artist = artist else: self.artist = Artist(artist, *self.auth_data) self.title = title def __repr__(self): return self.get_artist().get_name().encode('utf-8') + ' - ' + self.get_title().encode('utf-8') def __eq__(self, other): return (self.get_title().lower() == other.get_title().lower()) and (self.get_artist().get_name().lower() == other.get_artist().get_name().lower()) def __ne__(self, other): return (self.get_title().lower() != other.get_title().lower()) or (self.get_artist().get_name().lower() != other.get_artist().get_name().lower()) def _get_params(self): return {'artist': self.get_artist().get_name(), 'album': self.get_title(), } def get_artist(self): """Returns the associated Artist object.""" return self.artist def get_title(self): """Returns the album title.""" return self.title def get_name(self): """Returns the album title (alias to Album.get_title).""" return self.get_title() def get_release_date(self): """Retruns the release date of the album.""" return _extract(self._request("album.getInfo", cacheable = True), "releasedate") def get_image_url(self, size = IMAGE_EXTRA_LARGE): """Returns the associated image URL. # Parameters: * size int: The image size. Possible values: o IMAGE_EXTRA_LARGE o IMAGE_LARGE o IMAGE_MEDIUM o IMAGE_SMALL """ return _extract_all(self._request("album.getInfo", cacheable = True), 'image')[size] def get_id(self): """Returns the Last.fm ID.""" return _extract(self._request("album.getInfo", cacheable = True), "id") def get_playcount(self): """Returns the number of plays on Last.fm.""" return _number(_extract(self._request("album.getInfo", cacheable = True), "playcount")) def get_listener_count(self): """Returns the number of liteners on Last.fm.""" return _number(_extract(self._request("album.getInfo", cacheable = True), "listeners")) def get_top_tags(self, limit = None): """Returns a list of the most-applied tags to this album.""" # BROKEN: Web service is currently broken. return None def get_tracks(self): """Returns the list of Tracks on this album.""" uri = 'lastfm://playlist/album/%s' %self.get_id() return XSPF(uri, *self.auth_data).get_tracks() def get_mbid(self): """Returns the MusicBrainz id of the album.""" return _extract(self._request("album.getInfo", cacheable = True), "mbid") def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the album page on Last.fm. # Parameters: * domain_name str: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH o DOMAIN_GERMAN o DOMAIN_SPANISH o DOMAIN_FRENCH o DOMAIN_ITALIAN o DOMAIN_POLISH o DOMAIN_PORTUGUESE o DOMAIN_SWEDISH o DOMAIN_TURKISH o DOMAIN_RUSSIAN o DOMAIN_JAPANESE o DOMAIN_CHINESE """ url = 'http://%(domain)s/music/%(artist)s/%(album)s' artist = _get_url_safe(self.get_artist().get_name()) album = _get_url_safe(self.get_title()) return url %{'domain': domain_name, 'artist': artist, 'album': album} class Artist(_BaseObject, _Taggable): """A Last.fm artist.""" def __init__(self, name, api_key, api_secret, session_key): """Create an artist object. # Parameters: * name str: The artist's name. """ _BaseObject.__init__(self, api_key, api_secret, session_key) _Taggable.__init__(self, 'artist') self.name = name def __repr__(self): return self.get_name().encode('utf-8') def __eq__(self, other): return self.get_name().lower() == other.get_name().lower() def __ne__(self, other): return self.get_name().lower() != other.get_name().lower() def _get_params(self): return {'artist': self.get_name()} def get_name(self): """Returns the name of the artist.""" return self.name def get_image_url(self, size = IMAGE_LARGE): """Returns the associated image URL. # Parameters: * size int: The image size. Possible values: o IMAGE_LARGE o IMAGE_MEDIUM o IMAGE_SMALL """ return _extract_all(self._request("artist.getInfo", True), "image")[size] def get_playcount(self): """Returns the number of plays on Last.fm.""" return _number(_extract(self._request("artist.getInfo", True), "playcount")) def get_listener_count(self): """Returns the number of liteners on Last.fm.""" return _number(_extract(self._request("artist.getInfo", True), "listeners")) def is_streamable(self): """Returns True if the artist is streamable.""" return bool(_number(_extract(self._request("artist.getInfo", True), "streamable"))) def get_bio_published_date(self): """Returns the date on which the artist's biography was published.""" return _extract(self._request("artist.getInfo", True), "published") def get_bio_summary(self): """Returns the summary of the artist's biography.""" return _extract(self._request("artist.getInfo", True), "summary") def get_bio_content(self): """Returns the content of the artist's biography.""" return _extract(self._request("artist.getInfo", True), "content") def get_upcoming_events(self): """Returns a list of the upcoming Events for this artist.""" doc = self._request('artist.getEvents', True) ids = _extract_all(doc, 'id') events = [] for id in ids: events.append(Event(id, *self.auth_data)) return events def get_similar(self, limit = None): """Returns the similar artists on Last.fm.""" params = self._get_params() if limit: params['limit'] = unicode(limit) doc = self._request('artist.getSimilar', True, params) names = _extract_all(doc, 'name') artists = [] for name in names: artists.append(Artist(name, *self.auth_data)) return artists def get_top_albums(self): """Retuns a list of the top albums.""" doc = self._request('artist.getTopAlbums', True) list = [] for node in doc.getElementsByTagName("album"): name = _extract(node, "name") artist = _extract(node, "name", 1) playcount = _extract(node, "playcount") list.append(TopItem(Album(artist, name, *self.auth_data), playcount)) return list def get_top_tracks(self): """Returns a list of the most played Tracks by this artist.""" doc = self._request("artist.getTopTracks", True) list = [] for track in doc.getElementsByTagName('track'): title = _extract(track, "name") artist = _extract(track, "name", 1) playcount = _number(_extract(track, "playcount")) list.append( TopItem(Track(artist, title, *self.auth_data), playcount) ) return list def get_top_fans(self, limit = None): """Returns a list of the Users who played this artist the most. # Parameters: * limit int: Max elements. """ params = self._get_params() doc = self._request('artist.getTopFans', True) list = [] elements = doc.getElementsByTagName('user') for element in elements: if limit and len(list) >= limit: break name = _extract(element, 'name') weight = _number(_extract(element, 'weight')) list.append(TopItem(User(name, *self.auth_data), weight)) return list def share(self, users, message = None): """Shares this artist (sends out recommendations). # Parameters: * users [User|str,]: A list that can contain usernames, emails, User objects, or all of them. * message str: A message to include in the recommendation message. """ #last.fm currently accepts a max of 10 recipient at a time while(len(users) > 10): section = users[0:9] users = users[9:] self.share(section, message) nusers = [] for user in users: if isinstance(user, User): nusers.append(user.get_name()) else: nusers.append(user) params = self._get_params() recipients = ','.join(nusers) params['recipient'] = recipients if message: params['message'] = unicode(message) self._request('artist.share', False, params) info(repr(self) + " was shared with " + repr(users)) def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the artist page on Last.fm. # Parameters: * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH o DOMAIN_GERMAN o DOMAIN_SPANISH o DOMAIN_FRENCH o DOMAIN_ITALIAN o DOMAIN_POLISH o DOMAIN_PORTUGUESE o DOMAIN_SWEDISH o DOMAIN_TURKISH o DOMAIN_RUSSIAN o DOMAIN_JAPANESE o DOMAIN_CHINESE """ url = 'http://%(domain)s/music/%(artist)s' artist = _get_url_safe(self.get_name()) return url %{'domain': domain_name, 'artist': artist} class Event(_BaseObject): """A Last.fm event.""" def __init__(self, event_id, api_key, api_secret, session_key): _BaseObject.__init__(self, api_key, api_secret, session_key) self.id = unicode(event_id) def __repr__(self): return "Event #" + self.get_id() def __eq__(self, other): return self.get_id() == other.get_id() def __ne__(self, other): return self.get_id() != other.get_id() def _get_params(self): return {'event': self.get_id()} def attend(self, attending_status): """Sets the attending status. * attending_status: The attending status. Possible values: o EVENT_ATTENDING o EVENT_MAYBE_ATTENDING o EVENT_NOT_ATTENDING """ params = self._get_params() params['status'] = unicode(attending_status) doc = self._request('event.attend', False, params) info("Attendance to " + repr(self) + " was set to " + repr(attending_status)) def get_id(self): """Returns the id of the event on Last.fm. """ return self.id def get_title(self): """Returns the title of the event. """ doc = self._request("event.getInfo", True) return _extract(doc, "title") def get_headliner(self): """Returns the headliner of the event. """ doc = self._request("event.getInfo", True) return Artist(_extract(doc, "headliner"), *self.auth_data) def get_artists(self): """Returns a list of the participating Artists. """ doc = self._request("event.getInfo", True) names = _extract_all(doc, "artist") artists = [] for name in names: artists.append(Artist(name, *self.auth_data)) return artists def get_venue(self): """Returns the venue where the event is held.""" doc = self._request("event.getInfo", True) venue_url = _extract(doc, "url") venue_id = _number(venue_url[venue_url.rfind("/") + 1:]) return Venue(venue_id, *self.auth_data) def get_start_date(self): """Returns the date when the event starts.""" doc = self._request("event.getInfo", True) return _extract(doc, "startDate") def get_description(self): """Returns the description of the event. """ doc = self._request("event.getInfo", True) return _extract(doc, "description") def get_image_url(self, size = IMAGE_LARGE): """Returns the associated image URL. * size: The image size. Possible values: o IMAGE_LARGE o IMAGE_MEDIUM o IMAGE_SMALL """ doc = self._request("event.getInfo", True) return _extract_all(doc, "image")[size] def get_attendance_count(self): """Returns the number of attending people. """ doc = self._request("event.getInfo", True) return _number(_extract(doc, "attendance")) def get_review_count(self): """Returns the number of available reviews for this event. """ doc = self._request("event.getInfo", True) return _number(_extract(doc, "reviews")) def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the event page on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH o DOMAIN_GERMAN o DOMAIN_SPANISH o DOMAIN_FRENCH o DOMAIN_ITALIAN o DOMAIN_POLISH o DOMAIN_PORTUGUESE o DOMAIN_SWEDISH o DOMAIN_TURKISH o DOMAIN_RUSSIAN o DOMAIN_JAPANESE o DOMAIN_CHINESE """ url = 'http://%(domain)s/event/%(id)s' return url %{'domain': domain_name, 'id': self.get_id()} def share(self, users, message = None): """Shares this event (sends out recommendations). * users: A list that can contain usernames, emails, User objects, or all of them. * message: A message to include in the recommendation message. """ #last.fm currently accepts a max of 10 recipient at a time while(len(users) > 10): section = users[0:9] users = users[9:] self.share(section, message) nusers = [] for user in users: if isinstance(user, User): nusers.append(user.get_name()) else: nusers.append(user) params = self._get_params() recipients = ','.join(nusers) params['recipient'] = recipients if message: params['message'] = unicode(message) self._request('event.share', False, params) info(repr(self) + " was shared with " + repr(users)) class Country(_BaseObject): """A country at Last.fm.""" def __init__(self, name, api_key, api_secret, session_key): _BaseObject.__init__(self, api_key, api_secret, session_key) self.name = name def __repr__(self): return self.get_name().encode('utf-8') def __eq__(self, other): self.get_name().lower() == other.get_name().lower() def __ne__(self, other): self.get_name() != other.get_name() def _get_params(self): return {'country': self.get_name()} def _get_name_from_code(self, alpha2code): # TODO: Have this function lookup the alpha-2 code and return the country name. return alpha2code def get_name(self): """Returns the country name. """ return self.name def get_top_artists(self): """Returns a sequence of the most played artists.""" doc = self._request('geo.getTopArtists', True) list = [] for node in doc.getElementsByTagName("artist"): name = _extract(node, 'name') playcount = _extract(node, "playcount") list.append(TopItem(Artist(name, *self.auth_data), playcount)) return list def get_top_tracks(self): """Returns a sequence of the most played tracks""" doc = self._request("geo.getTopTracks", True) list = [] for n in doc.getElementsByTagName('track'): title = _extract(n, 'name') artist = _extract(n, 'name', 1) playcount = _number(_extract(n, "playcount")) list.append( TopItem(Track(artist, title, *self.auth_data), playcount)) return list def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the event page on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH o DOMAIN_GERMAN o DOMAIN_SPANISH o DOMAIN_FRENCH o DOMAIN_ITALIAN o DOMAIN_POLISH o DOMAIN_PORTUGUESE o DOMAIN_SWEDISH o DOMAIN_TURKISH o DOMAIN_RUSSIAN o DOMAIN_JAPANESE o DOMAIN_CHINESE """ url = 'http://%(domain)s/place/%(country_name)s' country_name = _get_url_safe(self.get_name()) return url %{'domain': domain_name, 'country_name': country_name} class Library(_BaseObject): """A user's Last.fm library.""" def __init__(self, user, api_key, api_secret, session_key): _BaseObject.__init__(self, api_key, api_secret, session_key) if isinstance(user, User): self.user = user else: self.user = User(user, *self.auth_data) self._albums_index = 0 self._artists_index = 0 self._tracks_index = 0 def __repr__(self): return self.get_user().__repr__() + "'s Library" def _get_params(self): return {'user': self.user.get_name()} def get_user(self): """Returns the user who owns this library.""" return self.user def add_album(self, album): """Add an album to this library.""" params = self._get_params() params["artist"] = album.get_artist.get_name() params["album"] = album.get_name() self._request("library.addAlbum", False, params) info(repr(album) + " was added to " + repr(self)) def add_artist(self, artist): """Add an artist to this library.""" params = self._get_params() params["artist"] = artist.get_name() self._request("library.addArtist", False, params) info(repr(artist) + " was added to " + repr(self)) def add_track(self, track): """Add a track to this library.""" params = self._get_prams() params["track"] = track.get_title() self._request("library.addTrack", False, params) info(repr(track) + " was added to " + repr(self)) def _get_albums_pagecount(self): """Returns the number of album pages in this library.""" doc = self._request("library.getAlbums", True) return _number(doc.getElementsByTagName("albums")[0].getAttribute("totalPages")) def is_end_of_albums(self): """Returns True when the last page of albums has ben retrieved.""" if self._albums_index >= self._get_albums_pagecount(): return True else: return False def _get_artists_pagecount(self): """Returns the number of artist pages in this library.""" doc = self._request("library.getArtists", True) return _number(doc.getElementsByTagName("artists")[0].getAttribute("totalPages")) def is_end_of_artists(self): """Returns True when the last page of artists has ben retrieved.""" if self._artists_index >= self._get_artists_pagecount(): return True else: return False def _get_tracks_pagecount(self): """Returns the number of track pages in this library.""" doc = self._request("library.getTracks", True) return _number(doc.getElementsByTagName("tracks")[0].getAttribute("totalPages")) def is_end_of_tracks(self): """Returns True when the last page of tracks has ben retrieved.""" if self._tracks_index >= self._get_tracks_pagecount(): return True else: return False def get_albums_page(self): """Retreives the next page of albums in the Library. Returns a sequence of TopItem objects. Use the function extract_items like extract_items(Library.get_albums_page()) to return only a sequence of Album objects with no extra data. Example: ------- library = Library("rj", API_KEY, API_SECRET, SESSION_KEY) while not library.is_end_of_albums(): print library.get_albums_page() """ self._albums_index += 1 params = self._get_params() params["page"] = str(self._albums_index) list = [] doc = self._request("library.getAlbums", True, params) for node in doc.getElementsByTagName("album"): name = _extract(node, "name") artist = _extract(node, "name", 1) playcount = _number(_extract(node, "playcount")) tagcount = _number(_extract(node, "tagcount")) list.append(LibraryItem(Album(artist, name, *self.auth_data), playcount, tagcount)) return list def get_artists_page(self): """Retreives the next page of artists in the Library. Returns a sequence of TopItem objects. Use the function extract_items like extract_items(Library.get_artists_page()) to return only a sequence of Artist objects with no extra data. Example: ------- library = Library("rj", API_KEY, API_SECRET, SESSION_KEY) while not library.is_end_of_artists(): print library.get_artists_page() """ self._artists_index += 1 params = self._get_params() params["page"] = str(self._artists_index) list = [] doc = self._request("library.getArtists", True, params) for node in doc.getElementsByTagName("artist"): name = _extract(node, "name") playcount = _number(_extract(node, "playcount")) tagcount = _number(_extract(node, "tagcount")) list.append(LibraryItem(Artist(name, *self.auth_data), playcount, tagcount)) return list def get_tracks_page(self): """Retreives the next page of tracks in the Library. Returns a sequence of TopItem objects. Use the function extract_items like extract_items(Library.get_tracks_page()) to return only a sequence of Track objects with no extra data. Example: ------- library = Library("rj", API_KEY, API_SECRET, SESSION_KEY) while not library.is_end_of_tracks(): print library.get_tracks_page() """ self._tracks_index += 1 params = self._get_params() params["page"] = str(self._tracks_index) list = [] doc = self._request("library.getTracks", True, params) for node in doc.getElementsByTagName("track"): name = _extract(node, "name") artist = _extract(node, "name", 1) playcount = _number(_extract(node, "playcount")) tagcount = _number(_extract(node, "tagcount")) list.append(LibraryItem(Track(artist, name, *self.auth_data), playcount, tagcount)) return list class Playlist(_BaseObject): """A Last.fm user playlist.""" def __init__(self, user, id, api_key, api_secret, session_key): _BaseObject.__init__(self, api_key, api_secret, session_key) if isinstance(user, User): self.user = user else: self.user = User(user, *self.auth_data) self.id = unicode(id) def __repr__(self): return repr(self.user) + "'s playlist # " + repr(self.id) def _get_info_node(self): """Returns the node from user.getPlaylists where this playlist's info is.""" doc = self._request("user.getPlaylists", True) for node in doc.getElementsByTagName("playlist"): if _extract(node, "id") == str(self.get_id()): return node def _get_params(self): return {'user': self.user.get_name(), 'playlistID': self.get_id()} def get_id(self): """Returns the playlist id.""" return self.id def get_user(self): """Returns the owner user of this playlist.""" return self.user def get_tracks(self): """Returns a list of the tracks on this user playlist.""" uri = u'lastfm://playlist/%s' %self.get_id() return XSPF(uri, *self.auth_data).get_tracks() def add_track(self, track): """Adds a Track to this Playlist.""" params = self._get_params() params['artist'] = track.get_artist().get_name() params['track'] = track.get_title() self._request('playlist.addTrack', False, params) info(repr(track) + " was added to " + repr(self)) def get_title(self): """Returns the title of this playlist.""" return _extract(self._get_info_node(), "title") def get_creation_date(self): """Returns the creation date of this playlist.""" return _extract(self._get_info_node(), "date") def get_size(self): """Returns the number of tracks in this playlist.""" return _number(_extract(self._get_info_node(), "size")) def get_description(self): """Returns the description of this playlist.""" return _extract(self._get_info_node(), "description") def get_duration(self): """Returns the duration of this playlist in milliseconds.""" return _number(_extract(self._get_info_node(), "duration")) def is_streamable(self): """Returns True if the playlist is streamable. For a playlist to be streamable, it needs at least 45 tracks by 15 different artists.""" if _extract(self._get_info_node(), "streamable") == '1': return True else: return False def has_track(self, track): """Checks to see if track is already in the playlist. * track: Any Track object. """ return track in self.get_tracks() def get_image_url(self, size = IMAGE_LARGE): """Returns the associated image URL. * size: The image size. Possible values: o IMAGE_LARGE o IMAGE_MEDIUM o IMAGE_SMALL """ return _extract(self._get_info_node(), "image")[size] def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the playlist on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH o DOMAIN_GERMAN o DOMAIN_SPANISH o DOMAIN_FRENCH o DOMAIN_ITALIAN o DOMAIN_POLISH o DOMAIN_PORTUGUESE o DOMAIN_SWEDISH o DOMAIN_TURKISH o DOMAIN_RUSSIAN o DOMAIN_JAPANESE o DOMAIN_CHINESE """ url = "http://%(domain)s/user/%(user)s/library/playlists/%(appendix)s" english_url = _extract(self._get_info_node(), "url") appendix = english_url[english_url.rfind("/") + 1:] return url %{'domain': domain_name, 'appendix': appendix, "user": self.get_user().get_name()} class Tag(_BaseObject): """A Last.fm object tag.""" # TODO: getWeeklyArtistChart (too lazy, i'll wait for when someone requests it) def __init__(self, name, api_key, api_secret, session_key): _BaseObject.__init__(self, api_key, api_secret, session_key) self.name = name def _get_params(self): return {'tag': self.get_name()} def __repr__(self): return self.get_name().encode('utf-8') def __eq__(self): return self.get_name().lower() == other.get_name().lower() def __ne__(self): return self.get_name().lower() != other.get_name().lower() def get_name(self): """Returns the name of the tag. """ return self.name def get_similar(self): """Returns the tags similar to this one, ordered by similarity. """ doc = self._request('tag.getSimilar', True) list = [] names = _extract_all(doc, 'name') for name in names: list.append(Tag(name, *self.auth_data)) return list def get_top_albums(self): """Retuns a list of the top albums.""" doc = self._request('tag.getTopAlbums', True) list = [] for node in doc.getElementsByTagName("album"): name = _extract(node, "name") artist = _extract(node, "name", 1) playcount = _extract(node, "playcount") list.append(TopItem(Album(artist, name, *self.auth_data), playcount)) return list def get_top_tracks(self): """Returns a list of the most played Tracks by this artist.""" doc = self._request("tag.getTopTracks", True) list = [] for track in doc.getElementsByTagName('track'): title = _extract(track, "name") artist = _extract(track, "name", 1) playcount = _number(_extract(track, "playcount")) list.append( TopItem(Track(artist, title, *self.auth_data), playcount) ) return list def get_top_artists(self): """Returns a sequence of the most played artists.""" doc = self._request('tag.getTopArtists', True) list = [] for node in doc.getElementsByTagName("artist"): name = _extract(node, 'name') playcount = _extract(node, "playcount") list.append(TopItem(Artist(name, *self.auth_data), playcount)) return list def get_weekly_chart_dates(self): """Returns a list of From and To tuples for the available charts.""" doc = self._request("tag.getWeeklyChartList", True) list = [] for node in doc.getElementsByTagName("chart"): list.append( (node.getAttribute("from"), node.getAttribute("to")) ) return list def get_weekly_artist_charts(self, from_date = None, to_date = None): """Returns the weekly artist charts for the week starting from the from_date value to the to_date value.""" params = self._get_params() if from_date and to_date: params["from"] = from_date params["to"] = to_date doc = self._request("tag.getWeeklyArtistChart", True, params) list = [] for node in doc.getElementsByTagName("artist"): item = Artist(_extract(node, "name"), *self.auth_data) weight = _extract(node, "weight") list.append(TopItem(item, weight)) return list def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the tag page on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH o DOMAIN_GERMAN o DOMAIN_SPANISH o DOMAIN_FRENCH o DOMAIN_ITALIAN o DOMAIN_POLISH o DOMAIN_PORTUGUESE o DOMAIN_SWEDISH o DOMAIN_TURKISH o DOMAIN_RUSSIAN o DOMAIN_JAPANESE o DOMAIN_CHINESE """ url = 'http://%(domain)s/tag/%(name)s' name = _get_url_safe(self.get_name()) return url %{'domain': domain_name, 'name': name} class Track(_BaseObject, _Taggable): """A Last.fm track.""" def __init__(self, artist, title, api_key, api_secret, session_key): _BaseObject.__init__(self, api_key, api_secret, session_key) _Taggable.__init__(self, 'track') if isinstance(artist, Artist): self.artist = artist else: self.artist = Artist(artist, *self.auth_data) self.title = title def __repr__(self): return self.get_artist().get_name().encode('utf-8') + ' - ' + self.get_title().encode('utf-8') def __eq__(self, other): return (self.get_title().lower() == other.get_title().lower()) and (self.get_artist().get_name().lower() == other.get_artist().get_name().lower()) def __ne__(self, other): return (self.get_title().lower() != other.get_title().lower()) or (self.get_artist().get_name().lower() != other.get_artist().get_name().lower()) def _get_params(self): return {'artist': self.get_artist().get_name(), 'track': self.get_title()} def get_artist(self): """Returns the associated Artist object.""" return self.artist def get_title(self): """Returns the track title.""" return self.title def get_name(self): """Returns the track title (alias to Track.get_title).""" return self.get_title() def get_id(self): """Returns the track id on Last.fm.""" doc = self._request("track.getInfo", True) return _extract(doc, "id") def get_duration(self): """Returns the track duration.""" doc = self._request("track.getInfo", True) return _number(_extract(doc, "duration")) def get_mbid(self): """Returns the MusicBrainz ID of this track.""" doc = self._request("track.getInfo", True) return _extract(doc, "mbid") def get_listener_count(self): """Returns the listener count.""" doc = self._request("track.getInfo", True) return _number(_extract(doc, "listeners")) def get_playcount(self): """Returns the play count.""" doc = self._request("track.getInfo", True) return _number(_extract(doc, "playcount")) def is_streamable(self): """Returns True if the track is available at Last.fm.""" doc = self._request("track.getInfo", True) return _extract(doc, "streamable") == "1" def is_fulltrack_available(self): """Returns True if the fulltrack is available for streaming.""" doc = self._request("track.getInfo", True) return doc.getElementsByTagName("streamable")[0].getAttribute("fulltrack") == "1" def get_album(self): """Returns the album object of this track.""" doc = self._request("track.getInfo", True) albums = doc.getElementsByTagName("album") if len(albums) == 0: return node = doc.getElementsByTagName("album")[0] return Album(_extract(node, "artist"), _extract(node, "title")) def get_wiki_published_date(self): """Returns the date of publishing this version of the wiki.""" doc = self._request("track.getInfo", True) if len(doc.getElementsByTagName("wiki")) == 0: return node = doc.getElementsByTagName("wiki")[0] return _extract(node, "published") def get_wiki_summary(self): """Returns the summary of the wiki.""" doc = self._request("track.getInfo", True) if len(doc.getElementsByTagName("wiki")) == 0: return node = doc.getElementsByTagName("wiki")[0] return _extract(node, "summary") def get_wiki_content(self): """Returns the content of the wiki.""" doc = self._request("track.getInfo", True) if len(doc.getElementsByTagName("wiki")) == 0: return node = doc.getElementsByTagName("wiki")[0] return _extract(node, "content") def love(self): """Adds the track to the user's loved tracks. """ self._request('track.love') def ban(self): """Ban this track from ever playing on the radio. """ self._request('track.ban') def get_similar(self): """Returns similar tracks for this track on Last.fm, based on listening data. """ doc = self._request('track.getSimilar', True) list = [] for node in doc.getElementsByTagName("track"): title = _extract(node, 'name') artist = _extract(node, 'name', 1) list.append(Track(artist, title, *self.auth_data)) return list def get_top_fans(self, limit = None): """Returns a list of the Users who played this track.""" doc = self._request('track.getTopFans', True) list = [] elements = doc.getElementsByTagName('user') for element in elements: if limit and len(list) >= limit: break name = _extract(element, 'name') weight = _number(_extract(element, 'weight')) list.append(TopItem(User(name, *self.auth_data), weight)) return list def share(self, users, message = None): """Shares this track (sends out recommendations). * users: A list that can contain usernames, emails, User objects, or all of them. * message: A message to include in the recommendation message. """ #last.fm currently accepts a max of 10 recipient at a time while(len(users) > 10): section = users[0:9] users = users[9:] self.share(section, message) nusers = [] for user in users: if isinstance(user, User): nusers.append(user.get_name()) else: nusers.append(user) params = self._get_params() recipients = ','.join(nusers) params['recipient'] = recipients if message: params['message'] = unicode(message) self._request('track.share', False, params) def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the track page on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH o DOMAIN_GERMAN o DOMAIN_SPANISH o DOMAIN_FRENCH o DOMAIN_ITALIAN o DOMAIN_POLISH o DOMAIN_PORTUGUESE o DOMAIN_SWEDISH o DOMAIN_TURKISH o DOMAIN_RUSSIAN o DOMAIN_JAPANESE o DOMAIN_CHINESE """ url = 'http://%(domain)s/music/%(artist)s/_/%(title)s' artist = _get_url_safe(self.get_artist().get_name()) title = _get_url_safe(self.get_title()) return url %{'domain': domain_name, 'artist': artist, 'title': title} class Group(_BaseObject): """A Last.fm group.""" def __init__(self, group_name, api_key, api_secret, session_key): _BaseObject.__init__(self, api_key, api_secret, session_key) self.name = group_name def __repr__(self): return self.get_name().encode('utf-8') def __eq__(self, other): return self.get_name().lower() == other.get_name().lower() def __ne__(self, other): return self.get_name() != other.get_name() def _get_params(self): return {'group': self.get_name()} def get_name(self): """Returns the group name. """ return self.name def get_weekly_chart_dates(self): """Returns a list of From and To tuples for the available charts.""" doc = self._request("group.getWeeklyChartList", True) list = [] for node in doc.getElementsByTagName("chart"): list.append( (node.getAttribute("from"), node.getAttribute("to")) ) return list def get_weekly_artist_charts(self, from_date = None, to_date = None): """Returns the weekly artist charts for the week starting from the from_date value to the to_date value.""" params = self._get_params() if from_date and to_date: params["from"] = from_date params["to"] = to_date doc = self._request("group.getWeeklyArtistChart", True, params) list = [] for node in doc.getElementsByTagName("artist"): item = Artist(_extract(node, "name"), *self.auth_data) weight = _extract(node, "playcount") list.append(TopItem(item, weight)) return list def get_weekly_album_charts(self, from_date = None, to_date = None): """Returns the weekly album charts for the week starting from the from_date value to the to_date value.""" params = self._get_params() if from_date and to_date: params["from"] = from_date params["to"] = to_date doc = self._request("group.getWeeklyAlbumChart", True, params) list = [] for node in doc.getElementsByTagName("album"): item = Album(_extract(node, "artist"), _extract(node, "name"), *self.auth_data) weight = _extract(node, "playcount") list.append(TopItem(item, weight)) return list def get_weekly_track_charts(self, from_date = None, to_date = None): """Returns the weekly track charts for the week starting from the from_date value to the to_date value.""" params = self._get_params() if from_date and to_date: params["from"] = from_date params["to"] = to_date doc = self._request("group.getWeeklyTrackChart", True, params) list = [] for node in doc.getElementsByTagName("track"): item = Track(_extract(node, "artist"), _extract(node, "name"), *self.auth_data) weight = _extract(node, "playcount") list.append(TopItem(item, weight)) return list def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the group page on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH o DOMAIN_GERMAN o DOMAIN_SPANISH o DOMAIN_FRENCH o DOMAIN_ITALIAN o DOMAIN_POLISH o DOMAIN_PORTUGUESE o DOMAIN_SWEDISH o DOMAIN_TURKISH o DOMAIN_RUSSIAN o DOMAIN_JAPANESE o DOMAIN_CHINESE """ url = 'http://%(domain)s/group/%(name)s' name = _get_url_safe(self.get_name()) return url %{'domain': domain_name, 'name': name} class XSPF(_BaseObject): "A Last.fm XSPF playlist.""" def __init__(self, uri, api_key, api_secret, session_key): _BaseObject.__init__(self, api_key, api_secret, session_key) self.uri = uri def _get_params(self): return {'playlistURL': self.get_uri()} def __repr__(self): return self.get_uri() def __eq__(self, other): return self.get_uri() == other.get_uri() def __ne__(self, other): return self.get_uri() != other.get_uri() def get_uri(self): """Returns the Last.fm playlist URI. """ return self.uri def get_tracks(self): """Returns the tracks on this playlist.""" doc = self._request('playlist.fetch', True) list = [] for n in doc.getElementsByTagName('track'): title = _extract(n, 'title') artist = _extract(n, 'creator') list.append(Track(artist, title, *self.auth_data)) return list class User(_BaseObject): """A Last.fm user.""" def __init__(self, user_name, api_key, api_secret, session_key): _BaseObject.__init__(self, api_key, api_secret, session_key) self.name = user_name self._past_events_index = 0 self._recommended_events_index = 0 self._recommended_artists_index = 0 def __repr__(self): return self.get_name().encode('utf-8') def __eq__(self, another): return self.get_name() == another.get_name() def __ne__(self, another): return self.get_name() != another.get_name() def _get_params(self): return {"user": self.get_name()} def get_name(self): """Returns the nuser name.""" return self.name def get_upcoming_events(self): """Returns all the upcoming events for this user. """ doc = self._request('user.getEvents', True) ids = _extract_all(doc, 'id') events = [] for id in ids: events.append(Event(id, *self.auth_data)) return events def get_friends(self, limit = None): """Returns a list of the user's friends. """ params = self._get_params() if limit: params['limit'] = unicode(limit) doc = self._request('user.getFriends', True, params) names = _extract_all(doc, 'name') list = [] for name in names: list.append(User(name, *self.auth_data)) return list def get_loved_tracks(self): """Returns the last 50 tracks loved by this user. """ doc = self._request('user.getLovedTracks', True) list = [] for track in doc.getElementsByTagName('track'): title = _extract(track, 'name', 0) artist = _extract(track, 'name', 1) list.append(Track(artist, title, *self.auth_data)) return list def get_neighbours(self, limit = None): """Returns a list of the user's friends.""" params = self._get_params() if limit: params['limit'] = unicode(limit) doc = self._request('user.getNeighbours', True, params) list = [] names = _extract_all(doc, 'name') for name in names: list.append(User(name, *self.auth_data)) return list def _get_past_events_pagecount(self): """Returns the number of pages in the past events.""" params = self._get_params() params["page"] = str(self._past_events_index) doc = self._request("user.getPastEvents", True, params) return _number(doc.getElementsByTagName("events")[0].getAttribute("totalPages")) def is_end_of_past_events(self): """Returns True if the end of Past Events was reached.""" return self._past_events_index >= self._get_past_events_pagecount() def get_past_events_page(self, ): """Retruns a paginated list of all events a user has attended in the past. Example: -------- while not user.is_end_of_past_events(): print user.get_past_events_page() """ self._past_events_index += 1 params = self._get_params() params["page"] = str(self._past_events_index) doc = self._request('user.getPastEvents', True, params) list = [] for id in _extract_all(doc, 'id'): list.append(Event(id, *self.auth_data)) return list def get_playlists(self): """Returns a list of Playlists that this user owns.""" doc = self._request("user.getPlaylists", True) playlists = [] for id in _extract_all(doc, "id"): playlists.append(Playlist(self.get_name(), id, *self.auth_data)) return playlists def get_now_playing(self): """Returns the currently playing track, or None if nothing is playing. """ params = self._get_params() params['limit'] = '1' list = [] doc = self._request('user.getRecentTracks', False, params) e = doc.getElementsByTagName('track')[0] if not e.hasAttribute('nowplaying'): return None artist = _extract(e, 'artist') title = _extract(e, 'name') return Track(artist, title, *self.auth_data) def get_recent_tracks(self, limit = None): """Returns this user's recent listened-to tracks. """ params = self._get_params() if limit: params['limit'] = unicode(limit) doc = self._request('user.getRecentTracks', False, params) list = [] for track in doc.getElementsByTagName('track'): title = _extract(track, 'name') artist = _extract(track, 'artist') if track.hasAttribute('nowplaying'): continue #to prevent the now playing track from sneaking in here list.append(Track(artist, title, *self.auth_data)) return list def get_top_albums(self, period = PERIOD_OVERALL): """Returns the top albums played by a user. * period: The period of time. Possible values: o PERIOD_OVERALL o PERIOD_3MONTHS o PERIOD_6MONTHS o PERIOD_12MONTHS """ params = self._get_params() params['period'] = period doc = self._request('user.getTopAlbums', True, params) list = [] for album in doc.getElementsByTagName('album'): name = _extract(album, 'name') artist = _extract(album, 'name', 1) playcount = _extract(album, "playcount") list.append(TopItem(Album(artist, name, *self.auth_data), playcount)) return list def get_top_artists(self, period = PERIOD_OVERALL): """Returns the top artists played by a user. * period: The period of time. Possible values: o PERIOD_OVERALL o PERIOD_3MONTHS o PERIOD_6MONTHS o PERIOD_12MONTHS """ params = self._get_params() params['period'] = period doc = self._request('user.getTopArtists', True, params) list = [] for node in doc.getElementsByTagName('artist'): name = _extract(node, 'name') playcount = _extract(node, "playcount") list.append(TopItem(Artist(name, *self.auth_data), playcount)) return list def get_top_tags(self, limit = None): """Returns a sequence of the top tags used by this user with their counts as (Tag, tagcount). * limit: The limit of how many tags to return. """ doc = self._request("user.getTopTags", True) list = [] for node in doc.getElementsByTagName("tag"): list.append(TopItem(Tag(_extract(node, "name"), *self.auth_data), _extract(node, "count"))) return list def get_top_tracks(self, period = PERIOD_OVERALL): """Returns the top tracks played by a user. * period: The period of time. Possible values: o PERIOD_OVERALL o PERIOD_3MONTHS o PERIOD_6MONTHS o PERIOD_12MONTHS """ params = self._get_params() params['period'] = period doc = self._request('user.getTopTracks', True, params) list = [] for track in doc.getElementsByTagName('track'): name = _extract(track, 'name') artist = _extract(track, 'name', 1) playcount = _extract(track, "playcount") list.append(TopItem(Track(artist, name, *self.auth_data), playcount)) return list def get_weekly_chart_dates(self): """Returns a list of From and To tuples for the available charts.""" doc = self._request("user.getWeeklyChartList", True) list = [] for node in doc.getElementsByTagName("chart"): list.append( (node.getAttribute("from"), node.getAttribute("to")) ) return list def get_weekly_artist_charts(self, from_date = None, to_date = None): """Returns the weekly artist charts for the week starting from the from_date value to the to_date value.""" params = self._get_params() if from_date and to_date: params["from"] = from_date params["to"] = to_date doc = self._request("user.getWeeklyArtistChart", True, params) list = [] for node in doc.getElementsByTagName("artist"): item = Artist(_extract(node, "name"), *self.auth_data) weight = _extract(node, "playcount") list.append(TopItem(item, weight)) return list def get_weekly_album_charts(self, from_date = None, to_date = None): """Returns the weekly album charts for the week starting from the from_date value to the to_date value.""" params = self._get_params() if from_date and to_date: params["from"] = from_date params["to"] = to_date doc = self._request("user.getWeeklyAlbumChart", True, params) list = [] for node in doc.getElementsByTagName("album"): item = Album(_extract(node, "artist"), _extract(node, "name"), *self.auth_data) weight = _extract(node, "playcount") list.append(TopItem(item, weight)) return list def get_weekly_track_charts(self, from_date = None, to_date = None): """Returns the weekly track charts for the week starting from the from_date value to the to_date value.""" params = self._get_params() if from_date and to_date: params["from"] = from_date params["to"] = to_date doc = self._request("user.getWeeklyTrackChart", True, params) list = [] for node in doc.getElementsByTagName("track"): item = Track(_extract(node, "artist"), _extract(node, "name"), *self.auth_data) weight = _extract(node, "playcount") list.append(TopItem(item, weight)) return list def compare_with_user(self, user, shared_artists_limit = None): """Compare this user with another Last.fm user. Returns a sequence (tasteometer_score, (shared_artist1, shared_artist2, ...)) user: A User object or a username string/unicode object. """ if isinstance(user, User): user = user.get_name() params = self._get_params() if shared_artists_limit: params['limit'] = unicode(shared_artists_limit) params['type1'] = 'user' params['type2'] = 'user' params['value1'] = self.get_name() params['value2'] = user doc = _Request('tasteometer.compare', params, *self.auth_data).execute() score = _extract(doc, 'score') artists = doc.getElementsByTagName('artists')[0] shared_artists_names = _extract_all(artists, 'name') shared_artists_list = [] for name in shared_artists_names: shared_artists_list.append(Artist(name, *self.auth_data)) return (score, shared_artists_list) def getRecommendedEvents(self, page = None, limit = None): """Returns a paginated list of all events recommended to a user by Last.fm, based on their listening profile. * page: The page number of results to return. * limit: The limit of events to return. """ params = self._get_params() if page: params['page'] = unicode(page) if limit: params['limit'] = unicode(limit) doc = _Request('user.getRecommendedEvents', params, *self.auth_data).execute() ids = _extract_all(doc, 'id') list = [] for id in ids: list.append(Event(id, *self.auth_data)) return list def get_url(self, domain_name = DOMAIN_ENGLISH): """Returns the url of the user page on Last.fm. * domain_name: Last.fm's language domain. Possible values: o DOMAIN_ENGLISH o DOMAIN_GERMAN o DOMAIN_SPANISH o DOMAIN_FRENCH o DOMAIN_ITALIAN o DOMAIN_POLISH o DOMAIN_PORTUGUESE o DOMAIN_SWEDISH o DOMAIN_TURKISH o DOMAIN_RUSSIAN o DOMAIN_JAPANESE o DOMAIN_CHINESE """ url = 'http://%(domain)s/user/%(name)s' name = _get_url_safe(self.get_name()) return url %{'domain': domain_name, 'name': name} def get_library(self): """Returns the associated Library object. """ return Library(self, *self.auth_data) class AuthenticatedUser(User): def __init__(self, api_key, api_secret, session_key): User.__init__(self, "", api_key, api_secret, session_key); def _get_params(self): return {} def get_name(self): """Returns the name of the authenticated user.""" doc = self._request("user.getInfo", True) self.name = _extract(doc, "name") return self.name def get_id(self): """Returns the user id.""" doc = self._request("user.getInfo", True) return _extract(doc, "id") def get_image_url(self): """Returns the user's avatar.""" doc = self._request("user.getInfo", True) return _extract(doc, "image") def get_language(self): """Returns the language code of the language used by the user.""" doc = self._request("user.getInfo", True) return _extract(doc, "lang") def get_country(self): """Returns the name of the country of the user.""" doc = self._request("user.getInfo", True) return Country(_extract(doc, "country"), *self.auth_data) def get_age(self): """Returns the user's age.""" doc = self._request("user.getInfo", True) return _number(_extract(doc, "age")) def get_gender(self): """Returns the user's gender. Either USER_MALE or USER_FEMALE.""" doc = self._request("user.getInfo", True) return _extract(doc, "gender") if value == 'm': return USER_MALE elif value == 'f': return USER_FEMALE return None def is_subscriber(self): """Returns whether the user is a subscriber or not. True or False.""" doc = self._request("user.getInfo", True) return _extract(doc, "subscriber") == "1" def get_playcount(self): """Returns the user's playcount so far.""" doc = self._request("user.getInfo", True) return _number(_extract(doc, "playcount")) def _get_recommended_events_pagecount(self): """Returns the number of pages in the past events.""" params = self._get_params() params["page"] = str(self._recommended_events_index) doc = self._request("user.getRecommendedEvents", True, params) return _number(doc.getElementsByTagName("events")[0].getAttribute("totalPages")) def is_end_of_recommended_events(self): """Returns True if the end of Past Events was reached.""" return self._recommended_events_index >= self._get_recommended_events_pagecount() def get_recommended_events_page(self, ): """Retruns a paginated list of all events a user has attended in the past. Example: -------- while not user.is_end_of_recommended_events(): print user.get_recommended_events_page() """ self._recommended_events_index += 1 params = self._get_params() params["page"] = str(self._recommended_events_index) doc = self._request('user.getRecommendedEvents', True, params) list = [] for id in _extract_all(doc, 'id'): list.append(Event(id, *self.auth_data)) return list def _get_recommended_artists_pagecount(self): """Returns the number of pages in the past artists.""" params = self._get_params() params["page"] = str(self._recommended_artists_index) doc = self._request("user.getRecommendedArtists", True, params) return _number(doc.getElementsByTagName("recommendations")[0].getAttribute("totalPages")) def is_end_of_recommended_artists(self): """Returns True if the end of Past Artists was reached.""" return self._recommended_artists_index >= self._get_recommended_artists_pagecount() def get_recommended_artists_page(self, ): """Retruns a paginated list of all artists a user has attended in the past. Example: -------- while not user.is_end_of_recommended_artists(): print user.get_recommended_artists_page() """ self._recommended_artists_index += 1 params = self._get_params() params["page"] = str(self._recommended_artists_index) doc = self._request('user.getRecommendedArtists', True, params) list = [] for name in _extract_all(doc, 'name'): list.append(Artist(name, *self.auth_data)) return list class _Search(_BaseObject): """An abstract class. Use one of its derivatives.""" def __init__(self, ws_prefix, search_terms, api_key, api_secret, session_key): _BaseObject.__init__(self, api_key, api_secret, session_key) self._ws_prefix = ws_prefix self.search_terms = search_terms self._last_page_index = 0 def _get_params(self): params = {} for key in self.search_terms.keys(): params[key] = self.search_terms[key] return params def get_total_result_count(self): """Returns the total count of all the results.""" doc = self._request(self._ws_prefix + ".search", True) return _extract(doc, "opensearch:totalResults") def _retreive_page(self, page_index): """Returns the node of matches to be processed""" params = self._get_params() params["page"] = str(page_index) doc = self._request(self._ws_prefix + ".search", True, params) return doc.getElementsByTagName(self._ws_prefix + "matches")[0] def _retrieve_next_page(self): self._last_page_index += 1 return self._retreive_page(self._last_page_index) class AlbumSearch(_Search): """Search for an album by name.""" def __init__(self, album_name, api_key, api_secret, session_key): _Search.__init__(self, "album", {"album": album_name}, api_key, api_secret, session_key) def get_next_page(self): """Returns the next page of results as a sequence of Album objects.""" master_node = self._retrieve_next_page() list = [] for node in master_node.getElementsByTagName("album"): list.append(Album(_extract(node, "artist"), _extract(node, "name"), *self.auth_data)) return list class ArtistSearch(_Search): """Search for an artist by artist name.""" def __init__(self, artist_name, api_key, api_secret, session_key): _Search.__init__(self, "artist", {"artist": artist_name}, api_key, api_secret, session_key) def get_next_page(self): """Returns the next page of results as a sequence of Artist objects.""" master_node = self._retrieve_next_page() list = [] for node in master_node.getElementsByTagName("artist"): list.append(Artist(_extract(node, "name"), *self.auth_data)) return list class TagSearch(_Search): """Search for a tag by tag name.""" def __init__(self, tag_name, api_key, api_secret, session_key): _Search.__init__(self, "tag", {"tag": tag_name}, api_key, api_secret, session_key) def get_next_page(self): """Returns the next page of results as a sequence of Tag objects.""" master_node = self._retrieve_next_page() list = [] for node in master_node.getElementsByTagName("tag"): list.append(Tag(_extract(node, "name"), *self.auth_data)) return list class TrackSearch(_Search): """Search for a track by track title. If you don't wanna narrow the results down by specifying the artist name, set it to empty string.""" def __init__(self, artist_name, track_title, api_key, api_secret, session_key): _Search.__init__(self, "track", {"track": track_title, "artist": artist_name}, api_key, api_secret, session_key) def get_next_page(self): """Returns the next page of results as a sequence of Track objects.""" master_node = self._retrieve_next_page() list = [] for node in master_node.getElementsByTagName("track"): list.append(Track(_extract(node, "artist"), _extract(node, "name"), *self.auth_data)) return list class VenueSearch(_Search): """Search for a venue by its name. If you don't wanna narrow the results down by specifying a country, set it to empty string.""" def __init__(self, venue_name, country_name, api_key, api_secret, session_key): _Search.__init__(self, "venue", {"venue": venue_name, "country": country_name}, api_key, api_secret, session_key) def get_next_page(self): """Returns the next page of results as a sequence of Track objects.""" master_node = self._retrieve_next_page() list = [] for node in master_node.getElementsByTagName("venue"): list.append(Venue(_extract(node, "id"), *self.auth_data)) return list class Venue(_BaseObject): """A venue where events are held.""" # TODO: waiting for a venue.getInfo web service to use. def __init__(self, id, api_key, api_secret, session_key): _BaseObject.__init__(self, api_key, api_secret, session_key) self.id = _number(id) def __repr__(self): return "Venue #" + str(self.id) def __eq__(self, other): return self.get_id() == other.get_id() def _get_params(self): return {"venue": self.get_id()} def get_id(self): """Returns the id of the venue.""" return self.id def get_upcoming_events(self): """Returns the upcoming events in this venue.""" doc = self._request("venue.getEvents", True) list = [] for node in doc.getElementsByTagName("event"): list.append(Event(_extract(node, "id"), *self.auth_data)) return list def get_past_events(self): """Returns the past events held in this venue.""" doc = self._request("venue.getEvents", True) list = [] for node in doc.getElementsByTagName("event"): list.append(Event(_extract(node, "id"), *self.auth_data)) return list def create_new_playlist(title, description, api_key, api_secret, session_key): """Creates a playlist for the authenticated user and returns it. * title: The title of the new playlist. * description: The description of the new playlist. """ params = dict() params['title'] = unicode(title) params['description'] = unicode(description) doc = _Request('playlist.create', params, api_key, api_secret, session_key).execute() id = doc.getElementsByTagName("id")[0].firstChild.data user = doc.getElementsByTagName('playlists')[0].getAttribute('user') return Playlist(user, id, api_key, api_secret, session_key) def get_authenticated_user(api_key, api_secret, session_key): """Returns the authenticated user.""" return AuthenticatedUser(api_key, api_secret, session_key) def md5(text): """Returns the md5 hash of a string.""" hash = hashlib.md5() hash.update(text.encode('utf-8')) return hash.hexdigest() def enable_proxy(host, port): """Enable a default web proxy.""" global __proxy global __proxy_enabled __proxy = [host, _number(port)] __proxy_enabled = True def disable_proxy(): """Disable using the web proxy.""" global __proxy_enabled __proxy_enabled = False def is_proxy_enabled(): """Returns True if a web proxy is enabled.""" global __proxy_enabled return __proxy_enabled def _get_proxy(): """Returns proxy details.""" global __proxy return __proxy def async_call(sender, call, callback = None, call_args = None, callback_args = None): """This is the function for setting up an asynchronous operation. * call: The function to call asynchronously. * callback: The function to call after the operation is complete, Its prototype has to be like: callback(sender, output[, param1, param3, ... ]) * call_args: A sequence of args to be passed to call. * callback_args: A sequence of args to be passed to callback. """ thread = _ThreadedCall(sender, call, call_args, callback, callback_args) thread.start() def enable_caching(cache_dir = None): """Enables caching request-wide for all cachable calls. * cache_dir: A directory path to use to store the caching data. Set to None to use a temporary directory. """ global __cache_dir global __cache_enabled if cache_dir == None: import tempfile __cache_dir = tempfile.mkdtemp() else: if not os.path.exists(cache_dir): os.mkdir(cache_dir) __cache_dir = cache_dir __cache_enabled = True def disable_caching(): """Disables all caching features.""" global __cache_enabled __cache_enabled = False def is_caching_enabled(): """Returns True if caching is enabled.""" global __cache_enabled return __cache_enabled def _get_cache_dir(): """Returns the directory in which cache files are saved.""" global __cache_dir global __cache_enabled return __cache_dir def _extract(node, name, index = 0): """Extracts a value from the xml string""" nodes = node.getElementsByTagName(name) if len(nodes): if nodes[index].firstChild: return nodes[index].firstChild.data.strip() else: return None def _extract_all(node, name, limit_count = None): """Extracts all the values from the xml string. returning a list.""" list = [] for i in range(0, len(node.getElementsByTagName(name))): if len(list) == limit_count: break list.append(_extract(node, name, i)) return list def _get_url_safe(text): """Does all kinds of tricks on a text to make it safe to use in a url.""" if type(text) == type(unicode()): text = text.encode('utf-8') return urllib.quote_plus(urllib.quote_plus(text)).lower() def _number(string): """Extracts an int from a string. Returns a 0 if None or an empty string was passed.""" if not string: return 0 elif string == "": return 0 else: return int(string) def search_for_album(album_name, api_key, api_secret, session_key): """Searches for an album by its name. Returns a AlbumSearch object. Use get_next_page() to retreive sequences of results.""" return AlbumSearch(album_name, api_key, api_secret, session_key) def search_for_artist(artist_name, api_key, api_secret, session_key): """Searches of an artist by its name. Returns a ArtistSearch object. Use get_next_page() to retreive sequences of results.""" return ArtistSearch(artist_name, api_key, api_secret, session_key) def search_for_tag(tag_name, api_key, api_secret, session_key): """Searches of a tag by its name. Returns a TagSearch object. Use get_next_page() to retreive sequences of results.""" return TagSearch(tag_name, api_key, api_secret, session_key) def search_for_track(artist_name, track_name, api_key, api_secret, session_key): """Searches of a track by its name and its artist. Set artist to an empty string if not available. Returns a TrackSearch object. Use get_next_page() to retreive sequences of results.""" return TrackSearch(artist_name, track_name, api_key, api_secret, session_key) def search_for_venue(venue_name, country_name, api_key, api_secret, session_key): """Searches of a venue by its name and its country. Set country_name to an empty string if not available. Returns a VenueSearch object. Use get_next_page() to retreive sequences of results.""" return VenueSearch(venue_name, country_name, api_key, api_secret, session_key) def extract_items(topitems_or_libraryitems): """Extracts a sequence of items from a sequence of TopItem or LibraryItem objects.""" list = [] for i in topitems_or_libraryitems: list.append(i.get_item()) return list def get_top_tags(api_key, api_secret, session_key): """Returns a sequence of the most used Last.fm tags as a sequence of TopItem objects.""" doc = _Request("tag.getTopTags", dict(), api_key, api_secret, session_key).execute(True) list = [] for node in doc.getElementsByTagName("tag"): tag = Tag(_extract(node, "name"), api_key, api_secret, session_key) weight = _extract(node, "count") list.append(TopItem(tag, weight)) return list def get_track_by_mbid(mbid, api_key, api_secret, session_key): """Looks up a track by its MusicBrainz ID.""" params = {"mbid": unicode(mbid)} doc = _Request("track.getInfo", params, api_key, api_secret, session_key).execute(True) return Track(_extract(doc, "name", 1), _extract(doc, "name"), api_key, api_secret, session_key) def get_artist_by_mbid(mbid, api_key, api_secret, session_key): """Loooks up an artist by its MusicBrainz ID.""" params = {"mbid": unicode(mbid)} doc = _Request("artist.getInfo", params, api_key, api_secret, session_key).execute(True) return Artist(_extract(doc, "name"), api_key, api_secret, session_key) def get_album_by_mbid(mbid, api_key, api_secret, session_key): """Looks up an album by its MusicBrainz ID.""" params = {"mbid": unicode(mbid)} doc = _Request("album.getInfo", params, api_key, api_secret, session_key).execute(True) return Album(_extract(doc, "artist"), _extract(doc, "name"), api_key, api_secret, session_key) def _delay_call(): """Makes sure that web service calls are at least a second apart.""" global __last_call_time # delay time in seconds DELAY_TIME = 1.0 now = time.time() if (now - __last_call_time) < DELAY_TIME: time.sleep(1) __last_call_time = now def clear_cache(): """Clears the cache data and starts fresh.""" global __cache_dir if not os.path.exists(__cache_dir): return for file in os.listdir(__cache_dir): os.remove(os.path.join(__cache_dir, file)) # ------------------------------------------------------------ class ScrobblingException(Exception): def __inint__(self, message): Exception.__init__(self) self.message = message def __repr__(self): return self.message class BannedClient(ScrobblingException): def __init__(self): ScrobblingException.__init__(self, "This version of the client has been banned") class BadAuthentication(ScrobblingException): def __init__(self): ScrobblingException.__init__(self, "Bad authentication token") class BadTime(ScrobblingException): def __init__(self): ScrobblingException.__init__(self, "Time provided is not close enough to current time") class BadSession(ScrobblingException): def __init__(self): ScrobblingException.__init__(self, "Bad session id, consider re-handshaking") class _ScrobblerRequest(object): def __init__(self, url, params): self.params = params self.hostname = url[url.find("//") + 2:url.rfind("/")] self.subdir = url[url.rfind("/"):] def execute(self): """Returns a string response of this request.""" connection = httplib.HTTPConnection(self.hostname) data = [] for name in self.params.keys(): value = urllib.quote_plus(self.params[name].encode('utf-8')) data.append('='.join((name, value))) data = "&".join(data) headers = { "Content-type": "application/x-www-form-urlencoded", "Accept-Charset": "utf-8", "User-Agent": __name__ + "/" + __version__, "HOST": self.hostname } debug("Scrobbler Request:\n\tHOST :" + self.hostname + "\n\tSUBDIR: " + self.subdir + "\n\tDATA:" + repr(data) + "\n\tHEADERS: " + repr(headers)) connection.request("POST", self.subdir, data, headers) response = connection.getresponse().read() self._check_response_for_errors(response) debug("Scrobbler Response:\n\t" + response) return response def _check_response_for_errors(self, response): """When passed a string response it checks for erros, raising any exceptions as necessary.""" lines = response.split("\n") status_line = lines[0] if status_line == "OK": return elif status_line == "BANNED": raise BannedClient() elif status_line == "BADAUTH": raise BadAuthenticationException() elif status_line == "BADTIME": raise BadTime() elif status_line == "BADSESSION": raise BadSession() elif status_line.startswith("FAILED "): reason = status_line[status_line.find("FAILED ")+len("FAILED "):] raise ScrobblingException(reason) class Scrobbler(object): """A class for scrobbling tracks to Last.fm""" session_id = None nowplaying_url = None submissions_url = None def __init__(self, client_id, client_version, username, md5_password): self.client_id = client_id self.client_version = client_version self.username = username self.password = md5_password def _do_handshake(self): """Handshakes with the server""" timestamp = str(int(time.time())) token = md5(self.password + timestamp) params = {"hs": "true", "p": "1.2.1", "c": self.client_id, "v": self.client_version, "u": self.username, "t": timestamp, "a": token} response = _ScrobblerRequest("http://post.audioscrobbler.com:80/", params).execute().split("\n") self.session_id = response[1] self.nowplaying_url = response[2] self.submissions_url = response[3] def _get_session_id(self, new = False): """Returns a handshake. If new is true, then it will be requested from the server even if one was cached.""" if not self.session_id or new: debug("Doing a scrobbling handshake") self._do_handshake() return self.session_id def report_now_playing(self, artist, title, album = "", duration = "", track_number = "", mbid = ""): params = {"s": self._get_session_id(), "a": artist, "t": title, "b": album, "l": duration, "n": track_number, "m": mbid} response = _ScrobblerRequest(self.nowplaying_url, params).execute() try: _ScrobblerRequest(self.nowplaying_url, params).execute() except BadSession: self._do_handshake() self.report_now_playing(artist, title, album, duration, track_number, mbid) info(artist + " - " + title + " was reported as now-playing") def scrobble(self, artist, title, time_started, source, mode, duration, album="", track_number="", mbid=""): """Scrobble a track. parameters: artist: Artist name. title: Track title. time_started: UTC timestamp of when the track started playing. source: The source of the track SCROBBLE_SOURCE_USER: Chosen by the user (the most common value, unless you have a reason for choosing otherwise, use this). SCROBBLE_SOURCE_NON_PERSONALIZED_BROADCAST: Non-personalised broadcast (e.g. Shoutcast, BBC Radio 1). SCROBBLE_SOURCE_PERSONALIZED_BROADCAST: Personalised recommendation except Last.fm (e.g. Pandora, Launchcast). SCROBBLE_SOURCE_LASTFM: ast.fm (any mode). In this case, the 5-digit recommendation_key value must be set. SCROBBLE_SOURCE_UNKNOWN: Source unknown. mode: The submission mode SCROBBLE_MODE_PLAYED: The track was played. SCROBBLE_MODE_SKIPPED: The track was skipped (Only if source was Last.fm) SCROBBLE_MODE_BANNED: The track was banned (Only if source was Last.fm) duration: Track duration in seconds. album: The album name. track_number: The track number on the album. mbid: MusicBrainz ID. """ params = {"s": self._get_session_id(), "a[0]": artist, "t[0]": title, "i[0]": str(time_started), "o[0]": source, "r[0]": mode, "l[0]": str(duration), "b[0]": album, "n[0]": track_number, "m[0]": mbid} response = _ScrobblerRequest(self.submissions_url, params).execute() info(artist + " - " + title + " was scrobbled")