import logging from datetime import datetime from typing import Dict, List, Optional from opnsense_api import OPNsenseAPI from database import Database from email_handler import EmailHandler logger = logging.getLogger(__name__) class OPNsenseMonitor: """Monitor OPNsense events""" def __init__(self, config: Dict, db: Database, email_handler: Optional[EmailHandler]): self.config = config self.db = db self.email_handler = email_handler # Initialize API client opn_config = config['opnsense'] self.api = OPNsenseAPI( host=opn_config['host'], api_key=opn_config['api_key'], api_secret=opn_config['api_secret'], verify_ssl=opn_config['verify_ssl'] ) # Previous states self.previous_leases = {} self.previous_devices = {} self.previous_interfaces = {} self.previous_gateways = {} logger.info("OPNsense Monitor initialized") def check_all(self): """Check all monitored events""" logger.debug("Starting monitoring cycle") events_config = self.config['monitoring']['events'] try: if events_config.get('dhcp_leases'): self.check_dhcp_leases() if events_config.get('new_devices'): self.check_new_devices() if events_config.get('interface_status'): self.check_interface_status() if events_config.get('gateway_status'): self.check_gateway_status() except Exception as e: logger.error(f"Error in monitoring cycle: {e}", exc_info=True) def check_dhcp_leases(self): """Check for new DHCP leases""" leases = self.api.get_dhcp_leases() if not leases: return monitored_interfaces = self.config['monitoring'].get('monitored_interfaces', []) for lease in leases: lease_id = lease.get('address', '') + lease.get('mac', '') interface = lease.get('if', '') # Filter by interface if specified if monitored_interfaces and interface not in monitored_interfaces: continue if lease_id not in self.previous_leases: # New lease detected event = { 'type': 'dhcp_lease', 'interface': interface, 'ip': lease.get('address'), 'mac': lease.get('mac'), 'hostname': lease.get('hostname', 'Unknown'), 'details': f"New DHCP lease: {lease.get('address')} ({lease.get('hostname', 'Unknown')})" } self.db.add_event(event) logger.info(f"New DHCP lease: {event['details']}") if self.email_handler: self.email_handler.send_event_notification(event) # Update previous state self.previous_leases = {lease.get('address', '') + lease.get('mac', ''): lease for lease in leases} def check_new_devices(self): """Check for new devices via ARP table""" arp_entries = self.api.get_arp_table() if not arp_entries: return monitored_interfaces = self.config['monitoring'].get('monitored_interfaces', []) for entry in arp_entries: mac = entry.get('mac', '') interface = entry.get('intf', '') # Filter by interface if monitored_interfaces and interface not in monitored_interfaces: continue if mac and mac not in self.previous_devices: # Check if device is in known devices DB is_known = self.db.is_known_device(mac) event = { 'type': 'new_device', 'interface': interface, 'ip': entry.get('ip'), 'mac': mac, 'hostname': entry.get('hostname', 'Unknown'), 'known': is_known, 'details': f"{'Known' if is_known else 'Unknown'} device detected: {mac} ({entry.get('hostname', 'Unknown')})" } self.db.add_event(event) logger.info(f"New device: {event['details']}") if self.email_handler and not is_known: # Only send email for unknown devices self.email_handler.send_event_notification(event) # Update previous state self.previous_devices = {entry.get('mac', ''): entry for entry in arp_entries} def check_interface_status(self): """Check interface status changes""" interfaces = self.api.get_interfaces() if not interfaces: return monitored_interfaces = self.config['monitoring'].get('monitored_interfaces', []) for if_name, if_data in interfaces.items(): # Filter by interface if monitored_interfaces and if_name not in monitored_interfaces: continue current_status = if_data.get('status', 'unknown') if if_name in self.previous_interfaces: previous_status = self.previous_interfaces[if_name].get('status', 'unknown') if current_status != previous_status: event = { 'type': 'interface_status', 'interface': if_name, 'old_status': previous_status, 'new_status': current_status, 'details': f"Interface {if_name} changed: {previous_status} → {current_status}" } self.db.add_event(event) logger.warning(f"Interface status change: {event['details']}") if self.email_handler: self.email_handler.send_event_notification(event) # Update previous state self.previous_interfaces = interfaces def check_gateway_status(self): """Check gateway status changes""" gateways = self.api.get_gateways() if not gateways: return for gw_name, gw_data in gateways.items(): current_status = gw_data.get('status', 'unknown') if gw_name in self.previous_gateways: previous_status = self.previous_gateways[gw_name].get('status', 'unknown') if current_status != previous_status: event = { 'type': 'gateway_status', 'gateway': gw_name, 'old_status': previous_status, 'new_status': current_status, 'details': f"Gateway {gw_name} changed: {previous_status} → {current_status}" } self.db.add_event(event) logger.warning(f"Gateway status change: {event['details']}") if self.email_handler: self.email_handler.send_event_notification(event) # Update previous state self.previous_gateways = gateways