Added progressive backoff and auto-cleanup for unreachable Owncast instances. (Closes #2 and closes #3)

This commit is contained in:
2026-01-06 18:13:56 -05:00
parent b6beef0e48
commit 35086cb751
6 changed files with 230 additions and 5 deletions

View File

@@ -74,6 +74,43 @@ class StreamRepository:
result = await self.get_by_domain(domain)
return result is not None
async def increment_failure_counter(self, domain: str) -> None:
"""
Increment the failure counter for a stream by 1.
:param domain: The stream domain
:return: Nothing
"""
query = """UPDATE streams
SET failure_counter = failure_counter + 1
WHERE domain=$1"""
async with self.db.acquire() as conn:
await conn.execute(query, domain)
async def reset_failure_counter(self, domain: str) -> None:
"""
Reset the failure counter for a stream to 0.
:param domain: The stream domain
:return: Nothing
"""
query = """UPDATE streams
SET failure_counter = 0
WHERE domain=$1"""
async with self.db.acquire() as conn:
await conn.execute(query, domain)
async def delete(self, domain: str) -> None:
"""
Delete a stream record from the database.
:param domain: The stream domain
:return: Nothing
"""
query = "DELETE FROM streams WHERE domain=$1"
async with self.db.acquire() as conn:
await conn.execute(query, domain)
class SubscriptionRepository:
"""Repository for managing stream subscriptions in the database."""
@@ -158,3 +195,15 @@ class SubscriptionRepository:
async with self.db.acquire() as conn:
result = await conn.fetchrow(query, domain)
return result[0]
async def delete_all_for_domain(self, domain: str) -> int:
"""
Delete all subscriptions for a given stream domain.
:param domain: The stream domain
:return: Number of subscriptions deleted
"""
query = "DELETE FROM subscriptions WHERE stream_domain=$1"
async with self.db.acquire() as conn:
result = await conn.execute(query, domain)
return result.rowcount

View File

@@ -69,6 +69,21 @@ async def upgrade_v2(conn: Connection) -> None:
await conn.execute("ALTER TABLE subscriptions_new RENAME TO subscriptions")
@upgrade_table.register(description="Add failure_counter column for backoff and auto-cleanup")
async def upgrade_v3(conn: Connection) -> None:
"""
Runs migrations to upgrade database schema to version 3 format.
Version 3 adds the failure_counter column to track connection failures for backoff and auto-cleanup.
:param conn: A connection to run the v3 database migration on.
:return: Nothing.
"""
# Add failure_counter column with default value of 0
await conn.execute(
"""ALTER TABLE streams ADD COLUMN failure_counter INTEGER DEFAULT 0"""
)
def get_upgrade_table() -> UpgradeTable:
"""
Helper function for retrieving the upgrade table.

View File

@@ -17,6 +17,7 @@ class StreamState:
title: Optional[str] = None
last_connect_time: Optional[str] = None
last_disconnect_time: Optional[str] = None
failure_counter: int = 0
@property
def online(self) -> bool:
@@ -37,6 +38,7 @@ class StreamState:
title=row["title"],
last_connect_time=row["last_connect_time"],
last_disconnect_time=row["last_disconnect_time"],
failure_counter=row["failure_counter"],
)

View File

@@ -172,3 +172,70 @@ class NotificationService:
:return: Nothing
"""
self.notification_timers_cache[domain] = time.time()
async def send_cleanup_warning(self, domain: str) -> None:
"""
Send 83-day warning notification to all subscribed rooms.
:param domain: The stream domain
:return: Nothing
"""
# Get all subscribed rooms
room_ids = await self.subscription_repo.get_subscribed_rooms(domain)
# Build the warning message
body_text = (
f"⚠️ Warning: Subscription Cleanup Scheduled\n\n"
f"The Owncast instance at {domain} has been unreachable for 83 days. "
f"If it remains unreachable for 7 more days (90 days total), this "
f"subscription will be automatically removed."
)
# Send to all rooms in parallel
tasks = []
for room_id in room_ids:
tasks.append(self._send_notification(room_id, body_text, domain))
results = await asyncio.gather(*tasks, return_exceptions=True)
# Count successes and failures
successful = sum(1 for r in results if not isinstance(r, Exception))
failed = sum(1 for r in results if isinstance(r, Exception))
self.log.info(
f"[{domain}] Sent cleanup warning to {successful} rooms ({failed} failed)."
)
async def send_cleanup_deletion(self, domain: str) -> None:
"""
Send 90-day deletion notification to all subscribed rooms.
:param domain: The stream domain
:return: Nothing
"""
# Get all subscribed rooms
room_ids = await self.subscription_repo.get_subscribed_rooms(domain)
# Build the deletion message
body_text = (
f"🗑️ Subscription Automatically Removed\n\n"
f"The Owncast instance at {domain} has been unreachable for 90 days "
f"and has been automatically removed from subscriptions in this room.\n\n"
f"If the instance comes online again and you want to resubscribe, "
f"run `!subscribe {domain}`."
)
# Send to all rooms in parallel
tasks = []
for room_id in room_ids:
tasks.append(self._send_notification(room_id, body_text, domain))
results = await asyncio.gather(*tasks, return_exceptions=True)
# Count successes and failures
successful = sum(1 for r in results if not isinstance(r, Exception))
failed = sum(1 for r in results if isinstance(r, Exception))
self.log.info(
f"[{domain}] Sent cleanup deletion notice to {successful} rooms ({failed} failed)."
)

View File

@@ -11,7 +11,12 @@ from .owncast_client import OwncastClient
from .database import StreamRepository, SubscriptionRepository
from .notification_service import NotificationService
from .models import StreamState
from .utils import TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN
from .utils import (
TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN,
CLEANUP_WARNING_THRESHOLD,
CLEANUP_DELETE_THRESHOLD,
should_query_stream,
)
class StreamMonitor:
@@ -62,10 +67,26 @@ class StreamMonitor:
async def update_stream(self, domain: str) -> None:
"""
Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
Implements progressive backoff for connection failures and auto-cleanup for dead instances.
:param domain: The domain of the stream to update.
:return: Nothing.
"""
# Fetch the current stream state from database to check failure_counter
old_state = await self.stream_repo.get_by_domain(domain)
failure_counter = old_state.failure_counter if old_state else 0
# Check if we should query this stream based on backoff schedule
if not should_query_stream(failure_counter):
# Skip this cycle, increment counter to track time passage
await self.stream_repo.increment_failure_counter(domain)
self.log.debug(
f"[{domain}] Skipping query due to backoff (counter={failure_counter + 1})"
)
# Check cleanup thresholds even when skipping query
await self._check_cleanup_thresholds(domain, failure_counter + 1)
return
# A flag indicating whether this is the first state update of a brand-new stream to avoid sending notifications if its already live.
first_update = False
@@ -79,19 +100,25 @@ class StreamMonitor:
# Fetch the latest stream state from the server
new_state_dict = await self.owncast_client.get_stream_state(domain)
# Skip the update if the fetch failed for any reason
# If the fetch failed, increment failure counter and skip the update
if new_state_dict is None:
await self.stream_repo.increment_failure_counter(domain)
self.log.warning(
f"[{domain}] Connection failure (counter={failure_counter + 1})"
)
# Check cleanup thresholds after connection failure
await self._check_cleanup_thresholds(domain, failure_counter + 1)
return
# Fetch succeeded! Reset failure counter
await self.stream_repo.reset_failure_counter(domain)
# Fix possible race conditions with timers
if domain not in self.offline_timer_cache:
self.offline_timer_cache[domain] = 0
if domain not in self.notification_service.notification_timers_cache:
self.notification_service.notification_timers_cache[domain] = 0
# Fetch the last known stream state from the database
old_state = await self.stream_repo.get_by_domain(domain)
# Does the last known stream state not have a value for the last connect and disconnect time?
if (
old_state.last_connect_time is None
@@ -232,3 +259,39 @@ class StreamMonitor:
# All done.
self.log.debug(f"[{domain}] State update completed.")
async def _check_cleanup_thresholds(self, domain: str, counter: int) -> None:
"""
Check if a domain has hit cleanup warning or deletion thresholds.
:param domain: The domain to check
:param counter: The current failure counter value
:return: Nothing
"""
# Check for 83-day warning threshold
if counter == CLEANUP_WARNING_THRESHOLD:
self.log.warning(
f"[{domain}] Reached 83-day warning threshold. Sending cleanup warning."
)
await self.notification_service.send_cleanup_warning(domain)
# Check for 90-day deletion threshold
if counter >= CLEANUP_DELETE_THRESHOLD:
self.log.warning(
f"[{domain}] Reached 90-day deletion threshold. Removing all subscriptions."
)
# Send deletion notification
await self.notification_service.send_cleanup_deletion(domain)
# Delete all subscriptions for this domain
from .database import SubscriptionRepository
subscription_repo = SubscriptionRepository(self.stream_repo.db)
deleted_count = await subscription_repo.delete_all_for_domain(domain)
# Delete the stream record
await self.stream_repo.delete(domain)
self.log.info(
f"[{domain}] Cleanup complete. Deleted {deleted_count} subscriptions and stream record."
)

View File

@@ -28,6 +28,35 @@ SECONDS_BETWEEN_NOTIFICATIONS = 20 * 60 # 20 minutes in seconds
# - If this time period passes entirely and a stream comes back online after, it's treated as regular going live.
TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN = 7 * 60 # 7 minutes in seconds
# Counter thresholds for auto-cleanup (based on 60-second polling intervals)
CLEANUP_WARNING_THRESHOLD = 83 * 24 * 60 # 119,520 cycles = 83 days
CLEANUP_DELETE_THRESHOLD = 90 * 24 * 60 # 129,600 cycles = 90 days
def should_query_stream(failure_counter: int) -> bool:
"""
Determine if a stream should be queried based on its failure counter.
Implements progressive backoff: 60s (5min) -> 2min (5min) -> 3min (5min) -> 5min (15min) -> 15min.
:param failure_counter: The current failure counter value
:return: True if the stream should be queried this cycle, False otherwise
"""
if failure_counter <= 4:
# Query every 60s for first 5 minutes (counters 0-4)
return True
elif failure_counter <= 9:
# Query every 2 minutes for next 5 minutes (counters 5-9)
return (failure_counter * 60) % 120 == 0
elif failure_counter <= 14:
# Query every 3 minutes for next 5 minutes (counters 10-14)
return (failure_counter * 60) % 180 == 0
elif failure_counter <= 29:
# Query every 5 minutes for next 15 minutes (counters 15-29)
return (failure_counter * 60) % 300 == 0
else:
# Query every 15 minutes after 30 minutes (counter 30+)
return (failure_counter * 60) % 900 == 0
def domainify(url: str) -> str:
"""