From ad603d367de5616b7900de90dbde1b87fc081246 Mon Sep 17 00:00:00 2001 From: Hirad Date: Sun, 7 Jul 2024 09:19:48 +0330 Subject: [PATCH] initial commit --- compare.py | 50 +++++++++++++++++++++++ libs/config.py | 42 +++++++++++++++++++ libs/database.py | 103 +++++++++++++++++++++++++++++++++++++++++++++++ libs/networks.py | 54 +++++++++++++++++++++++++ libs/track.py | 13 ++++++ sync.py | 83 ++++++++++++++++++++++++++++++++++++++ tracks_list.py | 26 ++++++++++++ 7 files changed, 371 insertions(+) create mode 100644 compare.py create mode 100644 libs/config.py create mode 100644 libs/database.py create mode 100644 libs/networks.py create mode 100644 libs/track.py create mode 100644 sync.py create mode 100644 tracks_list.py diff --git a/compare.py b/compare.py new file mode 100644 index 0000000..9aa7e73 --- /dev/null +++ b/compare.py @@ -0,0 +1,50 @@ +#!/usr/local/hirad-venv/bin/python +from libs.networks import Network +from libs.database import Compare +from libs.track import get_track_details +import argparse + + +def get_tracks(network, table, num=100, first_ts=None, last_ts=None): + print(f'Getting {num} tracks from {table}...') + tracks_list = network.get_tracks(num, first_ts, last_ts) + for track in tracks_list: + artist, title, album, _, timestamp = get_track_details(track) + compare.write_into(table, artist, title, album, timestamp) + # last_track = tracks_list[-1] + # return last_track.timestamp + + +def compare_tables(): + compare.compare() + difference = compare.read_difference() + print("Number of tracks to be synced: {0}".format(len(difference))) + if len(difference) > 0: + for track in difference: + artist, title, album, timestamp = track + mbid, duration = lastfm_network.get_track_mbid_duration(artist, title) + print(f"Track: {artist} - {title} - {album} - {timestamp} | {mbid} | {duration}") + lonestar_network.scrobble(artist, title, album, duration, timestamp, mbid) + else: + print("No difference found!") + + +def arguments(): + parser = argparse.ArgumentParser(description="Compare scrobbled tracks") + parser.add_argument('-t', '--tracks', type=int, nargs='?', default=100, help="Number of tracks to compare") + args = parser.parse_args() + + return args.tracks + + +if __name__ == '__main__': + number_of_tracks = arguments() + compare = Compare() + lastfm_network = Network(Network.get_network("LastFM")) + lonestar_network = Network(Network.get_network("LoneStar")) + get_tracks(lastfm_network, "LastFM", num=number_of_tracks) + get_tracks(lonestar_network, "LoneStar", num=number_of_tracks) + compare_tables() + compare.drop_table("LastFM") + compare.drop_table("LoneStar") + compare.drop_table("Difference") diff --git a/libs/config.py b/libs/config.py new file mode 100644 index 0000000..6b43ed9 --- /dev/null +++ b/libs/config.py @@ -0,0 +1,42 @@ +import os +from configparser import ConfigParser +from xdg.BaseDirectory import xdg_data_home, xdg_config_home + + +class Config: + def __init__(self): + self.config = ConfigParser() + self.config_dir = os.path.join(xdg_config_home, 'lonestar.fm') + self.config_file = os.path.join(self.config_dir, 'scrobbler.ini') + self.check_config_dir_exists() + + def check_config_dir_exists(self): + if os.path.exists(self.config_dir): + return True + else: + os.makedirs(self.config_dir) + return True + + def check_config_file_exists(self): + if os.path.exists(self.config_file): + return True + else: + return False + + def create_config_file(self, api_key, api_secret, username, password): + self.config['API'] = { + 'api_key': api_key, + 'api_secret': api_secret, + 'username': username, + 'password': password + } + with open(self.config_file, 'w') as configfile: + self.config.write(configfile) + + def get_config_values(self): + self.config.read(self.config_file) + api_key = self.config['API']['api_key'] + api_secret = self.config['API']['api_secret'] + username = self.config['API']['username'] + password = self.config['API']['password'] + return api_key, api_secret, username, password diff --git a/libs/database.py b/libs/database.py new file mode 100644 index 0000000..36381a9 --- /dev/null +++ b/libs/database.py @@ -0,0 +1,103 @@ +import os +import sqlite3 +from xdg.BaseDirectory import xdg_data_home + + +class Sqlite: + db_directory = os.path.join(xdg_data_home, 'lonestar.fm') + if not os.path.exists(db_directory): + os.makedirs(db_directory) + db = os.path.join(db_directory, 'scrobbles.db') + conn = sqlite3.connect(db) + cur = conn.cursor() + + def close(self): + self.conn.close() + + +class Sync(Sqlite): + def __init__(self): + table = '''CREATE TABLE IF NOT EXISTS Tracks ( + Date TEXT, Artist TEXT, Title TEXT, Album TEXT, + Duration INT, Timestamp INT, mbid TEXT)''' + self.cur.execute(table) + self.conn.commit() + + def write_into(self, + date, artist, title, album, + duration, timestamp, mbid): + self.cur.execute( + 'SELECT * FROM Tracks WHERE (Title=? AND Timestamp=?)', + (title, timestamp)) + entry = self.cur.fetchone() + if not entry: + insert_query = '''INSERT INTO Tracks( + Date, Artist, Title, Album, + Duration, Timestamp, mbid) VALUES (?,?,?,?,?,?,?)''' + self.cur.execute( + insert_query, + (date, artist, title, album, duration, timestamp, mbid)) + self.conn.commit() + else: + print("Item already exists!") + + def select_all(self): + self.cur.execute('SELECT * FROM Tracks') + result = self.cur.fetchall() + return result + + def drop_track(self, title, timestamp): + query = 'DELETE FROM Tracks WHERE (Title=? AND Timestamp=?)' + self.cur.execute(query, (title, timestamp)) + self.conn.commit() + return True + + +class Compare(Sqlite): + def __init__(self): + tables = ['LastFM', 'LoneStar'] + for table in tables: + self.create_table(table) + + def create_table(self, table_name): + table = f'''CREATE TABLE IF NOT EXISTS {table_name} ( + Artist TEXT, Title TEXT, Album TEXT, Timestamp INT)''' + self.cur.execute(table) + self.conn.commit() + + def write_into(self, table_name, artist, title, album, timestamp): + self.cur.execute( + f'SELECT * FROM {table_name} WHERE (Title=? AND Timestamp=?)', + (title, timestamp)) + entry = self.cur.fetchone() + if not entry: + query = f'''INSERT INTO {table_name} ( + Artist, Title, Album, Timestamp) VALUES (?,?,?,?)''' + self.cur.execute(query, (artist, title, album, timestamp)) + self.conn.commit() + else: + print("Item already exists!") + + def compare(self): + # query = 'SELECT * FROM Lastfm EXCEPT SELECT * FROM Lonestar' + query = ''' + CREATE TABLE Difference AS + SELECT * FROM LastFM + EXCEPT + SELECT * FROM LoneStar + ''' + self.cur.execute(query) + self.conn.commit() + # result = self.cur.fetchall() + # return result + + def read_difference(self): + query = 'SELECT * FROM Difference' + self.cur.execute(query) + result = self.cur.fetchall() + return result + + def drop_table(self, table_name): + self.cur.execute(f'DROP TABLE {table_name}') + self.conn.commit() + \ No newline at end of file diff --git a/libs/networks.py b/libs/networks.py new file mode 100644 index 0000000..b3e56a7 --- /dev/null +++ b/libs/networks.py @@ -0,0 +1,54 @@ +import pylast +from libs.config import Config + + +config = Config() +api_key, api_secret, username, password = config.get_config_values() + + +class Network: + def __init__(self, network): + self.network = network + self.user = username + + @staticmethod + def get_network(network_name): + if network_name == "LastFM": + network = pylast.LastFMNetwork( + api_key=api_key, + api_secret=api_secret, + username=username, + password_hash=pylast.md5(password) + ) + return network + elif network_name == "LoneStar": + network = pylast.LibreFMNetwork( + api_key=api_key, + api_secret=api_secret, + username=username, + password_hash=pylast.md5(password) + ) + return network + else: + raise ValueError("Unknown network name provided!") + + def get_tracks(self, number, first, last): + tracks_list = (self.network.get_user(self.user) + .get_recent_tracks(limit=number, time_from=first, time_to=last)) + return tracks_list + + def get_track_mbid_duration(self, artist, title): + net = self.get_network("LastFM") + track = net.get_track(artist=artist, title=title) + return track.get_mbid(), track.get_duration() + + def scrobble(self, artist, title, album, duration, timestamp, mbid): + self.network.scrobble( + artist=artist, + title=title, + album=album, + duration=duration, + timestamp=timestamp, + mbid=mbid + ) + \ No newline at end of file diff --git a/libs/track.py b/libs/track.py new file mode 100644 index 0000000..0d7d7e3 --- /dev/null +++ b/libs/track.py @@ -0,0 +1,13 @@ + +def get_track_details(track): + artist, title = str(track.track).split(' - ', 1) + album = track.album + date = track.playback_date + timestamp = track.timestamp + return artist, title, album, date, timestamp + + +def get_duration_mbid(track): + duration = track.track.get_duration() + mbid = track.track.get_mbid() + return duration, mbid diff --git a/sync.py b/sync.py new file mode 100644 index 0000000..1b71ab5 --- /dev/null +++ b/sync.py @@ -0,0 +1,83 @@ +#!/usr/local/hirad-venv/bin/python +import time +import argparse +import httpcore +import httpx + +from libs.networks import Network +from libs.database import Sync +from libs.track import get_track_details, get_duration_mbid + + +def get_last_lonestar_track(): + last_track = lonestar_network.get_tracks(1, None, None)[0] + artist, title, _, date, timestamp = get_track_details(last_track) + print(f"Last LoneStar track: {date} - {artist} - {title} - {timestamp}") + return timestamp + + +def get_lastfm_tracks(first_ts, last_ts): + tracks_list = lastfm_network.get_tracks(None, first_ts, last_ts) + print(f"There is {len(tracks_list)} tracks from LastFM to be synced to LoneStar.") + for i, track in enumerate(tracks_list): + artist, title, album, date, timestamp = get_track_details(track) + duration, mbid = get_duration_mbid(track) + print(f'{i + 1}. {artist} - {title} - {date} - {timestamp}') + database.write_into(date, artist, title, album, duration, timestamp, mbid) + print("Finished adding tracks to the database") + + +def sync_scrobbles(): + tracks_list = database.select_all() + for i, track in enumerate(tracks_list): + date, artist, title, album, duration, timestamp, mbid = track + print(f"Syncing {i + 1}. {artist} - {title} - {timestamp}") + try: + lonestar_network.scrobble( + artist=artist, + title=title, + album=album, + duration=duration, + timestamp=timestamp, + mbid=mbid + ) + drop_track = database.drop_track(title, timestamp) + if drop_track: + print("Track synced and dropped from db") + except Exception as e: + print("An error occurred while syncing", e) + + +def arguments(): + parser = argparse.ArgumentParser(description="Sync scrobbles between LastFM and LoneStar") + parser.add_argument('-q', action='store_true', help="Quiet run. Not asking for timestamps") + args = parser.parse_args() + + if args.q: + first_ts = get_last_lonestar_track() + last_ts = int(time.time()) + else: + first_ts = input("Enter first track timestamp: ") + if len(first_ts) != 10: + first_ts = get_last_lonestar_track() + last_ts = input("Enter last track timestamp: ") + if len(last_ts) != 10: + last_ts = int(time.time()) + + return first_ts, last_ts + + +if __name__ == '__main__': + database = Sync() + lastfm_network = Network(Network.get_network("LastFM")) + lonestar_network = Network(Network.get_network("LoneStar")) + + first_timestamp, last_timestamp = arguments() + try: + get_lastfm_tracks(first_timestamp, last_timestamp) + except (httpx.ReadTimeout, httpcore.ReadTimeout) as err: + print("Network read timeout {}. Retrying...".format(err)) + time.sleep(2) + get_lastfm_tracks(first_timestamp, last_timestamp) + + sync_scrobbles() diff --git a/tracks_list.py b/tracks_list.py new file mode 100644 index 0000000..1df058d --- /dev/null +++ b/tracks_list.py @@ -0,0 +1,26 @@ +#!/usr/local/hirad-venv/bin/python +from libs.networks import Network +from libs.track import get_track_details + + +def get_tracks_list(tracks_list, net_name): + with open(net_name + ".txt", "w") as file: + for i, track in enumerate(tracks_list): + artist, title, _, date, timestamp = get_track_details(track) + line = f"{artist} - {title} - {timestamp}\n" + file.write(line) + print(line) + + +if __name__ == '__main__': + network = None + net = input("Enter network name: ") + if net == "lastfm": + network = Network(Network.get_network("LastFM")) + elif net == "lonestar": + network = Network(Network.get_network("LoneStar")) + + number_of_tracks = int(input("Enter number of tracks: ")) + + result = network.get_tracks(number_of_tracks, None, None) + get_tracks_list(result, net)