tdf#125662: do parallel-zip in batches
In this approach the input stream is read one batch (of constant size) at a time and each batch is compressed by ThreadedDeflater. After a batch is done, the deflated buffer is processed straight away (directed to file-backed storage).

Change-Id: I2d42f86cf5898e4d746836d94bf6009a8d3b0230
Reviewed-on: https://gerrit.libreoffice.org/c/core/+/86596
Tested-by: Jenkins
Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
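The shape of that batching loop, stripped of the LibreOffice plumbing, is roughly as follows. This is only an illustrative sketch: it uses std::async instead of comphelper's shared thread pool, and the names (deflateInBatches, compressBlock, writeOutput) are invented for the example, not part of the change. Waiting for the whole batch before emitting keeps the output in stream order while bounding memory to a single batch.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <future>
#include <istream>
#include <vector>

using Bytes = std::vector<unsigned char>;

// Read the input one fixed-size batch at a time, compress the batch's blocks in
// parallel, and hand each batch's output to writeOutput before the next batch is read.
void deflateInBatches(std::istream& in,
                      const std::function<Bytes(const Bytes&, bool /*lastBlock*/)>& compressBlock,
                      const std::function<void(const Bytes&)>& writeOutput,
                      std::size_t blockSize, std::size_t blocksPerBatch)
{
    const std::size_t maxBatch = blockSize * blocksPerBatch;
    while (in.peek() != std::istream::traits_type::eof())
    {
        Bytes batch(maxBatch);
        in.read(reinterpret_cast<char*>(batch.data()), static_cast<std::streamsize>(maxBatch));
        batch.resize(static_cast<std::size_t>(in.gcount()));
        const bool lastBatch = in.peek() == std::istream::traits_type::eof();

        std::vector<std::future<Bytes>> tasks; // one compression task per block
        for (std::size_t pos = 0; pos < batch.size(); pos += blockSize)
        {
            const std::size_t len = std::min(blockSize, batch.size() - pos);
            const bool lastBlock = lastBatch && pos + len == batch.size();
            tasks.push_back(std::async(std::launch::async,
                [&batch, pos, len, lastBlock, &compressBlock] {
                    return compressBlock(Bytes(batch.begin() + pos, batch.begin() + pos + len),
                                         lastBlock);
                }));
        }
        for (auto& task : tasks)     // futures are collected in block order,
            writeOutput(task.get()); // so the concatenated output stays valid
    }
}
```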
@@ -21,37 +21,48 @@
#define INCLUDED_PACKAGE_THREADEDDEFLATER_HXX

#include <com/sun/star/uno/Sequence.hxx>
#include <com/sun/star/io/XInputStream.hpp>
#include <com/sun/star/uno/Reference.hxx>
#include <package/packagedllapi.hxx>
#include <comphelper/threadpool.hxx>
#include <atomic>
#include <memory>
#include <vector>
#include <functional>

namespace ZipUtils
{
/// Parallel compression of a stream using the libz deflate algorithm.
///
/// Almost a replacement for the Deflater class. Call startDeflate() with the data,
/// check with finished() or waitForTasks() and retrieve result with getOutput().
/// The class will internally split into multiple threads.
/// Call deflateWrite() with the input stream and input/output processing functions.
/// This will use multiple threads for compression on each batch of data from the stream.
class ThreadedDeflater final
{
    class Task;
    // Note: All this should be lock-less. Each task writes only to its part
    // of the data, flags are atomic.
    // of the data.
    std::vector<std::vector<sal_Int8>> outBuffers;
    std::shared_ptr<comphelper::ThreadTaskTag> threadTaskTag;
    css::uno::Sequence<sal_Int8> inBuffer;
    css::uno::Sequence<sal_Int8> prevDataBlock;
    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> maProcessOutputFunc;
    sal_Int64 totalIn;
    sal_Int64 totalOut;
    int zlibLevel;
    std::atomic<int> pendingTasksCount;

public:
    // Unlike with Deflater class, bNoWrap is always true.
    ThreadedDeflater(sal_Int32 nSetLevel);
    ~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE;
    void startDeflate(const css::uno::Sequence<sal_Int8>& rBuffer);
    void waitForTasks();
    bool finished() const;
    css::uno::Sequence<sal_Int8> getOutput() const;
    void deflateWrite(
        const css::uno::Reference<css::io::XInputStream>& xInStream,
        std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
        std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc);
    sal_Int64 getTotalIn() const { return totalIn; }
    sal_Int64 getTotalOut() const { return totalOut; }

private:
    void processDeflatedBuffers();
    void clear();
};
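The new header comment is easiest to read next to a call site. The snippet below is a condensed sketch of how ZipOutputEntryParallel::writeStream() (further down in this change) drives the new entry point; aCrc and writeToStorage stand in for the real member objects and are only illustrative.

```cpp
ZipUtils::ThreadedDeflater aDeflater(DEFAULT_COMPRESSION);
aDeflater.deflateWrite(
    xInStream,
    // input callback: sees every batch of raw bytes as it is read from the stream
    [&aCrc](const css::uno::Sequence<sal_Int8>& rBuffer, sal_Int32 nLen) {
        aCrc.updateSegment(rBuffer, nLen);
    },
    // output callback: sees every batch of deflated bytes, ready to be written out
    [&writeToStorage](const css::uno::Sequence<sal_Int8>& rBuffer, sal_Int32 nLen) {
        writeToStorage(rBuffer, nLen);
    });
const sal_Int64 nTotalIn = aDeflater.getTotalIn();   // raw bytes consumed
const sal_Int64 nTotalOut = aDeflater.getTotalOut(); // deflated bytes produced
```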
@@ -44,14 +44,19 @@ class ThreadedDeflater::Task : public comphelper::ThreadTask
    ThreadedDeflater* deflater;
    int sequence;
    int blockSize;
    bool firstTask : 1;
    bool lastTask : 1;

public:
    Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_)
    Task(ThreadedDeflater* deflater_, int sequence_, int blockSize_, bool firstTask_,
         bool lastTask_)
        : comphelper::ThreadTask(deflater_->threadTaskTag)
        , stream()
        , deflater(deflater_)
        , sequence(sequence_)
        , blockSize(blockSize_)
        , firstTask(firstTask_)
        , lastTask(lastTask_)
    {
    }
@@ -61,58 +66,83 @@ private:
ThreadedDeflater::ThreadedDeflater(sal_Int32 nSetLevel)
    : threadTaskTag(comphelper::ThreadPool::createThreadTaskTag())
    , totalIn(0)
    , totalOut(0)
    , zlibLevel(nSetLevel)
    , pendingTasksCount(0)
{
}

ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE
{
    waitForTasks();
    clear();
}
ThreadedDeflater::~ThreadedDeflater() COVERITY_NOEXCEPT_FALSE { clear(); }

void ThreadedDeflater::startDeflate(const uno::Sequence<sal_Int8>& rBuffer)
void ThreadedDeflater::deflateWrite(
    const css::uno::Reference<css::io::XInputStream>& xInStream,
    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessInputFunc,
    std::function<void(const css::uno::Sequence<sal_Int8>&, sal_Int32)> aProcessOutputFunc)
{
    inBuffer = rBuffer;
    sal_Int64 size = inBuffer.getLength();
    int tasksCount = (size + MaxBlockSize - 1) / MaxBlockSize;
    tasksCount = std::max(tasksCount, 1);
    pendingTasksCount = tasksCount;
    outBuffers.resize(pendingTasksCount);
    for (int sequence = 0; sequence < tasksCount; ++sequence)
    sal_Int64 nThreadCount = comphelper::ThreadPool::getSharedOptimalPool().getWorkerCount();
    sal_Int64 batchSize = MaxBlockSize * nThreadCount;
    inBuffer.realloc(batchSize);
    prevDataBlock.realloc(MaxBlockSize);
    outBuffers.resize(nThreadCount);
    maProcessOutputFunc = aProcessOutputFunc;
    bool firstTask = true;

    while (xInStream->available() > 0)
    {
        sal_Int64 thisSize = std::min(MaxBlockSize, size);
        size -= thisSize;
        comphelper::ThreadPool::getSharedOptimalPool().pushTask(
            std::make_unique<Task>(this, sequence, thisSize));
        sal_Int64 inputBytes = xInStream->readBytes(inBuffer, batchSize);
        aProcessInputFunc(inBuffer, inputBytes);
        totalIn += inputBytes;
        int sequence = 0;
        bool lastBatch = xInStream->available() <= 0;
        sal_Int64 bytesPending = inputBytes;
        while (bytesPending > 0)
        {
            sal_Int64 taskSize = std::min(MaxBlockSize, bytesPending);
            bytesPending -= taskSize;
            bool lastTask = lastBatch && !bytesPending;
            comphelper::ThreadPool::getSharedOptimalPool().pushTask(
                std::make_unique<Task>(this, sequence++, taskSize, firstTask, lastTask));

            if (firstTask)
                firstTask = false;
        }

        assert(bytesPending == 0);

        comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);

        if (!lastBatch)
        {
            assert(inputBytes == batchSize);
            std::copy_n(inBuffer.begin() + (batchSize - MaxBlockSize), MaxBlockSize,
                        prevDataBlock.begin());
        }

        processDeflatedBuffers();
    }
    assert(size == 0);
}

bool ThreadedDeflater::finished() const { return pendingTasksCount == 0; }

css::uno::Sequence<sal_Int8> ThreadedDeflater::getOutput() const
void ThreadedDeflater::processDeflatedBuffers()
{
    assert(finished());
    sal_Int64 totalSize = 0;
    sal_Int64 batchOutputSize = 0;
    for (const auto& buffer : outBuffers)
        totalSize += buffer.size();
    uno::Sequence<sal_Int8> outBuffer(totalSize);
        batchOutputSize += buffer.size();

    css::uno::Sequence<sal_Int8> outBuffer(batchOutputSize);

    auto pos = outBuffer.begin();
    for (const auto& buffer : outBuffers)
    for (auto& buffer : outBuffers)
    {
        pos = std::copy(buffer.begin(), buffer.end(), pos);
    return outBuffer;
    }
        buffer.clear();
    }

void ThreadedDeflater::waitForTasks()
{
    comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(threadTaskTag);
    maProcessOutputFunc(outBuffer, batchOutputSize);
    totalOut += batchOutputSize;
}

void ThreadedDeflater::clear()
{
    assert(finished());
    inBuffer = uno::Sequence<sal_Int8>();
    outBuffers.clear();
}
@@ -147,27 +177,35 @@ void ThreadedDeflater::Task::doWork()
    // zlib doesn't handle const properly
    unsigned char* inBufferPtr = reinterpret_cast<unsigned char*>(
        const_cast<signed char*>(deflater->inBuffer.getConstArray()));
    if (sequence != 0)
    if (!firstTask)
    {
        // the window size is 32k, so set last 32k of previous data as the dictionary
        assert(MAX_WBITS == 15);
        assert(MaxBlockSize >= 32768);
        deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
        if (sequence > 0)
        {
            deflateSetDictionary(&stream, inBufferPtr + myInBufferStart - 32768, 32768);
        }
        else
        {
            unsigned char* prevBufferPtr = reinterpret_cast<unsigned char*>(
                const_cast<signed char*>(deflater->prevDataBlock.getConstArray()));
            deflateSetDictionary(&stream, prevBufferPtr + MaxBlockSize - 32768, 32768);
        }
    }
    stream.next_in = inBufferPtr + myInBufferStart;
    stream.avail_in = blockSize;
    stream.next_out = reinterpret_cast<unsigned char*>(deflater->outBuffers[sequence].data());
    stream.avail_out = outputMaxSize;
    bool last = sequence == int(deflater->outBuffers.size() - 1); // Last block?

    // The trick is in using Z_SYNC_FLUSH instead of Z_NO_FLUSH. It will align the data at a byte boundary,
    // and since we use a raw stream, the data blocks then can be simply concatenated.
    int res = deflate(&stream, last ? Z_FINISH : Z_SYNC_FLUSH);
    int res = deflate(&stream, lastTask ? Z_FINISH : Z_SYNC_FLUSH);
    assert(stream.avail_in == 0); // Check that everything has been deflated.
    if (last ? res == Z_STREAM_END : res == Z_OK)
    if (lastTask ? res == Z_STREAM_END : res == Z_OK)
    { // ok
        sal_Int64 outSize = outputMaxSize - stream.avail_out;
        deflater->outBuffers[sequence].resize(outSize);
        --deflater->pendingTasksCount;
    }
    else
    {
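The raw-stream concatenation trick described in the comment above can be reproduced with plain zlib outside of LibreOffice. The sketch below is illustrative only, not part of the change: each block is compressed as an independent raw deflate segment, every non-first block is primed with the previous 32 KiB via deflateSetDictionary (in the commit, the first block of a batch takes that window from prevDataBlock), and only the last block of the whole stream gets Z_FINISH.

```cpp
#include <cassert>
#include <vector>
#include <zlib.h>

// Compress one block as a segment of a raw deflate stream. 'dict' is the last 32 KiB
// of the data preceding this block (empty for the very first block), so that
// back-references across the block boundary keep working.
static std::vector<unsigned char> deflateBlock(const std::vector<unsigned char>& dict,
                                               const unsigned char* data, uInt len,
                                               bool lastBlock)
{
    z_stream strm{};
    // Negative windowBits selects raw deflate: no zlib header/trailer, which is what
    // allows the per-block outputs to be concatenated into one valid stream.
    int ret = deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -MAX_WBITS, 8,
                           Z_DEFAULT_STRATEGY);
    assert(ret == Z_OK);
    if (!dict.empty())
        deflateSetDictionary(&strm, dict.data(), static_cast<uInt>(dict.size()));

    std::vector<unsigned char> out(deflateBound(&strm, len) + 64); // margin for the flush marker
    strm.next_in = const_cast<unsigned char*>(data);
    strm.avail_in = len;
    strm.next_out = out.data();
    strm.avail_out = static_cast<uInt>(out.size());

    // Z_SYNC_FLUSH ends the block on a byte boundary; only the very last block of the
    // whole stream is finished with Z_FINISH.
    ret = deflate(&strm, lastBlock ? Z_FINISH : Z_SYNC_FLUSH);
    assert(lastBlock ? ret == Z_STREAM_END : ret == Z_OK);
    assert(strm.avail_in == 0); // everything was consumed in one pass
    out.resize(out.size() - strm.avail_out);
    deflateEnd(&strm);
    return out;
}
```

Blocks produced this way concatenate, in sequence order, into one raw deflate stream that a single raw inflate pass can decode, which is why the tasks' output buffers can simply be appended one after another.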
@@ -363,28 +363,18 @@ ZipOutputEntryParallel::ZipOutputEntryParallel(
void ZipOutputEntryParallel::writeStream(const uno::Reference< io::XInputStream >& xInStream)
{
    sal_Int64 toRead = xInStream->available();
    uno::Sequence< sal_Int8 > inBuffer( toRead );
    sal_Int64 read = xInStream->readBytes(inBuffer, toRead);
    if (read < toRead)
        inBuffer.realloc( read );
    while( xInStream->available() > 0 )
    { // We didn't get the full size from available().
        uno::Sequence< sal_Int8 > buf( xInStream->available());
        read = xInStream->readBytes( buf, xInStream->available());
        sal_Int64 oldSize = inBuffer.getLength();
        inBuffer.realloc( oldSize + read );
        std::copy( buf.begin(), buf.end(), inBuffer.begin() + oldSize );
    }
    ZipUtils::ThreadedDeflater deflater( DEFAULT_COMPRESSION );
    totalIn = inBuffer.getLength();
    deflater.startDeflate( inBuffer );
    processInput( inBuffer );
    deflater.waitForTasks();
    uno::Sequence< sal_Int8 > outBuffer = deflater.getOutput();
    deflater.clear(); // release memory
    totalOut = outBuffer.getLength();
    processDeflated(outBuffer, outBuffer.getLength());
    deflater.deflateWrite(xInStream,
        [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
            if (!m_bEncryptCurrentEntry)
                m_aCRC.updateSegment(rBuffer, nLen);
        },
        [this](const uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nLen) {
            processDeflated(rBuffer, nLen);
        }
    );
    totalIn = deflater.getTotalIn();
    totalOut = deflater.getTotalOut();
    closeEntry();
}