Dispatching of events in own thread, to somewhat counter problems with long running events.
This commit is contained in:
parent
f34e1ad53f
commit
c049b97a64
8 changed files with 293 additions and 104 deletions
|
@ -1,17 +1,11 @@
|
|||
#include "Client.h"
|
||||
#include "CallbackDispatcher.h"
|
||||
#include "CallbackMainDispatcher.h"
|
||||
#include "Socket.h"
|
||||
#include "Strings.h"
|
||||
#include "Mutex.h"
|
||||
#include "common.h"
|
||||
|
||||
#include <list>
|
||||
#ifdef _WINDOWS
|
||||
#include <memory>
|
||||
#else
|
||||
#include <tr1/memory>
|
||||
#endif
|
||||
|
||||
|
||||
using namespace TelldusCore;
|
||||
|
||||
|
@ -35,11 +29,8 @@ public:
|
|||
bool running, sensorCached;
|
||||
std::wstring sensorCache;
|
||||
TelldusCore::Mutex mutex;
|
||||
CallbackMainDispatcher callbackMainDispatcher;
|
||||
|
||||
std::list<std::tr1::shared_ptr<TDDeviceEventDispatcher> > deviceEventThreadList;
|
||||
std::list<std::tr1::shared_ptr<TDDeviceChangeEventDispatcher> > deviceChangeEventThreadList;
|
||||
std::list<std::tr1::shared_ptr<TDRawDeviceEventDispatcher> > rawDeviceEventThreadList;
|
||||
std::list<std::tr1::shared_ptr<TDSensorEventDispatcher> > sensorEventThreadList;
|
||||
};
|
||||
|
||||
Client *Client::instance = 0;
|
||||
|
@ -51,6 +42,7 @@ Client::Client()
|
|||
d->lastCallbackId = 0;
|
||||
d->running = true;
|
||||
d->sensorCached = false;
|
||||
d->callbackMainDispatcher.start();
|
||||
start();
|
||||
}
|
||||
|
||||
|
@ -80,32 +72,57 @@ Client *Client::getInstance() {
|
|||
void Client::callbackDeviceEvent(int deviceId, int deviceState, const std::wstring &deviceStateValue){
|
||||
TelldusCore::MutexLocker locker(&d->mutex);
|
||||
for(DeviceEventList::iterator callback_it = d->deviceEventList.begin(); callback_it != d->deviceEventList.end(); ++callback_it) {
|
||||
std::tr1::shared_ptr<TDDeviceEventDispatcher> ptr(new TDDeviceEventDispatcher(*callback_it, deviceId, deviceState, TelldusCore::wideToString(deviceStateValue)));
|
||||
d->deviceEventThreadList.push_back(ptr);
|
||||
DeviceEventCallbackData *deviceEventCallbackData = new DeviceEventCallbackData();
|
||||
deviceEventCallbackData->data = *callback_it;
|
||||
deviceEventCallbackData->deviceId = deviceId;
|
||||
deviceEventCallbackData->deviceState = deviceState;
|
||||
deviceEventCallbackData->deviceStateValue = TelldusCore::wideToString(deviceStateValue);
|
||||
|
||||
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(deviceEventCallbackData);
|
||||
}
|
||||
}
|
||||
|
||||
void Client::callbackDeviceChangeEvent(int deviceId, int eventDeviceChanges, int eventChangeType){
|
||||
|
||||
TelldusCore::MutexLocker locker(&d->mutex);
|
||||
for(DeviceChangeList::iterator callback_it = d->deviceChangeEventList.begin(); callback_it != d->deviceChangeEventList.end(); ++callback_it) {
|
||||
std::tr1::shared_ptr<TDDeviceChangeEventDispatcher> ptr(new TDDeviceChangeEventDispatcher(*callback_it, deviceId, eventDeviceChanges, eventChangeType));
|
||||
d->deviceChangeEventThreadList.push_back(ptr);
|
||||
DeviceChangeEventCallbackData *deviceChangeEventCallbackData = new DeviceChangeEventCallbackData();
|
||||
deviceChangeEventCallbackData->data = *callback_it;
|
||||
deviceChangeEventCallbackData->deviceId = deviceId;
|
||||
deviceChangeEventCallbackData->eventDeviceChanges = eventDeviceChanges;
|
||||
deviceChangeEventCallbackData->eventChangeType = eventChangeType;
|
||||
|
||||
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(deviceChangeEventCallbackData);
|
||||
}
|
||||
}
|
||||
|
||||
void Client::callbackRawEvent(std::wstring command, int controllerId) {
|
||||
|
||||
TelldusCore::MutexLocker locker(&d->mutex);
|
||||
for(RawDeviceEventList::iterator callback_it = d->rawDeviceEventList.begin(); callback_it != d->rawDeviceEventList.end(); ++callback_it) {
|
||||
std::tr1::shared_ptr<TDRawDeviceEventDispatcher> ptr(new TDRawDeviceEventDispatcher(*callback_it, TelldusCore::wideToString(command), controllerId));
|
||||
d->rawDeviceEventThreadList.push_back(ptr);
|
||||
RawDeviceEventCallbackData *rawDeviceEventCallbackData = new RawDeviceEventCallbackData();
|
||||
rawDeviceEventCallbackData->data = *callback_it;
|
||||
rawDeviceEventCallbackData->controllerId = controllerId;
|
||||
rawDeviceEventCallbackData->command = TelldusCore::wideToString(command);
|
||||
|
||||
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(rawDeviceEventCallbackData);
|
||||
}
|
||||
}
|
||||
|
||||
void Client::callbackSensorEvent(const std::wstring &protocol, const std::wstring &model, int id, int dataType, const std::wstring &value, int timestamp) {
|
||||
|
||||
TelldusCore::MutexLocker locker(&d->mutex);
|
||||
for(SensorEventList::iterator callback_it = d->sensorEventList.begin(); callback_it != d->sensorEventList.end(); ++callback_it) {
|
||||
std::tr1::shared_ptr<TDSensorEventDispatcher> ptr(new TDSensorEventDispatcher(*callback_it, TelldusCore::wideToString(protocol), TelldusCore::wideToString(model), id, dataType, TelldusCore::wideToString(value), timestamp));
|
||||
d->sensorEventThreadList.push_back(ptr);
|
||||
SensorEventCallbackData *sensorEventCallbackData = new SensorEventCallbackData();
|
||||
sensorEventCallbackData->data = *callback_it;
|
||||
sensorEventCallbackData->protocol = TelldusCore::wideToString(protocol);
|
||||
sensorEventCallbackData->model = TelldusCore::wideToString(model);
|
||||
sensorEventCallbackData->id = id;
|
||||
sensorEventCallbackData->dataType = dataType;
|
||||
sensorEventCallbackData->value = TelldusCore::wideToString(value);
|
||||
sensorEventCallbackData->timestamp = timestamp;
|
||||
|
||||
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(sensorEventCallbackData);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -186,7 +203,7 @@ void Client::run(){
|
|||
}
|
||||
|
||||
std::wstring clientMessage = d->eventSocket.read(1000); //testing 5 second timeout
|
||||
|
||||
|
||||
while(clientMessage != L""){
|
||||
//a message arrived
|
||||
std::wstring type = Message::takeString(&clientMessage);
|
||||
|
@ -220,72 +237,9 @@ void Client::run(){
|
|||
clientMessage = L""; //cleanup, if message contained garbage/unhandled data
|
||||
}
|
||||
}
|
||||
|
||||
//Clean up finished callbacks
|
||||
this->cleanupCallbacks();
|
||||
}
|
||||
}
|
||||
|
||||
void Client::cleanupCallbacks() {
|
||||
bool again = false;
|
||||
|
||||
//Device Event
|
||||
do {
|
||||
again = false;
|
||||
TelldusCore::MutexLocker locker(&d->mutex);
|
||||
std::list<std::tr1::shared_ptr<TDDeviceEventDispatcher> >::iterator it = d->deviceEventThreadList.begin();
|
||||
for (;it != d->deviceEventThreadList.end(); ++it) {
|
||||
if ((*it)->done()) {
|
||||
d->deviceEventThreadList.erase(it);
|
||||
again = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} while (again);
|
||||
|
||||
//Device Change Event
|
||||
do {
|
||||
again = false;
|
||||
TelldusCore::MutexLocker locker(&d->mutex);
|
||||
std::list<std::tr1::shared_ptr<TDDeviceChangeEventDispatcher> >::iterator it = d->deviceChangeEventThreadList.begin();
|
||||
for (;it != d->deviceChangeEventThreadList.end(); ++it) {
|
||||
if ((*it)->done()) {
|
||||
d->deviceChangeEventThreadList.erase(it);
|
||||
again = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} while (again);
|
||||
|
||||
//Raw Device Event
|
||||
do {
|
||||
again = false;
|
||||
TelldusCore::MutexLocker locker(&d->mutex);
|
||||
std::list<std::tr1::shared_ptr<TDRawDeviceEventDispatcher> >::iterator it = d->rawDeviceEventThreadList.begin();
|
||||
for (;it != d->rawDeviceEventThreadList.end(); ++it) {
|
||||
if ((*it)->done()) {
|
||||
d->rawDeviceEventThreadList.erase(it);
|
||||
again = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} while (again);
|
||||
|
||||
//Sensor Event
|
||||
do {
|
||||
again = false;
|
||||
TelldusCore::MutexLocker locker(&d->mutex);
|
||||
std::list<std::tr1::shared_ptr<TDSensorEventDispatcher> >::iterator it = d->sensorEventThreadList.begin();
|
||||
for (;it != d->sensorEventThreadList.end(); ++it) {
|
||||
if ((*it)->done()) {
|
||||
d->sensorEventThreadList.erase(it);
|
||||
again = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} while (again);
|
||||
}
|
||||
|
||||
std::wstring Client::sendToService(const Message &msg) {
|
||||
|
||||
int tries = 0;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue