Commit 9c34fef3 authored by Andrew Savonichev's avatar Andrew Savonichev Committed by Dmitry Kazakov
Browse files

BUG:360677 KisUpdaterContext should not kill threads in waitForDone()

Summary:
KisUpdaterContext::waitForDone() method must lock context and wait
untill all threads finish their current tasks.
We cannot call QThreadPool.waitForDone() to do this, because after
waiting it removes all threads from the pool.

Test Plan:
Verified that no extra threads created: 8 for global thread pool and 8 for
KisUpdaterContext

(gdb) info threads
  Id   Target Id         Frame
  22   Thread 0x7fff8cff9700 (LWP 18481) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  21   Thread 0x7fff8d7fa700 (LWP 18480) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  20   Thread 0x7fff8dffb700 (LWP 18479) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  19   Thread 0x7fff8e7fc700 (LWP 18478) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  18   Thread 0x7fff8effd700 (LWP 18477) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  17   Thread 0x7fff8f7fe700 (LWP 18476) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  16   Thread 0x7fff8ffff700 (LWP 18475) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  15   Thread 0x7fff9bfff700 (LWP 18474) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  13   Thread 0x7fffb8ff9700 (LWP 18471) "QFileInfoGather" 0x00007fffef6e291f in pthread_cond_wait () from /lib64/libpthread.so.0
  12   Thread 0x7fffb97fa700 (LWP 18470) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  11   Thread 0x7fffb9ffb700 (LWP 18469) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  10   Thread 0x7fffba7fc700 (LWP 18468) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  9    Thread 0x7fffbaffd700 (LWP 18467) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  8    Thread 0x7fffbb7fe700 (LWP 18466) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  7    Thread 0x7fffbbfff700 (LWP 18465) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  6    Thread 0x7fffc8f8a700 (LWP 18464) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  5    Thread 0x7fffc978b700 (LWP 18463) "Thread (pooled)" 0x00007fffef6e2cc8 in pthread_cond_timedwait () from /lib64/libpthread.so.0
  4    Thread 0x7fffca38d700 (LWP 18462) "KisTileDataSwap" 0x00007fffef6e291f in pthread_cond_wait () from /lib64/libpthread.so.0
  3    Thread 0x7fffcab8e700 (LWP 18461) "KisTileDataPool" 0x00007fffef6e291f in pthread_cond_wait () from /lib64/libpthread.so.0
  2    Thread 0x7fffe0843700 (LWP 18460) "QXcbEventReader" 0x00007ffff16a33ed in poll () from /lib64/libc.so.6
* 1    Thread 0x7fffe7f3f7c0 (LWP 18456) "krita" 0x00007ffff16a33ed in poll () from /lib64/libc.so.6

Reviewers: dkazakov

Reviewed By: dkazakov

