diff --git a/owncastsentry/commands.py b/owncastsentry/commands.py index fd7da95..5034f9c 100644 --- a/owncastsentry/commands.py +++ b/owncastsentry/commands.py @@ -11,7 +11,8 @@ from mautrix.types import TextMessageEventContent, MessageType from .owncast_client import OwncastClient from .database import StreamRepository, SubscriptionRepository -from .utils import domainify +from .models import StreamStatus +from .utils import domainify, sanitize_for_markdown class CommandHandler: @@ -203,22 +204,27 @@ class CommandHandler: # Determine stream name (use domain as fallback) stream_name = stream_state.name if stream_state.name else domain + safe_stream_name = sanitize_for_markdown(stream_name) # Start building this stream's entry with stream name as main bullet - body_text += f"- **{stream_name}** \n" + body_text += f"- **{safe_stream_name}** \n" # Add title if stream is online (as a sub-bullet) - if stream_state.online and stream_state.title: - body_text += f" - Title: {stream_state.title} \n" + if stream_state.status == StreamStatus.ONLINE and stream_state.title: + safe_title = sanitize_for_markdown(stream_state.title) + body_text += f" - Title: {safe_title} \n" # Determine status and duration (as a sub-bullet) - if stream_state.online: + if stream_state.status == StreamStatus.ONLINE: # Stream is online - use last_connect_time if stream_state.last_connect_time: duration = self._format_duration(stream_state.last_connect_time) body_text += f" - Status: Online for {duration} \n" else: body_text += f" - Status: Online \n" + elif stream_state.status == StreamStatus.UNKNOWN: + # Stream status is unknown - instance unreachable + body_text += f" - Status: Unknown (instance unreachable) \n" else: # Stream is offline - use last_disconnect_time if stream_state.last_disconnect_time: @@ -253,11 +259,11 @@ class CommandHandler: await evt.reply("This room is not subscribed to any Owncast instances.\n\nTo subscribe to an Owncast instance, use `!subscribe `", markdown=True) return - # Filter for only live streams + # Filter for only live streams (exclude unknown status) live_streams = [] for domain in subscribed_domains: stream_state = await self.stream_repo.get_by_domain(domain) - if stream_state and stream_state.online: + if stream_state and stream_state.status == StreamStatus.ONLINE: live_streams.append((domain, stream_state)) # Check if there are no live streams @@ -271,13 +277,15 @@ class CommandHandler: for domain, stream_state in live_streams: # Determine stream name (use domain as fallback) stream_name = stream_state.name if stream_state.name else domain + safe_stream_name = sanitize_for_markdown(stream_name) # Start building this stream's entry with stream name as main bullet - body_text += f"- **{stream_name}** \n" + body_text += f"- **{safe_stream_name}** \n" # Add title (should be present for live streams) if stream_state.title: - body_text += f" - Title: {stream_state.title} \n" + safe_title = sanitize_for_markdown(stream_state.title) + body_text += f" - Title: {safe_title} \n" # Add status with duration if stream_state.last_connect_time: diff --git a/owncastsentry/models.py b/owncastsentry/models.py index c4da93c..60d0cf4 100644 --- a/owncastsentry/models.py +++ b/owncastsentry/models.py @@ -5,9 +5,18 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. from dataclasses import dataclass +from enum import Enum from typing import Optional, List +class StreamStatus(Enum): + """Represents the status of a stream.""" + + ONLINE = "online" + OFFLINE = "offline" + UNKNOWN = "unknown" + + @dataclass class StreamState: """Represents the state of an Owncast stream.""" @@ -20,9 +29,34 @@ class StreamState: failure_counter: int = 0 @property - def online(self) -> bool: - """Returns True if the stream is currently online.""" - return self.last_connect_time is not None + def status(self) -> StreamStatus: + """Returns the stream status considering failure_counter.""" + from .utils import UNKNOWN_STATUS_THRESHOLD + + if self.failure_counter > UNKNOWN_STATUS_THRESHOLD: + return StreamStatus.UNKNOWN + elif self.last_connect_time is not None: + return StreamStatus.ONLINE + else: + return StreamStatus.OFFLINE + + @classmethod + def from_api_response(cls, response: dict, domain: str) -> "StreamState": + """ + Creates a StreamState from an API response. + + :param response: API response as a dictionary (camelCase keys) + :param domain: The stream domain + :return: StreamState instance + """ + from .utils import truncate, MAX_STREAM_TITLE_LENGTH + + return cls( + domain=domain, + title=truncate(response.get("streamTitle", ""), MAX_STREAM_TITLE_LENGTH), + last_connect_time=response.get("lastConnectTime"), + last_disconnect_time=response.get("lastDisconnectTime"), + ) @classmethod def from_db_row(cls, row: dict) -> "StreamState": @@ -62,4 +96,13 @@ class StreamConfig: :param response: API response as a dictionary :return: StreamConfig instance """ - return cls(name=response.get("name", ""), tags=response.get("tags", [])) + from .utils import truncate, MAX_INSTANCE_TITLE_LENGTH, MAX_TAG_LENGTH + + # Truncate instance name to max length + name = truncate(response.get("name", ""), MAX_INSTANCE_TITLE_LENGTH) + + # Truncate each tag to max length + raw_tags = response.get("tags", []) + tags = [truncate(tag, MAX_TAG_LENGTH) for tag in raw_tags] + + return cls(name=name, tags=tags) diff --git a/owncastsentry/notification_service.py b/owncastsentry/notification_service.py index de71039..24c10aa 100644 --- a/owncastsentry/notification_service.py +++ b/owncastsentry/notification_service.py @@ -11,7 +11,7 @@ from typing import List from mautrix.types import TextMessageEventContent, MessageType from .database import SubscriptionRepository -from .utils import SECONDS_BETWEEN_NOTIFICATIONS +from .utils import SECONDS_BETWEEN_NOTIFICATIONS, sanitize_for_markdown class NotificationService: @@ -130,24 +130,33 @@ class NotificationService: """ # Use name if available, fallback to domain stream_name = name if name else domain + safe_stream_name = sanitize_for_markdown(stream_name) # Choose message based on notification type if title_change: - body_text = "📝 " + stream_name + " has changed its stream title!" + body_text = "📝 " + safe_stream_name + " has changed its stream title!" else: - body_text = "🎥 " + stream_name + " is now live!" + body_text = "🎥 " + safe_stream_name + " is now live!" # Add title if present if title != "": - body_text += "\nStream Title: " + title + safe_title = sanitize_for_markdown(title) + body_text += "\nStream Title: " + safe_title # Add stream URL body_text += "\n\nTo tune in, visit: https://" + domain + "/" # Add tags if present if tags and len(tags) > 0: - body_text += "\n\n" - body_text += " ".join("#" + tag for tag in tags) + safe_tags = [] + for tag in tags: + safe_tag = sanitize_for_markdown(tag) + if safe_tag and not safe_tag.startswith('.'): + safe_tags.append(safe_tag) + + if safe_tags: + body_text += "\n\n" + body_text += " ".join("#" + tag for tag in safe_tags) return body_text diff --git a/owncastsentry/owncast_client.py b/owncastsentry/owncast_client.py index 45fc192..b4c0732 100644 --- a/owncastsentry/owncast_client.py +++ b/owncastsentry/owncast_client.py @@ -8,7 +8,7 @@ import aiohttp import json from typing import Optional -from .models import StreamConfig +from .models import StreamConfig, StreamState from .utils import OWNCAST_STATUS_PATH, OWNCAST_CONFIG_PATH, USER_AGENT @@ -35,13 +35,13 @@ class OwncastClient: headers=headers, cookie_jar=cookie_jar, timeout=timeout, connector=connector ) - async def get_stream_state(self, domain: str) -> Optional[dict]: + async def get_stream_state(self, domain: str) -> Optional[StreamState]: """ Get the current stream state for a given domain. HTTPS on port 443 is assumed, no other protocols or ports are supported. :param domain: The domain (not URL) where the stream is hosted. - :return: A dictionary containing stream state if available, None if an error occurred. + :return: A StreamState with stream state if available, None if an error occurred. """ self.log.debug(f"[{domain}] Fetching current stream state...") status_url = "https://" + domain + OWNCAST_STATUS_PATH @@ -87,7 +87,7 @@ class OwncastClient: ) return None - return new_state + return StreamState.from_api_response(new_state, domain) async def get_stream_config(self, domain: str) -> Optional[StreamConfig]: """ diff --git a/owncastsentry/stream_monitor.py b/owncastsentry/stream_monitor.py index 84e552e..b6e9bef 100644 --- a/owncastsentry/stream_monitor.py +++ b/owncastsentry/stream_monitor.py @@ -98,10 +98,10 @@ class StreamMonitor: stream_config = None # Fetch the latest stream state from the server - new_state_dict = await self.owncast_client.get_stream_state(domain) + new_state = await self.owncast_client.get_stream_state(domain) # If the fetch failed, increment failure counter and skip the update - if new_state_dict is None: + if new_state is None: await self.stream_repo.increment_failure_counter(domain) self.log.warning( f"[{domain}] Connection failure (counter={failure_counter + 1})" @@ -130,7 +130,7 @@ class StreamMonitor: # Does the latest stream state have a last connect time and the old state not have one? if ( - new_state_dict["lastConnectTime"] is not None + new_state.last_connect_time is not None and old_state.last_connect_time is None ): # Yes! This stream is now live. @@ -153,12 +153,12 @@ class StreamMonitor: # Yes. Has this stream been offline for a short amount of time? if seconds_since_last_offline < TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN: # Yes. Did the stream title change? - if old_state.title != new_state_dict["streamTitle"]: + if old_state.title != new_state.title: # Yes. The stream was only down for a short time, send a special notification indicating the stream changed its name. await self.notification_service.notify_stream_live( domain, stream_name, - new_state_dict["streamTitle"], + new_state.title, stream_tags, title_change=True, ) @@ -172,7 +172,7 @@ class StreamMonitor: await self.notification_service.notify_stream_live( domain, stream_name, - new_state_dict["streamTitle"], + new_state.title, stream_tags, title_change=False, ) @@ -183,11 +183,11 @@ class StreamMonitor: ) if ( - new_state_dict["lastConnectTime"] is not None + new_state.last_connect_time is not None and old_state.last_connect_time is not None ): # Did the stream title change mid-session? - if old_state.title != new_state_dict["streamTitle"]: + if old_state.title != new_state.title: self.log.info(f"[{domain}] Stream title was changed!") update_database = True stream_config = await self.owncast_client.get_stream_config(domain) @@ -208,7 +208,7 @@ class StreamMonitor: await self.notification_service.notify_stream_live( domain, stream_name, - new_state_dict["streamTitle"], + new_state.title, stream_tags, title_change=False, ) @@ -217,14 +217,14 @@ class StreamMonitor: await self.notification_service.notify_stream_live( domain, stream_name, - new_state_dict["streamTitle"], + new_state.title, stream_tags, title_change=True, ) # Does the latest stream state no longer have a last connect time but the old state does? elif ( - new_state_dict["lastConnectTime"] is None + new_state.last_connect_time is None and old_state.last_connect_time is not None ): # Yep. This stream is now offline. Log it. @@ -246,13 +246,13 @@ class StreamMonitor: self.log.debug(f"[{domain}] Updating stream state in database...") - # Create updated state object + # Create updated state object (title already truncated in new_state) updated_state = StreamState( domain=domain, name=stream_name, - title=new_state_dict["streamTitle"], - last_connect_time=new_state_dict["lastConnectTime"], - last_disconnect_time=new_state_dict["lastDisconnectTime"], + title=new_state.title, + last_connect_time=new_state.last_connect_time, + last_disconnect_time=new_state.last_disconnect_time, ) await self.stream_repo.update(updated_state) diff --git a/owncastsentry/utils.py b/owncastsentry/utils.py index 67fa304..a94acd8 100644 --- a/owncastsentry/utils.py +++ b/owncastsentry/utils.py @@ -4,6 +4,7 @@ # # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. +import re from urllib.parse import urlparse # Path to the GetStatus API call on Owncast instances @@ -32,6 +33,15 @@ TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN = 7 * 60 # 7 minutes in seconds CLEANUP_WARNING_THRESHOLD = 83 * 24 * 60 # 119,520 cycles = 83 days CLEANUP_DELETE_THRESHOLD = 90 * 24 * 60 # 129,600 cycles = 90 days +# Failure counter threshold for treating stream status as "unknown" +UNKNOWN_STATUS_THRESHOLD = 15 + +# Maximum field lengths based on Owncast's configuration +# Source: https://github.com/owncast/owncast/blob/master/web/utils/config-constants.tsx +MAX_INSTANCE_TITLE_LENGTH = 255 # Server Name (line 81) +MAX_STREAM_TITLE_LENGTH = 100 # Stream Title (line 91) +MAX_TAG_LENGTH = 24 # Per tag (line 208) + def should_query_stream(failure_counter: int) -> bool: """ @@ -60,23 +70,116 @@ def should_query_stream(failure_counter: int) -> bool: def domainify(url: str) -> str: """ - Take a given URL and convert it to just the domain. + Extract and sanitize a domain from user input. - :param url: URL or domain string - :return: Domain extracted from the URL + Handles URLs, bare domains, and email-style input (user@domain). + Only allows valid domain characters (alphanumeric, hyphens, periods). + + :param url: URL, domain, or email-style string + :return: Sanitized domain """ - # Take whatever input the user provided and try to turn it into just the domain. - # Examples: - # "stream.logal.dev" -> "stream.logal.dev" - # "https://stream.logal.dev" -> "stream.logal.dev" - # "stream.logal.dev/embed/chat/readwrite" -> "stream.logal.dev" - # "https://stream.logal.dev/abcdefghijklmno/123456789" -> "stream.logal.dev" - # "notify@stream.logal.dev" -> "stream.logal.dev" + # Handle email-style format first (e.g., "notify@stream.logal.dev") + if "@" in url: + url = url.split("@")[-1] - parsed_url = urlparse(url) - domain = (parsed_url.netloc or parsed_url.path).lower() + # Prepend // if no scheme so urlparse treats input as netloc + if not url.startswith(('http://', 'https://', '//')): + url = '//' + url - if "@" in domain: - return domain.split("@")[-1] + parsed = urlparse(url) + domain = (parsed.netloc or parsed.path).lower() - return domain + # Strip port and path + domain = domain.split(':')[0].split('/')[0] + + # Allow only valid domain characters + return re.sub(r'[^a-z0-9.-]', '', domain).strip('.-') + + +def truncate(text: str, max_length: int) -> str: + """ + Truncate text to a maximum length. + + :param text: The text to truncate + :param max_length: Maximum allowed length + :return: Truncated text, or original if within limit + """ + if not text or len(text) <= max_length: + return text + return text[:max_length] + + +def escape_markdown(text: str) -> str: + """ + Escape Markdown special characters to prevent injection attacks. + + This function sanitizes untrusted external input (like stream names and titles) + before embedding them in Markdown-formatted messages. It prevents malicious + actors from injecting arbitrary Markdown/HTML content. + + :param text: The text to escape + :return: The escaped text safe for Markdown rendering + """ + if not text: + return text + + # Escape Markdown special characters by prefixing with backslash + # Covers: formatting (*_~`), links ([]()), headings (#), lists (-+), + # blockquotes (>), code blocks (```), and other special characters + special_chars = { + '\\': '\\\\', # Backslash must be first to avoid double-escaping + '*': '\\*', + '_': '\\_', + '[': '\\[', + ']': '\\]', + '(': '\\(', + ')': '\\)', + '~': '\\~', + '`': '\\`', + '#': '\\#', + '+': '\\+', + '-': '\\-', + '=': '\\=', + '|': '\\|', + '{': '\\{', + '}': '\\}', + '.': '\\.', + '!': '\\!', + '<': '\\<', + '>': '\\>', + '&': '\\&', + } + + escaped_text = text + for char, replacement in special_chars.items(): + escaped_text = escaped_text.replace(char, replacement) + + return escaped_text + + +def sanitize_for_markdown(text: str) -> str: + """ + Sanitize text for safe Markdown rendering. + + Removes newlines, normalizes whitespace, and escapes Markdown special characters. + Use this for any untrusted external content before embedding in Markdown messages. + + Note: This function does not truncate. Size limits should be enforced at the + model layer (e.g., in from_api_response methods). + + :param text: The text to sanitize + :return: Sanitized and escaped text safe for Markdown rendering + """ + if not text: + return text + + # Remove newlines and carriage returns to prevent multi-line injection + sanitized = text.replace('\n', ' ').replace('\r', ' ') + + # Collapse multiple spaces into single space + sanitized = ' '.join(sanitized.split()) + + # Escape Markdown special characters + sanitized = escape_markdown(sanitized) + + return sanitized