tdf#125662: do parallel-zip in batches

In this approach the input stream is read one batch (of constant size)
at a time, and each batch is compressed in parallel by ThreadedDeflater.
Once a batch is done, its deflated buffer is processed straight away
(directed to file-backed storage).

Change-Id: I2d42f86cf5898e4d746836d94bf6009a8d3b0230
Reviewed-on: https://gerrit.libreoffice.org/c/core/+/86596
Tested-by: Jenkins
Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
Author: Dennis Francis
Date: 2020-01-11 11:51:34 +05:30
parent 9dce33e694
commit 353d4528b8
3 changed files with 109 additions and 70 deletions
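
In outline, the batching scheme described above works as in the following standalone sketch. This is illustrative only, not code from the patch: it uses plain zlib and std::async instead of comphelper's thread pool, the names (deflateBlock, deflateInBatches, sink) are invented, and blockSize is assumed to be at least 32 KiB so the dictionary slices below are valid.

// Illustrative sketch of batched parallel deflate (hypothetical names, no
// error handling). Each block is compressed on its own thread; Z_SYNC_FLUSH
// byte-aligns the blocks so the raw-deflate output can be concatenated.
#include <zlib.h>
#include <algorithm>
#include <cassert>
#include <cstring>
#include <functional>
#include <future>
#include <istream>
#include <vector>

static std::vector<unsigned char> deflateBlock(const unsigned char* data, size_t size,
                                               const unsigned char* dict, bool last)
{
    z_stream strm{};
    // Negative window bits -> raw deflate stream (no zlib header/trailer).
    deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -MAX_WBITS, 8, Z_DEFAULT_STRATEGY);
    if (dict) // seed with the previous 32 KiB so back-references stay valid
        deflateSetDictionary(&strm, dict, 32768);
    std::vector<unsigned char> out(deflateBound(&strm, size));
    strm.next_in = const_cast<unsigned char*>(data);
    strm.avail_in = static_cast<uInt>(size);
    strm.next_out = out.data();
    strm.avail_out = static_cast<uInt>(out.size());
    int res = deflate(&strm, last ? Z_FINISH : Z_SYNC_FLUSH);
    assert(last ? res == Z_STREAM_END : res == Z_OK);
    assert(strm.avail_in == 0);
    out.resize(out.size() - strm.avail_out);
    deflateEnd(&strm);
    return out;
}

// Read one batch (blockSize * nThreads bytes) at a time, deflate its blocks in
// parallel, and hand the concatenated result to `sink` straight away.
static void deflateInBatches(std::istream& in, size_t blockSize, unsigned nThreads,
                             const std::function<void(const std::vector<unsigned char>&)>& sink)
{
    assert(blockSize >= 32768); // needed for the dictionary slices below
    std::vector<unsigned char> batch(blockSize * nThreads);
    std::vector<unsigned char> prevTail(32768); // last 32 KiB of the previous batch
    bool first = true;
    while (in)
    {
        in.read(reinterpret_cast<char*>(batch.data()), batch.size());
        size_t got = static_cast<size_t>(in.gcount());
        if (got == 0)
            break;
        bool lastBatch = in.peek() == std::char_traits<char>::eof();
        std::vector<std::future<std::vector<unsigned char>>> tasks;
        for (size_t off = 0; off < got; off += blockSize)
        {
            size_t n = std::min(blockSize, got - off);
            const unsigned char* dict = nullptr;
            if (off > 0)
                dict = batch.data() + off - 32768; // tail of the previous block
            else if (!first)
                dict = prevTail.data(); // tail of the previous batch
            tasks.push_back(std::async(std::launch::async, deflateBlock, batch.data() + off,
                                       n, dict, lastBatch && off + n == got));
        }
        std::vector<unsigned char> out;
        for (auto& task : tasks) // collect in sequence order
        {
            auto blockOut = task.get();
            out.insert(out.end(), blockOut.begin(), blockOut.end());
        }
        sink(out); // e.g. append to file-backed storage
        if (!lastBatch) // carry the window across the batch boundary
            std::memcpy(prevTail.data(), batch.data() + got - 32768, 32768);
        first = false;
    }
}

The prevTail copy mirrors the patch's prevDataBlock: because the input buffer is reused for every batch, the previous batch's tail must be copied aside before the next read overwrites it.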

package/inc/ThreadedDeflater.hxx

@@ -21,37 +21,48 @@
 #define INCLUDED_PACKAGE_THREADEDDEFLATER_HXX
 #include <com/sun/star/uno/Sequence.hxx>
+#include <com/sun/star/io/XInputStream.hpp>
+#include <com/sun/star/uno/Reference.hxx>
 #include <package/packagedllapi.hxx>
 #include <comphelper/threadpool.hxx>
-#include <atomic>
 #include <memory>
 #include <vector>
+#include <functional>
 namespace ZipUtils
 {
 /// Parallel compression a stream using the libz deflate algorithm.
 ///
-/// Almost a replacement for the Deflater class. Call startDeflate() with the data,
-/// check with finished() or waitForTasks() and retrieve result with getOutput().
-/// The class will internally split into multiple threads.
+/// Call deflateWrite() with the input stream and input/output processing functions.
+/// This will use multiple threads for compression on each batch of data from the stream.
 class ThreadedDeflater final
 {
     class Task;
     // Note: All this should be lock-less. Each task writes only to its part
-    // of the data, flags are atomic.
+    // of the data.
     std::vector<std::vector<sal_Int8>> outBuffers;
     std::shared_ptr<comphelper::ThreadTaskTag> threadTaskTag;
     css::uno::Sequence<sal_Int8> inBuffer;
+    css::uno::Sequence<sal_Int8> prevDataBlock;
+    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> maProcessOutputFunc;
     sal_Int64 totalIn;
     sal_Int64 totalOut;
     int zlibLevel;
-    std::atomic<int> pendingTasksCount;
 public:
     // Unlike with Deflater class, bNoWrap is always true.
     ThreadedDeflater(sal_Int32 nSetLevel);
     ~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE;
-    void startDeflate(const css::uno::Sequence<sal_Int8>& rBuffer);
-    void waitForTasks();
-    bool finished() const;
-    css::uno::Sequence<sal_Int8> getOutput() const;
+    void deflateWrite(
+        const css::uno::Reference<css::io::XInputStream>& xInStream,
+        std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
+        std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc);
     sal_Int64 getTotalIn() const { return totalIn; }
     sal_Int64 getTotalOut() const { return totalOut; }
 private:
+    void processDeflatedBuffers();
     void clear();
 };
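
The net effect of the interface change: callers no longer stage the whole stream in memory and poll finished(); they hand deflateWrite() the input stream plus two per-batch callbacks and read the totals afterwards. A usage fragment along these lines (the lambda bodies and xInStream are placeholders, not code from this patch):

// Assumes an existing css::uno::Reference<css::io::XInputStream> xInStream.
ZipUtils::ThreadedDeflater aDeflater(6); // zlib compression level
aDeflater.deflateWrite(
    xInStream,
    [](const css::uno::Sequence<sal_Int8>& rIn, sal_Int32 nLen) {
        // invoked once per batch of raw input, e.g. to update a checksum
    },
    [](const css::uno::Sequence<sal_Int8>& rOut, sal_Int32 nLen) {
        // invoked once per batch of deflated output, e.g. to write it out
    });
sal_Int64 nTotalIn = aDeflater.getTotalIn();   // raw bytes consumed
sal_Int64 nTotalOut = aDeflater.getTotalOut(); // deflated bytes produced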

package/source/zipapi/ThreadedDeflater.cxx

@@ -44,14 +44,19 @@ class ThreadedDeflater::Task : public comphelper::ThreadTask
     ThreadedDeflater* deflater;
     int sequence;
     int blockSize;
+    bool firstTask : 1;
+    bool lastTask : 1;
 public:
-    Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_)
+    Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_, bool firstTask_,
+         bool lastTask_)
         : comphelper::ThreadTask(deflater_->threadTaskTag)
         , stream()
         , deflater(deflater_)
         , sequence(sequence_)
         , blockSize(blockSize_)
+        , firstTask(firstTask_)
+        , lastTask(lastTask_)
     {
     }
@@ -61,58 +66,83 @@ private:
 ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel)
     : threadTaskTag(comphelper::ThreadPool::createThreadTaskTag())
     , totalIn(0)
     , totalOut(0)
     , zlibLevel(nSetLevel)
-    , pendingTasksCount(0)
 {
 }
-ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE
-{
-    waitForTasks();
-    clear();
-}
+ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE { clear(); }
-void ThreadedDeflater::startDeflate(const uno::Sequence<sal_Int8>& rBuffer)
+void ThreadedDeflater::deflateWrite(
+    const css::uno::Reference<css::io::XInputStream>& xInStream,
+    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
+    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc)
 {
-    inBuffer = rBuffer;
-    sal_Int64 size = inBuffer.getLength();
-    int tasksCount = (size + MaxBlockSize - 1) / MaxBlockSize;
-    tasksCount = std::max(tasksCount, 1);
-    pendingTasksCount = tasksCount;
-    outBuffers.resize(pendingTasksCount);
-    for (int sequence = 0; sequence < tasksCount; ++sequence)
+    sal_Int64 nThreadCount = comphelper::ThreadPool::getSharedOptimalPool().getWorkerCount();
+    sal_Int64 batchSize = MaxBlockSize * nThreadCount;
+    inBuffer.realloc(batchSize);
+    prevDataBlock.realloc(MaxBlockSize);
+    outBuffers.resize(nThreadCount);
+    maProcessOutputFunc = aProcessOutputFunc;
+    bool firstTask = true;
+    while (xInStream->available() > 0)
     {
-        sal_Int64 thisSize = std::min(MaxBlockSize, size);
-        size -= thisSize;
-        comphelper::ThreadPool::getSharedOptimalPool().pushTask(
-            std::make_unique<Task>(this, sequence, thisSize));
+        sal_Int64 inputBytes = xInStream->readBytes(inBuffer, batchSize);
+        aProcessInputFunc(inBuffer, inputBytes);
+        totalIn += inputBytes;
+        int sequence = 0;
+        bool lastBatch = xInStream->available() <= 0;
+        sal_Int64 bytesPending = inputBytes;
+        while (bytesPending > 0)
+        {
+            sal_Int64 taskSize = std::min(MaxBlockSize, bytesPending);
+            bytesPending -= taskSize;
+            bool lastTask = lastBatch && !bytesPending;
+            comphelper::ThreadPool::getSharedOptimalPool().pushTask(
+                std::make_unique<Task>(this, sequence++, taskSize, firstTask, lastTask));
+            if (firstTask)
+                firstTask = false;
+        }
+        assert(bytesPending == 0);
+        comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+        if (!lastBatch)
+        {
+            assert(inputBytes == batchSize);
+            std::copy_n(inBuffer.begin() + (batchSize - MaxBlockSize), MaxBlockSize,
+                        prevDataBlock.begin());
+        }
+        processDeflatedBuffers();
     }
-    assert(size == 0);
 }
-bool ThreadedDeflater::finished() const { return pendingTasksCount == 0; }
-css::uno::Sequence<sal_Int8> ThreadedDeflater::getOutput() const
+void ThreadedDeflater::processDeflatedBuffers()
 {
-    assert(finished());
-    sal_Int64 totalSize = 0;
+    sal_Int64 batchOutputSize = 0;
     for (const auto& buffer : outBuffers)
-        totalSize += buffer.size();
-    uno::Sequence<sal_Int8> outBuffer(totalSize);
+        batchOutputSize += buffer.size();
+    css::uno::Sequence<sal_Int8> outBuffer(batchOutputSize);
     auto pos = outBuffer.begin();
-    for (const auto& buffer : outBuffers)
+    for (auto& buffer : outBuffers)
+    {
         pos = std::copy(buffer.begin(), buffer.end(), pos);
-    return outBuffer;
-}
+        buffer.clear();
+    }
-void ThreadedDeflater::waitForTasks()
-{
-    comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
+    maProcessOutputFunc(outBuffer, batchOutputSize);
+    totalOut += batchOutputSize;
 }
 void ThreadedDeflater::clear()
 {
-    assert(finished());
     inBuffer = uno::Sequence<sal_Int8>();
     outBuffers.clear();
 }
@@ -147,27 +177,35 @@ void ThreadedDeflater::Task::doWork()
     // zlib doesn't handle const properly
     unsigned char* inBufferPtr = reinterpret_cast<unsigned char*>(
         const_cast<signed char*>(deflater->inBuffer.getConstArray()));
-    if (sequence != 0)
+    if (!firstTask)
     {
         // the window size is 32k, so set last 32k of previous data as the dictionary
         assert(MAX_WBITS == 15);
         assert(MaxBlockSize >= 32768);
-        deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+        if (sequence > 0)
+        {
+            deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
+        }
+        else
+        {
+            unsigned char* prevBufferPtr = reinterpret_cast<unsigned char*>(
+                const_cast<signed char*>(deflater->prevDataBlock.getConstArray()));
+            deflateSetDictionary(&stream, prevBufferPtr + MaxBlockSize - 32768, 32768);
+        }
     }
     stream.next_in = inBufferPtr + myInBufferStart;
     stream.avail_in = blockSize;
     stream.next_out = reinterpret_cast<unsigned char*>(deflater->outBuffers[sequence].data());
     stream.avail_out = outputMaxSize;
-    bool last = sequence == int(deflater->outBuffers.size() - 1); // Last block?
     // The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary,
     // and since we use a raw stream, the data blocks then can be simply concatenated.
-    int res = deflate(&stream, last ? Z_FINISH : Z_SYNC_FLUSH);
+    int res = deflate(&stream, lastTask ? Z_FINISH : Z_SYNC_FLUSH);
     assert(stream.avail_in == 0); // Check that everything has been deflated.
-    if (last ? res == Z_STREAM_END : res == Z_OK)
+    if (lastTask ? res == Z_STREAM_END : res == Z_OK)
     { // ok
         sal_Int64 outSize = outputMaxSize - stream.avail_out;
         deflater->outBuffers[sequence].resize(outSize);
-        --deflater->pendingTasksCount;
     }
     else
     {

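Why the per-block outputs can simply be concatenated: each task emits a byte-aligned raw-deflate segment (Z_SYNC_FLUSH), and each non-first task seeds its encoder with the 32 KiB immediately preceding its block, which is exactly what the decoder's sliding window contains at that point in the combined stream. The standalone sketch below (plain zlib and invented data, not LibreOffice code) checks that a single raw inflate pass round-trips the concatenation:

#include <zlib.h>
#include <algorithm>
#include <cassert>
#include <vector>

int main()
{
    // Mildly repetitive input so back-references across block boundaries occur.
    std::vector<unsigned char> input(200000);
    for (size_t i = 0; i < input.size(); ++i)
        input[i] = static_cast<unsigned char>((i * i) % 251);

    const size_t blockSize = 65536; // >= 32768, like MaxBlockSize in the patch
    std::vector<unsigned char> compressed;
    for (size_t off = 0; off < input.size(); off += blockSize)
    {
        size_t n = std::min(blockSize, input.size() - off);
        bool last = off + n == input.size();
        z_stream strm{};
        deflateInit2(&strm, 6, Z_DEFLATED, -MAX_WBITS, 8, Z_DEFAULT_STRATEGY);
        if (off > 0) // same idea as the deflateSetDictionary calls above
            deflateSetDictionary(&strm, input.data() + off - 32768, 32768);
        std::vector<unsigned char> out(deflateBound(&strm, n));
        strm.next_in = input.data() + off;
        strm.avail_in = static_cast<uInt>(n);
        strm.next_out = out.data();
        strm.avail_out = static_cast<uInt>(out.size());
        int res = deflate(&strm, last ? Z_FINISH : Z_SYNC_FLUSH);
        assert(last ? res == Z_STREAM_END : res == Z_OK);
        assert(strm.avail_in == 0);
        compressed.insert(compressed.end(), out.begin(), out.end() - strm.avail_out);
        deflateEnd(&strm);
    }

    // One plain raw-inflate pass over the concatenated segments round-trips.
    std::vector<unsigned char> decompressed(input.size());
    z_stream inf{};
    inflateInit2(&inf, -MAX_WBITS);
    inf.next_in = compressed.data();
    inf.avail_in = static_cast<uInt>(compressed.size());
    inf.next_out = decompressed.data();
    inf.avail_out = static_cast<uInt>(decompressed.size());
    int res = inflate(&inf, Z_FINISH);
    assert(res == Z_STREAM_END);
    inflateEnd(&inf);
    assert(decompressed == input);
    return 0;
}

Z_FINISH on the last block then terminates the stream, which is why only the final task expects Z_STREAM_END rather than Z_OK.
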
package/source/zippackage/ZipOutputEntry.cxx

@@ -363,28 +363,18 @@ ZipOutputEntryParallel::ZipOutputEntryParallel(
 void ZipOutputEntryParallel::writeStream(const uno::Reference< io::XInputStream >& xInStream)
 {
-    sal_Int64 toRead = xInStream->available();
-    uno::Sequence< sal_Int8 > inBuffer( toRead );
-    sal_Int64 read = xInStream->readBytes(inBuffer, toRead);
-    if (read < toRead)
-        inBuffer.realloc( read );
-    while( xInStream->available() > 0 )
-    { // We didn't get the full size from available().
-        uno::Sequence< sal_Int8 > buf( xInStream->available());
-        read = xInStream->readBytes( buf, xInStream->available());
-        sal_Int64 oldSize = inBuffer.getLength();
-        inBuffer.realloc( oldSize + read );
-        std::copy( buf.begin(), buf.end(), inBuffer.begin() + oldSize );
-    }
     ZipUtils::ThreadedDeflater deflater( DEFAULT_COMPRESSION );
-    totalIn = inBuffer.getLength();
-    deflater.startDeflate( inBuffer );
-    processInput( inBuffer );
-    deflater.waitForTasks();
-    uno::Sequence< sal_Int8 > outBuffer = deflater.getOutput();
-    deflater.clear(); // release memory
-    totalOut = outBuffer.getLength();
-    processDeflated(outBuffer, outBuffer.getLength());
+    deflater.deflateWrite(xInStream,
+        [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
+            if (!m_bEncryptCurrentEntry)
+                m_aCRC.updateSegment(rBuffer, nLen);
+        },
+        [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
+            processDeflated(rBuffer, nLen);
+        }
+    );
+    totalIn = deflater.getTotalIn();
+    totalOut = deflater.getTotalOut();
     closeEntry();
 }