package: Finally implement parallel zip entries deflating

For that:
1, create ZipPackageStream::successfullyWritten to be called after
the content is written
2, Do not take mutex when reading from WrapStreamForShare - threads should
be using different streams anyway, but there is only one common mutex. :-/

Change-Id: I90303e49206b19454dd4141e24cc8be29c433045
This commit is contained in:
Matúš Kukan
2014-10-21 15:17:13 +02:00
parent db5552631b
commit fbf714b456
7 changed files with 122 additions and 73 deletions

View File

@@ -54,6 +54,9 @@ public:
~ZipOutputEntry(); ~ZipOutputEntry();
css::uno::Sequence< sal_Int8 > getData(); css::uno::Sequence< sal_Int8 > getData();
ZipEntry* getZipEntry() { return m_pCurrentEntry; }
ZipPackageStream* getZipPackageStream() { return m_pCurrentStream; }
bool isEncrypt() { return m_bEncryptCurrentEntry; }
void closeEntry(); void closeEntry();
void write(const css::uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nNewOffset, sal_Int32 nNewLength); void write(const css::uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nNewOffset, sal_Int32 nNewLength);

View File

@@ -23,10 +23,12 @@
#include <com/sun/star/io/XOutputStream.hpp> #include <com/sun/star/io/XOutputStream.hpp>
#include <ByteChucker.hxx> #include <ByteChucker.hxx>
#include <comphelper/threadpool.hxx>
#include <vector> #include <vector>
struct ZipEntry; struct ZipEntry;
class ZipOutputEntry;
class ZipPackageStream; class ZipPackageStream;
class ZipOutputStream class ZipOutputStream
@@ -35,14 +37,17 @@ class ZipOutputStream
::std::vector < ZipEntry * > m_aZipList; ::std::vector < ZipEntry * > m_aZipList;
ByteChucker m_aChucker; ByteChucker m_aChucker;
bool m_bFinished;
ZipEntry *m_pCurrentEntry; ZipEntry *m_pCurrentEntry;
comphelper::ThreadPool &m_rSharedThreadPool;
std::vector< ZipOutputEntry* > m_aEntries;
public: public:
ZipOutputStream( ZipOutputStream(
const ::com::sun::star::uno::Reference< ::com::sun::star::io::XOutputStream > &xOStream ); const ::com::sun::star::uno::Reference< ::com::sun::star::io::XOutputStream > &xOStream );
~ZipOutputStream(); ~ZipOutputStream();
void addDeflatingThread( ZipOutputEntry *pEntry, comphelper::ThreadTask *pThreadTask );
void writeLOC( ZipEntry *pEntry, bool bEncrypt = false ) void writeLOC( ZipEntry *pEntry, bool bEncrypt = false )
throw(::com::sun::star::io::IOException, ::com::sun::star::uno::RuntimeException); throw(::com::sun::star::io::IOException, ::com::sun::star::uno::RuntimeException);
void rawWrite( ::com::sun::star::uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nNewOffset, sal_Int32 nNewLength ) void rawWrite( ::com::sun::star::uno::Sequence< sal_Int8 >& rBuffer, sal_Int32 nNewOffset, sal_Int32 nNewLength )

View File

@@ -63,14 +63,13 @@ private:
sal_uInt8 m_nStreamMode; sal_uInt8 m_nStreamMode;
sal_uInt32 m_nMagicalHackPos; sal_uInt32 m_nMagicalHackPos;
sal_uInt32 m_nMagicalHackSize; sal_uInt32 m_nMagicalHackSize;
sal_Int64 m_nOwnStreamOrigSize;
bool m_bHasSeekable; bool m_bHasSeekable;
bool m_bCompressedIsSetFromOutside; bool m_bCompressedIsSetFromOutside;
bool m_bFromManifest; bool m_bFromManifest;
bool m_bUseWinEncoding; bool m_bUseWinEncoding;
bool m_bRawStream;
::com::sun::star::uno::Reference< ::com::sun::star::io::XInputStream > GetOwnSeekStream(); ::com::sun::star::uno::Reference< ::com::sun::star::io::XInputStream > GetOwnSeekStream();
@@ -138,6 +137,7 @@ public:
void setZipEntryOnLoading( const ZipEntry &rInEntry); void setZipEntryOnLoading( const ZipEntry &rInEntry);
::com::sun::star::uno::Reference< ::com::sun::star::io::XInputStream > SAL_CALL getRawData() ::com::sun::star::uno::Reference< ::com::sun::star::io::XInputStream > SAL_CALL getRawData()
throw(::com::sun::star::uno::RuntimeException); throw(::com::sun::star::uno::RuntimeException);
void successfullyWritten( ZipEntry *pEntry );
static ::com::sun::star::uno::Sequence < sal_Int8 > static_getImplementationId(); static ::com::sun::star::uno::Sequence < sal_Int8 > static_getImplementationId();

