feat: Add authenticated service checks

Support FreshRSS login with challenge hashing, add GitHub token usage, and update service metadata for Immich and PeerTube.
This commit is contained in:
Jeena 2026-03-12 13:57:25 +00:00
parent 95cd8e0906
commit 7537de2f53
11 changed files with 257 additions and 59 deletions

View file

@ -4,3 +4,7 @@ RADICALE_BASIC_AUTH=base64-user-colon-pass
# Optional (use if upstream APIs need auth to avoid rate limits)
GITHUB_TOKEN=optional-github-token
# FreshRSS login for authenticated About page
FRESHRSS_USERNAME=your-username
FRESHRSS_PASSWORD=your-password

3
.gitignore vendored
View file

@ -2,3 +2,6 @@
__pycache__/
.pytest_cache/
*.pyc
.env
check_for_updates.egg-info/
cookies.txt

View file

@ -116,3 +116,12 @@ Acceptance criteria:
- `pyproject.toml` defines runtime and dev dependencies.
- README documents venv setup and installation commands.
- `.venv` is ignored by git.
## US-13 - FreshRSS Authenticated Version Check
As a maintainer, I want FreshRSS version detection to authenticate with a username and password so that the script can read the About page and extract the running version.
Acceptance criteria:
- The script can log in to FreshRSS using credentials from environment variables.
- FreshRSS version is extracted from the About page after authentication.
- `.env.sample` documents the FreshRSS credentials required.

View file

