feat: Task-Persistierung, Team-Members & Orchestrator Beat

Database:
- Tasks-Tabelle für persistente Task-Speicherung
- Team-Members-Tabelle für reale Mitarbeiter
- Auto-Cleanup: Löscht completed Tasks älter als 7 Tage
- Helper-Funktionen: get_tasks(), create_task(), update_task_db(), delete_task()

Team-Management:
- Standard Team-Members (Eric, Georg, Piotr) beim Start initialisiert
- get_team_members() - Lädt aktive Team-Members
- add_team_member() - Fügt Team-Member hinzu
- get_team_member_summary() - Zusammenfassung für Orchestrator

Orchestrator Beat:
- Läuft alle 30 Minuten automatisch
- Prüft Tasks ohne Fortschritt (>2h pending)
- Prüft blockierte Tasks (>4h in_progress)
- Fragt Orchestrator bei Problemen
- Orchestrator kann Team-Members kontaktieren

Neue Kommandos für Orchestrator:
- @SEND_EMAIL - Email an Team-Member senden
- @SEND_TELEGRAM - Telegram-Nachricht an Team-Member

Integration:
- Team-Member-Info wird in Orchestrator-Systemprompt eingefügt
- Orchestrator kennt jetzt reale Verantwortlichkeiten
- Kann bei Bedarf echte Menschen kontaktieren

Background Threads:
- EmailPoller (alle 2 Min)
- TaskWorker (on-demand)
- TaskBeat (alle 10 Sek)
- OrchestratorBeat (alle 30 Min)  NEU
- TelegramBot (wenn konfiguriert)

TODO:
- Tasks-UI auf DB umstellen
- Delete-Button für Tasks
- Team-Members Verwaltungs-UI
This commit is contained in:
pdyde 2026-02-21 13:53:51 +01:00
parent 11352d2ca5
commit 5b0cf6e640

355
app.py
View file

