pylast/pylast.py
2009-08-04 20:20:34 +00:00

3219 lines
87 KiB
Python

# -*- coding: utf-8 -*-
#
# pylast - A Python interface to Last.fm
# Copyright (C) 2008-2009 Amr Hassan
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
#
# http://code.google.com/p/pylast/
# Module metadata.
__name__ = 'pylast'
__version__ = '0.3'
__revision__ = "$Revision$"
__doc__ = 'A Python interface to Last.fm'
__author__ = 'Amr Hassan'
__copyright__ = "Copyright (C) 2008-2009 Amr Hassan"
__license__ = "gpl"
__email__ = 'amr.hassan@gmail.com'

# Parse revision and add it to __version__.
# The number is only appended when the version-control keyword has actually
# been expanded (e.g. "$Revision: 123 $" -> "0.3.123"). With the bare
# "$Revision$" placeholder there is no number to extract, so __version__ is
# left untouched instead of picking up a fragment of the keyword itself.
r = __revision__
_revision_number = r.strip("$").partition(":")[2].strip()
if _revision_number:
    __version__ = __version__ + "." + _revision_number

# Default values for Last.fm.
WS_SERVER = ('ws.audioscrobbler.com', '/2.0/')
SUBMISSION_SERVER = "http://post.audioscrobbler.com:80/"

# Module-level state; presumably managed by the proxy, caching and
# call-throttling helpers defined elsewhere in this module -- TODO confirm.
__proxy = None
__proxy_enabled = False
__cache_backend = None
__last_call_time = 0
import hashlib
import httplib
import urllib
import codecs
import threading
from xml.dom import minidom
import os
import time
from logging import info, warn, debug
import shelve
import tempfile
import sys
try:
import sqlite3
except ImportError:
pass
# Error codes reported by the web service (see ServiceException.get_id()).
STATUS_INVALID_SERVICE    = 2
STATUS_INVALID_METHOD     = 3
STATUS_AUTH_FAILED        = 4
STATUS_INVALID_FORMAT     = 5
STATUS_INVALID_PARAMS     = 6
STATUS_INVALID_RESOURCE   = 7
STATUS_TOKEN_ERROR        = 8
STATUS_INVALID_SK         = 9
STATUS_INVALID_API_KEY    = 10
STATUS_OFFLINE            = 11
STATUS_SUBSCRIBERS_ONLY   = 12
STATUS_INVALID_SIGNATURE  = 13
STATUS_TOKEN_UNAUTHORIZED = 14
STATUS_TOKEN_EXPIRED      = 15

# Attendance statuses accepted by Event.attend().
EVENT_ATTENDING, EVENT_MAYBE_ATTENDING, EVENT_NOT_ATTENDING = ('0', '1', '2')

# Time period identifiers.
PERIOD_OVERALL  = 'overall'
PERIOD_3MONTHS  = '3month'
PERIOD_6MONTHS  = '6month'
PERIOD_12MONTHS = '12month'

# Image sizes; used as indexes into the list of image URLs of a response.
IMAGE_SMALL, IMAGE_MEDIUM, IMAGE_LARGE, IMAGE_EXTRA_LARGE = range(4)

# Last.fm language domains accepted by the get_url() methods.
DOMAIN_ENGLISH    = 'www.last.fm'
DOMAIN_GERMAN     = 'www.lastfm.de'
DOMAIN_SPANISH    = 'www.lastfm.es'
DOMAIN_FRENCH     = 'www.lastfm.fr'
DOMAIN_ITALIAN    = 'www.lastfm.it'
DOMAIN_POLISH     = 'www.lastfm.pl'
DOMAIN_PORTUGUESE = 'www.lastfm.com.br'
DOMAIN_SWEDISH    = 'www.lastfm.se'
DOMAIN_TURKISH    = 'www.lastfm.com.tr'
DOMAIN_RUSSIAN    = 'www.lastfm.ru'
DOMAIN_JAPANESE   = 'www.lastfm.jp'
DOMAIN_CHINESE    = 'cn.last.fm'

# User gender values (presumably as reported by the service -- TODO confirm).
USER_MALE, USER_FEMALE = ('Male', 'Female')

# Scrobble source codes.
SCROBBLE_SOURCE_USER                       = 'P'
SCROBBLE_SOURCE_NON_PERSONALIZED_BROADCAST = 'R'
SCROBBLE_SOURCE_PERSONALIZED_BROADCAST     = 'E'
SCROBBLE_SOURCE_LASTFM                     = 'L'
SCROBBLE_SOURCE_UNKNOWN                    = 'U'

# Scrobble mode codes.
SCROBBLE_MODE_PLAYED  = 'L'
SCROBBLE_MODE_BANNED  = 'B'
SCROBBLE_MODE_SKIPPED = 'S'
class _ShelfCacheBackend(object):
"""Used as a backend for caching cacheable requests."""
def __init__(self, file_path = None):
self.shelf = shelve.open(file_path)
def get_xml(self, key):
return self.shelf[key]
def set_xml(self, key, xml_string):
self.shelf[key] = xml_string
def has_key(self, key):
return key in self.shelf.keys()
class _SqliteCacheBackend(object):
"""Used as a backend for caching cacheable requests."""
def __init__(self, file_path = None):
self.connection = sqlite3.connect(file_path)
self.connection.execute("CREATE TABLE IF NOT EXISTS cache(key TEXT, xml TEXT)")
self.connection.commit()
def get_xml(self, key):
row = self.connection.execute("SELECT xml FROM cache WHERE key=?", (key,)).fetchone()
return row[0]
def set_xml(self, key, xml_string):
self.connection.execute("INSERT INTO cache VALUES(?, ?)", (key, xml_string))
self.connection.commit()
def has_key(self, key):
row = self.connection.execute("SELECT COUNT(*) FROM cache WHERE key=?", (key,)).fetchone()
return row[0] > 0
class _ThreadedCall(threading.Thread):
"""Facilitates calling a function on another thread."""
def __init__(self, sender, funct, funct_args, callback, callback_args):
threading.Thread.__init__(self)
self.funct = funct
self.funct_args = funct_args
self.callback = callback
self.callback_args = callback_args
self.sender = sender
def run(self):
output = []
if self.funct:
if self.funct_args:
output = self.funct(*self.funct_args)
else:
output = self.funct()
if self.callback:
if self.callback_args:
self.callback(self.sender, output, *self.callback_args)
else:
self.callback(self.sender, output)
class _Request(object):
    """Representing an abstract web service operation.

    Holds the parameters of one web service call, signs them when needed,
    POSTs them to WS_SERVER and parses the XML answer.
    """

    (HOST_NAME, HOST_SUBDIR) = WS_SERVER

    def __init__(self, method_name, params, api_key, api_secret, session_key = None):
        """
        * method_name: The web service method, e.g. 'artist.getInfo'.
        * params: dict of request parameters. It is stored and modified in
          place ('api_key', 'method' and, when applicable, 'sk' and
          'api_sig' are added to it).
        * api_key, api_secret: The API credentials.
        * session_key: When given, it is sent as 'sk' and the request is signed.
        """

        self.params = params
        self.api_secret = api_secret

        self.params["api_key"] = api_key
        self.params["method"] = method_name

        if is_caching_enabled():
            self.cache = _get_cache_backend()

        if session_key:
            self.params["sk"] = session_key
            self.sign_it()

    def sign_it(self):
        """Sign this request (does nothing if it is already signed)."""

        if "api_sig" not in self.params:
            self.params['api_sig'] = self._get_signature()

    def _get_signature(self):
        """Returns a 32-character hexadecimal md5 hash of the signature string."""

        # The signature string is every name followed by its value, in
        # sorted name order, followed by the API secret.
        pieces = []
        for name in sorted(self.params):
            pieces.append(name)
            pieces.append(self.params[name])
        pieces.append(self.api_secret)

        return md5("".join(pieces))

    def _get_cache_key(self):
        """The cache key is a hash of the concatenated sorted names and values."""

        # Credentials and the signature are left out of the key.
        pieces = []
        for key in sorted(self.params):
            if key not in ("api_sig", "api_key", "sk"):
                pieces.append(key + self.params[key].encode("utf-8"))

        return hashlib.sha1("".join(pieces)).hexdigest()

    def _get_cached_response(self):
        """Returns the response body string, downloading and caching it first if needed."""

        cache_key = self._get_cache_key()

        if not self.cache.has_key(cache_key):
            self.cache.set_xml(cache_key, self._download_response())

        return self.cache.get_xml(cache_key)

    def _is_cached(self):
        """Returns True if the request is already in cache."""

        return self.cache.has_key(self._get_cache_key())

    def _download_response(self):
        """Returns a response body string from the server.

        Raises ServiceException if the response reports an error.
        """

        # Delay the call if necessary
        _delay_call()

        data = []
        for name in self.params.keys():
            data.append('='.join((name, urllib.quote_plus(self.params[name].encode("utf-8")))))
        data = '&'.join(data)

        headers = {
            "Content-type": "application/x-www-form-urlencoded",
            'Accept-Charset': 'utf-8',
            'User-Agent': __name__ + '/' + __version__
            }

        if is_proxy_enabled():
            # Through a proxy the request line has to carry the absolute URL.
            proxy = _get_proxy()
            conn = httplib.HTTPConnection(host = proxy[0], port = proxy[1])
            url = "http://" + self.HOST_NAME + self.HOST_SUBDIR
        else:
            conn = httplib.HTTPConnection(host=self.HOST_NAME)
            url = self.HOST_SUBDIR

        # Always release the connection, whether or not the call succeeds.
        try:
            conn.request(method='POST', url=url, body=data, headers=headers)
            response_text = unicode(conn.getresponse().read(), "utf-8")
        finally:
            conn.close()

        self._check_response_for_errors(response_text)
        return response_text

    def execute(self, cacheable = False):
        """Returns the XML DOM response of the POST Request from the server.

        * cacheable: When True and caching is enabled, the response is
          served from (and stored in) the cache.
        """

        if is_caching_enabled() and cacheable:
            response = self._get_cached_response()
        else:
            response = self._download_response()

        return minidom.parseString(response.encode("utf-8"))

    def _check_response_for_errors(self, response):
        """Checks the response for errors and raises one if any exists.

        Raises ServiceException carrying the 'code' attribute and the text of
        the <error> element when the <lfm> status is not "ok".
        """

        doc = minidom.parseString(response.encode("utf-8"))
        root = doc.getElementsByTagName('lfm')[0]

        if root.getAttribute('status') != "ok":
            error = doc.getElementsByTagName('error')[0]
            status = error.getAttribute('code')
            details = error.firstChild.data.strip()
            raise ServiceException(status, details)
class SessionKeyGenerator(object):
    """Methods of generating a session key:
    1) Web Authentication:
        a. sg = SessionKeyGenerator(API_KEY, API_SECRET)
        b. url = sg.get_web_auth_url()
        c. Ask the user to open the url and authorize you, and wait for it.
        d. session_key = sg.get_web_auth_session_key(url)
    2) Username and Password Authentication:
        a. username = raw_input("Please enter your username: ")
        b. md5_password = pylast.md5(raw_input("Please enter your password: "))
        c. session_key = SessionKeyGenerator(API_KEY, API_SECRET).get_session_key(username, md5_password)

    A session key's lifetime is infinite, unless the user revokes the rights of the given API Key.
    """

    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret

        # Maps every URL handed out by get_web_auth_url() to its token.
        self.web_auth_tokens = {}

    def _get_web_auth_token(self):
        """Retrieves a token from Last.fm for web authentication.
        The token has to be authorized by the user (see get_web_auth_url)
        before a session can be created from it.
        """

        request = _Request('auth.getToken', dict(), self.api_key, self.api_secret)

        # No session key is involved here, so the request has not been
        # signed automatically and must be signed by hand.
        request.sign_it()

        token_node = request.execute().getElementsByTagName('token')[0]
        return token_node.firstChild.data

    def get_web_auth_url(self):
        """Returns the URL the user must open to authorize you. Once they
        have done so, pass the same URL to get_web_auth_session_key(url).
        """

        token = self._get_web_auth_token()

        template = 'http://www.last.fm/api/auth/?api_key=%(api)s&token=%(token)s'
        url = template % {'api': self.api_key, 'token': token}

        self.web_auth_tokens[url] = token

        return url

    def get_web_auth_session_key(self, url):
        """Retrieves the session key of a web authorization process by its url."""

        # An unknown url yields an empty token, which makes the request fail
        # with a ServiceException of an unauthorized token when executed.
        token = self.web_auth_tokens.get(url, "")

        request = _Request('auth.getSession', {'token': token}, self.api_key, self.api_secret)

        # Not signed automatically (no session key), so sign it by hand.
        request.sign_it()

        key_node = request.execute().getElementsByTagName('key')[0]
        return key_node.firstChild.data

    def get_session_key(self, username, md5_password):
        """Retrieve a session key with a username and a md5 hash of the user's password."""

        auth_token = md5(username + md5_password)
        request = _Request("auth.getMobileSession",
                           {"username": username, "authToken": auth_token},
                           self.api_key, self.api_secret)

        # Not signed automatically (no session key), so sign it by hand.
        request.sign_it()

        return _extract(request.execute(), "key")
