@@ -74,6 +74,43 @@ class StreamRepository:
        result = await self.get_by_domain(domain)
        return result is not None

    async def increment_failure_counter(self, domain: str) -> None:
        """
        Increment the failure counter for a stream by 1.

        :param domain: The stream domain
        :return: Nothing
        """
        query = """UPDATE streams
                   SET failure_counter = failure_counter + 1
                   WHERE domain=$1"""
        async with self.db.acquire() as conn:
            await conn.execute(query, domain)

    async def reset_failure_counter(self, domain: str) -> None:
        """
        Reset the failure counter for a stream to 0.

        :param domain: The stream domain
        :return: Nothing
        """
        query = """UPDATE streams
                   SET failure_counter = 0
                   WHERE domain=$1"""
        async with self.db.acquire() as conn:
            await conn.execute(query, domain)

    async def delete(self, domain: str) -> None:
        """
        Delete a stream record from the database.

        :param domain: The stream domain
        :return: Nothing
        """
        query = "DELETE FROM streams WHERE domain=$1"
        async with self.db.acquire() as conn:
            await conn.execute(query, domain)


class SubscriptionRepository:
    """Repository for managing stream subscriptions in the database."""
@@ -158,3 +195,15 @@ class SubscriptionRepository:
        async with self.db.acquire() as conn:
            result = await conn.fetchrow(query, domain)
        return result[0]

    async def delete_all_for_domain(self, domain: str) -> int:
        """
        Delete all subscriptions for a given stream domain.

        :param domain: The stream domain
        :return: Number of subscriptions deleted
        """
        query = "DELETE FROM subscriptions WHERE stream_domain=$1"
        async with self.db.acquire() as conn:
            result = await conn.execute(query, domain)
        # On asyncpg, execute() returns a command tag string such as "DELETE 3" rather
        # than an object with a rowcount attribute, so parse the trailing count.
        return int(result.split()[-1])
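
For context on that return value: asyncpg's execute() resolves to the command tag of the statement rather than a cursor, which is why the count is parsed out of the tag. A minimal illustration:

    # Parsing a Postgres command tag (illustrative only)
    tag = "DELETE 3"
    deleted = int(tag.split()[-1])  # -> 3
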
@@ -69,6 +69,21 @@ async def upgrade_v2(conn: Connection) -> None:
    await conn.execute("ALTER TABLE subscriptions_new RENAME TO subscriptions")


@upgrade_table.register(description="Add failure_counter column for backoff and auto-cleanup")
async def upgrade_v3(conn: Connection) -> None:
    """
    Runs the migration that upgrades the database schema to version 3.
    Version 3 adds the failure_counter column to track connection failures for backoff and auto-cleanup.

    :param conn: A connection to run the v3 database migration on.
    :return: Nothing.
    """
    # Add failure_counter column with a default value of 0
    await conn.execute(
        """ALTER TABLE streams ADD COLUMN failure_counter INTEGER DEFAULT 0"""
    )


def get_upgrade_table() -> UpgradeTable:
    """
    Helper function for retrieving the upgrade table.
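
For completeness, a sketch of how get_upgrade_table() is typically wired into the maubot plugin class so migrations run at startup (the class name here is hypothetical; the real hookup lives elsewhere in this repo):

    from maubot import Plugin
    from mautrix.util.async_db import UpgradeTable

    class OwncastNotifier(Plugin):  # hypothetical class name
        @classmethod
        def get_db_upgrade_table(cls) -> UpgradeTable:
            # maubot calls this hook to apply any pending schema upgrades
            return get_upgrade_table()
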
@@ -17,6 +17,7 @@ class StreamState:
    title: Optional[str] = None
    last_connect_time: Optional[str] = None
    last_disconnect_time: Optional[str] = None
    failure_counter: int = 0

    @property
    def online(self) -> bool:
@@ -37,6 +38,7 @@ class StreamState:
            title=row["title"],
            last_connect_time=row["last_connect_time"],
            last_disconnect_time=row["last_disconnect_time"],
            failure_counter=row["failure_counter"],
        )
@@ -172,3 +172,70 @@ class NotificationService:
        :return: Nothing
        """
        self.notification_timers_cache[domain] = time.time()

    async def send_cleanup_warning(self, domain: str) -> None:
        """
        Send 83-day warning notification to all subscribed rooms.

        :param domain: The stream domain
        :return: Nothing
        """
        # Get all subscribed rooms
        room_ids = await self.subscription_repo.get_subscribed_rooms(domain)

        # Build the warning message
        body_text = (
            f"⚠️ Warning: Subscription Cleanup Scheduled\n\n"
            f"The Owncast instance at {domain} has been unreachable for 83 days. "
            f"If it remains unreachable for 7 more days (90 days total), this "
            f"subscription will be automatically removed."
        )

        # Send to all rooms in parallel
        tasks = []
        for room_id in room_ids:
            tasks.append(self._send_notification(room_id, body_text, domain))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Count successes and failures
        successful = sum(1 for r in results if not isinstance(r, Exception))
        failed = sum(1 for r in results if isinstance(r, Exception))

        self.log.info(
            f"[{domain}] Sent cleanup warning to {successful} rooms ({failed} failed)."
        )

    async def send_cleanup_deletion(self, domain: str) -> None:
        """
        Send 90-day deletion notification to all subscribed rooms.

        :param domain: The stream domain
        :return: Nothing
        """
        # Get all subscribed rooms
        room_ids = await self.subscription_repo.get_subscribed_rooms(domain)

        # Build the deletion message
        body_text = (
            f"🗑️ Subscription Automatically Removed\n\n"
            f"The Owncast instance at {domain} has been unreachable for 90 days "
            f"and has been automatically removed from subscriptions in this room.\n\n"
            f"If the instance comes online again and you want to resubscribe, "
            f"run `!subscribe {domain}`."
        )

        # Send to all rooms in parallel
        tasks = []
        for room_id in room_ids:
            tasks.append(self._send_notification(room_id, body_text, domain))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Count successes and failures
        successful = sum(1 for r in results if not isinstance(r, Exception))
        failed = sum(1 for r in results if isinstance(r, Exception))

        self.log.info(
            f"[{domain}] Sent cleanup deletion notice to {successful} rooms ({failed} failed)."
        )
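
The send loop above relies on asyncio.gather(..., return_exceptions=True) so one failed room send does not abort the rest; a self-contained sketch of that counting pattern:

    import asyncio

    async def _demo() -> None:
        async def ok() -> str:
            return "sent"

        async def bad() -> str:
            raise RuntimeError("send failed")

        # Exceptions are returned as results instead of being raised
        results = await asyncio.gather(ok(), bad(), return_exceptions=True)
        successful = sum(1 for r in results if not isinstance(r, Exception))
        failed = sum(1 for r in results if isinstance(r, Exception))
        print(successful, failed)  # 1 1

    asyncio.run(_demo())
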
@@ -11,7 +11,12 @@ from .owncast_client import OwncastClient
from .database import StreamRepository, SubscriptionRepository
from .notification_service import NotificationService
from .models import StreamState
from .utils import TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN
from .utils import (
    TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN,
    CLEANUP_WARNING_THRESHOLD,
    CLEANUP_DELETE_THRESHOLD,
    should_query_stream,
)


class StreamMonitor:
@@ -62,10 +67,26 @@ class StreamMonitor:
    async def update_stream(self, domain: str) -> None:
        """
        Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
        Implements progressive backoff for connection failures and auto-cleanup for dead instances.

        :param domain: The domain of the stream to update.
        :return: Nothing.
        """
        # Fetch the current stream state from the database to check failure_counter
        old_state = await self.stream_repo.get_by_domain(domain)
        failure_counter = old_state.failure_counter if old_state else 0

        # Check whether we should query this stream based on the backoff schedule
        if not should_query_stream(failure_counter):
            # Skip this cycle, but still increment the counter to track the passage of time
            await self.stream_repo.increment_failure_counter(domain)
            self.log.debug(
                f"[{domain}] Skipping query due to backoff (counter={failure_counter + 1})"
            )
            # Check cleanup thresholds even when skipping the query
            await self._check_cleanup_thresholds(domain, failure_counter + 1)
            return

        # A flag indicating whether this is the first state update of a brand-new stream, to avoid sending notifications if it's already live.
        first_update = False
@@ -79,19 +100,25 @@ class StreamMonitor:
        # Fetch the latest stream state from the server
        new_state_dict = await self.owncast_client.get_stream_state(domain)

        # Skip the update if the fetch failed for any reason
        # If the fetch failed, increment failure counter and skip the update
        if new_state_dict is None:
            await self.stream_repo.increment_failure_counter(domain)
            self.log.warning(
                f"[{domain}] Connection failure (counter={failure_counter + 1})"
            )
            # Check cleanup thresholds after connection failure
            await self._check_cleanup_thresholds(domain, failure_counter + 1)
            return

        # Fetch succeeded! Reset failure counter
        await self.stream_repo.reset_failure_counter(domain)

        # Fix possible race conditions with timers
        if domain not in self.offline_timer_cache:
            self.offline_timer_cache[domain] = 0
        if domain not in self.notification_service.notification_timers_cache:
            self.notification_service.notification_timers_cache[domain] = 0

        # Fetch the last known stream state from the database
        old_state = await self.stream_repo.get_by_domain(domain)

        # Does the last known stream state not have a value for the last connect and disconnect time?
        if (
            old_state.last_connect_time is None
@@ -232,3 +259,39 @@ class StreamMonitor:

        # All done.
        self.log.debug(f"[{domain}] State update completed.")

    async def _check_cleanup_thresholds(self, domain: str, counter: int) -> None:
        """
        Check if a domain has hit cleanup warning or deletion thresholds.

        :param domain: The domain to check
        :param counter: The current failure counter value
        :return: Nothing
        """
        # Check for 83-day warning threshold
        if counter == CLEANUP_WARNING_THRESHOLD:
            self.log.warning(
                f"[{domain}] Reached 83-day warning threshold. Sending cleanup warning."
            )
            await self.notification_service.send_cleanup_warning(domain)

        # Check for 90-day deletion threshold
        if counter >= CLEANUP_DELETE_THRESHOLD:
            self.log.warning(
                f"[{domain}] Reached 90-day deletion threshold. Removing all subscriptions."
            )
            # Send deletion notification
            await self.notification_service.send_cleanup_deletion(domain)

            # Delete all subscriptions for this domain
            from .database import SubscriptionRepository

            subscription_repo = SubscriptionRepository(self.stream_repo.db)
            deleted_count = await subscription_repo.delete_all_for_domain(domain)

            # Delete the stream record
            await self.stream_repo.delete(domain)

            self.log.info(
                f"[{domain}] Cleanup complete. Deleted {deleted_count} subscriptions and stream record."
            )
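
Under the 60-second polling cycle the constants assume, the counter values map back to days as follows (a quick sketch, assuming the constants are importable from utils below):

    CYCLE_SECONDS = 60  # assumed polling interval
    print(CLEANUP_WARNING_THRESHOLD * CYCLE_SECONDS // 86400)  # 83
    print(CLEANUP_DELETE_THRESHOLD * CYCLE_SECONDS // 86400)   # 90

The warning check uses "==" so the notice is sent only on the cycle the threshold is first reached, while the deletion check uses ">=", presumably so cleanup still runs even if that exact cycle is missed.
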
@@ -28,6 +28,35 @@ SECONDS_BETWEEN_NOTIFICATIONS = 20 * 60 # 20 minutes in seconds
# - If this time period passes entirely and a stream comes back online after, it's treated as regular going live.
TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN = 7 * 60  # 7 minutes in seconds

# Counter thresholds for auto-cleanup (based on 60-second polling intervals)
CLEANUP_WARNING_THRESHOLD = 83 * 24 * 60  # 119,520 cycles = 83 days
CLEANUP_DELETE_THRESHOLD = 90 * 24 * 60  # 129,600 cycles = 90 days


def should_query_stream(failure_counter: int) -> bool:
    """
    Determine if a stream should be queried based on its failure counter.
    Implements progressive backoff: 60s (5min) -> 2min (5min) -> 3min (5min) -> 5min (15min) -> 15min.

    :param failure_counter: The current failure counter value
    :return: True if the stream should be queried this cycle, False otherwise
    """
    if failure_counter <= 4:
        # Query every 60s for first 5 minutes (counters 0-4)
        return True
    elif failure_counter <= 9:
        # Query every 2 minutes for next 5 minutes (counters 5-9)
        return (failure_counter * 60) % 120 == 0
    elif failure_counter <= 14:
        # Query every 3 minutes for next 5 minutes (counters 10-14)
        return (failure_counter * 60) % 180 == 0
    elif failure_counter <= 29:
        # Query every 5 minutes for next 15 minutes (counters 15-29)
        return (failure_counter * 60) % 300 == 0
    else:
        # Query every 15 minutes after 30 minutes (counter 30+)
        return (failure_counter * 60) % 900 == 0
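
As a rough check of the schedule above, a minimal sketch (assuming should_query_stream is importable and the monitor runs one cycle per minute) listing which failure counters actually trigger a query during the first hour:

    # Counters that still query within the first 60 cycles:
    # 0-4 every cycle, then 6, 8, 12, 15, 20, 25, 30, 45
    polled = [c for c in range(60) if should_query_stream(c)]
    print(polled)
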
def domainify(url: str) -> str:
    """