2 Commits

8 changed files with 282 additions and 17 deletions

4
base-config.yaml Normal file
View File

@@ -0,0 +1,4 @@
# Health check endpoint URL.
# If configured, a GET request will be sent to this URL after each successful update cycle.
# Leave empty to disable health check reporting.
health_check_endpoint: ""

View File

@@ -7,3 +7,6 @@ modules:
main_class: OwncastSentry
database: true
database_type: asyncpg
config: true
extra_files:
- base-config.yaml

View File

@@ -4,9 +4,12 @@
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
from typing import Type
from maubot import Plugin, MessageEvent
from maubot.handlers import command
from mautrix.util.async_db import UpgradeTable
from mautrix.util.config import BaseProxyConfig
from .migrations import get_upgrade_table
from .owncast_client import OwncastClient
@@ -14,6 +17,8 @@ from .database import StreamRepository, SubscriptionRepository
from .notification_service import NotificationService
from .stream_monitor import StreamMonitor
from .commands import CommandHandler
from .config import Config
from .health_checker import HealthChecker
class OwncastSentry(Plugin):
@@ -28,6 +33,15 @@ class OwncastSentry(Plugin):
"""
return get_upgrade_table()
@classmethod
def get_config_class(cls) -> Type[BaseProxyConfig]:
"""
Helper method for telling Maubot about our configuration class.
:return: The Config class.
"""
return Config
async def start(self) -> None:
"""
Method called by Maubot upon startup of the instance.
@@ -35,6 +49,9 @@ class OwncastSentry(Plugin):
:return: Nothing.
"""
# Load configuration
self.config.load_and_update()
# Initialize the Owncast API client
self.owncast_client = OwncastClient(self.log)
@@ -55,6 +72,13 @@ class OwncastSentry(Plugin):
self.log,
)
# Initialize health checker
self.health_checker = HealthChecker(
self.database,
self.owncast_client,
self.log,
)
# Initialize command handler
self.command_handler = CommandHandler(
self.owncast_client,
@@ -69,15 +93,21 @@ class OwncastSentry(Plugin):
async def _update_all_stream_states(self) -> None:
"""
Wrapper method for updating all stream states.
Fetches list of subscribed domains and delegates to StreamMonitor.
Fetches list of subscribed domains, delegates to StreamMonitor, and performs health check.
:return: Nothing.
"""
# Get list of all stream domains with active subscriptions
subscribed_domains = await self.subscription_repo.get_all_subscribed_domains()
# Delegate to stream monitor
await self.stream_monitor.update_all_streams(subscribed_domains)
# Delegate to stream monitor and get results
update_result = await self.stream_monitor.update_all_streams(subscribed_domains)
# Perform health check
await self.health_checker.perform_health_check(
update_result,
self.config.health_check_endpoint,
)
@command.new(help="Subscribes to a new Owncast stream.")
@command.argument("url")

29
owncastsentry/config.py Normal file
View File

@@ -0,0 +1,29 @@
# Copyright 2026 Logan Fick
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper
class Config(BaseProxyConfig):
"""Configuration class for OwncastSentry plugin."""
def do_update(self, helper: ConfigUpdateHelper) -> None:
"""
Update configuration with user-provided values.
:param helper: ConfigUpdateHelper for copying values.
:return: Nothing.
"""
helper.copy("health_check_endpoint")
@property
def health_check_endpoint(self) -> str:
"""
Get the health check endpoint URL.
:return: The configured endpoint URL or empty string if not set.
"""
return self["health_check_endpoint"]

View File

@@ -0,0 +1,157 @@
# Copyright 2026 Logan Fick
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
import logging
from dataclasses import dataclass
from mautrix.util.async_db import Database
from .owncast_client import OwncastClient
@dataclass
class UpdateResult:
"""Result of a stream update cycle."""
total_streams: int
successful_checks: int
failed_checks: int
@property
def http_healthy(self) -> bool:
"""
Determine HTTP health based on update results.
HTTP is considered healthy if:
- No streams are subscribed (nothing to check), OR
- At least one stream check succeeded
:return: True if HTTP is considered healthy.
"""
if self.total_streams == 0:
return True
return self.successful_checks > 0
@dataclass
class HealthStatus:
"""Represents the health status of the plugin."""
database_healthy: bool
http_healthy: bool
@property
def is_healthy(self) -> bool:
"""
Check if all health components are healthy.
:return: True if all checks pass.
"""
return self.database_healthy and self.http_healthy
class HealthChecker:
"""Service for performing health checks on the plugin."""
def __init__(
self,
database: Database,
owncast_client: OwncastClient,
logger: logging.Logger,
):
"""
Initialize the health checker.
:param database: The maubot database instance.
:param owncast_client: Client for making HTTP requests.
:param logger: Logger instance.
"""
self.db = database
self.owncast_client = owncast_client
self.log = logger
async def check_database(self) -> bool:
"""
Check if the database is functioning by executing a simple query.
:return: True if database is healthy, False otherwise.
"""
try:
async with self.db.acquire() as conn:
await conn.fetchval("SELECT 1")
return True
except Exception as e:
self.log.warning(f"Database health check failed: {e}")
return False
async def perform_health_check(
self,
update_result: UpdateResult,
endpoint: str,
) -> None:
"""
Perform health check and report to configured endpoint if all healthy.
:param update_result: Result of the stream update cycle.
:param endpoint: Health check endpoint URL (empty string to skip reporting).
:return: Nothing.
"""
# Check database health
database_healthy = await self.check_database()
# Evaluate HTTP health from update results
http_healthy = update_result.http_healthy
# Create health status
status = HealthStatus(
database_healthy=database_healthy,
http_healthy=http_healthy,
)
self.log.debug(
f"Health check: database={database_healthy}, http={http_healthy}, "
f"streams={update_result.total_streams}, "
f"succeeded={update_result.successful_checks}, "
f"failed={update_result.failed_checks}"
)
# Skip endpoint notification if not configured
if not endpoint or not endpoint.strip():
self.log.debug("Health check endpoint not configured, skipping report.")
return
# Only send to endpoint if ALL checks pass
if not status.is_healthy:
self.log.warning(
f"Health check failed, not reporting to endpoint. "
f"database={database_healthy}, http={http_healthy}"
)
return
# Send GET request to health endpoint
await self._send_health_report(endpoint)
async def _send_health_report(self, endpoint: str) -> None:
"""
Send a GET request to the health check endpoint.
:param endpoint: The endpoint URL.
:return: Nothing.
"""
try:
async with self.owncast_client.session.get(
endpoint, allow_redirects=True
) as response:
if 200 <= response.status < 300:
self.log.debug(
f"Health check reported successfully (status={response.status})"
)
else:
self.log.warning(
f"Health check endpoint returned non-success status: {response.status}"
)
except Exception as e:
self.log.warning(f"Failed to report health check to endpoint: {e}")

View File

@@ -11,7 +11,7 @@ from typing import List
from mautrix.types import TextMessageEventContent, MessageType
from .database import SubscriptionRepository
from .utils import SECONDS_BETWEEN_NOTIFICATIONS, sanitize_for_markdown
from .utils import SECONDS_BETWEEN_NOTIFICATIONS, sanitize_for_plain_text
class NotificationService:
@@ -130,7 +130,7 @@ class NotificationService:
"""
# Use name if available, fallback to domain
stream_name = name if name else domain
safe_stream_name = sanitize_for_markdown(stream_name)
safe_stream_name = sanitize_for_plain_text(stream_name)
# Choose message based on notification type
if title_change:
@@ -140,7 +140,7 @@ class NotificationService:
# Add title if present
if title != "":
safe_title = sanitize_for_markdown(title)
safe_title = sanitize_for_plain_text(title)
body_text += "\nStream Title: " + safe_title
# Add stream URL
@@ -150,7 +150,7 @@ class NotificationService:
if tags and len(tags) > 0:
safe_tags = []
for tag in tags:
safe_tag = sanitize_for_markdown(tag)
safe_tag = sanitize_for_plain_text(tag)
if safe_tag and not safe_tag.startswith('.'):
safe_tags.append(safe_tag)

