Fix async probs (#9924)
* Update entity.py * Update entity_component.py * Update entity_component.py * Update __init__.py * Update entity_component.py * Update entity_component.py * Update entity.py * cleanup entity * Update entity_component.py * Update entity_component.py * Fix names & comments / fix tests * Revert deadlock protection * Add tests for entity * Add test fix name * Update other code * Fix lint * Remove restore state from template entities * Lint
This commit is contained in:
parent
6cce934f72
commit
c1b197419d
24 changed files with 356 additions and 362 deletions
|
@ -44,7 +44,7 @@ class EntityComponent(object):
|
|||
self.config = None
|
||||
|
||||
self._platforms = {
|
||||
'core': EntityPlatform(self, domain, self.scan_interval, None),
|
||||
'core': EntityPlatform(self, domain, self.scan_interval, 0, None),
|
||||
}
|
||||
self.async_add_entities = self._platforms['core'].async_add_entities
|
||||
self.add_entities = self._platforms['core'].add_entities
|
||||
|
@ -128,17 +128,23 @@ class EntityComponent(object):
|
|||
return
|
||||
|
||||
# Config > Platform > Component
|
||||
scan_interval = (platform_config.get(CONF_SCAN_INTERVAL) or
|
||||
getattr(platform, 'SCAN_INTERVAL', None) or
|
||||
self.scan_interval)
|
||||
scan_interval = (
|
||||
platform_config.get(CONF_SCAN_INTERVAL) or
|
||||
getattr(platform, 'SCAN_INTERVAL', None) or self.scan_interval)
|
||||
parallel_updates = getattr(
|
||||
platform, 'PARALLEL_UPDATES',
|
||||
int(not hasattr(platform, 'async_setup_platform')))
|
||||
|
||||
entity_namespace = platform_config.get(CONF_ENTITY_NAMESPACE)
|
||||
|
||||
key = (platform_type, scan_interval, entity_namespace)
|
||||
|
||||
if key not in self._platforms:
|
||||
self._platforms[key] = EntityPlatform(
|
||||
self, platform_type, scan_interval, entity_namespace)
|
||||
entity_platform = self._platforms[key]
|
||||
entity_platform = self._platforms[key] = EntityPlatform(
|
||||
self, platform_type, scan_interval, parallel_updates,
|
||||
entity_namespace)
|
||||
else:
|
||||
entity_platform = self._platforms[key]
|
||||
|
||||
self.logger.info("Setting up %s.%s", self.domain, platform_type)
|
||||
warn_task = self.hass.loop.call_later(
|
||||
|
@ -204,13 +210,6 @@ class EntityComponent(object):
|
|||
|
||||
entity.hass = self.hass
|
||||
|
||||
# update/init entity data
|
||||
if update_before_add:
|
||||
if hasattr(entity, 'async_update'):
|
||||
yield from entity.async_update()
|
||||
else:
|
||||
yield from self.hass.async_add_job(entity.update)
|
||||
|
||||
if getattr(entity, 'entity_id', None) is None:
|
||||
object_id = entity.name or DEVICE_DEFAULT_NAME
|
||||
|
||||
|
@ -235,7 +234,7 @@ class EntityComponent(object):
|
|||
if hasattr(entity, 'async_added_to_hass'):
|
||||
yield from entity.async_added_to_hass()
|
||||
|
||||
yield from entity.async_update_ha_state()
|
||||
yield from entity.async_update_ha_state(update_before_add)
|
||||
|
||||
return True
|
||||
|
||||
|
@ -316,17 +315,23 @@ class EntityComponent(object):
|
|||
class EntityPlatform(object):
|
||||
"""Keep track of entities for a single platform and stay in loop."""
|
||||
|
||||
def __init__(self, component, platform, scan_interval, entity_namespace):
|
||||
def __init__(self, component, platform, scan_interval, parallel_updates,
|
||||
entity_namespace):
|
||||
"""Initialize the entity platform."""
|
||||
self.component = component
|
||||
self.platform = platform
|
||||
self.scan_interval = scan_interval
|
||||
self.parallel_updates = None
|
||||
self.entity_namespace = entity_namespace
|
||||
self.platform_entities = []
|
||||
self._tasks = []
|
||||
self._async_unsub_polling = None
|
||||
self._process_updates = asyncio.Lock(loop=component.hass.loop)
|
||||
|
||||
if parallel_updates:
|
||||
self.parallel_updates = asyncio.Semaphore(
|
||||
parallel_updates, loop=component.hass.loop)
|
||||
|
||||
@asyncio.coroutine
|
||||
def async_block_entities_done(self):
|
||||
"""Wait until all entities add to hass."""
|
||||
|
@ -377,6 +382,7 @@ class EntityPlatform(object):
|
|||
@asyncio.coroutine
|
||||
def async_process_entity(new_entity):
|
||||
"""Add entities to StateMachine."""
|
||||
new_entity.parallel_updates = self.parallel_updates
|
||||
ret = yield from self.component.async_add_entity(
|
||||
new_entity, self, update_before_add=update_before_add
|
||||
)
|
||||
|
@ -432,26 +438,10 @@ class EntityPlatform(object):
|
|||
|
||||
with (yield from self._process_updates):
|
||||
tasks = []
|
||||
to_update = []
|
||||
|
||||
for entity in self.platform_entities:
|
||||
if not entity.should_poll:
|
||||
continue
|
||||
|
||||
update_coro = entity.async_update_ha_state(True)
|
||||
if hasattr(entity, 'async_update'):
|
||||
tasks.append(
|
||||
self.component.hass.async_add_job(update_coro))
|
||||
else:
|
||||
to_update.append(update_coro)
|
||||
|
||||
for update_coro in to_update:
|
||||
try:
|
||||
yield from update_coro
|
||||
except Exception: # pylint: disable=broad-except
|
||||
self.component.logger.exception(
|
||||
"Error while update entity from %s in %s",
|
||||
self.platform, self.component.domain)
|
||||
tasks.append(entity.async_update_ha_state(True))
|
||||
|
||||
if tasks:
|
||||
yield from asyncio.wait(tasks, loop=self.component.hass.loop)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue