#!/usr/bin/env python3 """ DynDNS Client for Hetzner DNS API Automatically updates DNS records when IP address changes """ import os import sys import time import logging import requests from typing import Optional, Dict, Any class HetznerDynDNS: """Hetzner DNS API client for dynamic DNS updates""" BASE_URL = "https://dns.hetzner.com/api/v1" def __init__( self, api_token: str, domain: str, record_name: str, record_type: str = "A", zone_id: Optional[str] = None ): """ Initialize Hetzner DynDNS client Args: api_token: Hetzner DNS API token domain: Domain name (e.g., example.com) record_name: DNS record name (e.g., 'home' or '@' for root) record_type: Record type ('A' for IPv4, 'AAAA' for IPv6) zone_id: Optional zone ID (will be auto-detected if not provided) """ self.api_token = api_token self.domain = domain self.record_name = record_name if record_name != "@" else "" self.record_type = record_type self.zone_id = zone_id self.headers = { "Auth-API-Token": api_token, "Content-Type": "application/json" } if not self.zone_id: self.zone_id = self._get_zone_id() def _make_request( self, method: str, endpoint: str, data: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Make HTTP request to Hetzner API""" url = f"{self.BASE_URL}/{endpoint}" try: if method == "GET": response = requests.get(url, headers=self.headers) elif method == "POST": response = requests.post(url, headers=self.headers, json=data) elif method == "PUT": response = requests.put(url, headers=self.headers, json=data) else: raise ValueError(f"Unsupported HTTP method: {method}") response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logging.error(f"API request failed: {e}") raise def _get_zone_id(self) -> str: """Get zone ID for the domain""" logging.info(f"Looking up zone ID for domain: {self.domain}") try: response = self._make_request("GET", "zones") zones = response.get("zones", []) for zone in zones: if zone.get("name") == self.domain: zone_id = zone.get("id") logging.info(f"Found zone ID: {zone_id}") return zone_id raise ValueError(f"Zone not found for domain: {self.domain}") except Exception as e: logging.error(f"Failed to get zone ID: {e}") raise def get_current_ip(self) -> str: """Get current public IP address""" try: if self.record_type == "A": # IPv4 response = requests.get("https://api.ipify.org", timeout=10) else: # IPv6 response = requests.get("https://api6.ipify.org", timeout=10) response.raise_for_status() ip = response.text.strip() logging.debug(f"Current IP ({self.record_type}): {ip}") return ip except requests.exceptions.RequestException as e: logging.error(f"Failed to get current IP: {e}") raise def get_dns_record(self) -> Optional[Dict[str, Any]]: """Get existing DNS record""" try: response = self._make_request("GET", f"records?zone_id={self.zone_id}") records = response.get("records", []) for record in records: if (record.get("type") == self.record_type and record.get("name") == self.record_name): logging.debug(f"Found existing record: {record}") return record logging.info(f"No existing record found for {self.record_name}.{self.domain}") return None except Exception as e: logging.error(f"Failed to get DNS record: {e}") raise def create_dns_record(self, ip: str) -> Dict[str, Any]: """Create new DNS record""" logging.info(f"Creating new DNS record: {self.record_name}.{self.domain} -> {ip}") data = { "zone_id": self.zone_id, "type": self.record_type, "name": self.record_name, "value": ip, "ttl": 60 } try: response = self._make_request("POST", "records", data) logging.info(f"DNS record created successfully") return response except Exception as e: logging.error(f"Failed to create DNS record: {e}") raise def update_dns_record(self, record_id: str, ip: str) -> Dict[str, Any]: """Update existing DNS record""" logging.info(f"Updating DNS record: {self.record_name}.{self.domain} -> {ip}") data = { "zone_id": self.zone_id, "type": self.record_type, "name": self.record_name, "value": ip, "ttl": 60 } try: response = self._make_request("PUT", f"records/{record_id}", data) logging.info(f"DNS record updated successfully") return response except Exception as e: logging.error(f"Failed to update DNS record: {e}") raise def update(self) -> bool: """ Update DNS record if IP has changed Returns: True if record was updated, False if no update was needed """ try: current_ip = self.get_current_ip() existing_record = self.get_dns_record() if existing_record: if existing_record.get("value") == current_ip: logging.info(f"IP unchanged ({current_ip}), no update needed") return False else: logging.info(f"IP changed: {existing_record.get('value')} -> {current_ip}") self.update_dns_record(existing_record.get("id"), current_ip) return True else: logging.info(f"No existing record, creating new one with IP: {current_ip}") self.create_dns_record(current_ip) return True except Exception as e: logging.error(f"Update failed: {e}") return False def setup_logging(log_level: str = "INFO") -> None: """Configure logging""" logging.basicConfig( level=getattr(logging, log_level.upper()), format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) def main(): """Main entry point""" # Load configuration from environment variables api_token = os.getenv("HETZNER_API_TOKEN") domain = os.getenv("DOMAIN") record_name = os.getenv("RECORD_NAME", "@") record_type = os.getenv("RECORD_TYPE", "A") zone_id = os.getenv("ZONE_ID") check_interval = int(os.getenv("CHECK_INTERVAL", "300")) log_level = os.getenv("LOG_LEVEL", "INFO") # Setup logging setup_logging(log_level) # Validate required configuration if not api_token: logging.error("HETZNER_API_TOKEN environment variable is required") sys.exit(1) if not domain: logging.error("DOMAIN environment variable is required") sys.exit(1) logging.info("Starting Hetzner DynDNS client") logging.info(f"Domain: {domain}") logging.info(f"Record: {record_name if record_name else '@'}.{domain}") logging.info(f"Type: {record_type}") logging.info(f"Check interval: {check_interval} seconds") # Initialize client try: client = HetznerDynDNS( api_token=api_token, domain=domain, record_name=record_name, record_type=record_type, zone_id=zone_id ) except Exception as e: logging.error(f"Failed to initialize client: {e}") sys.exit(1) # Main loop while True: try: client.update() except KeyboardInterrupt: logging.info("Shutting down...") break except Exception as e: logging.error(f"Unexpected error: {e}") # Wait before next check time.sleep(check_interval) if __name__ == "__main__": main()