class _BaseObject(object):
"""An abstract webservices object."""
def __init__(self, api_key, api_secret, session_key=""):
self.api_key = api_key
self.api_secret = api_secret
self.session_key = session_key
self.auth_data = (self.api_key, self.api_secret, self.session_key)
def _request(self, method_name, cacheable = False, params = None):
if not params:
params = self._get_params()
return _Request(method_name, params, *self.auth_data).execute(cacheable)
def _get_params():
"""Returns the most common set of parameters between all objects."""
return dict()
class _Taggable(object):
"""Common functions for classes with tags."""
def __init__(self, ws_prefix):
self.ws_prefix = ws_prefix
def add_tags(self, *tags):
"""Adds one or several tags.
* *tags: Any number of tag names or Tag objects.
"""
for tag in tags:
self._add_tag(tag)
def _add_tag(self, tag):
"""Adds one or several tags.
* tag: one tag name or a Tag object.
"""
if isinstance(tag, Tag):
tag = tag.get_name()
params = self._get_params()
params['tags'] = unicode(tag)
self._request(self.ws_prefix + '.addTags', False, params)
info("Tagged " + repr(self) + " as (" + repr(tag) + ")")
def _remove_tag(self, single_tag):
"""Remove a user's tag from this object."""
if isinstance(single_tag, Tag):
single_tag = single_tag.get_name()
params = self._get_params()
params['tag'] = unicode(single_tag)
self._request(self.ws_prefix + '.removeTag', False, params)
info("Removed tag (" + repr(tag) + ") from " + repr(self))
def get_tags(self):
"""Returns a list of the tags set by the user to this object."""
# Uncacheable because it can be dynamically changed by the user.
params = self._get_params()
doc = _Request(self.ws_prefix + '.getTags', params, *self.auth_data).execute(cacheable = False)
tag_names = _extract_all(doc, 'name')
tags = []
for tag in tag_names:
tags.append(Tag(tag, *self.auth_data))
return tags
def remove_tags(self, *tags):
"""Removes one or several tags from this object.
* *tags: Any number of tag names or Tag objects.
"""
for tag in tags:
self._remove_tag(tag)
def clear_tags(self):
"""Clears all the user-set tags. """
self.remove_tags(*(self.get_tags()))
def set_tags(self, *tags):
"""Sets this object's tags to only those tags.
* *tags: any number of tag names.
"""
c_old_tags = []
old_tags = []
c_new_tags = []
new_tags = []
to_remove = []
to_add = []
tags_on_server = self.get_tags()
for tag in tags_on_server:
c_old_tags.append(tag.get_name().lower())
old_tags.append(tag.get_name())
for tag in tags:
c_new_tags.append(tag.lower())
new_tags.append(tag)
for i in range(0, len(old_tags)):
if not c_old_tags[i] in c_new_tags:
to_remove.append(old_tags[i])
for i in range(0, len(new_tags)):
if not c_new_tags[i] in c_old_tags:
to_add.append(new_tags[i])
self.remove_tags(*to_remove)
self.add_tags(*to_add)
def get_top_tags(self, limit = None):
"""Returns a list of the most frequently used Tags on this object."""
doc = self._request(self.ws_prefix + '.getTopTags', True)
elements = doc.getElementsByTagName('tag')
list = []
for element in elements:
if limit and len(list) >= limit:
break
tag_name = _extract(element, 'name')
tagcount = _extract(element, 'count')
list.append(TopItem(Tag(tag_name, *self.auth_data), tagcount))
return list
class ServiceException(Exception):
    """Exception related to the Last.fm web service"""

    def __init__(self, lastfm_status, details):
        """
        * lastfm_status: The error code reported by the service.
        * details: The human-readable error message.
        """

        Exception.__init__(self, lastfm_status, details)

        self.lastfm_status = lastfm_status
        self.details = details

    def __str__(self):
        # Without this, str(e) and tracebacks would show the raw
        # (status, details) argument tuple instead of the message.
        return self.details

    def __repr__(self):
        return self.details

    def get_id(self):
        """Returns the exception ID, from one of the following:
            STATUS_INVALID_SERVICE = 2
            STATUS_INVALID_METHOD = 3
            STATUS_AUTH_FAILED = 4
            STATUS_INVALID_FORMAT = 5
            STATUS_INVALID_PARAMS = 6
            STATUS_INVALID_RESOURCE = 7
            STATUS_TOKEN_ERROR = 8
            STATUS_INVALID_SK = 9
            STATUS_INVALID_API_KEY = 10
            STATUS_OFFLINE = 11
            STATUS_SUBSCRIBERS_ONLY = 12
            STATUS_INVALID_SIGNATURE = 13
            STATUS_TOKEN_UNAUTHORIZED = 14
            STATUS_TOKEN_EXPIRED = 15

        NOTE(review): the value is returned exactly as it was passed to the
        constructor; _Request passes the XML 'code' attribute, which is a
        string rather than an int -- confirm before comparing it with the
        integer constants above.
        """

        return self.lastfm_status
class TopItem (object):
    """An item paired with its weight in a top list. Returned from functions like get_top_tracks() and get_top_artists()."""

    def __init__(self, item, weight):
        object.__init__(self)

        self.item, self.weight = item, _number(weight)

    def __repr__(self):
        pieces = ("Item: ", self.get_item().__repr__(), ", Weight: ", str(self.get_weight()))
        return "".join(pieces)

    def get_item(self):
        """Returns the wrapped item."""

        return self.item

    def get_weight(self):
        """Returns the weight of the item in the list."""

        return self.weight
class LibraryItem (object):
    """An entry of a User's Library: an artist, an album or a track, along with its play and tag counts."""

    def __init__(self, item, playcount, tagcount):
        object.__init__(self)

        self.item = item
        self.playcount, self.tagcount = _number(playcount), _number(tagcount)

    def __repr__(self):
        pieces = (
            "Item: ", self.get_item().__repr__(),
            ", Playcount: ", str(self.get_playcount()),
            ", Tagcount: ", str(self.get_tagcount()),
            )
        return "".join(pieces)

    def get_item(self):
        """Returns the item."""

        return self.item

    def get_playcount(self):
        """Returns how many times the item was played in the Library."""

        return self.playcount

    def get_tagcount(self):
        """Returns how many times the item was tagged in the Library."""

        return self.tagcount
class PlayedTrack (object):
    """A track together with the date it was played at."""

    def __init__(self, track, date, timestamp):
        object.__init__(self)

        self.track, self.date, self.timestamp = track, date, timestamp

    def __repr__(self):
        return " played at ".join((repr(self.track), self.date))

    def get_track(self):
        """Returns the played track."""

        return self.track

    def get_item(self):
        """Returns the played track. An alias to get_track()."""

        return self.get_track()

    def get_date(self):
        """Returns the date of the playback."""

        return self.date

    def get_timestamp(self):
        """Returns the playback date as a unix timestamp."""

        return self.timestamp
class Album(_BaseObject, _Taggable):
    """A Last.fm album."""

    def __init__(self, artist, title, api_key, api_secret, session_key=""):
        """
        Create an album instance.
        # Parameters:
            * artist: An artist name or an Artist object.
            * title: The album title.
        """

        _BaseObject.__init__(self, api_key, api_secret, session_key)
        _Taggable.__init__(self, 'album')

        # A bare name is wrapped in an Artist sharing this album's credentials.
        if not isinstance(artist, Artist):
            artist = Artist(artist, *self.auth_data)

        self.artist = artist
        self.title = title

    def __repr__(self):
        return ' - '.join((self.get_artist().get_name(), self.get_title()))

    def __eq__(self, other):
        # Albums are equal when title and artist name match, ignoring case.
        same_title = self.get_title().lower() == other.get_title().lower()
        return same_title and (self.get_artist().get_name().lower() == other.get_artist().get_name().lower())

    def __ne__(self, other):
        different_title = self.get_title().lower() != other.get_title().lower()
        return different_title or (self.get_artist().get_name().lower() != other.get_artist().get_name().lower())

    def _get_params(self):
        artist_name = self.get_artist().get_name()
        return {'artist': artist_name, 'album': self.get_title()}

    def get_artist(self):
        """Returns the Artist object this album belongs to."""

        return self.artist

    def get_title(self):
        """Returns the title of the album."""

        return self.title

    def get_name(self):
        """Returns the title of the album (alias to Album.get_title)."""

        return self.get_title()

    def get_release_date(self):
        """Returns the release date of the album."""

        doc = self._request("album.getInfo", cacheable = True)
        return _extract(doc, "releasedate")

    def get_image_url(self, size = IMAGE_EXTRA_LARGE):
        """Returns the associated image URL.
        # Parameters:
        * size int: The image size. Possible values:
          o IMAGE_EXTRA_LARGE
          o IMAGE_LARGE
          o IMAGE_MEDIUM
          o IMAGE_SMALL
        """

        doc = self._request("album.getInfo", cacheable = True)
        images = _extract_all(doc, 'image')
        return images[size]

    def get_id(self):
        """Returns the Last.fm ID."""

        doc = self._request("album.getInfo", cacheable = True)
        return _extract(doc, "id")

    def get_playcount(self):
        """Returns the number of plays on Last.fm."""

        doc = self._request("album.getInfo", cacheable = True)
        return _number(_extract(doc, "playcount"))

    def get_listener_count(self):
        """Returns the number of listeners on Last.fm."""

        doc = self._request("album.getInfo", cacheable = True)
        return _number(_extract(doc, "listeners"))

    def get_top_tags(self, limit = None):
        """Returns a list of the most-applied tags to this album.

        Currently always returns None (see the comment below).
        """

        # BROKEN: Web service is currently broken.

        return None

    def get_tracks(self):
        """Returns the list of Tracks on this album."""

        playlist_uri = 'lastfm://playlist/album/%s' %self.get_id()
        playlist = XSPF(playlist_uri, *self.auth_data)

        return playlist.get_tracks()

    def get_mbid(self):
        """Returns the MusicBrainz id of the album."""

        doc = self._request("album.getInfo", cacheable = True)
        return _extract(doc, "mbid")

    def get_url(self, domain_name = DOMAIN_ENGLISH):
        """Returns the url of the album page on Last.fm.
        # Parameters:
        * domain_name str: Last.fm's language domain. Possible values:
          o DOMAIN_ENGLISH
          o DOMAIN_GERMAN
          o DOMAIN_SPANISH
          o DOMAIN_FRENCH
          o DOMAIN_ITALIAN
          o DOMAIN_POLISH
          o DOMAIN_PORTUGUESE
          o DOMAIN_SWEDISH
          o DOMAIN_TURKISH
          o DOMAIN_RUSSIAN
          o DOMAIN_JAPANESE
          o DOMAIN_CHINESE
        """

        template = 'http://%(domain)s/music/%(artist)s/%(album)s'

        fields = {'domain': domain_name}
        fields['artist'] = _get_url_safe(self.get_artist().get_name())
        fields['album'] = _get_url_safe(self.get_title())

        return template %fields
