diff --git a/.gitignore b/.gitignore index 4eb8602..5dfb27f 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ __pycache__ .coverage .vscode coverage.xml +.env # Home Assistant configuration diff --git a/custom_components/smartthings_find/__init__.py b/custom_components/smartthings_find/__init__.py index e5a9024..ba2728c 100644 --- a/custom_components/smartthings_find/__init__.py +++ b/custom_components/smartthings_find/__init__.py @@ -1,6 +1,7 @@ from datetime import timedelta import logging import aiohttp +from yarl import URL from homeassistant.core import HomeAssistant from homeassistant.helpers.typing import ConfigType from homeassistant.const import Platform @@ -39,7 +40,15 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: jsessionid = entry.data[CONF_JSESSIONID] session = async_get_clientsession(hass) - session.cookie_jar.update_cookies({"JSESSIONID": jsessionid}) + # The shared HA session may already hold a JSESSIONID for this domain + # from a previous load (set via Set-Cookie). Clear it first; otherwise + # a bare update_cookies adds an unscoped duplicate and aiohttp ships the + # older, domain-matched (stale) value, breaking auth after a UI reauth. + session.cookie_jar.clear_domain("smartthingsfind.samsung.com") + session.cookie_jar.update_cookies( + {"JSESSIONID": jsessionid}, + response_url=URL("https://smartthingsfind.samsung.com/"), + ) active_smarttags = entry.options.get(CONF_ACTIVE_MODE_SMARTTAGS, CONF_ACTIVE_MODE_SMARTTAGS_DEFAULT) active_others = entry.options.get(CONF_ACTIVE_MODE_OTHERS, CONF_ACTIVE_MODE_OTHERS_DEFAULT) @@ -55,30 +64,44 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: # Load all SmartThings-Find devices from the users account devices = await get_devices(hass, session, entry.entry_id) - + # Create an update coordinator. This is responsible to regularly # fetch data from STF and update the device_tracker and sensor # entities update_interval = entry.options.get(CONF_UPDATE_INTERVAL, CONF_UPDATE_INTERVAL_DEFAULT) - coordinator = SmartThingsFindCoordinator(hass, session, devices, update_interval) + coordinator = SmartThingsFindCoordinator(hass, session, entry.entry_id, update_interval) + + # Store everything the coordinator needs BEFORE the first refresh, + # because _async_update_data reads `devices` out of hass.data. + hass.data[DOMAIN][entry.entry_id].update({ + CONF_JSESSIONID: jsessionid, + "session": session, + "coordinator": coordinator, + "devices": devices, + }) # This is what makes the whole integration slow to load (around 10-15 # seconds for my 15 devices) but it is the right way to do it. Only if # it succeeds, the integration will be marked as successfully loaded. await coordinator.async_config_entry_first_refresh() - - hass.data[DOMAIN][entry.entry_id].update({ - CONF_JSESSIONID: jsessionid, - "session": session, - "coordinator": coordinator, - "devices": devices - }) hass.async_create_task( hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) ) return True +async def async_remove_config_entry_device( + hass: HomeAssistant, entry: ConfigEntry, device +) -> bool: + """Allow the user to remove a device from this config entry. + + Returning True unconditionally lets HA delete the device + its entities + from the registry. If the same dvceID later reappears in Samsung's + response, the next setup will recreate fresh entries for it. + """ + return True + + async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a config entry.""" unload_success = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) @@ -92,10 +115,10 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: class SmartThingsFindCoordinator(DataUpdateCoordinator): """Class to manage fetching SmartThings Find data.""" - def __init__(self, hass: HomeAssistant, session: aiohttp.ClientSession, devices, update_interval : int): + def __init__(self, hass: HomeAssistant, session: aiohttp.ClientSession, entry_id: str, update_interval : int): """Initialize the coordinator.""" self.session = session - self.devices = devices + self._entry_id = entry_id self.hass = hass super().__init__( hass, @@ -107,11 +130,12 @@ class SmartThingsFindCoordinator(DataUpdateCoordinator): async def _async_update_data(self): """Fetch data from SmartThings Find.""" try: + devices = self.hass.data[DOMAIN][self._entry_id]["devices"] tags = {} _LOGGER.debug(f"Updating locations...") - for device in self.devices: + for device in devices: dev_data = device['data'] - tag_data = await get_device_location(self.hass, self.session, dev_data, self.config_entry.entry_id) + tag_data = await get_device_location(self.hass, self.session, dev_data, self._entry_id) tags[dev_data['dvceID']] = tag_data _LOGGER.debug(f"Fetched {len(tags)} locations") return tags diff --git a/custom_components/smartthings_find/config_flow.py b/custom_components/smartthings_find/config_flow.py index 319ed95..23d1b68 100644 --- a/custom_components/smartthings_find/config_flow.py +++ b/custom_components/smartthings_find/config_flow.py @@ -1,11 +1,11 @@ -from typing import Any, Dict +from typing import Any import voluptuous as vol from homeassistant import config_entries from homeassistant.core import callback from homeassistant.config_entries import ( ConfigEntry, ConfigFlowResult, - OptionsFlowWithConfigEntry + OptionsFlowWithConfigEntry, ) from .const import ( DOMAIN, @@ -15,130 +15,65 @@ from .const import ( CONF_ACTIVE_MODE_SMARTTAGS, CONF_ACTIVE_MODE_SMARTTAGS_DEFAULT, CONF_ACTIVE_MODE_OTHERS, - CONF_ACTIVE_MODE_OTHERS_DEFAULT + CONF_ACTIVE_MODE_OTHERS_DEFAULT, ) -from .utils import do_login_stage_one, do_login_stage_two, gen_qr_code_base64 -import asyncio +from .utils import validate_jsessionid import logging _LOGGER = logging.getLogger(__name__) + class SmartThingsFindConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): - """Handle a config flow for SmartThings Find.""" + """Handle a config flow for SmartThings Find. + + Samsung's account.samsung.com login was rebuilt as a JS SPA (IAM/OAuth2 + with bot protection), so the original QR-code login flow no longer works. + Until that is reverse-engineered we fall back to letting the user paste + the JSESSIONID cookie they obtain from a normal browser session at + https://smartthingsfind.samsung.com/. + """ VERSION = 1 CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL reauth_entry: ConfigEntry | None = None - task_stage_one: asyncio.Task | None = None - task_stage_two: asyncio.Task | None = None - - qr_url = None - session = None - - jsessionid = None - - - error = None - - async def do_stage_one(self): - _LOGGER.debug("Running login stage 1") - try: - stage_one_res = await do_login_stage_one(self.hass) - if not stage_one_res is None: - self.session, self.qr_url = stage_one_res - else: - self.error = "Login stage 1 failed. Check logs for details." - _LOGGER.warn("Login stage 1 failed") - _LOGGER.debug("Login stage 1 done") - except Exception as e: - self.error = "Login stage 1 failed. Check logs for details." - _LOGGER.error(f"Exception in stage 1: {e}", exc_info=True) - - async def do_stage_two(self): - _LOGGER.debug("Running login stage 2") - try: - stage_two_res = await do_login_stage_two(self.session) - if not stage_two_res is None: - self.jsessionid = stage_two_res - _LOGGER.info("Login successful") - else: - self.error = "Login stage 2 failed. Check logs for details." - _LOGGER.warning("Login stage 2 failed") - _LOGGER.debug("Login stage 2 done") - except Exception as e: - self.error = "Login stage 2 failed. Check logs for details." - _LOGGER.error(f"Exception in stage 2: {e}", exc_info=True) - - # First step: Get QR Code login URL - async def async_step_user(self, user_input=None): - _LOGGER.debug("Entering login stage 1") - if not self.task_stage_one: - self.task_stage_one = self.hass.async_create_task(self.do_stage_one()) - if not self.task_stage_one.done(): - return self.async_show_progress( - progress_action="task_stage_one", - progress_task=self.task_stage_one - ) - # At this point stage 1 is completed - if self.error: - # An error occurred, cancel the flow by moving to the finish- - # form which shows the error - return self.async_show_progress_done(next_step_id="finish") - # No error -> Proceed to stage 2 - return self.async_show_progress_done(next_step_id="auth_stage_two") - - # Second step: Wait until QR scanned an log in - async def async_step_auth_stage_two(self, user_input=None): - if not self.task_stage_two: - self.task_stage_two = self.hass.async_create_task(self.do_stage_two()) - if not self.task_stage_two.done(): - return self.async_show_progress( - progress_action="task_stage_two", - progress_task=self.task_stage_two, - description_placeholders={ - "qr_code": gen_qr_code_base64(self.qr_url), - "url": self.qr_url, - "code": self.qr_url.split('/')[-1], - } - ) - return self.async_show_progress_done(next_step_id="finish") - - async def async_step_finish(self, user_input=None): - if self.error: - return self.async_show_form(step_id="finish", errors={'base': self.error}) - - data={CONF_JSESSIONID: self.jsessionid} - + async def _async_handle_jsessionid(self, jsessionid: str) -> ConfigFlowResult: + data = {CONF_JSESSIONID: jsessionid} if self.reauth_entry: - # Finish step was called by reauth-flow. Do not create a new entry, - # instead update the existing entry - return self.async_update_reload_and_abort( - self.reauth_entry, - data=data - ) - + return self.async_update_reload_and_abort(self.reauth_entry, data=data) return self.async_create_entry(title="SmartThings Find", data=data) + async def async_step_user(self, user_input=None): + errors: dict[str, str] = {} + if user_input is not None: + jsessionid = (user_input.get(CONF_JSESSIONID) or "").strip() + if not jsessionid: + errors["base"] = "empty_cookie" + elif await validate_jsessionid(self.hass, jsessionid): + return await self._async_handle_jsessionid(jsessionid) + else: + errors["base"] = "invalid_auth" + + return self.async_show_form( + step_id="user", + data_schema=vol.Schema({vol.Required(CONF_JSESSIONID): str}), + errors=errors, + ) + async def async_step_reauth(self, user_input=None): self.reauth_entry = self.hass.config_entries.async_get_entry( self.context["entry_id"] ) - return await self.async_step_reauth_confirm() - - async def async_step_reauth_confirm(self, user_input=None): - if user_input is None: - return self.async_show_form( - step_id="user", - data_schema=vol.Schema({}), - ) return await self.async_step_user() - + async def async_step_reconfigure(self, user_input: dict[str, Any] | None = None): - return await self.async_step_reauth_confirm(self) - + self.reauth_entry = self.hass.config_entries.async_get_entry( + self.context["entry_id"] + ) + return await self.async_step_user() + @staticmethod @callback def async_get_options_flow( @@ -146,8 +81,8 @@ class SmartThingsFindConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): ) -> config_entries.OptionsFlow: """Create the options flow.""" return SmartThingsFindOptionsFlowHandler(config_entry) - - + + class SmartThingsFindOptionsFlowHandler(OptionsFlowWithConfigEntry): """Handle an options flow.""" @@ -186,4 +121,4 @@ class SmartThingsFindOptionsFlowHandler(OptionsFlowWithConfigEntry): ): bool, } ) - return self.async_show_form(step_id="init", data_schema=data_schema) \ No newline at end of file + return self.async_show_form(step_id="init", data_schema=data_schema) diff --git a/custom_components/smartthings_find/device_tracker.py b/custom_components/smartthings_find/device_tracker.py index 4fcea27..83e4f16 100644 --- a/custom_components/smartthings_find/device_tracker.py +++ b/custom_components/smartthings_find/device_tracker.py @@ -43,8 +43,14 @@ class SmartThingsDeviceTracker(DeviceTrackerEntity): if 'icons' in device['data'] and 'coloredIcon' in device['data']['icons']: self._attr_entity_picture = device['data']['icons']['coloredIcon'] - self.async_update = coordinator.async_add_listener(self.async_write_ha_state) - + + async def async_added_to_hass(self) -> None: + """Subscribe to coordinator updates once added to HA.""" + await super().async_added_to_hass() + self.async_on_remove( + self.coordinator.async_add_listener(self.async_write_ha_state) + ) + def async_write_ha_state(self): if not self.enabled: _LOGGER.debug(f"Ignoring state write request for disabled entity '{self.entity_id}'") diff --git a/custom_components/smartthings_find/manifest.json b/custom_components/smartthings_find/manifest.json index 9f36133..6b7a277 100644 --- a/custom_components/smartthings_find/manifest.json +++ b/custom_components/smartthings_find/manifest.json @@ -2,13 +2,13 @@ "domain": "smartthings_find", "name": "SmartThings Find", "after_dependencies": ["http"], - "version": "0.2.1", - "documentation": "https://github.com/Vedeneb/HA-SmartThings-Find", - "issue_tracker": "https://github.com/Vedeneb/HA-SmartThings-Find/issues", + "version": "0.3.1", + "documentation": "https://git.jeena.net/jeena/HA-SmartThings-Find", + "issue_tracker": "https://git.jeena.net/jeena/HA-SmartThings-Find/issues", "integration_type": "hub", "dependencies": [], - "codeowners": ["@Vedeneb"], - "requirements": ["requests", "qrcode"], + "codeowners": ["@jeena"], + "requirements": ["pytz"], "iot_class": "cloud_polling", "config_flow": true } diff --git a/custom_components/smartthings_find/translations/de.json b/custom_components/smartthings_find/translations/de.json index 3846589..36258d8 100644 --- a/custom_components/smartthings_find/translations/de.json +++ b/custom_components/smartthings_find/translations/de.json @@ -3,9 +3,18 @@ "abort": { "reauth_successful": "Erfolgreich authentifiziert." }, - "progress": { - "task_stage_one": "Vorbereitung läuft, bitte warten...", - "task_stage_two": "Bitte scanne den QR Code mit deinem Galaxy-Gerät, um dich anzumelden. Alternativ kannst du dich direkt im Browser anmelden, indum du [hier]({url}) klickst. \n\n![QR Code](data:image/png;base64,{qr_code})\n\nCode kann nicht gescannt werden? Gehe auf [signin.samsung.com/key](https://signin.samsung.com/key/) und gib den folgenden Code ein:\n\n## ```{code}```\n" + "error": { + "invalid_auth": "Das Cookie wurde von SmartThings Find abgelehnt. Stelle sicher, dass du nur den Wert des JSESSIONID-Cookies (nicht den gesamten Cookie-Header) aus einer angemeldeten Sitzung kopiert hast.", + "empty_cookie": "Bitte einen JSESSIONID-Wert einfügen." + }, + "step": { + "user": { + "title": "JSESSIONID-Cookie einfügen", + "description": "Samsungs QR-Code-Login funktioniert nicht mehr (account.samsung.com wurde umgebaut). Bis das ersetzt ist, musst du das Session-Cookie manuell besorgen:\n\n1. Öffne [smartthingsfind.samsung.com](https://smartthingsfind.samsung.com/) in einem normalen Browser und melde dich an.\n2. Öffne DevTools → Anwendung (Chrome) bzw. Speicher (Firefox) → Cookies → smartthingsfind.samsung.com.\n3. Kopiere den Wert des **JSESSIONID**-Cookies und füge ihn unten ein.\n\nDas Cookie bleibt meist mehrere Wochen gültig. Läuft es ab, fordert Home Assistant eine erneute Authentifizierung an – dann diese Schritte einfach wiederholen.", + "data": { + "jsessionid": "JSESSIONID" + } + } } }, "options": { diff --git a/custom_components/smartthings_find/translations/en.json b/custom_components/smartthings_find/translations/en.json index 9a04e79..49a0aa3 100644 --- a/custom_components/smartthings_find/translations/en.json +++ b/custom_components/smartthings_find/translations/en.json @@ -3,9 +3,18 @@ "abort": { "reauth_successful": "Reauthentication successful." }, - "progress": { - "task_stage_one": "Preparing, please wait...", - "task_stage_two": "To login please scan the following QR Code with your Galaxy device. You can also login using this browser by clicking [here]({url}). \n\n![QR Code](data:image/png;base64,{qr_code})\n\nUnable to scan the code? Go to [signin.samsung.com/key](https://signin.samsung.com/key/) and enter the following code:\n\n## ```{code}```\n" + "error": { + "invalid_auth": "Cookie was rejected by SmartThings Find. Make sure you copied the JSESSIONID value (not the whole cookie header) from a tab where you are logged in.", + "empty_cookie": "Please paste a JSESSIONID value." + }, + "step": { + "user": { + "title": "Paste JSESSIONID cookie", + "description": "Samsung's QR code login flow no longer works (account.samsung.com was rebuilt). Until it is replaced you have to obtain the session cookie manually:\n\n1. Open [smartthingsfind.samsung.com](https://smartthingsfind.samsung.com/) in a regular browser and log in.\n2. Open DevTools → Application (Chrome) or Storage (Firefox) → Cookies → smartthingsfind.samsung.com.\n3. Copy the value of the **JSESSIONID** cookie and paste it below.\n\nThe cookie usually stays valid for several weeks. When it expires Home Assistant will trigger a reauth and you repeat these steps.", + "data": { + "jsessionid": "JSESSIONID" + } + } } }, "options": { @@ -19,4 +28,4 @@ } } } -} \ No newline at end of file +} diff --git a/custom_components/smartthings_find/utils.py b/custom_components/smartthings_find/utils.py index f95ec7c..fcbd880 100644 --- a/custom_components/smartthings_find/utils.py +++ b/custom_components/smartthings_find/utils.py @@ -1,17 +1,9 @@ import logging import json import pytz -import qrcode -import base64 import aiohttp -import asyncio -import random -import string -import re import html -from io import BytesIO -from datetime import datetime, timedelta -from homeassistant.helpers.aiohttp_client import async_get_clientsession +from datetime import datetime from homeassistant.core import HomeAssistant from homeassistant.helpers.entity import DeviceInfo from homeassistant.exceptions import ConfigEntryAuthFailed @@ -21,194 +13,39 @@ from .const import DOMAIN, BATTERY_LEVELS, CONF_ACTIVE_MODE_SMARTTAGS, CONF_ACTI _LOGGER = logging.getLogger(__name__) -URL_PRE_SIGNIN = 'https://account.samsung.com/accounts/v1/FMM2/signInGate?state={state}&redirect_uri=https:%2F%2Fsmartthingsfind.samsung.com%2Flogin.do&response_type=code&client_id=ntly6zvfpn&scope=iot.client&locale=de_DE&acr_values=urn:samsungaccount:acr:basic&goBackURL=https:%2F%2Fsmartthingsfind.samsung.com%2Flogin' -URL_QR_CODE_SIGNIN = 'https://account.samsung.com/accounts/v1/FMM2/signInWithQrCode' -URL_SIGNIN_XHR = 'https://account.samsung.com/accounts/v1/FMM2/signInXhr' -URL_QR_POLL = 'https://account.samsung.com/accounts/v1/FMM2/signInWithQrCodeProc' -URL_SIGNIN_SUCCESS = 'https://account.samsung.com{next_url}' URL_GET_CSRF = "https://smartthingsfind.samsung.com/chkLogin.do" URL_DEVICE_LIST = "https://smartthingsfind.samsung.com/device/getDeviceList.do" URL_REQUEST_LOC_UPDATE = "https://smartthingsfind.samsung.com/dm/addOperation.do" URL_SET_LAST_DEVICE = "https://smartthingsfind.samsung.com/device/setLastSelect.do" -async def do_login_stage_one(hass: HomeAssistant) -> tuple: - """ - Perform the first stage of the login process. +async def validate_jsessionid(hass: HomeAssistant, jsessionid: str) -> bool: + """Check whether a JSESSIONID cookie is currently valid for SmartThings Find. - This function performs the initial login steps for the SmartThings Find service. - It generates a random state string, sends a pre-login request, and retrieves - the QR code URL from the response. + Uses an isolated aiohttp session: the shared HA session may already hold + a JSESSIONID cookie pinned to ``smartthingsfind.samsung.com`` from a prior + Set-Cookie response. A bare ``update_cookies({"JSESSIONID": ...})`` would + only add an unscoped duplicate, and aiohttp's cookie jar would still ship + the older, domain-matched (stale) cookie — making the freshly-pasted + cookie look invalid to the server. - Args: - hass (HomeAssistant): Home Assistant instance. - - Returns: - tuple: A tuple containing the session and QR code URL if successful, None otherwise. - """ - session = async_get_clientsession(hass) - session.cookie_jar.clear() - - # Generating the state parameter - state = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) - - try: - # Load the initial login page which already sets some cookies. 'state' - # is a randomly generated string. 'client_id' seems to be static for - # SmartThings find. - async with session.get(URL_PRE_SIGNIN.format(state=state)) as res: - if res.status != 200: - _LOGGER.error( - f"Pre-login request failed with status {res.status}") - return None - _LOGGER.debug(f"Step 1: Pre-Login: Status Code: {res.status}") - - # Load the "Login with QR-Code"-page - async with session.get(URL_QR_CODE_SIGNIN) as res: - if res.status != 200: - _LOGGER.error( - f"QR code URL request failed with status {res.status}, Response: {text[:250]}...") - return None - text = await res.text() - _LOGGER.debug(f"Step 2: QR-Code URL: Status Code: {res.status}") - - # Search the URL which is embedded in the QR Code. It looks like this: - # https://signin.samsung.com/key/abcdefgh - match = re.search(r"https://signin\.samsung\.com/key/[^'\"]+", text) - if not match: - _LOGGER.error("QR code URL not found in the response") - return None - - qr_url = match.group(0) - _LOGGER.info(f"Extracted QR code URL: {qr_url}") - - return session, qr_url - except Exception as e: - _LOGGER.error( - f"An error occurred during the login process (stage 1): {e}", exc_info=True) - return None - - -async def do_login_stage_two(session: aiohttp.ClientSession) -> str: - """ - Perform the second stage of the login process. - - This function continues the login process by fetching the - CSRF token, polling the server for login status, and ultimately - retrieving the JSESSIONID required for SmartThings Find. - - Args: - session (aiohttp.ClientSession): The current session with cookies set from login stage one. - - Returns: - str: The JSESSIONID if successful, None otherwise. + A short-lived ClientSession with its own jar avoids that entirely. """ try: - # Here you would generate and display the QR code. This is environment-specific. - # qr = qrcode.QRCode() - # qr.add_data(extracted_url) - # qr.print_ascii() # Or any other method to display the QR code to the user. - - # Fetch the _csrf token. This needs to be sent with each QR-Poll-Request - async with session.get(URL_SIGNIN_XHR) as res: - if res.status != 200: - _LOGGER.error( - f"XHR login request failed with status {res.status}") - return None - json_res = await res.json() - _LOGGER.debug( - f"Step 3: XHR Login: Status Code: {res.status}, Response: {json_res}") - - csrf_token = json_res.get('_csrf', {}).get('token') - if not csrf_token: - _LOGGER.error("CSRF token not found in the response") - return None - - # Define a timeout. We don't want to poll forever. - end_time = datetime.now() + timedelta(seconds=120) - next_url = None - - # We check, whether the QR-Code was scanned and the login was successful. - # This request returns either: - # {"rtnCd":"POLLING"} <-- Not yet logged in. Login still pending. - # OR - # {"rtnCd":"SUCCESS","nextURL":"/accounts/v1/FMM2/signInComplete"} - # So we fetch this URL every few seconds, until we receive "SUCCESS", which - # indicates the user has successfully scanned the Code and approved the login. - # This is exactly the same way, the website does it. - # In case the user never scans the QR-Code, we run in the timeout defined above. - while datetime.now() < end_time: - try: - await asyncio.sleep(2) - async with session.post(URL_QR_POLL, json={}, headers={'X-Csrf-Token': csrf_token}) as res: - if res.status != 200: - _LOGGER.error( - f"QR check request failed with status {res.status}") - continue - js = await res.json() - _LOGGER.debug( - f"Step 4: QR CHECK: Status Code: {res.status}, Response: {js}") - - if js.get('rtnCd') == "SUCCESS": - next_url = js.get('nextURL') - break - except aiohttp.ClientError as e: - _LOGGER.error(f"QR Poll request failed: {e}") - return None - - if not next_url: - _LOGGER.error("QR Code not scanned within 2 mins") - return None - - # Fetch the 'next_url' we received from the previous request. On success, this sets - # the initial JSESSIONID-cookie. We're not done yet, since this cookie is not valid - # for SmartThings Find (which uses it's own JSESSIONID). - async with session.get(URL_SIGNIN_SUCCESS.format(next_url=next_url)) as res: - if res.status != 200: - _LOGGER.error( - f"Login success URL request failed with status {res.status}") - return None - text = await res.text() - _LOGGER.debug(f"Step 5: Login success: Status Code: {res.status}") - - # The response contains another redirect URL which we need to extract from the - # received HTML/JS-content. This URL looks something like this: - # https://smartthingsfind.samsung.com/login.do?auth_server_url=eu-auth2.samsungosp.com - # &code=[...]&code_expires_in=300&state=[state we generated above] - # &api_server_url=eu-auth2.samsungosp.com - match = re.search( - r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]', text) - if not match: - _LOGGER.error( - "Redirect URL not found in the login success response") - return None - - redirect_url = match.group(1) - _LOGGER.debug(f"Found Redirect URL: {redirect_url}") - - # Fetch the received redirect_url. This response finally contains our JSESSIONID, - # which is what actually authenticates us for SmartThings Find. - async with session.get(redirect_url) as res: - if res.status != 200: - _LOGGER.error( - f"Redirect URL request failed with status {res.status}") - return None - _LOGGER.debug( - f"Step 6: Follow redirect URL: Status Code: {res.status}") - - jsessionid = session.cookie_jar.filter_cookies( - 'https://smartthingsfind.samsung.com').get('JSESSIONID') - if not jsessionid: - _LOGGER.error("JSESSIONID not found in cookies") - return None - - _LOGGER.debug(f"JSESSIONID: {jsessionid.value[:20]}...") - return jsessionid.value - + async with aiohttp.ClientSession( + cookies={"JSESSIONID": jsessionid} + ) as session: + async with session.get(URL_GET_CSRF) as response: + if response.status == 200 and response.headers.get("_csrf"): + return True + body = (await response.text())[:200] + _LOGGER.warning( + f"JSESSIONID validation failed: status={response.status}, body='{body}'" + ) + return False except Exception as e: - _LOGGER.error( - f"An error occurred during the login process (stage 2): {e}", exc_info=True) - return None + _LOGGER.error(f"JSESSIONID validation error: {e}", exc_info=True) + return False async def fetch_csrf(hass: HomeAssistant, session: aiohttp.ClientSession, entry_id: str): @@ -377,8 +214,13 @@ async def get_device_location(hass: HomeAssistant, session: aiohttp.ClientSessio utcDate = parse_stf_date( op['extra']['gpsUtcDt']) else: - _LOGGER.error( - f"[{dev_name}] No UTC date found for operation '{op['oprnType']}', this should not happen! OP: {json.dumps(op)}") + _LOGGER.warning( + f"[{dev_name}] No UTC date found for operation '{op['oprnType']}'. OP: {json.dumps(op)}") + continue + + if utcDate is None: + _LOGGER.warning( + f"[{dev_name}] Could not parse UTC date for operation '{op['oprnType']}'") continue if used_loc['gps_date'] and used_loc['gps_date'] >= utcDate: @@ -419,6 +261,10 @@ async def get_device_location(hass: HomeAssistant, session: aiohttp.ClientSessio continue else: utcDate = parse_stf_date(loc['gpsUtcDt']) + if utcDate is None: + _LOGGER.warning( + f"[{dev_name}] Could not parse UTC date for encrypted location ('{op['oprnType']}')") + continue if used_loc['gps_date'] and used_loc['gps_date'] >= utcDate: _LOGGER.debug( f"[{dev_name}] Ignoring location older than the previous ({op['oprnType']})") @@ -519,17 +365,18 @@ def get_sub_location(ops: list, subDeviceName: str) -> tuple: for op in ops: if subDeviceName in op.get('encLocation', {}): loc = op['encLocation'][subDeviceName] + gps_date = parse_stf_date(loc.get('gpsUtcDt', '')) sub_loc = { - "latitude": float(loc['latitude']), - "longitude": float(loc['longitude']), + "latitude": float(loc['latitude']) if loc.get('latitude') is not None else None, + "longitude": float(loc['longitude']) if loc.get('longitude') is not None else None, "gps_accuracy": calc_gps_accuracy(loc.get('horizontalUncertainty'), loc.get('verticalUncertainty')), - "gps_date": parse_stf_date(loc['gpsUtcDt']) + "gps_date": gps_date } return op, sub_loc return {}, {} -def parse_stf_date(datestr: str) -> datetime: +def parse_stf_date(datestr: str) -> datetime | None: """ Parses a date string in the format "%Y%m%d%H%M%S" to a datetime object. This is the format, the SmartThings Find API uses. @@ -538,9 +385,16 @@ def parse_stf_date(datestr: str) -> datetime: datestr (str): The date string in the format "%Y%m%d%H%M%S". Returns: - datetime: A datetime object representing the input date string. + datetime: A datetime object representing the input date string, + or None if parsing fails. """ - return datetime.strptime(datestr, "%Y%m%d%H%M%S").replace(tzinfo=pytz.UTC) + if not datestr: + return None + try: + return datetime.strptime(datestr, "%Y%m%d%H%M%S").replace(tzinfo=pytz.UTC) + except ValueError: + _LOGGER.warning(f"Failed to parse date string: '{datestr}'") + return None def get_battery_level(dev_name: str, ops: list) -> int: @@ -566,22 +420,3 @@ def get_battery_level(dev_name: str, ops: list) -> int: f"[{dev_name}]: Received invalid battery level: {batt_raw}") return batt return None - - -def gen_qr_code_base64(data: str) -> str: - """ - Generates a QR code from the provided data and returns it as a base64-encoded string. - Used to show a login QR code during authentication flow - - Args: - data (str): The data to encode in the QR code. - - Returns: - str: The base64-encoded string representation of the QR code. - """ - qr = qrcode.QRCode() - qr.add_data(data) - img = qr.make_image(fill_color="black", back_color="white") - buffer = BytesIO() - img.save(buffer, format="PNG") - return base64.b64encode(buffer.getvalue()).decode("utf-8") diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..78c5011 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +asyncio_mode = auto +testpaths = tests diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..585c749 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,158 @@ +"""Stub the HA imports the integration touches so utils/config_flow can be +imported and exercised without a full Home Assistant install. + +Only the bits used by the code under test are faked — anything new the +integration starts touching will need to be added here.""" +from __future__ import annotations + +import sys +import types +from pathlib import Path +from typing import Any + +REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(REPO_ROOT)) + + +def _module(name: str) -> types.ModuleType: + if name not in sys.modules: + sys.modules[name] = types.ModuleType(name) + return sys.modules[name] + + +# homeassistant.core +ha = _module("homeassistant") +core = _module("homeassistant.core") + + +class HomeAssistant: + def __init__(self) -> None: + self.data: dict[str, Any] = {} + # Tests assign a real aiohttp.ClientSession here. + self.clientsession = None + + +def callback(func): + return func + + +core.HomeAssistant = HomeAssistant +core.callback = callback +ha.core = core + +# homeassistant.exceptions +exc = _module("homeassistant.exceptions") + + +class ConfigEntryAuthFailed(Exception): + pass + + +class ConfigEntryNotReady(Exception): + pass + + +exc.ConfigEntryAuthFailed = ConfigEntryAuthFailed +exc.ConfigEntryNotReady = ConfigEntryNotReady + +# homeassistant.helpers + submodules +helpers = _module("homeassistant.helpers") +aiohttp_client = _module("homeassistant.helpers.aiohttp_client") + + +def async_get_clientsession(hass: HomeAssistant): + return hass.clientsession + + +aiohttp_client.async_get_clientsession = async_get_clientsession +helpers.aiohttp_client = aiohttp_client + +entity = _module("homeassistant.helpers.entity") + + +class DeviceInfo(dict): + pass + + +entity.DeviceInfo = DeviceInfo +helpers.entity = entity + +device_registry = _module("homeassistant.helpers.device_registry") + + +class _FakeDeviceRegistry: + def async_get_device(self, identifiers): + return None + + +def _async_get(_hass): + return _FakeDeviceRegistry() + + +device_registry.async_get = _async_get +helpers.device_registry = device_registry + +typing_module = _module("homeassistant.helpers.typing") +typing_module.ConfigType = dict + +# config_entries +config_entries = _module("homeassistant.config_entries") + + +class ConfigEntry: + pass + + +class ConfigFlow: + def __init_subclass__(cls, *, domain=None, **kwargs): + super().__init_subclass__(**kwargs) + + +class OptionsFlow: + pass + + +class OptionsFlowWithConfigEntry(OptionsFlow): + def __init__(self, config_entry): + self.config_entry = config_entry + self.options = dict(getattr(config_entry, "options", {}) or {}) + + +CONN_CLASS_CLOUD_POLL = "cloud_poll" +ConfigFlowResult = dict + +config_entries.ConfigEntry = ConfigEntry +config_entries.ConfigFlow = ConfigFlow +config_entries.OptionsFlow = OptionsFlow +config_entries.OptionsFlowWithConfigEntry = OptionsFlowWithConfigEntry +config_entries.CONN_CLASS_CLOUD_POLL = CONN_CLASS_CLOUD_POLL +config_entries.ConfigFlowResult = ConfigFlowResult + +# const +const = _module("homeassistant.const") + + +class Platform: + DEVICE_TRACKER = "device_tracker" + BUTTON = "button" + SENSOR = "sensor" + + +const.Platform = Platform + +# update_coordinator +update_coord = _module("homeassistant.helpers.update_coordinator") + + +class DataUpdateCoordinator: + def __init__(self, *args, **kwargs): + pass + + +class UpdateFailed(Exception): + pass + + +update_coord.DataUpdateCoordinator = DataUpdateCoordinator +update_coord.UpdateFailed = UpdateFailed +helpers.update_coordinator = update_coord diff --git a/tests/test_validate_jsessionid.py b/tests/test_validate_jsessionid.py new file mode 100644 index 0000000..b9be477 --- /dev/null +++ b/tests/test_validate_jsessionid.py @@ -0,0 +1,94 @@ +"""Tests for validate_jsessionid. + +The interesting case is cookie-jar pollution: HA's shared aiohttp session can +already hold a JSESSIONID cookie that was stored from a previous Set-Cookie +response (so it carries a real domain attribute). When we then pre-load a new +JSESSIONID via update_cookies({"JSESSIONID": ...}) without specifying a +response_url, the new cookie gets stored with no domain and aiohttp ships the +older, more-specific one — making the new (valid) cookie look invalid to +Samsung.""" +from __future__ import annotations + +import aiohttp +import pytest +from aioresponses import aioresponses +from yarl import URL + +from custom_components.smartthings_find.utils import ( + URL_GET_CSRF, + validate_jsessionid, +) + + +@pytest.fixture +async def hass(monkeypatch): + from homeassistant.core import HomeAssistant + + h = HomeAssistant() + async with aiohttp.ClientSession() as session: + h.clientsession = session + yield h + + +@pytest.mark.asyncio +async def test_valid_cookie_returns_true(hass): + with aioresponses() as m: + m.get(URL_GET_CSRF, status=200, headers={"_csrf": "tok123"}) + assert await validate_jsessionid(hass, "fresh-cookie") is True + + +@pytest.mark.asyncio +async def test_no_csrf_header_returns_false(hass): + with aioresponses() as m: + m.get(URL_GET_CSRF, status=200, body="fail") + assert await validate_jsessionid(hass, "expired-cookie") is False + + +@pytest.mark.asyncio +async def test_non_200_returns_false(hass): + with aioresponses() as m: + m.get(URL_GET_CSRF, status=401, body="Logout") + assert await validate_jsessionid(hass, "anything") is False + + +@pytest.mark.asyncio +async def test_validation_does_not_touch_shared_session(hass): + """The shared HA session may already hold a JSESSIONID cookie with a real + domain (set by a previous Set-Cookie). Validation must run in an isolated + aiohttp session so: + + * the shared jar is not mutated as a side-effect of validation, and + * a stale domain-bound JSESSIONID can't shadow the cookie we're validating. + + Without this, aiohttp's cookie jar prefers the domain-matched (stale) + cookie over the bare cookie we just added, so Samsung sees the expired + session, returns no `_csrf`, and we tell the user their cookie was + rejected even though it was perfectly fine.""" + hass.clientsession.cookie_jar.update_cookies( + {"JSESSIONID": "STALE_VALUE"}, + response_url=URL("https://smartthingsfind.samsung.com/"), + ) + before = sorted( + (c.key, c["domain"], c.value) + for c in hass.clientsession.cookie_jar + ) + + with aioresponses() as m: + m.get(URL_GET_CSRF, status=200, headers={"_csrf": "tok"}) + result = await validate_jsessionid(hass, "FRESH_VALUE") + + after = sorted( + (c.key, c["domain"], c.value) + for c in hass.clientsession.cookie_jar + ) + + assert before == after, ( + "validate_jsessionid mutated the shared HA session's cookie jar; " + "it must use an isolated session.\n" + f" before: {before}\n after: {after}" + ) + assert result is True, ( + "validate_jsessionid returned False even though the mocked server " + "would have replied with _csrf — the stale shared-jar cookie likely " + "shadowed the fresh one" + )