Merge branch 'callbackdispatcher-rewrite'

This commit is contained in:
Micke Prag 2014-02-04 11:10:03 +01:00
commit 1ea1cc5add
7 changed files with 200 additions and 152 deletions

View file

@ -8,64 +8,132 @@
*/
#include "client/CallbackDispatcher.h"
#include "common/Event.h"
#include "common/EventHandler.h"
namespace TelldusCore {
TDEventDispatcher::TDEventDispatcher(EventDataRef cbd, CallbackStruct *cb, EventRef cbDone)
:Thread(), doneRunning(false), callbackData(cbd), callback(cb), callbackExecuted(cbDone) {
this->startAndLock(&callback->mutex);
class TDEventDispatcher::PrivateData {
public:
EventHandler eventHandler;
EventRef stopEvent, callbackEvent;
int id;
void *func, *context;
};
TDEventDispatcher::TDEventDispatcher(int id, void *func, void *context)
:Thread() {
d = new PrivateData;
d->stopEvent = d->eventHandler.addEvent();
d->callbackEvent = d->eventHandler.addEvent();
d->id = id;
d->func = func;
d->context = context;
this->start();
}
TDEventDispatcher::~TDEventDispatcher() {
d->stopEvent->signal();
this->wait();
delete d;
}
bool TDEventDispatcher::done() const {
return doneRunning;
int TDEventDispatcher::id() const {
return d->id;
}
void TDEventDispatcher::queue(EventDataRef eventData) {
d->callbackEvent->signal(eventData);
}
void TDEventDispatcher::run() {
this->fireEvent();
doneRunning = true;
callbackExecuted->signal();
}
void TDEventDispatcher::fireEvent() {
if (callback->type == CallbackStruct::DeviceEvent) {
DeviceEventCallbackData *data = dynamic_cast<DeviceEventCallbackData *>(callbackData.get());
if (!data) {
return;
while (!d->stopEvent->isSignaled()) {
d->eventHandler.waitForAny();
if (d->callbackEvent->isSignaled()) {
TelldusCore::EventDataRef eventData = d->callbackEvent->takeSignal();
this->execute(eventData);
}
((TDDeviceEvent)callback->event)(data->deviceId, data->deviceState, data->deviceStateValue.c_str(), callback->id, callback->context);
} else if (callback->type == CallbackStruct::DeviceChangeEvent) {
DeviceChangeEventCallbackData *data = dynamic_cast<DeviceChangeEventCallbackData *>(callbackData.get());
if (!data) {
return;
}
((TDDeviceChangeEvent)callback->event)(data->deviceId, data->changeEvent, data->changeType, callback->id, callback->context);
} else if (callback->type == CallbackStruct::RawDeviceEvent) {
RawDeviceEventCallbackData *data = dynamic_cast<RawDeviceEventCallbackData *>(callbackData.get());
if (!data) {
return;
}
((TDRawDeviceEvent)callback->event)(data->data.c_str(), data->controllerId, callback->id, callback->context);
} else if (callback->type == CallbackStruct::SensorEvent) {
SensorEventCallbackData *data = dynamic_cast<SensorEventCallbackData *>(callbackData.get());
if (!data) {
return;
}
((TDSensorEvent)callback->event)(data->protocol.c_str(), data->model.c_str(), data->id, data->dataType, data->value.c_str(), data->timestamp, callback->id, callback->context);
} else if (callback->type == CallbackStruct::ControllerEvent) {
ControllerEventCallbackData *data = dynamic_cast<ControllerEventCallbackData *>(callbackData.get());
if (!data) {
return;
}
((TDControllerEvent)callback->event)(data->controllerId, data->changeEvent, data->changeType, data->newValue.c_str(), callback->id, callback->context);
}
}
TDDeviceEventDispatcher::TDDeviceEventDispatcher(int id, void *func, void *context)
:TDEventDispatcher(id, func, context)
{}
void TDDeviceEventDispatcher::execute(EventDataRef eventData) {
DeviceEventCallbackData *data = dynamic_cast<DeviceEventCallbackData *>(eventData.get());
if (!data) {
return;
}
((TDDeviceEvent)d->func)(data->deviceId, data->deviceState, data->deviceStateValue.c_str(), d->id, d->context);
}
CallbackStruct::CallbackType TDDeviceEventDispatcher::type() {
return CallbackStruct::DeviceEvent;
}
TDDeviceChangeEventDispatcher::TDDeviceChangeEventDispatcher(int id, void *func, void *context)
:TDEventDispatcher(id, func, context)
{}
void TDDeviceChangeEventDispatcher::execute(EventDataRef eventData) {
DeviceChangeEventCallbackData *data = dynamic_cast<DeviceChangeEventCallbackData *>(eventData.get());
if (!data) {
return;
}
((TDDeviceChangeEvent)d->func)(data->deviceId, data->changeEvent, data->changeType, d->id, d->context);
}
CallbackStruct::CallbackType TDDeviceChangeEventDispatcher::type() {
return CallbackStruct::DeviceChangeEvent;
}
TDRawDeviceEventDispatcher::TDRawDeviceEventDispatcher(int id, void *func, void *context)
:TDEventDispatcher(id, func, context)
{}
void TDRawDeviceEventDispatcher::execute(EventDataRef eventData) {
RawDeviceEventCallbackData *data = dynamic_cast<RawDeviceEventCallbackData *>(eventData.get());
if (!data) {
return;
}
((TDRawDeviceEvent)d->func)(data->data.c_str(), data->controllerId, d->id, d->context);
}
CallbackStruct::CallbackType TDRawDeviceEventDispatcher::type() {
return CallbackStruct::RawDeviceEvent;
}
TDSensorEventDispatcher::TDSensorEventDispatcher(int id, void *func, void *context)
:TDEventDispatcher(id, func, context)
{}
void TDSensorEventDispatcher::execute(EventDataRef eventData) {
SensorEventCallbackData *data = dynamic_cast<SensorEventCallbackData *>(eventData.get());
if (!data) {
return;
}
((TDSensorEvent)d->func)(data->protocol.c_str(), data->model.c_str(), data->id, data->dataType, data->value.c_str(), data->timestamp, d->id, d->context);
}
CallbackStruct::CallbackType TDSensorEventDispatcher::type() {
return CallbackStruct::SensorEvent;
}
TDControllerEventDispatcher::TDControllerEventDispatcher(int id, void *func, void *context)
:TDEventDispatcher(id, func, context)
{}
void TDControllerEventDispatcher::execute(EventDataRef eventData) {
ControllerEventCallbackData *data = dynamic_cast<ControllerEventCallbackData *>(eventData.get());
if (!data) {
return;
}
((TDControllerEvent)d->func)(data->controllerId, data->changeEvent, data->changeType, data->newValue.c_str(), d->id, d->context);
}
CallbackStruct::CallbackType TDControllerEventDispatcher::type() {
return CallbackStruct::ControllerEvent;
}
} // namespace TelldusCore

View file

@ -19,12 +19,6 @@
namespace TelldusCore {
/*template <typename T> struct CallbackStruct {
T event;
int id;
void *context;
TelldusCore::Mutex mutex;
};*/
struct CallbackStruct {
enum CallbackType { DeviceEvent, DeviceChangeEvent, RawDeviceEvent, SensorEvent, ControllerEvent };
CallbackType type;
@ -33,9 +27,6 @@ namespace TelldusCore {
void *context;
TelldusCore::Mutex mutex;
};
/*typedef CallbackStruct<TDDeviceChangeEvent> DeviceChangeEvent;
typedef CallbackStruct<TDRawDeviceEvent> RawDeviceEvent;
typedef CallbackStruct<TDSensorEvent> SensorEvent;*/
class CallbackData: public EventDataBase {
public:
@ -86,17 +77,52 @@ namespace TelldusCore {
class TDEventDispatcher : public Thread {
public:
TDEventDispatcher(EventDataRef callbackData, CallbackStruct *callback, TelldusCore::EventRef cbDone);
TDEventDispatcher(int id, void *func, void *context);
virtual ~TDEventDispatcher();
bool done() const;
int id() const;
void queue(EventDataRef eventData);
virtual CallbackStruct::CallbackType type() = 0;
protected:
class PrivateData;
PrivateData *d;
virtual void run();
bool doneRunning;
private:
void fireEvent();
EventDataRef callbackData;
CallbackStruct *callback;
EventRef callbackExecuted;
virtual void execute(EventDataRef eventData) = 0;
};
class TDDeviceEventDispatcher : public TDEventDispatcher {
public:
TDDeviceEventDispatcher(int id, void *func, void *context);
virtual CallbackStruct::CallbackType type();
protected:
virtual void execute(EventDataRef eventData);
};
class TDDeviceChangeEventDispatcher : public TDEventDispatcher {
public:
TDDeviceChangeEventDispatcher(int id, void *func, void *context);
virtual CallbackStruct::CallbackType type();
protected:
virtual void execute(EventDataRef eventData);
};
class TDRawDeviceEventDispatcher : public TDEventDispatcher {
public:
TDRawDeviceEventDispatcher(int id, void *func, void *context);
virtual CallbackStruct::CallbackType type();
protected:
virtual void execute(EventDataRef eventData);
};
class TDSensorEventDispatcher : public TDEventDispatcher {
public:
TDSensorEventDispatcher(int id, void *func, void *context);
virtual CallbackStruct::CallbackType type();
protected:
virtual void execute(EventDataRef eventData);
};
class TDControllerEventDispatcher : public TDEventDispatcher {
public:
TDControllerEventDispatcher(int id, void *func, void *context);
virtual CallbackStruct::CallbackType type();
protected:
virtual void execute(EventDataRef eventData);
};
}

View file

@ -13,52 +13,60 @@
namespace TelldusCore {
typedef std::list<CallbackStruct *> CallbackList;
typedef std::list<TelldusCore::TDEventDispatcher *> CallbackList;
class CallbackMainDispatcher::PrivateData {
public:
EventHandler eventHandler;
EventRef stopEvent, generalCallbackEvent, janitor;
Mutex mutex;
std::list<std::tr1::shared_ptr<TelldusCore::TDEventDispatcher> > eventThreadList;
CallbackList callbackList;
int lastCallbackId;
};
CallbackMainDispatcher::CallbackMainDispatcher()
:Thread() {
CallbackMainDispatcher::CallbackMainDispatcher() {
d = new PrivateData;
d->stopEvent = d->eventHandler.addEvent();
d->generalCallbackEvent = d->eventHandler.addEvent();
d->janitor = d->eventHandler.addEvent(); // Used for cleanups
d->lastCallbackId = 0;
}
CallbackMainDispatcher::~CallbackMainDispatcher(void) {
d->stopEvent->signal();
wait();
{
MutexLocker locker(&d->mutex);
TelldusCore::MutexLocker locker(&d->mutex);
for(CallbackList::iterator it = d->callbackList.begin(); it != d->callbackList.end(); ++it) {
delete (*it);
}
}
delete d;
}
EventRef CallbackMainDispatcher::retrieveCallbackEvent() {
return d->generalCallbackEvent;
void CallbackMainDispatcher::execute(CallbackStruct::CallbackType type, EventData *eventData) {
{
TelldusCore::MutexLocker locker(&d->mutex);
EventDataRef eventDataRef(eventData);
for(CallbackList::iterator callback_it = d->callbackList.begin(); callback_it != d->callbackList.end(); ++callback_it) {
if ( (*callback_it)->type() != type ) {
continue;
}
(*callback_it)->queue(eventDataRef);
}
}
}
int CallbackMainDispatcher::registerCallback(CallbackStruct::CallbackType type, void *eventFunction, void *context) {
TelldusCore::MutexLocker locker(&d->mutex);
int id = ++d->lastCallbackId;
CallbackStruct *callback = new CallbackStruct;
callback->type = type;
callback->event = eventFunction;
callback->id = id;
callback->context = context;
TelldusCore::TDEventDispatcher *callback;
if (type == CallbackStruct::DeviceEvent) {
callback = new TelldusCore::TDDeviceEventDispatcher(id, eventFunction, context);
} else if (type == CallbackStruct::DeviceChangeEvent) {
callback = new TelldusCore::TDDeviceChangeEventDispatcher(id, eventFunction, context);
} else if (type == CallbackStruct::RawDeviceEvent) {
callback = new TelldusCore::TDRawDeviceEventDispatcher(id, eventFunction, context);
} else if (type == CallbackStruct::SensorEvent) {
callback = new TelldusCore::TDSensorEventDispatcher(id, eventFunction, context);
} else if (type == CallbackStruct::ControllerEvent) {
callback = new TelldusCore::TDControllerEventDispatcher(id, eventFunction, context);
} else {
return -1;
}
d->callbackList.push_back(callback);
return id;
}
@ -68,7 +76,7 @@ int CallbackMainDispatcher::unregisterCallback(int callbackId) {
{
TelldusCore::MutexLocker locker(&d->mutex);
for(CallbackList::iterator callback_it = d->callbackList.begin(); callback_it != d->callbackList.end(); ++callback_it) {
if ( (*callback_it)->id != callbackId ) {
if ( (*callback_it)->id() != callbackId ) {
continue;
}
newEventList.splice(newEventList.begin(), d->callbackList, callback_it);
@ -77,9 +85,6 @@ int CallbackMainDispatcher::unregisterCallback(int callbackId) {
}
if (newEventList.size()) {
CallbackList::iterator it = newEventList.begin();
{ // Lock and unlock to make sure no one else uses the object
TelldusCore::MutexLocker locker( &(*it)->mutex );
}
delete (*it);
newEventList.erase(it);
return TELLSTICK_SUCCESS;
@ -87,53 +92,4 @@ int CallbackMainDispatcher::unregisterCallback(int callbackId) {
return TELLSTICK_ERROR_NOT_FOUND;
}
void CallbackMainDispatcher::run() {
while(!d->stopEvent->isSignaled()) {
if (!d->eventHandler.waitForAny()) {
continue;
}
if(d->generalCallbackEvent->isSignaled()) {
EventDataRef eventData = d->generalCallbackEvent->takeSignal();
CallbackData *cbd = dynamic_cast<CallbackData *>(eventData.get());
if (!cbd) {
continue;
}
TelldusCore::MutexLocker locker(&d->mutex);
for(CallbackList::iterator callback_it = d->callbackList.begin(); callback_it != d->callbackList.end(); ++callback_it) {
if ( (*callback_it)->type == cbd->type ) {
std::tr1::shared_ptr<TelldusCore::TDEventDispatcher> ptr(new TelldusCore::TDEventDispatcher(eventData, *callback_it, d->janitor));
d->eventThreadList.push_back(ptr);
}
}
}
if (d->janitor->isSignaled()) {
// Clear all of them if there is more than one
while(d->janitor->isSignaled()) {
d->janitor->popSignal();
}
this->cleanupCallbacks();
}
}
}
void CallbackMainDispatcher::cleanupCallbacks() {
bool again = false;
// Device Event
do {
again = false;
MutexLocker locker(&d->mutex);
std::list<std::tr1::shared_ptr<TDEventDispatcher> >::iterator it = d->eventThreadList.begin();
for (; it != d->eventThreadList.end(); ++it) {
if ((*it)->done()) {
d->eventThreadList.erase(it);
again = true;
break;
}
}
} while (again);
}
} // namespace TelldusCore

View file

@ -12,29 +12,23 @@
#include "client/CallbackDispatcher.h"
#include "common/Thread.h"
#include "common/Event.h"
#include "common/EventHandler.h"
namespace TelldusCore {
class CallbackMainDispatcher : public Thread
class CallbackMainDispatcher
{
public:
CallbackMainDispatcher(void);
~CallbackMainDispatcher(void);
EventRef retrieveCallbackEvent();
void execute(TelldusCore::CallbackStruct::CallbackType type, EventData *eventData);
int registerCallback( TelldusCore::CallbackStruct::CallbackType type, void *eventFunction, void *context );
int unregisterCallback( int callbackId );
protected:
void run();
private:
class PrivateData;
PrivateData *d;
void cleanupCallbacks(void);
};
}

View file

@ -33,7 +33,6 @@ Client::Client()
d->running = true;
d->sensorCached = false;
d->controllerCached = false;
d->callbackMainDispatcher.start();
start();
}
@ -105,20 +104,20 @@ void Client::run() {
data->deviceId = Message::takeInt(&clientMessage);
data->changeEvent = Message::takeInt(&clientMessage);
data->changeType = Message::takeInt(&clientMessage);
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(data);
d->callbackMainDispatcher.execute(CallbackStruct::DeviceChangeEvent, data);
} else if(type == L"TDDeviceEvent") {
DeviceEventCallbackData *data = new DeviceEventCallbackData();
data->deviceId = Message::takeInt(&clientMessage);
data->deviceState = Message::takeInt(&clientMessage);
data->deviceStateValue = TelldusCore::wideToString(Message::takeString(&clientMessage));
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(data);
d->callbackMainDispatcher.execute(CallbackStruct::DeviceEvent, data);
} else if(type == L"TDRawDeviceEvent") {
RawDeviceEventCallbackData *data = new RawDeviceEventCallbackData();
data->data = TelldusCore::wideToString(Message::takeString(&clientMessage));
data->controllerId = Message::takeInt(&clientMessage);
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(data);
d->callbackMainDispatcher.execute(CallbackStruct::RawDeviceEvent, data);
} else if(type == L"TDSensorEvent") {
SensorEventCallbackData *data = new SensorEventCallbackData();
@ -128,7 +127,7 @@ void Client::run() {
data->dataType = Message::takeInt(&clientMessage);
data->value = TelldusCore::wideToString(Message::takeString(&clientMessage));
data->timestamp = Message::takeInt(&clientMessage);
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(data);
d->callbackMainDispatcher.execute(CallbackStruct::SensorEvent, data);
} else if(type == L"TDControllerEvent") {
ControllerEventCallbackData *data = new ControllerEventCallbackData();
@ -136,7 +135,7 @@ void Client::run() {
data->changeEvent = Message::takeInt(&clientMessage);
data->changeType = Message::takeInt(&clientMessage);
data->newValue = TelldusCore::wideToString(Message::takeString(&clientMessage));
d->callbackMainDispatcher.retrieveCallbackEvent()->signal(data);
d->callbackMainDispatcher.execute(CallbackStruct::ControllerEvent, data);
} else {
clientMessage = L""; // cleanup, if message contained garbage/unhandled data

View file

@ -61,9 +61,13 @@ void EventBase::signal() {
}
void EventBase::signal(EventData *eventData) {
this->signal(EventDataRef(eventData));
}
void EventBase::signal(EventDataRef eventData) {
{
TelldusCore::MutexLocker locker(&d->mutex);
d->eventDataList.push_back(EventDataRef(eventData));
d->eventDataList.push_back(eventData);
}
sendSignal();
}

View file

@ -42,6 +42,7 @@ namespace TelldusCore {
bool isSignaled();
void signal();
void signal(EventData *eventData);
void signal(EventDataRef eventData);
EventDataRef takeSignal();
protected: