async def increment_failure_counter(self, domain: str) -> None:
    """
    Add one to the stored failure counter of a stream.

    :param domain: The stream domain whose counter is bumped
    :return: Nothing
    """
    async with self.db.acquire() as connection:
        await connection.execute(
            """UPDATE streams
               SET failure_counter = failure_counter + 1
               WHERE domain=$1""",
            domain,
        )


async def reset_failure_counter(self, domain: str) -> None:
    """
    Put the stored failure counter of a stream back to 0.

    :param domain: The stream domain whose counter is cleared
    :return: Nothing
    """
    async with self.db.acquire() as connection:
        await connection.execute(
            """UPDATE streams
               SET failure_counter = 0
               WHERE domain=$1""",
            domain,
        )
async def delete_all_for_domain(self, domain: str) -> int:
    """
    Delete all subscriptions for a given stream domain.

    The value returned by ``execute()`` depends on the database driver:
    cursor-based drivers expose ``rowcount``, while asyncpg-style drivers
    return a command tag such as ``"DELETE 3"``. Both shapes are handled so
    the method does not fail with ``AttributeError`` on the latter.

    :param domain: The stream domain
    :return: Number of subscriptions deleted (0 if the driver reports no count)
    """
    query = "DELETE FROM subscriptions WHERE stream_domain=$1"
    async with self.db.acquire() as conn:
        result = await conn.execute(query, domain)

        # Cursor-like result: read the count while the connection is still held.
        rowcount = getattr(result, "rowcount", None)
        if rowcount is not None:
            return rowcount

    # Status-tag result: the affected-row count is the last token.
    try:
        return int(str(result).rsplit(" ", 1)[-1])
    except ValueError:
        # Unknown result shape; the delete itself already ran, so report 0
        # rather than turning a logging detail into a crash.
        return 0
async def send_cleanup_warning(self, domain: str) -> None:
    """
    Send 83-day warning notification to all subscribed rooms.

    :param domain: The stream domain
    :return: Nothing
    """
    body_text = (
        "⚠️ Warning: Subscription Cleanup Scheduled\n\n"
        f"The Owncast instance at {domain} has been unreachable for 83 days. "
        "If it remains unreachable for 7 more days (90 days total), this "
        "subscription will be automatically removed."
    )
    await self._broadcast_cleanup_notice(domain, body_text, "cleanup warning")


async def send_cleanup_deletion(self, domain: str) -> None:
    """
    Send 90-day deletion notification to all subscribed rooms.

    :param domain: The stream domain
    :return: Nothing
    """
    body_text = (
        "🗑️ Subscription Automatically Removed\n\n"
        f"The Owncast instance at {domain} has been unreachable for 90 days "
        "and has been automatically removed from subscriptions in this room.\n\n"
        "If the instance comes online again and you want to resubscribe, "
        f"run `!subscribe {domain}`."
    )
    await self._broadcast_cleanup_notice(
        domain, body_text, "cleanup deletion notice"
    )


async def _broadcast_cleanup_notice(
    self, domain: str, body_text: str, label: str
) -> None:
    """
    Deliver one cleanup-related message to every room subscribed to a domain.

    :param domain: The stream domain whose subscribers are notified
    :param body_text: The message text to send to each room
    :param label: Short description of the notice, used only in the log line
    :return: Nothing
    """
    room_ids = await self.subscription_repo.get_subscribed_rooms(domain)

    # Send to all rooms in parallel; return_exceptions keeps one failing room
    # from cancelling delivery to the others.
    results = await asyncio.gather(
        *(
            self._send_notification(room_id, body_text, domain)
            for room_id in room_ids
        ),
        return_exceptions=True,
    )

    failed = sum(1 for outcome in results if isinstance(outcome, Exception))
    successful = len(results) - failed

    self.log.info(
        f"[{domain}] Sent {label} to {successful} rooms ({failed} failed)."
    )
