frankenbot/app.py
pdyde ca820d20db fix: Code-Quality und Security-Verbesserungen
Security Fixes:
- Fix XSS vulnerability in orchestrator.html (escapeHtml für user input)
- Verbessertes Error-Handling: 4 bare except clauses durch spezifische Exception-Typen ersetzt

Code Quality:
- Logging für alle Exception-Handler hinzugefügt
- Timeout für Agent-Tasks von 300s auf 600s erhöht (10 Min)
- Bessere Kommentare für Exception-Handling

Performance:
- Wissensdatenbank aus Systemprompt entfernt
- Agents nutzen @READ_KNOWLEDGE für on-demand Zugriff
- Reduziert Prompt-Größe um ~15KB pro Task

UI Improvements (aus vorherigem Work):
- Tasks: Auto-Refresh Info statt Toggle
- Tasks: Status-Anzeigen statt manuelle Buttons
- Konsistentes Auto-Refresh (15s) wenn Tasks aktiv
2026-02-21 12:36:24 +01:00

2129 lines
84 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import json
import sqlite3
import subprocess
import imaplib
import smtplib
import email
import threading
import time
import logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import decode_header
from flask import Flask, render_template, request, redirect, url_for, session, flash, Response, send_from_directory, jsonify
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
# ── Agent configuration ───────────────────────────────────────────────────────
# Per-agent settings (currently the model choice) live in agent_config.json
# next to this file; each agent also gets its own directory under agents/.
AGENT_CONFIG_FILE = os.path.join(os.path.dirname(__file__), 'agent_config.json')
AGENTS_BASE_DIR = os.path.join(os.path.dirname(__file__), 'agents')
# Module-level cache for the model list (filled by get_available_models)
_available_models_cache = None  # dict with 'models'/'grouped'/'count', or None
_models_cache_time = None  # time.time() of the last refresh, or None
MODELS_CACHE_TTL = 3600 # cache the model list for 1 hour
# ── Agent Memory System ────────────────────────────────────────────────────────
def ensure_agent_structure(agent_key):
    """Make sure the directory layout for an agent exists.

    Creates agents/<agent_key>/ together with its work/ and memory/
    subdirectories (idempotent) and returns their paths.
    """
    base = os.path.join(AGENTS_BASE_DIR, agent_key)
    layout = {
        'agent_dir': base,
        'work_dir': os.path.join(base, 'work'),
        'memory_dir': os.path.join(base, 'memory'),
    }
    for path in layout.values():
        os.makedirs(path, exist_ok=True)
    return layout
