Commit d42f9470 authored by David Faure's avatar David Faure

Fix ItemRetriever in case of concurrent requests for the same item(s)

Summary:
- ItemRetrievalManager must emit requestFinished() for those other requests,
otherwise the list of pending requests in ItemRetriever is never emptied

- ItemRetriever must not assume that a signal being emitted by
ItemRetrievalManager is necessarily about the request it's waiting for, it
could be for another one. So it must first check in its list of pending
requests to determine whether it should react or not.

With multithreaded unittest, checked for races with clang+tsan.
(There is one race, the connect to ItemRetrievalRequest vs the emit
in other threads, we should lock mLock before connect...)

Test Plan: new unittest

Reviewers: dvratil

Reviewed By: dvratil

Subscribers: #kde_pim

Tags: #kde_pim

Differential Revision: https://phabricator.kde.org/D4618
parent 859d99b2
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <QObject> #include <QObject>
#include <QTest> #include <QTest>
#include <QTimer> #include <QTimer>
#include <QMutex>
#include "storage/itemretriever.h" #include "storage/itemretriever.h"
#include "storage/itemretrievaljob.h" #include "storage/itemretrievaljob.h"
...@@ -144,6 +145,52 @@ private: ...@@ -144,6 +145,52 @@ private:
QMultiHash<qint64, JobResult> mJobResults; QMultiHash<qint64, JobResult> mJobResults;
}; };
using RequestedParts = QVector<QByteArray /* FQ name */>;
class ClientThread : public QThread
{
public:
ClientThread(Entity::Id itemId, const RequestedParts &requestedParts)
: m_itemId(itemId), m_requestedParts(requestedParts)
{}
void run() Q_DECL_OVERRIDE
{
// ItemRetriever should...
ItemRetriever retriever;
retriever.setItem(m_itemId);
retriever.setRetrieveParts(m_requestedParts);
QSignalSpy spy(&retriever, &ItemRetriever::itemsRetrieved);
const bool success = retriever.exec();
QMutexLocker lock(&m_mutex);
m_results.success = success;
m_results.signalsCount = spy.count();
if (m_results.signalsCount > 0) {
m_results.emittedItems = spy.at(0).at(0).value<QList<qint64>>();
}
}
struct Results
{
bool success;
int signalsCount;
QList<qint64> emittedItems;
};
Results results() const {
QMutexLocker lock(&m_mutex);
return m_results;
}
private:
const Entity::Id m_itemId;
const RequestedParts m_requestedParts;
mutable QMutex m_mutex; // protects results below
Results m_results;
};
class ItemRetrieverTest : public QObject class ItemRetrieverTest : public QObject
{ {
Q_OBJECT Q_OBJECT
...@@ -151,7 +198,6 @@ class ItemRetrieverTest : public QObject ...@@ -151,7 +198,6 @@ class ItemRetrieverTest : public QObject
using ExistingParts = QVector<QPair<QByteArray /* name */, QByteArray /* data */>>; using ExistingParts = QVector<QPair<QByteArray /* name */, QByteArray /* data */>>;
using AvailableParts = QVector<QPair<QByteArray /* name */, QByteArray /* data */>>; using AvailableParts = QVector<QPair<QByteArray /* name */, QByteArray /* data */>>;
using RequestedParts = QVector<QByteArray /* FQ name */>;
public: public:
ItemRetrieverTest() ItemRetrieverTest()
...@@ -252,63 +298,89 @@ private Q_SLOTS: ...@@ -252,63 +298,89 @@ private Q_SLOTS:
// Setup // Setup
DbInitializer dbInitializer; for (int step = 0; step < 2; ++step) {
FakeItemRetrievalJobFactory factory(dbInitializer); DbInitializer dbInitializer;
ItemRetrievalManager mgr(&factory); FakeItemRetrievalJobFactory factory(dbInitializer);
QTest::qWait(100); ItemRetrievalManager mgr(&factory);
QTest::qWait(100);
// Given a PimItem with existing parts
Resource res = dbInitializer.createResource("testresource"); // Given a PimItem with existing parts
Collection col = dbInitializer.createCollection("col1"); Resource res = dbInitializer.createResource("testresource");
PimItem item = dbInitializer.createItem("1", col); Collection col = dbInitializer.createCollection("col1");
Q_FOREACH (const auto &existingPart, existingParts) {
dbInitializer.createPart(item.id(), existingPart.first, existingPart.second); // step 0: do it in the main thread, for easier debugging
} PimItem item = dbInitializer.createItem("1", col);
Q_FOREACH (const auto &existingPart, existingParts) {
dbInitializer.createPart(item.id(), existingPart.first, existingPart.second);
}
Q_FOREACH (const auto &availablePart, availableParts) { Q_FOREACH (const auto &availablePart, availableParts) {
factory.addJobResult(item.id(), availablePart.first, availablePart.second); factory.addJobResult(item.id(), availablePart.first, availablePart.second);
} }
// ItemRetriever should... if (step == 0) {
ItemRetriever retriever; ClientThread thread(item.id(), requestedParts);
retriever.setItem(item.id()); thread.run();
retriever.setRetrieveParts(requestedParts);
QSignalSpy spy(&retriever, &ItemRetriever::itemsRetrieved); const ClientThread::Results results = thread.results();
// ItemRetriever should ... succeed
QVERIFY(results.success);
// Emit exactly one signal ...
QCOMPARE(results.signalsCount, expectedSignals);
// ... with that one item
if (expectedSignals > 0) {
QCOMPARE(results.emittedItems, QList<qint64>{ item.id() });
}
// Succeed // Check that the factory had exactly one retrieval job
QVERIFY(retriever.exec()); QCOMPARE(factory.jobsCount(), expectedRetrievalJobs);
// Run exactly one retrieval job
QCOMPARE(factory.jobsCount(), expectedRetrievalJobs);
// Emit exactly one signal ...
QCOMPARE(spy.count(), expectedSignals);
// ... with that one item
if (expectedSignals > 0) {
QCOMPARE(spy.at(0).at(0).value<QList<qint64>>(), QList<qint64>{ item.id() });
}
// and the part exists in the DB } else {
const auto parts = item.parts(); QVector<ClientThread *> threads;
QCOMPARE(parts.count(), expectedParts); for (int i = 0; i < 20; ++i) {
Q_FOREACH (const Part &dbPart, item.parts()) { threads.append(new ClientThread(item.id(), requestedParts));
const QString fqname = dbPart.partType().ns() + QLatin1Char(':') + dbPart.partType().name(); }
if (!requestedParts.contains(fqname.toLatin1())) { for (int i = 0; i < threads.size(); ++i) {
continue; threads.at(i)->start();
}
for (int i = 0; i < threads.size(); ++i) {
threads.at(i)->wait();
}
for (int i = 0; i < threads.size(); ++i) {
const ClientThread::Results results = threads.at(i)->results();
QVERIFY(results.success);
QCOMPARE(results.signalsCount, expectedSignals);
if (expectedSignals > 0) {
QCOMPARE(results.emittedItems, QList<qint64>{ item.id() });
}
}
qDeleteAll(threads);
} }
auto it = std::find_if(availableParts.constBegin(), availableParts.constEnd(), // Check that the parts now exist in the DB
[dbPart](const QPair<QByteArray, QByteArray> &p) { const auto parts = item.parts();
return dbPart.partType().name().toLatin1() == p.first; QCOMPARE(parts.count(), expectedParts);
}); Q_FOREACH (const Part &dbPart, item.parts()) {
if (it == availableParts.constEnd()) { const QString fqname = dbPart.partType().ns() + QLatin1Char(':') + dbPart.partType().name();
it = std::find_if(existingParts.constBegin(), existingParts.constEnd(), if (!requestedParts.contains(fqname.toLatin1())) {
[fqname](const QPair<QByteArray, QByteArray> &p) { continue;
return fqname.toLatin1() == p.first; }
});
QVERIFY(it != existingParts.constEnd()); auto it = std::find_if(availableParts.constBegin(), availableParts.constEnd(),
} [dbPart](const QPair<QByteArray, QByteArray> &p) {
return dbPart.partType().name().toLatin1() == p.first;
});
if (it == availableParts.constEnd()) {
it = std::find_if(existingParts.constBegin(), existingParts.constEnd(),
[fqname](const QPair<QByteArray, QByteArray> &p) {
return fqname.toLatin1() == p.first;
});
QVERIFY(it != existingParts.constEnd());
}
QCOMPARE(dbPart.data(), it->second); QCOMPARE(dbPart.data(), it->second);
QCOMPARE(dbPart.datasize(), it->second.size()); QCOMPARE(dbPart.datasize(), it->second.size());
}
} }
} }
}; };
......
...@@ -232,6 +232,7 @@ void ItemRetrievalManager::retrievalJobFinished(ItemRetrievalRequest *request, c ...@@ -232,6 +232,7 @@ void ItemRetrievalManager::retrievalJobFinished(ItemRetrievalRequest *request, c
qCDebug(AKONADISERVER_LOG) << "someone else requested item" << request->ids << "as well, marking as processed"; qCDebug(AKONADISERVER_LOG) << "someone else requested item" << request->ids << "as well, marking as processed";
(*it)->errorMsg = errorMsg; (*it)->errorMsg = errorMsg;
(*it)->processed = true; (*it)->processed = true;
Q_EMIT requestFinished(*it);
it = mPendingRequests[request->resourceId].erase(it); it = mPendingRequests[request->resourceId].erase(it);
} else { } else {
++it; ++it;
......
...@@ -317,17 +317,18 @@ bool ItemRetriever::exec() ...@@ -317,17 +317,18 @@ bool ItemRetriever::exec()
QEventLoop eventLoop; QEventLoop eventLoop;
connect(ItemRetrievalManager::instance(), &ItemRetrievalManager::requestFinished, connect(ItemRetrievalManager::instance(), &ItemRetrievalManager::requestFinished,
this, [&](ItemRetrievalRequest *finishedRequest) { this, [&](ItemRetrievalRequest *finishedRequest) {
if (!finishedRequest->errorMsg.isEmpty()) { if (requests.removeOne(finishedRequest)) {
mLastError = finishedRequest->errorMsg.toUtf8(); if (!finishedRequest->errorMsg.isEmpty()) {
eventLoop.exit(1); mLastError = finishedRequest->errorMsg.toUtf8();
} else { eventLoop.exit(1);
requests.removeOne(finishedRequest); } else {
Q_EMIT itemsRetrieved(finishedRequest->ids); Q_EMIT itemsRetrieved(finishedRequest->ids);
if (requests.isEmpty()) { if (requests.isEmpty()) {
eventLoop.quit(); eventLoop.quit();
}
} }
}, Qt::UniqueConnection); }
}
}, Qt::UniqueConnection);
auto it = requests.begin(); auto it = requests.begin();
while (it != requests.end()) { while (it != requests.end()) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment