service-update-alerts/check_updates.py
Jeena 501a84e0a9 chore: Rename project to service-update-alerts
Update project metadata, docs, and systemd unit names to match the new service-update-alerts naming.
2026-03-12 15:50:49 +00:00

640 lines
22 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import os
import re
import sys
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional
from http.cookiejar import CookieJar
from urllib.parse import urlencode, quote
from urllib.request import HTTPCookieProcessor, Request, build_opener, urlopen
import bcrypt
import yaml
@dataclass
class ExtractRule:
    """How to pull a version string out of an HTTP response.

    ``type`` is one of jsonpath / jsonpath_join / regex / text / header
    (validated in parse_extract_rule); ``value`` is the path, pattern, or
    literal interpreted by extract_version.
    """
    type: str
    value: str
@dataclass
class ServiceConfig:
    """One monitored service, as declared in the config's 'services' list."""
    # Display name; also the key in the dict returned by load_services().
    name: str
    # Root URL of the deployed service (only validated here, never fetched).
    base_url: str
    # Endpoint reporting the running version, or None to skip the check.
    current_version_url: Optional[str]
    # Rule applied to the current-version response body/headers.
    current_version_extract: Optional[ExtractRule]
    # Extra request headers; values may contain {env:NAME} placeholders.
    current_version_headers: Optional[Dict[str, str]]
    # Endpoint reporting the newest upstream release, or None to skip.
    upstream_latest_version_url: Optional[str]
    # Rule applied to the upstream-latest response body/headers.
    upstream_latest_extract: Optional[ExtractRule]
    # Extra request headers for the upstream request.
    upstream_latest_headers: Optional[Dict[str, str]]
    # Free-form operator notes copied from the config entry.
    notes: Optional[str]
    # Optional form-login settings; consumed by authenticate_service().
    login: Optional[Dict[str, Any]]
def load_yaml(path: str) -> Dict[str, Any]:
    """Read a YAML document from *path*; an empty document yields {}."""
    with open(path, "r", encoding="utf-8") as handle:
        parsed = yaml.safe_load(handle)
    return parsed or {}
def load_dotenv(path: str = ".env") -> None:
    """Best-effort loader for KEY=VALUE pairs from a dotenv file.

    Existing environment variables are never overwritten; blank lines,
    comments, malformed lines, and I/O errors are silently ignored.
    """
    if not os.path.exists(path):
        return
    try:
        with open(path, "r", encoding="utf-8") as handle:
            for raw in handle:
                entry = raw.strip()
                if not entry or entry.startswith("#") or "=" not in entry:
                    continue
                key, _, value = entry.partition("=")
                key = key.strip()
                if key:
                    # Drop surrounding whitespace and matching quote characters.
                    os.environ.setdefault(key, value.strip().strip("\"").strip("'"))
    except OSError:
        return
def parse_extract_rule(raw: Optional[Dict[str, Any]]) -> Optional["ExtractRule"]:
    """Turn a {'type': ..., 'value': ...} mapping into an ExtractRule.

    Returns None for a missing or incomplete mapping; raises ValueError
    when the rule type is not one of the supported kinds.
    """
    if not raw:
        return None
    rule_type, value = raw.get("type"), raw.get("value")
    if not rule_type or not value:
        return None
    if rule_type not in {"jsonpath", "jsonpath_join", "regex", "text", "header"}:
        raise ValueError(f"Unsupported extract rule type: {rule_type}")
    return ExtractRule(type=rule_type, value=value)
