Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
548f90f0c0
|
|||
|
f62764a2b2
|
|||
|
dc0df47257
|
|||
|
35086cb751
|
|||
|
b6beef0e48
|
|||
|
c6430a4110
|
|||
|
7c10d15dd6
|
|||
|
b177114540
|
|||
|
1d35ee6d7c
|
|||
|
764fab9879
|
|||
|
0659f70e1a
|
|||
|
ee61ea8562
|
|||
|
b0868c5bd4
|
|||
|
1c6720cc10
|
@@ -1,20 +0,0 @@
|
|||||||
name: Build Maubot Plugin Artifact
|
|
||||||
on: [push]
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
build:
|
|
||||||
name: Build
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- name: Checkout repository
|
|
||||||
uses: actions/checkout@v4
|
|
||||||
|
|
||||||
- name: Build artifact
|
|
||||||
run: zip -v -9 OwncastSentry-v${{env.GITHUB_SHA}}.mbp LICENSE.txt maubot.yaml owncastsentry.py
|
|
||||||
|
|
||||||
- name: Upload artifact
|
|
||||||
uses: actions/upload-artifact@v3
|
|
||||||
with:
|
|
||||||
name: Maubot Plugin
|
|
||||||
path: OwncastSentry-v${{env.GITHUB_SHA}}.mbp
|
|
||||||
@@ -1,19 +0,0 @@
|
|||||||
name: Lint Source Code
|
|
||||||
on: [push]
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
lint:
|
|
||||||
name: Lint
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- name: Checkout repository
|
|
||||||
uses: actions/checkout@v4
|
|
||||||
|
|
||||||
- name: Setup Python environment
|
|
||||||
uses: actions/setup-python@v5
|
|
||||||
with:
|
|
||||||
python-version: 3.13
|
|
||||||
|
|
||||||
- name: Run linter
|
|
||||||
uses: psf/black@stable
|
|
||||||
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
*$py.class
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
maubot: 0.1.0
|
maubot: 0.1.0
|
||||||
id: dev.logal.owncastsentry
|
id: dev.logal.owncastsentry
|
||||||
version: 1.0.2
|
version: 1.1.0
|
||||||
license: Apache-2.0
|
license: Apache-2.0
|
||||||
modules:
|
modules:
|
||||||
- owncastsentry
|
- owncastsentry
|
||||||
|
|||||||
517
owncastsentry.py
517
owncastsentry.py
@@ -1,517 +0,0 @@
|
|||||||
# Copyright 2025 Logan Fick
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
||||||
|
|
||||||
import sqlite3
|
|
||||||
import aiohttp
|
|
||||||
import json
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
from maubot import Plugin, MessageEvent
|
|
||||||
from maubot.handlers import command
|
|
||||||
from mautrix.types import TextMessageEventContent, MessageType
|
|
||||||
from mautrix.util.async_db import UpgradeTable, Connection
|
|
||||||
|
|
||||||
from urllib.parse import urlparse
|
|
||||||
|
|
||||||
|
|
||||||
# Path to the GetStatus API call on Owncast instances
|
|
||||||
OWNCAST_STATUS_PATH = "/api/status"
|
|
||||||
|
|
||||||
# Path to GetWebConfig API call on Owncast instances
|
|
||||||
OWNCAST_CONFIG_PATH = "/api/config"
|
|
||||||
|
|
||||||
# User agent to send with all HTTP requests.
|
|
||||||
USER_AGENT = (
|
|
||||||
"OwncastSentry/1.0.2 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ===== DATABASE MIGRATIONS =====
|
|
||||||
upgrade_table = UpgradeTable()
|
|
||||||
|
|
||||||
|
|
||||||
@upgrade_table.register(description="Initial revision")
|
|
||||||
async def upgrade_v1(conn: Connection) -> None:
|
|
||||||
"""
|
|
||||||
Runs migrations to upgrade database schema to verison 1 format.
|
|
||||||
Version 1 is the initial format of the database.
|
|
||||||
|
|
||||||
:param conn: A connection to run the v1 database migration on.
|
|
||||||
:return: Nothing.
|
|
||||||
"""
|
|
||||||
await conn.execute(
|
|
||||||
"""CREATE TABLE "streams" (
|
|
||||||
"domain" TEXT NOT NULL UNIQUE,
|
|
||||||
"name" TEXT,
|
|
||||||
"title" TEXT,
|
|
||||||
"last_connect_time" TEXT,
|
|
||||||
"last_disconnect_time" TEXT,
|
|
||||||
PRIMARY KEY("domain")
|
|
||||||
)"""
|
|
||||||
)
|
|
||||||
|
|
||||||
await conn.execute(
|
|
||||||
"""CREATE TABLE "subscriptions" (
|
|
||||||
"stream_domain" INTEGER NOT NULL,
|
|
||||||
"room_id" TEXT NOT NULL,
|
|
||||||
UNIQUE("room_id","stream_domain")
|
|
||||||
)"""
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ===== MAIN BOT CLASS =====
|
|
||||||
class OwncastSentry(Plugin):
|
|
||||||
# Helper variables for setting up a special HTTP ClientSession for monitoring streams.
|
|
||||||
headers = {
|
|
||||||
"User-Agent": USER_AGENT
|
|
||||||
} # Override User Agent to special static value to identify to servers our purpose.
|
|
||||||
cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies.
|
|
||||||
baseconnctor = aiohttp.TCPConnector(
|
|
||||||
use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120
|
|
||||||
) # Set up timeouts to keep one long-lived connection open per monitored stream.
|
|
||||||
timeouts = aiohttp.ClientTimeout(
|
|
||||||
sock_connect=5, sock_read=5
|
|
||||||
) # Tighten up timeouts for faster responses to users in case of broken servers.
|
|
||||||
|
|
||||||
# Final HTTP ClientSession for all requests to Owncast instances.
|
|
||||||
session = aiohttp.ClientSession(
|
|
||||||
headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor
|
|
||||||
)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_db_upgrade_table(cls) -> UpgradeTable | None:
|
|
||||||
"""
|
|
||||||
Helper method for telling Maubot about our database migrations.
|
|
||||||
|
|
||||||
:return: An UpgradeTable with our registered migrations.
|
|
||||||
"""
|
|
||||||
return upgrade_table
|
|
||||||
|
|
||||||
async def start(self) -> None:
|
|
||||||
"""
|
|
||||||
Method called by Maubot upon startup of the instance.
|
|
||||||
Registers a recurring task every minute to update the state of all subscribed streams.
|
|
||||||
|
|
||||||
:return: Nothing.
|
|
||||||
"""
|
|
||||||
self.sched.run_periodically(60, self.update_all_stream_states)
|
|
||||||
|
|
||||||
@command.new(help="Subscribes to a new Owncast stream.")
|
|
||||||
@command.argument("url")
|
|
||||||
async def subscribe(self, evt: MessageEvent, url: str) -> None:
|
|
||||||
"""
|
|
||||||
"!subscribe" command handler for users to subscribe a room to a given stream's notifications.
|
|
||||||
|
|
||||||
:param evt: MessageEvent of the message calling the command.
|
|
||||||
:param url: A string containing the user supplied URL to a stream to try and subscribe to.
|
|
||||||
:return: Nothing.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Convert the user input to only a domain.
|
|
||||||
stream_domain = self.domainify(url)
|
|
||||||
|
|
||||||
# How many subscriptions already exist for this domain?
|
|
||||||
query = "SELECT COUNT(*) FROM subscriptions WHERE stream_domain=$1"
|
|
||||||
async with self.database.acquire() as connection:
|
|
||||||
result = await connection.fetchrow(query, stream_domain)
|
|
||||||
|
|
||||||
if result[0] == 0:
|
|
||||||
# There are 0 subscriptions, we need to validate this domain is an Owncast stream.
|
|
||||||
# Attempt to fetch the stream state from this domain.
|
|
||||||
stream_state = await self.get_stream_state(stream_domain)
|
|
||||||
if len(stream_state) == 0:
|
|
||||||
# The stream state fetch returned nothing. Probably not an Owncast stream.
|
|
||||||
await evt.reply(
|
|
||||||
"The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline."
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Try to add a new subscription for the requested stream domain in the room the command was executed in.
|
|
||||||
try:
|
|
||||||
query = "INSERT INTO subscriptions (stream_domain, room_id) VALUES ($1, $2)"
|
|
||||||
async with self.database.acquire() as connection:
|
|
||||||
await connection.execute(query, stream_domain, evt.room_id)
|
|
||||||
except sqlite3.IntegrityError as exception:
|
|
||||||
# Something weird happened... Was it due to attempting to insert a duplicate row?
|
|
||||||
if "UNIQUE constraint failed" in exception.args[0]:
|
|
||||||
# Yes, this is an expected condition. Tell the user the room is already subscribed and give up.
|
|
||||||
await evt.reply(
|
|
||||||
"This room is already subscribed to notifications for "
|
|
||||||
+ stream_domain
|
|
||||||
+ "."
|
|
||||||
)
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
# Nope... Something unexpected happened. Give up.
|
|
||||||
self.log.error(
|
|
||||||
f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}"
|
|
||||||
)
|
|
||||||
raise exception
|
|
||||||
|
|
||||||
# The subscription was successfully added! Try to add a placeholder row for the stream's state in the streams table.
|
|
||||||
try:
|
|
||||||
query = "INSERT INTO streams (domain) VALUES ($1)"
|
|
||||||
|
|
||||||
async with self.database.acquire() as connection:
|
|
||||||
await connection.execute(query, stream_domain)
|
|
||||||
|
|
||||||
# The insert was successful, so this is the first time we're seeing this stream. Log it.
|
|
||||||
self.log.info(f"[{stream_domain}] Discovered new stream!")
|
|
||||||
except sqlite3.IntegrityError as exception:
|
|
||||||
# Attempts to add rows for streams already known is an expected condition. What is anything except that?
|
|
||||||
if "UNIQUE constraint failed" not in exception.args[0]:
|
|
||||||
# Something unexpected happened. Give up.
|
|
||||||
self.log.error(
|
|
||||||
f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}"
|
|
||||||
)
|
|
||||||
raise exception
|
|
||||||
|
|
||||||
# All went well! We added a new subscription and (at least tried) to add a row for the stream state. Tell the user.
|
|
||||||
self.log.info(f"[{stream_domain}] Subscription added for room {evt.room_id}.")
|
|
||||||
await evt.reply(
|
|
||||||
"Subscription added! This room will receive notifications when "
|
|
||||||
+ stream_domain
|
|
||||||
+ " goes live."
|
|
||||||
)
|
|
||||||
|
|
||||||
@command.new(help="Unsubscribes from an Owncast stream.")
|
|
||||||
@command.argument("url")
|
|
||||||
async def unsubscribe(self, evt: MessageEvent, url: str) -> None:
|
|
||||||
"""
|
|
||||||
"!unsubscribe" command handler for users to unsubscribe a room from a given stream's notifications.
|
|
||||||
|
|
||||||
:param evt: MessageEvent of the message calling the command.
|
|
||||||
:param url: A string containing the user supplied URL to a stream to try and unsubscribe from.
|
|
||||||
:return: Nothing.
|
|
||||||
"""
|
|
||||||
# Convert the user input to only a domain.
|
|
||||||
stream_domain = self.domainify(url)
|
|
||||||
|
|
||||||
# Attempt to delete the requested subscription from the database.
|
|
||||||
query = "DELETE FROM subscriptions WHERE stream_domain=$1 AND room_id=$2"
|
|
||||||
async with self.database.acquire() as connection:
|
|
||||||
result = await connection.execute(query, stream_domain, evt.room_id)
|
|
||||||
|
|
||||||
# Did it work?
|
|
||||||
if result.rowcount == 1:
|
|
||||||
# Yes, one row was deleted. Tell the user.
|
|
||||||
self.log.info(
|
|
||||||
f"[{stream_domain}] Subscription removed for room {evt.room_id}."
|
|
||||||
)
|
|
||||||
await evt.reply(
|
|
||||||
"Subscription removed! This room will no longer receive notifications for "
|
|
||||||
+ stream_domain
|
|
||||||
+ "."
|
|
||||||
)
|
|
||||||
elif result.rowcount == 0:
|
|
||||||
# No, nothing changed. Tell the user.
|
|
||||||
await evt.reply(
|
|
||||||
"This room is already not subscribed to notifications for "
|
|
||||||
+ stream_domain
|
|
||||||
+ "."
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# Somehow more than 1 (or even less than 0 ???) rows were changed... Log it!
|
|
||||||
self.log.error(
|
|
||||||
"Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got "
|
|
||||||
+ result.rowcount
|
|
||||||
+ " instead. Something very bad may have happened!!!!"
|
|
||||||
)
|
|
||||||
|
|
||||||
async def update_all_stream_states(self) -> None:
|
|
||||||
"""
|
|
||||||
Checks the status of all streams with active subscriptions.
|
|
||||||
Updates for all streams are performed asynchronously, with the method returning when the slowest update completes.
|
|
||||||
|
|
||||||
:return: Nothing.
|
|
||||||
"""
|
|
||||||
self.log.debug("Updating all stream states...")
|
|
||||||
|
|
||||||
# Make a list of all stream domains with an active subscription.
|
|
||||||
query = "SELECT DISTINCT stream_domain FROM subscriptions"
|
|
||||||
async with self.database.acquire() as connection:
|
|
||||||
results = await connection.fetch(query)
|
|
||||||
|
|
||||||
# Build a list of async tasks which update the state for each stream domain fetched.
|
|
||||||
tasks = []
|
|
||||||
for row in results:
|
|
||||||
domain = row["stream_domain"]
|
|
||||||
|
|
||||||
tasks.append(asyncio.create_task(self.update_stream_state(domain)))
|
|
||||||
|
|
||||||
# Run the tasks in parallel.
|
|
||||||
await asyncio.gather(*tasks)
|
|
||||||
self.log.debug("Update complete.")
|
|
||||||
|
|
||||||
async def update_stream_state(self, domain: str) -> None:
|
|
||||||
"""
|
|
||||||
Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
|
|
||||||
|
|
||||||
:param domain: The domain of the stream to update.
|
|
||||||
:return: Nothing.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# A flag indicating whether to send a notification.
|
|
||||||
# Used for the first state update of a brand-new stream to avoid sending notifications if its already live.
|
|
||||||
send_notifications = True
|
|
||||||
|
|
||||||
# Fetch the latest stream state from the server.
|
|
||||||
new_state = await self.get_stream_state(domain)
|
|
||||||
|
|
||||||
# Skip the update if the fetch failed for any reason.
|
|
||||||
if new_state == {}:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Fetch the last known stream state from the database.
|
|
||||||
query = "SELECT last_connect_time, last_disconnect_time FROM streams WHERE domain=$1"
|
|
||||||
async with self.database.acquire() as connection:
|
|
||||||
old_state = await connection.fetchrow(query, domain)
|
|
||||||
|
|
||||||
# Does the last known stream state not have a value for the last connect and disconnect time?
|
|
||||||
if (
|
|
||||||
old_state["last_connect_time"] is None
|
|
||||||
and old_state["last_disconnect_time"] is None
|
|
||||||
):
|
|
||||||
# Yes, this is the first update. Don't send any notifications.
|
|
||||||
send_notifications = False
|
|
||||||
|
|
||||||
# Does the latest stream state have a last connect time and the old state not have one?
|
|
||||||
if (
|
|
||||||
new_state["lastConnectTime"] is not None
|
|
||||||
and old_state["last_connect_time"] is None
|
|
||||||
):
|
|
||||||
# Yes! This stream is now live. Send notifications and log it, if allowed.
|
|
||||||
if send_notifications:
|
|
||||||
self.log.info(
|
|
||||||
f"[{domain}] Stream is now live! Notifying subscribed rooms..."
|
|
||||||
)
|
|
||||||
await self.notify_rooms_of_stream_online(
|
|
||||||
domain, new_state["streamTitle"]
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
self.log.info(
|
|
||||||
f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Does the latest stream state no longer have a last connect time but the old state does?
|
|
||||||
elif (
|
|
||||||
new_state["lastConnectTime"] is None
|
|
||||||
and old_state["last_connect_time"] is not None
|
|
||||||
):
|
|
||||||
# Yep. This stream is now offline. Log it.
|
|
||||||
if send_notifications:
|
|
||||||
self.log.info(f"[{domain}] Stream is now offline.")
|
|
||||||
else:
|
|
||||||
self.log.info(f"[{domain}] Stream is offline.")
|
|
||||||
|
|
||||||
# Update the database with the current stream state.
|
|
||||||
# TODO: Only update the database if a change actually occurred. This is probably generating useless writes.
|
|
||||||
update_query = "UPDATE streams SET name=$1, last_connect_time=$2, last_disconnect_time=$3 WHERE domain=$4"
|
|
||||||
async with self.database.acquire() as connection:
|
|
||||||
await connection.execute(
|
|
||||||
update_query,
|
|
||||||
new_state["streamTitle"],
|
|
||||||
new_state["lastConnectTime"],
|
|
||||||
new_state["lastDisconnectTime"],
|
|
||||||
domain,
|
|
||||||
)
|
|
||||||
|
|
||||||
# All done.
|
|
||||||
self.log.debug(f"[{domain}] State update completed.")
|
|
||||||
|
|
||||||
async def notify_rooms_of_stream_online(self, domain: str, title: str) -> None:
|
|
||||||
"""
|
|
||||||
Sends notifications to rooms with subscriptions to the provided stream domain.
|
|
||||||
|
|
||||||
:param domain: The domain of the stream to send notifications for.
|
|
||||||
:param title: The title of the stream to include in the message.
|
|
||||||
:return: Nothing.
|
|
||||||
"""
|
|
||||||
# Get a list of room IDs with active subscriptions to the stream domain.
|
|
||||||
query = "SELECT room_id FROM subscriptions WHERE stream_domain=$1"
|
|
||||||
async with self.database.acquire() as connection:
|
|
||||||
results = await connection.fetch(query, domain)
|
|
||||||
|
|
||||||
# Set up some counters for a high level statistics.
|
|
||||||
successful_notifications = 0
|
|
||||||
failed_notifications = 0
|
|
||||||
|
|
||||||
stream_config = await self.get_stream_config(domain)
|
|
||||||
|
|
||||||
# Build the message body text.
|
|
||||||
if "name" in stream_config:
|
|
||||||
body_text = "🎥 " + stream_config["name"] + " is now live!"
|
|
||||||
else:
|
|
||||||
# Turns out it is possible to set an empty name for an Owncast stream if you know how.
|
|
||||||
# We'll account for that... Just in case.
|
|
||||||
body_text = "🎥 " + domain + " is now live!"
|
|
||||||
|
|
||||||
# Streams can have no title. If there is none, don't even mention it.
|
|
||||||
if title != "":
|
|
||||||
body_text += "\nStream Title: " + title
|
|
||||||
|
|
||||||
body_text += "\n\nTo tune in, visit: https://" + domain + "/\n\n"
|
|
||||||
|
|
||||||
if "tags" in stream_config and len(stream_config["tags"]) > 0:
|
|
||||||
body_text += "\n\n"
|
|
||||||
body_text += " ".join("#" + tag for tag in stream_config["tags"])
|
|
||||||
|
|
||||||
# Iterate over the subscribed rooms and try to send a message to each.
|
|
||||||
# TODO: This should probably be made async.
|
|
||||||
for rows in results:
|
|
||||||
room_id = rows["room_id"]
|
|
||||||
try:
|
|
||||||
# Try and send a message.
|
|
||||||
content = TextMessageEventContent(
|
|
||||||
msgtype=MessageType.TEXT, body=body_text
|
|
||||||
)
|
|
||||||
await self.client.send_message(room_id, content)
|
|
||||||
|
|
||||||
# It worked! Increment the counter.
|
|
||||||
successful_notifications += 1
|
|
||||||
except Exception as exception:
|
|
||||||
# Something didn't work. Log it and move on to the next one.
|
|
||||||
self.log.warning(
|
|
||||||
f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}"
|
|
||||||
)
|
|
||||||
failed_notifications += 1
|
|
||||||
|
|
||||||
# All done!
|
|
||||||
self.log.info(
|
|
||||||
f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed."
|
|
||||||
)
|
|
||||||
|
|
||||||
# ========== HELPER METHODS ==========
|
|
||||||
async def get_stream_state(self, domain):
|
|
||||||
"""
|
|
||||||
Get the current stream state for a given domain.
|
|
||||||
HTTPS on port 443 is assumed, no other protocols or ports are supported.
|
|
||||||
|
|
||||||
:param domain: The domain (not URL) where the stream is hosted.
|
|
||||||
:return: A dictionary containing the lastConnectTime, lastDisconnectTime, streamTitle, and online values if available. An empty dictionary if an error occurred.
|
|
||||||
"""
|
|
||||||
self.log.debug(f"[{domain}] Fetching current stream state...")
|
|
||||||
# Build a URL to the status API in Owncast. (https://owncast.online/api/latest/#tag/Internal/operation/GetStatus)
|
|
||||||
# Only use HTTPS, even if the user specified something else.
|
|
||||||
status_url = "https://" + domain + OWNCAST_STATUS_PATH
|
|
||||||
|
|
||||||
# Make a request to the endpoint.
|
|
||||||
try:
|
|
||||||
response = await self.session.request(
|
|
||||||
"GET", status_url, allow_redirects=False
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
self.log.warning(
|
|
||||||
f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}"
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Check the response code is success
|
|
||||||
# TODO: Handle 429 rate limiting?
|
|
||||||
if response.status != 200:
|
|
||||||
self.log.warning(
|
|
||||||
f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead."
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Try and interpret the response as JSON.
|
|
||||||
try:
|
|
||||||
new_state = json.loads(await response.read())
|
|
||||||
except Exception as e:
|
|
||||||
self.log.warning(
|
|
||||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}"
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Validate the response to ensure it contains all the basic info needed to function.
|
|
||||||
if "lastConnectTime" not in new_state:
|
|
||||||
self.log.warning(
|
|
||||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter."
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
elif "lastDisconnectTime" not in new_state:
|
|
||||||
self.log.warning(
|
|
||||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter."
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
elif "streamTitle" not in new_state:
|
|
||||||
self.log.warning(
|
|
||||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter."
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
elif "online" not in new_state:
|
|
||||||
self.log.warning(
|
|
||||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter."
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
return new_state
|
|
||||||
|
|
||||||
async def get_stream_config(self, domain):
|
|
||||||
"""
|
|
||||||
Get the current stream config for a given domain.
|
|
||||||
HTTPS on port 443 is assumed, no other protocols or ports are supported.
|
|
||||||
|
|
||||||
:param domain: The domain (not URL) where the stream is hosted.
|
|
||||||
:return: A dictionary containing the stream's configuration.
|
|
||||||
"""
|
|
||||||
self.log.debug(f"[{domain}] Fetching current stream config...")
|
|
||||||
# Build a URL to the config API in Owncast. (https://owncast.online/api/latest/#tag/Internal/operation/GetWebConfig)
|
|
||||||
# Only use HTTPS, even if the user specified something else.
|
|
||||||
status_url = "https://" + domain + OWNCAST_CONFIG_PATH
|
|
||||||
|
|
||||||
# Make a request to the endpoint.
|
|
||||||
try:
|
|
||||||
response = await self.session.request(
|
|
||||||
"GET", status_url, allow_redirects=False
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
self.log.warning(
|
|
||||||
f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}"
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Check the response code is success
|
|
||||||
# TODO: Handle 429 rate limiting?
|
|
||||||
if response.status != 200:
|
|
||||||
self.log.warning(
|
|
||||||
f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead."
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
# Try and interpret the response as JSON.
|
|
||||||
try:
|
|
||||||
config = json.loads(await response.read())
|
|
||||||
except Exception as e:
|
|
||||||
self.log.warning(
|
|
||||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}"
|
|
||||||
)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
return config
|
|
||||||
|
|
||||||
def domainify(self, url) -> str:
|
|
||||||
"""
|
|
||||||
Take a given URL and convert it to just the domain.
|
|
||||||
|
|
||||||
:param url:
|
|
||||||
:return:
|
|
||||||
"""
|
|
||||||
# Take whatever input the user provided and try to turn it into just the domain.
|
|
||||||
# Examples:
|
|
||||||
# "stream.logal.dev" -> "stream.logal.dev"
|
|
||||||
# "https://stream.logal.dev" -> "stream.logal.dev"
|
|
||||||
# "stream.logal.dev/embed/chat/readwrite" -> "stream.logal.dev"
|
|
||||||
# "https://stream.logal.dev/abcdefghijklmno/123456789" -> "stream.logal.dev"
|
|
||||||
# "notify@stream.logal.dev" -> "stream.logal.dev"
|
|
||||||
|
|
||||||
parsed_url = urlparse(url)
|
|
||||||
domain = (parsed_url.netloc or parsed_url.path).lower()
|
|
||||||
|
|
||||||
if "@" in domain:
|
|
||||||
return domain.split("@")[-1]
|
|
||||||
|
|
||||||
return domain
|
|
||||||
133
owncastsentry/__init__.py
Normal file
133
owncastsentry/__init__.py
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
# Copyright 2026 Logan Fick
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
from maubot import Plugin, MessageEvent
|
||||||
|
from maubot.handlers import command
|
||||||
|
from mautrix.util.async_db import UpgradeTable
|
||||||
|
|
||||||
|
from .migrations import get_upgrade_table
|
||||||
|
from .owncast_client import OwncastClient
|
||||||
|
from .database import StreamRepository, SubscriptionRepository
|
||||||
|
from .notification_service import NotificationService
|
||||||
|
from .stream_monitor import StreamMonitor
|
||||||
|
from .commands import CommandHandler
|
||||||
|
|
||||||
|
|
||||||
|
class OwncastSentry(Plugin):
|
||||||
|
"""Main plugin class for OwncastSentry."""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_db_upgrade_table(cls) -> UpgradeTable | None:
|
||||||
|
"""
|
||||||
|
Helper method for telling Maubot about our database migrations.
|
||||||
|
|
||||||
|
:return: An UpgradeTable with our registered migrations.
|
||||||
|
"""
|
||||||
|
return get_upgrade_table()
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""
|
||||||
|
Method called by Maubot upon startup of the instance.
|
||||||
|
Initializes all services and registers a recurring task every minute to update the state of all subscribed streams.
|
||||||
|
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
# Initialize the Owncast API client
|
||||||
|
self.owncast_client = OwncastClient(self.log)
|
||||||
|
|
||||||
|
# Initialize repositories
|
||||||
|
self.stream_repo = StreamRepository(self.database)
|
||||||
|
self.subscription_repo = SubscriptionRepository(self.database)
|
||||||
|
|
||||||
|
# Initialize notification service
|
||||||
|
self.notification_service = NotificationService(
|
||||||
|
self.client, self.subscription_repo, self.log
|
||||||
|
)
|
||||||
|
|
||||||
|
# Initialize stream monitor
|
||||||
|
self.stream_monitor = StreamMonitor(
|
||||||
|
self.owncast_client,
|
||||||
|
self.stream_repo,
|
||||||
|
self.notification_service,
|
||||||
|
self.log,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Initialize command handler
|
||||||
|
self.command_handler = CommandHandler(
|
||||||
|
self.owncast_client,
|
||||||
|
self.stream_repo,
|
||||||
|
self.subscription_repo,
|
||||||
|
self.log,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Schedule periodic stream state updates every 60 seconds
|
||||||
|
self.sched.run_periodically(60, self._update_all_stream_states)
|
||||||
|
|
||||||
|
async def _update_all_stream_states(self) -> None:
|
||||||
|
"""
|
||||||
|
Wrapper method for updating all stream states.
|
||||||
|
Fetches list of subscribed domains and delegates to StreamMonitor.
|
||||||
|
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
# Get list of all stream domains with active subscriptions
|
||||||
|
subscribed_domains = await self.subscription_repo.get_all_subscribed_domains()
|
||||||
|
|
||||||
|
# Delegate to stream monitor
|
||||||
|
await self.stream_monitor.update_all_streams(subscribed_domains)
|
||||||
|
|
||||||
|
@command.new(help="Subscribes to a new Owncast stream.")
|
||||||
|
@command.argument("url")
|
||||||
|
async def subscribe(self, evt: MessageEvent, url: str) -> None:
|
||||||
|
"""
|
||||||
|
Command handler that delegates to CommandHandler.
|
||||||
|
|
||||||
|
:param evt: MessageEvent of the message calling the command.
|
||||||
|
:param url: A string containing the user supplied URL to a stream to try and subscribe to.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
await self.command_handler.subscribe(evt, url)
|
||||||
|
|
||||||
|
@command.new(help="Unsubscribes from an Owncast stream.")
|
||||||
|
@command.argument("url")
|
||||||
|
async def unsubscribe(self, evt: MessageEvent, url: str) -> None:
|
||||||
|
"""
|
||||||
|
Command handler that delegates to CommandHandler.
|
||||||
|
|
||||||
|
:param evt: MessageEvent of the message calling the command.
|
||||||
|
:param url: A string containing the user supplied URL to a stream to try and unsubscribe from.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
await self.command_handler.unsubscribe(evt, url)
|
||||||
|
|
||||||
|
@command.new(help="Lists all stream subscriptions in this room.")
|
||||||
|
async def subscriptions(self, evt: MessageEvent) -> None:
|
||||||
|
"""
|
||||||
|
Command handler that delegates to CommandHandler.
|
||||||
|
|
||||||
|
:param evt: MessageEvent of the message calling the command.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
await self.command_handler.subscriptions(evt)
|
||||||
|
|
||||||
|
@command.new(help="Lists currently live streams in this room.")
|
||||||
|
async def live(self, evt: MessageEvent) -> None:
|
||||||
|
"""
|
||||||
|
Command handler that delegates to CommandHandler.
|
||||||
|
|
||||||
|
:param evt: MessageEvent of the message calling the command.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
await self.command_handler.live(evt)
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""
|
||||||
|
Method called by Maubot upon shutdown of the instance.
|
||||||
|
Closes the HTTP session.
|
||||||
|
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
await self.owncast_client.close()
|
||||||
299
owncastsentry/commands.py
Normal file
299
owncastsentry/commands.py
Normal file
@@ -0,0 +1,299 @@
|
|||||||
|
# Copyright 2026 Logan Fick
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from maubot import MessageEvent
|
||||||
|
from mautrix.types import TextMessageEventContent, MessageType
|
||||||
|
|
||||||
|
from .owncast_client import OwncastClient
|
||||||
|
from .database import StreamRepository, SubscriptionRepository
|
||||||
|
from .models import StreamStatus
|
||||||
|
from .utils import domainify, sanitize_for_markdown
|
||||||
|
|
||||||
|
|
||||||
|
class CommandHandler:
|
||||||
|
"""Handles bot commands for subscribing to streams."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
owncast_client: OwncastClient,
|
||||||
|
stream_repo: StreamRepository,
|
||||||
|
subscription_repo: SubscriptionRepository,
|
||||||
|
logger,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Initialize the command handler.
|
||||||
|
|
||||||
|
:param owncast_client: Client for making API calls to Owncast instances
|
||||||
|
:param stream_repo: Repository for stream data
|
||||||
|
:param subscription_repo: Repository for subscription data
|
||||||
|
:param logger: Logger instance
|
||||||
|
"""
|
||||||
|
self.owncast_client = owncast_client
|
||||||
|
self.stream_repo = stream_repo
|
||||||
|
self.subscription_repo = subscription_repo
|
||||||
|
self.log = logger
|
||||||
|
|
||||||
|
async def subscribe(self, evt: MessageEvent, url: str) -> None:
|
||||||
|
"""
|
||||||
|
"!subscribe" command handler for users to subscribe a room to a given stream's notifications.
|
||||||
|
|
||||||
|
:param evt: MessageEvent of the message calling the command.
|
||||||
|
:param url: A string containing the user supplied URL to a stream to try and subscribe to.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
# Convert the user input to only a domain
|
||||||
|
stream_domain = domainify(url)
|
||||||
|
|
||||||
|
# How many subscriptions already exist for this domain?
|
||||||
|
subscription_count = await self.subscription_repo.count_by_domain(stream_domain)
|
||||||
|
|
||||||
|
if subscription_count == 0:
|
||||||
|
# There are 0 subscriptions, we need to validate this domain is an Owncast stream.
|
||||||
|
is_valid = await self.owncast_client.validate_instance(stream_domain)
|
||||||
|
if not is_valid:
|
||||||
|
# The stream state fetch returned nothing. Probably not an Owncast stream.
|
||||||
|
await evt.reply(
|
||||||
|
"The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Try to add a new subscription for the requested stream domain in the room the command was executed in
|
||||||
|
try:
|
||||||
|
await self.subscription_repo.add(stream_domain, evt.room_id)
|
||||||
|
except sqlite3.IntegrityError as exception:
|
||||||
|
# Something weird happened... Was it due to attempting to insert a duplicate row?
|
||||||
|
if "UNIQUE constraint failed" in exception.args[0]:
|
||||||
|
# Yes, this is an expected condition. Tell the user the room is already subscribed and give up.
|
||||||
|
await evt.reply(
|
||||||
|
"This room is already subscribed to notifications for "
|
||||||
|
+ stream_domain
|
||||||
|
+ "."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
# Nope... Something unexpected happened. Give up.
|
||||||
|
self.log.error(
|
||||||
|
f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {exception}"
|
||||||
|
)
|
||||||
|
raise exception
|
||||||
|
|
||||||
|
# The subscription was successfully added! Try to add a placeholder row for the stream's state in the streams table.
|
||||||
|
try:
|
||||||
|
await self.stream_repo.create(stream_domain)
|
||||||
|
# The insert was successful, so this is the first time we're seeing this stream. Log it.
|
||||||
|
self.log.info(f"[{stream_domain}] Discovered new stream!")
|
||||||
|
except sqlite3.IntegrityError as exception:
|
||||||
|
# Attempts to add rows for streams already known is an expected condition. What is anything except that?
|
||||||
|
if "UNIQUE constraint failed" not in exception.args[0]:
|
||||||
|
# Something unexpected happened. Give up.
|
||||||
|
self.log.error(
|
||||||
|
f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {exception}"
|
||||||
|
)
|
||||||
|
raise exception
|
||||||
|
|
||||||
|
# All went well! We added a new subscription and (at least tried) to add a row for the stream state. Tell the user.
|
||||||
|
self.log.info(f"[{stream_domain}] Subscription added for room {evt.room_id}.")
|
||||||
|
await evt.reply(
|
||||||
|
"Subscription added! This room will receive notifications when "
|
||||||
|
+ stream_domain
|
||||||
|
+ " goes live."
|
||||||
|
)
|
||||||
|
|
||||||
|
async def unsubscribe(self, evt: MessageEvent, url: str) -> None:
|
||||||
|
"""
|
||||||
|
"!unsubscribe" command handler for users to unsubscribe a room from a given stream's notifications.
|
||||||
|
|
||||||
|
:param evt: MessageEvent of the message calling the command.
|
||||||
|
:param url: A string containing the user supplied URL to a stream to try and unsubscribe from.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
# Convert the user input to only a domain
|
||||||
|
stream_domain = domainify(url)
|
||||||
|
|
||||||
|
# Attempt to delete the requested subscription from the database
|
||||||
|
result = await self.subscription_repo.remove(stream_domain, evt.room_id)
|
||||||
|
|
||||||
|
# Did it work?
|
||||||
|
if result == 1:
|
||||||
|
# Yes, one row was deleted. Tell the user.
|
||||||
|
self.log.info(
|
||||||
|
f"[{stream_domain}] Subscription removed for room {evt.room_id}."
|
||||||
|
)
|
||||||
|
await evt.reply(
|
||||||
|
"Subscription removed! This room will no longer receive notifications for "
|
||||||
|
+ stream_domain
|
||||||
|
+ "."
|
||||||
|
)
|
||||||
|
elif result == 0:
|
||||||
|
# No, nothing changed. Tell the user.
|
||||||
|
await evt.reply(
|
||||||
|
"This room is already not subscribed to notifications for "
|
||||||
|
+ stream_domain
|
||||||
|
+ "."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Somehow more than 1 (or even less than 0 ???) rows were changed... Log it!
|
||||||
|
self.log.error(
|
||||||
|
"Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got "
|
||||||
|
+ str(result)
|
||||||
|
+ " instead. Something very bad may have happened!!!!"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _format_duration(self, timestamp_str: str) -> str:
|
||||||
|
"""
|
||||||
|
Calculate and format the duration from a timestamp to now.
|
||||||
|
|
||||||
|
:param timestamp_str: ISO 8601 timestamp string
|
||||||
|
:return: Formatted duration string (e.g., "1 hour", "2 days")
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
delta = now - timestamp
|
||||||
|
|
||||||
|
seconds = int(delta.total_seconds())
|
||||||
|
if seconds < 60:
|
||||||
|
return f"{seconds} second{'s' if seconds != 1 else ''}"
|
||||||
|
elif seconds < 3600:
|
||||||
|
minutes = seconds // 60
|
||||||
|
return f"{minutes} minute{'s' if minutes != 1 else ''}"
|
||||||
|
elif seconds < 86400:
|
||||||
|
hours = seconds // 3600
|
||||||
|
return f"{hours} hour{'s' if hours != 1 else ''}"
|
||||||
|
else:
|
||||||
|
days = seconds // 86400
|
||||||
|
return f"{days} day{'s' if days != 1 else ''}"
|
||||||
|
except Exception:
|
||||||
|
return "unknown duration"
|
||||||
|
|
||||||
|
async def subscriptions(self, evt: MessageEvent) -> None:
|
||||||
|
"""
|
||||||
|
"!subscriptions" command handler for listing all stream subscriptions in the current room.
|
||||||
|
|
||||||
|
:param evt: MessageEvent of the message calling the command.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
# Get all stream domains this room is subscribed to
|
||||||
|
subscribed_domains = await self.subscription_repo.get_subscribed_streams_for_room(
|
||||||
|
evt.room_id
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if there are no subscriptions
|
||||||
|
if not subscribed_domains:
|
||||||
|
await evt.reply("This room is not subscribed to any Owncast instances.\n\nTo subscribe to an Owncast instance, use `!subscribe <domain>`", markdown=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Build the response message body as Markdown
|
||||||
|
body_text = f"**Subscriptions for this room ({len(subscribed_domains)}):**\n\n"
|
||||||
|
|
||||||
|
for domain in subscribed_domains:
|
||||||
|
# Get the stream state from the database
|
||||||
|
stream_state = await self.stream_repo.get_by_domain(domain)
|
||||||
|
|
||||||
|
if stream_state is None:
|
||||||
|
# Stream exists in subscriptions but not in streams table (shouldn't happen)
|
||||||
|
body_text += f"- **{domain}** \n"
|
||||||
|
body_text += f" - Status: Unknown \n"
|
||||||
|
body_text += f" - Link: https://{domain}\n\n"
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Determine stream name (use domain as fallback)
|
||||||
|
stream_name = stream_state.name if stream_state.name else domain
|
||||||
|
safe_stream_name = sanitize_for_markdown(stream_name)
|
||||||
|
|
||||||
|
# Start building this stream's entry with stream name as main bullet
|
||||||
|
body_text += f"- **{safe_stream_name}** \n"
|
||||||
|
|
||||||
|
# Add title if stream is online (as a sub-bullet)
|
||||||
|
if stream_state.status == StreamStatus.ONLINE and stream_state.title:
|
||||||
|
safe_title = sanitize_for_markdown(stream_state.title)
|
||||||
|
body_text += f" - Title: {safe_title} \n"
|
||||||
|
|
||||||
|
# Determine status and duration (as a sub-bullet)
|
||||||
|
if stream_state.status == StreamStatus.ONLINE:
|
||||||
|
# Stream is online - use last_connect_time
|
||||||
|
if stream_state.last_connect_time:
|
||||||
|
duration = self._format_duration(stream_state.last_connect_time)
|
||||||
|
body_text += f" - Status: Online for {duration} \n"
|
||||||
|
else:
|
||||||
|
body_text += f" - Status: Online \n"
|
||||||
|
elif stream_state.status == StreamStatus.UNKNOWN:
|
||||||
|
# Stream status is unknown - instance unreachable
|
||||||
|
body_text += f" - Status: Unknown (instance unreachable) \n"
|
||||||
|
else:
|
||||||
|
# Stream is offline - use last_disconnect_time
|
||||||
|
if stream_state.last_disconnect_time:
|
||||||
|
duration = self._format_duration(stream_state.last_disconnect_time)
|
||||||
|
body_text += f" - Status: Offline for {duration} \n"
|
||||||
|
else:
|
||||||
|
body_text += f" - Status: Offline \n"
|
||||||
|
|
||||||
|
# Add stream link (as a sub-bullet)
|
||||||
|
body_text += f" - Link: https://{domain}\n\n"
|
||||||
|
|
||||||
|
# Add help text for unsubscribing
|
||||||
|
body_text += "\nTo unsubscribe from any of these Owncast instances, use `!unsubscribe <domain>`"
|
||||||
|
|
||||||
|
# Send the response as Markdown
|
||||||
|
await evt.reply(body_text, markdown=True)
|
||||||
|
|
||||||
|
async def live(self, evt: MessageEvent) -> None:
|
||||||
|
"""
|
||||||
|
"!live" command handler for listing only currently live streams in the current room.
|
||||||
|
|
||||||
|
:param evt: MessageEvent of the message calling the command.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
# Get all stream domains this room is subscribed to
|
||||||
|
subscribed_domains = await self.subscription_repo.get_subscribed_streams_for_room(
|
||||||
|
evt.room_id
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if there are no subscriptions
|
||||||
|
if not subscribed_domains:
|
||||||
|
await evt.reply("This room is not subscribed to any Owncast instances.\n\nTo subscribe to an Owncast instance, use `!subscribe <domain>`", markdown=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Filter for only live streams (exclude unknown status)
|
||||||
|
live_streams = []
|
||||||
|
for domain in subscribed_domains:
|
||||||
|
stream_state = await self.stream_repo.get_by_domain(domain)
|
||||||
|
if stream_state and stream_state.status == StreamStatus.ONLINE:
|
||||||
|
live_streams.append((domain, stream_state))
|
||||||
|
|
||||||
|
# Check if there are no live streams
|
||||||
|
if not live_streams:
|
||||||
|
await evt.reply("No subscribed Owncast instances are currently live.\n\nUse `!subscriptions` to list all subscriptions.", markdown=True)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Build the response message body as Markdown
|
||||||
|
body_text = f"**Live Owncast instances ({len(live_streams)}):**\n\n"
|
||||||
|
|
||||||
|
for domain, stream_state in live_streams:
|
||||||
|
# Determine stream name (use domain as fallback)
|
||||||
|
stream_name = stream_state.name if stream_state.name else domain
|
||||||
|
safe_stream_name = sanitize_for_markdown(stream_name)
|
||||||
|
|
||||||
|
# Start building this stream's entry with stream name as main bullet
|
||||||
|
body_text += f"- **{safe_stream_name}** \n"
|
||||||
|
|
||||||
|
# Add title (should be present for live streams)
|
||||||
|
if stream_state.title:
|
||||||
|
safe_title = sanitize_for_markdown(stream_state.title)
|
||||||
|
body_text += f" - Title: {safe_title} \n"
|
||||||
|
|
||||||
|
# Add status with duration
|
||||||
|
if stream_state.last_connect_time:
|
||||||
|
duration = self._format_duration(stream_state.last_connect_time)
|
||||||
|
body_text += f" - Online for {duration} \n"
|
||||||
|
|
||||||
|
# Add stream link
|
||||||
|
body_text += f" - Link: https://{domain}\n\n"
|
||||||
|
|
||||||
|
# Send the response as Markdown
|
||||||
|
await evt.reply(body_text.rstrip(), markdown=True)
|
||||||
209
owncastsentry/database.py
Normal file
209
owncastsentry/database.py
Normal file
@@ -0,0 +1,209 @@
|
|||||||
|
# Copyright 2026 Logan Fick
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
from typing import Optional, List
|
||||||
|
from mautrix.util.async_db import Database
|
||||||
|
|
||||||
|
from .models import StreamState
|
||||||
|
|
||||||
|
|
||||||
|
class StreamRepository:
|
||||||
|
"""Repository for managing stream data in the database."""
|
||||||
|
|
||||||
|
def __init__(self, database: Database):
|
||||||
|
"""
|
||||||
|
Initialize the stream repository.
|
||||||
|
|
||||||
|
:param database: The maubot database instance
|
||||||
|
"""
|
||||||
|
self.db = database
|
||||||
|
|
||||||
|
async def get_by_domain(self, domain: str) -> Optional[StreamState]:
|
||||||
|
"""
|
||||||
|
Get a stream's state by domain.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: StreamState if found, None otherwise
|
||||||
|
"""
|
||||||
|
query = "SELECT * FROM streams WHERE domain=$1"
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
row = await conn.fetchrow(query, domain)
|
||||||
|
return StreamState.from_db_row(row) if row else None
|
||||||
|
|
||||||
|
async def create(self, domain: str) -> None:
|
||||||
|
"""
|
||||||
|
Create a new stream entry in the database.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: Nothing
|
||||||
|
"""
|
||||||
|
query = "INSERT INTO streams (domain) VALUES ($1)"
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
await conn.execute(query, domain)
|
||||||
|
|
||||||
|
async def update(self, state: StreamState) -> None:
|
||||||
|
"""
|
||||||
|
Update a stream's state in the database.
|
||||||
|
|
||||||
|
:param state: The StreamState to save
|
||||||
|
:return: Nothing
|
||||||
|
"""
|
||||||
|
query = """UPDATE streams
|
||||||
|
SET name=$1, title=$2, last_connect_time=$3, last_disconnect_time=$4
|
||||||
|
WHERE domain=$5"""
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
await conn.execute(
|
||||||
|
query,
|
||||||
|
state.name,
|
||||||
|
state.title,
|
||||||
|
state.last_connect_time,
|
||||||
|
state.last_disconnect_time,
|
||||||
|
state.domain,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def exists(self, domain: str) -> bool:
|
||||||
|
"""
|
||||||
|
Check if a stream exists in the database.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: True if exists, False otherwise
|
||||||
|
"""
|
||||||
|
result = await self.get_by_domain(domain)
|
||||||
|
return result is not None
|
||||||
|
|
||||||
|
async def increment_failure_counter(self, domain: str) -> None:
|
||||||
|
"""
|
||||||
|
Increment the failure counter for a stream by 1.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: Nothing
|
||||||
|
"""
|
||||||
|
query = """UPDATE streams
|
||||||
|
SET failure_counter = failure_counter + 1
|
||||||
|
WHERE domain=$1"""
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
await conn.execute(query, domain)
|
||||||
|
|
||||||
|
async def reset_failure_counter(self, domain: str) -> None:
|
||||||
|
"""
|
||||||
|
Reset the failure counter for a stream to 0.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: Nothing
|
||||||
|
"""
|
||||||
|
query = """UPDATE streams
|
||||||
|
SET failure_counter = 0
|
||||||
|
WHERE domain=$1"""
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
await conn.execute(query, domain)
|
||||||
|
|
||||||
|
async def delete(self, domain: str) -> None:
|
||||||
|
"""
|
||||||
|
Delete a stream record from the database.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: Nothing
|
||||||
|
"""
|
||||||
|
query = "DELETE FROM streams WHERE domain=$1"
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
await conn.execute(query, domain)
|
||||||
|
|
||||||
|
|
||||||
|
class SubscriptionRepository:
|
||||||
|
"""Repository for managing stream subscriptions in the database."""
|
||||||
|
|
||||||
|
def __init__(self, database: Database):
|
||||||
|
"""
|
||||||
|
Initialize the subscription repository.
|
||||||
|
|
||||||
|
:param database: The maubot database instance
|
||||||
|
"""
|
||||||
|
self.db = database
|
||||||
|
|
||||||
|
async def add(self, domain: str, room_id: str) -> None:
|
||||||
|
"""
|
||||||
|
Add a subscription for a room to a stream.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:param room_id: The Matrix room ID
|
||||||
|
:return: Nothing
|
||||||
|
:raises: sqlite3.IntegrityError if subscription already exists
|
||||||
|
"""
|
||||||
|
query = "INSERT INTO subscriptions (stream_domain, room_id) VALUES ($1, $2)"
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
await conn.execute(query, domain, room_id)
|
||||||
|
|
||||||
|
async def remove(self, domain: str, room_id: str) -> int:
|
||||||
|
"""
|
||||||
|
Remove a subscription for a room from a stream.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:param room_id: The Matrix room ID
|
||||||
|
:return: Number of rows deleted (0 or 1)
|
||||||
|
"""
|
||||||
|
query = "DELETE FROM subscriptions WHERE stream_domain=$1 AND room_id=$2"
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
result = await conn.execute(query, domain, room_id)
|
||||||
|
return result.rowcount
|
||||||
|
|
||||||
|
async def get_subscribed_rooms(self, domain: str) -> List[str]:
|
||||||
|
"""
|
||||||
|
Get all room IDs subscribed to a stream.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: List of room IDs
|
||||||
|
"""
|
||||||
|
query = "SELECT room_id FROM subscriptions WHERE stream_domain=$1"
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
results = await conn.fetch(query, domain)
|
||||||
|
return [row["room_id"] for row in results]
|
||||||
|
|
||||||
|
async def get_subscribed_streams_for_room(self, room_id: str) -> List[str]:
|
||||||
|
"""
|
||||||
|
Get all stream domains that a room is subscribed to.
|
||||||
|
|
||||||
|
:param room_id: The Matrix room ID
|
||||||
|
:return: List of stream domains
|
||||||
|
"""
|
||||||
|
query = "SELECT stream_domain FROM subscriptions WHERE room_id=$1"
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
results = await conn.fetch(query, room_id)
|
||||||
|
return [row["stream_domain"] for row in results]
|
||||||
|
|
||||||
|
async def get_all_subscribed_domains(self) -> List[str]:
|
||||||
|
"""
|
||||||
|
Get all unique stream domains that have at least one subscription.
|
||||||
|
|
||||||
|
:return: List of stream domains
|
||||||
|
"""
|
||||||
|
query = "SELECT DISTINCT stream_domain FROM subscriptions"
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
results = await conn.fetch(query)
|
||||||
|
return [row["stream_domain"] for row in results]
|
||||||
|
|
||||||
|
async def count_by_domain(self, domain: str) -> int:
|
||||||
|
"""
|
||||||
|
Count the number of subscriptions for a given stream domain.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: Number of subscriptions
|
||||||
|
"""
|
||||||
|
query = "SELECT COUNT(*) FROM subscriptions WHERE stream_domain=$1"
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
result = await conn.fetchrow(query, domain)
|
||||||
|
return result[0]
|
||||||
|
|
||||||
|
async def delete_all_for_domain(self, domain: str) -> int:
|
||||||
|
"""
|
||||||
|
Delete all subscriptions for a given stream domain.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: Number of subscriptions deleted
|
||||||
|
"""
|
||||||
|
query = "DELETE FROM subscriptions WHERE stream_domain=$1"
|
||||||
|
async with self.db.acquire() as conn:
|
||||||
|
result = await conn.execute(query, domain)
|
||||||
|
return result.rowcount
|
||||||
93
owncastsentry/migrations.py
Normal file
93
owncastsentry/migrations.py
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
# Copyright 2026 Logan Fick
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
from mautrix.util.async_db import UpgradeTable, Connection
|
||||||
|
|
||||||
|
upgrade_table = UpgradeTable()
|
||||||
|
|
||||||
|
|
||||||
|
@upgrade_table.register(description="Initial revision")
|
||||||
|
async def upgrade_v1(conn: Connection) -> None:
|
||||||
|
"""
|
||||||
|
Runs migrations to upgrade database schema to verison 1 format.
|
||||||
|
Version 1 is the initial format of the database.
|
||||||
|
|
||||||
|
:param conn: A connection to run the v1 database migration on.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
await conn.execute(
|
||||||
|
"""CREATE TABLE "streams" (
|
||||||
|
"domain" TEXT NOT NULL UNIQUE,
|
||||||
|
"name" TEXT,
|
||||||
|
"title" TEXT,
|
||||||
|
"last_connect_time" TEXT,
|
||||||
|
"last_disconnect_time" TEXT,
|
||||||
|
PRIMARY KEY("domain")
|
||||||
|
)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
await conn.execute(
|
||||||
|
"""CREATE TABLE "subscriptions" (
|
||||||
|
"stream_domain" INTEGER NOT NULL,
|
||||||
|
"room_id" TEXT NOT NULL,
|
||||||
|
UNIQUE("room_id","stream_domain")
|
||||||
|
)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@upgrade_table.register(description="Fix stream_domain column type from INTEGER to TEXT")
|
||||||
|
async def upgrade_v2(conn: Connection) -> None:
|
||||||
|
"""
|
||||||
|
Runs migrations to upgrade database schema to version 2 format.
|
||||||
|
Version 2 fixes the stream_domain column type in subscriptions table from INTEGER to TEXT.
|
||||||
|
|
||||||
|
:param conn: A connection to run the v2 database migration on.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
# Create new subscriptions table with correct schema
|
||||||
|
await conn.execute(
|
||||||
|
"""CREATE TABLE "subscriptions_new" (
|
||||||
|
"stream_domain" TEXT NOT NULL,
|
||||||
|
"room_id" TEXT NOT NULL,
|
||||||
|
UNIQUE("room_id","stream_domain")
|
||||||
|
)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Copy all existing data from old table to new table
|
||||||
|
await conn.execute(
|
||||||
|
"""INSERT INTO subscriptions_new (stream_domain, room_id)
|
||||||
|
SELECT stream_domain, room_id FROM subscriptions"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Drop the old table
|
||||||
|
await conn.execute("DROP TABLE subscriptions")
|
||||||
|
|
||||||
|
# Rename new table to original name
|
||||||
|
await conn.execute("ALTER TABLE subscriptions_new RENAME TO subscriptions")
|
||||||
|
|
||||||
|
|
||||||
|
@upgrade_table.register(description="Add failure_counter column for backoff and auto-cleanup")
|
||||||
|
async def upgrade_v3(conn: Connection) -> None:
|
||||||
|
"""
|
||||||
|
Runs migrations to upgrade database schema to version 3 format.
|
||||||
|
Version 3 adds the failure_counter column to track connection failures for backoff and auto-cleanup.
|
||||||
|
|
||||||
|
:param conn: A connection to run the v3 database migration on.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
# Add failure_counter column with default value of 0
|
||||||
|
await conn.execute(
|
||||||
|
"""ALTER TABLE streams ADD COLUMN failure_counter INTEGER DEFAULT 0"""
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_upgrade_table() -> UpgradeTable:
|
||||||
|
"""
|
||||||
|
Helper function for retrieving the upgrade table.
|
||||||
|
|
||||||
|
:return: The upgrade table with registered migrations.
|
||||||
|
"""
|
||||||
|
return upgrade_table
|
||||||
110
owncastsentry/models.py
Normal file
110
owncastsentry/models.py
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
# Copyright 2026 Logan Fick
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import Enum
|
||||||
|
from typing import Optional, List
|
||||||
|
|
||||||
|
from .utils import (
|
||||||
|
MAX_INSTANCE_TITLE_LENGTH,
|
||||||
|
MAX_STREAM_TITLE_LENGTH,
|
||||||
|
MAX_TAG_LENGTH,
|
||||||
|
UNKNOWN_STATUS_THRESHOLD,
|
||||||
|
truncate,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class StreamStatus(Enum):
|
||||||
|
"""Represents the status of a stream."""
|
||||||
|
|
||||||
|
ONLINE = "online"
|
||||||
|
OFFLINE = "offline"
|
||||||
|
UNKNOWN = "unknown"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StreamState:
|
||||||
|
"""Represents the state of an Owncast stream."""
|
||||||
|
|
||||||
|
domain: str
|
||||||
|
name: Optional[str] = None
|
||||||
|
title: Optional[str] = None
|
||||||
|
last_connect_time: Optional[str] = None
|
||||||
|
last_disconnect_time: Optional[str] = None
|
||||||
|
failure_counter: int = 0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def status(self) -> StreamStatus:
|
||||||
|
"""Returns the stream status considering failure_counter."""
|
||||||
|
if self.failure_counter > UNKNOWN_STATUS_THRESHOLD:
|
||||||
|
return StreamStatus.UNKNOWN
|
||||||
|
elif self.last_connect_time is not None:
|
||||||
|
return StreamStatus.ONLINE
|
||||||
|
else:
|
||||||
|
return StreamStatus.OFFLINE
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_api_response(cls, response: dict, domain: str) -> "StreamState":
|
||||||
|
"""
|
||||||
|
Creates a StreamState from an API response.
|
||||||
|
|
||||||
|
:param response: API response as a dictionary (camelCase keys)
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: StreamState instance
|
||||||
|
"""
|
||||||
|
return cls(
|
||||||
|
domain=domain,
|
||||||
|
title=truncate(response.get("streamTitle", ""), MAX_STREAM_TITLE_LENGTH),
|
||||||
|
last_connect_time=response.get("lastConnectTime"),
|
||||||
|
last_disconnect_time=response.get("lastDisconnectTime"),
|
||||||
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_db_row(cls, row: dict) -> "StreamState":
|
||||||
|
"""
|
||||||
|
Creates a StreamState from a database row.
|
||||||
|
|
||||||
|
:param row: Database row as a dictionary
|
||||||
|
:return: StreamState instance
|
||||||
|
"""
|
||||||
|
return cls(
|
||||||
|
domain=row["domain"],
|
||||||
|
name=row["name"],
|
||||||
|
title=row["title"],
|
||||||
|
last_connect_time=row["last_connect_time"],
|
||||||
|
last_disconnect_time=row["last_disconnect_time"],
|
||||||
|
failure_counter=row["failure_counter"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StreamConfig:
|
||||||
|
"""Represents the configuration of an Owncast stream."""
|
||||||
|
|
||||||
|
name: str = ""
|
||||||
|
tags: List[str] = None
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
"""Initialize default values after dataclass initialization."""
|
||||||
|
if self.tags is None:
|
||||||
|
self.tags = []
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_api_response(cls, response: dict) -> "StreamConfig":
|
||||||
|
"""
|
||||||
|
Creates a StreamConfig from an API response.
|
||||||
|
|
||||||
|
:param response: API response as a dictionary
|
||||||
|
:return: StreamConfig instance
|
||||||
|
"""
|
||||||
|
# Truncate instance name to max length
|
||||||
|
name = truncate(response.get("name", ""), MAX_INSTANCE_TITLE_LENGTH)
|
||||||
|
|
||||||
|
# Truncate each tag to max length
|
||||||
|
raw_tags = response.get("tags", [])
|
||||||
|
tags = [truncate(tag, MAX_TAG_LENGTH) for tag in raw_tags]
|
||||||
|
|
||||||
|
return cls(name=name, tags=tags)
|
||||||
250
owncastsentry/notification_service.py
Normal file
250
owncastsentry/notification_service.py
Normal file
@@ -0,0 +1,250 @@
|
|||||||
|
# Copyright 2026 Logan Fick
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
import time
|
||||||
|
import asyncio
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
from mautrix.types import TextMessageEventContent, MessageType
|
||||||
|
|
||||||
|
from .database import SubscriptionRepository
|
||||||
|
from .utils import SECONDS_BETWEEN_NOTIFICATIONS, sanitize_for_markdown
|
||||||
|
|
||||||
|
|
||||||
|
class NotificationService:
|
||||||
|
"""Service for sending Matrix notifications about stream events."""
|
||||||
|
|
||||||
|
def __init__(self, client, subscription_repo: SubscriptionRepository, logger):
|
||||||
|
"""
|
||||||
|
Initialize the notification service.
|
||||||
|
|
||||||
|
:param client: The Matrix client for sending messages
|
||||||
|
:param subscription_repo: Repository for managing subscriptions
|
||||||
|
:param logger: Logger instance
|
||||||
|
"""
|
||||||
|
self.client = client
|
||||||
|
self.subscription_repo = subscription_repo
|
||||||
|
self.log = logger
|
||||||
|
|
||||||
|
# Cache for tracking when notifications were last sent
|
||||||
|
self.notification_timers_cache = {}
|
||||||
|
|
||||||
|
async def notify_stream_live(
|
||||||
|
self,
|
||||||
|
domain: str,
|
||||||
|
name: str,
|
||||||
|
title: str,
|
||||||
|
tags: List[str],
|
||||||
|
title_change: bool = False,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Sends notifications to rooms with subscriptions to the provided stream domain.
|
||||||
|
|
||||||
|
:param domain: The domain of the stream to send notifications for.
|
||||||
|
:param name: The name of the stream to include in the message.
|
||||||
|
:param title: The title of the stream to include in the message.
|
||||||
|
:param tags: List of stream tags to include in the message.
|
||||||
|
:param title_change: Whether or not this is for a stream changing its title rather than going live.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
# Has enough time passed since the last notification was sent?
|
||||||
|
if not self._can_notify(domain):
|
||||||
|
seconds_since_last = round(
|
||||||
|
time.time() - self.notification_timers_cache[domain]
|
||||||
|
)
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Not sending notifications. Only {seconds_since_last} of required {SECONDS_BETWEEN_NOTIFICATIONS} seconds has passed since last notification."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Record that we're sending a notification now
|
||||||
|
self._record_notification(domain)
|
||||||
|
|
||||||
|
# Get a list of room IDs with active subscriptions to the stream domain
|
||||||
|
room_ids = await self.subscription_repo.get_subscribed_rooms(domain)
|
||||||
|
|
||||||
|
# Build the notification message
|
||||||
|
body_text = self._format_message(name, title, domain, tags, title_change)
|
||||||
|
|
||||||
|
# Set up counters for statistics
|
||||||
|
successful_notifications = 0
|
||||||
|
failed_notifications = 0
|
||||||
|
|
||||||
|
# Send notifications to all subscribed rooms in parallel
|
||||||
|
# IMPROVEMENT: Parallel notification delivery with asyncio.gather (was a TODO in original code)
|
||||||
|
tasks = []
|
||||||
|
for room_id in room_ids:
|
||||||
|
tasks.append(self._send_notification(room_id, body_text, domain))
|
||||||
|
|
||||||
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
||||||
|
# Count successes and failures
|
||||||
|
for result in results:
|
||||||
|
if isinstance(result, Exception):
|
||||||
|
failed_notifications += 1
|
||||||
|
else:
|
||||||
|
successful_notifications += 1
|
||||||
|
|
||||||
|
# Log completion
|
||||||
|
notification_type = "title change" if title_change else "going live"
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Completed sending {notification_type} notifications! {successful_notifications} succeeded, {failed_notifications} failed."
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _send_notification(
|
||||||
|
self, room_id: str, body_text: str, domain: str
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Send a notification to a single room.
|
||||||
|
|
||||||
|
:param room_id: The Matrix room ID to send to
|
||||||
|
:param body_text: The message body text
|
||||||
|
:param domain: The stream domain (for logging)
|
||||||
|
:return: Nothing
|
||||||
|
:raises: Exception if sending fails
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
content = TextMessageEventContent(msgtype=MessageType.TEXT, body=body_text)
|
||||||
|
await self.client.send_message(room_id, content)
|
||||||
|
except Exception as exception:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}"
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _format_message(
|
||||||
|
self, name: str, title: str, domain: str, tags: List[str], title_change: bool
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Format the notification message body.
|
||||||
|
|
||||||
|
:param name: The stream name
|
||||||
|
:param title: The stream title
|
||||||
|
:param domain: The stream domain
|
||||||
|
:param tags: List of stream tags
|
||||||
|
:param title_change: Whether this is a title change notification
|
||||||
|
:return: Formatted message body
|
||||||
|
"""
|
||||||
|
# Use name if available, fallback to domain
|
||||||
|
stream_name = name if name else domain
|
||||||
|
safe_stream_name = sanitize_for_markdown(stream_name)
|
||||||
|
|
||||||
|
# Choose message based on notification type
|
||||||
|
if title_change:
|
||||||
|
body_text = "📝 " + safe_stream_name + " has changed its stream title!"
|
||||||
|
else:
|
||||||
|
body_text = "🎥 " + safe_stream_name + " is now live!"
|
||||||
|
|
||||||
|
# Add title if present
|
||||||
|
if title != "":
|
||||||
|
safe_title = sanitize_for_markdown(title)
|
||||||
|
body_text += "\nStream Title: " + safe_title
|
||||||
|
|
||||||
|
# Add stream URL
|
||||||
|
body_text += "\n\nTo tune in, visit: https://" + domain + "/"
|
||||||
|
|
||||||
|
# Add tags if present
|
||||||
|
if tags and len(tags) > 0:
|
||||||
|
safe_tags = []
|
||||||
|
for tag in tags:
|
||||||
|
safe_tag = sanitize_for_markdown(tag)
|
||||||
|
if safe_tag and not safe_tag.startswith('.'):
|
||||||
|
safe_tags.append(safe_tag)
|
||||||
|
|
||||||
|
if safe_tags:
|
||||||
|
body_text += "\n\n"
|
||||||
|
body_text += " ".join("#" + tag for tag in safe_tags)
|
||||||
|
|
||||||
|
return body_text
|
||||||
|
|
||||||
|
def _can_notify(self, domain: str) -> bool:
|
||||||
|
"""
|
||||||
|
Check if enough time has passed to send another notification.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: True if notification can be sent, False otherwise
|
||||||
|
"""
|
||||||
|
if domain not in self.notification_timers_cache:
|
||||||
|
return True
|
||||||
|
|
||||||
|
seconds_since_last = round(time.time() - self.notification_timers_cache[domain])
|
||||||
|
return seconds_since_last >= SECONDS_BETWEEN_NOTIFICATIONS
|
||||||
|
|
||||||
|
def _record_notification(self, domain: str) -> None:
|
||||||
|
"""
|
||||||
|
Record that a notification was sent at the current time.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: Nothing
|
||||||
|
"""
|
||||||
|
self.notification_timers_cache[domain] = time.time()
|
||||||
|
|
||||||
|
async def send_cleanup_warning(self, domain: str) -> None:
|
||||||
|
"""
|
||||||
|
Send 83-day warning notification to all subscribed rooms.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: Nothing
|
||||||
|
"""
|
||||||
|
# Get all subscribed rooms
|
||||||
|
room_ids = await self.subscription_repo.get_subscribed_rooms(domain)
|
||||||
|
|
||||||
|
# Build the warning message
|
||||||
|
body_text = (
|
||||||
|
f"⚠️ Warning: Subscription Cleanup Scheduled\n\n"
|
||||||
|
f"The Owncast instance at {domain} has been unreachable for 83 days. "
|
||||||
|
f"If it remains unreachable for 7 more days (90 days total), this "
|
||||||
|
f"subscription will be automatically removed."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Send to all rooms in parallel
|
||||||
|
tasks = []
|
||||||
|
for room_id in room_ids:
|
||||||
|
tasks.append(self._send_notification(room_id, body_text, domain))
|
||||||
|
|
||||||
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
||||||
|
# Count successes and failures
|
||||||
|
successful = sum(1 for r in results if not isinstance(r, Exception))
|
||||||
|
failed = sum(1 for r in results if isinstance(r, Exception))
|
||||||
|
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Sent cleanup warning to {successful} rooms ({failed} failed)."
|
||||||
|
)
|
||||||
|
|
||||||
|
async def send_cleanup_deletion(self, domain: str) -> None:
|
||||||
|
"""
|
||||||
|
Send 90-day deletion notification to all subscribed rooms.
|
||||||
|
|
||||||
|
:param domain: The stream domain
|
||||||
|
:return: Nothing
|
||||||
|
"""
|
||||||
|
# Get all subscribed rooms
|
||||||
|
room_ids = await self.subscription_repo.get_subscribed_rooms(domain)
|
||||||
|
|
||||||
|
# Build the deletion message
|
||||||
|
body_text = (
|
||||||
|
f"🗑️ Subscription Automatically Removed\n\n"
|
||||||
|
f"The Owncast instance at {domain} has been unreachable for 90 days "
|
||||||
|
f"and has been automatically removed from subscriptions in this room.\n\n"
|
||||||
|
f"If the instance comes online again and you want to resubscribe, "
|
||||||
|
f"run `!subscribe {domain}`."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Send to all rooms in parallel
|
||||||
|
tasks = []
|
||||||
|
for room_id in room_ids:
|
||||||
|
tasks.append(self._send_notification(room_id, body_text, domain))
|
||||||
|
|
||||||
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
|
||||||
|
# Count successes and failures
|
||||||
|
successful = sum(1 for r in results if not isinstance(r, Exception))
|
||||||
|
failed = sum(1 for r in results if isinstance(r, Exception))
|
||||||
|
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Sent cleanup deletion notice to {successful} rooms ({failed} failed)."
|
||||||
|
)
|
||||||
145
owncastsentry/owncast_client.py
Normal file
145
owncastsentry/owncast_client.py
Normal file
@@ -0,0 +1,145 @@
|
|||||||
|
# Copyright 2026 Logan Fick
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
import aiohttp
|
||||||
|
import json
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from .models import StreamConfig, StreamState
|
||||||
|
from .utils import OWNCAST_STATUS_PATH, OWNCAST_CONFIG_PATH, USER_AGENT
|
||||||
|
|
||||||
|
|
||||||
|
class OwncastClient:
|
||||||
|
"""HTTP client for communicating with Owncast instances."""
|
||||||
|
|
||||||
|
def __init__(self, logger):
|
||||||
|
"""
|
||||||
|
Initialize the Owncast client with an HTTP session.
|
||||||
|
|
||||||
|
:param logger: Logger instance for debugging
|
||||||
|
"""
|
||||||
|
self.log = logger
|
||||||
|
|
||||||
|
# Set up HTTP session configuration
|
||||||
|
headers = {"User-Agent": USER_AGENT}
|
||||||
|
cookie_jar = aiohttp.DummyCookieJar()
|
||||||
|
connector = aiohttp.TCPConnector(
|
||||||
|
use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120
|
||||||
|
)
|
||||||
|
timeout = aiohttp.ClientTimeout(sock_connect=5, sock_read=5)
|
||||||
|
|
||||||
|
self.session = aiohttp.ClientSession(
|
||||||
|
headers=headers, cookie_jar=cookie_jar, timeout=timeout, connector=connector
|
||||||
|
)
|
||||||
|
|
||||||
|
async def get_stream_state(self, domain: str) -> Optional[StreamState]:
|
||||||
|
"""
|
||||||
|
Get the current stream state for a given domain.
|
||||||
|
HTTPS on port 443 is assumed, no other protocols or ports are supported.
|
||||||
|
|
||||||
|
:param domain: The domain (not URL) where the stream is hosted.
|
||||||
|
:return: A StreamState with stream state if available, None if an error occurred.
|
||||||
|
"""
|
||||||
|
self.log.debug(f"[{domain}] Fetching current stream state...")
|
||||||
|
status_url = "https://" + domain + OWNCAST_STATUS_PATH
|
||||||
|
|
||||||
|
# Make a request to the endpoint
|
||||||
|
try:
|
||||||
|
response = await self.session.request(
|
||||||
|
"GET", status_url, allow_redirects=False
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Check the response code is success
|
||||||
|
if response.status != 200:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead."
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Try and interpret the response as JSON
|
||||||
|
try:
|
||||||
|
new_state = json.loads(await response.read())
|
||||||
|
except Exception as e:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Validate the response to ensure it contains all the basic info needed to function
|
||||||
|
required_fields = [
|
||||||
|
"lastConnectTime",
|
||||||
|
"lastDisconnectTime",
|
||||||
|
"streamTitle",
|
||||||
|
"online",
|
||||||
|
]
|
||||||
|
for field in required_fields:
|
||||||
|
if field not in new_state:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have {field} parameter."
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
return StreamState.from_api_response(new_state, domain)
|
||||||
|
|
||||||
|
async def get_stream_config(self, domain: str) -> Optional[StreamConfig]:
|
||||||
|
"""
|
||||||
|
Get the current stream config for a given domain.
|
||||||
|
HTTPS on port 443 is assumed, no other protocols or ports are supported.
|
||||||
|
|
||||||
|
:param domain: The domain (not URL) where the stream is hosted.
|
||||||
|
:return: A StreamConfig with the stream's configuration, or None if fetch failed.
|
||||||
|
"""
|
||||||
|
self.log.debug(f"[{domain}] Fetching current stream config...")
|
||||||
|
config_url = "https://" + domain + OWNCAST_CONFIG_PATH
|
||||||
|
|
||||||
|
# Make a request to the endpoint
|
||||||
|
try:
|
||||||
|
response = await self.session.request(
|
||||||
|
"GET", config_url, allow_redirects=False
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Error making GET request to {OWNCAST_CONFIG_PATH}: {e}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Check the response code is success
|
||||||
|
if response.status != 200:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Response to request on {OWNCAST_CONFIG_PATH} was not 200, got {response.status} instead."
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Try and interpret the response as JSON
|
||||||
|
try:
|
||||||
|
config = json.loads(await response.read())
|
||||||
|
except Exception as e:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Rejecting response to request on {OWNCAST_CONFIG_PATH} as could not be interpreted as JSON: {e}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Create StreamConfig with validated fields
|
||||||
|
return StreamConfig.from_api_response(config)
|
||||||
|
|
||||||
|
async def validate_instance(self, domain: str) -> bool:
|
||||||
|
"""
|
||||||
|
Validate that a domain is a valid Owncast instance.
|
||||||
|
|
||||||
|
:param domain: The domain to validate
|
||||||
|
:return: True if valid Owncast instance, False otherwise
|
||||||
|
"""
|
||||||
|
state = await self.get_stream_state(domain)
|
||||||
|
return state is not None
|
||||||
|
|
||||||
|
async def close(self) -> None:
|
||||||
|
"""Close the HTTP session."""
|
||||||
|
await self.session.close()
|
||||||
295
owncastsentry/stream_monitor.py
Normal file
295
owncastsentry/stream_monitor.py
Normal file
@@ -0,0 +1,295 @@
|
|||||||
|
# Copyright 2026 Logan Fick
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
|
||||||
|
from .owncast_client import OwncastClient
|
||||||
|
from .database import StreamRepository, SubscriptionRepository
|
||||||
|
from .notification_service import NotificationService
|
||||||
|
from .models import StreamState
|
||||||
|
from .utils import (
|
||||||
|
TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN,
|
||||||
|
CLEANUP_WARNING_THRESHOLD,
|
||||||
|
CLEANUP_DELETE_THRESHOLD,
|
||||||
|
should_query_stream,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class StreamMonitor:
|
||||||
|
"""Monitors Owncast streams and detects state changes."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
owncast_client: OwncastClient,
|
||||||
|
stream_repo: StreamRepository,
|
||||||
|
notification_service: NotificationService,
|
||||||
|
logger,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Initialize the stream monitor.
|
||||||
|
|
||||||
|
:param owncast_client: Client for making API calls to Owncast instances
|
||||||
|
:param stream_repo: Repository for stream data
|
||||||
|
:param notification_service: Service for sending notifications
|
||||||
|
:param logger: Logger instance
|
||||||
|
"""
|
||||||
|
self.owncast_client = owncast_client
|
||||||
|
self.stream_repo = stream_repo
|
||||||
|
self.notification_service = notification_service
|
||||||
|
self.log = logger
|
||||||
|
|
||||||
|
# Cache for tracking when streams last went offline
|
||||||
|
self.offline_timer_cache = {}
|
||||||
|
|
||||||
|
async def update_all_streams(self, subscribed_domains: list[str]) -> None:
|
||||||
|
"""
|
||||||
|
Checks the status of all streams with active subscriptions.
|
||||||
|
Updates for all streams are performed asynchronously, with the method returning when the slowest update completes.
|
||||||
|
|
||||||
|
:param subscribed_domains: List of stream domains to update
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
self.log.debug("Updating all stream states...")
|
||||||
|
|
||||||
|
# Build a list of async tasks which update the state for each stream domain
|
||||||
|
tasks = []
|
||||||
|
for domain in subscribed_domains:
|
||||||
|
tasks.append(asyncio.create_task(self.update_stream(domain)))
|
||||||
|
|
||||||
|
# Run the tasks in parallel
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
self.log.debug("Update complete.")
|
||||||
|
|
||||||
|
async def update_stream(self, domain: str) -> None:
|
||||||
|
"""
|
||||||
|
Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
|
||||||
|
Implements progressive backoff for connection failures and auto-cleanup for dead instances.
|
||||||
|
|
||||||
|
:param domain: The domain of the stream to update.
|
||||||
|
:return: Nothing.
|
||||||
|
"""
|
||||||
|
# Fetch the current stream state from database to check failure_counter
|
||||||
|
old_state = await self.stream_repo.get_by_domain(domain)
|
||||||
|
failure_counter = old_state.failure_counter if old_state else 0
|
||||||
|
|
||||||
|
# Check if we should query this stream based on backoff schedule
|
||||||
|
if not should_query_stream(failure_counter):
|
||||||
|
# Skip this cycle, increment counter to track time passage
|
||||||
|
await self.stream_repo.increment_failure_counter(domain)
|
||||||
|
self.log.debug(
|
||||||
|
f"[{domain}] Skipping query due to backoff (counter={failure_counter + 1})"
|
||||||
|
)
|
||||||
|
# Check cleanup thresholds even when skipping query
|
||||||
|
await self._check_cleanup_thresholds(domain, failure_counter + 1)
|
||||||
|
return
|
||||||
|
|
||||||
|
# A flag indicating whether this is the first state update of a brand-new stream to avoid sending notifications if its already live.
|
||||||
|
first_update = False
|
||||||
|
|
||||||
|
# A flag indicating whether to update the stream's state in the database.
|
||||||
|
# Used to avoid writing to the database when a stream's state hasn't changed at all.
|
||||||
|
update_database = False
|
||||||
|
|
||||||
|
# Holds the stream's latest configuration, if fetched and deemed necessary during the update process.
|
||||||
|
stream_config = None
|
||||||
|
|
||||||
|
# Fetch the latest stream state from the server
|
||||||
|
new_state = await self.owncast_client.get_stream_state(domain)
|
||||||
|
|
||||||
|
# If the fetch failed, increment failure counter and skip the update
|
||||||
|
if new_state is None:
|
||||||
|
await self.stream_repo.increment_failure_counter(domain)
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Connection failure (counter={failure_counter + 1})"
|
||||||
|
)
|
||||||
|
# Check cleanup thresholds after connection failure
|
||||||
|
await self._check_cleanup_thresholds(domain, failure_counter + 1)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Fetch succeeded! Reset failure counter
|
||||||
|
await self.stream_repo.reset_failure_counter(domain)
|
||||||
|
|
||||||
|
# Fix possible race conditions with timers
|
||||||
|
if domain not in self.offline_timer_cache:
|
||||||
|
self.offline_timer_cache[domain] = 0
|
||||||
|
if domain not in self.notification_service.notification_timers_cache:
|
||||||
|
self.notification_service.notification_timers_cache[domain] = 0
|
||||||
|
|
||||||
|
# Does the last known stream state not have a value for the last connect and disconnect time?
|
||||||
|
if (
|
||||||
|
old_state.last_connect_time is None
|
||||||
|
and old_state.last_disconnect_time is None
|
||||||
|
):
|
||||||
|
# Yes, this is the first update. Don't send any notifications.
|
||||||
|
update_database = True
|
||||||
|
first_update = True
|
||||||
|
|
||||||
|
# Does the latest stream state have a last connect time and the old state not have one?
|
||||||
|
if (
|
||||||
|
new_state.last_connect_time is not None
|
||||||
|
and old_state.last_connect_time is None
|
||||||
|
):
|
||||||
|
# Yes! This stream is now live.
|
||||||
|
update_database = True
|
||||||
|
stream_config = await self.owncast_client.get_stream_config(domain)
|
||||||
|
|
||||||
|
self.log.info(f"[{domain}] Stream is now live!")
|
||||||
|
|
||||||
|
# Calculate how many seconds since the stream last went offline
|
||||||
|
seconds_since_last_offline = round(
|
||||||
|
time.time() - self.offline_timer_cache[domain]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Have we queried this stream before? (In other words, is this not the first state update ever?)
|
||||||
|
if not first_update:
|
||||||
|
# Use fallback values if config fetch failed
|
||||||
|
stream_name = stream_config.name if stream_config else domain
|
||||||
|
stream_tags = stream_config.tags if stream_config else []
|
||||||
|
|
||||||
|
# Yes. Has this stream been offline for a short amount of time?
|
||||||
|
if seconds_since_last_offline < TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN:
|
||||||
|
# Yes. Did the stream title change?
|
||||||
|
if old_state.title != new_state.title:
|
||||||
|
# Yes. The stream was only down for a short time, send a special notification indicating the stream changed its name.
|
||||||
|
await self.notification_service.notify_stream_live(
|
||||||
|
domain,
|
||||||
|
stream_name,
|
||||||
|
new_state.title,
|
||||||
|
stream_tags,
|
||||||
|
title_change=True,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# No. The stream was only down for a short time and didn't change its title. Don't send a notification.
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Not sending notifications. Stream was only offline for {seconds_since_last_offline} of {TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN} seconds and did not change its title."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# This stream has been offline for a while. Send a normal notification.
|
||||||
|
await self.notification_service.notify_stream_live(
|
||||||
|
domain,
|
||||||
|
stream_name,
|
||||||
|
new_state.title,
|
||||||
|
stream_tags,
|
||||||
|
title_change=False,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# No, this is the first time we're querying
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Not sending notifications. This is the first state update for this stream."
|
||||||
|
)
|
||||||
|
|
||||||
|
if (
|
||||||
|
new_state.last_connect_time is not None
|
||||||
|
and old_state.last_connect_time is not None
|
||||||
|
):
|
||||||
|
# Did the stream title change mid-session?
|
||||||
|
if old_state.title != new_state.title:
|
||||||
|
self.log.info(f"[{domain}] Stream title was changed!")
|
||||||
|
update_database = True
|
||||||
|
stream_config = await self.owncast_client.get_stream_config(domain)
|
||||||
|
|
||||||
|
# Use fallback values if config fetch failed
|
||||||
|
stream_name = stream_config.name if stream_config else domain
|
||||||
|
stream_tags = stream_config.tags if stream_config else []
|
||||||
|
|
||||||
|
# This is a fun case to account for... Let's try and explain this.
|
||||||
|
# Was the last time this stream sent a notification before it last went offline?
|
||||||
|
if (
|
||||||
|
self.offline_timer_cache[domain]
|
||||||
|
> self.notification_service.notification_timers_cache[domain]
|
||||||
|
):
|
||||||
|
# Yes. Send a regular go live notification.
|
||||||
|
# Why? A title change notification could be confusing to users in this case.
|
||||||
|
# How? If a stream goes offline before its next allowed notification, it'll get rate limited. If it then changes its title, this part of the code will send a title change notification. This can be a little confusing, so override to a normal go live notification in this case.
|
||||||
|
await self.notification_service.notify_stream_live(
|
||||||
|
domain,
|
||||||
|
stream_name,
|
||||||
|
new_state.title,
|
||||||
|
stream_tags,
|
||||||
|
title_change=False,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# No. Send a normal title change notification.
|
||||||
|
await self.notification_service.notify_stream_live(
|
||||||
|
domain,
|
||||||
|
stream_name,
|
||||||
|
new_state.title,
|
||||||
|
stream_tags,
|
||||||
|
title_change=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Does the latest stream state no longer have a last connect time but the old state does?
|
||||||
|
elif (
|
||||||
|
new_state.last_connect_time is None
|
||||||
|
and old_state.last_connect_time is not None
|
||||||
|
):
|
||||||
|
# Yep. This stream is now offline. Log it.
|
||||||
|
update_database = True
|
||||||
|
self.offline_timer_cache[domain] = time.time()
|
||||||
|
if first_update:
|
||||||
|
self.log.info(f"[{domain}] Stream is offline.")
|
||||||
|
else:
|
||||||
|
self.log.info(f"[{domain}] Stream is now offline.")
|
||||||
|
|
||||||
|
# Update the database with the current stream state, if needed.
|
||||||
|
if update_database:
|
||||||
|
# Ensure we have the stream config before updating the database
|
||||||
|
if stream_config is None:
|
||||||
|
stream_config = await self.owncast_client.get_stream_config(domain)
|
||||||
|
|
||||||
|
# Use fallback value if config fetch failed
|
||||||
|
stream_name = stream_config.name if stream_config else ""
|
||||||
|
|
||||||
|
self.log.debug(f"[{domain}] Updating stream state in database...")
|
||||||
|
|
||||||
|
# Create updated state object (title already truncated in new_state)
|
||||||
|
updated_state = StreamState(
|
||||||
|
domain=domain,
|
||||||
|
name=stream_name,
|
||||||
|
title=new_state.title,
|
||||||
|
last_connect_time=new_state.last_connect_time,
|
||||||
|
last_disconnect_time=new_state.last_disconnect_time,
|
||||||
|
)
|
||||||
|
|
||||||
|
await self.stream_repo.update(updated_state)
|
||||||
|
|
||||||
|
# All done.
|
||||||
|
self.log.debug(f"[{domain}] State update completed.")
|
||||||
|
|
||||||
|
async def _check_cleanup_thresholds(self, domain: str, counter: int) -> None:
|
||||||
|
"""
|
||||||
|
Check if a domain has hit cleanup warning or deletion thresholds.
|
||||||
|
|
||||||
|
:param domain: The domain to check
|
||||||
|
:param counter: The current failure counter value
|
||||||
|
:return: Nothing
|
||||||
|
"""
|
||||||
|
# Check for 83-day warning threshold
|
||||||
|
if counter == CLEANUP_WARNING_THRESHOLD:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Reached 83-day warning threshold. Sending cleanup warning."
|
||||||
|
)
|
||||||
|
await self.notification_service.send_cleanup_warning(domain)
|
||||||
|
|
||||||
|
# Check for 90-day deletion threshold
|
||||||
|
if counter >= CLEANUP_DELETE_THRESHOLD:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Reached 90-day deletion threshold. Removing all subscriptions."
|
||||||
|
)
|
||||||
|
# Send deletion notification
|
||||||
|
await self.notification_service.send_cleanup_deletion(domain)
|
||||||
|
|
||||||
|
# Delete all subscriptions for this domain
|
||||||
|
subscription_repo = SubscriptionRepository(self.stream_repo.db)
|
||||||
|
deleted_count = await subscription_repo.delete_all_for_domain(domain)
|
||||||
|
|
||||||
|
# Delete the stream record
|
||||||
|
await self.stream_repo.delete(domain)
|
||||||
|
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Cleanup complete. Deleted {deleted_count} subscriptions and stream record."
|
||||||
|
)
|
||||||
185
owncastsentry/utils.py
Normal file
185
owncastsentry/utils.py
Normal file
@@ -0,0 +1,185 @@
|
|||||||
|
# Copyright 2026 Logan Fick
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||||
|
|
||||||
|
import re
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
# Path to the GetStatus API call on Owncast instances
|
||||||
|
OWNCAST_STATUS_PATH = "/api/status"
|
||||||
|
|
||||||
|
# Path to GetWebConfig API call on Owncast instances
|
||||||
|
OWNCAST_CONFIG_PATH = "/api/config"
|
||||||
|
|
||||||
|
# User agent to send with all HTTP requests.
|
||||||
|
USER_AGENT = (
|
||||||
|
"OwncastSentry/1.1.0 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Hard minimum amount of time between when notifications can be sent for a stream. Prevents spamming notifications for glitchy or malicious streams.
|
||||||
|
SECONDS_BETWEEN_NOTIFICATIONS = 20 * 60 # 20 minutes in seconds
|
||||||
|
|
||||||
|
# I'm not sure the best way to name or explain this variable, so let's just say what uses it:
|
||||||
|
#
|
||||||
|
# After a stream goes offline, a timer is started. Then, ...
|
||||||
|
# - If a stream comes back online with the same title within this time, no notification is sent.
|
||||||
|
# - If a stream comes back online with a different title, a rename notification is sent.
|
||||||
|
# - If this time period passes entirely and a stream comes back online after, it's treated as regular going live.
|
||||||
|
TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN = 7 * 60 # 7 minutes in seconds
|
||||||
|
|
||||||
|
# Counter thresholds for auto-cleanup (based on 60-second polling intervals)
|
||||||
|
CLEANUP_WARNING_THRESHOLD = 83 * 24 * 60 # 119,520 cycles = 83 days
|
||||||
|
CLEANUP_DELETE_THRESHOLD = 90 * 24 * 60 # 129,600 cycles = 90 days
|
||||||
|
|
||||||
|
# Failure counter threshold for treating stream status as "unknown"
|
||||||
|
UNKNOWN_STATUS_THRESHOLD = 15
|
||||||
|
|
||||||
|
# Maximum field lengths based on Owncast's configuration
|
||||||
|
# Source: https://github.com/owncast/owncast/blob/master/web/utils/config-constants.tsx
|
||||||
|
MAX_INSTANCE_TITLE_LENGTH = 255 # Server Name (line 81)
|
||||||
|
MAX_STREAM_TITLE_LENGTH = 100 # Stream Title (line 91)
|
||||||
|
MAX_TAG_LENGTH = 24 # Per tag (line 208)
|
||||||
|
|
||||||
|
|
||||||
|
def should_query_stream(failure_counter: int) -> bool:
|
||||||
|
"""
|
||||||
|
Determine if a stream should be queried based on its failure counter.
|
||||||
|
Implements progressive backoff: 60s (5min) -> 2min (5min) -> 3min (5min) -> 5min (15min) -> 15min.
|
||||||
|
|
||||||
|
:param failure_counter: The current failure counter value
|
||||||
|
:return: True if the stream should be queried this cycle, False otherwise
|
||||||
|
"""
|
||||||
|
if failure_counter <= 4:
|
||||||
|
# Query every 60s for first 5 minutes (counters 0-4)
|
||||||
|
return True
|
||||||
|
elif failure_counter <= 9:
|
||||||
|
# Query every 2 minutes for next 5 minutes (counters 5-9)
|
||||||
|
return (failure_counter * 60) % 120 == 0
|
||||||
|
elif failure_counter <= 14:
|
||||||
|
# Query every 3 minutes for next 5 minutes (counters 10-14)
|
||||||
|
return (failure_counter * 60) % 180 == 0
|
||||||
|
elif failure_counter <= 29:
|
||||||
|
# Query every 5 minutes for next 15 minutes (counters 15-29)
|
||||||
|
return (failure_counter * 60) % 300 == 0
|
||||||
|
else:
|
||||||
|
# Query every 15 minutes after 30 minutes (counter 30+)
|
||||||
|
return (failure_counter * 60) % 900 == 0
|
||||||
|
|
||||||
|
|
||||||
|
def domainify(url: str) -> str:
|
||||||
|
"""
|
||||||
|
Extract and sanitize a domain from user input.
|
||||||
|
|
||||||
|
Handles URLs, bare domains, and email-style input (user@domain).
|
||||||
|
Only allows valid domain characters (alphanumeric, hyphens, periods).
|
||||||
|
|
||||||
|
:param url: URL, domain, or email-style string
|
||||||
|
:return: Sanitized domain
|
||||||
|
"""
|
||||||
|
# Handle email-style format first (e.g., "notify@stream.logal.dev")
|
||||||
|
if "@" in url:
|
||||||
|
url = url.split("@")[-1]
|
||||||
|
|
||||||
|
# Prepend // if no scheme so urlparse treats input as netloc
|
||||||
|
if not url.startswith(('http://', 'https://', '//')):
|
||||||
|
url = '//' + url
|
||||||
|
|
||||||
|
parsed = urlparse(url)
|
||||||
|
domain = (parsed.netloc or parsed.path).lower()
|
||||||
|
|
||||||
|
# Strip port and path
|
||||||
|
domain = domain.split(':')[0].split('/')[0]
|
||||||
|
|
||||||
|
# Allow only valid domain characters
|
||||||
|
return re.sub(r'[^a-z0-9.-]', '', domain).strip('.-')
|
||||||
|
|
||||||
|
|
||||||
|
def truncate(text: str, max_length: int) -> str:
|
||||||
|
"""
|
||||||
|
Truncate text to a maximum length.
|
||||||
|
|
||||||
|
:param text: The text to truncate
|
||||||
|
:param max_length: Maximum allowed length
|
||||||
|
:return: Truncated text, or original if within limit
|
||||||
|
"""
|
||||||
|
if not text or len(text) <= max_length:
|
||||||
|
return text
|
||||||
|
return text[:max_length]
|
||||||
|
|
||||||
|
|
||||||
|
def escape_markdown(text: str) -> str:
|
||||||
|
"""
|
||||||
|
Escape Markdown special characters to prevent injection attacks.
|
||||||
|
|
||||||
|
This function sanitizes untrusted external input (like stream names and titles)
|
||||||
|
before embedding them in Markdown-formatted messages. It prevents malicious
|
||||||
|
actors from injecting arbitrary Markdown/HTML content.
|
||||||
|
|
||||||
|
:param text: The text to escape
|
||||||
|
:return: The escaped text safe for Markdown rendering
|
||||||
|
"""
|
||||||
|
if not text:
|
||||||
|
return text
|
||||||
|
|
||||||
|
# Escape Markdown special characters by prefixing with backslash
|
||||||
|
# Covers: formatting (*_~`), links ([]()), headings (#), lists (-+),
|
||||||
|
# blockquotes (>), code blocks (```), and other special characters
|
||||||
|
special_chars = {
|
||||||
|
'\\': '\\\\', # Backslash must be first to avoid double-escaping
|
||||||
|
'*': '\\*',
|
||||||
|
'_': '\\_',
|
||||||
|
'[': '\\[',
|
||||||
|
']': '\\]',
|
||||||
|
'(': '\\(',
|
||||||
|
')': '\\)',
|
||||||
|
'~': '\\~',
|
||||||
|
'`': '\\`',
|
||||||
|
'#': '\\#',
|
||||||
|
'+': '\\+',
|
||||||
|
'-': '\\-',
|
||||||
|
'=': '\\=',
|
||||||
|
'|': '\\|',
|
||||||
|
'{': '\\{',
|
||||||
|
'}': '\\}',
|
||||||
|
'.': '\\.',
|
||||||
|
'!': '\\!',
|
||||||
|
'<': '\\<',
|
||||||
|
'>': '\\>',
|
||||||
|
'&': '\\&',
|
||||||
|
}
|
||||||
|
|
||||||
|
escaped_text = text
|
||||||
|
for char, replacement in special_chars.items():
|
||||||
|
escaped_text = escaped_text.replace(char, replacement)
|
||||||
|
|
||||||
|
return escaped_text
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize_for_markdown(text: str) -> str:
|
||||||
|
"""
|
||||||
|
Sanitize text for safe Markdown rendering.
|
||||||
|
|
||||||
|
Removes newlines, normalizes whitespace, and escapes Markdown special characters.
|
||||||
|
Use this for any untrusted external content before embedding in Markdown messages.
|
||||||
|
|
||||||
|
Note: This function does not truncate. Size limits should be enforced at the
|
||||||
|
model layer (e.g., in from_api_response methods).
|
||||||
|
|
||||||
|
:param text: The text to sanitize
|
||||||
|
:return: Sanitized and escaped text safe for Markdown rendering
|
||||||
|
"""
|
||||||
|
if not text:
|
||||||
|
return text
|
||||||
|
|
||||||
|
# Remove newlines and carriage returns to prevent multi-line injection
|
||||||
|
sanitized = text.replace('\n', ' ').replace('\r', ' ')
|
||||||
|
|
||||||
|
# Collapse multiple spaces into single space
|
||||||
|
sanitized = ' '.join(sanitized.split())
|
||||||
|
|
||||||
|
# Escape Markdown special characters
|
||||||
|
sanitized = escape_markdown(sanitized)
|
||||||
|
|
||||||
|
return sanitized
|
||||||
Reference in New Issue
Block a user