3 Commits

Author SHA1 Message Date
764fab9879 Fixed newly discovered streams never sending notifications.
Some checks failed
Build Maubot Plugin Artifact / Build (push) Successful in 3s
Lint Source Code / Lint (push) Failing after 11s
2025-05-04 08:44:37 -04:00
0659f70e1a Added notifications for stream title changes along with more advanced rate limit logic.
All checks were successful
Build Maubot Plugin Artifact / Build (push) Successful in 3s
Lint Source Code / Lint (push) Successful in 8s
The rate limit logic has been updated to account for streams going down temporarily and factor in when the stream title changes. I think I've accounted for most of the ways these events can happen, but there's probably still a few race conditions left over. It runs well enough in the development environment, it's time to collect some data in production.
2025-04-12 17:29:40 -04:00
ee61ea8562 Added rate limiting to notifications to limit them to once per 20 minutes per stream.
All checks were successful
Build Maubot Plugin Artifact / Build (push) Successful in 3s
Lint Source Code / Lint (push) Successful in 8s
There's also a small amount of preliminary work included in this commit for a new type of notification when streams change their title in the middle of a session.
2025-04-12 15:43:05 -04:00

View File

@@ -8,6 +8,7 @@ import sqlite3
import aiohttp import aiohttp
import json import json
import asyncio import asyncio
import time
from maubot import Plugin, MessageEvent from maubot import Plugin, MessageEvent
from maubot.handlers import command from maubot.handlers import command
@@ -28,6 +29,16 @@ USER_AGENT = (
"OwncastSentry/1.0.3 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)" "OwncastSentry/1.0.3 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
) )
# Hard minimum amount of time between when notifications can be sent for a stream. Prevents spamming notifications for glitchy or malicious streams.
SECONDS_BETWEEN_NOTIFICATIONS = 20 * 60 # 20 minutes in seconds
# I'm not sure the best way to name or explain this variable, so let's just say what uses it:
#
# After a stream goes offline, a timer is started. Then, ...
# - If a stream comes back online with the same title within this time, no notification is sent.
# - If a stream comes back online with a different title, a rename notification is sent.
# - If this time period passes entirely and a stream comes back online after, it's treated as regular going live.
TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN = 7 * 60 # 7 minutes in seconds
# ===== DATABASE MIGRATIONS ===== # ===== DATABASE MIGRATIONS =====
upgrade_table = UpgradeTable() upgrade_table = UpgradeTable()
@@ -81,6 +92,12 @@ class OwncastSentry(Plugin):
headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor
) )
# Keeps track of when a notification was last sent for streams.
notification_timers_cache = {}
# Keeps track of when streams last went offline.
offline_timer_cache = {}
@classmethod @classmethod
def get_db_upgrade_table(cls) -> UpgradeTable | None: def get_db_upgrade_table(cls) -> UpgradeTable | None:
""" """
@@ -254,9 +271,15 @@ class OwncastSentry(Plugin):
:return: Nothing. :return: Nothing.
""" """
# A flag indicating whether to send a notification. # A flag indicating whether this is the first state update of a brand-new stream to avoid sending notifications if its already live.
# Used for the first state update of a brand-new stream to avoid sending notifications if its already live. first_update = False
send_notifications = True
# A flag indicating whether to update the stream's state in the databased.
# Used to avoid writing to the database when a stream's state hasn't changed at all.
update_database = False
# Holds the stream's latest configuration, if fetched and deemed necessary during the update process.
stream_config = {}
# Fetch the latest stream state from the server. # Fetch the latest stream state from the server.
new_state = await self.get_stream_state(domain) new_state = await self.get_stream_state(domain)
@@ -265,8 +288,14 @@ class OwncastSentry(Plugin):
if new_state == {}: if new_state == {}:
return return
# Fix possible race conditions with timers
if domain not in self.offline_timer_cache:
self.offline_timer_cache[domain] = 0
if domain not in self.notification_timers_cache:
self.notification_timers_cache[domain] = 0
# Fetch the last known stream state from the database. # Fetch the last known stream state from the database.
query = "SELECT last_connect_time, last_disconnect_time FROM streams WHERE domain=$1" query = "SELECT * FROM streams WHERE domain=$1"
async with self.database.acquire() as connection: async with self.database.acquire() as connection:
old_state = await connection.fetchrow(query, domain) old_state = await connection.fetchrow(query, domain)
@@ -276,24 +305,93 @@ class OwncastSentry(Plugin):
and old_state["last_disconnect_time"] is None and old_state["last_disconnect_time"] is None
): ):
# Yes, this is the first update. Don't send any notifications. # Yes, this is the first update. Don't send any notifications.
send_notifications = False update_database = True
first_update = True
# Does the latest stream state have a last connect time and the old state not have one? # Does the latest stream state have a last connect time and the old state not have one?
if ( if (
new_state["lastConnectTime"] is not None new_state["lastConnectTime"] is not None
and old_state["last_connect_time"] is None and old_state["last_connect_time"] is None
): ):
# Yes! This stream is now live. Send notifications and log it, if allowed. # Yes! This stream is now live.
if send_notifications: update_database = True
self.log.info( stream_config = await self.get_stream_config(domain)
f"[{domain}] Stream is now live! Notifying subscribed rooms..."
self.log.info(f"[{domain}] Stream is now live!")
# Calculate how many seconds since the stream last went offline
seconds_since_last_offline = round(
time.time() - self.offline_timer_cache[domain]
) )
# Have we queried this stream before? (In other words, is this not the first state update ever?)
if not first_update:
# Yes. Has this stream been offline for a short amount of time?
if seconds_since_last_offline < TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN:
# Yes. Did the stream title change?
if old_state["title"] != new_state["streamTitle"]:
# Yes. The stream was only down for a short time, send a special notification indicating the stream changed its name.
await self.notify_rooms_of_stream_online( await self.notify_rooms_of_stream_online(
domain, new_state["streamTitle"] domain,
stream_config["name"],
new_state["streamTitle"],
True,
stream_config["tags"],
) )
else: else:
# No. The stream was only down for a short time and didn't change its title. Don't send a notification.
self.log.info( self.log.info(
f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms." f"[{domain}] Not sending notifications. Stream was only offline for {seconds_since_last_offline} of {TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN} seconds and did not change its title."
)
else:
# This stream has been offline for a while. Send a normal notification.
await self.notify_rooms_of_stream_online(
domain,
stream_config["name"],
new_state["streamTitle"],
False,
stream_config["tags"],
)
else:
# No, this is the first time we're querying
self.log.info(
f"[{domain}] Not sending notifications. This is the first state update for this stream."
)
if (
new_state["lastConnectTime"] is not None
and old_state["last_connect_time"] is not None
):
# Did the stream title change mid-session?
if old_state["title"] != new_state["streamTitle"]:
self.log.info(f"[{domain}] Stream title was changed!")
update_database = True
stream_config = await self.get_stream_config(domain)
# This is a fun case to account for... Let's try and explain this.
# Was the last time this stream sent a notification before it last went offline?
if (
self.offline_timer_cache[domain]
> self.notification_timers_cache[domain]
):
# Yes. Send a regular go live notification.
# Why? A title change notification could be confusing to users in this case.
# How? If a stream goes offline before its next allowed notification, it'll get rate limited. If it then changes its title, this part of the code will send a title change notification. This can be a little confusing, so override to a normal go live notification in this case.
await self.notify_rooms_of_stream_online(
domain,
stream_config["name"],
new_state["streamTitle"],
False,
stream_config["tags"],
)
else:
# No. Send a normal title change notification.
await self.notify_rooms_of_stream_online(
domain,
stream_config["name"],
new_state["streamTitle"],
True,
stream_config["tags"],
) )
# Does the latest stream state no longer have a last connect time but the old state does? # Does the latest stream state no longer have a last connect time but the old state does?
@@ -302,17 +400,22 @@ class OwncastSentry(Plugin):
and old_state["last_connect_time"] is not None and old_state["last_connect_time"] is not None
): ):
# Yep. This stream is now offline. Log it. # Yep. This stream is now offline. Log it.
if send_notifications: update_database = True
self.log.info(f"[{domain}] Stream is now offline.") stream_config = await self.get_stream_config(domain)
else: self.offline_timer_cache[domain] = time.time()
if first_update:
self.log.info(f"[{domain}] Stream is offline.") self.log.info(f"[{domain}] Stream is offline.")
else:
self.log.info(f"[{domain}] Stream is now offline.")
# Update the database with the current stream state. # Update the database with the current stream state, if needed.
# TODO: Only update the database if a change actually occurred. This is probably generating useless writes. if update_database:
update_query = "UPDATE streams SET name=$1, last_connect_time=$2, last_disconnect_time=$3 WHERE domain=$4" self.log.debug(f"[{domain}] Updating stream state in database...")
update_query = "UPDATE streams SET name=$1, title=$2, last_connect_time=$3, last_disconnect_time=$4 WHERE domain=$5"
async with self.database.acquire() as connection: async with self.database.acquire() as connection:
await connection.execute( await connection.execute(
update_query, update_query,
stream_config["name"],
new_state["streamTitle"], new_state["streamTitle"],
new_state["lastConnectTime"], new_state["lastConnectTime"],
new_state["lastDisconnectTime"], new_state["lastDisconnectTime"],
@@ -322,14 +425,32 @@ class OwncastSentry(Plugin):
# All done. # All done.
self.log.debug(f"[{domain}] State update completed.") self.log.debug(f"[{domain}] State update completed.")
async def notify_rooms_of_stream_online(self, domain: str, title: str) -> None: async def notify_rooms_of_stream_online(
self, domain: str, name: str, title: str, title_change: bool, tags: list[str]
) -> None:
""" """
Sends notifications to rooms with subscriptions to the provided stream domain. Sends notifications to rooms with subscriptions to the provided stream domain.
:param domain: The domain of the stream to send notifications for. :param domain: The domain of the stream to send notifications for.
:param title: The title of the stream to include in the message. :param title: The title of the stream to include in the message.
:param renamed: Whether or not this is for a stream changing its title rather than going live.
:return: Nothing. :return: Nothing.
""" """
# Has enough time passed since the last notification was sent?
if domain in self.notification_timers_cache:
seconds_since_last_notification = round(
time.time() - self.notification_timers_cache[domain]
)
if seconds_since_last_notification < SECONDS_BETWEEN_NOTIFICATIONS:
self.log.info(
f"[{domain}] Not sending notifications. Only {seconds_since_last_notification} of required {SECONDS_BETWEEN_NOTIFICATIONS} seconds has passed since last notification."
)
return
# Yes. Log the current time and proceed with sending notifications.
self.notification_timers_cache[domain] = time.time()
# Get a list of room IDs with active subscriptions to the stream domain. # Get a list of room IDs with active subscriptions to the stream domain.
query = "SELECT room_id FROM subscriptions WHERE stream_domain=$1" query = "SELECT room_id FROM subscriptions WHERE stream_domain=$1"
async with self.database.acquire() as connection: async with self.database.acquire() as connection:
@@ -339,15 +460,19 @@ class OwncastSentry(Plugin):
successful_notifications = 0 successful_notifications = 0
failed_notifications = 0 failed_notifications = 0
stream_config = await self.get_stream_config(domain) # Time to start building the body text for the notifications.
# Build the message body text.
if "name" in stream_config:
body_text = "🎥 " + stream_config["name"] + " is now live!"
else:
# Turns out it is possible to set an empty name for an Owncast stream if you know how. # Turns out it is possible to set an empty name for an Owncast stream if you know how.
# We'll account for that... Just in case. # We'll account for that... Just in case.
body_text = "🎥 " + domain + " is now live!" stream_name = name if name else domain
# If the stream changed its title, send a different title change notification.
if title_change:
self.log.info(f"[{domain}] Sending title change notifications...")
body_text = "📝 " + name + " has changed its stream title!"
else:
self.log.info(f"[{domain}] Sending going live notifications...")
body_text = "🎥 " + name + " is now live!"
# Streams can have no title. If there is none, don't even mention it. # Streams can have no title. If there is none, don't even mention it.
if title != "": if title != "":
@@ -355,9 +480,9 @@ class OwncastSentry(Plugin):
body_text += "\n\nTo tune in, visit: https://" + domain + "/" body_text += "\n\nTo tune in, visit: https://" + domain + "/"
if "tags" in stream_config and len(stream_config["tags"]) > 0: if "tags" and len(tags) > 0:
body_text += "\n\n" body_text += "\n\n"
body_text += " ".join("#" + tag for tag in stream_config["tags"]) body_text += " ".join("#" + tag for tag in tags)
# Iterate over the subscribed rooms and try to send a message to each. # Iterate over the subscribed rooms and try to send a message to each.
# TODO: This should probably be made async. # TODO: This should probably be made async.