Rewrite our callback dispatcher. Instead of dynamically create new

threads for each callback they are now precreated when they are
registered. Each callback will now be called using the same thread each
time.
This commit is contained in:
Micke Prag 2014-01-08 11:45:19 +01:00
parent fbaa8ccbbd
commit f7336b34f9
5 changed files with 197 additions and 150 deletions

View file

@ -8,64 +8,136 @@
*/
#include "client/CallbackDispatcher.h"
#include "common/Event.h"
#include "common/EventHandler.h"
namespace TelldusCore {
TDEventDispatcher::TDEventDispatcher(EventDataRef cbd, CallbackStruct *cb, EventRef cbDone)
:Thread(), doneRunning(false), callbackData(cbd), callback(cb), callbackExecuted(cbDone) {
this->startAndLock(&callback->mutex);
class TDEventDispatcher::PrivateData {
public:
EventHandler eventHandler;
EventRef stopEvent, callbackEvent;
int id;
void *func, *context;
};
TDEventDispatcher::TDEventDispatcher(int id, void *func, void *context) //EventDataRef cbd, CallbackStruct *cb, EventRef cbDone)
:Thread() {
d = new PrivateData;
d->stopEvent = d->eventHandler.addEvent();
d->callbackEvent = d->eventHandler.addEvent();
d->id = id;
d->func = func;
d->context = context;
this->start();
}
TDEventDispatcher::~TDEventDispatcher() {
printf("Stopping dispatcher\n");
d->stopEvent->signal();
this->wait();
delete d;
}
bool TDEventDispatcher::done() const {
return doneRunning;
int TDEventDispatcher::id() const {
return d->id;
}
void TDEventDispatcher::queue(EventDataRef eventData) {
d->callbackEvent->signal(eventData);
}
void TDEventDispatcher::run() {
this->fireEvent();
doneRunning = true;
callbackExecuted->signal();
}
void TDEventDispatcher::fireEvent() {
if (callback->type == CallbackStruct::DeviceEvent) {
DeviceEventCallbackData *data = dynamic_cast<DeviceEventCallbackData *>(callbackData.get());
if (!data) {
return;
while (true) {
d->eventHandler.waitForAny();
if (d->stopEvent->isSignaled()) {
break;
}
((TDDeviceEvent)callback->event)(data->deviceId, data->deviceState, data->deviceStateValue.c_str(), callback->id, callback->context);
} else if (callback->type == CallbackStruct::DeviceChangeEvent) {
DeviceChangeEventCallbackData *data = dynamic_cast<DeviceChangeEventCallbackData *>(callbackData.get());
if (!data) {
return;
if (d->callbackEvent->isSignaled()) {
TelldusCore::EventDataRef eventData = d->callbackEvent->takeSignal();
this->execute(eventData);
}
((TDDeviceChangeEvent)callback->event)(data->deviceId, data->changeEvent, data->changeType, callback->id, callback->context);
} else if (callback->type == CallbackStruct::RawDeviceEvent) {
RawDeviceEventCallbackData *data = dynamic_cast<RawDeviceEventCallbackData *>(callbackData.get());
if (!data) {
return;
}
((TDRawDeviceEvent)callback->event)(data->data.c_str(), data->controllerId, callback->id, callback->context);
} else if (callback->type == CallbackStruct::SensorEvent) {
SensorEventCallbackData *data = dynamic_cast<SensorEventCallbackData *>(callbackData.get());
if (!data) {
return;
}
((TDSensorEvent)callback->event)(data->protocol.c_str(), data->model.c_str(), data->id, data->dataType, data->value.c_str(), data->timestamp, callback->id, callback->context);
} else if (callback->type == CallbackStruct::ControllerEvent) {
ControllerEventCallbackData *data = dynamic_cast<ControllerEventCallbackData *>(callbackData.get());
if (!data) {
return;
}
((TDControllerEvent)callback->event)(data->controllerId, data->changeEvent, data->changeType, data->newValue.c_str(), callback->id, callback->context);
}
}
TDDeviceEventDispatcher::TDDeviceEventDispatcher(int id, void *func, void *context)
:TDEventDispatcher(id, func, context)
{}
void TDDeviceEventDispatcher::execute(EventDataRef eventData) {
DeviceEventCallbackData *data = dynamic_cast<DeviceEventCallbackData *>(eventData.get());
if (!data) {
return;
}
((TDDeviceEvent)d->func)(data->deviceId, data->deviceState, data->deviceStateValue.c_str(), d->id, d->context);
}
CallbackStruct::CallbackType TDDeviceEventDispatcher::type() {
return CallbackStruct::DeviceEvent;
}
TDDeviceChangeEventDispatcher::TDDeviceChangeEventDispatcher(int id, void *func, void *context)
:TDEventDispatcher(id, func, context)
{}
void TDDeviceChangeEventDispatcher::execute(EventDataRef eventData) {
DeviceChangeEventCallbackData *data = dynamic_cast<DeviceChangeEventCallbackData *>(eventData.get());
if (!data) {
return;
}
((TDDeviceChangeEvent)d->func)(data->deviceId, data->changeEvent, data->changeType, d->id, d->context);
}
CallbackStruct::CallbackType TDDeviceChangeEventDispatcher::type() {
return CallbackStruct::DeviceChangeEvent;
}
TDRawDeviceEventDispatcher::TDRawDeviceEventDispatcher(int id, void *func, void *context)
:TDEventDispatcher(id, func, context)
{}
void TDRawDeviceEventDispatcher::execute(EventDataRef eventData) {
RawDeviceEventCallbackData *data = dynamic_cast<RawDeviceEventCallbackData *>(eventData.get());
if (!data) {
return;
}
((TDRawDeviceEvent)d->func)(data->data.c_str(), data->controllerId, d->id, d->context);
}
CallbackStruct::CallbackType TDRawDeviceEventDispatcher::type() {
return CallbackStruct::RawDeviceEvent;
}
TDSensorEventDispatcher::TDSensorEventDispatcher(int id, void *func, void *context)
:TDEventDispatcher(id, func, context)
{}
void TDSensorEventDispatcher::execute(EventDataRef eventData) {
SensorEventCallbackData *data = dynamic_cast<SensorEventCallbackData *>(eventData.get());
if (!data) {
return;
}
((TDSensorEvent)d->func)(data->protocol.c_str(), data->model.c_str(), data->id, data->dataType, data->value.c_str(), data->timestamp, d->id, d->context);
}
CallbackStruct::CallbackType TDSensorEventDispatcher::type() {
return CallbackStruct::SensorEvent;
}
TDControllerEventDispatcher::TDControllerEventDispatcher(int id, void *func, void *context)
:TDEventDispatcher(id, func, context)
{}
void TDControllerEventDispatcher::execute(EventDataRef eventData) {
ControllerEventCallbackData *data = dynamic_cast<ControllerEventCallbackData *>(eventData.get());
if (!data) {
return;
}
((TDControllerEvent)d->func)(data->controllerId, data->changeEvent, data->changeType, data->newValue.c_str(), d->id, d->context);
}
CallbackStruct::CallbackType TDControllerEventDispatcher::type() {
return CallbackStruct::ControllerEvent;
}
} // namespace TelldusCore

View file

@ -19,12 +19,6 @@
namespace TelldusCore {
/*template <typename T> struct CallbackStruct {
T event;
int id;
void *context;
TelldusCore::Mutex mutex;
};*/
struct CallbackStruct {
enum CallbackType { DeviceEvent, DeviceChangeEvent, RawDeviceEvent, SensorEvent, ControllerEvent };
CallbackType type;
@ -33,9 +27,6 @@ namespace TelldusCore {
void *context;
TelldusCore::Mutex mutex;
};
/*typedef CallbackStruct<TDDeviceChangeEvent> DeviceChangeEvent;
typedef CallbackStruct<TDRawDeviceEvent> RawDeviceEvent;
typedef CallbackStruct<TDSensorEvent> SensorEvent;*/
class CallbackData: public EventDataBase {
public:
@ -86,17 +77,52 @@ namespace TelldusCore {
class TDEventDispatcher : public Thread {
public:
TDEventDispatcher(EventDataRef callbackData, CallbackStruct *callback, TelldusCore::EventRef cbDone);
TDEventDispatcher(int id, void *func, void *context);
virtual ~TDEventDispatcher();
bool done() const;
int id() const;
void queue(EventDataRef eventData);
virtual CallbackStruct::CallbackType type() = 0;
protected:
class PrivateData;
PrivateData *d;
virtual void run();
bool doneRunning;
private:
void fireEvent();
EventDataRef callbackData;
CallbackStruct *callback;
EventRef callbackExecuted;
virtual void execute(EventDataRef eventData) = 0;
};
class TDDeviceEventDispatcher : public TDEventDispatcher {
public:
TDDeviceEventDispatcher(int id, void *func, void *context);
virtual CallbackStruct::CallbackType type();
protected:
virtual void execute(EventDataRef eventData);
};
class TDDeviceChangeEventDispatcher : public TDEventDispatcher {
public:
TDDeviceChangeEventDispatcher(int id, void *func, void *context);
virtual CallbackStruct::CallbackType type();
protected:
virtual void execute(EventDataRef eventData);
};
class TDRawDeviceEventDispatcher : public TDEventDispatcher {
public:
TDRawDeviceEventDispatcher(int id, void *func, void *context);
virtual CallbackStruct::CallbackType type();
protected:
virtual void execute(EventDataRef eventData);
};
class TDSensorEventDispatcher : public TDEventDispatcher {
public:
TDSensorEventDispatcher(int id, void *func, void *context);
virtual CallbackStruct::CallbackType type();
protected:
virtual void execute(EventDataRef eventData);
};
class TDControllerEventDispatcher : public TDEventDispatcher {
public:
TDControllerEventDispatcher(int id, void *func, void *context);
virtual CallbackStruct::CallbackType type();
protected:
virtual void execute(EventDataRef eventData);
};
}

View file

@ -13,52 +13,60 @@
namespace TelldusCore {
typedef std::list<CallbackStruct *> CallbackList;
typedef std::list<TelldusCore::TDEventDispatcher *> CallbackList;
class CallbackMainDispatcher::PrivateData {
public:
EventHandler eventHandler;
EventRef stopEvent, generalCallbackEvent, janitor;
Mutex mutex;
std::list<std::tr1::shared_ptr<TelldusCore::TDEventDispatcher> > eventThreadList;
CallbackList callbackList;
int lastCallbackId;
};
CallbackMainDispatcher::CallbackMainDispatcher()
:Thread() {
CallbackMainDispatcher::CallbackMainDispatcher() {
d = new PrivateData;
d->stopEvent = d->eventHandler.addEvent();
d->generalCallbackEvent = d->eventHandler.addEvent();
d->janitor = d->eventHandler.addEvent(); // Used for cleanups
d->lastCallbackId = 0;
}
CallbackMainDispatcher::~CallbackMainDispatcher(void) {
d->stopEvent->signal();
wait();
{
MutexLocker locker(&d->mutex);
TelldusCore::MutexLocker locker(&d->mutex);
for(CallbackList::iterator it = d->callbackList.begin(); it != d->callbackList.end(); ++it) {
delete (*it);
}
}
delete d;
}
EventRef CallbackMainDispatcher::retrieveCallbackEvent() {
return d->generalCallbackEvent;
void CallbackMainDispatcher::execute(CallbackStruct::CallbackType type, EventData *eventData) {
{
TelldusCore::MutexLocker locker(&d->mutex);
EventDataRef eventDataRef(eventData);
for(CallbackList::iterator callback_it = d->callbackList.begin(); callback_it != d->callbackList.end(); ++callback_it) {
if ( (*callback_it)->type() != type ) {
continue;
}
(*callback_it)->queue(eventDataRef);
}
}
}
int CallbackMainDispatcher::registerCallback(CallbackStruct::CallbackType type, void *eventFunction, void *context) {
TelldusCore::MutexLocker locker(&d->mutex);
int id = ++d->lastCallbackId;
CallbackStruct *callback = new CallbackStruct;
callback->type = type;
callback->event = eventFunction;
callback->id = id;
callback->context = context;
TelldusCore::TDEventDispatcher *callback;
if (type == CallbackStruct::DeviceEvent) {
callback = new TelldusCore::TDDeviceEventDispatcher(id, eventFunction, context);
} else if (type == CallbackStruct::DeviceChangeEvent) {
callback = new TelldusCore::TDDeviceChangeEventDispatcher(id, eventFunction, context);
} else if (type == CallbackStruct::RawDeviceEvent) {
callback = new TelldusCore::TDRawDeviceEventDispatcher(id, eventFunction, context);
} else if (type == CallbackStruct::SensorEvent) {
callback = new TelldusCore::TDSensorEventDispatcher(id, eventFunction, context);
} else if (type == CallbackStruct::ControllerEvent) {
callback = new TelldusCore::TDControllerEventDispatcher(id, eventFunction, context);
} else {
return -1;
}
d->callbackList.push_back(callback);
return id;
}
@ -68,7 +76,7 @@ int CallbackMainDispatcher::unregisterCallback(int callbackId) {
{
TelldusCore::MutexLocker locker(&d->mutex);
for(CallbackList::iterator callback_it = d->callbackList.begin(); callback_it != d->callbackList.end(); ++callback_it) {
if ( (*callback_it)->id != callbackId ) {
if ( (*callback_it)->id() != callbackId ) {
continue;
}
newEventList.splice(newEventList.begin(), d->callbackList, callback_it);
@ -77,9 +85,6 @@ int CallbackMainDispatcher::unregisterCallback(int callbackId) {
}
if (newEventList.size()) {
CallbackList::iterator it = newEventList.begin();
{ // Lock and unlock to make sure no one else uses the object
TelldusCore::MutexLocker locker( &(*it)->mutex );
}
delete (*it);
newEventList.erase(it);
return TELLSTICK_SUCCESS;
@ -87,53 +92,4 @@ int CallbackMainDispatcher::unregisterCallback(int callbackId) {
return TELLSTICK_ERROR_NOT_FOUND;
}
void CallbackMainDispatcher::run() {
while(!d->stopEvent->isSignaled()) {
if (!d->eventHandler.waitForAny()) {
continue;
}
if(d->generalCallbackEvent->isSignaled()) {
EventDataRef eventData = d->generalCallbackEvent->takeSignal();
CallbackData *cbd = dynamic_cast<CallbackData *>(eventData.get());
if (!cbd) {
continue;
}
TelldusCore::MutexLocker locker(&d->mutex);
for(CallbackList::iterator callback_it = d->callbackList.begin(); callback_it != d->callbackList.end(); ++callback_it) {
if ( (*callback_it)->type == cbd->type ) {
std::tr1::shared_ptr<TelldusCore::TDEventDispatcher> ptr(new TelldusCore::TDEventDispatcher(eventData, *callback_it, d->janitor));
d->eventThreadList.push_back(ptr);
}
}
}
if (d->janitor->isSignaled()) {
// Clear all of them if there is more than one
while(d->janitor->isSignaled()) {
d->janitor->popSignal();
}
this->cleanupCallbacks();
}
}
}
void CallbackMainDispatcher::cleanupCallbacks() {
bool again = false;
// Device Event
do {
again = false;
MutexLocker locker(&d->mutex);
std::list<std::tr1::shared_ptr<TDEventDispatcher> >::iterator it = d->eventThreadList.begin();
for (; it != d->eventThreadList.end(); ++it) {
if ((*it)->done()) {
d->eventThreadList.erase(it);
again = true;
break;
}
}
} while (again);
}
} // namespace TelldusCore

View file

@ -12,29 +12,23 @@
#include "client/CallbackDispatcher.h"
#include "common/Thread.h"
#include "common/Event.h"
#include "common/EventHandler.h"
namespace TelldusCore {
class CallbackMainDispatcher : public Thread
class CallbackMainDispatcher
{
public:
CallbackMainDispatcher(void);
~CallbackMainDispatcher(void);
EventRef retrieveCallbackEvent();
void execute(TelldusCore::CallbackStruct::CallbackType type, EventData *eventData);
int registerCallback( TelldusCore::CallbackStruct::CallbackType type, void *eventFunction, void *context );
int unregisterCallback( int callbackId );
protected:
void run();
private:
class PrivateData;
PrivateData *d;
void cleanupCallbacks(void);
};
}

View file

@ -33,7 +33,6 @@ Client::Client()
d->running = true;
d->sensorCached = false;
d->controllerCached = false;
d->callbackMainDispatcher.start();
start();
}
@ -105,20 +104,20 @@ void Client::run() {
data->deviceId = Message::takeInt(&clientMessage);
data->changeEvent = Message::takeInt(&clientMessage);
data->changeType = Message::takeInt(&clientMessage);
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(data);
d->callbackMainDispatcher.execute(CallbackStruct::DeviceChangeEvent, data);
} else if(type == L"TDDeviceEvent") {
DeviceEventCallbackData *data = new DeviceEventCallbackData();
data->deviceId = Message::takeInt(&clientMessage);
data->deviceState = Message::takeInt(&clientMessage);
data->deviceStateValue = TelldusCore::wideToString(Message::takeString(&clientMessage));
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(data);
d->callbackMainDispatcher.execute(CallbackStruct::DeviceEvent, data);
} else if(type == L"TDRawDeviceEvent") {
RawDeviceEventCallbackData *data = new RawDeviceEventCallbackData();
data->data = TelldusCore::wideToString(Message::takeString(&clientMessage));
data->controllerId = Message::takeInt(&clientMessage);
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(data);
d->callbackMainDispatcher.execute(CallbackStruct::RawDeviceEvent, data);
} else if(type == L"TDSensorEvent") {
SensorEventCallbackData *data = new SensorEventCallbackData();
@ -128,7 +127,7 @@ void Client::run() {
data->dataType = Message::takeInt(&clientMessage);
data->value = TelldusCore::wideToString(Message::takeString(&clientMessage));
data->timestamp = Message::takeInt(&clientMessage);
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(data);
d->callbackMainDispatcher.execute(CallbackStruct::SensorEvent, data);
} else if(type == L"TDControllerEvent") {
ControllerEventCallbackData *data = new ControllerEventCallbackData();
@ -136,7 +135,7 @@ void Client::run() {
data->changeEvent = Message::takeInt(&clientMessage);
data->changeType = Message::takeInt(&clientMessage);
data->newValue = TelldusCore::wideToString(Message::takeString(&clientMessage));
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(data);
d->callbackMainDispatcher.execute(CallbackStruct::ControllerEvent, data);
} else {
clientMessage = L""; // cleanup, if message contained garbage/unhandled data