Fixed ThreadPool (and dependent ORequestThread) life cycle

At least with sw_complex test under load, it happened that an ORequestThread
could still process a remote release request while the main thread was already
in exit(3).  This was because (a) ThreadPool never joined with the spawned
worker threads (which has been rectified by calling uno_threadpool_dispose(0)
from the final uno_threadpool_destroy), and (b) binaryurp::Bridge called
uno_threadpool_destroy only from its destructor (which could go as late as
exit(3)) instead of from terminate.

Additional clean up:
* Access to Bridge's threadPool_ is now cleanly controlled by mutex_ (even
  though that might not be necessary in every case).
* ThreadPool's stopDisposing got renamed to destroy, to make meaning clearer.

Change-Id: I45fa76e80e790a11065e7bf8ac9d92af2e62f262
This commit is contained in:
Stephan Bergmann
2012-05-16 22:09:21 +02:00
parent 9aad89df87
commit d015384e1d
4 changed files with 52 additions and 32 deletions

View File

@@ -235,22 +235,28 @@ Bridge::Bridge(
} }
void Bridge::start() { void Bridge::start() {
assert(threadPool_ == 0 && !writer_.is() && !reader_.is()); rtl::Reference< Reader > r(new Reader(this));
threadPool_ = uno_threadpool_create(); rtl::Reference< Writer > w(new Writer(this));
assert(threadPool_ != 0); {
writer_.set(new Writer(this)); osl::MutexGuard g(mutex_);
writer_->launch(); assert(threadPool_ == 0 && !writer_.is() && !reader_.is());
reader_.set(new Reader(this)); threadPool_ = uno_threadpool_create();
reader_->launch(); assert(threadPool_ != 0);
// it is important to call reader_->launch() last here; both reader_ = r;
// Writer::execute and Reader::execute can call Bridge::terminate, but writer_ = w;
// Writer::execute is initially blocked in unblocked_.wait() until }
// Reader::execute has called bridge_->sendRequestChangeRequest(), so // It is important to call reader_->launch() last here; both
// effectively only reader_->launch() can lead to an early call to // Writer::execute and Reader::execute can call Bridge::terminate, but
// Bridge::terminate // Writer::execute is initially blocked in unblocked_.wait() until
// Reader::execute has called bridge_->sendRequestChangeRequest(), so
// effectively only reader_->launch() can lead to an early call to
// Bridge::terminate
w->launch();
r->launch();
} }
void Bridge::terminate() { void Bridge::terminate() {
uno_ThreadPool tp;
rtl::Reference< Reader > r; rtl::Reference< Reader > r;
rtl::Reference< Writer > w; rtl::Reference< Writer > w;
Listeners ls; Listeners ls;
@@ -259,6 +265,7 @@ void Bridge::terminate() {
if (terminated_) { if (terminated_) {
return; return;
} }
tp = threadPool_;
std::swap(reader_, r); std::swap(reader_, r);
std::swap(writer_, w); std::swap(writer_, w);
ls.swap(listeners_); ls.swap(listeners_);
@@ -273,8 +280,8 @@ void Bridge::terminate() {
w->stop(); w->stop();
joinThread(r.get()); joinThread(r.get());
joinThread(w.get()); joinThread(w.get());
assert(threadPool_ != 0); assert(tp != 0);
uno_threadpool_dispose(threadPool_); uno_threadpool_dispose(tp);
Stubs s; Stubs s;
{ {
osl::MutexGuard g(mutex_); osl::MutexGuard g(mutex_);
@@ -301,6 +308,7 @@ void Bridge::terminate() {
"binaryurp", "caught runtime exception '" << e.Message << '\''); "binaryurp", "caught runtime exception '" << e.Message << '\'');
} }
} }
uno_threadpool_destroy(tp);
} }
css::uno::Reference< css::connection::XConnection > Bridge::getConnection() css::uno::Reference< css::connection::XConnection > Bridge::getConnection()
@@ -330,7 +338,8 @@ BinaryAny Bridge::mapCppToBinaryAny(css::uno::Any const & cppAny) {
return out; return out;
} }
uno_ThreadPool Bridge::getThreadPool() const { uno_ThreadPool Bridge::getThreadPool() {
osl::MutexGuard g(mutex_);
assert(threadPool_ != 0); assert(threadPool_ != 0);
return threadPool_; return threadPool_;
} }
@@ -571,7 +580,8 @@ bool Bridge::makeCall(
{ {
std::auto_ptr< IncomingReply > resp; std::auto_ptr< IncomingReply > resp;
{ {
AttachThread att(threadPool_); uno_ThreadPool tp = getThreadPool();
AttachThread att(tp);
PopOutgoingRequest pop( PopOutgoingRequest pop(
outgoingRequests_, att.getTid(), outgoingRequests_, att.getTid(),
OutgoingRequest(OutgoingRequest::KIND_NORMAL, member, setter)); OutgoingRequest(OutgoingRequest::KIND_NORMAL, member, setter));
@@ -582,7 +592,7 @@ bool Bridge::makeCall(
incrementCalls(true); incrementCalls(true);
incrementActiveCalls(); incrementActiveCalls();
void * job; void * job;
uno_threadpool_enter(threadPool_, &job); uno_threadpool_enter(tp, &job);
resp.reset(static_cast< IncomingReply * >(job)); resp.reset(static_cast< IncomingReply * >(job));
decrementActiveCalls(); decrementActiveCalls();
decrementCalls(); decrementCalls();
@@ -812,8 +822,8 @@ bool Bridge::isCurrentContextMode() {
} }
Bridge::~Bridge() { Bridge::~Bridge() {
if (threadPool_ != 0) { if (getThreadPool() != 0) {
uno_threadpool_destroy(threadPool_); terminate();
} }
} }
@@ -940,7 +950,7 @@ void Bridge::sendProtPropRequest(
void Bridge::makeReleaseCall( void Bridge::makeReleaseCall(
rtl::OUString const & oid, css::uno::TypeDescription const & type) rtl::OUString const & oid, css::uno::TypeDescription const & type)
{ {
AttachThread att(threadPool_); AttachThread att(getThreadPool());
sendRequest( sendRequest(
att.getTid(), oid, type, att.getTid(), oid, type,
css::uno::TypeDescription( css::uno::TypeDescription(

View File

@@ -106,7 +106,7 @@ public:
BinaryAny mapCppToBinaryAny(com::sun::star::uno::Any const & cppAny); BinaryAny mapCppToBinaryAny(com::sun::star::uno::Any const & cppAny);
uno_ThreadPool getThreadPool() const; uno_ThreadPool getThreadPool();
rtl::Reference< Writer > getWriter(); rtl::Reference< Writer > getWriter();
@@ -258,11 +258,11 @@ private:
com::sun::star::uno::TypeDescription protPropType_; com::sun::star::uno::TypeDescription protPropType_;
com::sun::star::uno::TypeDescription protPropRequest_; com::sun::star::uno::TypeDescription protPropRequest_;
com::sun::star::uno::TypeDescription protPropCommit_; com::sun::star::uno::TypeDescription protPropCommit_;
uno_ThreadPool threadPool_;
OutgoingRequests outgoingRequests_; OutgoingRequests outgoingRequests_;
osl::Mutex mutex_; osl::Mutex mutex_;
Listeners listeners_; Listeners listeners_;
uno_ThreadPool threadPool_;
rtl::Reference< Writer > writer_; rtl::Reference< Writer > writer_;
rtl::Reference< Reader > reader_; rtl::Reference< Reader > reader_;
bool currentContextMode_; bool currentContextMode_;

View File

@@ -26,7 +26,10 @@
* *
************************************************************************/ ************************************************************************/
#include "sal/config.h"
#include <boost/unordered_map.hpp> #include <boost/unordered_map.hpp>
#include <cassert>
#include <stdio.h> #include <stdio.h>
#include <osl/diagnose.h> #include <osl/diagnose.h>
@@ -73,7 +76,7 @@ namespace cppu_threadpool
m_lst.push_back( nDisposeId ); m_lst.push_back( nDisposeId );
} }
void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId ) void DisposedCallerAdmin::destroy( sal_Int64 nDisposeId )
{ {
MutexGuard guard( m_mutex ); MutexGuard guard( m_mutex );
for( DisposedCallerList::iterator ii = m_lst.begin() ; for( DisposedCallerList::iterator ii = m_lst.begin() ;
@@ -172,9 +175,9 @@ namespace cppu_threadpool
} }
} }
void ThreadPool::stopDisposing( sal_Int64 nDisposeId ) void ThreadPool::destroy( sal_Int64 nDisposeId )
{ {
m_DisposedCallerAdmin->stopDisposing( nDisposeId ); m_DisposedCallerAdmin->destroy( nDisposeId );
} }
/****************** /******************
@@ -480,13 +483,14 @@ uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
extern "C" void SAL_CALL extern "C" void SAL_CALL
uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
{ {
ThreadPool::getInstance()->stopDisposing( assert(hPool != 0);
ThreadPool::getInstance()->destroy(
sal::static_int_cast< sal_Int64 >( sal::static_int_cast< sal_Int64 >(
reinterpret_cast< sal_IntPtr >(hPool)) ); reinterpret_cast< sal_IntPtr >(hPool)) );
if( hPool ) bool empty;
{ {
// special treatment for 0 !
OSL_ASSERT( g_pThreadpoolHashSet ); OSL_ASSERT( g_pThreadpoolHashSet );
MutexGuard guard( Mutex::getGlobalMutex() ); MutexGuard guard( Mutex::getGlobalMutex() );
@@ -496,12 +500,18 @@ uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
g_pThreadpoolHashSet->erase( ii ); g_pThreadpoolHashSet->erase( ii );
delete hPool; delete hPool;
if( g_pThreadpoolHashSet->empty() ) empty = g_pThreadpoolHashSet->empty();
if( empty )
{ {
delete g_pThreadpoolHashSet; delete g_pThreadpoolHashSet;
g_pThreadpoolHashSet = 0; g_pThreadpoolHashSet = 0;
} }
} }
if( empty )
{
uno_threadpool_dispose( 0 );
}
} }
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */ /* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View File

@@ -90,7 +90,7 @@ namespace cppu_threadpool {
static DisposedCallerAdminHolder getInstance(); static DisposedCallerAdminHolder getInstance();
void dispose( sal_Int64 nDisposeId ); void dispose( sal_Int64 nDisposeId );
void stopDisposing( sal_Int64 nDisposeId ); void destroy( sal_Int64 nDisposeId );
sal_Bool isDisposed( sal_Int64 nDisposeId ); sal_Bool isDisposed( sal_Int64 nDisposeId );
private: private:
@@ -109,7 +109,7 @@ namespace cppu_threadpool {
static ThreadPoolHolder getInstance(); static ThreadPoolHolder getInstance();
void dispose( sal_Int64 nDisposeId ); void dispose( sal_Int64 nDisposeId );
void stopDisposing( sal_Int64 nDisposeId ); void destroy( sal_Int64 nDisposeId );
void addJob( const ByteSequence &aThreadId, void addJob( const ByteSequence &aThreadId,
sal_Bool bAsynchron, sal_Bool bAsynchron,