Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
Paulus Schoutsen
ef0647f491 Add permissions foundation 2018-09-26 15:30:03 +02:00
3 changed files with 259 additions and 1 deletions

View file

@ -12,7 +12,7 @@ from homeassistant.auth.const import ACCESS_TOKEN_EXPIRATION
from homeassistant.core import callback, HomeAssistant
from homeassistant.util import dt as dt_util
from . import auth_store, models
from . import auth_store, models, permissions
from .mfa_modules import auth_mfa_module_from_config, MultiFactorAuthModule
from .providers import auth_provider_from_config, AuthProvider, LoginFlow
@ -68,6 +68,7 @@ class AuthManager:
"""Initialize the auth manager."""
self.hass = hass
self._store = store
self.permissions = permissions.Permissions()
self._providers = providers
self._mfa_modules = mfa_modules
self.login_flow = data_entry_flow.FlowManager(

View file

@ -0,0 +1,100 @@
"""Permissions for Home Assistant."""
import voluptuous as vol
# Policy if user has no policy applied.
DEFAULT_POLICY = {
"entities": True
}
ENTITY_POLICY_SCHEMA = vol.Any(bool, vol.Schema({
vol.Optional('domains'): vol.Any(bool, vol.Schema({
str: bool
})),
vol.Optional('entity_ids'): vol.Any(bool, vol.Schema({
str: bool
})),
}))
POLICY_SCHEMA = vol.Schema({
vol.Optional('entities'): ENTITY_POLICY_SCHEMA
})
class Permissions:
"""Handle permissions."""
def __init__(self):
"""Initialize the permission class."""
self._compiled = {}
def check_entity(self, user_id: str, entity_id: str, *keys):
"""Test if we can access entity."""
func = self._policy_func(user_id, 'entities', _compile_entities)
return func(entity_id, keys)
def filter_entities(self, user_id, entities):
"""Filter a list of entities for what the user is allowed to see."""
func = self._policy_func(user_id, 'entities', _compile_entities)
keys = ('read',)
return [entity for entity in entities if func(entity.entity_id, keys)]
def _policy_func(self, user_id, category, compile_func):
"""Get a policy function."""
key = (user_id, category)
func = self._compiled.get(key)
if func:
return func
policy = self._resolve_policy(user_id)
func = self._compiled[key] = _compile_entities(policy.get(category))
return func
def _resolve_policy(self, user_id):
"""Return user policy."""
# pylint: disable=no-self-use
return DEFAULT_POLICY
def _compile_entities(policy):
"""Compile policy into a function that tests policy."""
# None, Empty Dict, False
if not policy:
return lambda entity_id, keys: False
if policy is True:
return lambda entity_id, keys: True
domains = policy.get('domains')
entity_ids = policy.get('entity_ids')
# Setting domains or entity_ids to True whitelists all entities
if domains is True or entity_ids is True:
return lambda entity_id, keys: True
funcs = []
# If it's False, no need to process it.
if domains and domains is not False:
def allowed_domain(entity_id, keys):
"""Test if allowed domain."""
domain = entity_id.split(".", 1)[0]
return domains.get(domain) is True
funcs.append(allowed_domain)
# If it's False, no need to process it.
if entity_ids and entity_ids is not False:
def allowed_entity_id(entity_id, keys):
"""Test if allowed domain."""
return entity_ids.get(entity_id) is True
funcs.append(allowed_entity_id)
if not funcs:
return lambda entity_id, keys: False
if len(funcs) == 1:
return funcs[0]
return lambda entity_id, keys: any(func(entity_id, keys) for func in funcs)

View file

@ -0,0 +1,157 @@
"""Tests for the auth permission system."""
import pytest
from homeassistant.core import State
from homeassistant.auth import permissions
@pytest.fixture
def perm():
"""Fixture permission object."""
perm = permissions.Permissions()
perm.mock_policy = permissions.DEFAULT_POLICY
perm._resolve_policy = lambda user_id: perm.mock_policy
return perm
def test_entities_none():
"""Test entity ID policy."""
policy = None
compiled = permissions._compile_entities(policy)
assert not compiled('light.kitchen', [])
def test_entities_empty():
"""Test entity ID policy."""
policy = {}
permissions.ENTITY_POLICY_SCHEMA(policy)
compiled = permissions._compile_entities(policy)
assert not compiled('light.kitchen', [])
def test_entities_false():
"""Test entity ID policy."""
policy = False
permissions.ENTITY_POLICY_SCHEMA(policy)
compiled = permissions._compile_entities(policy)
assert not compiled('light.kitchen', [])
def test_entities_true():
"""Test entity ID policy."""
policy = True
permissions.ENTITY_POLICY_SCHEMA(policy)
compiled = permissions._compile_entities(policy)
assert compiled('light.kitchen', [])
def test_entities_domains_true():
"""Test entity ID policy."""
policy = {
'domains': True
}
permissions.ENTITY_POLICY_SCHEMA(policy)
compiled = permissions._compile_entities(policy)
assert compiled('light.kitchen', [])
def test_entities_domains_false():
"""Test entity ID policy."""
policy = {
'domains': False
}
permissions.ENTITY_POLICY_SCHEMA(policy)
compiled = permissions._compile_entities(policy)
assert not compiled('light.kitchen', [])
def test_entities_domains_domain_true():
"""Test entity ID policy."""
policy = {
'domains': {
'light': True
}
}
permissions.ENTITY_POLICY_SCHEMA(policy)
compiled = permissions._compile_entities(policy)
assert compiled('light.kitchen', [])
assert not compiled('switch.kitchen', [])
def test_entities_domains_domain_false():
"""Test entity ID policy."""
policy = {
'domains': {
'light': False
}
}
permissions.ENTITY_POLICY_SCHEMA(policy)
compiled = permissions._compile_entities(policy)
assert not compiled('light.kitchen', [])
assert not compiled('switch.kitchen', [])
def test_entities_entity_ids_true():
"""Test entity ID policy."""
policy = {
'entity_ids': True
}
permissions.ENTITY_POLICY_SCHEMA(policy)
compiled = permissions._compile_entities(policy)
assert compiled('light.kitchen', [])
def test_entities_entity_ids_false():
"""Test entity ID policy."""
policy = {
'entity_ids': False
}
permissions.ENTITY_POLICY_SCHEMA(policy)
compiled = permissions._compile_entities(policy)
assert not compiled('light.kitchen', [])
def test_entities_entity_ids_entity_id_true():
"""Test entity ID policy."""
policy = {
'entity_ids': {
'light.kitchen': True
}
}
permissions.ENTITY_POLICY_SCHEMA(policy)
compiled = permissions._compile_entities(policy)
assert compiled('light.kitchen', [])
assert not compiled('switch.kitchen', [])
def test_entities_entity_ids_entity_id_false():
"""Test entity ID policy."""
policy = {
'entity_ids': {
'light.kitchen': False
}
}
permissions.ENTITY_POLICY_SCHEMA(policy)
compiled = permissions._compile_entities(policy)
assert not compiled('light.kitchen', [])
assert not compiled('switch.kitchen', [])
def test_filter_entities(perm):
"""Test filtering entitites."""
states = [
State('light.kitchen', 'on'),
State('light.living_room', 'off'),
State('light.balcony', 'on'),
]
perm.mock_policy = {
'entities': {
'entity_ids': {
'light.kitchen': True,
'light.balcony': True,
}
}
}
filtered = perm.filter_entities('mock-user-id', states)
assert len(filtered) == 2
assert filtered == [states[0], states[2]]