initial commit

This commit is contained in:
Hirad 2024-07-07 09:19:48 +03:30
parent 3921f5587e
commit ad603d367d
7 changed files with 371 additions and 0 deletions

50
compare.py Normal file
View file

@ -0,0 +1,50 @@
#!/usr/local/hirad-venv/bin/python
from libs.networks import Network
from libs.database import Compare
from libs.track import get_track_details
import argparse
def get_tracks(network, table, num=100, first_ts=None, last_ts=None):
print(f'Getting {num} tracks from {table}...')
tracks_list = network.get_tracks(num, first_ts, last_ts)
for track in tracks_list:
artist, title, album, _, timestamp = get_track_details(track)
compare.write_into(table, artist, title, album, timestamp)
# last_track = tracks_list[-1]
# return last_track.timestamp
def compare_tables():
compare.compare()
difference = compare.read_difference()
print("Number of tracks to be synced: {0}".format(len(difference)))
if len(difference) > 0:
for track in difference:
artist, title, album, timestamp = track
mbid, duration = lastfm_network.get_track_mbid_duration(artist, title)
print(f"Track: {artist} - {title} - {album} - {timestamp} | {mbid} | {duration}")
lonestar_network.scrobble(artist, title, album, duration, timestamp, mbid)
else:
print("No difference found!")
def arguments():
parser = argparse.ArgumentParser(description="Compare scrobbled tracks")
parser.add_argument('-t', '--tracks', type=int, nargs='?', default=100, help="Number of tracks to compare")
args = parser.parse_args()
return args.tracks
if __name__ == '__main__':
number_of_tracks = arguments()
compare = Compare()
lastfm_network = Network(Network.get_network("LastFM"))
lonestar_network = Network(Network.get_network("LoneStar"))
get_tracks(lastfm_network, "LastFM", num=number_of_tracks)
get_tracks(lonestar_network, "LoneStar", num=number_of_tracks)
compare_tables()
compare.drop_table("LastFM")
compare.drop_table("LoneStar")
compare.drop_table("Difference")

42
libs/config.py Normal file
View file

@ -0,0 +1,42 @@
import os
from configparser import ConfigParser
from xdg.BaseDirectory import xdg_data_home, xdg_config_home
class Config:
def __init__(self):
self.config = ConfigParser()
self.config_dir = os.path.join(xdg_config_home, 'lonestar.fm')
self.config_file = os.path.join(self.config_dir, 'scrobbler.ini')
self.check_config_dir_exists()
def check_config_dir_exists(self):
if os.path.exists(self.config_dir):
return True
else:
os.makedirs(self.config_dir)
return True
def check_config_file_exists(self):
if os.path.exists(self.config_file):
return True
else:
return False
def create_config_file(self, api_key, api_secret, username, password):
self.config['API'] = {
'api_key': api_key,
'api_secret': api_secret,
'username': username,
'password': password
}
with open(self.config_file, 'w') as configfile:
self.config.write(configfile)
def get_config_values(self):
self.config.read(self.config_file)
api_key = self.config['API']['api_key']
api_secret = self.config['API']['api_secret']
username = self.config['API']['username']
password = self.config['API']['password']
return api_key, api_secret, username, password

103
libs/database.py Normal file
View file

