Natalie/natalie.py
2025-01-24 13:36:02 +03:30

99 lines
4.5 KiB
Python

import sys
from PyQt6.QtWidgets import (QApplication, QListWidgetItem, QTableWidgetItem)
from PyQt6.QtCore import Qt
from portainer_api import PortainerService
from config.settings import PortainerConfig
from gui.portainer_ui import PortainerUi
from gui.dialog_box_ui import DialogBox
from gui.workers.load_stacks_worker import LoadStacksWorker
from gui.workers.load_endpoints_worker import LoadEndpointsWorker
from gui.workers.load_container_image_status_worker import LoadContainerImageStatusWorker
from utils.date_converter import datetime_from_timestamp
from utils.endpoint_converter import endpoint_id_to_name_converter
class Natalie(PortainerUi):
    """Main window: lists Portainer stacks and shows the selected stack.

    On construction it starts one worker that fetches the endpoints and one
    that fetches the stacks.  Clicking a stack fills the detail labels and the
    containers table, and starts one image-status worker per container.  The
    update button triggers the selected stack's webhook.
    """

    def __init__(self, portainer_url, portainer_token):
        """Create the service client, start the initial loads and wire the UI.

        Args:
            portainer_url: Base URL handed to ``PortainerService``.
            portainer_token: API token handed to ``PortainerService``.
        """
        super().__init__()
        # Most recently started image-status worker (attribute kept so that
        # existing readers of it keep working).
        self.image_status_worker = None
        # All image-status workers that may still be running.  Holding them
        # here keeps the Python objects alive until their work is done.
        self.image_status_workers = []
        # Incremented on every stack selection; lets late results that belong
        # to a previously selected stack be recognised and dropped.
        self._stack_load_id = 0
        self.stack_webhook = None
        self.endpoints_list = None
        self.main_window.resize(800, 600)
        self.portainer = PortainerService(portainer_url, portainer_token)

        self.endpoints_worker = LoadEndpointsWorker(self.portainer)
        self.endpoints_worker.endpoints_retrieved.connect(self.load_endpoints)
        self.endpoints_worker.error_occurred.connect(self.error_occurred)
        self.endpoints_worker.start()

        self.stacks_worker = LoadStacksWorker(self.portainer)
        self.stacks_worker.stacks_retrieved.connect(self.load_stacks)
        self.stacks_worker.error_occurred.connect(self.error_occurred)
        self.stacks_worker.start()

        self.stacks_list.itemClicked.connect(self.load_stack)
        self.update.clicked.connect(self.update_stack)

    def load_endpoints(self, endpoints):
        """Store the endpoints and rebuild the combo box ("All" + each name).

        Args:
            endpoints: Iterable of mappings, each with a ``'name'`` key.
        """
        self.endpoints_list = endpoints
        self.combo_box.clear()
        self.combo_box.addItem("All")
        for endpoint in endpoints:
            self.combo_box.addItem(endpoint['name'])

    def load_stacks(self, stacks):
        """Replace the contents of the stacks list with ``stacks``.

        Each list item shows the stack name and carries the full stack mapping
        in its ``UserRole`` data so ``load_stack`` can read it back.

        Args:
            stacks: Iterable of mappings, each with a ``'name'`` key.
        """
        # NOTE(review): the combo box selection is not applied as a filter
        # here; every stack is listed regardless of the chosen endpoint.
        self.stacks_list.clear()
        for stack in stacks:
            item = QListWidgetItem(stack['name'])
            item.setData(Qt.ItemDataRole.UserRole, stack)
            self.stacks_list.addItem(item)

    def load_stack(self, item):
        """Show the details and containers of the clicked stack.

        Args:
            item: The clicked ``QListWidgetItem``; its ``UserRole`` data is the
                stack mapping stored by ``load_stacks``.
        """
        # Invalidate results still in flight for the previously shown stack.
        self._stack_load_id += 1
        # Drop references only to workers that are no longer running.
        # NOTE(review): assumes the workers are QThread subclasses (they
        # expose start() and Qt signals) so isRunning() exists - confirm.
        self.image_status_workers = [
            worker for worker in self.image_status_workers if worker.isRunning()
        ]

        self.containers_table.setRowCount(0)
        self.containers_table.clearContents()
        self.progress_bar.setRange(0, 0)
        self.progress_bar.show()
        try:
            data = item.data(Qt.ItemDataRole.UserRole)
            endpoint_id = data['endpoint_id']
            self._show_stack_details(data)
            # NOTE(review): this request runs inside the slot itself, so the
            # busy indicator may not repaint before it returns; consider
            # moving it into a worker like the other loads.
            try:
                containers = self.portainer.containers.list_containers_of_stack(
                    endpoint_id, data['name']
                )
            except Exception as exc:
                # Report through the same channel the workers use instead of
                # letting the exception escape from a Qt slot.
                self.error_occurred(
                    f"Failed to list containers of stack {data['name']}: {exc}"
                )
                return
            self._fill_containers_table(endpoint_id, containers)
        finally:
            # Always leave the busy state, including on the error paths.
            self.progress_bar.hide()

    def _show_stack_details(self, data):
        """Copy the stack's metadata into the labels and remember its webhook."""
        self.stack_name_label.setText(data['name'])
        self.endpoint_label.setText(
            endpoint_id_to_name_converter(self.endpoints_list, data['endpoint_id'])
        )
        self.created_by_label.setText(data['created_by'])
        self.creation_date_label.setText(datetime_from_timestamp(data['creation_date']))
        self.updated_by_label.setText(data['updated_by'])
        self.update_date_label.setText(datetime_from_timestamp(data['update_date']))
        self.stack_webhook = data['webhook']

    def _fill_containers_table(self, endpoint_id, containers):
        """Add one row per container and start its image-status lookup."""
        load_id = self._stack_load_id
        self.containers_table.setRowCount(len(containers))
        for row, container in enumerate(containers):
            self.containers_table.setItem(row, 0, QTableWidgetItem(container['name']))
            self.containers_table.setItem(row, 1, QTableWidgetItem(container['image']))
            self.containers_table.setItem(row, 2, QTableWidgetItem(container['status']))
            self.containers_table.setItem(row, 3, QTableWidgetItem('Loading...'))

            # Exactly one worker per container.
            worker = LoadContainerImageStatusWorker(
                self.portainer, endpoint_id, container['id']
            )
            # Defaults bind the current row / load id (avoids late binding).
            worker.image_status_retrieved.connect(
                lambda status, row=row, load_id=load_id:
                    self._on_image_status(load_id, row, status)
            )
            # Keep a reference before starting so the worker cannot be
            # garbage-collected while it is running.
            self.image_status_workers.append(worker)
            self.image_status_worker = worker
            worker.start()

    def _on_image_status(self, load_id, row, status):
        """Apply a worker result unless another stack was selected meanwhile."""
        if load_id != self._stack_load_id:
            return
        self.update_image_status(row, status)

    def update_image_status(self, row, status):
        """Write ``status`` into the image-status column (index 3) of ``row``."""
        self.containers_table.setItem(row, 3, QTableWidgetItem(status))

    def update_stack(self):
        """Trigger the selected stack's webhook and report the outcome."""
        if not self.stack_webhook:
            # Nothing selected yet, or the selected stack has no webhook:
            # do not call the service and do not claim success.
            dialog_box = DialogBox("Update Stack", "Select a stack with a webhook first.")
            dialog_box.exec()
            return
        try:
            self.portainer.stacks.update_stack(self.stack_webhook)
        except Exception as exc:
            self.error_occurred(f"Failed to update stack: {exc}")
            dialog_box = DialogBox("Update Stack", f"Stack update failed: {exc}")
            dialog_box.exec()
            return
        dialog_box = DialogBox("Update Stack", "Stack updated successfully.")
        dialog_box.exec()

    def error_occurred(self, error_message):
        """Report an error message on standard output."""
        print(f"Error occurred: {error_message}")
def main():
    """Start the GUI against the first configured Portainer instance.

    Exits with an error message when no instance is configured; otherwise
    exits with the Qt event loop's return code.
    """
    portainer_config = PortainerConfig()
    instances = portainer_config.get_portainer_instances()
    if not instances:
        # Indexing an empty result below would only produce a traceback.
        sys.exit("No Portainer instances are configured.")
    base_url, token = portainer_config.get_portainer_info(instances[0])

    app = QApplication(sys.argv)
    win = Natalie(base_url, token)
    win.show()
    sys.exit(app.exec())


if __name__ == '__main__':
    main()