from flask import Flask, render_template, redirect, url_for, request, flash, jsonify from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user from werkzeug.security import check_password_hash import yaml import logging from apscheduler.schedulers.background import BackgroundScheduler from datetime import datetime import os from database import Database from monitor import OPNsenseMonitor from email_handler import EmailHandler # Load configuration with open('config/config.yaml', 'r') as f: config = yaml.safe_load(f) # Setup logging logging.basicConfig( level=getattr(logging, config['logging']['level']), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(config['logging']['file']), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Initialize Flask app = Flask(__name__) app.config['SECRET_KEY'] = config['web']['secret_key'] # Initialize Flask-Login login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' # Initialize components db = Database(config['database']['path']) email_handler = EmailHandler(config['email']) if config['email']['enabled'] else None monitor = OPNsenseMonitor(config, db, email_handler) # Simple User class class User(UserMixin): def __init__(self, id): self.id = id @login_manager.user_loader def load_user(user_id): return User(user_id) # Routes @app.route('/') @login_required def index(): return redirect(url_for('dashboard')) @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: return redirect(url_for('dashboard')) if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') # Simple authentication (username: admin) if username == 'admin' and check_password_hash( config['web']['admin_password_hash'], password ): user = User('admin') login_user(user) logger.info(f"User {username} logged in successfully") return redirect(url_for('dashboard')) else: logger.warning(f"Failed login attempt for user: {username}") flash('Invalid username or password', 'danger') return render_template('login.html') @app.route('/logout') @login_required def logout(): logger.info(f"User {current_user.id} logged out") logout_user() return redirect(url_for('login')) @app.route('/dashboard') @login_required def dashboard(): # Get recent events events = db.get_recent_events(limit=100) stats = db.get_statistics() return render_template('dashboard.html', events=events, stats=stats, config=config) @app.route('/api/events') @login_required def api_events(): """API endpoint for real-time event updates""" limit = request.args.get('limit', 50, type=int) event_type = request.args.get('type', None) interface = request.args.get('interface', None) events = db.get_recent_events(limit=limit, event_type=event_type, interface=interface) return jsonify(events) @app.route('/api/stats') @login_required def api_stats(): """API endpoint for statistics""" return jsonify(db.get_statistics()) @app.route('/health') def health(): """Health check endpoint""" return jsonify({ 'status': 'healthy', 'timestamp': datetime.now().isoformat(), 'version': '0.1' }) # Initialize scheduler def start_monitoring(): scheduler = BackgroundScheduler() interval = config['monitoring']['polling_interval'] scheduler.add_job( func=monitor.check_all, trigger="interval", seconds=interval, id='opnsense_monitor', name='OPNsense Monitor', replace_existing=True ) scheduler.start() logger.info(f"Monitoring started with {interval}s interval") # Send startup notification if email_handler and config['email']['send_on_startup']: email_handler.send_startup_notification() if __name__ == '__main__': logger.info("Starting Watchdog Docker v0.1") # Initialize database db.initialize() # Start monitoring start_monitoring() # Run Flask app.run( host=config['web']['host'], port=config['web']['port'], debug=False )