Added comprehensive sanitization and refactored to use typed API response models.
This commit is contained in:
@@ -4,6 +4,7 @@
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import re
|
||||
from urllib.parse import urlparse
|
||||
|
||||
# Path to the GetStatus API call on Owncast instances
|
||||
@@ -32,6 +33,12 @@ TEMPORARY_OFFLINE_NOTIFICATION_COOLDOWN = 7 * 60 # 7 minutes in seconds
|
||||
CLEANUP_WARNING_THRESHOLD = 83 * 24 * 60 # 119,520 cycles = 83 days
|
||||
CLEANUP_DELETE_THRESHOLD = 90 * 24 * 60 # 129,600 cycles = 90 days
|
||||
|
||||
# Maximum field lengths based on Owncast's configuration
|
||||
# Source: https://github.com/owncast/owncast/blob/master/web/utils/config-constants.tsx
|
||||
MAX_INSTANCE_TITLE_LENGTH = 255 # Server Name (line 81)
|
||||
MAX_STREAM_TITLE_LENGTH = 100 # Stream Title (line 91)
|
||||
MAX_TAG_LENGTH = 24 # Per tag (line 208)
|
||||
|
||||
|
||||
def should_query_stream(failure_counter: int) -> bool:
|
||||
"""
|
||||
@@ -60,23 +67,116 @@ def should_query_stream(failure_counter: int) -> bool:
|
||||
|
||||
def domainify(url: str) -> str:
|
||||
"""
|
||||
Take a given URL and convert it to just the domain.
|
||||
Extract and sanitize a domain from user input.
|
||||
|
||||
:param url: URL or domain string
|
||||
:return: Domain extracted from the URL
|
||||
Handles URLs, bare domains, and email-style input (user@domain).
|
||||
Only allows valid domain characters (alphanumeric, hyphens, periods).
|
||||
|
||||
:param url: URL, domain, or email-style string
|
||||
:return: Sanitized domain
|
||||
"""
|
||||
# Take whatever input the user provided and try to turn it into just the domain.
|
||||
# Examples:
|
||||
# "stream.logal.dev" -> "stream.logal.dev"
|
||||
# "https://stream.logal.dev" -> "stream.logal.dev"
|
||||
# "stream.logal.dev/embed/chat/readwrite" -> "stream.logal.dev"
|
||||
# "https://stream.logal.dev/abcdefghijklmno/123456789" -> "stream.logal.dev"
|
||||
# "notify@stream.logal.dev" -> "stream.logal.dev"
|
||||
# Handle email-style format first (e.g., "notify@stream.logal.dev")
|
||||
if "@" in url:
|
||||
url = url.split("@")[-1]
|
||||
|
||||
parsed_url = urlparse(url)
|
||||
domain = (parsed_url.netloc or parsed_url.path).lower()
|
||||
# Prepend // if no scheme so urlparse treats input as netloc
|
||||
if not url.startswith(('http://', 'https://', '//')):
|
||||
url = '//' + url
|
||||
|
||||
if "@" in domain:
|
||||
return domain.split("@")[-1]
|
||||
parsed = urlparse(url)
|
||||
domain = (parsed.netloc or parsed.path).lower()
|
||||
|
||||
return domain
|
||||
# Strip port and path
|
||||
domain = domain.split(':')[0].split('/')[0]
|
||||
|
||||
# Allow only valid domain characters
|
||||
return re.sub(r'[^a-z0-9.-]', '', domain).strip('.-')
|
||||
|
||||
|
||||
def truncate(text: str, max_length: int) -> str:
|
||||
"""
|
||||
Truncate text to a maximum length.
|
||||
|
||||
:param text: The text to truncate
|
||||
:param max_length: Maximum allowed length
|
||||
:return: Truncated text, or original if within limit
|
||||
"""
|
||||
if not text or len(text) <= max_length:
|
||||
return text
|
||||
return text[:max_length]
|
||||
|
||||
|
||||
def escape_markdown(text: str) -> str:
|
||||
"""
|
||||
Escape Markdown special characters to prevent injection attacks.
|
||||
|
||||
This function sanitizes untrusted external input (like stream names and titles)
|
||||
before embedding them in Markdown-formatted messages. It prevents malicious
|
||||
actors from injecting arbitrary Markdown/HTML content.
|
||||
|
||||
:param text: The text to escape
|
||||
:return: The escaped text safe for Markdown rendering
|
||||
"""
|
||||
if not text:
|
||||
return text
|
||||
|
||||
# Escape Markdown special characters by prefixing with backslash
|
||||
# Covers: formatting (*_~`), links ([]()), headings (#), lists (-+),
|
||||
# blockquotes (>), code blocks (```), and other special characters
|
||||
special_chars = {
|
||||
'\\': '\\\\', # Backslash must be first to avoid double-escaping
|
||||
'*': '\\*',
|
||||
'_': '\\_',
|
||||
'[': '\\[',
|
||||
']': '\\]',
|
||||
'(': '\\(',
|
||||
')': '\\)',
|
||||
'~': '\\~',
|
||||
'`': '\\`',
|
||||
'#': '\\#',
|
||||
'+': '\\+',
|
||||
'-': '\\-',
|
||||
'=': '\\=',
|
||||
'|': '\\|',
|
||||
'{': '\\{',
|
||||
'}': '\\}',
|
||||
'.': '\\.',
|
||||
'!': '\\!',
|
||||
'<': '\\<',
|
||||
'>': '\\>',
|
||||
'&': '\\&',
|
||||
}
|
||||
|
||||
escaped_text = text
|
||||
for char, replacement in special_chars.items():
|
||||
escaped_text = escaped_text.replace(char, replacement)
|
||||
|
||||
return escaped_text
|
||||
|
||||
|
||||
def sanitize_for_markdown(text: str) -> str:
|
||||
"""
|
||||
Sanitize text for safe Markdown rendering.
|
||||
|
||||
Removes newlines, normalizes whitespace, and escapes Markdown special characters.
|
||||
Use this for any untrusted external content before embedding in Markdown messages.
|
||||
|
||||
Note: This function does not truncate. Size limits should be enforced at the
|
||||
model layer (e.g., in from_api_response methods).
|
||||
|
||||
:param text: The text to sanitize
|
||||
:return: Sanitized and escaped text safe for Markdown rendering
|
||||
"""
|
||||
if not text:
|
||||
return text
|
||||
|
||||
# Remove newlines and carriage returns to prevent multi-line injection
|
||||
sanitized = text.replace('\n', ' ').replace('\r', ' ')
|
||||
|
||||
# Collapse multiple spaces into single space
|
||||
sanitized = ' '.join(sanitized.split())
|
||||
|
||||
# Escape Markdown special characters
|
||||
sanitized = escape_markdown(sanitized)
|
||||
|
||||
return sanitized
|
||||
|
||||
Reference in New Issue
Block a user