Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
764fab9879
|
|||
|
0659f70e1a
|
|||
|
ee61ea8562
|
201
owncastsentry.py
201
owncastsentry.py
@@ -8,6 +8,7 @@ import sqlite3
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
import json
|
import json
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import time
|
||||||
|
|
||||||
from maubot import Plugin, MessageEvent
|
from maubot import Plugin, MessageEvent
|
||||||
from maubot.handlers import command
|
from maubot.handlers import command
|
||||||
@@ -28,6 +29,16 @@ USER_AGENT = (
|
|||||||
"OwncastSentry/1.0.3 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
|
"OwncastSentry/1.0.3 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Hard minimum amount of time between when notifications can be sent for a stream. Prevents spamming notifications for glitchy or malicious streams.
|
||||||
|
SECONDS_BETWEEN_NOTIFICATIONS = 20 * 60 # 20 minutes in seconds
|
||||||
|
|
||||||
|
# I'm not sure the best way to name or explain this variable, so let's just say what uses it:
|
||||||
|
#
|
||||||
|
# After a stream goes offline, a timer is started. Then, ...
|
||||||
|
# - If a stream comes back online with the same title within this time, no notification is sent.
|
||||||
|
# - If a stream comes back online with a different title, a rename notification is sent.
|
||||||
|
# - If this time period passes entirely and a stream comes back online after, it's treated as regular going live.
|
||||||
|
TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN = 7 * 60 # 7 minutes in seconds
|
||||||
|
|
||||||
# ===== DATABASE MIGRATIONS =====
|
# ===== DATABASE MIGRATIONS =====
|
||||||
upgrade_table = UpgradeTable()
|
upgrade_table = UpgradeTable()
|
||||||
@@ -81,6 +92,12 @@ class OwncastSentry(Plugin):
|
|||||||
headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor
|
headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Keeps track of when a notification was last sent for streams.
|
||||||
|
notification_timers_cache = {}
|
||||||
|
|
||||||
|
# Keeps track of when streams last went offline.
|
||||||
|
offline_timer_cache = {}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_db_upgrade_table(cls) -> UpgradeTable | None:
|
def get_db_upgrade_table(cls) -> UpgradeTable | None:
|
||||||
"""
|
"""
|
||||||
@@ -254,9 +271,15 @@ class OwncastSentry(Plugin):
|
|||||||
:return: Nothing.
|
:return: Nothing.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# A flag indicating whether to send a notification.
|
# A flag indicating whether this is the first state update of a brand-new stream to avoid sending notifications if its already live.
|
||||||
# Used for the first state update of a brand-new stream to avoid sending notifications if its already live.
|
first_update = False
|
||||||
send_notifications = True
|
|
||||||
|
# A flag indicating whether to update the stream's state in the databased.
|
||||||
|
# Used to avoid writing to the database when a stream's state hasn't changed at all.
|
||||||
|
update_database = False
|
||||||
|
|
||||||
|
# Holds the stream's latest configuration, if fetched and deemed necessary during the update process.
|
||||||
|
stream_config = {}
|
||||||
|
|
||||||
# Fetch the latest stream state from the server.
|
# Fetch the latest stream state from the server.
|
||||||
new_state = await self.get_stream_state(domain)
|
new_state = await self.get_stream_state(domain)
|
||||||
@@ -265,8 +288,14 @@ class OwncastSentry(Plugin):
|
|||||||
if new_state == {}:
|
if new_state == {}:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Fix possible race conditions with timers
|
||||||
|
if domain not in self.offline_timer_cache:
|
||||||
|
self.offline_timer_cache[domain] = 0
|
||||||
|
if domain not in self.notification_timers_cache:
|
||||||
|
self.notification_timers_cache[domain] = 0
|
||||||
|
|
||||||
# Fetch the last known stream state from the database.
|
# Fetch the last known stream state from the database.
|
||||||
query = "SELECT last_connect_time, last_disconnect_time FROM streams WHERE domain=$1"
|
query = "SELECT * FROM streams WHERE domain=$1"
|
||||||
async with self.database.acquire() as connection:
|
async with self.database.acquire() as connection:
|
||||||
old_state = await connection.fetchrow(query, domain)
|
old_state = await connection.fetchrow(query, domain)
|
||||||
|
|
||||||
@@ -276,60 +305,152 @@ class OwncastSentry(Plugin):
|
|||||||
and old_state["last_disconnect_time"] is None
|
and old_state["last_disconnect_time"] is None
|
||||||
):
|
):
|
||||||
# Yes, this is the first update. Don't send any notifications.
|
# Yes, this is the first update. Don't send any notifications.
|
||||||
send_notifications = False
|
update_database = True
|
||||||
|
first_update = True
|
||||||
|
|
||||||
# Does the latest stream state have a last connect time and the old state not have one?
|
# Does the latest stream state have a last connect time and the old state not have one?
|
||||||
if (
|
if (
|
||||||
new_state["lastConnectTime"] is not None
|
new_state["lastConnectTime"] is not None
|
||||||
and old_state["last_connect_time"] is None
|
and old_state["last_connect_time"] is None
|
||||||
):
|
):
|
||||||
# Yes! This stream is now live. Send notifications and log it, if allowed.
|
# Yes! This stream is now live.
|
||||||
if send_notifications:
|
update_database = True
|
||||||
self.log.info(
|
stream_config = await self.get_stream_config(domain)
|
||||||
f"[{domain}] Stream is now live! Notifying subscribed rooms..."
|
|
||||||
)
|
self.log.info(f"[{domain}] Stream is now live!")
|
||||||
await self.notify_rooms_of_stream_online(
|
|
||||||
domain, new_state["streamTitle"]
|
# Calculate how many seconds since the stream last went offline
|
||||||
)
|
seconds_since_last_offline = round(
|
||||||
|
time.time() - self.offline_timer_cache[domain]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Have we queried this stream before? (In other words, is this not the first state update ever?)
|
||||||
|
if not first_update:
|
||||||
|
# Yes. Has this stream been offline for a short amount of time?
|
||||||
|
if seconds_since_last_offline < TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN:
|
||||||
|
# Yes. Did the stream title change?
|
||||||
|
if old_state["title"] != new_state["streamTitle"]:
|
||||||
|
# Yes. The stream was only down for a short time, send a special notification indicating the stream changed its name.
|
||||||
|
await self.notify_rooms_of_stream_online(
|
||||||
|
domain,
|
||||||
|
stream_config["name"],
|
||||||
|
new_state["streamTitle"],
|
||||||
|
True,
|
||||||
|
stream_config["tags"],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# No. The stream was only down for a short time and didn't change its title. Don't send a notification.
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Not sending notifications. Stream was only offline for {seconds_since_last_offline} of {TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN} seconds and did not change its title."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# This stream has been offline for a while. Send a normal notification.
|
||||||
|
await self.notify_rooms_of_stream_online(
|
||||||
|
domain,
|
||||||
|
stream_config["name"],
|
||||||
|
new_state["streamTitle"],
|
||||||
|
False,
|
||||||
|
stream_config["tags"],
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
|
# No, this is the first time we're querying
|
||||||
self.log.info(
|
self.log.info(
|
||||||
f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms."
|
f"[{domain}] Not sending notifications. This is the first state update for this stream."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if (
|
||||||
|
new_state["lastConnectTime"] is not None
|
||||||
|
and old_state["last_connect_time"] is not None
|
||||||
|
):
|
||||||
|
# Did the stream title change mid-session?
|
||||||
|
if old_state["title"] != new_state["streamTitle"]:
|
||||||
|
self.log.info(f"[{domain}] Stream title was changed!")
|
||||||
|
update_database = True
|
||||||
|
stream_config = await self.get_stream_config(domain)
|
||||||
|
|
||||||
|
# This is a fun case to account for... Let's try and explain this.
|
||||||
|
# Was the last time this stream sent a notification before it last went offline?
|
||||||
|
if (
|
||||||
|
self.offline_timer_cache[domain]
|
||||||
|
> self.notification_timers_cache[domain]
|
||||||
|
):
|
||||||
|
# Yes. Send a regular go live notification.
|
||||||
|
# Why? A title change notification could be confusing to users in this case.
|
||||||
|
# How? If a stream goes offline before its next allowed notification, it'll get rate limited. If it then changes its title, this part of the code will send a title change notification. This can be a little confusing, so override to a normal go live notification in this case.
|
||||||
|
await self.notify_rooms_of_stream_online(
|
||||||
|
domain,
|
||||||
|
stream_config["name"],
|
||||||
|
new_state["streamTitle"],
|
||||||
|
False,
|
||||||
|
stream_config["tags"],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# No. Send a normal title change notification.
|
||||||
|
await self.notify_rooms_of_stream_online(
|
||||||
|
domain,
|
||||||
|
stream_config["name"],
|
||||||
|
new_state["streamTitle"],
|
||||||
|
True,
|
||||||
|
stream_config["tags"],
|
||||||
|
)
|
||||||
|
|
||||||
# Does the latest stream state no longer have a last connect time but the old state does?
|
# Does the latest stream state no longer have a last connect time but the old state does?
|
||||||
elif (
|
elif (
|
||||||
new_state["lastConnectTime"] is None
|
new_state["lastConnectTime"] is None
|
||||||
and old_state["last_connect_time"] is not None
|
and old_state["last_connect_time"] is not None
|
||||||
):
|
):
|
||||||
# Yep. This stream is now offline. Log it.
|
# Yep. This stream is now offline. Log it.
|
||||||
if send_notifications:
|
update_database = True
|
||||||
self.log.info(f"[{domain}] Stream is now offline.")
|
stream_config = await self.get_stream_config(domain)
|
||||||
else:
|
self.offline_timer_cache[domain] = time.time()
|
||||||
|
if first_update:
|
||||||
self.log.info(f"[{domain}] Stream is offline.")
|
self.log.info(f"[{domain}] Stream is offline.")
|
||||||
|
else:
|
||||||
|
self.log.info(f"[{domain}] Stream is now offline.")
|
||||||
|
|
||||||
# Update the database with the current stream state.
|
# Update the database with the current stream state, if needed.
|
||||||
# TODO: Only update the database if a change actually occurred. This is probably generating useless writes.
|
if update_database:
|
||||||
update_query = "UPDATE streams SET name=$1, last_connect_time=$2, last_disconnect_time=$3 WHERE domain=$4"
|
self.log.debug(f"[{domain}] Updating stream state in database...")
|
||||||
async with self.database.acquire() as connection:
|
update_query = "UPDATE streams SET name=$1, title=$2, last_connect_time=$3, last_disconnect_time=$4 WHERE domain=$5"
|
||||||
await connection.execute(
|
async with self.database.acquire() as connection:
|
||||||
update_query,
|
await connection.execute(
|
||||||
new_state["streamTitle"],
|
update_query,
|
||||||
new_state["lastConnectTime"],
|
stream_config["name"],
|
||||||
new_state["lastDisconnectTime"],
|
new_state["streamTitle"],
|
||||||
domain,
|
new_state["lastConnectTime"],
|
||||||
)
|
new_state["lastDisconnectTime"],
|
||||||
|
domain,
|
||||||
|
)
|
||||||
|
|
||||||
# All done.
|
# All done.
|
||||||
self.log.debug(f"[{domain}] State update completed.")
|
self.log.debug(f"[{domain}] State update completed.")
|
||||||
|
|
||||||
async def notify_rooms_of_stream_online(self, domain: str, title: str) -> None:
|
async def notify_rooms_of_stream_online(
|
||||||
|
self, domain: str, name: str, title: str, title_change: bool, tags: list[str]
|
||||||
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Sends notifications to rooms with subscriptions to the provided stream domain.
|
Sends notifications to rooms with subscriptions to the provided stream domain.
|
||||||
|
|
||||||
:param domain: The domain of the stream to send notifications for.
|
:param domain: The domain of the stream to send notifications for.
|
||||||
:param title: The title of the stream to include in the message.
|
:param title: The title of the stream to include in the message.
|
||||||
|
:param renamed: Whether or not this is for a stream changing its title rather than going live.
|
||||||
:return: Nothing.
|
:return: Nothing.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Has enough time passed since the last notification was sent?
|
||||||
|
if domain in self.notification_timers_cache:
|
||||||
|
seconds_since_last_notification = round(
|
||||||
|
time.time() - self.notification_timers_cache[domain]
|
||||||
|
)
|
||||||
|
if seconds_since_last_notification < SECONDS_BETWEEN_NOTIFICATIONS:
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Not sending notifications. Only {seconds_since_last_notification} of required {SECONDS_BETWEEN_NOTIFICATIONS} seconds has passed since last notification."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Yes. Log the current time and proceed with sending notifications.
|
||||||
|
self.notification_timers_cache[domain] = time.time()
|
||||||
|
|
||||||
# Get a list of room IDs with active subscriptions to the stream domain.
|
# Get a list of room IDs with active subscriptions to the stream domain.
|
||||||
query = "SELECT room_id FROM subscriptions WHERE stream_domain=$1"
|
query = "SELECT room_id FROM subscriptions WHERE stream_domain=$1"
|
||||||
async with self.database.acquire() as connection:
|
async with self.database.acquire() as connection:
|
||||||
@@ -339,15 +460,19 @@ class OwncastSentry(Plugin):
|
|||||||
successful_notifications = 0
|
successful_notifications = 0
|
||||||
failed_notifications = 0
|
failed_notifications = 0
|
||||||
|
|
||||||
stream_config = await self.get_stream_config(domain)
|
# Time to start building the body text for the notifications.
|
||||||
|
|
||||||
# Build the message body text.
|
# Turns out it is possible to set an empty name for an Owncast stream if you know how.
|
||||||
if "name" in stream_config:
|
# We'll account for that... Just in case.
|
||||||
body_text = "🎥 " + stream_config["name"] + " is now live!"
|
stream_name = name if name else domain
|
||||||
|
|
||||||
|
# If the stream changed its title, send a different title change notification.
|
||||||
|
if title_change:
|
||||||
|
self.log.info(f"[{domain}] Sending title change notifications...")
|
||||||
|
body_text = "📝 " + name + " has changed its stream title!"
|
||||||
else:
|
else:
|
||||||
# Turns out it is possible to set an empty name for an Owncast stream if you know how.
|
self.log.info(f"[{domain}] Sending going live notifications...")
|
||||||
# We'll account for that... Just in case.
|
body_text = "🎥 " + name + " is now live!"
|
||||||
body_text = "🎥 " + domain + " is now live!"
|
|
||||||
|
|
||||||
# Streams can have no title. If there is none, don't even mention it.
|
# Streams can have no title. If there is none, don't even mention it.
|
||||||
if title != "":
|
if title != "":
|
||||||
@@ -355,9 +480,9 @@ class OwncastSentry(Plugin):
|
|||||||
|
|
||||||
body_text += "\n\nTo tune in, visit: https://" + domain + "/"
|
body_text += "\n\nTo tune in, visit: https://" + domain + "/"
|
||||||
|
|
||||||
if "tags" in stream_config and len(stream_config["tags"]) > 0:
|
if "tags" and len(tags) > 0:
|
||||||
body_text += "\n\n"
|
body_text += "\n\n"
|
||||||
body_text += " ".join("#" + tag for tag in stream_config["tags"])
|
body_text += " ".join("#" + tag for tag in tags)
|
||||||
|
|
||||||
# Iterate over the subscribed rooms and try to send a message to each.
|
# Iterate over the subscribed rooms and try to send a message to each.
|
||||||
# TODO: This should probably be made async.
|
# TODO: This should probably be made async.
|
||||||
|
|||||||
Reference in New Issue
Block a user