Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
764fab9879
|
|||
|
0659f70e1a
|
|||
|
ee61ea8562
|
|||
|
b0868c5bd4
|
|||
|
1c6720cc10
|
|||
|
e73b2b63a1
|
|||
|
cae80b8319
|
|||
|
815b246ad8
|
|||
|
c4156f861e
|
|||
|
78d2c5a03f
|
|||
|
aad810d710
|
|||
|
bb9d360d8f
|
|||
|
60f8b21c16
|
20
.gitea/workflows/build.yml
Normal file
20
.gitea/workflows/build.yml
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
name: Build Maubot Plugin Artifact
|
||||||
|
on: [push]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build:
|
||||||
|
name: Build
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout repository
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Build artifact
|
||||||
|
run: zip -v -9 OwncastSentry-v${{env.GITHUB_SHA}}.mbp LICENSE.txt maubot.yaml owncastsentry.py
|
||||||
|
|
||||||
|
- name: Upload artifact
|
||||||
|
uses: actions/upload-artifact@v3
|
||||||
|
with:
|
||||||
|
name: Maubot Plugin
|
||||||
|
path: OwncastSentry-v${{env.GITHUB_SHA}}.mbp
|
||||||
19
.gitea/workflows/lint.yml
Normal file
19
.gitea/workflows/lint.yml
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
name: Lint Source Code
|
||||||
|
on: [push]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
lint:
|
||||||
|
name: Lint
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout repository
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Setup Python environment
|
||||||
|
uses: actions/setup-python@v5
|
||||||
|
with:
|
||||||
|
python-version: 3.13
|
||||||
|
|
||||||
|
- name: Run linter
|
||||||
|
uses: psf/black@stable
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
maubot: 0.1.0
|
maubot: 0.1.0
|
||||||
id: dev.logal.owncastsentry
|
id: dev.logal.owncastsentry
|
||||||
version: 1.0.1
|
version: 1.0.3
|
||||||
license: Apache-2.0
|
license: Apache-2.0
|
||||||
modules:
|
modules:
|
||||||
- owncastsentry
|
- owncastsentry
|
||||||
|
|||||||
386
owncastsentry.py
386
owncastsentry.py
@@ -8,6 +8,7 @@ import sqlite3
|
|||||||
import aiohttp
|
import aiohttp
|
||||||
import json
|
import json
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import time
|
||||||
|
|
||||||
from maubot import Plugin, MessageEvent
|
from maubot import Plugin, MessageEvent
|
||||||
from maubot.handlers import command
|
from maubot.handlers import command
|
||||||
@@ -20,12 +21,29 @@ from urllib.parse import urlparse
|
|||||||
# Path to the GetStatus API call on Owncast instances
|
# Path to the GetStatus API call on Owncast instances
|
||||||
OWNCAST_STATUS_PATH = "/api/status"
|
OWNCAST_STATUS_PATH = "/api/status"
|
||||||
|
|
||||||
# User agent to send with all HTTP requests.
|
# Path to GetWebConfig API call on Owncast instances
|
||||||
USER_AGENT = "OwncastSentry/1.0.1 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
|
OWNCAST_CONFIG_PATH = "/api/config"
|
||||||
|
|
||||||
|
# User agent to send with all HTTP requests.
|
||||||
|
USER_AGENT = (
|
||||||
|
"OwncastSentry/1.0.3 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Hard minimum amount of time between when notifications can be sent for a stream. Prevents spamming notifications for glitchy or malicious streams.
|
||||||
|
SECONDS_BETWEEN_NOTIFICATIONS = 20 * 60 # 20 minutes in seconds
|
||||||
|
|
||||||
|
# I'm not sure the best way to name or explain this variable, so let's just say what uses it:
|
||||||
|
#
|
||||||
|
# After a stream goes offline, a timer is started. Then, ...
|
||||||
|
# - If a stream comes back online with the same title within this time, no notification is sent.
|
||||||
|
# - If a stream comes back online with a different title, a rename notification is sent.
|
||||||
|
# - If this time period passes entirely and a stream comes back online after, it's treated as regular going live.
|
||||||
|
TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN = 7 * 60 # 7 minutes in seconds
|
||||||
|
|
||||||
# ===== DATABASE MIGRATIONS =====
|
# ===== DATABASE MIGRATIONS =====
|
||||||
upgrade_table = UpgradeTable()
|
upgrade_table = UpgradeTable()
|
||||||
|
|
||||||
|
|
||||||
@upgrade_table.register(description="Initial revision")
|
@upgrade_table.register(description="Initial revision")
|
||||||
async def upgrade_v1(conn: Connection) -> None:
|
async def upgrade_v1(conn: Connection) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -54,17 +72,31 @@ async def upgrade_v1(conn: Connection) -> None:
|
|||||||
)"""
|
)"""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# ===== MAIN BOT CLASS =====
|
# ===== MAIN BOT CLASS =====
|
||||||
class OwncastSentry(Plugin):
|
class OwncastSentry(Plugin):
|
||||||
# Helper variables for setting up a special HTTP ClientSession for monitoring streams.
|
# Helper variables for setting up a special HTTP ClientSession for monitoring streams.
|
||||||
headers = {"User-Agent": USER_AGENT} # Override User Agent to special static value to identify to servers our purpose.
|
headers = {
|
||||||
|
"User-Agent": USER_AGENT
|
||||||
|
} # Override User Agent to special static value to identify to servers our purpose.
|
||||||
cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies.
|
cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies.
|
||||||
baseconnctor = aiohttp.TCPConnector(use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120) # Set up timeouts to keep one long-lived connection open per monitored stream.
|
baseconnctor = aiohttp.TCPConnector(
|
||||||
timeouts = aiohttp.ClientTimeout(sock_connect=5, sock_read=5) # Tighten up timeouts for faster responses to users in case of broken servers.
|
use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120
|
||||||
|
) # Set up timeouts to keep one long-lived connection open per monitored stream.
|
||||||
|
timeouts = aiohttp.ClientTimeout(
|
||||||
|
sock_connect=5, sock_read=5
|
||||||
|
) # Tighten up timeouts for faster responses to users in case of broken servers.
|
||||||
|
|
||||||
# Final HTTP ClientSession for all requests to Owncast instances.
|
# Final HTTP ClientSession for all requests to Owncast instances.
|
||||||
session = aiohttp.ClientSession(headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor)
|
session = aiohttp.ClientSession(
|
||||||
|
headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor
|
||||||
|
)
|
||||||
|
|
||||||
|
# Keeps track of when a notification was last sent for streams.
|
||||||
|
notification_timers_cache = {}
|
||||||
|
|
||||||
|
# Keeps track of when streams last went offline.
|
||||||
|
offline_timer_cache = {}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_db_upgrade_table(cls) -> UpgradeTable | None:
|
def get_db_upgrade_table(cls) -> UpgradeTable | None:
|
||||||
@@ -75,7 +107,6 @@ class OwncastSentry(Plugin):
|
|||||||
"""
|
"""
|
||||||
return upgrade_table
|
return upgrade_table
|
||||||
|
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
"""
|
"""
|
||||||
Method called by Maubot upon startup of the instance.
|
Method called by Maubot upon startup of the instance.
|
||||||
@@ -85,7 +116,6 @@ class OwncastSentry(Plugin):
|
|||||||
"""
|
"""
|
||||||
self.sched.run_periodically(60, self.update_all_stream_states)
|
self.sched.run_periodically(60, self.update_all_stream_states)
|
||||||
|
|
||||||
|
|
||||||
@command.new(help="Subscribes to a new Owncast stream.")
|
@command.new(help="Subscribes to a new Owncast stream.")
|
||||||
@command.argument("url")
|
@command.argument("url")
|
||||||
async def subscribe(self, evt: MessageEvent, url: str) -> None:
|
async def subscribe(self, evt: MessageEvent, url: str) -> None:
|
||||||
@@ -108,10 +138,12 @@ class OwncastSentry(Plugin):
|
|||||||
if result[0] == 0:
|
if result[0] == 0:
|
||||||
# There are 0 subscriptions, we need to validate this domain is an Owncast stream.
|
# There are 0 subscriptions, we need to validate this domain is an Owncast stream.
|
||||||
# Attempt to fetch the stream state from this domain.
|
# Attempt to fetch the stream state from this domain.
|
||||||
stream_state = await self.get_current_stream_state(stream_domain)
|
stream_state = await self.get_stream_state(stream_domain)
|
||||||
if len(stream_state) == 0:
|
if len(stream_state) == 0:
|
||||||
# The stream state fetch returned nothing. Probably not an Owncast stream.
|
# The stream state fetch returned nothing. Probably not an Owncast stream.
|
||||||
await evt.reply("The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline.")
|
await evt.reply(
|
||||||
|
"The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline."
|
||||||
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Try to add a new subscription for the requested stream domain in the room the command was executed in.
|
# Try to add a new subscription for the requested stream domain in the room the command was executed in.
|
||||||
@@ -123,11 +155,17 @@ class OwncastSentry(Plugin):
|
|||||||
# Something weird happened... Was it due to attempting to insert a duplicate row?
|
# Something weird happened... Was it due to attempting to insert a duplicate row?
|
||||||
if "UNIQUE constraint failed" in exception.args[0]:
|
if "UNIQUE constraint failed" in exception.args[0]:
|
||||||
# Yes, this is an expected condition. Tell the user the room is already subscribed and give up.
|
# Yes, this is an expected condition. Tell the user the room is already subscribed and give up.
|
||||||
await evt.reply("This room is already subscribed to notifications for " + stream_domain + ".")
|
await evt.reply(
|
||||||
|
"This room is already subscribed to notifications for "
|
||||||
|
+ stream_domain
|
||||||
|
+ "."
|
||||||
|
)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
# Nope... Something unexpected happened. Give up.
|
# Nope... Something unexpected happened. Give up.
|
||||||
self.log.error(f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}")
|
self.log.error(
|
||||||
|
f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}"
|
||||||
|
)
|
||||||
raise exception
|
raise exception
|
||||||
|
|
||||||
# The subscription was successfully added! Try to add a placeholder row for the stream's state in the streams table.
|
# The subscription was successfully added! Try to add a placeholder row for the stream's state in the streams table.
|
||||||
@@ -143,13 +181,18 @@ class OwncastSentry(Plugin):
|
|||||||
# Attempts to add rows for streams already known is an expected condition. What is anything except that?
|
# Attempts to add rows for streams already known is an expected condition. What is anything except that?
|
||||||
if "UNIQUE constraint failed" not in exception.args[0]:
|
if "UNIQUE constraint failed" not in exception.args[0]:
|
||||||
# Something unexpected happened. Give up.
|
# Something unexpected happened. Give up.
|
||||||
self.log.error(f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}")
|
self.log.error(
|
||||||
|
f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}"
|
||||||
|
)
|
||||||
raise exception
|
raise exception
|
||||||
|
|
||||||
# All went well! We added a new subscription and (at least tried) to add a row for the stream state. Tell the user.
|
# All went well! We added a new subscription and (at least tried) to add a row for the stream state. Tell the user.
|
||||||
self.log.info(f"[{stream_domain}] Subscription added for room {evt.room_id}.")
|
self.log.info(f"[{stream_domain}] Subscription added for room {evt.room_id}.")
|
||||||
await evt.reply("Subscription added! This room will receive notifications when " + stream_domain + " goes live.")
|
await evt.reply(
|
||||||
|
"Subscription added! This room will receive notifications when "
|
||||||
|
+ stream_domain
|
||||||
|
+ " goes live."
|
||||||
|
)
|
||||||
|
|
||||||
@command.new(help="Unsubscribes from an Owncast stream.")
|
@command.new(help="Unsubscribes from an Owncast stream.")
|
||||||
@command.argument("url")
|
@command.argument("url")
|
||||||
@@ -172,15 +215,28 @@ class OwncastSentry(Plugin):
|
|||||||
# Did it work?
|
# Did it work?
|
||||||
if result.rowcount == 1:
|
if result.rowcount == 1:
|
||||||
# Yes, one row was deleted. Tell the user.
|
# Yes, one row was deleted. Tell the user.
|
||||||
self.log.info(f"[{stream_domain}] Subscription removed for room {evt.room_id}.")
|
self.log.info(
|
||||||
await evt.reply("Subscription removed! This room will no longer receive notifications for " + stream_domain + ".")
|
f"[{stream_domain}] Subscription removed for room {evt.room_id}."
|
||||||
|
)
|
||||||
|
await evt.reply(
|
||||||
|
"Subscription removed! This room will no longer receive notifications for "
|
||||||
|
+ stream_domain
|
||||||
|
+ "."
|
||||||
|
)
|
||||||
elif result.rowcount == 0:
|
elif result.rowcount == 0:
|
||||||
# No, nothing changed. Tell the user.
|
# No, nothing changed. Tell the user.
|
||||||
await evt.reply("This room is already not subscribed to notifications for " + stream_domain + ".")
|
await evt.reply(
|
||||||
|
"This room is already not subscribed to notifications for "
|
||||||
|
+ stream_domain
|
||||||
|
+ "."
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# Somehow more than 1 (or even less than 0 ???) rows were changed... Log it!
|
# Somehow more than 1 (or even less than 0 ???) rows were changed... Log it!
|
||||||
self.log.error("Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got " + result.rowcount + " instead. Something very bad may have happened!!!!")
|
self.log.error(
|
||||||
|
"Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got "
|
||||||
|
+ result.rowcount
|
||||||
|
+ " instead. Something very bad may have happened!!!!"
|
||||||
|
)
|
||||||
|
|
||||||
async def update_all_stream_states(self) -> None:
|
async def update_all_stream_states(self) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -207,7 +263,6 @@ class OwncastSentry(Plugin):
|
|||||||
await asyncio.gather(*tasks)
|
await asyncio.gather(*tasks)
|
||||||
self.log.debug("Update complete.")
|
self.log.debug("Update complete.")
|
||||||
|
|
||||||
|
|
||||||
async def update_stream_state(self, domain: str) -> None:
|
async def update_stream_state(self, domain: str) -> None:
|
||||||
"""
|
"""
|
||||||
Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
|
Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
|
||||||
@@ -216,62 +271,186 @@ class OwncastSentry(Plugin):
|
|||||||
:return: Nothing.
|
:return: Nothing.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# A flag indicating whether to send a notification.
|
# A flag indicating whether this is the first state update of a brand-new stream to avoid sending notifications if its already live.
|
||||||
# Used for the first state update of a brand-new stream to avoid sending notifications if its already live.
|
first_update = False
|
||||||
send_notifications = True
|
|
||||||
|
# A flag indicating whether to update the stream's state in the databased.
|
||||||
|
# Used to avoid writing to the database when a stream's state hasn't changed at all.
|
||||||
|
update_database = False
|
||||||
|
|
||||||
|
# Holds the stream's latest configuration, if fetched and deemed necessary during the update process.
|
||||||
|
stream_config = {}
|
||||||
|
|
||||||
# Fetch the latest stream state from the server.
|
# Fetch the latest stream state from the server.
|
||||||
new_state = await self.get_current_stream_state(domain)
|
new_state = await self.get_stream_state(domain)
|
||||||
|
|
||||||
# Skip the update if the fetch failed for any reason.
|
# Skip the update if the fetch failed for any reason.
|
||||||
if new_state == {}:
|
if new_state == {}:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Fix possible race conditions with timers
|
||||||
|
if domain not in self.offline_timer_cache:
|
||||||
|
self.offline_timer_cache[domain] = 0
|
||||||
|
if domain not in self.notification_timers_cache:
|
||||||
|
self.notification_timers_cache[domain] = 0
|
||||||
|
|
||||||
# Fetch the last known stream state from the database.
|
# Fetch the last known stream state from the database.
|
||||||
query = "SELECT last_connect_time, last_disconnect_time FROM streams WHERE domain=$1"
|
query = "SELECT * FROM streams WHERE domain=$1"
|
||||||
async with self.database.acquire() as connection:
|
async with self.database.acquire() as connection:
|
||||||
old_state = await connection.fetchrow(query, domain)
|
old_state = await connection.fetchrow(query, domain)
|
||||||
|
|
||||||
# Does the last known stream state not have a value for the last connect and disconnect time?
|
# Does the last known stream state not have a value for the last connect and disconnect time?
|
||||||
if old_state["last_connect_time"] is None and old_state["last_disconnect_time"] is None:
|
if (
|
||||||
|
old_state["last_connect_time"] is None
|
||||||
|
and old_state["last_disconnect_time"] is None
|
||||||
|
):
|
||||||
# Yes, this is the first update. Don't send any notifications.
|
# Yes, this is the first update. Don't send any notifications.
|
||||||
send_notifications = False
|
update_database = True
|
||||||
|
first_update = True
|
||||||
|
|
||||||
# Does the latest stream state have a last connect time and the old state not have one?
|
# Does the latest stream state have a last connect time and the old state not have one?
|
||||||
if new_state["lastConnectTime"] is not None and old_state["last_connect_time"] is None:
|
if (
|
||||||
# Yes! This stream is now live. Send notifications and log it, if allowed.
|
new_state["lastConnectTime"] is not None
|
||||||
if send_notifications:
|
and old_state["last_connect_time"] is None
|
||||||
self.log.info(f"[{domain}] Stream is now live! Notifying subscribed rooms...")
|
):
|
||||||
await self.notify_rooms_of_stream_online(domain, new_state["streamTitle"])
|
# Yes! This stream is now live.
|
||||||
|
update_database = True
|
||||||
|
stream_config = await self.get_stream_config(domain)
|
||||||
|
|
||||||
|
self.log.info(f"[{domain}] Stream is now live!")
|
||||||
|
|
||||||
|
# Calculate how many seconds since the stream last went offline
|
||||||
|
seconds_since_last_offline = round(
|
||||||
|
time.time() - self.offline_timer_cache[domain]
|
||||||
|
)
|
||||||
|
|
||||||
|
# Have we queried this stream before? (In other words, is this not the first state update ever?)
|
||||||
|
if not first_update:
|
||||||
|
# Yes. Has this stream been offline for a short amount of time?
|
||||||
|
if seconds_since_last_offline < TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN:
|
||||||
|
# Yes. Did the stream title change?
|
||||||
|
if old_state["title"] != new_state["streamTitle"]:
|
||||||
|
# Yes. The stream was only down for a short time, send a special notification indicating the stream changed its name.
|
||||||
|
await self.notify_rooms_of_stream_online(
|
||||||
|
domain,
|
||||||
|
stream_config["name"],
|
||||||
|
new_state["streamTitle"],
|
||||||
|
True,
|
||||||
|
stream_config["tags"],
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
self.log.info(f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms.")
|
# No. The stream was only down for a short time and didn't change its title. Don't send a notification.
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Not sending notifications. Stream was only offline for {seconds_since_last_offline} of {TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN} seconds and did not change its title."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# This stream has been offline for a while. Send a normal notification.
|
||||||
|
await self.notify_rooms_of_stream_online(
|
||||||
|
domain,
|
||||||
|
stream_config["name"],
|
||||||
|
new_state["streamTitle"],
|
||||||
|
False,
|
||||||
|
stream_config["tags"],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# No, this is the first time we're querying
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Not sending notifications. This is the first state update for this stream."
|
||||||
|
)
|
||||||
|
|
||||||
|
if (
|
||||||
|
new_state["lastConnectTime"] is not None
|
||||||
|
and old_state["last_connect_time"] is not None
|
||||||
|
):
|
||||||
|
# Did the stream title change mid-session?
|
||||||
|
if old_state["title"] != new_state["streamTitle"]:
|
||||||
|
self.log.info(f"[{domain}] Stream title was changed!")
|
||||||
|
update_database = True
|
||||||
|
stream_config = await self.get_stream_config(domain)
|
||||||
|
|
||||||
|
# This is a fun case to account for... Let's try and explain this.
|
||||||
|
# Was the last time this stream sent a notification before it last went offline?
|
||||||
|
if (
|
||||||
|
self.offline_timer_cache[domain]
|
||||||
|
> self.notification_timers_cache[domain]
|
||||||
|
):
|
||||||
|
# Yes. Send a regular go live notification.
|
||||||
|
# Why? A title change notification could be confusing to users in this case.
|
||||||
|
# How? If a stream goes offline before its next allowed notification, it'll get rate limited. If it then changes its title, this part of the code will send a title change notification. This can be a little confusing, so override to a normal go live notification in this case.
|
||||||
|
await self.notify_rooms_of_stream_online(
|
||||||
|
domain,
|
||||||
|
stream_config["name"],
|
||||||
|
new_state["streamTitle"],
|
||||||
|
False,
|
||||||
|
stream_config["tags"],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# No. Send a normal title change notification.
|
||||||
|
await self.notify_rooms_of_stream_online(
|
||||||
|
domain,
|
||||||
|
stream_config["name"],
|
||||||
|
new_state["streamTitle"],
|
||||||
|
True,
|
||||||
|
stream_config["tags"],
|
||||||
|
)
|
||||||
|
|
||||||
# Does the latest stream state no longer have a last connect time but the old state does?
|
# Does the latest stream state no longer have a last connect time but the old state does?
|
||||||
elif new_state["lastConnectTime"] is None and old_state["last_connect_time"] is not None:
|
elif (
|
||||||
|
new_state["lastConnectTime"] is None
|
||||||
|
and old_state["last_connect_time"] is not None
|
||||||
|
):
|
||||||
# Yep. This stream is now offline. Log it.
|
# Yep. This stream is now offline. Log it.
|
||||||
if send_notifications:
|
update_database = True
|
||||||
self.log.info(f"[{domain}] Stream is now offline.")
|
stream_config = await self.get_stream_config(domain)
|
||||||
else:
|
self.offline_timer_cache[domain] = time.time()
|
||||||
|
if first_update:
|
||||||
self.log.info(f"[{domain}] Stream is offline.")
|
self.log.info(f"[{domain}] Stream is offline.")
|
||||||
|
else:
|
||||||
|
self.log.info(f"[{domain}] Stream is now offline.")
|
||||||
|
|
||||||
# Update the database with the current stream state.
|
# Update the database with the current stream state, if needed.
|
||||||
# TODO: Only update the database if a change actually occurred. This is probably generating useless writes.
|
if update_database:
|
||||||
update_query = "UPDATE streams SET name=$1, last_connect_time=$2, last_disconnect_time=$3 WHERE domain=$4"
|
self.log.debug(f"[{domain}] Updating stream state in database...")
|
||||||
|
update_query = "UPDATE streams SET name=$1, title=$2, last_connect_time=$3, last_disconnect_time=$4 WHERE domain=$5"
|
||||||
async with self.database.acquire() as connection:
|
async with self.database.acquire() as connection:
|
||||||
await connection.execute(update_query, new_state["streamTitle"], new_state["lastConnectTime"], new_state["lastDisconnectTime"], domain)
|
await connection.execute(
|
||||||
|
update_query,
|
||||||
|
stream_config["name"],
|
||||||
|
new_state["streamTitle"],
|
||||||
|
new_state["lastConnectTime"],
|
||||||
|
new_state["lastDisconnectTime"],
|
||||||
|
domain,
|
||||||
|
)
|
||||||
|
|
||||||
# All done.
|
# All done.
|
||||||
self.log.debug(f"[{domain}] State update completed.")
|
self.log.debug(f"[{domain}] State update completed.")
|
||||||
|
|
||||||
|
async def notify_rooms_of_stream_online(
|
||||||
async def notify_rooms_of_stream_online(self, domain: str, title: str) -> None:
|
self, domain: str, name: str, title: str, title_change: bool, tags: list[str]
|
||||||
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Sends notifications to rooms with subscriptions to the provided stream domain.
|
Sends notifications to rooms with subscriptions to the provided stream domain.
|
||||||
|
|
||||||
:param domain: The domain of the stream to send notifications for.
|
:param domain: The domain of the stream to send notifications for.
|
||||||
:param title: The title of the stream to include in the message.
|
:param title: The title of the stream to include in the message.
|
||||||
|
:param renamed: Whether or not this is for a stream changing its title rather than going live.
|
||||||
:return: Nothing.
|
:return: Nothing.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Has enough time passed since the last notification was sent?
|
||||||
|
if domain in self.notification_timers_cache:
|
||||||
|
seconds_since_last_notification = round(
|
||||||
|
time.time() - self.notification_timers_cache[domain]
|
||||||
|
)
|
||||||
|
if seconds_since_last_notification < SECONDS_BETWEEN_NOTIFICATIONS:
|
||||||
|
self.log.info(
|
||||||
|
f"[{domain}] Not sending notifications. Only {seconds_since_last_notification} of required {SECONDS_BETWEEN_NOTIFICATIONS} seconds has passed since last notification."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Yes. Log the current time and proceed with sending notifications.
|
||||||
|
self.notification_timers_cache[domain] = time.time()
|
||||||
|
|
||||||
# Get a list of room IDs with active subscriptions to the stream domain.
|
# Get a list of room IDs with active subscriptions to the stream domain.
|
||||||
query = "SELECT room_id FROM subscriptions WHERE stream_domain=$1"
|
query = "SELECT room_id FROM subscriptions WHERE stream_domain=$1"
|
||||||
async with self.database.acquire() as connection:
|
async with self.database.acquire() as connection:
|
||||||
@@ -281,8 +460,19 @@ class OwncastSentry(Plugin):
|
|||||||
successful_notifications = 0
|
successful_notifications = 0
|
||||||
failed_notifications = 0
|
failed_notifications = 0
|
||||||
|
|
||||||
# Build the message body text.
|
# Time to start building the body text for the notifications.
|
||||||
body_text = "🎥 " + domain + " is now live!"
|
|
||||||
|
# Turns out it is possible to set an empty name for an Owncast stream if you know how.
|
||||||
|
# We'll account for that... Just in case.
|
||||||
|
stream_name = name if name else domain
|
||||||
|
|
||||||
|
# If the stream changed its title, send a different title change notification.
|
||||||
|
if title_change:
|
||||||
|
self.log.info(f"[{domain}] Sending title change notifications...")
|
||||||
|
body_text = "📝 " + name + " has changed its stream title!"
|
||||||
|
else:
|
||||||
|
self.log.info(f"[{domain}] Sending going live notifications...")
|
||||||
|
body_text = "🎥 " + name + " is now live!"
|
||||||
|
|
||||||
# Streams can have no title. If there is none, don't even mention it.
|
# Streams can have no title. If there is none, don't even mention it.
|
||||||
if title != "":
|
if title != "":
|
||||||
@@ -290,28 +480,37 @@ class OwncastSentry(Plugin):
|
|||||||
|
|
||||||
body_text += "\n\nTo tune in, visit: https://" + domain + "/"
|
body_text += "\n\nTo tune in, visit: https://" + domain + "/"
|
||||||
|
|
||||||
|
if "tags" and len(tags) > 0:
|
||||||
|
body_text += "\n\n"
|
||||||
|
body_text += " ".join("#" + tag for tag in tags)
|
||||||
|
|
||||||
# Iterate over the subscribed rooms and try to send a message to each.
|
# Iterate over the subscribed rooms and try to send a message to each.
|
||||||
# TODO: This should probably be made async.
|
# TODO: This should probably be made async.
|
||||||
for rows in results:
|
for rows in results:
|
||||||
room_id = rows["room_id"]
|
room_id = rows["room_id"]
|
||||||
try:
|
try:
|
||||||
# Try and send a message.
|
# Try and send a message.
|
||||||
content = TextMessageEventContent(msgtype=MessageType.TEXT, body=body_text)
|
content = TextMessageEventContent(
|
||||||
|
msgtype=MessageType.TEXT, body=body_text
|
||||||
|
)
|
||||||
await self.client.send_message(room_id, content)
|
await self.client.send_message(room_id, content)
|
||||||
|
|
||||||
# It worked! Increment the counter.
|
# It worked! Increment the counter.
|
||||||
successful_notifications += 1
|
successful_notifications += 1
|
||||||
except Exception as exception:
|
except Exception as exception:
|
||||||
# Something didn't work. Log it and move on to the next one.
|
# Something didn't work. Log it and move on to the next one.
|
||||||
self.log.warning(f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}")
|
self.log.warning(
|
||||||
failed_notifications +=1
|
f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}"
|
||||||
|
)
|
||||||
|
failed_notifications += 1
|
||||||
|
|
||||||
# All done!
|
# All done!
|
||||||
self.log.info(f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed.")
|
self.log.info(
|
||||||
|
f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed."
|
||||||
|
)
|
||||||
|
|
||||||
# ========== HELPER METHODS ==========
|
# ========== HELPER METHODS ==========
|
||||||
async def get_current_stream_state(self, domain):
|
async def get_stream_state(self, domain):
|
||||||
"""
|
"""
|
||||||
Get the current stream state for a given domain.
|
Get the current stream state for a given domain.
|
||||||
HTTPS on port 443 is assumed, no other protocols or ports are supported.
|
HTTPS on port 443 is assumed, no other protocols or ports are supported.
|
||||||
@@ -326,40 +525,99 @@ class OwncastSentry(Plugin):
|
|||||||
|
|
||||||
# Make a request to the endpoint.
|
# Make a request to the endpoint.
|
||||||
try:
|
try:
|
||||||
response = await self.session.request("GET", status_url, allow_redirects=False)
|
response = await self.session.request(
|
||||||
|
"GET", status_url, allow_redirects=False
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.warning(f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}")
|
self.log.warning(
|
||||||
|
f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}"
|
||||||
|
)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
# Check the response code is success
|
# Check the response code is success
|
||||||
# TODO: Handle 429 rate limiting?
|
# TODO: Handle 429 rate limiting?
|
||||||
if response.status != 200:
|
if response.status != 200:
|
||||||
self.log.warning(f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead.")
|
self.log.warning(
|
||||||
|
f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead."
|
||||||
|
)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
# Try and interpret the response as JSON.
|
# Try and interpret the response as JSON.
|
||||||
try:
|
try:
|
||||||
new_state = json.loads(await response.read())
|
new_state = json.loads(await response.read())
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}")
|
self.log.warning(
|
||||||
|
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}"
|
||||||
|
)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
# Validate the response to ensure it contains all the basic info needed to function.
|
# Validate the response to ensure it contains all the basic info needed to function.
|
||||||
if "lastConnectTime" not in new_state:
|
if "lastConnectTime" not in new_state:
|
||||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter.")
|
self.log.warning(
|
||||||
|
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter."
|
||||||
|
)
|
||||||
return {}
|
return {}
|
||||||
elif "lastDisconnectTime" not in new_state:
|
elif "lastDisconnectTime" not in new_state:
|
||||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter.")
|
self.log.warning(
|
||||||
|
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter."
|
||||||
|
)
|
||||||
return {}
|
return {}
|
||||||
elif "streamTitle" not in new_state:
|
elif "streamTitle" not in new_state:
|
||||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter.")
|
self.log.warning(
|
||||||
|
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter."
|
||||||
|
)
|
||||||
return {}
|
return {}
|
||||||
elif "online" not in new_state:
|
elif "online" not in new_state:
|
||||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter.")
|
self.log.warning(
|
||||||
|
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter."
|
||||||
|
)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
return new_state
|
return new_state
|
||||||
|
|
||||||
|
async def get_stream_config(self, domain):
|
||||||
|
"""
|
||||||
|
Get the current stream config for a given domain.
|
||||||
|
HTTPS on port 443 is assumed, no other protocols or ports are supported.
|
||||||
|
|
||||||
|
:param domain: The domain (not URL) where the stream is hosted.
|
||||||
|
:return: A dictionary containing the stream's configuration.
|
||||||
|
"""
|
||||||
|
self.log.debug(f"[{domain}] Fetching current stream config...")
|
||||||
|
# Build a URL to the config API in Owncast. (https://owncast.online/api/latest/#tag/Internal/operation/GetWebConfig)
|
||||||
|
# Only use HTTPS, even if the user specified something else.
|
||||||
|
status_url = "https://" + domain + OWNCAST_CONFIG_PATH
|
||||||
|
|
||||||
|
# Make a request to the endpoint.
|
||||||
|
try:
|
||||||
|
response = await self.session.request(
|
||||||
|
"GET", status_url, allow_redirects=False
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}"
|
||||||
|
)
|
||||||
|
return {}
|
||||||
|
|
||||||
|
# Check the response code is success
|
||||||
|
# TODO: Handle 429 rate limiting?
|
||||||
|
if response.status != 200:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead."
|
||||||
|
)
|
||||||
|
return {}
|
||||||
|
|
||||||
|
# Try and interpret the response as JSON.
|
||||||
|
try:
|
||||||
|
config = json.loads(await response.read())
|
||||||
|
except Exception as e:
|
||||||
|
self.log.warning(
|
||||||
|
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}"
|
||||||
|
)
|
||||||
|
return {}
|
||||||
|
|
||||||
|
return config
|
||||||
|
|
||||||
def domainify(self, url) -> str:
|
def domainify(self, url) -> str:
|
||||||
"""
|
"""
|
||||||
Take a given URL and convert it to just the domain.
|
Take a given URL and convert it to just the domain.
|
||||||
@@ -372,7 +630,13 @@ class OwncastSentry(Plugin):
|
|||||||
# "stream.logal.dev" -> "stream.logal.dev"
|
# "stream.logal.dev" -> "stream.logal.dev"
|
||||||
# "https://stream.logal.dev" -> "stream.logal.dev"
|
# "https://stream.logal.dev" -> "stream.logal.dev"
|
||||||
# "stream.logal.dev/embed/chat/readwrite" -> "stream.logal.dev"
|
# "stream.logal.dev/embed/chat/readwrite" -> "stream.logal.dev"
|
||||||
# "https://stream.logal.dev/abcdefghijklmno/123456789" -> "stream.logal.dev
|
# "https://stream.logal.dev/abcdefghijklmno/123456789" -> "stream.logal.dev"
|
||||||
|
# "notify@stream.logal.dev" -> "stream.logal.dev"
|
||||||
|
|
||||||
parsed_url = urlparse(url)
|
parsed_url = urlparse(url)
|
||||||
domain = (parsed_url.netloc or parsed_url.path).lower()
|
domain = (parsed_url.netloc or parsed_url.path).lower()
|
||||||
|
|
||||||
|
if "@" in domain:
|
||||||
|
return domain.split("@")[-1]
|
||||||
|
|
||||||
return domain
|
return domain
|
||||||
|
|||||||
Reference in New Issue
Block a user