def get_agent_memory(agent_key, memory_type='tasks'):
    """Load an agent's memories from its JSON file.

    memory_type selects the file (memory/<memory_type>.json):
      - 'tasks': completed tasks
      - 'notes': notes
      - 'conversations': conversations
      - 'research': research results

    Always returns a list: a missing, unreadable or malformed file — and a
    hand-edited file containing a non-list value — all yield [] so callers
    can append unconditionally.
    """
    dirs = ensure_agent_structure(agent_key)
    memory_file = os.path.join(dirs['memory_dir'], f'{memory_type}.json')
    if not os.path.exists(memory_file):
        return []
    try:
        with open(memory_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (json.JSONDecodeError, IOError, OSError) as e:
        logging.warning(f"Fehler beim Laden von {memory_file}: {e}")
        return []
    # Guard against a file whose top-level value is not a list
    return data if isinstance(data, list) else []
def add_agent_memory(agent_key, memory_type, entry):
    """Append a memory entry for an agent and persist the file.

    entry should be a dict with at least:
      - timestamp: ISO format (added automatically when missing)
      - title: short description
      - content: detailed content
      - metadata: extra info (optional)

    Only the most recent 100 entries are kept. Returns the stored entry.
    """
    dirs = ensure_agent_structure(agent_key)
    memory_file = os.path.join(dirs['memory_dir'], f'{memory_type}.json')
    memories = get_agent_memory(agent_key, memory_type)
    # Add a timestamp when the caller did not provide one
    if 'timestamp' not in entry:
        entry['timestamp'] = datetime.now().isoformat()
    # Assign a unique id. len(memories)+1 would produce duplicate ids once
    # old entries have been truncated away, so base it on the max existing id.
    entry['id'] = max((m.get('id', 0) for m in memories), default=0) + 1
    memories.append(entry)
    # Keep only the latest 100 entries
    memories = memories[-100:]
    with open(memory_file, 'w', encoding='utf-8') as f:
        json.dump(memories, f, indent=2, ensure_ascii=False)
    return entry
def get_agent_work_files(agent_key):
    """Return metadata for every file in an agent's work/ directory, newest first."""
    work_dir = ensure_agent_structure(agent_key)['work_dir']
    entries = []
    if os.path.exists(work_dir):
        for name in os.listdir(work_dir):
            full_path = os.path.join(work_dir, name)
            if not os.path.isfile(full_path):
                continue
            info = os.stat(full_path)
            entries.append({
                'name': name,
                'size': info.st_size,
                'modified': datetime.fromtimestamp(info.st_mtime).isoformat(),
                'path': full_path,
            })
    entries.sort(key=lambda e: e['modified'], reverse=True)
    return entries
def get_agent_memory_summary(agent_key):
    """Build a short summary of recent tasks and notes for the system prompt."""
    lines = []
    recent_tasks = get_agent_memory(agent_key, 'tasks')
    recent_notes = get_agent_memory(agent_key, 'notes')
    if recent_tasks:
        lines.append("## Letzte Tasks (letzte 5)")
        for entry in recent_tasks[-5:]:
            lines.append(f"- [{entry.get('timestamp', 'N/A')}] {entry.get('title', 'Unbekannt')}")
            if entry.get('result'):
                lines.append(f" Ergebnis: {entry.get('result', '')[:200]}")
    if recent_notes:
        lines.append("\n## Wichtige Notizen")
        for note in recent_notes[-5:]:
            lines.append(f"- {note.get('content', '')[:100]}")
    return '\n'.join(lines) if lines else "Keine Erinnerungen vorhanden."
def get_available_models(force_refresh=False):
    """Return the AI models known to the opencode CLI, with caching.

    Runs `opencode models` (one model id per line) and groups the result by
    its "provider/" prefix. The result is cached for MODELS_CACHE_TTL
    seconds; on any failure a hardcoded fallback list is cached and returned.
    """
    global _available_models_cache, _models_cache_time
    # Serve from cache while it is still fresh
    if not force_refresh and _available_models_cache is not None and _models_cache_time is not None:
        if (time.time() - _models_cache_time) < MODELS_CACHE_TTL:
            return _available_models_cache
    try:
        # Ask the opencode CLI for its model list
        result = subprocess.run(
            ['opencode', 'models'],
            capture_output=True,
            text=True,
            timeout=10
        )
        if result.returncode == 0:
            models = [line.strip() for line in result.stdout.strip().split('\n') if line.strip()]
            # Group by provider ("provider/model")
            grouped = {}
            for model in models:
                if '/' in model:
                    provider = model.split('/', 1)[0]
                    grouped.setdefault(provider, []).append(model)
            _available_models_cache = {
                'models': models,
                'grouped': grouped,
                'count': len(models)
            }
            _models_cache_time = time.time()
            return _available_models_cache
    except Exception as e:
        logging.warning(f"[ModelLoader] Fehler beim Laden der Modelle: {e}")
    # Fallback to hardcoded models when loading fails
    fallback = {
        'models': [
            'opencode/big-pickle',
            'opencode/gpt-5-nano',
            'opencode/glm-5-free',
            'opencode/minimax-m2.5-free',
            'opencode/trinity-large-preview-free'
        ],
        'grouped': {
            'opencode': [
                'opencode/big-pickle',
                'opencode/gpt-5-nano',
                'opencode/glm-5-free',
                'opencode/minimax-m2.5-free',
                'opencode/trinity-large-preview-free'
            ]
        },
        'count': 5
    }
    _available_models_cache = fallback
    _models_cache_time = time.time()
    return fallback
def get_agent_config():
    """Load the agent configuration from AGENT_CONFIG_FILE.

    Returns {} when the file is missing, unreadable, or contains invalid JSON.
    """
    if os.path.exists(AGENT_CONFIG_FILE):
        try:
            with open(AGENT_CONFIG_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError, OSError) as e:
            logging.warning(f"Fehler beim Laden von {AGENT_CONFIG_FILE}: {e}")
    return {}
def get_agent_model(agent_key):
    """Return the model configured for an agent (default: opencode/big-pickle)."""
    agent_settings = get_agent_config().get(agent_key, {})
    return agent_settings.get('model', 'opencode/big-pickle')
def save_agent_config(agent_key, model):
    """Persist the model choice for one agent in AGENT_CONFIG_FILE."""
    config = get_agent_config()
    config.setdefault(agent_key, {})['model'] = model
    with open(AGENT_CONFIG_FILE, 'w', encoding='utf-8') as f:
        # ensure_ascii=False keeps the file consistent with the other JSON
        # files this module writes (see add_agent_memory)
        json.dump(config, f, indent=2, ensure_ascii=False)
# ── Email journal (SQLite) ──────────────────────────────────────────────────
# Records every email we have seen, keyed by Message-ID, together with its
# processing status. Prevents emails from getting lost when the app crashes.
EMAIL_JOURNAL_DB = os.path.join(os.path.dirname(__file__), 'email_journal.db')
def init_journal():
    """Create the email_journal table if it does not exist yet."""
    con = sqlite3.connect(EMAIL_JOURNAL_DB)
    try:
        con.execute("""
            CREATE TABLE IF NOT EXISTS email_journal (
                message_id TEXT PRIMARY KEY,
                imap_uid TEXT,
                sender TEXT,
                subject TEXT,
                received_at TEXT,
                status TEXT DEFAULT 'pending',
                agent_key TEXT,
                updated_at TEXT
            )
        """)
        con.commit()
    finally:
        # Always release the connection, even when the DDL fails
        con.close()
def journal_seen(message_id: str) -> bool:
    """True if this Message-ID is already in the journal (any status)."""
    con = sqlite3.connect(EMAIL_JOURNAL_DB)
    try:
        row = con.execute("SELECT status FROM email_journal WHERE message_id=?", (message_id,)).fetchone()
    finally:
        # Close even when the query raises (e.g. locked/corrupt DB)
        con.close()
    return row is not None
def journal_is_done(message_id: str) -> bool:
    """True if the journal status is 'completed', 'skipped' or 'error'."""
    con = sqlite3.connect(EMAIL_JOURNAL_DB)
    try:
        row = con.execute("SELECT status FROM email_journal WHERE message_id=?", (message_id,)).fetchone()
    finally:
        # Close even when the query raises
        con.close()
    return row is not None and row[0] in ('completed', 'skipped', 'error')
def journal_is_stale(message_id: str) -> bool:
    """True if the entry is still in-flight AND older than failsafe_window.

    Crash failsafe: an entry that is not yet 'completed'/'skipped'/'error'
    and whose updated_at is older than poller_settings['failsafe_window']
    seconds is considered abandoned and may be re-processed.
    """
    con = sqlite3.connect(EMAIL_JOURNAL_DB)
    try:
        row = con.execute("SELECT status, updated_at FROM email_journal WHERE message_id=?", (message_id,)).fetchone()
    finally:
        con.close()
    if row is None or row[0] in ('completed', 'skipped', 'error'):
        return False
    try:
        updated = datetime.fromisoformat(row[1])
        age = (datetime.now() - updated).total_seconds()
        return age > poller_settings['failsafe_window']
    except (ValueError, TypeError) as e:
        # Malformed or NULL timestamp: treat as not stale instead of crashing,
        # but leave a trace instead of swallowing silently
        logging.warning(f"journal_is_stale: invalid timestamp for {message_id}: {e}")
        return False
def journal_insert(message_id: str, imap_uid: str, sender: str, subject: str, status: str, agent_key: str = ''):
    """Insert a journal row; an already-journalled Message-ID is kept as-is."""
    now = datetime.now().isoformat()
    con = sqlite3.connect(EMAIL_JOURNAL_DB)
    try:
        # INSERT OR IGNORE: the first record for a Message-ID wins
        con.execute("""
            INSERT OR IGNORE INTO email_journal
            (message_id, imap_uid, sender, subject, received_at, status, agent_key, updated_at)
            VALUES (?,?,?,?,?,?,?,?)
        """, (message_id, imap_uid, sender, subject, now, status, agent_key, now))
        con.commit()
    finally:
        # Close even when the insert raises
        con.close()
def journal_update(message_id: str, status: str):
    """Update the status (and updated_at) of a journalled email."""
    now = datetime.now().isoformat()
    con = sqlite3.connect(EMAIL_JOURNAL_DB)
    try:
        con.execute("UPDATE email_journal SET status=?, updated_at=? WHERE message_id=?",
                    (status, now, message_id))
        con.commit()
    finally:
        # Close even when the update raises
        con.close()
# Create the journal table before anything else touches the DB
init_journal()
# ────────────────────────────────────────────────────────────────────────────
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Keyword → agent routing table used by delegate_to_agent(): a prompt that
# contains one of these (German) keywords is dispatched to the matching agent.
AGENT_KEYWORDS = {
    'researcher': ['recherche', 'recherchieren', 'suchen', 'informationen', 'trends', 'forschung', 'web'],
    'zusammenfasser': ['zusammenfassung', 'zusammenfassen', 'konsolidieren', 'übersicht'],
    'tax_advisor': ['steuer', 'steuerrecht', 'umsatzsteuer', 'gemeinnützig', 'abgabe', 'finanzamt', '§4a', ' §4a', '§ 4a', 'mehrwertsteuer', 'mwst', 'ust', 'spenden'],
    'location_manager': ['rathaus', 'location', 'ort', 'veranstaltungsort', 'raum', 'festsaal', 'wien'],
    'program_manager': ['programm', 'ablauf', 'programmablauf', 'tanz', 'unterhaltung', 'awards', 'tombola', 'reden'],
    'catering_manager': ['catering', 'essen', 'speisen', 'getränke', 'menü', 'küche', 'buffet', 'service', 'gastronomie'],
    'musik_rechte_advisor': ['musik', 'akm', 'gema', 'lizenz', 'rechte', 'urheber', 'copyright', 'verwertungsgesellschaft'],
    'document_editor': ['dokument', 'vertrag', 'brief', 'text', 'bearbeiten', 'erstellen', 'schreiben'],
}
# NOTE(review): hardcoded secret key — sessions can be forged by anyone who
# reads the source; should come from the environment in production.
app.secret_key = 'agent-orchestration-secret-key-2026'
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16 MiB upload limit
# Email Configuration - loaded AFTER load_dotenv()
EMAIL_CONFIG = {
    'imap_server': os.getenv('IMAP_SERVER', 'sslin.de'),
    'smtp_server': os.getenv('SMTP_SERVER', 'sslout.de'),
    'email_address': os.getenv('EMAIL_ADDRESS', ''),
    # strip accidental quoting from .env values
    'email_password': os.getenv('EMAIL_PASSWORD', '').strip('"').strip("'"),
    'imap_port': int(os.getenv('IMAP_PORT', '993')),
    'smtp_port': int(os.getenv('SMTP_PORT', '465'))
}
# Whitelist: Only these senders get processed by the poller
EMAIL_WHITELIST = [
    'eric.fischer@signtime.media',
    'p.dyderski@live.at',
    'georg.tschare@gmail.com',
    'georg.tschare@signtime.media',
]
EMAIL_WHITELIST_DOMAINS = ['diversityball.at']
# ── Poller settings (changeable at runtime via /settings) ────────────────────
# poll_interval: how often the IMAP poller runs (seconds)
# failsafe_window: how long a task may run before the failsafe triggers (seconds)
poller_settings = {
    'poll_interval': 120, # 2 minutes
    'failsafe_window': 600, # 10 minutes (generous — agents often need 3-5 min)
}
# Email processing log (max 50 entries), guarded by a lock for the poller thread
email_log = []
email_log_lock = threading.Lock()
# Task queue for email tasks, likewise shared with the background thread
task_queue = []
task_queue_lock = threading.Lock()
def get_agent_prompt(agent_key):
    """Read an agent's system prompt, prefixed by its personality if present."""
    agent_dir = os.path.join(os.path.dirname(__file__), 'agents', agent_key)

    def _read(filename):
        # Return the file's content, or '' when the file does not exist
        path = os.path.join(agent_dir, filename)
        if not os.path.exists(path):
            return ""
        with open(path, 'r', encoding='utf-8') as fh:
            return fh.read()

    prompt_content = _read('systemprompt.md')
    personality_content = _read('personality.md').strip()
    # Put the personality before the system prompt when one exists
    if personality_content:
        return f"{personality_content}\n\n---\n\n{prompt_content}"
    return prompt_content
def execute_agent_task(agent_key, user_prompt, extra_context=""):
    """Run a real agent task via the opencode CLI.

    Builds a single combined message (agent system prompt + memory summary +
    collaboration commands + the user prompt) and executes it with the
    agent's configured model inside the agent's work directory.

    Returns the agent's text answer, or a '⚠️ ...' error string on failure
    (missing prompt, missing CLI, timeout, or any other error) — never raises.
    """
    system_prompt = get_agent_prompt(agent_key)
    if not system_prompt:
        return f"⚠️ Kein System-Prompt für Agent '{agent_key}' gefunden."
    # Make sure the agent's directory layout exists
    dirs = ensure_agent_structure(agent_key)
    work_dir = dirs['work_dir']
    # Load the memory summary
    memory_summary = get_agent_memory_summary(agent_key)
    # Knowledge-base path (the agent fetches what it needs on demand)
    kb_file = os.path.join(os.path.dirname(__file__), 'diversityball_knowledge.md')
    # System prompt = agent role + memory + commands (WITHOUT the big knowledge base!)
    full_system = f"""{system_prompt}
## Deine Erinnerungen:
{memory_summary}
## Wissensdatenbank:
Die Wissensdatenbank liegt unter: {kb_file}
Wenn du spezifische Informationen brauchst:
@READ_KNOWLEDGE
Topic: [Thema, z.B. "Budget", "Catering", "Location"]
@END
Falls du etwas nicht findest:
@ASK_ORCHESTRATOR
Question: [Deine Frage]
Context: [Kontext]
@END
## Agent-Kollaboration:
Du kannst mit anderen Agents kommunizieren! Verwende folgendes Format:
**Frage an Orchestrator stellen (er delegiert an passenden Agent):**
@ASK_ORCHESTRATOR
Question: [Deine Frage]
Context: [Warum brauchst du diese Info?]
@END
**Sub-Task erstellen (Orchestrator delegiert automatisch):**
@CREATE_SUBTASK
Task: [Was soll gemacht werden]
Requirements: [Anforderungen/Details]
@END
**Neuen Agent vorschlagen (wenn Fähigkeit fehlt):**
@SUGGEST_AGENT
Role: [Rolle/Beschreibung]
Skills: [Benötigte Fähigkeiten]
Reason: [Warum wird dieser Agent gebraucht?]
@END
Der Orchestrator kümmert sich um die Zuweisung und Kommunikation!
## Wichtig:
- Du hast Zugriff auf das Internet via WebFetch-Tool - nutze es aktiv!
- Du kannst Emails versenden - nutze send_email wenn beauftragt
- Dein Arbeitsverzeichnis: {work_dir}
- Speichere ALLE erstellten Dateien in diesem Verzeichnis!
- Verwende absolute Pfade für Dateien: {work_dir}/dateiname.ext
- Liefere immer eine vollständige, direkt verwertbare Antwort
{extra_context}"""
    # System prompt + user prompt together as one message
    # (--prompt flag returns an empty answer, hence everything in one message)
    combined_message = f"{full_system}\n\n---\n\n{user_prompt}"
    # Model from the per-agent configuration
    model = get_agent_model(agent_key)
    try:
        result = subprocess.run(
            ['opencode', 'run', '--model', model, '--format', 'json', combined_message],
            capture_output=True,
            text=True,
            timeout=600, # 10 minutes for complex tasks
            cwd=work_dir # the agent works inside its own work directory
        )
        if result.returncode == 0:
            # opencode streams JSON objects, one per line; collect the text parts
            lines = result.stdout.strip().split('\n')
            response_text = ""
            for line in lines:
                try:
                    data = json.loads(line)
                    if data.get('part', {}).get('type') == 'text':
                        response_text += data.get('part', {}).get('text', '')
                except (json.JSONDecodeError, KeyError):
                    # Ignore invalid JSON lines in streaming output
                    pass
            # Parse agent commands (task delegation, questions, agent creation)
            if response_text:
                parse_agent_commands(agent_key, response_text)
            return response_text if response_text else "⚠️ Keine Antwort erhalten."
        else:
            return f"⚠️ Fehler: {result.stderr}"
    except FileNotFoundError:
        return "⚠️ OpenCode CLI nicht gefunden. Bitte installiere opencode."
    except subprocess.TimeoutExpired:
        return "⚠️ Timeout - Agentenantwort dauert zu lange."
    except Exception as e:
        return f"⚠️ Fehler: {str(e)}"
# Make sure the upload directory exists at import time
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
def load_agents_from_directories():
    """Discover agents dynamically from the agents/ directory.

    Each subdirectory is one agent; its description is the first line of
    systemprompt.md when that file exists.
    """
    agents_dir = os.path.join(os.path.dirname(__file__), 'agents')
    if not os.path.exists(agents_dir):
        return {}
    discovered = {}
    for agent_name in os.listdir(agents_dir):
        agent_path = os.path.join(agents_dir, agent_name)
        if not os.path.isdir(agent_path):
            continue
        description = f"Agent: {agent_name}"
        prompt_file = os.path.join(agent_path, 'systemprompt.md')
        if os.path.exists(prompt_file):
            with open(prompt_file, 'r', encoding='utf-8') as f:
                content = f.read()
            first_line = content.split('\n')[0] if content else ""
            description = first_line.replace('#', '').strip() or f"Agent: {agent_name}"
        discovered[agent_name] = {
            'name': agent_name.replace('_', ' ').title(),
            'description': description,
            'status': 'active',
        }
    return discovered
# Agent registry, discovered once at import time (refreshed by create_new_agent)
AGENTS = load_agents_from_directories()
# In-memory application state (lost on restart)
tasks = []
chat_history = []
orchestrator_chat = []
# ── Agent Message Queue ─────────────────────────────────────────────────────
# Enables agent-to-agent communication
agent_messages = [] # list of {id, from_agent, to_agent, message_type, content, status, created, response}
def send_agent_message(from_agent, to_agent, message_type, content):
    """Send a message from one agent to another.

    message_type can be:
      - 'task_request': ask the target agent to handle a task
      - 'question': ask a question
      - 'info': share information
      - 'create_agent': request creation of a new agent

    Returns the stored message dict (status 'pending', response None).
    """
    message = {
        'id': len(agent_messages) + 1,
        'from_agent': from_agent,
        'to_agent': to_agent,
        'message_type': message_type,
        'content': content,
        'status': 'pending',
        'created': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'response': None
    }
    agent_messages.append(message)
    # The separator between the two agent names was missing (likely a stripped
    # Unicode arrow); use an ASCII arrow so the log stays readable.
    logger.info(f"[AgentMsg] {from_agent} -> {to_agent}: {message_type}")
    return message
def get_agent_messages(agent_key, status='pending'):
    """Return all messages addressed to agent_key with the given status."""
    matches = []
    for msg in agent_messages:
        if msg['to_agent'] == agent_key and msg['status'] == status:
            matches.append(msg)
    return matches
def respond_to_message(message_id, response):
    """Record an agent's response to a message; True when the id was found."""
    target = next((m for m in agent_messages if m['id'] == message_id), None)
    if target is None:
        return False
    target['response'] = response
    target['status'] = 'answered'
    logger.info(f"[AgentMsg] Message #{message_id} beantwortet")
    return True
def parse_agent_commands(agent_key, response_text):
    """Parse an agent's answer for orchestrator commands and execute them.

    Supported commands (each terminated by @END): @ASK_ORCHESTRATOR,
    @CREATE_SUBTASK, @SUGGEST_AGENT and @READ_KNOWLEDGE. Matching commands
    append new orchestrator tasks to the shared `tasks` list.
    """
    # `re` is already imported at module level — the previous local import
    # was redundant and has been removed.
    # ASK_ORCHESTRATOR: the agent asks the orchestrator a question
    ask_requests = re.findall(
        r'@ASK_ORCHESTRATOR\s*\nQuestion:\s*([^\n]+)\s*\nContext:\s*([^@]+)@END',
        response_text,
        re.DOTALL
    )
    for question, context in ask_requests:
        # Create a task so the orchestrator answers (or delegates) the question
        new_task = {
            'id': len(tasks) + 1,
            'title': f"Frage von {agent_key}: {question.strip()[:80]}",
            'description': f"""Ein Agent braucht Hilfe!
**Von:** {agent_key}
**Frage:** {question.strip()}
**Kontext:** {context.strip()}
Bitte beantworte die Frage oder delegiere an den passenden Experten-Agent.
Die Antwort wird an {agent_key} zurückgegeben.""",
            'assigned_agent': 'Orchestrator',
            'agent_key': 'orchestrator',
            'status': 'pending',
            'created': datetime.now().strftime('%Y-%m-%d %H:%M'),
            'type': 'agent_question',
            'from_agent': agent_key,
            'return_to': agent_key
        }
        tasks.append(new_task)
        logger.info(f"[AgentCmd] {agent_key} fragt Orchestrator: {question.strip()[:50]}")
    # CREATE_SUBTASK: the agent wants to create a subtask
    subtask_requests = re.findall(
        r'@CREATE_SUBTASK\s*\nTask:\s*([^\n]+)\s*\nRequirements:\s*([^@]+)@END',
        response_text,
        re.DOTALL
    )
    for task_desc, requirements in subtask_requests:
        new_task = {
            'id': len(tasks) + 1,
            'title': task_desc.strip()[:100],
            'description': f"Von {agent_key} angefordert:\n{requirements.strip()}",
            'assigned_agent': 'Orchestrator',
            'agent_key': 'orchestrator',
            'status': 'pending',
            'created': datetime.now().strftime('%Y-%m-%d %H:%M'),
            'type': 'agent_subtask',
            'from_agent': agent_key
        }
        tasks.append(new_task)
        logger.info(f"[AgentCmd] {agent_key} erstellt Subtask via Orchestrator")
    # SUGGEST_AGENT: the agent proposes a new agent
    suggest_requests = re.findall(
        r'@SUGGEST_AGENT\s*\nRole:\s*([^\n]+)\s*\nSkills:\s*([^\n]+)\s*\nReason:\s*([^@]+)@END',
        response_text,
        re.DOTALL
    )
    for role, skills, reason in suggest_requests:
        # Derive an agent key from the role name
        agent_key_suggestion = role.lower().replace(' ', '_').replace('-', '_')
        # Task for the orchestrator to decide whether to create the agent
        new_task = {
            'id': len(tasks) + 1,
            'title': f"Agent-Vorschlag: {role.strip()}",
            'description': f"""Agent {agent_key} schlägt vor, einen neuen Agent zu erstellen:
**Rolle:** {role.strip()}
**Fähigkeiten:** {skills.strip()}
**Begründung:** {reason.strip()}
Bitte entscheide ob dieser Agent erstellt werden soll.""",
            'assigned_agent': 'Orchestrator',
            'agent_key': 'orchestrator',
            'status': 'pending',
            'created': datetime.now().strftime('%Y-%m-%d %H:%M'),
            'type': 'agent_suggestion',
            'from_agent': agent_key,
            'suggested_agent': agent_key_suggestion,
            'suggested_role': role.strip(),
            'suggested_skills': skills.strip()
        }
        tasks.append(new_task)
        logger.info(f"[AgentCmd] {agent_key} schlägt neuen Agent vor: {role.strip()}")
    # READ_KNOWLEDGE: the agent wants to search the knowledge base
    read_kb_requests = re.findall(
        r'@READ_KNOWLEDGE\s*\nTopic:\s*([^@]+)@END',
        response_text,
        re.DOTALL
    )
    # NOTE(review): the matched sections are currently only logged — nothing
    # is fed back to the agent yet. TODO: return them as context.
    if read_kb_requests:
        kb_file = os.path.join(os.path.dirname(__file__), 'diversityball_knowledge.md')
        if os.path.exists(kb_file):
            with open(kb_file, 'r', encoding='utf-8') as f:
                kb_content = f.read()
            for topic in read_kb_requests:
                topic_clean = topic.strip().lower()
                logger.info(f"[AgentCmd] {agent_key} liest Wissensdatenbank: {topic_clean}")
                # Simple search: collect every line mentioning the topic
                # TODO: could later be improved with vector search
                relevant_sections = [
                    line for line in kb_content.split('\n')
                    if topic_clean in line.lower()
                ]
                if relevant_sections:
                    logger.info(f"[AgentCmd] {len(relevant_sections)} relevante Zeilen gefunden")
def create_new_agent(agent_key, role, skills):
    """Create a new agent directory with default prompt files.

    Returns False when the agent already exists, True after creation.
    Refreshes the global AGENTS registry afterwards.
    """
    agent_dir = os.path.join(AGENTS_BASE_DIR, agent_key)
    if os.path.exists(agent_dir):
        logger.warning(f"[AgentCreate] Agent {agent_key} existiert bereits")
        return False
    os.makedirs(agent_dir, exist_ok=True)
    systemprompt = f"""# {role}
## Deine Rolle
{role}
## Deine Fähigkeiten
{skills}
## Aufgaben
- Erledige Tasks in deinem Fachgebiet
- Kommuniziere mit anderen Agents wenn nötig
- Dokumentiere deine Arbeit im work-Verzeichnis
"""
    # Default files: system prompt, empty personality, reminder template
    default_files = {
        'systemprompt.md': systemprompt,
        'personality.md': '',
        'reminders.md': f"# Erinnerungen - {agent_key.title()}\n\n## Aktuelle Tasks\n-\n\n## Notizen\n-\n",
    }
    for filename, text in default_files.items():
        with open(os.path.join(agent_dir, filename), 'w', encoding='utf-8') as f:
            f.write(text)
    global AGENTS
    AGENTS = load_agents_from_directories()
    logger.info(f"[AgentCreate] Neuer Agent erstellt: {agent_key}")
    return True
def load_knowledge_base():
    """Return the first 1500 characters of the knowledge base, or ''."""
    kb_path = os.path.join(os.path.dirname(__file__), 'diversityball_knowledge.md')
    if not os.path.exists(kb_path):
        return ""
    with open(kb_path, 'r', encoding='utf-8') as f:
        return f.read()[:1500]
def load_agent_prompts():
    """Return {agent_name: first 500 chars of its system prompt}."""
    prompts = {}
    agents_dir = os.path.join(os.path.dirname(__file__), 'agents')
    if not os.path.exists(agents_dir):
        return prompts
    for agent_name in os.listdir(agents_dir):
        prompt_file = os.path.join(agents_dir, agent_name, 'systemprompt.md')
        if not os.path.exists(prompt_file):
            continue
        with open(prompt_file, 'r', encoding='utf-8') as f:
            prompts[agent_name] = f.read()[:500]
    return prompts
def init_orchestrator_session():
    """Lazily populate the Flask session with the orchestrator context."""
    defaults = {
        'orchestrator_kb': load_knowledge_base,
        'orchestrator_prompts': load_agent_prompts,
        'orchestrator_chat': list,
    }
    for key, factory in defaults.items():
        if key not in session:
            session[key] = factory()
def delegate_to_agent(prompt):
    """Pick the agent whose keyword list matches the prompt; default 'researcher'."""
    lowered = prompt.lower()
    for agent, keywords in AGENT_KEYWORDS.items():
        if any(keyword in lowered for keyword in keywords):
            return agent
    return 'researcher'
def get_uploaded_files():
    """Return upload-folder files as metadata dicts, newest first."""
    upload_dir = app.config['UPLOAD_FOLDER']
    results = []
    if os.path.exists(upload_dir):
        for name in os.listdir(upload_dir):
            filepath = os.path.join(upload_dir, name)
            if not os.path.isfile(filepath):
                continue
            results.append({
                'name': name,
                'size': os.path.getsize(filepath),
                'modified': datetime.fromtimestamp(os.path.getmtime(filepath)).strftime('%Y-%m-%d %H:%M')
            })
    results.sort(key=lambda item: item['modified'], reverse=True)
    return results
def get_email_folder_files():
    """Return files from the emails/ directory, sorted by filename."""
    email_dir = os.path.join(os.path.dirname(__file__), 'emails')
    results = []
    if os.path.exists(email_dir):
        for name in sorted(os.listdir(email_dir)):
            filepath = os.path.join(email_dir, name)
            if not os.path.isfile(filepath):
                continue
            results.append({
                'name': name,
                'size': os.path.getsize(filepath),
                'modified': datetime.fromtimestamp(os.path.getmtime(filepath)).strftime('%Y-%m-%d %H:%M')
            })
    return results
def get_project_files():
    """Return .md/.docx/.txt files from the working directory."""
    base_dir = os.path.dirname(__file__)
    allowed_ext = ('.md', '.docx', '.txt')
    results = []
    for name in sorted(os.listdir(base_dir)):
        filepath = os.path.join(base_dir, name)
        if not (os.path.isfile(filepath) and name.lower().endswith(allowed_ext)):
            continue
        results.append({
            'name': name,
            'size': os.path.getsize(filepath),
            'modified': datetime.fromtimestamp(os.path.getmtime(filepath)).strftime('%Y-%m-%d %H:%M')
        })
    return results
# Email Functions
def get_emails():
    """Fetch the 10 most recent emails from the IMAP inbox.

    Returns a list of dicts (id, subject, from, date, preview), newest first.
    On any failure a single-element list [{'error': ...}] is returned so the
    caller can render the problem instead of crashing.
    """
    try:
        if not EMAIL_CONFIG['email_address'] or not EMAIL_CONFIG['email_password']:
            return []
        mail = imaplib.IMAP4_SSL(EMAIL_CONFIG['imap_server'], EMAIL_CONFIG['imap_port'])
        mail.login(EMAIL_CONFIG['email_address'], EMAIL_CONFIG['email_password'])
        mail.select('INBOX')
        status, messages = mail.search(None, 'ALL')
        email_ids = messages[0].split()[-10:] # Last 10 emails
        emails = []
        # reversed() so the newest email comes first in the UI
        for email_id in reversed(email_ids):
            status, msg_data = mail.fetch(email_id, '(RFC822)')
            msg = email.message_from_bytes(msg_data[0][1])
            emails.append({
                'id': email_id.decode(),
                'subject': msg.get('Subject', '(Kein Betreff)'),
                'from': msg.get('From', '(Unbekannt)'),
                'date': msg.get('Date', ''),
                'preview': get_email_preview(msg)
            })
        mail.close()
        mail.logout()
        return emails
    except Exception as e:
        # Deliberately broad: any IMAP/parse failure becomes a UI-visible entry
        return [{'error': str(e)}]
def get_email_preview(msg):
    """Extract a short (max 100 chars) plain-text preview from an email.

    Walks all MIME parts — including parts nested inside sub-multiparts,
    which a flat get_payload() scan would miss — and returns the first
    text/plain body. Returns '(Vorschau nicht verfügbar)' on decode errors
    and '' when no text part exists.
    """
    try:
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_type() == 'text/plain':
                    payload = part.get_payload(decode=True)
                    # decode=True can yield None for empty/odd parts
                    if payload is not None:
                        return payload.decode('utf-8', errors='ignore')[:100]
        else:
            payload = msg.get_payload(decode=True)
            if payload is not None:
                return payload.decode('utf-8', errors='ignore')[:100]
    except (AttributeError, TypeError, UnicodeDecodeError) as e:
        logging.debug(f"Email preview extraction failed: {e}")
        return '(Vorschau nicht verfügbar)'
    return ''
def get_email_body(email_id):
    """Fetch the full plain-text body of one email by IMAP id.

    For multipart mails the first top-level text/plain part wins.
    Returns a human-readable error string (never raises) on failure.
    """
    try:
        if not EMAIL_CONFIG['email_address'] or not EMAIL_CONFIG['email_password']:
            return 'Email-Konfiguration erforderlich'
        mail = imaplib.IMAP4_SSL(EMAIL_CONFIG['imap_server'], EMAIL_CONFIG['imap_port'])
        mail.login(EMAIL_CONFIG['email_address'], EMAIL_CONFIG['email_password'])
        mail.select('INBOX')
        status, msg_data = mail.fetch(email_id, '(RFC822)')
        msg = email.message_from_bytes(msg_data[0][1])
        body = ""
        if msg.is_multipart():
            # Take the first top-level text/plain part
            for part in msg.get_payload():
                if part.get_content_type() == 'text/plain':
                    body = part.get_payload(decode=True).decode('utf-8', errors='ignore')
                    break
        else:
            body = msg.get_payload(decode=True).decode('utf-8', errors='ignore')
        mail.close()
        mail.logout()
        return body
    except Exception as e:
        return f'Fehler beim Abrufen der Email: {str(e)}'
def send_email(to_address, subject, body):
    """Send a plain-text email via SMTP.

    Uses implicit TLS on port 465, STARTTLS otherwise.
    Returns (ok: bool, message: str) and never raises.
    """
    try:
        if not EMAIL_CONFIG['email_address'] or not EMAIL_CONFIG['email_password']:
            return False, 'Email-Konfiguration erforderlich'
        msg = MIMEMultipart()
        msg['From'] = EMAIL_CONFIG['email_address']
        msg['To'] = to_address
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain', 'utf-8'))
        smtp_port = EMAIL_CONFIG['smtp_port']
        if smtp_port == 465:
            server = smtplib.SMTP_SSL(EMAIL_CONFIG['smtp_server'], smtp_port, timeout=10)
        else:
            server = smtplib.SMTP(EMAIL_CONFIG['smtp_server'], smtp_port, timeout=10)
            server.starttls()
        try:
            server.login(EMAIL_CONFIG['email_address'], EMAIL_CONFIG['email_password'])
            server.send_message(msg)
        finally:
            # Always close the SMTP connection, even when login/send fails
            server.quit()
        return True, 'Email erfolgreich versendet'
    except Exception as e:
        return False, f'Fehler beim Versenden: {str(e)}'
def is_whitelisted(sender_address):
    """Check whether a sender is on the address or domain whitelist."""
    # Prefer the address inside angle brackets ("Name <addr>") when present
    match = re.search(r'<([^>]+)>', sender_address)
    addr = match.group(1).lower() if match else sender_address.lower().strip()
    allowed_addresses = {w.lower() for w in EMAIL_WHITELIST}
    if addr in allowed_addresses:
        return True
    domain = addr.split('@')[-1] if '@' in addr else ''
    allowed_domains = {d.lower() for d in EMAIL_WHITELIST_DOMAINS}
    return domain in allowed_domains
def decode_email_header_value(value):
    """Decode a MIME-encoded email header (e.g. encoded Subject/From).

    Handles multiple encoded words and falls back to UTF-8 when a part
    declares an unknown charset (which would otherwise raise LookupError).
    Returns '' for empty/None input.
    """
    if not value:
        return ''
    result = ''
    for part, charset in decode_header(value):
        if isinstance(part, bytes):
            try:
                result += part.decode(charset or 'utf-8', errors='ignore')
            except LookupError:
                # Unknown charset name in the header — fall back to UTF-8
                result += part.decode('utf-8', errors='ignore')
        else:
            result += part
    return result
def extract_body_from_msg(msg):
    """Extract the plain-text body from an email.message.Message.

    For multipart mails the first non-attachment text/plain part wins;
    returns '' when nothing decodable is found. Decode failures are
    logged at debug level instead of being silently swallowed.
    """
    body = ''
    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            content_disp = str(part.get('Content-Disposition', ''))
            if content_type == 'text/plain' and 'attachment' not in content_disp:
                try:
                    charset = part.get_content_charset() or 'utf-8'
                    payload = part.get_payload(decode=True)
                    if isinstance(payload, bytes):
                        body = payload.decode(charset, errors='ignore')
                        break
                except Exception as e:
                    # Best effort: skip undecodable parts, but leave a trace
                    logging.debug(f"extract_body_from_msg: part not decodable: {e}")
    else:
        try:
            charset = msg.get_content_charset() or 'utf-8'
            payload = msg.get_payload(decode=True)
            if isinstance(payload, bytes):
                body = payload.decode(charset, errors='ignore')
        except Exception as e:
            logging.debug(f"extract_body_from_msg: body not decodable: {e}")
            body = ''
    return body
def add_to_email_log(entry):
    """Append an entry to the bounded email log (max 50, oldest dropped)."""
    with email_log_lock:
        email_log.append(entry)
        overflow = len(email_log) - 50
        if overflow > 0:
            # Trim everything beyond the newest 50 in one slice delete
            del email_log[:overflow]
def poll_emails():
    """
    Background thread: checks the IMAP inbox every poll_interval seconds.

    Uses the SQLite journal as a failsafe:
      - UNSEEN                       -> never seen before -> process
      - SEEN but not in the journal  -> read by an external client -> ignore
      - journal entry 'pending'/'queued' -> app crashed mid-processing ->
        reprocess (message is reset to UNSEEN on the IMAP side)

    The IMAP \\Seen flag is only set AFTER a task completes successfully
    (done by the TaskWorker thread).
    """
    logger.info("[EmailPoller] Hintergrund-Thread gestartet.")
    while True:
        try:
            addr = EMAIL_CONFIG.get('email_address', '')
            pwd = EMAIL_CONFIG.get('email_password', '')
            if not addr or not pwd:
                logger.warning("[EmailPoller] Keine Email-Konfiguration überspringe Durchlauf.")
                time.sleep(poller_settings['poll_interval'])
                continue
            logger.info("[EmailPoller] Verbinde mit IMAP %s:%d",
                        EMAIL_CONFIG['imap_server'], EMAIL_CONFIG['imap_port'])
            mail = imaplib.IMAP4_SSL(EMAIL_CONFIG['imap_server'], EMAIL_CONFIG['imap_port'])
            mail.login(addr, pwd)
            mail.select('INBOX')
            # Fetch ALL mails (UNSEEN plus journal-pending ones as the failsafe).
            status, messages = mail.search(None, 'ALL')
            if status != 'OK':
                mail.close()
                mail.logout()
                time.sleep(poller_settings['poll_interval'])
                continue
            all_ids = messages[0].split()
            # Candidates: UNSEEN, or recorded in the journal as not yet finished.
            candidates = []
            for email_id in all_ids:
                fetch_status, flag_data = mail.fetch(email_id, '(FLAGS RFC822.HEADER)')
                if fetch_status != 'OK' or not flag_data or flag_data[0] is None:
                    continue
                flags_raw = flag_data[0][0].decode() if isinstance(flag_data[0][0], bytes) else str(flag_data[0][0])
                is_seen = '\\Seen' in flags_raw
                raw_header = flag_data[0][1]
                if not isinstance(raw_header, bytes):
                    continue
                hdr = email.message_from_bytes(raw_header)
                message_id = (hdr.get('Message-ID') or hdr.get('Message-Id') or '').strip()
                # Fallback in case the mail carries no Message-ID header at all.
                if not message_id:
                    message_id = f"no-msgid-uid-{email_id.decode()}"
                if not is_seen:
                    # Still unread -> always process.
                    candidates.append((email_id, message_id, is_seen))
                elif journal_is_stale(message_id):
                    # Seen, but the journal entry has been stuck longer than
                    # failsafe_window -> crash recovery: process again.
                    logger.warning("[EmailPoller] Failsafe: Email %s hängt seit >%ds erneut verarbeiten.",
                                   message_id, poller_settings['failsafe_window'])
                    mail.store(email_id, '-FLAGS', '\\Seen')
                    candidates.append((email_id, message_id, False))
            logger.info("[EmailPoller] %d Email(s) zur Verarbeitung.", len(candidates))
            for email_id, message_id, _ in candidates:
                try:
                    fetch_status, msg_data = mail.fetch(email_id, '(RFC822)')
                    if fetch_status != 'OK' or not msg_data or msg_data[0] is None:
                        continue
                    raw_bytes = msg_data[0][1]
                    if not isinstance(raw_bytes, bytes):
                        continue
                    msg = email.message_from_bytes(raw_bytes)
                    sender_raw = msg.get('From', '')
                    sender = decode_email_header_value(sender_raw)
                    subject_raw = msg.get('Subject', '(Kein Betreff)')
                    subject = decode_email_header_value(subject_raw)
                    body = extract_body_from_msg(msg)
                    log_entry = {
                        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                        'from': sender,
                        'subject': subject,
                        'agent': None,
                        'status': 'skipped',
                        'response_preview': ''
                    }
                    if not is_whitelisted(sender):
                        # Not whitelisted: mark as read right away, journal as 'skipped'.
                        mail.store(email_id, '+FLAGS', '\\Seen')
                        journal_insert(message_id, email_id.decode(), sender, subject, 'skipped')
                        logger.info("[EmailPoller] Absender nicht auf Whitelist: %s wird ignoriert.", sender)
                        add_to_email_log(log_entry)
                        continue
                    # Already fully processed? (e.g. after an app restart)
                    if journal_is_done(message_id):
                        mail.store(email_id, '+FLAGS', '\\Seen')
                        logger.info("[EmailPoller] Email %s bereits verarbeitet überspringe.", message_id)
                        continue
                    logger.info("[EmailPoller] Email von %s | Betreff: %s → in Queue", sender, subject)
                    # Extract the plain sender address for the reply.
                    addr_match = re.search(r'<([^>]+)>', sender)
                    sender_email = addr_match.group(1) if addr_match else sender.strip()
                    # Pick the handling agent based on subject + body.
                    full_prompt_for_routing = f"{subject} {body}"
                    agent_key = delegate_to_agent(full_prompt_for_routing)
                    # Journal: status 'queued' \Seen is NOT set yet (only after processing).
                    journal_insert(message_id, email_id.decode(), sender, subject, 'queued', agent_key)
                    # Create the task and register it in both lists.
                    task = {
                        'id': len(tasks) + 1,
                        'title': f"📧 Email: {subject[:50]}",
                        'description': body[:500],
                        'assigned_agent': AGENTS.get(agent_key, {}).get('name', agent_key),
                        'agent_key': agent_key,
                        'status': 'pending',
                        'created': datetime.now().strftime('%Y-%m-%d %H:%M'),
                        'type': 'email',
                        'reply_to': sender_email,
                        'reply_subject': f"Re: {subject}" if not subject.startswith('Re:') else subject,
                        'original_sender': sender,
                        'original_subject': subject,
                        'original_body': body,
                        'message_id': message_id,  # for journal updates by the TaskWorker
                        'imap_uid': email_id.decode(),
                    }
                    tasks.append(task)
                    with task_queue_lock:
                        task_queue.append(task)
                    log_entry['agent'] = agent_key
                    log_entry['status'] = 'queued'
                    add_to_email_log(log_entry)
                    logger.info("[EmailPoller] Task #%d für Email von %s in Queue eingereiht.", task['id'], sender_email)
                except Exception as e:
                    logger.error("[EmailPoller] Fehler bei Email-ID %s: %s", email_id, str(e))
            mail.close()
            mail.logout()
        except Exception as e:
            logger.error("[EmailPoller] Verbindungsfehler: %s", str(e))
        time.sleep(poller_settings['poll_interval'])
def process_email_tasks():
    """
    Background thread: processes email tasks from the task_queue.

    Checks for pending tasks every 10 seconds, runs the assigned agent,
    sends the reply email(s), then updates the journal and only afterwards
    sets the IMAP \\Seen flag (so a crash before completion leaves the mail
    eligible for failsafe reprocessing).
    """
    logger.info("[TaskWorker] Hintergrund-Thread gestartet.")
    while True:
        try:
            with task_queue_lock:
                pending = [t for t in task_queue if t.get('status') == 'pending' and t.get('type') == 'email']
            for task in pending:
                try:
                    task['status'] = 'in_progress'
                    logger.info("[TaskWorker] Verarbeite Task #%d Agent: %s", task['id'], task['agent_key'])
                    agent_key = task['agent_key']
                    sender_email = task['reply_to']
                    reply_subject = task['reply_subject']
                    sender = task['original_sender']
                    subject = task['original_subject']
                    body = task['original_body']
                    full_prompt = f"""Eine Email ist eingegangen und muss beantwortet werden.
Von: {sender}
Betreff: {subject}
Inhalt:
{body}
Bitte bearbeite diese Anfrage vollständig:
1. Nutze WebFetch wenn du Informationen aus dem Internet brauchst
2. Wenn die Email bittet eine Antwort an jemand anderen zu schicken, tue das
3. Formuliere eine vollständige, hilfreiche Antwort
4. Die Antwort wird automatisch an {sender_email} zurückgeschickt"""
                    extra_context = f"""
## Email-Kontext:
- Diese Anfrage kommt per Email von: {sender_email}
- Die Antwort wird automatisch per Email zurückgeschickt
- Formuliere die Antwort daher als Email-Text (freundlich, professionell)
- Wenn gebeten wird, eine Email an jemanden zu schicken: gib die Adresse in der Form "An: adresse@example.com" oder "To: adresse@example.com" an"""
                    agent_response = execute_agent_task(agent_key, full_prompt, extra_context=extra_context)
                    # Extract additional recipients ("An:"/"To:") from the agent's answer.
                    extra_recipients = re.findall(
                        r'(?:An|To):\s*([a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,})',
                        agent_response
                    )
                    send_errors = []
                    # Send to the additional recipients first (skip the original sender).
                    for recipient in extra_recipients:
                        if recipient.lower() != sender_email.lower():
                            fwd_success, fwd_msg = send_email(recipient, reply_subject, agent_response)
                            if fwd_success:
                                logger.info("[TaskWorker] Email an Zusatz-Empfänger %s gesendet.", recipient)
                            else:
                                logger.error("[TaskWorker] Fehler bei Zusatz-Empfänger %s: %s", recipient, fwd_msg)
                                send_errors.append(f"{recipient}: {fwd_msg}")
                    # Always reply to the original sender.
                    success, send_msg = send_email(sender_email, reply_subject, agent_response)
                    if success:
                        logger.info("[TaskWorker] Auto-Reply an %s gesendet.", sender_email)
                    else:
                        logger.error("[TaskWorker] Fehler beim Reply an %s: %s", sender_email, send_msg)
                        send_errors.append(f"{sender_email}: {send_msg}")
                    final_status = 'completed' if not send_errors else 'error'
                    task['status'] = final_status
                    task['response_preview'] = agent_response[:200]
                    # Update the journal and only NOW set the IMAP \Seen flag.
                    msg_id = task.get('message_id', '')
                    imap_uid = task.get('imap_uid', '')
                    if msg_id:
                        journal_update(msg_id, final_status)
                    if imap_uid and final_status in ('completed', 'error'):
                        try:
                            mail_done = imaplib.IMAP4_SSL(EMAIL_CONFIG['imap_server'], EMAIL_CONFIG['imap_port'])
                            mail_done.login(EMAIL_CONFIG['email_address'], EMAIL_CONFIG['email_password'])
                            mail_done.select('INBOX')
                            mail_done.store(imap_uid.encode(), '+FLAGS', '\\Seen')
                            mail_done.close()
                            mail_done.logout()
                            logger.info("[TaskWorker] IMAP \\Seen für UID %s gesetzt.", imap_uid)
                        except Exception as imap_err:
                            logger.error("[TaskWorker] IMAP \\Seen konnte nicht gesetzt werden: %s", imap_err)
                    # Update the matching log entry (search newest first).
                    with email_log_lock:
                        for entry in reversed(email_log):
                            if entry.get('from') == sender and entry.get('subject') == subject and entry.get('status') == 'queued':
                                entry['status'] = final_status
                                entry['response_preview'] = agent_response[:200]
                                break
                    logger.info("[TaskWorker] Task #%d abgeschlossen mit Status: %s", task['id'], final_status)
                except Exception as e:
                    task['status'] = 'error'
                    logger.error("[TaskWorker] Fehler bei Task #%d: %s", task['id'], str(e))
        except Exception as e:
            logger.error("[TaskWorker] Unerwarteter Fehler: %s", str(e))
        time.sleep(10)
def start_email_poller():
    """Start the email poller and the task worker as daemon threads."""
    threading.Thread(target=poll_emails, name='EmailPoller', daemon=True).start()
    logger.info("[EmailPoller] Daemon-Thread gestartet.")
    threading.Thread(target=process_email_tasks, name='TaskWorker', daemon=True).start()
    logger.info("[TaskWorker] Daemon-Thread gestartet.")
def process_beat_tasks():
    """Background beat: automatically processes open (non-email) tasks.

    Tasks with agent_key 'orchestrator' are expanded into sub-tasks first
    (one planning run that assigns each sub-task to an agent). All other
    pending tasks of type agent_created/manual/orchestrated/agent_delegated
    are executed by their assigned agent and the result is stored both on
    the task and in the agent's memory.
    """
    logger.info("[TaskBeat] Hintergrund-Thread gestartet.")
    while True:
        try:
            pending_tasks = [t for t in tasks if t.get('status') == 'pending' and t.get('type') in ('agent_created', 'manual', 'orchestrated', 'agent_delegated')]
            for task in pending_tasks:
                agent_key = task.get('agent_key') or task.get('assigned_agent', '')
                if task.get('agent_key') == 'orchestrator':
                    task['status'] = 'in_progress'
                    logger.info("[TaskBeat] Planungsphase für Task #%d", task['id'])
                    sub_tasks = task.get('sub_tasks', [])
                    available_agents = task.get('available_agents', list(AGENTS.keys()))
                    prompt = f"""Du bist der Master-Orchestrator. Analysiere folgende Tasks und weise sie den richtigen Agenten zu:
Tasks:
{chr(10).join(['- ' + t for t in sub_tasks])}
Verfügbare Agenten: {', '.join(available_agents)}
Agent-Beschreibungen:
"""
                    for a_key, a_info in AGENTS.items():
                        prompt += f"- {a_key}: {a_info.get('description', 'Keine Beschreibung')[:100]}\n"
                    prompt += "\nAntworte in diesem Format (einen Agent pro Task):\n"
                    for i, t in enumerate(sub_tasks):
                        prompt += f"Task {i+1}: [Agent-Key] - Kurze Begründung\n"
                    response = execute_agent_task('orchestrator', prompt)
                    task['plan_response'] = response
                    import re
                    agent_assignments = re.findall(r'Task \d+: (\w+)', response)
                    created_sub_tasks = []
                    for i, t in enumerate(sub_tasks):
                        # Fall back to round-robin when the orchestrator named no
                        # agent for this sub-task; fall back to the first agent
                        # when the named one is unknown.
                        assigned = agent_assignments[i] if i < len(agent_assignments) else available_agents[i % len(available_agents)]
                        if assigned not in AGENTS:
                            assigned = available_agents[0]
                        # Sub-task carries the COMPLETE context of the parent task.
                        sub_task = {
                            'id': len(tasks) + 1 + len(created_sub_tasks),
                            'title': t[:80],
                            'description': f"""**Original-Aufgabe:**
{task.get('title', '')}
{task.get('description', '')}
**Orchestrator-Analyse:**
{response}
**Dein spezifischer Teil:**
{t}
Arbeite diesen Teil ab und liefere ein vollständiges Ergebnis.""",
                            'assigned_agent': AGENTS.get(assigned, {}).get('name', assigned),
                            'agent_key': assigned,
                            'status': 'pending',
                            'created': datetime.now().strftime('%Y-%m-%d %H:%M'),
                            'type': 'orchestrated',
                            'parent_task': task['id']
                        }
                        tasks.append(sub_task)
                        created_sub_tasks.append(sub_task['id'])
                        logger.info("[TaskBeat] Sub-Task #%d zugewiesen an %s", sub_task['id'], assigned)
                    task['status'] = 'completed'
                    task['sub_task_ids'] = created_sub_tasks
                    logger.info("[TaskBeat] Planungs-Task #%d abgeschlossen. %d Sub-Tasks erstellt.", task['id'], len(created_sub_tasks))
                    continue
                if not agent_key:
                    # No agent assigned: default to the first configured agent.
                    available_agents = list(AGENTS.keys())
                    if available_agents:
                        agent_key = available_agents[0]
                        task['agent_key'] = agent_key
                if agent_key and agent_key in AGENTS:
                    task['status'] = 'in_progress'
                    logger.info("[TaskBeat] Verarbeite Task #%d Agent: %s", task['id'], agent_key)
                    response = execute_agent_task(agent_key, task.get('title', '') + '\n\n' + task.get('description', ''))
                    task['response'] = response
                    # New memory system: store the task as a structured memory entry.
                    try:
                        add_agent_memory(agent_key, 'tasks', {
                            'task_id': task['id'],
                            'title': task.get('title', 'Unbekannt'),
                            'description': task.get('description', ''),
                            'result': response,
                            'status': 'completed',
                            'metadata': {
                                'assigned_by': task.get('agent', 'system'),
                                'duration': None
                            }
                        })
                    except Exception as e:
                        logger.warning("[TaskBeat] Konnte Erinnerung nicht speichern: %s", str(e))
                    task['status'] = 'completed'
                    logger.info("[TaskBeat] Task #%d abgeschlossen.", task['id'])
        except Exception as e:
            logger.error("[TaskBeat] Fehler: %s", str(e))
        # Beat interval: 10 seconds (down from 30).
        time.sleep(10)
def start_task_beat():
    """Start the task beat as a daemon thread."""
    threading.Thread(target=process_beat_tasks, name='TaskBeat', daemon=True).start()
    logger.info("[TaskBeat] Daemon-Thread gestartet.")
# Start the email poller and the task beat when the app boots (daemon threads).
start_email_poller()
start_task_beat()
@app.route('/')
def index():
    """Dashboard: agent overview plus the five most recent tasks."""
    latest_tasks = tasks[-5:]
    return render_template('index.html', agents=AGENTS, recent_tasks=latest_tasks)
@app.route('/chat', methods=['GET', 'POST'])
def chat():
    """Render the chat page with the session-stored conversation history."""
    # Ensure the history key exists before reading it.
    session.setdefault('chat_history', [])
    history = session.get('chat_history', [])
    return render_template('chat.html', agents=AGENTS, chat_history=history)
@app.route('/chat/send', methods=['POST'])
def chat_send():
    """Run an agent on a chat prompt and stream the result via Server-Sent Events."""
    payload = request.get_json()
    prompt = payload.get('prompt', '').strip()
    agent_key = payload.get('agent', '').strip()
    # Validate before entering the generator so proper HTTP codes can be returned.
    if not prompt or not agent_key:
        return jsonify({'type': 'error', 'message': 'Fehlende Eingabe'}), 400
    if agent_key not in AGENTS:
        return jsonify({'type': 'error', 'message': 'Agent nicht gefunden'}), 404
    agent_name = AGENTS.get(agent_key, {}).get('name', agent_key)
    def generate():
        # Announce which agent handles the request, then signal processing.
        yield f"data: {json.dumps({'type': 'agent_selected', 'agent': agent_name, 'agent_key': agent_key})}\n\n"
        yield f"data: {json.dumps({'type': 'processing', 'message': f'{agent_name} arbeitet...'})}\n\n"
        try:
            # Run the agent (with memory and work dir) and stream the answer.
            result = execute_agent_task(agent_key, prompt)
            yield f"data: {json.dumps({'type': 'response', 'text': result})}\n\n"
            yield f"data: {json.dumps({'type': 'complete', 'message': '✓ Fertig', 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M'), 'response': result})}\n\n"
        except Exception as exc:
            logger.error(f"[Chat] Fehler beim Ausführen von {agent_key}: {str(exc)}")
            yield f"data: {json.dumps({'type': 'error', 'message': str(exc)})}\n\n"
    return Response(generate(), mimetype='text/event-stream',
                    headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'})
@app.route('/chat/save', methods=['POST'])
def chat_save():
    """Persist one chat exchange in the session (keeps the last 30 entries)."""
    data = request.get_json()
    history = session.get('chat_history')
    if history is None:
        history = []
    history.append({
        'timestamp': data.get('timestamp'),
        'agent': data.get('agent'),
        'agent_key': data.get('agent_key'),
        'prompt': data.get('prompt'),
        'response': data.get('response')
    })
    session['chat_history'] = history[-30:]
    session.modified = True
    return jsonify({'success': True})
@app.route('/tasks', methods=['GET', 'POST'])
def task_list():
    """Task overview page; POST creates a new manual task.

    Task ids are now made unique the same way as in the /api/tasks endpoint:
    `len(tasks) + 1` alone can collide once ids diverge from list positions
    (e.g. after the orchestrator created sub-tasks).
    """
    if request.method == 'POST':
        title = request.form.get('title', '').strip()
        description = request.form.get('description', '').strip()
        assigned_agent = request.form.get('assigned_agent', '')
        if title:
            # Find a free id (consistent with api_tasks' collision handling).
            task_id = len(tasks) + 1
            while any(t['id'] == task_id for t in tasks):
                task_id += 1
            task = {
                'id': task_id,
                'title': title,
                'description': description,
                'assigned_agent': AGENTS.get(assigned_agent, {}).get('name', assigned_agent) if assigned_agent else 'Nicht zugewiesen',
                'status': 'pending',
                'created': datetime.now().strftime('%Y-%m-%d %H:%M'),
                'type': 'manual',
            }
            tasks.append(task)
            flash('Task erstellt!', 'success')
    # Show all tasks, newest first (email tasks carry type='email').
    all_tasks = list(reversed(tasks))
    return render_template('tasks.html', agents=AGENTS, tasks=all_tasks)
@app.route('/tasks/update/<int:task_id>/<status>')
def update_task(task_id, status):
    """Set the status of one task by id, then return to the task list."""
    target = next((t for t in tasks if t['id'] == task_id), None)
    if target is not None:
        target['status'] = status
    return redirect(url_for('task_list'))
@app.route('/api/agent-stream', methods=['POST'])
def agent_stream():
    """Server-Sent Events endpoint: true streaming straight from opencode's JSON output.

    Routes the prompt to an agent, builds a combined prompt (knowledge base +
    agent system prompt + task), runs `opencode` as a subprocess and forwards
    each text chunk to the client as soon as it arrives.
    """
    data = request.get_json()
    prompt = data.get('prompt', '').strip()
    def generate():
        if not prompt:
            yield f"data: {json.dumps({'type': 'error', 'message': 'Leere Anfrage'})}\n\n"
            return
        selected_agent = delegate_to_agent(prompt)
        agent_info = AGENTS.get(selected_agent, {})
        agent_name = agent_info.get('name', selected_agent)
        system_prompt = get_agent_prompt(selected_agent)
        kb_file = os.path.join(os.path.dirname(__file__), 'diversityball_knowledge.md')
        kb_content = ""
        if os.path.exists(kb_file):
            with open(kb_file, 'r', encoding='utf-8') as f:
                kb_content = f.read()
        full_prompt = f"""## Wissensdatenbank (Diversity-Ball):
{kb_content}
## System-Prompt des Agenten ({agent_name}):
{system_prompt}
## Deine Aufgabe:
{prompt}"""
        # Send agent info right away so the UI can show who is working.
        yield f"data: {json.dumps({'type': 'agent_selected', 'agent': agent_name, 'agent_key': selected_agent})}\n\n"
        yield f"data: {json.dumps({'type': 'processing', 'message': f'{agent_name} arbeitet...'})}\n\n"
        try:
            model = get_agent_model(selected_agent)
            proc = subprocess.Popen(
                ['opencode', 'run', '--model', model, '--format', 'json', full_prompt],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                cwd=os.path.dirname(__file__)
            )
            # Read every line from opencode as soon as it appears and stream it.
            for line in proc.stdout:
                line = line.strip()
                if not line:
                    continue
                try:
                    event_data = json.loads(line)
                    if event_data.get('part', {}).get('type') == 'text':
                        text = event_data['part'].get('text', '')
                        if text:
                            yield f"data: {json.dumps({'type': 'response_chunk', 'text': text})}\n\n"
                except Exception:
                    # Non-JSON output lines (e.g. plain logs) are ignored.
                    pass
            proc.wait()
            yield f"data: {json.dumps({'type': 'complete', 'message': '✓ Fertig'})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
    return Response(generate(), mimetype='text/event-stream',
                    headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'})
@app.route('/orchestrator', methods=['GET', 'POST'])
def orchestrator():
    """Orchestrator chat page.

    POST: delegates the prompt to the best-matching agent, runs it and
    appends the exchange to the session-held chat (capped at 30 entries).
    GET: renders the stored chat history.

    The previously read-but-unused session values (knowledge base and
    per-agent prompts) are no longer fetched here; agents load knowledge
    on demand.
    """
    init_orchestrator_session()
    if request.method == 'POST':
        prompt = request.form.get('prompt', '').strip()
        if prompt:
            selected_agent = delegate_to_agent(prompt)
            agent_name = AGENTS.get(selected_agent, {}).get('name', selected_agent)
            response = execute_agent_task(selected_agent, prompt)
            orchestrator_chat = session.get('orchestrator_chat', [])
            orchestrator_chat.append({
                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M'),
                'user_prompt': prompt,
                'agent': agent_name,
                'agent_key': selected_agent,
                'response': response
            })
            session['orchestrator_chat'] = orchestrator_chat[-30:]
    chat_display = session.get('orchestrator_chat', [])
    return render_template('orchestrator.html',
                           agents=AGENTS,
                           chat_history=chat_display,
                           knowledge_loaded=bool(session.get('orchestrator_kb')))
@app.route('/agents', methods=['GET', 'POST'])
def agents():
    """Agent editor page.

    GET: lists all agents with systemprompt/reminders/personality and
    redirects to the first agent when none is selected via ?edit=.
    POST: writes the three markdown files for the submitted agent name.

    Security: the submitted agent name is validated so writes cannot escape
    the agents/ directory via path components like '..' or '/'.
    """
    agents_dir = os.path.join(os.path.dirname(__file__), 'agents')

    def _read_text(path, default=''):
        # Read a UTF-8 file if present, otherwise return the given default.
        if os.path.exists(path):
            with open(path, 'r', encoding='utf-8') as f:
                return f.read()
        return default

    agents_list = []
    if os.path.exists(agents_dir):
        for agent_name in sorted(os.listdir(agents_dir)):
            agent_path = os.path.join(agents_dir, agent_name)
            if not os.path.isdir(agent_path):
                continue
            default_reminders = ('# Erinnerungen - ' + agent_name.title() +
                                 '\n\n## Aktuelle Tasks\n-\n\n## Notizen\n- \n\n## Letzte Aktionen\n- ')
            agents_list.append({
                'name': agent_name,
                'prompt': _read_text(os.path.join(agent_path, 'systemprompt.md')),
                'reminders': _read_text(os.path.join(agent_path, 'reminders.md'), default_reminders),
                'personality': _read_text(os.path.join(agent_path, 'personality.md'))
            })
    edit_agent = request.args.get('edit')
    if not edit_agent and agents_list:
        # Nothing selected: open the first agent by default.
        return redirect(url_for('agents', edit=agents_list[0]['name']))
    edit_prompt = ''
    edit_reminders = ''
    edit_personality = ''
    edit_model = 'opencode/big-pickle'
    if edit_agent:
        for agent in agents_list:
            if agent['name'] == edit_agent:
                edit_prompt = agent['prompt']
                edit_reminders = agent['reminders']
                edit_personality = agent['personality']
                break
        edit_model = get_agent_model(edit_agent)
    if request.method == 'POST':
        agent_name = request.form.get('agent_name', '').strip()
        prompt_content = request.form.get('prompt_content', '')
        reminders_content = request.form.get('reminders_content', '')
        personality_content = request.form.get('personality_content', '')
        if agent_name:
            agent_path = os.path.join(agents_dir, agent_name)
            # Security: the target folder must be a direct child of agents_dir.
            if os.path.dirname(os.path.abspath(agent_path)) != os.path.abspath(agents_dir):
                flash('Ungültiger Agent-Name.', 'danger')
                return redirect(url_for('agents'))
            # Robustness: create the folder for a brand-new agent instead of
            # failing with FileNotFoundError on the first write.
            os.makedirs(agent_path, exist_ok=True)
            with open(os.path.join(agent_path, 'systemprompt.md'), 'w', encoding='utf-8') as f:
                f.write(prompt_content)
            with open(os.path.join(agent_path, 'reminders.md'), 'w', encoding='utf-8') as f:
                f.write(reminders_content)
            with open(os.path.join(agent_path, 'personality.md'), 'w', encoding='utf-8') as f:
                f.write(personality_content)
            flash(f'Daten für "{agent_name}" gespeichert!', 'success')
            return redirect(url_for('agents', edit=agent_name))
    # Load the available models for the model dropdown.
    available_models = get_available_models()
    return render_template('agents.html',
                           agents=AGENTS,
                           agents_list=agents_list,
                           edit_agent=edit_agent,
                           edit_prompt=edit_prompt,
                           edit_reminders=edit_reminders,
                           edit_personality=edit_personality,
                           edit_model=edit_model,
                           available_models=available_models)
@app.route('/files', methods=['GET', 'POST'])
def files():
    """File manager: handles uploads and lists upload/email/project/agent files.

    Security: the client-supplied upload filename is reduced to its basename
    before joining it with the upload folder, preventing path traversal
    (e.g. '../../etc/cron.d/x').
    """
    if request.method == 'POST':
        upload = request.files.get('file')
        if upload is None or upload.filename == '':
            flash('Keine Datei ausgewählt', 'danger')
        else:
            # Strip any directory components (also Windows-style backslashes).
            safe_name = os.path.basename(upload.filename.replace('\\', '/'))
            if not safe_name:
                flash('Keine Datei ausgewählt', 'danger')
            else:
                filepath = os.path.join(app.config['UPLOAD_FOLDER'], safe_name)
                upload.save(filepath)
                flash('Datei hochgeladen!', 'success')
    file_list = get_uploaded_files()
    email_files = get_email_folder_files()
    project_files = get_project_files()
    # Collect agent work folders (only agents that actually have files).
    agent_work_folders = {}
    for agent_key in AGENTS.keys():
        work_files = get_agent_work_files(agent_key)
        if work_files:
            agent_work_folders[agent_key] = work_files
    return render_template('files.html',
                           files=file_list,
                           email_files=email_files,
                           project_files=project_files,
                           agent_work_folders=agent_work_folders)
@app.route('/files/delete/<filename>')
def delete_file(filename):
    """Delete an uploaded file (restricted to names inside the upload folder).

    Security: refuses any name containing directory components so the URL
    cannot be used to remove files outside UPLOAD_FOLDER.
    """
    if os.path.basename(filename) != filename or filename in ('.', '..'):
        flash('Zugriff verweigert', 'danger')
        return redirect(url_for('files'))
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    if os.path.exists(filepath):
        os.remove(filepath)
        flash('Datei gelöscht!', 'success')
    return redirect(url_for('files'))
@app.route('/files/download/<filename>')
def download_file(filename):
    """Serve an uploaded file inline (send_from_directory guards against traversal)."""
    upload_dir = app.config['UPLOAD_FOLDER']
    return send_from_directory(upload_dir, filename, as_attachment=False)
@app.route('/files/agent/<agent_key>/<filename>')
def download_agent_file(agent_key, filename):
    """Serve a file from an agent's work folder.

    Returns 404 for unknown agents/files and 403 when the resolved path
    would leave the work directory. The containment check requires the
    path to be below `work_dir + os.sep`: a bare prefix check would also
    accept sibling directories sharing the prefix (e.g. 'work' vs 'work2').
    """
    if agent_key not in AGENTS:
        return jsonify({'error': 'Agent nicht gefunden'}), 404
    dirs = ensure_agent_structure(agent_key)
    work_dir = os.path.abspath(dirs['work_dir'])
    filepath = os.path.abspath(os.path.join(work_dir, filename))
    if not filepath.startswith(work_dir + os.sep):
        return jsonify({'error': 'Zugriff verweigert'}), 403
    if not os.path.isfile(filepath):
        return jsonify({'error': 'Datei nicht gefunden'}), 404
    return send_from_directory(work_dir, filename, as_attachment=False)
@app.route('/files/email/view/<filename>')
def view_email_file(filename):
    """Return the content of an email template (JSON with ?json, else text/plain).

    Security: the resolved path must stay below emails/. The check uses
    `email_dir + os.sep` because a bare prefix test would also accept
    sibling directories such as 'emails_backup'.
    """
    email_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'emails'))
    filepath = os.path.abspath(os.path.join(email_dir, filename))
    if not filepath.startswith(email_dir + os.sep):
        return jsonify({'error': 'Zugriff verweigert'}), 403
    if not os.path.isfile(filepath):
        return jsonify({'error': 'Datei nicht gefunden'}), 404
    try:
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            content = f.read()
        if request.args.get('json'):
            return jsonify({'content': content})
        return content, 200, {'Content-Type': 'text/plain; charset=utf-8'}
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/files/email/save/<filename>', methods=['POST'])
def save_email_file(filename):
    """Save the content of an email template (JSON POST with {'content': ...}).

    Security: the resolved path must stay below emails/ (checked against
    `email_dir + os.sep`; a bare prefix test would also accept sibling
    directories sharing the prefix).
    """
    email_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'emails'))
    filepath = os.path.abspath(os.path.join(email_dir, filename))
    if not filepath.startswith(email_dir + os.sep):
        return jsonify({'ok': False, 'error': 'Zugriff verweigert'}), 403
    try:
        data = request.get_json()
        content = data.get('content', '') if data else ''
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)
        return jsonify({'ok': True})
    except Exception as e:
        return jsonify({'ok': False, 'error': str(e)}), 500
