From 8780c2b176a70ebf2da8b761c100c6c9079f9706 Mon Sep 17 00:00:00 2001 From: eric Date: Sat, 21 Feb 2026 20:22:39 +0000 Subject: [PATCH] Fix task ordering (FIFO) + Telegram standup reply matching - get_tasks(): ORDER BY id ASC (oldest first) for TaskBeat FIFO processing - UI calls use order='desc' so display stays newest-first - trigger_daily_standup(): standup sub-tasks now stored as type='standup_reply' with telegram_chat_id set correctly so replies can be matched - telegram_message_handler: checks for open standup_reply task from same chat_id before creating new task - if found, marks standup complete and creates orchestrator follow-up task with full context to process the reply - tasks.html: badge for standup_reply type --- app.py | 109 ++++++++++++++++++++++++++++++++++--------- templates/tasks.html | 2 + 2 files changed, 89 insertions(+), 22 deletions(-) diff --git a/app.py b/app.py index 826d54f..ed9ad5c 100644 --- a/app.py +++ b/app.py @@ -712,16 +712,21 @@ chat_history = [] orchestrator_chat = [] # ── Task Database Functions ───────────────────────────────────────────────── -def get_tasks(status=None, limit=None): - """Lädt Tasks aus der Datenbank.""" +def get_tasks(status=None, limit=None, order='asc'): + """Lädt Tasks aus der Datenbank. + + order='asc' → älteste zuerst (Standard für TaskBeat: FIFO) + order='desc' → neueste zuerst (für UI-Anzeige) + """ con = sqlite3.connect(EMAIL_JOURNAL_DB) con.row_factory = sqlite3.Row + direction = 'ASC' if order == 'asc' else 'DESC' if status: - query = "SELECT * FROM tasks WHERE status = ? ORDER BY id DESC" + query = f"SELECT * FROM tasks WHERE status = ? ORDER BY id {direction}" params = (status,) else: - query = "SELECT * FROM tasks ORDER BY id DESC" + query = f"SELECT * FROM tasks ORDER BY id {direction}" params = () if limit: @@ -1596,30 +1601,84 @@ async def telegram_message_handler(update: Update, context: ContextTypes.DEFAULT user_id = update.effective_user.id username = update.effective_user.username or update.effective_user.first_name message_text = update.message.text - + chat_id = update.effective_chat.id + # Whitelist-Check if TELEGRAM_CONFIG['allowed_users'] and user_id not in TELEGRAM_CONFIG['allowed_users']: await update.message.reply_text("⛔ Zugriff verweigert. Verwende /start für Details.") return - - # Task erstellen + + # ── Prüfe ob diese Nachricht eine Antwort auf einen offenen standup_reply-Task ist ── + try: + con = sqlite3.connect(EMAIL_JOURNAL_DB) + con.row_factory = sqlite3.Row + open_standup = con.execute( + "SELECT id, parent_task_id, title FROM tasks " + "WHERE type='standup_reply' AND status='pending' AND telegram_chat_id=? " + "ORDER BY id ASC LIMIT 1", + (chat_id,) + ).fetchone() + con.close() + except Exception as e: + logging.error(f"[Telegram] DB-Fehler beim Standup-Check: {e}") + open_standup = None + + if open_standup: + # Antwort auf offenen Standup-Task → mit Kontext an Orchestrator weiterleiten + standup_task_id = open_standup['id'] + parent_id = open_standup['parent_task_id'] + + # Orchestrator-Task erstellen: verarbeite die Standup-Antwort + orchestrator_prompt = ( + f"## Standup-Antwort von {username}\n\n" + f"**Antwort:** {message_text}\n\n" + f"**Aufgabe:**\n" + f"1. Verarbeite diese Standup-Antwort und extrahiere wichtige Informationen.\n" + f"2. Falls neue Informationen enthalten sind (Änderungen, Entscheidungen, Probleme): " + f"aktualisiere die Wissensdatenbank mit `` und informiere betroffene Agenten mit ``.\n" + f"3. Falls Handlungsbedarf besteht: erstelle entsprechende Tasks mit ``.\n" + f"4. Antworte {username} kurz per Telegram (telegram_id: {chat_id}) und bestätige den Empfang." + ) + followup_id = create_task( + title=f"Standup-Antwort verarbeiten: {username}", + description=orchestrator_prompt, + agent_key='orchestrator', + task_type='telegram', + created_by=f'telegram_user_{user_id}', + telegram_chat_id=chat_id, + telegram_user=username, + parent_task_id=parent_id, + ) + + # Standup-reply-Task als beantwortet markieren + update_task_db(standup_task_id, status='completed', + response=f"Antwort erhalten von {username}: {message_text[:200]}") + + await update.message.reply_text( + f"✅ Danke {username}! Deine Standup-Antwort wurde aufgenommen.\n\n" + f"📝 Der Orchestrator verarbeitet dein Update (Task #{followup_id})." + ) + logging.info(f"[Telegram] Standup-Antwort von {username} → Task #{standup_task_id} completed, Follow-up #{followup_id} erstellt") + return + + # ── Normale Nachricht → neuer Orchestrator-Task ────────────────────────── task_id = create_task( title=f"Telegram: {message_text[:50]}{'...' if len(message_text) > 50 else ''}", description=message_text, agent_key='orchestrator', task_type='telegram', created_by=f'telegram_user_{user_id}', - telegram_chat_id=update.effective_chat.id, + telegram_chat_id=chat_id, telegram_user=username ) - + await update.message.reply_text( f"✅ Task #{task_id} erstellt!\n\n" f"📝 {message_text[:100]}{'...' if len(message_text) > 100 else ''}\n\n" f"⏳ Der Orchestrator wird deine Anfrage verarbeiten.\n" f"Ich benachrichtige dich, sobald die Antwort bereit ist." ) - + logging.info(f"[Telegram] Task #{task_id} created by {username} (ID: {user_id})") @@ -2399,23 +2458,29 @@ def trigger_daily_standup(): # Telegram bevorzugen, Fallback auf Email if telegram_id: - create_task( - title=f"Standup-Frage an {name} (Telegram)", - description=f"Telegram-ID: {telegram_id}\nNachricht: {msg}", + standup_sub_id = create_task( + title=f"Standup-Antwort ausstehend: {name}", + description=( + f"Standup-Frage wurde per Telegram an {name} gesendet.\n" + f"Warte auf Antwort...\n\n" + f"Gesendete Frage:\n{msg}" + ), agent_key='orchestrator', - task_type='action_telegram', + task_type='standup_reply', created_by='system', parent_task_id=standup_task_id, + telegram_chat_id=int(telegram_id), + telegram_user=name, ) send_telegram_message(telegram_id, msg) - logger.info("[DailyStandup] Telegram-Standup gesendet an %s (%s)", name, telegram_id) + logger.info("[DailyStandup] Telegram-Standup gesendet an %s (%s), Task #%s wartet auf Antwort", name, telegram_id, standup_sub_id) elif email: subject = f"Daily Standup {today} — Was gibt's Neues?" create_task( - title=f"Standup-Frage an {name} (Email)", + title=f"Standup-Antwort ausstehend: {name} (Email)", description=f"An: {email}\nBetreff: {subject}\n\n{msg}", agent_key='orchestrator', - task_type='action_email', + task_type='standup_reply', created_by='system', parent_task_id=standup_task_id, ) @@ -2580,7 +2645,7 @@ def logout(): @login_required def index(): # Hole die 5 neuesten Tasks aus DB - all_tasks = get_tasks() + all_tasks = get_tasks(order='desc') recent_tasks = all_tasks[:5] if all_tasks else [] return render_template('index.html', agents=AGENTS, recent_tasks=recent_tasks) @@ -2713,8 +2778,8 @@ def task_list(): ) flash(f'Task #{task_id} erstellt!', 'success') - # Alle Tasks aus Datenbank holen – Neueste zuerst - all_tasks = get_tasks() + # Alle Tasks aus Datenbank holen – Neueste zuerst (für UI) + all_tasks = get_tasks(order='desc') return render_template('tasks.html', agents=AGENTS, tasks=all_tasks) @app.route('/tasks/') @@ -3577,8 +3642,8 @@ def api_tasks(): return jsonify({'success': True, 'task': new_task}) - # GET: Alle Tasks aus DB - task_list = get_tasks() + # GET: Alle Tasks aus DB (neueste zuerst für API) + task_list = get_tasks(order='desc') return jsonify({'tasks': task_list}) diff --git a/templates/tasks.html b/templates/tasks.html index 8b50dea..3c45742 100644 --- a/templates/tasks.html +++ b/templates/tasks.html @@ -52,6 +52,8 @@ ❓ Frage {% elif task.type == 'standup' %} ☀ Standup + {% elif task.type == 'standup_reply' %} + 💬 Standup-Antwort {% elif task.type == 'broadcast' %} 📡 Broadcast {% elif task.type == 'action_knowledge' %}