@ -253,6 +253,42 @@ def init_journal():
updated_at TEXT
)
""")
# Tasks-Tabelle für persistente Task-Speicherung
con.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
assigned_agent TEXT,
agent_key TEXT,
status TEXT DEFAULT 'pending',
created_at TEXT NOT NULL,
completed_at TEXT,
type TEXT,
created_by TEXT,
response TEXT,
telegram_chat_id INTEGER,
telegram_user TEXT,
parent_task_id INTEGER
)
""")
# Team-Members Tabelle für reale Mitarbeiter
con.execute("""
CREATE TABLE IF NOT EXISTS team_members (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
role TEXT NOT NULL,
responsibilities TEXT,
email TEXT,
telegram_id INTEGER,
phone TEXT,
active INTEGER DEFAULT 1,
created_at TEXT NOT NULL
)
""")
con.commit()
con.close()
@ -419,11 +455,16 @@ def execute_agent_task(agent_key, user_prompt, extra_context=""):
# Wissensdatenbank-Pfad (Agent holt sich selbst was er braucht)
kb_file = os.path.join(os.path.dirname(__file__), 'diversityball_knowledge.md')
# System-Prompt = Agent-Rolle + Memory + Kommandos (OHNE große Wissensdatenbank!)
# Team-Members laden (nur für Orchestrator)
team_summary = ""
if agent_key == 'orchestrator':
team_summary = "\n\n" + get_team_member_summary()
# System-Prompt = Agent-Rolle + Memory + Team + Kommandos (OHNE große Wissensdatenbank!)
full_system = f"""{system_prompt}
## Deine Erinnerungen:
{memory_summary}
{memory_summary}{team_summary}
## Wissensdatenbank:
Die Wissensdatenbank liegt unter: {kb_file}
@ -461,6 +502,19 @@ Skills: [Benötigte Fähigkeiten]
Reason: [Warum wird dieser Agent gebraucht?]
@END
**Team-Member per Email kontaktieren:**
@SEND_EMAIL
To: [Email-Adresse des Team-Members]
Subject: [Betreff]
Body: [Nachricht]
@END
**Team-Member per Telegram kontaktieren:**
@SEND_TELEGRAM
TelegramID: [Telegram-ID des Team-Members]
Message: [Nachricht]
@END
Der Orchestrator kümmert sich um die Zuweisung und Kommunikation!
## Wichtig:
@ -548,10 +602,175 @@ def load_agents_from_directories():
AGENTS = load_agents_from_directories()
tasks = []
# Legacy - tasks werden jetzt in DB gespeichert, aber wir brauchen Kompatibilität
tasks = [] # Wird bei get_tasks() aus DB geladen
chat_history = []
orchestrator_chat = []
# ── Task Database Functions ─────────────────────────────────────────────────
def get_tasks(status=None, limit=None):
"""Lädt Tasks aus der Datenbank."""
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.row_factory = sqlite3.Row
if status:
query = "SELECT * FROM tasks WHERE status = ? ORDER BY id DESC"
params = (status,)
else:
query = "SELECT * FROM tasks ORDER BY id DESC"
params = ()
if limit:
query += f" LIMIT {limit}"
rows = con.execute(query, params).fetchall()
con.close()
return [dict(row) for row in rows]
def create_task(title, description, agent_key='orchestrator', task_type='manual', created_by='user', **kwargs):
"""Erstellt einen neuen Task in der Datenbank."""
con = sqlite3.connect(EMAIL_JOURNAL_DB)
assigned_agent = AGENTS.get(agent_key, {}).get('name', agent_key) if agent_key else 'Nicht zugewiesen'
created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor = con.execute("""
INSERT INTO tasks (
title, description, assigned_agent, agent_key, status,
created_at, type, created_by, telegram_chat_id, telegram_user, parent_task_id
) VALUES (?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?, ?)
""", (
title, description, assigned_agent, agent_key, created_at,
task_type, created_by, kwargs.get('telegram_chat_id'),
kwargs.get('telegram_user'), kwargs.get('parent_task_id')
))
task_id = cursor.lastrowid
con.commit()
con.close()
logging.info(f"[DB] Task #{task_id} erstellt: {title[:50]}")
return task_id
def update_task_db(task_id, **kwargs):
"""Aktualisiert einen Task in der Datenbank."""
con = sqlite3.connect(EMAIL_JOURNAL_DB)
updates = []
params = []
for key, value in kwargs.items():
if key in ['status', 'response', 'agent_key', 'assigned_agent', 'completed_at']:
updates.append(f"{key} = ?")
params.append(value)
if 'status' in kwargs and kwargs['status'] == 'completed' and 'completed_at' not in kwargs:
updates.append("completed_at = ?")
params.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
if not updates:
con.close()
return
params.append(task_id)
query = f"UPDATE tasks SET {', '.join(updates)} WHERE id = ?"
con.execute(query, params)
con.commit()
con.close()
def delete_task(task_id):
"""Löscht einen Task aus der Datenbank."""
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
con.commit()
con.close()
logging.info(f"[DB] Task #{task_id} gelöscht")
def cleanup_old_tasks(days=7):
"""Löscht completed Tasks älter als X Tage."""
con = sqlite3.connect(EMAIL_JOURNAL_DB)
cutoff = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
cutoff = cutoff.replace(day=cutoff.day - days)
cutoff_str = cutoff.strftime('%Y-%m-%d %H:%M:%S')
cursor = con.execute(
"DELETE FROM tasks WHERE status = 'completed' AND completed_at < ?",
(cutoff_str,)
)
deleted = cursor.rowcount
con.commit()
con.close()
if deleted > 0:
logging.info(f"[DB] {deleted} alte Tasks gelöscht (älter als {days} Tage)")
return deleted
# ── Team Members Database Functions ─────────────────────────────────────────
def get_team_members(active_only=True):
"""Lädt Team-Members aus der Datenbank."""
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.row_factory = sqlite3.Row
if active_only:
rows = con.execute("SELECT * FROM team_members WHERE active = 1 ORDER BY name").fetchall()
else:
rows = con.execute("SELECT * FROM team_members ORDER BY name").fetchall()
con.close()
return [dict(row) for row in rows]
def add_team_member(name, role, responsibilities='', email='', telegram_id=None, phone=''):
"""Fügt ein Team-Mitglied hinzu."""
con = sqlite3.connect(EMAIL_JOURNAL_DB)
created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor = con.execute("""
INSERT INTO team_members (name, role, responsibilities, email, telegram_id, phone, active, created_at)
VALUES (?, ?, ?, ?, ?, ?, 1, ?)
""", (name, role, responsibilities, email, telegram_id, phone, created_at))
member_id = cursor.lastrowid
con.commit()
con.close()
logging.info(f"[DB] Team-Member '{name}' hinzugefügt")
return member_id
def get_team_member_summary():
"""Erstellt eine Text-Zusammenfassung aller Team-Members für den Orchestrator."""
members = get_team_members(active_only=True)
if not members:
return "Keine Team-Mitglieder definiert."
summary = "## Reale Team-Mitglieder:\n\n"
for member in members:
summary += f"**{member['name']}** ({member['role']})\n"
if member['responsibilities']:
summary += f" Verantwortlich für: {member['responsibilities']}\n"
contact_methods = []
if member['email']:
contact_methods.append(f"Email: {member['email']}")
if member['telegram_id']:
contact_methods.append(f"Telegram-ID: {member['telegram_id']}")
if member['phone']:
contact_methods.append(f"Tel: {member['phone']}")
if contact_methods:
summary += f" Kontakt: {', '.join(contact_methods)}\n"
summary += "\n"
return summary
# ── Agent Message Queue ─────────────────────────────────────────────────────
# Ermöglicht Agent-zu-Agent Kommunikation
agent_messages = [] # Liste von {id, from_agent, to_agent, message_type, content, status, created, response}
@ -1577,9 +1796,102 @@ def start_task_beat():
logger.info("[TaskBeat] Daemon-Thread gestartet.")
# ── Orchestrator Beat ───────────────────────────────────────────────────────
def orchestrator_beat():
"""
Orchestrator Beat: Läuft alle 30 Minuten und prüft:
- Offene Tasks ohne Fortschritt
- Blockierte Tasks die Aufmerksamkeit brauchen
- Wichtige Entscheidungen die ausstehen
- Team-Member Benachrichtigungen
"""
logger.info("[OrchestratorBeat] Hintergrund-Thread gestartet.")
while True:
try:
# Alle 30 Minuten prüfen
time.sleep(1800)
logger.info("[OrchestratorBeat] Starte Überprüfung...")
# 1. Finde Tasks die länger als 2 Stunden pending sind
pending_tasks = get_tasks(status='pending')
old_pending = []
for task in pending_tasks:
try:
created = datetime.strptime(task['created_at'], '%Y-%m-%d %H:%M:%S')
age_hours = (datetime.now() - created).total_seconds() / 3600
if age_hours > 2:
old_pending.append((task, age_hours))
except:
pass
# 2. Finde in_progress Tasks die länger als 4 Stunden laufen
in_progress = get_tasks(status='in_progress')
stuck_tasks = []
for task in in_progress:
try:
created = datetime.strptime(task['created_at'], '%Y-%m-%d %H:%M:%S')
age_hours = (datetime.now() - created).total_seconds() / 3600
if age_hours > 4:
stuck_tasks.append((task, age_hours))
except:
pass
# 3. Wenn es Probleme gibt, frage den Orchestrator
if old_pending or stuck_tasks:
summary = "## Orchestrator Beat - Status-Check\n\n"
if old_pending:
summary += f"### ⏳ {len(old_pending)} Alte Pending Tasks:\n"
for task, hours in old_pending[:5]: # Max 5
summary += f"- Task #{task['id']}: {task['title'][:60]} ({hours:.1f}h alt)\n"
summary += "\n"
if stuck_tasks:
summary += f"### 🚫 {len(stuck_tasks)} Blockierte Tasks:\n"
for task, hours in stuck_tasks[:5]:
summary += f"- Task #{task['id']}: {task['title'][:60]} ({hours:.1f}h in Bearbeitung)\n"
summary += "\n"
summary += """
### Aufgabe:
1. Analysiere die Situation
2. Entscheide ob Tasks eskaliert werden müssen
3. Wenn ja: Kontaktiere relevante Team-Members via Email oder Telegram
4. Verwende @SEND_EMAIL oder @SEND_TELEGRAM um Benachrichtigungen zu senden
Nutze @READ_KNOWLEDGE um Kontext zu holen falls nötig.
"""
# Orchestrator fragen
logger.info("[OrchestratorBeat] Frage Orchestrator zu %d Problemen", len(old_pending) + len(stuck_tasks))
response = execute_agent_task('orchestrator', summary)
# Response loggen
if response:
logger.info("[OrchestratorBeat] Orchestrator-Antwort: %s", response[:200])
# TODO: Parse @SEND_EMAIL und @SEND_TELEGRAM Kommandos
# und sende tatsächlich Benachrichtigungen
except Exception as e:
logger.error("[OrchestratorBeat] Fehler: %s", str(e))
def start_orchestrator_beat():
"""Startet den Orchestrator Beat als Daemon-Thread."""
beat_thread = threading.Thread(target=orchestrator_beat, name='OrchestratorBeat', daemon=True)
beat_thread.start()
logger.info("[OrchestratorBeat] Daemon-Thread gestartet.")
# Poller beim App-Start starten
start_email_poller()
start_task_beat()
start_orchestrator_beat()
@app.route('/')
@ -2444,7 +2756,44 @@ def distribute_tasks():
})
def init_default_team_members():
"""Fügt Standard-Team-Members hinzu, falls keine existieren."""
existing = get_team_members(active_only=False)
if len(existing) == 0:
add_team_member(
name="Eric Fischer",
role="Projektleiter & Event-Koordinator",
responsibilities="Gesamtkoordination, Budget-Verwaltung, Location-Management",
email="eric.fischer@signtime.media",
telegram_id=None, # Wird bei Bedarf ergänzt
phone=""
)
add_team_member(
name="Georg Tschare",
role="Content & Marketing",
responsibilities="Marketing, Social Media, Presse, Kommunikation",
email="georg.tschare@signtime.media",
telegram_id=None,
phone=""
)
add_team_member(
name="Piotr Dyderski",
role="System-Administrator & Tech-Lead",
responsibilities="Technische Infrastruktur, AI-Agenten, Automatisierung",
email="p.dyderski@live.at",
telegram_id=None,
phone=""
)
logging.info("[DB] Standard Team-Members initialisiert")
if __name__ == '__main__':
# Standard Team-Members initialisieren
init_default_team_members()
# Alte Tasks aufräumen (älter als 7 Tage)
cleanup_old_tasks(days=7)
# Telegram Bot starten
start_telegram_thread()