Added notifications for stream title changes along with more advanced rate limit logic.
All checks were successful
Build Maubot Plugin Artifact / Build (push) Successful in 3s
Lint Source Code / Lint (push) Successful in 8s

The rate limit logic has been updated to account for streams going down temporarily and factor in when the stream title changes. I think I've accounted for most of the ways these events can happen, but there's probably still a few race conditions left over. It runs well enough in the development environment, it's time to collect some data in production.
This commit is contained in:
2025-04-12 17:29:40 -04:00
parent ee61ea8562
commit 0659f70e1a

View File

@@ -273,7 +273,14 @@ class OwncastSentry(Plugin):
# A flag indicating whether to send a notification.
# Used for the first state update of a brand-new stream to avoid sending notifications if its already live.
send_notifications = True
first_update = True
# A flag indicating whether to update the stream's state in the databased.
# Used to avoid writing to the database when a stream's state hasn't changed at all.
update_database = False
# Holds the stream's latest configuration, if fetched and deemed necessary during the update process.
stream_config = {}
# Fetch the latest stream state from the server.
new_state = await self.get_stream_state(domain)
@@ -282,8 +289,14 @@ class OwncastSentry(Plugin):
if new_state == {}:
return
# Fix possible race conditions with timers
if domain not in self.offline_timer_cache:
self.offline_timer_cache[domain] = 0
if domain not in self.notification_timers_cache:
self.notification_timers_cache[domain] = 0
# Fetch the last known stream state from the database.
query = "SELECT last_connect_time, last_disconnect_time FROM streams WHERE domain=$1"
query = "SELECT * FROM streams WHERE domain=$1"
async with self.database.acquire() as connection:
old_state = await connection.fetchrow(query, domain)
@@ -293,23 +306,95 @@ class OwncastSentry(Plugin):
and old_state["last_disconnect_time"] is None
):
# Yes, this is the first update. Don't send any notifications.
send_notifications = False
update_database = True
first_update = False
# Does the latest stream state have a last connect time and the old state not have one?
if (
new_state["lastConnectTime"] is not None
and old_state["last_connect_time"] is None
):
# Yes! This stream is now live. Send notifications and log it, if allowed.
if send_notifications:
self.log.info(f"[{domain}] Stream is now live!")
await self.notify_rooms_of_stream_online(
domain, new_state["streamTitle"], False
)
else:
# Yes! This stream is now live.
update_database = True
stream_config = await self.get_stream_config(domain)
# Is this the first update for this stream?
if not first_update:
# Yes, only log it and stop any further work on this stream.
self.log.info(
f"[{domain}] Stream is live, but performed first update. Will not notify subscribed rooms."
)
return
self.log.info(f"[{domain}] Stream is now live!")
# Calculate how many seconds since the stream last went offline
seconds_since_last_offline = round(
time.time() - self.offline_timer_cache[domain]
)
# Has this stream been offline for a short amount of time?
if seconds_since_last_offline < TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN:
# Yes. Did the stream title change?
if old_state["title"] != new_state["streamTitle"]:
# Yes. The stream was only down for a short time, send a special notification indicating the stream changed its name.
await self.notify_rooms_of_stream_online(
domain,
stream_config["name"],
new_state["streamTitle"],
True,
stream_config["tags"],
)
else:
# No. The stream was only down for a short time and didn't change its title. Don't send a notification.
self.log.info(
f"[{domain}] Not sending notifications. Stream was only offline for {seconds_since_last_offline} of {TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN} seconds and did not change its title."
)
else:
# This stream has been offline for a while. Send a normal notification.
await self.notify_rooms_of_stream_online(
domain,
stream_config["name"],
new_state["streamTitle"],
False,
stream_config["tags"],
)
if (
new_state["lastConnectTime"] is not None
and old_state["last_connect_time"] is not None
):
# Did the stream title change mid-session?
if old_state["title"] != new_state["streamTitle"]:
self.log.info(f"[{domain}] Stream title was changed!")
update_database = True
stream_config = await self.get_stream_config(domain)
# This is a fun case to account for... Let's try and explain this.
# Was the last time this stream sent a notification before it last went offline?
if (
self.offline_timer_cache[domain]
> self.notification_timers_cache[domain]
):
# Yes. Send a regular go live notification.
# Why? A title change notification could be confusing to users in this case.
# How? If a stream goes offline before its next allowed notification, it'll get rate limited. If it then changes its title, this part of the code will send a title change notification. This can be a little confusing, so override to a normal go live notification in this case.
await self.notify_rooms_of_stream_online(
domain,
stream_config["name"],
new_state["streamTitle"],
False,
stream_config["tags"],
)
else:
# No. Send a normal title change notification.
await self.notify_rooms_of_stream_online(
domain,
stream_config["name"],
new_state["streamTitle"],
True,
stream_config["tags"],
)
# Does the latest stream state no longer have a last connect time but the old state does?
elif (
@@ -317,28 +402,33 @@ class OwncastSentry(Plugin):
and old_state["last_connect_time"] is not None
):
# Yep. This stream is now offline. Log it.
if send_notifications:
update_database = True
stream_config = await self.get_stream_config(domain)
self.offline_timer_cache[domain] = time.time()
if first_update:
self.log.info(f"[{domain}] Stream is now offline.")
else:
self.log.info(f"[{domain}] Stream is offline.")
# Update the database with the current stream state.
# TODO: Only update the database if a change actually occurred. This is probably generating useless writes.
update_query = "UPDATE streams SET name=$1, last_connect_time=$2, last_disconnect_time=$3 WHERE domain=$4"
async with self.database.acquire() as connection:
await connection.execute(
update_query,
new_state["streamTitle"],
new_state["lastConnectTime"],
new_state["lastDisconnectTime"],
domain,
)
# Update the database with the current stream state, if needed.
if update_database:
self.log.debug(f"[{domain}] Updating stream state in database...")
update_query = "UPDATE streams SET name=$1, title=$2, last_connect_time=$3, last_disconnect_time=$4 WHERE domain=$5"
async with self.database.acquire() as connection:
await connection.execute(
update_query,
stream_config["name"],
new_state["streamTitle"],
new_state["lastConnectTime"],
new_state["lastDisconnectTime"],
domain,
)
# All done.
self.log.debug(f"[{domain}] State update completed.")
async def notify_rooms_of_stream_online(
self, domain: str, title: str, renamed: bool
self, domain: str, name: str, title: str, title_change: bool, tags: list[str]
) -> None:
"""
Sends notifications to rooms with subscriptions to the provided stream domain.
@@ -372,15 +462,19 @@ class OwncastSentry(Plugin):
successful_notifications = 0
failed_notifications = 0
stream_config = await self.get_stream_config(domain)
# Time to start building the body text for the notifications.
# Build the message body text.
if "name" in stream_config:
body_text = "🎥 " + stream_config["name"] + " is now live!"
# Turns out it is possible to set an empty name for an Owncast stream if you know how.
# We'll account for that... Just in case.
stream_name = name if name else domain
# If the stream changed its title, send a different title change notification.
if title_change:
self.log.info(f"[{domain}] Sending title change notifications...")
body_text = "📝 " + name + " has changed its stream title!"
else:
# Turns out it is possible to set an empty name for an Owncast stream if you know how.
# We'll account for that... Just in case.
body_text = "🎥 " + domain + " is now live!"
self.log.info(f"[{domain}] Sending going live notifications...")
body_text = "🎥 " + name + " is now live!"
# Streams can have no title. If there is none, don't even mention it.
if title != "":
@@ -388,9 +482,9 @@ class OwncastSentry(Plugin):
body_text += "\n\nTo tune in, visit: https://" + domain + "/"
if "tags" in stream_config and len(stream_config["tags"]) > 0:
if "tags" and len(tags) > 0:
body_text += "\n\n"
body_text += " ".join("#" + tag for tag in stream_config["tags"])
body_text += " ".join("#" + tag for tag in tags)
# Iterate over the subscribed rooms and try to send a message to each.
# TODO: This should probably be made async.