Initial commit
This commit is contained in:
parent
d38a6dd66b
commit
be9a605dac
28 changed files with 1994 additions and 0 deletions
0
src/__init__.py
Normal file
0
src/__init__.py
Normal file
0
src/srtify/__init__.py
Normal file
0
src/srtify/__init__.py
Normal file
18
src/srtify/__main__.py
Normal file
18
src/srtify/__main__.py
Normal file
|
@ -0,0 +1,18 @@
|
|||
import sys
|
||||
from srtify.cli.commands import cli
|
||||
|
||||
|
||||
def main():
|
||||
""" Main entry point for the application. """
|
||||
try:
|
||||
cli()
|
||||
except KeyboardInterrupt:
|
||||
print("\nGoodbye!")
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
print(f"❌ An error occurred: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
0
src/srtify/cli/__init__.py
Normal file
0
src/srtify/cli/__init__.py
Normal file
276
src/srtify/cli/cli_handler.py
Normal file
276
src/srtify/cli/cli_handler.py
Normal file
|
@ -0,0 +1,276 @@
|
|||
import shutil
|
||||
from .menu import MainMenu, PromptMenu, SettingsMenu
|
||||
from srtify.core.prompts import PromptsManager
|
||||
from srtify.core.settings import SettingsManager
|
||||
from srtify.utils.utils import clear_screen, fancy_headline
|
||||
|
||||
|
||||
class CliHandler:
|
||||
def __init__(self, settings_manager: SettingsManager, prompts_manager: PromptsManager):
|
||||
self.settings_manager = settings_manager
|
||||
self.prompts_manager = prompts_manager
|
||||
self.settings_menu = SettingsMenu()
|
||||
|
||||
def handle_main_menu(self):
|
||||
"""Handle the main menu."""
|
||||
main_menu = MainMenu()
|
||||
|
||||
|
||||
while True:
|
||||
clear_screen()
|
||||
print(fancy_headline("Gemini SRT Translator", "rounded"))
|
||||
summary = self.settings_manager.get_settings_summary()
|
||||
print(f"API Key: {summary['API Key']}")
|
||||
print(f"Target Language: {summary['Language']}")
|
||||
print(f"Input Directory: {summary['Input Directory']}")
|
||||
print(f"Output Directory: {summary['Output Directory']}")
|
||||
print(f"Batch Size: {summary['Batch Size']}")
|
||||
|
||||
choice = main_menu.select()
|
||||
|
||||
if choice is None or choice == "exit":
|
||||
print(fancy_headline("Goodbye!", "rounded"))
|
||||
return None
|
||||
|
||||
try:
|
||||
match choice:
|
||||
case "translate":
|
||||
return "translate"
|
||||
case "prompts":
|
||||
self.handle_prompts_menu()
|
||||
case "settings":
|
||||
self.handle_settings_menu()
|
||||
case "quick":
|
||||
# TODO: Implement quick translation
|
||||
pass
|
||||
except KeyboardInterrupt:
|
||||
print("\nReturning to main menu...")
|
||||
except Exception as e:
|
||||
print(f"❌ Error in Main Menu: {e}")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
def handle_prompts_menu(self):
|
||||
while True:
|
||||
try:
|
||||
print(fancy_headline("Prompts Menu", "rounded"))
|
||||
|
||||
prompts = self.prompts_manager.get_all_prompts()
|
||||
prompts_menu = PromptMenu(prompts)
|
||||
name, choice = prompts_menu.select(clear_screen=True)
|
||||
|
||||
if choice == 'new':
|
||||
self.add_prompt()
|
||||
elif choice == 'main':
|
||||
break
|
||||
else:
|
||||
print(f"Selected prompt: {name}: {choice}")
|
||||
self.select_prompt(name, choice)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nReturning to main menu...")
|
||||
except Exception as e:
|
||||
print(f"❌ Error in Prompts Menu: {e}")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
def add_prompt(self):
|
||||
"""Add a new prompt."""
|
||||
name = input("Enter prompt name: ").strip()
|
||||
if not name:
|
||||
print("❌ Prompt name cannot be empty!")
|
||||
return
|
||||
if self.prompts_manager.prompt_exists(name):
|
||||
print(f"❌ Prompt '{name}' already exists!")
|
||||
input("Press Enter to continue...")
|
||||
return
|
||||
|
||||
description = input("Enter prompt description: ").strip()
|
||||
if not description:
|
||||
print("❌ Prompt description cannot be empty!")
|
||||
return
|
||||
|
||||
try:
|
||||
if self.prompts_manager.add_prompt(name, description):
|
||||
print(f"✓ Prompt '{name}' added successfully!")
|
||||
else:
|
||||
print(f"❌ Failed to add prompt '{name}'!")
|
||||
except ValueError as e:
|
||||
print(f"❌ Error: {e}")
|
||||
|
||||
input("Press Enter to continue...")
|
||||
|
||||
def select_prompt(self, *prompt):
|
||||
"""Select a prompt."""
|
||||
prompt_menu = PromptMenu()
|
||||
choice = prompt_menu.prompt_options()
|
||||
|
||||
if choice is None:
|
||||
print("❌ No option selected!")
|
||||
return
|
||||
|
||||
if choice == "select":
|
||||
pass
|
||||
elif choice == "edit":
|
||||
self.edit_prompt(*prompt)
|
||||
elif choice == "delete":
|
||||
self.delete_prompt(*prompt)
|
||||
elif choice == "back":
|
||||
return
|
||||
|
||||
def edit_prompt(self, *prompt):
|
||||
"""Edit a prompt."""
|
||||
print(f"Prompt: {prompt[0]}")
|
||||
print('-' * (len(prompt[0]) + 8))
|
||||
print(f"Description: {prompt[1]}")
|
||||
|
||||
try:
|
||||
print("=" * shutil.get_terminal_size().columns)
|
||||
new_prompt = input("Enter new description: ").strip()
|
||||
if new_prompt and self.prompts_manager.update_prompt(prompt[0], new_prompt):
|
||||
print(f"✓ Prompt '{prompt[0]}' updated successfully!")
|
||||
else:
|
||||
print(f"❌ Failed to update prompt '{prompt[0]}'!")
|
||||
except (ValueError, IndexError) as e:
|
||||
print(f"❌ Error: {e}")
|
||||
|
||||
def delete_prompt(self, *prompt):
|
||||
"""Delete a prompt."""
|
||||
try:
|
||||
if input(f"Are you sure you want to delete prompt '{prompt[0]}'? (y/n) ").strip().lower().endswith('y'):
|
||||
if self.prompts_manager.delete_prompt(prompt[0]):
|
||||
print(f"✓ Prompt '{prompt[0]}' deleted successfully!")
|
||||
else:
|
||||
print(f"❌ Failed to delete prompt '{prompt[0]}'!")
|
||||
except (ValueError, IndexError) as e:
|
||||
print(f"❌ Error: {e}")
|
||||
|
||||
def search_prompts(self, search_term):
|
||||
"""Search for prompts."""
|
||||
if not search_term:
|
||||
search_term = input("Enter search term: ").strip()
|
||||
if not search_term:
|
||||
print("❌ Search term cannot be empty!")
|
||||
exit()
|
||||
|
||||
results = self.prompts_manager.search_prompts(search_term)
|
||||
if results:
|
||||
if len(results) > 1:
|
||||
print(f"Found {len(results)} prompts matching '{search_term}':")
|
||||
print("=" * shutil.get_terminal_size().columns)
|
||||
for i, name, description in enumerate(results.items()):
|
||||
print(f"{i+1} - {name}: {description}")
|
||||
print("=" * shutil.get_terminal_size().columns)
|
||||
choice = input("Enter the number of the prompt you want to select: ").strip()
|
||||
if choice.isdigit() and 1 <= int(choice) <= len(results):
|
||||
prompt_name = list(results.keys())[int(choice) - 1]
|
||||
print(f"Selected prompt: {prompt_name}")
|
||||
return results[prompt_name]
|
||||
else:
|
||||
print("❌ Invalid choice!")
|
||||
exit()
|
||||
else:
|
||||
for name, description in results.items():
|
||||
print(f"Found prompt '{name}':\n{description}")
|
||||
return results[name]
|
||||
|
||||
return None
|
||||
|
||||
def handle_settings_menu(self):
|
||||
"""Handle the settings menu."""
|
||||
while True:
|
||||
summary = self.settings_manager.get_settings_summary()
|
||||
choice = self.settings_menu.select()
|
||||
|
||||
if choice is None:
|
||||
print("Settings selection cancelled.")
|
||||
break
|
||||
|
||||
try:
|
||||
match choice:
|
||||
case 'view':
|
||||
print(fancy_headline("Settings Summary"))
|
||||
for key, value in summary.items():
|
||||
print(f" {key}: {value}")
|
||||
print("=" * 50)
|
||||
input("Press Enter to continue...")
|
||||
case 'api':
|
||||
self.set_api_key()
|
||||
case 'dirs':
|
||||
self.set_directories()
|
||||
case 'translation':
|
||||
self.set_translation_preferences()
|
||||
case 'reset':
|
||||
if input("Reset all settings to default? (y/N) ").strip().lower().endswith('y'):
|
||||
print("Resetting settings...")
|
||||
if self.settings_manager.reset_to_defaults():
|
||||
print("✓ Settings reset to default successfully!")
|
||||
else:
|
||||
print("❌ Failed to reset settings to default!")
|
||||
else:
|
||||
print("Settings reset cancelled.")
|
||||
input("Press Enter to continue...")
|
||||
case 'back':
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
print("\nReturning to main menu...")
|
||||
except Exception as e:
|
||||
print(f"❌ Error in Settings Menu: {e}")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
def set_api_key(self):
|
||||
"""Set API key."""
|
||||
current_key = self.settings_manager.get_api_key()
|
||||
if current_key:
|
||||
masked_key = f"...{current_key[-6:]}" if len(current_key) > 6 else "Set"
|
||||
print(f"Current API Key: {masked_key}")
|
||||
|
||||
new_key = input("Enter new API key: ").strip()
|
||||
if new_key:
|
||||
try:
|
||||
if self.settings_manager.set_api_key(new_key):
|
||||
print("✓ API key set successfully!")
|
||||
else:
|
||||
print("❌ Failed to set API key!")
|
||||
except ValueError as e:
|
||||
print(f"❌ Error setting API key: {e}")
|
||||
|
||||
def set_directories(self):
|
||||
"""Set input and output directories."""
|
||||
current_input, current_output = self.settings_manager.get_directories()
|
||||
print(f"Current Input Directory: {current_input}")
|
||||
print(f"Current Output Directory: {current_output}")
|
||||
|
||||
new_input = input("Enter new input directory: ").strip()
|
||||
new_output = input("Enter new output directory: ").strip()
|
||||
|
||||
if new_input or new_output:
|
||||
try:
|
||||
if self.settings_manager.set_directories(
|
||||
new_input or current_input,
|
||||
new_output or current_output
|
||||
):
|
||||
print("✓ Directories set successfully!")
|
||||
else:
|
||||
print("❌ Failed to set directories!")
|
||||
except ValueError as e:
|
||||
print(f"❌ Error setting directories: {e}")
|
||||
|
||||
def set_translation_preferences(self):
|
||||
"""Set translation preferences."""
|
||||
current_lang, current_batch = self.settings_manager.get_translation_config()
|
||||
|
||||
print(f"Current Language: {current_lang}")
|
||||
print(f"Current Batch Size: {current_batch}")
|
||||
|
||||
new_lang = input("Enter new language (or leave blank to keep current): ").strip()
|
||||
new_batch = input("Enter new batch size (or leave blank to keep current): ").strip()
|
||||
|
||||
try:
|
||||
batch_size = int(new_batch) if new_batch else current_batch
|
||||
language = new_lang if new_lang else current_lang
|
||||
|
||||
if self.settings_manager.set_translation_config(language, batch_size):
|
||||
print("✓ Translation preferences set successfully!")
|
||||
else:
|
||||
print("❌ Failed to set translation preferences!")
|
||||
except ValueError as e:
|
||||
print(f"❌ Error setting translation preferences: {e}")
|
104
src/srtify/cli/commands.py
Normal file
104
src/srtify/cli/commands.py
Normal file
|
@ -0,0 +1,104 @@
|
|||
import click
|
||||
from pathlib import Path
|
||||
from .cli_handler import CliHandler
|
||||
from srtify.core.translator import TranslatorApp
|
||||
from srtify.core.settings import SettingsManager
|
||||
from srtify.core.prompts import PromptsManager
|
||||
|
||||
|
||||
@click.group(invoke_without_command=True)
|
||||
@click.pass_context
|
||||
def cli(ctx):
|
||||
""" Gemini SRT Translator - Translate subtitle files using AI """
|
||||
if ctx.invoked_subcommand is None:
|
||||
settings = SettingsManager()
|
||||
prompts = PromptsManager()
|
||||
cli_handler = CliHandler(settings, prompts)
|
||||
cli_handler.handle_main_menu()
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.argument('input_path', type=click.Path(exists=True, path_type=Path))
|
||||
@click.option('--output', '-o', type=click.Path(path_type=Path),
|
||||
help='Output directory (default: from settings)')
|
||||
@click.option('--language', '-l', help='Target language (default: from settings)')
|
||||
@click.option('--file', '-f', help='Specific file to translate (default: all .srt files)')
|
||||
@click.option('--prompt', '-p', help='Search for a specific prompt by name')
|
||||
@click.option('--custom-prompt', help='Use a custom prompt')
|
||||
@click.option('--batch-size', '-b', type=int, help='Batch size for translation')
|
||||
@click.option('-quick', '-q', is_flag=True, help='Quick translation with default prompt')
|
||||
def translate(input_path, output, language, file, prompt, custom_prompt, batch_size, quick):
|
||||
""" Translate SRT files from INPUT_PATH """
|
||||
settings = SettingsManager()
|
||||
prompts = PromptsManager()
|
||||
|
||||
app = TranslatorApp(settings, prompts)
|
||||
|
||||
options = {
|
||||
'input_path': input_path,
|
||||
'output_path': output,
|
||||
'language': language,
|
||||
'file': file,
|
||||
'prompt': prompt,
|
||||
'custom_prompt': custom_prompt,
|
||||
'batch_size': batch_size,
|
||||
'quick': quick
|
||||
}
|
||||
|
||||
app.run_translation(options)
|
||||
|
||||
|
||||
@cli.command()
|
||||
def interactive():
|
||||
""" Start interactive translation mode ."""
|
||||
settings = SettingsManager()
|
||||
prompts = PromptsManager()
|
||||
cli_handler = CliHandler(settings, prompts)
|
||||
cli_handler.handle_main_menu()
|
||||
|
||||
|
||||
@cli.command()
|
||||
def settings():
|
||||
""" Configure application settings. """
|
||||
settings = SettingsManager()
|
||||
prompts = PromptsManager()
|
||||
cli_handler = CliHandler(settings, prompts)
|
||||
cli_handler.handle_settings_menu()
|
||||
|
||||
|
||||
@cli.command()
|
||||
def prompts():
|
||||
""" Manage translation prompts. """
|
||||
settings = SettingsManager()
|
||||
prompts = PromptsManager()
|
||||
cli_handler = CliHandler(settings, prompts)
|
||||
cli_handler.handle_prompts_menu()
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument('search_term')
|
||||
def search_prompts(search_term):
|
||||
""" Search for prompts by name or description. """
|
||||
prompts = PromptsManager()
|
||||
results = prompts.search_prompts(search_term)
|
||||
|
||||
if results:
|
||||
click.echo(f"Found {len(results)} prompts matching {search_term}...")
|
||||
for name, description in results.items():
|
||||
click.echo(f" {name}: {description[:100]}{'...' if len(description) else ''}")
|
||||
else:
|
||||
click.echo(f"No prompts found matching '{search_term}'")
|
||||
|
||||
|
||||
@cli.command()
|
||||
def status():
|
||||
""" Show current configuration status. """
|
||||
settings = SettingsManager()
|
||||
prompts = PromptsManager()
|
||||
|
||||
click.echo("=== Configuration Status ===")
|
||||
summary = settings.get_settings_summary()
|
||||
for key, value in summary.items():
|
||||
click.echo(f" {key}: {value}")
|
||||
|
||||
click.echo(f"\nPrompts: {prompts.count_prompts()} total")
|
110
src/srtify/cli/menu.py
Normal file
110
src/srtify/cli/menu.py
Normal file
|
@ -0,0 +1,110 @@
|
|||
from typing import Dict
|
||||
from simple_term_menu import TerminalMenu
|
||||
|
||||
|
||||
class Menu:
|
||||
def __init__(self, title, style=None):
|
||||
self.title = title
|
||||
self.style = style or {
|
||||
"menu_highlight_style": ("bg_green", "fg_black"),
|
||||
"search_highlight_style": ("bg_red", "fg_black"),
|
||||
"cycle_cursor": True,
|
||||
"search_key": '/'
|
||||
}
|
||||
|
||||
def show(self, items, clear_screen=False):
|
||||
return TerminalMenu(
|
||||
items,
|
||||
clear_screen=clear_screen,
|
||||
title=self.title,
|
||||
**self.style
|
||||
)
|
||||
|
||||
|
||||
class MainMenu(Menu):
|
||||
def __init__(self):
|
||||
super().__init__("====== Gemini SRT Translator ======")
|
||||
self.options = {
|
||||
"Start Translation": "translate",
|
||||
"Prompts Menu": "prompts",
|
||||
"Settings Menu": "settings",
|
||||
"Quick Translation (Default Settings)": "quick",
|
||||
"Exit": "exit"
|
||||
}
|
||||
|
||||
def select(self):
|
||||
option_names = list(self.options.keys())
|
||||
menu = self.show(option_names, clear_screen=False)
|
||||
selected_index = menu.show()
|
||||
|
||||
if selected_index is not None:
|
||||
selected_key = option_names[selected_index]
|
||||
return self.options[selected_key]
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class PromptMenu(Menu):
|
||||
def __init__(self, prompts_list: Dict[str, str] = None):
|
||||
super().__init__("====== Prompt Menu ======")
|
||||
self.prompts_list = prompts_list if prompts_list is not None else {}
|
||||
self.options = {
|
||||
"Add New Prompt": "new",
|
||||
"Back to Main Menu": "main"
|
||||
}
|
||||
# if prompts_list:
|
||||
self.prompts_list.update(self.options)
|
||||
|
||||
def select(self, clear_screen=False):
|
||||
prompts_list = list(self.prompts_list.keys())
|
||||
menu = self.show(prompts_list, clear_screen)
|
||||
selected_index = menu.show()
|
||||
|
||||
if selected_index is not None:
|
||||
selected_key = prompts_list[selected_index]
|
||||
selected_value = self.prompts_list[selected_key]
|
||||
return selected_key, selected_value
|
||||
|
||||
print("No option selected.")
|
||||
return None
|
||||
|
||||
def prompt_options(self):
|
||||
options = {
|
||||
"Select": "select",
|
||||
"Edit": "edit",
|
||||
"Delete": "delete",
|
||||
"Back": "back"
|
||||
}
|
||||
options_list = list(options.keys())
|
||||
menu = self.show(options_list, clear_screen=True)
|
||||
selected_index = menu.show()
|
||||
|
||||
if selected_index is not None:
|
||||
selected_key = options_list[selected_index]
|
||||
return options[selected_key]
|
||||
|
||||
print("No option selected.")
|
||||
return None
|
||||
|
||||
|
||||
class SettingsMenu(Menu):
|
||||
def __init__(self):
|
||||
super().__init__("====== Settings Menu ======")
|
||||
self.options = {
|
||||
"View Current Settings": "view",
|
||||
"Set API Key": "api",
|
||||
"Set Input/Output Directories": "dirs",
|
||||
"Set Translation Preferences": "translation",
|
||||
"Reset to Default": "reset",
|
||||
"Back to Main Menu": "back"
|
||||
}
|
||||
|
||||
def select(self):
|
||||
option_names = list(self.options.keys())
|
||||
menu = self.show(option_names, clear_screen=True)
|
||||
selected_index = menu.show()
|
||||
|
||||
if selected_index is not None:
|
||||
return self.options[option_names[selected_index]]
|
||||
print("Settings selection cancelled.")
|
||||
return None
|
0
src/srtify/core/__init__.py
Normal file
0
src/srtify/core/__init__.py
Normal file
62
src/srtify/core/file_encoding.py
Normal file
62
src/srtify/core/file_encoding.py
Normal file
|
@ -0,0 +1,62 @@
|
|||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
import chardet
|
||||
|
||||
|
||||
def detect_and_fix_encoding(path: str, file_name: str, target_encoding='utf-8'):
|
||||
"""
|
||||
Detect and convert file encoding to target encoding, replacing the original file.
|
||||
:param path:
|
||||
:param file_name:
|
||||
:param target_encoding:
|
||||
:return: bool: True if successful, False otherwise
|
||||
"""
|
||||
|
||||
path = Path(path)
|
||||
file_path = path / file_name
|
||||
if not file_path.exists():
|
||||
print(f"✗ File {file_path} does not exist.")
|
||||
return False
|
||||
|
||||
try:
|
||||
with open(file_path, 'rb') as f:
|
||||
raw_data = f.read()
|
||||
encoding_info = chardet.detect(raw_data)
|
||||
detected_encoding = encoding_info['encoding']
|
||||
|
||||
if not detected_encoding:
|
||||
print(f"✗ Unable to detect encoding for file {file_path}")
|
||||
return False
|
||||
|
||||
accepted_encodings = ['utf-8', 'utf-8-sig']
|
||||
if detected_encoding.lower() in accepted_encodings:
|
||||
print(f"✓ Encoding for file {file_name} is already {detected_encoding}")
|
||||
return True
|
||||
|
||||
print(f"Converting {file_name} from {detected_encoding} to {target_encoding}...")
|
||||
|
||||
with open(file_path, 'r', encoding=detected_encoding) as f:
|
||||
content = f.read()
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', encoding=target_encoding,
|
||||
dir=path, delete=False) as temp_file:
|
||||
temp_file.write(content)
|
||||
temp_path = Path(temp_file.name)
|
||||
|
||||
temp_path.replace(file_path)
|
||||
|
||||
print(f"✓ Converted {file_name} from {detected_encoding} to {target_encoding}")
|
||||
return True
|
||||
|
||||
except (UnicodeDecodeError, UnicodeEncodeError) as e:
|
||||
print(f"✗ Error converting {file_name} to {target_encoding}: {e}")
|
||||
if 'temp_path' in locals() and os.path.exists(temp_path):
|
||||
temp_path.unlink()
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
print(f"✗ Error processing {file_name}: {e}")
|
||||
if 'temp_path' in locals() and os.path.exists(temp_path):
|
||||
temp_path.unlink()
|
||||
return False
|
166
src/srtify/core/models.py
Normal file
166
src/srtify/core/models.py
Normal file
|
@ -0,0 +1,166 @@
|
|||
import os
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Dict, Optional, Any
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
@dataclass
|
||||
class ApiConfig:
|
||||
"""API Configuration settings."""
|
||||
gemini_key: str = ""
|
||||
|
||||
def is_valid(self) -> bool:
|
||||
return bool(self.gemini_key.strip())
|
||||
|
||||
|
||||
@dataclass
|
||||
class DirectoryConfig:
|
||||
"""Directory configuration settings."""
|
||||
input_dir: str = ""
|
||||
output_dir: str = ""
|
||||
|
||||
def __post_init__(self):
|
||||
"""Expand user paths after initialization."""
|
||||
if self.input_dir:
|
||||
self.input_dir = os.path.expanduser(self.input_dir)
|
||||
if self.output_dir:
|
||||
self.output_dir = os.path.expanduser(self.output_dir)
|
||||
|
||||
def validate(self) -> bool:
|
||||
"""Validate directory paths."""
|
||||
if not self.input_dir or not self.output_dir:
|
||||
return False
|
||||
|
||||
try:
|
||||
Path(self.input_dir).mkdir(parents=True, exist_ok=True)
|
||||
Path(self.output_dir).mkdir(parents=True, exist_ok=True)
|
||||
return True
|
||||
except (OSError, PermissionError):
|
||||
return False
|
||||
|
||||
|
||||
@dataclass
|
||||
class TranslationConfig:
|
||||
"""Translation configuration settings."""
|
||||
default_language: str = "persian"
|
||||
batch_size: int = 300
|
||||
|
||||
def __post_init__(self):
|
||||
if self.batch_size <= 0:
|
||||
self.batch_size = 300
|
||||
|
||||
|
||||
@dataclass
|
||||
class AppSettings:
|
||||
api: ApiConfig = field(default_factory=ApiConfig)
|
||||
directories: DirectoryConfig = field(default_factory=DirectoryConfig)
|
||||
translation: TranslationConfig = field(default_factory=TranslationConfig)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert settings to dictionary."""
|
||||
return asdict(self)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'AppSettings':
|
||||
api_data = data.get('api', {})
|
||||
dir_data = data.get('directories', {})
|
||||
trans_data = data.get('translation', {})
|
||||
|
||||
return cls(
|
||||
api=ApiConfig(**api_data),
|
||||
directories=DirectoryConfig(**dir_data),
|
||||
translation=TranslationConfig(**trans_data)
|
||||
)
|
||||
|
||||
def validate(self) -> bool:
|
||||
return (self.api.is_valid() and
|
||||
self.directories.validate() and
|
||||
self.translation.batch_size > 0)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PromptItem:
|
||||
"""Individual prompt item."""
|
||||
name: str
|
||||
description: str
|
||||
created_at: Optional[str] = None
|
||||
last_used: Optional[str] = None
|
||||
|
||||
def __post_init__(self):
|
||||
"""Set creation item if not provided."""
|
||||
if not self.created_at:
|
||||
self.created_at = datetime.now().isoformat()
|
||||
|
||||
|
||||
@dataclass
|
||||
class PromptsCollection:
|
||||
"""Collection of prompts with metadata."""
|
||||
prompts: Dict[str, PromptItem] = field(default_factory=dict)
|
||||
version: str = "1.0"
|
||||
|
||||
def add_prompt(self, name: str, description: str) -> bool:
|
||||
"""Add a new prompt."""
|
||||
if name in self.prompts:
|
||||
return False
|
||||
|
||||
self.prompts[name] = PromptItem(name=name, description=description)
|
||||
return True
|
||||
|
||||
def update_prompt(self, name: str, description: str) -> bool:
|
||||
"""Update an existing prompt."""
|
||||
if name not in self.prompts:
|
||||
return False
|
||||
|
||||
self.prompts[name].description = description
|
||||
return True
|
||||
|
||||
def delete_prompt(self, name: str) -> bool:
|
||||
"""Delete a prompt."""
|
||||
if name not in self.prompts:
|
||||
return False
|
||||
|
||||
del self.prompts[name]
|
||||
return True
|
||||
|
||||
def get_prompt_names(self) -> list[str]:
|
||||
"""Get a list of prompt names."""
|
||||
return list(self.prompts.keys())
|
||||
|
||||
def get_prompt_descriptions(self) -> Dict[str, str]:
|
||||
"""Get dictionary of name -> description mapping."""
|
||||
return {name: prompt.description for name, prompt in self.prompts.items()}
|
||||
|
||||
def mark_used(self, name: str):
|
||||
"""Mark prompt as recently used."""
|
||||
if name in self.prompts:
|
||||
self.prompts[name].last_used = datetime.now().isoformat()
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert dictionary from JSON serialization."""
|
||||
return {
|
||||
'prompts': {name: asdict(prompt) for name, prompt in self.prompts.items()},
|
||||
'version': self.version
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'PromptsCollection':
|
||||
"""Create instance from dictionary."""
|
||||
prompts_data = data.get('prompts', {})
|
||||
prompts = {}
|
||||
|
||||
for name, prompt_data in prompts_data.items():
|
||||
prompts[name] = PromptItem(**prompt_data)
|
||||
|
||||
return cls(
|
||||
prompts=prompts,
|
||||
version=data.get('version', '1.0')
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_legacy_dict(cls, data: Dict[str, str]) -> 'PromptsCollection':
|
||||
"""Create instance from legacy dictionary."""
|
||||
collection = cls()
|
||||
for name, description in data.items():
|
||||
collection.add_prompt(name, description)
|
||||
return collection
|
255
src/srtify/core/prompts.py
Normal file
255
src/srtify/core/prompts.py
Normal file
|
@ -0,0 +1,255 @@
|
|||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
from srtify.core.models import PromptsCollection, PromptItem
|
||||
from srtify.utils.paths import get_prompts_path
|
||||
|
||||
|
||||
class PromptsManager:
|
||||
"""Prompts manager class."""
|
||||
def __init__(self, file_path: str = None):
|
||||
self.file_path = Path(file_path) if file_path else get_prompts_path()
|
||||
self.collection = self._load_prompts()
|
||||
|
||||
def _load_prompts(self) -> PromptsCollection:
|
||||
"""Load prompts from file or create an empty collection."""
|
||||
self.file_path.parent.mkdir(exist_ok=True)
|
||||
|
||||
if not self.file_path.exists():
|
||||
collection = PromptsCollection()
|
||||
self._save_prompts(collection)
|
||||
return collection
|
||||
|
||||
try:
|
||||
with open(self.file_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
if isinstance(data, dict) and 'prompts' not in data:
|
||||
print("Converting legacy prompts format...")
|
||||
collection = PromptsCollection.from_legacy_dict(data)
|
||||
self._save_prompts(collection)
|
||||
return collection
|
||||
|
||||
return PromptsCollection.from_dict(data)
|
||||
|
||||
except (json.JSONDecodeError, KeyError, TypeError) as e:
|
||||
print(f"Error loading prompts file: {e}. Creating new collection.")
|
||||
|
||||
backup_file = self.file_path.with_suffix('.json.backup')
|
||||
if self.file_path.exists():
|
||||
self.file_path.rename(backup_file)
|
||||
print(f"Backed up corrupt prompts file to {backup_file}")
|
||||
|
||||
collection = PromptsCollection()
|
||||
self._save_prompts(collection)
|
||||
return collection
|
||||
|
||||
def _save_prompts(self, collection: Optional[PromptsCollection] = None) -> bool:
|
||||
"""Save current prompts to file."""
|
||||
if collection is None:
|
||||
collection = self.collection
|
||||
|
||||
try:
|
||||
with open(self.file_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(collection.to_dict(), f, indent=2, ensure_ascii=False)
|
||||
return True
|
||||
except (IOError, OSError) as e:
|
||||
print(f"Error saving prompts file: {e}")
|
||||
return False
|
||||
|
||||
def reload(self) -> bool:
|
||||
"""Reload prompts from file."""
|
||||
try:
|
||||
self.collection = self._load_prompts()
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error reloading prompts: {e}")
|
||||
return False
|
||||
|
||||
def add_prompt(self, name: str, description: str) -> bool:
|
||||
"""Add a new prompt."""
|
||||
if not name or not name.strip():
|
||||
raise ValueError("Prompt name cannot be empty.")
|
||||
if not description or not description.strip():
|
||||
raise ValueError("Prompt description cannot be empty.")
|
||||
|
||||
name = name.strip()
|
||||
description = description.strip()
|
||||
|
||||
if self.collection.add_prompt(name, description):
|
||||
if self._save_prompts():
|
||||
# print(f"✓ Prompt '{name}' added successfully.")
|
||||
return True
|
||||
else:
|
||||
self.collection.delete_prompt(name)
|
||||
# print(f"✗ Failed to save prompt '{name}'.")
|
||||
return False
|
||||
else:
|
||||
print(f"✗ Prompt '{name}' already exists.")
|
||||
return False
|
||||
|
||||
def update_prompt(self, name: str, description: str) -> bool:
|
||||
"""Update an existing prompt."""
|
||||
if not name or not name.strip():
|
||||
raise ValueError("Prompt name cannot be empty.")
|
||||
if not description or not description.strip():
|
||||
raise ValueError("Prompt description cannot be empty.")
|
||||
|
||||
name = name.strip()
|
||||
description = description.strip()
|
||||
|
||||
if self.collection.update_prompt(name, description):
|
||||
if self._save_prompts():
|
||||
print(f"✓ Prompt '{name}' updated successfully.")
|
||||
return True
|
||||
else:
|
||||
print(f"✗ Failed to save prompt '{name}'.")
|
||||
return False
|
||||
else:
|
||||
print(f"✗ Prompt '{name}' does not exist.")
|
||||
return False
|
||||
|
||||
def delete_prompt(self, name: str) -> bool:
|
||||
"""Delete a prompt."""
|
||||
if not name or not name.strip():
|
||||
raise ValueError("Prompt name cannot be empty.")
|
||||
|
||||
name = name.strip()
|
||||
|
||||
backup_prompt = self.collection.prompts.get(name)
|
||||
|
||||
if self.collection.delete_prompt(name):
|
||||
if self._save_prompts():
|
||||
print(f"✓ Prompt '{name}' deleted successfully.")
|
||||
return True
|
||||
else:
|
||||
if backup_prompt:
|
||||
self.collection.prompts[name] = backup_prompt
|
||||
print(f"✗ Failed to save prompt '{name}'.")
|
||||
return False
|
||||
else:
|
||||
print(f"✗ Prompt '{name}' does not exist.")
|
||||
return False
|
||||
|
||||
def get_prompt(self, name: str) -> Optional[str]:
|
||||
"""Get a specific prompt description by name."""
|
||||
if not name or name not in self.collection.prompts:
|
||||
return None
|
||||
|
||||
self.collection.mark_used(name)
|
||||
self._save_prompts()
|
||||
|
||||
return self.collection.prompts[name].description
|
||||
|
||||
def prompt_exists(self, name: str) -> bool:
|
||||
"""Check if a prompt exists by name."""
|
||||
return name in self.collection.prompts
|
||||
|
||||
def get_all_prompts(self) -> Dict[str, str]:
|
||||
"""Get all prompts as a dictionary of name -> description."""
|
||||
return self.collection.get_prompt_descriptions()
|
||||
|
||||
def get_prompt_names(self) -> List[str]:
|
||||
return self.collection.get_prompt_names()
|
||||
|
||||
def get_prompt_details(self, name: str) -> Optional[PromptItem]:
|
||||
"""Get prompt details by name."""
|
||||
return self.collection.prompts.get(name)
|
||||
|
||||
def search_prompts(self, query: str) -> Dict[str, str]:
|
||||
query = query.lower().strip()
|
||||
if not query:
|
||||
return self.get_all_prompts()
|
||||
|
||||
results = {}
|
||||
for name, prompt in self.collection.prompts.items():
|
||||
if (query in name.lower() or
|
||||
query in prompt.description.lower()):
|
||||
results[name] = prompt.description
|
||||
|
||||
return results
|
||||
|
||||
def get_recently_used_prompts(self, limit: int = 5) -> Dict[str, str]:
|
||||
"""Get a list of recently used prompts."""
|
||||
sorted_prompts = sorted(
|
||||
self.collection.prompts.items(),
|
||||
key=lambda x: x[1].last_used or "",
|
||||
reverse=True
|
||||
)
|
||||
|
||||
results = {}
|
||||
for name, prompt in sorted_prompts[:limit]:
|
||||
if prompt.last_used:
|
||||
results[name] = prompt.description
|
||||
|
||||
return results
|
||||
|
||||
def count_prompts(self) -> int:
|
||||
return len(self.collection.prompts)
|
||||
|
||||
def export_prompts(self, file_path: str, file_format: str = "json") -> bool:
|
||||
"""Export prompts to file"""
|
||||
try:
|
||||
export_path = Path(file_path)
|
||||
|
||||
if file_format.lower() == "json":
|
||||
with open(export_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(self.collection.to_dict(), f, indent=2, ensure_ascii=False)
|
||||
elif file_format.lower() == "txt":
|
||||
with open(export_path, 'w', encoding='utf-8') as f:
|
||||
for name, prompt in self.collection.prompts.items():
|
||||
f.write(f"=== {name} ===\n")
|
||||
f.write(f"{prompt.description}\n")
|
||||
f.write("-" * 50 + "\n\n")
|
||||
else:
|
||||
raise ValueError("Unsupported export format. Use 'json' or 'txt'.")
|
||||
print(f"✓ Prompts exported successfully to {export_path}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"✗ Error exporting prompts: {e}")
|
||||
return False
|
||||
|
||||
def import_prompts(self, file_path: str, merge: bool = True) -> bool:
|
||||
"""Import prompts from file"""
|
||||
try:
|
||||
import_path = Path(file_path)
|
||||
|
||||
with open(import_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
imported_collection = PromptsCollection.from_dict(data)
|
||||
|
||||
if merge:
|
||||
conflicts = []
|
||||
for name, prompt in imported_collection.prompts.items():
|
||||
if name in self.collection.prompts:
|
||||
conflicts.append(name)
|
||||
else:
|
||||
self.collection.prompts[name] = prompt
|
||||
|
||||
if conflicts:
|
||||
print(
|
||||
f"⚠️ {len(conflicts)} Conflicts detected for prompts: {', '.join(conflicts)}. Skipping these.")
|
||||
|
||||
print(
|
||||
f"✓ Imported {len(imported_collection.prompts) - len(conflicts)} prompts with {len(conflicts)} conflicts.")
|
||||
else:
|
||||
self.collection = imported_collection
|
||||
print(f"✓ Imported {len(imported_collection.prompts)} prompts, replacing existing collection.")
|
||||
|
||||
return self._save_prompts()
|
||||
except Exception as e:
|
||||
print(f"✗ Error importing prompts: {e}")
|
||||
return False
|
||||
|
||||
def get_prompts_summary(self) -> dict:
|
||||
"""Get a summary of current prompts."""
|
||||
recently_used = len(self.get_recently_used_prompts())
|
||||
total = self.count_prompts()
|
||||
|
||||
return {
|
||||
"Total Prompts": total,
|
||||
"Recently Used": recently_used,
|
||||
"Unused": total - recently_used,
|
||||
"Collection Version": self.collection.version
|
||||
}
|
190
src/srtify/core/settings.py
Normal file
190
src/srtify/core/settings.py
Normal file
|
@ -0,0 +1,190 @@
|
|||
import json
|
||||
from pathlib import Path
|
||||
from typing import Tuple, Optional
|
||||
from srtify.core.models import AppSettings, ApiConfig, DirectoryConfig, TranslationConfig
|
||||
from srtify.utils.paths import get_config_path
|
||||
|
||||
|
||||
home_dir = Path.home()
|
||||
DEFAULT_INPUT_DIR = str(home_dir / "Documents" / "Subtitles")
|
||||
DEFAULT_OUTPUT_DIR = str(home_dir / "Documents" / "Subtitles" / "Translated")
|
||||
|
||||
|
||||
class SettingsManager:
|
||||
"""Settings manager with dataclass-based configuration."""
|
||||
|
||||
def __init__(self, config_file: str = None):
|
||||
self.config_file = Path(config_file) if config_file else get_config_path()
|
||||
self.settings = self._load_settings()
|
||||
|
||||
def _load_settings(self) -> AppSettings:
|
||||
"""Load settings from file or create defaults."""
|
||||
self.config_file.parent.mkdir(exist_ok=True)
|
||||
|
||||
default_settings = AppSettings(
|
||||
api=ApiConfig(),
|
||||
directories=DirectoryConfig(
|
||||
input_dir=DEFAULT_INPUT_DIR,
|
||||
output_dir=DEFAULT_OUTPUT_DIR
|
||||
),
|
||||
translation=TranslationConfig()
|
||||
)
|
||||
|
||||
if not self.config_file.exists():
|
||||
self._save_settings(default_settings)
|
||||
return default_settings
|
||||
|
||||
try:
|
||||
with open(self.config_file, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
return AppSettings.from_dict(data)
|
||||
except (json.JSONDecodeError, KeyError, TypeError) as e:
|
||||
print(f"Error loading settings: {e}. Using defaults.")
|
||||
backup_file = self.config_file.with_suffix('.json.backup')
|
||||
if self.config_file.exists():
|
||||
self.config_file.rename(backup_file)
|
||||
print(f"Backed up corrupted config to {backup_file}")
|
||||
|
||||
self._save_settings(default_settings)
|
||||
return default_settings
|
||||
|
||||
def _save_settings(self, settings: Optional[AppSettings] = None):
|
||||
"""Save current settings to file."""
|
||||
if settings is None:
|
||||
settings = self.settings
|
||||
|
||||
try:
|
||||
with open(self.config_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(settings.to_dict(), f, indent=2, ensure_ascii=False)
|
||||
return True
|
||||
except (IOError, OSError) as e:
|
||||
print(f"Error saving configuration: {e}")
|
||||
return False
|
||||
|
||||
def reload(self) -> bool:
|
||||
"""Reload settings from file."""
|
||||
try:
|
||||
self.settings = self._load_settings()
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error reloading settings: {e}")
|
||||
return False
|
||||
|
||||
def get_api_key(self) -> str:
|
||||
"""Get the API key."""
|
||||
return self.settings.api.gemini_key
|
||||
|
||||
def set_api_key(self, api_key: str) -> bool:
|
||||
"""Set the API key with validation."""
|
||||
if not api_key or not api_key.strip():
|
||||
raise ValueError("API key cannot be empty.")
|
||||
|
||||
self.settings.api.gemini_key = api_key.strip()
|
||||
return self._save_settings()
|
||||
|
||||
def is_api_key_valid(self) -> bool:
|
||||
"""Check if the API key is valid."""
|
||||
return self.settings.api.is_valid()
|
||||
|
||||
def get_directories(self) -> Tuple[str, str]:
|
||||
"""Get input and output directories."""
|
||||
return (
|
||||
self.settings.directories.input_dir,
|
||||
self.settings.directories.output_dir
|
||||
)
|
||||
|
||||
def set_directories(self, input_dir: str, output_dir: str) -> bool:
|
||||
"""Set input and output directories with validation."""
|
||||
if not input_dir or not input_dir.strip():
|
||||
raise ValueError("Input directory cannot be empty.")
|
||||
if not output_dir or not output_dir.strip():
|
||||
raise ValueError("Output directory cannot be empty.")
|
||||
|
||||
new_dirs = DirectoryConfig(
|
||||
input_dir=input_dir.strip(),
|
||||
output_dir=output_dir.strip()
|
||||
)
|
||||
|
||||
if not new_dirs.validate():
|
||||
raise ValueError("One or both directories are invalid or inaccessible.")
|
||||
|
||||
self.settings.directories = new_dirs
|
||||
return self._save_settings()
|
||||
|
||||
def ensure_directories_exist(self) -> bool:
|
||||
"""Ensure input and output directories exist."""
|
||||
return self.settings.directories.validate()
|
||||
|
||||
def get_translation_config(self) -> Tuple[str, int]:
|
||||
"""Get translation configuration."""
|
||||
return (
|
||||
self.settings.translation.default_language,
|
||||
self.settings.translation.batch_size
|
||||
)
|
||||
|
||||
def set_translation_config(self, language: str, batch_size: int) -> bool:
|
||||
"""Set translation configuration with validation."""
|
||||
if not language or not language.strip():
|
||||
raise ValueError("Default language cannot be empty.")
|
||||
|
||||
if batch_size <= 0:
|
||||
raise ValueError("Batch size must be a positive integer.")
|
||||
|
||||
self.settings.translation.default_language = language.strip()
|
||||
self.settings.translation.batch_size = batch_size
|
||||
return self._save_settings()
|
||||
|
||||
def validate_all(self) -> bool:
|
||||
"""Validate all settings."""
|
||||
return self.settings.validate()
|
||||
|
||||
def reset_to_defaults(self) -> bool:
|
||||
"""Reset all settings to defaults."""
|
||||
self.settings = AppSettings(
|
||||
api=ApiConfig(),
|
||||
directories=DirectoryConfig(
|
||||
input_dir=DEFAULT_INPUT_DIR,
|
||||
output_dir=DEFAULT_OUTPUT_DIR
|
||||
),
|
||||
translation=TranslationConfig()
|
||||
)
|
||||
return self._save_settings()
|
||||
|
||||
def export_settings(self, file_path: str) -> bool:
|
||||
"""Export settings to a file."""
|
||||
try:
|
||||
with open(file_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(self.settings.to_dict(), f, indent=2, ensure_ascii=False)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Error exporting settings: {e}")
|
||||
return False
|
||||
|
||||
def import_settings(self, file_path: str) -> bool:
|
||||
"""Import settings from a file."""
|
||||
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
new_settings = AppSettings.from_dict(data)
|
||||
|
||||
if new_settings.validate():
|
||||
self.settings = new_settings
|
||||
return self._save_settings()
|
||||
else:
|
||||
print("Imported settings are invalid.")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"Error importing settings: {e}")
|
||||
return False
|
||||
|
||||
def get_settings_summary(self) -> dict:
|
||||
"""Get a summary of current settings."""
|
||||
return {
|
||||
"API Key": "Set" if self.is_api_key_valid() else "Not Set",
|
||||
"Input Directory": self.settings.directories.input_dir,
|
||||
"Output Directory": self.settings.directories.output_dir,
|
||||
"Language": self.settings.translation.default_language,
|
||||
"Batch Size": self.settings.translation.batch_size,
|
||||
"Directories Valid": self.settings.directories.validate()
|
||||
}
|
175
src/srtify/core/translator.py
Normal file
175
src/srtify/core/translator.py
Normal file
|
@ -0,0 +1,175 @@
|
|||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any
|
||||
import gemini_srt_translator as gst
|
||||
from .file_encoding import detect_and_fix_encoding
|
||||
from srtify.core.settings import SettingsManager
|
||||
from srtify.core.prompts import PromptsManager
|
||||
from srtify.utils.utils import fancy_headline
|
||||
|
||||
|
||||
class TranslatorApp:
|
||||
""" Main Application class for handling translation operations. """
|
||||
def __init__(self, settings_manager: SettingsManager, prompts_manager: PromptsManager):
|
||||
self.settings = settings_manager
|
||||
self.prompts = prompts_manager
|
||||
|
||||
def translate_single_file(
|
||||
self,
|
||||
input_path: Path,
|
||||
output_path: Path,
|
||||
srt_file: str,
|
||||
language: str,
|
||||
prompt: str,
|
||||
batch_size: int,
|
||||
api_key: str
|
||||
) -> bool:
|
||||
""" Translate a single SRT file. """
|
||||
input_file = input_path / srt_file
|
||||
output_file = output_path / srt_file
|
||||
|
||||
print(f"Translating: {srt_file}")
|
||||
print(f"Language: {language}")
|
||||
print(f"Batch Size: {batch_size}")
|
||||
if prompt:
|
||||
print(f"Prompt: {prompt[:50]}{'...' if len(prompt) > 50 else ''}")
|
||||
|
||||
if not api_key:
|
||||
print("❌ No API key found. Configure settings first.")
|
||||
return False
|
||||
|
||||
output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Configure gemini_srt_translator
|
||||
gst.gemini_api_key = api_key
|
||||
gst.target_language = language
|
||||
gst.input_file = str(input_file)
|
||||
gst.output_file = str(output_file)
|
||||
gst.batch_size = batch_size
|
||||
if prompt:
|
||||
gst.description = prompt
|
||||
|
||||
try:
|
||||
gst.translate()
|
||||
print("✅ Translation successful.")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"❌ Translation failed for {srt_file}: {e}")
|
||||
return False
|
||||
|
||||
def get_srt_file(self, input_path: Path, specific_file: Optional[str] = None) -> List[str]:
|
||||
""" Get list of SRT files to process. """
|
||||
if specific_file:
|
||||
file_name = specific_file.strip()
|
||||
if not file_name.endswith(".srt"):
|
||||
file_name = f"{file_name}.srt"
|
||||
|
||||
file_path = input_path / file_name
|
||||
if file_path.exists():
|
||||
return [file_name]
|
||||
else:
|
||||
print(f"❌ File {file_name} not found in {input_path}")
|
||||
return []
|
||||
|
||||
else:
|
||||
str_files = [file.name for file in input_path.glob("*.srt")]
|
||||
return str_files
|
||||
|
||||
def validate_files_encoding(self, input_path: Path, srt_files: List[str]) -> List[str]:
|
||||
""" Validate and fix file encoding for all SRT files. """
|
||||
print("Checking file encoding...")
|
||||
valid_files = []
|
||||
|
||||
for srt_file in srt_files:
|
||||
if detect_and_fix_encoding(str(input_path), srt_file):
|
||||
valid_files.append(srt_file)
|
||||
else:
|
||||
print(f"❌ Failed to fix encoding for file {srt_file}. Skipping...")
|
||||
|
||||
return valid_files
|
||||
|
||||
def resolve_prompt(self, options: Dict[str, Any]) -> Optional[str]:
|
||||
""" Resolve which prompt to use based on options """
|
||||
if options.get('custom_prompt'):
|
||||
return options['custom_prompt'].strip()
|
||||
elif options.get('prompts.py'):
|
||||
results = self.prompts.search_prompts(options['prompt'].strip())
|
||||
if results:
|
||||
if len(results) == 1:
|
||||
return list(results.values())[0]
|
||||
else:
|
||||
print(f"Found {len(results)} prompts matching '{options['prompt']}'")
|
||||
for name, desc in results.items():
|
||||
print(f" - {name}: {desc[:100]}{'...' if len(desc) > 100 else ''}")
|
||||
return None
|
||||
else:
|
||||
print(f"No prompts found matching '{options['prompt']}'")
|
||||
return None
|
||||
elif options.get('quick'):
|
||||
return "Translate naturally and accurately"
|
||||
else:
|
||||
return "Translate naturally and accurately"
|
||||
|
||||
def run_translation(self, options: Dict[str, Any]) -> None:
|
||||
""" Main translation runner. """
|
||||
input_dir, output_dir = self.settings.get_directories()
|
||||
default_lang, default_batch_size = self.settings.get_translation_config()
|
||||
api_key = self.settings.get_api_key()
|
||||
|
||||
input_path = Path(options.get('input_path', input_dir))
|
||||
output_path = Path(options.get('output_path', output_dir))
|
||||
language = options.get('language', default_lang)
|
||||
batch_size = options.get('batch_size', default_batch_size)
|
||||
|
||||
if not api_key:
|
||||
print("❌ No API key found. Use 'subtitle-translator settings' to configure.")
|
||||
return
|
||||
|
||||
selected_prompt = self.resolve_prompt(options)
|
||||
if selected_prompt is None:
|
||||
print("❌ No prompt selected. Exiting.")
|
||||
return
|
||||
|
||||
# Print configuration
|
||||
print("Translation Settings:")
|
||||
print("=" * 30)
|
||||
print(f"- Target Language: {language}")
|
||||
print(f"- Input Directory: {input_path}")
|
||||
print(f"- Output Directory: {output_path}")
|
||||
print(f"- Batch Size: {batch_size}")
|
||||
print(f"- API Key: {'Set' if api_key else 'Not Set'}")
|
||||
print("=" * 30)
|
||||
|
||||
srt_files = self.get_srt_file(input_path, options.get('file'))
|
||||
if not srt_files:
|
||||
print("❌ No SRT files found. Exiting.")
|
||||
return
|
||||
|
||||
print(f"Found {len(srt_files)} SRT files.")
|
||||
|
||||
valid_files = self.validate_files_encoding(input_path, srt_files)
|
||||
if not valid_files:
|
||||
print("❌ No valid SRT files found after encoding check. Exiting.")
|
||||
return
|
||||
|
||||
print(f"Found {len(valid_files)} valid SRT files.")
|
||||
print(f"\nStarting translation to {language}...")
|
||||
print("=" * shutil.get_terminal_size().columns)
|
||||
|
||||
successful = 0
|
||||
failed = 0
|
||||
|
||||
for srt_file in valid_files:
|
||||
if self.translate_single_file(
|
||||
input_path, output_path, srt_file,
|
||||
language, selected_prompt, batch_size, api_key
|
||||
):
|
||||
successful += 1
|
||||
else:
|
||||
failed += 1
|
||||
|
||||
print("=" * shutil.get_terminal_size().columns)
|
||||
print(fancy_headline("TRANSLATION SUMMARY", "rounded"))
|
||||
print(f"Successful: {successful}")
|
||||
print(f"Failed: {failed}")
|
||||
print(f"Output Directory: {output_path}")
|
0
src/srtify/utils/__init__.py
Normal file
0
src/srtify/utils/__init__.py
Normal file
22
src/srtify/utils/paths.py
Normal file
22
src/srtify/utils/paths.py
Normal file
|
@ -0,0 +1,22 @@
|
|||
from pathlib import Path
|
||||
import platformdirs
|
||||
|
||||
|
||||
APP_NAME = "srtify"
|
||||
|
||||
|
||||
def get_app_dir() -> Path:
|
||||
""" Get platform-specific application directory """
|
||||
app_dir = Path(platformdirs.user_data_dir(APP_NAME))
|
||||
app_dir.mkdir(parents=True, exist_ok=True)
|
||||
return app_dir
|
||||
|
||||
|
||||
def get_config_path() -> Path:
|
||||
""" Get path to config file """
|
||||
return get_app_dir() / "config.json"
|
||||
|
||||
|
||||
def get_prompts_path() -> Path:
|
||||
""" Get path to prompts file """
|
||||
return get_app_dir() / "prompts.json"
|
51
src/srtify/utils/utils.py
Normal file
51
src/srtify/utils/utils.py
Normal file
|
@ -0,0 +1,51 @@
|
|||
import os
|
||||
|
||||
|
||||
def clear_screen():
|
||||
if os.name == 'nt':
|
||||
os.system('cls')
|
||||
else:
|
||||
os.system('clear')
|
||||
|
||||
|
||||
def fancy_headline(text, style='double'):
|
||||
"""
|
||||
Create headlines with various border styles using Unicode characters.
|
||||
|
||||
Args:
|
||||
text: The headline text
|
||||
style: Border style ('double', 'heavy', 'rounded', 'dashed')
|
||||
"""
|
||||
# Define different border character sets
|
||||
border_styles = {
|
||||
'double': {
|
||||
'top_left': '╔', 'top_right': '╗', 'bottom_left': '╚', 'bottom_right': '╝',
|
||||
'horizontal': '═', 'vertical': '║'
|
||||
},
|
||||
'heavy': {
|
||||
'top_left': '┏', 'top_right': '┓', 'bottom_left': '┗', 'bottom_right': '┛',
|
||||
'horizontal': '━', 'vertical': '┃'
|
||||
},
|
||||
'rounded': {
|
||||
'top_left': '╭', 'top_right': '╮', 'bottom_left': '╰', 'bottom_right': '╯',
|
||||
'horizontal': '─', 'vertical': '│'
|
||||
},
|
||||
'dashed': {
|
||||
'top_left': '┌', 'top_right': '┐', 'bottom_left': '└', 'bottom_right': '┘',
|
||||
'horizontal': '┄', 'vertical': '┆'
|
||||
}
|
||||
}
|
||||
|
||||
# Get the border characters for the selected style
|
||||
borders = border_styles.get(style, border_styles['double'])
|
||||
|
||||
# Calculate dimensions
|
||||
text_length = len(text)
|
||||
total_width = text_length + 4 # 2 spaces padding + 2 border chars
|
||||
|
||||
# Build the headline
|
||||
top_line = borders['top_left'] + borders['horizontal'] * (total_width - 2) + borders['top_right']
|
||||
middle_line = borders['vertical'] + ' ' + text + ' ' + borders['vertical']
|
||||
bottom_line = borders['bottom_left'] + borders['horizontal'] * (total_width - 2) + borders['bottom_right']
|
||||
|
||||
return f"{top_line}\n{middle_line}\n{bottom_line}"
|
Loading…
Add table
Add a link
Reference in a new issue