View File

@@ -17,6 +17,7 @@ from .utils import (
CLEANUP_DELETE_THRESHOLD,
should_query_stream,
)
from .health_checker import UpdateResult
class StreamMonitor:
@@ -45,32 +46,48 @@ class StreamMonitor:
# Cache for tracking when streams last went offline
self.offline_timer_cache = {}
async def update_all_streams(self, subscribed_domains: list[str]) -> None:
async def update_all_streams(self, subscribed_domains: list[str]) -> UpdateResult:
"""
Checks the status of all streams with active subscriptions.
Updates for all streams are performed asynchronously, with the method returning when the slowest update completes.
:param subscribed_domains: List of stream domains to update
:return: Nothing.
:param subscribed_domains: List of stream domains to update.
:return: UpdateResult with success/failure counts.
"""
self.log.debug("Updating all stream states...")
total_streams = len(subscribed_domains)
# Build a list of async tasks which update the state for each stream domain
tasks = []
for domain in subscribed_domains:
tasks.append(asyncio.create_task(self.update_stream(domain)))
# Run the tasks in parallel
await asyncio.gather(*tasks)
self.log.debug("Update complete.")
# Run the tasks in parallel and collect results
results = await asyncio.gather(*tasks)
async def update_stream(self, domain: str) -> None:
# Count successes and failures
successful_checks = sum(1 for result in results if result is True)
failed_checks = sum(1 for result in results if result is False)
self.log.debug(
f"Update complete. {successful_checks}/{total_streams} succeeded, "
f"{failed_checks} failed."
)
return UpdateResult(
total_streams=total_streams,
successful_checks=successful_checks,
failed_checks=failed_checks,
)
async def update_stream(self, domain: str) -> bool:
"""
Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
Implements progressive backoff for connection failures and auto-cleanup for dead instances.
:param domain: The domain of the stream to update.
:return: Nothing.
:return: True if the stream check succeeded (or was skipped due to backoff), False on connection failure.
"""
# Fetch the current stream state from database to check failure_counter
old_state = await self.stream_repo.get_by_domain(domain)
@@ -85,7 +102,8 @@ class StreamMonitor:
)
# Check cleanup thresholds even when skipping query
await self._check_cleanup_thresholds(domain, failure_counter + 1)
return
# Backoff is expected behavior, not a failure
return True
# A flag indicating whether this is the first state update of a brand-new stream to avoid sending notifications if its already live.
first_update = False
@@ -108,7 +126,8 @@ class StreamMonitor:
)
# Check cleanup thresholds after connection failure
await self._check_cleanup_thresholds(domain, failure_counter + 1)
return
# Actual connection failure
return False
# Fetch succeeded! Reset failure counter
await self.stream_repo.reset_failure_counter(domain)
@@ -259,6 +278,7 @@ class StreamMonitor:
# All done.
self.log.debug(f"[{domain}] State update completed.")
return True
async def _check_cleanup_thresholds(self, domain: str, counter: int) -> None:
"""

View File

@@ -157,6 +157,28 @@ def escape_markdown(text: str) -> str:
return escaped_text
def sanitize_for_plain_text(text: str) -> str:
"""
Sanitize text for plain text rendering.
Removes newlines and normalizes whitespace without escaping special characters.
Use this for plain text notifications where escaping would show literal backslashes.
:param text: The text to sanitize
:return: Sanitized text
"""
if not text:
return text
# Remove newlines and carriage returns to prevent multi-line injection
sanitized = text.replace('\n', ' ').replace('\r', ' ')
# Collapse multiple spaces into single space
sanitized = ' '.join(sanitized.split())
return sanitized
def sanitize_for_markdown(text: str) -> str:
"""
Sanitize text for safe Markdown rendering.