watchdog-docker/app/opnsense_api.py

70 lines
2.5 KiB
Python
Raw Permalink Normal View History

import requests
import logging
from typing import Dict, List, Optional
import urllib3
# Suppress urllib3's InsecureRequestWarning.
# NOTE: this call runs unconditionally at import time, so the warning is
# silenced for the whole process regardless of any client's verify_ssl value.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class OPNsenseAPI:
    """Client for the OPNsense API.

    Wraps a ``requests.Session`` configured with the API key/secret as
    basic-auth credentials. Every public getter returns ``None`` (or an
    empty list for the row-based getters) instead of raising when the
    request fails; the failure is logged.
    """

    # Seconds to wait for the firewall before giving up on a request.
    DEFAULT_TIMEOUT = 10.0

    def __init__(self, host: str, api_key: str, api_secret: str,
                 verify_ssl: bool = True, timeout: float = DEFAULT_TIMEOUT):
        """Create the client and its underlying HTTP session.

        Args:
            host: Base URL of the firewall; trailing slashes are removed.
            api_key: API key, used as the basic-auth user name.
            api_secret: API secret, used as the basic-auth password.
            verify_ssl: Passed to the session's ``verify`` setting.
            timeout: Default per-request timeout in seconds. A caller can
                still override it per call through ``_request(timeout=...)``.
        """
        self.host = host.rstrip('/')
        self.api_key = api_key
        self.api_secret = api_secret
        self.verify_ssl = verify_ssl
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = (api_key, api_secret)
        self.session.verify = verify_ssl
        logger.info("OPNsense API client initialized for %s", self.host)

    def _request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict]:
        """Send one API request and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``'GET'``.
            endpoint: Path below ``/api/``; a leading slash is tolerated.
            **kwargs: Forwarded to ``Session.request``. ``timeout`` defaults
                to ``self.timeout`` when not supplied.

        Returns:
            The parsed JSON payload, or ``None`` if the request failed, the
            server answered with an error status, or the body was not JSON.
        """
        # Strip a leading slash so "/core/..." does not yield "/api//core/...".
        url = f"{self.host}/api/{endpoint.lstrip('/')}"
        # Without a timeout a stalled connection would block forever.
        kwargs.setdefault('timeout', self.timeout)
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decode failures on requests releases
            # where the decode error is not a RequestException subclass.
            logger.error("API request failed: %s", e)
            return None

    def _fetch_rows(self, endpoint: str) -> List[Dict]:
        """GET *endpoint* and return its ``rows`` entry, or ``[]`` if absent."""
        data = self._request('GET', endpoint)
        if isinstance(data, dict):
            return data.get('rows') or []
        return []

    def get_dhcp_leases(self) -> Optional[List[Dict]]:
        """Get current DHCP leases.

        Returns:
            The ``rows`` list of the ``dhcpv4/leases/searchLease`` response,
            or an empty list when the request fails or has no rows.
        """
        return self._fetch_rows('dhcpv4/leases/searchLease')

    def get_interfaces(self) -> Optional[Dict]:
        """Get interface status (``diagnostics/interface/getInterfaceConfig``).

        Returns:
            The decoded response, or ``None`` on failure.
        """
        return self._request('GET', 'diagnostics/interface/getInterfaceConfig')

    def get_interface_statistics(self) -> Optional[Dict]:
        """Get interface statistics (``diagnostics/traffic/interface``).

        Returns:
            The decoded response, or ``None`` on failure.
        """
        return self._request('GET', 'diagnostics/traffic/interface')

    def get_gateways(self) -> Optional[Dict]:
        """Get gateway status (``routes/gateway/status``).

        Returns:
            The decoded response, or ``None`` on failure.
        """
        return self._request('GET', 'routes/gateway/status')

    def get_arp_table(self) -> Optional[List[Dict]]:
        """Get ARP table for device detection.

        Returns:
            The ``rows`` list of the ``diagnostics/interface/search_arp``
            response, or an empty list when the request fails or has no rows.
        """
        return self._fetch_rows('diagnostics/interface/search_arp')

    def test_connection(self) -> bool:
        """Test API connection.

        Returns:
            ``True`` when ``core/firmware/status`` yields a decodable
            response, ``False`` otherwise. Never raises.
        """
        try:
            result = self._request('GET', 'core/firmware/status')
            return result is not None
        except Exception as e:
            # Best-effort boundary: a connectivity probe must not crash the caller.
            logger.error("Connection test failed: %s", e)
            return False