From 46a77c7800255d8b20aa2eb1e0f5ab7c12a58305 Mon Sep 17 00:00:00 2001 From: eric Date: Sat, 21 Feb 2026 19:29:20 +0000 Subject: [PATCH 1/7] Fix /api/agent-stream: use build_agent_prompt, accumulate response_text, call parse_agent_commands --- app.py | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/app.py b/app.py index d25406d..e19ae3c 100644 --- a/app.py +++ b/app.py @@ -2495,22 +2495,11 @@ def agent_stream(): selected_agent = delegate_to_agent(prompt) agent_info = AGENTS.get(selected_agent, {}) agent_name = agent_info.get('name', selected_agent) - system_prompt = get_agent_prompt(selected_agent) - kb_file = os.path.join(os.path.dirname(__file__), 'agents', 'orchestrator', 'knowledge', 'diversityball_knowledge.md') - kb_content = "" - if os.path.exists(kb_file): - with open(kb_file, 'r', encoding='utf-8') as f: - kb_content = f.read() - - full_prompt = f"""## Wissensdatenbank (Diversity-Ball): -{kb_content} - -## System-Prompt des Agenten ({agent_name}): -{system_prompt} - -## Deine Aufgabe: -{prompt}""" + full_prompt = build_agent_prompt(selected_agent, prompt) + if not full_prompt: + yield f"data: {json.dumps({'type': 'error', 'message': f'Kein System-Prompt für Agent {selected_agent}'})}\n\n" + return # Sofort Agent-Info senden yield f"data: {json.dumps({'type': 'agent_selected', 'agent': agent_name, 'agent_key': selected_agent})}\n\n" @@ -2530,7 +2519,8 @@ def agent_stream(): cwd=work_dir # Agent arbeitet in seinem work-Verzeichnis ) - # Jede Zeile sofort aus opencode lesen und streamen + # Jede Zeile sofort aus opencode lesen, streamen und akkumulieren + response_text = "" for line in proc.stdout: line = line.strip() if not line: @@ -2540,11 +2530,17 @@ def agent_stream(): if event_data.get('part', {}).get('type') == 'text': text = event_data['part'].get('text', '') if text: + response_text += text yield f"data: {json.dumps({'type': 'response_chunk', 'text': text})}\n\n" except Exception: pass proc.wait() + + # XML-Kommandos aus der gesammelten Antwort ausführen + if response_text: + parse_agent_commands(selected_agent, response_text) + yield f"data: {json.dumps({'type': 'complete', 'message': '✓ Fertig'})}\n\n" except Exception as e: From 7fe1365ebcb3b15ffee607c441768d51b9c77f57 Mon Sep 17 00:00:00 2001 From: eric Date: Sat, 21 Feb 2026 19:35:34 +0000 Subject: [PATCH 2/7] Fix get_field(): support multiline values in key-value format (email body was truncated to first line) --- app.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/app.py b/app.py index e19ae3c..402e344 100644 --- a/app.py +++ b/app.py @@ -1002,16 +1002,23 @@ def parse_agent_commands(agent_key, response_text, task_id=None): import re def get_field(block, field): - """Extrahiert ein Feld aus einem XML-Block: 'field: value' oder 'value'.""" - # Versuche erst XML-Tag-Format + """Extrahiert ein Feld aus einem XML-Block: 'value' oder 'field: value'. + Unterstützt mehrzeilige Werte (z.B. langer Email-Body). + """ + # Versuche erst XML-Tag-Format (bevorzugt, unterstützt Mehrzeiligkeit) m = re.search(rf'<{field}>(.*?)', block, re.DOTALL | re.IGNORECASE) if m: return m.group(1).strip() - # Dann Key-Value-Format - m = re.search(rf'^{field}\s*:\s*(.+)', block, re.MULTILINE | re.IGNORECASE) - if m: - return m.group(1).strip() - return '' + # Key-Value-Format: finde 'field: ...' und lies bis zum nächsten echten Key (^\w+: ) + m = re.search(rf'(?m)^{field}\s*:\s*(.*)', block, re.IGNORECASE) + if not m: + return '' + rest = block[m.start(1):] + # Stoppe nur bei echten einwortigen Keys (^\w+: Leerzeichen) — nicht bei "Report:" etc. + stop = re.search(r'(?m)^\w+\s*:\s', rest) + if stop: + return rest[:stop.start()].strip() + return rest.strip() # ── CREATE_TASK ────────────────────────────────────────────────────────── for block in re.findall(r'(.*?)', response_text, re.DOTALL | re.IGNORECASE): From 003e591a0491dacf02479ac390922be732f2c6e4 Mon Sep 17 00:00:00 2001 From: eric Date: Sat, 21 Feb 2026 19:46:42 +0000 Subject: [PATCH 3/7] Add daily standup + knowledge broadcast system - DailyStandupBeat thread fires at 09:00 every day - trigger_daily_standup(): messages all team members, orchestrator updates KB + agent reminders - broadcast_knowledge_update(): distributes any new info to all agents immediately - parse_agent_commands(): add and XML handlers - /api/standup/trigger and /api/broadcast routes for manual triggering - orchestrator systemprompt: standup + broadcast instructions with examples - tasks.html: badges for standup / broadcast / action_knowledge task types --- agents/orchestrator/systemprompt.md | 71 ++++++- app.py | 290 ++++++++++++++++++++++++++++ templates/tasks.html | 6 + 3 files changed, 366 insertions(+), 1 deletion(-) diff --git a/agents/orchestrator/systemprompt.md b/agents/orchestrator/systemprompt.md index 9add563..d044d0e 100644 --- a/agents/orchestrator/systemprompt.md +++ b/agents/orchestrator/systemprompt.md @@ -73,10 +73,79 @@ email: email@adresse.com ``` +**Wissensdatenbank aktualisieren:** +``` + +topic: Eventstart +content: Das Event startet um 18:00 Uhr. Einlass ab 17:30 Uhr. + +``` + +**Reminder eines Agenten aktualisieren:** +``` + +agent: catering_manager +reminder: WICHTIG: Eventstart wurde auf 18:00 geändert. Catering-Aufbau muss bis 17:00 fertig sein. + +``` + +## Tägliches Standup + +Jeden Morgen um 09:00 Uhr wirst du automatisch aktiviert, um: +1. Alle Team-Members nach ihren täglichen Updates zu fragen +2. Neue Informationen in die Wissensdatenbank einzupflegen +3. Jeden Agenten über relevante Änderungen zu informieren + +Wenn du für ein Standup aktiviert wirst: +- Prüfe aktuelle Tasks und Erinnerungen auf neue Informationen +- Aktualisiere `` für jede wichtige Änderung +- Schreibe für jeden Agenten einen `` mit relevanten Updates +- Delegiere mit `` an jeden Agenten, damit er seinen Bereich prüft + +## Wissens-Broadcasts + +Wenn jemand eine neue wichtige Information mitteilt (z.B. "das Event startet um 18:00 statt 19:00"): +1. Erkenne, dass dies ein Update ist das alle betrifft +2. Aktualisiere sofort die Wissensdatenbank: `` +3. Informiere **jeden** relevanten Agenten per `` +4. Delegiere an betroffene Agenten Sub-Tasks zur Überprüfung ihrer Bereiche: `` +5. Bestätige Piotr (telegram_id: 1578034974) dass alles verteilt wurde + +**Beispiel — jemand sagt "das Event startet um 18:00 nicht 19:00":** + +``` + +topic: Eventstart +content: Das Event startet um 18:00 Uhr (geändert). Einlass ab 17:30 Uhr. + + + +agent: catering_manager +reminder: WICHTIG: Eventstart wurde auf 18:00 geändert (war 19:00). Catering-Aufbau bis 17:00 abschließen. + + + +agent: location_manager +reminder: WICHTIG: Eventstart 18:00 Uhr (geändert). Venue-Öffnung und Setup entsprechend anpassen. + + + +title: Eventstart-Änderung prüfen: 18:00 statt 19:00 +agent: program_manager +details: Der Eventstart wurde auf 18:00 geändert. Bitte überprüfe den Programmablauf und passe alle Zeitangaben an. + + + +telegram_id: 1578034974 +message: ✅ Update verteilt: Eventstart 18:00 Uhr. Wissensdatenbank aktualisiert, alle Agenten informiert. + +``` + ## Verhalten bei Nachrichten 1. Antworte freundlich und direkt 2. Wenn eine Aufgabe dabei ist → sofort `` anlegen 3. Wenn Email/Telegram gesendet werden soll → `` / `` direkt ausführen 4. Wenn Team-Daten zu aktualisieren → `` direkt ausführen -5. Bestätige am Ende was du getan hast +5. Wenn neue wichtige Information → `` + `` für betroffene Agenten +6. Bestätige am Ende was du getan hast diff --git a/app.py b/app.py index 402e344..2f2722d 100644 --- a/app.py +++ b/app.py @@ -1176,6 +1176,64 @@ def parse_agent_commands(agent_key, response_text, task_id=None): update_task_db(action_id, status='completed' if success else 'error', response=status_msg) logger.info(f"[AgentCmd] {status_msg}") + # ── UPDATE_KNOWLEDGE ───────────────────────────────────────────────────── + # Agenten können damit einen neuen Abschnitt in der Wissensdatenbank anlegen/aktualisieren + for block in re.findall(r'(.*?)', response_text, re.DOTALL | re.IGNORECASE): + topic = get_field(block, 'topic') + content = get_field(block, 'content') + if not topic or not content: + logger.warning("[AgentCmd] ohne topic/content ignoriert") + continue + kb_file = os.path.join(os.path.dirname(__file__), 'agents', 'orchestrator', 'knowledge', 'diversityball_knowledge.md') + os.makedirs(os.path.dirname(kb_file), exist_ok=True) + # Existierenden Abschnitt ersetzen oder neuen anhängen + section_header = f"## {topic}" + new_section = f"{section_header}\n\n{content.strip()}\n" + if os.path.exists(kb_file): + with open(kb_file, 'r', encoding='utf-8') as f: + kb_text = f.read() + # Ersetze bestehenden Abschnitt (## Topic ... bis zum nächsten ##) + pattern = rf'(^## {re.escape(topic)}\s*\n)(.*?)(?=\n## |\Z)' + if re.search(pattern, kb_text, re.MULTILINE | re.DOTALL): + kb_text = re.sub(pattern, new_section, kb_text, flags=re.MULTILINE | re.DOTALL) + else: + kb_text = kb_text.rstrip() + f"\n\n{new_section}" + else: + kb_text = f"# Diversity Ball Wien 2026 — Wissensdatenbank\n\n{new_section}" + with open(kb_file, 'w', encoding='utf-8') as f: + f.write(kb_text) + action_id = create_task( + title=f"Wissen aktualisiert: {topic}", + description=f"**Topic:** {topic}\n\n{content[:200]}{'...' if len(content) > 200 else ''}", + agent_key=agent_key, task_type='action_knowledge', created_by=agent_key, parent_task_id=task_id, + ) + update_task_db(action_id, status='completed', response=f"✓ Wissensdatenbank aktualisiert: {topic}") + logger.info(f"[AgentCmd] Wissensdatenbank aktualisiert: {topic}") + + # ── UPDATE_AGENT_REMINDER ──────────────────────────────────────────────── + # Orchestrator kann damit die reminders.md eines beliebigen Agenten aktualisieren + for block in re.findall(r'(.*?)', response_text, re.DOTALL | re.IGNORECASE): + target_agent = get_field(block, 'agent') + reminder = get_field(block, 'reminder') + if not target_agent or not reminder: + logger.warning("[AgentCmd] ohne agent/reminder ignoriert") + continue + reminder_file = os.path.join(os.path.dirname(__file__), 'agents', target_agent, 'reminders.md') + if not os.path.exists(os.path.dirname(reminder_file)): + logger.warning(f"[AgentCmd] Agent-Verzeichnis nicht gefunden: {target_agent}") + continue + timestamp = datetime.now().strftime('%d.%m.%Y %H:%M') + entry = f"\n## Update {timestamp}\n\n{reminder.strip()}\n" + with open(reminder_file, 'a', encoding='utf-8') as f: + f.write(entry) + action_id = create_task( + title=f"Reminder aktualisiert: {target_agent}", + description=f"**Agent:** {target_agent}\n\n{reminder[:200]}{'...' if len(reminder) > 200 else ''}", + agent_key=agent_key, task_type='action_knowledge', created_by=agent_key, parent_task_id=task_id, + ) + update_task_db(action_id, status='completed', response=f"✓ reminders.md aktualisiert für {target_agent}") + logger.info(f"[AgentCmd] reminders.md aktualisiert für {target_agent}") + def create_new_agent(agent_key, role, skills): """Erstellt dynamisch einen neuen Agenten.""" agent_dir = os.path.join(AGENTS_BASE_DIR, agent_key) @@ -2286,10 +2344,218 @@ def start_orchestrator_beat(): logger.info("[OrchestratorBeat] Daemon-Thread gestartet.") +# ── DAILY STANDUP ───────────────────────────────────────────────────────────── + +def trigger_daily_standup(): + """ + Tägliches Standup: Orchestrator fragt alle Team-Members nach Updates + und delegiert anschließend Wissensupdates an alle Agenten. + """ + logger.info("[DailyStandup] Starte tägliches Standup...") + + # Team-Members aus DB holen + try: + con = sqlite3.connect(EMAIL_JOURNAL_DB) + con.row_factory = sqlite3.Row + members = con.execute("SELECT name, email, role, telegram_id FROM team_members").fetchall() + con.close() + except Exception as e: + logger.error("[DailyStandup] Fehler beim Laden der Team-Members: %s", e) + members = [] + + today = datetime.now().strftime('%d.%m.%Y') + + # ── 1. Orchestrator-Haupttask: plant den Standup ────────────────────────── + standup_task_id = create_task( + title=f"Daily Standup {today}", + description=( + f"Täglicher Status-Check vom {today}.\n\n" + "Der Orchestrator koordiniert:\n" + "1. Alle Team-Members werden nach Updates gefragt (Telegram + Email)\n" + "2. Eingegangene Updates werden an alle Agenten weitergegeben\n" + "3. Wissensdatenbank wird bei Bedarf aktualisiert" + ), + agent_key='orchestrator', + task_type='standup', + created_by='system', + ) + + # ── 2. Pro Team-Member einen Sub-Task: frag nach Updates ───────────────── + for m in members: + name = m['name'] + email = m['email'] + telegram_id = m['telegram_id'] + role = m['role'] or 'Team-Member' + + msg = ( + f"Guten Morgen {name}! 👋\n\n" + f"Täglicher Status-Check ({today}):\n\n" + f"Bitte teile uns mit:\n" + f"• Was hat sich seit gestern in deinem Bereich geändert?\n" + f"• Gibt es neue Informationen, Termine oder Entscheidungen?\n" + f"• Benötigst du Unterstützung von anderen?\n\n" + f"Du kannst direkt hier per Telegram antworten oder eine Email senden." + ) + + # Telegram bevorzugen, Fallback auf Email + if telegram_id: + create_task( + title=f"Standup-Frage an {name} (Telegram)", + description=f"Telegram-ID: {telegram_id}\nNachricht: {msg}", + agent_key='orchestrator', + task_type='action_telegram', + created_by='system', + parent_task_id=standup_task_id, + ) + send_telegram_message(telegram_id, msg) + logger.info("[DailyStandup] Telegram-Standup gesendet an %s (%s)", name, telegram_id) + elif email: + subject = f"Daily Standup {today} — Was gibt's Neues?" + create_task( + title=f"Standup-Frage an {name} (Email)", + description=f"An: {email}\nBetreff: {subject}\n\n{msg}", + agent_key='orchestrator', + task_type='action_email', + created_by='system', + parent_task_id=standup_task_id, + ) + send_email(email, subject, msg, triggered_by='system:standup', task_id=standup_task_id) + logger.info("[DailyStandup] Email-Standup gesendet an %s (%s)", name, email) + + # ── 3. Orchestrator-Prompt: sammle & verteile Wissen ───────────────────── + agent_list = ', '.join(k for k in AGENTS.keys() if k != 'orchestrator') + orchestrator_prompt = f"""## Daily Standup — {today} + +Du hast soeben alle Team-Members nach ihren täglichen Updates gefragt. + +**Deine Aufgabe jetzt:** + +1. Prüfe ob es aktuelle Informationen oder Änderungen gibt (aus deiner Erinnerung, aus Tasks der letzten 24h, oder aus eingegangenen Nachrichten). + +2. Falls es wichtige Updates gibt (z.B. Terminänderungen, neue Entscheidungen, Budget-Anpassungen): + - Aktualisiere die Wissensdatenbank mit `` + - Delegiere an **jeden** der folgenden Agenten einen Sub-Task damit sie ihre reminders.md aktualisieren: + {agent_list} + +3. Falls keine konkreten Updates vorliegen: schreibe eine kurze Zusammenfassung des aktuellen Status und schicke sie per Telegram an Piotr (telegram_id: 1578034974). + +**Beispiel für Wissens-Update:** +``` + +topic: Eventstart +content: Das Event startet um 18:00 Uhr (Stand {today}). Einlass ab 17:30 Uhr. + +``` + +**Beispiel für Agent-Reminder:** +``` + +agent: catering_manager +reminder: WICHTIG ({today}): Eventstart wurde auf 18:00 geändert. Catering-Aufbau muss spätestens um 17:00 abgeschlossen sein. + +``` + +**Beispiel für Info-Delegation:** +``` + +title: Wissensupdate: Eventstart 18:00 Uhr +agent: budget_manager +details: Bitte aktualisiere deinen Wissensstand: Das Event startet am Diversity Ball Wien 2026 um 18:00 Uhr (nicht 19:00). Prüfe ob sich dadurch Änderungen für deinen Bereich ergeben. + +``` + +Führe alle notwendigen Aktionen aus und bestätige am Ende was du getan hast. +""" + + response = execute_agent_task('orchestrator', orchestrator_prompt) + if response: + parse_agent_commands('orchestrator', response, task_id=standup_task_id) + update_task_db(standup_task_id, status='completed', response=response[:500]) + logger.info("[DailyStandup] Orchestrator-Standup abgeschlossen.") + else: + update_task_db(standup_task_id, status='error', response='Keine Antwort vom Orchestrator') + + +def broadcast_knowledge_update(info: str, source: str = 'manual'): + """ + Verteilt eine neue Information an alle Agenten: + - Wissensdatenbank aktualisieren (via Orchestrator) + - Jeden Agenten per Sub-Task informieren + - Piotr per Telegram bestätigen + """ + logger.info("[Broadcast] Starte Knowledge-Broadcast: %s", info[:80]) + today = datetime.now().strftime('%d.%m.%Y %H:%M') + + broadcast_task_id = create_task( + title=f"Wissens-Broadcast: {info[:60]}", + description=f"Neue Information vom {today}:\n\n{info}\n\nQuelle: {source}", + agent_key='orchestrator', + task_type='broadcast', + created_by='system', + ) + + agent_list = ', '.join(k for k in AGENTS.keys() if k != 'orchestrator') + prompt = f"""## Wissens-Broadcast — {today} + +Eine neue wichtige Information wurde eingegeben und muss an das gesamte Team verteilt werden: + +**Neue Information:** +{info} + +**Deine Aufgaben:** + +1. Aktualisiere die Wissensdatenbank mit dem passenden Topic. + +2. Aktualisiere die reminders.md für **jeden** dieser Agenten: + {agent_list} + +3. Lege für jeden Agenten einen Sub-Task an, damit er die neue Information in seinem Fachbereich berücksichtigt. + +4. Sende Piotr (telegram_id: 1578034974) eine Bestätigung dass die Information verteilt wurde. + +Führe alle Aktionen jetzt aus. +""" + + response = execute_agent_task('orchestrator', prompt) + if response: + parse_agent_commands('orchestrator', response, task_id=broadcast_task_id) + update_task_db(broadcast_task_id, status='completed', response=response[:500]) + logger.info("[Broadcast] Knowledge-Broadcast abgeschlossen.") + else: + update_task_db(broadcast_task_id, status='error', response='Keine Antwort') + + +def daily_standup_beat(): + """Hintergrund-Thread: Führt täglich um 09:00 das Standup aus.""" + logger.info("[DailyStandup] Hintergrund-Thread gestartet.") + # Beim Start: nächste 09:00 berechnen + while True: + try: + now = datetime.now() + target = now.replace(hour=9, minute=0, second=0, microsecond=0) + if now >= target: + target += timedelta(days=1) + sleep_secs = (target - now).total_seconds() + logger.info("[DailyStandup] Nächstes Standup um %s (in %.0f Minuten)", target.strftime('%d.%m.%Y %H:%M'), sleep_secs / 60) + time.sleep(sleep_secs) + trigger_daily_standup() + except Exception as e: + logger.error("[DailyStandup] Fehler: %s", e) + time.sleep(60) + + +def start_daily_standup(): + """Startet den Daily-Standup-Thread als Daemon.""" + t = threading.Thread(target=daily_standup_beat, name='DailyStandup', daemon=True) + t.start() + logger.info("[DailyStandup] Daemon-Thread gestartet.") + + # Poller beim App-Start starten start_email_poller() start_task_beat() start_orchestrator_beat() +start_daily_standup() @app.route('/login', methods=['GET', 'POST']) @@ -3471,6 +3737,30 @@ def distribute_tasks(): }) +@app.route('/api/standup/trigger', methods=['POST']) +@login_required +def api_trigger_standup(): + """Löst das Daily Standup manuell aus (für Tests oder on-demand).""" + def run(): + trigger_daily_standup() + threading.Thread(target=run, daemon=True).start() + return jsonify({'success': True, 'message': 'Daily Standup wurde gestartet.'}) + + +@app.route('/api/broadcast', methods=['POST']) +@login_required +def api_broadcast(): + """Verteilt eine neue Information sofort an alle Agenten.""" + data = request.get_json() + info = data.get('info', '').strip() + if not info: + return jsonify({'error': 'Kein info-Text übergeben'}), 400 + def run(): + broadcast_knowledge_update(info, source='manual_broadcast') + threading.Thread(target=run, daemon=True).start() + return jsonify({'success': True, 'message': f'Broadcast gestartet: {info[:80]}'}) + + @app.route('/api/webhook/deploy', methods=['POST']) def webhook_deploy(): """Gitea Webhook: git pull + restart service on push to main.""" diff --git a/templates/tasks.html b/templates/tasks.html index 6bdc710..8b50dea 100644 --- a/templates/tasks.html +++ b/templates/tasks.html @@ -50,6 +50,12 @@ Agent {% elif task.type == 'agent_question' %} ❓ Frage + {% elif task.type == 'standup' %} + ☀ Standup + {% elif task.type == 'broadcast' %} + 📡 Broadcast + {% elif task.type == 'action_knowledge' %} + 🧠 Wissen {% endif %} {% if task.parent_task_id %} ↳ #{{ task.parent_task_id }} From 01858b5fc6bf6ba51da4a14f46471bb4d60db8a8 Mon Sep 17 00:00:00 2001 From: eric Date: Sat, 21 Feb 2026 19:53:11 +0000 Subject: [PATCH 4/7] Fix TaskBeat: add agent_subtask/standup/broadcast to processing whitelist --- app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app.py b/app.py index 2f2722d..826d54f 100644 --- a/app.py +++ b/app.py @@ -2049,7 +2049,7 @@ def process_beat_tasks(): # Konvertiere zu Dict-Format für Legacy-Kompatibilität pending_tasks = [] for db_task in db_tasks: - if db_task.get('type') in ('agent_created', 'manual', 'orchestrated', 'agent_delegated', 'telegram'): + if db_task.get('type') in ('agent_created', 'manual', 'orchestrated', 'agent_delegated', 'telegram', 'agent_subtask', 'standup', 'broadcast'): pending_tasks.append(db_task) for task in pending_tasks: From 8780c2b176a70ebf2da8b761c100c6c9079f9706 Mon Sep 17 00:00:00 2001 From: eric Date: Sat, 21 Feb 2026 20:22:39 +0000 Subject: [PATCH 5/7] Fix task ordering (FIFO) + Telegram standup reply matching - get_tasks(): ORDER BY id ASC (oldest first) for TaskBeat FIFO processing - UI calls use order='desc' so display stays newest-first - trigger_daily_standup(): standup sub-tasks now stored as type='standup_reply' with telegram_chat_id set correctly so replies can be matched - telegram_message_handler: checks for open standup_reply task from same chat_id before creating new task - if found, marks standup complete and creates orchestrator follow-up task with full context to process the reply - tasks.html: badge for standup_reply type --- app.py | 109 ++++++++++++++++++++++++++++++++++--------- templates/tasks.html | 2 + 2 files changed, 89 insertions(+), 22 deletions(-) diff --git a/app.py b/app.py index 826d54f..ed9ad5c 100644 --- a/app.py +++ b/app.py @@ -712,16 +712,21 @@ chat_history = [] orchestrator_chat = [] # ── Task Database Functions ───────────────────────────────────────────────── -def get_tasks(status=None, limit=None): - """Lädt Tasks aus der Datenbank.""" +def get_tasks(status=None, limit=None, order='asc'): + """Lädt Tasks aus der Datenbank. + + order='asc' → älteste zuerst (Standard für TaskBeat: FIFO) + order='desc' → neueste zuerst (für UI-Anzeige) + """ con = sqlite3.connect(EMAIL_JOURNAL_DB) con.row_factory = sqlite3.Row + direction = 'ASC' if order == 'asc' else 'DESC' if status: - query = "SELECT * FROM tasks WHERE status = ? ORDER BY id DESC" + query = f"SELECT * FROM tasks WHERE status = ? ORDER BY id {direction}" params = (status,) else: - query = "SELECT * FROM tasks ORDER BY id DESC" + query = f"SELECT * FROM tasks ORDER BY id {direction}" params = () if limit: @@ -1596,30 +1601,84 @@ async def telegram_message_handler(update: Update, context: ContextTypes.DEFAULT user_id = update.effective_user.id username = update.effective_user.username or update.effective_user.first_name message_text = update.message.text - + chat_id = update.effective_chat.id + # Whitelist-Check if TELEGRAM_CONFIG['allowed_users'] and user_id not in TELEGRAM_CONFIG['allowed_users']: await update.message.reply_text("⛔ Zugriff verweigert. Verwende /start für Details.") return - - # Task erstellen + + # ── Prüfe ob diese Nachricht eine Antwort auf einen offenen standup_reply-Task ist ── + try: + con = sqlite3.connect(EMAIL_JOURNAL_DB) + con.row_factory = sqlite3.Row + open_standup = con.execute( + "SELECT id, parent_task_id, title FROM tasks " + "WHERE type='standup_reply' AND status='pending' AND telegram_chat_id=? " + "ORDER BY id ASC LIMIT 1", + (chat_id,) + ).fetchone() + con.close() + except Exception as e: + logging.error(f"[Telegram] DB-Fehler beim Standup-Check: {e}") + open_standup = None + + if open_standup: + # Antwort auf offenen Standup-Task → mit Kontext an Orchestrator weiterleiten + standup_task_id = open_standup['id'] + parent_id = open_standup['parent_task_id'] + + # Orchestrator-Task erstellen: verarbeite die Standup-Antwort + orchestrator_prompt = ( + f"## Standup-Antwort von {username}\n\n" + f"**Antwort:** {message_text}\n\n" + f"**Aufgabe:**\n" + f"1. Verarbeite diese Standup-Antwort und extrahiere wichtige Informationen.\n" + f"2. Falls neue Informationen enthalten sind (Änderungen, Entscheidungen, Probleme): " + f"aktualisiere die Wissensdatenbank mit `` und informiere betroffene Agenten mit ``.\n" + f"3. Falls Handlungsbedarf besteht: erstelle entsprechende Tasks mit ``.\n" + f"4. Antworte {username} kurz per Telegram (telegram_id: {chat_id}) und bestätige den Empfang." + ) + followup_id = create_task( + title=f"Standup-Antwort verarbeiten: {username}", + description=orchestrator_prompt, + agent_key='orchestrator', + task_type='telegram', + created_by=f'telegram_user_{user_id}', + telegram_chat_id=chat_id, + telegram_user=username, + parent_task_id=parent_id, + ) + + # Standup-reply-Task als beantwortet markieren + update_task_db(standup_task_id, status='completed', + response=f"Antwort erhalten von {username}: {message_text[:200]}") + + await update.message.reply_text( + f"✅ Danke {username}! Deine Standup-Antwort wurde aufgenommen.\n\n" + f"📝 Der Orchestrator verarbeitet dein Update (Task #{followup_id})." + ) + logging.info(f"[Telegram] Standup-Antwort von {username} → Task #{standup_task_id} completed, Follow-up #{followup_id} erstellt") + return + + # ── Normale Nachricht → neuer Orchestrator-Task ────────────────────────── task_id = create_task( title=f"Telegram: {message_text[:50]}{'...' if len(message_text) > 50 else ''}", description=message_text, agent_key='orchestrator', task_type='telegram', created_by=f'telegram_user_{user_id}', - telegram_chat_id=update.effective_chat.id, + telegram_chat_id=chat_id, telegram_user=username ) - + await update.message.reply_text( f"✅ Task #{task_id} erstellt!\n\n" f"📝 {message_text[:100]}{'...' if len(message_text) > 100 else ''}\n\n" f"⏳ Der Orchestrator wird deine Anfrage verarbeiten.\n" f"Ich benachrichtige dich, sobald die Antwort bereit ist." ) - + logging.info(f"[Telegram] Task #{task_id} created by {username} (ID: {user_id})") @@ -2399,23 +2458,29 @@ def trigger_daily_standup(): # Telegram bevorzugen, Fallback auf Email if telegram_id: - create_task( - title=f"Standup-Frage an {name} (Telegram)", - description=f"Telegram-ID: {telegram_id}\nNachricht: {msg}", + standup_sub_id = create_task( + title=f"Standup-Antwort ausstehend: {name}", + description=( + f"Standup-Frage wurde per Telegram an {name} gesendet.\n" + f"Warte auf Antwort...\n\n" + f"Gesendete Frage:\n{msg}" + ), agent_key='orchestrator', - task_type='action_telegram', + task_type='standup_reply', created_by='system', parent_task_id=standup_task_id, + telegram_chat_id=int(telegram_id), + telegram_user=name, ) send_telegram_message(telegram_id, msg) - logger.info("[DailyStandup] Telegram-Standup gesendet an %s (%s)", name, telegram_id) + logger.info("[DailyStandup] Telegram-Standup gesendet an %s (%s), Task #%s wartet auf Antwort", name, telegram_id, standup_sub_id) elif email: subject = f"Daily Standup {today} — Was gibt's Neues?" create_task( - title=f"Standup-Frage an {name} (Email)", + title=f"Standup-Antwort ausstehend: {name} (Email)", description=f"An: {email}\nBetreff: {subject}\n\n{msg}", agent_key='orchestrator', - task_type='action_email', + task_type='standup_reply', created_by='system', parent_task_id=standup_task_id, ) @@ -2580,7 +2645,7 @@ def logout(): @login_required def index(): # Hole die 5 neuesten Tasks aus DB - all_tasks = get_tasks() + all_tasks = get_tasks(order='desc') recent_tasks = all_tasks[:5] if all_tasks else [] return render_template('index.html', agents=AGENTS, recent_tasks=recent_tasks) @@ -2713,8 +2778,8 @@ def task_list(): ) flash(f'Task #{task_id} erstellt!', 'success') - # Alle Tasks aus Datenbank holen – Neueste zuerst - all_tasks = get_tasks() + # Alle Tasks aus Datenbank holen – Neueste zuerst (für UI) + all_tasks = get_tasks(order='desc') return render_template('tasks.html', agents=AGENTS, tasks=all_tasks) @app.route('/tasks/') @@ -3577,8 +3642,8 @@ def api_tasks(): return jsonify({'success': True, 'task': new_task}) - # GET: Alle Tasks aus DB - task_list = get_tasks() + # GET: Alle Tasks aus DB (neueste zuerst für API) + task_list = get_tasks(order='desc') return jsonify({'tasks': task_list}) diff --git a/templates/tasks.html b/templates/tasks.html index 8b50dea..3c45742 100644 --- a/templates/tasks.html +++ b/templates/tasks.html @@ -52,6 +52,8 @@ ❓ Frage {% elif task.type == 'standup' %} ☀ Standup + {% elif task.type == 'standup_reply' %} + 💬 Standup-Antwort {% elif task.type == 'broadcast' %} 📡 Broadcast {% elif task.type == 'action_knowledge' %} From 0868a2c71fba6e36474ae46b759a7dc4b9800b73 Mon Sep 17 00:00:00 2001 From: eric Date: Sat, 21 Feb 2026 20:28:22 +0000 Subject: [PATCH 6/7] Fix standup race condition: lock + daily guard + remove standup from TaskBeat whitelist - _standup_lock: threading.Lock prevents concurrent standup runs - DB check: if a standup task already exists for today, abort - TaskBeat whitelist: remove 'standup' type so TaskBeat never re-runs standup tasks (standup is always driven by trigger_daily_standup(), not TaskBeat) --- app.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/app.py b/app.py index ed9ad5c..4c14047 100644 --- a/app.py +++ b/app.py @@ -2108,7 +2108,7 @@ def process_beat_tasks(): # Konvertiere zu Dict-Format für Legacy-Kompatibilität pending_tasks = [] for db_task in db_tasks: - if db_task.get('type') in ('agent_created', 'manual', 'orchestrated', 'agent_delegated', 'telegram', 'agent_subtask', 'standup', 'broadcast'): + if db_task.get('type') in ('agent_created', 'manual', 'orchestrated', 'agent_delegated', 'telegram', 'agent_subtask', 'broadcast'): pending_tasks.append(db_task) for task in pending_tasks: @@ -2405,12 +2405,43 @@ def start_orchestrator_beat(): # ── DAILY STANDUP ───────────────────────────────────────────────────────────── +_standup_lock = threading.Lock() + def trigger_daily_standup(): """ Tägliches Standup: Orchestrator fragt alle Team-Members nach Updates und delegiert anschließend Wissensupdates an alle Agenten. + Gegen Doppel-Trigger gesichert: nur ein Standup pro Tag möglich. """ - logger.info("[DailyStandup] Starte tägliches Standup...") + # Nur einen gleichzeitigen Standup erlauben + if not _standup_lock.acquire(blocking=False): + logger.warning("[DailyStandup] Bereits aktiv — zweiter Trigger ignoriert.") + return + + try: + # Prüfe ob heute schon ein Standup läuft oder abgeschlossen ist + today_str = datetime.now().strftime('%Y-%m-%d') + try: + con = sqlite3.connect(EMAIL_JOURNAL_DB) + existing = con.execute( + "SELECT id FROM tasks WHERE type='standup' AND created_at LIKE ? AND status != 'error'", + (f"{today_str}%",) + ).fetchone() + con.close() + except Exception: + existing = None + + if existing: + logger.warning("[DailyStandup] Standup für heute (Task #%s) bereits vorhanden — abgebrochen.", existing['id'] if existing else '?') + return + + logger.info("[DailyStandup] Starte tägliches Standup...") + _trigger_daily_standup_inner() + finally: + _standup_lock.release() + + +def _trigger_daily_standup_inner(): # Team-Members aus DB holen try: From c82ecdd5f1be377ca4e4de9d5f31b08bd4998c49 Mon Sep 17 00:00:00 2001 From: eric Date: Sat, 21 Feb 2026 20:39:35 +0000 Subject: [PATCH 7/7] Fix duplicate Telegram: orchestrator prompt must not send messages during standup (team already contacted directly) --- app.py | 39 ++++++++------------------------------- 1 file changed, 8 insertions(+), 31 deletions(-) diff --git a/app.py b/app.py index 4c14047..75afff8 100644 --- a/app.py +++ b/app.py @@ -2522,43 +2522,20 @@ def _trigger_daily_standup_inner(): agent_list = ', '.join(k for k in AGENTS.keys() if k != 'orchestrator') orchestrator_prompt = f"""## Daily Standup — {today} -Du hast soeben alle Team-Members nach ihren täglichen Updates gefragt. +Die Standup-Fragen wurden soeben per Telegram/Email an alle Team-Members versendet und warten auf Antwort. -**Deine Aufgabe jetzt:** +**Deine Aufgabe jetzt — nur Wissenspflege, KEIN erneutes Kontaktieren der Team-Members:** -1. Prüfe ob es aktuelle Informationen oder Änderungen gibt (aus deiner Erinnerung, aus Tasks der letzten 24h, oder aus eingegangenen Nachrichten). +1. Prüfe ob es seit dem letzten Standup wichtige Informationen oder Änderungen gibt (aus deiner Erinnerung oder den letzten Tasks). -2. Falls es wichtige Updates gibt (z.B. Terminänderungen, neue Entscheidungen, Budget-Anpassungen): +2. Falls es wichtige Updates gibt (z.B. Terminänderungen, Entscheidungen, Budget-Anpassungen): - Aktualisiere die Wissensdatenbank mit `` - - Delegiere an **jeden** der folgenden Agenten einen Sub-Task damit sie ihre reminders.md aktualisieren: - {agent_list} + - Schreibe für jeden betroffenen Agenten einen `` + - Delegiere Sub-Tasks an die Agenten: {agent_list} -3. Falls keine konkreten Updates vorliegen: schreibe eine kurze Zusammenfassung des aktuellen Status und schicke sie per Telegram an Piotr (telegram_id: 1578034974). +3. Falls keine Änderungen vorliegen: antworte kurz mit einer Zusammenfassung was du geprüft hast. Sende dabei **keine** Telegram-Nachrichten — die Team-Members wurden bereits kontaktiert. -**Beispiel für Wissens-Update:** -``` - -topic: Eventstart -content: Das Event startet um 18:00 Uhr (Stand {today}). Einlass ab 17:30 Uhr. - -``` - -**Beispiel für Agent-Reminder:** -``` - -agent: catering_manager -reminder: WICHTIG ({today}): Eventstart wurde auf 18:00 geändert. Catering-Aufbau muss spätestens um 17:00 abgeschlossen sein. - -``` - -**Beispiel für Info-Delegation:** -``` - -title: Wissensupdate: Eventstart 18:00 Uhr -agent: budget_manager -details: Bitte aktualisiere deinen Wissensstand: Das Event startet am Diversity Ball Wien 2026 um 18:00 Uhr (nicht 19:00). Prüfe ob sich dadurch Änderungen für deinen Bereich ergeben. - -``` +**Wichtig:** Sende KEINE weiteren Nachrichten an Team-Members. Die Standup-Fragen laufen bereits. Führe alle notwendigen Aktionen aus und bestätige am Ende was du getan hast. """