Fix task ordering (FIFO) + Telegram standup reply matching

- get_tasks(): ORDER BY id ASC (oldest first) for TaskBeat FIFO processing
- UI calls use order='desc' so display stays newest-first
- trigger_daily_standup(): standup sub-tasks now stored as type='standup_reply'
  with telegram_chat_id set correctly so replies can be matched
- telegram_message_handler: checks for open standup_reply task from same
  chat_id before creating new task - if found, marks standup complete and
  creates orchestrator follow-up task with full context to process the reply
- tasks.html: badge for standup_reply type
This commit is contained in:
eric 2026-02-21 20:22:39 +00:00
parent 01858b5fc6
commit 8780c2b176
2 changed files with 89 additions and 22 deletions

109
app.py
View file

@ -712,16 +712,21 @@ chat_history = []
orchestrator_chat = []
# ── Task Database Functions ─────────────────────────────────────────────────
def get_tasks(status=None, limit=None):
"""Lädt Tasks aus der Datenbank."""
def get_tasks(status=None, limit=None, order='asc'):
"""Lädt Tasks aus der Datenbank.
order='asc' älteste zuerst (Standard für TaskBeat: FIFO)
order='desc' neueste zuerst (für UI-Anzeige)
"""
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.row_factory = sqlite3.Row
direction = 'ASC' if order == 'asc' else 'DESC'
if status:
query = "SELECT * FROM tasks WHERE status = ? ORDER BY id DESC"
query = f"SELECT * FROM tasks WHERE status = ? ORDER BY id {direction}"
params = (status,)
else:
query = "SELECT * FROM tasks ORDER BY id DESC"
query = f"SELECT * FROM tasks ORDER BY id {direction}"
params = ()
if limit:
@ -1596,30 +1601,84 @@ async def telegram_message_handler(update: Update, context: ContextTypes.DEFAULT
user_id = update.effective_user.id
username = update.effective_user.username or update.effective_user.first_name
message_text = update.message.text
chat_id = update.effective_chat.id
# Whitelist-Check
if TELEGRAM_CONFIG['allowed_users'] and user_id not in TELEGRAM_CONFIG['allowed_users']:
await update.message.reply_text("⛔ Zugriff verweigert. Verwende /start für Details.")
return
# Task erstellen
# ── Prüfe ob diese Nachricht eine Antwort auf einen offenen standup_reply-Task ist ──
try:
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.row_factory = sqlite3.Row
open_standup = con.execute(
"SELECT id, parent_task_id, title FROM tasks "
"WHERE type='standup_reply' AND status='pending' AND telegram_chat_id=? "
"ORDER BY id ASC LIMIT 1",
(chat_id,)
).fetchone()
con.close()
except Exception as e:
logging.error(f"[Telegram] DB-Fehler beim Standup-Check: {e}")
open_standup = None
if open_standup:
# Antwort auf offenen Standup-Task → mit Kontext an Orchestrator weiterleiten
standup_task_id = open_standup['id']
parent_id = open_standup['parent_task_id']
# Orchestrator-Task erstellen: verarbeite die Standup-Antwort
orchestrator_prompt = (
f"## Standup-Antwort von {username}\n\n"
f"**Antwort:** {message_text}\n\n"
f"**Aufgabe:**\n"
f"1. Verarbeite diese Standup-Antwort und extrahiere wichtige Informationen.\n"
f"2. Falls neue Informationen enthalten sind (Änderungen, Entscheidungen, Probleme): "
f"aktualisiere die Wissensdatenbank mit `<update_knowledge>` und informiere betroffene Agenten mit `<update_agent_reminder>`.\n"
f"3. Falls Handlungsbedarf besteht: erstelle entsprechende Tasks mit `<create_task>`.\n"
f"4. Antworte {username} kurz per Telegram (telegram_id: {chat_id}) und bestätige den Empfang."
)
followup_id = create_task(
title=f"Standup-Antwort verarbeiten: {username}",
description=orchestrator_prompt,
agent_key='orchestrator',
task_type='telegram',
created_by=f'telegram_user_{user_id}',
telegram_chat_id=chat_id,
telegram_user=username,
parent_task_id=parent_id,
)
# Standup-reply-Task als beantwortet markieren
update_task_db(standup_task_id, status='completed',
response=f"Antwort erhalten von {username}: {message_text[:200]}")
await update.message.reply_text(
f"✅ Danke {username}! Deine Standup-Antwort wurde aufgenommen.\n\n"
f"📝 Der Orchestrator verarbeitet dein Update (Task #{followup_id})."
)
logging.info(f"[Telegram] Standup-Antwort von {username} → Task #{standup_task_id} completed, Follow-up #{followup_id} erstellt")
return
# ── Normale Nachricht → neuer Orchestrator-Task ──────────────────────────
task_id = create_task(
title=f"Telegram: {message_text[:50]}{'...' if len(message_text) > 50 else ''}",
description=message_text,
agent_key='orchestrator',
task_type='telegram',
created_by=f'telegram_user_{user_id}',
telegram_chat_id=update.effective_chat.id,
telegram_chat_id=chat_id,
telegram_user=username
)
await update.message.reply_text(
f"✅ Task #{task_id} erstellt!\n\n"
f"📝 {message_text[:100]}{'...' if len(message_text) > 100 else ''}\n\n"
f"⏳ Der Orchestrator wird deine Anfrage verarbeiten.\n"
f"Ich benachrichtige dich, sobald die Antwort bereit ist."
)
logging.info(f"[Telegram] Task #{task_id} created by {username} (ID: {user_id})")
@ -2399,23 +2458,29 @@ def trigger_daily_standup():
# Telegram bevorzugen, Fallback auf Email
if telegram_id:
create_task(
title=f"Standup-Frage an {name} (Telegram)",
description=f"Telegram-ID: {telegram_id}\nNachricht: {msg}",
standup_sub_id = create_task(
title=f"Standup-Antwort ausstehend: {name}",
description=(
f"Standup-Frage wurde per Telegram an {name} gesendet.\n"
f"Warte auf Antwort...\n\n"
f"Gesendete Frage:\n{msg}"
),
agent_key='orchestrator',
task_type='action_telegram',
task_type='standup_reply',
created_by='system',
parent_task_id=standup_task_id,
telegram_chat_id=int(telegram_id),
telegram_user=name,
)
send_telegram_message(telegram_id, msg)
logger.info("[DailyStandup] Telegram-Standup gesendet an %s (%s)", name, telegram_id)
logger.info("[DailyStandup] Telegram-Standup gesendet an %s (%s), Task #%s wartet auf Antwort", name, telegram_id, standup_sub_id)
elif email:
subject = f"Daily Standup {today} — Was gibt's Neues?"
create_task(
title=f"Standup-Frage an {name} (Email)",
title=f"Standup-Antwort ausstehend: {name} (Email)",
description=f"An: {email}\nBetreff: {subject}\n\n{msg}",
agent_key='orchestrator',
task_type='action_email',
task_type='standup_reply',
created_by='system',
parent_task_id=standup_task_id,
)
@ -2580,7 +2645,7 @@ def logout():
@login_required
def index():
# Hole die 5 neuesten Tasks aus DB
all_tasks = get_tasks()
all_tasks = get_tasks(order='desc')
recent_tasks = all_tasks[:5] if all_tasks else []
return render_template('index.html', agents=AGENTS, recent_tasks=recent_tasks)
@ -2713,8 +2778,8 @@ def task_list():
)
flash(f'Task #{task_id} erstellt!', 'success')
# Alle Tasks aus Datenbank holen Neueste zuerst
all_tasks = get_tasks()
# Alle Tasks aus Datenbank holen Neueste zuerst (für UI)
all_tasks = get_tasks(order='desc')
return render_template('tasks.html', agents=AGENTS, tasks=all_tasks)
@app.route('/tasks/<int:task_id>')
@ -3577,8 +3642,8 @@ def api_tasks():
return jsonify({'success': True, 'task': new_task})
# GET: Alle Tasks aus DB
task_list = get_tasks()
# GET: Alle Tasks aus DB (neueste zuerst für API)
task_list = get_tasks(order='desc')
return jsonify({'tasks': task_list})