class Artist(_BaseObject, _Taggable):
    """A Last.fm artist."""

    def __init__(self, name, api_key, api_secret, session_key=""):
        """Create an artist object.
        # Parameters:
            * name str: The artist's name.
        """

        _BaseObject.__init__(self, api_key, api_secret, session_key)
        _Taggable.__init__(self, 'artist')

        self.name = name

    def __repr__(self):
        return unicode(self.get_name())

    def __eq__(self, other):
        # Artists are equal when their names match, ignoring case.
        return self.get_name().lower() == other.get_name().lower()

    def __ne__(self, other):
        return self.get_name().lower() != other.get_name().lower()

    def _get_params(self):
        return {'artist': self.get_name()}

    def get_name(self):
        """Returns the name of the artist."""

        return self.name

    def get_image_url(self, size = IMAGE_LARGE):
        """Returns the associated image URL.
        # Parameters:
        * size int: The image size. Possible values:
          o IMAGE_LARGE
          o IMAGE_MEDIUM
          o IMAGE_SMALL
        """

        return _extract_all(self._request("artist.getInfo", True), "image")[size]

    def get_playcount(self):
        """Returns the number of plays on Last.fm."""

        return _number(_extract(self._request("artist.getInfo", True), "playcount"))

    def get_mbid(self):
        """Returns the MusicBrainz ID of this artist."""

        doc = self._request("artist.getInfo", True)

        return _extract(doc, "mbid")

    def get_listener_count(self):
        """Returns the number of listeners on Last.fm."""

        return _number(_extract(self._request("artist.getInfo", True), "listeners"))

    def is_streamable(self):
        """Returns True if the artist is streamable."""

        return bool(_number(_extract(self._request("artist.getInfo", True), "streamable")))

    def get_bio_published_date(self):
        """Returns the date on which the artist's biography was published."""

        return _extract(self._request("artist.getInfo", True), "published")

    def get_bio_summary(self):
        """Returns the summary of the artist's biography."""

        return _extract(self._request("artist.getInfo", True), "summary")

    def get_bio_content(self):
        """Returns the content of the artist's biography."""

        return _extract(self._request("artist.getInfo", True), "content")

    def get_upcoming_events(self):
        """Returns a list of the upcoming Events for this artist."""

        doc = self._request('artist.getEvents', True)

        events = []
        for event_id in _extract_all(doc, 'id'):
            events.append(Event(event_id, *self.auth_data))

        return events

    def get_similar(self, limit = None):
        """Returns the similar artists on Last.fm.
        # Parameters:
            * limit int: Max number of artists to ask for; omitted from the
              request when not given.
        """

        params = self._get_params()
        if limit:
            params['limit'] = unicode(limit)

        doc = self._request('artist.getSimilar', True, params)

        artists = []
        for name in _extract_all(doc, 'name'):
            artists.append(Artist(name, *self.auth_data))

        return artists

    def get_top_albums(self):
        """Returns a list of the top albums, as TopItem objects weighted by playcount."""

        doc = self._request('artist.getTopAlbums', True)

        top_albums = []
        for node in doc.getElementsByTagName("album"):
            name = _extract(node, "name")
            # Second "name" match inside the node -- presumably the artist's
            # name; _extract is defined elsewhere, TODO confirm.
            artist = _extract(node, "name", 1)
            playcount = _extract(node, "playcount")

            top_albums.append(TopItem(Album(artist, name, *self.auth_data), playcount))

        return top_albums

    def get_top_tracks(self):
        """Returns a list of the most played Tracks by this artist, as TopItem objects."""

        doc = self._request("artist.getTopTracks", True)

        top_tracks = []
        for track in doc.getElementsByTagName('track'):
            title = _extract(track, "name")
            # Second "name" match inside the node -- presumably the artist's
            # name; _extract is defined elsewhere, TODO confirm.
            artist = _extract(track, "name", 1)
            playcount = _number(_extract(track, "playcount"))

            top_tracks.append( TopItem(Track(artist, title, *self.auth_data), playcount) )

        return top_tracks

    def get_top_fans(self, limit = None):
        """Returns a list of the Users who played this artist the most.
        # Parameters:
            * limit int: Max elements.
        """

        doc = self._request('artist.getTopFans', True)

        top_fans = []
        for element in doc.getElementsByTagName('user'):
            if limit and len(top_fans) >= limit:
                break

            name = _extract(element, 'name')
            weight = _number(_extract(element, 'weight'))

            top_fans.append(TopItem(User(name, *self.auth_data), weight))

        return top_fans

    def share(self, users, message = None):
        """Shares this artist (sends out recommendations).
        # Parameters:
            * users [User|str,]: A list that can contain usernames, emails, User objects, or all of them.
            * message str: A message to include in the recommendation message.
        """

        # last.fm currently accepts a max of 10 recipients at a time, so
        # longer lists are sent as several requests of up to 10 each. An
        # empty list still results in a single request, as before.
        batches = [users[start:start + 10] for start in range(0, len(users), 10)] or [users]

        for batch in batches:
            recipients = []
            for user in batch:
                if isinstance(user, User):
                    recipients.append(user.get_name())
                else:
                    recipients.append(user)

            params = self._get_params()
            params['recipient'] = ','.join(recipients)
            if message: params['message'] = unicode(message)

            self._request('artist.share', False, params)
            info(repr(self) + " was shared with " + repr(batch))

    def get_url(self, domain_name = DOMAIN_ENGLISH):
        """Returns the url of the artist page on Last.fm.
        # Parameters:
        * domain_name: Last.fm's language domain. Possible values:
          o DOMAIN_ENGLISH
          o DOMAIN_GERMAN
          o DOMAIN_SPANISH
          o DOMAIN_FRENCH
          o DOMAIN_ITALIAN
          o DOMAIN_POLISH
          o DOMAIN_PORTUGUESE
          o DOMAIN_SWEDISH
          o DOMAIN_TURKISH
          o DOMAIN_RUSSIAN
          o DOMAIN_JAPANESE
          o DOMAIN_CHINESE
        """

        url = 'http://%(domain)s/music/%(artist)s'

        artist = _get_url_safe(self.get_name())

        return url %{'domain': domain_name, 'artist': artist}
class Event(_BaseObject):
    """A Last.fm event."""

    def __init__(self, event_id, api_key, api_secret, session_key=""):
        """
        * event_id: The id of the event on Last.fm; stored as a unicode string.
        """

        _BaseObject.__init__(self, api_key, api_secret, session_key)

        self.id = unicode(event_id)

    def __repr__(self):
        return "Event #" + self.get_id()

    def __eq__(self, other):
        return self.get_id() == other.get_id()

    def __ne__(self, other):
        return self.get_id() != other.get_id()

    def _get_params(self):
        return {'event': self.get_id()}

    def attend(self, attending_status):
        """Sets the attending status.
        * attending_status: The attending status. Possible values:
          o EVENT_ATTENDING
          o EVENT_MAYBE_ATTENDING
          o EVENT_NOT_ATTENDING
        """

        params = self._get_params()
        params['status'] = unicode(attending_status)

        self._request('event.attend', False, params)
        info("Attendance to " + repr(self) + " was set to " + repr(attending_status))

    def get_id(self):
        """Returns the id of the event on Last.fm. """

        return self.id

    def get_title(self):
        """Returns the title of the event. """

        doc = self._request("event.getInfo", True)

        return _extract(doc, "title")

    def get_headliner(self):
        """Returns the headliner of the event, as an Artist object. """

        doc = self._request("event.getInfo", True)

        return Artist(_extract(doc, "headliner"), *self.auth_data)

    def get_artists(self):
        """Returns a list of the participating Artists. """

        doc = self._request("event.getInfo", True)

        artists = []
        for name in _extract_all(doc, "artist"):
            artists.append(Artist(name, *self.auth_data))

        return artists

    def get_venue(self):
        """Returns the venue where the event is held."""

        doc = self._request("event.getInfo", True)

        # The venue id is the last path segment of the extracted url.
        # NOTE(review): this takes whatever "url" value _extract returns for
        # the whole document -- presumably the venue's; TODO confirm.
        venue_url = _extract(doc, "url")
        venue_id = _number(venue_url[venue_url.rfind("/") + 1:])

        return Venue(venue_id, *self.auth_data)

    def get_start_date(self):
        """Returns the date when the event starts."""

        doc = self._request("event.getInfo", True)

        return _extract(doc, "startDate")

    def get_description(self):
        """Returns the description of the event. """

        doc = self._request("event.getInfo", True)

        return _extract(doc, "description")

    def get_image_url(self, size = IMAGE_LARGE):
        """Returns the associated image URL.
        * size: The image size. Possible values:
          o IMAGE_LARGE
          o IMAGE_MEDIUM
          o IMAGE_SMALL
        """

        doc = self._request("event.getInfo", True)

        return _extract_all(doc, "image")[size]

    def get_attendance_count(self):
        """Returns the number of attending people. """

        doc = self._request("event.getInfo", True)

        return _number(_extract(doc, "attendance"))

    def get_review_count(self):
        """Returns the number of available reviews for this event. """

        doc = self._request("event.getInfo", True)

        return _number(_extract(doc, "reviews"))

    def get_url(self, domain_name = DOMAIN_ENGLISH):
        """Returns the url of the event page on Last.fm.
        * domain_name: Last.fm's language domain. Possible values:
          o DOMAIN_ENGLISH
          o DOMAIN_GERMAN
          o DOMAIN_SPANISH
          o DOMAIN_FRENCH
          o DOMAIN_ITALIAN
          o DOMAIN_POLISH
          o DOMAIN_PORTUGUESE
          o DOMAIN_SWEDISH
          o DOMAIN_TURKISH
          o DOMAIN_RUSSIAN
          o DOMAIN_JAPANESE
          o DOMAIN_CHINESE
        """

        url = 'http://%(domain)s/event/%(id)s'

        return url %{'domain': domain_name, 'id': self.get_id()}

    def share(self, users, message = None):
        """Shares this event (sends out recommendations).
          * users: A list that can contain usernames, emails, User objects, or all of them.
          * message: A message to include in the recommendation message.
        """

        # last.fm currently accepts a max of 10 recipients at a time, so
        # longer lists are sent as several requests of up to 10 each. An
        # empty list still results in a single request, as before.
        batches = [users[start:start + 10] for start in range(0, len(users), 10)] or [users]

        for batch in batches:
            recipients = []
            for user in batch:
                if isinstance(user, User):
                    recipients.append(user.get_name())
                else:
                    recipients.append(user)

            params = self._get_params()
            params['recipient'] = ','.join(recipients)
            if message: params['message'] = unicode(message)

            self._request('event.share', False, params)
            info(repr(self) + " was shared with " + repr(batch))
class Country(_BaseObject):
    """A country at Last.fm."""

    def __init__(self, name, api_key, api_secret, session_key=""):
        """
        * name: The country name.
        """

        _BaseObject.__init__(self, api_key, api_secret, session_key)

        self.name = name

    def __repr__(self):
        return self.get_name()

    def __eq__(self, other):
        # Countries are equal when their names match, ignoring case.
        return self.get_name().lower() == other.get_name().lower()

    def __ne__(self, other):
        # Kept as the exact negation of __eq__ (case-insensitive).
        return self.get_name().lower() != other.get_name().lower()

    def _get_params(self):
        return {'country': self.get_name()}

    def _get_name_from_code(self, alpha2code):
        """Currently returns alpha2code unchanged (see the TODO below)."""

        # TODO: Have this function lookup the alpha-2 code and return the country name.

        return alpha2code

    def get_name(self):
        """Returns the country name. """

        return self.name

    def get_top_artists(self):
        """Returns a sequence of the most played artists, as TopItem objects."""

        doc = self._request('geo.getTopArtists', True)

        top_artists = []
        for node in doc.getElementsByTagName("artist"):
            name = _extract(node, 'name')
            playcount = _extract(node, "playcount")

            top_artists.append(TopItem(Artist(name, *self.auth_data), playcount))

        return top_artists

    def get_top_tracks(self):
        """Returns a sequence of the most played tracks, as TopItem objects."""

        doc = self._request("geo.getTopTracks", True)

        top_tracks = []
        for node in doc.getElementsByTagName('track'):
            title = _extract(node, 'name')
            # Second "name" match inside the node -- presumably the artist's
            # name; _extract is defined elsewhere, TODO confirm.
            artist = _extract(node, 'name', 1)
            playcount = _number(_extract(node, "playcount"))

            top_tracks.append( TopItem(Track(artist, title, *self.auth_data), playcount))

        return top_tracks

    def get_url(self, domain_name = DOMAIN_ENGLISH):
        """Returns the url of the country page on Last.fm.
        * domain_name: Last.fm's language domain. Possible values:
          o DOMAIN_ENGLISH
          o DOMAIN_GERMAN
          o DOMAIN_SPANISH
          o DOMAIN_FRENCH
          o DOMAIN_ITALIAN
          o DOMAIN_POLISH
          o DOMAIN_PORTUGUESE
          o DOMAIN_SWEDISH
          o DOMAIN_TURKISH
          o DOMAIN_RUSSIAN
          o DOMAIN_JAPANESE
          o DOMAIN_CHINESE
        """

        url = 'http://%(domain)s/place/%(country_name)s'

        country_name = _get_url_safe(self.get_name())

        return url %{'domain': domain_name, 'country_name': country_name}
class Library(_BaseObject):
"""A user's Last.fm library."""
def __init__(self, user, api_key, api_secret, session_key=""):
    """
    * user: A username or a User object; a bare name is wrapped in a
      User sharing this library's credentials.
    """

    _BaseObject.__init__(self, api_key, api_secret, session_key)

    if not isinstance(user, User):
        user = User(user, *self.auth_data)
    self.user = user

    # Number of the last page retrieved by each get_*_page() method.
    self._albums_index = self._artists_index = self._tracks_index = 0
def __repr__(self):
    # The owner's own representation followed by "'s Library".
    owner = self.get_user()
    return owner.__repr__() + "'s Library"