@ -0,0 +1,103 @@
import os
import sqlite3
from xdg.BaseDirectory import xdg_data_home
class Sqlite:
db_directory = os.path.join(xdg_data_home, 'lonestar.fm')
if not os.path.exists(db_directory):
os.makedirs(db_directory)
db = os.path.join(db_directory, 'scrobbles.db')
conn = sqlite3.connect(db)
cur = conn.cursor()
def close(self):
self.conn.close()
class Sync(Sqlite):
def __init__(self):
table = '''CREATE TABLE IF NOT EXISTS Tracks (
Date TEXT, Artist TEXT, Title TEXT, Album TEXT,
Duration INT, Timestamp INT, mbid TEXT)'''
self.cur.execute(table)
self.conn.commit()
def write_into(self,
date, artist, title, album,
duration, timestamp, mbid):
self.cur.execute(
'SELECT * FROM Tracks WHERE (Title=? AND Timestamp=?)',
(title, timestamp))
entry = self.cur.fetchone()
if not entry:
insert_query = '''INSERT INTO Tracks(
Date, Artist, Title, Album,
Duration, Timestamp, mbid) VALUES (?,?,?,?,?,?,?)'''
self.cur.execute(
insert_query,
(date, artist, title, album, duration, timestamp, mbid))
self.conn.commit()
else:
print("Item already exists!")
def select_all(self):
self.cur.execute('SELECT * FROM Tracks')
result = self.cur.fetchall()
return result
def drop_track(self, title, timestamp):
query = 'DELETE FROM Tracks WHERE (Title=? AND Timestamp=?)'
self.cur.execute(query, (title, timestamp))
self.conn.commit()
return True
class Compare(Sqlite):
def __init__(self):
tables = ['LastFM', 'LoneStar']
for table in tables:
self.create_table(table)
def create_table(self, table_name):
table = f'''CREATE TABLE IF NOT EXISTS {table_name} (
Artist TEXT, Title TEXT, Album TEXT, Timestamp INT)'''
self.cur.execute(table)
self.conn.commit()
def write_into(self, table_name, artist, title, album, timestamp):
self.cur.execute(
f'SELECT * FROM {table_name} WHERE (Title=? AND Timestamp=?)',
(title, timestamp))
entry = self.cur.fetchone()
if not entry:
query = f'''INSERT INTO {table_name} (
Artist, Title, Album, Timestamp) VALUES (?,?,?,?)'''
self.cur.execute(query, (artist, title, album, timestamp))
self.conn.commit()
else:
print("Item already exists!")
def compare(self):
# query = 'SELECT * FROM Lastfm EXCEPT SELECT * FROM Lonestar'
query = '''
CREATE TABLE Difference AS
SELECT * FROM LastFM
EXCEPT
SELECT * FROM LoneStar
'''
self.cur.execute(query)
self.conn.commit()
# result = self.cur.fetchall()
# return result
def read_difference(self):
query = 'SELECT * FROM Difference'
self.cur.execute(query)
result = self.cur.fetchall()
return result
def drop_table(self, table_name):
self.cur.execute(f'DROP TABLE {table_name}')
self.conn.commit()

54
libs/networks.py Normal file
View file

@ -0,0 +1,54 @@
import pylast
from libs.config import Config
config = Config()
api_key, api_secret, username, password = config.get_config_values()
class Network:
def __init__(self, network):
self.network = network
self.user = username
@staticmethod
def get_network(network_name):
if network_name == "LastFM":
network = pylast.LastFMNetwork(
api_key=api_key,
api_secret=api_secret,
username=username,
password_hash=pylast.md5(password)
)
return network
elif network_name == "LoneStar":
network = pylast.LibreFMNetwork(
api_key=api_key,
api_secret=api_secret,
username=username,
password_hash=pylast.md5(password)
)
return network
else:
raise ValueError("Unknown network name provided!")
def get_tracks(self, number, first, last):
tracks_list = (self.network.get_user(self.user)
.get_recent_tracks(limit=number, time_from=first, time_to=last))
return tracks_list
def get_track_mbid_duration(self, artist, title):
net = self.get_network("LastFM")
track = net.get_track(artist=artist, title=title)
return track.get_mbid(), track.get_duration()
def scrobble(self, artist, title, album, duration, timestamp, mbid):
self.network.scrobble(
artist=artist,
title=title,
album=album,
duration=duration,
timestamp=timestamp,
mbid=mbid
)

13
libs/track.py Normal file
View file

