feat: Persist outbound emails, fix @UPDATE_TEAM_MEMBER parser, add per-entry delete

- Add sent_emails table to DB for persistent outbox logging
- send_email() now writes every outgoing mail (incl. errors) to sent_emails
- parse_agent_commands() passes agent_key/task_id as triggered_by metadata
- Fix @UPDATE_TEAM_MEMBER parser: now matches Identifier/TelegramID/Role/etc.
  format from system prompt (was expecting Email/Field/Value — never matched)
- update_team_member() called correctly via **kwargs (was positional args bug)
- Set Piotr telegram_id=1578034974 directly in DB
- email_log.html: two-tab UI (Inbox Journal + Outbox), click-to-expand body
- emails.html: per-message delete button in inbox list
- New routes: DELETE inbox (IMAP expunge), journal entry, sent entry
This commit is contained in:
eric 2026-02-21 18:43:21 +00:00
parent 99df910497
commit f6ad727bf0
3 changed files with 340 additions and 165 deletions

244
app.py
View file

@ -290,6 +290,20 @@ def init_journal():
)
""")
# Outbox-Tabelle für ausgehende Emails
con.execute("""
CREATE TABLE IF NOT EXISTS sent_emails (
id INTEGER PRIMARY KEY AUTOINCREMENT,
to_address TEXT NOT NULL,
subject TEXT,
body TEXT,
sent_at TEXT NOT NULL,
triggered_by TEXT,
task_id INTEGER,
status TEXT DEFAULT 'sent'
)
""")
# App-Settings Tabelle für globale Einstellungen
con.execute("""
CREATE TABLE IF NOT EXISTS app_settings (
@ -954,13 +968,13 @@ def respond_to_message(message_id, response):
return True
return False
def parse_agent_commands(agent_key, response_text):
def parse_agent_commands(agent_key, response_text, task_id=None):
"""Parst Agent-Antwort nach Orchestrator-Kommandos und führt sie aus."""
import re
# ASK_ORCHESTRATOR: Agent stellt Frage an Orchestrator
ask_requests = re.findall(
r'@ASK_ORCHESTRATOR\s*\nQuestion:\s*([^\n]+)\s*\nContext:\s*([^@]+)@END',
r'@ASK_ORCHESTRATOR\s*\nQuestion:\s*([^\n]+)\s*\nContext:\s*(.*?)@END',
response_text,
re.DOTALL
)
@ -986,7 +1000,7 @@ Die Antwort wird an {agent_key} zurückgegeben.""",
# CREATE_SUBTASK: Agent möchte Subtask erstellen
subtask_requests = re.findall(
r'@CREATE_SUBTASK\s*\nTask:\s*([^\n]+)\s*\nRequirements:\s*([^@]+)@END',
r'@CREATE_SUBTASK\s*\nTask:\s*([^\n]+)\s*\nRequirements:\s*(.*?)@END',
response_text,
re.DOTALL
)
@ -1003,7 +1017,7 @@ Die Antwort wird an {agent_key} zurückgegeben.""",
# SUGGEST_AGENT: Agent schlägt neuen Agent vor
suggest_requests = re.findall(
r'@SUGGEST_AGENT\s*\nRole:\s*([^\n]+)\s*\nSkills:\s*([^\n]+)\s*\nReason:\s*([^@]+)@END',
r'@SUGGEST_AGENT\s*\nRole:\s*([^\n]+)\s*\nSkills:\s*([^\n]+)\s*\nReason:\s*(.*?)@END',
response_text,
re.DOTALL
)
@ -1033,7 +1047,7 @@ Bitte entscheide ob dieser Agent erstellt werden soll.""",
# READ_KNOWLEDGE: Agent möchte Wissensdatenbank durchsuchen
read_kb_requests = re.findall(
r'@READ_KNOWLEDGE\s*\nTopic:\s*([^@]+)@END',
r'@READ_KNOWLEDGE\s*\nTopic:\s*(.*?)@END',
response_text,
re.DOTALL
)
@ -1062,7 +1076,7 @@ Bitte entscheide ob dieser Agent erstellt werden soll.""",
# SEND_EMAIL: Orchestrator sendet Email an Team-Member
send_email_requests = re.findall(
r'@SEND_EMAIL\s*\nTo:\s*([^\n]+)\s*\nSubject:\s*([^\n]+)\s*\nBody:\s*([^@]+)@END',
r'@SEND_EMAIL\s*\nTo:\s*([^\n]+)\s*\nSubject:\s*([^\n]+)\s*\nBody:\s*(.*?)@END',
response_text,
re.DOTALL
)
@ -1072,7 +1086,8 @@ Bitte entscheide ob dieser Agent erstellt werden soll.""",
body_clean = body.strip()
# Versuche Email zu senden
success, message = send_email(to_clean, subject_clean, body_clean)
success, message = send_email(to_clean, subject_clean, body_clean,
triggered_by=f'agent:{agent_key}', task_id=task_id)
if success:
logger.info(f"[AgentCmd] Email gesendet an {to_clean}: {subject_clean}")
else:
@ -1080,7 +1095,7 @@ Bitte entscheide ob dieser Agent erstellt werden soll.""",
# SEND_TELEGRAM: Orchestrator sendet Telegram-Nachricht
send_telegram_requests = re.findall(
r'@SEND_TELEGRAM\s*\nTo:\s*([^\n]+)\s*\nMessage:\s*([^@]+)@END',
r'@SEND_TELEGRAM\s*\nTo:\s*([^\n]+)\s*\nMessage:\s*(.*?)@END',
response_text,
re.DOTALL
)
@ -1116,7 +1131,7 @@ Bitte entscheide ob dieser Agent erstellt werden soll.""",
# ADD_TEAM_MEMBER: Füge neues Team-Mitglied hinzu
add_member_requests = re.findall(
r'@ADD_TEAM_MEMBER\s*\nName:\s*([^\n]+)\s*\nEmail:\s*([^\n]+)\s*\nRole:\s*([^\n]+)\s*\nResponsibilities:\s*([^@]+)@END',
r'@ADD_TEAM_MEMBER\s*\nName:\s*([^\n]+)\s*\nEmail:\s*([^\n]+)\s*\nRole:\s*([^\n]+)\s*\nResponsibilities:\s*(.*?)@END',
response_text,
re.DOTALL
)
@ -1133,50 +1148,41 @@ Bitte entscheide ob dieser Agent erstellt werden soll.""",
logger.warning(f"[AgentCmd] Team-Member konnte nicht hinzugefügt werden: {name_clean}")
# UPDATE_TEAM_MEMBER: Aktualisiere Team-Mitglied
update_member_requests = re.findall(
r'@UPDATE_TEAM_MEMBER\s*\nEmail:\s*([^\n]+)\s*\nField:\s*([^\n]+)\s*\nValue:\s*([^@]+)@END',
response_text,
re.DOTALL
)
for email, field, value in update_member_requests:
email_clean = email.strip()
field_clean = field.strip().lower()
value_clean = value.strip()
# Hole aktuelles Team-Member
con = sqlite3.connect(EMAIL_JOURNAL_DB)
member = con.execute(
"SELECT name, role, email, responsibilities FROM team_members WHERE LOWER(email) = ?",
(email_clean.lower(),)
).fetchone()
if member:
# Update je nach Field
updates = {
'name': member[0],
'role': member[1],
'email': member[2],
'responsibilities': member[3]
}
if field_clean in updates:
updates[field_clean] = value_clean
success = update_team_member(
email_clean,
updates['name'],
updates['role'],
updates['responsibilities']
)
if success:
logger.info(f"[AgentCmd] Team-Member aktualisiert: {email_clean} - {field_clean}")
else:
logger.error(f"[AgentCmd] Update fehlgeschlagen für {email_clean}")
else:
logger.warning(f"[AgentCmd] Unbekanntes Field: {field_clean}")
# Format: @UPDATE_TEAM_MEMBER\nIdentifier: <email oder name>\n[Feld: Wert]\n...\n@END
ALLOWED_UPDATE_FIELDS = {
'name': 'name',
'role': 'role',
'responsibilities': 'responsibilities',
'email': 'email',
'telegramid': 'telegram_id',
'telegram_id': 'telegram_id',
'phone': 'phone',
}
for block in re.findall(r'@UPDATE_TEAM_MEMBER\s*\n(.*?)@END', response_text, re.DOTALL):
lines = [l.strip() for l in block.strip().splitlines() if l.strip()]
identifier = None
kwargs = {}
for line in lines:
if ':' not in line:
continue
key, _, val = line.partition(':')
key_norm = key.strip().lower().replace(' ', '_')
val_clean = val.strip()
if key_norm == 'identifier':
identifier = val_clean
elif key_norm in ALLOWED_UPDATE_FIELDS:
kwargs[ALLOWED_UPDATE_FIELDS[key_norm]] = val_clean
if not identifier:
logger.warning("[AgentCmd] @UPDATE_TEAM_MEMBER ohne Identifier ignoriert")
continue
if not kwargs:
logger.warning(f"[AgentCmd] @UPDATE_TEAM_MEMBER für '{identifier}' ohne Felder ignoriert")
continue
success = update_team_member(identifier, **kwargs)
if success:
logger.info(f"[AgentCmd] Team-Member aktualisiert: {identifier} - {list(kwargs.keys())}")
else:
logger.warning(f"[AgentCmd] Team-Member nicht gefunden: {email_clean}")
con.close()
logger.error(f"[AgentCmd] Update fehlgeschlagen für '{identifier}'")
def create_new_agent(agent_key, role, skills):
"""Erstellt dynamisch einen neuen Agenten."""
@ -1373,8 +1379,9 @@ def get_email_body(email_id):
return f'Fehler beim Abrufen der Email: {str(e)}'
def send_email(to_address, subject, body):
"""Sendet eine Email via SMTP"""
def send_email(to_address, subject, body, triggered_by='manual', task_id=None):
"""Sendet eine Email via SMTP und persistiert sie in der Outbox."""
now = datetime.now().isoformat()
try:
if not EMAIL_CONFIG['email_address'] or not EMAIL_CONFIG['email_password']:
return False, 'Email-Konfiguration erforderlich'
@ -1396,9 +1403,29 @@ def send_email(to_address, subject, body):
server.login(EMAIL_CONFIG['email_address'], EMAIL_CONFIG['email_password'])
server.send_message(msg)
server.quit()
# Ausgehende Email persistieren
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.execute(
"INSERT INTO sent_emails (to_address, subject, body, sent_at, triggered_by, task_id, status) VALUES (?,?,?,?,?,?,'sent')",
(to_address, subject, body, now, triggered_by, task_id)
)
con.commit()
con.close()
return True, 'Email erfolgreich versendet'
except Exception as e:
# Fehlgeschlagene Versuche auch loggen
try:
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.execute(
"INSERT INTO sent_emails (to_address, subject, body, sent_at, triggered_by, task_id, status) VALUES (?,?,?,?,?,?,'error')",
(to_address, subject, body, now, triggered_by, task_id)
)
con.commit()
con.close()
except Exception:
pass
return False, f'Fehler beim Versenden: {str(e)}'
def is_whitelisted(sender_address):
@ -1547,33 +1574,24 @@ async def telegram_message_handler(update: Update, context: ContextTypes.DEFAULT
def send_telegram_message(chat_id: int, message: str):
"""Sendet eine Nachricht an einen Telegram-Chat."""
global telegram_app
if not telegram_app or not TELEGRAM_CONFIG['bot_token']:
logging.warning("[Telegram] Cannot send message: Bot not configured")
"""Sendet eine Nachricht an einen Telegram-Chat via direktem HTTP-Request."""
token = TELEGRAM_CONFIG.get('bot_token')
if not token:
logging.warning("[Telegram] Cannot send message: Bot token not configured")
return False
try:
import asyncio
# In die laufende Event-Loop des Telegram-Threads senden (thread-safe)
loop = telegram_app.updater.application.updater._network_loop if hasattr(telegram_app, 'updater') else None
if loop is None:
# Fallback: Loop des Telegram-Threads direkt holen
loop = telegram_app.loop if hasattr(telegram_app, 'loop') else None
if loop and loop.is_running():
future = asyncio.run_coroutine_threadsafe(
telegram_app.bot.send_message(chat_id=chat_id, text=message),
loop
)
future.result(timeout=30)
import requests as req
r = req.post(
f"https://api.telegram.org/bot{token}/sendMessage",
json={'chat_id': chat_id, 'text': message},
timeout=15
)
if r.ok:
logging.info(f"[Telegram] Nachricht an {chat_id} gesendet.")
return True
else:
# Letzter Fallback: eigene Loop (kein Telegram-Bot läuft)
loop = asyncio.new_event_loop()
loop.run_until_complete(telegram_app.bot.send_message(chat_id=chat_id, text=message))
loop.close()
return True
logging.error(f"[Telegram] API Fehler: {r.status_code} {r.text}")
return False
except Exception as e:
logging.error(f"[Telegram] Error sending message: {e}")
return False
@ -2012,6 +2030,12 @@ def process_beat_tasks():
task['response'] = response
logger.info("[TaskBeat] Task #%d abgeschlossen.", task['id'])
# Agent-Kommandos parsen (@SEND_EMAIL, @UPDATE_TEAM_MEMBER, etc.)
try:
parse_agent_commands('orchestrator', response, task_id=task['id'])
except Exception as e:
logger.error("[TaskBeat] parse_agent_commands Fehler: %s", str(e))
# Telegram-Antwort senden
if task.get('type') == 'telegram' and task.get('telegram_chat_id'):
try:
@ -3006,13 +3030,69 @@ def view_email(email_id):
return {'content': body}
@app.route('/emails/inbox/<imap_uid>/delete', methods=['POST'])
@login_required
def delete_inbox_email(imap_uid):
"""Löscht eine einzelne Email aus dem IMAP-Posteingang (verschiebt in Trash)."""
if not (EMAIL_CONFIG['email_address'] and EMAIL_CONFIG['email_password']):
flash('Email-Konfiguration erforderlich', 'danger')
return redirect(url_for('emails'))
try:
mail = imaplib.IMAP4_SSL(EMAIL_CONFIG['imap_server'], EMAIL_CONFIG['imap_port'])
mail.login(EMAIL_CONFIG['email_address'], EMAIL_CONFIG['email_password'])
mail.select('INBOX')
mail.store(imap_uid, '+FLAGS', '\\Deleted')
mail.expunge()
mail.close()
mail.logout()
flash('Email aus Posteingang gelöscht.', 'success')
except Exception as e:
flash(f'Fehler beim Löschen: {e}', 'danger')
return redirect(url_for('emails'))
@app.route('/emails/journal/<path:message_id>/delete', methods=['POST'])
@login_required
def delete_journal_entry(message_id):
"""Löscht einen einzelnen Journal-Eintrag (eingehende Emails)."""
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.execute("DELETE FROM email_journal WHERE message_id = ?", (message_id,))
con.commit()
con.close()
flash('Journal-Eintrag gelöscht.', 'success')
return redirect(url_for('email_log_view'))
@app.route('/emails/sent/<int:sent_id>/delete', methods=['POST'])
@login_required
def delete_sent_email(sent_id):
"""Löscht einen einzelnen Outbox-Eintrag."""
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.execute("DELETE FROM sent_emails WHERE id = ?", (sent_id,))
con.commit()
con.close()
flash('Gesendete Email aus Log gelöscht.', 'success')
return redirect(url_for('email_log_view'))
@app.route('/email-log')
@login_required
def email_log_view():
"""Zeigt das Email-Verarbeitungs-Log."""
with email_log_lock:
log_entries = list(reversed(email_log)) # Neueste zuerst
return render_template('email_log.html', agents=AGENTS, log_entries=log_entries)
"""Zeigt Inbox-Journal und Outbox-Log."""
# Eingehende: aus DB
con = sqlite3.connect(EMAIL_JOURNAL_DB)
con.row_factory = sqlite3.Row
journal_rows = con.execute(
"SELECT * FROM email_journal ORDER BY received_at DESC"
).fetchall()
# Ausgehende: aus DB
sent_rows = con.execute(
"SELECT * FROM sent_emails ORDER BY sent_at DESC"
).fetchall()
con.close()
return render_template('email_log.html', agents=AGENTS,
journal_rows=journal_rows,
sent_rows=sent_rows)
@app.route('/settings', methods=['GET', 'POST'])