def load_services(config: Dict[str, Any]) -> Dict[str, ServiceConfig]:
services = config.get("services")
if not isinstance(services, list):
raise ValueError("Config must include a 'services' list")
loaded: Dict[str, ServiceConfig] = {}
for entry in services:
if not isinstance(entry, dict):
raise ValueError("Each service entry must be a mapping")
name = entry.get("name")
base_url = entry.get("base_url")
if not name or not base_url:
raise ValueError("Each service must include name and base_url")
if not isinstance(name, str) or not isinstance(base_url, str):
raise ValueError("Service name and base_url must be strings")
current_headers = entry.get("current_version_headers")
upstream_headers = entry.get("upstream_latest_headers")
if current_headers is not None and not isinstance(current_headers, dict):
raise ValueError("current_version_headers must be a mapping")
if upstream_headers is not None and not isinstance(upstream_headers, dict):
raise ValueError("upstream_latest_headers must be a mapping")
current_url = entry.get("current_version_url")
upstream_url = entry.get("upstream_latest_version_url")
current_extract = parse_extract_rule(entry.get("current_version_extract"))
upstream_extract = parse_extract_rule(entry.get("upstream_latest_extract"))
if current_url and not current_extract:
raise ValueError(f"Service {name} must define current_version_extract")
if upstream_url and not upstream_extract:
raise ValueError(f"Service {name} must define upstream_latest_extract")
login = entry.get("login")
if login is not None and not isinstance(login, dict):
raise ValueError("login must be a mapping")
loaded[name] = ServiceConfig(
name=name,
base_url=base_url,
current_version_url=current_url,
current_version_extract=current_extract,
current_version_headers=current_headers,
upstream_latest_version_url=upstream_url,
upstream_latest_extract=upstream_extract,
upstream_latest_headers=upstream_headers,
notes=entry.get("notes"),
login=login,
)
return loaded
def resolve_env_placeholders(value: str) -> str:
    """Substitute every {env:NAME} placeholder with the NAME environment variable.

    Raises ValueError when a referenced variable is unset.
    """
    def _lookup(match: re.Match[str]) -> str:
        name = match.group(1)
        resolved = os.getenv(name)
        if resolved is None:
            raise ValueError(f"Missing environment variable {name}")
        return resolved

    # Placeholder names are restricted to upper-case/digit/underscore.
    return re.sub(r"\{env:([A-Z0-9_]+)\}", _lookup, value)
