Core: cleanup timer (#5825)

* Minor core cleanup

* Cleanup timer

* Lint

* timeout with correct loop

* Improve timer thanks to pvizeli

* Update core.py

* More tests
This commit is contained in:
Paulus Schoutsen 2017-02-10 09:00:17 -08:00 committed by GitHub
parent c7c3b30e0a
commit 6ffab53377
3 changed files with 130 additions and 129 deletions

View file

@ -2,7 +2,7 @@
# pylint: disable=protected-access
import asyncio
import unittest
from unittest.mock import patch, MagicMock
from unittest.mock import patch, MagicMock, sentinel
from datetime import datetime, timedelta
import pytz
@ -14,7 +14,9 @@ from homeassistant.util.async import run_coroutine_threadsafe
import homeassistant.util.dt as dt_util
from homeassistant.util.unit_system import (METRIC_SYSTEM)
from homeassistant.const import (
__version__, EVENT_STATE_CHANGED, ATTR_FRIENDLY_NAME, CONF_UNIT_SYSTEM)
__version__, EVENT_STATE_CHANGED, ATTR_FRIENDLY_NAME, CONF_UNIT_SYSTEM,
ATTR_NOW, EVENT_TIME_CHANGED, EVENT_HOMEASSISTANT_STOP,
EVENT_HOMEASSISTANT_START)
from tests.common import get_test_home_assistant
@ -736,37 +738,78 @@ class TestConfig(unittest.TestCase):
self.assertEqual(expected, self.config.as_dict())
class TestAsyncCreateTimer(object):
@patch('homeassistant.core.monotonic')
def test_create_timer(mock_monotonic, loop):
"""Test create timer."""
hass = MagicMock()
funcs = []
orig_callback = ha.callback
@patch('homeassistant.core.asyncio.Event')
@patch('homeassistant.core.dt_util.utcnow')
def test_create_timer(self, mock_utcnow, mock_event, event_loop):
"""Test create timer fires correctly."""
hass = MagicMock()
now = mock_utcnow()
event = mock_event()
now.second = 1
mock_utcnow.reset_mock()
def mock_callback(func):
funcs.append(func)
return orig_callback(func)
with patch.object(ha, 'callback', mock_callback):
ha._async_create_timer(hass)
assert len(hass.bus.async_listen_once.mock_calls) == 2
start_timer = hass.bus.async_listen_once.mock_calls[1][1][1]
event_loop.run_until_complete(start_timer(None))
assert hass.loop.create_task.called
assert len(funcs) == 3
fire_time_event, start_timer, stop_timer = funcs
timer = hass.loop.create_task.mock_calls[0][1][0]
event.is_set.side_effect = False, False, True
event_loop.run_until_complete(timer)
assert len(mock_utcnow.mock_calls) == 1
assert len(hass.bus.async_listen_once.mock_calls) == 1
event_type, callback = hass.bus.async_listen_once.mock_calls[0][1]
assert event_type == EVENT_HOMEASSISTANT_START
assert callback is start_timer
assert hass.loop.call_soon.called
event_type, event_data = hass.loop.call_soon.mock_calls[0][1][1:]
mock_monotonic.side_effect = 10.2, 10.3
assert ha.EVENT_TIME_CHANGED == event_type
assert {ha.ATTR_NOW: now} == event_data
with patch('homeassistant.core.dt_util.utcnow',
return_value=sentinel.mock_date):
start_timer(None)
stop_timer = hass.bus.async_listen_once.mock_calls[0][1][1]
stop_timer(None)
assert event.set.called
assert len(hass.bus.async_listen_once.mock_calls) == 2
assert len(hass.bus.async_fire.mock_calls) == 1
assert len(hass.loop.call_later.mock_calls) == 1
event_type, callback = hass.bus.async_listen_once.mock_calls[1][1]
assert event_type == EVENT_HOMEASSISTANT_STOP
assert callback is stop_timer
slp_seconds, callback, nxt = hass.loop.call_later.mock_calls[0][1]
assert abs(slp_seconds - 0.9) < 0.001
assert callback is fire_time_event
assert abs(nxt - 11.2) < 0.001
event_type, event_data = hass.bus.async_fire.mock_calls[0][1]
assert event_type == EVENT_TIME_CHANGED
assert event_data[ATTR_NOW] is sentinel.mock_date
@patch('homeassistant.core.monotonic')
def test_timer_out_of_sync(mock_monotonic, loop):
"""Test create timer."""
hass = MagicMock()
funcs = []
orig_callback = ha.callback
def mock_callback(func):
funcs.append(func)
return orig_callback(func)
with patch.object(ha, 'callback', mock_callback):
ha._async_create_timer(hass)
assert len(funcs) == 3
fire_time_event, start_timer, stop_timer = funcs
mock_monotonic.side_effect = 10.2, 11.3, 11.3
with patch('homeassistant.core.dt_util.utcnow',
return_value=sentinel.mock_date):
start_timer(None)
assert len(hass.loop.call_later.mock_calls) == 1
slp_seconds, callback, nxt = hass.loop.call_later.mock_calls[0][1]
assert slp_seconds == 1
assert callback is fire_time_event
assert abs(nxt - 12.3) < 0.001