@app.route('/files/email/delete/<filename>')
def delete_email_file(filename):
    """Delete an email template from the emails/ folder.

    Bug fix: the success flash previously printed the literal "(unknown)"
    instead of the deleted file's name. Security: the resolved path must
    stay below emails/ (checked against `email_dir + os.sep`).
    """
    email_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'emails'))
    filepath = os.path.abspath(os.path.join(email_dir, filename))
    if not filepath.startswith(email_dir + os.sep):
        flash('Zugriff verweigert', 'danger')
        return redirect(url_for('files'))
    if os.path.isfile(filepath):
        os.remove(filepath)
        flash(f'Email-Vorlage "{filename}" gelöscht!', 'success')
    else:
        flash('Datei nicht gefunden', 'warning')
    return redirect(url_for('files'))
@app.route('/files/project/view/<filename>')
def view_project_file(filename):
    """Return the content of a project file (md/txt/docx) from the app directory."""
    base_dir = os.path.abspath(os.path.dirname(__file__))
    filepath = os.path.join(base_dir, filename)
    # Security: only files directly in the base directory, no subdirectories.
    if os.path.dirname(os.path.abspath(filepath)) != base_dir:
        return jsonify({'error': 'Zugriff verweigert'}), 403
    lowered = filename.lower()
    if not lowered.endswith(('.md', '.txt', '.docx')):
        return jsonify({'error': 'Dateityp nicht unterstützt'}), 400
    if not os.path.isfile(filepath):
        return jsonify({'error': 'Datei nicht gefunden'}), 404
    try:
        if lowered.endswith('.docx'):
            # No inline preview for binary DOCX files.
            return jsonify({'content': '(DOCX-Vorschau nicht verfügbar Datei herunterladen)'}), 200
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            text = f.read()
        if request.args.get('json'):
            return jsonify({'content': text})
        return text, 200, {'Content-Type': 'text/plain; charset=utf-8'}
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/emails', methods=['GET', 'POST'])
def emails():
    """Email management page: compose/send form plus inbox listing."""
    if request.method == 'POST' and request.form.get('action') == 'send':
        to_address = request.form.get('to_address', '').strip()
        subject = request.form.get('subject', '').strip()
        body = request.form.get('body', '').strip()
        if not (to_address and subject and body):
            flash('Bitte alle Felder ausfüllen', 'warning')
        else:
            success, message = send_email(to_address, subject, body)
            if success:
                flash('Email erfolgreich versendet!', 'success')
            else:
                flash(f'Fehler: {message}', 'danger')
    email_config_valid = bool(EMAIL_CONFIG['email_address'] and EMAIL_CONFIG['email_password'])
    emails_list = get_emails() if email_config_valid else []
    return render_template('emails.html',
                           emails=emails_list,
                           email_config_valid=email_config_valid,
                           current_email=EMAIL_CONFIG['email_address'])