@ -0,0 +1,13 @@
def get_track_details(track):
artist, title = str(track.track).split(' - ', 1)
album = track.album
date = track.playback_date
timestamp = track.timestamp
return artist, title, album, date, timestamp
def get_duration_mbid(track):
duration = track.track.get_duration()
mbid = track.track.get_mbid()
return duration, mbid

83
sync.py Normal file
View file

@ -0,0 +1,83 @@
#!/usr/local/hirad-venv/bin/python
import time
import argparse
import httpcore
import httpx
from libs.networks import Network
from libs.database import Sync
from libs.track import get_track_details, get_duration_mbid
def get_last_lonestar_track():
last_track = lonestar_network.get_tracks(1, None, None)[0]
artist, title, _, date, timestamp = get_track_details(last_track)
print(f"Last LoneStar track: {date} - {artist} - {title} - {timestamp}")
return timestamp
def get_lastfm_tracks(first_ts, last_ts):
tracks_list = lastfm_network.get_tracks(None, first_ts, last_ts)
print(f"There is {len(tracks_list)} tracks from LastFM to be synced to LoneStar.")
for i, track in enumerate(tracks_list):
artist, title, album, date, timestamp = get_track_details(track)
duration, mbid = get_duration_mbid(track)
print(f'{i + 1}. {artist} - {title} - {date} - {timestamp}')
database.write_into(date, artist, title, album, duration, timestamp, mbid)
print("Finished adding tracks to the database")
def sync_scrobbles():
tracks_list = database.select_all()
for i, track in enumerate(tracks_list):
date, artist, title, album, duration, timestamp, mbid = track
print(f"Syncing {i + 1}. {artist} - {title} - {timestamp}")
try:
lonestar_network.scrobble(
artist=artist,
title=title,
album=album,
duration=duration,
timestamp=timestamp,
mbid=mbid
)
drop_track = database.drop_track(title, timestamp)
if drop_track:
print("Track synced and dropped from db")
except Exception as e:
print("An error occurred while syncing", e)
def arguments():
parser = argparse.ArgumentParser(description="Sync scrobbles between LastFM and LoneStar")
parser.add_argument('-q', action='store_true', help="Quiet run. Not asking for timestamps")
args = parser.parse_args()
if args.q:
first_ts = get_last_lonestar_track()
last_ts = int(time.time())
else:
first_ts = input("Enter first track timestamp: ")
if len(first_ts) != 10:
first_ts = get_last_lonestar_track()
last_ts = input("Enter last track timestamp: ")
if len(last_ts) != 10:
last_ts = int(time.time())
return first_ts, last_ts
if __name__ == '__main__':
database = Sync()
lastfm_network = Network(Network.get_network("LastFM"))
lonestar_network = Network(Network.get_network("LoneStar"))
first_timestamp, last_timestamp = arguments()
try:
get_lastfm_tracks(first_timestamp, last_timestamp)
except (httpx.ReadTimeout, httpcore.ReadTimeout) as err:
print("Network read timeout {}. Retrying...".format(err))
time.sleep(2)
get_lastfm_tracks(first_timestamp, last_timestamp)
sync_scrobbles()

26
tracks_list.py Normal file
View file

@ -0,0 +1,26 @@
#!/usr/local/hirad-venv/bin/python
from libs.networks import Network
from libs.track import get_track_details
def get_tracks_list(tracks_list, net_name):
with open(net_name + ".txt", "w") as file:
for i, track in enumerate(tracks_list):
artist, title, _, date, timestamp = get_track_details(track)
line = f"{artist} - {title} - {timestamp}\n"
file.write(line)
print(line)
if __name__ == '__main__':
network = None
net = input("Enter network name: ")
if net == "lastfm":
network = Network(Network.get_network("LastFM"))
elif net == "lonestar":
network = Network(Network.get_network("LoneStar"))
number_of_tracks = int(input("Enter number of tracks: "))
result = network.get_tracks(number_of_tracks, None, None)
get_tracks_list(result, net)