From 7537de2f53ad11a5a6491e82c9d547e9fee5e215 Mon Sep 17 00:00:00 2001 From: Jeena Date: Thu, 12 Mar 2026 13:57:25 +0000 Subject: [PATCH] feat: Add authenticated service checks Support FreshRSS login with challenge hashing, add GitHub token usage, and update service metadata for Immich and PeerTube. --- .env.sample | 4 + .gitignore | 3 + BACKLOG.md | 9 ++ README.md | 10 ++ check_updates.py | 151 +++++++++++++++++++++++++++++- pyproject.toml | 1 + services.yaml | 22 ++++- tests/test_extraction.py | 24 ----- tests/test_live_services.py | 26 +++++ tests/test_versions.py | 27 ------ tests/test_versions_end_to_end.py | 39 ++++++++ 11 files changed, 257 insertions(+), 59 deletions(-) delete mode 100644 tests/test_extraction.py create mode 100644 tests/test_live_services.py delete mode 100644 tests/test_versions.py create mode 100644 tests/test_versions_end_to_end.py diff --git a/.env.sample b/.env.sample index 2acf9df..0f8813a 100644 --- a/.env.sample +++ b/.env.sample @@ -4,3 +4,7 @@ RADICALE_BASIC_AUTH=base64-user-colon-pass # Optional (use if upstream APIs need auth to avoid rate limits) GITHUB_TOKEN=optional-github-token + +# FreshRSS login for authenticated About page +FRESHRSS_USERNAME=your-username +FRESHRSS_PASSWORD=your-password diff --git a/.gitignore b/.gitignore index 965717a..9e8f996 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,6 @@ __pycache__/ .pytest_cache/ *.pyc +.env +check_for_updates.egg-info/ +cookies.txt diff --git a/BACKLOG.md b/BACKLOG.md index 6d3febe..33123f7 100644 --- a/BACKLOG.md +++ b/BACKLOG.md @@ -116,3 +116,12 @@ Acceptance criteria: - `pyproject.toml` defines runtime and dev dependencies. - README documents venv setup and installation commands. - `.venv` is ignored by git. + +## US-13 - FreshRSS Authenticated Version Check + +As a maintainer, I want FreshRSS version detection to authenticate with a username and password so that the script can read the About page and extract the running version. + +Acceptance criteria: +- The script can log in to FreshRSS using credentials from environment variables. +- FreshRSS version is extracted from the About page after authentication. +- `.env.sample` documents the FreshRSS credentials required. diff --git a/README.md b/README.md index 7a74fce..a283140 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,12 @@ Copy `.env.sample` to `.env` and fill required values. Export the variables befo ```bash export PAPERLESS_API_TOKEN=... export RADICALE_BASIC_AUTH=... +export FRESHRSS_USERNAME=... +export FRESHRSS_PASSWORD=... ``` +The script also reads `.env` automatically if present. + ## Usage ```bash @@ -35,3 +39,9 @@ python3 check_updates.py --config services.yaml --all ```bash python -m pytest ``` + +To run live integration checks against the real services: + +```bash +RUN_LIVE_TESTS=1 python -m pytest tests/test_live_services.py +``` diff --git a/check_updates.py b/check_updates.py index a11a57c..a434d5d 100644 --- a/check_updates.py +++ b/check_updates.py @@ -6,7 +6,11 @@ import re import sys from dataclasses import dataclass from typing import Any, Dict, Optional -from urllib.request import Request, urlopen +from http.cookiejar import CookieJar +from urllib.parse import urlencode +from urllib.request import HTTPCookieProcessor, Request, build_opener, urlopen + +import bcrypt import yaml @@ -28,6 +32,7 @@ class ServiceConfig: upstream_latest_extract: Optional[ExtractRule] upstream_latest_headers: Optional[Dict[str, str]] notes: Optional[str] + login: Optional[Dict[str, Any]] def load_yaml(path: str) -> Dict[str, Any]: @@ -35,6 +40,26 @@ def load_yaml(path: str) -> Dict[str, Any]: return yaml.safe_load(handle) or {} +def load_dotenv(path: str = ".env") -> None: + if not os.path.exists(path): + return + try: + with open(path, "r", encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + continue + key, value = line.split("=", 1) + key = key.strip() + value = value.strip().strip("\"").strip("'") + if key: + os.environ.setdefault(key, value) + except OSError: + return + + def parse_extract_rule(raw: Optional[Dict[str, Any]]) -> Optional[ExtractRule]: if not raw: return None @@ -42,7 +67,7 @@ def parse_extract_rule(raw: Optional[Dict[str, Any]]) -> Optional[ExtractRule]: value = raw.get("value") if not rule_type or not value: return None - allowed = {"jsonpath", "regex", "text", "header"} + allowed = {"jsonpath", "jsonpath_join", "regex", "text", "header"} if rule_type not in allowed: raise ValueError(f"Unsupported extract rule type: {rule_type}") return ExtractRule(type=rule_type, value=value) @@ -79,6 +104,10 @@ def load_services(config: Dict[str, Any]) -> Dict[str, ServiceConfig]: if upstream_url and not upstream_extract: raise ValueError(f"Service {name} must define upstream_latest_extract") + login = entry.get("login") + if login is not None and not isinstance(login, dict): + raise ValueError("login must be a mapping") + loaded[name] = ServiceConfig( name=name, base_url=base_url, @@ -89,6 +118,7 @@ def load_services(config: Dict[str, Any]) -> Dict[str, ServiceConfig]: upstream_latest_extract=upstream_extract, upstream_latest_headers=upstream_headers, notes=entry.get("notes"), + login=login, ) return loaded @@ -115,16 +145,37 @@ def build_headers(raw_headers: Optional[Dict[str, str]]) -> Dict[str, str]: return resolved +def extract_hidden_inputs(html: str) -> Dict[str, str]: + hidden = {} + for match in re.finditer( + r"]+type=['\"]hidden['\"][^>]*>", + html, + flags=re.IGNORECASE, + ): + tag = match.group(0) + name_match = re.search(r"name=['\"]([^'\"]+)['\"]", tag) + value_match = re.search(r"value=['\"]([^'\"]*)['\"]", tag) + if not name_match: + continue + hidden[name_match.group(1)] = value_match.group(1) if value_match else "" + return hidden + + def fetch_response( url: str, timeout: float, user_agent: str, extra_headers: Optional[Dict[str, str]] = None, + data: Optional[bytes] = None, + method: str = "GET", ) -> tuple[str, Dict[str, str]]: headers = {"User-Agent": user_agent} if extra_headers: headers.update(extra_headers) - request = Request(url, headers=headers) + token = os.getenv("GITHUB_TOKEN") + if token and "api.github.com" in url: + headers.setdefault("Authorization", f"Bearer {token}") + request = Request(url, headers=headers, data=data, method=method) with urlopen(request, timeout=timeout) as response: body = response.read().decode("utf-8", errors="replace") response_headers = {k.lower(): v for k, v in response.headers.items()} @@ -165,6 +216,18 @@ def extract_version( except json.JSONDecodeError: return None return extract_jsonpath(payload, rule.value) + if rule.type == "jsonpath_join": + try: + payload = json.loads(body) + except json.JSONDecodeError: + return None + parts = [] + for path in rule.value.split(","): + value = extract_jsonpath(payload, path.strip()) + if value is None: + return None + parts.append(value) + return ".".join(parts) if rule.type == "header": if not headers: return None @@ -274,6 +337,82 @@ def build_upstream_fallback(url: Optional[str]) -> Optional[Dict[str, Any]]: return None +def parse_set_cookie(headers: Dict[str, str]) -> Optional[str]: + set_cookie = headers.get("set-cookie") + if not set_cookie: + return None + return set_cookie.split(";", 1)[0] + + +def authenticate_service(service: ServiceConfig, timeout: float, user_agent: str) -> Optional[str]: + if not service.login: + return None + login_url = service.login.get("url") + username_env = service.login.get("username_env") + password_env = service.login.get("password_env") + username_field = service.login.get("username_field", "username") + password_field = service.login.get("password_field", "password") + nonce_url = service.login.get("nonce_url") + use_crypto = service.login.get("crypto", False) + if not login_url or not username_env or not password_env: + raise ValueError(f"Login config incomplete for {service.name}") + + username = os.getenv(username_env) + password = os.getenv(password_env) + if not username or not password: + raise ValueError(f"Missing credentials for {service.name}") + + cookie_jar = CookieJar() + opener = build_opener(HTTPCookieProcessor(cookie_jar)) + + login_request = Request(login_url, headers={"User-Agent": user_agent}) + with opener.open(login_request, timeout=timeout) as response: + login_page = response.read().decode("utf-8", errors="replace") + + hidden_fields = extract_hidden_inputs(login_page) + + payload_data = { + username_field: username, + **hidden_fields, + } + if use_crypto: + if not nonce_url: + raise ValueError(f"Missing nonce_url for {service.name}") + nonce_request = Request( + f"{nonce_url}{username}", + headers={"User-Agent": user_agent}, + ) + with opener.open(nonce_request, timeout=timeout) as response: + nonce_payload = json.loads(response.read().decode("utf-8", errors="replace")) + salt1 = nonce_payload.get("salt1") + nonce = nonce_payload.get("nonce") + if not salt1 or not nonce: + raise ValueError(f"Invalid nonce response for {service.name}") + password_bytes = password.encode("utf-8") + if len(password_bytes) > 72: + password_bytes = password_bytes[:72] + first_hash = bcrypt.hashpw(password_bytes, salt1.encode("utf-8")) + combined = (nonce + first_hash.decode("utf-8")).encode("utf-8") + if len(combined) > 72: + combined = combined[:72] + challenge = bcrypt.hashpw(combined, bcrypt.gensalt(rounds=4)) + payload_data["challenge"] = challenge.decode("utf-8") + else: + payload_data[password_field] = password + payload = urlencode(payload_data).encode("utf-8") + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": user_agent, + } + post_request = Request(login_url, data=payload, headers=headers, method="POST") + opener.open(post_request, timeout=timeout) + + cookies = [f"{cookie.name}={cookie.value}" for cookie in cookie_jar] + if not cookies: + return None + return "; ".join(cookies) + + def check_service(service: ServiceConfig, timeout: float, user_agent: str) -> Dict[str, Any]: result: Dict[str, Any] = { "name": service.name, @@ -287,6 +426,10 @@ def check_service(service: ServiceConfig, timeout: float, user_agent: str) -> Di if service.current_version_url and service.current_version_extract: try: headers = build_headers(service.current_version_headers) + if service.login: + login_cookie = authenticate_service(service, timeout, user_agent) + if login_cookie: + headers.setdefault("Cookie", login_cookie) body, response_headers = fetch_response( service.current_version_url, timeout, @@ -348,6 +491,8 @@ def main() -> int: parser.add_argument("--user-agent", default="check-for-updates/1.0", help="HTTP user agent") args = parser.parse_args() + load_dotenv() + try: config = load_yaml(args.config) services = load_services(config) diff --git a/pyproject.toml b/pyproject.toml index 1b0f07e..ec53cc2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ description = "Check running service versions against upstream releases" requires-python = ">=3.10" dependencies = [ "PyYAML>=6.0", + "bcrypt>=4.0", ] [project.optional-dependencies] diff --git a/services.yaml b/services.yaml index 05c6dd5..3bb91b2 100644 --- a/services.yaml +++ b/services.yaml @@ -21,20 +21,22 @@ services: value: $.tag_name - name: Immich base_url: https://photos.bundang.swierczyniec.info/ - current_version_url: + current_version_url: https://photos.bundang.swierczyniec.info/api/server/version current_version_extract: + type: jsonpath_join + value: $.major, $.minor, $.patch upstream_latest_version_url: https://api.github.com/repos/immich-app/immich/releases/latest upstream_latest_extract: type: jsonpath value: $.tag_name - notes: Instance version endpoint returned 404 for /api/server-info/version. + notes: Version exposed as major/minor/patch fields. - name: PeerTube base_url: https://tube.jeena.net/ current_version_url: https://tube.jeena.net/api/v1/config current_version_extract: type: jsonpath value: $.serverVersion - upstream_latest_version_url: https://api.github.com/repos/peertube/peertube/releases/latest + upstream_latest_version_url: https://api.github.com/repos/Chocobozzz/PeerTube/releases/latest upstream_latest_extract: type: jsonpath value: $.tag_name @@ -96,10 +98,20 @@ services: notes: /version returns 401 without auth; uses Basic auth. - name: FreshRSS base_url: https://rss.jeena.net/ - current_version_url: + current_version_url: https://rss.jeena.net/i/?a=about current_version_extract: + type: regex + value: FreshRSS version\s*
([0-9.]+)
+ login: + url: https://rss.jeena.net/i/?c=auth&a=login + username_env: FRESHRSS_USERNAME + password_env: FRESHRSS_PASSWORD + username_field: username + password_field: password + crypto: true + nonce_url: https://rss.jeena.net/i/?c=javascript&a=nonce&user= upstream_latest_version_url: https://api.github.com/repos/FreshRSS/FreshRSS/releases/latest upstream_latest_extract: type: jsonpath value: $.tag_name - notes: No unauthenticated version endpoint found. + notes: Version scraped from About page after login. diff --git a/tests/test_extraction.py b/tests/test_extraction.py deleted file mode 100644 index 7294149..0000000 --- a/tests/test_extraction.py +++ /dev/null @@ -1,24 +0,0 @@ -import json -import sys -from pathlib import Path - -sys.path.append(str(Path(__file__).resolve().parents[1])) - -from check_updates import ExtractRule, extract_version - - -def test_extract_jsonpath(): - body = json.dumps({"version": "1.2.3"}) - rule = ExtractRule(type="jsonpath", value="$.version") - assert extract_version(body, rule) == "1.2.3" - - -def test_extract_header(): - rule = ExtractRule(type="header", value="x-version") - headers = {"x-version": "2.3.4"} - assert extract_version("", rule, headers) == "2.3.4" - - -def test_extract_regex_group(): - rule = ExtractRule(type="regex", value=r"Version: (\d+\.\d+\.\d+)") - assert extract_version("Version: 1.9.0", rule) == "1.9.0" diff --git a/tests/test_live_services.py b/tests/test_live_services.py new file mode 100644 index 0000000..0a166e9 --- /dev/null +++ b/tests/test_live_services.py @@ -0,0 +1,26 @@ +import os +import sys +from pathlib import Path + +import pytest + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from check_updates import check_service, load_dotenv, load_services, load_yaml + + +@pytest.mark.skipif(os.getenv("RUN_LIVE_TESTS") != "1", reason="Live tests disabled") +def test_live_service_versions(): + load_dotenv() + services = load_services(load_yaml("services.yaml")) + failures = [] + + for service in services.values(): + result = check_service(service, timeout=20, user_agent="check-for-updates-test") + if service.upstream_latest_version_url and not result["latest"]: + failures.append(f"{service.name}: latest version missing ({result['latest_error']})") + if service.current_version_url and not result["current"]: + failures.append(f"{service.name}: current version missing ({result['current_error']})") + + if failures: + pytest.fail("\n".join(failures)) diff --git a/tests/test_versions.py b/tests/test_versions.py deleted file mode 100644 index ca24489..0000000 --- a/tests/test_versions.py +++ /dev/null @@ -1,27 +0,0 @@ -import sys -from pathlib import Path - -sys.path.append(str(Path(__file__).resolve().parents[1])) - -from check_updates import compare_versions - - -def test_compare_versions_newer(): - assert compare_versions("1.2.3", "1.2.4") == 1 - - -def test_compare_versions_equal(): - assert compare_versions("2.0.0", "2.0.0") == 0 - - -def test_compare_versions_older(): - assert compare_versions("2.1.0", "2.0.9") == -1 - - -def test_compare_versions_unparseable(): - assert compare_versions("1.2", "1.2.3") is None - - -def test_compare_versions_prerelease(): - assert compare_versions("1.2.3-alpha.1", "1.2.3") == 1 - assert compare_versions("1.2.3", "1.2.3-alpha.1") == -1 diff --git a/tests/test_versions_end_to_end.py b/tests/test_versions_end_to_end.py new file mode 100644 index 0000000..71a0134 --- /dev/null +++ b/tests/test_versions_end_to_end.py @@ -0,0 +1,39 @@ +import sys +from pathlib import Path +from unittest.mock import patch + +sys.path.append(str(Path(__file__).resolve().parents[1])) + +from check_updates import check_service, load_services + + +def test_services_extract_current_and_latest(): + services = load_services( + { + "services": [ + { + "name": "CurrentVersion", + "base_url": "https://example.com/", + "current_version_url": "https://example.com/current", + "current_version_extract": {"type": "jsonpath", "value": "$.version"}, + "upstream_latest_version_url": "https://example.com/latest", + "upstream_latest_extract": {"type": "jsonpath", "value": "$.tag"}, + } + ] + } + ) + + responses = { + "https://example.com/current": "{\"version\": \"1.2.3\"}", + "https://example.com/latest": "{\"tag\": \"1.2.4\"}", + } + + def fake_fetch(url, _timeout, _user_agent, _headers=None, _data=None, _method="GET"): + return responses[url], {} + + service = list(services.values())[0] + with patch("check_updates.fetch_response", side_effect=fake_fetch): + result = check_service(service, timeout=5, user_agent="test") + + assert result["current"] == "1.2.3" + assert result["latest"] == "1.2.4"