@app.route('/emails/<email_id>')
def view_email(email_id):
    """Return the body of a single email as a JSON-serializable dict."""
    config_ok = EMAIL_CONFIG['email_address'] and EMAIL_CONFIG['email_password']
    if not config_ok:
        return 'Email-Konfiguration erforderlich', 400
    return {'content': get_email_body(email_id)}
@app.route('/email-log')
def email_log_view():
    """Render the email processing log, newest entries first."""
    with email_log_lock:
        entries = email_log[::-1]
    return render_template('email_log.html', agents=AGENTS, log_entries=entries)
@app.route('/settings', methods=['GET', 'POST'])
def settings():
    """Change poller settings at runtime and show journal statistics.

    POST validates poll_interval (>= 10s) and failsafe_window (> interval)
    and updates the shared poller_settings dict. GET renders the form with
    per-status counts from the email journal. The SQLite connection is now
    closed in a finally block so it cannot leak when the query raises.
    """
    if request.method == 'POST':
        try:
            poll_interval = int(request.form.get('poll_interval', 120))
            failsafe_window = int(request.form.get('failsafe_window', 600))
            if poll_interval < 10:
                flash('Poll-Intervall muss mindestens 10 Sekunden betragen.', 'warning')
            elif failsafe_window < poll_interval:
                flash('Failsafe-Fenster muss größer als das Poll-Intervall sein.', 'warning')
            else:
                poller_settings['poll_interval'] = poll_interval
                poller_settings['failsafe_window'] = failsafe_window
                flash(f'Einstellungen gespeichert: Poll alle {poll_interval}s, Failsafe nach {failsafe_window}s.', 'success')
        except ValueError:
            flash('Ungültige Eingabe bitte nur ganze Zahlen eingeben.', 'danger')
        return redirect(url_for('settings'))
    # Journal statistics for the display.
    con = sqlite3.connect(EMAIL_JOURNAL_DB)
    try:
        journal_rows = con.execute(
            "SELECT status, COUNT(*) FROM email_journal GROUP BY status"
        ).fetchall()
    finally:
        con.close()
    journal_stats = {status: count for status, count in journal_rows}
    return render_template('settings.html',
                           agents=AGENTS,
                           poller_settings=poller_settings,
                           journal_stats=journal_stats)
