watchdog-docker/app/email_handler.py

308 lines
10 KiB
Python

import smtplib
import logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
from typing import Dict, List
logger = logging.getLogger(__name__)
class EmailHandler:
"""Handle email notifications for Watchdog events"""
def __init__(self, config: Dict):
self.config = config
self.smtp_server = config['smtp_server']
self.smtp_port = config['smtp_port']
self.smtp_use_tls = config['smtp_use_tls']
self.smtp_username = config['smtp_username']
self.smtp_password = config['smtp_password']
self.from_address = config['from_address']
self.to_addresses = config['to_addresses']
logger.info(f"EmailHandler initialized for {self.smtp_server}:{self.smtp_port}")
def _send_email(self, subject: str, html_content: str):
"""Send an email"""
try:
# Create message
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = self.from_address
msg['To'] = ', '.join(self.to_addresses)
# Add HTML content
html_part = MIMEText(html_content, 'html')
msg.attach(html_part)
# Send email
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
if self.smtp_use_tls:
server.starttls()
server.login(self.smtp_username, self.smtp_password)
server.send_message(msg)
logger.info(f"Email sent: {subject}")
return True
except Exception as e:
logger.error(f"Failed to send email: {e}", exc_info=True)
return False
def send_event_notification(self, event: Dict):
"""Send notification for a single event"""
subject = self._format_subject(event)
html_content = self._format_event_email(event)
self._send_email(subject, html_content)
def send_startup_notification(self):
"""Send notification when Watchdog starts"""
subject = "🟢 Watchdog Docker gestartet"
html_content = f"""
<html>
<head>
<style>
body {{
font-family: Arial, sans-serif;
line-height: 1.6;
color: #333;
}}
.container {{
max-width: 600px;
margin: 0 auto;
padding: 20px;
}}
.header {{
background-color: #28a745;
color: white;
padding: 20px;
border-radius: 5px;
text-align: center;
}}
.content {{
background-color: #f8f9fa;
padding: 20px;
margin-top: 20px;
border-radius: 5px;
}}
.footer {{
text-align: center;
margin-top: 20px;
color: #666;
font-size: 12px;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🟢 Watchdog Docker</h1>
<p>Monitoring gestartet</p>
</div>
<div class="content">
<p><strong>Status:</strong> Aktiv</p>
<p><strong>Zeitpunkt:</strong> {datetime.now().strftime('%d.%m.%Y %H:%M:%S')}</p>
<hr>
<p>OPNsense Monitoring ist aktiv und überwacht folgende Events:</p>
<ul>
<li>DHCP Leases</li>
<li>Neue Geräte (ARP)</li>
<li>Interface Status</li>
<li>Gateway Status</li>
</ul>
</div>
<div class="footer">
<p>Watchdog Docker - Automatische Benachrichtigung</p>
</div>
</div>
</body>
</html>
"""
self._send_email(subject, html_content)
def _format_subject(self, event: Dict) -> str:
"""Format email subject based on event type"""
event_type = event.get('type', 'unknown')
prefixes = {
'dhcp_lease': '🔵 Neue DHCP Lease',
'new_device': '🔴 Neues Gerät erkannt' if not event.get('known') else '🟡 Bekanntes Gerät',
'interface_status': '⚠️ Interface Status',
'gateway_status': '⚠️ Gateway Status'
}
prefix = prefixes.get(event_type, '📢 Event')
interface = event.get('interface', '')
if interface:
return f"{prefix} - {interface}"
else:
return prefix
def _format_event_email(self, event: Dict) -> str:
"""Format event as HTML email"""
event_type = event.get('type', 'unknown')
timestamp = datetime.now().strftime('%d.%m.%Y %H:%M:%S')
# Event type specific colors
colors = {
'dhcp_lease': '#0d6efd',
'new_device': '#dc3545',
'interface_status': '#ffc107',
'gateway_status': '#fd7e14'
}
color = colors.get(event_type, '#6c757d')
# Build event details HTML
details_html = self._build_event_details_html(event)
html_content = f"""
<html>
<head>
<style>
body {{
font-family: Arial, sans-serif;
line-height: 1.6;
color: #333;
}}
.container {{
max-width: 600px;
margin: 0 auto;
padding: 20px;
}}
.header {{
background-color: {color};
color: white;
padding: 20px;
border-radius: 5px;
text-align: center;
}}
.content {{
background-color: #f8f9fa;
padding: 20px;
margin-top: 20px;
border-radius: 5px;
}}
.detail-row {{
padding: 8px 0;
border-bottom: 1px solid #dee2e6;
}}
.detail-label {{
font-weight: bold;
display: inline-block;
width: 150px;
}}
.footer {{
text-align: center;
margin-top: 20px;
color: #666;
font-size: 12px;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>{self._format_subject(event)}</h1>
<p>{event.get('details', 'Event detected')}</p>
</div>
<div class="content">
<div class="detail-row">
<span class="detail-label">Zeitpunkt:</span>
<span>{timestamp}</span>
</div>
<div class="detail-row">
<span class="detail-label">Event-Typ:</span>
<span>{event_type.upper()}</span>
</div>
{details_html}
</div>
<div class="footer">
<p>Watchdog Docker - Automatische Benachrichtigung</p>
</div>
</div>
</body>
</html>
"""
return html_content
def _build_event_details_html(self, event: Dict) -> str:
"""Build event-specific details HTML"""
event_type = event.get('type', 'unknown')
html = ""
# Common fields
if event.get('interface'):
html += f"""
<div class="detail-row">
<span class="detail-label">Interface:</span>
<span>{event['interface']}</span>
</div>
"""
# Type-specific fields
if event_type in ['dhcp_lease', 'new_device']:
if event.get('ip'):
html += f"""
<div class="detail-row">
<span class="detail-label">IP-Adresse:</span>
<span>{event['ip']}</span>
</div>
"""
if event.get('mac'):
html += f"""
<div class="detail-row">
<span class="detail-label">MAC-Adresse:</span>
<span>{event['mac']}</span>
</div>
"""
if event.get('hostname'):
html += f"""
<div class="detail-row">
<span class="detail-label">Hostname:</span>
<span>{event['hostname']}</span>
</div>
"""
if event_type == 'new_device':
known = event.get('known', False)
html += f"""
<div class="detail-row">
<span class="detail-label">Bekannt:</span>
<span style="color: {'green' if known else 'red'};">
{'✓ Ja' if known else '✗ Nein (Erstes Mal gesehen!)'}
</span>
</div>
"""
if event_type in ['interface_status', 'gateway_status']:
if event.get('old_status'):
html += f"""
<div class="detail-row">
<span class="detail-label">Vorheriger Status:</span>
<span>{event['old_status']}</span>
</div>
"""
if event.get('new_status'):
html += f"""
<div class="detail-row">
<span class="detail-label">Neuer Status:</span>
<span>{event['new_status']}</span>
</div>
"""
if event_type == 'gateway_status' and event.get('gateway'):
html += f"""
<div class="detail-row">
<span class="detail-label">Gateway:</span>
<span>{event['gateway']}</span>
</div>
"""
return html