def _get_params(self):
    # Every library call is made on behalf of the owning user.
    owner_name = self.user.get_name()
    return {'user': owner_name}
def get_user(self):
    """Returns the User object that owns this library."""

    owner = self.user
    return owner
def add_album(self, album):
    """Add an album to this library.
    * album: The Album object to add.
    """

    params = self._get_params()
    # get_artist is a method and has to be called to obtain the Artist.
    params["artist"] = album.get_artist().get_name()
    params["album"] = album.get_name()

    self._request("library.addAlbum", False, params)
    info(repr(album) + " was added to " + repr(self))
def add_artist(self, artist):
    """Adds the given artist to this library.
    * artist: The Artist object to add.
    """

    request_params = self._get_params()
    request_params["artist"] = artist.get_name()

    self._request("library.addArtist", False, request_params)

    message = repr(artist) + " was added to " + repr(self)
    info(message)
def add_track(self, track):
    """Add a track to this library.
    * track: The Track object to add.
    """

    params = self._get_params()
    params["track"] = track.get_title()
    # NOTE(review): only the track title is sent; add_album() also sends
    # the artist name -- confirm whether library.addTrack needs it too.

    self._request("library.addTrack", False, params)
    info(repr(track) + " was added to " + repr(self))
def _get_albums_pagecount(self):
    """Returns how many pages of albums this library has."""

    # The count is the "totalPages" attribute of the first <albums> element.
    albums_node = self._request("library.getAlbums", True).getElementsByTagName("albums")[0]
    total_pages = albums_node.getAttribute("totalPages")

    return _number(total_pages)
def is_end_of_albums(self):
    """Returns True once the last page of albums has been retrieved."""

    return self._albums_index >= self._get_albums_pagecount()
def _get_artists_pagecount(self):
    """Returns how many pages of artists this library has."""

    # The count is the "totalPages" attribute of the first <artists> element.
    artists_node = self._request("library.getArtists", True).getElementsByTagName("artists")[0]
    total_pages = artists_node.getAttribute("totalPages")

    return _number(total_pages)
def is_end_of_artists(self):
"""Returns True when the last page of artists has ben retrieved."""
if self._artists_index >= self._get_artists_pagecount():
return True
else:
return False
def _get_tracks_pagecount(self):
"""Returns the number of track pages in this library."""
doc = self._request("library.getTracks", True)
return _number(doc.getElementsByTagName("tracks")[0].getAttribute("totalPages"))
def is_end_of_tracks(self):
"""Returns True when the last page of tracks has ben retrieved."""
if self._tracks_index >= self._get_tracks_pagecount():
return True
else:
return False
def get_albums_page(self):
"""Retreives the next page of albums in the Library. Returns a sequence of TopItem objects.
Use the function extract_items like extract_items(Library.get_albums_page()) to return only a sequence of
Album objects with no extra data.
Example:
-------
library = Library("rj", API_KEY, API_SECRET, SESSION_KEY)
while not library.is_end_of_albums():
print library.get_albums_page()
"""
self._albums_index += 1
params = self._get_params()
params["page"] = str(self._albums_index)
list = []
doc = self._request("library.getAlbums", True, params)
for node in doc.getElementsByTagName("album"):
name = _extract(node, "name")
artist = _extract(node, "name", 1)
playcount = _number(_extract(node, "playcount"))
tagcount = _number(_extract(node, "tagcount"))
list.append(LibraryItem(Album(artist, name, *self.auth_data), playcount, tagcount))
return list
def get_artists_page(self):
"""Retreives the next page of artists in the Library. Returns a sequence of TopItem objects.
Use the function extract_items like extract_items(Library.get_artists_page()) to return only a sequence of
Artist objects with no extra data.
Example:
-------
library = Library("rj", API_KEY, API_SECRET, SESSION_KEY)
while not library.is_end_of_artists():
print library.get_artists_page()
"""
self._artists_index += 1
params = self._get_params()
params["page"] = str(self._artists_index)
list = []
doc = self._request("library.getArtists", True, params)
for node in doc.getElementsByTagName("artist"):
name = _extract(node, "name")
playcount = _number(_extract(node, "playcount"))
tagcount = _number(_extract(node, "tagcount"))
list.append(LibraryItem(Artist(name, *self.auth_data), playcount, tagcount))
return list
def get_tracks_page(self):
"""Retreives the next page of tracks in the Library. Returns a sequence of TopItem objects.
Use the function extract_items like extract_items(Library.get_tracks_page()) to return only a sequence of
Track objects with no extra data.
Example:
-------
library = Library("rj", API_KEY, API_SECRET, SESSION_KEY)
while not library.is_end_of_tracks():
print library.get_tracks_page()
"""
self._tracks_index += 1
params = self._get_params()
params["page"] = str(self._tracks_index)
list = []
doc = self._request("library.getTracks", True, params)
for node in doc.getElementsByTagName("track"):
name = _extract(node, "name")
artist = _extract(node, "name", 1)
playcount = _number(_extract(node, "playcount"))
tagcount = _number(_extract(node, "tagcount"))
list.append(LibraryItem(Track(artist, name, *self.auth_data), playcount, tagcount))
return list
class Playlist(_BaseObject):
    """A Last.fm user playlist."""

    def __init__(self, user, id, api_key, api_secret, session_key=""):
        """Creates a Playlist.

        * user: A User object or a user name.
        * id: The playlist id; stored as a unicode string.
        * api_key, api_secret, session_key: Passed on to _BaseObject.
        """

        _BaseObject.__init__(self, api_key, api_secret, session_key)

        if isinstance(user, User):
            self.user = user
        else:
            self.user = User(user, *self.auth_data)

        self.id = unicode(id)

    def __repr__(self):
        return repr(self.user) + "'s playlist # " + repr(self.id)

    def _get_info_node(self):
        """Returns the node from user.getPlaylists where this playlist's info is.

        Returns None when no playlist node carries this playlist's id.
        """

        doc = self._request("user.getPlaylists", True)

        for node in doc.getElementsByTagName("playlist"):
            if _extract(node, "id") == str(self.get_id()):
                return node

    def _get_params(self):
        """Returns the request parameters identifying this playlist."""

        return {'user': self.user.get_name(), 'playlistID': self.get_id()}

    def get_id(self):
        """Returns the playlist id."""

        return self.id

    def get_user(self):
        """Returns the owner user of this playlist."""

        return self.user

    def get_tracks(self):
        """Returns a list of the tracks on this user playlist."""

        uri = u'lastfm://playlist/%s' %self.get_id()

        return XSPF(uri, *self.auth_data).get_tracks()

    def add_track(self, track):
        """Adds a Track to this Playlist.

        * track: An object providing get_artist() and get_title().
        """

        params = self._get_params()
        params['artist'] = track.get_artist().get_name()
        params['track'] = track.get_title()

        self._request('playlist.addTrack', False, params)
        info(repr(track) + " was added to " + repr(self))

    def get_title(self):
        """Returns the title of this playlist."""

        return _extract(self._get_info_node(), "title")

    def get_creation_date(self):
        """Returns the creation date of this playlist."""

        return _extract(self._get_info_node(), "date")

    def get_size(self):
        """Returns the number of tracks in this playlist."""

        return _number(_extract(self._get_info_node(), "size"))

    def get_description(self):
        """Returns the description of this playlist."""

        return _extract(self._get_info_node(), "description")

    def get_duration(self):
        """Returns the duration of this playlist in milliseconds."""

        return _number(_extract(self._get_info_node(), "duration"))

    def is_streamable(self):
        """Returns True if the playlist is streamable.
        For a playlist to be streamable, it needs at least 45 tracks by 15 different artists."""

        return _extract(self._get_info_node(), "streamable") == '1'

    def has_track(self, track):
        """Checks to see if track is already in the playlist.
          * track: Any Track object.
        """

        return track in self.get_tracks()

    def get_image_url(self, size = IMAGE_LARGE):
        """Returns the associated image URL.
          * size: The image size. Possible values:
            o IMAGE_LARGE
            o IMAGE_MEDIUM
            o IMAGE_SMALL
        """

        # Collect every "image" value and pick the requested one. Indexing
        # the single string returned by _extract() would only yield one
        # character of the first URL.
        # NOTE(review): assumes the IMAGE_* constants are list indexes.
        return _extract_all(self._get_info_node(), "image")[size]

    def get_url(self, domain_name = DOMAIN_ENGLISH):
        """Returns the url of the playlist on Last.fm.
          * domain_name: Last.fm's language domain. Possible values:
            o DOMAIN_ENGLISH
            o DOMAIN_GERMAN
            o DOMAIN_SPANISH
            o DOMAIN_FRENCH
            o DOMAIN_ITALIAN
            o DOMAIN_POLISH
            o DOMAIN_PORTUGUESE
            o DOMAIN_SWEDISH
            o DOMAIN_TURKISH
            o DOMAIN_RUSSIAN
            o DOMAIN_JAPANESE
            o DOMAIN_CHINESE
        """

        url = "http://%(domain)s/user/%(user)s/library/playlists/%(appendix)s"

        # Reuse the last path segment of the URL reported for the playlist
        # and rebuild the rest for the requested domain.
        english_url = _extract(self._get_info_node(), "url")
        appendix = english_url[english_url.rfind("/") + 1:]

        return url %{'domain': domain_name, 'appendix': appendix, "user": self.get_user().get_name()}
class Tag(_BaseObject):
    """A Last.fm object tag."""

    def __init__(self, name, api_key, api_secret, session_key=""):
        """Creates a Tag.

        * name: The tag name.
        * api_key, api_secret, session_key: Passed on to _BaseObject.
        """

        _BaseObject.__init__(self, api_key, api_secret, session_key)

        self.name = name

    def _get_params(self):
        """Returns the request parameters identifying this tag."""

        return {'tag': self.get_name()}

    def __repr__(self):
        return self.get_name()

    def __eq__(self, other):
        """Tags are equal when their names match, ignoring case."""

        return self.get_name().lower() == other.get_name().lower()

    def __ne__(self, other):
        """Tags differ when their names differ, ignoring case."""

        return self.get_name().lower() != other.get_name().lower()

    def get_name(self):
        """Returns the name of the tag. """

        return self.name

    def get_similar(self):
        """Returns the tags similar to this one, ordered by similarity. """

        doc = self._request('tag.getSimilar', True)

        tags = []
        for name in _extract_all(doc, 'name'):
            tags.append(Tag(name, *self.auth_data))

        return tags

    def get_top_albums(self):
        """Returns a list of the top albums for this tag as TopItem objects."""

        doc = self._request('tag.getTopAlbums', True)

        items = []
        for node in doc.getElementsByTagName("album"):
            name = _extract(node, "name")
            # Index 1 selects the second "name" element under the album
            # node, which this code uses as the artist name.
            artist = _extract(node, "name", 1)
            playcount = _extract(node, "playcount")

            items.append(TopItem(Album(artist, name, *self.auth_data), playcount))

        return items

    def get_top_tracks(self):
        """Returns a list of the top tracks for this tag as TopItem objects."""

        doc = self._request("tag.getTopTracks", True)

        items = []
        for track in doc.getElementsByTagName('track'):
            title = _extract(track, "name")
            artist = _extract(track, "name", 1)
            playcount = _number(_extract(track, "playcount"))

            items.append(TopItem(Track(artist, title, *self.auth_data), playcount))

        return items

    def get_top_artists(self):
        """Returns a list of the top artists for this tag as TopItem objects."""

        doc = self._request('tag.getTopArtists', True)

        items = []
        for node in doc.getElementsByTagName("artist"):
            name = _extract(node, 'name')
            playcount = _extract(node, "playcount")

            items.append(TopItem(Artist(name, *self.auth_data), playcount))

        return items

    def get_weekly_chart_dates(self):
        """Returns a list of From and To tuples for the available charts."""

        doc = self._request("tag.getWeeklyChartList", True)

        dates = []
        for node in doc.getElementsByTagName("chart"):
            dates.append((node.getAttribute("from"), node.getAttribute("to")))

        return dates

    def get_weekly_artist_charts(self, from_date = None, to_date = None):
        """Returns the weekly artist charts for the week starting from the from_date value to the to_date value.

        The date range is only sent when both from_date and to_date are given.
        """

        params = self._get_params()
        if from_date and to_date:
            params["from"] = from_date
            params["to"] = to_date

        doc = self._request("tag.getWeeklyArtistChart", True, params)

        items = []
        for node in doc.getElementsByTagName("artist"):
            item = Artist(_extract(node, "name"), *self.auth_data)
            weight = _extract(node, "weight")
            items.append(TopItem(item, weight))

        return items

    def get_url(self, domain_name = DOMAIN_ENGLISH):
        """Returns the url of the tag page on Last.fm.
          * domain_name: Last.fm's language domain. Possible values:
            o DOMAIN_ENGLISH
            o DOMAIN_GERMAN
            o DOMAIN_SPANISH
            o DOMAIN_FRENCH
            o DOMAIN_ITALIAN
            o DOMAIN_POLISH
            o DOMAIN_PORTUGUESE
            o DOMAIN_SWEDISH
            o DOMAIN_TURKISH
            o DOMAIN_RUSSIAN
            o DOMAIN_JAPANESE
            o DOMAIN_CHINESE
        """

        url = 'http://%(domain)s/tag/%(name)s'

        name = _get_url_safe(self.get_name())

        return url %{'domain': domain_name, 'name': name}
