8 Commits

Author SHA1 Message Date
e73b2b63a1 Updated version number in plugin metadata for v1.0.2 release.
All checks were successful
Build Maubot Plugin Artifact / Build (push) Successful in 3s
Lint Source Code / Lint (push) Successful in 8s
2025-04-05 17:55:50 -04:00
cae80b8319 Reformatted previous commit using Black.
All checks were successful
Build Maubot Plugin Artifact / Build (push) Successful in 2s
Lint Source Code / Lint (push) Successful in 7s
2025-04-05 17:46:34 -04:00
815b246ad8 Fixed userinfo component of URLs not being filtered out. (Closes #5)
Some checks failed
Build Maubot Plugin Artifact / Build (push) Successful in 2s
Lint Source Code / Lint (push) Failing after 8s
2025-04-05 17:42:36 -04:00
c4156f861e Reformatted source code using Black.
All checks were successful
Build Maubot Plugin Artifact / Build (push) Successful in 2s
Lint Source Code / Lint (push) Successful in 7s
2025-04-05 17:11:53 -04:00
78d2c5a03f Fixed notifications containing extra blank line for streams with no tags. (Closes #4)
Some checks failed
Build Maubot Plugin Artifact / Build (push) Successful in 3s
Lint Source Code / Lint (push) Failing after 8s
2025-04-05 17:05:13 -04:00
aad810d710 Added Gitea Actions workflow for building plugin artifacts.
Some checks failed
Build Maubot Plugin Artifact / Build (push) Successful in 2s
Lint Source Code / Lint (push) Failing after 8s
2025-04-05 10:56:46 -04:00
bb9d360d8f Added Gitea Actions workflow for linting source code. 2025-04-05 10:37:56 -04:00
60f8b21c16 Added display of stream name and tags to notifications. 2025-03-29 14:18:02 -04:00
4 changed files with 227 additions and 49 deletions

View File

@@ -0,0 +1,20 @@
name: Build Maubot Plugin Artifact
on: [push]
jobs:
build:
name: Build
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Build artifact
run: zip -v -9 OwncastSentry-v${{env.GITHUB_SHA}}.mbp LICENSE.txt maubot.yaml owncastsentry.py
- name: Upload artifact
uses: actions/upload-artifact@v3
with:
name: Maubot Plugin
path: OwncastSentry-v${{env.GITHUB_SHA}}.mbp

19
.gitea/workflows/lint.yml Normal file
View File

@@ -0,0 +1,19 @@
name: Lint Source Code
on: [push]
jobs:
lint:
name: Lint
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Setup Python environment
uses: actions/setup-python@v5
with:
python-version: 3.13
- name: Run linter
uses: psf/black@stable

View File

@@ -1,6 +1,6 @@
maubot: 0.1.0 maubot: 0.1.0
id: dev.logal.owncastsentry id: dev.logal.owncastsentry
version: 1.0.1 version: 1.0.2
license: Apache-2.0 license: Apache-2.0
modules: modules:
- owncastsentry - owncastsentry

View File

@@ -20,12 +20,19 @@ from urllib.parse import urlparse
# Path to the GetStatus API call on Owncast instances # Path to the GetStatus API call on Owncast instances
OWNCAST_STATUS_PATH = "/api/status" OWNCAST_STATUS_PATH = "/api/status"
# Path to GetWebConfig API call on Owncast instances
OWNCAST_CONFIG_PATH = "/api/config"
# User agent to send with all HTTP requests. # User agent to send with all HTTP requests.
USER_AGENT = "OwncastSentry/1.0.1 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)" USER_AGENT = (
"OwncastSentry/1.0.2 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
)
# ===== DATABASE MIGRATIONS ===== # ===== DATABASE MIGRATIONS =====
upgrade_table = UpgradeTable() upgrade_table = UpgradeTable()
@upgrade_table.register(description="Initial revision") @upgrade_table.register(description="Initial revision")
async def upgrade_v1(conn: Connection) -> None: async def upgrade_v1(conn: Connection) -> None:
""" """
@@ -54,17 +61,25 @@ async def upgrade_v1(conn: Connection) -> None:
)""" )"""
) )
# ===== MAIN BOT CLASS ===== # ===== MAIN BOT CLASS =====
class OwncastSentry(Plugin): class OwncastSentry(Plugin):
# Helper variables for setting up a special HTTP ClientSession for monitoring streams. # Helper variables for setting up a special HTTP ClientSession for monitoring streams.
headers = {"User-Agent": USER_AGENT} # Override User Agent to special static value to identify to servers our purpose. headers = {
"User-Agent": USER_AGENT
} # Override User Agent to special static value to identify to servers our purpose.
cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies. cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies.
baseconnctor = aiohttp.TCPConnector(use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120) # Set up timeouts to keep one long-lived connection open per monitored stream. baseconnctor = aiohttp.TCPConnector(
timeouts = aiohttp.ClientTimeout(sock_connect=5, sock_read=5) # Tighten up timeouts for faster responses to users in case of broken servers. use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120
) # Set up timeouts to keep one long-lived connection open per monitored stream.
timeouts = aiohttp.ClientTimeout(
sock_connect=5, sock_read=5
) # Tighten up timeouts for faster responses to users in case of broken servers.
# Final HTTP ClientSession for all requests to Owncast instances. # Final HTTP ClientSession for all requests to Owncast instances.
session = aiohttp.ClientSession(headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor) session = aiohttp.ClientSession(
headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor
)
@classmethod @classmethod
def get_db_upgrade_table(cls) -> UpgradeTable | None: def get_db_upgrade_table(cls) -> UpgradeTable | None:
@@ -75,7 +90,6 @@ class OwncastSentry(Plugin):
""" """
return upgrade_table return upgrade_table
async def start(self) -> None: async def start(self) -> None:
""" """
Method called by Maubot upon startup of the instance. Method called by Maubot upon startup of the instance.
@@ -85,7 +99,6 @@ class OwncastSentry(Plugin):
""" """
self.sched.run_periodically(60, self.update_all_stream_states) self.sched.run_periodically(60, self.update_all_stream_states)
@command.new(help="Subscribes to a new Owncast stream.") @command.new(help="Subscribes to a new Owncast stream.")
@command.argument("url") @command.argument("url")
async def subscribe(self, evt: MessageEvent, url: str) -> None: async def subscribe(self, evt: MessageEvent, url: str) -> None:
@@ -108,10 +121,12 @@ class OwncastSentry(Plugin):
if result[0] == 0: if result[0] == 0:
# There are 0 subscriptions, we need to validate this domain is an Owncast stream. # There are 0 subscriptions, we need to validate this domain is an Owncast stream.
# Attempt to fetch the stream state from this domain. # Attempt to fetch the stream state from this domain.
stream_state = await self.get_current_stream_state(stream_domain) stream_state = await self.get_stream_state(stream_domain)
if len(stream_state) == 0: if len(stream_state) == 0:
# The stream state fetch returned nothing. Probably not an Owncast stream. # The stream state fetch returned nothing. Probably not an Owncast stream.
await evt.reply("The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline.") await evt.reply(
"The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline."
)
return return
# Try to add a new subscription for the requested stream domain in the room the command was executed in. # Try to add a new subscription for the requested stream domain in the room the command was executed in.
@@ -123,11 +138,17 @@ class OwncastSentry(Plugin):
# Something weird happened... Was it due to attempting to insert a duplicate row? # Something weird happened... Was it due to attempting to insert a duplicate row?
if "UNIQUE constraint failed" in exception.args[0]: if "UNIQUE constraint failed" in exception.args[0]:
# Yes, this is an expected condition. Tell the user the room is already subscribed and give up. # Yes, this is an expected condition. Tell the user the room is already subscribed and give up.
await evt.reply("This room is already subscribed to notifications for " + stream_domain + ".") await evt.reply(
"This room is already subscribed to notifications for "
+ stream_domain
+ "."
)
return return
else: else:
# Nope... Something unexpected happened. Give up. # Nope... Something unexpected happened. Give up.
self.log.error(f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}") self.log.error(
f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}"
)
raise exception raise exception
# The subscription was successfully added! Try to add a placeholder row for the stream's state in the streams table. # The subscription was successfully added! Try to add a placeholder row for the stream's state in the streams table.
@@ -143,13 +164,18 @@ class OwncastSentry(Plugin):
# Attempts to add rows for streams already known is an expected condition. What is anything except that? # Attempts to add rows for streams already known is an expected condition. What is anything except that?
if "UNIQUE constraint failed" not in exception.args[0]: if "UNIQUE constraint failed" not in exception.args[0]:
# Something unexpected happened. Give up. # Something unexpected happened. Give up.
self.log.error(f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}") self.log.error(
f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}"
)
raise exception raise exception
# All went well! We added a new subscription and (at least tried) to add a row for the stream state. Tell the user. # All went well! We added a new subscription and (at least tried) to add a row for the stream state. Tell the user.
self.log.info(f"[{stream_domain}] Subscription added for room {evt.room_id}.") self.log.info(f"[{stream_domain}] Subscription added for room {evt.room_id}.")
await evt.reply("Subscription added! This room will receive notifications when " + stream_domain + " goes live.") await evt.reply(
"Subscription added! This room will receive notifications when "
+ stream_domain
+ " goes live."
)
@command.new(help="Unsubscribes from an Owncast stream.") @command.new(help="Unsubscribes from an Owncast stream.")
@command.argument("url") @command.argument("url")
@@ -172,15 +198,28 @@ class OwncastSentry(Plugin):
# Did it work? # Did it work?
if result.rowcount == 1: if result.rowcount == 1:
# Yes, one row was deleted. Tell the user. # Yes, one row was deleted. Tell the user.
self.log.info(f"[{stream_domain}] Subscription removed for room {evt.room_id}.") self.log.info(
await evt.reply("Subscription removed! This room will no longer receive notifications for " + stream_domain + ".") f"[{stream_domain}] Subscription removed for room {evt.room_id}."
)
await evt.reply(
"Subscription removed! This room will no longer receive notifications for "
+ stream_domain
+ "."
)
elif result.rowcount == 0: elif result.rowcount == 0:
# No, nothing changed. Tell the user. # No, nothing changed. Tell the user.
await evt.reply("This room is already not subscribed to notifications for " + stream_domain + ".") await evt.reply(
"This room is already not subscribed to notifications for "
+ stream_domain
+ "."
)
else: else:
# Somehow more than 1 (or even less than 0 ???) rows were changed... Log it! # Somehow more than 1 (or even less than 0 ???) rows were changed... Log it!
self.log.error("Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got " + result.rowcount + " instead. Something very bad may have happened!!!!") self.log.error(
"Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got "
+ result.rowcount
+ " instead. Something very bad may have happened!!!!"
)
async def update_all_stream_states(self) -> None: async def update_all_stream_states(self) -> None:
""" """
@@ -207,7 +246,6 @@ class OwncastSentry(Plugin):
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
self.log.debug("Update complete.") self.log.debug("Update complete.")
async def update_stream_state(self, domain: str) -> None: async def update_stream_state(self, domain: str) -> None:
""" """
Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live. Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
@@ -221,7 +259,7 @@ class OwncastSentry(Plugin):
send_notifications = True send_notifications = True
# Fetch the latest stream state from the server. # Fetch the latest stream state from the server.
new_state = await self.get_current_stream_state(domain) new_state = await self.get_stream_state(domain)
# Skip the update if the fetch failed for any reason. # Skip the update if the fetch failed for any reason.
if new_state == {}: if new_state == {}:
@@ -233,21 +271,36 @@ class OwncastSentry(Plugin):
old_state = await connection.fetchrow(query, domain) old_state = await connection.fetchrow(query, domain)
# Does the last known stream state not have a value for the last connect and disconnect time? # Does the last known stream state not have a value for the last connect and disconnect time?
if old_state["last_connect_time"] is None and old_state["last_disconnect_time"] is None: if (
old_state["last_connect_time"] is None
and old_state["last_disconnect_time"] is None
):
# Yes, this is the first update. Don't send any notifications. # Yes, this is the first update. Don't send any notifications.
send_notifications = False send_notifications = False
# Does the latest stream state have a last connect time and the old state not have one? # Does the latest stream state have a last connect time and the old state not have one?
if new_state["lastConnectTime"] is not None and old_state["last_connect_time"] is None: if (
new_state["lastConnectTime"] is not None
and old_state["last_connect_time"] is None
):
# Yes! This stream is now live. Send notifications and log it, if allowed. # Yes! This stream is now live. Send notifications and log it, if allowed.
if send_notifications: if send_notifications:
self.log.info(f"[{domain}] Stream is now live! Notifying subscribed rooms...") self.log.info(
await self.notify_rooms_of_stream_online(domain, new_state["streamTitle"]) f"[{domain}] Stream is now live! Notifying subscribed rooms..."
)
await self.notify_rooms_of_stream_online(
domain, new_state["streamTitle"]
)
else: else:
self.log.info(f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms.") self.log.info(
f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms."
)
# Does the latest stream state no longer have a last connect time but the old state does? # Does the latest stream state no longer have a last connect time but the old state does?
elif new_state["lastConnectTime"] is None and old_state["last_connect_time"] is not None: elif (
new_state["lastConnectTime"] is None
and old_state["last_connect_time"] is not None
):
# Yep. This stream is now offline. Log it. # Yep. This stream is now offline. Log it.
if send_notifications: if send_notifications:
self.log.info(f"[{domain}] Stream is now offline.") self.log.info(f"[{domain}] Stream is now offline.")
@@ -258,12 +311,17 @@ class OwncastSentry(Plugin):
# TODO: Only update the database if a change actually occurred. This is probably generating useless writes. # TODO: Only update the database if a change actually occurred. This is probably generating useless writes.
update_query = "UPDATE streams SET name=$1, last_connect_time=$2, last_disconnect_time=$3 WHERE domain=$4" update_query = "UPDATE streams SET name=$1, last_connect_time=$2, last_disconnect_time=$3 WHERE domain=$4"
async with self.database.acquire() as connection: async with self.database.acquire() as connection:
await connection.execute(update_query, new_state["streamTitle"], new_state["lastConnectTime"], new_state["lastDisconnectTime"], domain) await connection.execute(
update_query,
new_state["streamTitle"],
new_state["lastConnectTime"],
new_state["lastDisconnectTime"],
domain,
)
# All done. # All done.
self.log.debug(f"[{domain}] State update completed.") self.log.debug(f"[{domain}] State update completed.")
async def notify_rooms_of_stream_online(self, domain: str, title: str) -> None: async def notify_rooms_of_stream_online(self, domain: str, title: str) -> None:
""" """
Sends notifications to rooms with subscriptions to the provided stream domain. Sends notifications to rooms with subscriptions to the provided stream domain.
@@ -281,14 +339,25 @@ class OwncastSentry(Plugin):
successful_notifications = 0 successful_notifications = 0
failed_notifications = 0 failed_notifications = 0
stream_config = await self.get_stream_config(domain)
# Build the message body text. # Build the message body text.
if "name" in stream_config:
body_text = "🎥 " + stream_config["name"] + " is now live!"
else:
# Turns out it is possible to set an empty name for an Owncast stream if you know how.
# We'll account for that... Just in case.
body_text = "🎥 " + domain + " is now live!" body_text = "🎥 " + domain + " is now live!"
# Streams can have no title. If there is none, don't even mention it. # Streams can have no title. If there is none, don't even mention it.
if title != "": if title != "":
body_text += "\nStream Title: " + title body_text += "\nStream Title: " + title
body_text += "\n\nTo tune in, visit: https://" + domain + "/" body_text += "\n\nTo tune in, visit: https://" + domain + "/\n\n"
if "tags" in stream_config and len(stream_config["tags"]) > 0:
body_text += "\n\n"
body_text += " ".join("#" + tag for tag in stream_config["tags"])
# Iterate over the subscribed rooms and try to send a message to each. # Iterate over the subscribed rooms and try to send a message to each.
# TODO: This should probably be made async. # TODO: This should probably be made async.
@@ -296,22 +365,27 @@ class OwncastSentry(Plugin):
room_id = rows["room_id"] room_id = rows["room_id"]
try: try:
# Try and send a message. # Try and send a message.
content = TextMessageEventContent(msgtype=MessageType.TEXT, body=body_text) content = TextMessageEventContent(
msgtype=MessageType.TEXT, body=body_text
)
await self.client.send_message(room_id, content) await self.client.send_message(room_id, content)
# It worked! Increment the counter. # It worked! Increment the counter.
successful_notifications += 1 successful_notifications += 1
except Exception as exception: except Exception as exception:
# Something didn't work. Log it and move on to the next one. # Something didn't work. Log it and move on to the next one.
self.log.warning(f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}") self.log.warning(
f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}"
)
failed_notifications += 1 failed_notifications += 1
# All done! # All done!
self.log.info(f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed.") self.log.info(
f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed."
)
# ========== HELPER METHODS ========== # ========== HELPER METHODS ==========
async def get_current_stream_state(self, domain): async def get_stream_state(self, domain):
""" """
Get the current stream state for a given domain. Get the current stream state for a given domain.
HTTPS on port 443 is assumed, no other protocols or ports are supported. HTTPS on port 443 is assumed, no other protocols or ports are supported.
@@ -326,40 +400,99 @@ class OwncastSentry(Plugin):
# Make a request to the endpoint. # Make a request to the endpoint.
try: try:
response = await self.session.request("GET", status_url, allow_redirects=False) response = await self.session.request(
"GET", status_url, allow_redirects=False
)
except Exception as e: except Exception as e:
self.log.warning(f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}") self.log.warning(
f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}"
)
return {} return {}
# Check the response code is success # Check the response code is success
# TODO: Handle 429 rate limiting? # TODO: Handle 429 rate limiting?
if response.status != 200: if response.status != 200:
self.log.warning(f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead.") self.log.warning(
f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead."
)
return {} return {}
# Try and interpret the response as JSON. # Try and interpret the response as JSON.
try: try:
new_state = json.loads(await response.read()) new_state = json.loads(await response.read())
except Exception as e: except Exception as e:
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}") self.log.warning(
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}"
)
return {} return {}
# Validate the response to ensure it contains all the basic info needed to function. # Validate the response to ensure it contains all the basic info needed to function.
if "lastConnectTime" not in new_state: if "lastConnectTime" not in new_state:
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter.") self.log.warning(
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter."
)
return {} return {}
elif "lastDisconnectTime" not in new_state: elif "lastDisconnectTime" not in new_state:
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter.") self.log.warning(
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter."
)
return {} return {}
elif "streamTitle" not in new_state: elif "streamTitle" not in new_state:
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter.") self.log.warning(
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter."
)
return {} return {}
elif "online" not in new_state: elif "online" not in new_state:
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter.") self.log.warning(
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter."
)
return {} return {}
return new_state return new_state
async def get_stream_config(self, domain):
"""
Get the current stream config for a given domain.
HTTPS on port 443 is assumed, no other protocols or ports are supported.
:param domain: The domain (not URL) where the stream is hosted.
:return: A dictionary containing the stream's configuration.
"""
self.log.debug(f"[{domain}] Fetching current stream config...")
# Build a URL to the config API in Owncast. (https://owncast.online/api/latest/#tag/Internal/operation/GetWebConfig)
# Only use HTTPS, even if the user specified something else.
status_url = "https://" + domain + OWNCAST_CONFIG_PATH
# Make a request to the endpoint.
try:
response = await self.session.request(
"GET", status_url, allow_redirects=False
)
except Exception as e:
self.log.warning(
f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}"
)
return {}
# Check the response code is success
# TODO: Handle 429 rate limiting?
if response.status != 200:
self.log.warning(
f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead."
)
return {}
# Try and interpret the response as JSON.
try:
config = json.loads(await response.read())
except Exception as e:
self.log.warning(
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}"
)
return {}
return config
def domainify(self, url) -> str: def domainify(self, url) -> str:
""" """
Take a given URL and convert it to just the domain. Take a given URL and convert it to just the domain.
@@ -372,7 +505,13 @@ class OwncastSentry(Plugin):
# "stream.logal.dev" -> "stream.logal.dev" # "stream.logal.dev" -> "stream.logal.dev"
# "https://stream.logal.dev" -> "stream.logal.dev" # "https://stream.logal.dev" -> "stream.logal.dev"
# "stream.logal.dev/embed/chat/readwrite" -> "stream.logal.dev" # "stream.logal.dev/embed/chat/readwrite" -> "stream.logal.dev"
# "https://stream.logal.dev/abcdefghijklmno/123456789" -> "stream.logal.dev # "https://stream.logal.dev/abcdefghijklmno/123456789" -> "stream.logal.dev"
# "notify@stream.logal.dev" -> "stream.logal.dev"
parsed_url = urlparse(url) parsed_url = urlparse(url)
domain = (parsed_url.netloc or parsed_url.path).lower() domain = (parsed_url.netloc or parsed_url.path).lower()
if "@" in domain:
return domain.split("@")[-1]
return domain return domain