diff --git a/custom_components/smartthings_find/__init__.py b/custom_components/smartthings_find/__init__.py index 03ea8be..059d1b3 100644 --- a/custom_components/smartthings_find/__init__.py +++ b/custom_components/smartthings_find/__init__.py @@ -1,6 +1,7 @@ from datetime import timedelta import logging import aiohttp +from yarl import URL from homeassistant.core import HomeAssistant from homeassistant.helpers.typing import ConfigType from homeassistant.const import Platform @@ -39,7 +40,15 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: jsessionid = entry.data[CONF_JSESSIONID] session = async_get_clientsession(hass) - session.cookie_jar.update_cookies({"JSESSIONID": jsessionid}) + # The shared HA session may already hold a JSESSIONID for this domain + # from a previous load (set via Set-Cookie). Clear it first; otherwise + # a bare update_cookies adds an unscoped duplicate and aiohttp ships the + # older, domain-matched (stale) value, breaking auth after a UI reauth. + session.cookie_jar.clear_domain("smartthingsfind.samsung.com") + session.cookie_jar.update_cookies( + {"JSESSIONID": jsessionid}, + response_url=URL("https://smartthingsfind.samsung.com/"), + ) active_smarttags = entry.options.get(CONF_ACTIVE_MODE_SMARTTAGS, CONF_ACTIVE_MODE_SMARTTAGS_DEFAULT) active_others = entry.options.get(CONF_ACTIVE_MODE_OTHERS, CONF_ACTIVE_MODE_OTHERS_DEFAULT) diff --git a/custom_components/smartthings_find/manifest.json b/custom_components/smartthings_find/manifest.json index 9810d3d..6b7a277 100644 --- a/custom_components/smartthings_find/manifest.json +++ b/custom_components/smartthings_find/manifest.json @@ -2,13 +2,13 @@ "domain": "smartthings_find", "name": "SmartThings Find", "after_dependencies": ["http"], - "version": "0.3.0", + "version": "0.3.1", "documentation": "https://git.jeena.net/jeena/HA-SmartThings-Find", "issue_tracker": "https://git.jeena.net/jeena/HA-SmartThings-Find/issues", "integration_type": "hub", "dependencies": [], "codeowners": ["@jeena"], - "requirements": ["requests", "qrcode[pil]", "pillow", "pytz"], + "requirements": ["pytz"], "iot_class": "cloud_polling", "config_flow": true } diff --git a/custom_components/smartthings_find/utils.py b/custom_components/smartthings_find/utils.py index 8ebcb09..fcbd880 100644 --- a/custom_components/smartthings_find/utils.py +++ b/custom_components/smartthings_find/utils.py @@ -1,17 +1,9 @@ import logging import json import pytz -import qrcode -import base64 import aiohttp -import asyncio -import random -import string -import re import html -from io import BytesIO -from datetime import datetime, timedelta -from homeassistant.helpers.aiohttp_client import async_get_clientsession +from datetime import datetime from homeassistant.core import HomeAssistant from homeassistant.helpers.entity import DeviceInfo from homeassistant.exceptions import ConfigEntryAuthFailed @@ -21,213 +13,36 @@ from .const import DOMAIN, BATTERY_LEVELS, CONF_ACTIVE_MODE_SMARTTAGS, CONF_ACTI _LOGGER = logging.getLogger(__name__) -URL_PRE_SIGNIN = 'https://account.samsung.com/accounts/v1/FMM2/signInGate?state={state}&redirect_uri=https:%2F%2Fsmartthingsfind.samsung.com%2Flogin.do&response_type=code&client_id=ntly6zvfpn&scope=iot.client&locale=de_DE&acr_values=urn:samsungaccount:acr:basic&goBackURL=https:%2F%2Fsmartthingsfind.samsung.com%2Flogin' -URL_QR_CODE_SIGNIN = 'https://account.samsung.com/accounts/v1/FMM2/signInWithQrCode' -URL_SIGNIN_XHR = 'https://account.samsung.com/accounts/v1/FMM2/signInXhr' -URL_QR_POLL = 'https://account.samsung.com/accounts/v1/FMM2/signInWithQrCodeProc' -URL_SIGNIN_SUCCESS = 'https://account.samsung.com{next_url}' URL_GET_CSRF = "https://smartthingsfind.samsung.com/chkLogin.do" URL_DEVICE_LIST = "https://smartthingsfind.samsung.com/device/getDeviceList.do" URL_REQUEST_LOC_UPDATE = "https://smartthingsfind.samsung.com/dm/addOperation.do" URL_SET_LAST_DEVICE = "https://smartthingsfind.samsung.com/device/setLastSelect.do" -async def do_login_stage_one(hass: HomeAssistant) -> tuple: - """ - Perform the first stage of the login process. - - This function performs the initial login steps for the SmartThings Find service. - It generates a random state string, sends a pre-login request, and retrieves - the QR code URL from the response. - - Args: - hass (HomeAssistant): Home Assistant instance. - - Returns: - tuple: A tuple containing the session and QR code URL if successful, None otherwise. - """ - session = async_get_clientsession(hass) - session.cookie_jar.clear() - - # Generating the state parameter - state = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) - - try: - # Load the initial login page which already sets some cookies. 'state' - # is a randomly generated string. 'client_id' seems to be static for - # SmartThings find. - async with session.get(URL_PRE_SIGNIN.format(state=state)) as res: - if res.status != 200: - _LOGGER.error( - f"Pre-login request failed with status {res.status}") - return None - _LOGGER.debug(f"Step 1: Pre-Login: Status Code: {res.status}") - - # Load the "Login with QR-Code"-page - async with session.get(URL_QR_CODE_SIGNIN) as res: - if res.status != 200: - _LOGGER.error( - f"QR code URL request failed with status {res.status}, Response: {text[:250]}...") - return None - text = await res.text() - _LOGGER.debug(f"Step 2: QR-Code URL: Status Code: {res.status}") - - # Search the URL which is embedded in the QR Code. It looks like this: - # https://signin.samsung.com/key/abcdefgh - match = re.search(r"https://signin\.samsung\.com/key/[^'\"]+", text) - if not match: - _LOGGER.error("QR code URL not found in the response") - return None - - qr_url = match.group(0) - _LOGGER.info(f"Extracted QR code URL: {qr_url}") - - return session, qr_url - except Exception as e: - _LOGGER.error( - f"An error occurred during the login process (stage 1): {e}", exc_info=True) - return None - - -async def do_login_stage_two(session: aiohttp.ClientSession) -> str: - """ - Perform the second stage of the login process. - - This function continues the login process by fetching the - CSRF token, polling the server for login status, and ultimately - retrieving the JSESSIONID required for SmartThings Find. - - Args: - session (aiohttp.ClientSession): The current session with cookies set from login stage one. - - Returns: - str: The JSESSIONID if successful, None otherwise. - """ - try: - # Here you would generate and display the QR code. This is environment-specific. - # qr = qrcode.QRCode() - # qr.add_data(extracted_url) - # qr.print_ascii() # Or any other method to display the QR code to the user. - - # Fetch the _csrf token. This needs to be sent with each QR-Poll-Request - async with session.get(URL_SIGNIN_XHR) as res: - if res.status != 200: - _LOGGER.error( - f"XHR login request failed with status {res.status}") - return None - json_res = await res.json() - _LOGGER.debug( - f"Step 3: XHR Login: Status Code: {res.status}, Response: {json_res}") - - csrf_token = json_res.get('_csrf', {}).get('token') - if not csrf_token: - _LOGGER.error("CSRF token not found in the response") - return None - - # Define a timeout. We don't want to poll forever. - end_time = datetime.now() + timedelta(seconds=120) - next_url = None - - # We check, whether the QR-Code was scanned and the login was successful. - # This request returns either: - # {"rtnCd":"POLLING"} <-- Not yet logged in. Login still pending. - # OR - # {"rtnCd":"SUCCESS","nextURL":"/accounts/v1/FMM2/signInComplete"} - # So we fetch this URL every few seconds, until we receive "SUCCESS", which - # indicates the user has successfully scanned the Code and approved the login. - # This is exactly the same way, the website does it. - # In case the user never scans the QR-Code, we run in the timeout defined above. - while datetime.now() < end_time: - try: - await asyncio.sleep(2) - async with session.post(URL_QR_POLL, json={}, headers={'X-Csrf-Token': csrf_token}) as res: - if res.status != 200: - _LOGGER.error( - f"QR check request failed with status {res.status}") - continue - js = await res.json() - _LOGGER.debug( - f"Step 4: QR CHECK: Status Code: {res.status}, Response: {js}") - - if js.get('rtnCd') == "SUCCESS": - next_url = js.get('nextURL') - break - except aiohttp.ClientError as e: - _LOGGER.error(f"QR Poll request failed: {e}") - return None - - if not next_url: - _LOGGER.error("QR Code not scanned within 2 mins") - return None - - # Fetch the 'next_url' we received from the previous request. On success, this sets - # the initial JSESSIONID-cookie. We're not done yet, since this cookie is not valid - # for SmartThings Find (which uses it's own JSESSIONID). - async with session.get(URL_SIGNIN_SUCCESS.format(next_url=next_url)) as res: - if res.status != 200: - _LOGGER.error( - f"Login success URL request failed with status {res.status}") - return None - text = await res.text() - _LOGGER.debug(f"Step 5: Login success: Status Code: {res.status}") - - # The response contains another redirect URL which we need to extract from the - # received HTML/JS-content. This URL looks something like this: - # https://smartthingsfind.samsung.com/login.do?auth_server_url=eu-auth2.samsungosp.com - # &code=[...]&code_expires_in=300&state=[state we generated above] - # &api_server_url=eu-auth2.samsungosp.com - match = re.search( - r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]', text) - if not match: - _LOGGER.error( - "Redirect URL not found in the login success response") - return None - - redirect_url = match.group(1) - _LOGGER.debug(f"Found Redirect URL: {redirect_url}") - - # Fetch the received redirect_url. This response finally contains our JSESSIONID, - # which is what actually authenticates us for SmartThings Find. - async with session.get(redirect_url) as res: - if res.status != 200: - _LOGGER.error( - f"Redirect URL request failed with status {res.status}") - return None - _LOGGER.debug( - f"Step 6: Follow redirect URL: Status Code: {res.status}") - - jsessionid = session.cookie_jar.filter_cookies( - 'https://smartthingsfind.samsung.com').get('JSESSIONID') - if not jsessionid: - _LOGGER.error("JSESSIONID not found in cookies") - return None - - _LOGGER.debug(f"JSESSIONID: {jsessionid.value[:20]}...") - return jsessionid.value - - except Exception as e: - _LOGGER.error( - f"An error occurred during the login process (stage 2): {e}", exc_info=True) - return None - - async def validate_jsessionid(hass: HomeAssistant, jsessionid: str) -> bool: """Check whether a JSESSIONID cookie is currently valid for SmartThings Find. - Performs a single GET against the CSRF endpoint with the cookie applied; - a valid session returns 200 with a `_csrf` response header. + Uses an isolated aiohttp session: the shared HA session may already hold + a JSESSIONID cookie pinned to ``smartthingsfind.samsung.com`` from a prior + Set-Cookie response. A bare ``update_cookies({"JSESSIONID": ...})`` would + only add an unscoped duplicate, and aiohttp's cookie jar would still ship + the older, domain-matched (stale) cookie — making the freshly-pasted + cookie look invalid to the server. + + A short-lived ClientSession with its own jar avoids that entirely. """ - session = async_get_clientsession(hass) - session.cookie_jar.update_cookies({"JSESSIONID": jsessionid}) try: - async with session.get(URL_GET_CSRF) as response: - if response.status == 200 and response.headers.get("_csrf"): - return True - body = (await response.text())[:200] - _LOGGER.warning( - f"JSESSIONID validation failed: status={response.status}, body='{body}'" - ) - return False + async with aiohttp.ClientSession( + cookies={"JSESSIONID": jsessionid} + ) as session: + async with session.get(URL_GET_CSRF) as response: + if response.status == 200 and response.headers.get("_csrf"): + return True + body = (await response.text())[:200] + _LOGGER.warning( + f"JSESSIONID validation failed: status={response.status}, body='{body}'" + ) + return False except Exception as e: _LOGGER.error(f"JSESSIONID validation error: {e}", exc_info=True) return False @@ -605,22 +420,3 @@ def get_battery_level(dev_name: str, ops: list) -> int: f"[{dev_name}]: Received invalid battery level: {batt_raw}") return batt return None - - -def gen_qr_code_base64(data: str) -> str: - """ - Generates a QR code from the provided data and returns it as a base64-encoded string. - Used to show a login QR code during authentication flow - - Args: - data (str): The data to encode in the QR code. - - Returns: - str: The base64-encoded string representation of the QR code. - """ - qr = qrcode.QRCode() - qr.add_data(data) - img = qr.make_image(fill_color="black", back_color="white") - buffer = BytesIO() - img.save(buffer, format="PNG") - return base64.b64encode(buffer.getvalue()).decode("utf-8") diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..78c5011 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +asyncio_mode = auto +testpaths = tests diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..585c749 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,158 @@ +"""Stub the HA imports the integration touches so utils/config_flow can be +imported and exercised without a full Home Assistant install. + +Only the bits used by the code under test are faked — anything new the +integration starts touching will need to be added here.""" +from __future__ import annotations + +import sys +import types +from pathlib import Path +from typing import Any + +REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(REPO_ROOT)) + + +def _module(name: str) -> types.ModuleType: + if name not in sys.modules: + sys.modules[name] = types.ModuleType(name) + return sys.modules[name] + + +# homeassistant.core +ha = _module("homeassistant") +core = _module("homeassistant.core") + + +class HomeAssistant: + def __init__(self) -> None: + self.data: dict[str, Any] = {} + # Tests assign a real aiohttp.ClientSession here. + self.clientsession = None + + +def callback(func): + return func + + +core.HomeAssistant = HomeAssistant +core.callback = callback +ha.core = core + +# homeassistant.exceptions +exc = _module("homeassistant.exceptions") + + +class ConfigEntryAuthFailed(Exception): + pass + + +class ConfigEntryNotReady(Exception): + pass + + +exc.ConfigEntryAuthFailed = ConfigEntryAuthFailed +exc.ConfigEntryNotReady = ConfigEntryNotReady + +# homeassistant.helpers + submodules +helpers = _module("homeassistant.helpers") +aiohttp_client = _module("homeassistant.helpers.aiohttp_client") + + +def async_get_clientsession(hass: HomeAssistant): + return hass.clientsession + + +aiohttp_client.async_get_clientsession = async_get_clientsession +helpers.aiohttp_client = aiohttp_client + +entity = _module("homeassistant.helpers.entity") + + +class DeviceInfo(dict): + pass + + +entity.DeviceInfo = DeviceInfo +helpers.entity = entity + +device_registry = _module("homeassistant.helpers.device_registry") + + +class _FakeDeviceRegistry: + def async_get_device(self, identifiers): + return None + + +def _async_get(_hass): + return _FakeDeviceRegistry() + + +device_registry.async_get = _async_get +helpers.device_registry = device_registry + +typing_module = _module("homeassistant.helpers.typing") +typing_module.ConfigType = dict + +# config_entries +config_entries = _module("homeassistant.config_entries") + + +class ConfigEntry: + pass + + +class ConfigFlow: + def __init_subclass__(cls, *, domain=None, **kwargs): + super().__init_subclass__(**kwargs) + + +class OptionsFlow: + pass + + +class OptionsFlowWithConfigEntry(OptionsFlow): + def __init__(self, config_entry): + self.config_entry = config_entry + self.options = dict(getattr(config_entry, "options", {}) or {}) + + +CONN_CLASS_CLOUD_POLL = "cloud_poll" +ConfigFlowResult = dict + +config_entries.ConfigEntry = ConfigEntry +config_entries.ConfigFlow = ConfigFlow +config_entries.OptionsFlow = OptionsFlow +config_entries.OptionsFlowWithConfigEntry = OptionsFlowWithConfigEntry +config_entries.CONN_CLASS_CLOUD_POLL = CONN_CLASS_CLOUD_POLL +config_entries.ConfigFlowResult = ConfigFlowResult + +# const +const = _module("homeassistant.const") + + +class Platform: + DEVICE_TRACKER = "device_tracker" + BUTTON = "button" + SENSOR = "sensor" + + +const.Platform = Platform + +# update_coordinator +update_coord = _module("homeassistant.helpers.update_coordinator") + + +class DataUpdateCoordinator: + def __init__(self, *args, **kwargs): + pass + + +class UpdateFailed(Exception): + pass + + +update_coord.DataUpdateCoordinator = DataUpdateCoordinator +update_coord.UpdateFailed = UpdateFailed +helpers.update_coordinator = update_coord diff --git a/tests/test_validate_jsessionid.py b/tests/test_validate_jsessionid.py new file mode 100644 index 0000000..b9be477 --- /dev/null +++ b/tests/test_validate_jsessionid.py @@ -0,0 +1,94 @@ +"""Tests for validate_jsessionid. + +The interesting case is cookie-jar pollution: HA's shared aiohttp session can +already hold a JSESSIONID cookie that was stored from a previous Set-Cookie +response (so it carries a real domain attribute). When we then pre-load a new +JSESSIONID via update_cookies({"JSESSIONID": ...}) without specifying a +response_url, the new cookie gets stored with no domain and aiohttp ships the +older, more-specific one — making the new (valid) cookie look invalid to +Samsung.""" +from __future__ import annotations + +import aiohttp +import pytest +from aioresponses import aioresponses +from yarl import URL + +from custom_components.smartthings_find.utils import ( + URL_GET_CSRF, + validate_jsessionid, +) + + +@pytest.fixture +async def hass(monkeypatch): + from homeassistant.core import HomeAssistant + + h = HomeAssistant() + async with aiohttp.ClientSession() as session: + h.clientsession = session + yield h + + +@pytest.mark.asyncio +async def test_valid_cookie_returns_true(hass): + with aioresponses() as m: + m.get(URL_GET_CSRF, status=200, headers={"_csrf": "tok123"}) + assert await validate_jsessionid(hass, "fresh-cookie") is True + + +@pytest.mark.asyncio +async def test_no_csrf_header_returns_false(hass): + with aioresponses() as m: + m.get(URL_GET_CSRF, status=200, body="fail") + assert await validate_jsessionid(hass, "expired-cookie") is False + + +@pytest.mark.asyncio +async def test_non_200_returns_false(hass): + with aioresponses() as m: + m.get(URL_GET_CSRF, status=401, body="Logout") + assert await validate_jsessionid(hass, "anything") is False + + +@pytest.mark.asyncio +async def test_validation_does_not_touch_shared_session(hass): + """The shared HA session may already hold a JSESSIONID cookie with a real + domain (set by a previous Set-Cookie). Validation must run in an isolated + aiohttp session so: + + * the shared jar is not mutated as a side-effect of validation, and + * a stale domain-bound JSESSIONID can't shadow the cookie we're validating. + + Without this, aiohttp's cookie jar prefers the domain-matched (stale) + cookie over the bare cookie we just added, so Samsung sees the expired + session, returns no `_csrf`, and we tell the user their cookie was + rejected even though it was perfectly fine.""" + hass.clientsession.cookie_jar.update_cookies( + {"JSESSIONID": "STALE_VALUE"}, + response_url=URL("https://smartthingsfind.samsung.com/"), + ) + before = sorted( + (c.key, c["domain"], c.value) + for c in hass.clientsession.cookie_jar + ) + + with aioresponses() as m: + m.get(URL_GET_CSRF, status=200, headers={"_csrf": "tok"}) + result = await validate_jsessionid(hass, "FRESH_VALUE") + + after = sorted( + (c.key, c["domain"], c.value) + for c in hass.clientsession.cookie_jar + ) + + assert before == after, ( + "validate_jsessionid mutated the shared HA session's cookie jar; " + "it must use an isolated session.\n" + f" before: {before}\n after: {after}" + ) + assert result is True, ( + "validate_jsessionid returned False even though the mocked server " + "would have replied with _csrf — the stale shared-jar cookie likely " + "shadowed the fresh one" + )