View File

@@ -47,14 +47,13 @@ ZipOutputEntry::ZipOutputEntry( const uno::Reference< uno::XComponentContext >&
, m_pCurrentEntry(&rEntry) , m_pCurrentEntry(&rEntry)
, m_nDigested(0) , m_nDigested(0)
, m_bEncryptCurrentEntry(bEncrypt) , m_bEncryptCurrentEntry(bEncrypt)
, m_pCurrentStream(NULL) , m_pCurrentStream(pStream)
{ {
assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries"); assert(m_pCurrentEntry->nMethod == DEFLATED && "Use ZipPackageStream::rawWrite() for STORED entries");
if (m_bEncryptCurrentEntry) if (m_bEncryptCurrentEntry)
{ {
m_xCipherContext = ZipFile::StaticGetCipher( rxContext, pStream->GetEncryptionData(), true ); m_xCipherContext = ZipFile::StaticGetCipher( rxContext, pStream->GetEncryptionData(), true );
m_xDigestContext = ZipFile::StaticGetDigestContextForChecksum( rxContext, pStream->GetEncryptionData() ); m_xDigestContext = ZipFile::StaticGetDigestContextForChecksum( rxContext, pStream->GetEncryptionData() );
m_pCurrentStream = pStream;
} }
} }

View File

@@ -27,6 +27,7 @@
#include <PackageConstants.hxx> #include <PackageConstants.hxx>
#include <ZipEntry.hxx> #include <ZipEntry.hxx>
#include <ZipOutputEntry.hxx>
#include <ZipPackageStream.hxx> #include <ZipPackageStream.hxx>
using namespace com::sun::star; using namespace com::sun::star;
@@ -39,15 +40,13 @@ using namespace com::sun::star::packages::zip::ZipConstants;
ZipOutputStream::ZipOutputStream( const uno::Reference < io::XOutputStream > &xOStream ) ZipOutputStream::ZipOutputStream( const uno::Reference < io::XOutputStream > &xOStream )
: m_xStream(xOStream) : m_xStream(xOStream)
, m_aChucker(xOStream) , m_aChucker(xOStream)
, m_bFinished(false)
, m_pCurrentEntry(NULL) , m_pCurrentEntry(NULL)
, m_rSharedThreadPool(comphelper::ThreadPool::getSharedOptimalPool())
{ {
} }
ZipOutputStream::~ZipOutputStream( void ) ZipOutputStream::~ZipOutputStream( void )
{ {
for (sal_Int32 i = 0, nEnd = m_aZipList.size(); i < nEnd; i++)
delete m_aZipList[i];
} }
void ZipOutputStream::setEntry( ZipEntry *pEntry ) void ZipOutputStream::setEntry( ZipEntry *pEntry )
@@ -66,6 +65,12 @@ void ZipOutputStream::setEntry( ZipEntry *pEntry )
} }
} }
void ZipOutputStream::addDeflatingThread( ZipOutputEntry *pEntry, comphelper::ThreadTask *pThread )
{
m_rSharedThreadPool.pushTask(pThread);
m_aEntries.push_back(pEntry);
}
void ZipOutputStream::rawWrite( Sequence< sal_Int8 >& rBuffer, sal_Int32 /*nNewOffset*/, sal_Int32 nNewLength ) void ZipOutputStream::rawWrite( Sequence< sal_Int8 >& rBuffer, sal_Int32 /*nNewOffset*/, sal_Int32 nNewLength )
throw(IOException, RuntimeException) throw(IOException, RuntimeException)
{ {
@@ -85,21 +90,33 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt )
m_pCurrentEntry = NULL; m_pCurrentEntry = NULL;
} }
void ZipOutputStream::finish( ) void ZipOutputStream::finish()
throw(IOException, RuntimeException) throw(IOException, RuntimeException)
{ {
if (m_bFinished) assert(!m_aZipList.empty() && "Zip file must have at least one entry!");
return;
if (m_aZipList.size() < 1) // Wait for all threads to finish & write
OSL_FAIL("Zip file must have at least one entry!\n"); m_rSharedThreadPool.waitUntilEmpty();
for (size_t i = 0; i < m_aEntries.size(); i++)
{
writeLOC(m_aEntries[i]->getZipEntry(), m_aEntries[i]->isEncrypt());
uno::Sequence< sal_Int8 > aCompressedData = m_aEntries[i]->getData();
rawWrite(aCompressedData, 0, aCompressedData.getLength());
rawCloseEntry(m_aEntries[i]->isEncrypt());
m_aEntries[i]->getZipPackageStream()->successfullyWritten(m_aEntries[i]->getZipEntry());
delete m_aEntries[i];
}
sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition()); sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition());
for (sal_Int32 i =0, nEnd = m_aZipList.size(); i < nEnd; i++) for (size_t i = 0; i < m_aZipList.size(); i++)
{
writeCEN( *m_aZipList[i] ); writeCEN( *m_aZipList[i] );
delete m_aZipList[i];
}
writeEND( nOffset, static_cast < sal_Int32 > (m_aChucker.GetPosition()) - nOffset); writeEND( nOffset, static_cast < sal_Int32 > (m_aChucker.GetPosition()) - nOffset);
m_bFinished = true;
m_xStream->flush(); m_xStream->flush();
m_aZipList.clear();
} }
void ZipOutputStream::writeEND(sal_uInt32 nOffset, sal_uInt32 nLength) void ZipOutputStream::writeEND(sal_uInt32 nOffset, sal_uInt32 nLength)

