#!/usr/bin/env python3
"""Check self-hosted webservices for available upstream updates.

Loads a YAML config describing services (where to read the currently
deployed version and the latest upstream version, and how to extract a
version string from each response), compares versions, prints a report,
and optionally notifies a Matrix room and pings an Uptime Kuma monitor.
"""

import argparse
import json
import os
import re
import sys
import time
from dataclasses import dataclass
from http.cookiejar import CookieJar
from typing import Any, Dict, Optional
from urllib.parse import quote, urlencode
from urllib.request import HTTPCookieProcessor, Request, build_opener, urlopen

import bcrypt
import yaml


@dataclass
class ExtractRule:
    """How to pull a version string out of an HTTP response."""

    # One of: jsonpath, jsonpath_join, regex, text, header.
    type: str
    # Rule payload: a JSON path, comma-joined paths, regex pattern,
    # literal text to look for, or a header name.
    value: str


@dataclass
class ServiceConfig:
    """One monitored service, as declared in the YAML config."""

    name: str
    base_url: str
    current_version_url: Optional[str]
    current_version_extract: Optional[ExtractRule]
    current_version_headers: Optional[Dict[str, str]]
    upstream_latest_version_url: Optional[str]
    upstream_latest_extract: Optional[ExtractRule]
    upstream_latest_headers: Optional[Dict[str, str]]
    notes: Optional[str]
    # Optional form-login settings used to fetch the current version
    # from an authenticated endpoint; see authenticate_service().
    login: Optional[Dict[str, Any]]


def load_yaml(path: str) -> Dict[str, Any]:
    """Load a YAML file, returning {} for an empty document."""
    with open(path, "r", encoding="utf-8") as handle:
        return yaml.safe_load(handle) or {}


def load_dotenv(path: str = ".env") -> None:
    """Best-effort .env loader: KEY=VALUE lines, '#' comments allowed.

    Existing environment variables win (setdefault); read errors are
    silently ignored because the .env file is optional.
    """
    if not os.path.exists(path):
        return
    try:
        with open(path, "r", encoding="utf-8") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue
                if "=" not in line:
                    continue
                key, value = line.split("=", 1)
                key = key.strip()
                value = value.strip().strip("\"").strip("'")
                if key:
                    os.environ.setdefault(key, value)
    except OSError:
        return


def parse_extract_rule(raw: Optional[Dict[str, Any]]) -> Optional[ExtractRule]:
    """Validate and build an ExtractRule from a config mapping.

    Returns None for missing/incomplete rules; raises ValueError for an
    unknown rule type so config typos fail loudly.
    """
    if not raw:
        return None
    rule_type = raw.get("type")
    value = raw.get("value")
    if not rule_type or not value:
        return None
    allowed = {"jsonpath", "jsonpath_join", "regex", "text", "header"}
    if rule_type not in allowed:
        raise ValueError(f"Unsupported extract rule type: {rule_type}")
    return ExtractRule(type=rule_type, value=value)


def load_services(config: Dict[str, Any]) -> Dict[str, ServiceConfig]:
    """Validate the 'services' list and return services keyed by name.

    Raises ValueError on any malformed entry. A service that declares a
    version URL must also declare the matching extract rule.
    """
    services = config.get("services")
    if not isinstance(services, list):
        raise ValueError("Config must include a 'services' list")
    loaded: Dict[str, ServiceConfig] = {}
    for entry in services:
        if not isinstance(entry, dict):
            raise ValueError("Each service entry must be a mapping")
        name = entry.get("name")
        base_url = entry.get("base_url")
        if not name or not base_url:
            raise ValueError("Each service must include name and base_url")
        if not isinstance(name, str) or not isinstance(base_url, str):
            raise ValueError("Service name and base_url must be strings")
        current_headers = entry.get("current_version_headers")
        upstream_headers = entry.get("upstream_latest_headers")
        if current_headers is not None and not isinstance(current_headers, dict):
            raise ValueError("current_version_headers must be a mapping")
        if upstream_headers is not None and not isinstance(upstream_headers, dict):
            raise ValueError("upstream_latest_headers must be a mapping")
        current_url = entry.get("current_version_url")
        upstream_url = entry.get("upstream_latest_version_url")
        current_extract = parse_extract_rule(entry.get("current_version_extract"))
        upstream_extract = parse_extract_rule(entry.get("upstream_latest_extract"))
        if current_url and not current_extract:
            raise ValueError(f"Service {name} must define current_version_extract")
        if upstream_url and not upstream_extract:
            raise ValueError(f"Service {name} must define upstream_latest_extract")
        login = entry.get("login")
        if login is not None and not isinstance(login, dict):
            raise ValueError("login must be a mapping")
        loaded[name] = ServiceConfig(
            name=name,
            base_url=base_url,
            current_version_url=current_url,
            current_version_extract=current_extract,
            current_version_headers=current_headers,
            upstream_latest_version_url=upstream_url,
            upstream_latest_extract=upstream_extract,
            upstream_latest_headers=upstream_headers,
            notes=entry.get("notes"),
            login=login,
        )
    return loaded


