
* Add Notify MFA * Fix unit test * Address review comment, change storage implementation * Add retry limit to mfa module * Fix loading * Fix invalaid login log processing * Typing * Change default message template * Change one-time password to 8 digit * Refactoring to not save secret * Bug fixing * Change async_initialize method name to aysnc_initialize_login_mfa_step * Address some simple fix code review comment
175 lines
5.5 KiB
Python
175 lines
5.5 KiB
Python
"""Plugable auth modules for Home Assistant."""
|
|
import importlib
|
|
import logging
|
|
import types
|
|
from typing import Any, Dict, Optional
|
|
|
|
import voluptuous as vol
|
|
from voluptuous.humanize import humanize_error
|
|
|
|
from homeassistant import requirements, data_entry_flow
|
|
from homeassistant.const import CONF_ID, CONF_NAME, CONF_TYPE
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.exceptions import HomeAssistantError
|
|
from homeassistant.util.decorator import Registry
|
|
|
|
MULTI_FACTOR_AUTH_MODULES = Registry()
|
|
|
|
MULTI_FACTOR_AUTH_MODULE_SCHEMA = vol.Schema({
|
|
vol.Required(CONF_TYPE): str,
|
|
vol.Optional(CONF_NAME): str,
|
|
# Specify ID if you have two mfa auth module for same type.
|
|
vol.Optional(CONF_ID): str,
|
|
}, extra=vol.ALLOW_EXTRA)
|
|
|
|
DATA_REQS = 'mfa_auth_module_reqs_processed'
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class MultiFactorAuthModule:
|
|
"""Multi-factor Auth Module of validation function."""
|
|
|
|
DEFAULT_TITLE = 'Unnamed auth module'
|
|
MAX_RETRY_TIME = 3
|
|
|
|
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
|
|
"""Initialize an auth module."""
|
|
self.hass = hass
|
|
self.config = config
|
|
|
|
@property
|
|
def id(self) -> str: # pylint: disable=invalid-name
|
|
"""Return id of the auth module.
|
|
|
|
Default is same as type
|
|
"""
|
|
return self.config.get(CONF_ID, self.type)
|
|
|
|
@property
|
|
def type(self) -> str:
|
|
"""Return type of the module."""
|
|
return self.config[CONF_TYPE] # type: ignore
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
"""Return the name of the auth module."""
|
|
return self.config.get(CONF_NAME, self.DEFAULT_TITLE)
|
|
|
|
# Implement by extending class
|
|
|
|
@property
|
|
def input_schema(self) -> vol.Schema:
|
|
"""Return a voluptuous schema to define mfa auth module's input."""
|
|
raise NotImplementedError
|
|
|
|
async def async_setup_flow(self, user_id: str) -> 'SetupFlow':
|
|
"""Return a data entry flow handler for setup module.
|
|
|
|
Mfa module should extend SetupFlow
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
async def async_setup_user(self, user_id: str, setup_data: Any) -> Any:
|
|
"""Set up user for mfa auth module."""
|
|
raise NotImplementedError
|
|
|
|
async def async_depose_user(self, user_id: str) -> None:
|
|
"""Remove user from mfa module."""
|
|
raise NotImplementedError
|
|
|
|
async def async_is_user_setup(self, user_id: str) -> bool:
|
|
"""Return whether user is setup."""
|
|
raise NotImplementedError
|
|
|
|
async def async_validate(
|
|
self, user_id: str, user_input: Dict[str, Any]) -> bool:
|
|
"""Return True if validation passed."""
|
|
raise NotImplementedError
|
|
|
|
|
|
class SetupFlow(data_entry_flow.FlowHandler):
|
|
"""Handler for the setup flow."""
|
|
|
|
def __init__(self, auth_module: MultiFactorAuthModule,
|
|
setup_schema: vol.Schema,
|
|
user_id: str) -> None:
|
|
"""Initialize the setup flow."""
|
|
self._auth_module = auth_module
|
|
self._setup_schema = setup_schema
|
|
self._user_id = user_id
|
|
|
|
async def async_step_init(
|
|
self, user_input: Optional[Dict[str, str]] = None) \
|
|
-> Dict[str, Any]:
|
|
"""Handle the first step of setup flow.
|
|
|
|
Return self.async_show_form(step_id='init') if user_input == None.
|
|
Return self.async_create_entry(data={'result': result}) if finish.
|
|
"""
|
|
errors = {} # type: Dict[str, str]
|
|
|
|
if user_input:
|
|
result = await self._auth_module.async_setup_user(
|
|
self._user_id, user_input)
|
|
return self.async_create_entry(
|
|
title=self._auth_module.name,
|
|
data={'result': result}
|
|
)
|
|
|
|
return self.async_show_form(
|
|
step_id='init',
|
|
data_schema=self._setup_schema,
|
|
errors=errors
|
|
)
|
|
|
|
|
|
async def auth_mfa_module_from_config(
|
|
hass: HomeAssistant, config: Dict[str, Any]) \
|
|
-> MultiFactorAuthModule:
|
|
"""Initialize an auth module from a config."""
|
|
module_name = config[CONF_TYPE]
|
|
module = await _load_mfa_module(hass, module_name)
|
|
|
|
try:
|
|
config = module.CONFIG_SCHEMA(config) # type: ignore
|
|
except vol.Invalid as err:
|
|
_LOGGER.error('Invalid configuration for multi-factor module %s: %s',
|
|
module_name, humanize_error(config, err))
|
|
raise
|
|
|
|
return MULTI_FACTOR_AUTH_MODULES[module_name](hass, config) # type: ignore
|
|
|
|
|
|
async def _load_mfa_module(hass: HomeAssistant, module_name: str) \
|
|
-> types.ModuleType:
|
|
"""Load an mfa auth module."""
|
|
module_path = 'homeassistant.auth.mfa_modules.{}'.format(module_name)
|
|
|
|
try:
|
|
module = importlib.import_module(module_path)
|
|
except ImportError as err:
|
|
_LOGGER.error('Unable to load mfa module %s: %s', module_name, err)
|
|
raise HomeAssistantError('Unable to load mfa module {}: {}'.format(
|
|
module_name, err))
|
|
|
|
if hass.config.skip_pip or not hasattr(module, 'REQUIREMENTS'):
|
|
return module
|
|
|
|
processed = hass.data.get(DATA_REQS)
|
|
if processed and module_name in processed:
|
|
return module
|
|
|
|
processed = hass.data[DATA_REQS] = set()
|
|
|
|
# https://github.com/python/mypy/issues/1424
|
|
req_success = await requirements.async_process_requirements(
|
|
hass, module_path, module.REQUIREMENTS) # type: ignore
|
|
|
|
if not req_success:
|
|
raise HomeAssistantError(
|
|
'Unable to process requirements of mfa module {}'.format(
|
|
module_name))
|
|
|
|
processed.add(module_name)
|
|
return module
|