def build_headers(raw_headers: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Resolve {env:NAME} placeholders in every header value; {} when unset."""
    if not raw_headers:
        return {}
    return {
        name: resolve_env_placeholders(str(value))
        for name, value in raw_headers.items()
    }
def extract_hidden_inputs(html: str) -> Dict[str, str]:
    """Scrape name/value pairs from <input type="hidden"> tags in an HTML page.

    Inputs without a name attribute are skipped; a missing value yields "".
    Used to echo CSRF tokens and similar fields back in the login POST.
    """
    fields: Dict[str, str] = {}
    tag_pattern = re.compile(
        r"<input[^>]+type=['\"]hidden['\"][^>]*>",
        re.IGNORECASE,
    )
    for tag in tag_pattern.findall(html):
        name = re.search(r"name=['\"]([^'\"]+)['\"]", tag)
        if name is None:
            continue
        value = re.search(r"value=['\"]([^'\"]*)['\"]", tag)
        fields[name.group(1)] = value.group(1) if value else ""
    return fields
def fetch_response(
    url: str,
    timeout: float,
    user_agent: str,
    extra_headers: Optional[Dict[str, str]] = None,
    data: Optional[bytes] = None,
    method: str = "GET",
) -> tuple[str, Dict[str, str]]:
    """Perform an HTTP request and return (decoded body, lower-cased headers).

    The body is decoded as UTF-8 with replacement characters on errors so
    callers never see a decode exception.
    """
    headers: Dict[str, str] = {"User-Agent": user_agent}
    headers.update(extra_headers or {})
    # Authenticate GitHub API calls when a token is available, unless the
    # caller already supplied an Authorization header of its own.
    token = os.getenv("GITHUB_TOKEN")
    if token and "api.github.com" in url:
        headers.setdefault("Authorization", f"Bearer {token}")
    request = Request(url, headers=headers, data=data, method=method)
    with urlopen(request, timeout=timeout) as response:
        body = response.read().decode("utf-8", errors="replace")
        lowered = {name.lower(): value for name, value in response.headers.items()}
    return body, lowered
def extract_jsonpath(payload: Any, path: str) -> Optional[str]:
    """Walk a dotted, '$.'-rooted path through nested dicts/lists.

    Returns str(value) at the end of the path, or None when the path does
    not start with '$.', a segment is missing, or the final value is None.
    """
    if not path.startswith("$."):
        return None
    node = payload
    for segment in path[2:].split("."):
        if isinstance(node, list):
            # Lists are only addressable by a non-negative integer segment.
            if not segment.isdigit():
                return None
            idx = int(segment)
            if idx >= len(node):
                return None
            node = node[idx]
        elif isinstance(node, dict):
            if segment not in node:
                return None
            node = node[segment]
        else:
            return None
    return None if node is None else str(node)
def extract_version(
    body: str,
    rule: "ExtractRule",
    headers: Optional[Dict[str, str]] = None,
) -> Optional[str]:
    """Apply an ExtractRule to a response body/headers; return the version.

    Supported rule types:
      - jsonpath:       single '$.'-rooted path into the JSON body
      - jsonpath_join:  comma-separated paths whose values are joined with '.'
      - header:         case-insensitive response-header lookup
      - regex:          first capture group (or whole match) in the body
      - text:           literal marker; returned if present in the body

    Returns None when the body cannot be parsed, the rule does not match,
    or the rule type is unknown.
    """
    if rule.type in ("jsonpath", "jsonpath_join"):
        try:
            payload = json.loads(body)
        except json.JSONDecodeError:
            return None
        if rule.type == "jsonpath":
            return extract_jsonpath(payload, rule.value)
        # jsonpath_join: every path must resolve, otherwise the whole rule fails.
        parts = []
        for path in rule.value.split(","):
            value = extract_jsonpath(payload, path.strip())
            if value is None:
                return None
            parts.append(value)
        return ".".join(parts)
    if rule.type == "header":
        if not headers:
            return None
        # fetch_response lower-cases header names, so match accordingly.
        return headers.get(rule.value.lower())
    if rule.type == "regex":
        # Fixed: removed a redundant function-local `import re` that shadowed
        # the module-level import for no benefit.
        match = re.search(rule.value, body)
        if not match:
            return None
        return match.group(1) if match.lastindex else match.group(0)
    if rule.type == "text":
        return rule.value if rule.value in body else None
    return None
def normalize_version(raw: Optional[str]) -> Optional[str]:
    """Trim whitespace and a single leading 'v' prefix from a version string.

    Fixed: the previous lstrip("v") removed *every* leading 'v' character
    (so "vv1.0" became "1.0" and a hypothetical version starting with
    multiple v's was over-stripped); removeprefix drops at most one.
    """
    if not raw:
        return None
    return raw.strip().removeprefix("v")
def parse_version(value: str) -> Optional[Dict[str, Any]]:
    """Parse a semver 'X.Y.Z[-prerelease][+build]' string.

    Returns a dict with int major/minor/patch and a prerelease list of
    (is_numeric, value) tuples (None when absent); build metadata is
    discarded. Returns None when the string is not valid semver.
    """
    semver = re.match(
        r"^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z.-]+))?(?:\+([0-9A-Za-z.-]+))?$",
        value,
    )
    if semver is None:
        return None
    major, minor, patch, prerelease, _build = semver.groups()
    pre_parts = None
    if prerelease:
        # Numeric identifiers are tagged so comparisons can treat them as ints.
        pre_parts = [
            (True, int(piece)) if piece.isdigit() else (False, piece)
            for piece in prerelease.split(".")
        ]
    return {
        "major": int(major),
        "minor": int(minor),
        "patch": int(patch),
        "prerelease": pre_parts,
    }
def compare_prerelease(a: Optional[list], b: Optional[list]) -> int:
    """Compare parsed prerelease identifier lists per semver precedence.

    Follows the same convention as compare_versions: returns 1 when *b*
    (the latest version's prerelease) has HIGHER precedence than *a*,
    -1 when lower, 0 when equal. A full release (None) outranks any
    prerelease.

    Fixed: the identifier-by-identifier and length comparisons previously
    returned the sign of (a - b) while the None cases and the caller used
    the sign of (b - a), so e.g. current 1.0.0-alpha vs latest 1.0.0-beta
    reported "no update". All branches now use the (b - a) convention.
    """
    if a is None and b is None:
        return 0
    if a is None:
        # a is a full release, which outranks b's prerelease.
        return -1
    if b is None:
        return 1
    for left, right in zip(a, b):
        left_is_num, left_value = left
        right_is_num, right_value = right
        if left_is_num and right_is_num:
            if left_value != right_value:
                return 1 if right_value > left_value else -1
        elif left_is_num != right_is_num:
            # Numeric identifiers always rank below alphanumeric ones.
            return 1 if left_is_num else -1
        else:
            if left_value != right_value:
                return 1 if right_value > left_value else -1
    if len(a) == len(b):
        return 0
    # All shared identifiers tie: the longer list has higher precedence.
    return 1 if len(b) > len(a) else -1
def compare_versions(current: Optional[str], latest: Optional[str]) -> Optional[int]:
    """Compare two version strings.

    Returns 1 when *latest* is newer than *current*, -1 when older,
    0 when equal, and None when either side is missing or not semver.
    """
    if not current or not latest:
        return None
    cur_norm = normalize_version(current)
    lat_norm = normalize_version(latest)
    if not cur_norm or not lat_norm:
        return None
    cur = parse_version(cur_norm)
    lat = parse_version(lat_norm)
    if cur is None or lat is None:
        return None
    cur_core = (cur["major"], cur["minor"], cur["patch"])
    lat_core = (lat["major"], lat["minor"], lat["patch"])
    if lat_core != cur_core:
        return 1 if lat_core > cur_core else -1
    # Same X.Y.Z: precedence is decided by the prerelease identifiers.
    return compare_prerelease(cur["prerelease"], lat["prerelease"])
def build_upstream_fallback(url: Optional[str]) -> Optional[Dict[str, Any]]:
    """Derive a tag-list fallback for GitHub/Codeberg release endpoints.

    Repos without formal releases 404 on .../releases/latest; in that case
    the newest entry of the /tags listing is used instead. Returns a dict
    with the fallback 'url' and its 'extract' rule, or None when the URL
    is not a recognized releases/latest endpoint.
    """
    if not url or not url.endswith("/releases/latest"):
        return None
    is_github = "api.github.com/repos/" in url
    is_codeberg = "codeberg.org/api/v1/repos/" in url
    if not (is_github or is_codeberg):
        return None
    return {
        "url": url.replace("/releases/latest", "/tags"),
        # First element of the tag listing carries the newest tag name.
        "extract": ExtractRule(type="jsonpath", value="$.0.name"),
    }
def parse_set_cookie(headers: Dict[str, str]) -> Optional[str]:
    """Return the leading 'name=value' pair of a Set-Cookie header, if any.

    Attributes after the first ';' (Path, HttpOnly, ...) are discarded.
    """
    raw = headers.get("set-cookie")
    return raw.split(";", 1)[0] if raw else None
def authenticate_service(service: ServiceConfig, timeout: float, user_agent: str) -> Optional[str]:
    """Log in to a service's web form and return a Cookie header value.

    Returns None when the service has no login config or when the exchange
    produced no session cookies. Raises ValueError for an incomplete login
    config, missing credentials, or a malformed nonce response.
    """
    if not service.login:
        return None
    login_url = service.login.get("url")
    username_env = service.login.get("username_env")
    password_env = service.login.get("password_env")
    username_field = service.login.get("username_field", "username")
    password_field = service.login.get("password_field", "password")
    nonce_url = service.login.get("nonce_url")
    use_crypto = service.login.get("crypto", False)
    if not login_url or not username_env or not password_env:
        raise ValueError(f"Login config incomplete for {service.name}")
    # Credentials are only ever read from the environment, never the config.
    username = os.getenv(username_env)
    password = os.getenv(password_env)
    if not username or not password:
        raise ValueError(f"Missing credentials for {service.name}")
    # The jar captures session cookies across the GET/POST exchange below.
    cookie_jar = CookieJar()
    opener = build_opener(HTTPCookieProcessor(cookie_jar))
    # GET the login page first: it may set cookies and contain hidden
    # fields (e.g. CSRF tokens) that must be echoed back in the POST.
    login_request = Request(login_url, headers={"User-Agent": user_agent})
    with opener.open(login_request, timeout=timeout) as response:
        login_page = response.read().decode("utf-8", errors="replace")
    hidden_fields = extract_hidden_inputs(login_page)
    payload_data = {
        username_field: username,
        **hidden_fields,
    }
    if use_crypto:
        # Challenge-response flow: fetch a per-user salt + nonce, bcrypt the
        # password with the server-provided salt, then bcrypt nonce+hash to
        # form the challenge field (plaintext password is never POSTed).
        if not nonce_url:
            raise ValueError(f"Missing nonce_url for {service.name}")
        nonce_request = Request(
            f"{nonce_url}{username}",
            headers={"User-Agent": user_agent},
        )
        with opener.open(nonce_request, timeout=timeout) as response:
            nonce_payload = json.loads(response.read().decode("utf-8", errors="replace"))
        salt1 = nonce_payload.get("salt1")
        nonce = nonce_payload.get("nonce")
        if not salt1 or not nonce:
            raise ValueError(f"Invalid nonce response for {service.name}")
        password_bytes = password.encode("utf-8")
        # bcrypt only considers the first 72 bytes; truncate explicitly so
        # the hash matches what the server-side implementation computes.
        if len(password_bytes) > 72:
            password_bytes = password_bytes[:72]
        first_hash = bcrypt.hashpw(password_bytes, salt1.encode("utf-8"))
        combined = (nonce + first_hash.decode("utf-8")).encode("utf-8")
        if len(combined) > 72:
            combined = combined[:72]
        # Low cost factor: this hash is per-login and only binds the nonce.
        challenge = bcrypt.hashpw(combined, bcrypt.gensalt(rounds=4))
        payload_data["challenge"] = challenge.decode("utf-8")
    else:
        # Plain form login: password goes into the configured form field.
        payload_data[password_field] = password
    payload = urlencode(payload_data).encode("utf-8")
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "User-Agent": user_agent,
    }
    post_request = Request(login_url, data=payload, headers=headers, method="POST")
    opener.open(post_request, timeout=timeout)
    # Serialize whatever the jar accumulated into a single Cookie header.
    cookies = [f"{cookie.name}={cookie.value}" for cookie in cookie_jar]
    if not cookies:
        return None
    return "; ".join(cookies)
def check_service(service: ServiceConfig, timeout: float, user_agent: str) -> Dict[str, Any]:
    """Fetch the current and upstream-latest versions for one service.

    Never raises: fetch/extract failures are recorded as strings in the
    'current_error' / 'latest_error' fields of the returned result dict.
    """
    result: Dict[str, Any] = {
        "name": service.name,
        "current": None,
        "latest": None,
        "current_error": None,
        "latest_error": None,
        "upstream_url": service.upstream_latest_version_url,
    }
    if service.current_version_url and service.current_version_extract:
        try:
            headers = build_headers(service.current_version_headers)
            # Optional form login; the session cookie is attached only when
            # the configured headers did not already provide one.
            if service.login:
                login_cookie = authenticate_service(service, timeout, user_agent)
                if login_cookie:
                    headers.setdefault("Cookie", login_cookie)
            body, response_headers = fetch_response(
                service.current_version_url,
                timeout,
                user_agent,
                headers,
            )
            result["current"] = extract_version(
                body,
                service.current_version_extract,
                response_headers,
            )
        except Exception as exc:
            result["current_error"] = str(exc)
    if service.upstream_latest_version_url and service.upstream_latest_extract:
        headers = build_headers(service.upstream_latest_headers)
        try:
            body, response_headers = fetch_response(
                service.upstream_latest_version_url,
                timeout,
                user_agent,
                headers,
            )
            result["latest"] = extract_version(
                body,
                service.upstream_latest_extract,
                response_headers,
            )
            if result["latest"] is None:
                # Treat an empty extraction like a fetch failure so the
                # tag-list fallback below gets a chance to run.
                raise ValueError("Latest version extraction returned empty")
        except Exception as exc:
            # e.g. GitHub/Codeberg repos without formal releases: retry the
            # /tags listing derived from the releases URL.
            fallback = build_upstream_fallback(service.upstream_latest_version_url)
            if fallback:
                try:
                    body, response_headers = fetch_response(
                        fallback["url"],
                        timeout,
                        user_agent,
                        headers,
                    )
                    result["latest"] = extract_version(
                        body,
                        fallback["extract"],
                        response_headers,
                    )
                except Exception as fallback_exc:
                    result["latest_error"] = str(fallback_exc)
            else:
                result["latest_error"] = str(exc)
    return result
def build_summary(results: list[Dict[str, Any]]) -> tuple[str, bool]:
    """Build the notification text for a batch of check results.

    Returns (message, should_notify); should_notify is True whenever at
    least one update or error was found.
    """
    pending = [r for r in results if compare_versions(r["current"], r["latest"]) == 1]
    failed = [r for r in results if r["current_error"] or r["latest_error"]]
    lines: list[str] = []
    if pending:
        lines.append("Updates available:")
        for r in pending:
            cur = r["current"] or "unknown"
            new = r["latest"] or "unknown"
            lines.append(f"- {r['name']}: {cur} -> {new}")
    if failed:
        lines.append("Errors:")
        for r in failed:
            # Details are deliberately omitted; main() prints them to stderr.
            if r["current_error"]:
                lines.append(f"- {r['name']}: current version error")
            if r["latest_error"]:
                lines.append(f"- {r['name']}: latest version error")
    if not lines:
        return "All services up to date.", False
    return "\n".join(lines), True
def send_matrix_message(message: str, timeout: float) -> None:
    """Post *message* as an m.text event to the configured Matrix room.

    Silently does nothing unless MATRIX_HOMESERVER, MATRIX_ROOM_ID, and
    MATRIX_ACCESS_TOKEN are all set; send failures are logged to stderr.
    """
    homeserver = os.getenv("MATRIX_HOMESERVER")
    room_id = os.getenv("MATRIX_ROOM_ID")
    token = os.getenv("MATRIX_ACCESS_TOKEN")
    if not (homeserver and room_id and token):
        return
    # Make sure the bot is in the room before attempting to send.
    join_matrix_room(homeserver, room_id, token, timeout)
    # Millisecond timestamp doubles as the idempotency transaction id.
    txn_id = str(int(time.time() * 1000))
    url = f"{homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}"
    body = json.dumps({
        "msgtype": "m.text",
        "body": message,
    }).encode("utf-8")
    try:
        fetch_response(
            url,
            timeout=timeout,
            user_agent="service-update-alerts",
            extra_headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
            data=body,
            method="PUT",
        )
    except Exception as exc:
        print(f"Matrix notification failed: {exc}", file=sys.stderr)
def join_matrix_room(homeserver: str, room_id: str, token: str, timeout: float) -> None:
    """Best-effort join of the target Matrix room before sending."""
    encoded = quote(room_id, safe="")
    url = f"{homeserver}/_matrix/client/v3/join/{encoded}"
    try:
        fetch_response(
            url,
            timeout=timeout,
            user_agent="service-update-alerts",
            extra_headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
            data=b"{}",
            method="POST",
        )
    except Exception:
        # Joining is opportunistic; a real problem surfaces on the send.
        return
def send_uptime_kuma_ping(timeout: float) -> None:
    """Hit the Uptime Kuma push URL (if configured) to signal a completed run."""
    push_url = os.getenv("UPTIME_KUMA_PUSH_URL")
    if push_url:
        try:
            fetch_response(push_url, timeout=timeout, user_agent="service-update-alerts")
        except Exception as exc:
            print(f"Uptime Kuma push failed: {exc}", file=sys.stderr)
def main() -> int:
    """CLI entry point: check every configured service and emit notifications.

    Returns 1 only when the config fails to load; per-service failures are
    reported on stderr but do not affect the exit code.
    """
    parser = argparse.ArgumentParser(description="Check for webservice updates")
    parser.add_argument("--config", default="services.yaml", help="Path to services YAML")
    parser.add_argument("--all", action="store_true", help="Show all services")
    parser.add_argument("--timeout", type=float, default=10.0, help="HTTP timeout in seconds")
    parser.add_argument("--user-agent", default="service-update-alerts/1.0", help="HTTP user agent")
    args = parser.parse_args()
    # Populate credentials/tokens from a local .env before reading config.
    load_dotenv()
    try:
        config = load_yaml(args.config)
        services = load_services(config)
    except Exception as exc:
        print(f"Failed to load config: {exc}", file=sys.stderr)
        return 1
    results = []
    for service in services.values():
        results.append(check_service(service, args.timeout, args.user_agent))
    # Human-readable stdout report; by default only services with updates.
    output_lines = []
    for result in sorted(results, key=lambda item: item["name"].lower()):
        comparison = compare_versions(result["current"], result["latest"])
        has_update = comparison == 1
        if not args.all and not has_update:
            continue
        current = result["current"] or "unknown"
        latest = result["latest"] or "unknown"
        notes = []
        if comparison is None and result["current"] and result["latest"]:
            # Both versions were fetched but at least one is not semver.
            notes.append("unparseable")
        if result["current_error"]:
            notes.append("current error")
        if result["latest_error"]:
            notes.append("latest error")
        suffix = f" [{' '.join(notes)}]" if notes else ""
        output_lines.append(f"{result['name']}: {current} -> {latest}{suffix}")
    if output_lines:
        print("\n".join(output_lines))
    # Full error details go to stderr so cron mail / journald capture them.
    for result in sorted(results, key=lambda item: item["name"].lower()):
        if result["current_error"]:
            print(
                f"{result['name']}: current version error: {result['current_error']}",
                file=sys.stderr,
            )
        if result["latest_error"]:
            print(
                f"{result['name']}: latest version error: {result['latest_error']}",
                file=sys.stderr,
            )
    summary, should_notify = build_summary(results)
    if should_notify:
        send_matrix_message(summary, args.timeout)
    # Ping the health check on every run that got this far, even with errors.
    send_uptime_kuma_ping(args.timeout)
    return 0
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return value as exit code.
    sys.exit(main())