Kaydet (Commit) aeb2014b authored tarafından Luboš Luňák's avatar Luboš Luňák

remove code confusion about threads vs thread tasks

A threadpool controls a number of threads that execute a number
of thread *tasks* from a queue. The API even says they are tasks.
So it's damn confusing when ZipPackageStream::saveChild()
claims to limit the number of threads to 4x the maximum number
of threads. It limits the number of queued thread tasks.

Change-Id: I317497f27a82d92a7c8566b14aaeae73a4ffef1f
Reviewed-on: https://gerrit.libreoffice.org/72677
Tested-by: Jenkins
Reviewed-by: 's avatarLuboš Luňák <l.lunak@collabora.com>
üst 86618248
......@@ -36,8 +36,8 @@ class ZipPackageStream;
class ZipOutputEntry
{
// allow only DeflateThread to change m_bFinished using setFinished()
friend class DeflateThread;
// allow only DeflateThreadTask to change m_bFinished using setFinished()
friend class DeflateThreadTask;
css::uno::Sequence< sal_Int8 > m_aDeflateBuffer;
ZipUtils::Deflater m_aDeflater;
......
......@@ -47,7 +47,7 @@ public:
const css::uno::Reference< css::io::XOutputStream > &xOStream );
~ZipOutputStream();
void addDeflatingThread( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pThreadTask );
void addDeflatingThreadTask( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pThreadTask );
/// @throws css::io::IOException
/// @throws css::uno::RuntimeException
......@@ -79,12 +79,12 @@ private:
void writeEXT( const ZipEntry &rEntry );
// ScheduledThread handling helpers
void consumeScheduledThreadEntry(std::unique_ptr<ZipOutputEntry> pCandidate);
void consumeFinishedScheduledThreadEntries();
void consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry> pCandidate);
void consumeFinishedScheduledThreadTaskEntries();
public:
void reduceScheduledThreadsToGivenNumberOrLess(
sal_Int32 nThreads);
void reduceScheduledThreadTasksToGivenNumberOrLess(
sal_Int32 nThreadTasks);
const std::shared_ptr<comphelper::ThreadTaskTag>& getThreadTaskTag() { return mpThreadTaskTag; }
};
......
......@@ -68,9 +68,9 @@ void ZipOutputStream::setEntry( ZipEntry *pEntry )
}
}
void ZipOutputStream::addDeflatingThread( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pThread )
void ZipOutputStream::addDeflatingThreadTask( ZipOutputEntry *pEntry, std::unique_ptr<comphelper::ThreadTask> pTask )
{
comphelper::ThreadPool::getSharedOptimalPool().pushTask(std::move(pThread));
comphelper::ThreadPool::getSharedOptimalPool().pushTask(std::move(pTask));
m_aEntries.push_back(pEntry);
}
......@@ -91,14 +91,14 @@ void ZipOutputStream::rawCloseEntry( bool bEncrypt )
m_pCurrentEntry = nullptr;
}
void ZipOutputStream::consumeScheduledThreadEntry(std::unique_ptr<ZipOutputEntry> pCandidate)
void ZipOutputStream::consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry> pCandidate)
{
//Any exceptions thrown in the threads were caught and stored for now
const std::exception_ptr& rCaughtException(pCandidate->getParallelDeflateException());
if (rCaughtException)
{
m_aDeflateException = rCaughtException; // store it for later throwing
// the exception handler in DeflateThread should have cleaned temp file
// the exception handler in DeflateThreadTask should have cleaned temp file
return;
}
......@@ -124,7 +124,7 @@ void ZipOutputStream::consumeScheduledThreadEntry(std::unique_ptr<ZipOutputEntry
pCandidate->deleteBufferFile();
}
void ZipOutputStream::consumeFinishedScheduledThreadEntries()
void ZipOutputStream::consumeFinishedScheduledThreadTaskEntries()
{
std::vector< ZipOutputEntry* > aNonFinishedEntries;
......@@ -132,7 +132,7 @@ void ZipOutputStream::consumeFinishedScheduledThreadEntries()
{
if(pEntry->isFinished())
{
consumeScheduledThreadEntry(std::unique_ptr<ZipOutputEntry>(pEntry));
consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry>(pEntry));
}
else
{
......@@ -144,13 +144,13 @@ void ZipOutputStream::consumeFinishedScheduledThreadEntries()
m_aEntries = aNonFinishedEntries;
}
void ZipOutputStream::reduceScheduledThreadsToGivenNumberOrLess(sal_Int32 nThreads)
void ZipOutputStream::reduceScheduledThreadTasksToGivenNumberOrLess(sal_Int32 nThreadTasks)
{
while(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads)
while(static_cast< sal_Int32 >(m_aEntries.size()) > nThreadTasks)
{
consumeFinishedScheduledThreadEntries();
consumeFinishedScheduledThreadTaskEntries();
if(static_cast< sal_Int32 >(m_aEntries.size()) > nThreads)
if(static_cast< sal_Int32 >(m_aEntries.size()) > nThreadTasks)
{
osl::Thread::wait(std::chrono::microseconds(100));
}
......@@ -161,7 +161,7 @@ void ZipOutputStream::finish()
{
assert(!m_aZipList.empty() && "Zip file must have at least one entry!");
// Wait for all threads to finish & write
// Wait for all thread tasks to finish & write
comphelper::ThreadPool::getSharedOptimalPool().waitUntilDone(mpThreadTaskTag);
// consume all processed entries
......@@ -169,7 +169,7 @@ void ZipOutputStream::finish()
{
ZipOutputEntry* pCandidate = m_aEntries.back();
m_aEntries.pop_back();
consumeScheduledThreadEntry(std::unique_ptr<ZipOutputEntry>(pCandidate));
consumeScheduledThreadTaskEntry(std::unique_ptr<ZipOutputEntry>(pCandidate));
}
sal_Int32 nOffset= static_cast < sal_Int32 > (m_aChucker.GetPosition());
......@@ -183,7 +183,7 @@ void ZipOutputStream::finish()
m_aZipList.clear();
if (m_aDeflateException)
{ // throw once all threads are finished and m_aEntries can be released
{ // throw once all thread tasks are finished and m_aEntries can be released
std::rethrow_exception(m_aDeflateException);
}
}
......
......@@ -448,14 +448,14 @@ static void deflateZipEntry(ZipOutputEntry *pZipEntry,
pZipEntry->closeEntry();
}
class DeflateThread: public comphelper::ThreadTask
class DeflateThreadTask: public comphelper::ThreadTask
{
ZipOutputEntry *mpEntry;
uno::Reference< io::XInputStream > mxInStream;
public:
DeflateThread( const std::shared_ptr<comphelper::ThreadTaskTag>& pTag, ZipOutputEntry *pEntry,
const uno::Reference< io::XInputStream >& xInStream )
DeflateThreadTask( const std::shared_ptr<comphelper::ThreadTaskTag>& pTag, ZipOutputEntry *pEntry,
const uno::Reference< io::XInputStream >& xInStream )
: comphelper::ThreadTask(pTag)
, mpEntry(pEntry)
, mxInStream(xInStream)
......@@ -826,17 +826,19 @@ bool ZipPackageStream::saveChild(
if (bParallelDeflate)
{
// tdf#93553 limit to a useful amount of threads. Taking number of available
// tdf#93553 limit to a useful amount of pending tasks. Having way too many
// tasks pending may use a lot of memory. Take number of available
// cores and allow 4-times the amount for having the queue well filled. The
// 2nd parameter is the time to wait between cleanups in 10th of a second.
// Both values may be added to the configuration settings if needed.
static sal_Int32 nAllowedThreads(comphelper::ThreadPool::getPreferredConcurrency() * 4);
rZipOut.reduceScheduledThreadsToGivenNumberOrLess(nAllowedThreads);
static sal_Int32 nAllowedTasks(comphelper::ThreadPool::getPreferredConcurrency() * 4);
rZipOut.reduceScheduledThreadTasksToGivenNumberOrLess(nAllowedTasks);
// Start a new thread deflating this zip entry
// Start a new thread task deflating this zip entry
ZipOutputEntry *pZipEntry = new ZipOutputEntry(
m_xContext, *pTempEntry, this, bToBeEncrypted);
rZipOut.addDeflatingThread( pZipEntry, std::make_unique<DeflateThread>(rZipOut.getThreadTaskTag(), pZipEntry, xStream) );
rZipOut.addDeflatingThreadTask( pZipEntry,
std::make_unique<DeflateThreadTask>(rZipOut.getThreadTaskTag(), pZipEntry, xStream) );
}
else
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment