* Serialized multithreaded access to shared data (experimental)

* Refactored event loop code


git-svn-id: http://dev.openwengo.org/svn/openwengo/wengophone-ng/branches/wengophone-dbus-api/libs/dbus@12397 30a43799-04e7-0310-8b2b-ea0d24f86d0e
This commit is contained in:
pdurante 2007-08-28 01:08:04 +00:00
parent 1346caf952
commit 6cb130ee0d
10 changed files with 437 additions and 252 deletions

View file

@ -20,7 +20,6 @@ HEADER_FILES = \
$(HEADER_DIR)/interface.h \
$(HEADER_DIR)/message.h \
$(HEADER_DIR)/dispatcher.h \
$(HEADER_DIR)/eventloop.h \
$(HEADER_DIR)/object.h \
$(HEADER_DIR)/pendingcall.h \
$(HEADER_DIR)/server.h \
@ -29,14 +28,16 @@ HEADER_FILES = \
$(HEADER_DIR)/refptr_impl.h \
$(HEADER_DIR)/introspection.h \
$(HEADER_DIR)/api.h \
$(HEADER_DIR)/eventloop.h \
$(HEADER_DIR)/eventloop-integration.h \
$(GLIB_H)
lib_includedir=$(includedir)/dbus-c++-1/dbus-c++/
lib_include_HEADERS = $(HEADER_FILES)
lib_LTLIBRARIES = libdbus-c++-1.la
libdbus_c___1_la_SOURCES = $(HEADER_FILES) interface.cpp object.cpp introspection.cpp debug.cpp eventloop.cpp types.cpp connection.cpp connection_p.h property.cpp dispatcher.cpp dispatcher_p.h pendingcall.cpp pendingcall_p.h error.cpp internalerror.h message.cpp message_p.h server.cpp server_p.h $(GLIB_CPP)
libdbus_c___1_la_LIBADD = $(dbus_LIBS) $(glib_LIBS)
libdbus_c___1_la_SOURCES = $(HEADER_FILES) interface.cpp object.cpp introspection.cpp debug.cpp types.cpp connection.cpp connection_p.h property.cpp dispatcher.cpp dispatcher_p.h pendingcall.cpp pendingcall_p.h error.cpp internalerror.h message.cpp message_p.h server.cpp server_p.h eventloop.cpp eventloop-integration.cpp $(GLIB_CPP)
libdbus_c___1_la_LIBADD = $(dbus_LIBS) $(glib_LIBS) $(pthread_LIBS)
MAINTAINERCLEANFILES = \
Makefile.in

View file

