Reformatted source code using Black.
This commit is contained in:
176
owncastsentry.py
176
owncastsentry.py
@@ -24,11 +24,15 @@ OWNCAST_STATUS_PATH = "/api/status"
|
||||
OWNCAST_CONFIG_PATH = "/api/config"
|
||||
|
||||
# User agent to send with all HTTP requests.
|
||||
USER_AGENT = "OwncastSentry/1.0.1 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
|
||||
USER_AGENT = (
|
||||
"OwncastSentry/1.0.1 (bot; +https://git.logal.dev/LogalDeveloper/OwncastSentry)"
|
||||
)
|
||||
|
||||
|
||||
# ===== DATABASE MIGRATIONS =====
|
||||
upgrade_table = UpgradeTable()
|
||||
|
||||
|
||||
@upgrade_table.register(description="Initial revision")
|
||||
async def upgrade_v1(conn: Connection) -> None:
|
||||
"""
|
||||
@@ -57,17 +61,25 @@ async def upgrade_v1(conn: Connection) -> None:
|
||||
)"""
|
||||
)
|
||||
|
||||
|
||||
# ===== MAIN BOT CLASS =====
|
||||
class OwncastSentry(Plugin):
|
||||
# Helper variables for setting up a special HTTP ClientSession for monitoring streams.
|
||||
headers = {"User-Agent": USER_AGENT} # Override User Agent to special static value to identify to servers our purpose.
|
||||
cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies.
|
||||
baseconnctor = aiohttp.TCPConnector(use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120) # Set up timeouts to keep one long-lived connection open per monitored stream.
|
||||
timeouts = aiohttp.ClientTimeout(sock_connect=5, sock_read=5) # Tighten up timeouts for faster responses to users in case of broken servers.
|
||||
headers = {
|
||||
"User-Agent": USER_AGENT
|
||||
} # Override User Agent to special static value to identify to servers our purpose.
|
||||
cookie_jar = aiohttp.DummyCookieJar() # Ignore cookies.
|
||||
baseconnctor = aiohttp.TCPConnector(
|
||||
use_dns_cache=False, limit=1000, limit_per_host=1, keepalive_timeout=120
|
||||
) # Set up timeouts to keep one long-lived connection open per monitored stream.
|
||||
timeouts = aiohttp.ClientTimeout(
|
||||
sock_connect=5, sock_read=5
|
||||
) # Tighten up timeouts for faster responses to users in case of broken servers.
|
||||
|
||||
# Final HTTP ClientSession for all requests to Owncast instances.
|
||||
session = aiohttp.ClientSession(headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor)
|
||||
|
||||
session = aiohttp.ClientSession(
|
||||
headers=headers, cookie_jar=cookie_jar, timeout=timeouts, connector=baseconnctor
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_db_upgrade_table(cls) -> UpgradeTable | None:
|
||||
@@ -78,7 +90,6 @@ class OwncastSentry(Plugin):
|
||||
"""
|
||||
return upgrade_table
|
||||
|
||||
|
||||
async def start(self) -> None:
|
||||
"""
|
||||
Method called by Maubot upon startup of the instance.
|
||||
@@ -88,7 +99,6 @@ class OwncastSentry(Plugin):
|
||||
"""
|
||||
self.sched.run_periodically(60, self.update_all_stream_states)
|
||||
|
||||
|
||||
@command.new(help="Subscribes to a new Owncast stream.")
|
||||
@command.argument("url")
|
||||
async def subscribe(self, evt: MessageEvent, url: str) -> None:
|
||||
@@ -114,7 +124,9 @@ class OwncastSentry(Plugin):
|
||||
stream_state = await self.get_stream_state(stream_domain)
|
||||
if len(stream_state) == 0:
|
||||
# The stream state fetch returned nothing. Probably not an Owncast stream.
|
||||
await evt.reply("The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline.")
|
||||
await evt.reply(
|
||||
"The URL you supplied does not appear to be a valid Owncast instance. You may have specified an invalid domain, or the instance is offline."
|
||||
)
|
||||
return
|
||||
|
||||
# Try to add a new subscription for the requested stream domain in the room the command was executed in.
|
||||
@@ -126,11 +138,17 @@ class OwncastSentry(Plugin):
|
||||
# Something weird happened... Was it due to attempting to insert a duplicate row?
|
||||
if "UNIQUE constraint failed" in exception.args[0]:
|
||||
# Yes, this is an expected condition. Tell the user the room is already subscribed and give up.
|
||||
await evt.reply("This room is already subscribed to notifications for " + stream_domain + ".")
|
||||
await evt.reply(
|
||||
"This room is already subscribed to notifications for "
|
||||
+ stream_domain
|
||||
+ "."
|
||||
)
|
||||
return
|
||||
else:
|
||||
# Nope... Something unexpected happened. Give up.
|
||||
self.log.error(f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}")
|
||||
self.log.error(
|
||||
f"[{stream_domain}] An error occurred while attempting to add subscription in room {evt.room_id}: {e}"
|
||||
)
|
||||
raise exception
|
||||
|
||||
# The subscription was successfully added! Try to add a placeholder row for the stream's state in the streams table.
|
||||
@@ -146,13 +164,18 @@ class OwncastSentry(Plugin):
|
||||
# Attempts to add rows for streams already known is an expected condition. What is anything except that?
|
||||
if "UNIQUE constraint failed" not in exception.args[0]:
|
||||
# Something unexpected happened. Give up.
|
||||
self.log.error(f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}")
|
||||
self.log.error(
|
||||
f"[{stream_domain}] An error occurred while attempting to add stream information after adding subscription: {e}"
|
||||
)
|
||||
raise exception
|
||||
|
||||
# All went well! We added a new subscription and (at least tried) to add a row for the stream state. Tell the user.
|
||||
self.log.info(f"[{stream_domain}] Subscription added for room {evt.room_id}.")
|
||||
await evt.reply("Subscription added! This room will receive notifications when " + stream_domain + " goes live.")
|
||||
|
||||
await evt.reply(
|
||||
"Subscription added! This room will receive notifications when "
|
||||
+ stream_domain
|
||||
+ " goes live."
|
||||
)
|
||||
|
||||
@command.new(help="Unsubscribes from an Owncast stream.")
|
||||
@command.argument("url")
|
||||
@@ -175,15 +198,28 @@ class OwncastSentry(Plugin):
|
||||
# Did it work?
|
||||
if result.rowcount == 1:
|
||||
# Yes, one row was deleted. Tell the user.
|
||||
self.log.info(f"[{stream_domain}] Subscription removed for room {evt.room_id}.")
|
||||
await evt.reply("Subscription removed! This room will no longer receive notifications for " + stream_domain + ".")
|
||||
self.log.info(
|
||||
f"[{stream_domain}] Subscription removed for room {evt.room_id}."
|
||||
)
|
||||
await evt.reply(
|
||||
"Subscription removed! This room will no longer receive notifications for "
|
||||
+ stream_domain
|
||||
+ "."
|
||||
)
|
||||
elif result.rowcount == 0:
|
||||
# No, nothing changed. Tell the user.
|
||||
await evt.reply("This room is already not subscribed to notifications for " + stream_domain + ".")
|
||||
await evt.reply(
|
||||
"This room is already not subscribed to notifications for "
|
||||
+ stream_domain
|
||||
+ "."
|
||||
)
|
||||
else:
|
||||
# Somehow more than 1 (or even less than 0 ???) rows were changed... Log it!
|
||||
self.log.error("Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got " + result.rowcount + " instead. Something very bad may have happened!!!!")
|
||||
|
||||
self.log.error(
|
||||
"Encountered strange situation! Expected 0 or 1 rows on DELETE query for removing subscription; got "
|
||||
+ result.rowcount
|
||||
+ " instead. Something very bad may have happened!!!!"
|
||||
)
|
||||
|
||||
async def update_all_stream_states(self) -> None:
|
||||
"""
|
||||
@@ -210,7 +246,6 @@ class OwncastSentry(Plugin):
|
||||
await asyncio.gather(*tasks)
|
||||
self.log.debug("Update complete.")
|
||||
|
||||
|
||||
async def update_stream_state(self, domain: str) -> None:
|
||||
"""
|
||||
Updates the state of a given stream domain and sends notifications to subscribed Matrix rooms if it goes live.
|
||||
@@ -236,21 +271,36 @@ class OwncastSentry(Plugin):
|
||||
old_state = await connection.fetchrow(query, domain)
|
||||
|
||||
# Does the last known stream state not have a value for the last connect and disconnect time?
|
||||
if old_state["last_connect_time"] is None and old_state["last_disconnect_time"] is None:
|
||||
if (
|
||||
old_state["last_connect_time"] is None
|
||||
and old_state["last_disconnect_time"] is None
|
||||
):
|
||||
# Yes, this is the first update. Don't send any notifications.
|
||||
send_notifications = False
|
||||
|
||||
# Does the latest stream state have a last connect time and the old state not have one?
|
||||
if new_state["lastConnectTime"] is not None and old_state["last_connect_time"] is None:
|
||||
if (
|
||||
new_state["lastConnectTime"] is not None
|
||||
and old_state["last_connect_time"] is None
|
||||
):
|
||||
# Yes! This stream is now live. Send notifications and log it, if allowed.
|
||||
if send_notifications:
|
||||
self.log.info(f"[{domain}] Stream is now live! Notifying subscribed rooms...")
|
||||
await self.notify_rooms_of_stream_online(domain, new_state["streamTitle"])
|
||||
self.log.info(
|
||||
f"[{domain}] Stream is now live! Notifying subscribed rooms..."
|
||||
)
|
||||
await self.notify_rooms_of_stream_online(
|
||||
domain, new_state["streamTitle"]
|
||||
)
|
||||
else:
|
||||
self.log.info(f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms.")
|
||||
self.log.info(
|
||||
f"[{domain}] Stream is live, but performed first update. WIll not notify subscribed rooms."
|
||||
)
|
||||
|
||||
# Does the latest stream state no longer have a last connect time but the old state does?
|
||||
elif new_state["lastConnectTime"] is None and old_state["last_connect_time"] is not None:
|
||||
elif (
|
||||
new_state["lastConnectTime"] is None
|
||||
and old_state["last_connect_time"] is not None
|
||||
):
|
||||
# Yep. This stream is now offline. Log it.
|
||||
if send_notifications:
|
||||
self.log.info(f"[{domain}] Stream is now offline.")
|
||||
@@ -261,12 +311,17 @@ class OwncastSentry(Plugin):
|
||||
# TODO: Only update the database if a change actually occurred. This is probably generating useless writes.
|
||||
update_query = "UPDATE streams SET name=$1, last_connect_time=$2, last_disconnect_time=$3 WHERE domain=$4"
|
||||
async with self.database.acquire() as connection:
|
||||
await connection.execute(update_query, new_state["streamTitle"], new_state["lastConnectTime"], new_state["lastDisconnectTime"], domain)
|
||||
await connection.execute(
|
||||
update_query,
|
||||
new_state["streamTitle"],
|
||||
new_state["lastConnectTime"],
|
||||
new_state["lastDisconnectTime"],
|
||||
domain,
|
||||
)
|
||||
|
||||
# All done.
|
||||
self.log.debug(f"[{domain}] State update completed.")
|
||||
|
||||
|
||||
async def notify_rooms_of_stream_online(self, domain: str, title: str) -> None:
|
||||
"""
|
||||
Sends notifications to rooms with subscriptions to the provided stream domain.
|
||||
@@ -310,19 +365,24 @@ class OwncastSentry(Plugin):
|
||||
room_id = rows["room_id"]
|
||||
try:
|
||||
# Try and send a message.
|
||||
content = TextMessageEventContent(msgtype=MessageType.TEXT, body=body_text)
|
||||
content = TextMessageEventContent(
|
||||
msgtype=MessageType.TEXT, body=body_text
|
||||
)
|
||||
await self.client.send_message(room_id, content)
|
||||
|
||||
# It worked! Increment the counter.
|
||||
successful_notifications += 1
|
||||
except Exception as exception:
|
||||
# Something didn't work. Log it and move on to the next one.
|
||||
self.log.warning(f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}")
|
||||
failed_notifications +=1
|
||||
self.log.warning(
|
||||
f"[{domain}] Failed to send notification message to room [{room_id}]: {exception}"
|
||||
)
|
||||
failed_notifications += 1
|
||||
|
||||
# All done!
|
||||
self.log.info(f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed.")
|
||||
|
||||
self.log.info(
|
||||
f"[{domain}] Completed sending notifications! {successful_notifications} succeeded, {failed_notifications} failed."
|
||||
)
|
||||
|
||||
# ========== HELPER METHODS ==========
|
||||
async def get_stream_state(self, domain):
|
||||
@@ -340,36 +400,52 @@ class OwncastSentry(Plugin):
|
||||
|
||||
# Make a request to the endpoint.
|
||||
try:
|
||||
response = await self.session.request("GET", status_url, allow_redirects=False)
|
||||
response = await self.session.request(
|
||||
"GET", status_url, allow_redirects=False
|
||||
)
|
||||
except Exception as e:
|
||||
self.log.warning(f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}")
|
||||
self.log.warning(
|
||||
f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}"
|
||||
)
|
||||
return {}
|
||||
|
||||
# Check the response code is success
|
||||
# TODO: Handle 429 rate limiting?
|
||||
if response.status != 200:
|
||||
self.log.warning(f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead.")
|
||||
self.log.warning(
|
||||
f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead."
|
||||
)
|
||||
return {}
|
||||
|
||||
# Try and interpret the response as JSON.
|
||||
try:
|
||||
new_state = json.loads(await response.read())
|
||||
except Exception as e:
|
||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}")
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}"
|
||||
)
|
||||
return {}
|
||||
|
||||
# Validate the response to ensure it contains all the basic info needed to function.
|
||||
if "lastConnectTime" not in new_state:
|
||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter.")
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last connect time parameter."
|
||||
)
|
||||
return {}
|
||||
elif "lastDisconnectTime" not in new_state:
|
||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter.")
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have last disconnect time parameter."
|
||||
)
|
||||
return {}
|
||||
elif "streamTitle" not in new_state:
|
||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter.")
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have stream title parameter."
|
||||
)
|
||||
return {}
|
||||
elif "online" not in new_state:
|
||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter.")
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as it does not have online status parameter."
|
||||
)
|
||||
return {}
|
||||
|
||||
return new_state
|
||||
@@ -389,22 +465,30 @@ class OwncastSentry(Plugin):
|
||||
|
||||
# Make a request to the endpoint.
|
||||
try:
|
||||
response = await self.session.request("GET", status_url, allow_redirects=False)
|
||||
response = await self.session.request(
|
||||
"GET", status_url, allow_redirects=False
|
||||
)
|
||||
except Exception as e:
|
||||
self.log.warning(f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}")
|
||||
self.log.warning(
|
||||
f"[{domain}] Error making GET request to {OWNCAST_STATUS_PATH}: {e}"
|
||||
)
|
||||
return {}
|
||||
|
||||
# Check the response code is success
|
||||
# TODO: Handle 429 rate limiting?
|
||||
if response.status != 200:
|
||||
self.log.warning(f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead.")
|
||||
self.log.warning(
|
||||
f"[{domain}] Response to request on {OWNCAST_STATUS_PATH} was not 200, got {response.status} instead."
|
||||
)
|
||||
return {}
|
||||
|
||||
# Try and interpret the response as JSON.
|
||||
try:
|
||||
config = json.loads(await response.read())
|
||||
except Exception as e:
|
||||
self.log.warning(f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}")
|
||||
self.log.warning(
|
||||
f"[{domain}] Rejecting response to request on {OWNCAST_STATUS_PATH} as could not be interpreted as JSON: {e}"
|
||||
)
|
||||
return {}
|
||||
|
||||
return config
|
||||
|
||||
Reference in New Issue
Block a user