From 320a1d4d879613305d1db8a7909a7e7290262b97 Mon Sep 17 00:00:00 2001 From: pdyde Date: Sat, 21 Feb 2026 14:13:24 +0100 Subject: [PATCH] feat: Complete DB migration - Command parsing & task persistence - Add command parsing for @SEND_EMAIL, @SEND_TELEGRAM, @UPDATE_TEAM_MEMBER, @ADD_TEAM_MEMBER - Migrate all tasks.append() calls (13 occurrences) to use create_task() for DB persistence - Update task routes to read from database instead of in-memory array - Orchestrator beat now executes parsed commands (email/telegram notifications) - Maintain legacy task_queue compatibility for email processing - All tasks now persist across app restarts --- app.py | 390 +++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 255 insertions(+), 135 deletions(-) diff --git a/app.py b/app.py index 8368537..46a128d 100644 --- a/app.py +++ b/app.py @@ -884,10 +884,9 @@ def parse_agent_commands(agent_key, response_text): ) for question, context in ask_requests: # Erstelle Task für Orchestrator um die Frage zu beantworten - new_task = { - 'id': len(tasks) + 1, - 'title': f"Frage von {agent_key}: {question.strip()[:80]}", - 'description': f"""Ein Agent braucht Hilfe! + task_id = create_task( + title=f"Frage von {agent_key}: {question.strip()[:80]}", + description=f"""Ein Agent braucht Hilfe! **Von:** {agent_key} **Frage:** {question.strip()} @@ -895,16 +894,13 @@ def parse_agent_commands(agent_key, response_text): Bitte beantworte die Frage oder delegiere an den passenden Experten-Agent. Die Antwort wird an {agent_key} zurückgegeben.""", - 'assigned_agent': 'Orchestrator', - 'agent_key': 'orchestrator', - 'status': 'pending', - 'created': datetime.now().strftime('%Y-%m-%d %H:%M'), - 'type': 'agent_question', - 'from_agent': agent_key, - 'return_to': agent_key - } - tasks.append(new_task) - logger.info(f"[AgentCmd] {agent_key} fragt Orchestrator: {question.strip()[:50]}") + agent_key='orchestrator', + task_type='agent_question', + created_by=agent_key, + from_agent=agent_key, + return_to=agent_key + ) + logger.info(f"[AgentCmd] {agent_key} fragt Orchestrator (Task #{task_id}): {question.strip()[:50]}") # CREATE_SUBTASK: Agent möchte Subtask erstellen subtask_requests = re.findall( @@ -913,19 +909,15 @@ Die Antwort wird an {agent_key} zurückgegeben.""", re.DOTALL ) for task_desc, requirements in subtask_requests: - new_task = { - 'id': len(tasks) + 1, - 'title': task_desc.strip()[:100], - 'description': f"Von {agent_key} angefordert:\n{requirements.strip()}", - 'assigned_agent': 'Orchestrator', - 'agent_key': 'orchestrator', - 'status': 'pending', - 'created': datetime.now().strftime('%Y-%m-%d %H:%M'), - 'type': 'agent_subtask', - 'from_agent': agent_key - } - tasks.append(new_task) - logger.info(f"[AgentCmd] {agent_key} erstellt Subtask via Orchestrator") + task_id = create_task( + title=task_desc.strip()[:100], + description=f"Von {agent_key} angefordert:\n{requirements.strip()}", + agent_key='orchestrator', + task_type='agent_subtask', + created_by=agent_key, + from_agent=agent_key + ) + logger.info(f"[AgentCmd] {agent_key} erstellt Subtask #{task_id} via Orchestrator") # SUGGEST_AGENT: Agent schlägt neuen Agent vor suggest_requests = re.findall( @@ -938,28 +930,24 @@ Die Antwort wird an {agent_key} zurückgegeben.""", agent_key_suggestion = role.lower().replace(' ', '_').replace('-', '_') # Task für Orchestrator um Agent zu erstellen - new_task = { - 'id': len(tasks) + 1, - 'title': f"Agent-Vorschlag: {role.strip()}", - 'description': f"""Agent {agent_key} schlägt vor, einen neuen Agent zu erstellen: + task_id = create_task( + title=f"Agent-Vorschlag: {role.strip()}", + description=f"""Agent {agent_key} schlägt vor, einen neuen Agent zu erstellen: **Rolle:** {role.strip()} **Fähigkeiten:** {skills.strip()} **Begründung:** {reason.strip()} Bitte entscheide ob dieser Agent erstellt werden soll.""", - 'assigned_agent': 'Orchestrator', - 'agent_key': 'orchestrator', - 'status': 'pending', - 'created': datetime.now().strftime('%Y-%m-%d %H:%M'), - 'type': 'agent_suggestion', - 'from_agent': agent_key, - 'suggested_agent': agent_key_suggestion, - 'suggested_role': role.strip(), - 'suggested_skills': skills.strip() - } - tasks.append(new_task) - logger.info(f"[AgentCmd] {agent_key} schlägt neuen Agent vor: {role.strip()}") + agent_key='orchestrator', + task_type='agent_suggestion', + created_by=agent_key, + from_agent=agent_key, + suggested_agent=agent_key_suggestion, + suggested_role=role.strip(), + suggested_skills=skills.strip() + ) + logger.info(f"[AgentCmd] {agent_key} schlägt neuen Agent vor (Task #{task_id}): {role.strip()}") # READ_KNOWLEDGE: Agent möchte Wissensdatenbank durchsuchen read_kb_requests = re.findall( @@ -989,6 +977,124 @@ Bitte entscheide ob dieser Agent erstellt werden soll.""", if relevant_sections: logger.info(f"[AgentCmd] {len(relevant_sections)} relevante Zeilen gefunden") + + # SEND_EMAIL: Orchestrator sendet Email an Team-Member + send_email_requests = re.findall( + r'@SEND_EMAIL\s*\nTo:\s*([^\n]+)\s*\nSubject:\s*([^\n]+)\s*\nBody:\s*([^@]+)@END', + response_text, + re.DOTALL + ) + for to_addr, subject, body in send_email_requests: + to_clean = to_addr.strip() + subject_clean = subject.strip() + body_clean = body.strip() + + # Versuche Email zu senden + success, message = send_email(to_clean, subject_clean, body_clean) + if success: + logger.info(f"[AgentCmd] Email gesendet an {to_clean}: {subject_clean}") + else: + logger.error(f"[AgentCmd] Email-Fehler: {message}") + + # SEND_TELEGRAM: Orchestrator sendet Telegram-Nachricht + send_telegram_requests = re.findall( + r'@SEND_TELEGRAM\s*\nTo:\s*([^\n]+)\s*\nMessage:\s*([^@]+)@END', + response_text, + re.DOTALL + ) + for recipient, message in send_telegram_requests: + recipient_clean = recipient.strip() + message_clean = message.strip() + + # Telegram-Integration (wenn aktiviert) + if TELEGRAM_CONFIG.get('bot_token') and TELEGRAM_CONFIG.get('telegram_bot'): + try: + # Finde Chat-ID für Recipient (basierend auf Team-Member) + con = sqlite3.connect(EMAIL_JOURNAL_DB) + result = con.execute( + "SELECT telegram_chat_id FROM team_members WHERE name = ? OR email = ?", + (recipient_clean, recipient_clean) + ).fetchone() + con.close() + + if result and result[0]: + chat_id = result[0] + import asyncio + asyncio.run(TELEGRAM_CONFIG['telegram_bot'].bot.send_message( + chat_id=chat_id, + text=message_clean + )) + logger.info(f"[AgentCmd] Telegram gesendet an {recipient_clean}") + else: + logger.warning(f"[AgentCmd] Keine Telegram Chat-ID für {recipient_clean}") + except Exception as e: + logger.error(f"[AgentCmd] Telegram-Fehler: {str(e)}") + else: + logger.warning("[AgentCmd] Telegram nicht konfiguriert") + + # ADD_TEAM_MEMBER: Füge neues Team-Mitglied hinzu + add_member_requests = re.findall( + r'@ADD_TEAM_MEMBER\s*\nName:\s*([^\n]+)\s*\nEmail:\s*([^\n]+)\s*\nRole:\s*([^\n]+)\s*\nResponsibilities:\s*([^@]+)@END', + response_text, + re.DOTALL + ) + for name, email, role, resp in add_member_requests: + name_clean = name.strip() + email_clean = email.strip() + role_clean = role.strip() + resp_clean = resp.strip() + + success = add_team_member(name_clean, role_clean, email_clean, resp_clean) + if success: + logger.info(f"[AgentCmd] Team-Member hinzugefügt: {name_clean} ({role_clean})") + else: + logger.warning(f"[AgentCmd] Team-Member konnte nicht hinzugefügt werden: {name_clean}") + + # UPDATE_TEAM_MEMBER: Aktualisiere Team-Mitglied + update_member_requests = re.findall( + r'@UPDATE_TEAM_MEMBER\s*\nEmail:\s*([^\n]+)\s*\nField:\s*([^\n]+)\s*\nValue:\s*([^@]+)@END', + response_text, + re.DOTALL + ) + for email, field, value in update_member_requests: + email_clean = email.strip() + field_clean = field.strip().lower() + value_clean = value.strip() + + # Hole aktuelles Team-Member + con = sqlite3.connect(EMAIL_JOURNAL_DB) + member = con.execute( + "SELECT name, role, email, responsibilities FROM team_members WHERE LOWER(email) = ?", + (email_clean.lower(),) + ).fetchone() + + if member: + # Update je nach Field + updates = { + 'name': member[0], + 'role': member[1], + 'email': member[2], + 'responsibilities': member[3] + } + + if field_clean in updates: + updates[field_clean] = value_clean + success = update_team_member( + email_clean, + updates['name'], + updates['role'], + updates['responsibilities'] + ) + if success: + logger.info(f"[AgentCmd] Team-Member aktualisiert: {email_clean} - {field_clean}") + else: + logger.error(f"[AgentCmd] Update fehlgeschlagen für {email_clean}") + else: + logger.warning(f"[AgentCmd] Unbekanntes Field: {field_clean}") + else: + logger.warning(f"[AgentCmd] Team-Member nicht gefunden: {email_clean}") + + con.close() def create_new_agent(agent_key, role, skills): """Erstellt dynamisch einen neuen Agenten.""" @@ -1342,24 +1448,15 @@ async def telegram_message_handler(update: Update, context: ContextTypes.DEFAULT return # Task erstellen - task_id = len(tasks) + 1 - while any(t['id'] == task_id for t in tasks): - task_id += 1 - - new_task = { - 'id': task_id, - 'title': f"Telegram: {message_text[:50]}{'...' if len(message_text) > 50 else ''}", - 'description': message_text, - 'assigned_agent': 'Orchestrator', - 'agent_key': 'orchestrator', - 'status': 'pending', - 'created': datetime.now().strftime('%Y-%m-%d %H:%M'), - 'type': 'telegram', - 'created_by': f'telegram_user_{user_id}', - 'telegram_chat_id': update.effective_chat.id, - 'telegram_user': username - } - tasks.append(new_task) + task_id = create_task( + title=f"Telegram: {message_text[:50]}{'...' if len(message_text) > 50 else ''}", + description=message_text, + agent_key='orchestrator', + task_type='telegram', + created_by=f'telegram_user_{user_id}', + telegram_chat_id=update.effective_chat.id, + telegram_user=username + ) await update.message.reply_text( f"✅ Task #{task_id} erstellt!\n\n" @@ -1600,9 +1697,26 @@ Diese Email kommt von **{known_member[0]}** ({known_member[1]}) # Journal: Status 'queued' – \Seen wird NICHT gesetzt (erst nach Verarbeitung) journal_insert(message_id, email_id.decode(), sender, subject, 'queued', agent_key) - # Task erstellen und in beide Listen eintragen + # Task erstellen in DB + task_id = create_task( + title=f"📧 Email: {subject[:50]}", + description=body[:500], + agent_key=agent_key, + task_type='email', + created_by=sender_email, + reply_to=sender_email, + reply_subject=f"Re: {subject}" if not subject.startswith('Re:') else subject, + original_sender=sender, + original_subject=subject, + original_body=body, + message_id=message_id, + imap_uid=email_id.decode(), + extra_context=extra_context + ) + + # Für task_queue brauchen wir Task als Dict (Legacy-Kompatibilität) task = { - 'id': len(tasks) + 1, + 'id': task_id, 'title': f"📧 Email: {subject[:50]}", 'description': body[:500], 'assigned_agent': AGENTS.get(agent_key, {}).get('name', agent_key), @@ -1615,11 +1729,11 @@ Diese Email kommt von **{known_member[0]}** ({known_member[1]}) 'original_sender': sender, 'original_subject': subject, 'original_body': body, - 'message_id': message_id, # für Journal-Update durch TaskWorker + 'message_id': message_id, 'imap_uid': email_id.decode(), - 'extra_context': extra_context, # Für unbekannte Absender + 'extra_context': extra_context, } - tasks.append(task) + with task_queue_lock: task_queue.append(task) @@ -1627,7 +1741,7 @@ Diese Email kommt von **{known_member[0]}** ({known_member[1]}) log_entry['status'] = 'queued' add_to_email_log(log_entry) - logger.info("[EmailPoller] Task #%d für Email von %s in Queue eingereiht.", task['id'], sender_email) + logger.info("[EmailPoller] Task #%d für Email von %s in Queue eingereiht.", task_id, sender_email) except Exception as e: logger.error("[EmailPoller] Fehler bei Email-ID %s: %s", email_id, str(e)) @@ -1817,10 +1931,9 @@ Agent-Beschreibungen: assigned = available_agents[0] # Sub-Task mit KOMPLETTEM Kontext - sub_task = { - 'id': len(tasks) + 1 + len(created_sub_tasks), - 'title': t[:80], - 'description': f"""**Original-Aufgabe:** + sub_task_id = create_task( + title=t[:80], + description=f"""**Original-Aufgabe:** {task.get('title', '')} {task.get('description', '')} @@ -1832,16 +1945,13 @@ Agent-Beschreibungen: {t} Arbeite diesen Teil ab und liefere ein vollständiges Ergebnis.""", - 'assigned_agent': AGENTS.get(assigned, {}).get('name', assigned), - 'agent_key': assigned, - 'status': 'pending', - 'created': datetime.now().strftime('%Y-%m-%d %H:%M'), - 'type': 'orchestrated', - 'parent_task': task['id'] - } - tasks.append(sub_task) - created_sub_tasks.append(sub_task['id']) - logger.info("[TaskBeat] Sub-Task #%d zugewiesen an %s", sub_task['id'], assigned) + agent_key=assigned, + task_type='orchestrated', + created_by='orchestrator', + parent_task_id=task['id'] + ) + created_sub_tasks.append(sub_task_id) + logger.info("[TaskBeat] Sub-Task #%d zugewiesen an %s", sub_task_id, assigned) task['status'] = 'completed' task['sub_task_ids'] = created_sub_tasks @@ -1982,12 +2092,12 @@ Nutze @READ_KNOWLEDGE um Kontext zu holen falls nötig. logger.info("[OrchestratorBeat] Frage Orchestrator zu %d Problemen", len(old_pending) + len(stuck_tasks)) response = execute_agent_task('orchestrator', summary) - # Response loggen + # Response loggen und Kommandos parsen if response: logger.info("[OrchestratorBeat] Orchestrator-Antwort: %s", response[:200]) - # TODO: Parse @SEND_EMAIL und @SEND_TELEGRAM Kommandos - # und sende tatsächlich Benachrichtigungen + # Parse Orchestrator-Kommandos (Email, Telegram, etc.) + parse_agent_commands('orchestrator', response) except Exception as e: logger.error("[OrchestratorBeat] Fehler: %s", str(e)) @@ -2008,7 +2118,9 @@ start_orchestrator_beat() @app.route('/') def index(): - recent_tasks = tasks[-5:] if tasks else [] + # Hole die 5 neuesten Tasks aus DB + all_tasks = get_tasks() + recent_tasks = all_tasks[:5] if all_tasks else [] return render_template('index.html', agents=AGENTS, recent_tasks=recent_tasks) @app.route('/chat', methods=['GET', 'POST']) @@ -2089,28 +2201,30 @@ def task_list(): assigned_agent = request.form.get('assigned_agent', '') if title: - task = { - 'id': len(tasks) + 1, - 'title': title, - 'description': description, - 'assigned_agent': AGENTS.get(assigned_agent, {}).get('name', assigned_agent) if assigned_agent else 'Nicht zugewiesen', - 'status': 'pending', - 'created': datetime.now().strftime('%Y-%m-%d %H:%M'), - 'type': 'manual', - } - tasks.append(task) - flash('Task erstellt!', 'success') + task_id = create_task( + title=title, + description=description, + agent_key=assigned_agent if assigned_agent else None, + task_type='manual', + created_by='user' + ) + flash(f'Task #{task_id} erstellt!', 'success') - # Alle Tasks anzeigen – Email-Tasks sind mit type='email' markiert - all_tasks = list(reversed(tasks)) # Neueste zuerst + # Alle Tasks aus Datenbank holen – Neueste zuerst + all_tasks = get_tasks() return render_template('tasks.html', agents=AGENTS, tasks=all_tasks) @app.route('/tasks/update//') def update_task(task_id, status): + # Update in Datenbank + update_task_db(task_id, status=status) + + # Auch in-memory array aktualisieren (Legacy-Kompatibilität für task_queue) for task in tasks: if task['id'] == task_id: task['status'] = status break + return redirect(url_for('task_list')) @app.route('/api/agent-stream', methods=['POST']) @@ -2698,26 +2812,35 @@ def api_tasks(): if not title: return jsonify({'error': 'Kein Titel übergeben'}), 400 - task_id = len(tasks) + 1 - while any(t['id'] == task_id for t in tasks): - task_id += 1 + task_id = create_task( + title=title, + description=description, + agent_key=agent_key or assigned_agent or 'orchestrator', + task_type='agent_created', + created_by=agent_key or 'api' + ) + + # Task aus DB holen für Response + con = sqlite3.connect(EMAIL_JOURNAL_DB) + task_row = con.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone() + con.close() new_task = { 'id': task_id, 'title': title, 'description': description, - 'assigned_agent': AGENTS.get(assigned_agent, {}).get('name', assigned_agent) if assigned_agent else 'Nicht zugewiesen', - 'agent_key': agent_key or assigned_agent, + 'assigned_agent': AGENTS.get(agent_key or assigned_agent, {}).get('name', agent_key or assigned_agent) if (agent_key or assigned_agent) else 'Nicht zugewiesen', + 'agent_key': agent_key or assigned_agent or 'orchestrator', 'status': 'pending', 'created': datetime.now().strftime('%Y-%m-%d %H:%M'), 'type': 'agent_created', - 'created_by': agent_key + 'created_by': agent_key or 'api' } - tasks.append(new_task) return jsonify({'success': True, 'task': new_task}) - task_list = list(reversed(tasks)) + # GET: Alle Tasks aus DB + task_list = get_tasks() return jsonify({'tasks': task_list}) @@ -2727,12 +2850,17 @@ def update_task_api(task_id): data = request.get_json() new_status = data.get('status') + # Update in DB + update_task_db(task_id, status=new_status) + + # Auch in-memory aktualisieren (Legacy) for task in tasks: if task['id'] == task_id: task['status'] = new_status return jsonify({'success': True, 'task': task}) - return jsonify({'error': 'Task nicht gefunden'}), 404 + # Falls nur in DB vorhanden + return jsonify({'success': True, 'task_id': task_id, 'status': new_status}) @app.route('/api/models', methods=['GET']) @@ -2817,43 +2945,35 @@ def distribute_tasks(): tasks_text = '\n'.join([f"- {t}" for t in tasks_list]) agents_text = ', '.join(selected_agents) if selected_agents else 'alle verfügbaren Agenten' - planning_task = { - 'id': len(tasks) + 1, - 'title': f"Planungsphase: {tasks_list[0][:50]}{'...' if len(tasks_list[0]) > 50 else ''}", - 'description': f"Tasks:\n{tasks_text}\n\nVerfügbare Agenten: {agents_text}\n\nDer Orchestrator soll diese Tasks analysieren und den richtigen Agenten zuweisen.", - 'assigned_agent': 'Orchestrator', - 'agent_key': 'orchestrator', - 'status': 'pending', - 'created': datetime.now().strftime('%Y-%m-%d %H:%M'), - 'type': 'orchestrated', - 'sub_tasks': tasks_list, - 'available_agents': selected_agents - } - tasks.append(planning_task) + planning_task_id = create_task( + title=f"Planungsphase: {tasks_list[0][:50]}{'...' if len(tasks_list[0]) > 50 else ''}", + description=f"Tasks:\n{tasks_text}\n\nVerfügbare Agenten: {agents_text}\n\nDer Orchestrator soll diese Tasks analysieren und den richtigen Agenten zuweisen.", + agent_key='orchestrator', + task_type='orchestrated', + created_by='orchestrator', + sub_tasks=tasks_list, + available_agents=selected_agents + ) return jsonify({ 'success': True, 'message': f'Planungs-Task erstellt. Der Orchestrator wird die richtigen Agenten zuweisen.', - 'tasks': [planning_task['id']] + 'tasks': [planning_task_id] }) for i, task_text in enumerate(tasks_list): agent_key = selected_agents[i % len(selected_agents)] - agent_info = AGENTS.get(agent_key, {}) - agent_name = agent_info.get('name', agent_key) - new_task = { - 'id': len(tasks) + 1 + i, - 'title': task_text[:80], - 'description': 'Automatisch erstellt durch Orchestrator', - 'assigned_agent': agent_name, - 'status': 'in_progress', - 'created': datetime.now().strftime('%Y-%m-%d %H:%M'), - 'type': 'orchestrated', - 'agent_key': agent_key - } - tasks.append(new_task) - created_tasks.append(new_task['id']) + task_id = create_task( + title=task_text[:80], + description='Automatisch erstellt durch Orchestrator', + agent_key=agent_key, + task_type='orchestrated', + created_by='orchestrator' + ) + # Setze Status sofort auf in_progress + update_task_db(task_id, status='in_progress') + created_tasks.append(task_id) executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(selected_agents)) for i, task_text in enumerate(tasks_list):