From 5b0cf6e640f0fbc4121bc015aa8cafbe5606968e Mon Sep 17 00:00:00 2001 From: pdyde Date: Sat, 21 Feb 2026 13:53:51 +0100 Subject: [PATCH] feat: Task-Persistierung, Team-Members & Orchestrator Beat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Database: - Tasks-Tabelle für persistente Task-Speicherung - Team-Members-Tabelle für reale Mitarbeiter - Auto-Cleanup: Löscht completed Tasks älter als 7 Tage - Helper-Funktionen: get_tasks(), create_task(), update_task_db(), delete_task() Team-Management: - Standard Team-Members (Eric, Georg, Piotr) beim Start initialisiert - get_team_members() - Lädt aktive Team-Members - add_team_member() - Fügt Team-Member hinzu - get_team_member_summary() - Zusammenfassung für Orchestrator Orchestrator Beat: - Läuft alle 30 Minuten automatisch - Prüft Tasks ohne Fortschritt (>2h pending) - Prüft blockierte Tasks (>4h in_progress) - Fragt Orchestrator bei Problemen - Orchestrator kann Team-Members kontaktieren Neue Kommandos für Orchestrator: - @SEND_EMAIL - Email an Team-Member senden - @SEND_TELEGRAM - Telegram-Nachricht an Team-Member Integration: - Team-Member-Info wird in Orchestrator-Systemprompt eingefügt - Orchestrator kennt jetzt reale Verantwortlichkeiten - Kann bei Bedarf echte Menschen kontaktieren Background Threads: - EmailPoller (alle 2 Min) - TaskWorker (on-demand) - TaskBeat (alle 10 Sek) - OrchestratorBeat (alle 30 Min) ✨ NEU - TelegramBot (wenn konfiguriert) TODO: - Tasks-UI auf DB umstellen - Delete-Button für Tasks - Team-Members Verwaltungs-UI --- app.py | 355 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 352 insertions(+), 3 deletions(-) diff --git a/app.py b/app.py index 8217e59..6d65331 100644 --- a/app.py +++ b/app.py @@ -253,6 +253,42 @@ def init_journal(): updated_at TEXT ) """) + + # Tasks-Tabelle für persistente Task-Speicherung + con.execute(""" + CREATE TABLE IF NOT EXISTS tasks ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + title TEXT NOT NULL, + description TEXT, + assigned_agent TEXT, + agent_key TEXT, + status TEXT DEFAULT 'pending', + created_at TEXT NOT NULL, + completed_at TEXT, + type TEXT, + created_by TEXT, + response TEXT, + telegram_chat_id INTEGER, + telegram_user TEXT, + parent_task_id INTEGER + ) + """) + + # Team-Members Tabelle für reale Mitarbeiter + con.execute(""" + CREATE TABLE IF NOT EXISTS team_members ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + role TEXT NOT NULL, + responsibilities TEXT, + email TEXT, + telegram_id INTEGER, + phone TEXT, + active INTEGER DEFAULT 1, + created_at TEXT NOT NULL + ) + """) + con.commit() con.close() @@ -418,12 +454,17 @@ def execute_agent_task(agent_key, user_prompt, extra_context=""): # Wissensdatenbank-Pfad (Agent holt sich selbst was er braucht) kb_file = os.path.join(os.path.dirname(__file__), 'diversityball_knowledge.md') + + # Team-Members laden (nur für Orchestrator) + team_summary = "" + if agent_key == 'orchestrator': + team_summary = "\n\n" + get_team_member_summary() - # System-Prompt = Agent-Rolle + Memory + Kommandos (OHNE große Wissensdatenbank!) + # System-Prompt = Agent-Rolle + Memory + Team + Kommandos (OHNE große Wissensdatenbank!) full_system = f"""{system_prompt} ## Deine Erinnerungen: -{memory_summary} +{memory_summary}{team_summary} ## Wissensdatenbank: Die Wissensdatenbank liegt unter: {kb_file} @@ -461,6 +502,19 @@ Skills: [Benötigte Fähigkeiten] Reason: [Warum wird dieser Agent gebraucht?] @END +**Team-Member per Email kontaktieren:** +@SEND_EMAIL +To: [Email-Adresse des Team-Members] +Subject: [Betreff] +Body: [Nachricht] +@END + +**Team-Member per Telegram kontaktieren:** +@SEND_TELEGRAM +TelegramID: [Telegram-ID des Team-Members] +Message: [Nachricht] +@END + Der Orchestrator kümmert sich um die Zuweisung und Kommunikation! ## Wichtig: @@ -548,10 +602,175 @@ def load_agents_from_directories(): AGENTS = load_agents_from_directories() -tasks = [] +# Legacy - tasks werden jetzt in DB gespeichert, aber wir brauchen Kompatibilität +tasks = [] # Wird bei get_tasks() aus DB geladen chat_history = [] orchestrator_chat = [] +# ── Task Database Functions ───────────────────────────────────────────────── +def get_tasks(status=None, limit=None): + """Lädt Tasks aus der Datenbank.""" + con = sqlite3.connect(EMAIL_JOURNAL_DB) + con.row_factory = sqlite3.Row + + if status: + query = "SELECT * FROM tasks WHERE status = ? ORDER BY id DESC" + params = (status,) + else: + query = "SELECT * FROM tasks ORDER BY id DESC" + params = () + + if limit: + query += f" LIMIT {limit}" + + rows = con.execute(query, params).fetchall() + con.close() + + return [dict(row) for row in rows] + + +def create_task(title, description, agent_key='orchestrator', task_type='manual', created_by='user', **kwargs): + """Erstellt einen neuen Task in der Datenbank.""" + con = sqlite3.connect(EMAIL_JOURNAL_DB) + + assigned_agent = AGENTS.get(agent_key, {}).get('name', agent_key) if agent_key else 'Nicht zugewiesen' + created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + cursor = con.execute(""" + INSERT INTO tasks ( + title, description, assigned_agent, agent_key, status, + created_at, type, created_by, telegram_chat_id, telegram_user, parent_task_id + ) VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?, ?) + """, ( + title, description, assigned_agent, agent_key, created_at, + task_type, created_by, kwargs.get('telegram_chat_id'), + kwargs.get('telegram_user'), kwargs.get('parent_task_id') + )) + + task_id = cursor.lastrowid + con.commit() + con.close() + + logging.info(f"[DB] Task #{task_id} erstellt: {title[:50]}") + return task_id + + +def update_task_db(task_id, **kwargs): + """Aktualisiert einen Task in der Datenbank.""" + con = sqlite3.connect(EMAIL_JOURNAL_DB) + + updates = [] + params = [] + + for key, value in kwargs.items(): + if key in ['status', 'response', 'agent_key', 'assigned_agent', 'completed_at']: + updates.append(f"{key} = ?") + params.append(value) + + if 'status' in kwargs and kwargs['status'] == 'completed' and 'completed_at' not in kwargs: + updates.append("completed_at = ?") + params.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + + if not updates: + con.close() + return + + params.append(task_id) + query = f"UPDATE tasks SET {', '.join(updates)} WHERE id = ?" + + con.execute(query, params) + con.commit() + con.close() + + +def delete_task(task_id): + """Löscht einen Task aus der Datenbank.""" + con = sqlite3.connect(EMAIL_JOURNAL_DB) + con.execute("DELETE FROM tasks WHERE id = ?", (task_id,)) + con.commit() + con.close() + logging.info(f"[DB] Task #{task_id} gelöscht") + + +def cleanup_old_tasks(days=7): + """Löscht completed Tasks älter als X Tage.""" + con = sqlite3.connect(EMAIL_JOURNAL_DB) + cutoff = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + cutoff = cutoff.replace(day=cutoff.day - days) + cutoff_str = cutoff.strftime('%Y-%m-%d %H:%M:%S') + + cursor = con.execute( + "DELETE FROM tasks WHERE status = 'completed' AND completed_at < ?", + (cutoff_str,) + ) + deleted = cursor.rowcount + con.commit() + con.close() + + if deleted > 0: + logging.info(f"[DB] {deleted} alte Tasks gelöscht (älter als {days} Tage)") + + return deleted + + +# ── Team Members Database Functions ───────────────────────────────────────── +def get_team_members(active_only=True): + """Lädt Team-Members aus der Datenbank.""" + con = sqlite3.connect(EMAIL_JOURNAL_DB) + con.row_factory = sqlite3.Row + + if active_only: + rows = con.execute("SELECT * FROM team_members WHERE active = 1 ORDER BY name").fetchall() + else: + rows = con.execute("SELECT * FROM team_members ORDER BY name").fetchall() + + con.close() + return [dict(row) for row in rows] + + +def add_team_member(name, role, responsibilities='', email='', telegram_id=None, phone=''): + """Fügt ein Team-Mitglied hinzu.""" + con = sqlite3.connect(EMAIL_JOURNAL_DB) + created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + cursor = con.execute(""" + INSERT INTO team_members (name, role, responsibilities, email, telegram_id, phone, active, created_at) + VALUES (?, ?, ?, ?, ?, ?, 1, ?) + """, (name, role, responsibilities, email, telegram_id, phone, created_at)) + + member_id = cursor.lastrowid + con.commit() + con.close() + + logging.info(f"[DB] Team-Member '{name}' hinzugefügt") + return member_id + + +def get_team_member_summary(): + """Erstellt eine Text-Zusammenfassung aller Team-Members für den Orchestrator.""" + members = get_team_members(active_only=True) + + if not members: + return "Keine Team-Mitglieder definiert." + + summary = "## Reale Team-Mitglieder:\n\n" + for member in members: + summary += f"**{member['name']}** ({member['role']})\n" + if member['responsibilities']: + summary += f" Verantwortlich für: {member['responsibilities']}\n" + contact_methods = [] + if member['email']: + contact_methods.append(f"Email: {member['email']}") + if member['telegram_id']: + contact_methods.append(f"Telegram-ID: {member['telegram_id']}") + if member['phone']: + contact_methods.append(f"Tel: {member['phone']}") + if contact_methods: + summary += f" Kontakt: {', '.join(contact_methods)}\n" + summary += "\n" + + return summary + # ── Agent Message Queue ───────────────────────────────────────────────────── # Ermöglicht Agent-zu-Agent Kommunikation agent_messages = [] # Liste von {id, from_agent, to_agent, message_type, content, status, created, response} @@ -1577,9 +1796,102 @@ def start_task_beat(): logger.info("[TaskBeat] Daemon-Thread gestartet.") +# ── Orchestrator Beat ─────────────────────────────────────────────────────── +def orchestrator_beat(): + """ + Orchestrator Beat: Läuft alle 30 Minuten und prüft: + - Offene Tasks ohne Fortschritt + - Blockierte Tasks die Aufmerksamkeit brauchen + - Wichtige Entscheidungen die ausstehen + - Team-Member Benachrichtigungen + """ + logger.info("[OrchestratorBeat] Hintergrund-Thread gestartet.") + + while True: + try: + # Alle 30 Minuten prüfen + time.sleep(1800) + + logger.info("[OrchestratorBeat] Starte Überprüfung...") + + # 1. Finde Tasks die länger als 2 Stunden pending sind + pending_tasks = get_tasks(status='pending') + old_pending = [] + + for task in pending_tasks: + try: + created = datetime.strptime(task['created_at'], '%Y-%m-%d %H:%M:%S') + age_hours = (datetime.now() - created).total_seconds() / 3600 + if age_hours > 2: + old_pending.append((task, age_hours)) + except: + pass + + # 2. Finde in_progress Tasks die länger als 4 Stunden laufen + in_progress = get_tasks(status='in_progress') + stuck_tasks = [] + + for task in in_progress: + try: + created = datetime.strptime(task['created_at'], '%Y-%m-%d %H:%M:%S') + age_hours = (datetime.now() - created).total_seconds() / 3600 + if age_hours > 4: + stuck_tasks.append((task, age_hours)) + except: + pass + + # 3. Wenn es Probleme gibt, frage den Orchestrator + if old_pending or stuck_tasks: + summary = "## Orchestrator Beat - Status-Check\n\n" + + if old_pending: + summary += f"### ⏳ {len(old_pending)} Alte Pending Tasks:\n" + for task, hours in old_pending[:5]: # Max 5 + summary += f"- Task #{task['id']}: {task['title'][:60]} ({hours:.1f}h alt)\n" + summary += "\n" + + if stuck_tasks: + summary += f"### 🚫 {len(stuck_tasks)} Blockierte Tasks:\n" + for task, hours in stuck_tasks[:5]: + summary += f"- Task #{task['id']}: {task['title'][:60]} ({hours:.1f}h in Bearbeitung)\n" + summary += "\n" + + summary += """ +### Aufgabe: +1. Analysiere die Situation +2. Entscheide ob Tasks eskaliert werden müssen +3. Wenn ja: Kontaktiere relevante Team-Members via Email oder Telegram +4. Verwende @SEND_EMAIL oder @SEND_TELEGRAM um Benachrichtigungen zu senden + +Nutze @READ_KNOWLEDGE um Kontext zu holen falls nötig. +""" + + # Orchestrator fragen + logger.info("[OrchestratorBeat] Frage Orchestrator zu %d Problemen", len(old_pending) + len(stuck_tasks)) + response = execute_agent_task('orchestrator', summary) + + # Response loggen + if response: + logger.info("[OrchestratorBeat] Orchestrator-Antwort: %s", response[:200]) + + # TODO: Parse @SEND_EMAIL und @SEND_TELEGRAM Kommandos + # und sende tatsächlich Benachrichtigungen + + except Exception as e: + logger.error("[OrchestratorBeat] Fehler: %s", str(e)) + + +def start_orchestrator_beat(): + """Startet den Orchestrator Beat als Daemon-Thread.""" + beat_thread = threading.Thread(target=orchestrator_beat, name='OrchestratorBeat', daemon=True) + beat_thread.start() + logger.info("[OrchestratorBeat] Daemon-Thread gestartet.") + + # Poller beim App-Start starten start_email_poller() start_task_beat() +start_orchestrator_beat() @app.route('/') @@ -2444,7 +2756,44 @@ def distribute_tasks(): }) +def init_default_team_members(): + """Fügt Standard-Team-Members hinzu, falls keine existieren.""" + existing = get_team_members(active_only=False) + if len(existing) == 0: + add_team_member( + name="Eric Fischer", + role="Projektleiter & Event-Koordinator", + responsibilities="Gesamtkoordination, Budget-Verwaltung, Location-Management", + email="eric.fischer@signtime.media", + telegram_id=None, # Wird bei Bedarf ergänzt + phone="" + ) + add_team_member( + name="Georg Tschare", + role="Content & Marketing", + responsibilities="Marketing, Social Media, Presse, Kommunikation", + email="georg.tschare@signtime.media", + telegram_id=None, + phone="" + ) + add_team_member( + name="Piotr Dyderski", + role="System-Administrator & Tech-Lead", + responsibilities="Technische Infrastruktur, AI-Agenten, Automatisierung", + email="p.dyderski@live.at", + telegram_id=None, + phone="" + ) + logging.info("[DB] Standard Team-Members initialisiert") + + if __name__ == '__main__': + # Standard Team-Members initialisieren + init_default_team_members() + + # Alte Tasks aufräumen (älter als 7 Tage) + cleanup_old_tasks(days=7) + # Telegram Bot starten start_telegram_thread()