2786 lines
72 KiB
Python
2786 lines
72 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# pylast - Python bindings for the Last.fm webservices.
|
|
# Copyright (C) 2008-2009 Amr Hassan
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
|
|
# USA
|
|
#
|
|
# For help regarding using this library, please visit the official
|
|
# documentation at http://code.google.com/p/pylast/wiki/Documentation
|
|
|
|
LIB_NAME = 'pyLast'
|
|
LIB_VERSION = '0.2b8'
|
|
|
|
API_SERVER = 'ws.audioscrobbler.com'
|
|
API_SUBDIR = '/2.0/'
|
|
|
|
import md5
|
|
import httplib
|
|
import urllib
|
|
import threading
|
|
from xml.dom import minidom
|
|
|
|
USE_SILENT_EXCEPTIONS = True
|
|
|
|
STATUS_OK = 'ok'
|
|
STATUS_FAILED = 'failed'
|
|
STATUS_INVALID_SERVICE = 2
|
|
STATUS_INVALID_METHOD = 3
|
|
STATUS_AUTH_FAILED = 4
|
|
STATUS_INVALID_FORMAT = 5
|
|
STATUS_INVALID_PARAMS = 6
|
|
STATUS_INVALID_RESOURCE = 7
|
|
STATUS_TOKEN_ERROR = 8
|
|
STATUS_INVALID_SK = 9
|
|
STATUS_INVALID_API_KEY = 10
|
|
STATUS_OFFLINE = 11
|
|
STATUS_SUBSCRIBERS_ONLY = 12
|
|
STATUS_TOKEN_UNAUTHORIZED = 14
|
|
STATUS_TOKEN_EXPIRED = 15
|
|
|
|
EVENT_ATTENDING = '0'
|
|
EVENT_MAYBE_ATTENDING = '1'
|
|
EVENT_NOT_ATTENDING = '2'
|
|
|
|
PERIOD_OVERALL = 'overall'
|
|
PERIOD_3MONTHS = '3month'
|
|
PERIOD_6MONTHS = '6month'
|
|
PERIOD_12MONTHS = '12month'
|
|
|
|
IMAGE_SMALL = 0
|
|
IMAGE_MEDIUM = 1
|
|
IMAGE_LARGE = 2
|
|
|
|
DOMAIN_ENGLISH = 'www.last.fm'
|
|
DOMAIN_GERMAN = 'www.lastfm.de'
|
|
DOMAIN_SPANISH = 'www.lastfm.es'
|
|
DOMAIN_FRENCH = 'www.lastfm.fr'
|
|
DOMAIN_ITALIAN = 'www.lastfm.it'
|
|
DOMAIN_POLISH = 'www.lastfm.pl'
|
|
DOMAIN_PORTUGUESE = 'www.lastfm.com.br'
|
|
DOMAIN_SWEDISH = 'www.lastfm.se'
|
|
DOMAIN_TURKISH = 'www.lastfm.com.tr'
|
|
DOMAIN_RUSSIAN = 'www.lastfm.ru'
|
|
DOMAIN_JAPANESE = 'www.lastfm.jp'
|
|
DOMAIN_CHINESE = 'cn.last.fm'
|
|
|
|
USER_MALE = 'Male'
|
|
USER_FEMALE = 'Female'
|
|
|
|
def _status2str(lastfm_status):
|
|
|
|
statuses = {
|
|
STATUS_OK: 'OK',
|
|
STATUS_FAILED: 'Failed',
|
|
STATUS_INVALID_METHOD: 'Invalid Method - No method with that name in this package',
|
|
STATUS_TOKEN_ERROR: 'Token Error - There was an error granting the request token',
|
|
STATUS_INVALID_SERVICE: 'Invalid Service - This service does not exist',
|
|
STATUS_AUTH_FAILED: 'Authentication Failed - You do not have permissions to access the service',
|
|
STATUS_INVALID_FORMAT: "Invalid Format - This service doesn't exist in that format",
|
|
STATUS_INVALID_PARAMS: 'Invalid Parameters - Your request is missing a required parameter',
|
|
STATUS_INVALID_RESOURCE: 'Invalid Resource Specified',
|
|
STATUS_INVALID_SK: 'Invalid Session Key - Please re-authenticate',
|
|
STATUS_INVALID_API_KEY: 'Invalid API Key - You must be granted a valid key by last.fm',
|
|
STATUS_OFFLINE: 'Service Offline - This service is temporarily offline. Try again later.',
|
|
STATUS_SUBSCRIBERS_ONLY: 'Subscribers Only - This service is only available to paid last.fm subscribers',
|
|
STATUS_TOKEN_UNAUTHORIZED: 'Unauthorized Token - This token has not been authorized',
|
|
STATUS_TOKEN_EXPIRED: 'Token Expired -This token has expired'
|
|
}
|
|
|
|
return statuses[int(lastfm_status)]
|
|
|
|
class ServiceException(Exception):
|
|
"""Exception related to the Last.fm web service"""
|
|
|
|
def __init__(self, lastfm_status, details):
|
|
self._lastfm_status = lastfm_status
|
|
self._details = details
|
|
|
|
def __str__(self):
|
|
return "%s: %s." %(_status2str(self._lastfm_status), self._details)
|
|
|
|
class Asynchronizer(threading.Thread):
|
|
"""Hopingly, this class would help perform asynchronous operations less painfully.
|
|
For inherited use only. And you must call Asynchronizer.__init__(descendant) before usage.
|
|
"""
|
|
|
|
def __init__(self):
|
|
threading.Thread.__init__(self)
|
|
|
|
self._calls = {} #calls is structured like this: {call_pointer: (arg1, arg2, ...)}
|
|
self._callbacks = {} #callbacks is structred like this: {call_pointer: callback_pointer}
|
|
|
|
def run(self):
|
|
"""Avoid running this function. Use start() to begin the thread's work."""
|
|
|
|
for call in self._calls.keys():
|
|
|
|
output = call(*(self._calls[call]))
|
|
callback = self._callbacks[call]
|
|
|
|
if callback: #callback can be None if not wanted
|
|
callback(self, output)
|
|
|
|
del self._calls[call]
|
|
del self._callbacks[call]
|
|
|
|
def async_call(self, callback, call, *call_args):
|
|
"""This is the function for setting up an asynchronous operation.
|
|
* callback: the function to callback afterwards, accepting two argument, one being the sender and the other is the return of the target call.
|
|
* call: the target call.
|
|
* *call_args: any number of arguments to pass to the target call function.
|
|
"""
|
|
|
|
self._calls[call] = call_args
|
|
self._callbacks[call] = callback
|
|
|
|
def start(self):
|
|
"""Since that Python thread objects can only be started once. This is my little work-around."""
|
|
|
|
threading.Thread.__init__(self)
|
|
super(Asynchronizer, self).start()
|
|
|
|
class Exceptionable(object):
|
|
"""An abstract class that adds support for error reporting."""
|
|
|
|
def __init__(self, parent = None):
|
|
self.__errors = []
|
|
self.__raising_exceptions = not USE_SILENT_EXCEPTIONS
|
|
|
|
#An Exceptionable parent to mirror all the errors to automatically.
|
|
self._parent = parent
|
|
|
|
def last_error(self):
|
|
"""Returns the last error, or None."""
|
|
|
|
if len(self.__errors):
|
|
return self.__errors[len(self.__errors) -1]
|
|
else:
|
|
return None
|
|
|
|
def _report_error(self, exception):
|
|
|
|
if self.get_raising_exceptions():
|
|
raise exception
|
|
|
|
self.__errors.append(exception)
|
|
|
|
if self._parent:
|
|
self._parent._mirror_errors(self)
|
|
|
|
def _mirror_errors(self, exceptional):
|
|
"""Mirrors the errors from another Exceptional object"""
|
|
|
|
for e in exceptional.get_all_errors():
|
|
self._report_error(e)
|
|
|
|
def clear_erros(self):
|
|
"""Clear the error log for this object."""
|
|
self.__errors = []
|
|
|
|
def get_all_errors(self):
|
|
"""Return a list of exceptions raised about this object."""
|
|
|
|
return self.__errors
|
|
|
|
def enable_raising_exceptions(self):
|
|
"""Enable raising the exceptions about this object."""
|
|
self.__raising_exceptions = True
|
|
|
|
def disable_raising_exceptions(self):
|
|
"""Disable raising the exceptions about this object, but still report them to the log."""
|
|
self.__raising_exceptions = False
|
|
|
|
def get_raising_exceptions(self):
|
|
"""Get the status on raising exceptions."""
|
|
return self.__raising_exceptions
|
|
|
|
class Request(Exceptionable):
|
|
"""Representing an abstract web service operation."""
|
|
|
|
def __init__(self, parent, method_name, api_key, params, sign_it = False, secret = None):
|
|
Exceptionable.__init__(self, parent)
|
|
|
|
self.method_name = method_name
|
|
self.api_key = api_key
|
|
self.params = params
|
|
self.sign_it = sign_it
|
|
self.secret = secret
|
|
|
|
def _getSignature(self):
|
|
"""Returns a 32-character hexadecimal md5 hash of the signature string."""
|
|
|
|
keys = self.params.keys()[:]
|
|
|
|
keys.sort()
|
|
|
|
string = unicode()
|
|
|
|
for name in keys:
|
|
string += name
|
|
string += self.params[name]
|
|
|
|
string += self.secret
|
|
|
|
hash = md5.new()
|
|
hash.update(string)
|
|
|
|
return hash.hexdigest()
|
|
|
|
def execute(self):
|
|
"""Returns the XML DOM response of the POST request from the server"""
|
|
|
|
self.params['api_key'] = self.api_key
|
|
self.params['method'] = self.method_name
|
|
if self.sign_it:
|
|
self.params['api_sig'] = self._getSignature()
|
|
|
|
data = []
|
|
for name in self.params.keys():
|
|
data.append('='.join((name, urllib.quote_plus(self.params[name]))))
|
|
|
|
try:
|
|
conn = httplib.HTTPConnection(API_SERVER)
|
|
headers = {
|
|
"Content-type": "application/x-www-form-urlencoded",
|
|
'Accept-Charset': 'utf-8',
|
|
'User-Agent': LIB_NAME + '/' + LIB_VERSION
|
|
}
|
|
conn.request('POST', API_SUBDIR, '&'.join(data), headers)
|
|
response = conn.getresponse()
|
|
except Exception, e:
|
|
self._report_error(e)
|
|
return None
|
|
|
|
doc = minidom.parse(response)
|
|
|
|
if self.__checkResponseStatus(doc) == STATUS_OK:
|
|
return doc
|
|
|
|
return None
|
|
|
|
def __checkResponseStatus(self, xml_dom):
|
|
"""Checks the response for errors and raises one if any exists."""
|
|
|
|
doc = xml_dom
|
|
e = doc.getElementsByTagName('lfm')[0]
|
|
|
|
if e.getAttribute('status') == STATUS_OK:
|
|
return STATUS_OK
|
|
else:
|
|
e = doc.getElementsByTagName('error')[0]
|
|
status = e.getAttribute('code')
|
|
details = e.firstChild.data.strip()
|
|
self._report_error(ServiceException(status, details))
|
|
|
|
class SessionGenerator(Asynchronizer, Exceptionable):
|
|
"""Steps of authorization:
|
|
1. Retrieve token: token = getToken()
|
|
2. Authorize this token by openning the web page at the URL returned by getAuthURL(token)
|
|
3. Call getSessionData(token) to collect the session parameters.
|
|
|
|
A session key's lifetime is infinie, unless the user provokes the rights of the given API Key.
|
|
"""
|
|
|
|
def __init__(self, api_key, secret):
|
|
Asynchronizer.__init__(self)
|
|
Exceptionable.__init__(self)
|
|
|
|
self.api_key = api_key
|
|
self.secret = secret
|
|
|
|
|
|
def getToken(self):
|
|
"""Retrieves a token from Last.fm.
|
|
The token then has to be authorized from getAuthURL before creating session.
|
|
"""
|
|
|
|
|
|
doc = Request(self, 'auth.getToken', self.api_key, dict(), True, self.secret).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
e = doc.getElementsByTagName('token')[0]
|
|
return e.firstChild.data
|
|
|
|
|
|
def getAuthURL(self, token):
|
|
"""The user must open this page, and authorize the given token."""
|
|
|
|
url = 'http://www.last.fm/api/auth/?api_key=%(api)s&token=%(token)s' % \
|
|
{'api': self.api_key, 'token': token}
|
|
return url
|
|
|
|
def getSessionData(self, token):
|
|
"""Retrieves session data for the authorized token.
|
|
getSessionData(token) --> {'name': str, 'key': str, 'subscriber': bool}
|
|
"""
|
|
|
|
params = {'token': token}
|
|
doc = Request(self, 'auth.getSession', self.api_key, params, True, self.secret).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
name_e = doc.getElementsByTagName('name')[0]
|
|
key_e = doc.getElementsByTagName('key')[0]
|
|
subscriber_e = doc.getElementsByTagName('subscriber')[0]
|
|
|
|
data = {}
|
|
data['name'] = name_e.firstChild.data
|
|
data['key'] = key_e.firstChild.data
|
|
data['subscriber'] = bool(subscriber_e.firstChild.data)
|
|
|
|
return data
|
|
|
|
class BaseObject(Asynchronizer, Exceptionable):
|
|
"""An abstract webservices object."""
|
|
|
|
def __init__(self, api_key, secret, session_key):
|
|
Asynchronizer.__init__(self)
|
|
Exceptionable.__init__(self)
|
|
|
|
self.api_key = api_key
|
|
self.secret = secret
|
|
self.session_key = session_key
|
|
|
|
self.auth_data = (self.api_key, self.secret, self.session_key)
|
|
|
|
def _extract(self, node, name, index = 0):
|
|
"""Extracts a value from the xml string"""
|
|
|
|
nodes = node.getElementsByTagName(name)
|
|
|
|
if len(nodes):
|
|
if nodes[index].firstChild:
|
|
return nodes[index].firstChild.data.strip()
|
|
else:
|
|
return None
|
|
|
|
def _extract_all(self, node, name, limit_count = None):
|
|
"""Extracts all the values from the xml string. returning a list."""
|
|
|
|
list = []
|
|
|
|
for i in range(0, len(node.getElementsByTagName(name))):
|
|
if len(list) == limit_count:
|
|
break
|
|
|
|
list.append(self._extract(node, name, i))
|
|
|
|
return list
|
|
|
|
def _get_url_safe(self, text):
|
|
|
|
if type(text) == type(unicode()):
|
|
text = text.encode('utf-8')
|
|
|
|
return urllib.quote_plus(text)
|
|
|
|
def toStr():
|
|
return ""
|
|
|
|
def _hash(self):
|
|
return self.toStr().lower()
|
|
|
|
def __str__(self):
|
|
return self.toStr()
|
|
|
|
class Cacheable(object):
|
|
"""Common functions for objects that can have cached metadata"""
|
|
|
|
def __init__(self):
|
|
|
|
self._cached_info = None
|
|
|
|
def _getInfo(self):
|
|
"""Abstract function, should be inherited"""
|
|
|
|
def _getCachedInfo(self, *key_names):
|
|
"""Returns the cached collection of info regarding this object
|
|
If not available in cache, it will be downloaded first.
|
|
"""
|
|
|
|
if not self._cached_info:
|
|
self._cached_info = self._getInfo()
|
|
|
|
if not self._cached_info:
|
|
return None
|
|
|
|
value_or_container = self._cached_info
|
|
for key in key_names:
|
|
|
|
if not len(value_or_container):
|
|
return None
|
|
|
|
value_or_container = value_or_container[key]
|
|
|
|
return value_or_container
|
|
|
|
|
|
class Taggable(object):
|
|
"""Common functions for classes with tags."""
|
|
|
|
def __init__(self, ws_prefix):
|
|
self.ws_prefix = ws_prefix
|
|
|
|
def addTags(self, *tags):
|
|
"""Adds one or several tags.
|
|
* *tags: Any number of tag names or Tag objects.
|
|
"""
|
|
|
|
#last.fm currently accepts a max of 10 tags at a time
|
|
while(len(tags) > 10):
|
|
section = tags[0:9]
|
|
tags = tags[9:]
|
|
self.addTags(section)
|
|
|
|
if len(tags) == 0:
|
|
return None
|
|
|
|
ntags = []
|
|
for tag in tags:
|
|
if isinstance(tag, Tag):
|
|
ntags.append(tag.getName())
|
|
else:
|
|
ntags.append(tag)
|
|
|
|
tagstr = ','.join(ntags)
|
|
|
|
params = self._getParams()
|
|
params['tags'] = tagstr
|
|
|
|
Request(self, self.ws_prefix + '.addTags', self.api_key, params, True, self.secret).execute()
|
|
|
|
def _removeTag(self, single_tag):
|
|
"""Remove a user's tag from this object."""
|
|
|
|
if isinstance(single_tag, Tag):
|
|
single_tag = single_tag.getName()
|
|
|
|
params = self._getParams()
|
|
params['tag'] = single_tag
|
|
|
|
Request(self, self.ws_prefix + '.removeTag', self.api_key, params, True, self.secret).execute()
|
|
|
|
def getTags(self):
|
|
"""Returns a list of the user-set tags to this object."""
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, self.ws_prefix + '.getTags', self.api_key, params, True, self.secret).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
tag_names = self._extract_all(doc, 'name')
|
|
tags = []
|
|
for tag in tag_names:
|
|
tags.append(Tag(tag, *self.auth_data))
|
|
|
|
return tags
|
|
|
|
def removeTags(self, *tags):
|
|
"""Removes one or several tags from this object.
|
|
* *tags: Any number of tag names or Tag objects.
|
|
"""
|
|
|
|
for tag in tags:
|
|
self._removeTag(tag)
|
|
|
|
def clearTags(self):
|
|
"""Clears all the user-set tags. """
|
|
|
|
self.removeTags(*(self.getTags()))
|
|
|
|
def setTags(self, *tags):
|
|
"""Sets this object's tags to only those tags.
|
|
* *tags: any number of tag names.
|
|
"""
|
|
|
|
c_old_tags = []
|
|
old_tags = []
|
|
c_new_tags = []
|
|
new_tags = []
|
|
|
|
to_remove = []
|
|
to_add = []
|
|
|
|
for tag in self.getTags():
|
|
c_old_tags.append(tag.getName().lower())
|
|
old_tags.append(tag.getName())
|
|
|
|
for tag in tags:
|
|
c_new_tags.append(tag.lower())
|
|
new_tags.append(tag)
|
|
|
|
for i in range(0, len(old_tags)):
|
|
if not c_old_tags[i] in c_new_tags:
|
|
to_remove.append(old_tags[i])
|
|
|
|
for i in range(0, len(new_tags)):
|
|
if not c_new_tags[i] in c_old_tags:
|
|
to_add.append(new_tags[i])
|
|
|
|
self.removeTags(*to_remove)
|
|
self.addTags(*to_add)
|
|
|
|
class Album(BaseObject, Cacheable, Taggable):
|
|
|
|
def __init__(self, artist_name, album_title, api_key, secret, session_key):
|
|
BaseObject.__init__(self, api_key, secret, session_key)
|
|
Cacheable.__init__(self)
|
|
Taggable.__init__(self, 'album')
|
|
|
|
self.artist_name = artist_name
|
|
self.title = album_title
|
|
|
|
self._cached_info = None
|
|
|
|
def _getParams(self):
|
|
return {'artist': self.artist_name, 'album': self.title, 'sk': self.session_key}
|
|
|
|
def _getInfo(self):
|
|
"""Returns a dictionary with various metadata values."""
|
|
|
|
params = self._getParams()
|
|
|
|
doc = Request(self, 'album.getInfo', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
data = {}
|
|
|
|
data['name'] = self._extract(doc, 'name')
|
|
data['artist'] = self._extract(doc, 'artist')
|
|
data['id'] = self._extract(doc, 'id')
|
|
data['release_date'] = self._extract(doc, 'releasedate')
|
|
data['images'] = self._extract_all(doc, 'image')
|
|
data['listeners'] = self._extract(doc, 'listeners')
|
|
data['play_count'] = self._extract(doc, 'playcount')
|
|
|
|
tags_element = doc.getElementsByTagName('toptags')[0]
|
|
data['top_tags'] = self._extract_all(tags_element, 'name')
|
|
|
|
return data
|
|
|
|
def getArtist(self):
|
|
"""Returns the associated Artist object. """
|
|
|
|
return Artist(self.getArtistName(), *self.auth_data)
|
|
|
|
def getArtistName(self, from_server = False):
|
|
"""Returns the artist name.
|
|
* from_server: If set to True, the value will be retrieved from the server.
|
|
"""
|
|
|
|
if from_server:
|
|
return self._getCachedInfo('artist')
|
|
else:
|
|
return self.artist_name
|
|
|
|
def getTitle(self, from_server = False):
|
|
"""Returns the album title.
|
|
* from_server: If set to True, the value will be retrieved from the server.
|
|
"""
|
|
|
|
if from_server:
|
|
return self._getCachedInfo('name')
|
|
else:
|
|
return self.title
|
|
|
|
def getReleaseDate(self):
|
|
"""Retruns the release date of the album."""
|
|
|
|
return self._getCachedInfo('release_date')
|
|
|
|
def getImage(self, size = IMAGE_LARGE):
|
|
"""Returns the associated image URL.
|
|
* size: The image size. Possible values:
|
|
o IMAGE_LARGE
|
|
o IMAGE_MEDIUM
|
|
o IMAGE_SMALL
|
|
"""
|
|
|
|
return self._getCachedInfo('images', size)
|
|
|
|
def getID(self):
|
|
"""Returns the Last.fm ID. """
|
|
|
|
return self._getCachedInfo('id')
|
|
|
|
def getPlayCount(self):
|
|
"""Returns the number of plays on Last.fm."""
|
|
|
|
return self._getCachedInfo('play_count')
|
|
|
|
def getListenerCount(self):
|
|
"""Returns the number of liteners on Last.fm."""
|
|
|
|
return self._getCachedInfo('listeners')
|
|
|
|
def getTopTags(self):
|
|
"""Returns a list of the most-applied tags to this album. """
|
|
|
|
l = []
|
|
for tag in self._getCachedInfo('top_tags'):
|
|
l.append(Tag(tag, *self.auth_data))
|
|
|
|
return l
|
|
|
|
|
|
def fetchPlaylist(self):
|
|
"""Returns the list of Tracks on this album. """
|
|
|
|
uri = 'lastfm://playlist/album/%s' %self._getCachedInfo('id')
|
|
|
|
return Playlist(uri, *self.auth_data).fetch()
|
|
|
|
def getURL(self, domain_name = DOMAIN_ENGLISH):
|
|
"""Returns the url of the album page on Last.fm.
|
|
* domain_name: Last.fm's language domain. Possible values:
|
|
o DOMAIN_ENGLISH
|
|
o DOMAIN_GERMAN
|
|
o DOMAIN_SPANISH
|
|
o DOMAIN_FRENCH
|
|
o DOMAIN_ITALIAN
|
|
o DOMAIN_POLISH
|
|
o DOMAIN_PORTUGUESE
|
|
o DOMAIN_SWEDISH
|
|
o DOMAIN_TURKISH
|
|
o DOMAIN_RUSSIAN
|
|
o DOMAIN_JAPANESE
|
|
o DOMAIN_CHINESE
|
|
"""
|
|
|
|
url = 'http://%(domain)s/music/%(artist)s/%(album)s'
|
|
|
|
artist = self._get_url_safe(self.getArtist().getName())
|
|
album = self._get_url_safe(self.getTitle())
|
|
|
|
return url %{'domain': domain_name, 'artist': artist, 'album': album}
|
|
|
|
def toStr(self):
|
|
"""Returns a string representation of the object."""
|
|
|
|
return self.getArtist().getName() + u' - ' + self.getTitle()
|
|
|
|
class Track(BaseObject, Cacheable, Taggable):
|
|
def __init__(self, artist_name, title, api_key, secret, session_key):
|
|
BaseObject.__init__(self, api_key, secret, session_key)
|
|
Cacheable.__init__(self)
|
|
Taggable.__init__(self, 'track')
|
|
|
|
self.artist_name = artist_name
|
|
self.title = title
|
|
|
|
self._cached_info = None
|
|
|
|
def _getParams(self):
|
|
return {'sk': self.session_key, 'artist': self.artist_name, 'track': self.title}
|
|
|
|
def _getInfo(self):
|
|
"""Returns a dictionary with vairous metadata values about this track."""
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'track.getInfo', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
data = {}
|
|
|
|
data['id'] = self._extract(doc, 'id')
|
|
data['title'] = self._extract(doc, 'name')
|
|
data['duration'] = self._extract(doc, 'duration')
|
|
data['listener_count'] = self._extract(doc, 'listeners')
|
|
data['play_count'] = self._extract(doc, 'playcount')
|
|
data['artist_name'] = self._extract(doc, 'name', 1)
|
|
data['album_name'] = self._extract(doc, 'title')
|
|
data['images'] = self._extract_all(doc, 'image')
|
|
|
|
tags_element = doc.getElementsByTagName('toptags')[0]
|
|
top_tags = self._extract_all(tags_element, 'name')
|
|
|
|
data['top_tags'] = []
|
|
|
|
for tag in top_tags:
|
|
data['top_tags'].append(Tag(tag, *self.auth_data))
|
|
|
|
if len(doc.getElementsByTagName('wiki')) > 0:
|
|
wiki_element = doc.getElementsByTagName('wiki')[0]
|
|
data['wiki'] = {}
|
|
data['wiki']['published_date'] = self._extract(wiki_element, 'published')
|
|
data['wiki']['summary'] = self._extract(wiki_element, 'summary')
|
|
data['wiki']['content'] = self._extract(wiki_element, 'content')
|
|
|
|
return data
|
|
|
|
def getArtist(self, from_server = False):
|
|
"""Returns the associated Artist object.
|
|
* from_server: If set to True, the artist name will be retrieved from the server.
|
|
"""
|
|
|
|
return Artist(self.getArtistName(from_server), *self.auth_data)
|
|
|
|
def getArtistName(self, from_server = False):
|
|
"""Returns the name of the artist.
|
|
* from_server: If set to True, the value will be retrieved from the server.
|
|
"""
|
|
|
|
if from_server:
|
|
return self._getCachedInfo('artist_name')
|
|
else:
|
|
return self.artist_name
|
|
|
|
def getTitle(self, from_server = False):
|
|
"""Returns the track title.
|
|
* from_server: If set to True, the value will be retrieved from the server.
|
|
"""
|
|
|
|
if from_server:
|
|
return self._getCachedInfo('title')
|
|
else:
|
|
return self.title
|
|
|
|
def getID(self):
|
|
"""Returns the track id on Last.fm."""
|
|
|
|
return self._getCachedInfo('id')
|
|
|
|
def getDuration(self):
|
|
"""Returns the track duration."""
|
|
|
|
return self._getCachedInfo('duration')
|
|
|
|
def getListenerCount(self):
|
|
"""Returns the listener count."""
|
|
|
|
return self._getCachedInfo('listener_count')
|
|
|
|
def getPlayCount(self):
|
|
"""Returns the play count."""
|
|
|
|
return self._getCachedInfo('play_count')
|
|
|
|
def getAlbumName(self):
|
|
"""Returns the name of the album."""
|
|
|
|
return self._getCachedInfo('album_name')
|
|
|
|
def getAlbum(self):
|
|
"""Returns the album object of this track."""
|
|
|
|
return Album(self.getArtistName(), self.getAlbumName(), *self.auth_data)
|
|
|
|
def getImage(self, size = IMAGE_LARGE, if_na_get_artist_image = False):
|
|
"""Returns the associated image URL.
|
|
* size: The image size. Possible values:
|
|
o IMAGE_LARGE
|
|
o IMAGE_MEDIUM
|
|
o IMAGE_SMALL
|
|
* if_na_get_artist_image: If set to True, it will return the artist's image if the track has none.
|
|
"""
|
|
|
|
url = self._getCachedInfo('images', size)
|
|
|
|
if if_na_get_artist_image:
|
|
if not url or url.startswith('http://cdn.last.fm/depth/catalogue'):
|
|
url = self.getArtist().getImage(size)
|
|
|
|
return url
|
|
|
|
def addToPlaylist(self, playlist_id):
|
|
"""Adds this track to a user playlist.
|
|
* playlist_id: The unique playlist ID.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
params['playlistID'] = unicode(playlist_id)
|
|
|
|
Request(self, 'playlist.addTrack', self.api_key, params, True, self.secret).execute()
|
|
|
|
def love(self):
|
|
"""Adds the track to the user's loved tracks. """
|
|
|
|
params = self._getParams()
|
|
Request(self, 'track.love', self.api_key, params, True, self.secret).execute()
|
|
|
|
def ban(self):
|
|
"""Ban this track from ever playing on the radio. """
|
|
|
|
params = self._getParams()
|
|
Request(self, 'track.ban', self.api_key, params, True, self.secret).execute()
|
|
|
|
def getSimilar(self):
|
|
"""Returns similar tracks for this track on Last.fm, based on listening data. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'track.getSimilar', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
tracks = doc.getElementsByTagName('track')
|
|
|
|
data = []
|
|
for track in tracks:
|
|
extra_info = {}
|
|
|
|
title = self._extract(track, 'name', 0)
|
|
artist = self._extract(track, 'name', 1)
|
|
|
|
data.append(Track(artist, title, *self.auth_data))
|
|
|
|
return data
|
|
|
|
def getTopFans(self):
|
|
"""Returns the top fans for this track on Last.fm. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'track.getTopFans', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
names = self._extract_all(doc, 'name')
|
|
for name in names:
|
|
list.append(User(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopTags(self):
|
|
"""Returns the top tags for this track on Last.fm, ordered by tag count. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'track.getTopTags', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
names = self._extract_all(doc, 'name')
|
|
|
|
for name in names:
|
|
list.append(Tag(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def share(self, users, message = None):
|
|
"""Shares this track (sends out recommendations).
|
|
* users: A list that can contain usernames, emails, User objects, or all of them.
|
|
* message: A message to include in the recommendation message.
|
|
"""
|
|
|
|
#last.fm currently accepts a max of 10 recipient at a time
|
|
while(len(users) > 10):
|
|
section = users[0:9]
|
|
users = users[9:]
|
|
self.share(section, message)
|
|
|
|
nusers = []
|
|
for user in users:
|
|
if isinstance(user, User):
|
|
nusers.append(user.getName())
|
|
else:
|
|
nusers.append(user)
|
|
|
|
params = self._getParams()
|
|
recipients = ','.join(nusers)
|
|
params['recipient'] = recipients
|
|
if message: params['message'] = message
|
|
|
|
Request(self, 'track.share', self.api_key, params, True, self.secret).execute()
|
|
|
|
def getURL(self, domain_name = DOMAIN_ENGLISH):
|
|
"""Returns the url of the track page on Last.fm.
|
|
* domain_name: Last.fm's language domain. Possible values:
|
|
o DOMAIN_ENGLISH
|
|
o DOMAIN_GERMAN
|
|
o DOMAIN_SPANISH
|
|
o DOMAIN_FRENCH
|
|
o DOMAIN_ITALIAN
|
|
o DOMAIN_POLISH
|
|
o DOMAIN_PORTUGUESE
|
|
o DOMAIN_SWEDISH
|
|
o DOMAIN_TURKISH
|
|
o DOMAIN_RUSSIAN
|
|
o DOMAIN_JAPANESE
|
|
o DOMAIN_CHINESE
|
|
"""
|
|
url = 'http://%(domain)s/music/%(artist)s/_/%(title)s'
|
|
|
|
artist = self._get_url_safe(self.getArtist().getName())
|
|
title = self._get_url_safe(self.getTitle())
|
|
|
|
return url %{'domain': domain_name, 'artist': artist, 'title': title}
|
|
|
|
def getWikiPublishedDate(self):
|
|
"""Returns the date of publishing the wiki content."""
|
|
|
|
return self._getCachedInfo('wiki', 'published_date')
|
|
|
|
def getWikiContent(self):
|
|
"""Returns the full wiki content."""
|
|
|
|
return self._getCachedInfo('wiki', 'content')
|
|
|
|
def getWikiSummary(self):
|
|
"""Returns the wiki summary."""
|
|
|
|
return self._getCachedInfo('wiki', 'summary')
|
|
|
|
def toStr(self):
|
|
"""Returns a string representation of the object."""
|
|
|
|
return self.getArtist().getName() + u' - ' + self.getTitle()
|
|
|
|
class Artist(BaseObject, Cacheable, Taggable):
|
|
|
|
def __init__(self, artist_name, api_key, secret, session_key):
|
|
BaseObject.__init__(self, api_key, secret, session_key)
|
|
Cacheable.__init__(self)
|
|
Taggable.__init__(self, 'artist')
|
|
|
|
self.name = artist_name
|
|
|
|
def _getParams(self):
|
|
return {'sk': self.session_key, 'artist': self.name}
|
|
|
|
def _getInfo(self):
|
|
"""Get the metadata for an artist on Last.fm, Includes biography"""
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'artist.getInfo', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
data = {}
|
|
|
|
data['name'] = self._extract(doc, 'name')
|
|
data['images'] = self._extract_all(doc, 'image', 3)
|
|
data['streamable'] = self._extract(doc, 'streamable')
|
|
data['listeners'] = self._extract(doc, 'listeners')
|
|
data['play_count'] = self._extract(doc, 'playcount')
|
|
bio = {}
|
|
bio['published'] = self._extract(doc, 'published')
|
|
bio['summary'] = self._extract(doc, 'summary')
|
|
bio['content'] = self._extract(doc, 'content')
|
|
data['bio'] = bio
|
|
|
|
return data
|
|
|
|
def getName(self, from_server = False):
|
|
"""Returns the name of the artist.
|
|
* from_server: If set to True, the value will be retrieved from the server.
|
|
"""
|
|
|
|
if from_server:
|
|
return self._getCachedInfo('name')
|
|
else:
|
|
return self.name
|
|
|
|
def getImage(self, size = IMAGE_LARGE):
|
|
"""Returns the associated image URL.
|
|
* size: The image size. Possible values:
|
|
o IMAGE_LARGE
|
|
o IMAGE_MEDIUM
|
|
o IMAGE_SMALL
|
|
"""
|
|
|
|
return self._getCachedInfo('images', size)
|
|
|
|
def getPlayCount(self):
|
|
"""Returns the number of plays on Last.fm. """
|
|
|
|
return self._getCachedInfo('play_count')
|
|
|
|
def getListenerCount(self):
|
|
"""Returns the number of liteners on Last.fm. """
|
|
|
|
return self._getCachedInfo('listeners')
|
|
|
|
def getBioPublishedDate(self):
|
|
"""Returns the date on which the artist's biography was published. """
|
|
|
|
return self._getCachedInfo('bio', 'published')
|
|
|
|
def getBioSummary(self):
|
|
"""Returns the summary of the artist's biography. """
|
|
|
|
return self._getCachedInfo('bio', 'summary')
|
|
|
|
def getBioContent(self):
|
|
"""Returns the content of the artist's biography. """
|
|
|
|
return self._getCachedInfo('bio', 'content')
|
|
|
|
def getEvents(self):
|
|
"""Returns a list of the upcoming Events for this artist. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'artist.getEvents', self.api_key, params).execute()
|
|
|
|
ids = self._extract_all(doc, 'id')
|
|
|
|
events = []
|
|
for id in ids:
|
|
events.append(Event(id, *self.auth_data))
|
|
|
|
return events
|
|
|
|
def getSimilar(self, limit = None):
|
|
"""Returns the similar artists on Last.fm.
|
|
* limit: The limit of similar artists to retrieve.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if limit:
|
|
params['limit'] = unicode(limit)
|
|
|
|
doc = Request(self, 'artist.getSimilar', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
names = self._extract_all(doc, 'name')
|
|
|
|
artists = []
|
|
for name in names:
|
|
artists.append(Artist(name, *self.auth_data))
|
|
|
|
return artists
|
|
|
|
def getTopAlbums(self):
|
|
"""Returns a list of the top Albums by this artist on Last.fm. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'artist.getTopAlbums', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
names = self._extract_all(doc, 'name')
|
|
|
|
albums = []
|
|
for name in names:
|
|
albums.append(Album(self.getName(), name, *self.auth_data))
|
|
|
|
return albums
|
|
|
|
def getTopFans(self):
|
|
"""Returns a list of the Users who listened to this artist the most. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'artist.getTopFans', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
|
|
names = self._extract_all(doc, 'name')
|
|
for name in names:
|
|
list.append(User(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopTags(self):
|
|
"""Returns a list of the most frequently used Tags on this artist. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'artist.getTopTags', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
names = self._extract_all(doc, 'name')
|
|
tags = []
|
|
for name in names:
|
|
tags.append(Tag(name, *self.auth_data))
|
|
|
|
return tags
|
|
|
|
def getTopTracks(self):
|
|
"""Returns a list of the most listened to Tracks by this artist. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'artist.getTopTracks', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
|
|
for track in doc.getElementsByTagName('track'):
|
|
t = {}
|
|
title = self._extract(track, 'name')
|
|
artist = self.getName()
|
|
|
|
list.append(Track(artist, title, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def share(self, users, message = None):
|
|
"""Shares this artist (sends out recommendations).
|
|
* users: A list that can contain usernames, emails, User objects, or all of them.
|
|
* message: A message to include in the recommendation message.
|
|
"""
|
|
|
|
#last.fm currently accepts a max of 10 recipient at a time
|
|
while(len(users) > 10):
|
|
section = users[0:9]
|
|
users = users[9:]
|
|
self.share(section, message)
|
|
|
|
nusers = []
|
|
for user in users:
|
|
if isinstance(user, User):
|
|
nusers.append(user.getName())
|
|
else:
|
|
nusers.append(user)
|
|
|
|
params = self._getParams()
|
|
recipients = ','.join(nusers)
|
|
params['recipient'] = recipients
|
|
if message: params['message'] = message
|
|
|
|
Request(self, 'artist.share', self.api_key, params, True, self.secret).execute()
|
|
|
|
def getURL(self, domain_name = DOMAIN_ENGLISH):
|
|
"""Returns the url of the artist page on Last.fm.
|
|
* domain_name: Last.fm's language domain. Possible values:
|
|
o DOMAIN_ENGLISH
|
|
o DOMAIN_GERMAN
|
|
o DOMAIN_SPANISH
|
|
o DOMAIN_FRENCH
|
|
o DOMAIN_ITALIAN
|
|
o DOMAIN_POLISH
|
|
o DOMAIN_PORTUGUESE
|
|
o DOMAIN_SWEDISH
|
|
o DOMAIN_TURKISH
|
|
o DOMAIN_RUSSIAN
|
|
o DOMAIN_JAPANESE
|
|
o DOMAIN_CHINESE
|
|
"""
|
|
|
|
url = 'http://%(domain)s/music/%(artist)s'
|
|
|
|
artist = self._get_url_safe(self.getName())
|
|
|
|
return url %{'domain': domain_name, 'artist': artist}
|
|
|
|
def toStr(self):
|
|
"""Returns a string representation of the object."""
|
|
|
|
return self.getName()
|
|
|
|
class Event(BaseObject, Cacheable):
|
|
"""Represents an event"""
|
|
|
|
def __init__(self, event_id, api_key, secret, session_key):
|
|
BaseObject.__init__(self, api_key, secret, session_key)
|
|
Cacheable.__init__(self)
|
|
|
|
self.id = unicode(event_id)
|
|
|
|
def _getParams(self):
|
|
return {'sk': self.session_key, 'event': self.getID()}
|
|
|
|
def attend(self, attending_status):
|
|
"""Sets the attending status.
|
|
* attending_status: The attending status. Possible values:
|
|
o EVENT_ATTENDING
|
|
o EVENT_MAYBE_ATTENDING
|
|
o EVENT_NOT_ATTENDING
|
|
"""
|
|
|
|
params = self._getParams()
|
|
params['status'] = attending_status
|
|
|
|
doc = Request(self, 'event.attend', self.api_key, params, True, self.secret).execute()
|
|
|
|
def _getInfo(self):
|
|
"""Get the metadata for an event on Last.fm
|
|
Includes attendance and lineup information"""
|
|
|
|
params = self._getParams()
|
|
|
|
doc = Request(self, 'event.getInfo', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
data = {}
|
|
data['title'] = self._extract(doc, 'title')
|
|
artists = []
|
|
for i in range(0, len(doc.getElementsByTagName('artist'))):
|
|
artists.append(self._extract(doc, 'artist', i))
|
|
data['artists'] = artists
|
|
data['headliner'] = self._extract(doc, 'headliner')
|
|
|
|
venue = {}
|
|
venue['name'] = self._extract(doc, 'name')
|
|
venue['city'] = self._extract(doc, 'city')
|
|
venue['country'] = self._extract(doc, 'country')
|
|
venue['street'] = self._extract(doc, 'street')
|
|
venue['postal_code'] = self._extract(doc, 'postalcode')
|
|
|
|
geo = {}
|
|
geo['lat'] = self._extract(doc, 'geo:lat')
|
|
geo['long'] = self._extract(doc, 'geo:long')
|
|
|
|
venue['geo'] = geo
|
|
venue['time_zone'] = self._extract(doc, 'timezone')
|
|
|
|
data['venue'] = venue
|
|
data['description'] = self._extract(doc, 'description')
|
|
data['images'] = self._extract_all(doc, 'image')
|
|
data['attendance'] = self._extract(doc, 'attendance')
|
|
data['reviews'] = self._extract(doc, 'reviews')
|
|
|
|
|
|
return data
|
|
|
|
def getID(self):
|
|
"""Returns the id of the event on Last.fm. """
|
|
return self.id
|
|
|
|
def getTitle(self):
|
|
"""Returns the title of the event. """
|
|
|
|
return self._getCachedInfo('title')
|
|
|
|
def getHeadliner(self):
|
|
"""Returns the headliner of the event. """
|
|
|
|
return self._getCachedInfo('headliner')
|
|
|
|
def getArtists(self):
|
|
"""Returns a list of the participating Artists. """
|
|
|
|
names = self._getCachedInfo('artists')
|
|
|
|
artists = []
|
|
for name in names:
|
|
artists.append(Artist(name, *self.auth_data))
|
|
|
|
return artists
|
|
|
|
def getVenueName(self):
|
|
"""Returns the name of the venue where the event is held. """
|
|
|
|
return self._getCachedInfo('venue', 'name')
|
|
|
|
def getCityName(self):
|
|
"""Returns the name of the city where the event is held. """
|
|
|
|
return self._getCachedInfo('venue', 'city')
|
|
|
|
def getCountryName(self):
|
|
"""Returns the name of the country where the event is held. """
|
|
|
|
return self._getCachedInfo('venue', 'country')
|
|
|
|
def getPostalCode(self):
|
|
"""Returns the postal code of where the event is held. """
|
|
|
|
return self._getCachedInfo('venue', 'postal_code')
|
|
|
|
def getStreetName(self):
|
|
"""Returns the name of the street where the event is held. """
|
|
|
|
return self._getCachedInfo('venue', 'street')
|
|
|
|
def getGeoPoint(self):
|
|
"""Returns a tuple of latitude and longitude values of where the event is held. """
|
|
|
|
i = (self._getCachedInfo('venue', 'geo', 'lat'), self._getCachedInfo('venue', 'geo', 'long'))
|
|
return i
|
|
|
|
def getTimeZone(self):
|
|
"""Returns the timezone of where the event is held. """
|
|
|
|
return self._getCachedInfo('venue', 'time_zone')
|
|
|
|
def getDescription(self):
|
|
"""Returns the description of the event. """
|
|
|
|
return self._getCachedInfo('description')
|
|
|
|
def getImage(self, size = IMAGE_LARGE):
|
|
"""Returns the associated image URL.
|
|
* size: The image size. Possible values:
|
|
o IMAGE_LARGE
|
|
o IMAGE_MEDIUM
|
|
o IMAGE_SMALL
|
|
"""
|
|
|
|
|
|
return self._getCachedInfo('images', size)
|
|
|
|
def getAttendanceCount(self):
|
|
"""Returns the number of attending people. """
|
|
|
|
return self._getCachedInfo('attendance')
|
|
|
|
def getReviewCount(self):
|
|
"""Returns the number of available reviews for this event. """
|
|
|
|
return self._getCachedInfo('reviews')
|
|
|
|
def getURL(self, domain_name = DOMAIN_ENGLISH):
|
|
"""Returns the url of the event page on Last.fm.
|
|
* domain_name: Last.fm's language domain. Possible values:
|
|
o DOMAIN_ENGLISH
|
|
o DOMAIN_GERMAN
|
|
o DOMAIN_SPANISH
|
|
o DOMAIN_FRENCH
|
|
o DOMAIN_ITALIAN
|
|
o DOMAIN_POLISH
|
|
o DOMAIN_PORTUGUESE
|
|
o DOMAIN_SWEDISH
|
|
o DOMAIN_TURKISH
|
|
o DOMAIN_RUSSIAN
|
|
o DOMAIN_JAPANESE
|
|
o DOMAIN_CHINESE
|
|
"""
|
|
|
|
url = 'http://%(domain)s/event/%(id)s'
|
|
|
|
return url %{'domain': domain_name, 'id': self.getID()}
|
|
|
|
def share(self, users, message = None):
|
|
"""Shares this event (sends out recommendations).
|
|
* users: A list that can contain usernames, emails, User objects, or all of them.
|
|
* message: A message to include in the recommendation message.
|
|
"""
|
|
|
|
#last.fm currently accepts a max of 10 recipient at a time
|
|
while(len(users) > 10):
|
|
section = users[0:9]
|
|
users = users[9:]
|
|
self.share(section, message)
|
|
|
|
nusers = []
|
|
for user in users:
|
|
if isinstance(user, User):
|
|
nusers.append(user.getName())
|
|
else:
|
|
nusers.append(user)
|
|
|
|
params = self._getParams()
|
|
recipients = ','.join(nusers)
|
|
params['recipient'] = recipients
|
|
if message: params['message'] = message
|
|
|
|
Request(self, 'event.share', self.api_key, params, True, self.secret).execute()
|
|
|
|
def toStr(self):
|
|
"""Returns a string representation of the object."""
|
|
|
|
sa = ""
|
|
artists = self.getArtists()
|
|
for i in range(0, len(artists)):
|
|
if i == 0:
|
|
sa = artists[i].getName()
|
|
continue
|
|
elif i< len(artists)-1:
|
|
sa += ', '
|
|
sa += artists[i].getName()
|
|
continue
|
|
elif i == len(artists) - 1:
|
|
sa += ' and '
|
|
sa += artists[i].getName()
|
|
|
|
return "%(title)s: %(artists)s at %(place)s" %{'title': self.getTitle(), 'artists': sa, 'place': self.getVenueName()}
|
|
|
|
class Country(BaseObject):
|
|
|
|
# TODO geo.getEvents
|
|
|
|
def __init__(self, country_name, api_key, secret, session_key):
|
|
BaseObject.__init__(self, api_key, secret, session_key)
|
|
|
|
self.name = country_name
|
|
|
|
def _getParams(self):
|
|
return {'country': self.name}
|
|
|
|
def __str__(self):
|
|
return self.toStr()
|
|
|
|
def getName(self):
|
|
"""Returns the country name. """
|
|
|
|
return self.name
|
|
|
|
def getTopArtists(self):
|
|
"""Returns a tuple of the most popular Artists in the country, ordered by popularity. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'geo.getTopArtists', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
names = self._extract_all(doc, 'name')
|
|
|
|
artists = []
|
|
for name in names:
|
|
artists.append(Artist(name, *self.auth_data))
|
|
|
|
return artists
|
|
|
|
def getTopTracks(self, location = None):
|
|
"""Returns a tuple of the most popular Tracks in the country, ordered by popularity.
|
|
* location: A metro name, to fetch the charts for (must be within the country specified).
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if location:
|
|
params['location'] = location
|
|
|
|
doc = Request(self, 'geo.getTopTracks', self.api_key, params).execute()
|
|
|
|
list = []
|
|
|
|
for n in doc.getElementsByTagName('track'):
|
|
|
|
title = self._extract(n, 'name')
|
|
artist = self._extract(n, 'name', 1)
|
|
|
|
list.append(Track(artist, title, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getURL(self, domain_name = DOMAIN_ENGLISH):
|
|
"""Returns the url of the event page on Last.fm.
|
|
* domain_name: Last.fm's language domain. Possible values:
|
|
o DOMAIN_ENGLISH
|
|
o DOMAIN_GERMAN
|
|
o DOMAIN_SPANISH
|
|
o DOMAIN_FRENCH
|
|
o DOMAIN_ITALIAN
|
|
o DOMAIN_POLISH
|
|
o DOMAIN_PORTUGUESE
|
|
o DOMAIN_SWEDISH
|
|
o DOMAIN_TURKISH
|
|
o DOMAIN_RUSSIAN
|
|
o DOMAIN_JAPANESE
|
|
o DOMAIN_CHINESE
|
|
"""
|
|
|
|
url = 'http://%(domain)s/place/%(country_name)s'
|
|
|
|
country_name = self._get_url_safe(self.getName())
|
|
|
|
return url %{'domain': domain_name, 'country_name': country_name}
|
|
|
|
def toStr(self):
|
|
"""Returns a string representation of the object."""
|
|
|
|
return self.getName()
|
|
|
|
class Group(BaseObject):
|
|
|
|
def __init__(self, group_name, api_key, secret, session_key):
|
|
BaseObject.__init__(self, api_key, secret, session_key)
|
|
|
|
self.name = group_name
|
|
|
|
def _getParams(self):
|
|
return {'group': self.name}
|
|
|
|
def getName(self):
|
|
"""Returns the group name. """
|
|
return self.name
|
|
|
|
def getTopWeeklyAlbums(self, from_value = None, to_value = None):
|
|
"""Returns a tuple of the most frequently listened to Albums in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList.
|
|
* from_value: The value marking the beginning of a week.
|
|
* to_value: The value marking the end of a week.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if from_value and to_value:
|
|
params['from'] = unicode(from_value)
|
|
params['to'] = unicode(to_value)
|
|
|
|
doc = Request(self, 'group.getWeeklyAlbumChart', self.api_key, params).execute()
|
|
|
|
list = []
|
|
|
|
for n in doc.getElementsByTagName('album'):
|
|
artist = self._extract(n, 'artist')
|
|
name = self._extract(n, 'name')
|
|
|
|
list.append(Album(artist, name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopWeeklyArtists(self, from_value = None, to_value = None):
|
|
"""Returns a tuple of the most frequently listened to Artists in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList.
|
|
* from_value: The value marking the beginning of a week.
|
|
* to_value: The value marking the end of a week.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if from_value and to_value:
|
|
params['from'] = unicode(from_value)
|
|
params['to'] = unicode(to_value)
|
|
|
|
doc = Request(self, 'group.getWeeklyArtistChart', self.api_key, params).execute()
|
|
|
|
list = []
|
|
|
|
names = self._extract_all(doc, 'name')
|
|
for name in names:
|
|
list.append(Artist(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopWeeklyTracks(self, from_value = None, to_value = None):
|
|
"""Returns a tuple of the most frequently listened to Tracks in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList.
|
|
* from_value: The value marking the beginning of a week.
|
|
* to_value: The value marking the end of a week.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if from_value and to_value:
|
|
params['from'] = from_value
|
|
params['to'] = to_value
|
|
|
|
doc = Request(self, 'group.getWeeklyTrackChart', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
for track in doc.getElementsByTagName('track'):
|
|
artist = self._extract(track, 'artist')
|
|
title = self._extract(track, 'name')
|
|
|
|
list.append(Track(artist, title, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getWeeklyChartList(self):
|
|
"""Returns a list of range pairs to use with the chart methods. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'group.getWeeklyChartList', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
for chart in doc.getElementsByTagName('chart'):
|
|
c = {}
|
|
c['from'] = chart.getAttribute('from')
|
|
c['to'] = chart.getAttribute('to')
|
|
|
|
list.append(c)
|
|
|
|
return list
|
|
|
|
def getURL(self, domain_name = DOMAIN_ENGLISH):
|
|
"""Returns the url of the group page on Last.fm.
|
|
* domain_name: Last.fm's language domain. Possible values:
|
|
o DOMAIN_ENGLISH
|
|
o DOMAIN_GERMAN
|
|
o DOMAIN_SPANISH
|
|
o DOMAIN_FRENCH
|
|
o DOMAIN_ITALIAN
|
|
o DOMAIN_POLISH
|
|
o DOMAIN_PORTUGUESE
|
|
o DOMAIN_SWEDISH
|
|
o DOMAIN_TURKISH
|
|
o DOMAIN_RUSSIAN
|
|
o DOMAIN_JAPANESE
|
|
o DOMAIN_CHINESE
|
|
"""
|
|
|
|
url = 'http://%(domain)s/group/%(name)s'
|
|
|
|
name = self._get_url_safe(self.getName())
|
|
|
|
return url %{'domain': domain_name, 'name': name}
|
|
|
|
def toStr(self):
|
|
"""Returns a string representation of the object."""
|
|
|
|
return self.getName()
|
|
|
|
class Library(BaseObject):
|
|
"""Represents a user's library."""
|
|
|
|
def __init__(self, username, api_key, secret, session_key):
|
|
BaseObject.__init__(self, api_key, secret, session_key)
|
|
|
|
self._username = username
|
|
|
|
self._albums_playcounts = {}
|
|
self._albums_tagcounts = {}
|
|
self._artists_playcounts = {}
|
|
self._artists_tagcounts = {}
|
|
self._tracks_playcounts = {}
|
|
self._tracks_tagcounts = {}
|
|
|
|
self._album_pages = None
|
|
self._album_perpage = None
|
|
self._artist_pages = None
|
|
self._artist_perpage = None
|
|
self._track_pages = None
|
|
self._track_perpage = None
|
|
|
|
def _getParams(self):
|
|
return {'sk': self.session_key, 'user': self._username}
|
|
|
|
def getUser(self):
|
|
"""Returns the user who owns this library."""
|
|
|
|
return User(self._username, *self.auth_data)
|
|
|
|
def _get_albums_info(self):
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'library.getAlbums', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
self._album_pages = int(doc.getElementsByTagName('albums')[0].getAttribute('totalPages'))
|
|
self._album_perpage = int(doc.getElementsByTagName('albums')[0].getAttribute('perPage'))
|
|
|
|
def _get_artists_info(self):
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'library.getArtists', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
self._artist_pages = int(doc.getElementsByTagName('artists')[0].getAttribute('totalPages'))
|
|
self._artist_perpage = int(doc.getElementsByTagName('artists')[0].getAttribute('perPage'))
|
|
|
|
def _get_tracks_info(self):
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'library.getTracks', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
self._track_pages = int(doc.getElementsByTagName('tracks')[0].getAttribute('totalPages'))
|
|
self._track_perpage = int(doc.getElementsByTagName('tracks')[0].getAttribute('perPage'))
|
|
|
|
def getAlbumsPageCount(self):
|
|
"""Returns the number of pages you'd get when calling getAlbums. """
|
|
|
|
if self._album_pages:
|
|
return self._album_pages
|
|
|
|
self._get_albums_info()
|
|
|
|
return self._album_pages
|
|
|
|
def getAlbumsPerPage(self):
|
|
"""Returns the number of albums per page you'd get wen calling getAlbums. """
|
|
|
|
if self._album_perpage:
|
|
return self._album_perpage
|
|
|
|
self._get_albums_info()
|
|
|
|
return self._album_perpage
|
|
|
|
def getArtistsPageCount(self):
|
|
"""Returns the number of pages you'd get when calling getArtists(). """
|
|
|
|
if self._artist_pages:
|
|
return self._artist_pages
|
|
|
|
self._get_artists_info()
|
|
|
|
return self._artist_pages
|
|
|
|
def getArtistsPerPage(self):
|
|
"""Returns the number of artists per page you'd get wen calling getArtists(). """
|
|
|
|
if self._artist_perpage:
|
|
return self._artist_perpage
|
|
|
|
self._get_artists_info()
|
|
|
|
return self._artist_perpage
|
|
|
|
def getTracksPageCount(self):
|
|
"""Returns the number of pages you'd get when calling getTracks(). """
|
|
|
|
if self._track_pages:
|
|
return self._track_pages
|
|
|
|
self._get_tracks_info()
|
|
|
|
return self._track_pages
|
|
|
|
def getTracksPerPage(self):
|
|
"""Returns the number of tracks per page you'd get wen calling getTracks. """
|
|
|
|
if self._track_perpage:
|
|
return self._track_perpage
|
|
|
|
self._get_tracks_info()
|
|
|
|
return self._track_perpage
|
|
|
|
def getAlbums(self, limit = None, page = None):
|
|
"""Returns a paginated list of all the albums in a user's library.
|
|
* limit: The number of albums to retrieve.
|
|
* page: The page to retrieve (default is the first one).
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if limit: params['limit'] = str(limit)
|
|
if page: params['page'] = str(page)
|
|
|
|
doc = Request(self, 'library.getAlbums', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
albums = doc.getElementsByTagName('album')
|
|
list = []
|
|
|
|
for album in albums:
|
|
artist = self._extract(album, 'name', 1)
|
|
name = self._extract(album, 'name')
|
|
|
|
playcount = self._extract(album, 'playcount')
|
|
tagcount = self._extract(album, 'tagcount')
|
|
|
|
a = Album(artist, name, *self.auth_data)
|
|
list.append(a)
|
|
|
|
self._albums_playcounts[a._hash()] = playcount
|
|
self._albums_tagcounts[a._hash()] = tagcount
|
|
|
|
return list
|
|
|
|
def getArtists(self, limit = None, page = None):
|
|
"""Returns a paginated list of all the artists in a user's library.
|
|
* limit: The number of artists to retrieve.
|
|
* page: The page to retrieve (default is the first one).
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if limit: params['limit'] = str(limit)
|
|
if page: params['page'] = str(page)
|
|
|
|
doc = Request(self, 'library.getArtists', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
artists = doc.getElementsByTagName('artist')
|
|
list = []
|
|
|
|
for artist in artists:
|
|
name = self._extract(artist, 'name')
|
|
|
|
playcount = self._extract(artist, 'playcount')
|
|
tagcount = self._extract(artist, 'tagcount')
|
|
|
|
a = Artist(name, *self.auth_data)
|
|
list.append(a)
|
|
|
|
self._artists_playcounts[a._hash()] = playcount
|
|
self._artists_tagcounts[a._hash()] = tagcount
|
|
|
|
return list
|
|
|
|
def getTracks(self, limit = None, page = None):
|
|
"""Returns a paginated list of all the tracks in a user's library. """
|
|
|
|
params = self._getParams()
|
|
if limit: params['limit'] = str(limit)
|
|
if page: params['page'] = str(page)
|
|
|
|
doc = Request(self, 'library.getTracks', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
tracks = doc.getElementsByTagName('track')
|
|
list = []
|
|
|
|
for track in tracks:
|
|
|
|
title = self._extract(track, 'name')
|
|
artist = self._extract(track, 'name', 1)
|
|
|
|
playcount = self._extract(track, 'playcount')
|
|
tagcount = self._extract(track, 'tagcount')
|
|
|
|
t = Track(artist, title, *self.auth_data)
|
|
list.append(t)
|
|
|
|
self._tracks_playcounts[t._hash()] = playcount
|
|
self._tracks_tagcounts[t._hash()] = tagcount
|
|
|
|
return list
|
|
|
|
def getAlbumPlaycount(self, album_object):
|
|
"""Goes through the library until it finds the playcount of this album and returns it (could take a relatively long time).
|
|
* album_object : The Album to find.
|
|
"""
|
|
|
|
key = album_object._hash()
|
|
if key in self._albums_playcounts.keys():
|
|
return self._albums_playcounts[key]
|
|
|
|
for i in range(1, self.getAlbumsPageCount() +1):
|
|
stack = self.getAlbums(page = i)
|
|
|
|
for album in stack:
|
|
if album._hash() == album_object._hash():
|
|
return self._albums_playcounts[album._hash()]
|
|
|
|
def getAlbumTagcount(self, album_object):
|
|
"""Goes through the library until it finds the tagcount of this album and returns it (could take a relatively long time).
|
|
* album_object : The Album to find.
|
|
"""
|
|
|
|
key = album_object._hash()
|
|
if key in self._albums_tagcounts.keys():
|
|
return self._albums_tagcounts[key]
|
|
|
|
for i in range(1, self.getAlbumsPageCount() +1):
|
|
stack = self.getAlbums(page = i)
|
|
|
|
for album in stack:
|
|
if album._hash() == album_object._hash():
|
|
return self._albums_tagcounts[album._hash()]
|
|
|
|
def getArtistPlaycount(self, artist_object):
|
|
"""Goes through the library until it finds the playcount of this artist and returns it (could take a relatively long time).
|
|
* artist_object : The Artist to find.
|
|
"""
|
|
|
|
key = artist_object._hash()
|
|
if key in self._artists_playcounts.keys():
|
|
return self._artists_playcounts[key]
|
|
|
|
for i in range(1, self.getArtistsPageCount() +1):
|
|
stack = self.getArtists(page = i)
|
|
|
|
for artist in stack:
|
|
if artist._hash() == artist_object._hash():
|
|
return self._artists_playcounts[artist._hash()]
|
|
|
|
def getArtistTagcount(self, artist_object):
|
|
"""Goes through the library until it finds the tagcount of this artist and returns it (could take a relatively long time).
|
|
* artist_object : The Artist to find.
|
|
"""
|
|
|
|
key = artist_object._hash()
|
|
if key in self._artists_tagcounts.keys():
|
|
return self._artists_tagcounts[key]
|
|
|
|
for i in range(1, self.getArtistsPageCount() +1):
|
|
stack = self.getArtist(page = i)
|
|
|
|
for artist in stack:
|
|
if artist._hash() == artist_object._hash():
|
|
return self._artists_tagcounts[artist._hash()]
|
|
|
|
def getTrackPlaycount(self, track_object):
|
|
"""Goes through the library until it finds the playcount of this track and returns it (could take a relatively long time).
|
|
* track_object : The Track to find.
|
|
"""
|
|
|
|
key = track_object._hash()
|
|
if key in self._tracks_playcounts.keys():
|
|
return self._tracks_playcounts[key]
|
|
|
|
for i in range(1, self.getTracksPageCount() +1):
|
|
stack = self.getTracks(page = i)
|
|
|
|
for track in stack:
|
|
if track._hash() == track_object._hash():
|
|
return self._tracks_playcounts[track._hash()]
|
|
|
|
def getTrackTagcount(self, track_object):
|
|
"""Goes through the library until it finds the tagcount of this track and returns it (could take a relatively long time).
|
|
* track_object : The Track to find.
|
|
"""
|
|
|
|
key = track_object._hash()
|
|
if key in self._tracks_tagcounts.keys():
|
|
return self._tracks_tagcounts[key]
|
|
|
|
for i in range(1, self.getTracksPageCount() +1):
|
|
stack = self.getTracks(page = i)
|
|
|
|
for track in stack:
|
|
if track._hash() == track_object._hash():
|
|
return self._tracks_tagcounts[track._hash()]
|
|
|
|
class Playlist(BaseObject):
|
|
|
|
def __init__(self, playlist_uri, api_key, secret, session_key):
|
|
BaseObject.__init__(self, api_key, secret, session_key)
|
|
|
|
self._playlist_uri = playlist_uri
|
|
|
|
def _getParams(self):
|
|
return {'playlistURL': self._playlist_uri}
|
|
|
|
def getPlaylistURI(self):
|
|
return self._playlist_uri
|
|
|
|
def fetch(self):
|
|
"""Returns the Last.fm playlist URI. """
|
|
|
|
params = self._getParams()
|
|
|
|
doc = Request(self, 'playlist.fetch', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
data = {}
|
|
|
|
data['title'] = self._extract(doc, 'title')
|
|
list = []
|
|
|
|
for n in doc.getElementsByTagName('track'):
|
|
title = self._extract(n, 'title')
|
|
artist = self._extract(n, 'creator')
|
|
|
|
list.append(Track(artist, title, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def toStr(self):
|
|
"""Returns a string representation of the object."""
|
|
|
|
return self.getPlaylistURI()
|
|
|
|
class Tag(BaseObject):
|
|
|
|
def __init__(self, tag_name, api_key, secret, session_key):
|
|
BaseObject.__init__(self, api_key, secret, session_key)
|
|
|
|
self.name = tag_name
|
|
|
|
def _getParams(self):
|
|
return {'tag': self.name}
|
|
|
|
def getName(self):
|
|
"""Returns the name of the tag. """
|
|
|
|
return self.name
|
|
|
|
def getSimilar(self):
|
|
"""Returns the tags similar to this one, ordered by similarity. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'tag.getSimilar', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
names = self._extract_all(doc, 'name')
|
|
for name in names:
|
|
list.append(Tag(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopAlbums(self):
|
|
"""Returns a list of the top Albums tagged by this tag, ordered by tag count. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'tag.getTopAlbums', self.api_key, params).execute()
|
|
|
|
list = []
|
|
|
|
for n in doc.getElementsByTagName('album'):
|
|
name = self._extract(n, 'name')
|
|
artist = self._extract(n, 'name', 1)
|
|
|
|
list.append(Album(artist, name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopArtists(self):
|
|
"""Returns a list of the top Artists tagged by this tag, ordered by tag count. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'tag.getTopArtists', self.api_key, params).execute()
|
|
|
|
list = []
|
|
|
|
names = self._extract_all(doc, 'name')
|
|
|
|
for name in names:
|
|
list.append(Artist(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopTracks(self):
|
|
"""Returns a list of the top Tracks tagged by this tag, ordered by tag count. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'tag.getTopTracks', self.api_key, params).execute()
|
|
|
|
list = []
|
|
|
|
for n in doc.getElementsByTagName('track'):
|
|
title = self._extract(n, 'name')
|
|
artist = self._extract(n, 'name', 1)
|
|
|
|
list.append(Track(artist, title, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def fetchPlaylist(self, free_tracks_only = False):
|
|
"""Returns lit of the tracks tagged by this tag.
|
|
* free_tracks_only: Set to True to include only free tracks.
|
|
"""
|
|
|
|
uri = 'lastfm://playlist/tag/%s' %self.getName()
|
|
if free_tracks_only:
|
|
uri += '/freetracks'
|
|
|
|
return Playlist(uri, *self.auth_data).fetch()
|
|
|
|
def getURL(self, domain_name = DOMAIN_ENGLISH):
|
|
"""Returns the url of the tag page on Last.fm.
|
|
* domain_name: Last.fm's language domain. Possible values:
|
|
o DOMAIN_ENGLISH
|
|
o DOMAIN_GERMAN
|
|
o DOMAIN_SPANISH
|
|
o DOMAIN_FRENCH
|
|
o DOMAIN_ITALIAN
|
|
o DOMAIN_POLISH
|
|
o DOMAIN_PORTUGUESE
|
|
o DOMAIN_SWEDISH
|
|
o DOMAIN_TURKISH
|
|
o DOMAIN_RUSSIAN
|
|
o DOMAIN_JAPANESE
|
|
o DOMAIN_CHINESE
|
|
"""
|
|
|
|
url = 'http://%(domain)s/tag/%(name)s'
|
|
|
|
name = self._get_url_safe(self.getName())
|
|
|
|
return url %{'domain': domain_name, 'name': name}
|
|
|
|
def toStr(self):
|
|
"""Returns a string representation of the object."""
|
|
|
|
return self.getName()
|
|
|
|
class User(BaseObject, Cacheable):
|
|
|
|
def __init__(self, user_name, api_key, api_secret, session_key):
|
|
BaseObject.__init__(self, api_key, api_secret, session_key)
|
|
Cacheable.__init__(self)
|
|
|
|
self.name = user_name
|
|
|
|
self._cached_info = None
|
|
|
|
def _getParams(self):
|
|
return {'sk': self.session_key, 'user': self.name}
|
|
|
|
def _getInfo(self):
|
|
"""Returns a dictionary with various metadata values."""
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'user.getInfo', self.api_key, params, True, self.secret).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
data = {}
|
|
|
|
data['name'] = self._extract(doc, 'name')
|
|
data['image'] = self._extract(doc, 'image')
|
|
data['language'] = self._extract(doc, 'lang')
|
|
data['country'] = self._extract(doc, 'country')
|
|
data['age'] = self._extract(doc, 'age')
|
|
data['gender'] = self._extract(doc, 'gender')
|
|
data['subscriber'] = self._extract(doc, 'subscriber')
|
|
data['play_count'] = self._extract(doc, 'playcount')
|
|
|
|
return data
|
|
|
|
def getName(self, from_server = False):
|
|
"""Returns the user name.
|
|
* from_server: If set to True, the value will be retrieved from the server.
|
|
"""
|
|
|
|
if from_server:
|
|
return self._getCachedInfo('name')
|
|
else:
|
|
return self.name
|
|
|
|
def getImage(self):
|
|
"""Returns the user's avatar."""
|
|
|
|
return self._getCachedInfo('image')
|
|
|
|
def getLanguage(self):
|
|
"""Returns the language code of the language used by the user."""
|
|
|
|
return self._getCachedInfo('language')
|
|
|
|
def getCountryName(self):
|
|
"""Returns the name of the country of the user."""
|
|
|
|
return self._getCachedInfo('country')
|
|
|
|
def getAge(self):
|
|
"""Returns the user's age."""
|
|
|
|
return self._getCachedInfo('age')
|
|
|
|
def getGender(self):
|
|
"""Returns the user's gender. Either USER_MALE or USER_FEMALE."""
|
|
|
|
value = self._getCachedInfo('gender')
|
|
if value == 'm':
|
|
return USER_MALE
|
|
elif value == 'f':
|
|
return USER_FEMALE
|
|
|
|
return None
|
|
|
|
def isSubscriber(self):
|
|
"""Returns whether the user is a subscriber or not. True or False."""
|
|
|
|
value = self._getCachedInfo('subscriber')
|
|
|
|
if value == '1':
|
|
return True
|
|
elif value == '0':
|
|
return False
|
|
|
|
return None
|
|
|
|
def getPlayCount(self):
|
|
"""Returns the user's playcount so far."""
|
|
|
|
return self._getCachedInfo('play_count')
|
|
|
|
def getEvents(self):
|
|
"""Returns all the upcoming events for this user. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'user.getEvents', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
ids = self._extract_all(doc, 'id')
|
|
events = []
|
|
|
|
for id in ids:
|
|
events.append(Event(id, *self.auth_data))
|
|
|
|
return events
|
|
|
|
def getFriends(self, limit = None):
|
|
"""Returns a list of the user's friends. """
|
|
|
|
params = self._getParams()
|
|
if limit:
|
|
params['limit'] = limit
|
|
|
|
doc = Request(self, 'user.getFriends', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
users = doc.getElementsByTagName('user')
|
|
|
|
names = self._extract_all(doc, 'name')
|
|
|
|
for name in names:
|
|
list.append(User(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getLovedTracks(self):
|
|
"""Returns the last 50 tracks loved by this user. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'user.getLovedTracks', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
|
|
for track in doc.getElementsByTagName('track'):
|
|
title = self._extract(track, 'name', 0)
|
|
artist = self._extract(track, 'name', 1)
|
|
|
|
list.append(Track(artist, title, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getNeighbours(self, limit = None):
|
|
"""Returns a list of the user's friends.
|
|
* limit: A limit for how many neighbours to show.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if limit:
|
|
params['limit'] = limit
|
|
|
|
doc = Request(self, 'user.getNeighbours', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
names = self._extract_all(doc, 'name')
|
|
|
|
for name in names:
|
|
list.append(User(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getPastEvents(self, limit = None):
|
|
"""Retruns the past events of this user. """
|
|
|
|
# TODO
|
|
# http://www.last.fm/api/show?service=343
|
|
# use the page value
|
|
|
|
params = self._getParams()
|
|
if limit:
|
|
params['limit'] = limit
|
|
|
|
doc = Request(self, 'user.getPastEvents', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
ids = self._extract_all(doc, 'id')
|
|
list = []
|
|
for id in ids:
|
|
list.append(User(id, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getPlaylistIDs(self):
|
|
"""Returns a list the playlists IDs this user has created. """
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'user.getPlaylists', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
for playlist in doc.getElementsByTagName('playlist'):
|
|
p = {}
|
|
p['id'] = self._extract(playlist, 'id')
|
|
p['title'] = self._extract(playlist, 'title')
|
|
p['date'] = self._extract(playlist, 'date')
|
|
p['size'] = self._extract(playlist, 'size')
|
|
|
|
list.append(p)
|
|
|
|
return list
|
|
|
|
def fetchPlaylist(self, playlist_id):
|
|
"""Returns a list of the tracks on a playlist.
|
|
* playlist_id: A unique last.fm playlist ID, can be retrieved from getPlaylistIDs().
|
|
"""
|
|
|
|
uri = u'lastfm://playlist/%s' %unicode(playlist_id)
|
|
|
|
return Playlist(uri, self.api_key).fetch()
|
|
|
|
def getNowPlaying(self):
|
|
"""Returns the currently playing track, or None if nothing is playing. """
|
|
|
|
params = self._getParams()
|
|
params['limit'] = '1'
|
|
|
|
list = []
|
|
|
|
doc = Request(self, 'user.getRecentTracks', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
e = doc.getElementsByTagName('track')[0]
|
|
|
|
if not e.hasAttribute('nowplaying'):
|
|
return None
|
|
|
|
artist = self._extract(e, 'artist')
|
|
title = self._extract(e, 'name')
|
|
|
|
return Track(artist, title, *self.auth_data)
|
|
|
|
|
|
def getRecentTracks(self, limit = None):
|
|
"""Returns this user's recent listened-to tracks. """
|
|
|
|
params = self._getParams()
|
|
if limit:
|
|
params['limit'] = unicode(limit)
|
|
|
|
list = []
|
|
|
|
doc = Request(self, 'user.getRecentTracks', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
for track in doc.getElementsByTagName('track'):
|
|
title = self._extract(track, 'name')
|
|
artist = self._extract(track, 'artist')
|
|
|
|
if track.hasAttribute('nowplaying'):
|
|
continue #to prevent the now playing track from sneaking in here
|
|
|
|
list.append(Track(artist, title, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopAlbums(self, period = PERIOD_OVERALL):
|
|
"""Returns the top albums listened to by a user.
|
|
* period: The period of time. Possible values:
|
|
o PERIOD_OVERALL
|
|
o PERIOD_3MONTHS
|
|
o PERIOD_6MONTHS
|
|
o PERIOD_12MONTHS
|
|
"""
|
|
|
|
params = self._getParams()
|
|
params['period'] = period
|
|
|
|
doc = Request(self, 'user.getTopAlbums', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
for album in doc.getElementsByTagName('album'):
|
|
name = self._extract(album, 'name')
|
|
artist = self._extract(album, 'name', 1)
|
|
|
|
list.append(Album(artist, name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopArtists(self, period = PERIOD_OVERALL):
|
|
"""Returns the top artists listened to by a user.
|
|
* period: The period of time. Possible values:
|
|
o PERIOD_OVERALL
|
|
o PERIOD_3MONTHS
|
|
o PERIOD_6MONTHS
|
|
o PERIOD_12MONTHS
|
|
"""
|
|
|
|
params = self._getParams()
|
|
params['period'] = period
|
|
|
|
doc = Request(self, 'user.getTopArtists', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
|
|
names = self._extract_all(doc, 'name')
|
|
for name in names:
|
|
list.append(Artist(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopTags(self, limit = None):
|
|
"""Returns the top tags used by this user.
|
|
* limit: The limit of how many tags to return.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if limit:
|
|
params['limit'] = limit
|
|
|
|
doc = Request(self, 'user.getTopTags', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
names = self._extract_all(doc, 'name')
|
|
|
|
for name in names:
|
|
list.append(Tag(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopTracks(self, period = PERIOD_OVERALL):
|
|
"""Returns the top tracks listened to by a user.
|
|
* period: The period of time. Possible values:
|
|
o PERIOD_OVERALL
|
|
o PERIOD_3MONTHS
|
|
o PERIOD_6MONTHS
|
|
o PERIOD_12MONTHS
|
|
"""
|
|
|
|
params = self._getParams()
|
|
params['period'] = period
|
|
|
|
doc = Request(self, 'user.getTopTracks', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
for track in doc.getElementsByTagName('track'):
|
|
title = self._extract(track, 'name')
|
|
artist = self._extract(track, 'name', 1)
|
|
|
|
list.append(Track(artist, title, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopWeeklyAlbums(self, from_value = None, to_value = None):
|
|
"""Returns a tuple of the most frequently listened to Albums in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList().
|
|
* from_value: The value marking the beginning of a week.
|
|
* to_value: The value marking the end of a week.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if from_value and to_value:
|
|
params['from'] = from_value
|
|
params['to'] = to_value
|
|
|
|
doc = Request(self, 'user.getWeeklyAlbumChart', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
|
|
for n in doc.getElementsByTagName('album'):
|
|
artist = self._extract(n, 'artist')
|
|
name = self._extract(n, 'name')
|
|
|
|
list.append(Album(artist, name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopWeeklyArtists(self, from_value = None, to_value = None):
|
|
"""Returns a tuple of the most frequently listened to Artists in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList().
|
|
* from_value: The value marking the beginning of a week.
|
|
* to_value: The value marking the end of a week.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if from_value and to_value:
|
|
params['from'] = from_value
|
|
params['to'] = to_value
|
|
|
|
doc = Request(self, 'user.getWeeklyArtistChart', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
|
|
names = self._extract_all(doc, 'name')
|
|
for name in names:
|
|
list.append(Artist(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getTopWeeklyTracks(self, from_value = None, to_value = None):
|
|
"""Returns a tuple of the most frequently listened to Tracks in a week range. If no date range is supplied, it will return the most recent week's data. You can obtain the available ranges from getWeeklyChartList().
|
|
* from_value: The value marking the beginning of a week.
|
|
* to_value: The value marking the end of a week.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
if from_value and to_value:
|
|
params['from'] = from_value
|
|
params['to'] = to_value
|
|
|
|
doc = Request(self, 'user.getWeeklyTrackChart', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
for track in doc.getElementsByTagName('track'):
|
|
artist = self._extract(track, 'artist')
|
|
title = self._extract(track, 'name')
|
|
|
|
|
|
list.append(Track(artist, title, *self.auth_data))
|
|
|
|
return list
|
|
|
|
def getWeeklyChartList(self):
|
|
"""Returns a list of range pairs to use with the chart methods."""
|
|
|
|
params = self._getParams()
|
|
doc = Request(self, 'user.getWeeklyChartList', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
list = []
|
|
for chart in doc.getElementsByTagName('chart'):
|
|
c = {}
|
|
c['from'] = chart.getAttribute('from')
|
|
c['to'] = chart.getAttribute('to')
|
|
|
|
list.append(c)
|
|
|
|
return list
|
|
|
|
def getURL(self, domain_name = DOMAIN_ENGLISH):
|
|
"""Returns the url of the user page on Last.fm.
|
|
* domain_name: Last.fm's language domain. Possible values:
|
|
o DOMAIN_ENGLISH
|
|
o DOMAIN_GERMAN
|
|
o DOMAIN_SPANISH
|
|
o DOMAIN_FRENCH
|
|
o DOMAIN_ITALIAN
|
|
o DOMAIN_POLISH
|
|
o DOMAIN_PORTUGUESE
|
|
o DOMAIN_SWEDISH
|
|
o DOMAIN_TURKISH
|
|
o DOMAIN_RUSSIAN
|
|
o DOMAIN_JAPANESE
|
|
o DOMAIN_CHINESE
|
|
"""
|
|
url = 'http://%(domain)s/user/%(name)s'
|
|
|
|
name = self._get_url_safe(self.getName())
|
|
|
|
return url %{'domain': domain_name, 'name': name}
|
|
|
|
def getLibrary(self):
|
|
"""Returns the associated Library object. """
|
|
|
|
return Library(self.getName(), *self.auth_data)
|
|
|
|
def toStr(self):
|
|
"""Returns a string representation of the object."""
|
|
|
|
return self.getName()
|
|
|
|
class Search(BaseObject):
|
|
"""An abstract search class. Use one of its derivatives."""
|
|
|
|
def __init__(self, api_key, api_secret, session_key):
|
|
BaseObject.__init__(self, api_key, api_secret, session_key)
|
|
|
|
self._limit = None
|
|
self._page = None
|
|
self._total_result_count = None
|
|
|
|
def getLimit(self):
|
|
"""Returns the limit of the search."""
|
|
|
|
return self._limit
|
|
|
|
def getPage(self):
|
|
"""Returns the last page retrieved."""
|
|
|
|
return self._page
|
|
|
|
def getTotalResultCount(self):
|
|
"""Returns the total count of all the results."""
|
|
|
|
return self._total_result_count
|
|
|
|
def getResults(self, limit = 30, page = 1):
|
|
pass
|
|
|
|
def getFirstMatch(self):
|
|
"""Returns the first match."""
|
|
|
|
matches = self.getResults(1)
|
|
if matches:
|
|
return matches[0]
|
|
|
|
class ArtistSearch(Search):
|
|
"""Search for an artist by artist name."""
|
|
|
|
def __init__(self, artist_name, api_key, api_secret, session_key):
|
|
Search.__init__(self, api_key, api_secret, session_key)
|
|
|
|
self._artist_name = artist_name
|
|
|
|
def _getParams(self):
|
|
return {'sk': self.session_key, 'artist': self.getArtistName()}
|
|
|
|
def getArtistName(self):
|
|
"""Returns the artist name."""
|
|
|
|
return self._artist_name
|
|
|
|
def getResults(self, limit = 30, page = 1):
|
|
"""Returns the matches sorted by relevance.
|
|
* limit: Limit the number of artists returned at one time. Default (maximum) is 30.
|
|
* page: Scan into the results by specifying a page number. Defaults to first page.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
params['limit'] = unicode(limit)
|
|
params['page'] = unicode(page)
|
|
|
|
doc = Request(self, 'artist.search', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
self._total_result_count = self._extract(doc, 'opensearch:totalResults')
|
|
self._page = page
|
|
self._limit = limit
|
|
|
|
e = doc.getElementsByTagName('artistmatches')[0]
|
|
|
|
names = self._extract_all(e, 'name')
|
|
|
|
list = []
|
|
for name in names:
|
|
list.append(Artist(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
class TagSearch(Search):
|
|
"""Search for a tag by tag name."""
|
|
|
|
def __init__(self, tag_name, api_key, api_secret, session_key):
|
|
Search.__init__(self, api_key, api_secret, session_key)
|
|
|
|
self._tag_name = tag_name
|
|
|
|
def _getParams(self):
|
|
return {'sk': self.session_key, 'tag': self.getTagName()}
|
|
|
|
def getTagName(self):
|
|
"""Returns the tag name."""
|
|
|
|
return self._tag_name
|
|
|
|
def getResults(self, limit = 30, page = 1):
|
|
"""Returns the matches sorted by relevance.
|
|
* limit: Limit the number of artists returned at one time. Default (maximum) is 30.
|
|
* page: Scan into the results by specifying a page number. Defaults to first page.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
params['limit'] = unicode(limit)
|
|
params['page'] = unicode(page)
|
|
|
|
doc = Request(self, 'tag.search', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
self._total_result_count = self._extract(doc, 'opensearch:totalResults')
|
|
self._page = page
|
|
self._limit = limit
|
|
|
|
e = doc.getElementsByTagName('tagmatches')[0]
|
|
|
|
names = self._extract_all(e, 'name')
|
|
|
|
list = []
|
|
for name in names:
|
|
list.append(Tag(name, *self.auth_data))
|
|
|
|
return list
|
|
|
|
class TrackSearch(Search):
|
|
"""Search for a track by track title. If you don't wanna narrow the results down
|
|
by specifying the artist name, set it to None"""
|
|
|
|
def __init__(self, track_title, artist_name, api_key, api_secret, session_key):
|
|
Search.__init__(self, api_key, api_secret, session_key)
|
|
|
|
self._track_title = track_title
|
|
self._artist_name = artist_name
|
|
|
|
def _getParams(self):
|
|
params = {'sk': self.session_key, 'track': self.getTrackTitle()}
|
|
if self.getTrackArtistName():
|
|
params['artist'] = self.getTrackArtistName()
|
|
|
|
return params
|
|
|
|
def getTrackTitle(self):
|
|
"""Returns the track title."""
|
|
|
|
return self._track_title
|
|
|
|
def getTrackArtistName(self):
|
|
"""Returns the artist name."""
|
|
|
|
return self._artist_name
|
|
|
|
def getResults(self, limit = 30, page = 1):
|
|
"""Returns the matches sorted by relevance.
|
|
* limit: Limit the number of artists returned at one time. Default (maximum) is 30.
|
|
* page: Scan into the results by specifying a page number. Defaults to first page.
|
|
"""
|
|
|
|
params = self._getParams()
|
|
params['limit'] = unicode(limit)
|
|
params['page'] = unicode(page)
|
|
|
|
doc = Request(self, 'track.search', self.api_key, params).execute()
|
|
|
|
if not doc:
|
|
return None
|
|
|
|
self._total_result_count = self._extract(doc, 'opensearch:totalResults')
|
|
self._page = page
|
|
self._limit = limit
|
|
|
|
e = doc.getElementsByTagName('trackmatches')[0]
|
|
|
|
titles = self._extract_all(e, 'name')
|
|
artists = self._extract_all(e, 'artist')
|
|
|
|
list = []
|
|
for i in range(0, len(titles)):
|
|
list.append(Track(artists[i], titles[i], *self.auth_data))
|
|
|
|
return list
|