From 314e1bf399eeb5bd55c66d84ae196ca3d5ae6ef5 Mon Sep 17 00:00:00 2001 From: Logan Fick Date: Thu, 15 Jan 2026 21:59:44 -0500 Subject: [PATCH] Added configurable health check endpoint for external uptime monitoring integration. --- base-config.yaml | 4 + maubot.yaml | 3 + owncastsentry/__init__.py | 36 +++++++- owncastsentry/config.py | 29 ++++++ owncastsentry/health_checker.py | 157 ++++++++++++++++++++++++++++++++ owncastsentry/stream_monitor.py | 40 ++++++-- 6 files changed, 256 insertions(+), 13 deletions(-) create mode 100644 base-config.yaml create mode 100644 owncastsentry/config.py create mode 100644 owncastsentry/health_checker.py diff --git a/base-config.yaml b/base-config.yaml new file mode 100644 index 0000000..430353d --- /dev/null +++ b/base-config.yaml @@ -0,0 +1,4 @@ +# Health check endpoint URL. +# If configured, a GET request will be sent to this URL after each successful update cycle. +# Leave empty to disable health check reporting. +health_check_endpoint: "" diff --git a/maubot.yaml b/maubot.yaml index bd571d8..d46a1b0 100644 --- a/maubot.yaml +++ b/maubot.yaml @@ -7,3 +7,6 @@ modules: main_class: OwncastSentry database: true database_type: asyncpg +config: true +extra_files: + - base-config.yaml diff --git a/owncastsentry/__init__.py b/owncastsentry/__init__.py index 0cdaa39..85240be 100644 --- a/owncastsentry/__init__.py +++ b/owncastsentry/__init__.py @@ -4,9 +4,12 @@ # # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. +from typing import Type + from maubot import Plugin, MessageEvent from maubot.handlers import command from mautrix.util.async_db import UpgradeTable +from mautrix.util.config import BaseProxyConfig from .migrations import get_upgrade_table from .owncast_client import OwncastClient @@ -14,6 +17,8 @@ from .database import StreamRepository, SubscriptionRepository from .notification_service import NotificationService from .stream_monitor import StreamMonitor from .commands import CommandHandler +from .config import Config +from .health_checker import HealthChecker class OwncastSentry(Plugin): @@ -28,6 +33,15 @@ class OwncastSentry(Plugin): """ return get_upgrade_table() + @classmethod + def get_config_class(cls) -> Type[BaseProxyConfig]: + """ + Helper method for telling Maubot about our configuration class. + + :return: The Config class. + """ + return Config + async def start(self) -> None: """ Method called by Maubot upon startup of the instance. @@ -35,6 +49,9 @@ class OwncastSentry(Plugin): :return: Nothing. """ + # Load configuration + self.config.load_and_update() + # Initialize the Owncast API client self.owncast_client = OwncastClient(self.log) @@ -55,6 +72,13 @@ class OwncastSentry(Plugin): self.log, ) + # Initialize health checker + self.health_checker = HealthChecker( + self.database, + self.owncast_client, + self.log, + ) + # Initialize command handler self.command_handler = CommandHandler( self.owncast_client, @@ -69,15 +93,21 @@ class OwncastSentry(Plugin): async def _update_all_stream_states(self) -> None: """ Wrapper method for updating all stream states. - Fetches list of subscribed domains and delegates to StreamMonitor. + Fetches list of subscribed domains, delegates to StreamMonitor, and performs health check. :return: Nothing. """ # Get list of all stream domains with active subscriptions subscribed_domains = await self.subscription_repo.get_all_subscribed_domains() - # Delegate to stream monitor - await self.stream_monitor.update_all_streams(subscribed_domains) + # Delegate to stream monitor and get results + update_result = await self.stream_monitor.update_all_streams(subscribed_domains) + + # Perform health check + await self.health_checker.perform_health_check( + update_result, + self.config.health_check_endpoint, + ) @command.new(help="Subscribes to a new Owncast stream.") @command.argument("url") diff --git a/owncastsentry/config.py b/owncastsentry/config.py new file mode 100644 index 0000000..4c7ab17 --- /dev/null +++ b/owncastsentry/config.py @@ -0,0 +1,29 @@ +# Copyright 2026 Logan Fick +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + +from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper + + +class Config(BaseProxyConfig): + """Configuration class for OwncastSentry plugin.""" + + def do_update(self, helper: ConfigUpdateHelper) -> None: + """ + Update configuration with user-provided values. + + :param helper: ConfigUpdateHelper for copying values. + :return: Nothing. + """ + helper.copy("health_check_endpoint") + + @property + def health_check_endpoint(self) -> str: + """ + Get the health check endpoint URL. + + :return: The configured endpoint URL or empty string if not set. + """ + return self["health_check_endpoint"] diff --git a/owncastsentry/health_checker.py b/owncastsentry/health_checker.py new file mode 100644 index 0000000..510f82c --- /dev/null +++ b/owncastsentry/health_checker.py @@ -0,0 +1,157 @@ +# Copyright 2026 Logan Fick +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + +import logging +from dataclasses import dataclass + +from mautrix.util.async_db import Database + +from .owncast_client import OwncastClient + + +@dataclass +class UpdateResult: + """Result of a stream update cycle.""" + + total_streams: int + successful_checks: int + failed_checks: int + + @property + def http_healthy(self) -> bool: + """ + Determine HTTP health based on update results. + + HTTP is considered healthy if: + - No streams are subscribed (nothing to check), OR + - At least one stream check succeeded + + :return: True if HTTP is considered healthy. + """ + if self.total_streams == 0: + return True + return self.successful_checks > 0 + + +@dataclass +class HealthStatus: + """Represents the health status of the plugin.""" + + database_healthy: bool + http_healthy: bool + + @property + def is_healthy(self) -> bool: + """ + Check if all health components are healthy. + + :return: True if all checks pass. + """ + return self.database_healthy and self.http_healthy + + +class HealthChecker: + """Service for performing health checks on the plugin.""" + + def __init__( + self, + database: Database, + owncast_client: OwncastClient, + logger: logging.Logger, + ): + """ + Initialize the health checker. + + :param database: The maubot database instance. + :param owncast_client: Client for making HTTP requests. + :param logger: Logger instance. + """ + self.db = database + self.owncast_client = owncast_client + self.log = logger + + async def check_database(self) -> bool: + """ + Check if the database is functioning by executing a simple query. + + :return: True if database is healthy, False otherwise. + """ + try: + async with self.db.acquire() as conn: + await conn.fetchval("SELECT 1") + return True + except Exception as e: + self.log.warning(f"Database health check failed: {e}") + return False + + async def perform_health_check( + self, + update_result: UpdateResult, + endpoint: str, + ) -> None: + """ + Perform health check and report to configured endpoint if all healthy. + + :param update_result: Result of the stream update cycle. + :param endpoint: Health check endpoint URL (empty string to skip reporting). + :return: Nothing. + """ + # Check database health + database_healthy = await self.check_database() + + # Evaluate HTTP health from update results + http_healthy = update_result.http_healthy + + # Create health status + status = HealthStatus( + database_healthy=database_healthy, + http_healthy=http_healthy, + ) + + self.log.debug( + f"Health check: database={database_healthy}, http={http_healthy}, " + f"streams={update_result.total_streams}, " + f"succeeded={update_result.successful_checks}, " + f"failed={update_result.failed_checks}" + ) + + # Skip endpoint notification if not configured + if not endpoint or not endpoint.strip(): + self.log.debug("Health check endpoint not configured, skipping report.") + return + + # Only send to endpoint if ALL checks pass + if not status.is_healthy: + self.log.warning( + f"Health check failed, not reporting to endpoint. " + f"database={database_healthy}, http={http_healthy}" + ) + return + + # Send GET request to health endpoint + await self._send_health_report(endpoint) + + async def _send_health_report(self, endpoint: str) -> None: + """ + Send a GET request to the health check endpoint. + + :param endpoint: The endpoint URL. + :return: Nothing. + """ + try: + async with self.owncast_client.session.get( + endpoint, allow_redirects=True + ) as response: + if 200 <= response.status < 300: + self.log.debug( + f"Health check reported successfully (status={response.status})" + ) + else: + self.log.warning( + f"Health check endpoint returned non-success status: {response.status}" + ) + except Exception as e: + self.log.warning(f"Failed to report health check to endpoint: {e}") diff --git a/owncastsentry/stream_monitor.py b/owncastsentry/stream_monitor.py index 143d99d..e427dcf 100644 --- a/owncastsentry/stream_monitor.py +++ b/owncastsentry/stream_monitor.py @@ -17,6 +17,7 @@ from .utils import ( CLEANUP_DELETE_THRESHOLD, should_query_stream, ) +from .health_checker import UpdateResult class StreamMonitor: @@ -45,32 +46,48 @@ class StreamMonitor: # Cache for tracking when streams last went offline self.offline_timer_cache = {} - async def update_all_streams(self, subscribed_domains: list[str]) -> None: + async def update_all_streams(self, subscribed_domains: list[str]) -> UpdateResult: """ Checks the status of all streams with active subscriptions. Updates for all streams are performed asynchronously, with the method returning when the slowest update completes. - :param subscribed_domains: List of stream domains to update - :return: Nothing. + :param subscribed_domains: List of stream domains to update. + :return: UpdateResult with success/failure counts. """ self.log.debug("Updating all stream states...") + total_streams = len(subscribed_domains) + # Build a list of async tasks which update the state for each stream domain tasks = [] for domain in subscribed_domains: tasks.append(asyncio.create_task(self.update_stream(domain))) - # Run the tasks in parallel - await asyncio.gather(*tasks) - self.log.debug("Update complete.") + # Run the tasks in parallel and collect results + results = await asyncio.gather(*tasks) - async def update_stream(self, domain: str) -> None: + # Count successes and failures + successful_checks = sum(1 for result in results if result is True) + failed_checks = sum(1 for result in results if result is False) + + self.log.debug( + f"Update complete. {successful_checks}/{total_streams} succeeded, " + f"{failed_checks} failed." + ) + + return UpdateResult( + total_streams=total_streams, + successful_checks=successful_checks, + failed_checks=failed_checks, + ) + + async def update_stream(self, domain: str) -> bool: """ Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live. Implements progressive backoff for connection failures and auto-cleanup for dead instances. :param domain: The domain of the stream to update. - :return: Nothing. + :return: True if the stream check succeeded (or was skipped due to backoff), False on connection failure. """ # Fetch the current stream state from database to check failure_counter old_state = await self.stream_repo.get_by_domain(domain) @@ -85,7 +102,8 @@ class StreamMonitor: ) # Check cleanup thresholds even when skipping query await self._check_cleanup_thresholds(domain, failure_counter + 1) - return + # Backoff is expected behavior, not a failure + return True # A flag indicating whether this is the first state update of a brand-new stream to avoid sending notifications if its already live. first_update = False @@ -108,7 +126,8 @@ class StreamMonitor: ) # Check cleanup thresholds after connection failure await self._check_cleanup_thresholds(domain, failure_counter + 1) - return + # Actual connection failure + return False # Fetch succeeded! Reset failure counter await self.stream_repo.reset_failure_counter(domain) @@ -259,6 +278,7 @@ class StreamMonitor: # All done. self.log.debug(f"[{domain}] State update completed.") + return True async def _check_cleanup_thresholds(self, domain: str, counter: int) -> None: """