Subscribers: fazek
BUG:360677
Differential Revision: https://phabricator.kde.org/D1166
parent ffc4d9cf
......@@ -55,7 +55,6 @@ struct Q_DECL_HIDDEN KisUpdateScheduler::Private {
KisSimpleUpdateQueue updatesQueue;
KisStrokesQueue strokesQueue;
KisUpdaterContext updaterContext;
bool processingBlocked = false;
qreal balancingRatio = 1.0; // updates-queue-size/strokes-queue-size
KisProjectionUpdateListener *projectionUpdateListener;
......@@ -64,6 +63,11 @@ struct Q_DECL_HIDDEN KisUpdateScheduler::Private {
QAtomicInt updatesLockCounter;
QReadWriteLock updatesStartLock;
KisLazyWaitCondition updatesFinishedCondition;
// KisUpdaterContext can emit signals to KisUpdateScheduler in the dtor, so it
// must to be deleted before anything else.
// That means updaterContext must be declared last.
KisUpdaterContext updaterContext;
};
KisUpdateScheduler::KisUpdateScheduler(KisProjectionUpdateListener *projectionUpdateListener)
......@@ -163,9 +167,11 @@ void KisUpdateScheduler::fullRefresh(KisNodeSP root, const QRect& rc, const QRec
Q_ASSERT(m_d->updaterContext.isJobAllowed(walker));
m_d->updaterContext.addMergeJob(walker);
m_d->updaterContext.waitForDone();
m_d->updaterContext.unlock();
m_d->updaterContext.waitForDone();
if(needLock) unlock(true);
}
......@@ -408,7 +414,9 @@ bool KisUpdateScheduler::haveUpdatesRunning()
QWriteLocker locker(&m_d->updatesStartLock);
qint32 numMergeJobs, numStrokeJobs;
m_d->updaterContext.lock();
m_d->updaterContext.getJobsSnapshot(numMergeJobs, numStrokeJobs);
m_d->updaterContext.unlock();
return numMergeJobs;
}
......
......@@ -21,18 +21,14 @@
#include <QThread>
#include <QThreadPool>
#include "kis_safe_read_list.h"
#include "kis_update_job_item.h"
#include "kis_stroke_job.h"
KisUpdaterContext::KisUpdaterContext(qint32 threadCount)
KisUpdaterContext::KisUpdaterContext(qint32 threadCount):
m_jobs(threadCount > 0 ? threadCount : defaultThreadCount())
{
if(threadCount <= 0) {
threadCount = QThread::idealThreadCount();
threadCount = threadCount > 0 ? threadCount : 1;
}
m_jobs.resize(threadCount);
for(qint32 i = 0; i < m_jobs.size(); i++) {
m_jobs[i] = new KisUpdateJobItem(&m_exclusiveJobLock);
connect(m_jobs[i], SIGNAL(sigContinueUpdate(const QRect&)),
......@@ -45,6 +41,10 @@ KisUpdaterContext::KisUpdaterContext(qint32 threadCount)
connect(m_jobs[i], SIGNAL(sigJobFinished()),
SLOT(slotJobFinished()), Qt::DirectConnection);
}
#ifdef SANITY_CHECK_CONTEXT_LOCKING
m_lockedBy = (Qt::HANDLE) -1;
#endif
}
KisUpdaterContext::~KisUpdaterContext()
......@@ -54,9 +54,19 @@ KisUpdaterContext::~KisUpdaterContext()
delete m_jobs[i];
}
qint32 KisUpdaterContext::defaultThreadCount() const
{
int threadCount = QThread::idealThreadCount();
return threadCount > 0 ? threadCount : 1;
}
void KisUpdaterContext::getJobsSnapshot(qint32 &numMergeJobs,
qint32 &numStrokeJobs)
{
#ifdef SANITY_CHECK_CONTEXT_LOCKING
KIS_ASSERT(m_lockedBy == QThread::currentThreadId());
#endif
numMergeJobs = 0;
numStrokeJobs = 0;
......@@ -91,6 +101,10 @@ bool KisUpdaterContext::hasSpareThread()
bool KisUpdaterContext::isJobAllowed(KisBaseRectsWalkerSP walker)
{
#ifdef SANITY_CHECK_CONTEXT_LOCKING
KIS_ASSERT(m_lockedBy == QThread::currentThreadId());
#endif
int lod = this->currentLevelOfDetail();
if (lod >= 0 && walker->levelOfDetail() != lod) return false;
......@@ -116,9 +130,13 @@ bool KisUpdaterContext::isJobAllowed(KisBaseRectsWalkerSP walker)
*/
void KisUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker)
{
#ifdef SANITY_CHECK_CONTEXT_LOCKING
KIS_ASSERT(m_lockedBy == QThread::currentThreadId());
#endif
m_lodCounter.addLod(walker->levelOfDetail());
qint32 jobIndex = findSpareThread();
Q_ASSERT(jobIndex >= 0);
KIS_ASSERT(jobIndex >= 0);
m_jobs[jobIndex]->setWalker(walker);
m_threadPool.start(m_jobs[jobIndex]);
......@@ -129,9 +147,13 @@ void KisUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker)
*/
void KisTestableUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker)
{
#ifdef SANITY_CHECK_CONTEXT_LOCKING
KIS_ASSERT(m_lockedBy == QThread::currentThreadId());
#endif
m_lodCounter.addLod(walker->levelOfDetail());
qint32 jobIndex = findSpareThread();
Q_ASSERT(jobIndex >= 0);
KIS_ASSERT(jobIndex >= 0);
m_jobs[jobIndex]->setWalker(walker);
// HINT: Not calling start() here
......@@ -139,9 +161,13 @@ void KisTestableUpdaterContext::addMergeJob(KisBaseRectsWalkerSP walker)
void KisUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob)
{
#ifdef SANITY_CHECK_CONTEXT_LOCKING
KIS_ASSERT(m_lockedBy == QThread::currentThreadId());
#endif
m_lodCounter.addLod(strokeJob->levelOfDetail());
qint32 jobIndex = findSpareThread();
Q_ASSERT(jobIndex >= 0);
KIS_ASSERT(jobIndex >= 0);
m_jobs[jobIndex]->setStrokeJob(strokeJob);
m_threadPool.start(m_jobs[jobIndex]);
......@@ -152,9 +178,13 @@ void KisUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob)
*/
void KisTestableUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob)
{
#ifdef SANITY_CHECK_CONTEXT_LOCKING
KIS_ASSERT(m_lockedBy == QThread::currentThreadId());
#endif
m_lodCounter.addLod(strokeJob->levelOfDetail());
qint32 jobIndex = findSpareThread();
Q_ASSERT(jobIndex >= 0);
KIS_ASSERT(jobIndex >= 0);
m_jobs[jobIndex]->setStrokeJob(strokeJob);
// HINT: Not calling start() here
......@@ -162,9 +192,13 @@ void KisTestableUpdaterContext::addStrokeJob(KisStrokeJob *strokeJob)
void KisUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneousJob)
{
#ifdef SANITY_CHECK_CONTEXT_LOCKING
KIS_ASSERT(m_lockedBy == QThread::currentThreadId());
#endif
m_lodCounter.addLod(spontaneousJob->levelOfDetail());
qint32 jobIndex = findSpareThread();
Q_ASSERT(jobIndex >= 0);
KIS_ASSERT(jobIndex >= 0);
m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob);
m_threadPool.start(m_jobs[jobIndex]);
......@@ -177,7 +211,7 @@ void KisTestableUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneous
{
m_lodCounter.addLod(spontaneousJob->levelOfDetail());
qint32 jobIndex = findSpareThread();
Q_ASSERT(jobIndex >= 0);
KIS_ASSERT(jobIndex >= 0);
m_jobs[jobIndex]->setSpontaneousJob(spontaneousJob);
// HINT: Not calling start() here
......@@ -185,7 +219,35 @@ void KisTestableUpdaterContext::addSpontaneousJob(KisSpontaneousJob *spontaneous
void KisUpdaterContext::waitForDone()
{
m_threadPool.waitForDone();
lock();
while(true) {
bool allDone = true;
QVector<KisUpdateJobItem*>::const_iterator iter;
FOREACH_SAFE(iter, m_jobs) {
if ((*iter)->isRunning()) {
allDone = false;
break;
}
}
if (!allDone) {
#ifdef SANITY_CHECK_CONTEXT_LOCKING
m_lockedBy = (Qt::HANDLE) -1;
#endif
m_waitAllCond.wait(&m_lock);
#ifdef SANITY_CHECK_CONTEXT_LOCKING
m_lockedBy = QThread::currentThreadId();
#endif
} else {
break;
}
}
unlock();
}
bool KisUpdaterContext::walkerIntersectsJob(KisBaseRectsWalkerSP walker,
......@@ -208,6 +270,7 @@ void KisUpdaterContext::slotJobFinished()
{
m_lodCounter.removeLod();
m_waitAllCond.wakeOne();
// Be careful. This slot can be called asynchronously without locks.
emit sigSpareThreadAppeared();
}
......@@ -215,10 +278,22 @@ void KisUpdaterContext::slotJobFinished()
void KisUpdaterContext::lock()
{
m_lock.lock();
#ifdef SANITY_CHECK_CONTEXT_LOCKING
KIS_ASSERT_X(m_lockedBy == (Qt::HANDLE) -1, "KisUpdaterContext",
"context is already locked");
m_lockedBy = QThread::currentThreadId();
#endif
}
void KisUpdaterContext::unlock()
{
#ifdef SANITY_CHECK_CONTEXT_LOCKING
KIS_ASSERT(m_lockedBy == QThread::currentThreadId());
m_lockedBy = (Qt::HANDLE) -1;
#endif
m_lock.unlock();
}
......
......@@ -23,11 +23,16 @@
#include <QMutex>
#include <QReadWriteLock>
#include <QThreadPool>
#include <QWaitCondition>
#include "kis_base_rects_walker.h"
#include "kis_async_merger.h"
#include "kis_lock_free_lod_counter.h"
// TODO: uncomment ifndef for release on 3.0.1
// #ifndef QT_NO_DEBUG
#define SANITY_CHECK_CONTEXT_LOCKING
// #endif // QT_NO_DEBUG
class KisUpdateJobItem;
class KisSpontaneousJob;
......@@ -128,6 +133,9 @@ protected Q_SLOTS:
protected:
static bool walkerIntersectsJob(KisBaseRectsWalkerSP walker,
const KisUpdateJobItem* job);
qint32 defaultThreadCount() const;
qint32 findSpareThread();
protected:
......@@ -141,8 +149,14 @@ protected:
QMutex m_lock;
QVector<KisUpdateJobItem*> m_jobs;
QWaitCondition m_waitAllCond;
QThreadPool m_threadPool;
KisLockFreeLodCounter m_lodCounter;
#ifdef SANITY_CHECK_CONTEXT_LOCKING
// Thread ID of the owner or -1 if not locked
volatile Qt::HANDLE m_lockedBy;
#endif
};
class KRITAIMAGE_EXPORT KisTestableUpdaterContext : public KisUpdaterContext
......
......@@ -114,7 +114,9 @@ void KisStrokesQueueTest::testExclusiveStrokes()
KisTestableUpdaterContext context(2);
QVector<KisUpdateJobItem*> jobs;
context.lock();
context.addMergeJob(walker);
context.unlock();
queue.processQueue(context, false);
jobs = context.getJobs();
......@@ -139,7 +141,9 @@ void KisStrokesQueueTest::testExclusiveStrokes()
QCOMPARE(queue.needsExclusiveAccess(), true);
context.clear();
context.lock();
context.addMergeJob(walker);
context.unlock();
queue.processQueue(context, false);
COMPARE_WALKER(jobs[0], walker);
......@@ -201,7 +205,9 @@ void KisStrokesQueueTest::testBarrierStrokeJobs()
VERIFY_EMPTY(jobs[2]);
// Now some updates has come...
context.lock();
context.addMergeJob(walker);
context.unlock();
jobs = context.getJobs();
COMPARE_NAME(jobs[0], "nor_dab");
......@@ -239,7 +245,9 @@ void KisStrokesQueueTest::testBarrierStrokeJobs()
VERIFY_EMPTY(jobs[2]);
// Process the last update...
context.lock();
context.addMergeJob(walker);
context.unlock();
externalJobsPending = false;
// Yep, the queue is still waiting
......@@ -416,7 +424,9 @@ void KisStrokesQueueTest::testStrokesLevelOfDetail()
KisTestableUpdaterContext context(2);
QVector<KisUpdateJobItem*> jobs;
context.lock();
context.addMergeJob(walker);
context.unlock();
queue.processQueue(context, false);
jobs = context.getJobs();
......@@ -439,10 +449,14 @@ void KisStrokesQueueTest::testStrokesLevelOfDetail()
QCOMPARE(queue.needsExclusiveAccess(), false);
// walker of a different LOD must not be allowed
context.lock();
QCOMPARE(context.isJobAllowed(walker), false);
context.unlock();
context.clear();
context.lock();
context.addMergeJob(walker);
context.unlock();
queue.processQueue(context, false);
jobs = context.getJobs();
......
......@@ -223,7 +223,9 @@ void KisUpdaterContextTest::stressTestExclusiveJobs()
KisStrokeJobStrategy *strategy =
new ExclusivenessCheckerStrategy(counter, hadConcurrency);
context.lock();
context.addStrokeJob(new KisStrokeJob(strategy, data, 0, true));
context.unlock();
}
else {
QTest::qSleep(CHECK_DELAY);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment