Add option to enable/disable active mode and make update_interval configurable (closes #11)
This commit is contained in:
parent
6f17edae14
commit
065163fdcc
7 changed files with 242 additions and 59 deletions
|
@ -9,7 +9,16 @@ from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
|
||||
from .const import DOMAIN, CONF_JSESSIONID
|
||||
from .const import (
|
||||
DOMAIN,
|
||||
CONF_JSESSIONID,
|
||||
CONF_ACTIVE_MODE_OTHERS,
|
||||
CONF_ACTIVE_MODE_OTHERS_DEFAULT,
|
||||
CONF_ACTIVE_MODE_SMARTTAGS,
|
||||
CONF_ACTIVE_MODE_SMARTTAGS_DEFAULT,
|
||||
CONF_UPDATE_INTERVAL,
|
||||
CONF_UPDATE_INTERVAL_DEFAULT
|
||||
)
|
||||
from .utils import fetch_csrf, get_devices, get_device_location
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
@ -25,12 +34,21 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
"""Set up SmartThings Find from a config entry."""
|
||||
|
||||
hass.data[DOMAIN][entry.entry_id] = {}
|
||||
|
||||
|
||||
# Load the jsessionid from the config and create a session from it
|
||||
jsessionid = entry.data[CONF_JSESSIONID]
|
||||
|
||||
session = async_get_clientsession(hass)
|
||||
session.cookie_jar.update_cookies({"JSESSIONID": jsessionid})
|
||||
|
||||
active_smarttags = entry.options.get(CONF_ACTIVE_MODE_SMARTTAGS, CONF_ACTIVE_MODE_SMARTTAGS_DEFAULT)
|
||||
active_others = entry.options.get(CONF_ACTIVE_MODE_OTHERS, CONF_ACTIVE_MODE_OTHERS_DEFAULT)
|
||||
|
||||
hass.data[DOMAIN][entry.entry_id].update({
|
||||
CONF_ACTIVE_MODE_SMARTTAGS: active_smarttags,
|
||||
CONF_ACTIVE_MODE_OTHERS: active_others,
|
||||
})
|
||||
|
||||
# This raises ConfigEntryAuthFailed-exception if failed. So if we
|
||||
# can continue after fetch_csrf, we know that authentication was ok
|
||||
await fetch_csrf(hass, session, entry.entry_id)
|
||||
|
@ -41,7 +59,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
# Create an update coordinator. This is responsible to regularly
|
||||
# fetch data from STF and update the device_tracker and sensor
|
||||
# entities
|
||||
coordinator = SmartThingsFindCoordinator(hass, session, devices)
|
||||
update_interval = entry.options.get(CONF_UPDATE_INTERVAL, CONF_UPDATE_INTERVAL_DEFAULT)
|
||||
coordinator = SmartThingsFindCoordinator(hass, session, devices, update_interval)
|
||||
|
||||
# This is what makes the whole integration slow to load (around 10-15
|
||||
# seconds for my 15 devices) but it is the right way to do it. Only if
|
||||
|
@ -73,7 +92,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||
class SmartThingsFindCoordinator(DataUpdateCoordinator):
|
||||
"""Class to manage fetching SmartThings Find data."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant, session: aiohttp.ClientSession, devices):
|
||||
def __init__(self, hass: HomeAssistant, session: aiohttp.ClientSession, devices, update_interval : int):
|
||||
"""Initialize the coordinator."""
|
||||
self.session = session
|
||||
self.devices = devices
|
||||
|
@ -82,7 +101,7 @@ class SmartThingsFindCoordinator(DataUpdateCoordinator):
|
|||
hass,
|
||||
_LOGGER,
|
||||
name=DOMAIN,
|
||||
update_interval=timedelta(minutes=2) # Update interval for all entities
|
||||
update_interval=timedelta(seconds=update_interval) # Update interval for all entities
|
||||
)
|
||||
|
||||
async def _async_update_data(self):
|
||||
|
|
|
@ -1,12 +1,27 @@
|
|||
from typing import Any, Dict
|
||||
import voluptuous as vol
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from .const import DOMAIN, CONF_JSESSIONID
|
||||
from homeassistant.config_entries import (
|
||||
ConfigEntry,
|
||||
ConfigFlowResult,
|
||||
OptionsFlowWithConfigEntry
|
||||
)
|
||||
from .const import (
|
||||
DOMAIN,
|
||||
CONF_JSESSIONID,
|
||||
CONF_UPDATE_INTERVAL,
|
||||
CONF_UPDATE_INTERVAL_DEFAULT,
|
||||
CONF_ACTIVE_MODE_SMARTTAGS,
|
||||
CONF_ACTIVE_MODE_SMARTTAGS_DEFAULT,
|
||||
CONF_ACTIVE_MODE_OTHERS,
|
||||
CONF_ACTIVE_MODE_OTHERS_DEFAULT
|
||||
)
|
||||
from .utils import do_login_stage_one, do_login_stage_two, gen_qr_code_base64
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
class SmartThingsFindConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
|
@ -119,4 +134,53 @@ class SmartThingsFindConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
|||
step_id="user",
|
||||
data_schema=vol.Schema({}),
|
||||
)
|
||||
return await self.async_step_user()
|
||||
return await self.async_step_user()
|
||||
|
||||
@staticmethod
|
||||
@callback
|
||||
def async_get_options_flow(
|
||||
config_entry: config_entries.ConfigEntry,
|
||||
) -> config_entries.OptionsFlow:
|
||||
"""Create the options flow."""
|
||||
return SmartThingsFindOptionsFlowHandler(config_entry)
|
||||
|
||||
|
||||
class SmartThingsFindOptionsFlowHandler(OptionsFlowWithConfigEntry):
|
||||
"""Handle an options flow."""
|
||||
|
||||
async def async_step_init(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle options flow."""
|
||||
|
||||
if user_input is not None:
|
||||
|
||||
res = self.async_create_entry(title="", data=user_input)
|
||||
|
||||
# Reload the integration entry to make sure the newly set options take effect
|
||||
self.hass.config_entries.async_schedule_reload(self.config_entry.entry_id)
|
||||
return res
|
||||
|
||||
data_schema = vol.Schema(
|
||||
{
|
||||
vol.Optional(
|
||||
CONF_UPDATE_INTERVAL,
|
||||
default=self.options.get(
|
||||
CONF_UPDATE_INTERVAL, CONF_UPDATE_INTERVAL_DEFAULT
|
||||
),
|
||||
): vol.All(vol.Coerce(int), vol.Clamp(min=30)),
|
||||
vol.Optional(
|
||||
CONF_ACTIVE_MODE_SMARTTAGS,
|
||||
default=self.options.get(
|
||||
CONF_ACTIVE_MODE_SMARTTAGS, CONF_ACTIVE_MODE_SMARTTAGS_DEFAULT
|
||||
),
|
||||
): bool,
|
||||
vol.Optional(
|
||||
CONF_ACTIVE_MODE_OTHERS,
|
||||
default=self.options.get(
|
||||
CONF_ACTIVE_MODE_OTHERS, CONF_ACTIVE_MODE_OTHERS_DEFAULT
|
||||
),
|
||||
): bool,
|
||||
}
|
||||
)
|
||||
return self.async_show_form(step_id="init", data_schema=data_schema)
|
|
@ -1,8 +1,19 @@
|
|||
DOMAIN = "smartthings_find"
|
||||
|
||||
CONF_JSESSIONID = "jsessionid"
|
||||
|
||||
CONF_ACTIVE_MODE_SMARTTAGS = "active_mode_smarttags"
|
||||
CONF_ACTIVE_MODE_OTHERS = "active_mode_others"
|
||||
|
||||
CONF_ACTIVE_MODE_SMARTTAGS_DEFAULT = True
|
||||
CONF_ACTIVE_MODE_OTHERS_DEFAULT = False
|
||||
|
||||
CONF_UPDATE_INTERVAL = "update_interval"
|
||||
CONF_UPDATE_INTERVAL_DEFAULT = 120
|
||||
|
||||
BATTERY_LEVELS = {
|
||||
'FULL': 100,
|
||||
'MEDIUM': 50,
|
||||
'LOW': 15,
|
||||
'VERY_LOW': 5
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,5 +7,16 @@
|
|||
"task_stage_one": "Vorbereitung läuft, bitte warten...",
|
||||
"task_stage_two": "Bitte scanne den QR Code mit deinem Galaxy-Gerät, um dich anzumelden. Alternativ kannst du dich direkt im Browser anmelden, indum du [hier]({url}) klickst. \n\n\n\nCode kann nicht gescannt werden? Gehe auf [signin.samsung.com/key](https://signin.samsung.com/key/) und gib den folgenden Code ein:\n\n## ```{code}```\n"
|
||||
}
|
||||
},
|
||||
"options": {
|
||||
"step": {
|
||||
"init": {
|
||||
"data": {
|
||||
"update_interval": "Aktualisierungsintervall (in Sekunden)",
|
||||
"active_mode_smarttags": "Aktiven Modus für SmartTags verwenden. Könnte Batterienutzung erhöhen",
|
||||
"active_mode_others": "Aktiven Modus für andere Geräte (Handys, Uhren, ...) verwenden. Deutlich erhöhter Akkuverbrauch!"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,5 +7,16 @@
|
|||
"task_stage_one": "Preparing, please wait...",
|
||||
"task_stage_two": "To login please scan the following QR Code with your Galaxy device. You can also login using this browser by clicking [here]({url}). \n\n\n\nUnable to scan the code? Go to [signin.samsung.com/key](https://signin.samsung.com/key/) and enter the following code:\n\n## ```{code}```\n"
|
||||
}
|
||||
},
|
||||
"options": {
|
||||
"step": {
|
||||
"init": {
|
||||
"data": {
|
||||
"update_interval": "Update interval (seconds)",
|
||||
"active_mode_smarttags": "Use active mode for SmartTags. Might increase battery consumption.",
|
||||
"active_mode_others": "Use active mode for other devices (phones, watches, ...). Will heavily increase battery consumption."
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,7 +17,7 @@ from homeassistant.helpers.entity import DeviceInfo
|
|||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
from homeassistant.helpers import device_registry
|
||||
|
||||
from .const import DOMAIN, BATTERY_LEVELS
|
||||
from .const import DOMAIN, BATTERY_LEVELS, CONF_ACTIVE_MODE_SMARTTAGS, CONF_ACTIVE_MODE_OTHERS
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
@ -31,6 +31,7 @@ URL_DEVICE_LIST = "https://smartthingsfind.samsung.com/device/getDeviceList.do"
|
|||
URL_REQUEST_LOC_UPDATE = "https://smartthingsfind.samsung.com/dm/addOperation.do"
|
||||
URL_SET_LAST_DEVICE = "https://smartthingsfind.samsung.com/device/setLastSelect.do"
|
||||
|
||||
|
||||
async def do_login_stage_one(hass: HomeAssistant) -> tuple:
|
||||
"""
|
||||
Perform the first stage of the login process.
|
||||
|
@ -57,14 +58,16 @@ async def do_login_stage_one(hass: HomeAssistant) -> tuple:
|
|||
# SmartThings find.
|
||||
async with session.get(URL_PRE_SIGNIN.format(state=state)) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"Pre-login request failed with status {res.status}")
|
||||
_LOGGER.error(
|
||||
f"Pre-login request failed with status {res.status}")
|
||||
return None
|
||||
_LOGGER.debug(f"Step 1: Pre-Login: Status Code: {res.status}")
|
||||
|
||||
# Load the "Login with QR-Code"-page
|
||||
async with session.get(URL_QR_CODE_SIGNIN) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"QR code URL request failed with status {res.status}, Response: {text[:250]}...")
|
||||
_LOGGER.error(
|
||||
f"QR code URL request failed with status {res.status}, Response: {text[:250]}...")
|
||||
return None
|
||||
text = await res.text()
|
||||
_LOGGER.debug(f"Step 2: QR-Code URL: Status Code: {res.status}")
|
||||
|
@ -81,9 +84,11 @@ async def do_login_stage_one(hass: HomeAssistant) -> tuple:
|
|||
|
||||
return session, qr_url
|
||||
except Exception as e:
|
||||
_LOGGER.error(f"An error occurred during the login process (stage 1): {e}", exc_info=True)
|
||||
_LOGGER.error(
|
||||
f"An error occurred during the login process (stage 1): {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
|
||||
async def do_login_stage_two(session: aiohttp.ClientSession) -> str:
|
||||
"""
|
||||
Perform the second stage of the login process.
|
||||
|
@ -107,10 +112,12 @@ async def do_login_stage_two(session: aiohttp.ClientSession) -> str:
|
|||
# Fetch the _csrf token. This needs to be sent with each QR-Poll-Request
|
||||
async with session.get(URL_SIGNIN_XHR) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"XHR login request failed with status {res.status}")
|
||||
_LOGGER.error(
|
||||
f"XHR login request failed with status {res.status}")
|
||||
return None
|
||||
json_res = await res.json()
|
||||
_LOGGER.debug(f"Step 3: XHR Login: Status Code: {res.status}, Response: {json_res}")
|
||||
_LOGGER.debug(
|
||||
f"Step 3: XHR Login: Status Code: {res.status}, Response: {json_res}")
|
||||
|
||||
csrf_token = json_res.get('_csrf', {}).get('token')
|
||||
if not csrf_token:
|
||||
|
@ -135,10 +142,12 @@ async def do_login_stage_two(session: aiohttp.ClientSession) -> str:
|
|||
await asyncio.sleep(2)
|
||||
async with session.post(URL_QR_POLL, json={}, headers={'X-Csrf-Token': csrf_token}) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"QR check request failed with status {res.status}")
|
||||
_LOGGER.error(
|
||||
f"QR check request failed with status {res.status}")
|
||||
continue
|
||||
js = await res.json()
|
||||
_LOGGER.debug(f"Step 4: QR CHECK: Status Code: {res.status}, Response: {js}")
|
||||
_LOGGER.debug(
|
||||
f"Step 4: QR CHECK: Status Code: {res.status}, Response: {js}")
|
||||
|
||||
if js.get('rtnCd') == "SUCCESS":
|
||||
next_url = js.get('nextURL')
|
||||
|
@ -156,7 +165,8 @@ async def do_login_stage_two(session: aiohttp.ClientSession) -> str:
|
|||
# for SmartThings Find (which uses it's own JSESSIONID).
|
||||
async with session.get(URL_SIGNIN_SUCCESS.format(next_url=next_url)) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"Login success URL request failed with status {res.status}")
|
||||
_LOGGER.error(
|
||||
f"Login success URL request failed with status {res.status}")
|
||||
return None
|
||||
text = await res.text()
|
||||
_LOGGER.debug(f"Step 5: Login success: Status Code: {res.status}")
|
||||
|
@ -166,9 +176,11 @@ async def do_login_stage_two(session: aiohttp.ClientSession) -> str:
|
|||
# https://smartthingsfind.samsung.com/login.do?auth_server_url=eu-auth2.samsungosp.com
|
||||
# &code=[...]&code_expires_in=300&state=[state we generated above]
|
||||
# &api_server_url=eu-auth2.samsungosp.com
|
||||
match = re.search(r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]', text)
|
||||
match = re.search(
|
||||
r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]', text)
|
||||
if not match:
|
||||
_LOGGER.error("Redirect URL not found in the login success response")
|
||||
_LOGGER.error(
|
||||
"Redirect URL not found in the login success response")
|
||||
return None
|
||||
|
||||
redirect_url = match.group(1)
|
||||
|
@ -178,11 +190,14 @@ async def do_login_stage_two(session: aiohttp.ClientSession) -> str:
|
|||
# which is what actually authenticates us for SmartThings Find.
|
||||
async with session.get(redirect_url) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"Redirect URL request failed with status {res.status}")
|
||||
_LOGGER.error(
|
||||
f"Redirect URL request failed with status {res.status}")
|
||||
return None
|
||||
_LOGGER.debug(f"Step 6: Follow redirect URL: Status Code: {res.status}")
|
||||
_LOGGER.debug(
|
||||
f"Step 6: Follow redirect URL: Status Code: {res.status}")
|
||||
|
||||
jsessionid = session.cookie_jar.filter_cookies('https://smartthingsfind.samsung.com').get('JSESSIONID')
|
||||
jsessionid = session.cookie_jar.filter_cookies(
|
||||
'https://smartthingsfind.samsung.com').get('JSESSIONID')
|
||||
if not jsessionid:
|
||||
_LOGGER.error("JSESSIONID not found in cookies")
|
||||
return None
|
||||
|
@ -191,9 +206,11 @@ async def do_login_stage_two(session: aiohttp.ClientSession) -> str:
|
|||
return jsessionid.value
|
||||
|
||||
except Exception as e:
|
||||
_LOGGER.error(f"An error occurred during the login process (stage 2): {e}", exc_info=True)
|
||||
_LOGGER.error(
|
||||
f"An error occurred during the login process (stage 2): {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
|
||||
async def fetch_csrf(hass: HomeAssistant, session: aiohttp.ClientSession, entry_id: str):
|
||||
"""
|
||||
Retrieves the _csrf-Token which needs to be sent with each following request.
|
||||
|
@ -204,7 +221,7 @@ async def fetch_csrf(hass: HomeAssistant, session: aiohttp.ClientSession, entry_
|
|||
Args:
|
||||
hass (HomeAssistant): Home Assistant instance.
|
||||
session (aiohttp.ClientSession): The current session.
|
||||
|
||||
|
||||
Raises:
|
||||
ConfigEntryAuthFailed: If the CSRF token is not found or if the authentication fails.
|
||||
"""
|
||||
|
@ -227,6 +244,7 @@ async def fetch_csrf(hass: HomeAssistant, session: aiohttp.ClientSession, entry_
|
|||
|
||||
raise ConfigEntryAuthFailed(err_msg)
|
||||
|
||||
|
||||
async def get_devices(hass: HomeAssistant, session: aiohttp.ClientSession, entry_id: str) -> list:
|
||||
"""
|
||||
Sends a request to the SmartThings Find API to retrieve a list of devices associated with the user's account.
|
||||
|
@ -239,12 +257,14 @@ async def get_devices(hass: HomeAssistant, session: aiohttp.ClientSession, entry
|
|||
list: A list of devices if successful, empty list otherwise.
|
||||
"""
|
||||
url = f"{URL_DEVICE_LIST}?_csrf={hass.data[DOMAIN][entry_id]['_csrf']}"
|
||||
async with session.post(url, headers = {'Accept': 'application/json'}, data={}) as response:
|
||||
async with session.post(url, headers={'Accept': 'application/json'}, data={}) as response:
|
||||
if response.status != 200:
|
||||
_LOGGER.error(f"Failed to retrieve devices [{response.status}]: {await response.text()}")
|
||||
if response.status == 404:
|
||||
_LOGGER.warn(f"Received 404 while trying to fetch devices -> Triggering reauth")
|
||||
raise ConfigEntryAuthFailed("Request to get device list failed: 404")
|
||||
_LOGGER.warn(
|
||||
f"Received 404 while trying to fetch devices -> Triggering reauth")
|
||||
raise ConfigEntryAuthFailed(
|
||||
"Request to get device list failed: 404")
|
||||
return []
|
||||
response_json = await response.json()
|
||||
devices_data = response_json["deviceList"]
|
||||
|
@ -252,11 +272,14 @@ async def get_devices(hass: HomeAssistant, session: aiohttp.ClientSession, entry
|
|||
for device in devices_data:
|
||||
# Double unescaping required. Example:
|
||||
# "Benedev's S22" first becomes "Benedev's S22" and then "Benedev's S22"
|
||||
device['modelName'] = html.unescape(html.unescape(device['modelName']))
|
||||
device['modelName'] = html.unescape(
|
||||
html.unescape(device['modelName']))
|
||||
identifier = (DOMAIN, device['dvceID'])
|
||||
ha_dev = device_registry.async_get(hass).async_get_device({identifier})
|
||||
ha_dev = device_registry.async_get(
|
||||
hass).async_get_device({identifier})
|
||||
if ha_dev and ha_dev.disabled:
|
||||
_LOGGER.debug(f"Ignoring disabled device: '{device['modelName']}' (disabled by {ha_dev.disabled_by})")
|
||||
_LOGGER.debug(
|
||||
f"Ignoring disabled device: '{device['modelName']}' (disabled by {ha_dev.disabled_by})")
|
||||
continue
|
||||
ha_dev_info = DeviceInfo(
|
||||
identifiers={identifier},
|
||||
|
@ -269,6 +292,7 @@ async def get_devices(hass: HomeAssistant, session: aiohttp.ClientSession, entry
|
|||
_LOGGER.debug(f"Adding device: {device['modelName']}")
|
||||
return devices
|
||||
|
||||
|
||||
async def get_device_location(hass: HomeAssistant, session: aiohttp.ClientSession, dev_data: dict, entry_id: str) -> dict:
|
||||
"""
|
||||
Sends requests to update the device's location and retrieves the current location data for the specified device.
|
||||
|
@ -298,12 +322,23 @@ async def get_device_location(hass: HomeAssistant, session: aiohttp.ClientSessio
|
|||
csrf_token = hass.data[DOMAIN][entry_id]["_csrf"]
|
||||
|
||||
try:
|
||||
async with session.post(f"{URL_REQUEST_LOC_UPDATE}?_csrf={csrf_token}", json=update_payload) as response:
|
||||
# _LOGGER.debug(f"[{dev_name}] Update request response ({response.status}): {await response.text()}")
|
||||
pass
|
||||
active = (
|
||||
(dev_data['deviceTypeCode'] == 'TAG' and hass.data[DOMAIN][entry_id][CONF_ACTIVE_MODE_SMARTTAGS]) or
|
||||
(dev_data['deviceTypeCode'] != 'TAG' and hass.data[DOMAIN]
|
||||
[entry_id][CONF_ACTIVE_MODE_OTHERS])
|
||||
)
|
||||
|
||||
async with session.post(f"{URL_SET_LAST_DEVICE}?_csrf={csrf_token}", json=set_last_payload, headers = {'Accept': 'application/json'}) as response:
|
||||
_LOGGER.debug(f"[{dev_name}] Location response ({response.status})")
|
||||
if active:
|
||||
_LOGGER.debug("Active mode; requesting location update now")
|
||||
async with session.post(f"{URL_REQUEST_LOC_UPDATE}?_csrf={csrf_token}", json=update_payload) as response:
|
||||
# _LOGGER.debug(f"[{dev_name}] Update request response ({response.status}): {await response.text()}")
|
||||
pass
|
||||
else:
|
||||
_LOGGER.debug("Passive mode; not requesting location update")
|
||||
|
||||
async with session.post(f"{URL_SET_LAST_DEVICE}?_csrf={csrf_token}", json=set_last_payload, headers={'Accept': 'application/json'}) as response:
|
||||
_LOGGER.debug(
|
||||
f"[{dev_name}] Location response ({response.status})")
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
res = {
|
||||
|
@ -339,60 +374,74 @@ async def get_device_location(hass: HomeAssistant, session: aiohttp.ClientSessio
|
|||
utcDate = None
|
||||
|
||||
if 'extra' in op and 'gpsUtcDt' in op['extra']:
|
||||
utcDate = parse_stf_date(op['extra']['gpsUtcDt'])
|
||||
utcDate = parse_stf_date(
|
||||
op['extra']['gpsUtcDt'])
|
||||
else:
|
||||
_LOGGER.error(f"[{dev_name}] No UTC date found for operation '{op['oprnType']}', this should not happen! OP: {json.dumps(op)}")
|
||||
_LOGGER.error(
|
||||
f"[{dev_name}] No UTC date found for operation '{op['oprnType']}', this should not happen! OP: {json.dumps(op)}")
|
||||
continue
|
||||
|
||||
if used_loc['gps_date'] and used_loc['gps_date'] >= utcDate:
|
||||
_LOGGER.debug(f"[{dev_name}] Ignoring location older than the previous ({op['oprnType']})")
|
||||
_LOGGER.debug(
|
||||
f"[{dev_name}] Ignoring location older than the previous ({op['oprnType']})")
|
||||
continue
|
||||
|
||||
locFound = False
|
||||
if 'latitude' in op:
|
||||
used_loc['latitude'] = float(op['latitude'])
|
||||
used_loc['latitude'] = float(
|
||||
op['latitude'])
|
||||
locFound = True
|
||||
if 'longitude' in op:
|
||||
used_loc['longitude'] = float(op['longitude'])
|
||||
used_loc['longitude'] = float(
|
||||
op['longitude'])
|
||||
locFound = True
|
||||
|
||||
if not locFound:
|
||||
_LOGGER.warn(f"[{dev_name}] Found no coordinates in operation '{op['oprnType']}'")
|
||||
_LOGGER.warn(
|
||||
f"[{dev_name}] Found no coordinates in operation '{op['oprnType']}'")
|
||||
else:
|
||||
res['location_found'] = True
|
||||
|
||||
used_loc['gps_accuracy'] = calc_gps_accuracy(op.get('horizontalUncertainty'), op.get('verticalUncertainty'))
|
||||
used_loc['gps_accuracy'] = calc_gps_accuracy(
|
||||
op.get('horizontalUncertainty'), op.get('verticalUncertainty'))
|
||||
used_loc['gps_date'] = utcDate
|
||||
used_op = op
|
||||
|
||||
elif 'encLocation' in op:
|
||||
loc = op['encLocation']
|
||||
if 'encrypted' in loc and loc['encrypted']:
|
||||
_LOGGER.info(f"[{dev_name}] Ignoring encrypted location ({op['oprnType']})")
|
||||
_LOGGER.info(
|
||||
f"[{dev_name}] Ignoring encrypted location ({op['oprnType']})")
|
||||
continue
|
||||
elif 'gpsUtcDt' not in loc:
|
||||
_LOGGER.info(f"[{dev_name}] Ignoring location with missing date ({op['oprnType']})")
|
||||
_LOGGER.info(
|
||||
f"[{dev_name}] Ignoring location with missing date ({op['oprnType']})")
|
||||
continue
|
||||
else:
|
||||
utcDate = parse_stf_date(loc['gpsUtcDt'])
|
||||
if used_loc['gps_date'] and used_loc['gps_date'] >= utcDate:
|
||||
_LOGGER.debug(f"[{dev_name}] Ignoring location older than the previous ({op['oprnType']})")
|
||||
_LOGGER.debug(
|
||||
f"[{dev_name}] Ignoring location older than the previous ({op['oprnType']})")
|
||||
continue
|
||||
else:
|
||||
locFound = False
|
||||
if 'latitude' in loc:
|
||||
used_loc['latitude'] = float(loc['latitude'])
|
||||
used_loc['latitude'] = float(
|
||||
loc['latitude'])
|
||||
locFound = True
|
||||
if 'longitude' in loc:
|
||||
used_loc['longitude'] = float(loc['longitude'])
|
||||
used_loc['longitude'] = float(
|
||||
loc['longitude'])
|
||||
locFound = True
|
||||
else:
|
||||
res['location_found'] = True
|
||||
|
||||
if not locFound:
|
||||
_LOGGER.warn(f"[{dev_name}] Found no coordinates in operation '{op['oprnType']}'")
|
||||
_LOGGER.warn(
|
||||
f"[{dev_name}] Found no coordinates in operation '{op['oprnType']}'")
|
||||
|
||||
used_loc['gps_accuracy'] = calc_gps_accuracy(loc.get('horizontalUncertainty'), loc.get('verticalUncertainty'))
|
||||
used_loc['gps_accuracy'] = calc_gps_accuracy(
|
||||
loc.get('horizontalUncertainty'), loc.get('verticalUncertainty'))
|
||||
used_loc['gps_date'] = utcDate
|
||||
used_op = op
|
||||
continue
|
||||
|
@ -401,16 +450,20 @@ async def get_device_location(hass: HomeAssistant, session: aiohttp.ClientSessio
|
|||
res['used_op'] = used_op
|
||||
res['used_loc'] = used_loc
|
||||
else:
|
||||
_LOGGER.warn(f"[{dev_name}] No useable location-operation found")
|
||||
_LOGGER.warn(
|
||||
f"[{dev_name}] No useable location-operation found")
|
||||
|
||||
_LOGGER.debug(f" --> {dev_name} used operation: {'NONE' if not used_op else used_op['oprnType']}")
|
||||
_LOGGER.debug(
|
||||
f" --> {dev_name} used operation: {'NONE' if not used_op else used_op['oprnType']}")
|
||||
|
||||
else:
|
||||
_LOGGER.warn(f"[{dev_name}] No operation found in response; marking update failed")
|
||||
_LOGGER.warn(
|
||||
f"[{dev_name}] No operation found in response; marking update failed")
|
||||
res['update_success'] = False
|
||||
return res
|
||||
else:
|
||||
_LOGGER.error(f"[{dev_name}] Failed to fetch device data ({response.status})")
|
||||
_LOGGER.error(
|
||||
f"[{dev_name}] Failed to fetch device data ({response.status})")
|
||||
res_text = await response.text()
|
||||
_LOGGER.debug(f"[{dev_name}] Full response: '{res_text}'")
|
||||
|
||||
|
@ -418,15 +471,18 @@ async def get_device_location(hass: HomeAssistant, session: aiohttp.ClientSessio
|
|||
# enough at this point. Instead we have to ask the user to go through
|
||||
# the whole auth flow again
|
||||
if res_text == 'Logout' or response.status == 401:
|
||||
raise ConfigEntryAuthFailed(f"Session not valid anymore, received status_code of {response.status} with response '{res_text}'")
|
||||
raise ConfigEntryAuthFailed(
|
||||
f"Session not valid anymore, received status_code of {response.status} with response '{res_text}'")
|
||||
|
||||
except ConfigEntryAuthFailed as e:
|
||||
raise
|
||||
except Exception as e:
|
||||
_LOGGER.error(f"[{dev_name}] Exception occurred while fetching location data for tag '{dev_name}': {e}", exc_info=True)
|
||||
_LOGGER.error(
|
||||
f"[{dev_name}] Exception occurred while fetching location data for tag '{dev_name}': {e}", exc_info=True)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def calc_gps_accuracy(hu: float, vu: float) -> float:
|
||||
"""
|
||||
Calculate the GPS accuracy using the Pythagorean theorem.
|
||||
|
@ -445,6 +501,7 @@ def calc_gps_accuracy(hu: float, vu: float) -> float:
|
|||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def get_sub_location(ops: list, subDeviceName: str) -> tuple:
|
||||
"""
|
||||
Extracts sub-location data for devices that contain multiple
|
||||
|
@ -471,6 +528,7 @@ def get_sub_location(ops: list, subDeviceName: str) -> tuple:
|
|||
return op, sub_loc
|
||||
return {}, {}
|
||||
|
||||
|
||||
def parse_stf_date(datestr: str) -> datetime:
|
||||
"""
|
||||
Parses a date string in the format "%Y%m%d%H%M%S" to a datetime object.
|
||||
|
@ -484,6 +542,7 @@ def parse_stf_date(datestr: str) -> datetime:
|
|||
"""
|
||||
return datetime.strptime(datestr, "%Y%m%d%H%M%S").replace(tzinfo=pytz.UTC)
|
||||
|
||||
|
||||
def get_battery_level(dev_name: str, ops: list) -> int:
|
||||
"""
|
||||
Try to extract the device battery level from the received operation
|
||||
|
@ -503,10 +562,12 @@ def get_battery_level(dev_name: str, ops: list) -> int:
|
|||
try:
|
||||
batt = int(batt_raw)
|
||||
except ValueError:
|
||||
_LOGGER.warn(f"[{dev_name}]: Received invalid battery level: {batt_raw}")
|
||||
_LOGGER.warn(
|
||||
f"[{dev_name}]: Received invalid battery level: {batt_raw}")
|
||||
return batt
|
||||
return None
|
||||
|
||||
|
||||
def gen_qr_code_base64(data: str) -> str:
|
||||
"""
|
||||
Generates a QR code from the provided data and returns it as a base64-encoded string.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue