#!/usr/bin/env python3 import argparse import json import os import re import sys from dataclasses import dataclass from typing import Any, Dict, Optional from urllib.request import Request, urlopen import yaml @dataclass class ExtractRule: type: str value: str @dataclass class ServiceConfig: name: str base_url: str current_version_url: Optional[str] current_version_extract: Optional[ExtractRule] current_version_headers: Optional[Dict[str, str]] upstream_latest_version_url: Optional[str] upstream_latest_extract: Optional[ExtractRule] upstream_latest_headers: Optional[Dict[str, str]] notes: Optional[str] def load_yaml(path: str) -> Dict[str, Any]: with open(path, "r", encoding="utf-8") as handle: return yaml.safe_load(handle) or {} def parse_extract_rule(raw: Optional[Dict[str, Any]]) -> Optional[ExtractRule]: if not raw: return None rule_type = raw.get("type") value = raw.get("value") if not rule_type or not value: return None allowed = {"jsonpath", "regex", "text", "header"} if rule_type not in allowed: raise ValueError(f"Unsupported extract rule type: {rule_type}") return ExtractRule(type=rule_type, value=value) def load_services(config: Dict[str, Any]) -> Dict[str, ServiceConfig]: services = config.get("services") if not isinstance(services, list): raise ValueError("Config must include a 'services' list") loaded: Dict[str, ServiceConfig] = {} for entry in services: if not isinstance(entry, dict): raise ValueError("Each service entry must be a mapping") name = entry.get("name") base_url = entry.get("base_url") if not name or not base_url: raise ValueError("Each service must include name and base_url") if not isinstance(name, str) or not isinstance(base_url, str): raise ValueError("Service name and base_url must be strings") current_headers = entry.get("current_version_headers") upstream_headers = entry.get("upstream_latest_headers") if current_headers is not None and not isinstance(current_headers, dict): raise ValueError("current_version_headers must be a mapping") if upstream_headers is not None and not isinstance(upstream_headers, dict): raise ValueError("upstream_latest_headers must be a mapping") current_url = entry.get("current_version_url") upstream_url = entry.get("upstream_latest_version_url") current_extract = parse_extract_rule(entry.get("current_version_extract")) upstream_extract = parse_extract_rule(entry.get("upstream_latest_extract")) if current_url and not current_extract: raise ValueError(f"Service {name} must define current_version_extract") if upstream_url and not upstream_extract: raise ValueError(f"Service {name} must define upstream_latest_extract") loaded[name] = ServiceConfig( name=name, base_url=base_url, current_version_url=current_url, current_version_extract=current_extract, current_version_headers=current_headers, upstream_latest_version_url=upstream_url, upstream_latest_extract=upstream_extract, upstream_latest_headers=upstream_headers, notes=entry.get("notes"), ) return loaded def resolve_env_placeholders(value: str) -> str: pattern = re.compile(r"\{env:([A-Z0-9_]+)\}") def replace(match: re.Match[str]) -> str: env_name = match.group(1) env_value = os.getenv(env_name) if env_value is None: raise ValueError(f"Missing environment variable {env_name}") return env_value return pattern.sub(replace, value) def build_headers(raw_headers: Optional[Dict[str, str]]) -> Dict[str, str]: if not raw_headers: return {} resolved = {} for key, value in raw_headers.items(): resolved[key] = resolve_env_placeholders(str(value)) return resolved def fetch_response( url: str, timeout: float, user_agent: str, extra_headers: Optional[Dict[str, str]] = None, ) -> tuple[str, Dict[str, str]]: headers = {"User-Agent": user_agent} if extra_headers: headers.update(extra_headers) request = Request(url, headers=headers) with urlopen(request, timeout=timeout) as response: body = response.read().decode("utf-8", errors="replace") response_headers = {k.lower(): v for k, v in response.headers.items()} return body, response_headers def extract_jsonpath(payload: Any, path: str) -> Optional[str]: if not path.startswith("$."): return None current = payload for part in path[2:].split("."): if isinstance(current, list): if not part.isdigit(): return None index = int(part) if index >= len(current): return None current = current[index] continue if not isinstance(current, dict): return None if part not in current: return None current = current[part] if current is None: return None return str(current) def extract_version( body: str, rule: ExtractRule, headers: Optional[Dict[str, str]] = None, ) -> Optional[str]: if rule.type == "jsonpath": try: payload = json.loads(body) except json.JSONDecodeError: return None return extract_jsonpath(payload, rule.value) if rule.type == "header": if not headers: return None return headers.get(rule.value.lower()) if rule.type == "regex": import re match = re.search(rule.value, body) if not match: return None if match.lastindex: return match.group(1) return match.group(0) if rule.type == "text": return rule.value if rule.value in body else None return None def normalize_version(raw: Optional[str]) -> Optional[str]: if not raw: return None return raw.strip().lstrip("v") def parse_version(value: str) -> Optional[Dict[str, Any]]: match = re.match( r"^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z.-]+))?(?:\+([0-9A-Za-z.-]+))?$", value, ) if not match: return None major, minor, patch, prerelease, _build = match.groups() prerelease_parts = None if prerelease: prerelease_parts = [] for part in prerelease.split("."): if part.isdigit(): prerelease_parts.append((True, int(part))) else: prerelease_parts.append((False, part)) return { "major": int(major), "minor": int(minor), "patch": int(patch), "prerelease": prerelease_parts, } def compare_prerelease(a: Optional[list], b: Optional[list]) -> int: if a is None and b is None: return 0 if a is None: return -1 if b is None: return 1 for left, right in zip(a, b): left_is_num, left_value = left right_is_num, right_value = right if left_is_num and right_is_num: if left_value != right_value: return 1 if left_value > right_value else -1 elif left_is_num != right_is_num: return -1 if left_is_num else 1 else: if left_value != right_value: return 1 if left_value > right_value else -1 if len(a) == len(b): return 0 return 1 if len(a) > len(b) else -1 def compare_versions(current: Optional[str], latest: Optional[str]) -> Optional[int]: if not current or not latest: return None current_norm = normalize_version(current) latest_norm = normalize_version(latest) if not current_norm or not latest_norm: return None current_version = parse_version(current_norm) latest_version = parse_version(latest_norm) if current_version is None or latest_version is None: return None current_tuple = (current_version["major"], current_version["minor"], current_version["patch"]) latest_tuple = (latest_version["major"], latest_version["minor"], latest_version["patch"]) if latest_tuple > current_tuple: return 1 if latest_tuple < current_tuple: return -1 return compare_prerelease(current_version["prerelease"], latest_version["prerelease"]) def build_upstream_fallback(url: Optional[str]) -> Optional[Dict[str, Any]]: if not url: return None if "api.github.com/repos/" in url and url.endswith("/releases/latest"): tag_url = url.replace("/releases/latest", "/tags") return { "url": tag_url, "extract": ExtractRule(type="jsonpath", value="$.0.name"), } if "codeberg.org/api/v1/repos/" in url and url.endswith("/releases/latest"): tag_url = url.replace("/releases/latest", "/tags") return { "url": tag_url, "extract": ExtractRule(type="jsonpath", value="$.0.name"), } return None def check_service(service: ServiceConfig, timeout: float, user_agent: str) -> Dict[str, Any]: result: Dict[str, Any] = { "name": service.name, "current": None, "latest": None, "current_error": None, "latest_error": None, "upstream_url": service.upstream_latest_version_url, } if service.current_version_url and service.current_version_extract: try: headers = build_headers(service.current_version_headers) body, response_headers = fetch_response( service.current_version_url, timeout, user_agent, headers, ) result["current"] = extract_version( body, service.current_version_extract, response_headers, ) except Exception as exc: result["current_error"] = str(exc) if service.upstream_latest_version_url and service.upstream_latest_extract: headers = build_headers(service.upstream_latest_headers) try: body, response_headers = fetch_response( service.upstream_latest_version_url, timeout, user_agent, headers, ) result["latest"] = extract_version( body, service.upstream_latest_extract, response_headers, ) if result["latest"] is None: raise ValueError("Latest version extraction returned empty") except Exception as exc: fallback = build_upstream_fallback(service.upstream_latest_version_url) if fallback: try: body, response_headers = fetch_response( fallback["url"], timeout, user_agent, headers, ) result["latest"] = extract_version( body, fallback["extract"], response_headers, ) except Exception as fallback_exc: result["latest_error"] = str(fallback_exc) else: result["latest_error"] = str(exc) return result def main() -> int: parser = argparse.ArgumentParser(description="Check for webservice updates") parser.add_argument("--config", default="services.yaml", help="Path to services YAML") parser.add_argument("--all", action="store_true", help="Show all services") parser.add_argument("--timeout", type=float, default=10.0, help="HTTP timeout in seconds") parser.add_argument("--user-agent", default="check-for-updates/1.0", help="HTTP user agent") args = parser.parse_args() try: config = load_yaml(args.config) services = load_services(config) except Exception as exc: print(f"Failed to load config: {exc}", file=sys.stderr) return 1 results = [] for service in services.values(): results.append(check_service(service, args.timeout, args.user_agent)) output_lines = [] for result in sorted(results, key=lambda item: item["name"].lower()): comparison = compare_versions(result["current"], result["latest"]) has_update = comparison == 1 if not args.all and not has_update: continue current = result["current"] or "unknown" latest = result["latest"] or "unknown" upstream = result["upstream_url"] or "unknown" notes = [] if comparison is None and result["current"] and result["latest"]: notes.append("unparseable") if result["current_error"]: notes.append("current error") if result["latest_error"]: notes.append("latest error") suffix = f" [{' '.join(notes)}]" if notes else "" output_lines.append(f"{result['name']}: {current} -> {latest} ({upstream}){suffix}") if output_lines: print("\n".join(output_lines)) for result in sorted(results, key=lambda item: item["name"].lower()): if result["current_error"]: print( f"{result['name']}: current version error: {result['current_error']}", file=sys.stderr, ) if result["latest_error"]: print( f"{result['name']}: latest version error: {result['latest_error']}", file=sys.stderr, ) return 0 if __name__ == "__main__": raise SystemExit(main())