@app.route('/settings/journal-clear', methods=['POST'])
def journal_clear():
    """Delete finished journal entries (completed, skipped, error).

    The SQLite connection is closed in a finally block so the handle is
    not leaked when the DELETE statement raises.
    """
    con = sqlite3.connect(EMAIL_JOURNAL_DB)
    try:
        deleted = con.execute(
            "DELETE FROM email_journal WHERE status IN ('completed','skipped','error')"
        ).rowcount
        con.commit()
    finally:
        con.close()
    flash(f'{deleted} abgeschlossene Journal-Einträge gelöscht.', 'success')
    return redirect(url_for('settings'))
# ── Task API ────────────────────────────────────────────────────────────────
@app.route('/api/tasks', methods=['GET', 'POST'])
def api_tasks():
    """Task API: POST creates a task, GET returns all tasks (newest first)."""
    if request.method != 'POST':
        return jsonify({'tasks': list(reversed(tasks))})
    data = request.get_json()
    title = data.get('title', '').strip()
    if not title:
        return jsonify({'error': 'Kein Titel übergeben'}), 400
    description = data.get('description', '')
    assigned_agent = data.get('assigned_agent', '')
    agent_key = data.get('agent_key', '')
    # Pick the next free id (ids can diverge from list positions over time).
    task_id = len(tasks) + 1
    while any(existing['id'] == task_id for existing in tasks):
        task_id += 1
    display_name = (AGENTS.get(assigned_agent, {}).get('name', assigned_agent)
                    if assigned_agent else 'Nicht zugewiesen')
    new_task = {
        'id': task_id,
        'title': title,
        'description': description,
        'assigned_agent': display_name,
        'agent_key': agent_key or assigned_agent,
        'status': 'pending',
        'created': datetime.now().strftime('%Y-%m-%d %H:%M'),
        'type': 'agent_created',
        'created_by': agent_key
    }
    tasks.append(new_task)
    return jsonify({'success': True, 'task': new_task})
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
def update_task_api(task_id):
    """API: update a task's status.

    Returns 400 when no status is supplied (previously a missing status
    silently overwrote the task status with None) and 404 for unknown ids.
    """
    data = request.get_json()
    new_status = (data or {}).get('status')
    if new_status is None:
        return jsonify({'error': 'Kein Status übergeben'}), 400
    for task in tasks:
        if task['id'] == task_id:
            task['status'] = new_status
            return jsonify({'success': True, 'task': task})
    return jsonify({'error': 'Task nicht gefunden'}), 404
