Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
764fab9879
|
|||
|
0659f70e1a
|
|||
|
ee61ea8562
|
|||
|
b0868c5bd4
|
|||
|
1c6720cc10
|
|||
|
e73b2b63a1
|
|||
|
cae80b8319
|
|||
|
815b246ad8
|
|||
|
c4156f861e
|
|||
|
78d2c5a03f
|
|||
|
aad810d710
|
|||
|
bb9d360d8f
|
|||
|
60f8b21c16
|
|||
|
48b2c30b6d
|
|||
|
05ca37961a
|
|||
|
ad1ae65115
|
|||
|
eeef25c6ed
|
|||
|
ec13dac64c
|
20
.gitea/workflows/build.yml
Normal file
20
.gitea/workflows/build.yml
Normal file
@@ -0,0 +1,20 @@
|
||||
name: Build Maubot Plugin Artifact
|
||||
on: [push]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
name: Build
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Build artifact
|
||||
run: zip -v -9 OwncastSentry-v${{env.GITHUB_SHA}}.mbp LICENSE.txt maubot.yaml owncastsentry.py
|
||||
|
||||
- name: Upload artifact
|
||||
uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: Maubot Plugin
|
||||
path: OwncastSentry-v${{env.GITHUB_SHA}}.mbp
|
||||
19
.gitea/workflows/lint.yml
Normal file
19
.gitea/workflows/lint.yml
Normal file
@@ -0,0 +1,19 @@
|
||||
name: Lint Source Code
|
||||
on: [push]
|
||||
|
||||
jobs:
|
||||
lint:
|
||||
name: Lint
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Setup Python environment
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: 3.13
|
||||
|
||||
- name: Run linter
|
||||
uses: psf/black@stable
|
||||
@@ -1,7 +1,7 @@
|
||||
maubot: 0.1.0
|
||||
id: dev.logal.owncastsentry
|
||||
version: 1.0.0
|
||||
license: Apache
|
||||
version: 1.0.3
|
||||
license: Apache-2.0
|
||||
modules:
|
||||
- owncastsentry
|
||||
main_class: OwncastSentry
|
||||
|
||||
440
owncastsentry.py
440
owncastsentry.py
@@ -8,6 +8,7 @@ import sqlite3
|
||||
import aiohttp
|
||||
import json
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from maubot import Plugin, MessageEvent
|
||||
from maubot.handlers import command
|
||||
@@ -20,12 +21,29 @@ from urllib.parse import urlparse
|
||||
# Path to the GetStatus API call on Owncast instances
|
||||
OWNCAST_STATUS_PATH = "/api/status"
|
||||
|
||||
# User agent to send with all HTTP requests.
|
||||
USER_AGENT = "OwncastSentry/1.0.0 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
|
||||
# Path to GetWebConfig API call on Owncast instances
|
||||
OWNCAST_CONFIG_PATH = "/api/config"
|
||||
|
||||
# User agent to send with all HTTP requests.
|
||||
USER_AGENT = (
|
||||
"OwncastSentry/1.0.3 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
|
||||
)
|
||||
|
||||
# Hard minimum amount of time between when notifications can be sent for a stream. Prevents spamming notifications for glitchy or malicious streams.
|
||||
SECONDS_BETWEEN_NOTIFICATIONS = 20 * 60 # 20 minutes in seconds
|
||||
|
||||
# I'm not sure the best way to name or explain this variable, so let's just say what uses it:
|
||||
#
|
||||
# After a stream goes offline, a timer is started. Then, ...
|
||||
# - If a stream comes back online with the same title within this time, no notification is sent.
|
||||
# - If a stream comes back online with a different title, a rename notification is sent.
|
||||
# - If this time period passes entirely and a stream comes back online after, it's treated as regular going live.
|
||||
TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN = 7 * 60 # 7 minutes in seconds
|
||||
|
||||
# ===== DATABASE MIGRATIONS =====
|
||||
upgrade_table = UpgradeTable()
|
||||
|
||||
|
||||
@upgrade_table.register(description="Initial revision")
|
||||
async def upgrade_v1(conn: Connection) -> None:
|
||||
"""
|
||||
@@ -54,17 +72,31 @@ async def upgrade_v1(conn: Connection) -> None:
|
||||
)"""
|
||||
)
|
||||
|
||||
|
||||
# ===== MAIN BOT CLASS =====
|
||||
class OwncastSentry(Plugin):
|
||||
# Helper variables for setting up a special HTTP ClientSession for monitoring streams.
|
||||
headers = {"User-Agent": USER_AGENT} # Override User Agent to special static value to identify to servers our purpose.
|
||||
cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies.
|
||||
baseconnctor = aiohttp.TCPConnector(use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120) # Set up timeouts to keep one long-lived connection open per monitored stream.
|
||||
timeouts = aiohttp.ClientTimeout(sock_connect=5, sock_read=5) # Tighten up timeouts for faster responses to users in case of broken servers.
|
||||
headers = {
|
||||
"User-Agent": USER_AGENT
|
||||
} # Override User Agent to special static value to identify to servers our purpose.
|
||||
cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies.
|
||||
baseconnctor = aiohttp.TCPConnector(
|
||||
use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120
|
||||
) # Set up timeouts to keep one long-lived connection open per monitored stream.
|
||||
timeouts = aiohttp.ClientTimeout(
|
||||
sock_connect=5, sock_read=5
|
||||
) # Tighten up timeouts for faster responses to users in case of broken servers.
|
||||
|
||||
# Final HTTP ClientSession for all requests to Owncast instances.
|
||||
session = aiohttp.ClientSession(headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor)
|
||||
session = aiohttp.ClientSession(
|
||||
headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor
|
||||
)
|
||||
|
||||
# Keeps track of when a notification was last sent for streams.
|
||||
notification_timers_cache = {}
|
||||
|
||||
# Keeps track of when streams last went offline.
|
||||
offline_timer_cache = {}
|
||||
|
||||
@classmethod
|
||||
def get_db_upgrade_table(cls) -> UpgradeTable | None:
|
||||
@@ -75,7 +107,6 @@ class OwncastSentry(Plugin):
|
||||
"""
|
||||
return upgrade_table
|
||||
|
||||
|
||||
async def start(self) -> None:
|
||||
"""
|
||||
Method called by Maubot upon startup of the instance.
|
||||
@@ -85,7 +116,6 @@ class OwncastSentry(Plugin):
|
||||
"""
|
||||
self.sched.run_periodically(60, self.update_all_stream_states)
|
||||
|
||||
|
||||
@command.new(help="Subscribes to a new Owncast stream.")
|
||||
@command.argument("url")
|
||||
async def subscribe(self, evt: MessageEvent, url: str) -> None:
|
||||
@@ -97,16 +127,26 @@ class OwncastSentry(Plugin):
|
||||
:return: Nothing.
|
||||
"""
|
||||
|
||||
# Convert the semi-unpredictable user input to only a domain and verify it's an Owncast stream.
|
||||
stream_domain = await self.validate_url_as_owncast_stream(url)
|
||||
# Convert the user input to only a domain.
|
||||
stream_domain = self.domainify(url)
|
||||
|
||||
# Did the validation return an empty string?
|
||||
if stream_domain == "":
|
||||
# Yes, it's not valid. Tell the user and give up.
|
||||
await evt.reply("The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline.")
|
||||
return
|
||||
# How many subscriptions already exist for this domain?
|
||||
query = "SELECT COUNT(*) FROM subscriptions WHERE stream_domain=$1"
|
||||
async with self.database.acquire() as connection:
|
||||
result = await connection.fetchrow(query, stream_domain)
|
||||
|
||||
# Try to add a new subscription for the requested stream domain in the room the command executed in.
|
||||
if result[0] == 0:
|
||||
# There are 0 subscriptions, we need to validate this domain is an Owncast stream.
|
||||
# Attempt to fetch the stream state from this domain.
|
||||
stream_state = await self.get_stream_state(stream_domain)
|
||||
if len(stream_state) == 0:
|
||||
# The stream state fetch returned nothing. Probably not an Owncast stream.
|
||||
await evt.reply(
|
||||
"The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline."
|
||||
)
|
||||
return
|
||||
|
||||
# Try to add a new subscription for the requested stream domain in the room the command was executed in.
|
||||
try:
|
||||
query = "INSERT INTO subscriptions (stream_domain, room_id) VALUES ($1, $2)"
|
||||
async with self.database.acquire() as connection:
|
||||
@@ -115,11 +155,17 @@ class OwncastSentry(Plugin):
|
||||
# Something weird happened... Was it due to attempting to insert a duplicate row?
|
||||
if "UNIQUE constraint failed" in exception.args[0]:
|
||||
# Yes, this is an expected condition. Tell the user the room is already subscribed and give up.
|
||||
await evt.reply("This room is already subscribed to notifications for " + stream_domain + ".")
|
||||
await evt.reply(
|
||||
"This room is already subscribed to notifications for "
|
||||
+ stream_domain
|
||||
+ "."
|
||||
)
|
||||
return
|
||||
else:
|
||||
# Nope... Something unexpected happened. Give up.
|
||||
self.log.error(f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}")
|
||||
self.log.error(
|
||||
f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}"
|
||||
)
|
||||
raise exception
|
||||
|
||||
# The subscription was successfully added! Try to add a placeholder row for the stream's state in the streams table.
|
||||
@@ -135,13 +181,18 @@ class OwncastSentry(Plugin):
|
||||
# Attempts to add rows for streams already known is an expected condition. What is anything except that?
|
||||
if "UNIQUE constraint failed" not in exception.args[0]:
|
||||
# Something unexpected happened. Give up.
|
||||
self.log.error(f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}")
|
||||
self.log.error(
|
||||
f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}"
|
||||
)
|
||||
raise exception
|
||||
|
||||
# All went well! We added a new subscription and (at least tried) to add a row for the stream state. Tell the user.
|
||||
self.log.info(f"[{stream_domain}] Subscription added for room {evt.room_id}.")
|
||||
await evt.reply("Subscription added! This room will receive notifications when " + stream_domain + " goes live.")
|
||||
|
||||
await evt.reply(
|
||||
"Subscription added! This room will receive notifications when "
|
||||
+ stream_domain
|
||||
+ " goes live."
|
||||
)
|
||||
|
||||
@command.new(help="Unsubscribes from an Owncast stream.")
|
||||
@command.argument("url")
|
||||
@@ -153,14 +204,8 @@ class OwncastSentry(Plugin):
|
||||
:param url: A string containing the user supplied URL to a stream to try and unsubscribe from.
|
||||
:return: Nothing.
|
||||
"""
|
||||
# Convert the user input to only a domain and verify it's an Owncast stream.
|
||||
stream_domain = await self.validate_url_as_owncast_stream(url)
|
||||
|
||||
# Did the validation return an empty string?
|
||||
if stream_domain == "":
|
||||
# Yes, it's not valid. Tell the user and give up.
|
||||
await evt.reply("The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline.")
|
||||
return
|
||||
# Convert the user input to only a domain.
|
||||
stream_domain = self.domainify(url)
|
||||
|
||||
# Attempt to delete the requested subscription from the database.
|
||||
query = "DELETE FROM subscriptions WHERE stream_domain=$1 AND room_id=$2"
|
||||
@@ -170,15 +215,28 @@ class OwncastSentry(Plugin):
|
||||
# Did it work?
|
||||
if result.rowcount == 1:
|
||||
# Yes, one row was deleted. Tell the user.
|
||||
self.log.info(f"[{stream_domain}] Subscription removed for room {evt.room_id}.")
|
||||
await evt.reply("Subscription removed! This room will no longer receive notifications for " + stream_domain + ".")
|
||||
self.log.info(
|
||||
f"[{stream_domain}] Subscription removed for room {evt.room_id}."
|
||||
)
|
||||
await evt.reply(
|
||||
"Subscription removed! This room will no longer receive notifications for "
|
||||
+ stream_domain
|
||||
+ "."
|
||||
)
|
||||
elif result.rowcount == 0:
|
||||
# No, nothing changed. Tell the user.
|
||||
await evt.reply("This room is already not subscribed to notifications for " + stream_domain + ".")
|
||||
await evt.reply(
|
||||
"This room is already not subscribed to notifications for "
|
||||
+ stream_domain
|
||||
+ "."
|
||||
)
|
||||
else:
|
||||
# Somehow more than 1 (or even less than 0 ???) rows were changed... Log it!
|
||||
self.log.error("Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got " + result.rowcount + " instead. Something very bad may have happened!!!!")
|
||||
|
||||
self.log.error(
|
||||
"Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got "
|
||||
+ result.rowcount
|
||||
+ " instead. Something very bad may have happened!!!!"
|
||||
)
|
||||
|
||||
async def update_all_stream_states(self) -> None:
|
||||
"""
|
||||
@@ -205,7 +263,6 @@ class OwncastSentry(Plugin):
|
||||
await asyncio.gather(*tasks)
|
||||
self.log.debug("Update complete.")
|
||||
|
||||
|
||||
async def update_stream_state(self, domain: str) -> None:
|
||||
"""
|
||||
Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
|
||||
@@ -214,62 +271,186 @@ class OwncastSentry(Plugin):
|
||||
:return: Nothing.
|
||||
"""
|
||||
|
||||
# A flag indicating whether to send a notification.
|
||||
# Used for the first state update of a brand-new stream to avoid sending notifications if its already live.
|
||||
send_notifications = True
|
||||
# A flag indicating whether this is the first state update of a brand-new stream to avoid sending notifications if its already live.
|
||||
first_update = False
|
||||
|
||||
# A flag indicating whether to update the stream's state in the databased.
|
||||
# Used to avoid writing to the database when a stream's state hasn't changed at all.
|
||||
update_database = False
|
||||
|
||||
# Holds the stream's latest configuration, if fetched and deemed necessary during the update process.
|
||||
stream_config = {}
|
||||
|
||||
# Fetch the latest stream state from the server.
|
||||
new_state = await self.get_current_stream_state(domain)
|
||||
new_state = await self.get_stream_state(domain)
|
||||
|
||||
# Skip the update if the fetch failed for any reason.
|
||||
if new_state == {}:
|
||||
return
|
||||
|
||||
# Fix possible race conditions with timers
|
||||
if domain not in self.offline_timer_cache:
|
||||
self.offline_timer_cache[domain] = 0
|
||||
if domain not in self.notification_timers_cache:
|
||||
self.notification_timers_cache[domain] = 0
|
||||
|
||||
# Fetch the last known stream state from the database.
|
||||
query = "SELECT last_connect_time, last_disconnect_time FROM streams WHERE domain=$1"
|
||||
query = "SELECT * FROM streams WHERE domain=$1"
|
||||
async with self.database.acquire() as connection:
|
||||
old_state = await connection.fetchrow(query, domain)
|
||||
|
||||
# Does the last known stream state not have a value for the last connect and disconnect time?
|
||||
if old_state["last_connect_time"] is None and old_state["last_disconnect_time"] is None:
|
||||
if (
|
||||
old_state["last_connect_time"] is None
|
||||
and old_state["last_disconnect_time"] is None
|
||||
):
|
||||
# Yes, this is the first update. Don't send any notifications.
|
||||
send_notifications = False
|
||||
update_database = True
|
||||
first_update = True
|
||||
|
||||
# Does the latest stream state have a last connect time and the old state not have one?
|
||||
if new_state["lastConnectTime"] is not None and old_state["last_connect_time"] is None:
|
||||
# Yes! This stream is now live. Send notifications and log it, if allowed.
|
||||
if send_notifications:
|
||||
self.log.info(f"[{domain}] Stream is now live! Notifying subscribed rooms...")
|
||||
await self.notify_rooms_of_stream_online(domain, new_state["streamTitle"])
|
||||
if (
|
||||
new_state["lastConnectTime"] is not None
|
||||
and old_state["last_connect_time"] is None
|
||||
):
|
||||
# Yes! This stream is now live.
|
||||
update_database = True
|
||||
stream_config = await self.get_stream_config(domain)
|
||||
|
||||
self.log.info(f"[{domain}] Stream is now live!")
|
||||
|
||||
# Calculate how many seconds since the stream last went offline
|
||||
seconds_since_last_offline = round(
|
||||
time.time() - self.offline_timer_cache[domain]
|
||||
)
|
||||
|
||||
# Have we queried this stream before? (In other words, is this not the first state update ever?)
|
||||
if not first_update:
|
||||
# Yes. Has this stream been offline for a short amount of time?
|
||||
if seconds_since_last_offline < TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN:
|
||||
# Yes. Did the stream title change?
|
||||
if old_state["title"] != new_state["streamTitle"]:
|
||||
# Yes. The stream was only down for a short time, send a special notification indicating the stream changed its name.
|
||||
await self.notify_rooms_of_stream_online(
|
||||
domain,
|
||||
stream_config["name"],
|
||||
new_state["streamTitle"],
|
||||
True,
|
||||
stream_config["tags"],
|
||||
)
|
||||
else:
|
||||
# No. The stream was only down for a short time and didn't change its title. Don't send a notification.
|
||||
self.log.info(
|
||||
f"[{domain}] Not sending notifications. Stream was only offline for {seconds_since_last_offline} of {TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN} seconds and did not change its title."
|
||||
)
|
||||
else:
|
||||
# This stream has been offline for a while. Send a normal notification.
|
||||
await self.notify_rooms_of_stream_online(
|
||||
domain,
|
||||
stream_config["name"],
|
||||
new_state["streamTitle"],
|
||||
False,
|
||||
stream_config["tags"],
|
||||
)
|
||||
else:
|
||||
self.log.info(f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms.")
|
||||
# No, this is the first time we're querying
|
||||
self.log.info(
|
||||
f"[{domain}] Not sending notifications. This is the first state update for this stream."
|
||||
)
|
||||
|
||||
if (
|
||||
new_state["lastConnectTime"] is not None
|
||||
and old_state["last_connect_time"] is not None
|
||||
):
|
||||
# Did the stream title change mid-session?
|
||||
if old_state["title"] != new_state["streamTitle"]:
|
||||
self.log.info(f"[{domain}] Stream title was changed!")
|
||||
update_database = True
|
||||
stream_config = await self.get_stream_config(domain)
|
||||
|
||||
# This is a fun case to account for... Let's try and explain this.
|
||||
# Was the last time this stream sent a notification before it last went offline?
|
||||
if (
|
||||
self.offline_timer_cache[domain]
|
||||
> self.notification_timers_cache[domain]
|
||||
):
|
||||
# Yes. Send a regular go live notification.
|
||||
# Why? A title change notification could be confusing to users in this case.
|
||||
# How? If a stream goes offline before its next allowed notification, it'll get rate limited. If it then changes its title, this part of the code will send a title change notification. This can be a little confusing, so override to a normal go live notification in this case.
|
||||
await self.notify_rooms_of_stream_online(
|
||||
domain,
|
||||
stream_config["name"],
|
||||
new_state["streamTitle"],
|
||||
False,
|
||||
stream_config["tags"],
|
||||
)
|
||||
else:
|
||||
# No. Send a normal title change notification.
|
||||
await self.notify_rooms_of_stream_online(
|
||||
domain,
|
||||
stream_config["name"],
|
||||
new_state["streamTitle"],
|
||||
True,
|
||||
stream_config["tags"],
|
||||
)
|
||||
|
||||
# Does the latest stream state no longer have a last connect time but the old state does?
|
||||
elif new_state["lastConnectTime"] is None and old_state["last_connect_time"] is not None:
|
||||
elif (
|
||||
new_state["lastConnectTime"] is None
|
||||
and old_state["last_connect_time"] is not None
|
||||
):
|
||||
# Yep. This stream is now offline. Log it.
|
||||
if send_notifications:
|
||||
self.log.info(f"[{domain}] Stream is now offline.")
|
||||
else:
|
||||
update_database = True
|
||||
stream_config = await self.get_stream_config(domain)
|
||||
self.offline_timer_cache[domain] = time.time()
|
||||
if first_update:
|
||||
self.log.info(f"[{domain}] Stream is offline.")
|
||||
else:
|
||||
self.log.info(f"[{domain}] Stream is now offline.")
|
||||
|
||||
# Update the database with the current stream state.
|
||||
# TODO: Only update the database if a change actually occurred. This is probably generating useless writes.
|
||||
update_query = "UPDATE streams SET name=$1, last_connect_time=$2, last_disconnect_time=$3 WHERE domain=$4"
|
||||
async with self.database.acquire() as connection:
|
||||
await connection.execute(update_query, new_state["streamTitle"], new_state["lastConnectTime"], new_state["lastDisconnectTime"], domain)
|
||||
# Update the database with the current stream state, if needed.
|
||||
if update_database:
|
||||
self.log.debug(f"[{domain}] Updating stream state in database...")
|
||||
update_query = "UPDATE streams SET name=$1, title=$2, last_connect_time=$3, last_disconnect_time=$4 WHERE domain=$5"
|
||||
async with self.database.acquire() as connection:
|
||||
await connection.execute(
|
||||
update_query,
|
||||
stream_config["name"],
|
||||
new_state["streamTitle"],
|
||||
new_state["lastConnectTime"],
|
||||
new_state["lastDisconnectTime"],
|
||||
domain,
|
||||
)
|
||||
|
||||
# All done.
|
||||
self.log.debug(f"[{domain}] State update completed.")
|
||||
|
||||
|
||||
async def notify_rooms_of_stream_online(self, domain: str, title: str) -> None:
|
||||
async def notify_rooms_of_stream_online(
|
||||
self, domain: str, name: str, title: str, title_change: bool, tags: list[str]
|
||||
) -> None:
|
||||
"""
|
||||
Sends notifications to rooms with subscriptions to the provided stream domain.
|
||||
|
||||
:param domain: The domain of the stream to send notifications for.
|
||||
:param title: The title of the stream to include in the message.
|
||||
:param renamed: Whether or not this is for a stream changing its title rather than going live.
|
||||
:return: Nothing.
|
||||
"""
|
||||
|
||||
# Has enough time passed since the last notification was sent?
|
||||
if domain in self.notification_timers_cache:
|
||||
seconds_since_last_notification = round(
|
||||
time.time() - self.notification_timers_cache[domain]
|
||||
)
|
||||
if seconds_since_last_notification < SECONDS_BETWEEN_NOTIFICATIONS:
|
||||
self.log.info(
|
||||
f"[{domain}] Not sending notifications. Only {seconds_since_last_notification} of required {SECONDS_BETWEEN_NOTIFICATIONS} seconds has passed since last notification."
|
||||
)
|
||||
return
|
||||
|
||||
# Yes. Log the current time and proceed with sending notifications.
|
||||
self.notification_timers_cache[domain] = time.time()
|
||||
|
||||
# Get a list of room IDs with active subscriptions to the stream domain.
|
||||
query = "SELECT room_id FROM subscriptions WHERE stream_domain=$1"
|
||||
async with self.database.acquire() as connection:
|
||||
@@ -279,29 +460,57 @@ class OwncastSentry(Plugin):
|
||||
successful_notifications = 0
|
||||
failed_notifications = 0
|
||||
|
||||
# Time to start building the body text for the notifications.
|
||||
|
||||
# Turns out it is possible to set an empty name for an Owncast stream if you know how.
|
||||
# We'll account for that... Just in case.
|
||||
stream_name = name if name else domain
|
||||
|
||||
# If the stream changed its title, send a different title change notification.
|
||||
if title_change:
|
||||
self.log.info(f"[{domain}] Sending title change notifications...")
|
||||
body_text = "📝 " + name + " has changed its stream title!"
|
||||
else:
|
||||
self.log.info(f"[{domain}] Sending going live notifications...")
|
||||
body_text = "🎥 " + name + " is now live!"
|
||||
|
||||
# Streams can have no title. If there is none, don't even mention it.
|
||||
if title != "":
|
||||
body_text += "\nStream Title: " + title
|
||||
|
||||
body_text += "\n\nTo tune in, visit: https://" + domain + "/"
|
||||
|
||||
if "tags" and len(tags) > 0:
|
||||
body_text += "\n\n"
|
||||
body_text += " ".join("#" + tag for tag in tags)
|
||||
|
||||
# Iterate over the subscribed rooms and try to send a message to each.
|
||||
# TODO: This should probably be made async.
|
||||
body_text = "🎥 " + domain + " is now live!\nStream Title: " + title +"\n\nTo tune in, visit: https://" + domain + "/"
|
||||
for rows in results:
|
||||
room_id = rows["room_id"]
|
||||
try:
|
||||
# Try and send a message.
|
||||
content = TextMessageEventContent(msgtype=MessageType.TEXT, body=body_text)
|
||||
content = TextMessageEventContent(
|
||||
msgtype=MessageType.TEXT, body=body_text
|
||||
)
|
||||
await self.client.send_message(room_id, content)
|
||||
|
||||
# It worked! Increment the counter.
|
||||
successful_notifications += 1
|
||||
except Exception as exception:
|
||||
# Something didn't work. Log it and move on to the next one.
|
||||
self.log.warning(f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}")
|
||||
failed_notifications +=1
|
||||
self.log.warning(
|
||||
f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}"
|
||||
)
|
||||
failed_notifications += 1
|
||||
|
||||
# All done!
|
||||
self.log.info(f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed.")
|
||||
|
||||
self.log.info(
|
||||
f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed."
|
||||
)
|
||||
|
||||
# ========== HELPER METHODS ==========
|
||||
async def get_current_stream_state(self, domain):
|
||||
async def get_stream_state(self, domain):
|
||||
"""
|
||||
Get the current stream state for a given domain.
|
||||
HTTPS on port 443 is assumed, no other protocols or ports are supported.
|
||||
@@ -316,63 +525,118 @@ class OwncastSentry(Plugin):
|
||||
|
||||
# Make a request to the endpoint.
|
||||
try:
|
||||
response = await self.session.request("GET", status_url, allow_redirects=False)
|
||||
response = await self.session.request(
|
||||
"GET", status_url, allow_redirects=False
|
||||
)
|
||||
except Exception as e:
|
||||
self.log.warning(f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}")
|
||||
self.log.warning(
|
||||
f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}"
|
||||
)
|
||||
return {}
|
||||
|
||||
# Check the response code is success
|
||||
# TODO: Handle 429 rate limiting?
|
||||
if response.status != 200:
|
||||
self.log.warning(f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead.")
|
||||
self.log.warning(
|
||||
f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead."
|
||||
)
|
||||
return {}
|
||||
|
||||
# Try and interpret the response as JSON.
|
||||
try:
|
||||
new_state = json.loads(await response.read())
|
||||
except Exception as e:
|
||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}")
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}"
|
||||
)
|
||||
return {}
|
||||
|
||||
# Validate the response to ensure it contains all the basic info needed to function.
|
||||
if "lastConnectTime" not in new_state:
|
||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter.")
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter."
|
||||
)
|
||||
return {}
|
||||
elif "lastDisconnectTime" not in new_state:
|
||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter.")
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter."
|
||||
)
|
||||
return {}
|
||||
elif "streamTitle" not in new_state:
|
||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter.")
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter."
|
||||
)
|
||||
return {}
|
||||
elif "online" not in new_state:
|
||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter.")
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter."
|
||||
)
|
||||
return {}
|
||||
|
||||
return new_state
|
||||
|
||||
|
||||
async def validate_url_as_owncast_stream(self, url) -> str:
|
||||
async def get_stream_config(self, domain):
|
||||
"""
|
||||
Take a given URL and validate its domain is a valid Owncast stream.
|
||||
Get the current stream config for a given domain.
|
||||
HTTPS on port 443 is assumed, no other protocols or ports are supported.
|
||||
|
||||
:param url: A URL with the domain to check for an Owncast stream.
|
||||
:return: A string with just the domain if it contains an Owncast stream, or an empty string if an error occurred or is otherwise not valid.
|
||||
:param domain: The domain (not URL) where the stream is hosted.
|
||||
:return: A dictionary containing the stream's configuration.
|
||||
"""
|
||||
self.log.debug(f"[{domain}] Fetching current stream config...")
|
||||
# Build a URL to the config API in Owncast. (https://owncast.online/api/latest/#tag/Internal/operation/GetWebConfig)
|
||||
# Only use HTTPS, even if the user specified something else.
|
||||
status_url = "https://" + domain + OWNCAST_CONFIG_PATH
|
||||
|
||||
# Make a request to the endpoint.
|
||||
try:
|
||||
response = await self.session.request(
|
||||
"GET", status_url, allow_redirects=False
|
||||
)
|
||||
except Exception as e:
|
||||
self.log.warning(
|
||||
f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}"
|
||||
)
|
||||
return {}
|
||||
|
||||
# Check the response code is success
|
||||
# TODO: Handle 429 rate limiting?
|
||||
if response.status != 200:
|
||||
self.log.warning(
|
||||
f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead."
|
||||
)
|
||||
return {}
|
||||
|
||||
# Try and interpret the response as JSON.
|
||||
try:
|
||||
config = json.loads(await response.read())
|
||||
except Exception as e:
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}"
|
||||
)
|
||||
return {}
|
||||
|
||||
return config
|
||||
|
||||
def domainify(self, url) -> str:
|
||||
"""
|
||||
Take a given URL and convert it to just the domain.
|
||||
|
||||
:param url:
|
||||
:return:
|
||||
"""
|
||||
# Take whatever input the user provided and try to turn it into just the domain.
|
||||
# Examples:
|
||||
# "stream.logal.dev" -> "stream.logal.dev"
|
||||
# "https://stream.logal.dev" -> "stream.logal.dev"
|
||||
# "stream.logal.dev/embed/chat/readwrite" -> "stream.logal.dev"
|
||||
# "https://stream.logal.dev/abcdefghijklmno/123456789" -> "stream.logal.dev
|
||||
# "https://stream.logal.dev/abcdefghijklmno/123456789" -> "stream.logal.dev"
|
||||
# "notify@stream.logal.dev" -> "stream.logal.dev"
|
||||
|
||||
parsed_url = urlparse(url)
|
||||
domain = (parsed_url.netloc or parsed_url.path).lower()
|
||||
|
||||
# Try to fetch the current status of the stream at the given domain.
|
||||
stream_state = await self.get_current_stream_state(domain)
|
||||
if "@" in domain:
|
||||
return domain.split("@")[-1]
|
||||
|
||||
# The above method does all the checking. If the length of the output dictionary is more than 0, it should be valid. Otherwise, pass an empty string to say it's invalid.
|
||||
if len(stream_state) > 0:
|
||||
return domain
|
||||
else:
|
||||
return ""
|
||||
return domain
|
||||
|
||||
Reference in New Issue
Block a user