View File

@@ -90,10 +90,12 @@ ZipPackageStream::ZipPackageStream ( ZipPackage & rNewPackage,
, m_nStreamMode( PACKAGE_STREAM_NOTSET ) , m_nStreamMode( PACKAGE_STREAM_NOTSET )
, m_nMagicalHackPos( 0 ) , m_nMagicalHackPos( 0 )
, m_nMagicalHackSize( 0 ) , m_nMagicalHackSize( 0 )
, m_nOwnStreamOrigSize( 0 )
, m_bHasSeekable( false ) , m_bHasSeekable( false )
, m_bCompressedIsSetFromOutside( false ) , m_bCompressedIsSetFromOutside( false )
, m_bFromManifest( false ) , m_bFromManifest( false )
, m_bUseWinEncoding( false ) , m_bUseWinEncoding( false )
, m_bRawStream( false )
{ {
m_xContext = xContext; m_xContext = xContext;
m_nFormat = nFormat; m_nFormat = nFormat;
@@ -437,6 +439,35 @@ bool ZipPackageStream::ParsePackageRawStream()
return true; return true;
} }
class DeflateThread: public comphelper::ThreadTask
{
ZipOutputEntry *mpEntry;
uno::Reference< io::XInputStream > mxInStream;
public:
DeflateThread( ZipOutputEntry *pEntry,
const uno::Reference< io::XInputStream >& xInStream )
: mpEntry(pEntry)
, mxInStream(xInStream)
{}
private:
virtual void doWork() SAL_OVERRIDE
{
sal_Int32 nLength = 0;
uno::Sequence< sal_Int8 > aSeq(n_ConstBufferSize);
do
{
nLength = mxInStream->readBytes(aSeq, n_ConstBufferSize);
mpEntry->write(aSeq, 0, nLength);
}
while (nLength == n_ConstBufferSize);
mpEntry->closeEntry();
mxInStream.clear();
}
};
static void ImplSetStoredData( ZipEntry & rEntry, uno::Reference< io::XInputStream> & rStream ) static void ImplSetStoredData( ZipEntry & rEntry, uno::Reference< io::XInputStream> & rStream )
{ {
// It's very annoying that we have to do this, but lots of zip packages // It's very annoying that we have to do this, but lots of zip packages
@@ -497,20 +528,21 @@ bool ZipPackageStream::saveChild(
OSL_ENSURE( m_nStreamMode != PACKAGE_STREAM_NOTSET, "Unacceptable ZipPackageStream mode!" ); OSL_ENSURE( m_nStreamMode != PACKAGE_STREAM_NOTSET, "Unacceptable ZipPackageStream mode!" );
bool bRawStream = false; m_bRawStream = false;
if ( m_nStreamMode == PACKAGE_STREAM_DETECT ) if ( m_nStreamMode == PACKAGE_STREAM_DETECT )
bRawStream = ParsePackageRawStream(); m_bRawStream = ParsePackageRawStream();
else if ( m_nStreamMode == PACKAGE_STREAM_RAW ) else if ( m_nStreamMode == PACKAGE_STREAM_RAW )
bRawStream = true; m_bRawStream = true;
bool bParallelDeflate = false;
bool bTransportOwnEncrStreamAsRaw = false; bool bTransportOwnEncrStreamAsRaw = false;
// During the storing the original size of the stream can be changed // During the storing the original size of the stream can be changed
// TODO/LATER: get rid of this hack // TODO/LATER: get rid of this hack
sal_Int64 nOwnStreamOrigSize = bRawStream ? m_nMagicalHackSize : aEntry.nSize; m_nOwnStreamOrigSize = m_bRawStream ? m_nMagicalHackSize : aEntry.nSize;
bool bUseNonSeekableAccess = false; bool bUseNonSeekableAccess = false;
uno::Reference < io::XInputStream > xStream; uno::Reference < io::XInputStream > xStream;
if ( !IsPackageMember() && !bRawStream && !bToBeEncrypted && bToBeCompressed ) if ( !IsPackageMember() && !m_bRawStream && !bToBeEncrypted && bToBeCompressed )
{ {
// the stream is not a package member, not a raw stream, // the stream is not a package member, not a raw stream,
// it should not be encrypted and it should be compressed, // it should not be encrypted and it should be compressed,
@@ -540,11 +572,11 @@ bool ZipPackageStream::saveChild(
{ {
// If the stream is a raw one, then we should be positioned // If the stream is a raw one, then we should be positioned
// at the beginning of the actual data // at the beginning of the actual data
if ( !bToBeCompressed || bRawStream ) if ( !bToBeCompressed || m_bRawStream )
{ {
// The raw stream can neither be encrypted nor connected // The raw stream can neither be encrypted nor connected
OSL_ENSURE( !bRawStream || !(bToBeCompressed || bToBeEncrypted), "The stream is already encrypted!\n" ); OSL_ENSURE( !m_bRawStream || !(bToBeCompressed || bToBeEncrypted), "The stream is already encrypted!\n" );
xSeek->seek ( bRawStream ? m_nMagicalHackPos : 0 ); xSeek->seek ( m_bRawStream ? m_nMagicalHackPos : 0 );
ImplSetStoredData ( *pTempEntry, xStream ); ImplSetStoredData ( *pTempEntry, xStream );
// TODO/LATER: Get rid of hacks related to switching of Flag Method and Size properties! // TODO/LATER: Get rid of hacks related to switching of Flag Method and Size properties!
@@ -553,7 +585,7 @@ bool ZipPackageStream::saveChild(
{ {
// this is the correct original size // this is the correct original size
pTempEntry->nSize = xSeek->getLength(); pTempEntry->nSize = xSeek->getLength();
nOwnStreamOrigSize = pTempEntry->nSize; m_nOwnStreamOrigSize = pTempEntry->nSize;
} }
xSeek->seek ( 0 ); xSeek->seek ( 0 );
@@ -592,7 +624,7 @@ bool ZipPackageStream::saveChild(
return bSuccess; return bSuccess;
} }
if ( bToBeEncrypted || bRawStream || bTransportOwnEncrStreamAsRaw ) if ( bToBeEncrypted || m_bRawStream || bTransportOwnEncrStreamAsRaw )
{ {
if ( bToBeEncrypted && !bTransportOwnEncrStreamAsRaw ) if ( bToBeEncrypted && !bTransportOwnEncrStreamAsRaw )
{ {
@@ -624,11 +656,11 @@ bool ZipPackageStream::saveChild(
aPropSet[PKG_MNFST_ITERATION].Value <<= m_xBaseEncryptionData->m_nIterationCount; aPropSet[PKG_MNFST_ITERATION].Value <<= m_xBaseEncryptionData->m_nIterationCount;
// Need to store the uncompressed size in the manifest // Need to store the uncompressed size in the manifest
OSL_ENSURE( nOwnStreamOrigSize >= 0, "The stream size was not correctly initialized!\n" ); OSL_ENSURE( m_nOwnStreamOrigSize >= 0, "The stream size was not correctly initialized!\n" );
aPropSet[PKG_MNFST_UCOMPSIZE].Name = sSizeProperty; aPropSet[PKG_MNFST_UCOMPSIZE].Name = sSizeProperty;
aPropSet[PKG_MNFST_UCOMPSIZE].Value <<= nOwnStreamOrigSize; aPropSet[PKG_MNFST_UCOMPSIZE].Value <<= m_nOwnStreamOrigSize;
if ( bRawStream || bTransportOwnEncrStreamAsRaw ) if ( m_bRawStream || bTransportOwnEncrStreamAsRaw )
{ {
::rtl::Reference< EncryptionData > xEncData = GetEncryptionData(); ::rtl::Reference< EncryptionData > xEncData = GetEncryptionData();
if ( !xEncData.is() ) if ( !xEncData.is() )
@@ -651,7 +683,7 @@ bool ZipPackageStream::saveChild(
// If the entry is already stored in the zip file in the format we // If the entry is already stored in the zip file in the format we
// want for this write...copy it raw // want for this write...copy it raw
if ( !bUseNonSeekableAccess if ( !bUseNonSeekableAccess
&& ( bRawStream || bTransportOwnEncrStreamAsRaw && ( m_bRawStream || bTransportOwnEncrStreamAsRaw
|| ( IsPackageMember() && !bToBeEncrypted || ( IsPackageMember() && !bToBeEncrypted
&& ( ( aEntry.nMethod == DEFLATED && bToBeCompressed ) && ( ( aEntry.nMethod == DEFLATED && bToBeCompressed )
|| ( aEntry.nMethod == STORED && !bToBeCompressed ) ) ) ) ) || ( aEntry.nMethod == STORED && !bToBeCompressed ) ) ) ) )
@@ -671,7 +703,7 @@ bool ZipPackageStream::saveChild(
try try
{ {
if ( bRawStream ) if ( m_bRawStream )
xStream->skipBytes( m_nMagicalHackPos ); xStream->skipBytes( m_nMagicalHackPos );
ZipOutputStream::setEntry(pTempEntry); ZipOutputStream::setEntry(pTempEntry);
@@ -733,35 +765,29 @@ bool ZipPackageStream::saveChild(
try try
{ {
ZipOutputStream::setEntry(pTempEntry); ZipOutputStream::setEntry(pTempEntry);
rZipOut.writeLOC(pTempEntry, bToBeEncrypted);
// the entry is provided to the ZipOutputStream that will delete it // the entry is provided to the ZipOutputStream that will delete it
pAutoTempEntry.release(); pAutoTempEntry.release();
sal_Int32 nLength;
uno::Sequence < sal_Int8 > aSeq (n_ConstBufferSize);
if (pTempEntry->nMethod == STORED) if (pTempEntry->nMethod == STORED)
{ {
sal_Int32 nLength;
uno::Sequence< sal_Int8 > aSeq(n_ConstBufferSize);
rZipOut.writeLOC(pTempEntry, bToBeEncrypted);
do do
{ {
nLength = xStream->readBytes(aSeq, n_ConstBufferSize); nLength = xStream->readBytes(aSeq, n_ConstBufferSize);
rZipOut.rawWrite(aSeq, 0, nLength); rZipOut.rawWrite(aSeq, 0, nLength);
} }
while ( nLength == n_ConstBufferSize ); while ( nLength == n_ConstBufferSize );
rZipOut.rawCloseEntry(bToBeEncrypted);
} }
else else
{ {
ZipOutputEntry aZipEntry(m_xContext, *pTempEntry, this, bToBeEncrypted); bParallelDeflate = true;
do // Start a new thread deflating this zip entry
{ ZipOutputEntry *pZipEntry = new ZipOutputEntry(m_xContext, *pTempEntry, this, bToBeEncrypted);
nLength = xStream->readBytes(aSeq, n_ConstBufferSize); rZipOut.addDeflatingThread( pZipEntry, new DeflateThread(pZipEntry, xStream) );
aZipEntry.write(aSeq, 0, nLength);
}
while ( nLength == n_ConstBufferSize );
aZipEntry.closeEntry();
uno::Sequence< sal_Int8 > aCompressedData = aZipEntry.getData();
rZipOut.rawWrite(aCompressedData, 0, aCompressedData.getLength());
} }
rZipOut.rawCloseEntry(bToBeEncrypted);
} }
catch ( ZipException& ) catch ( ZipException& )
{ {
@@ -793,30 +819,8 @@ bool ZipPackageStream::saveChild(
} }
} }
if( bSuccess ) if (bSuccess && !bParallelDeflate)
{ successfullyWritten(pTempEntry);
if ( !IsPackageMember() )
{
CloseOwnStreamIfAny();
SetPackageMember ( true );
}
if ( bRawStream )
{
// the raw stream was integrated and now behaves
// as usual encrypted stream
SetToBeEncrypted( true );
}
// Then copy it back afterwards...
ZipPackageFolder::copyZipEntry ( aEntry, *pTempEntry );
// TODO/LATER: get rid of this hack ( the encrypted stream size property is changed during saving )
if ( IsEncrypted() )
setSize( nOwnStreamOrigSize );
aEntry.nOffset *= -1;
}
if ( aPropSet.getLength() if ( aPropSet.getLength()
&& ( m_nFormat == embed::StorageFormats::PACKAGE || m_nFormat == embed::StorageFormats::OFOPXML ) ) && ( m_nFormat == embed::StorageFormats::PACKAGE || m_nFormat == embed::StorageFormats::OFOPXML ) )
@@ -825,6 +829,31 @@ bool ZipPackageStream::saveChild(
return bSuccess; return bSuccess;
} }
void ZipPackageStream::successfullyWritten( ZipEntry *pEntry )
{
if ( !IsPackageMember() )
{
CloseOwnStreamIfAny();
SetPackageMember ( true );
}
if ( m_bRawStream )
{
// the raw stream was integrated and now behaves
// as usual encrypted stream
SetToBeEncrypted( true );
}
// Then copy it back afterwards...
ZipPackageFolder::copyZipEntry( aEntry, *pEntry );
// TODO/LATER: get rid of this hack ( the encrypted stream size property is changed during saving )
if ( IsEncrypted() )
setSize( m_nOwnStreamOrigSize );
aEntry.nOffset *= -1;
}
void ZipPackageStream::SetPackageMember( bool bNewValue ) void ZipPackageStream::SetPackageMember( bool bNewValue )
{ {
if ( bNewValue ) if ( bNewValue )

View File

@@ -54,8 +54,6 @@ sal_Int32 SAL_CALL WrapStreamForShare::readBytes( uno::Sequence< sal_Int8 >& aDa
io::IOException, io::IOException,
uno::RuntimeException, std::exception ) uno::RuntimeException, std::exception )
{ {
::osl::MutexGuard aGuard( m_rMutexRef->GetMutex() );
if ( !m_xInStream.is() ) if ( !m_xInStream.is() )
throw io::IOException(THROW_WHERE ); throw io::IOException(THROW_WHERE );
@@ -73,8 +71,6 @@ sal_Int32 SAL_CALL WrapStreamForShare::readSomeBytes( uno::Sequence< sal_Int8 >&
io::IOException, io::IOException,
uno::RuntimeException, std::exception ) uno::RuntimeException, std::exception )
{ {
::osl::MutexGuard aGuard( m_rMutexRef->GetMutex() );
if ( !m_xInStream.is() ) if ( !m_xInStream.is() )
throw io::IOException(THROW_WHERE ); throw io::IOException(THROW_WHERE );