@ -32,9 +32,7 @@ static void _debug_log_default(const char* format, ...)
{
#ifdef DEBUG
static int debug_env = -1;
if(debug_env < 0) debug_env = getenv("DBUSXX_VERBOSE") ? 1 : 0;
static int debug_env = getenv("DBUSXX_VERBOSE") ? 1 : 0;
if(debug_env)
{

View file

@ -143,11 +143,15 @@ void Dispatcher::Private::on_toggle_timeout( DBusTimeout* timeout, void* data )
void Dispatcher::queue_connection( Connection::Private* cp )
{
_mutex_p.lock();
_pending_queue.push_back(cp);
_mutex_p.unlock();
}
void Dispatcher::dispatch_pending()
{
_mutex_p.lock();
while(_pending_queue.size() > 0)
{
Connection::PrivatePList::iterator i, j;
@ -166,6 +170,7 @@ void Dispatcher::dispatch_pending()
i = j;
}
}
_mutex_p.unlock();
}
#ifdef DBUS_HAS_THREADS_INIT_DEFAULT

View file

@ -0,0 +1,163 @@
/*
*
* D-Bus++ - C++ bindings for D-Bus
*
* Copyright (C) 2005-2007 Paolo Durante <shackan@gmail.com>
*
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
#include <dbus-c++/eventloop-integration.h>
#include <dbus-c++/debug.h>
#include <sys/poll.h>
#include <dbus/dbus.h>
using namespace DBus;
BusTimeout::BusTimeout( Timeout::Internal* ti, BusDispatcher* bd )
: Timeout(ti), DefaultTimeout(Timeout::interval(), true, bd)
{
DefaultTimeout::enabled(Timeout::enabled());
}
void BusTimeout::toggle()
{
debug_log("timeout %p toggled (%s)", this, Timeout::enabled() ? "on":"off");
DefaultTimeout::enabled(Timeout::enabled());
}
BusWatch::BusWatch( Watch::Internal* wi, BusDispatcher* bd )
: Watch(wi), DefaultWatch(Watch::descriptor(), 0, bd)
{
int flags = POLLHUP | POLLERR;
if(Watch::flags() & DBUS_WATCH_READABLE)
flags |= POLLIN;
if(Watch::flags() & DBUS_WATCH_WRITABLE)
flags |= POLLOUT;
DefaultWatch::flags(flags);
DefaultWatch::enabled(Watch::enabled());
}
void BusWatch::toggle()
{
debug_log("watch %p toggled (%s)", this, Watch::enabled() ? "on":"off");
DefaultWatch::enabled(Watch::enabled());
}
void BusDispatcher::enter()
{
debug_log("entering dispatcher %p", this);
_running = true;
while(_running)
{
do_iteration();
}
debug_log("leaving dispatcher %p", this);
}
void BusDispatcher::leave()
{
_running = false;
}
void BusDispatcher::do_iteration()
{
dispatch_pending();
dispatch();
}
Timeout* BusDispatcher::add_timeout( Timeout::Internal* ti )
{
BusTimeout* bt = new BusTimeout(ti, this);
bt->expired = new Callback<BusDispatcher, void, DefaultTimeout&>(this, &BusDispatcher::timeout_expired);
bt->data(bt);
debug_log("added timeout %p (%s)", bt, ((Timeout*)bt)->enabled() ? "on":"off");
return bt;
}
void BusDispatcher::rem_timeout( Timeout* t )
{
debug_log("removed timeout %p", t);
delete t;
}
Watch* BusDispatcher::add_watch( Watch::Internal* wi )
{
BusWatch* bw = new BusWatch(wi, this);
bw->ready = new Callback<BusDispatcher, void, DefaultWatch&>(this, &BusDispatcher::watch_ready);
bw->data(bw);
debug_log("added watch %p (%s) fd=%d flags=%d",
bw, ((Watch*)bw)->enabled() ? "on":"off", ((Watch*)bw)->descriptor(), ((Watch*)bw)->flags()
);
return bw;
}
void BusDispatcher::rem_watch( Watch* w )
{
debug_log("removed watch %p", w);
delete w;
}
void BusDispatcher::timeout_expired( DefaultTimeout& et )
{
debug_log("timeout %p expired", &et);
BusTimeout* timeout = reinterpret_cast<BusTimeout*>(et.data());
timeout->handle();
}
void BusDispatcher::watch_ready( DefaultWatch& ew )
{
BusWatch* watch = reinterpret_cast<BusWatch*>(ew.data());
debug_log("watch %p ready, flags=%d state=%d",
watch, ((Watch*)watch)->flags(), watch->state()
);
int flags = 0;
if(watch->state() & POLLIN)
flags |= DBUS_WATCH_READABLE;
if(watch->state() & POLLOUT)
flags |= DBUS_WATCH_WRITABLE;
if(watch->state() & POLLHUP)
flags |= DBUS_WATCH_HANGUP;
if(watch->state() & POLLERR)
flags |= DBUS_WATCH_ERROR;
watch->handle(flags);
}

View file

@ -37,7 +37,7 @@ static double millis( timeval tv )
return (tv.tv_sec*1000.0 + tv.tv_usec/1000.0);
}
EepleTimeout::EepleTimeout( int interval, bool repeat, EepleMainLoop* ed )
DefaultTimeout::DefaultTimeout( int interval, bool repeat, DefaultMainLoop* ed )
: _enabled(true), _interval(interval), _repeat(repeat), _expiration(0), _data(0), _disp(ed)
{
timeval now;
@ -45,57 +45,113 @@ EepleTimeout::EepleTimeout( int interval, bool repeat, EepleMainLoop* ed )
_expiration = millis(now) + interval;
_disp->_mutex_t.lock();
_disp->_timeouts.push_back(this);
_disp->_mutex_t.unlock();
}
EepleTimeout::~EepleTimeout()
DefaultTimeout::~DefaultTimeout()
{
_disp->_mutex_t.lock();
_disp->_timeouts.remove(this);
_disp->_mutex_t.unlock();
}
EepleWatch::EepleWatch( int fd, int flags, EepleMainLoop* ed )
DefaultWatch::DefaultWatch( int fd, int flags, DefaultMainLoop* ed )
: _enabled(true), _fd(fd), _flags(flags), _state(0), _data(0), _disp(ed)
{
_disp->_mutex_w.lock();
_disp->_watches.push_back(this);
_disp->_mutex_w.unlock();
}
EepleWatch::~EepleWatch()
DefaultWatch::~DefaultWatch()
{
_disp->_mutex_w.lock();
_disp->_watches.remove(this);
_disp->_mutex_w.unlock();
}
EepleMainLoop::EepleMainLoop()
DefaultMutex::DefaultMutex()
{
#if defined HAVE_PTHREAD
pthread_mutex_init(&_mutex, NULL);
#elif defined HAVE_WIN32
#endif
}
DefaultMutex::~DefaultMutex()
{
#if defined HAVE_PTHREAD
pthread_mutex_destroy(&_mutex);
#elif defined HAVE_WIN32
#endif
}
void DefaultMutex::lock()
{
#if defined HAVE_PTHREAD
pthread_mutex_lock(&_mutex);
#elif defined HAVE_WIN32
#endif
}
void DefaultMutex::unlock()
{
#if defined HAVE_PTHREAD
pthread_mutex_unlock(&_mutex);
#elif defined HAVE_WIN32
#endif
}
DefaultMainLoop::DefaultMainLoop()
{
}
EepleMainLoop::~EepleMainLoop()
DefaultMainLoop::~DefaultMainLoop()
{
Watches::iterator wi = _watches.begin();
_mutex_w.lock();
DefaultWatches::iterator wi = _watches.begin();
while(wi != _watches.end())
{
Watches::iterator wmp = wi;
DefaultWatches::iterator wmp = wi;
++wmp;
delete (*wi);
wi = wmp;
}
_mutex_w.unlock();
Timeouts::iterator ti = _timeouts.begin();
_mutex_t.lock();
DefaultTimeouts::iterator ti = _timeouts.begin();
while(ti != _timeouts.end())
{
Timeouts::iterator tmp = ti;
DefaultTimeouts::iterator tmp = ti;
++tmp;
delete (*ti);
ti = tmp;
}
_mutex_t.unlock();
}
void EepleMainLoop::dispatch()
void DefaultMainLoop::dispatch()
{
_mutex_w.lock();
int nfd = _watches.size();
pollfd fds[nfd];
Watches::iterator wi = _watches.begin();
DefaultWatches::iterator wi = _watches.begin();
for(nfd = 0; wi != _watches.end(); ++wi)
{
@ -108,10 +164,13 @@ void EepleMainLoop::dispatch()
++nfd;
}
}
_mutex_w.unlock();
int wait_min = 10000;
Timeouts::iterator ti;
DefaultTimeouts::iterator ti;
_mutex_t.lock();
for(ti = _timeouts.begin(); ti != _timeouts.end(); ++ti)
{
@ -119,8 +178,7 @@ void EepleMainLoop::dispatch()
wait_min = (*ti)->interval();
}
//int rfd = poll(fds, nfd, wait_min);
//if(rfd) debug_log("%d descriptors ready");
_mutex_t.unlock();
poll(fds, nfd, wait_min);
@ -129,11 +187,13 @@ void EepleMainLoop::dispatch()
double now_millis = millis(now);
_mutex_t.lock();
ti = _timeouts.begin();
while(ti != _timeouts.end())
{
Timeouts::iterator tmp = ti;
DefaultTimeouts::iterator tmp = ti;
++tmp;
if((*ti)->enabled() && now_millis >= (*ti)->_expiration)
@ -150,13 +210,17 @@ void EepleMainLoop::dispatch()
ti = tmp;
}
_mutex_t.unlock();
_mutex_w.lock();
for(int j = 0; j < nfd; ++j)
{
Watches::iterator wi;
DefaultWatches::iterator wi;
for(wi = _watches.begin(); wi != _watches.end();)
{
Watches::iterator tmp = wi;
DefaultWatches::iterator tmp = wi;
++tmp;
if((*wi)->enabled() && (*wi)->_fd == fds[j].fd)
@ -174,138 +238,6 @@ void EepleMainLoop::dispatch()
wi = tmp;
}
}
}
/*
*/
BusTimeout::BusTimeout( Timeout::Internal* ti, BusDispatcher* bd )
: Timeout(ti), EepleTimeout(Timeout::interval(), true, bd)
{
EepleTimeout::enabled(Timeout::enabled());
}
void BusTimeout::toggle()
{
debug_log("timeout %p toggled (%s)", this, Timeout::enabled() ? "on":"off");
EepleTimeout::enabled(Timeout::enabled());
}
BusWatch::BusWatch( Watch::Internal* wi, BusDispatcher* bd )
: Watch(wi), EepleWatch(Watch::descriptor(), 0, bd)
{
int flags = POLLHUP | POLLERR;
if(Watch::flags() & DBUS_WATCH_READABLE)
flags |= POLLIN;
if(Watch::flags() & DBUS_WATCH_WRITABLE)
flags |= POLLOUT;
EepleWatch::flags(flags);
EepleWatch::enabled(Watch::enabled());
}
void BusWatch::toggle()
{
debug_log("watch %p toggled (%s)", this, Watch::enabled() ? "on":"off");
EepleWatch::enabled(Watch::enabled());
}
void BusDispatcher::enter()
{
debug_log("entering dispatcher %p", this);
_running = true;
while(_running)
{
do_iteration();
}
debug_log("leaving dispatcher %p", this);
}
void BusDispatcher::leave()
{
_running = false;
}
void BusDispatcher::do_iteration()
{
dispatch_pending();
dispatch();
}
Timeout* BusDispatcher::add_timeout( Timeout::Internal* ti )
{
BusTimeout* bt = new BusTimeout(ti, this);
bt->expired = new Callback<BusDispatcher, void, EepleTimeout&>(this, &BusDispatcher::timeout_expired);
bt->data(bt);
debug_log("added timeout %p (%s)", bt, ((Timeout*)bt)->enabled() ? "on":"off");
return bt;
}
void BusDispatcher::rem_timeout( Timeout* t )
{
debug_log("removed timeout %p", t);
delete t;
}
Watch* BusDispatcher::add_watch( Watch::Internal* wi )
{
BusWatch* bw = new BusWatch(wi, this);
bw->ready = new Callback<BusDispatcher, void, EepleWatch&>(this, &BusDispatcher::watch_ready);
bw->data(bw);
debug_log("added watch %p (%s) fd=%d flags=%d",
bw, ((Watch*)bw)->enabled() ? "on":"off", ((Watch*)bw)->descriptor(), ((Watch*)bw)->flags()
);
return bw;
}
void BusDispatcher::rem_watch( Watch* w )
{
debug_log("removed watch %p", w);
delete w;
}
void BusDispatcher::timeout_expired( EepleTimeout& et )
{
debug_log("timeout %p expired", &et);
BusTimeout* timeout = reinterpret_cast<BusTimeout*>(et.data());
timeout->handle();
}
void BusDispatcher::watch_ready( EepleWatch& ew )
{
BusWatch* watch = reinterpret_cast<BusWatch*>(ew.data());
debug_log("watch %p ready, flags=%d state=%d",
watch, ((Watch*)watch)->flags(), watch->state()
);
int flags = 0;
if(watch->state() & POLLIN)
flags |= DBUS_WATCH_READABLE;
if(watch->state() & POLLOUT)
flags |= DBUS_WATCH_WRITABLE;
if(watch->state() & POLLHUP)
flags |= DBUS_WATCH_HANGUP;
if(watch->state() & POLLERR)
flags |= DBUS_WATCH_ERROR;
watch->handle(flags);
_mutex_w.unlock();
}