@app.route('/api/models', methods=['GET'])
def get_models():
    """Return the list of available AI models as JSON.

    Append ?refresh=true to bypass the model cache and re-query.
    """
    refresh_param = request.args.get('refresh', 'false')
    should_refresh = refresh_param.lower() == 'true'
    return jsonify(get_available_models(force_refresh=should_refresh))
@app.route('/api/agent/<agent_name>/model', methods=['POST'])
def set_agent_model(agent_name):
    """Persist the selected model for a single agent."""
    payload = request.get_json()
    # Guard clause: reject missing body or missing 'model' key up front.
    if not payload or 'model' not in payload:
        return jsonify({'error': 'Kein Modell übergeben'}), 400
    chosen_model = payload['model']
    save_agent_config(agent_name, chosen_model)
    return jsonify({'success': True, 'message': f'Modell für {agent_name} gesetzt auf {chosen_model}'})
@app.route('/api/agent/<agent_name>/delete', methods=['DELETE'])
def delete_agent(agent_name):
    """Delete an agent: remove its whole directory and its config entry.

    Returns 400 for names that escape the agents directory, 404 when the
    agent directory does not exist, 500 on deletion errors.
    """
    import shutil
    agents_dir = os.path.join(os.path.dirname(__file__), 'agents')
    agent_path = os.path.realpath(os.path.join(agents_dir, agent_name))
    # Security: agent_name comes from the URL. A value like '..' would
    # resolve outside the agents directory and rmtree would destroy it —
    # require the resolved path to stay strictly inside agents_dir.
    if not agent_path.startswith(os.path.realpath(agents_dir) + os.sep):
        return jsonify({'error': 'Ungültiger Agent-Name'}), 400
    if not os.path.isdir(agent_path):
        return jsonify({'error': 'Agent nicht gefunden'}), 404
    try:
        shutil.rmtree(agent_path)
        # Drop the agent from the persisted config as well (no-op if absent).
        config = get_agent_config()
        config.pop(agent_name, None)
        with open(AGENT_CONFIG_FILE, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2)
        return jsonify({'success': True, 'message': f'Agent "{agent_name}" wurde gelöscht.'})
    except Exception as e:
        # Log the full traceback; return only the message to the client.
        app.logger.exception('Fehler beim Löschen des Agenten %s', agent_name)
        return jsonify({'error': str(e)}), 500
