feat: Add Telegram bot integration and task detail/delete UI

- Wire up Telegram bot with token, allowed users and username from .env
- Fix TaskBeat to handle direct tasks (Telegram/email) without sub_tasks
- Fix send_telegram_message to use run_coroutine_threadsafe (avoid event loop clash)
- Add TaskBeat watchdog thread for auto-restart on crash
- Reset stuck in_progress tasks on startup
- Add task detail page (/tasks/<id>) with full response/log view and auto-refresh
- Add task delete route (/tasks/delete/<id>) with confirmation
- Include agent sender info in Telegram task prompts
- Orchestrator self-updated knowledge base with Telegram contact info
This commit is contained in:
eric 2026-02-21 18:14:43 +00:00
parent 5b4b698064
commit 99df910497
4 changed files with 237 additions and 34 deletions

127
app.py
View file

@ -1555,15 +1555,24 @@ def send_telegram_message(chat_id: int, message: str):
return False
try:
# Async-Funktion in sync Context ausführen
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(telegram_app.bot.send_message(
chat_id=chat_id,
text=message
))
loop.close()
# In die laufende Event-Loop des Telegram-Threads senden (thread-safe)
loop = telegram_app.updater.application.updater._network_loop if hasattr(telegram_app, 'updater') else None
if loop is None:
# Fallback: Loop des Telegram-Threads direkt holen
loop = telegram_app.loop if hasattr(telegram_app, 'loop') else None
if loop and loop.is_running():
future = asyncio.run_coroutine_threadsafe(
telegram_app.bot.send_message(chat_id=chat_id, text=message),
loop
)
future.result(timeout=30)
else:
# Letzter Fallback: eigene Loop (kein Telegram-Bot läuft)
loop = asyncio.new_event_loop()
loop.run_until_complete(telegram_app.bot.send_message(chat_id=chat_id, text=message))
loop.close()
return True
except Exception as e:
logging.error(f"[Telegram] Error sending message: {e}")
@ -1604,7 +1613,7 @@ def run_telegram_bot():
try:
logging.info("[Telegram] Starting bot polling...")
telegram_app.run_polling(drop_pending_updates=True)
telegram_app.run_polling(drop_pending_updates=True, stop_signals=None)
except Exception as e:
logging.error(f"[Telegram] Bot polling error: {e}")
@ -1979,20 +1988,49 @@ def process_beat_tasks():
agent_key = task.get('agent_key') or task.get('assigned_agent', '')
if task.get('agent_key') == 'orchestrator':
# Update in DB
sub_tasks = task.get('sub_tasks', [])
available_agents = task.get('available_agents', list(AGENTS.keys()))
# Direkte Tasks (Telegram, Email, etc.) ohne sub_tasks → direkt ausführen
if not sub_tasks:
update_task_db(task['id'], status='in_progress')
task['status'] = 'in_progress'
logger.info("[TaskBeat] Direkte Ausführung für Task #%d (kein sub_tasks)", task['id'])
sender_info = ''
if task.get('type') == 'telegram':
sender_info = (
f"\n\n[Eingehende Telegram-Nachricht]\n"
f"Von: {task.get('telegram_user', 'Unbekannt')} "
f"(Telegram-ID: {task.get('telegram_chat_id', 'N/A')})\n"
f"Created by: {task.get('created_by', 'N/A')}\n"
)
response = execute_agent_task('orchestrator', task.get('title', '') + '\n\n' + task.get('description', '') + sender_info)
update_task_db(task['id'], status='completed', response=response)
task['status'] = 'completed'
task['response'] = response
logger.info("[TaskBeat] Task #%d abgeschlossen.", task['id'])
# Telegram-Antwort senden
if task.get('type') == 'telegram' and task.get('telegram_chat_id'):
try:
telegram_msg = (
f"✅ Task #{task['id']} abgeschlossen!\n\n"
f"📝 Anfrage: {task.get('title', 'N/A')}\n\n"
f"💬 Antwort:\n{response[:4000]}"
)
send_telegram_message(task['telegram_chat_id'], telegram_msg)
logger.info("[TaskBeat] Telegram-Antwort gesendet für Task #%d", task['id'])
except Exception as e:
logger.error("[TaskBeat] Fehler beim Senden der Telegram-Antwort: %s", str(e))
continue
# Planungsphase für Tasks mit sub_tasks
update_task_db(task['id'], status='in_progress')
task['status'] = 'in_progress'
logger.info("[TaskBeat] Planungsphase für Task #%d", task['id'])
sub_tasks = task.get('sub_tasks', [])
available_agents = task.get('available_agents', list(AGENTS.keys()))
# Falls keine sub_tasks: Task ist fehlerhaft, markiere als completed
if not sub_tasks:
logger.warning("[TaskBeat] Task #%d hat keine sub_tasks - als completed markiert", task['id'])
update_task_db(task['id'], status='completed', response='Fehler: Keine sub_tasks definiert. Dieser Task wurde wahrscheinlich über eine veraltete API erstellt.')
continue
prompt = f"""Du bist der Master-Orchestrator. Analysiere folgende Tasks und weise sie den richtigen Agenten zu:
Tasks:
@ -2113,10 +2151,31 @@ Arbeite diesen Teil ab und liefere ein vollständiges Ergebnis.""",
def start_task_beat():
"""Startet den Task-Beat als Daemon-Thread."""
beat_thread = threading.Thread(target=process_beat_tasks, name='TaskBeat', daemon=True)
beat_thread.start()
logger.info("[TaskBeat] Daemon-Thread gestartet.")
"""Startet den Task-Beat als Daemon-Thread mit Watchdog."""
# Stuck in_progress Tasks beim Start zurücksetzen
try:
con = sqlite3.connect(EMAIL_JOURNAL_DB)
stuck = con.execute("SELECT id FROM tasks WHERE status='in_progress'").fetchall()
if stuck:
con.execute("UPDATE tasks SET status='pending', completed_at=NULL WHERE status='in_progress'")
con.commit()
logger.warning("[TaskBeat] %d stuck in_progress Task(s) auf pending zurückgesetzt.", len(stuck))
con.close()
except Exception as e:
logger.error("[TaskBeat] Fehler beim Reset stuck Tasks: %s", str(e))
def watchdog():
while True:
beat_thread = threading.Thread(target=process_beat_tasks, name='TaskBeat', daemon=True)
beat_thread.start()
logger.info("[TaskBeat] Daemon-Thread gestartet.")
beat_thread.join() # Warte bis Thread stirbt
logger.error("[TaskBeat] Thread unerwartet beendet - starte neu in 5s...")
time.sleep(5)
watchdog_thread = threading.Thread(target=watchdog, name='TaskBeatWatchdog', daemon=True)
watchdog_thread.start()
logger.info("[TaskBeat] Watchdog gestartet.")
# ── Orchestrator Beat ───────────────────────────────────────────────────────
@ -2390,6 +2449,28 @@ def task_list():
all_tasks = get_tasks()
return render_template('tasks.html', agents=AGENTS, tasks=all_tasks)
@app.route('/tasks/<int:task_id>')
@login_required
def task_detail(task_id):
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.row_factory = sqlite3.Row
task = con.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
con.close()
if not task:
flash(f'Task #{task_id} nicht gefunden.', 'danger')
return redirect(url_for('task_list'))
return render_template('task_detail.html', task=dict(task), agents=AGENTS)
@app.route('/tasks/delete/<int:task_id>', methods=['POST'])
@login_required
def delete_task(task_id):
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
con.commit()
con.close()
flash(f'Task #{task_id} gelöscht.', 'success')
return redirect(url_for('task_list'))
@app.route('/tasks/update/<int:task_id>/<status>')
@login_required
def update_task(task_id, status):