@ -21,8 +21,12 @@ Copy `.env.sample` to `.env` and fill required values. Export the variables befo
```bash
export PAPERLESS_API_TOKEN=...
export RADICALE_BASIC_AUTH=...
export FRESHRSS_USERNAME=...
export FRESHRSS_PASSWORD=...
```
The script also reads `.env` automatically if present.
## Usage
```bash
@ -35,3 +39,9 @@ python3 check_updates.py --config services.yaml --all
```bash
python -m pytest
```
To run live integration checks against the real services:
```bash
RUN_LIVE_TESTS=1 python -m pytest tests/test_live_services.py
```

View file

@ -6,7 +6,11 @@ import re
import sys
from dataclasses import dataclass
from typing import Any, Dict, Optional
from urllib.request import Request, urlopen
from http.cookiejar import CookieJar
from urllib.parse import urlencode
from urllib.request import HTTPCookieProcessor, Request, build_opener, urlopen
import bcrypt
import yaml
@ -28,6 +32,7 @@ class ServiceConfig:
upstream_latest_extract: Optional[ExtractRule]
upstream_latest_headers: Optional[Dict[str, str]]
notes: Optional[str]
login: Optional[Dict[str, Any]]
def load_yaml(path: str) -> Dict[str, Any]:
@ -35,6 +40,26 @@ def load_yaml(path: str) -> Dict[str, Any]:
return yaml.safe_load(handle) or {}
def load_dotenv(path: str = ".env") -> None:
if not os.path.exists(path):
return
try:
with open(path, "r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip("\"").strip("'")
if key:
os.environ.setdefault(key, value)
except OSError:
return
def parse_extract_rule(raw: Optional[Dict[str, Any]]) -> Optional[ExtractRule]:
if not raw:
return None
@ -42,7 +67,7 @@ def parse_extract_rule(raw: Optional[Dict[str, Any]]) -> Optional[ExtractRule]:
value = raw.get("value")
if not rule_type or not value:
return None
allowed = {"jsonpath", "regex", "text", "header"}
allowed = {"jsonpath", "jsonpath_join", "regex", "text", "header"}
if rule_type not in allowed:
raise ValueError(f"Unsupported extract rule type: {rule_type}")
return ExtractRule(type=rule_type, value=value)
@ -79,6 +104,10 @@ def load_services(config: Dict[str, Any]) -> Dict[str, ServiceConfig]:
if upstream_url and not upstream_extract:
raise ValueError(f"Service {name} must define upstream_latest_extract")
login = entry.get("login")
if login is not None and not isinstance(login, dict):
raise ValueError("login must be a mapping")
loaded[name] = ServiceConfig(
name=name,
base_url=base_url,
@ -89,6 +118,7 @@ def load_services(config: Dict[str, Any]) -> Dict[str, ServiceConfig]:
upstream_latest_extract=upstream_extract,
upstream_latest_headers=upstream_headers,
notes=entry.get("notes"),
login=login,
)
return loaded
@ -115,16 +145,37 @@ def build_headers(raw_headers: Optional[Dict[str, str]]) -> Dict[str, str]:
return resolved
def extract_hidden_inputs(html: str) -> Dict[str, str]:
hidden = {}
for match in re.finditer(
r"<input[^>]+type=['\"]hidden['\"][^>]*>",
html,
flags=re.IGNORECASE,
):
tag = match.group(0)
name_match = re.search(r"name=['\"]([^'\"]+)['\"]", tag)
value_match = re.search(r"value=['\"]([^'\"]*)['\"]", tag)
if not name_match:
continue
hidden[name_match.group(1)] = value_match.group(1) if value_match else ""
return hidden
def fetch_response(
url: str,
timeout: float,
user_agent: str,
extra_headers: Optional[Dict[str, str]] = None,
data: Optional[bytes] = None,
method: str = "GET",
) -> tuple[str, Dict[str, str]]:
headers = {"User-Agent": user_agent}
if extra_headers:
headers.update(extra_headers)
request = Request(url, headers=headers)
token = os.getenv("GITHUB_TOKEN")
if token and "api.github.com" in url:
headers.setdefault("Authorization", f"Bearer {token}")
request = Request(url, headers=headers, data=data, method=method)
with urlopen(request, timeout=timeout) as response:
body = response.read().decode("utf-8", errors="replace")
response_headers = {k.lower(): v for k, v in response.headers.items()}
@ -165,6 +216,18 @@ def extract_version(
except json.JSONDecodeError:
return None
return extract_jsonpath(payload, rule.value)
if rule.type == "jsonpath_join":
try:
payload = json.loads(body)
except json.JSONDecodeError:
return None
parts = []
for path in rule.value.split(","):
value = extract_jsonpath(payload, path.strip())
if value is None:
return None
parts.append(value)
return ".".join(parts)
if rule.type == "header":
if not headers:
return None
@ -274,6 +337,82 @@ def build_upstream_fallback(url: Optional[str]) -> Optional[Dict[str, Any]]:
return None
def parse_set_cookie(headers: Dict[str, str]) -> Optional[str]:
set_cookie = headers.get("set-cookie")
if not set_cookie:
return None
return set_cookie.split(";", 1)[0]
def authenticate_service(service: ServiceConfig, timeout: float, user_agent: str) -> Optional[str]:
if not service.login:
return None
login_url = service.login.get("url")
username_env = service.login.get("username_env")
password_env = service.login.get("password_env")
username_field = service.login.get("username_field", "username")
password_field = service.login.get("password_field", "password")
nonce_url = service.login.get("nonce_url")
use_crypto = service.login.get("crypto", False)
if not login_url or not username_env or not password_env:
raise ValueError(f"Login config incomplete for {service.name}")
username = os.getenv(username_env)
password = os.getenv(password_env)
if not username or not password:
raise ValueError(f"Missing credentials for {service.name}")
cookie_jar = CookieJar()
opener = build_opener(HTTPCookieProcessor(cookie_jar))
login_request = Request(login_url, headers={"User-Agent": user_agent})
with opener.open(login_request, timeout=timeout) as response:
login_page = response.read().decode("utf-8", errors="replace")
hidden_fields = extract_hidden_inputs(login_page)
payload_data = {
username_field: username,
**hidden_fields,
}
if use_crypto:
if not nonce_url:
raise ValueError(f"Missing nonce_url for {service.name}")
nonce_request = Request(
f"{nonce_url}{username}",
headers={"User-Agent": user_agent},
)
with opener.open(nonce_request, timeout=timeout) as response:
nonce_payload = json.loads(response.read().decode("utf-8", errors="replace"))
salt1 = nonce_payload.get("salt1")
nonce = nonce_payload.get("nonce")
if not salt1 or not nonce:
raise ValueError(f"Invalid nonce response for {service.name}")
password_bytes = password.encode("utf-8")
if len(password_bytes) > 72:
password_bytes = password_bytes[:72]
first_hash = bcrypt.hashpw(password_bytes, salt1.encode("utf-8"))
combined = (nonce + first_hash.decode("utf-8")).encode("utf-8")
if len(combined) > 72:
combined = combined[:72]
challenge = bcrypt.hashpw(combined, bcrypt.gensalt(rounds=4))
payload_data["challenge"] = challenge.decode("utf-8")
else:
payload_data[password_field] = password
payload = urlencode(payload_data).encode("utf-8")
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": user_agent,
}
post_request = Request(login_url, data=payload, headers=headers, method="POST")
opener.open(post_request, timeout=timeout)
cookies = [f"{cookie.name}={cookie.value}" for cookie in cookie_jar]
if not cookies:
return None
return "; ".join(cookies)
def check_service(service: ServiceConfig, timeout: float, user_agent: str) -> Dict[str, Any]:
result: Dict[str, Any] = {
"name": service.name,
@ -287,6 +426,10 @@ def check_service(service: ServiceConfig, timeout: float, user_agent: str) -> Di
if service.current_version_url and service.current_version_extract:
try:
headers = build_headers(service.current_version_headers)
if service.login:
login_cookie = authenticate_service(service, timeout, user_agent)
if login_cookie:
headers.setdefault("Cookie", login_cookie)
body, response_headers = fetch_response(
service.current_version_url,
timeout,
@ -348,6 +491,8 @@ def main() -> int:
parser.add_argument("--user-agent", default="check-for-updates/1.0", help="HTTP user agent")
args = parser.parse_args()
load_dotenv()
try:
config = load_yaml(args.config)
services = load_services(config)

View file

@ -9,6 +9,7 @@ description = "Check running service versions against upstream releases"
requires-python = ">=3.10"
dependencies = [
"PyYAML>=6.0",
"bcrypt>=4.0",
]
[project.optional-dependencies]

View file

@ -21,20 +21,22 @@ services:
value: $.tag_name
- name: Immich
base_url: https://photos.bundang.swierczyniec.info/
current_version_url:
current_version_url: https://photos.bundang.swierczyniec.info/api/server/version
current_version_extract:
type: jsonpath_join
value: $.major, $.minor, $.patch
upstream_latest_version_url: https://api.github.com/repos/immich-app/immich/releases/latest
upstream_latest_extract:
type: jsonpath
value: $.tag_name
notes: Instance version endpoint returned 404 for /api/server-info/version.
notes: Version exposed as major/minor/patch fields.
- name: PeerTube
base_url: https://tube.jeena.net/
current_version_url: https://tube.jeena.net/api/v1/config
current_version_extract:
type: jsonpath
value: $.serverVersion
upstream_latest_version_url: https://api.github.com/repos/peertube/peertube/releases/latest
upstream_latest_version_url: https://api.github.com/repos/Chocobozzz/PeerTube/releases/latest
upstream_latest_extract:
type: jsonpath
value: $.tag_name
@ -96,10 +98,20 @@ services:
notes: /version returns 401 without auth; uses Basic auth.
- name: FreshRSS
base_url: https://rss.jeena.net/
current_version_url:
current_version_url: https://rss.jeena.net/i/?a=about
current_version_extract:
type: regex
value: FreshRSS version</dt>\s*<dd>([0-9.]+)</dd>
login:
url: https://rss.jeena.net/i/?c=auth&a=login
username_env: FRESHRSS_USERNAME
password_env: FRESHRSS_PASSWORD
username_field: username
password_field: password
crypto: true
nonce_url: https://rss.jeena.net/i/?c=javascript&a=nonce&user=
upstream_latest_version_url: https://api.github.com/repos/FreshRSS/FreshRSS/releases/latest
upstream_latest_extract:
type: jsonpath
value: $.tag_name
notes: No unauthenticated version endpoint found.
notes: Version scraped from About page after login.

View file

@ -1,24 +0,0 @@
import json
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parents[1]))
from check_updates import ExtractRule, extract_version
def test_extract_jsonpath():
body = json.dumps({"version": "1.2.3"})
rule = ExtractRule(type="jsonpath", value="$.version")
assert extract_version(body, rule) == "1.2.3"
def test_extract_header():
rule = ExtractRule(type="header", value="x-version")
headers = {"x-version": "2.3.4"}
assert extract_version("", rule, headers) == "2.3.4"
def test_extract_regex_group():
rule = ExtractRule(type="regex", value=r"Version: (\d+\.\d+\.\d+)")
assert extract_version("Version: 1.9.0", rule) == "1.9.0"

View file

@ -0,0 +1,26 @@
import os
import sys
from pathlib import Path
import pytest
sys.path.append(str(Path(__file__).resolve().parents[1]))
from check_updates import check_service, load_dotenv, load_services, load_yaml
@pytest.mark.skipif(os.getenv("RUN_LIVE_TESTS") != "1", reason="Live tests disabled")
def test_live_service_versions():
load_dotenv()
services = load_services(load_yaml("services.yaml"))
failures = []
for service in services.values():
result = check_service(service, timeout=20, user_agent="check-for-updates-test")
if service.upstream_latest_version_url and not result["latest"]:
failures.append(f"{service.name}: latest version missing ({result['latest_error']})")
if service.current_version_url and not result["current"]:
failures.append(f"{service.name}: current version missing ({result['current_error']})")
if failures:
pytest.fail("\n".join(failures))

View file

@ -1,27 +0,0 @@
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parents[1]))
from check_updates import compare_versions
def test_compare_versions_newer():
assert compare_versions("1.2.3", "1.2.4") == 1
def test_compare_versions_equal():
assert compare_versions("2.0.0", "2.0.0") == 0
def test_compare_versions_older():
assert compare_versions("2.1.0", "2.0.9") == -1
def test_compare_versions_unparseable():
assert compare_versions("1.2", "1.2.3") is None
def test_compare_versions_prerelease():
assert compare_versions("1.2.3-alpha.1", "1.2.3") == 1
assert compare_versions("1.2.3", "1.2.3-alpha.1") == -1

View file

@ -0,0 +1,39 @@
import sys
from pathlib import Path
from unittest.mock import patch
sys.path.append(str(Path(__file__).resolve().parents[1]))
from check_updates import check_service, load_services
def test_services_extract_current_and_latest():
services = load_services(
{
"services": [
{
"name": "CurrentVersion",
"base_url": "https://example.com/",
"current_version_url": "https://example.com/current",
"current_version_extract": {"type": "jsonpath", "value": "$.version"},
"upstream_latest_version_url": "https://example.com/latest",
"upstream_latest_extract": {"type": "jsonpath", "value": "$.tag"},
}
]
}
)
responses = {
"https://example.com/current": "{\"version\": \"1.2.3\"}",
"https://example.com/latest": "{\"tag\": \"1.2.4\"}",
}
def fake_fetch(url, _timeout, _user_agent, _headers=None, _data=None, _method="GET"):
return responses[url], {}
service = list(services.values())[0]
with patch("check_updates.fetch_response", side_effect=fake_fetch):
result = check_service(service, timeout=5, user_agent="test")
assert result["current"] == "1.2.3"
assert result["latest"] == "1.2.4"