tdf#104126 - comphelper thread-pool, use reliable std::condition_variable.

The existing osl::Condition is an API and reliability disaster area.

Change-Id: I3be84e1c6a83e58c43c40c9c8720790d923a6694
Reviewed-on: https://gerrit.libreoffice.org/31163
Tested-by: Jenkins <ci@libreoffice.org>
Reviewed-by: Michael Meeks <michael.meeks@collabora.com>
Tested-by: Michael Meeks <michael.meeks@collabora.com>
This commit is contained in:
Michael Meeks 2016-12-01 11:14:24 +00:00
parent 0afbe8d5ca
commit aa68c99d88
4 changed files with 154 additions and 168 deletions

View File

@ -10,11 +10,14 @@
#include <comphelper/threadpool.hxx>
#include <com/sun/star/uno/Exception.hpp>
#include <sal/config.h>
#include <rtl/instance.hxx>
#include <rtl/string.hxx>
#include <salhelper/thread.hxx>
#include <algorithm>
#include <memory>
#include <thread>
#include <chrono>
namespace comphelper {
@ -26,9 +29,9 @@ static thread_local bool gbIsWorkerThread;
// used to group thread-tasks for waiting in waitTillDone()
class COMPHELPER_DLLPUBLIC ThreadTaskTag
{
osl::Mutex mMutex;
std::size_t mnTasksWorking;
osl::Condition maTasksComplete;
std::mutex maMutex;
sal_Int32 mnTasksWorking;
std::condition_variable maTasksComplete;
public:
ThreadTaskTag();
@ -42,14 +45,11 @@ public:
class ThreadPool::ThreadWorker : public salhelper::Thread
{
ThreadPool *mpPool;
osl::Condition maNewWork;
bool mbWorking;
public:
explicit ThreadWorker( ThreadPool *pPool ) :
salhelper::Thread("thread-pool"),
mpPool( pPool ),
mbWorking( false )
mpPool( pPool )
{
}
@ -58,74 +58,20 @@ public:
#if defined DBG_UTIL && defined LINUX
gbIsWorkerThread = true;
#endif
while ( ThreadTask * pTask = waitForWork() )
std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
while( !mpPool->mbTerminate )
{
std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
try {
pTask->doWork();
}
catch (const std::exception &e)
ThreadTask *pTask = mpPool->popWorkLocked( aGuard, true );
if( pTask )
{
SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
}
catch (const css::uno::Exception &e)
{
SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message);
}
delete pTask;
pTag->onTaskWorkerDone();
aGuard.unlock();
pTask->execAndDelete();
aGuard.lock();
}
}
ThreadTask *waitForWork()
{
ThreadTask *pRet = nullptr;
osl::ResettableMutexGuard aGuard( mpPool->maGuard );
pRet = mpPool->popWork();
while( !pRet )
{
if (mbWorking)
mpPool->stopWork();
mbWorking = false;
maNewWork.reset();
if( mpPool->mbTerminate )
break;
aGuard.clear(); // unlock
maNewWork.wait();
aGuard.reset(); // lock
pRet = mpPool->popWork();
}
if (pRet)
{
if (!mbWorking)
mpPool->startWork();
mbWorking = true;
}
return pRet;
}
// Why a condition per worker thread - you may ask.
//
// Unfortunately the Windows synchronisation API that we wrap
// is horribly inadequate cf.
// http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
// The existing osl::Condition API should only ever be used
// between one producer and one consumer thread to avoid the
// lost wakeup problem.
void signalNewWork()
{
maNewWork.set();
}
};
@ -133,19 +79,18 @@ ThreadPool::ThreadPool( sal_Int32 nWorkers ) :
mnThreadsWorking( 0 ),
mbTerminate( false )
{
std::unique_lock< std::mutex > aGuard( maMutex );
for( sal_Int32 i = 0; i < nWorkers; i++ )
maWorkers.push_back( new ThreadWorker( this ) );
maTasksComplete.set();
osl::MutexGuard aGuard( maGuard );
for(rtl::Reference<ThreadWorker> & rpWorker : maWorkers)
rpWorker->launch();
}
ThreadPool::~ThreadPool()
{
waitAndCleanupWorkers();
shutdown();
}
struct ThreadPoolStatic : public rtl::StaticWithInit< std::shared_ptr< ThreadPool >,
@ -183,59 +128,61 @@ sal_Int32 ThreadPool::getPreferredConcurrency()
return ThreadCount;
}
void ThreadPool::waitAndCleanupWorkers()
// FIXME: there should be no need for this as/when our baseline
// is >VS2015 and drop WinXP; the sorry details are here:
// https://connect.microsoft.com/VisualStudio/feedback/details/1282596
void ThreadPool::shutdown()
{
osl::ResettableMutexGuard aGuard( maGuard );
if (mbTerminate)
return;
std::unique_lock< std::mutex > aGuard( maMutex );
if( maWorkers.empty() )
{ // no threads at all -> execute the work in-line
while ( ThreadTask * pTask = popWork() )
{
std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
pTask->doWork();
delete pTask;
pTag->onTaskWorkerDone();
}
ThreadTask *pTask;
while ( ( pTask = popWorkLocked(aGuard, false) ) )
pTask->execAndDelete();
}
else
{
aGuard.clear();
maTasksComplete.wait();
aGuard.reset();
while( !maTasks.empty() )
maTasksChanged.wait( aGuard );
}
assert( maTasks.empty() );
mbTerminate = true;
maTasksChanged.notify_all();
while( !maWorkers.empty() )
{
rtl::Reference< ThreadWorker > xWorker = maWorkers.back();
maWorkers.pop_back();
assert(std::find(maWorkers.begin(), maWorkers.end(), xWorker)
== maWorkers.end());
xWorker->signalNewWork();
aGuard.clear();
{ // unlocked
aGuard.unlock();
{
xWorker->join();
xWorker.clear();
}
aGuard.reset();
aGuard.lock();
}
}
void ThreadPool::pushTask( ThreadTask *pTask )
{
osl::MutexGuard aGuard( maGuard );
std::unique_lock< std::mutex > aGuard( maMutex );
pTask->mpTag->onTaskPushed();
maTasks.insert( maTasks.begin(), pTask );
// horrible beyond belief:
for(rtl::Reference<ThreadWorker> & rpWorker : maWorkers)
rpWorker->signalNewWork();
maTasksComplete.reset();
maTasksChanged.notify_one();
}
ThreadTask *ThreadPool::popWork()
ThreadTask *ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
{
do
{
if( !maTasks.empty() )
{
@ -243,40 +190,46 @@ ThreadTask *ThreadPool::popWork()
maTasks.pop_back();
return pTask;
}
else
else if (!bWait || mbTerminate)
return nullptr;
maTasksChanged.wait( rGuard );
} while (!mbTerminate);
return nullptr;
}
void ThreadPool::startWork()
void ThreadPool::startWorkLocked()
{
mnThreadsWorking++;
}
void ThreadPool::stopWork()
void ThreadPool::stopWorkLocked()
{
assert( mnThreadsWorking > 0 );
if ( --mnThreadsWorking == 0 )
maTasksComplete.set();
maTasksChanged.notify_all();
}
void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag)
{
#if defined DBG_UTIL && defined LINUX
assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
#endif
osl::ResettableMutexGuard aGuard( maGuard );
{
std::unique_lock< std::mutex > aGuard( maMutex );
if( maWorkers.empty() )
{ // no threads at all -> execute the work in-line
while ( ThreadTask * pTask = popWork() )
{
std::shared_ptr<ThreadTaskTag> pTag(pTask->getTag());
pTask->doWork();
delete pTask;
pTag->onTaskWorkerDone();
ThreadTask *pTask;
while (!rTag->isDone() &&
( pTask = popWorkLocked(aGuard, false) ) )
pTask->execAndDelete();
}
}
aGuard.clear();
rTag->waitUntilDone();
}
@ -290,56 +243,75 @@ bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
return pTag->isDone();
}
ThreadTask::ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag)
: mpTag(pTag)
{
}
void ThreadTask::execAndDelete()
{
std::shared_ptr<ThreadTaskTag> pTag(mpTag);
try {
doWork();
}
catch (const std::exception &e)
{
SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
}
catch (const css::uno::Exception &e)
{
SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.Message);
}
delete this;
pTag->onTaskWorkerDone();
}
ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
{
maTasksComplete.set();
}
void ThreadTaskTag::onTaskPushed()
{
osl::MutexGuard g(mMutex);
assert( mnTasksWorking < 65535 ); // sanity checking
++mnTasksWorking;
maTasksComplete.reset();
std::unique_lock< std::mutex > aGuard( maMutex );
mnTasksWorking++;
assert( mnTasksWorking < 65536 ); // sanity checking
}
void ThreadTaskTag::onTaskWorkerDone()
{
osl::MutexGuard g(mMutex);
assert(mnTasksWorking > 0);
--mnTasksWorking;
std::unique_lock< std::mutex > aGuard( maMutex );
mnTasksWorking--;
assert(mnTasksWorking >= 0);
if (mnTasksWorking == 0)
maTasksComplete.set();
}
void ThreadTaskTag::waitUntilDone()
{
#if defined DBG_UTIL && defined LINUX
assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
#endif
#ifdef DBG_UTIL
// 3 minute timeout in debug mode so our tests fail sooner rather than later
osl::Condition::Result rv = maTasksComplete.wait(TimeValue { 3*60, 0 });
assert(rv != osl::Condition::result_timeout);
#else
// 10 minute timeout in production so the app eventually throws some kind of error
if (maTasksComplete.wait(TimeValue { 10*60, 0 }) == osl::Condition::Result::result_timeout)
throw std::runtime_error("timeout waiting for threadpool tasks");
#endif
maTasksComplete.notify_all();
}
bool ThreadTaskTag::isDone()
{
std::unique_lock< std::mutex > aGuard( maMutex );
return mnTasksWorking == 0;
}
void ThreadTaskTag::waitUntilDone()
{
std::unique_lock< std::mutex > aGuard( maMutex );
while( mnTasksWorking > 0 )
{
#ifdef DBG_UTIL
// 3 minute timeout in debug mode so our tests fail sooner rather than later
std::cv_status result = maTasksComplete.wait_for(
aGuard, std::chrono::seconds( 3 * 60 ));
assert(result != std::cv_status::timeout);
#else
// 10 minute timeout in production so the app eventually throws some kind of error
if (maTasksComplete.wait_for(
aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
throw std::runtime_error("timeout waiting for threadpool tasks");
#endif
}
}
} // namespace comphelper
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

View File

@ -81,6 +81,7 @@
#include <toolkit/helper/vclunohelper.hxx>
#include <comphelper/configuration.hxx>
#include <comphelper/fileurl.hxx>
#include <comphelper/threadpool.hxx>
#include <comphelper/processfactory.hxx>
#include <comphelper/backupfilehelper.hxx>
#include <unotools/bootstrap.hxx>
@ -1791,11 +1792,14 @@ int Desktop::doShutdown()
StarBASIC::DetachAllDocBasicItems();
#endif
}
// be sure that path/language options gets destroyed before
// UCB is deinitialized
pExecGlobals->pLanguageOptions.reset( nullptr );
pExecGlobals->pPathOptions.reset( nullptr );
comphelper::ThreadPool::getSharedOptimalPool().shutdown();
bool bRR = pExecGlobals->bRestartRequested;
delete pExecGlobals;
pExecGlobals = nullptr;

