Compare commits
1 commit
dev
...
permission
Author | SHA1 | Date | |
---|---|---|---|
![]() |
ef0647f491 |
3 changed files with 259 additions and 1 deletions
|
@ -12,7 +12,7 @@ from homeassistant.auth.const import ACCESS_TOKEN_EXPIRATION
|
|||
from homeassistant.core import callback, HomeAssistant
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from . import auth_store, models
|
||||
from . import auth_store, models, permissions
|
||||
from .mfa_modules import auth_mfa_module_from_config, MultiFactorAuthModule
|
||||
from .providers import auth_provider_from_config, AuthProvider, LoginFlow
|
||||
|
||||
|
@ -68,6 +68,7 @@ class AuthManager:
|
|||
"""Initialize the auth manager."""
|
||||
self.hass = hass
|
||||
self._store = store
|
||||
self.permissions = permissions.Permissions()
|
||||
self._providers = providers
|
||||
self._mfa_modules = mfa_modules
|
||||
self.login_flow = data_entry_flow.FlowManager(
|
||||
|
|
100
homeassistant/auth/permissions.py
Normal file
100
homeassistant/auth/permissions.py
Normal file
|
@ -0,0 +1,100 @@
|
|||
"""Permissions for Home Assistant."""
|
||||
import voluptuous as vol
|
||||
|
||||
# Policy if user has no policy applied.
|
||||
DEFAULT_POLICY = {
|
||||
"entities": True
|
||||
}
|
||||
|
||||
ENTITY_POLICY_SCHEMA = vol.Any(bool, vol.Schema({
|
||||
vol.Optional('domains'): vol.Any(bool, vol.Schema({
|
||||
str: bool
|
||||
})),
|
||||
vol.Optional('entity_ids'): vol.Any(bool, vol.Schema({
|
||||
str: bool
|
||||
})),
|
||||
}))
|
||||
|
||||
POLICY_SCHEMA = vol.Schema({
|
||||
vol.Optional('entities'): ENTITY_POLICY_SCHEMA
|
||||
})
|
||||
|
||||
|
||||
class Permissions:
|
||||
"""Handle permissions."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the permission class."""
|
||||
self._compiled = {}
|
||||
|
||||
def check_entity(self, user_id: str, entity_id: str, *keys):
|
||||
"""Test if we can access entity."""
|
||||
func = self._policy_func(user_id, 'entities', _compile_entities)
|
||||
return func(entity_id, keys)
|
||||
|
||||
def filter_entities(self, user_id, entities):
|
||||
"""Filter a list of entities for what the user is allowed to see."""
|
||||
func = self._policy_func(user_id, 'entities', _compile_entities)
|
||||
keys = ('read',)
|
||||
return [entity for entity in entities if func(entity.entity_id, keys)]
|
||||
|
||||
def _policy_func(self, user_id, category, compile_func):
|
||||
"""Get a policy function."""
|
||||
key = (user_id, category)
|
||||
func = self._compiled.get(key)
|
||||
|
||||
if func:
|
||||
return func
|
||||
|
||||
policy = self._resolve_policy(user_id)
|
||||
func = self._compiled[key] = _compile_entities(policy.get(category))
|
||||
return func
|
||||
|
||||
def _resolve_policy(self, user_id):
|
||||
"""Return user policy."""
|
||||
# pylint: disable=no-self-use
|
||||
return DEFAULT_POLICY
|
||||
|
||||
|
||||
def _compile_entities(policy):
|
||||
"""Compile policy into a function that tests policy."""
|
||||
# None, Empty Dict, False
|
||||
if not policy:
|
||||
return lambda entity_id, keys: False
|
||||
|
||||
if policy is True:
|
||||
return lambda entity_id, keys: True
|
||||
|
||||
domains = policy.get('domains')
|
||||
entity_ids = policy.get('entity_ids')
|
||||
|
||||
# Setting domains or entity_ids to True whitelists all entities
|
||||
if domains is True or entity_ids is True:
|
||||
return lambda entity_id, keys: True
|
||||
|
||||
funcs = []
|
||||
|
||||
# If it's False, no need to process it.
|
||||
if domains and domains is not False:
|
||||
def allowed_domain(entity_id, keys):
|
||||
"""Test if allowed domain."""
|
||||
domain = entity_id.split(".", 1)[0]
|
||||
return domains.get(domain) is True
|
||||
|
||||
funcs.append(allowed_domain)
|
||||
|
||||
# If it's False, no need to process it.
|
||||
if entity_ids and entity_ids is not False:
|
||||
def allowed_entity_id(entity_id, keys):
|
||||
"""Test if allowed domain."""
|
||||
return entity_ids.get(entity_id) is True
|
||||
|
||||
funcs.append(allowed_entity_id)
|
||||
|
||||
if not funcs:
|
||||
return lambda entity_id, keys: False
|
||||
|
||||
if len(funcs) == 1:
|
||||
return funcs[0]
|
||||
|
||||
return lambda entity_id, keys: any(func(entity_id, keys) for func in funcs)
|
157
tests/auth/test_permissions.py
Normal file
157
tests/auth/test_permissions.py
Normal file
|
@ -0,0 +1,157 @@
|
|||
"""Tests for the auth permission system."""
|
||||
import pytest
|
||||
|
||||
from homeassistant.core import State
|
||||
from homeassistant.auth import permissions
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def perm():
|
||||
"""Fixture permission object."""
|
||||
perm = permissions.Permissions()
|
||||
perm.mock_policy = permissions.DEFAULT_POLICY
|
||||
perm._resolve_policy = lambda user_id: perm.mock_policy
|
||||
return perm
|
||||
|
||||
|
||||
def test_entities_none():
|
||||
"""Test entity ID policy."""
|
||||
policy = None
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert not compiled('light.kitchen', [])
|
||||
|
||||
|
||||
def test_entities_empty():
|
||||
"""Test entity ID policy."""
|
||||
policy = {}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert not compiled('light.kitchen', [])
|
||||
|
||||
|
||||
def test_entities_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = False
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert not compiled('light.kitchen', [])
|
||||
|
||||
|
||||
def test_entities_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = True
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', [])
|
||||
|
||||
|
||||
def test_entities_domains_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'domains': True
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', [])
|
||||
|
||||
|
||||
def test_entities_domains_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'domains': False
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert not compiled('light.kitchen', [])
|
||||
|
||||
|
||||
def test_entities_domains_domain_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'domains': {
|
||||
'light': True
|
||||
}
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', [])
|
||||
assert not compiled('switch.kitchen', [])
|
||||
|
||||
|
||||
def test_entities_domains_domain_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'domains': {
|
||||
'light': False
|
||||
}
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert not compiled('light.kitchen', [])
|
||||
assert not compiled('switch.kitchen', [])
|
||||
|
||||
|
||||
def test_entities_entity_ids_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': True
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', [])
|
||||
|
||||
|
||||
def test_entities_entity_ids_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': False
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert not compiled('light.kitchen', [])
|
||||
|
||||
|
||||
def test_entities_entity_ids_entity_id_true():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': {
|
||||
'light.kitchen': True
|
||||
}
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert compiled('light.kitchen', [])
|
||||
assert not compiled('switch.kitchen', [])
|
||||
|
||||
|
||||
def test_entities_entity_ids_entity_id_false():
|
||||
"""Test entity ID policy."""
|
||||
policy = {
|
||||
'entity_ids': {
|
||||
'light.kitchen': False
|
||||
}
|
||||
}
|
||||
permissions.ENTITY_POLICY_SCHEMA(policy)
|
||||
compiled = permissions._compile_entities(policy)
|
||||
assert not compiled('light.kitchen', [])
|
||||
assert not compiled('switch.kitchen', [])
|
||||
|
||||
|
||||
def test_filter_entities(perm):
|
||||
"""Test filtering entitites."""
|
||||
states = [
|
||||
State('light.kitchen', 'on'),
|
||||
State('light.living_room', 'off'),
|
||||
State('light.balcony', 'on'),
|
||||
]
|
||||
perm.mock_policy = {
|
||||
'entities': {
|
||||
'entity_ids': {
|
||||
'light.kitchen': True,
|
||||
'light.balcony': True,
|
||||
}
|
||||
}
|
||||
}
|
||||
filtered = perm.filter_entities('mock-user-id', states)
|
||||
assert len(filtered) == 2
|
||||
assert filtered == [states[0], states[2]]
|
Loading…
Add table
Add a link
Reference in a new issue