class Track(_BaseObject, _Taggable):
    """Represents a single track on Last.fm."""

    def __init__(self, artist, title, api_key, api_secret, session_key=""):
        """Creates a Track.

        * artist: An Artist object or an artist name.
        * title: The track title.
        * api_key, api_secret, session_key: Passed on to _BaseObject.
        """

        _BaseObject.__init__(self, api_key, api_secret, session_key)
        _Taggable.__init__(self, 'track')

        # Wrap a plain artist name so self.artist always holds an Artist.
        if not isinstance(artist, Artist):
            artist = Artist(artist, *self.auth_data)

        self.artist = artist
        self.title = title

    def __repr__(self):
        artist_name = self.get_artist().get_name()
        return artist_name + ' - ' + self.get_title()

    def __eq__(self, other):
        """Two tracks match when title and artist name match, ignoring case."""

        if self.get_title().lower() != other.get_title().lower():
            return False

        return self.get_artist().get_name().lower() == other.get_artist().get_name().lower()

    def __ne__(self, other):
        """Two tracks differ when title or artist name differ, ignoring case."""

        if self.get_title().lower() != other.get_title().lower():
            return True

        return self.get_artist().get_name().lower() != other.get_artist().get_name().lower()

    def _get_params(self):
        """Builds the request parameters that identify this track."""

        params = {}
        params['artist'] = self.get_artist().get_name()
        params['track'] = self.get_title()
        return params

    def get_artist(self):
        """Gives back the Artist object this track belongs to."""

        return self.artist

    def get_title(self):
        """Gives back the title of the track."""

        return self.title

    def get_name(self):
        """Gives back the title of the track (same as Track.get_title)."""

        return self.get_title()

    def get_id(self):
        """Gives back the "id" value from track.getInfo."""

        return _extract(self._request("track.getInfo", True), "id")

    def get_duration(self):
        """Gives back the "duration" value from track.getInfo as a number."""

        return _number(_extract(self._request("track.getInfo", True), "duration"))

    def get_mbid(self):
        """Gives back the "mbid" (MusicBrainz ID) value from track.getInfo."""

        return _extract(self._request("track.getInfo", True), "mbid")

    def get_listener_count(self):
        """Gives back the "listeners" value from track.getInfo as a number."""

        return _number(_extract(self._request("track.getInfo", True), "listeners"))

    def get_playcount(self):
        """Gives back the "playcount" value from track.getInfo as a number."""

        return _number(_extract(self._request("track.getInfo", True), "playcount"))

    def is_streamable(self):
        """Tells whether track.getInfo reports "streamable" as "1"."""

        return _extract(self._request("track.getInfo", True), "streamable") == "1"

    def is_fulltrack_available(self):
        """Tells whether the first "streamable" element has fulltrack="1"."""

        info_doc = self._request("track.getInfo", True)
        streamable = info_doc.getElementsByTagName("streamable")[0]
        return streamable.getAttribute("fulltrack") == "1"

    def get_album(self):
        """Gives back the Album of this track, or None if none is listed."""

        info_doc = self._request("track.getInfo", True)

        album_nodes = info_doc.getElementsByTagName("album")
        if not album_nodes:
            return

        first = album_nodes[0]
        return Album(_extract(first, "artist"), _extract(first, "title"), *self.auth_data)

    def _get_wiki_field(self, field):
        """Extracts one field of the first "wiki" element, or None if there is no wiki."""

        info_doc = self._request("track.getInfo", True)

        wiki_nodes = info_doc.getElementsByTagName("wiki")
        if not wiki_nodes:
            return

        return _extract(wiki_nodes[0], field)

    def get_wiki_published_date(self):
        """Gives back the "published" value of the wiki, or None if there is no wiki."""

        return self._get_wiki_field("published")

    def get_wiki_summary(self):
        """Gives back the "summary" value of the wiki, or None if there is no wiki."""

        return self._get_wiki_field("summary")

    def get_wiki_content(self):
        """Gives back the "content" value of the wiki, or None if there is no wiki."""

        return self._get_wiki_field("content")

    def love(self):
        """Sends a track.love request for this track. """

        self._request('track.love')

    def ban(self):
        """Sends a track.ban request for this track. """

        self._request('track.ban')

    def get_similar(self):
        """Gives back the tracks listed by track.getSimilar as Track objects. """

        similar_doc = self._request('track.getSimilar', True)

        tracks = []
        for track_node in similar_doc.getElementsByTagName("track"):
            track_title = _extract(track_node, 'name')
            # Index 1 selects the second "name" element under the track
            # node, which this code uses as the artist name.
            track_artist = _extract(track_node, 'name', 1)
            tracks.append(Track(track_artist, track_title, *self.auth_data))

        return tracks

    def get_top_fans(self, limit = None):
        """Gives back the users listed by track.getTopFans as TopItem objects.

          * limit: When truthy, at most this many items are returned.
        """

        fans_doc = self._request('track.getTopFans', True)

        fans = []
        for user_node in fans_doc.getElementsByTagName('user'):
            if limit and len(fans) >= limit:
                break

            fan_name = _extract(user_node, 'name')
            fan_weight = _number(_extract(user_node, 'weight'))
            fans.append(TopItem(User(fan_name, *self.auth_data), fan_weight))

        return fans

    def share(self, users, message = None):
        """Recommends this track to other people.

          * users: A list whose entries may be usernames, emails or User objects.
          * message: Optional text sent along with the recommendation.
        """

        # Lists longer than 10 are sent in slices of 9 through recursive
        # calls; whatever remains (10 or fewer) is sent below.
        while len(users) > 10:
            head, users = users[0:9], users[9:]
            self.share(head, message)

        recipients = [u.get_name() if isinstance(u, User) else u for u in users]

        params = self._get_params()
        params['recipient'] = ','.join(recipients)
        if message:
            params['message'] = unicode(message)

        self._request('track.share', False, params)

    def get_url(self, domain_name = DOMAIN_ENGLISH):
        """Builds the address of the track page on Last.fm.
          * domain_name: Last.fm's language domain. Possible values:
            o DOMAIN_ENGLISH
            o DOMAIN_GERMAN
            o DOMAIN_SPANISH
            o DOMAIN_FRENCH
            o DOMAIN_ITALIAN
            o DOMAIN_POLISH
            o DOMAIN_PORTUGUESE
            o DOMAIN_SWEDISH
            o DOMAIN_TURKISH
            o DOMAIN_RUSSIAN
            o DOMAIN_JAPANESE
            o DOMAIN_CHINESE
        """

        parts = {'domain': domain_name}
        parts['artist'] = _get_url_safe(self.get_artist().get_name())
        parts['title'] = _get_url_safe(self.get_title())

        return 'http://%(domain)s/music/%(artist)s/_/%(title)s' % parts
class Group(_BaseObject):
    """A Last.fm group."""

    def __init__(self, group_name, api_key, api_secret, session_key=""):
        """Creates a Group.

        * group_name: The name of the group.
        * api_key, api_secret, session_key: Passed on to _BaseObject.
        """

        _BaseObject.__init__(self, api_key, api_secret, session_key)

        self.name = group_name

    def __repr__(self):
        return self.get_name()

    def __eq__(self, other):
        """Groups are equal when their names match, ignoring case."""

        return self.get_name().lower() == other.get_name().lower()

    def __ne__(self, other):
        """Groups differ when their names differ, ignoring case."""

        # Must mirror __eq__ exactly: with a case-sensitive comparison here,
        # two groups could be both equal and unequal at the same time.
        return self.get_name().lower() != other.get_name().lower()

    def _get_params(self):
        """Returns the request parameters identifying this group."""

        return {'group': self.get_name()}

    def get_name(self):
        """Returns the group name. """

        return self.name

    def get_weekly_chart_dates(self):
        """Returns a list of From and To tuples for the available charts."""

        doc = self._request("group.getWeeklyChartList", True)

        dates = []
        for node in doc.getElementsByTagName("chart"):
            dates.append((node.getAttribute("from"), node.getAttribute("to")))

        return dates

    def get_weekly_artist_charts(self, from_date = None, to_date = None):
        """Returns the weekly artist charts for the week starting from the from_date value to the to_date value.

        The date range is only sent when both from_date and to_date are given.
        """

        params = self._get_params()
        if from_date and to_date:
            params["from"] = from_date
            params["to"] = to_date

        doc = self._request("group.getWeeklyArtistChart", True, params)

        items = []
        for node in doc.getElementsByTagName("artist"):
            item = Artist(_extract(node, "name"), *self.auth_data)
            weight = _extract(node, "playcount")
            items.append(TopItem(item, weight))

        return items

    def get_weekly_album_charts(self, from_date = None, to_date = None):
        """Returns the weekly album charts for the week starting from the from_date value to the to_date value.

        The date range is only sent when both from_date and to_date are given.
        """

        params = self._get_params()
        if from_date and to_date:
            params["from"] = from_date
            params["to"] = to_date

        doc = self._request("group.getWeeklyAlbumChart", True, params)

        items = []
        for node in doc.getElementsByTagName("album"):
            item = Album(_extract(node, "artist"), _extract(node, "name"), *self.auth_data)
            weight = _extract(node, "playcount")
            items.append(TopItem(item, weight))

        return items

    def get_weekly_track_charts(self, from_date = None, to_date = None):
        """Returns the weekly track charts for the week starting from the from_date value to the to_date value.

        The date range is only sent when both from_date and to_date are given.
        """

        params = self._get_params()
        if from_date and to_date:
            params["from"] = from_date
            params["to"] = to_date

        doc = self._request("group.getWeeklyTrackChart", True, params)

        items = []
        for node in doc.getElementsByTagName("track"):
            item = Track(_extract(node, "artist"), _extract(node, "name"), *self.auth_data)
            weight = _extract(node, "playcount")
            items.append(TopItem(item, weight))

        return items

    def get_url(self, domain_name = DOMAIN_ENGLISH):
        """Returns the url of the group page on Last.fm.
          * domain_name: Last.fm's language domain. Possible values:
            o DOMAIN_ENGLISH
            o DOMAIN_GERMAN
            o DOMAIN_SPANISH
            o DOMAIN_FRENCH
            o DOMAIN_ITALIAN
            o DOMAIN_POLISH
            o DOMAIN_PORTUGUESE
            o DOMAIN_SWEDISH
            o DOMAIN_TURKISH
            o DOMAIN_RUSSIAN
            o DOMAIN_JAPANESE
            o DOMAIN_CHINESE
        """

        url = 'http://%(domain)s/group/%(name)s'

        name = _get_url_safe(self.get_name())

        return url %{'domain': domain_name, 'name': name}
class XSPF(_BaseObject):
    """Represents a Last.fm playlist addressed by its XSPF URI."""

    def __init__(self, uri, api_key, api_secret, session_key=""):
        """Creates an XSPF object.

        * uri: The Last.fm playlist URI.
        * api_key, api_secret, session_key: Passed on to _BaseObject.
        """

        _BaseObject.__init__(self, api_key, api_secret, session_key)

        self.uri = uri

    def _get_params(self):
        """Builds the request parameters that identify this playlist."""

        params = {}
        params['playlistURL'] = self.get_uri()
        return params

    def __repr__(self):
        return self.get_uri()

    def __eq__(self, other):
        """Two playlists match when their URIs are equal."""

        return self.get_uri() == other.get_uri()

    def __ne__(self, other):
        """Two playlists differ when their URIs are not equal."""

        return self.get_uri() != other.get_uri()

    def get_uri(self):
        """Gives back the Last.fm playlist URI. """

        return self.uri

    def get_tracks(self):
        """Fetches the playlist and gives back its entries as Track objects."""

        playlist_doc = self._request('playlist.fetch', True)

        tracks = []
        for track_node in playlist_doc.getElementsByTagName('track'):
            track_title = _extract(track_node, 'title')
            track_artist = _extract(track_node, 'creator')
            tracks.append(Track(track_artist, track_title, *self.auth_data))

        return tracks
