Add hook to optionally enable threaded sheet stream parsing.

Threaded version still not working as the fast parser deadlocks during
threaded parsing.

Change-Id: I3d402a22a394d7d0d7edf96590ae039506928fde
This commit is contained in:
Kohei Yoshida
2013-11-22 18:28:26 -05:00
parent e79af02bb0
commit 0da598c5d7

View File

@@ -51,6 +51,12 @@
#include <comphelper/processfactory.hxx> #include <comphelper/processfactory.hxx>
#include <officecfg/Office/Calc.hxx> #include <officecfg/Office/Calc.hxx>
#include <salhelper/thread.hxx>
#include <osl/conditn.hxx>
#include <queue>
#include <boost/scoped_ptr.hpp>
#include "oox/ole/vbaproject.hxx" #include "oox/ole/vbaproject.hxx"
namespace oox { namespace oox {
@@ -194,6 +200,189 @@ const RecordInfo* WorkbookFragment::getRecordInfos() const
return spRecInfos; return spRecInfos;
} }
namespace {
class WorkerThread;
typedef std::pair<WorksheetGlobalsRef, FragmentHandlerRef> SheetFragmentHandler;
typedef std::vector<SheetFragmentHandler> SheetFragmentVector;
typedef rtl::Reference<WorkerThread> WorkerThreadRef;
struct WorkerThreadData
{
osl::Mutex maMtx;
std::vector<WorkerThreadRef> maThreads;
};
struct IdleWorkerThreadData
{
osl::Mutex maMtx;
osl::Condition maCondAdded;
std::queue<WorkerThread*> maThreads;
};
struct
{
boost::scoped_ptr<WorkerThreadData> mpWorkerThreads;
boost::scoped_ptr<IdleWorkerThreadData> mpIdleThreads;
} aThreadGlobals;
enum WorkerAction
{
None = 0,
TerminateThread,
Work
};
class WorkerThread : public salhelper::Thread
{
WorkbookFragment& mrWorkbookHandler;
size_t mnID;
FragmentHandlerRef mxHandler;
osl::Mutex maMtxAction;
osl::Condition maCondActionChanged;
WorkerAction meAction;
public:
WorkerThread( WorkbookFragment& rWorkbookHandler, size_t nID ) :
salhelper::Thread("sheet-import-worker-thread"),
mrWorkbookHandler(rWorkbookHandler), mnID(nID), meAction(None) {}
virtual void execute()
{
announceIdle();
// Keep looping until the terminate request is set.
for (maCondActionChanged.wait(); true; maCondActionChanged.wait())
{
osl::MutexGuard aGuard(maMtxAction);
if (!maCondActionChanged.check())
// Wait again.
continue;
maCondActionChanged.reset();
if (meAction == TerminateThread)
// End the thread.
return;
if (meAction != Work)
continue;
#if 0
// TODO : This still deadlocks in the fast parser code.
mrWorkbookHandler.importOoxFragment(mxHandler);
#else
double val = rand() / static_cast<double>(RAND_MAX);
val *= 1000000; // normalize to 1 second.
val *= 1.5; // inflate it a bit.
usleep(val); // pretend to be working while asleep.
#endif
announceIdle();
}
}
void announceIdle()
{
// Set itself idle to receive a new task from the main thread.
osl::MutexGuard aGuard(aThreadGlobals.mpIdleThreads->maMtx);
aThreadGlobals.mpIdleThreads->maThreads.push(this);
aThreadGlobals.mpIdleThreads->maCondAdded.set();
}
void terminate()
{
osl::MutexGuard aGuard(maMtxAction);
meAction = TerminateThread;
maCondActionChanged.set();
}
void assign( const FragmentHandlerRef& rHandler )
{
osl::MutexGuard aGuard(maMtxAction);
mxHandler = rHandler;
meAction = Work;
maCondActionChanged.set();
}
};
void importSheetFragments( WorkbookFragment& rWorkbookHandler, SheetFragmentVector& rSheets )
{
#if 0 // threaded version
size_t nThreadCount = 3;
if (nThreadCount > rSheets.size())
nThreadCount = rSheets.size();
// Create new thread globals.
aThreadGlobals.mpWorkerThreads.reset(new WorkerThreadData);
aThreadGlobals.mpIdleThreads.reset(new IdleWorkerThreadData);
SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end();
{
// Initialize worker threads.
osl::MutexGuard aGuard(aThreadGlobals.mpWorkerThreads->maMtx);
for (size_t i = 0; i < nThreadCount; ++i)
{
WorkerThreadRef pThread(new WorkerThread(rWorkbookHandler, i));
aThreadGlobals.mpWorkerThreads->maThreads.push_back(pThread);
pThread->launch();
}
}
for (aThreadGlobals.mpIdleThreads->maCondAdded.wait(); true; aThreadGlobals.mpIdleThreads->maCondAdded.wait())
{
osl::MutexGuard aGuard(aThreadGlobals.mpIdleThreads->maMtx);
if (!aThreadGlobals.mpIdleThreads->maCondAdded.check())
// Wait again.
continue;
aThreadGlobals.mpIdleThreads->maCondAdded.reset();
// Assign work to all idle threads.
while (!aThreadGlobals.mpIdleThreads->maThreads.empty())
{
if (it == itEnd)
break;
WorkerThread* p = aThreadGlobals.mpIdleThreads->maThreads.front();
aThreadGlobals.mpIdleThreads->maThreads.pop();
p->assign(it->second);
++it;
}
if (it == itEnd)
// Finished! Exit the loop.
break;
}
{
// Terminate all worker threads.
osl::MutexGuard aGuard(aThreadGlobals.mpWorkerThreads->maMtx);
for (size_t i = 0, n = aThreadGlobals.mpWorkerThreads->maThreads.size(); i < n; ++i)
{
WorkerThreadRef pWorker = aThreadGlobals.mpWorkerThreads->maThreads[i];
pWorker->terminate();
if (pWorker.is())
pWorker->join();
}
}
// Delete all thread globals.
aThreadGlobals.mpWorkerThreads.reset();
aThreadGlobals.mpIdleThreads.reset();
#else // non-threaded version
for( SheetFragmentVector::iterator it = rSheets.begin(), itEnd = rSheets.end(); it != itEnd; ++it)
{
// import the sheet fragment
rWorkbookHandler.importOoxFragment(it->second);
}
#endif
}
}
void WorkbookFragment::finalizeImport() void WorkbookFragment::finalizeImport()
{ {
ISegmentProgressBarRef xGlobalSegment = getProgressBar().createSegment( PROGRESS_LENGTH_GLOBALS ); ISegmentProgressBarRef xGlobalSegment = getProgressBar().createSegment( PROGRESS_LENGTH_GLOBALS );
@@ -229,8 +418,6 @@ void WorkbookFragment::finalizeImport()
fragments for all sheets that are needed before the cell formulas are fragments for all sheets that are needed before the cell formulas are
loaded. Additionally, the instances of the WorkbookGlobals structures loaded. Additionally, the instances of the WorkbookGlobals structures
have to be stored for every sheet. */ have to be stored for every sheet. */
typedef ::std::pair< WorksheetGlobalsRef, FragmentHandlerRef > SheetFragmentHandler;
typedef ::std::vector< SheetFragmentHandler > SheetFragmentVector;
SheetFragmentVector aSheetFragments; SheetFragmentVector aSheetFragments;
std::vector<WorksheetHelper*> maHelpers; std::vector<WorksheetHelper*> maHelpers;
WorksheetBuffer& rWorksheets = getWorksheets(); WorksheetBuffer& rWorksheets = getWorksheets();
@@ -315,11 +502,7 @@ void WorkbookFragment::finalizeImport()
} }
// load all worksheets // load all worksheets
for( SheetFragmentVector::iterator aIt = aSheetFragments.begin(), aEnd = aSheetFragments.end(); aIt != aEnd; ++aIt ) importSheetFragments(*this, aSheetFragments);
{
// import the sheet fragment
importOoxFragment( aIt->second );
}
for( std::vector<WorksheetHelper*>::iterator aIt = maHelpers.begin(), aEnd = maHelpers.end(); aIt != aEnd; ++aIt ) for( std::vector<WorksheetHelper*>::iterator aIt = maHelpers.begin(), aEnd = maHelpers.end(); aIt != aEnd; ++aIt )
{ {