# Copyright 2026 Logan Fick # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. import asyncio import time from .owncast_client import OwncastClient from .database import StreamRepository, SubscriptionRepository from .notification_service import NotificationService from .models import StreamState from .utils import TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN class StreamMonitor: """Monitors Owncast streams and detects state changes.""" def __init__( self, owncast_client: OwncastClient, stream_repo: StreamRepository, notification_service: NotificationService, logger, ): """ Initialize the stream monitor. :param owncast_client: Client for making API calls to Owncast instances :param stream_repo: Repository for stream data :param notification_service: Service for sending notifications :param logger: Logger instance """ self.owncast_client = owncast_client self.stream_repo = stream_repo self.notification_service = notification_service self.log = logger # Cache for tracking when streams last went offline self.offline_timer_cache = {} async def update_all_streams(self, subscribed_domains: list[str]) -> None: """ Checks the status of all streams with active subscriptions. Updates for all streams are performed asynchronously, with the method returning when the slowest update completes. :param subscribed_domains: List of stream domains to update :return: Nothing. """ self.log.debug("Updating all stream states...") # Build a list of async tasks which update the state for each stream domain tasks = [] for domain in subscribed_domains: tasks.append(asyncio.create_task(self.update_stream(domain))) # Run the tasks in parallel await asyncio.gather(*tasks) self.log.debug("Update complete.") async def update_stream(self, domain: str) -> None: """ Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live. :param domain: The domain of the stream to update. :return: Nothing. """ # A flag indicating whether this is the first state update of a brand-new stream to avoid sending notifications if its already live. first_update = False # A flag indicating whether to update the stream's state in the database. # Used to avoid writing to the database when a stream's state hasn't changed at all. update_database = False # Holds the stream's latest configuration, if fetched and deemed necessary during the update process. stream_config = None # Fetch the latest stream state from the server new_state_dict = await self.owncast_client.get_stream_state(domain) # Skip the update if the fetch failed for any reason if new_state_dict is None: return # Fix possible race conditions with timers if domain not in self.offline_timer_cache: self.offline_timer_cache[domain] = 0 if domain not in self.notification_service.notification_timers_cache: self.notification_service.notification_timers_cache[domain] = 0 # Fetch the last known stream state from the database old_state = await self.stream_repo.get_by_domain(domain) # Does the last known stream state not have a value for the last connect and disconnect time? if ( old_state.last_connect_time is None and old_state.last_disconnect_time is None ): # Yes, this is the first update. Don't send any notifications. update_database = True first_update = True # Does the latest stream state have a last connect time and the old state not have one? if ( new_state_dict["lastConnectTime"] is not None and old_state.last_connect_time is None ): # Yes! This stream is now live. update_database = True stream_config = await self.owncast_client.get_stream_config(domain) self.log.info(f"[{domain}] Stream is now live!") # Calculate how many seconds since the stream last went offline seconds_since_last_offline = round( time.time() - self.offline_timer_cache[domain] ) # Have we queried this stream before? (In other words, is this not the first state update ever?) if not first_update: # Use fallback values if config fetch failed stream_name = stream_config.name if stream_config else domain stream_tags = stream_config.tags if stream_config else [] # Yes. Has this stream been offline for a short amount of time? if seconds_since_last_offline < TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN: # Yes. Did the stream title change? if old_state.title != new_state_dict["streamTitle"]: # Yes. The stream was only down for a short time, send a special notification indicating the stream changed its name. await self.notification_service.notify_stream_live( domain, stream_name, new_state_dict["streamTitle"], stream_tags, title_change=True, ) else: # No. The stream was only down for a short time and didn't change its title. Don't send a notification. self.log.info( f"[{domain}] Not sending notifications. Stream was only offline for {seconds_since_last_offline} of {TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN} seconds and did not change its title." ) else: # This stream has been offline for a while. Send a normal notification. await self.notification_service.notify_stream_live( domain, stream_name, new_state_dict["streamTitle"], stream_tags, title_change=False, ) else: # No, this is the first time we're querying self.log.info( f"[{domain}] Not sending notifications. This is the first state update for this stream." ) if ( new_state_dict["lastConnectTime"] is not None and old_state.last_connect_time is not None ): # Did the stream title change mid-session? if old_state.title != new_state_dict["streamTitle"]: self.log.info(f"[{domain}] Stream title was changed!") update_database = True stream_config = await self.owncast_client.get_stream_config(domain) # Use fallback values if config fetch failed stream_name = stream_config.name if stream_config else domain stream_tags = stream_config.tags if stream_config else [] # This is a fun case to account for... Let's try and explain this. # Was the last time this stream sent a notification before it last went offline? if ( self.offline_timer_cache[domain] > self.notification_service.notification_timers_cache[domain] ): # Yes. Send a regular go live notification. # Why? A title change notification could be confusing to users in this case. # How? If a stream goes offline before its next allowed notification, it'll get rate limited. If it then changes its title, this part of the code will send a title change notification. This can be a little confusing, so override to a normal go live notification in this case. await self.notification_service.notify_stream_live( domain, stream_name, new_state_dict["streamTitle"], stream_tags, title_change=False, ) else: # No. Send a normal title change notification. await self.notification_service.notify_stream_live( domain, stream_name, new_state_dict["streamTitle"], stream_tags, title_change=True, ) # Does the latest stream state no longer have a last connect time but the old state does? elif ( new_state_dict["lastConnectTime"] is None and old_state.last_connect_time is not None ): # Yep. This stream is now offline. Log it. update_database = True self.offline_timer_cache[domain] = time.time() if first_update: self.log.info(f"[{domain}] Stream is offline.") else: self.log.info(f"[{domain}] Stream is now offline.") # Update the database with the current stream state, if needed. if update_database: # Ensure we have the stream config before updating the database if stream_config is None: stream_config = await self.owncast_client.get_stream_config(domain) # Use fallback value if config fetch failed stream_name = stream_config.name if stream_config else "" self.log.debug(f"[{domain}] Updating stream state in database...") # Create updated state object updated_state = StreamState( domain=domain, name=stream_name, title=new_state_dict["streamTitle"], last_connect_time=new_state_dict["lastConnectTime"], last_disconnect_time=new_state_dict["lastDisconnectTime"], ) await self.stream_repo.update(updated_state) # All done. self.log.debug(f"[{domain}] State update completed.")