class User(_BaseObject):
    """A Last.fm user."""

    def __init__(self, user_name, api_key, api_secret, session_key=""):
        """Creates a User.

        * user_name: The user's name on Last.fm.
        * api_key, api_secret, session_key: Passed on to _BaseObject.
        """

        _BaseObject.__init__(self, api_key, api_secret, session_key)

        self.name = user_name

        # Number of the last page fetched by the paginated getters
        # (0: none fetched yet).
        self._past_events_index = 0
        self._recommended_events_index = 0
        self._recommended_artists_index = 0

    def __repr__(self):
        return self.get_name()

    def __eq__(self, another):
        """Users are equal when their names are equal."""

        return self.get_name() == another.get_name()

    def __ne__(self, another):
        """Users differ when their names are not equal."""

        return self.get_name() != another.get_name()

    def _get_params(self):
        """Returns the request parameters identifying this user."""

        return {"user": self.get_name()}

    def get_name(self):
        """Returns the user name."""

        return self.name

    def get_upcoming_events(self):
        """Returns all the upcoming events for this user. """

        doc = self._request('user.getEvents', True)

        events = []
        for event_id in _extract_all(doc, 'id'):
            events.append(Event(event_id, *self.auth_data))

        return events

    def get_friends(self, limit = None):
        """Returns a list of the user's friends.

          * limit: When truthy, sent as the "limit" request parameter.
        """

        params = self._get_params()
        if limit:
            params['limit'] = unicode(limit)

        doc = self._request('user.getFriends', True, params)

        friends = []
        for name in _extract_all(doc, 'name'):
            friends.append(User(name, *self.auth_data))

        return friends

    def get_loved_tracks(self):
        """Returns the tracks listed by user.getLovedTracks as Track objects. """

        doc = self._request('user.getLovedTracks', True)

        tracks = []
        for track in doc.getElementsByTagName('track'):
            title = _extract(track, 'name', 0)
            # Index 1 selects the second "name" element under the track
            # node, which this code uses as the artist name.
            artist = _extract(track, 'name', 1)

            tracks.append(Track(artist, title, *self.auth_data))

        return tracks

    def get_neighbours(self, limit = None):
        """Returns a list of the user's neighbours.

          * limit: When truthy, sent as the "limit" request parameter.
        """

        params = self._get_params()
        if limit:
            params['limit'] = unicode(limit)

        doc = self._request('user.getNeighbours', True, params)

        neighbours = []
        for name in _extract_all(doc, 'name'):
            neighbours.append(User(name, *self.auth_data))

        return neighbours

    def _get_past_events_pagecount(self):
        """Returns the number of pages in the past events."""

        params = self._get_params()
        params["page"] = str(self._past_events_index)
        doc = self._request("user.getPastEvents", True, params)

        return _number(doc.getElementsByTagName("events")[0].getAttribute("totalPages"))

    def is_end_of_past_events(self):
        """Returns True if the end of Past Events was reached."""

        return self._past_events_index >= self._get_past_events_pagecount()

    def get_past_events_page(self, ):
        """Returns the next page of the events a user has attended in the past.

        Example:
        --------

        while not user.is_end_of_past_events():
            print user.get_past_events_page()
        """

        self._past_events_index += 1
        params = self._get_params()
        params["page"] = str(self._past_events_index)

        doc = self._request('user.getPastEvents', True, params)

        events = []
        for event_id in _extract_all(doc, 'id'):
            events.append(Event(event_id, *self.auth_data))

        return events

    def get_playlists(self):
        """Returns a list of Playlists that this user owns."""

        doc = self._request("user.getPlaylists", True)

        playlists = []
        for playlist_id in _extract_all(doc, "id"):
            playlists.append(Playlist(self.get_name(), playlist_id, *self.auth_data))

        return playlists

    def get_now_playing(self):
        """Returns the currently playing track, or None if nothing is playing. """

        params = self._get_params()
        params['limit'] = '1'

        doc = self._request('user.getRecentTracks', False, params)

        # A response without any track means nothing is playing either.
        tracks = doc.getElementsByTagName('track')
        if not tracks:
            return None

        e = tracks[0]

        if not e.hasAttribute('nowplaying'):
            return None

        artist = _extract(e, 'artist')
        title = _extract(e, 'name')

        return Track(artist, title, *self.auth_data)

    def get_recent_tracks(self, limit = None):
        """Returns this user's recent listened-to tracks as
        a sequence of PlayedTrack objects.
        Use extract_items() with the return of this function to
        get only a sequence of Track objects with no playback dates.

          * limit: When truthy, sent as the "limit" request parameter.
        """

        params = self._get_params()
        if limit:
            params['limit'] = unicode(limit)

        doc = self._request('user.getRecentTracks', False, params)

        played = []
        for track in doc.getElementsByTagName('track'):
            # Skip the now-playing entry before touching its <date> element:
            # indexing [0] below raises IndexError when that element is absent.
            if track.hasAttribute('nowplaying'):
                continue

            title = _extract(track, "name")
            artist = _extract(track, "artist")
            date = _extract(track, "date")
            timestamp = track.getElementsByTagName("date")[0].getAttribute("uts")

            played.append(PlayedTrack(Track(artist, title, *self.auth_data), date, timestamp))

        return played

    def get_top_albums(self, period = PERIOD_OVERALL):
        """Returns the top albums played by a user.
          * period: The period of time. Possible values:
            o PERIOD_OVERALL
            o PERIOD_3MONTHS
            o PERIOD_6MONTHS
            o PERIOD_12MONTHS
        """

        params = self._get_params()
        params['period'] = period

        doc = self._request('user.getTopAlbums', True, params)

        items = []
        for album in doc.getElementsByTagName('album'):
            name = _extract(album, 'name')
            artist = _extract(album, 'name', 1)
            playcount = _extract(album, "playcount")

            items.append(TopItem(Album(artist, name, *self.auth_data), playcount))

        return items

    def get_top_artists(self, period = PERIOD_OVERALL):
        """Returns the top artists played by a user.
          * period: The period of time. Possible values:
            o PERIOD_OVERALL
            o PERIOD_3MONTHS
            o PERIOD_6MONTHS
            o PERIOD_12MONTHS
        """

        params = self._get_params()
        params['period'] = period

        doc = self._request('user.getTopArtists', True, params)

        items = []
        for node in doc.getElementsByTagName('artist'):
            name = _extract(node, 'name')
            playcount = _extract(node, "playcount")

            items.append(TopItem(Artist(name, *self.auth_data), playcount))

        return items

    def get_top_tags(self, limit = None):
        """Returns a sequence of the top tags used by this user as TopItem objects holding (Tag, tagcount).
          * limit: The limit of how many tags to return.
        """

        doc = self._request("user.getTopTags", True)

        items = []
        for node in doc.getElementsByTagName("tag"):
            # Honour the documented limit by truncating on the client side.
            if limit and len(items) >= limit:
                break

            items.append(TopItem(Tag(_extract(node, "name"), *self.auth_data), _extract(node, "count")))

        return items

    def get_top_tracks(self, period = PERIOD_OVERALL):
        """Returns the top tracks played by a user.
          * period: The period of time. Possible values:
            o PERIOD_OVERALL
            o PERIOD_3MONTHS
            o PERIOD_6MONTHS
            o PERIOD_12MONTHS
        """

        params = self._get_params()
        params['period'] = period

        doc = self._request('user.getTopTracks', True, params)

        items = []
        for track in doc.getElementsByTagName('track'):
            name = _extract(track, 'name')
            artist = _extract(track, 'name', 1)
            playcount = _extract(track, "playcount")

            items.append(TopItem(Track(artist, name, *self.auth_data), playcount))

        return items

    def get_weekly_chart_dates(self):
        """Returns a list of From and To tuples for the available charts."""

        doc = self._request("user.getWeeklyChartList", True)

        dates = []
        for node in doc.getElementsByTagName("chart"):
            dates.append((node.getAttribute("from"), node.getAttribute("to")))

        return dates

    def get_weekly_artist_charts(self, from_date = None, to_date = None):
        """Returns the weekly artist charts for the week starting from the from_date value to the to_date value.

        The date range is only sent when both from_date and to_date are given.
        """

        params = self._get_params()
        if from_date and to_date:
            params["from"] = from_date
            params["to"] = to_date

        doc = self._request("user.getWeeklyArtistChart", True, params)

        items = []
        for node in doc.getElementsByTagName("artist"):
            item = Artist(_extract(node, "name"), *self.auth_data)
            weight = _extract(node, "playcount")
            items.append(TopItem(item, weight))

        return items

    def get_weekly_album_charts(self, from_date = None, to_date = None):
        """Returns the weekly album charts for the week starting from the from_date value to the to_date value.

        The date range is only sent when both from_date and to_date are given.
        """

        params = self._get_params()
        if from_date and to_date:
            params["from"] = from_date
            params["to"] = to_date

        doc = self._request("user.getWeeklyAlbumChart", True, params)

        items = []
        for node in doc.getElementsByTagName("album"):
            item = Album(_extract(node, "artist"), _extract(node, "name"), *self.auth_data)
            weight = _extract(node, "playcount")
            items.append(TopItem(item, weight))

        return items

    def get_weekly_track_charts(self, from_date = None, to_date = None):
        """Returns the weekly track charts for the week starting from the from_date value to the to_date value.

        The date range is only sent when both from_date and to_date are given.
        """

        params = self._get_params()
        if from_date and to_date:
            params["from"] = from_date
            params["to"] = to_date

        doc = self._request("user.getWeeklyTrackChart", True, params)

        items = []
        for node in doc.getElementsByTagName("track"):
            item = Track(_extract(node, "artist"), _extract(node, "name"), *self.auth_data)
            weight = _extract(node, "playcount")
            items.append(TopItem(item, weight))

        return items

    def compare_with_user(self, user, shared_artists_limit = None):
        """Compare this user with another Last.fm user.
        Returns a sequence (tasteometer_score, (shared_artist1, shared_artist2, ...))
        user: A User object or a username string/unicode object.
        shared_artists_limit: When truthy, sent as the "limit" request parameter.
        """

        if isinstance(user, User):
            user = user.get_name()

        params = self._get_params()
        if shared_artists_limit:
            params['limit'] = unicode(shared_artists_limit)
        params['type1'] = 'user'
        params['type2'] = 'user'
        params['value1'] = self.get_name()
        params['value2'] = user

        doc = _Request('tasteometer.compare', params, *self.auth_data).execute()

        score = _extract(doc, 'score')

        artists = doc.getElementsByTagName('artists')[0]
        shared_artists_list = []
        for name in _extract_all(artists, 'name'):
            shared_artists_list.append(Artist(name, *self.auth_data))

        return (score, shared_artists_list)

    def getRecommendedEvents(self, page = None, limit = None):
        """Returns a paginated list of all events recommended to a user by Last.fm, based on their listening profile.
          * page: The page number of results to return.
          * limit: The limit of events to return.
        """

        params = self._get_params()
        if page:
            params['page'] = unicode(page)
        if limit:
            params['limit'] = unicode(limit)

        doc = _Request('user.getRecommendedEvents', params, *self.auth_data).execute()

        events = []
        for event_id in _extract_all(doc, 'id'):
            events.append(Event(event_id, *self.auth_data))

        return events

    def get_url(self, domain_name = DOMAIN_ENGLISH):
        """Returns the url of the user page on Last.fm.
          * domain_name: Last.fm's language domain. Possible values:
            o DOMAIN_ENGLISH
            o DOMAIN_GERMAN
            o DOMAIN_SPANISH
            o DOMAIN_FRENCH
            o DOMAIN_ITALIAN
            o DOMAIN_POLISH
            o DOMAIN_PORTUGUESE
            o DOMAIN_SWEDISH
            o DOMAIN_TURKISH
            o DOMAIN_RUSSIAN
            o DOMAIN_JAPANESE
            o DOMAIN_CHINESE
        """

        url = 'http://%(domain)s/user/%(name)s'

        name = _get_url_safe(self.get_name())

        return url %{'domain': domain_name, 'name': name}

    def get_library(self):
        """Returns the associated Library object. """

        return Library(self, *self.auth_data)