+ Implements progressive backoff for connection failures and auto-cleanup for dead instances. :param domain: The domain of the stream to update. :return: Nothing. """ + # Fetch the current stream state from database to check failure_counter + old_state = await self.stream_repo.get_by_domain(domain) + failure_counter = old_state.failure_counter if old_state else 0 + + # Check if we should query this stream based on backoff schedule + if not should_query_stream(failure_counter): + # Skip this cycle, increment counter to track time passage + await self.stream_repo.increment_failure_counter(domain) + self.log.debug( + f"[{domain}] Skipping query due to backoff (counter={failure_counter + 1})" + ) + # Check cleanup thresholds even when skipping query + await self._check_cleanup_thresholds(domain, failure_counter + 1) + return + # A flag indicating whether this is the first state update of a brand-new stream to avoid sending notifications if its already live. first_update = False @@ -79,19 +100,25 @@ class StreamMonitor: # Fetch the latest stream state from the server new_state_dict = await self.owncast_client.get_stream_state(domain) - # Skip the update if the fetch failed for any reason + # If the fetch failed, increment failure counter and skip the update if new_state_dict is None: + await self.stream_repo.increment_failure_counter(domain) + self.log.warning( + f"[{domain}] Connection failure (counter={failure_counter + 1})" + ) + # Check cleanup thresholds after connection failure + await self._check_cleanup_thresholds(domain, failure_counter + 1) return + # Fetch succeeded! 
Reset failure counter + await self.stream_repo.reset_failure_counter(domain) + # Fix possible race conditions with timers if domain not in self.offline_timer_cache: self.offline_timer_cache[domain] = 0 if domain not in self.notification_service.notification_timers_cache: self.notification_service.notification_timers_cache[domain] = 0 - # Fetch the last known stream state from the database - old_state = await self.stream_repo.get_by_domain(domain) - # Does the last known stream state not have a value for the last connect and disconnect time? if ( old_state.last_connect_time is None @@ -232,3 +259,39 @@ class StreamMonitor: # All done. self.log.debug(f"[{domain}] State update completed.") + + async def _check_cleanup_thresholds(self, domain: str, counter: int) -> None: + """ + Check if a domain has hit cleanup warning or deletion thresholds. + + :param domain: The domain to check + :param counter: The current failure counter value + :return: Nothing + """ + # Check for 83-day warning threshold + if counter == CLEANUP_WARNING_THRESHOLD: + self.log.warning( + f"[{domain}] Reached 83-day warning threshold. Sending cleanup warning." + ) + await self.notification_service.send_cleanup_warning(domain) + + # Check for 90-day deletion threshold + if counter >= CLEANUP_DELETE_THRESHOLD: + self.log.warning( + f"[{domain}] Reached 90-day deletion threshold. Removing all subscriptions." + ) + # Send deletion notification + await self.notification_service.send_cleanup_deletion(domain) + + # Delete all subscriptions for this domain + from .database import SubscriptionRepository + + subscription_repo = SubscriptionRepository(self.stream_repo.db) + deleted_count = await subscription_repo.delete_all_for_domain(domain) + + # Delete the stream record + await self.stream_repo.delete(domain) + + self.log.info( + f"[{domain}] Cleanup complete. Deleted {deleted_count} subscriptions and stream record." 
# Counter thresholds for auto-cleanup (based on 60-second polling intervals)
CLEANUP_WARNING_THRESHOLD = 83 * 24 * 60  # 119,520 cycles = 83 days
CLEANUP_DELETE_THRESHOLD = 90 * 24 * 60  # 129,600 cycles = 90 days


def should_query_stream(failure_counter: int) -> bool:
    """
    Decide whether a stream is due to be queried on this polling cycle.

    The failure counter selects a backoff tier. Within a tier the stream is
    queried only on cycles where the counter, expressed in seconds
    (counter * 60), is a multiple of that tier's query period:

    - counters 0-4: every cycle
    - counters 5-9: period 120s
    - counters 10-14: period 180s
    - counters 15-29: period 300s
    - counters 30+: period 900s

    :param failure_counter: The current failure counter value
    :return: True if the stream should be queried this cycle, False otherwise
    """
    # No backoff yet: always query.
    if failure_counter <= 4:
        return True

    elapsed_seconds = failure_counter * 60

    # (highest counter belonging to the tier, query period in seconds)
    bounded_tiers = ((9, 120), (14, 180), (29, 300))
    for tier_ceiling, period_seconds in bounded_tiers:
        if failure_counter <= tier_ceiling:
            return elapsed_seconds % period_seconds == 0

    # Past the last bounded tier: slowest cadence.
    return elapsed_seconds % 900 == 0