Initial commit
This commit is contained in:
commit
ac2b38ad81
12 changed files with 1115 additions and 0 deletions
100
custom_components/smartthings_find/__init__.py
Normal file
100
custom_components/smartthings_find/__init__.py
Normal file
|
@ -0,0 +1,100 @@
|
|||
from datetime import timedelta
|
||||
import logging
|
||||
import aiohttp
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
from homeassistant.const import Platform
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
|
||||
from .const import DOMAIN, CONF_JSESSIONID
|
||||
from .utils import fetch_csrf, get_devices, get_device_location
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
PLATFORMS = [Platform.DEVICE_TRACKER, Platform.BUTTON, Platform.SENSOR]
|
||||
|
||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the SmartThings Find component."""
|
||||
hass.data[DOMAIN] = {}
|
||||
return True
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Set up SmartThings Find from a config entry."""
|
||||
# Load the jsessionid from the config and create a session from it
|
||||
jsessionid = entry.data[CONF_JSESSIONID]
|
||||
session = async_get_clientsession(hass)
|
||||
session.cookie_jar.update_cookies({"JSESSIONID": jsessionid})
|
||||
|
||||
# This raises ConfigEntryAuthFailed-exception if failed. So if we
|
||||
# can continue after fetch_csrf, we know that authentication was ok
|
||||
await fetch_csrf(hass, session)
|
||||
|
||||
# Load all SmartThings-Find devices from the users account
|
||||
devices = await get_devices(hass, session)
|
||||
|
||||
# Create an update coordinator. This is responsible to regularly
|
||||
# fetch data from STF and update the device_tracker and sensor
|
||||
# entities
|
||||
coordinator = SmartThingsFindCoordinator(hass, session, devices)
|
||||
|
||||
# This is what makes the whole integration slow to load (around 10-15
|
||||
# seconds for my 15 devices) but it is the right way to do it. Only if
|
||||
# it succeeds, the integration will be marked as successfully loaded.
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
|
||||
|
||||
hass.data[DOMAIN].update({
|
||||
CONF_JSESSIONID: jsessionid,
|
||||
"session": session,
|
||||
"coordinator": coordinator,
|
||||
"devices": devices
|
||||
})
|
||||
|
||||
hass.async_create_task(
|
||||
hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
|
||||
)
|
||||
return True
|
||||
|
||||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Unload a config entry."""
|
||||
unload_success = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
|
||||
if unload_success:
|
||||
del hass.data[DOMAIN]
|
||||
else:
|
||||
_LOGGER.error(f"Unload failed: {unload_success}")
|
||||
return unload_success
|
||||
|
||||
|
||||
class SmartThingsFindCoordinator(DataUpdateCoordinator):
|
||||
"""Class to manage fetching SmartThings Find data."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant, session: aiohttp.ClientSession, devices):
|
||||
"""Initialize the coordinator."""
|
||||
self.session = session
|
||||
self.devices = devices
|
||||
self.hass = hass
|
||||
super().__init__(
|
||||
hass,
|
||||
_LOGGER,
|
||||
name=DOMAIN,
|
||||
update_interval=timedelta(minutes=2) # Update interval for all entities
|
||||
)
|
||||
|
||||
async def _async_update_data(self):
|
||||
"""Fetch data from SmartThings Find."""
|
||||
try:
|
||||
tags = {}
|
||||
_LOGGER.debug(f"Updating locations...")
|
||||
for device in self.devices:
|
||||
dev_data = device['data']
|
||||
tag_data = await get_device_location(self.hass, self.session, dev_data)
|
||||
tags[dev_data['dvceID']] = tag_data
|
||||
_LOGGER.debug(f"Fetched {len(tags)} locations")
|
||||
return tags
|
||||
except ConfigEntryAuthFailed as err:
|
||||
raise
|
||||
except Exception as err:
|
||||
raise UpdateFailed(f"Error fetching data: {err}")
|
62
custom_components/smartthings_find/button.py
Normal file
62
custom_components/smartthings_find/button.py
Normal file
|
@ -0,0 +1,62 @@
|
|||
import logging
|
||||
from homeassistant.components.button import ButtonEntity
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.entity import DeviceInfo
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
|
||||
from .const import DOMAIN
|
||||
from .utils import fetch_csrf
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None:
|
||||
"""Set up SmartThings Find button entities."""
|
||||
devices = hass.data[DOMAIN]["devices"]
|
||||
entities = []
|
||||
for device in devices:
|
||||
entities += [RingButton(hass, device)]
|
||||
async_add_entities(entities)
|
||||
|
||||
|
||||
class RingButton(ButtonEntity):
|
||||
"""Representation a button entity to make a SmartThings Find device ring."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant, device):
|
||||
"""Initialize the button."""
|
||||
self._attr_unique_id = f"stf_ring_button_{device['data']['dvceID']}"
|
||||
self._attr_name = f"{device['data']['modelName']} Ring"
|
||||
|
||||
if 'icons' in device['data'] and 'coloredIcon' in device['data']['icons']:
|
||||
self._attr_entity_picture = device['data']['icons']['coloredIcon']
|
||||
self._attr_icon = 'mdi:nfc-search-variant'
|
||||
# self.hass = hass
|
||||
self.device = device['data']
|
||||
self._attr_device_info = device['ha_dev_info']
|
||||
|
||||
async def async_press(self):
|
||||
"""Handle the button press."""
|
||||
session = self.hass.data[DOMAIN]["session"]
|
||||
csrf_token = self.hass.data[DOMAIN]["_csrf"]
|
||||
ring_payload = {
|
||||
"dvceId": self.device['dvceID'],
|
||||
"operation": "RING",
|
||||
"usrId": self.device['usrId'],
|
||||
"status": "start",
|
||||
"lockMessage": "Home Assistant is ringing your device!"
|
||||
}
|
||||
url = f"https://smartthingsfind.samsung.com/dm/addOperation.do?_csrf={
|
||||
csrf_token}"
|
||||
|
||||
try:
|
||||
async with session.post(url, json=ring_payload) as response:
|
||||
_LOGGER.debug("HTTP response status: %s", response.status)
|
||||
if response.status == 200:
|
||||
_LOGGER.info(f"Successfully rang device {self.device['modelName']}")
|
||||
_LOGGER.debug(f"Response: {await response.text()}")
|
||||
else:
|
||||
# Fetch a new CSRF token to make sure we're still logged in
|
||||
await fetch_csrf(self.hass, session)
|
||||
except Exception as e:
|
||||
_LOGGER.error(f"Exception occurred while ringing '{self.device['modelName']}': %s", e)
|
122
custom_components/smartthings_find/config_flow.py
Normal file
122
custom_components/smartthings_find/config_flow.py
Normal file
|
@ -0,0 +1,122 @@
|
|||
import voluptuous as vol
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from .const import DOMAIN, CONF_JSESSIONID
|
||||
from .utils import do_login_stage_one, do_login_stage_two, gen_qr_code_base64
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
class SmartThingsFindConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow for SmartThings Find."""
|
||||
|
||||
VERSION = 1
|
||||
CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL
|
||||
|
||||
reauth_entry: ConfigEntry | None = None
|
||||
|
||||
task_stage_one: asyncio.Task | None = None
|
||||
task_stage_two: asyncio.Task | None = None
|
||||
|
||||
qr_url = None
|
||||
session = None
|
||||
|
||||
jsessionid = None
|
||||
|
||||
|
||||
error = None
|
||||
|
||||
async def do_stage_one(self):
|
||||
_LOGGER.debug("Running login stage 1")
|
||||
try:
|
||||
stage_one_res = await do_login_stage_one(self.hass)
|
||||
if not stage_one_res is None:
|
||||
self.session, self.qr_url = stage_one_res
|
||||
else:
|
||||
self.error = "Login stage 1 failed. Check logs for details."
|
||||
_LOGGER.warn("Login stage 1 failed")
|
||||
_LOGGER.debug("Login stage 1 done")
|
||||
except Exception as e:
|
||||
self.error = "Login stage 1 failed. Check logs for details."
|
||||
_LOGGER.error(f"Exception in stage 1: {e}", exc_info=True)
|
||||
|
||||
async def do_stage_two(self):
|
||||
_LOGGER.debug("Running login stage 2")
|
||||
try:
|
||||
stage_two_res = await do_login_stage_two(self.session)
|
||||
if not stage_two_res is None:
|
||||
self.jsessionid = stage_two_res
|
||||
_LOGGER.info("Login successful")
|
||||
else:
|
||||
self.error = "Login stage 2 failed. Check logs for details."
|
||||
_LOGGER.warning("Login stage 2 failed")
|
||||
_LOGGER.debug("Login stage 2 done")
|
||||
except Exception as e:
|
||||
self.error = "Login stage 2 failed. Check logs for details."
|
||||
_LOGGER.error(f"Exception in stage 2: {e}", exc_info=True)
|
||||
|
||||
# First step: Get QR Code login URL
|
||||
async def async_step_user(self, user_input=None):
|
||||
_LOGGER.debug("Entering login stage 1")
|
||||
if not self.task_stage_one:
|
||||
self.task_stage_one = self.hass.async_create_task(self.do_stage_one())
|
||||
if not self.task_stage_one.done():
|
||||
return self.async_show_progress(
|
||||
progress_action="task_stage_one",
|
||||
progress_task=self.task_stage_one
|
||||
)
|
||||
# At this point stage 1 is completed
|
||||
if self.error:
|
||||
# An error occurred, cancel the flow by moving to the finish-
|
||||
# form which shows the error
|
||||
return self.async_show_progress_done(next_step_id="finish")
|
||||
# No error -> Proceed to stage 2
|
||||
return self.async_show_progress_done(next_step_id="auth_stage_two")
|
||||
|
||||
# Second step: Wait until QR scanned an log in
|
||||
async def async_step_auth_stage_two(self, user_input=None):
|
||||
if not self.task_stage_two:
|
||||
self.task_stage_two = self.hass.async_create_task(self.do_stage_two())
|
||||
if not self.task_stage_two.done():
|
||||
return self.async_show_progress(
|
||||
progress_action="task_stage_two",
|
||||
progress_task=self.task_stage_two,
|
||||
description_placeholders={
|
||||
"qr_code": gen_qr_code_base64(self.qr_url),
|
||||
"url": self.qr_url,
|
||||
"code": self.qr_url.split('/')[-1],
|
||||
}
|
||||
)
|
||||
return self.async_show_progress_done(next_step_id="finish")
|
||||
|
||||
async def async_step_finish(self, user_input=None):
|
||||
if self.error:
|
||||
return self.async_show_form(step_id="finish", errors={'base': self.error})
|
||||
|
||||
data={CONF_JSESSIONID: self.jsessionid}
|
||||
|
||||
if self.reauth_entry:
|
||||
# Finish step was called by reauth-flow. Do not create a new entry,
|
||||
# instead update the existing entry
|
||||
return self.async_update_reload_and_abort(
|
||||
self.reauth_entry,
|
||||
data=data
|
||||
)
|
||||
|
||||
return self.async_create_entry(title="SmartThings Find", data=data)
|
||||
|
||||
async def async_step_reauth(self, user_input=None):
|
||||
self.reauth_entry = self.hass.config_entries.async_get_entry(
|
||||
self.context["entry_id"]
|
||||
)
|
||||
return await self.async_step_reauth_confirm()
|
||||
|
||||
async def async_step_reauth_confirm(self, user_input=None):
|
||||
if user_input is None:
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
data_schema=vol.Schema({}),
|
||||
)
|
||||
return await self.async_step_user()
|
7
custom_components/smartthings_find/const.py
Normal file
7
custom_components/smartthings_find/const.py
Normal file
|
@ -0,0 +1,7 @@
|
|||
DOMAIN = "smartthings_find"
|
||||
CONF_JSESSIONID = "jsessionid"
|
||||
BATTERY_LEVELS = {
|
||||
'FULL': 100,
|
||||
'MEDIUM': 50,
|
||||
'LOW': 10
|
||||
}
|
113
custom_components/smartthings_find/device_tracker.py
Normal file
113
custom_components/smartthings_find/device_tracker.py
Normal file
|
@ -0,0 +1,113 @@
|
|||
import json
|
||||
import logging
|
||||
from homeassistant.components.device_tracker.config_entry import TrackerEntity as DeviceTrackerEntity
|
||||
from homeassistant.components.device_tracker.const import SourceType
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
|
||||
from .const import DOMAIN, BATTERY_LEVELS
|
||||
from .utils import get_sub_location, get_battery_level
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None:
|
||||
"""Set up SmartThings Find device tracker entities."""
|
||||
devices = hass.data[DOMAIN]["devices"]
|
||||
coordinator = hass.data[DOMAIN]["coordinator"]
|
||||
entities = []
|
||||
for device in devices:
|
||||
if 'subType' in device['data'] and device['data']['subType'] == 'CANAL2':
|
||||
entities += [SmartThingsDeviceTracker(hass, coordinator, device, "left")]
|
||||
entities += [SmartThingsDeviceTracker(hass, coordinator, device, "right")]
|
||||
entities += [SmartThingsDeviceTracker(hass, coordinator, device)]
|
||||
async_add_entities(entities)
|
||||
|
||||
class SmartThingsDeviceTracker(DeviceTrackerEntity):
|
||||
"""Representation of a SmartTag device tracker."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant, coordinator, device, subDeviceName=None):
|
||||
"""Initialize the device tracker."""
|
||||
|
||||
self.coordinator = coordinator
|
||||
self.hass = hass
|
||||
self.device = device['data']
|
||||
self.device_id = device['data']['dvceID']
|
||||
self.subDeviceName = subDeviceName
|
||||
|
||||
self._attr_unique_id = f"stf_device_tracker_{device['data']['dvceID']}{'_' + subDeviceName if subDeviceName else ''}"
|
||||
self._attr_name = device['data']['modelName'] + (' ' + subDeviceName.capitalize() if subDeviceName else '')
|
||||
self._attr_device_info = device['ha_dev_info']
|
||||
self._attr_latitude = None
|
||||
self._attr_longitude = None
|
||||
|
||||
if 'icons' in device['data'] and 'coloredIcon' in device['data']['icons']:
|
||||
self._attr_entity_picture = device['data']['icons']['coloredIcon']
|
||||
|
||||
self.async_update = coordinator.async_add_listener(self.async_write_ha_state)
|
||||
|
||||
@property
|
||||
def available(self) -> bool:
|
||||
"""Return true if the device is available."""
|
||||
tag_data = self.coordinator.data.get(self.device_id, {})
|
||||
if not tag_data:
|
||||
_LOGGER.info(f"tag_data none for '{self.name}'; rendering state unavailable")
|
||||
return False
|
||||
if not tag_data['update_success']:
|
||||
_LOGGER.info(f"Last update for '{self.name}' failed; rendering state unavailable")
|
||||
return False
|
||||
return True
|
||||
|
||||
@property
|
||||
def source_type(self) -> str:
|
||||
return SourceType.GPS
|
||||
|
||||
@property
|
||||
def latitude(self):
|
||||
"""Return the latitude of the device."""
|
||||
data = self.coordinator.data.get(self.device_id, {})
|
||||
if not self.subDeviceName:
|
||||
if data['location_found']: return data.get('used_loc', {}).get('latitude', None)
|
||||
return None
|
||||
else:
|
||||
_, loc = get_sub_location(data['ops'], self.subDeviceName)
|
||||
return loc.get('latitude', None)
|
||||
|
||||
@property
|
||||
def longitude(self):
|
||||
"""Return the longitude of the device."""
|
||||
data = self.coordinator.data.get(self.device_id, {})
|
||||
if not self.subDeviceName:
|
||||
if data['location_found']: return data.get('used_loc', {}).get('longitude', None)
|
||||
return None
|
||||
else:
|
||||
_, loc = get_sub_location(data['ops'], self.subDeviceName)
|
||||
return loc.get('longitude', None)
|
||||
|
||||
@property
|
||||
def location_accuracy(self):
|
||||
"""Return the location accuracy of the device."""
|
||||
data = self.coordinator.data.get(self.device_id, {})
|
||||
if not self.subDeviceName:
|
||||
if data['location_found']: return data.get('used_loc', {}).get('gps_accuracy', None)
|
||||
return None
|
||||
else:
|
||||
_, loc = get_sub_location(data['ops'], self.subDeviceName)
|
||||
return loc.get('gps_accuracy', None)
|
||||
|
||||
@property
|
||||
def battery_level(self):
|
||||
"""Return the battery level of the device."""
|
||||
data = self.coordinator.data.get(self.device_id, {})
|
||||
if self.subDeviceName:
|
||||
return None
|
||||
return get_battery_level(self.name, data['ops'])
|
||||
|
||||
@property
|
||||
def extra_state_attributes(self):
|
||||
tag_data = self.coordinator.data.get(self.device_id, {})
|
||||
device_data = self.device
|
||||
if self.subDeviceName:
|
||||
used_op, used_loc = get_sub_location(tag_data['ops'], self.subDeviceName)
|
||||
tag_data = tag_data | used_op | used_loc
|
||||
return tag_data | device_data
|
13
custom_components/smartthings_find/manifest.json
Normal file
13
custom_components/smartthings_find/manifest.json
Normal file
|
@ -0,0 +1,13 @@
|
|||
{
|
||||
"domain": "smartthings_find",
|
||||
"name": "SmartThings Find",
|
||||
"after_dependencies": ["http"],
|
||||
"version": "0.0.2",
|
||||
"documentation": "https://github.com/Vedeneb/HA-SmartThings-Find",
|
||||
"integration_type": "hub",
|
||||
"dependencies": [],
|
||||
"codeowners": ["@Vedeneb"],
|
||||
"requirements": ["requests", "qrcode"],
|
||||
"iot_class": "cloud_polling",
|
||||
"config_flow": true
|
||||
}
|
62
custom_components/smartthings_find/sensor.py
Normal file
62
custom_components/smartthings_find/sensor.py
Normal file
|
@ -0,0 +1,62 @@
|
|||
import logging
|
||||
from homeassistant.components.sensor import SensorEntity
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.components.sensor import SensorDeviceClass, SensorStateClass
|
||||
|
||||
from .const import DOMAIN
|
||||
from .utils import get_battery_level
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None:
|
||||
"""Set up SmartThings Find sensor entities."""
|
||||
devices = hass.data[DOMAIN]["devices"]
|
||||
coordinator = hass.data[DOMAIN]["coordinator"]
|
||||
entities = []
|
||||
for device in devices:
|
||||
entities += [DeviceBatterySensor(hass, coordinator, device)]
|
||||
async_add_entities(entities)
|
||||
|
||||
|
||||
class DeviceBatterySensor(SensorEntity):
|
||||
"""Representation of a Device battery sensor."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant, coordinator, device):
|
||||
"""Initialize the sensor."""
|
||||
self.coordinator = coordinator
|
||||
self._attr_unique_id = f"stf_device_battery_{device['data']['dvceID']}"
|
||||
self._attr_name = f"{device['data']['modelName']} Battery"
|
||||
self._state = None
|
||||
self.hass = hass
|
||||
self.device = device['data']
|
||||
self.device_id = device['data']['dvceID']
|
||||
self._attr_device_info = device['ha_dev_info']
|
||||
self._attr_device_class = SensorDeviceClass.BATTERY
|
||||
self._attr_state_class = SensorStateClass.MEASUREMENT
|
||||
|
||||
@property
|
||||
def available(self) -> bool:
|
||||
"""
|
||||
Makes the entity show unavailable state if no data was received
|
||||
or there was an error during last update
|
||||
"""
|
||||
tag_data = self.coordinator.data.get(self.device_id, {})
|
||||
if not tag_data:
|
||||
_LOGGER.info(f"battery sensor: tag_data none for '{self.name}'; rendering state unavailable")
|
||||
return False
|
||||
if not tag_data['update_success']:
|
||||
_LOGGER.info(f"Last update for battery sensor'{self.name}' failed; rendering state unavailable")
|
||||
return False
|
||||
return True
|
||||
|
||||
@property
|
||||
def unit_of_measurement(self) -> str:
|
||||
return '%'
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
ops = self.coordinator.data.get(self.device_id, {}).get('ops', [])
|
||||
return get_battery_level(self.name, ops)
|
11
custom_components/smartthings_find/translations/de.json
Normal file
11
custom_components/smartthings_find/translations/de.json
Normal file
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"config": {
|
||||
"abort": {
|
||||
"reauth_successful": "Erfolgreich authentifiziert."
|
||||
},
|
||||
"progress": {
|
||||
"task_stage_one": "Vorbereitung läuft, bitte warten...",
|
||||
"task_stage_two": "Bitte scanne den QR Code mit deinem Galaxy-Gerät, um dich anzumelden. Alternativ kannst du dich direkt im Browser anmelden, indum du [hier]({url}) klickst. \n\n\n\nCode kann nicht gescannt werden? Gehe auf [signin.samsung.com/key](https://signin.samsung.com/key/) und gib den folgenden Code ein:\n\n## ```{code}```\n"
|
||||
}
|
||||
}
|
||||
}
|
11
custom_components/smartthings_find/translations/en.json
Normal file
11
custom_components/smartthings_find/translations/en.json
Normal file
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"config": {
|
||||
"abort": {
|
||||
"reauth_successful": "Reauthentication successful."
|
||||
},
|
||||
"progress": {
|
||||
"task_stage_one": "Preparing, please wait...",
|
||||
"task_stage_two": "To login please scan the following QR Code with your Galaxy device. You can also login using this browser by clicking [here]({url}). \n\n\n\nUnable to scan the code? Go to [signin.samsung.com/key](https://signin.samsung.com/key/) and enter the following code:\n\n## ```{code}```\n"
|
||||
}
|
||||
}
|
||||
}
|
519
custom_components/smartthings_find/utils.py
Normal file
519
custom_components/smartthings_find/utils.py
Normal file
|
@ -0,0 +1,519 @@
|
|||
import logging
|
||||
import json
|
||||
import pytz
|
||||
import qrcode
|
||||
import base64
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import random
|
||||
import string
|
||||
import re
|
||||
from io import BytesIO
|
||||
from datetime import datetime, timedelta
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.entity import DeviceInfo
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
|
||||
from .const import DOMAIN, BATTERY_LEVELS
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
URL_PRE_SIGNIN = 'https://account.samsung.com/accounts/v1/FMM2/signInGate?state={state}&redirect_uri=https:%2F%2Fsmartthingsfind.samsung.com%2Flogin.do&response_type=code&client_id=ntly6zvfpn&scope=iot.client&locale=de_DE&acr_values=urn:samsungaccount:acr:basic&goBackURL=https:%2F%2Fsmartthingsfind.samsung.com%2Flogin'
|
||||
URL_QR_CODE_SIGNIN = 'https://account.samsung.com/accounts/v1/FMM2/signInWithQrCode'
|
||||
URL_SIGNIN_XHR = 'https://account.samsung.com/accounts/v1/FMM2/signInXhr'
|
||||
URL_QR_POLL = 'https://account.samsung.com/accounts/v1/FMM2/signInWithQrCodeProc'
|
||||
URL_SIGNIN_SUCCESS = 'https://account.samsung.com{next_url}'
|
||||
URL_GET_CSRF = "https://smartthingsfind.samsung.com/chkLogin.do"
|
||||
URL_DEVICE_LIST = "https://smartthingsfind.samsung.com/device/getDeviceList.do"
|
||||
URL_REQUEST_LOC_UPDATE = "https://smartthingsfind.samsung.com/dm/addOperation.do"
|
||||
URL_SET_LAST_DEVICE = "https://smartthingsfind.samsung.com/device/setLastSelect.do"
|
||||
|
||||
async def do_login_stage_one(hass: HomeAssistant) -> tuple:
|
||||
"""
|
||||
Perform the first stage of the login process.
|
||||
|
||||
This function performs the initial login steps for the SmartThings Find service.
|
||||
It generates a random state string, sends a pre-login request, and retrieves
|
||||
the QR code URL from the response.
|
||||
|
||||
Args:
|
||||
hass (HomeAssistant): Home Assistant instance.
|
||||
|
||||
Returns:
|
||||
tuple: A tuple containing the session and QR code URL if successful, None otherwise.
|
||||
"""
|
||||
session = async_get_clientsession(hass)
|
||||
session.cookie_jar.clear()
|
||||
|
||||
# Generating the state parameter
|
||||
state = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
|
||||
|
||||
try:
|
||||
# Load the initial login page which already sets some cookies. 'state'
|
||||
# is a randomly generated string. 'client_id' seems to be static for
|
||||
# SmartThings find.
|
||||
async with session.get(URL_PRE_SIGNIN.format(state=state)) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"Pre-login request failed with status {res.status}")
|
||||
return None
|
||||
_LOGGER.debug(f"Step 1: Pre-Login: Status Code: {res.status}")
|
||||
|
||||
# Load the "Login with QR-Code"-page
|
||||
async with session.get(URL_QR_CODE_SIGNIN) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"QR code URL request failed with status {res.status}, Response: {text[:250]}...")
|
||||
return None
|
||||
text = await res.text()
|
||||
_LOGGER.debug(f"Step 2: QR-Code URL: Status Code: {res.status}")
|
||||
|
||||
# Search the URL which is embedded in the QR Code. It looks like this:
|
||||
# https://signin.samsung.com/key/abcdefgh
|
||||
match = re.search(r"https://signin\.samsung\.com/key/[^'\"]+", text)
|
||||
if not match:
|
||||
_LOGGER.error("QR code URL not found in the response")
|
||||
return None
|
||||
|
||||
qr_url = match.group(0)
|
||||
_LOGGER.info(f"Extracted QR code URL: {qr_url}")
|
||||
|
||||
return session, qr_url
|
||||
except Exception as e:
|
||||
_LOGGER.error(f"An error occurred during the login process (stage 1): {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
async def do_login_stage_two(session: aiohttp.ClientSession) -> str:
|
||||
"""
|
||||
Perform the second stage of the login process.
|
||||
|
||||
This function continues the login process by fetching the
|
||||
CSRF token, polling the server for login status, and ultimately
|
||||
retrieving the JSESSIONID required for SmartThings Find.
|
||||
|
||||
Args:
|
||||
session (aiohttp.ClientSession): The current session with cookies set from login stage one.
|
||||
|
||||
Returns:
|
||||
str: The JSESSIONID if successful, None otherwise.
|
||||
"""
|
||||
try:
|
||||
# Here you would generate and display the QR code. This is environment-specific.
|
||||
# qr = qrcode.QRCode()
|
||||
# qr.add_data(extracted_url)
|
||||
# qr.print_ascii() # Or any other method to display the QR code to the user.
|
||||
|
||||
# Fetch the _csrf token. This needs to be sent with each QR-Poll-Request
|
||||
async with session.get(URL_SIGNIN_XHR) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"XHR login request failed with status {res.status}")
|
||||
return None
|
||||
json_res = await res.json()
|
||||
_LOGGER.debug(f"Step 3: XHR Login: Status Code: {res.status}, Response: {json_res}")
|
||||
|
||||
csrf_token = json_res.get('_csrf', {}).get('token')
|
||||
if not csrf_token:
|
||||
_LOGGER.error("CSRF token not found in the response")
|
||||
return None
|
||||
|
||||
# Define a timeout. We don't want to poll forever.
|
||||
end_time = datetime.now() + timedelta(seconds=120)
|
||||
next_url = None
|
||||
|
||||
# We check, whether the QR-Code was scanned and the login was successful.
|
||||
# This request returns either:
|
||||
# {"rtnCd":"POLLING"} <-- Not yet logged in. Login still pending.
|
||||
# OR
|
||||
# {"rtnCd":"SUCCESS","nextURL":"/accounts/v1/FMM2/signInComplete"}
|
||||
# So we fetch this URL every few seconds, until we receive "SUCCESS", which
|
||||
# indicates the user has successfully scanned the Code and approved the login.
|
||||
# This is exactly the same way, the website does it.
|
||||
# In case the user never scans the QR-Code, we run in the timeout defined above.
|
||||
while datetime.now() < end_time:
|
||||
try:
|
||||
await asyncio.sleep(2)
|
||||
async with session.post(URL_QR_POLL, json={}, headers={'X-Csrf-Token': csrf_token}) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"QR check request failed with status {res.status}")
|
||||
continue
|
||||
js = await res.json()
|
||||
_LOGGER.debug(f"Step 4: QR CHECK: Status Code: {res.status}, Response: {js}")
|
||||
|
||||
if js.get('rtnCd') == "SUCCESS":
|
||||
next_url = js.get('nextURL')
|
||||
break
|
||||
except aiohttp.ClientError as e:
|
||||
_LOGGER.error(f"QR Poll request failed: {e}")
|
||||
return None
|
||||
|
||||
if not next_url:
|
||||
_LOGGER.error("QR Code not scanned within 2 mins")
|
||||
return None
|
||||
|
||||
# Fetch the 'next_url' we received from the previous request. On success, this sets
|
||||
# the initial JSESSIONID-cookie. We're not done yet, since this cookie is not valid
|
||||
# for SmartThings Find (which uses it's own JSESSIONID).
|
||||
async with session.get(URL_SIGNIN_SUCCESS.format(next_url=next_url)) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"Login success URL request failed with status {res.status}")
|
||||
return None
|
||||
text = await res.text()
|
||||
_LOGGER.debug(f"Step 5: Login success: Status Code: {res.status}")
|
||||
|
||||
# The response contains another redirect URL which we need to extract from the
|
||||
# received HTML/JS-content. This URL looks something like this:
|
||||
# https://smartthingsfind.samsung.com/login.do?auth_server_url=eu-auth2.samsungosp.com
|
||||
# &code=[...]&code_expires_in=300&state=[state we generated above]
|
||||
# &api_server_url=eu-auth2.samsungosp.com
|
||||
match = re.search(r'window\.location\.href\s*=\s*[\'"]([^\'"]+)[\'"]', text)
|
||||
if not match:
|
||||
_LOGGER.error("Redirect URL not found in the login success response")
|
||||
return None
|
||||
|
||||
redirect_url = match.group(1)
|
||||
_LOGGER.debug(f"Found Redirect URL: {redirect_url}")
|
||||
|
||||
# Fetch the received redirect_url. This response finally contains our JSESSIONID,
|
||||
# which is what actually authenticates us for SmartThings Find.
|
||||
async with session.get(redirect_url) as res:
|
||||
if res.status != 200:
|
||||
_LOGGER.error(f"Redirect URL request failed with status {res.status}")
|
||||
return None
|
||||
_LOGGER.debug(f"Step 6: Follow redirect URL: Status Code: {res.status}")
|
||||
|
||||
jsessionid = session.cookie_jar.filter_cookies('https://smartthingsfind.samsung.com').get('JSESSIONID')
|
||||
if not jsessionid:
|
||||
_LOGGER.error("JSESSIONID not found in cookies")
|
||||
return None
|
||||
|
||||
_LOGGER.debug(f"JSESSIONID: {jsessionid.value[:20]}...")
|
||||
return jsessionid.value
|
||||
|
||||
except Exception as e:
|
||||
_LOGGER.error(f"An error occurred during the login process (stage 2): {e}", exc_info=True)
|
||||
return None
|
||||
|
||||
async def fetch_csrf(hass: HomeAssistant, session: aiohttp.ClientSession):
|
||||
"""
|
||||
Retrieves the _csrf-Token which needs to be sent with each following request.
|
||||
|
||||
This function retrieves the CSRF token required for further requests to the SmartThings Find service.
|
||||
The JSESSIONID must already be present as a cookie in the session at this point.
|
||||
|
||||
Args:
|
||||
hass (HomeAssistant): Home Assistant instance.
|
||||
session (aiohttp.ClientSession): The current session.
|
||||
|
||||
Raises:
|
||||
ConfigEntryAuthFailed: If the CSRF token is not found or if the authentication fails.
|
||||
"""
|
||||
err_msg = ""
|
||||
async with session.get(URL_GET_CSRF) as response:
|
||||
if response.status == 200:
|
||||
csrf_token = response.headers.get("_csrf")
|
||||
if csrf_token:
|
||||
hass.data[DOMAIN]["_csrf"] = csrf_token
|
||||
_LOGGER.info("Successfully fetched new CSRF Token")
|
||||
return
|
||||
else:
|
||||
err_msg = f"CSRF token not found in response headers. Status Code: {response.status}, Response: '{await response.text()}'"
|
||||
_LOGGER.error(err_msg)
|
||||
else:
|
||||
err_msg = f"Failed to authenticate with SmartThings Find: [{response.status}]: {await response.text()}"
|
||||
_LOGGER.error(err_msg)
|
||||
|
||||
_LOGGER.debug(f"Headers: {response.headers}")
|
||||
|
||||
raise ConfigEntryAuthFailed(err_msg)
|
||||
|
||||
async def get_devices(hass: HomeAssistant, session: aiohttp.ClientSession) -> list:
|
||||
"""
|
||||
Sends a request to the SmartThings Find API to retrieve a list of devices associated with the user's account.
|
||||
|
||||
Args:
|
||||
hass (HomeAssistant): Home Assistant instance.
|
||||
session (aiohttp.ClientSession): The current session.
|
||||
|
||||
Returns:
|
||||
list: A list of devices if successful, empty list otherwise.
|
||||
"""
|
||||
url = f"{URL_DEVICE_LIST}?_csrf={hass.data[DOMAIN]['_csrf']}"
|
||||
async with session.post(url) as response:
|
||||
if response.status != 200:
|
||||
_LOGGER.error(f"Failed to retrieve devices [{response.status}]: {await response.text()}")
|
||||
if response.status == 404:
|
||||
_LOGGER.warn(f"Received 404 while trying to fetch devices -> Triggering reauth")
|
||||
raise ConfigEntryAuthFailed("Request to get device list failed: 404")
|
||||
return []
|
||||
response_raw = await response.text()
|
||||
response_json = json.loads(response_raw)
|
||||
devices_data = response_json["deviceList"]
|
||||
devices = []
|
||||
for device in devices_data:
|
||||
ha_dev_info = DeviceInfo(
|
||||
identifiers={(DOMAIN, device['dvceID'])},
|
||||
manufacturer="Samsung",
|
||||
name=device['modelName'],
|
||||
model=device['modelID'],
|
||||
configuration_url="https://smartthingsfind.samsung.com/"
|
||||
)
|
||||
devices += [{"data": device, "ha_dev_info": ha_dev_info}]
|
||||
_LOGGER.debug(f"Adding device: {device['modelName']}")
|
||||
return devices
|
||||
|
||||
async def get_device_location(hass: HomeAssistant, session: aiohttp.ClientSession, dev_data: dict) -> dict:
|
||||
"""
|
||||
Sends requests to update the device's location and retrieves the current location data for the specified device.
|
||||
|
||||
Args:
|
||||
hass (HomeAssistant): Home Assistant instance.
|
||||
session (aiohttp.ClientSession): The current session.
|
||||
dev_data (dict): The device information obtained from get_devices.
|
||||
|
||||
Returns:
|
||||
dict: The device location data.
|
||||
"""
|
||||
dev_id = dev_data['dvceID']
|
||||
dev_name = dev_data['modelName']
|
||||
|
||||
set_last_payload = {
|
||||
"dvceId": dev_id,
|
||||
"removeDevice": []
|
||||
}
|
||||
|
||||
update_payload = {
|
||||
"dvceId": dev_id,
|
||||
"operation": "CHECK_CONNECTION_WITH_LOCATION",
|
||||
"usrId": dev_data['usrId']
|
||||
}
|
||||
|
||||
csrf_token = hass.data[DOMAIN]["_csrf"]
|
||||
|
||||
try:
|
||||
async with session.post(f"{URL_REQUEST_LOC_UPDATE}?_csrf={csrf_token}", json=update_payload) as response:
|
||||
# _LOGGER.debug(f"[{dev_name}] Update request response ({response.status}): {await response.text()}")
|
||||
pass
|
||||
|
||||
async with session.post(f"{URL_SET_LAST_DEVICE}?_csrf={csrf_token}", json=set_last_payload) as response:
|
||||
_LOGGER.debug(f"[{dev_name}] Location response ({response.status})")
|
||||
if response.status == 200:
|
||||
data_raw = await response.text()
|
||||
# _LOGGER.debug(f"[{dev_name}] Dev-Tracker HTTP response data: {data_raw}")
|
||||
data = json.loads(data_raw)
|
||||
res = {
|
||||
"dev_name": dev_name,
|
||||
"dev_id": dev_id,
|
||||
"update_success": True,
|
||||
"location_found": False,
|
||||
"used_op": None,
|
||||
"used_loc": None,
|
||||
"ops": []
|
||||
}
|
||||
used_loc = None
|
||||
if 'operation' in data and len(data['operation']) > 0:
|
||||
res['ops'] = data['operation']
|
||||
|
||||
used_op = None
|
||||
used_loc = {
|
||||
"latitude": None,
|
||||
"longitude": None,
|
||||
"gps_accuracy": None,
|
||||
"gps_date": None
|
||||
}
|
||||
# Find and extract the latest location from the response. Often the response
|
||||
# contains multiple locations (especially for non-SmartTag devices such as phones).
|
||||
# We go through all of them and find the "most usable" one. Sometimes locations
|
||||
# are encrypted (usually OFFLINE_LOC), we ignore these. They could probably also
|
||||
# be encrypted; there is a special getEncToken-Endpoint which returns some sort of
|
||||
# key. Since the only encrypted locations I encountered were even older than the
|
||||
# non encrypted ones, I didn't try anything to encrypt them yet.
|
||||
for op in data['operation']:
|
||||
if op['oprnType'] in ['LOCATION', 'LASTLOC', 'OFFLINE_LOC']:
|
||||
if 'latitude' in op:
|
||||
utcDate = None
|
||||
|
||||
if 'extra' in op and 'gpsUtcDt' in op['extra']:
|
||||
utcDate = parse_stf_date(op['extra']['gpsUtcDt'])
|
||||
else:
|
||||
_LOGGER.error(f"[{dev_name}] No UTC date found for operation '{op['oprnType']}', this should not happen! OP: {json.dumps(op)}")
|
||||
continue
|
||||
|
||||
if used_loc['gps_date'] and used_loc['gps_date'] >= utcDate:
|
||||
_LOGGER.debug(f"[{dev_name}] Ignoring location older than the previous ({op['oprnType']})")
|
||||
continue
|
||||
|
||||
locFound = False
|
||||
if 'latitude' in op:
|
||||
used_loc['latitude'] = float(op['latitude'])
|
||||
locFound = True
|
||||
if 'longitude' in op:
|
||||
used_loc['longitude'] = float(op['longitude'])
|
||||
locFound = True
|
||||
|
||||
if not locFound:
|
||||
_LOGGER.warn(f"[{dev_name}] Found no coordinates in operation '{op['oprnType']}'")
|
||||
else:
|
||||
res['location_found'] = True
|
||||
|
||||
used_loc['gps_accuracy'] = calc_gps_accuracy(op.get('horizontalUncertainty'), op.get('verticalUncertainty'))
|
||||
used_loc['gps_date'] = utcDate
|
||||
used_op = op
|
||||
|
||||
elif 'encLocation' in op:
|
||||
loc = op['encLocation']
|
||||
if 'encrypted' in loc and loc['encrypted']:
|
||||
_LOGGER.warn(f"[{dev_name}] Ignoring encrypted location ({op['oprnType']})")
|
||||
continue
|
||||
elif 'gpsUtcDt' not in loc:
|
||||
_LOGGER.warn(f"[{dev_name}] Ignoring location with missing date ({op['oprnType']})")
|
||||
continue
|
||||
else:
|
||||
utcDate = parse_stf_date(loc['gpsUtcDt'])
|
||||
if used_loc['gps_date'] and used_loc['gps_date'] >= utcDate:
|
||||
_LOGGER.debug(f"[{dev_name}] Ignoring location older than the previous ({op['oprnType']})")
|
||||
continue
|
||||
else:
|
||||
locFound = False
|
||||
if 'latitude' in loc:
|
||||
used_loc['latitude'] = float(loc['latitude'])
|
||||
locFound = True
|
||||
if 'longitude' in loc:
|
||||
used_loc['longitude'] = float(loc['longitude'])
|
||||
locFound = True
|
||||
else:
|
||||
res['location_found'] = True
|
||||
|
||||
if not locFound:
|
||||
_LOGGER.warn(f"[{dev_name}] Found no coordinates in operation '{op['oprnType']}'")
|
||||
|
||||
used_loc['gps_accuracy'] = calc_gps_accuracy(loc.get('horizontalUncertainty'), loc.get('verticalUncertainty'))
|
||||
used_loc['gps_date'] = utcDate
|
||||
used_op = op
|
||||
continue
|
||||
|
||||
if used_op:
|
||||
res['used_op'] = used_op
|
||||
res['used_loc'] = used_loc
|
||||
else:
|
||||
_LOGGER.warn(f"[{dev_name}] No useable location-operation found")
|
||||
|
||||
_LOGGER.debug(f" --> {dev_name} used operation:\t{'NONE' if not used_op else used_op['oprnType']}")
|
||||
|
||||
else:
|
||||
_LOGGER.warn(f"[{dev_name}] No operation found in response; marking update failed")
|
||||
res['update_success'] = False
|
||||
return res
|
||||
else:
|
||||
_LOGGER.error(f"[{dev_name}] Failed to fetch device data ({response.status})")
|
||||
res_text = await response.text()
|
||||
_LOGGER.debug(f"[{dev_name}] Full response: '{res_text}'")
|
||||
|
||||
# Our session is not valid anymore. Refreshing the CSRF Token ist not
|
||||
# enough at this point. Instead we have to ask the user to go through
|
||||
# the whole auth flow again
|
||||
if res_text == 'Logout' or response.status == 401:
|
||||
raise ConfigEntryAuthFailed(f"Session not valid anymore, received status_code of {response.status} with response '{res_text}'")
|
||||
|
||||
except ConfigEntryAuthFailed as e:
|
||||
raise
|
||||
except Exception as e:
|
||||
_LOGGER.error(f"[{dev_name}] Exception occurred while fetching location data for tag '{dev_name}': {e}", exc_info=True)
|
||||
|
||||
return None
|
||||
|
||||
def calc_gps_accuracy(hu: float, vu: float) -> float:
|
||||
"""
|
||||
Calculate the GPS accuracy using the Pythagorean theorem.
|
||||
Returns the combined GPS accuracy based on the horizontal
|
||||
and vertical uncertainties provided by the API
|
||||
|
||||
Args:
|
||||
hu (float): Horizontal uncertainty.
|
||||
vu (float): Vertical uncertainty.
|
||||
|
||||
Returns:
|
||||
float: Calculated GPS accuracy.
|
||||
"""
|
||||
try:
|
||||
return round((float(hu)**2 + float(vu)**2) ** 0.5, 1)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def get_sub_location(ops: list, subDeviceName: str) -> tuple:
|
||||
"""
|
||||
Extracts sub-location data for devices that contain multiple
|
||||
sub-locations (e.g., left and right earbuds).
|
||||
|
||||
Args:
|
||||
ops (list): List of operations from the API.
|
||||
subDeviceName (str): Name of the sub-device.
|
||||
|
||||
Returns:
|
||||
tuple: The operation and sub-location data.
|
||||
"""
|
||||
if not ops or not subDeviceName or len(ops) < 1:
|
||||
return {}, {}
|
||||
for op in ops:
|
||||
if subDeviceName in op.get('encLocation', {}):
|
||||
loc = op['encLocation'][subDeviceName]
|
||||
sub_loc = {
|
||||
"latitude": float(loc['latitude']),
|
||||
"longitude": float(loc['longitude']),
|
||||
"gps_accuracy": calc_gps_accuracy(loc.get('horizontalUncertainty'), loc.get('verticalUncertainty')),
|
||||
"gps_date": parse_stf_date(loc['gpsUtcDt'])
|
||||
}
|
||||
return op, sub_loc
|
||||
return {}, {}
|
||||
|
||||
def parse_stf_date(datestr: str) -> datetime:
|
||||
"""
|
||||
Parses a date string in the format "%Y%m%d%H%M%S" to a datetime object.
|
||||
This is the format, the SmartThings Find API uses.
|
||||
|
||||
Args:
|
||||
datestr (str): The date string in the format "%Y%m%d%H%M%S".
|
||||
|
||||
Returns:
|
||||
datetime: A datetime object representing the input date string.
|
||||
"""
|
||||
return datetime.strptime(datestr, "%Y%m%d%H%M%S").replace(tzinfo=pytz.UTC)
|
||||
|
||||
def get_battery_level(dev_name: str, ops: list) -> int:
|
||||
"""
|
||||
Try to extract the device battery level from the received operation
|
||||
|
||||
Args:
|
||||
dev_name (str): The name of the device.
|
||||
ops (list): List of operations from the API.
|
||||
|
||||
Returns:
|
||||
int: The battery level if found, None otherwise.
|
||||
"""
|
||||
for op in ops:
|
||||
if op['oprnType'] == 'CHECK_CONNECTION' and 'battery' in op:
|
||||
batt_raw = op['battery']
|
||||
batt = BATTERY_LEVELS.get(batt_raw, None)
|
||||
if batt is None:
|
||||
try:
|
||||
batt = int(batt_raw)
|
||||
except ValueError:
|
||||
_LOGGER.warn(f"[{dev_name}]: Received invalid battery level: {batt_raw}")
|
||||
return batt
|
||||
return None
|
||||
|
||||
def gen_qr_code_base64(data: str) -> str:
|
||||
"""
|
||||
Generates a QR code from the provided data and returns it as a base64-encoded string.
|
||||
Used to show a login QR code during authentication flow
|
||||
|
||||
Args:
|
||||
data (str): The data to encode in the QR code.
|
||||
|
||||
Returns:
|
||||
str: The base64-encoded string representation of the QR code.
|
||||
"""
|
||||
qr = qrcode.QRCode()
|
||||
qr.add_data(data)
|
||||
img = qr.make_image(fill_color="black", back_color="white")
|
||||
buffer = BytesIO()
|
||||
img.save(buffer, format="PNG")
|
||||
return base64.b64encode(buffer.getvalue()).decode("utf-8")
|
Loading…
Add table
Add a link
Reference in a new issue