class AuthenticatedUser(User):
def __init__(self, api_key, api_secret, session_key=""):
User.__init__(self, "", api_key, api_secret, session_key);
def _get_params(self):
return {}
def get_name(self):
"""Returns the name of the authenticated user."""
doc = self._request("user.getInfo", True)
self.name = _extract(doc, "name")
return self.name
def get_id(self):
"""Returns the user id."""
doc = self._request("user.getInfo", True)
return _extract(doc, "id")
def get_image_url(self):
"""Returns the user's avatar."""
doc = self._request("user.getInfo", True)
return _extract(doc, "image")
def get_language(self):
"""Returns the language code of the language used by the user."""
doc = self._request("user.getInfo", True)
return _extract(doc, "lang")
def get_country(self):
"""Returns the name of the country of the user."""
doc = self._request("user.getInfo", True)
return Country(_extract(doc, "country"), *self.auth_data)
def get_age(self):
"""Returns the user's age."""
doc = self._request("user.getInfo", True)
return _number(_extract(doc, "age"))
def get_gender(self):
"""Returns the user's gender. Either USER_MALE or USER_FEMALE."""
doc = self._request("user.getInfo", True)
return _extract(doc, "gender")
if value == 'm':
return USER_MALE
elif value == 'f':
return USER_FEMALE
return None
def is_subscriber(self):
"""Returns whether the user is a subscriber or not. True or False."""
doc = self._request("user.getInfo", True)
return _extract(doc, "subscriber") == "1"
def get_playcount(self):
"""Returns the user's playcount so far."""
doc = self._request("user.getInfo", True)
return _number(_extract(doc, "playcount"))
def _get_recommended_events_pagecount(self):
"""Returns the number of pages in the past events."""
params = self._get_params()
params["page"] = str(self._recommended_events_index)
doc = self._request("user.getRecommendedEvents", True, params)
return _number(doc.getElementsByTagName("events")[0].getAttribute("totalPages"))
def is_end_of_recommended_events(self):
"""Returns True if the end of Past Events was reached."""
return self._recommended_events_index >= self._get_recommended_events_pagecount()
def get_recommended_events_page(self, ):
"""Retruns a paginated list of all events a user has attended in the past.
Example:
--------
while not user.is_end_of_recommended_events():
print user.get_recommended_events_page()
"""
self._recommended_events_index += 1
params = self._get_params()
params["page"] = str(self._recommended_events_index)
doc = self._request('user.getRecommendedEvents', True, params)
list = []
for id in _extract_all(doc, 'id'):
list.append(Event(id, *self.auth_data))
return list
def _get_recommended_artists_pagecount(self):
"""Returns the number of pages in the past artists."""
params = self._get_params()
params["page"] = str(self._recommended_artists_index)
doc = self._request("user.getRecommendedArtists", True, params)
return _number(doc.getElementsByTagName("recommendations")[0].getAttribute("totalPages"))
def is_end_of_recommended_artists(self):
    """Returns True once the last page of recommended artists was fetched."""

    current_page = self._recommended_artists_index
    page_count = self._get_recommended_artists_pagecount()

    return current_page >= page_count
def get_recommended_artists_page(self):
    """Returns the next page of recommended artists as a list of Artist objects.

    Every call advances the internal page index by one before requesting.

    Example:
    --------

    while not user.is_end_of_recommended_artists():
        print user.get_recommended_artists_page()
    """

    self._recommended_artists_index += 1

    request_params = self._get_params()
    request_params["page"] = str(self._recommended_artists_index)

    doc = self._request('user.getRecommendedArtists', True, request_params)

    # One Artist per <name> element found anywhere in the response.
    return [Artist(artist_name, *self.auth_data) for artist_name in _extract_all(doc, 'name')]
class _Search(_BaseObject):
    """Abstract base of the paginated searches. Use one of its derivatives."""

    def __init__(self, ws_prefix, search_terms, api_key, api_secret, session_key=""):
        _BaseObject.__init__(self, api_key, api_secret, session_key)

        self.search_terms = search_terms
        # Prefix of the web service method, e.g. "album" for "album.search".
        self._ws_prefix = ws_prefix
        # Index of the page handed out last; 0 until the first page is fetched.
        self._last_page_index = 0

    def _get_params(self):
        """Returns a new dict holding a copy of the search terms."""

        return dict((key, self.search_terms[key]) for key in self.search_terms.keys())

    def get_total_result_count(self):
        """Returns the total count of all the results, as extracted from
        the "opensearch:totalResults" element."""

        method_name = self._ws_prefix + ".search"
        doc = self._request(method_name, True)

        return _extract(doc, "opensearch:totalResults")

    def _retreive_page(self, page_index):
        """Requests the given page and returns its matches node for processing."""

        request_params = self._get_params()
        request_params["page"] = str(page_index)

        method_name = self._ws_prefix + ".search"
        doc = self._request(method_name, True, request_params)

        matches_tag = self._ws_prefix + "matches"
        return doc.getElementsByTagName(matches_tag)[0]

    def _retrieve_next_page(self):
        """Advances the page index by one and returns that page's matches node."""

        self._last_page_index += 1
        return self._retreive_page(self._last_page_index)
class AlbumSearch(_Search):
    """Search for an album by name."""

    def __init__(self, album_name, api_key, api_secret, session_key=""):
        terms = {"album": album_name}
        _Search.__init__(self, "album", terms, api_key, api_secret, session_key)

    def get_next_page(self):
        """Returns the next page of results as a sequence of Album objects."""

        matches = self._retrieve_next_page()

        return [
            Album(_extract(match, "artist"), _extract(match, "name"), *self.auth_data)
            for match in matches.getElementsByTagName("album")
        ]
class ArtistSearch(_Search):
    """Search for an artist by artist name."""

    def __init__(self, artist_name, api_key, api_secret, session_key=""):
        terms = {"artist": artist_name}
        _Search.__init__(self, "artist", terms, api_key, api_secret, session_key)

    def get_next_page(self):
        """Returns the next page of results as a sequence of Artist objects."""

        matches = self._retrieve_next_page()

        return [
            Artist(_extract(match, "name"), *self.auth_data)
            for match in matches.getElementsByTagName("artist")
        ]
class TagSearch(_Search):
    """Search for a tag by tag name."""

    def __init__(self, tag_name, api_key, api_secret, session_key=""):
        terms = {"tag": tag_name}
        _Search.__init__(self, "tag", terms, api_key, api_secret, session_key)

    def get_next_page(self):
        """Returns the next page of results as a sequence of Tag objects."""

        matches = self._retrieve_next_page()

        return [
            Tag(_extract(match, "name"), *self.auth_data)
            for match in matches.getElementsByTagName("tag")
        ]
class TrackSearch(_Search):
    """Search for a track by track title. To leave the results unrestricted
    by artist, pass an empty string as the artist name."""

    def __init__(self, artist_name, track_title, api_key, api_secret, session_key=""):
        terms = {"track": track_title, "artist": artist_name}
        _Search.__init__(self, "track", terms, api_key, api_secret, session_key)

    def get_next_page(self):
        """Returns the next page of results as a sequence of Track objects."""

        matches = self._retrieve_next_page()

        return [
            Track(_extract(match, "artist"), _extract(match, "name"), *self.auth_data)
            for match in matches.getElementsByTagName("track")
        ]
class VenueSearch(_Search):
    """Search for a venue by its name. To leave the results unrestricted
    by country, pass an empty string as the country name."""

    def __init__(self, venue_name, country_name, api_key, api_secret, session_key=""):
        terms = {"venue": venue_name, "country": country_name}
        _Search.__init__(self, "venue", terms, api_key, api_secret, session_key)

    def get_next_page(self):
        """Returns the next page of results as a sequence of Venue objects."""

        matches = self._retrieve_next_page()

        return [
            Venue(_extract(match, "id"), *self.auth_data)
            for match in matches.getElementsByTagName("venue")
        ]
class Venue(_BaseObject):
    """A venue where events are held."""

    # TODO: waiting for a venue.getInfo web service to use.

    def __init__(self, id, api_key, api_secret, session_key=""):
        _BaseObject.__init__(self, api_key, api_secret, session_key)

        # Stored as an int; _number() maps None and "" to 0.
        self.id = _number(id)

    def __repr__(self):
        return "Venue #" + str(self.id)

    def __eq__(self, other):
        """Two venues are equal when their ids are equal."""

        return self.get_id() == other.get_id()

    def _get_params(self):
        """Returns the request parameters identifying this venue."""

        return {"venue": self.get_id()}

    def get_id(self):
        """Returns the id of the venue."""

        return self.id

    def _get_events(self, method_name):
        """Calls the given venue web service method and returns one Event
        object for every <event> element of the response."""

        doc = self._request(method_name, True)

        return [
            Event(_extract(node, "id"), *self.auth_data)
            for node in doc.getElementsByTagName("event")
        ]

    def get_upcoming_events(self):
        """Returns the upcoming events in this venue."""

        return self._get_events("venue.getEvents")

    def get_past_events(self):
        """Returns the past events held in this venue."""

        # venue.getEvents only lists upcoming events; past events are
        # served by their own web service method.
        return self._get_events("venue.getPastEvents")
def create_new_playlist(title, description, api_key, api_secret, session_key=""):
    """Creates a playlist for the authenticated user and returns it
    as a Playlist object.

    * title: The title of the new playlist.
    * description: The description of the new playlist.
    """

    params = {
        'title': unicode(title),
        'description': unicode(description),
    }

    request = _Request('playlist.create', params, api_key, api_secret, session_key)
    doc = request.execute()

    # The response carries the new playlist's id and the owning user's name.
    playlist_id = doc.getElementsByTagName("id")[0].firstChild.data
    owner = doc.getElementsByTagName('playlists')[0].getAttribute('user')

    return Playlist(owner, playlist_id, api_key, api_secret, session_key)
def get_authenticated_user(api_key, api_secret, session_key=""):
    """Returns an AuthenticatedUser object built from the given credentials."""

    user = AuthenticatedUser(api_key, api_secret, session_key)
    return user
def md5(text):
    """Returns the hexadecimal md5 digest of a string.

    The string is encoded as UTF-8 before it is hashed.
    """

    return hashlib.md5(text.encode("utf-8")).hexdigest()
def enable_proxy(host, port):
    """Enable a default web proxy.

    * host: The proxy host.
    * port: The proxy port; converted with _number() before it is stored.
    """

    global __proxy, __proxy_enabled

    # Stored as a two item [host, port] list.
    __proxy = [host, _number(port)]
    __proxy_enabled = True
def disable_proxy():
    """Disable using the web proxy.

    Only the enabled flag is cleared; the stored proxy details are kept.
    """

    global __proxy_enabled

    __proxy_enabled = False
def is_proxy_enabled():
    """Returns True if a web proxy is enabled."""

    # Reading a module-level name needs no global declaration.
    return __proxy_enabled
def _get_proxy():
    """Returns the proxy details held in the module-level __proxy."""

    # Reading a module-level name needs no global declaration.
    return __proxy
def async_call(sender, call, callback = None, call_args = None, callback_args = None):
    """Sets up an asynchronous operation by starting a _ThreadedCall.

    * sender: Handed to _ThreadedCall together with the other arguments.
    * call: The function to call asynchronously.
    * callback: The function to call after the operation is complete. Its prototype has to be like:
        callback(sender, output[, param1, param2, ... ])
    * call_args: A sequence of args to be passed to call.
    * callback_args: A sequence of args to be passed to callback.
    """

    worker = _ThreadedCall(sender, call, call_args, callback, callback_args)
    worker.start()
def enable_caching(file_path = None):
    """Enables caching request-wide for all cachable calls.

    The backend is _SqliteCacheBackend when the sqlite3 module has been
    imported, and _ShelfCacheBackend (shelve.Shelf based) otherwise.

    * file_path: A file path for the backend storage file. When it is
    not given, a temporary file name is generated.
    """

    global __cache_backend

    if not file_path:
        file_path = tempfile.mktemp(prefix="pylast_tmp_")

    # The optional sqlite3 import only shows up in sys.modules on success.
    if "sqlite3" in sys.modules:
        backend_class = _SqliteCacheBackend
        log_prefix = "Caching to Sqlite3 at "
    else:
        backend_class = _ShelfCacheBackend
        log_prefix = "Caching to Shelf at "

    __cache_backend = backend_class(file_path)
    debug(log_prefix + file_path)
def disable_caching():
    """Disables all caching features by dropping the current cache backend."""

    global __cache_backend

    __cache_backend = None
def is_caching_enabled():
    """Returns True if caching is enabled, i.e. a cache backend is set."""

    if __cache_backend == None:
        return False

    return True
def _get_cache_backend():
    """Returns the current cache backend, or None when caching is disabled."""

    # Reading a module-level name needs no global declaration.
    return __cache_backend
def _extract(node, name, index = 0):
    """Extracts a value from the xml string.

    * node: A DOM node (or document) to search below.
    * name: The tag name of the elements to look for.
    * index: Which of the matching elements to read, 0 being the first.

    Returns the whitespace-stripped data of the element's first child, or
    None if there is no matching element at that index or it has no children.
    """

    nodes = node.getElementsByTagName(name)

    # A missing element is reported as None instead of an IndexError, the
    # same way an absent tag or an empty element is.
    try:
        target = nodes[index]
    except IndexError:
        return None

    if target.firstChild:
        return target.firstChild.data.strip()

    return None
