Initial commit.

This commit is contained in:
2025-01-01 14:14:02 -05:00
commit f3a5177ed6
3 changed files with 563 additions and 0 deletions

176
LICENSE.txt Normal file
View File

@@ -0,0 +1,176 @@
Apache License
Version 2.0, January 2004
https://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS

9
maubot.yaml Normal file
View File

@@ -0,0 +1,9 @@
maubot: 0.1.0
id: dev.logal.owncastsentry
version: 1.0.0
license: Apache
modules:
- owncastsentry
main_class: OwncastSentry
database: true
database_type: asyncpg

378
owncastsentry.py Normal file
View File

@@ -0,0 +1,378 @@
# Copyright 2025 Logan Fick
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
import sqlite3
import aiohttp
import json
import asyncio
from maubot import Plugin, MessageEvent
from maubot.handlers import command
from mautrix.types import TextMessageEventContent, MessageType
from mautrix.util.async_db import UpgradeTable, Connection
from urllib.parse import urlparse
# Path to the GetStatus API call on Owncast instances
OWNCAST_STATUS_PATH = "/api/status"
# User agent to send with all HTTP requests.
USER_AGENT = "OwncastSentry/1.0.0 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
# ===== DATABASE MIGRATIONS =====
upgrade_table = UpgradeTable()
@upgrade_table.register(description="Initial revision")
async def upgrade_v1(conn: Connection) -> None:
"""
Runs migrations to upgrade database schema to verison 1 format.
Version 1 is the initial format of the database.
:param conn: A connection to run the v1 database migration on.
:return: Nothing.
"""
await conn.execute(
"""CREATE TABLE "streams" (
"domain" TEXT NOT NULL UNIQUE,
"name" TEXT,
"title" TEXT,
"last_connect_time" TEXT,
"last_disconnect_time" TEXT,
PRIMARY KEY("domain")
)"""
)
await conn.execute(
"""CREATE TABLE "subscriptions" (
"stream_domain" INTEGER NOT NULL,
"room_id" TEXT NOT NULL,
UNIQUE("room_id","stream_domain")
)"""
)
# ===== MAIN BOT CLASS =====
class OwncastSentry(Plugin):
# Helper variables for setting up a special HTTP ClientSession for monitoring streams.
headers = {"User-Agent": USER_AGENT} # Override User Agent to special static value to identify to servers our purpose.
cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies.
baseconnctor = aiohttp.TCPConnector(use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120) # Set up timeouts to keep one long-lived connection open per monitored stream.
timeouts = aiohttp.ClientTimeout(sock_connect=5, sock_read=5) # Tighten up timeouts for faster responses to users in case of broken servers.
# Final HTTP ClientSession for all requests to Owncast instances.
session = aiohttp.ClientSession(headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor)
@classmethod
def get_db_upgrade_table(cls) -> UpgradeTable | None:
"""
Helper method for telling Maubot about our database migrations.
:return: An UpgradeTable with our registered migrations.
"""
return upgrade_table
async def start(self) -> None:
"""
Method called by Maubot upon startup of the instance.
Registers a recurring task every minute to update the state of all subscribed streams.
:return: Nothing.
"""
self.sched.run_periodically(60, self.update_all_stream_states)
@command.new(help="Subscribes to a new Owncast stream.")
@command.argument("url")
async def subscribe(self, evt: MessageEvent, url: str) -> None:
"""
"!subscribe" command handler for users to subscribe a room to a given stream's notifications.
:param evt: MessageEvent of the message calling the command.
:param url: A string containing the user supplied URL to a stream to try and subscribe to.
:return: Nothing.
"""
# Convert the semi-unpredictable user input to only a domain and verify it's an Owncast stream.
stream_domain = await self.validate_url_as_owncast_stream(url)
# Did the validation return an empty string?
if stream_domain == "":
# Yes, it's not valid. Tell the user and give up.
await evt.reply("The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline.")
return
# Try to add a new subscription for the requested stream domain in the room the command executed in.
try:
query = "INSERT INTO subscriptions (stream_domain, room_id) VALUES ($1, $2)"
async with self.database.acquire() as connection:
await connection.execute(query, stream_domain, evt.room_id)
except sqlite3.IntegrityError as exception:
# Something weird happened... Was it due to attempting to insert a duplicate row?
if "UNIQUE constraint failed" in exception.args[0]:
# Yes, this is an expected condition. Tell the user the room is already subscribed and give up.
await evt.reply("This room is already subscribed to notifications for " + stream_domain + ".")
return
else:
# Nope... Something unexpected happened. Give up.
self.log.error(f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}")
raise exception
# The subscription was successfully added! Try to add a placeholder row for the stream's state in the streams table.
try:
query = "INSERT INTO streams (domain) VALUES ($1)"
async with self.database.acquire() as connection:
await connection.execute(query, stream_domain)
# The insert was successful, so this is the first time we're seeing this stream. Log it.
self.log.info(f"[{stream_domain}] Discovered new stream!")
except sqlite3.IntegrityError as exception:
# Attempts to add rows for streams already known is an expected condition. What is anything except that?
if "UNIQUE constraint failed" not in exception.args[0]:
# Something unexpected happened. Give up.
self.log.error(f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}")
raise exception
# All went well! We added a new subscription and (at least tried) to add a row for the stream state. Tell the user.
self.log.info(f"[{stream_domain}] Subscription added for room {evt.room_id}.")
await evt.reply("Subscription added! This room will receive notifications when " + stream_domain + " goes live.")
@command.new(help="Unsubscribes from an Owncast stream.")
@command.argument("url")
async def unsubscribe(self, evt: MessageEvent, url: str) -> None:
"""
"!unsubscribe" command handler for users to unsubscribe a room from a given stream's notifications.
:param evt: MessageEvent of the message calling the command.
:param url: A string containing the user supplied URL to a stream to try and unsubscribe from.
:return: Nothing.
"""
# Convert the user input to only a domain and verify it's an Owncast stream.
stream_domain = await self.validate_url_as_owncast_stream(url)
# Did the validation return an empty string?
if stream_domain == "":
# Yes, it's not valid. Tell the user and give up.
await evt.reply("The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline.")
return
# Attempt to delete the requested subscription from the database.
query = "DELETE FROM subscriptions WHERE stream_domain=$1 AND room_id=$2"
async with self.database.acquire() as connection:
result = await connection.execute(query, stream_domain, evt.room_id)
# Did it work?
if result.rowcount == 1:
# Yes, one row was deleted. Tell the user.
self.log.info(f"[{stream_domain}] Subscription removed for room {evt.room_id}.")
await evt.reply("Subscription removed! This room will no longer receive notifications for " + stream_domain + ".")
elif result.rowcount == 0:
# No, nothing changed. Tell the user.
await evt.reply("This room is already not subscribed to notifications for " + stream_domain + ".")
else:
# Somehow more than 1 (or even less than 0 ???) rows were changed... Log it!
self.log.error("Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got " + result.rowcount + " instead. Something very bad may have happened!!!!")
async def update_all_stream_states(self) -> None:
"""
Checks the status of all streams with active subscriptions.
Updates for all streams are performed asynchronously, with the method returning when the slowest update completes.
:return: Nothing.
"""
self.log.debug("Updating all stream states...")
# Make a list of all stream domains with an active subscription.
query = "SELECT DISTINCT stream_domain FROM subscriptions"
async with self.database.acquire() as connection:
results = await connection.fetch(query)
# Build a list of async tasks which update the state for each stream domain fetched.
tasks = []
for row in results:
domain = row["stream_domain"]
tasks.append(asyncio.create_task(self.update_stream_state(domain)))
# Run the tasks in parallel.
await asyncio.gather(*tasks)
self.log.debug("Update complete.")
async def update_stream_state(self, domain: str) -> None:
"""
Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
:param domain: The domain of the stream to update.
:return: Nothing.
"""
# A flag indicating whether to send a notification.
# Used for the first state update of a brand-new stream to avoid sending notifications if its already live.
send_notifications = True
# Fetch the latest stream state from the server.
new_state = await self.get_current_stream_state(domain)
# Skip the update if the fetch failed for any reason.
if new_state == {}:
return
# Fetch the last known stream state from the database.
query = "SELECT last_connect_time, last_disconnect_time FROM streams WHERE domain=$1"
async with self.database.acquire() as connection:
old_state = await connection.fetchrow(query, domain)
# Does the last known stream state not have a value for the last connect and disconnect time?
if old_state["last_connect_time"] is None and old_state["last_disconnect_time"] is None:
# Yes, this is the first update. Don't send any notifications.
send_notifications = False
# Does the latest stream state have a last connect time and the old state not have one?
if new_state["lastConnectTime"] is not None and old_state["last_connect_time"] is None:
# Yes! This stream is now live. Send notifications and log it, if allowed.
if send_notifications:
self.log.info(f"[{domain}] Stream is now live! Notifying subscribed rooms...")
await self.notify_rooms_of_stream_online(domain, new_state["streamTitle"])
else:
self.log.info(f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms.")
# Does the latest stream state no longer have a last connect time but the old state does?
elif new_state["lastConnectTime"] is None and old_state["last_connect_time"] is not None:
# Yep. This stream is now offline. Log it.
if send_notifications:
self.log.info(f"[{domain}] Stream is now offline.")
else:
self.log.info(f"[{domain}] Stream is offline.")
# Update the database with the current stream state.
# TODO: Only update the database if a change actually occurred. This is probably generating useless writes.
update_query = "UPDATE streams SET name=$1, last_connect_time=$2, last_disconnect_time=$3 WHERE domain=$4"
async with self.database.acquire() as connection:
await connection.execute(update_query, new_state["streamTitle"], new_state["lastConnectTime"], new_state["lastDisconnectTime"], domain)
# All done.
self.log.debug(f"[{domain}] State update completed.")
async def notify_rooms_of_stream_online(self, domain: str, title: str) -> None:
"""
Sends notifications to rooms with subscriptions to the provided stream domain.
:param domain: The domain of the stream to send notifications for.
:param title: The title of the stream to include in the message.
:return: Nothing.
"""
# Get a list of room IDs with active subscriptions to the stream domain.
query = "SELECT room_id FROM subscriptions WHERE stream_domain=$1"
async with self.database.acquire() as connection:
results = await connection.fetch(query, domain)
# Set up some counters for a high level statistics.
successful_notifications = 0
failed_notifications = 0
# Iterate over the subscribed rooms and try to send a message to each.
# TODO: This should probably be made async.
body_text = "🎥 " + domain + " is now live!\nStream Title: " + title +"\n\nTo tune in, visit: https://" + domain + "/"
for rows in results:
room_id = rows["room_id"]
try:
# Try and send a message.
content = TextMessageEventContent(msgtype=MessageType.TEXT, body=body_text)
await self.client.send_message(room_id, content)
# It worked! Increment the counter.
successful_notifications += 1
except Exception as exception:
# Something didn't work. Log it and move on to the next one.
self.log.warning(f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}")
failed_notifications +=1
# All done!
self.log.info(f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed.")
# ========== HELPER METHODS ==========
async def get_current_stream_state(self, domain):
"""
Get the current stream state for a given domain.
HTTPS on port 443 is assumed, no other protocols or ports are supported.
:param domain: The domain (not URL) where the stream is hosted.
:return: A dictionary containing the lastConnectTime, lastDisconnectTime, streamTitle, and online values if available. An empty dictionary if an error occurred.
"""
self.log.debug(f"[{domain}] Fetching current stream state...")
# Build a URL to the status API in Owncast. (https://owncast.online/api/latest/#tag/Internal/operation/GetStatus)
# Only use HTTPS, even if the user specified something else.
status_url = "https://" + domain + OWNCAST_STATUS_PATH
# Make a request to the endpoint.
try:
response = await self.session.request("GET", status_url, allow_redirects=False)
except Exception as e:
self.log.warning(f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}")
return {}
# Check the response code is success
# TODO: Handle 429 rate limiting?
if response.status != 200:
self.log.warning(f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead.")
return {}
# Try and interpret the response as JSON.
try:
new_state = json.loads(await response.read())
except Exception as e:
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}")
return {}
# Validate the response to ensure it contains all the basic info needed to function.
if "lastConnectTime" not in new_state:
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter.")
return {}
elif "lastDisconnectTime" not in new_state:
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter.")
return {}
elif "streamTitle" not in new_state:
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter.")
return {}
elif "online" not in new_state:
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter.")
return {}
return new_state
async def validate_url_as_owncast_stream(self, url) -> str:
"""
Take a given URL and validate its domain is a valid Owncast stream.
:param url: A URL with the domain to check for an Owncast stream.
:return: A string with just the domain if it contains an Owncast stream, or an empty string if an error occurred or is otherwise not valid.
"""
# Take whatever input the user provided and try to turn it into just the domain.
# Examples:
# "stream.logal.dev" -> "stream.logal.dev"
# "https://stream.logal.dev" -> "stream.logal.dev"
# "stream.logal.dev/embed/chat/readwrite" -> "stream.logal.dev"
# "https://stream.logal.dev/abcdefghijklmno/123456789" -> "stream.logal.dev
parsed_url = urlparse(url)
domain = (parsed_url.netloc or parsed_url.path).lower()
# Try to fetch the current status of the stream at the given domain.
stream_state = await self.get_current_stream_state(domain)
# The above method does all the checking. If the length of the output dictionary is more than 0, it should be valid. Otherwise, pass an empty string to say it's invalid.
if len(stream_state) > 0:
return domain
else:
return ""