From c4156f861e0c5edc82bd6039ad4e7070bbb75b3b Mon Sep 17 00:00:00 2001 From: Logan Fick Date: Sat, 5 Apr 2025 17:11:53 -0400 Subject: [PATCH] Reformatted source code using Black. --- owncastsentry.py | 176 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 130 insertions(+), 46 deletions(-) diff --git a/owncastsentry.py b/owncastsentry.py index 40263f6..e95e942 100644 --- a/owncastsentry.py +++ b/owncastsentry.py @@ -24,11 +24,15 @@ OWNCAST_STATUS_PATH = "/api/status" OWNCAST_CONFIG_PATH = "/api/config" # User agent to send with all HTTP requests. -USER_AGENT = "OwncastSentry/1.0.1 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)" +USER_AGENT = ( + "OwncastSentry/1.0.1 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)" +) # ===== DATABASE MIGRATIONS ===== upgrade_table = UpgradeTable() + + @upgrade_table.register(description="Initial revision") async def upgrade_v1(conn: Connection) -> None: """ @@ -57,17 +61,25 @@ async def upgrade_v1(conn: Connection) -> None: )""" ) + # ===== MAIN BOT CLASS ===== class OwncastSentry(Plugin): # Helper variables for setting up a special HTTP ClientSession for monitoring streams. - headers = {"User-Agent": USER_AGENT} # Override User Agent to special static value to identify to servers our purpose. - cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies. - baseconnctor = aiohttp.TCPConnector(use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120) # Set up timeouts to keep one long-lived connection open per monitored stream. - timeouts = aiohttp.ClientTimeout(sock_connect=5, sock_read=5) # Tighten up timeouts for faster responses to users in case of broken servers. + headers = { + "User-Agent": USER_AGENT + } # Override User Agent to special static value to identify to servers our purpose. + cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies. + baseconnctor = aiohttp.TCPConnector( + use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120 + ) # Set up timeouts to keep one long-lived connection open per monitored stream. + timeouts = aiohttp.ClientTimeout( + sock_connect=5, sock_read=5 + ) # Tighten up timeouts for faster responses to users in case of broken servers. # Final HTTP ClientSession for all requests to Owncast instances. - session = aiohttp.ClientSession(headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor) - + session = aiohttp.ClientSession( + headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor + ) @classmethod def get_db_upgrade_table(cls) -> UpgradeTable | None: @@ -78,7 +90,6 @@ class OwncastSentry(Plugin): """ return upgrade_table - async def start(self) -> None: """ Method called by Maubot upon startup of the instance. @@ -88,7 +99,6 @@ class OwncastSentry(Plugin): """ self.sched.run_periodically(60, self.update_all_stream_states) - @command.new(help="Subscribes to a new Owncast stream.") @command.argument("url") async def subscribe(self, evt: MessageEvent, url: str) -> None: @@ -114,7 +124,9 @@ class OwncastSentry(Plugin): stream_state = await self.get_stream_state(stream_domain) if len(stream_state) == 0: # The stream state fetch returned nothing. Probably not an Owncast stream. - await evt.reply("The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline.") + await evt.reply( + "The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline." + ) return # Try to add a new subscription for the requested stream domain in the room the command was executed in. @@ -126,11 +138,17 @@ class OwncastSentry(Plugin): # Something weird happened... Was it due to attempting to insert a duplicate row? if "UNIQUE constraint failed" in exception.args[0]: # Yes, this is an expected condition. Tell the user the room is already subscribed and give up. - await evt.reply("This room is already subscribed to notifications for " + stream_domain + ".") + await evt.reply( + "This room is already subscribed to notifications for " + + stream_domain + + "." + ) return else: # Nope... Something unexpected happened. Give up. - self.log.error(f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}") + self.log.error( + f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}" + ) raise exception # The subscription was successfully added! Try to add a placeholder row for the stream's state in the streams table. @@ -146,13 +164,18 @@ class OwncastSentry(Plugin): # Attempts to add rows for streams already known is an expected condition. What is anything except that? if "UNIQUE constraint failed" not in exception.args[0]: # Something unexpected happened. Give up. - self.log.error(f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}") + self.log.error( + f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}" + ) raise exception # All went well! We added a new subscription and (at least tried) to add a row for the stream state. Tell the user. self.log.info(f"[{stream_domain}] Subscription added for room {evt.room_id}.") - await evt.reply("Subscription added! This room will receive notifications when " + stream_domain + " goes live.") - + await evt.reply( + "Subscription added! This room will receive notifications when " + + stream_domain + + " goes live." + ) @command.new(help="Unsubscribes from an Owncast stream.") @command.argument("url") @@ -175,15 +198,28 @@ class OwncastSentry(Plugin): # Did it work? if result.rowcount == 1: # Yes, one row was deleted. Tell the user. - self.log.info(f"[{stream_domain}] Subscription removed for room {evt.room_id}.") - await evt.reply("Subscription removed! This room will no longer receive notifications for " + stream_domain + ".") + self.log.info( + f"[{stream_domain}] Subscription removed for room {evt.room_id}." + ) + await evt.reply( + "Subscription removed! This room will no longer receive notifications for " + + stream_domain + + "." + ) elif result.rowcount == 0: # No, nothing changed. Tell the user. - await evt.reply("This room is already not subscribed to notifications for " + stream_domain + ".") + await evt.reply( + "This room is already not subscribed to notifications for " + + stream_domain + + "." + ) else: # Somehow more than 1 (or even less than 0 ???) rows were changed... Log it! - self.log.error("Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got " + result.rowcount + " instead. Something very bad may have happened!!!!") - + self.log.error( + "Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got " + + result.rowcount + + " instead. Something very bad may have happened!!!!" + ) async def update_all_stream_states(self) -> None: """ @@ -210,7 +246,6 @@ class OwncastSentry(Plugin): await asyncio.gather(*tasks) self.log.debug("Update complete.") - async def update_stream_state(self, domain: str) -> None: """ Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live. @@ -236,21 +271,36 @@ class OwncastSentry(Plugin): old_state = await connection.fetchrow(query, domain) # Does the last known stream state not have a value for the last connect and disconnect time? - if old_state["last_connect_time"] is None and old_state["last_disconnect_time"] is None: + if ( + old_state["last_connect_time"] is None + and old_state["last_disconnect_time"] is None + ): # Yes, this is the first update. Don't send any notifications. send_notifications = False # Does the latest stream state have a last connect time and the old state not have one? - if new_state["lastConnectTime"] is not None and old_state["last_connect_time"] is None: + if ( + new_state["lastConnectTime"] is not None + and old_state["last_connect_time"] is None + ): # Yes! This stream is now live. Send notifications and log it, if allowed. if send_notifications: - self.log.info(f"[{domain}] Stream is now live! Notifying subscribed rooms...") - await self.notify_rooms_of_stream_online(domain, new_state["streamTitle"]) + self.log.info( + f"[{domain}] Stream is now live! Notifying subscribed rooms..." + ) + await self.notify_rooms_of_stream_online( + domain, new_state["streamTitle"] + ) else: - self.log.info(f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms.") + self.log.info( + f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms." + ) # Does the latest stream state no longer have a last connect time but the old state does? - elif new_state["lastConnectTime"] is None and old_state["last_connect_time"] is not None: + elif ( + new_state["lastConnectTime"] is None + and old_state["last_connect_time"] is not None + ): # Yep. This stream is now offline. Log it. if send_notifications: self.log.info(f"[{domain}] Stream is now offline.") @@ -261,12 +311,17 @@ class OwncastSentry(Plugin): # TODO: Only update the database if a change actually occurred. This is probably generating useless writes. update_query = "UPDATE streams SET name=$1, last_connect_time=$2, last_disconnect_time=$3 WHERE domain=$4" async with self.database.acquire() as connection: - await connection.execute(update_query, new_state["streamTitle"], new_state["lastConnectTime"], new_state["lastDisconnectTime"], domain) + await connection.execute( + update_query, + new_state["streamTitle"], + new_state["lastConnectTime"], + new_state["lastDisconnectTime"], + domain, + ) # All done. self.log.debug(f"[{domain}] State update completed.") - async def notify_rooms_of_stream_online(self, domain: str, title: str) -> None: """ Sends notifications to rooms with subscriptions to the provided stream domain. @@ -310,19 +365,24 @@ class OwncastSentry(Plugin): room_id = rows["room_id"] try: # Try and send a message. - content = TextMessageEventContent(msgtype=MessageType.TEXT, body=body_text) + content = TextMessageEventContent( + msgtype=MessageType.TEXT, body=body_text + ) await self.client.send_message(room_id, content) # It worked! Increment the counter. successful_notifications += 1 except Exception as exception: # Something didn't work. Log it and move on to the next one. - self.log.warning(f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}") - failed_notifications +=1 + self.log.warning( + f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}" + ) + failed_notifications += 1 # All done! - self.log.info(f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed.") - + self.log.info( + f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed." + ) # ========== HELPER METHODS ========== async def get_stream_state(self, domain): @@ -340,36 +400,52 @@ class OwncastSentry(Plugin): # Make a request to the endpoint. try: - response = await self.session.request("GET", status_url, allow_redirects=False) + response = await self.session.request( + "GET", status_url, allow_redirects=False + ) except Exception as e: - self.log.warning(f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}") + self.log.warning( + f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}" + ) return {} # Check the response code is success # TODO: Handle 429 rate limiting? if response.status != 200: - self.log.warning(f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead.") + self.log.warning( + f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead." + ) return {} # Try and interpret the response as JSON. try: new_state = json.loads(await response.read()) except Exception as e: - self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}") + self.log.warning( + f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}" + ) return {} # Validate the response to ensure it contains all the basic info needed to function. if "lastConnectTime" not in new_state: - self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter.") + self.log.warning( + f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter." + ) return {} elif "lastDisconnectTime" not in new_state: - self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter.") + self.log.warning( + f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter." + ) return {} elif "streamTitle" not in new_state: - self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter.") + self.log.warning( + f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter." + ) return {} elif "online" not in new_state: - self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter.") + self.log.warning( + f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter." + ) return {} return new_state @@ -389,22 +465,30 @@ class OwncastSentry(Plugin): # Make a request to the endpoint. try: - response = await self.session.request("GET", status_url, allow_redirects=False) + response = await self.session.request( + "GET", status_url, allow_redirects=False + ) except Exception as e: - self.log.warning(f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}") + self.log.warning( + f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}" + ) return {} # Check the response code is success # TODO: Handle 429 rate limiting? if response.status != 200: - self.log.warning(f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead.") + self.log.warning( + f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead." + ) return {} # Try and interpret the response as JSON. try: config = json.loads(await response.read()) except Exception as e: - self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}") + self.log.warning( + f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}" + ) return {} return config