Compare commits
5 commits
13bd1dabe0
...
cddc1e9d72
| Author | SHA1 | Date | |
|---|---|---|---|
| cddc1e9d72 | |||
| 4c26792c37 | |||
| 7457a8284b | |||
| 73aa86cfb5 | |||
| 13ff5a534e |
11 changed files with 422 additions and 348 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -10,6 +10,7 @@ __pycache__
|
|||
.coverage
|
||||
.vscode
|
||||
coverage.xml
|
||||
.env
|
||||
|
||||
|
||||
# Home Assistant configuration
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
from datetime import timedelta
|
||||
import logging
|
||||
import aiohttp
|
||||
from yarl import URL
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
from homeassistant.const import Platform
|
||||
|
|
@ -39,7 +40,15 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
jsessionid = entry.data[CONF_JSESSIONID]
|
||||
|
||||
session = async_get_clientsession(hass)
|
||||
session.cookie_jar.update_cookies({"JSESSIONID": jsessionid})
|
||||
# The shared HA session may already hold a JSESSIONID for this domain
|
||||
# from a previous load (set via Set-Cookie). Clear it first; otherwise
|
||||
# a bare update_cookies adds an unscoped duplicate and aiohttp ships the
|
||||
# older, domain-matched (stale) value, breaking auth after a UI reauth.
|
||||
session.cookie_jar.clear_domain("smartthingsfind.samsung.com")
|
||||
session.cookie_jar.update_cookies(
|
||||
{"JSESSIONID": jsessionid},
|
||||
response_url=URL("https://smartthingsfind.samsung.com/"),
|
||||
)
|
||||
|
||||
active_smarttags = entry.options.get(CONF_ACTIVE_MODE_SMARTTAGS, CONF_ACTIVE_MODE_SMARTTAGS_DEFAULT)
|
||||
active_others = entry.options.get(CONF_ACTIVE_MODE_OTHERS, CONF_ACTIVE_MODE_OTHERS_DEFAULT)
|
||||
|
|
@ -60,25 +69,39 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
# fetch data from STF and update the device_tracker and sensor
|
||||
# entities
|
||||
update_interval = entry.options.get(CONF_UPDATE_INTERVAL, CONF_UPDATE_INTERVAL_DEFAULT)
|
||||
coordinator = SmartThingsFindCoordinator(hass, session, devices, update_interval)
|
||||
coordinator = SmartThingsFindCoordinator(hass, session, entry.entry_id, update_interval)
|
||||
|
||||
# Store everything the coordinator needs BEFORE the first refresh,
|
||||
# because _async_update_data reads `devices` out of hass.data.
|
||||
hass.data[DOMAIN][entry.entry_id].update({
|
||||
CONF_JSESSIONID: jsessionid,
|
||||
"session": session,
|
||||
"coordinator": coordinator,
|
||||
"devices": devices,
|
||||
})
|
||||
|
||||
# This is what makes the whole integration slow to load (around 10-15
|
||||
# seconds for my 15 devices) but it is the right way to do it. Only if
|
||||
# it succeeds, the integration will be marked as successfully loaded.
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
|
||||
hass.data[DOMAIN][entry.entry_id].update({
|
||||
CONF_JSESSIONID: jsessionid,
|
||||
"session": session,
|
||||
"coordinator": coordinator,
|
||||
"devices": devices
|
||||
})
|
||||
|
||||
hass.async_create_task(
|
||||
hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
)
|
||||
return True
|
||||
|
||||
async def async_remove_config_entry_device(
|
||||
hass: HomeAssistant, entry: ConfigEntry, device
|
||||
) -> bool:
|
||||
"""Allow the user to remove a device from this config entry.
|
||||
|
||||
Returning True unconditionally lets HA delete the device + its entities
|
||||
from the registry. If the same dvceID later reappears in Samsung's
|
||||
response, the next setup will recreate fresh entries for it.
|
||||
"""
|
||||
return True
|
||||
|
||||
|
||||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Unload a config entry."""
|
||||
unload_success = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
|
||||
|
|
@ -92,10 +115,10 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
class SmartThingsFindCoordinator(DataUpdateCoordinator):
|
||||
"""Class to manage fetching SmartThings Find data."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant, session: aiohttp.ClientSession, devices, update_interval : int):
|
||||
def __init__(self, hass: HomeAssistant, session: aiohttp.ClientSession, entry_id: str, update_interval : int):
|
||||
"""Initialize the coordinator."""
|
||||
self.session = session
|
||||
self.devices = devices
|
||||
self._entry_id = entry_id
|
||||
self.hass = hass
|
||||
super().__init__(
|
||||
hass,
|
||||
|
|
@ -107,11 +130,12 @@ class SmartThingsFindCoordinator(DataUpdateCoordinator):
|
|||
async def _async_update_data(self):
|
||||
"""Fetch data from SmartThings Find."""
|
||||
try:
|
||||
devices = self.hass.data[DOMAIN][self._entry_id]["devices"]
|
||||
tags = {}
|
||||
_LOGGER.debug(f"Updating locations...")
|
||||
for device in self.devices:
|
||||
for device in devices:
|
||||
dev_data = device['data']
|
||||
tag_data = await get_device_location(self.hass, self.session, dev_data, self.config_entry.entry_id)
|
||||
tag_data = await get_device_location(self.hass, self.session, dev_data, self._entry_id)
|
||||
tags[dev_data['dvceID']] = tag_data
|
||||
_LOGGER.debug(f"Fetched {len(tags)} locations")
|
||||
return tags
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
from typing import Any, Dict
|
||||
from typing import Any
|
||||
import voluptuous as vol
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.config_entries import (
|
||||
ConfigEntry,
|
||||
ConfigFlowResult,
|
||||
OptionsFlowWithConfigEntry
|
||||
OptionsFlowWithConfigEntry,
|
||||
)
|
||||
from .const import (
|
||||
DOMAIN,
|
||||
|
|
@ -15,129 +15,64 @@ from .const import (
|
|||
CONF_ACTIVE_MODE_SMARTTAGS,
|
||||
CONF_ACTIVE_MODE_SMARTTAGS_DEFAULT,
|
||||
CONF_ACTIVE_MODE_OTHERS,
|
||||
CONF_ACTIVE_MODE_OTHERS_DEFAULT
|
||||
CONF_ACTIVE_MODE_OTHERS_DEFAULT,
|
||||
)
|
||||
from .utils import do_login_stage_one, do_login_stage_two, gen_qr_code_base64
|
||||
import asyncio
|
||||
from .utils import validate_jsessionid
|
||||
import logging
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SmartThingsFindConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow for SmartThings Find."""
|
||||
"""Handle a config flow for SmartThings Find.
|
||||
|
||||
Samsung's account.samsung.com login was rebuilt as a JS SPA (IAM/OAuth2
|
||||
with bot protection), so the original QR-code login flow no longer works.
|
||||
Until that is reverse-engineered we fall back to letting the user paste
|
||||
the JSESSIONID cookie they obtain from a normal browser session at
|
||||
https://smartthingsfind.samsung.com/.
|
||||
"""
|
||||
|
||||
VERSION = 1
|
||||
CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL
|
||||
|
||||
reauth_entry: ConfigEntry | None = None
|
||||
|
||||
task_stage_one: asyncio.Task | None = None
|
||||
task_stage_two: asyncio.Task | None = None
|
||||
|
||||
qr_url = None
|
||||
session = None
|
||||
|
||||
jsessionid = None
|
||||
|
||||
|
||||
error = None
|
||||
|
||||
async def do_stage_one(self):
|
||||
_LOGGER.debug("Running login stage 1")
|
||||
try:
|
||||
stage_one_res = await do_login_stage_one(self.hass)
|
||||
if not stage_one_res is None:
|
||||
self.session, self.qr_url = stage_one_res
|
||||
else:
|
||||
self.error = "Login stage 1 failed. Check logs for details."
|
||||
_LOGGER.warn("Login stage 1 failed")
|
||||
_LOGGER.debug("Login stage 1 done")
|
||||
except Exception as e:
|
||||
self.error = "Login stage 1 failed. Check logs for details."
|
||||
_LOGGER.error(f"Exception in stage 1: {e}", exc_info=True)
|
||||
|
||||
async def do_stage_two(self):
|
||||
_LOGGER.debug("Running login stage 2")
|
||||
try:
|
||||
stage_two_res = await do_login_stage_two(self.session)
|
||||
if not stage_two_res is None:
|
||||
self.jsessionid = stage_two_res
|
||||
_LOGGER.info("Login successful")
|
||||
else:
|
||||
self.error = "Login stage 2 failed. Check logs for details."
|
||||
_LOGGER.warning("Login stage 2 failed")
|
||||
_LOGGER.debug("Login stage 2 done")
|
||||
except Exception as e:
|
||||
self.error = "Login stage 2 failed. Check logs for details."
|
||||
_LOGGER.error(f"Exception in stage 2: {e}", exc_info=True)
|
||||
|
||||
# First step: Get QR Code login URL
|
||||
async def async_step_user(self, user_input=None):
|
||||
_LOGGER.debug("Entering login stage 1")
|
||||
if not self.task_stage_one:
|
||||
self.task_stage_one = self.hass.async_create_task(self.do_stage_one())
|
||||
if not self.task_stage_one.done():
|
||||
return self.async_show_progress(
|
||||
progress_action="task_stage_one",
|
||||
progress_task=self.task_stage_one
|
||||
)
|
||||
# At this point stage 1 is completed
|
||||
if self.error:
|
||||
# An error occurred, cancel the flow by moving to the finish-
|
||||
# form which shows the error
|
||||
return self.async_show_progress_done(next_step_id="finish")
|
||||
# No error -> Proceed to stage 2
|
||||
return self.async_show_progress_done(next_step_id="auth_stage_two")
|
||||
|
||||
# Second step: Wait until QR scanned an log in
|
||||
async def async_step_auth_stage_two(self, user_input=None):
|
||||
if not self.task_stage_two:
|
||||
self.task_stage_two = self.hass.async_create_task(self.do_stage_two())
|
||||
if not self.task_stage_two.done():
|
||||
return self.async_show_progress(
|
||||
progress_action="task_stage_two",
|
||||
progress_task=self.task_stage_two,
|
||||
description_placeholders={
|
||||
"qr_code": gen_qr_code_base64(self.qr_url),
|
||||
"url": self.qr_url,
|
||||
"code": self.qr_url.split('/')[-1],
|
||||
}
|
||||
)
|
||||
return self.async_show_progress_done(next_step_id="finish")
|
||||
|
||||
async def async_step_finish(self, user_input=None):
|
||||
if self.error:
|
||||
return self.async_show_form(step_id="finish", errors={'base': self.error})
|
||||
|
||||
data={CONF_JSESSIONID: self.jsessionid}
|
||||
|
||||
async def _async_handle_jsessionid(self, jsessionid: str) -> ConfigFlowResult:
|
||||
data = {CONF_JSESSIONID: jsessionid}
|
||||
if self.reauth_entry:
|
||||
# Finish step was called by reauth-flow. Do not create a new entry,
|
||||
# instead update the existing entry
|
||||
return self.async_update_reload_and_abort(
|
||||
self.reauth_entry,
|
||||
data=data
|
||||
)
|
||||
|
||||
return self.async_update_reload_and_abort(self.reauth_entry, data=data)
|
||||
return self.async_create_entry(title="SmartThings Find", data=data)
|
||||
|
||||
async def async_step_user(self, user_input=None):
|
||||
errors: dict[str, str] = {}
|
||||
if user_input is not None:
|
||||
jsessionid = (user_input.get(CONF_JSESSIONID) or "").strip()
|
||||
if not jsessionid:
|
||||
errors["base"] = "empty_cookie"
|
||||
elif await validate_jsessionid(self.hass, jsessionid):
|
||||
return await self._async_handle_jsessionid(jsessionid)
|
||||
else:
|
||||
errors["base"] = "invalid_auth"
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=vol.Schema({vol.Required(CONF_JSESSIONID): str}),
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
async def async_step_reauth(self, user_input=None):
|
||||
self.reauth_entry = self.hass.config_entries.async_get_entry(
|
||||
self.context["entry_id"]
|
||||
)
|
||||
return await self.async_step_reauth_confirm()
|
||||
|
||||
async def async_step_reauth_confirm(self, user_input=None):
|
||||
if user_input is None:
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=vol.Schema({}),
|
||||
)
|
||||
return await self.async_step_user()
|
||||
|
||||
async def async_step_reconfigure(self, user_input: dict[str, Any] | None = None):
|
||||
return await self.async_step_reauth_confirm(self)
|
||||
self.reauth_entry = self.hass.config_entries.async_get_entry(
|
||||
self.context["entry_id"]
|
||||
)
|
||||
return await self.async_step_user()
|
||||
|
||||
@staticmethod
|
||||
@callback
|
||||
|
|
|
|||
|
|
@ -43,7 +43,13 @@ class SmartThingsDeviceTracker(DeviceTrackerEntity):
|
|||
|
||||
if 'icons' in device['data'] and 'coloredIcon' in device['data']['icons']:
|
||||
self._attr_entity_picture = device['data']['icons']['coloredIcon']
|
||||
self.async_update = coordinator.async_add_listener(self.async_write_ha_state)
|
||||
|
||||
async def async_added_to_hass(self) -> None:
|
||||
"""Subscribe to coordinator updates once added to HA."""
|
||||
await super().async_added_to_hass()
|
||||
self.async_on_remove(
|
||||
self.coordinator.async_add_listener(self.async_write_ha_state)
|
||||
)
|
||||
|
||||
def async_write_ha_state(self):
|
||||
if not self.enabled:
|
||||
|
|
|
|||
|
|
@ -2,13 +2,13 @@
|
|||
"domain": "smartthings_find",
|
||||
"name": "SmartThings Find",
|
||||
"after_dependencies": ["http"],
|
||||
"version": "0.2.1",
|
||||
"documentation": "https://github.com/Vedeneb/HA-SmartThings-Find",
|
||||
"issue_tracker": "https://github.com/Vedeneb/HA-SmartThings-Find/issues",
|
||||
"version": "0.3.1",
|
||||
"documentation": "https://git.jeena.net/jeena/HA-SmartThings-Find",
|
||||
"issue_tracker": "https://git.jeena.net/jeena/HA-SmartThings-Find/issues",
|
||||
"integration_type": "hub",
|
||||
"dependencies": [],
|
||||
"codeowners": ["@Vedeneb"],
|
||||
"requirements": ["requests", "qrcode"],
|
||||
"codeowners": ["@jeena"],
|
||||
"requirements": ["pytz"],
|
||||
"iot_class": "cloud_polling",
|
||||
"config_flow": true
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,9 +3,18 @@
|
|||
"abort": {
|
||||
"reauth_successful": "Erfolgreich authentifiziert."
|
||||
},
|
||||
"progress": {
|
||||
"task_stage_one": "Vorbereitung läuft, bitte warten...",
|
||||
"task_stage_two": "Bitte scanne den QR Code mit deinem Galaxy-Gerät, um dich anzumelden. Alternativ kannst du dich direkt im Browser anmelden, indum du [hier]({url}) klickst. \n\n\n\nCode kann nicht gescannt werden? Gehe auf [signin.samsung.com/key](https://signin.samsung.com/key/) und gib den folgenden Code ein:\n\n## ```{code}```\n"
|
||||
"error": {
|
||||
"invalid_auth": "Das Cookie wurde von SmartThings Find abgelehnt. Stelle sicher, dass du nur den Wert des JSESSIONID-Cookies (nicht den gesamten Cookie-Header) aus einer angemeldeten Sitzung kopiert hast.",
|
||||
"empty_cookie": "Bitte einen JSESSIONID-Wert einfügen."
|
||||
},
|
||||
"step": {
|
||||
"user": {
|
||||
"title": "JSESSIONID-Cookie einfügen",
|
||||
"description": "Samsungs QR-Code-Login funktioniert nicht mehr (account.samsung.com wurde umgebaut). Bis das ersetzt ist, musst du das Session-Cookie manuell besorgen:\n\n1. Öffne [smartthingsfind.samsung.com](https://smartthingsfind.samsung.com/) in einem normalen Browser und melde dich an.\n2. Öffne DevTools → Anwendung (Chrome) bzw. Speicher (Firefox) → Cookies → smartthingsfind.samsung.com.\n3. Kopiere den Wert des **JSESSIONID**-Cookies und füge ihn unten ein.\n\nDas Cookie bleibt meist mehrere Wochen gültig. Läuft es ab, fordert Home Assistant eine erneute Authentifizierung an – dann diese Schritte einfach wiederholen.",
|
||||
"data": {
|
||||
"jsessionid": "JSESSIONID"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"options": {
|
||||
|
|
|
|||
|
|
@ -3,9 +3,18 @@
|
|||
"abort": {
|
||||
"reauth_successful": "Reauthentication successful."
|
||||
},
|
||||
"progress": {
|
||||
"task_stage_one": "Preparing, please wait...",
|
||||
"task_stage_two": "To login please scan the following QR Code with your Galaxy device. You can also login using this browser by clicking [here]({url}). \n\n\n\nUnable to scan the code? Go to [signin.samsung.com/key](https://signin.samsung.com/key/) and enter the following code:\n\n## ```{code}```\n"
|
||||
"error": {
|
||||
"invalid_auth": "Cookie was rejected by SmartThings Find. Make sure you copied the JSESSIONID value (not the whole cookie header) from a tab where you are logged in.",
|
||||
"empty_cookie": "Please paste a JSESSIONID value."
|
||||
},
|
||||
"step": {
|
||||
"user": {
|
||||
"title": "Paste JSESSIONID cookie",
|
||||
"description": "Samsung's QR code login flow no longer works (account.samsung.com was rebuilt). Until it is replaced you have to obtain the session cookie manually:\n\n1. Open [smartthingsfind.samsung.com](https://smartthingsfind.samsung.com/) in a regular browser and log in.\n2. Open DevTools → Application (Chrome) or Storage (Firefox) → Cookies → smartthingsfind.samsung.com.\n3. Copy the value of the **JSESSIONID** cookie and paste it below.\n\nThe cookie usually stays valid for several weeks. When it expires Home Assistant will trigger a reauth and you repeat these steps.",
|
||||
"data": {
|
||||
"jsessionid": "JSESSIONID"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"options": {
|
||||
|
|
|
|||
|
|
@ -1,17 +1,9 @@
|
|||
import logging
|
||||
import json
|
||||
import pytz
|
||||
import qrcode
|
||||
import base64
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import random
|
||||
import string
|
||||
import re
|
||||
import html
|
||||
from io import BytesIO
|
||||
from datetime import datetime, timedelta
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from datetime import datetime
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.entity import DeviceInfo
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
|
|
@ -21,194 +13,39 @@ from .const import DOMAIN, BATTERY_LEVELS, CONF_ACTIVE_MODE_SMARTTAGS, CONF_ACTI
|
|||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
URL_PRE_SIGNIN = 'https://account.samsung.com/accounts/v1/FMM2/signInGate?state={state}&redirect_uri=https:%2F%2Fsmartthingsfind.samsung.com%2Flogin.do&response_type=code&client_id=ntly6zvfpn&scope=iot.client&locale=de_DE&acr_values=urn:samsungaccount:acr:basic&goBackURL=https:%2F%2Fsmartthingsfind.samsung.com%2Flogin'
|
||||
URL_QR_CODE_SIGNIN = 'https://account.samsung.com/accounts/v1/FMM2/signInWithQrCode'
|
||||
URL_SIGNIN_XHR = 'https://account.samsung.com/accounts/v1/FMM2/signInXhr'
|
||||
URL_QR_POLL = 'https://account.samsung.com/accounts/v1/FMM2/signInWithQrCodeProc'
|
||||
URL_SIGNIN_SUCCESS = 'https://account.samsung.com{next_url}'
|
||||
URL_GET_CSRF = "https://smartthingsfind.samsung.com/chkLogin.do"
|
||||
URL_DEVICE_LIST = "https://smartthingsfind.samsung.com/device/getDeviceList.do"
|
||||
URL_REQUEST_LOC_UPDATE = "https://smartthingsfind.samsung.com/dm/addOperation.do"
|
||||
URL_SET_LAST_DEVICE = "https://smartthingsfind.samsung.com/device/setLastSelect.do"
|
||||
|
||||
|
||||
async def do_login_stage_one(hass: HomeAssistant) -> tuple:
|
||||
"""
|
||||
Perform the first stage of the login process.
|
||||
async def validate_jsessionid(hass: HomeAssistant, jsessionid: str) -> bool:
|
||||
"""Check whether a JSESSIONID cookie is currently valid for SmartThings Find.
|
||||
|
||||
This function performs the initial login steps for the SmartThings Find service.
|
||||
It generates a random state string, sends a pre-login request, and retrieves
|
||||
the QR code URL from the response.
|
||||
Uses an isolated aiohttp session: the shared HA session may already hold
|
||||
a JSESSIONID cookie pinned to ``smartthingsfind.samsung.com`` from a prior
|
||||
Set-Cookie response. A bare ``update_cookies({"JSESSIONID": ...})`` would
|
||||
only add an unscoped duplicate, and aiohttp's cookie jar would still ship
|
||||
the older, domain-matched (stale) cookie — making the freshly-pasted
|
||||
cookie look invalid to the server.
|
||||
|
||||
Args:
|
||||
hass (HomeAssistant): Home Assistant instance.
|
||||
|
||||
Returns:
|
||||
tuple: A tuple containing the session and QR code URL if successful, None otherwise.
|
||||
"""
|
||||
session = async_get_clientsession(hass)
|
||||
session.cookie_jar.clear()
|
||||
|
||||
# Generating the state parameter
|
||||
state = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
|
||||
|
||||
try:
|
||||
# Load the initial login page which already sets some cookies. 'state'
|
||||
# is a randomly generated string. 'client_id' seems to be static for
|
||||
# SmartThings find.
|
||||
async with session.get(URL_PRE_SIGNIN.format(state=state)) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(
|
||||
f"Pre-login request failed with status {res.status}")
|
||||
return None
|
||||
_LOGGER.debug(f"Step 1: Pre-Login: Status Code: {res.status}")
|
||||
|
||||
# Load the "Login with QR-Code"-page
|
||||
async with session.get(URL_QR_CODE_SIGNIN) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(
|
||||
f"QR code URL request failed with status {res.status}, Response: {text[:250]}...")
|
||||
return None
|
||||
text = await res.text()
|
||||
_LOGGER.debug(f"Step 2: QR-Code URL: Status Code: {res.status}")
|
||||
|
||||
# Search the URL which is embedded in the QR Code. It looks like this:
|
||||
# https://signin.samsung.com/key/abcdefgh
|
||||
match = re.search(r"https://signin\.samsung\.com/key/[^'\"]+", text)
|
||||
if not match:
|
||||
_LOGGER.error("QR code URL not found in the response")
|
||||
return None
|
||||
|
||||
qr_url = match.group(0)
|
||||
_LOGGER.info(f"Extracted QR code URL: {qr_url}")
|
||||
|
||||
return session, qr_url
|
||||
except Exception as e:
|
||||
_LOGGER.error(
|
||||
f"An error occurred during the login process (stage 1): {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
|
||||
async def do_login_stage_two(session: aiohttp.ClientSession) -> str:
|
||||
"""
|
||||
Perform the second stage of the login process.
|
||||
|
||||
This function continues the login process by fetching the
|
||||
CSRF token, polling the server for login status, and ultimately
|
||||
retrieving the JSESSIONID required for SmartThings Find.
|
||||
|
||||
Args:
|
||||
session (aiohttp.ClientSession): The current session with cookies set from login stage one.
|
||||
|
||||
Returns:
|
||||
str: The JSESSIONID if successful, None otherwise.
|
||||
A short-lived ClientSession with its own jar avoids that entirely.
|
||||
"""
|
||||
try:
|
||||
# Here you would generate and display the QR code. This is environment-specific.
|
||||
# qr = qrcode.QRCode()
|
||||
# qr.add_data(extracted_url)
|
||||
# qr.print_ascii() # Or any other method to display the QR code to the user.
|
||||
|
||||
# Fetch the _csrf token. This needs to be sent with each QR-Poll-Request
|
||||
async with session.get(URL_SIGNIN_XHR) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(
|
||||
f"XHR login request failed with status {res.status}")
|
||||
return None
|
||||
json_res = await res.json()
|
||||
_LOGGER.debug(
|
||||
f"Step 3: XHR Login: Status Code: {res.status}, Response: {json_res}")
|
||||
|
||||
csrf_token = json_res.get('_csrf', {}).get('token')
|
||||
if not csrf_token:
|
||||
_LOGGER.error("CSRF token not found in the response")
|
||||
return None
|
||||
|
||||
# Define a timeout. We don't want to poll forever.
|
||||
end_time = datetime.now() + timedelta(seconds=120)
|
||||
next_url = None
|
||||
|
||||
# We check, whether the QR-Code was scanned and the login was successful.
|
||||
# This request returns either:
|
||||
# {"rtnCd":"POLLING"} <-- Not yet logged in. Login still pending.
|
||||
# OR
|
||||
# {"rtnCd":"SUCCESS","nextURL":"/accounts/v1/FMM2/signInComplete"}
|
||||
# So we fetch this URL every few seconds, until we receive "SUCCESS", which
|
||||
# indicates the user has successfully scanned the Code and approved the login.
|
||||
# This is exactly the same way, the website does it.
|
||||
# In case the user never scans the QR-Code, we run in the timeout defined above.
|
||||
while datetime.now() < end_time:
|
||||
try:
|
||||
await asyncio.sleep(2)
|
||||
async with session.post(URL_QR_POLL, json={}, headers={'X-Csrf-Token': csrf_token}) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(
|
||||
f"QR check request failed with status {res.status}")
|
||||
continue
|
||||
js = await res.json()
|
||||
_LOGGER.debug(
|
||||
f"Step 4: QR CHECK: Status Code: {res.status}, Response: {js}")
|
||||
|
||||
if js.get('rtnCd') == "SUCCESS":
|
||||
next_url = js.get('nextURL')
|
||||
break
|
||||
except aiohttp.ClientError as e:
|
||||
_LOGGER.error(f"QR Poll request failed: {e}")
|
||||
return None
|
||||
|
||||
if not next_url:
|
||||
_LOGGER.error("QR Code not scanned within 2 mins")
|
||||
return None
|
||||
|
||||
# Fetch the 'next_url' we received from the previous request. On success, this sets
|
||||
# the initial JSESSIONID-cookie. We're not done yet, since this cookie is not valid
|
||||
# for SmartThings Find (which uses it's own JSESSIONID).
|
||||
async with session.get(URL_SIGNIN_SUCCESS.format(next_url=next_url)) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(
|
||||
f"Login success URL request failed with status {res.status}")
|
||||
return None
|
||||
text = await res.text()
|
||||
_LOGGER.debug(f"Step 5: Login success: Status Code: {res.status}")
|
||||
|
||||
# The response contains another redirect URL which we need to extract from the
|
||||
# received HTML/JS-content. This URL looks something like this:
|
||||
# https://smartthingsfind.samsung.com/login.do?auth_server_url=eu-auth2.samsungosp.com
|
||||
# &code=[...]&code_expires_in=300&state=[state we generated above]
|
||||
# &api_server_url=eu-auth2.samsungosp.com
|
||||
match = re.search(
|
||||
r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]', text)
|
||||
if not match:
|
||||
_LOGGER.error(
|
||||
"Redirect URL not found in the login success response")
|
||||
return None
|
||||
|
||||
redirect_url = match.group(1)
|
||||
_LOGGER.debug(f"Found Redirect URL: {redirect_url}")
|
||||
|
||||
# Fetch the received redirect_url. This response finally contains our JSESSIONID,
|
||||
# which is what actually authenticates us for SmartThings Find.
|
||||
async with session.get(redirect_url) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(
|
||||
f"Redirect URL request failed with status {res.status}")
|
||||
return None
|
||||
_LOGGER.debug(
|
||||
f"Step 6: Follow redirect URL: Status Code: {res.status}")
|
||||
|
||||
jsessionid = session.cookie_jar.filter_cookies(
|
||||
'https://smartthingsfind.samsung.com').get('JSESSIONID')
|
||||
if not jsessionid:
|
||||
_LOGGER.error("JSESSIONID not found in cookies")
|
||||
return None
|
||||
|
||||
_LOGGER.debug(f"JSESSIONID: {jsessionid.value[:20]}...")
|
||||
return jsessionid.value
|
||||
|
||||
async with aiohttp.ClientSession(
|
||||
cookies={"JSESSIONID": jsessionid}
|
||||
) as session:
|
||||
async with session.get(URL_GET_CSRF) as response:
|
||||
if response.status == 200 and response.headers.get("_csrf"):
|
||||
return True
|
||||
body = (await response.text())[:200]
|
||||
_LOGGER.warning(
|
||||
f"JSESSIONID validation failed: status={response.status}, body='{body}'"
|
||||
)
|
||||
return False
|
||||
except Exception as e:
|
||||
_LOGGER.error(
|
||||
f"An error occurred during the login process (stage 2): {e}", exc_info=True)
|
||||
return None
|
||||
_LOGGER.error(f"JSESSIONID validation error: {e}", exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
async def fetch_csrf(hass: HomeAssistant, session: aiohttp.ClientSession, entry_id: str):
|
||||
|
|
@ -377,8 +214,13 @@ async def get_device_location(hass: HomeAssistant, session: aiohttp.ClientSessio
|
|||
utcDate = parse_stf_date(
|
||||
op['extra']['gpsUtcDt'])
|
||||
else:
|
||||
_LOGGER.error(
|
||||
f"[{dev_name}] No UTC date found for operation '{op['oprnType']}', this should not happen! OP: {json.dumps(op)}")
|
||||
_LOGGER.warning(
|
||||
f"[{dev_name}] No UTC date found for operation '{op['oprnType']}'. OP: {json.dumps(op)}")
|
||||
continue
|
||||
|
||||
if utcDate is None:
|
||||
_LOGGER.warning(
|
||||
f"[{dev_name}] Could not parse UTC date for operation '{op['oprnType']}'")
|
||||
continue
|
||||
|
||||
if used_loc['gps_date'] and used_loc['gps_date'] >= utcDate:
|
||||
|
|
@ -419,6 +261,10 @@ async def get_device_location(hass: HomeAssistant, session: aiohttp.ClientSessio
|
|||
continue
|
||||
else:
|
||||
utcDate = parse_stf_date(loc['gpsUtcDt'])
|
||||
if utcDate is None:
|
||||
_LOGGER.warning(
|
||||
f"[{dev_name}] Could not parse UTC date for encrypted location ('{op['oprnType']}')")
|
||||
continue
|
||||
if used_loc['gps_date'] and used_loc['gps_date'] >= utcDate:
|
||||
_LOGGER.debug(
|
||||
f"[{dev_name}] Ignoring location older than the previous ({op['oprnType']})")
|
||||
|
|
@ -519,17 +365,18 @@ def get_sub_location(ops: list, subDeviceName: str) -> tuple:
|
|||
for op in ops:
|
||||
if subDeviceName in op.get('encLocation', {}):
|
||||
loc = op['encLocation'][subDeviceName]
|
||||
gps_date = parse_stf_date(loc.get('gpsUtcDt', ''))
|
||||
sub_loc = {
|
||||
"latitude": float(loc['latitude']),
|
||||
"longitude": float(loc['longitude']),
|
||||
"latitude": float(loc['latitude']) if loc.get('latitude') is not None else None,
|
||||
"longitude": float(loc['longitude']) if loc.get('longitude') is not None else None,
|
||||
"gps_accuracy": calc_gps_accuracy(loc.get('horizontalUncertainty'), loc.get('verticalUncertainty')),
|
||||
"gps_date": parse_stf_date(loc['gpsUtcDt'])
|
||||
"gps_date": gps_date
|
||||
}
|
||||
return op, sub_loc
|
||||
return {}, {}
|
||||
|
||||
|
||||
def parse_stf_date(datestr: str) -> datetime:
|
||||
def parse_stf_date(datestr: str) -> datetime | None:
|
||||
"""
|
||||
Parses a date string in the format "%Y%m%d%H%M%S" to a datetime object.
|
||||
This is the format, the SmartThings Find API uses.
|
||||
|
|
@ -538,9 +385,16 @@ def parse_stf_date(datestr: str) -> datetime:
|
|||
datestr (str): The date string in the format "%Y%m%d%H%M%S".
|
||||
|
||||
Returns:
|
||||
datetime: A datetime object representing the input date string.
|
||||
datetime: A datetime object representing the input date string,
|
||||
or None if parsing fails.
|
||||
"""
|
||||
return datetime.strptime(datestr, "%Y%m%d%H%M%S").replace(tzinfo=pytz.UTC)
|
||||
if not datestr:
|
||||
return None
|
||||
try:
|
||||
return datetime.strptime(datestr, "%Y%m%d%H%M%S").replace(tzinfo=pytz.UTC)
|
||||
except ValueError:
|
||||
_LOGGER.warning(f"Failed to parse date string: '{datestr}'")
|
||||
return None
|
||||
|
||||
|
||||
def get_battery_level(dev_name: str, ops: list) -> int:
|
||||
|
|
@ -566,22 +420,3 @@ def get_battery_level(dev_name: str, ops: list) -> int:
|
|||
f"[{dev_name}]: Received invalid battery level: {batt_raw}")
|
||||
return batt
|
||||
return None
|
||||
|
||||
|
||||
def gen_qr_code_base64(data: str) -> str:
|
||||
"""
|
||||
Generates a QR code from the provided data and returns it as a base64-encoded string.
|
||||
Used to show a login QR code during authentication flow
|
||||
|
||||
Args:
|
||||
data (str): The data to encode in the QR code.
|
||||
|
||||
Returns:
|
||||
str: The base64-encoded string representation of the QR code.
|
||||
"""
|
||||
qr = qrcode.QRCode()
|
||||
qr.add_data(data)
|
||||
img = qr.make_image(fill_color="black", back_color="white")
|
||||
buffer = BytesIO()
|
||||
img.save(buffer, format="PNG")
|
||||
return base64.b64encode(buffer.getvalue()).decode("utf-8")
|
||||
|
|
|
|||
3
pytest.ini
Normal file
3
pytest.ini
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
[pytest]
|
||||
asyncio_mode = auto
|
||||
testpaths = tests
|
||||
158
tests/conftest.py
Normal file
158
tests/conftest.py
Normal file
|
|
@ -0,0 +1,158 @@
|
|||
"""Stub the HA imports the integration touches so utils/config_flow can be
|
||||
imported and exercised without a full Home Assistant install.
|
||||
|
||||
Only the bits used by the code under test are faked — anything new the
|
||||
integration starts touching will need to be added here."""
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(REPO_ROOT))
|
||||
|
||||
|
||||
def _module(name: str) -> types.ModuleType:
|
||||
if name not in sys.modules:
|
||||
sys.modules[name] = types.ModuleType(name)
|
||||
return sys.modules[name]
|
||||
|
||||
|
||||
# homeassistant.core
|
||||
ha = _module("homeassistant")
|
||||
core = _module("homeassistant.core")
|
||||
|
||||
|
||||
class HomeAssistant:
|
||||
def __init__(self) -> None:
|
||||
self.data: dict[str, Any] = {}
|
||||
# Tests assign a real aiohttp.ClientSession here.
|
||||
self.clientsession = None
|
||||
|
||||
|
||||
def callback(func):
|
||||
return func
|
||||
|
||||
|
||||
core.HomeAssistant = HomeAssistant
|
||||
core.callback = callback
|
||||
ha.core = core
|
||||
|
||||
# homeassistant.exceptions
|
||||
exc = _module("homeassistant.exceptions")
|
||||
|
||||
|
||||
class ConfigEntryAuthFailed(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class ConfigEntryNotReady(Exception):
|
||||
pass
|
||||
|
||||
|
||||
exc.ConfigEntryAuthFailed = ConfigEntryAuthFailed
|
||||
exc.ConfigEntryNotReady = ConfigEntryNotReady
|
||||
|
||||
# homeassistant.helpers + submodules
|
||||
helpers = _module("homeassistant.helpers")
|
||||
aiohttp_client = _module("homeassistant.helpers.aiohttp_client")
|
||||
|
||||
|
||||
def async_get_clientsession(hass: HomeAssistant):
|
||||
return hass.clientsession
|
||||
|
||||
|
||||
aiohttp_client.async_get_clientsession = async_get_clientsession
|
||||
helpers.aiohttp_client = aiohttp_client
|
||||
|
||||
entity = _module("homeassistant.helpers.entity")
|
||||
|
||||
|
||||
class DeviceInfo(dict):
|
||||
pass
|
||||
|
||||
|
||||
entity.DeviceInfo = DeviceInfo
|
||||
helpers.entity = entity
|
||||
|
||||
device_registry = _module("homeassistant.helpers.device_registry")
|
||||
|
||||
|
||||
class _FakeDeviceRegistry:
|
||||
def async_get_device(self, identifiers):
|
||||
return None
|
||||
|
||||
|
||||
def _async_get(_hass):
|
||||
return _FakeDeviceRegistry()
|
||||
|
||||
|
||||
device_registry.async_get = _async_get
|
||||
helpers.device_registry = device_registry
|
||||
|
||||
typing_module = _module("homeassistant.helpers.typing")
|
||||
typing_module.ConfigType = dict
|
||||
|
||||
# config_entries
|
||||
config_entries = _module("homeassistant.config_entries")
|
||||
|
||||
|
||||
class ConfigEntry:
|
||||
pass
|
||||
|
||||
|
||||
class ConfigFlow:
|
||||
def __init_subclass__(cls, *, domain=None, **kwargs):
|
||||
super().__init_subclass__(**kwargs)
|
||||
|
||||
|
||||
class OptionsFlow:
|
||||
pass
|
||||
|
||||
|
||||
class OptionsFlowWithConfigEntry(OptionsFlow):
|
||||
def __init__(self, config_entry):
|
||||
self.config_entry = config_entry
|
||||
self.options = dict(getattr(config_entry, "options", {}) or {})
|
||||
|
||||
|
||||
CONN_CLASS_CLOUD_POLL = "cloud_poll"
|
||||
ConfigFlowResult = dict
|
||||
|
||||
config_entries.ConfigEntry = ConfigEntry
|
||||
config_entries.ConfigFlow = ConfigFlow
|
||||
config_entries.OptionsFlow = OptionsFlow
|
||||
config_entries.OptionsFlowWithConfigEntry = OptionsFlowWithConfigEntry
|
||||
config_entries.CONN_CLASS_CLOUD_POLL = CONN_CLASS_CLOUD_POLL
|
||||
config_entries.ConfigFlowResult = ConfigFlowResult
|
||||
|
||||
# const
|
||||
const = _module("homeassistant.const")
|
||||
|
||||
|
||||
class Platform:
|
||||
DEVICE_TRACKER = "device_tracker"
|
||||
BUTTON = "button"
|
||||
SENSOR = "sensor"
|
||||
|
||||
|
||||
const.Platform = Platform
|
||||
|
||||
# update_coordinator
|
||||
update_coord = _module("homeassistant.helpers.update_coordinator")
|
||||
|
||||
|
||||
class DataUpdateCoordinator:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class UpdateFailed(Exception):
|
||||
pass
|
||||
|
||||
|
||||
update_coord.DataUpdateCoordinator = DataUpdateCoordinator
|
||||
update_coord.UpdateFailed = UpdateFailed
|
||||
helpers.update_coordinator = update_coord
|
||||
94
tests/test_validate_jsessionid.py
Normal file
94
tests/test_validate_jsessionid.py
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
"""Tests for validate_jsessionid.
|
||||
|
||||
The interesting case is cookie-jar pollution: HA's shared aiohttp session can
|
||||
already hold a JSESSIONID cookie that was stored from a previous Set-Cookie
|
||||
response (so it carries a real domain attribute). When we then pre-load a new
|
||||
JSESSIONID via update_cookies({"JSESSIONID": ...}) without specifying a
|
||||
response_url, the new cookie gets stored with no domain and aiohttp ships the
|
||||
older, more-specific one — making the new (valid) cookie look invalid to
|
||||
Samsung."""
|
||||
from __future__ import annotations
|
||||
|
||||
import aiohttp
|
||||
import pytest
|
||||
from aioresponses import aioresponses
|
||||
from yarl import URL
|
||||
|
||||
from custom_components.smartthings_find.utils import (
|
||||
URL_GET_CSRF,
|
||||
validate_jsessionid,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def hass(monkeypatch):
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
h = HomeAssistant()
|
||||
async with aiohttp.ClientSession() as session:
|
||||
h.clientsession = session
|
||||
yield h
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_valid_cookie_returns_true(hass):
|
||||
with aioresponses() as m:
|
||||
m.get(URL_GET_CSRF, status=200, headers={"_csrf": "tok123"})
|
||||
assert await validate_jsessionid(hass, "fresh-cookie") is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_csrf_header_returns_false(hass):
|
||||
with aioresponses() as m:
|
||||
m.get(URL_GET_CSRF, status=200, body="fail")
|
||||
assert await validate_jsessionid(hass, "expired-cookie") is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_200_returns_false(hass):
|
||||
with aioresponses() as m:
|
||||
m.get(URL_GET_CSRF, status=401, body="Logout")
|
||||
assert await validate_jsessionid(hass, "anything") is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_validation_does_not_touch_shared_session(hass):
|
||||
"""The shared HA session may already hold a JSESSIONID cookie with a real
|
||||
domain (set by a previous Set-Cookie). Validation must run in an isolated
|
||||
aiohttp session so:
|
||||
|
||||
* the shared jar is not mutated as a side-effect of validation, and
|
||||
* a stale domain-bound JSESSIONID can't shadow the cookie we're validating.
|
||||
|
||||
Without this, aiohttp's cookie jar prefers the domain-matched (stale)
|
||||
cookie over the bare cookie we just added, so Samsung sees the expired
|
||||
session, returns no `_csrf`, and we tell the user their cookie was
|
||||
rejected even though it was perfectly fine."""
|
||||
hass.clientsession.cookie_jar.update_cookies(
|
||||
{"JSESSIONID": "STALE_VALUE"},
|
||||
response_url=URL("https://smartthingsfind.samsung.com/"),
|
||||
)
|
||||
before = sorted(
|
||||
(c.key, c["domain"], c.value)
|
||||
for c in hass.clientsession.cookie_jar
|
||||
)
|
||||
|
||||
with aioresponses() as m:
|
||||
m.get(URL_GET_CSRF, status=200, headers={"_csrf": "tok"})
|
||||
result = await validate_jsessionid(hass, "FRESH_VALUE")
|
||||
|
||||
after = sorted(
|
||||
(c.key, c["domain"], c.value)
|
||||
for c in hass.clientsession.cookie_jar
|
||||
)
|
||||
|
||||
assert before == after, (
|
||||
"validate_jsessionid mutated the shared HA session's cookie jar; "
|
||||
"it must use an isolated session.\n"
|
||||
f" before: {before}\n after: {after}"
|
||||
)
|
||||
assert result is True, (
|
||||
"validate_jsessionid returned False even though the mocked server "
|
||||
"would have replied with _csrf — the stale shared-jar cookie likely "
|
||||
"shadowed the fresh one"
|
||||
)
|
||||
Loading…
Add table
Add a link
Reference in a new issue