import html
import logging
import smtplib
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Dict, List

logger = logging.getLogger(__name__)


class EmailHandler:
    """Handle email notifications for Watchdog events.

    Messages are built as ``multipart/alternative`` with a single HTML part
    and delivered through the SMTP server described by ``config``.
    """

    # Socket timeout (seconds) used when the config has no 'smtp_timeout' key,
    # so an unresponsive server cannot block the caller indefinitely.
    DEFAULT_SMTP_TIMEOUT = 30

    # Accent colour per event type.
    # NOTE(review): not referenced by the templates as they stand; presumably
    # intended for HTML styling - confirm before removing.
    _EVENT_COLORS = {
        'dhcp_lease': '#0d6efd',
        'new_device': '#dc3545',
        'interface_status': '#ffc107',
        'gateway_status': '#fd7e14',
    }

    def __init__(self, config: Dict):
        """Read the SMTP settings from ``config``.

        Required keys: ``smtp_server``, ``smtp_port``, ``smtp_use_tls``,
        ``smtp_username``, ``smtp_password``, ``from_address`` and
        ``to_addresses``. Optional key: ``smtp_timeout`` (seconds).

        Raises:
            KeyError: if a required key is missing.
        """
        self.config = config
        self.smtp_server = config['smtp_server']
        self.smtp_port = config['smtp_port']
        self.smtp_use_tls = config['smtp_use_tls']
        self.smtp_username = config['smtp_username']
        self.smtp_password = config['smtp_password']
        self.from_address = config['from_address']
        self.to_addresses = config['to_addresses']
        self.smtp_timeout = config.get('smtp_timeout', self.DEFAULT_SMTP_TIMEOUT)

        logger.info(
            "EmailHandler initialized for %s:%s", self.smtp_server, self.smtp_port
        )

    @staticmethod
    def _esc(value) -> str:
        """Return ``value`` as text that is safe to embed in the HTML body."""
        return html.escape(str(value))

    def _recipient_list(self) -> List[str]:
        """Return the configured recipients as a list.

        A bare string is treated as one entry instead of being iterated
        character by character; an empty or missing value yields ``[]``.
        """
        addresses = self.to_addresses
        if not addresses:
            return []
        if isinstance(addresses, str):
            return [addresses]
        return list(addresses)

    def _send_email(self, subject: str, html_content: str) -> bool:
        """Send one HTML email to all configured recipients.

        Args:
            subject: Value for the ``Subject`` header.
            html_content: Message body, sent as ``text/html``.

        Returns:
            ``True`` if the message was handed to the SMTP server, ``False``
            if there are no recipients or any error occurred (the error is
            logged, never raised).
        """
        recipients = self._recipient_list()
        if not recipients:
            # Nothing to deliver to: skip the SMTP round trip entirely.
            logger.warning("No recipients configured, email not sent: %s", subject)
            return False

        try:
            msg = MIMEMultipart('alternative')
            msg['Subject'] = subject
            msg['From'] = self.from_address
            msg['To'] = ', '.join(recipients)
            msg.attach(MIMEText(html_content, 'html'))

            with smtplib.SMTP(
                self.smtp_server, self.smtp_port, timeout=self.smtp_timeout
            ) as server:
                if self.smtp_use_tls:
                    server.starttls()
                # Only authenticate when credentials are configured, so
                # relays that accept unauthenticated mail keep working.
                if self.smtp_username:
                    server.login(self.smtp_username, self.smtp_password)
                server.send_message(msg)
        except Exception as e:
            # Notification delivery is best-effort: log with traceback and
            # report failure instead of propagating into the caller.
            logger.exception("Failed to send email: %s", e)
            return False

        logger.info("Email sent: %s", subject)
        return True

    def send_event_notification(self, event: Dict) -> bool:
        """Send a notification for a single event.

        Returns:
            The delivery result of ``_send_email``.
        """
        subject = self._format_subject(event)
        html_content = self._format_event_email(event)
        return self._send_email(subject, html_content)

    def send_startup_notification(self) -> bool:
        """Send a notification when Watchdog starts.

        Returns:
            The delivery result of ``_send_email``.
        """
        subject = "🟢 Watchdog Docker gestartet"
        started_at = datetime.now().strftime('%d.%m.%Y %H:%M:%S')
        html_content = (
            "\n"
            "\n"
            "🟢 Watchdog Docker\n"
            "\n"
            "Monitoring gestartet\n"
            "\n"
            "Status: Aktiv\n"
            "\n"
            f"Zeitpunkt: {started_at}\n"
            "\n"
            "Version: 0.1\n"
            "\n"
            "\n"
            "OPNsense Monitoring ist aktiv und überwacht folgende Events:\n"
            "\n"
        )
        return self._send_email(subject, html_content)

    def _format_subject(self, event: Dict) -> str:
        """Format the email subject based on the event type.

        The subject is ``"<prefix> - <interface>"`` when the event carries a
        non-empty ``interface``, otherwise just the prefix. Unknown event
        types use a generic prefix.
        """
        event_type = event.get('type', 'unknown')

        if event.get('known'):
            device_prefix = '🟡 Bekanntes Gerät'
        else:
            device_prefix = '🔴 Neues Gerät erkannt'

        prefixes = {
            'dhcp_lease': '🔵 Neue DHCP Lease',
            'new_device': device_prefix,
            'interface_status': '⚠️ Interface Status',
            'gateway_status': '⚠️ Gateway Status',
        }
        prefix = prefixes.get(event_type, '📢 Event')

        interface = event.get('interface', '')
        if interface:
            return f"{prefix} - {interface}"
        return prefix

    def _format_event_email(self, event: Dict) -> str:
        """Format an event as the HTML body of a notification email.

        Every value taken from ``event`` is HTML-escaped before it is placed
        in the body.
        """
        event_type = event.get('type', 'unknown')
        timestamp = datetime.now().strftime('%d.%m.%Y %H:%M:%S')

        heading = self._esc(self._format_subject(event))
        summary = self._esc(event.get('details', 'Event detected'))
        type_label = self._esc(str(event_type).upper())
        details_html = self._build_event_details_html(event)

        return (
            "\n"
            "\n"
            f"{heading}\n"
            "\n"
            f"{summary}\n"
            "\n"
            f"Zeitpunkt: {timestamp}\n"
            f"Event-Typ: {type_label}\n"
            f"{details_html}\n"
        )

    def _build_event_details_html(self, event: Dict) -> str:
        """Build the event-specific detail rows of the email body.

        Fields that are missing or falsy are omitted. All values are
        HTML-escaped. Returns an empty string when no row applies.
        """
        event_type = event.get('type', 'unknown')
        rows = []

        def add_if_present(label: str, key: str) -> None:
            value = event.get(key)
            if value:
                rows.append(f"\n{label}: {self._esc(value)}\n")

        # Common fields
        add_if_present('Interface', 'interface')

        # Type-specific fields
        if event_type in ('dhcp_lease', 'new_device'):
            add_if_present('IP-Adresse', 'ip')
            add_if_present('MAC-Adresse', 'mac')
            add_if_present('Hostname', 'hostname')

        if event_type == 'new_device':
            if event.get('known', False):
                known_text = '✓ Ja'
            else:
                known_text = '✗ Nein (Erstes Mal gesehen!)'
            rows.append(f"\nBekannt: {known_text}\n")

        if event_type in ('interface_status', 'gateway_status'):
            add_if_present('Vorheriger Status', 'old_status')
            add_if_present('Neuer Status', 'new_status')

        if event_type == 'gateway_status':
            add_if_present('Gateway', 'gateway')

        return ''.join(rows)