From c049b97a6454904cf4c23215c092c99cabcee15c Mon Sep 17 00:00:00 2001 From: Stefan Persson Date: Thu, 23 Feb 2012 15:44:36 +0100 Subject: [PATCH] Dispatching of events in own thread, to somewhat counter problems with long running events. --- telldus-core/client/CMakeLists.txt | 2 + telldus-core/client/CallbackDispatcher.cpp | 23 +-- telldus-core/client/CallbackDispatcher.h | 17 +- .../client/CallbackMainDispatcher.cpp | 159 ++++++++++++++++++ telldus-core/client/CallbackMainDispatcher.h | 69 ++++++++ telldus-core/client/Client.cpp | 120 ++++--------- telldus-core/client/Client.h | 3 +- telldus-core/common/Socket_win.cpp | 4 +- 8 files changed, 293 insertions(+), 104 deletions(-) create mode 100644 telldus-core/client/CallbackMainDispatcher.cpp create mode 100644 telldus-core/client/CallbackMainDispatcher.h diff --git a/telldus-core/client/CMakeLists.txt b/telldus-core/client/CMakeLists.txt index 7ec93967..03dbd6da 100644 --- a/telldus-core/client/CMakeLists.txt +++ b/telldus-core/client/CMakeLists.txt @@ -7,12 +7,14 @@ FIND_PACKAGE( SignTool REQUIRED ) ######## Non configurable options ######## SET( telldus-core_SRCS CallbackDispatcher.cpp + CallbackMainDispatcher.cpp Client.cpp telldus-core.cpp ) SET( telldus-core_HDRS CallbackDispatcher.h + CallbackMainDispatcher.cpp Client.h ) SET( telldus-core_PUB_HDRS diff --git a/telldus-core/client/CallbackDispatcher.cpp b/telldus-core/client/CallbackDispatcher.cpp index 86dbc0d3..c8dbc873 100644 --- a/telldus-core/client/CallbackDispatcher.cpp +++ b/telldus-core/client/CallbackDispatcher.cpp @@ -8,12 +8,11 @@ */ #include "CallbackDispatcher.h" -#include "common.h" using namespace TelldusCore; -TDDeviceEventDispatcher::TDDeviceEventDispatcher(CallbackStruct *data, int id, int m, const std::string &strD) -:Thread(), d(data), deviceId(id), method(m), strData(strD), doneRunning(false) +TDDeviceEventDispatcher::TDDeviceEventDispatcher(CallbackStruct *data, int id, int m, const std::string &strD, TelldusCore::EventRef cbDone) +:Thread(), d(data), deviceId(id), method(m), strData(strD), doneRunning(false), callbackExecuted(cbDone) { this->startAndLock(&d->mutex); } @@ -29,13 +28,12 @@ bool TDDeviceEventDispatcher::done() const { void TDDeviceEventDispatcher::run() { char *str = wrapStdString(strData); d->event(deviceId, method, strData.c_str(), d->id, d->context); - doneRunning = true; + callbackExecuted->signal(); } - -TDDeviceChangeEventDispatcher::TDDeviceChangeEventDispatcher(CallbackStruct *data, int id, int event, int type) -:Thread(), d(data), deviceId(id), changeEvent(event), changeType(type), doneRunning(false) +TDDeviceChangeEventDispatcher::TDDeviceChangeEventDispatcher(CallbackStruct *data, int id, int event, int type, TelldusCore::EventRef cbDone) +:Thread(), d(data), deviceId(id), changeEvent(event), changeType(type), doneRunning(false), callbackExecuted(cbDone) { this->startAndLock(&d->mutex); } @@ -51,10 +49,11 @@ bool TDDeviceChangeEventDispatcher::done() const { void TDDeviceChangeEventDispatcher::run() { d->event(deviceId, changeEvent, changeType, d->id, d->context); doneRunning = true; + callbackExecuted->signal(); } -TDRawDeviceEventDispatcher::TDRawDeviceEventDispatcher( CallbackStruct *data, const std::string &strD, int id) -:Thread(), d(data), controllerId(id), strData(strD), doneRunning(false) +TDRawDeviceEventDispatcher::TDRawDeviceEventDispatcher( CallbackStruct *data, const std::string &strD, int id, TelldusCore::EventRef cbDone) +:Thread(), d(data), controllerId(id), strData(strD), doneRunning(false), callbackExecuted(cbDone) { this->startAndLock(&d->mutex); } @@ -70,10 +69,11 @@ bool TDRawDeviceEventDispatcher::done() const { void TDRawDeviceEventDispatcher::run() { d->event(strData.c_str(), controllerId, d->id, d->context); doneRunning = true; + callbackExecuted->signal(); } -TDSensorEventDispatcher::TDSensorEventDispatcher( CallbackStruct *data, const std::string &p, const std::string &m, int id, int type, const std::string &v, int t) - :Thread(), d(data), protocol(p), model(m), sensorId(id), dataType(type), value(v), timestamp(t), doneRunning(false) +TDSensorEventDispatcher::TDSensorEventDispatcher( CallbackStruct *data, const std::string &p, const std::string &m, int id, int type, const std::string &v, int t, TelldusCore::EventRef cbDone) + :Thread(), d(data), protocol(p), model(m), sensorId(id), dataType(type), value(v), timestamp(t), doneRunning(false), callbackExecuted(cbDone) { this->startAndLock(&d->mutex); } @@ -89,4 +89,5 @@ bool TDSensorEventDispatcher::done() const { void TDSensorEventDispatcher::run() { d->event(protocol.c_str(), model.c_str(), sensorId, dataType, value.c_str(), timestamp, d->id, d->context); doneRunning = true; + callbackExecuted->signal(); } diff --git a/telldus-core/client/CallbackDispatcher.h b/telldus-core/client/CallbackDispatcher.h index 8cc24add..cf86101d 100644 --- a/telldus-core/client/CallbackDispatcher.h +++ b/telldus-core/client/CallbackDispatcher.h @@ -7,10 +7,11 @@ * */ - #ifndef CALLBACKDISPATCHER_H #define CALLBACKDISPATCHER_H +#include "common.h" +#include "Event.h" #include "Thread.h" #include "Mutex.h" #include "telldus-core.h" @@ -25,7 +26,7 @@ namespace TelldusCore { class TDDeviceEventDispatcher : public Thread { public: - TDDeviceEventDispatcher(CallbackStruct *data, int deviceId, int method, const std::string &strData); + TDDeviceEventDispatcher(CallbackStruct *data, int deviceId, int method, const std::string &strData, TelldusCore::EventRef cbDone); virtual ~TDDeviceEventDispatcher(); bool done() const; protected: @@ -35,22 +36,24 @@ namespace TelldusCore { CallbackStruct *d; int deviceId, method; std::string strData; + TelldusCore::EventRef callbackExecuted; }; class TDDeviceChangeEventDispatcher : public Thread { public: - TDDeviceChangeEventDispatcher(CallbackStruct *data, int deviceId, int changeEvent, int changeType); + TDDeviceChangeEventDispatcher(CallbackStruct *data, int deviceId, int changeEvent, int changeType, TelldusCore::EventRef cbDone); virtual ~TDDeviceChangeEventDispatcher(); bool done() const; protected: virtual void run(); bool doneRunning; - public: + private: CallbackStruct *d; int deviceId, changeEvent, changeType; + TelldusCore::EventRef callbackExecuted; }; class TDRawDeviceEventDispatcher : public Thread { public: - TDRawDeviceEventDispatcher( CallbackStruct *data, const std::string &strData, int controllerId); + TDRawDeviceEventDispatcher( CallbackStruct *data, const std::string &strData, int controllerId, TelldusCore::EventRef cbDone); virtual ~TDRawDeviceEventDispatcher(); bool done() const; protected: @@ -60,10 +63,11 @@ namespace TelldusCore { CallbackStruct *d; int controllerId; std::string strData; + TelldusCore::EventRef callbackExecuted; }; class TDSensorEventDispatcher : public Thread { public: - TDSensorEventDispatcher( CallbackStruct *data, const std::string &protocol, const std::string &model, int id, int dataType, const std::string &value, int timestamp); + TDSensorEventDispatcher( CallbackStruct *data, const std::string &protocol, const std::string &model, int id, int dataType, const std::string &value, int timestamp, TelldusCore::EventRef cbDone); virtual ~TDSensorEventDispatcher(); bool done() const; protected: @@ -77,6 +81,7 @@ namespace TelldusCore { int dataType; std::string value; int timestamp; + TelldusCore::EventRef callbackExecuted; }; } diff --git a/telldus-core/client/CallbackMainDispatcher.cpp b/telldus-core/client/CallbackMainDispatcher.cpp new file mode 100644 index 00000000..fd02b1f0 --- /dev/null +++ b/telldus-core/client/CallbackMainDispatcher.cpp @@ -0,0 +1,159 @@ +/* + * CallbackMainDispatcher.cpp + * telldus-core + * + * Created by Stefan Persson on 2012-02-23. + * Copyright 2012 Telldus Technologies AB. All rights reserved. + * + */ + +#include "CallbackMainDispatcher.h" + +#include + +class CallbackMainDispatcher::PrivateData { +public: + TelldusCore::EventHandler eventHandler; + TelldusCore::EventRef stopEvent, generalCallbackEvent, janitor; + + std::list > deviceEventThreadList; + std::list > deviceChangeEventThreadList; + std::list > rawDeviceEventThreadList; + std::list > sensorEventThreadList; + + TelldusCore::Mutex mutex; +}; + +CallbackMainDispatcher::CallbackMainDispatcher() +:Thread() +{ + d = new PrivateData; + d->stopEvent = d->eventHandler.addEvent(); + d->generalCallbackEvent = d->eventHandler.addEvent(); + d->janitor = d->eventHandler.addEvent(); //Used for cleanups +} + +CallbackMainDispatcher::~CallbackMainDispatcher(void){ + d->stopEvent->signal(); + wait(); + { + TelldusCore::MutexLocker locker(&d->mutex); + } + delete d; +} + +TelldusCore::EventRef CallbackMainDispatcher::retrieveCallbackEvent(){ + return d->generalCallbackEvent; +} + +void CallbackMainDispatcher::run(){ + + while(!d->stopEvent->isSignaled()){ + if (!d->eventHandler.waitForAny()) { + continue; + } + + if(d->generalCallbackEvent->isSignaled()){ + TelldusCore::EventDataRef eventData = d->generalCallbackEvent->takeSignal(); + + DeviceEventCallbackData *decd = dynamic_cast(eventData.get()); + if(decd){ + std::tr1::shared_ptr ptr(new TelldusCore::TDDeviceEventDispatcher(decd->data, decd->deviceId, decd->deviceState, decd->deviceStateValue, d->janitor)); + TelldusCore::MutexLocker locker(&d->mutex); + d->deviceEventThreadList.push_back(ptr); + continue; + } + + DeviceChangeEventCallbackData *dcecd = dynamic_cast(eventData.get()); + if(dcecd){ + std::tr1::shared_ptr ptr(new TelldusCore::TDDeviceChangeEventDispatcher(dcecd->data, dcecd->deviceId, dcecd->eventDeviceChanges, dcecd->eventChangeType, d->janitor)); + TelldusCore::MutexLocker locker(&d->mutex); + d->deviceChangeEventThreadList.push_back(ptr); + continue; + } + + RawDeviceEventCallbackData *rdecd = dynamic_cast(eventData.get()); + if(rdecd){ + std::tr1::shared_ptr ptr(new TelldusCore::TDRawDeviceEventDispatcher(rdecd->data, rdecd->command, rdecd->controllerId, d->janitor)); + TelldusCore::MutexLocker locker(&d->mutex); + d->rawDeviceEventThreadList.push_back(ptr); + continue; + } + + SensorEventCallbackData *secd = dynamic_cast(eventData.get()); + if(secd){ + std::tr1::shared_ptr ptr(new TelldusCore::TDSensorEventDispatcher(secd->data, secd->protocol, secd->model, secd->id, secd->dataType, secd->value, secd->timestamp, d->janitor)); + TelldusCore::MutexLocker locker(&d->mutex); + d->sensorEventThreadList.push_back(ptr); + continue; + } + } + if (d->janitor->isSignaled()) { + //Clear all of them if there is more than one + while(d->janitor->isSignaled()) { + d->janitor->popSignal(); + } + this->cleanupCallbacks(); + } + } +} + +void CallbackMainDispatcher::cleanupCallbacks() { + bool again = false; + + //Device Event + do { + again = false; + TelldusCore::MutexLocker locker(&d->mutex); + std::list >::iterator it = d->deviceEventThreadList.begin(); + for (;it != d->deviceEventThreadList.end(); ++it) { + if ((*it)->done()) { + d->deviceEventThreadList.erase(it); + again = true; + break; + } + } + } while (again); + + //Device Change Event + do { + again = false; + TelldusCore::MutexLocker locker(&d->mutex); + std::list >::iterator it = d->deviceChangeEventThreadList.begin(); + for (;it != d->deviceChangeEventThreadList.end(); ++it) { + if ((*it)->done()) { + d->deviceChangeEventThreadList.erase(it); + again = true; + break; + } + } + } while (again); + + //Raw Device Event + do { + again = false; + TelldusCore::MutexLocker locker(&d->mutex); + std::list >::iterator it = d->rawDeviceEventThreadList.begin(); + for (;it != d->rawDeviceEventThreadList.end(); ++it) { + if ((*it)->done()) { + d->rawDeviceEventThreadList.erase(it); + again = true; + break; + } + } + } while (again); + + //Sensor Event + do { + again = false; + TelldusCore::MutexLocker locker(&d->mutex); + std::list >::iterator it = d->sensorEventThreadList.begin(); + for (;it != d->sensorEventThreadList.end(); ++it) { + if ((*it)->done()) { + d->sensorEventThreadList.erase(it); + again = true; + break; + } + } + } while (again); +} \ No newline at end of file diff --git a/telldus-core/client/CallbackMainDispatcher.h b/telldus-core/client/CallbackMainDispatcher.h new file mode 100644 index 00000000..39403233 --- /dev/null +++ b/telldus-core/client/CallbackMainDispatcher.h @@ -0,0 +1,69 @@ +/* + * CallbackMainDispatcher.h + * telldus-core + * + * Created by Stefan Persson on 2012-02-23. + * Copyright 2012 Telldus Technologies AB. All rights reserved. + * + */ + +#ifndef CALLBACKMAINDISPATCHER_H +#define CALLBACKMAINDISPATCHER_H + +#include "CallbackDispatcher.h" +#include "Thread.h" +#include "Event.h" +#include "EventHandler.h" + +class DeviceChangeEventCallbackData : public TelldusCore::EventDataBase { +public: + TelldusCore::CallbackStruct *data; + int deviceId; + int eventDeviceChanges; + int eventChangeType; +}; + +class DeviceEventCallbackData : public TelldusCore::EventDataBase { +public: + TelldusCore::CallbackStruct *data; + int deviceId; + int deviceState; + std::string deviceStateValue; +}; + +class RawDeviceEventCallbackData : public TelldusCore::EventDataBase { +public: + TelldusCore::CallbackStruct *data; + int controllerId; + std::string command; +}; + +class SensorEventCallbackData : public TelldusCore::EventDataBase { +public: + TelldusCore::CallbackStruct *data; + std::string protocol; + std::string model; + int id; + int dataType; + std::string value; + int timestamp; +}; + +class CallbackMainDispatcher : public TelldusCore::Thread +{ +public: + CallbackMainDispatcher(void); + ~CallbackMainDispatcher(void); + + TelldusCore::EventRef retrieveCallbackEvent(); + +protected: + void run(); + +private: + class PrivateData; + PrivateData *d; + void cleanupCallbacks(void); +}; + +#endif //CALLBACKMAINDISPATCHER_H diff --git a/telldus-core/client/Client.cpp b/telldus-core/client/Client.cpp index cf278757..65067e9e 100644 --- a/telldus-core/client/Client.cpp +++ b/telldus-core/client/Client.cpp @@ -1,17 +1,11 @@ #include "Client.h" #include "CallbackDispatcher.h" +#include "CallbackMainDispatcher.h" #include "Socket.h" #include "Strings.h" #include "Mutex.h" -#include "common.h" #include -#ifdef _WINDOWS -#include -#else -#include -#endif - using namespace TelldusCore; @@ -35,11 +29,8 @@ public: bool running, sensorCached; std::wstring sensorCache; TelldusCore::Mutex mutex; + CallbackMainDispatcher callbackMainDispatcher; - std::list > deviceEventThreadList; - std::list > deviceChangeEventThreadList; - std::list > rawDeviceEventThreadList; - std::list > sensorEventThreadList; }; Client *Client::instance = 0; @@ -51,6 +42,7 @@ Client::Client() d->lastCallbackId = 0; d->running = true; d->sensorCached = false; + d->callbackMainDispatcher.start(); start(); } @@ -80,32 +72,57 @@ Client *Client::getInstance() { void Client::callbackDeviceEvent(int deviceId, int deviceState, const std::wstring &deviceStateValue){ TelldusCore::MutexLocker locker(&d->mutex); for(DeviceEventList::iterator callback_it = d->deviceEventList.begin(); callback_it != d->deviceEventList.end(); ++callback_it) { - std::tr1::shared_ptr ptr(new TDDeviceEventDispatcher(*callback_it, deviceId, deviceState, TelldusCore::wideToString(deviceStateValue))); - d->deviceEventThreadList.push_back(ptr); + DeviceEventCallbackData *deviceEventCallbackData = new DeviceEventCallbackData(); + deviceEventCallbackData->data = *callback_it; + deviceEventCallbackData->deviceId = deviceId; + deviceEventCallbackData->deviceState = deviceState; + deviceEventCallbackData->deviceStateValue = TelldusCore::wideToString(deviceStateValue); + + d->callbackMainDispatcher.retrieveCallbackEvent()->signal(deviceEventCallbackData); } } void Client::callbackDeviceChangeEvent(int deviceId, int eventDeviceChanges, int eventChangeType){ + TelldusCore::MutexLocker locker(&d->mutex); for(DeviceChangeList::iterator callback_it = d->deviceChangeEventList.begin(); callback_it != d->deviceChangeEventList.end(); ++callback_it) { - std::tr1::shared_ptr ptr(new TDDeviceChangeEventDispatcher(*callback_it, deviceId, eventDeviceChanges, eventChangeType)); - d->deviceChangeEventThreadList.push_back(ptr); + DeviceChangeEventCallbackData *deviceChangeEventCallbackData = new DeviceChangeEventCallbackData(); + deviceChangeEventCallbackData->data = *callback_it; + deviceChangeEventCallbackData->deviceId = deviceId; + deviceChangeEventCallbackData->eventDeviceChanges = eventDeviceChanges; + deviceChangeEventCallbackData->eventChangeType = eventChangeType; + + d->callbackMainDispatcher.retrieveCallbackEvent()->signal(deviceChangeEventCallbackData); } } void Client::callbackRawEvent(std::wstring command, int controllerId) { + TelldusCore::MutexLocker locker(&d->mutex); for(RawDeviceEventList::iterator callback_it = d->rawDeviceEventList.begin(); callback_it != d->rawDeviceEventList.end(); ++callback_it) { - std::tr1::shared_ptr ptr(new TDRawDeviceEventDispatcher(*callback_it, TelldusCore::wideToString(command), controllerId)); - d->rawDeviceEventThreadList.push_back(ptr); + RawDeviceEventCallbackData *rawDeviceEventCallbackData = new RawDeviceEventCallbackData(); + rawDeviceEventCallbackData->data = *callback_it; + rawDeviceEventCallbackData->controllerId = controllerId; + rawDeviceEventCallbackData->command = TelldusCore::wideToString(command); + + d->callbackMainDispatcher.retrieveCallbackEvent()->signal(rawDeviceEventCallbackData); } } void Client::callbackSensorEvent(const std::wstring &protocol, const std::wstring &model, int id, int dataType, const std::wstring &value, int timestamp) { + TelldusCore::MutexLocker locker(&d->mutex); for(SensorEventList::iterator callback_it = d->sensorEventList.begin(); callback_it != d->sensorEventList.end(); ++callback_it) { - std::tr1::shared_ptr ptr(new TDSensorEventDispatcher(*callback_it, TelldusCore::wideToString(protocol), TelldusCore::wideToString(model), id, dataType, TelldusCore::wideToString(value), timestamp)); - d->sensorEventThreadList.push_back(ptr); + SensorEventCallbackData *sensorEventCallbackData = new SensorEventCallbackData(); + sensorEventCallbackData->data = *callback_it; + sensorEventCallbackData->protocol = TelldusCore::wideToString(protocol); + sensorEventCallbackData->model = TelldusCore::wideToString(model); + sensorEventCallbackData->id = id; + sensorEventCallbackData->dataType = dataType; + sensorEventCallbackData->value = TelldusCore::wideToString(value); + sensorEventCallbackData->timestamp = timestamp; + + d->callbackMainDispatcher.retrieveCallbackEvent()->signal(sensorEventCallbackData); } } @@ -186,7 +203,7 @@ void Client::run(){ } std::wstring clientMessage = d->eventSocket.read(1000); //testing 5 second timeout - + while(clientMessage != L""){ //a message arrived std::wstring type = Message::takeString(&clientMessage); @@ -220,72 +237,9 @@ void Client::run(){ clientMessage = L""; //cleanup, if message contained garbage/unhandled data } } - - //Clean up finished callbacks - this->cleanupCallbacks(); } } -void Client::cleanupCallbacks() { - bool again = false; - - //Device Event - do { - again = false; - TelldusCore::MutexLocker locker(&d->mutex); - std::list >::iterator it = d->deviceEventThreadList.begin(); - for (;it != d->deviceEventThreadList.end(); ++it) { - if ((*it)->done()) { - d->deviceEventThreadList.erase(it); - again = true; - break; - } - } - } while (again); - - //Device Change Event - do { - again = false; - TelldusCore::MutexLocker locker(&d->mutex); - std::list >::iterator it = d->deviceChangeEventThreadList.begin(); - for (;it != d->deviceChangeEventThreadList.end(); ++it) { - if ((*it)->done()) { - d->deviceChangeEventThreadList.erase(it); - again = true; - break; - } - } - } while (again); - - //Raw Device Event - do { - again = false; - TelldusCore::MutexLocker locker(&d->mutex); - std::list >::iterator it = d->rawDeviceEventThreadList.begin(); - for (;it != d->rawDeviceEventThreadList.end(); ++it) { - if ((*it)->done()) { - d->rawDeviceEventThreadList.erase(it); - again = true; - break; - } - } - } while (again); - - //Sensor Event - do { - again = false; - TelldusCore::MutexLocker locker(&d->mutex); - std::list >::iterator it = d->sensorEventThreadList.begin(); - for (;it != d->sensorEventThreadList.end(); ++it) { - if ((*it)->done()) { - d->sensorEventThreadList.erase(it); - again = true; - break; - } - } - } while (again); -} - std::wstring Client::sendToService(const Message &msg) { int tries = 0; diff --git a/telldus-core/client/Client.h b/telldus-core/client/Client.h index 4287f83c..8e7a128b 100644 --- a/telldus-core/client/Client.h +++ b/telldus-core/client/Client.h @@ -37,8 +37,7 @@ namespace TelldusCore { private: Client(); static std::wstring sendToService(const Message &msg); - void cleanupCallbacks(); - + class PrivateData; PrivateData *d; static Client *instance; diff --git a/telldus-core/common/Socket_win.cpp b/telldus-core/common/Socket_win.cpp index 5fe760e4..f71c84f7 100644 --- a/telldus-core/common/Socket_win.cpp +++ b/telldus-core/common/Socket_win.cpp @@ -95,7 +95,7 @@ std::wstring Socket::read(int timeout){ BOOL fSuccess = false; std::wstring returnString; bool moreData = true; - + while(moreData){ moreData = false; memset(&buf, 0, sizeof(buf)); @@ -117,7 +117,7 @@ std::wstring Socket::read(int timeout){ // Cancel, we still need to cleanup } fSuccess = GetOverlappedResult(d->hPipe, &oOverlap, &cbBytesRead, true); - + if (!fSuccess) { DWORD err = GetLastError();