View File

@ -11,11 +11,11 @@
#define INCLUDED_COMPHELPER_THREADPOOL_HXX
#include <sal/config.h>
#include <salhelper/thread.hxx>
#include <osl/mutex.hxx>
#include <osl/conditn.hxx>
#include <rtl/ref.hxx>
#include <comphelper/comphelperdllapi.h>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <vector>
#include <memory>
@ -28,14 +28,19 @@ class COMPHELPER_DLLPUBLIC ThreadTask
{
friend class ThreadPool;
std::shared_ptr<ThreadTaskTag> mpTag;
/// execute and delete this task
void execAndDelete();
protected:
/// override to get your task performed by the pool
virtual void doWork() = 0;
/// once pushed ThreadTasks are destroyed by the pool
virtual ~ThreadTask() {}
public:
ThreadTask(const std::shared_ptr<ThreadTaskTag>& pTag);
virtual ~ThreadTask() {}
virtual void doWork() = 0;
const std::shared_ptr<ThreadTaskTag>& getTag() { return mpTag; }
};
/// A very basic thread pool implementation
/// A very basic thread-safe thread pool implementation
class COMPHELPER_DLLPUBLIC ThreadPool final
{
public:
@ -50,7 +55,7 @@ public:
/// returns a configurable max-concurrency
/// limit to avoid spawning an unnecessarily
/// large number of threads on high-core boxes.
/// MAX_CONCURRENCY envar controls the cap.
/// MAX_CONCURRENCY env. var. controls the cap.
static sal_Int32 getPreferredConcurrency();
ThreadPool( sal_Int32 nWorkers );
@ -65,6 +70,9 @@ public:
/// return the number of live worker threads
sal_Int32 getWorkerCount() const { return maWorkers.size(); }
/// wait until all work is completed, then join all threads
void shutdown();
private:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
@ -72,20 +80,21 @@ private:
class ThreadWorker;
friend class ThreadWorker;
/// wait until all work is completed, then join all threads
void waitAndCleanupWorkers();
/** Pop a work task
@param bWait - if set wait until task present or termination
@return a new task to perform, or NULL if list empty or terminated
*/
ThreadTask *popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait );
void startWorkLocked();
void stopWorkLocked();
ThreadTask *popWork();
void startWork();
void stopWork();
osl::Mutex maGuard;
sal_Int32 mnThreadsWorking;
/// signalled when all in-progress tasks are complete
osl::Condition maTasksComplete;
std::mutex maMutex;
std::condition_variable maTasksChanged;
sal_Int32 mnThreadsWorking;
bool mbTerminate;
std::vector< rtl::Reference< ThreadWorker > > maWorkers;
std::vector< ThreadTask * > maTasks;
std::vector< rtl::Reference< ThreadWorker > > maWorkers;
};
} // namespace comphelper

View File

@ -27,6 +27,7 @@
#include <osl/diagnose.h>
#include <osl/time.h>
#include <osl/thread.hxx>
#include <PackageConstants.hxx>
#include <ZipEntry.hxx>