70 lines
2.5 KiB
Python
70 lines
2.5 KiB
Python
import requests
|
|
import logging
|
|
from typing import Dict, List, Optional
|
|
import urllib3
|
|
|
|
# Disable SSL warnings (only if verify_ssl is false)
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class OPNsenseAPI:
|
|
"""Client for OPNsense API"""
|
|
|
|
def __init__(self, host: str, api_key: str, api_secret: str, verify_ssl: bool = True):
|
|
self.host = host.rstrip('/')
|
|
self.api_key = api_key
|
|
self.api_secret = api_secret
|
|
self.verify_ssl = verify_ssl
|
|
self.session = requests.Session()
|
|
self.session.auth = (api_key, api_secret)
|
|
self.session.verify = verify_ssl
|
|
|
|
logger.info(f"OPNsense API client initialized for {self.host}")
|
|
|
|
def _request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict]:
|
|
"""Make API request"""
|
|
url = f"{self.host}/api/{endpoint}"
|
|
|
|
try:
|
|
response = self.session.request(method, url, **kwargs)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"API request failed: {e}")
|
|
return None
|
|
|
|
def get_dhcp_leases(self) -> Optional[List[Dict]]:
|
|
"""Get current DHCP leases"""
|
|
data = self._request('GET', 'dhcpv4/leases/searchLease')
|
|
if data and 'rows' in data:
|
|
return data['rows']
|
|
return []
|
|
|
|
def get_interfaces(self) -> Optional[Dict]:
|
|
"""Get interface status"""
|
|
return self._request('GET', 'diagnostics/interface/getInterfaceConfig')
|
|
|
|
def get_interface_statistics(self) -> Optional[Dict]:
|
|
"""Get interface statistics"""
|
|
return self._request('GET', 'diagnostics/traffic/interface')
|
|
|
|
def get_gateways(self) -> Optional[Dict]:
|
|
"""Get gateway status"""
|
|
return self._request('GET', 'routes/gateway/status')
|
|
|
|
def get_arp_table(self) -> Optional[List[Dict]]:
|
|
"""Get ARP table for device detection"""
|
|
data = self._request('GET', 'diagnostics/interface/search_arp')
|
|
if data and 'rows' in data:
|
|
return data['rows']
|
|
return []
|
|
|
|
def test_connection(self) -> bool:
|
|
"""Test API connection"""
|
|
try:
|
|
result = self._request('GET', 'core/firmware/status')
|
|
return result is not None
|
|
except Exception as e:
|
|
logger.error(f"Connection test failed: {e}")
|
|
return False |