# watchdog-docker/app/email_handler.py

import logging
import smtplib
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from html import escape
from typing import Dict, List
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class EmailHandler:
    """Send HTML e-mail notifications for Watchdog events.

    The handler is configured from a dict with the keys ``smtp_server``,
    ``smtp_port``, ``smtp_use_tls``, ``smtp_username``, ``smtp_password``,
    ``from_address`` and ``to_addresses``. An optional ``smtp_timeout``
    (seconds) bounds every SMTP socket operation.

    Every value taken from an event dict is HTML-escaped before it is placed
    in the message body, because fields such as a DHCP hostname are chosen by
    the client device and must be treated as untrusted text.
    """

    # Header background colour per event type.
    _EVENT_COLORS = {
        'dhcp_lease': '#0d6efd',
        'new_device': '#dc3545',
        'interface_status': '#ffc107',
        'gateway_status': '#fd7e14',
    }
    # Header colour used for event types missing from _EVENT_COLORS.
    _DEFAULT_COLOR = '#6c757d'
    # Header colour of the startup mail.
    _STARTUP_COLOR = '#28a745'
    # Socket timeout in seconds when the config has no 'smtp_timeout' key.
    _DEFAULT_SMTP_TIMEOUT = 30
    # Timestamp layout shown in the mails (day.month.year hour:minute:second).
    _TIMESTAMP_FORMAT = '%d.%m.%Y %H:%M:%S'

    def __init__(self, config: Dict):
        """Store the SMTP settings from *config*.

        Args:
            config: Mapping with the required keys listed in the class
                docstring. ``to_addresses`` may be a list of addresses or a
                single comma-separated string.

        Raises:
            KeyError: If a required key is missing from *config*.
        """
        self.config = config
        self.smtp_server = config['smtp_server']
        self.smtp_port = config['smtp_port']
        self.smtp_use_tls = config['smtp_use_tls']
        self.smtp_username = config['smtp_username']
        self.smtp_password = config['smtp_password']
        self.from_address = config['from_address']
        self.to_addresses = self._normalize_recipients(config['to_addresses'])
        self.smtp_timeout = config.get('smtp_timeout', self._DEFAULT_SMTP_TIMEOUT)
        logger.info(
            "EmailHandler initialized for %s:%s", self.smtp_server, self.smtp_port
        )

    @staticmethod
    def _normalize_recipients(addresses) -> List[str]:
        """Return *addresses* as a list of stripped, non-empty strings.

        A plain string is split on commas; without this, joining it later
        for the ``To`` header would join its individual characters.
        ``None`` or an empty value yields an empty list.
        """
        if not addresses:
            return []
        if isinstance(addresses, str):
            addresses = addresses.split(',')
        stripped = (str(addr).strip() for addr in addresses if addr)
        return [addr for addr in stripped if addr]

    def _send_email(self, subject: str, html_content: str):
        """Send one HTML mail to all configured recipients.

        Args:
            subject: Value of the ``Subject`` header.
            html_content: Complete HTML document used as the message body.

        Returns:
            True if the message was handed to the SMTP server, False if there
            are no recipients or any error occurred. Errors are logged and
            never propagated, so a mail failure cannot stop the caller.
        """
        if not self.to_addresses:
            logger.warning("No recipients configured, email not sent: %s", subject)
            return False

        try:
            msg = MIMEMultipart('alternative')
            msg['Subject'] = subject
            msg['From'] = self.from_address
            msg['To'] = ', '.join(self.to_addresses)
            msg.attach(MIMEText(html_content, 'html'))

            # The timeout keeps an unresponsive server from blocking forever.
            with smtplib.SMTP(
                self.smtp_server, self.smtp_port, timeout=self.smtp_timeout
            ) as server:
                if self.smtp_use_tls:
                    server.starttls()
                # Skip authentication when no username is configured, so
                # relays that accept unauthenticated mail keep working.
                if self.smtp_username:
                    server.login(self.smtp_username, self.smtp_password)
                server.send_message(msg)

            logger.info("Email sent: %s", subject)
            return True
        except Exception as e:
            # Deliberately broad: sending is best-effort at this boundary.
            logger.error("Failed to send email: %s", e, exc_info=True)
            return False

    def send_event_notification(self, event: Dict):
        """Format *event* as an HTML mail and send it."""
        subject = self._format_subject(event)
        html_content = self._format_event_email(event)
        self._send_email(subject, html_content)

    def send_startup_notification(self):
        """Send the mail announcing that Watchdog has started."""
        subject = "🟢 Watchdog Docker gestartet"
        timestamp = datetime.now().strftime(self._TIMESTAMP_FORMAT)
        header_html = (
            "<h1>🟢 Watchdog Docker</h1>\n"
            "<p>Monitoring gestartet</p>"
        )
        content_html = f"""
        <p><strong>Status:</strong> Aktiv</p>
        <p><strong>Zeitpunkt:</strong> {timestamp}</p>
        <hr>
        <p>OPNsense Monitoring ist aktiv und überwacht folgende Events:</p>
        <ul>
        <li>DHCP Leases</li>
        <li>Neue Geräte (ARP)</li>
        <li>Interface Status</li>
        <li>Gateway Status</li>
        </ul>
        """
        html_content = self._render_page(self._STARTUP_COLOR, header_html, content_html)
        self._send_email(subject, html_content)

    def _format_subject(self, event: Dict) -> str:
        """Return the plain-text subject line for *event*.

        The subject is a prefix chosen by ``event['type']`` (for
        ``new_device`` it also depends on ``event['known']``), followed by
        `` - <interface>`` when the event names an interface.
        """
        event_type = event.get('type', 'unknown')
        prefixes = {
            'dhcp_lease': '🔵 Neue DHCP Lease',
            'new_device': (
                '🔴 Neues Gerät erkannt'
                if not event.get('known')
                else '🟡 Bekanntes Gerät'
            ),
            'interface_status': '⚠️ Interface Status',
            'gateway_status': '⚠️ Gateway Status',
        }
        prefix = prefixes.get(event_type, '📢 Event')
        interface = event.get('interface', '')
        return f"{prefix} - {interface}" if interface else prefix

    def _format_event_email(self, event: Dict) -> str:
        """Return the complete HTML document describing *event*.

        The header shows the subject line and ``event['details']``; the
        content lists the current timestamp, the upper-cased event type and
        the type-specific rows from ``_build_event_details_html``.
        """
        event_type = event.get('type', 'unknown')
        timestamp = datetime.now().strftime(self._TIMESTAMP_FORMAT)
        color = self._EVENT_COLORS.get(event_type, self._DEFAULT_COLOR)

        # Subject and details carry event-supplied text, so escape both.
        details_text = str(event.get('details', 'Event detected'))
        header_html = (
            f"<h1>{escape(self._format_subject(event))}</h1>\n"
            f"<p>{escape(details_text)}</p>"
        )
        content_html = ''.join([
            self._detail_row('Zeitpunkt:', timestamp),
            self._detail_row('Event-Typ:', str(event_type).upper()),
            self._build_event_details_html(event),
        ])
        return self._render_page(color, header_html, content_html)

    def _build_event_details_html(self, event: Dict) -> str:
        """Return the detail rows that apply to *event*.

        Fields that are missing or falsy are skipped. The result is an HTML
        fragment (possibly empty) with all values escaped.
        """
        event_type = event.get('type', 'unknown')
        rows = []

        def add_if_present(label: str, key: str) -> None:
            # Append a row only when the event carries a truthy value.
            value = event.get(key)
            if value:
                rows.append(self._detail_row(label, value))

        # Common field
        add_if_present('Interface:', 'interface')

        # Type-specific fields
        if event_type in ('dhcp_lease', 'new_device'):
            add_if_present('IP-Adresse:', 'ip')
            add_if_present('MAC-Adresse:', 'mac')
            add_if_present('Hostname:', 'hostname')

        if event_type == 'new_device':
            known = event.get('known', False)
            rows.append(self._detail_row(
                'Bekannt:',
                '✓ Ja' if known else '✗ Nein (Erstes Mal gesehen!)',
                color='green' if known else 'red',
            ))

        if event_type in ('interface_status', 'gateway_status'):
            add_if_present('Vorheriger Status:', 'old_status')
            add_if_present('Neuer Status:', 'new_status')

        if event_type == 'gateway_status':
            add_if_present('Gateway:', 'gateway')

        return ''.join(rows)

    @staticmethod
    def _detail_row(label: str, value, color: str = '') -> str:
        """Return one label/value row with both parts HTML-escaped.

        Args:
            label: Text shown in the label column.
            value: Value to display; converted with ``str()`` first.
            color: Optional CSS colour for the value. Callers pass only
                fixed literals here, it is not escaped.
        """
        style = f' style="color: {color};"' if color else ''
        return (
            '<div class="detail-row">\n'
            f'<span class="detail-label">{escape(label)}</span>\n'
            f'<span{style}>{escape(str(value))}</span>\n'
            '</div>\n'
        )

    def _render_page(self, header_color: str, header_html: str, content_html: str) -> str:
        """Wrap HTML fragments in the page layout shared by all mails.

        Args:
            header_color: CSS colour of the header box.
            header_html: Markup placed inside the header box.
            content_html: Markup placed inside the content box.

        Both fragments are inserted verbatim, so the caller must have
        escaped any untrusted text in them already.
        """
        return f"""
        <html>
        <head>
        <style>
        body {{
            font-family: Arial, sans-serif;
            line-height: 1.6;
            color: #333;
        }}
        .container {{
            max-width: 600px;
            margin: 0 auto;
            padding: 20px;
        }}
        .header {{
            background-color: {header_color};
            color: white;
            padding: 20px;
            border-radius: 5px;
            text-align: center;
        }}
        .content {{
            background-color: #f8f9fa;
            padding: 20px;
            margin-top: 20px;
            border-radius: 5px;
        }}
        .detail-row {{
            padding: 8px 0;
            border-bottom: 1px solid #dee2e6;
        }}
        .detail-label {{
            font-weight: bold;
            display: inline-block;
            width: 150px;
        }}
        .footer {{
            text-align: center;
            margin-top: 20px;
            color: #666;
            font-size: 12px;
        }}
        </style>
        </head>
        <body>
        <div class="container">
        <div class="header">
        {header_html}
        </div>
        <div class="content">
        {content_html}
        </div>
        <div class="footer">
        <p>Watchdog Docker - Automatische Benachrichtigung</p>
        </div>
        </div>
        </body>
        </html>
        """