def resolve_env_placeholders(value: str) -> str:
    """Expand {env:NAME} placeholders from the environment.

    Raises ValueError if a referenced variable is unset, so misconfigured
    secrets surface immediately instead of sending empty headers.
    """
    env_pattern = re.compile(r"\{env:([A-Z0-9_]+)\}")

    def replace_env(match: re.Match[str]) -> str:
        env_name = match.group(1)
        env_value = os.getenv(env_name)
        if env_value is None:
            raise ValueError(f"Missing environment variable {env_name}")
        return env_value

    return env_pattern.sub(replace_env, value)


def build_headers(raw_headers: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Resolve env placeholders in configured header values."""
    if not raw_headers:
        return {}
    resolved = {}
    for key, value in raw_headers.items():
        resolved[key] = resolve_env_placeholders(str(value))
    return resolved


def extract_hidden_inputs(html: str) -> Dict[str, str]:
    """Return name -> value for every <input type="hidden"> tag in *html*.

    Used to carry CSRF tokens and similar hidden form fields through a
    login POST.
    """
    hidden = {}
    # FIX: the previous pattern was missing the leading "<input[^>" part
    # (it began with "]+type="), so it could never match a real tag and
    # hidden fields were silently dropped.
    for match in re.finditer(
        r"<input[^>]+type=['\"]hidden['\"][^>]*>",
        html,
        flags=re.IGNORECASE,
    ):
        tag = match.group(0)
        name_match = re.search(r"name=['\"]([^'\"]+)['\"]", tag)
        value_match = re.search(r"value=['\"]([^'\"]*)['\"]", tag)
        if not name_match:
            continue
        hidden[name_match.group(1)] = value_match.group(1) if value_match else ""
    return hidden


def fetch_response(
    url: str,
    timeout: float,
    user_agent: str,
    extra_headers: Optional[Dict[str, str]] = None,
    data: Optional[bytes] = None,
    method: str = "GET",
) -> tuple[str, Dict[str, str]]:
    """Perform an HTTP request; return (body, lowercased headers).

    Automatically attaches a GITHUB_TOKEN bearer header for GitHub API
    URLs (unless the caller already set Authorization) to raise the
    rate limit.
    """
    headers = {"User-Agent": user_agent}
    if extra_headers:
        headers.update(extra_headers)
    token = os.getenv("GITHUB_TOKEN")
    if token and "api.github.com" in url:
        headers.setdefault("Authorization", f"Bearer {token}")
    request = Request(url, headers=headers, data=data, method=method)
    with urlopen(request, timeout=timeout) as response:
        body = response.read().decode("utf-8", errors="replace")
        # Header names are lowercased so extract rules are case-insensitive.
        response_headers = {k.lower(): v for k, v in response.headers.items()}
    return body, response_headers


def extract_jsonpath(payload: Any, path: str) -> Optional[str]:
    """Walk a minimal '$.a.b.0.c' style path through dicts/lists.

    Only dotted keys and numeric list indexes are supported; returns
    None on any miss instead of raising.
    """
    if not path.startswith("$."):
        return None
    current = payload
    for part in path[2:].split("."):
        if isinstance(current, list):
            if not part.isdigit():
                return None
            index = int(part)
            if index >= len(current):
                return None
            current = current[index]
            continue
        if not isinstance(current, dict):
            return None
        if part not in current:
            return None
        current = current[part]
    if current is None:
        return None
    return str(current)


def extract_version(
    body: str,
    rule: ExtractRule,
    headers: Optional[Dict[str, str]] = None,
) -> Optional[str]:
    """Apply an ExtractRule to a response; return the version or None.

    jsonpath / jsonpath_join parse the body as JSON; header looks up a
    (lowercased) response header; regex returns group 1 if present,
    otherwise the whole match; text returns the literal rule value when
    it occurs anywhere in the body.
    """
    if rule.type == "jsonpath":
        try:
            payload = json.loads(body)
        except json.JSONDecodeError:
            return None
        return extract_jsonpath(payload, rule.value)
    if rule.type == "jsonpath_join":
        try:
            payload = json.loads(body)
        except json.JSONDecodeError:
            return None
        parts = []
        for path in rule.value.split(","):
            value = extract_jsonpath(payload, path.strip())
            if value is None:
                return None
            parts.append(value)
        return ".".join(parts)
    if rule.type == "header":
        if not headers:
            return None
        return headers.get(rule.value.lower())
    if rule.type == "regex":
        # NOTE: removed a redundant function-local "import re" — the
        # module is already imported at file top.
        match = re.search(rule.value, body)
        if not match:
            return None
        if match.lastindex:
            return match.group(1)
        return match.group(0)
    if rule.type == "text":
        return rule.value if rule.value in body else None
    return None


def normalize_version(raw: Optional[str]) -> Optional[str]:
    """Trim whitespace and a leading 'v' prefix (e.g. 'v1.2.3')."""
    if not raw:
        return None
    return raw.strip().lstrip("v")


def parse_version(value: str) -> Optional[Dict[str, Any]]:
    """Parse a semver-like 'MAJOR.MINOR.PATCH[-pre][+build]' string.

    Returns None if the string does not match. Prerelease identifiers
    are stored as (is_numeric, value) pairs so numeric identifiers
    compare numerically per semver; build metadata is ignored.
    """
    match = re.match(
        r"^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z.-]+))?(?:\+([0-9A-Za-z.-]+))?$",
        value,
    )
    if not match:
        return None
    major, minor, patch, prerelease, _build = match.groups()
    prerelease_parts = None
    if prerelease:
        prerelease_parts = []
        for part in prerelease.split("."):
            if part.isdigit():
                prerelease_parts.append((True, int(part)))
            else:
                prerelease_parts.append((False, part))
    return {
        "major": int(major),
        "minor": int(minor),
        "patch": int(patch),
        "prerelease": prerelease_parts,
    }


def compare_prerelease(a: Optional[list], b: Optional[list]) -> int:
    """Compare prerelease identifier lists per semver precedence.

    Returns -1/0/1 meaning b < a / equal / b > a from the caller's
    perspective (same convention as compare_versions). A release
    (None) has higher precedence than any prerelease; numeric
    identifiers rank below alphanumeric ones; more identifiers win
    when the shared prefix is equal.
    """
    if a is None and b is None:
        return 0
    if a is None:
        return -1
    if b is None:
        return 1
    for left, right in zip(a, b):
        left_is_num, left_value = left
        right_is_num, right_value = right
        if left_is_num and right_is_num:
            if left_value != right_value:
                return 1 if left_value > right_value else -1
        elif left_is_num != right_is_num:
            # Numeric identifiers always have lower precedence.
            return -1 if left_is_num else 1
        else:
            if left_value != right_value:
                return 1 if left_value > right_value else -1
    if len(a) == len(b):
        return 0
    return 1 if len(a) > len(b) else -1


def compare_versions(current: Optional[str], latest: Optional[str]) -> Optional[int]:
    """Compare two version strings.

    Returns 1 if *latest* is newer than *current*, -1 if older, 0 if
    equal, and None when either side is missing or unparseable.
    """
    if not current or not latest:
        return None
    current_norm = normalize_version(current)
    latest_norm = normalize_version(latest)
    if not current_norm or not latest_norm:
        return None
    current_version = parse_version(current_norm)
    latest_version = parse_version(latest_norm)
    if current_version is None or latest_version is None:
        return None
    current_tuple = (current_version["major"], current_version["minor"], current_version["patch"])
    latest_tuple = (latest_version["major"], latest_version["minor"], latest_version["patch"])
    if latest_tuple > current_tuple:
        return 1
    if latest_tuple < current_tuple:
        return -1
    return compare_prerelease(current_version["prerelease"], latest_version["prerelease"])


def build_upstream_fallback(url: Optional[str]) -> Optional[Dict[str, Any]]:
    """For GitHub/Codeberg 'releases/latest' URLs, build a tags fallback.

    Some projects tag versions without publishing releases; in that case
    the first entry of the /tags listing is used instead.
    """
    if not url:
        return None
    if "api.github.com/repos/" in url and url.endswith("/releases/latest"):
        tag_url = url.replace("/releases/latest", "/tags")
        return {
            "url": tag_url,
            "extract": ExtractRule(type="jsonpath", value="$.0.name"),
        }
    if "codeberg.org/api/v1/repos/" in url and url.endswith("/releases/latest"):
        tag_url = url.replace("/releases/latest", "/tags")
        return {
            "url": tag_url,
            "extract": ExtractRule(type="jsonpath", value="$.0.name"),
        }
    return None


def parse_set_cookie(headers: Dict[str, str]) -> Optional[str]:
    """Return the 'name=value' part of a Set-Cookie header, if any."""
    set_cookie = headers.get("set-cookie")
    if not set_cookie:
        return None
    return set_cookie.split(";", 1)[0]


def authenticate_service(service: ServiceConfig, timeout: float, user_agent: str) -> Optional[str]:
    """Perform a form login and return a Cookie header value, or None.

    Credentials come from the environment variables named in the login
    config. Hidden form fields from the login page (e.g. CSRF tokens)
    are forwarded. When login.crypto is set, a bcrypt challenge-response
    is computed from a salt+nonce endpoint instead of posting the plain
    password (the payload field is then "challenge").

    Raises ValueError on incomplete login config or missing credentials.
    """
    if not service.login:
        return None
    login_url = service.login.get("url")
    username_env = service.login.get("username_env")
    password_env = service.login.get("password_env")
    username_field = service.login.get("username_field", "username")
    password_field = service.login.get("password_field", "password")
    nonce_url = service.login.get("nonce_url")
    use_crypto = service.login.get("crypto", False)
    if not login_url or not username_env or not password_env:
        raise ValueError(f"Login config incomplete for {service.name}")
    username = os.getenv(username_env)
    password = os.getenv(password_env)
    if not username or not password:
        raise ValueError(f"Missing credentials for {service.name}")
    cookie_jar = CookieJar()
    opener = build_opener(HTTPCookieProcessor(cookie_jar))
    login_request = Request(login_url, headers={"User-Agent": user_agent})
    with opener.open(login_request, timeout=timeout) as response:
        login_page = response.read().decode("utf-8", errors="replace")
    hidden_fields = extract_hidden_inputs(login_page)
    payload_data = {
        username_field: username,
        **hidden_fields,
    }
    if use_crypto:
        if not nonce_url:
            raise ValueError(f"Missing nonce_url for {service.name}")
        # The nonce endpoint is keyed by username appended to the URL.
        nonce_request = Request(
            f"{nonce_url}{username}",
            headers={"User-Agent": user_agent},
        )
        with opener.open(nonce_request, timeout=timeout) as response:
            nonce_payload = json.loads(response.read().decode("utf-8", errors="replace"))
        salt1 = nonce_payload.get("salt1")
        nonce = nonce_payload.get("nonce")
        if not salt1 or not nonce:
            raise ValueError(f"Invalid nonce response for {service.name}")
        # bcrypt only uses the first 72 bytes of its input; truncate
        # explicitly so newer bcrypt versions don't raise.
        password_bytes = password.encode("utf-8")
        if len(password_bytes) > 72:
            password_bytes = password_bytes[:72]
        first_hash = bcrypt.hashpw(password_bytes, salt1.encode("utf-8"))
        combined = (nonce + first_hash.decode("utf-8")).encode("utf-8")
        if len(combined) > 72:
            combined = combined[:72]
        challenge = bcrypt.hashpw(combined, bcrypt.gensalt(rounds=4))
        payload_data["challenge"] = challenge.decode("utf-8")
    else:
        payload_data[password_field] = password
    payload = urlencode(payload_data).encode("utf-8")
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "User-Agent": user_agent,
    }
    post_request = Request(login_url, data=payload, headers=headers, method="POST")
    # FIX: close the login response instead of leaking the connection;
    # the session cookies land in cookie_jar either way.
    with opener.open(post_request, timeout=timeout):
        pass
    cookies = [f"{cookie.name}={cookie.value}" for cookie in cookie_jar]
    if not cookies:
        return None
    return "; ".join(cookies)


def check_service(service: ServiceConfig, timeout: float, user_agent: str) -> Dict[str, Any]:
    """Fetch current and latest versions for one service.

    Returns a result dict with keys name/current/latest/current_error/
    latest_error/upstream_url. Network or extraction failures are
    captured as error strings rather than raised, so one broken service
    does not abort the run. For GitHub/Codeberg release URLs a /tags
    fallback is tried when the release lookup fails or yields nothing.
    """
    result: Dict[str, Any] = {
        "name": service.name,
        "current": None,
        "latest": None,
        "current_error": None,
        "latest_error": None,
        "upstream_url": service.upstream_latest_version_url,
    }
    if service.current_version_url and service.current_version_extract:
        try:
            headers = build_headers(service.current_version_headers)
            if service.login:
                login_cookie = authenticate_service(service, timeout, user_agent)
                if login_cookie:
                    headers.setdefault("Cookie", login_cookie)
            body, response_headers = fetch_response(
                service.current_version_url,
                timeout,
                user_agent,
                headers,
            )
            result["current"] = extract_version(
                body,
                service.current_version_extract,
                response_headers,
            )
        except Exception as exc:
            result["current_error"] = str(exc)
    if service.upstream_latest_version_url and service.upstream_latest_extract:
        headers = build_headers(service.upstream_latest_headers)
        try:
            body, response_headers = fetch_response(
                service.upstream_latest_version_url,
                timeout,
                user_agent,
                headers,
            )
            result["latest"] = extract_version(
                body,
                service.upstream_latest_extract,
                response_headers,
            )
            if result["latest"] is None:
                # Treat an empty extraction as failure so the tags
                # fallback below gets a chance.
                raise ValueError("Latest version extraction returned empty")
        except Exception as exc:
            fallback = build_upstream_fallback(service.upstream_latest_version_url)
            if fallback:
                try:
                    body, response_headers = fetch_response(
                        fallback["url"],
                        timeout,
                        user_agent,
                        headers,
                    )
                    result["latest"] = extract_version(
                        body,
                        fallback["extract"],
                        response_headers,
                    )
                except Exception as fallback_exc:
                    result["latest_error"] = str(fallback_exc)
            else:
                result["latest_error"] = str(exc)
    return result


def build_summary(results: list[Dict[str, Any]]) -> tuple[str, bool]:
    """Build a human-readable summary and whether to notify.

    Returns (message, should_notify); should_notify is True when any
    service has an update available or reported an error.
    """
    updates = []
    errors = []
    for result in results:
        comparison = compare_versions(result["current"], result["latest"])
        if comparison == 1:
            updates.append(result)
        if result["current_error"] or result["latest_error"]:
            errors.append(result)
    lines = []
    if updates:
        lines.append("Updates available:")
        for result in updates:
            current = result["current"] or "unknown"
            latest = result["latest"] or "unknown"
            lines.append(f"- {result['name']}: {current} -> {latest}")
    if errors:
        lines.append("Errors:")
        for result in errors:
            if result["current_error"]:
                lines.append(f"- {result['name']}: current version error")
            if result["latest_error"]:
                lines.append(f"- {result['name']}: latest version error")
    if not lines:
        return "All services up to date.", False
    return "\n".join(lines), True


def send_matrix_message(message: str, timeout: float) -> None:
    """Send *message* to the configured Matrix room (best effort).

    No-op unless MATRIX_HOMESERVER, MATRIX_ROOM_ID and
    MATRIX_ACCESS_TOKEN are all set. Joins the room first so a fresh
    bot account can post. Failures are logged to stderr, not raised.
    """
    homeserver = os.getenv("MATRIX_HOMESERVER")
    room_id = os.getenv("MATRIX_ROOM_ID")
    token = os.getenv("MATRIX_ACCESS_TOKEN")
    if not homeserver or not room_id or not token:
        return
    join_matrix_room(homeserver, room_id, token, timeout)
    # Millisecond timestamp keeps the transaction id unique per send.
    txn_id = str(int(time.time() * 1000))
    url = f"{homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}"
    payload = json.dumps({
        "msgtype": "m.text",
        "body": message,
    }).encode("utf-8")
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    try:
        fetch_response(
            url,
            timeout=timeout,
            user_agent="service-update-alerts",
            extra_headers=headers,
            data=payload,
            method="PUT",
        )
    except Exception as exc:
        print(f"Matrix notification failed: {exc}", file=sys.stderr)


def join_matrix_room(homeserver: str, room_id: str, token: str, timeout: float) -> None:
    """Join the Matrix room; silently ignore failures (already joined)."""
    encoded_room = quote(room_id, safe="")
    url = f"{homeserver}/_matrix/client/v3/join/{encoded_room}"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    try:
        fetch_response(
            url,
            timeout=timeout,
            user_agent="service-update-alerts",
            extra_headers=headers,
            data=b"{}",
            method="POST",
        )
    except Exception:
        return


def send_uptime_kuma_ping(timeout: float) -> None:
    """Ping the Uptime Kuma push URL, if configured (best effort)."""
    push_url = os.getenv("UPTIME_KUMA_PUSH_URL")
    if not push_url:
        return
    try:
        fetch_response(push_url, timeout=timeout, user_agent="service-update-alerts")
    except Exception as exc:
        print(f"Uptime Kuma push failed: {exc}", file=sys.stderr)


def main() -> int:
    """CLI entry point; returns a process exit code."""
    parser = argparse.ArgumentParser(description="Check for webservice updates")
    parser.add_argument("--config", default="services.yaml", help="Path to services YAML")
    parser.add_argument("--all", action="store_true", help="Show all services")
    parser.add_argument("--timeout", type=float, default=10.0, help="HTTP timeout in seconds")
    parser.add_argument("--user-agent", default="service-update-alerts/1.0", help="HTTP user agent")
    args = parser.parse_args()
    load_dotenv()
    try:
        config = load_yaml(args.config)
        services = load_services(config)
    except Exception as exc:
        print(f"Failed to load config: {exc}", file=sys.stderr)
        return 1
    results = []
    for service in services.values():
        results.append(check_service(service, args.timeout, args.user_agent))
    output_lines = []
    for result in sorted(results, key=lambda item: item["name"].lower()):
        comparison = compare_versions(result["current"], result["latest"])
        has_update = comparison == 1
        if not args.all and not has_update:
            continue
        current = result["current"] or "unknown"
        latest = result["latest"] or "unknown"
        notes = []
        if comparison is None and result["current"] and result["latest"]:
            notes.append("unparseable")
        if result["current_error"]:
            notes.append("current error")
        if result["latest_error"]:
            notes.append("latest error")
        suffix = f" [{' '.join(notes)}]" if notes else ""
        output_lines.append(f"{result['name']}: {current} -> {latest}{suffix}")
    if output_lines:
        print("\n".join(output_lines))
    for result in sorted(results, key=lambda item: item["name"].lower()):
        if result["current_error"]:
            print(
                f"{result['name']}: current version error: {result['current_error']}",
                file=sys.stderr,
            )
        if result["latest_error"]:
            print(
                f"{result['name']}: latest version error: {result['latest_error']}",
                file=sys.stderr,
            )
    summary, should_notify = build_summary(results)
    if should_notify:
        send_matrix_message(summary, args.timeout)
    send_uptime_kuma_ping(args.timeout)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())