def _extract_all(node, name, limit_count = None):
    """Extracts all the values from the xml string, returning a list.

    * node: A DOM node (or document) to search below.
    * name: The tag name of the elements to look for.
    * limit_count: Stop after this many values. None means no limit.

    Every item is the whitespace-stripped data of an element's first child,
    or None for an element without children.
    """

    values = []

    # The matching elements are looked up once and walked in a single pass,
    # rather than searching the tree again for every index.
    for element in node.getElementsByTagName(name):
        if len(values) == limit_count:
            break

        child = element.firstChild
        if child:
            values.append(child.data.strip())
        else:
            values.append(None)

    return values
def _get_url_safe(text):
"""Does all kinds of tricks on a text to make it safe to use in a url."""
if type(text) == type(unicode()):
text = text
return urllib.quote_plus(urllib.quote_plus(text)).lower()
def _number(string):
    """Extracts an int from a string. Returns 0 if None, an empty string
    or any other false value was passed."""

    if string:
        return int(string)

    return 0
def search_for_album(album_name, api_key, api_secret, session_key=""):
    """Searches for an album by its name. Returns an AlbumSearch object.
    Use its get_next_page() to retrieve sequences of results."""

    search = AlbumSearch(album_name, api_key, api_secret, session_key=session_key)
    return search
def search_for_artist(artist_name, api_key, api_secret, session_key=""):
    """Searches for an artist by its name. Returns an ArtistSearch object.
    Use its get_next_page() to retrieve sequences of results."""

    search = ArtistSearch(artist_name, api_key, api_secret, session_key=session_key)
    return search
def search_for_tag(tag_name, api_key, api_secret, session_key=""):
    """Searches for a tag by its name. Returns a TagSearch object.
    Use its get_next_page() to retrieve sequences of results."""

    search = TagSearch(tag_name, api_key, api_secret, session_key=session_key)
    return search
def search_for_track(artist_name, track_name, api_key, api_secret, session_key=""):
    """Searches for a track by its name and its artist. Set artist_name to
    an empty string if not available.

    Returns a TrackSearch object.
    Use its get_next_page() to retrieve sequences of results."""

    search = TrackSearch(artist_name, track_name, api_key, api_secret, session_key=session_key)
    return search
def search_for_venue(venue_name, country_name, api_key, api_secret, session_key=""):
    """Searches for a venue by its name and its country. Set country_name
    to an empty string if not available.

    Returns a VenueSearch object.
    Use its get_next_page() to retrieve sequences of results."""

    search = VenueSearch(venue_name, country_name, api_key, api_secret, session_key=session_key)
    return search
def extract_items(topitems_or_libraryitems):
    """Returns a list with the get_item() result of every TopItem or
    LibraryItem object in the given sequence, in the same order."""

    return [entry.get_item() for entry in topitems_or_libraryitems]
def get_top_tags(api_key, api_secret, session_key=""):
    """Returns the tags listed by tag.getTopTags as a sequence of TopItem objects.

    Each TopItem pairs a Tag with the text of its <count> element as weight.
    """

    request = _Request("tag.getTopTags", dict(), api_key, api_secret, session_key)
    doc = request.execute(True)

    return [
        TopItem(Tag(_extract(tag_node, "name"), api_key, api_secret, session_key),
            _extract(tag_node, "count"))
        for tag_node in doc.getElementsByTagName("tag")
    ]
def get_track_by_mbid(mbid, api_key, api_secret, session_key=""):
    """Looks up a track by its MusicBrainz ID. Returns a Track object."""

    request = _Request("track.getInfo", {"mbid": unicode(mbid)}, api_key, api_secret, session_key)
    doc = request.execute(True)

    # The second <name> element is used as the artist, the first as the title.
    artist_name = _extract(doc, "name", 1)
    track_title = _extract(doc, "name")

    return Track(artist_name, track_title, api_key, api_secret, session_key)
def get_artist_by_mbid(mbid, api_key, api_secret, session_key=""):
    """Looks up an artist by its MusicBrainz ID. Returns an Artist object."""

    request = _Request("artist.getInfo", {"mbid": unicode(mbid)}, api_key, api_secret, session_key)
    doc = request.execute(True)

    artist_name = _extract(doc, "name")

    return Artist(artist_name, api_key, api_secret, session_key)
def get_album_by_mbid(mbid, api_key, api_secret, session_key=""):
    """Looks up an album by its MusicBrainz ID. Returns an Album object."""

    request = _Request("album.getInfo", {"mbid": unicode(mbid)}, api_key, api_secret, session_key)
    doc = request.execute(True)

    artist_name = _extract(doc, "artist")
    album_title = _extract(doc, "name")

    return Album(artist_name, album_title, api_key, api_secret, session_key)
def _delay_call():
    """Makes sure that web service calls are at least a second apart.

    Sleeps for whatever is left of the delay since the previous call, then
    records the current time as the time of the latest call.
    """

    global __last_call_time

    # delay time in seconds
    DELAY_TIME = 1.0

    remaining = DELAY_TIME - (time.time() - __last_call_time)

    if remaining > 0:
        # Never wait longer than the delay itself, even if the clock was
        # set back since the previous call.
        time.sleep(min(remaining, DELAY_TIME))

    # Taken after the sleep, so that the next call is measured from the
    # moment this one actually goes ahead.
    __last_call_time = time.time()
# ------------------------------------------------------------
class ScrobblingException(Exception):
    """Base class of the errors raised while talking to the scrobbling server.

    * message: A description of the error; kept in the message attribute
    and also used as the repr() of the exception.
    """

    def __init__(self, message):
        # Hand the message to Exception too, so that str() and args carry it.
        Exception.__init__(self, message)
        self.message = message

    def __repr__(self):
        return self.message
class BannedClient(ScrobblingException):
    """Raised when the server answers with a BANNED status."""

    def __init__(self):
        super(BannedClient, self).__init__("This version of the client has been banned")
class BadAuthentication(ScrobblingException):
    """Raised when the server answers with a BADAUTH status."""

    def __init__(self):
        super(BadAuthentication, self).__init__("Bad authentication token")
class BadTime(ScrobblingException):
    """Raised when the server answers with a BADTIME status."""

    def __init__(self):
        super(BadTime, self).__init__("Time provided is not close enough to current time")
class BadSession(ScrobblingException):
    """Raised when the server answers with a BADSESSION status."""

    def __init__(self):
        super(BadSession, self).__init__("Bad session id, consider re-handshaking")
class _ScrobblerRequest(object):
    """A POST request to a scrobbling server.

    * url: The url to post to. It is split into self.hostname (host and
    optional port) and self.subdir (the path, "/" when the url has none).
    * params: A dict of the form fields to send.
    """

    def __init__(self, url, params):
        self.params = params
        self.hostname, self.subdir = self._split_url(url)

    def _split_url(self, url):
        """Returns a (host[:port], path) tuple for a url."""

        scheme_end = url.find("//")
        if scheme_end == -1:
            host_start = 0
        else:
            host_start = scheme_end + 2

        # The path starts at the first slash after the host, so paths of
        # more than one segment stay in one piece.
        path_start = url.find("/", host_start)
        if path_start == -1:
            return url[host_start:], "/"

        return url[host_start:path_start], url[path_start:]

    def execute(self):
        """Returns a string response of this request.

        Raises a ScrobblingException (or one of its subclasses) when the
        response starts with one of the error status lines.
        """

        data = "&".join(
            "=".join((name, urllib.quote_plus(self.params[name])))
            for name in self.params.keys())

        headers = {
            "Content-type": "application/x-www-form-urlencoded",
            "Accept-Charset": "utf-8",
            "User-Agent": __name__ + "/" + __version__,
            "HOST": self.hostname
            }

        debug("Scrobbler Request:\n\tHOST :" + self.hostname + "\n\tSUBDIR: " + self.subdir +
            "\n\tDATA:" + repr(data) + "\n\tHEADERS: " + repr(headers))

        connection = httplib.HTTPConnection(self.hostname)
        try:
            connection.request("POST", self.subdir, data, headers)
            response = connection.getresponse().read()
        finally:
            # Release the socket whether or not the request went through.
            connection.close()

        self._check_response_for_errors(response)
        debug("Scrobbler Response:\n\t" + response)

        return response

    def _check_response_for_errors(self, response):
        """When passed a string response it checks for errors, raising
        any exceptions as necessary.

        Only the first line of the response is examined. A status that is
        not recognised is let through without raising.
        """

        status_line = response.split("\n")[0]

        if status_line == "OK":
            return
        elif status_line == "BANNED":
            raise BannedClient()
        elif status_line == "BADAUTH":
            raise BadAuthentication()
        elif status_line == "BADTIME":
            raise BadTime()
        elif status_line == "BADSESSION":
            raise BadSession()
        elif status_line.startswith("FAILED "):
            # Everything after the "FAILED " prefix is the reason.
            reason = status_line[len("FAILED "):]
            raise ScrobblingException(reason)
class Scrobbler(object):
    """A class for scrobbling tracks to Last.fm"""

    # All three are filled in by _do_handshake() from the handshake response.
    session_id = None
    nowplaying_url = None
    submissions_url = None

    def __init__(self, client_id, client_version, username, md5_password):
        self.client_id = client_id
        self.client_version = client_version
        self.username = username
        self.password = md5_password

    def _do_handshake(self):
        """Handshakes with the server.

        Stores the session id, the now-playing url and the submissions url
        found on the second, third and fourth line of the response.
        """

        timestamp = str(int(time.time()))
        # The token is the md5 of the (already md5-hashed) password
        # followed by the timestamp.
        token = md5(self.password + timestamp)

        params = {"hs": "true", "p": "1.2.1", "c": self.client_id,
            "v": self.client_version, "u": self.username, "t": timestamp,
            "a": token}

        response = _ScrobblerRequest(SUBMISSION_SERVER, params).execute().split("\n")

        self.session_id = response[1]
        self.nowplaying_url = response[2]
        self.submissions_url = response[3]

    def _get_session_id(self, new = False):
        """Returns the session id, handshaking first when there is none yet.
        If new is true, then a handshake is done with the server even if a
        session id was cached."""

        if not self.session_id or new:
            debug("Doing a scrobbling handshake")
            self._do_handshake()

        return self.session_id

    def report_now_playing(self, artist, title, album = "", duration = "", track_number = "", mbid = ""):
        """Reports a track as now-playing.

        The notification is sent once. If the server rejects the session,
        a new handshake is done and the notification is sent one more time;
        a second BadSession is passed on to the caller.
        """

        # The session id has to be fetched before self.nowplaying_url is
        # read, since the first handshake is what sets that url.
        params = {"s": self._get_session_id(), "a": artist, "t": title,
            "b": album, "l": duration, "n": track_number, "m": mbid}

        try:
            _ScrobblerRequest(self.nowplaying_url, params).execute()
        except BadSession:
            self._do_handshake()
            params["s"] = self.session_id
            _ScrobblerRequest(self.nowplaying_url, params).execute()

        info(artist + " - " + title + " was reported as now-playing")

    def scrobble(self, artist, title, time_started, source, mode, duration, album="", track_number="", mbid=""):
        """Scrobble a track. parameters:
            artist: Artist name.
            title: Track title.
            time_started: UTC timestamp of when the track started playing.
            source: The source of the track
                SCROBBLE_SOURCE_USER: Chosen by the user (the most common value, unless you have a reason for choosing otherwise, use this).
                SCROBBLE_SOURCE_NON_PERSONALIZED_BROADCAST: Non-personalised broadcast (e.g. Shoutcast, BBC Radio 1).
                SCROBBLE_SOURCE_PERSONALIZED_BROADCAST: Personalised recommendation except Last.fm (e.g. Pandora, Launchcast).
                SCROBBLE_SOURCE_LASTFM: Last.fm (any mode). In this case, the 5-digit recommendation_key value must be set.
                SCROBBLE_SOURCE_UNKNOWN: Source unknown.
            mode: The submission mode
                SCROBBLE_MODE_PLAYED: The track was played.
                SCROBBLE_MODE_SKIPPED: The track was skipped (Only if source was Last.fm)
                SCROBBLE_MODE_BANNED: The track was banned (Only if source was Last.fm)
            duration: Track duration in seconds.
            album: The album name.
            track_number: The track number on the album.
            mbid: MusicBrainz ID.
        """

        # As above, the session id is fetched before self.submissions_url
        # is read.
        params = {"s": self._get_session_id(), "a[0]": artist, "t[0]": title,
            "i[0]": str(time_started), "o[0]": source, "r[0]": mode, "l[0]": str(duration),
            "b[0]": album, "n[0]": track_number, "m[0]": mbid}

        _ScrobblerRequest(self.submissions_url, params).execute()

        info(artist + " - " + title + " was scrobbled")