@app.route('/api/agent/<agent_name>/reminders', methods=['GET', 'POST'])
def agent_reminders(agent_name):
    """Read (GET) or overwrite (POST) an agent's reminders.md file.

    GET returns {'reminders': <text>} (empty string if the file is missing).
    POST expects JSON {'reminders': <text>} and replaces the file content.
    """
    agents_dir = os.path.join(os.path.dirname(__file__), 'agents')
    agent_path = os.path.realpath(os.path.join(agents_dir, agent_name))
    # Security: reject URL-supplied names that resolve outside the agents
    # directory (path traversal, e.g. '..').
    if not agent_path.startswith(os.path.realpath(agents_dir) + os.sep):
        return jsonify({'error': 'Ungültiger Agent-Name'}), 400
    reminders_file = os.path.join(agent_path, 'reminders.md')
    if not os.path.isdir(agent_path):
        return jsonify({'error': 'Agent nicht gefunden'}), 404
    if request.method == 'GET':
        if os.path.exists(reminders_file):
            with open(reminders_file, 'r', encoding='utf-8') as f:
                content = f.read()
        else:
            # No reminders file yet — treat as empty, not as an error.
            content = ''
        return jsonify({'reminders': content})
    # POST: persist the submitted reminders text.
    data = request.get_json(silent=True)
    if data and 'reminders' in data:
        with open(reminders_file, 'w', encoding='utf-8') as f:
            f.write(data['reminders'])
        return jsonify({'success': True, 'message': 'Erinnerungen gespeichert'})
    return jsonify({'error': 'Keine Daten übergeben'}), 400
