Added configurable health check endpoint for external uptime monitoring integration.
This commit is contained in:
@@ -17,6 +17,7 @@ from .utils import (
|
||||
CLEANUP_DELETE_THRESHOLD,
|
||||
should_query_stream,
|
||||
)
|
||||
from .health_checker import UpdateResult
|
||||
|
||||
|
||||
class StreamMonitor:
|
||||
@@ -45,32 +46,48 @@ class StreamMonitor:
|
||||
# Cache for tracking when streams last went offline
|
||||
self.offline_timer_cache = {}
|
||||
|
||||
async def update_all_streams(self, subscribed_domains: list[str]) -> None:
|
||||
async def update_all_streams(self, subscribed_domains: list[str]) -> UpdateResult:
|
||||
"""
|
||||
Checks the status of all streams with active subscriptions.
|
||||
Updates for all streams are performed asynchronously, with the method returning when the slowest update completes.
|
||||
|
||||
:param subscribed_domains: List of stream domains to update
|
||||
:return: Nothing.
|
||||
:param subscribed_domains: List of stream domains to update.
|
||||
:return: UpdateResult with success/failure counts.
|
||||
"""
|
||||
self.log.debug("Updating all stream states...")
|
||||
|
||||
total_streams = len(subscribed_domains)
|
||||
|
||||
# Build a list of async tasks which update the state for each stream domain
|
||||
tasks = []
|
||||
for domain in subscribed_domains:
|
||||
tasks.append(asyncio.create_task(self.update_stream(domain)))
|
||||
|
||||
# Run the tasks in parallel
|
||||
await asyncio.gather(*tasks)
|
||||
self.log.debug("Update complete.")
|
||||
# Run the tasks in parallel and collect results
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
async def update_stream(self, domain: str) -> None:
|
||||
# Count successes and failures
|
||||
successful_checks = sum(1 for result in results if result is True)
|
||||
failed_checks = sum(1 for result in results if result is False)
|
||||
|
||||
self.log.debug(
|
||||
f"Update complete. {successful_checks}/{total_streams} succeeded, "
|
||||
f"{failed_checks} failed."
|
||||
)
|
||||
|
||||
return UpdateResult(
|
||||
total_streams=total_streams,
|
||||
successful_checks=successful_checks,
|
||||
failed_checks=failed_checks,
|
||||
)
|
||||
|
||||
async def update_stream(self, domain: str) -> bool:
|
||||
"""
|
||||
Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
|
||||
Implements progressive backoff for connection failures and auto-cleanup for dead instances.
|
||||
|
||||
:param domain: The domain of the stream to update.
|
||||
:return: Nothing.
|
||||
:return: True if the stream check succeeded (or was skipped due to backoff), False on connection failure.
|
||||
"""
|
||||
# Fetch the current stream state from database to check failure_counter
|
||||
old_state = await self.stream_repo.get_by_domain(domain)
|
||||
@@ -85,7 +102,8 @@ class StreamMonitor:
|
||||
)
|
||||
# Check cleanup thresholds even when skipping query
|
||||
await self._check_cleanup_thresholds(domain, failure_counter + 1)
|
||||
return
|
||||
# Backoff is expected behavior, not a failure
|
||||
return True
|
||||
|
||||
# A flag indicating whether this is the first state update of a brand-new stream, to avoid sending notifications if it's already live.
|
||||
first_update = False
|
||||
@@ -108,7 +126,8 @@ class StreamMonitor:
|
||||
)
|
||||
# Check cleanup thresholds after connection failure
|
||||
await self._check_cleanup_thresholds(domain, failure_counter + 1)
|
||||
return
|
||||
# Actual connection failure
|
||||
return False
|
||||
|
||||
# Fetch succeeded! Reset failure counter
|
||||
await self.stream_repo.reset_failure_counter(domain)
|
||||
@@ -259,6 +278,7 @@ class StreamMonitor:
|
||||
|
||||
# All done.
|
||||
self.log.debug(f"[{domain}] State update completed.")
|
||||
return True
|
||||
|
||||
async def _check_cleanup_thresholds(self, domain: str, counter: int) -> None:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user