@app.route('/api/orchestrator-distribute', methods=['POST'])
def distribute_tasks():
    """Create a planning task for the orchestrator, which assigns the agents.

    Expects JSON {'tasks': [...], 'agents': [...]}. A single planning task is
    appended; the orchestrator agent later analyzes it and dispatches the
    sub-tasks to the right agents.

    NOTE: a former fallback that fanned tasks out to agents directly sat
    after the return and was unreachable dead code (it also referenced
    undefined names such as `created_tasks` and `concurrent`); it has been
    removed.
    """
    # silent=True: a malformed body becomes an empty dict and is answered
    # with the controlled 400 below instead of raising.
    data = request.get_json(silent=True) or {}
    tasks_list = data.get('tasks', [])
    selected_agents = data.get('agents', [])
    if not tasks_list:
        return jsonify({'error': 'Keine Tasks übergeben'}), 400
    tasks_text = '\n'.join(f"- {t}" for t in tasks_list)
    agents_text = ', '.join(selected_agents) if selected_agents else 'alle verfügbaren Agenten'
    # Collision-safe id, same scheme as api_tasks.
    task_id = len(tasks) + 1
    while any(t['id'] == task_id for t in tasks):
        task_id += 1
    first_title = tasks_list[0]
    planning_task = {
        'id': task_id,
        'title': f"Planungsphase: {first_title[:50]}{'...' if len(first_title) > 50 else ''}",
        'description': f"Tasks:\n{tasks_text}\n\nVerfügbare Agenten: {agents_text}\n\nDer Orchestrator soll diese Tasks analysieren und den richtigen Agenten zuweisen.",
        'assigned_agent': 'Orchestrator',
        'agent_key': 'orchestrator',
        'status': 'pending',
        'created': datetime.now().strftime('%Y-%m-%d %H:%M'),
        'type': 'orchestrated',
        'sub_tasks': tasks_list,
        'available_agents': selected_agents
    }
    tasks.append(planning_task)
    return jsonify({
        'success': True,
        'message': 'Planungs-Task erstellt. Der Orchestrator wird die richtigen Agenten zuweisen.',
        'tasks': [planning_task['id']]
    })
if __name__ == '__main__':
    # Standalone entry point: listen on all interfaces, port 5000.
    # threaded=True lets Flask's built-in server handle requests concurrently.
    app.run(debug=False, host='0.0.0.0', port=5000, threaded=True)