Verified Commit b94b8115 authored by Daniel Vrátil's avatar Daniel Vrátil 🤖
Browse files

Clean up ItemRetriever to clear up ownership of ItemRetrievalRequest

The request is now passed by value to the retriever. An
ItemRetrievalResult is then returned to the caller. A unique numerical
identifier is used to identify the requests/response pairs.

BUG: 408897
FIXED-IN: 5.15
parent 5b8d7d4a
......@@ -22,16 +22,18 @@
using namespace Akonadi::Server;
Q_DECLARE_METATYPE(ItemRetrievalResult)
FakeItemRetrievalManager::FakeItemRetrievalManager()
: ItemRetrievalManager()
{
qRegisterMetaType<ItemRetrievalRequest*>("ItemRetrievalRequest*");
qRegisterMetaType<ItemRetrievalResult>();
}
FakeItemRetrievalManager::~FakeItemRetrievalManager() = default;
void FakeItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest *request)
void FakeItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest request)
{
QMetaObject::invokeMethod(this, [this, request] { Q_EMIT requestFinished(request); }, Qt::QueuedConnection);
QMetaObject::invokeMethod(this, [this, r = std::move(request)] {
Q_EMIT requestFinished({std::move(r)});
}, Qt::QueuedConnection);
}
......@@ -30,9 +30,8 @@ class FakeItemRetrievalManager : public ItemRetrievalManager
Q_OBJECT
public:
explicit FakeItemRetrievalManager();
~FakeItemRetrievalManager() override;
void requestItemDelivery(ItemRetrievalRequest *request) override;
void requestItemDelivery(ItemRetrievalRequest request) override;
};
} // namespace Server
......
......@@ -47,9 +47,9 @@ class FakeItemRetrievalJob : public AbstractItemRetrievalJob
{
Q_OBJECT
public:
FakeItemRetrievalJob(ItemRetrievalRequest *req, DbInitializer &dbInitializer,
FakeItemRetrievalJob(ItemRetrievalRequest req, DbInitializer &dbInitializer,
const QVector<JobResult> &results, QObject *parent)
: AbstractItemRetrievalJob(req, parent)
: AbstractItemRetrievalJob(std::move(req), parent)
, mDbInitializer(dbInitializer)
, mResults(results)
{
......@@ -78,25 +78,25 @@ public:
part.update();
}
} else {
mError = res.error;
m_result.errorMsg = res.error;
break;
}
}
QTimer::singleShot(0, this, [this]() {
Q_EMIT requestCompleted(m_request, mError);
Q_EMIT requestCompleted(this);
});
}
void kill() override
{
// TODO?
// TODO
Q_ASSERT(false);
}
private:
DbInitializer &mDbInitializer;
QVector<JobResult> mResults;
QString mError;
};
class FakeItemRetrievalJobFactory : public AbstractItemRetrievalJobFactory
......@@ -118,13 +118,13 @@ public:
mJobResults.insert(itemId, JobResult{ itemId, QByteArray(), QByteArray(), error });
}
AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest *request, QObject *parent) override
AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest request, QObject *parent) override
{
QVector<JobResult> results;
Q_FOREACH (auto id, request->ids) {
Q_FOREACH (auto id, request.ids) {
auto it = mJobResults.constFind(id);
while (it != mJobResults.constEnd() && it.key() == id) {
if (request->parts.contains(it->partname)) {
if (request.parts.contains(it->partname)) {
results << *it;
}
++it;
......@@ -132,7 +132,7 @@ public:
}
++mJobsCount;
return new FakeItemRetrievalJob(request, mDbInitializer, results, parent);
return new FakeItemRetrievalJob(std::move(request), mDbInitializer, results, parent);
}
int jobsCount() const
......@@ -170,7 +170,7 @@ public:
m_results.success = success;
m_results.signalsCount = spy.count();
if (m_results.signalsCount > 0) {
m_results.emittedItems = spy.at(0).at(0).value<QList<qint64>>();
m_results.emittedItems = spy.at(0).at(0).value<QVector<qint64>>();
}
}
......@@ -178,7 +178,7 @@ public:
{
bool success;
int signalsCount;
QList<qint64> emittedItems;
QVector<qint64> emittedItems;
};
Results results() const {
QMutexLocker lock(&m_mutex);
......@@ -324,7 +324,7 @@ private Q_SLOTS:
QCOMPARE(results.signalsCount, expectedSignals);
// ... with that one item
if (expectedSignals > 0) {
QCOMPARE(results.emittedItems, QList<qint64>{ item.id() });
QCOMPARE(results.emittedItems, QVector<qint64>{ item.id() });
}
// Check that the factory had exactly one retrieval job
......@@ -346,7 +346,7 @@ private Q_SLOTS:
QVERIFY(results.success);
QCOMPARE(results.signalsCount, expectedSignals);
if (expectedSignals > 0) {
QCOMPARE(results.emittedItems, QList<qint64>{ item.id() });
QCOMPARE(results.emittedItems, QVector<qint64>{ item.id() });
}
}
qDeleteAll(threads);
......
......@@ -107,6 +107,7 @@ set(libakonadiserver_SRCS
storage/itemretriever.cpp
storage/itemretrievalmanager.cpp
storage/itemretrievaljob.cpp
storage/itemretrievalrequest.cpp
storage/notificationcollector.cpp
storage/parthelper.cpp
storage/parttypehelper.cpp
......
......@@ -67,7 +67,7 @@ bool ItemCopyHandler::copyItem(const PimItem &item, const Collection &target)
return true;
}
void ItemCopyHandler::processItems(const QList<qint64> &ids)
void ItemCopyHandler::processItems(const QVector<qint64> &ids)
{
SelectQueryBuilder<PimItem> qb;
ItemQueryHelper::itemSetToQuery(ImapSet(ids), qb);
......@@ -120,7 +120,7 @@ bool ItemCopyHandler::parseStream()
retriever.setItemSet(cmd.items().uidSet());
retriever.setRetrieveFullPayload(true);
QObject::connect(&retriever, &ItemRetriever::itemsRetrieved,
[this](const QList<qint64> &ids) {
[this](const QVector<qint64> &ids) {
processItems(ids);
});
if (!retriever.exec()) {
......
......@@ -58,7 +58,7 @@ protected:
The changes mentioned above are applied.
*/
bool copyItem(const PimItem &item, const Collection &target);
void processItems(const QList<qint64> &ids);
void processItems(const QVector<qint64> &ids);
private:
Collection mTargetCollection;
......
......@@ -39,7 +39,7 @@ ItemMoveHandler::ItemMoveHandler(AkonadiServer &akonadi)
: Handler(akonadi)
{}
void ItemMoveHandler::itemsRetrieved(const QList<qint64> &ids)
void ItemMoveHandler::itemsRetrieved(const QVector<qint64> &ids)
{
DataStore *store = connection()->storageBackend();
Transaction transaction(store, QStringLiteral("MOVE"));
......@@ -165,7 +165,7 @@ bool ItemMoveHandler::parseStream()
retriever.setScope(cmd.items());
retriever.setRetrieveFullPayload(true);
QObject::connect(&retriever, &ItemRetriever::itemsRetrieved,
[this](const QList<qint64> &ids) {
[this](const QVector<qint64> &ids) {
itemsRetrieved(ids);
});
if (!retriever.exec()) {
......
......@@ -49,7 +49,7 @@ public:
bool parseStream() override;
private:
void itemsRetrieved(const QList<qint64> &ids);
void itemsRetrieved(const QVector<qint64> &ids);
Collection mDestination;
};
......
......@@ -21,20 +21,16 @@
#include "itemretrievalrequest.h"
#include "resourceinterface.h"
#include "akonadiserver_debug.h"
#include <shared/akranges.h>
#include <QDBusPendingCallWatcher>
using namespace Akonadi::Server;
AbstractItemRetrievalJob::AbstractItemRetrievalJob(ItemRetrievalRequest *req, QObject *parent)
AbstractItemRetrievalJob::AbstractItemRetrievalJob(ItemRetrievalRequest req, QObject *parent)
: QObject(parent)
, m_request(req)
{
}
AbstractItemRetrievalJob::~AbstractItemRetrievalJob()
{
}
, m_result(std::move(req))
{}
ItemRetrievalJob::~ItemRetrievalJob()
{
......@@ -43,18 +39,18 @@ ItemRetrievalJob::~ItemRetrievalJob()
void ItemRetrievalJob::start()
{
Q_ASSERT(m_request);
qCDebug(AKONADISERVER_LOG) << "processing retrieval request for item" << m_request->ids << " parts:" << m_request->parts << " of resource:" << m_request->resourceId;
qCDebug(AKONADISERVER_LOG) << "processing retrieval request for item" << request().ids << " parts:" << request().parts << " of resource:" << request().resourceId;
// call the resource
if (m_interface) {
m_active = true;
auto reply = m_interface->requestItemDelivery(m_request->ids, m_request->parts);
auto reply = m_interface->requestItemDelivery(request().ids | AkRanges::Actions::toQList, request().parts);
QDBusPendingCallWatcher *watcher = new QDBusPendingCallWatcher(reply, this);
connect(watcher, &QDBusPendingCallWatcher::finished,
this, &ItemRetrievalJob::callFinished);
} else {
Q_EMIT requestCompleted(m_request, QStringLiteral("Unable to contact resource"));
m_result.errorMsg = QStringLiteral("Unable to contact resource");
Q_EMIT requestCompleted(this);
deleteLater();
}
}
......@@ -62,7 +58,8 @@ void ItemRetrievalJob::start()
void ItemRetrievalJob::kill()
{
m_active = false;
Q_EMIT requestCompleted(m_request, QStringLiteral("Request cancelled"));
m_result.errorMsg = QStringLiteral("Request cancelled");
Q_EMIT requestCompleted(this);
}
void ItemRetrievalJob::callFinished(QDBusPendingCallWatcher *watcher)
......@@ -71,12 +68,10 @@ void ItemRetrievalJob::callFinished(QDBusPendingCallWatcher *watcher)
QDBusPendingReply<QString> reply = *watcher;
if (m_active) {
m_active = false;
const QString errorMsg = reply.isError() ? reply.error().message() : reply;
if (!errorMsg.isEmpty()) {
Q_EMIT requestCompleted(m_request, QStringLiteral("Unable to retrieve item from resource: %1").arg(errorMsg));
} else {
Q_EMIT requestCompleted(m_request, QString());
if (reply.isError()) {
m_result.errorMsg = QStringLiteral("Unable to retrieve item from resource: %1").arg(reply.error().message());
}
Q_EMIT requestCompleted(this);
}
deleteLater();
}
......@@ -22,6 +22,8 @@
#include <QObject>
#include "itemretrievalrequest.h"
class QDBusPendingCallWatcher;
class OrgFreedesktopAkonadiResourceInterface;
......@@ -36,17 +38,20 @@ class AbstractItemRetrievalJob : public QObject
{
Q_OBJECT
public:
AbstractItemRetrievalJob(ItemRetrievalRequest *req, QObject *parent);
~AbstractItemRetrievalJob() override;
AbstractItemRetrievalJob(ItemRetrievalRequest req, QObject *parent);
~AbstractItemRetrievalJob() override = default;
virtual void start() = 0;
virtual void kill() = 0;
const ItemRetrievalRequest &request() const { return m_result.request; }
const ItemRetrievalResult &result() const { return m_result; }
Q_SIGNALS:
void requestCompleted(ItemRetrievalRequest *request, const QString &errorMsg);
void requestCompleted(AbstractItemRetrievalJob *job);
protected:
ItemRetrievalRequest *m_request = nullptr;
ItemRetrievalResult m_result;
};
/// Async D-Bus retrieval, no modification of the request (thus no need for locking)
......@@ -54,12 +59,9 @@ class ItemRetrievalJob : public AbstractItemRetrievalJob
{
Q_OBJECT
public:
ItemRetrievalJob(ItemRetrievalRequest *req, QObject *parent)
: AbstractItemRetrievalJob(req, parent)
, m_active(false)
, m_interface(nullptr)
{
}
ItemRetrievalJob(ItemRetrievalRequest req, QObject *parent)
: AbstractItemRetrievalJob(std::move(req), parent)
{}
void setInterface(OrgFreedesktopAkonadiResourceInterface *interface)
{
......@@ -74,7 +76,7 @@ private Q_SLOTS:
void callFinished(QDBusPendingCallWatcher *watcher);
private:
bool m_active;
bool m_active = false;
OrgFreedesktopAkonadiResourceInterface *m_interface = nullptr;
};
......
......@@ -35,10 +35,12 @@
using namespace Akonadi;
using namespace Akonadi::Server;
Q_DECLARE_METATYPE(Akonadi::Server::ItemRetrievalResult)
class ItemRetrievalJobFactory : public AbstractItemRetrievalJobFactory
{
AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest *request, QObject *parent) override {
return new ItemRetrievalJob(request, parent);
AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest request, QObject *parent) override {
return new ItemRetrievalJob(std::move(request), parent);
}
};
......@@ -51,6 +53,7 @@ ItemRetrievalManager::ItemRetrievalManager(std::unique_ptr<AbstractItemRetrieval
: AkThread(QStringLiteral("ItemRetrievalManager"), QThread::HighPriority, parent)
, mJobFactory(std::move(factory))
{
qRegisterMetaType<ItemRetrievalResult>("Akonadi::Server::ItemRetrievalResult");
qDBusRegisterMetaType<QByteArrayList>();
}
......@@ -112,13 +115,13 @@ org::freedesktop::Akonadi::Resource *ItemRetrievalManager::resourceInterface(con
}
// called from any thread
void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest *req)
void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest req)
{
QWriteLocker locker(&mLock);
qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager posting retrieval request for items" << req->ids
<< "to" <<req->resourceId << ". There are" << mPendingRequests.size() << "request queues and"
<< mPendingRequests[req->resourceId].size() << "items mine";
mPendingRequests[req->resourceId].append(req);
qCDebug(AKONADISERVER_LOG) << "ItemRetrievalManager posting retrieval request for items" << req.ids
<< "to" <<req.resourceId << ". There are" << mPendingRequests.size() << "request queues and"
<< mPendingRequests[req.resourceId].size() << "items mine";
mPendingRequests[req.resourceId].emplace_back(std::move(req));
locker.unlock();
Q_EMIT requestAdded();
......@@ -127,69 +130,74 @@ void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest *req)
// called within the retrieval thread
void ItemRetrievalManager::processRequest()
{
QVector<QPair<AbstractItemRetrievalJob *, QString> > newJobs;
QVector<AbstractItemRetrievalJob *> newJobs;
QWriteLocker locker(&mLock);
// look for idle resources
for (auto it = mPendingRequests.begin(); it != mPendingRequests.end();) {
if (it.value().isEmpty()) {
if (it->second.empty()) {
it = mPendingRequests.erase(it);
continue;
}
if (!mCurrentJobs.contains(it.key()) || mCurrentJobs.value(it.key()) == nullptr) {
if (!mCurrentJobs.contains(it->first) || mCurrentJobs.value(it->first) == nullptr) {
// TODO: check if there is another one for the same uid with more parts requested
ItemRetrievalRequest *req = it.value().takeFirst();
Q_ASSERT(req->resourceId == it.key());
AbstractItemRetrievalJob *job = mJobFactory->retrievalJob(req, this);
auto req = std::move(it->second.front());
it->second.pop_front();
Q_ASSERT(req.resourceId == it->first);
auto job = mJobFactory->retrievalJob(std::move(req), this);
connect(job, &AbstractItemRetrievalJob::requestCompleted, this, &ItemRetrievalManager::retrievalJobFinished);
mCurrentJobs.insert(req->resourceId, job);
mCurrentJobs.insert(job->request().resourceId, job);
// delay job execution until after we unlocked the mutex, since the job can emit the finished signal immediately in some cases
newJobs.append(qMakePair(job, req->resourceId));
qCDebug(AKONADISERVER_LOG) << "ItemRetrievalJob" << job << "started for request" << req;
newJobs.append(job);
qCDebug(AKONADISERVER_LOG) << "ItemRetrievalJob" << job << "started for request" << job->request().id;
}
++it;
}
bool nothingGoingOn = mPendingRequests.isEmpty() && mCurrentJobs.isEmpty() && newJobs.isEmpty();
locker.unlock();
if (nothingGoingOn) { // someone asked as to process requests although everything is done already, he might still be waiting
// someone asked as to process requests although everything is done already, he might still be waiting
if (mPendingRequests.empty() && mCurrentJobs.isEmpty() && newJobs.isEmpty()) {
return;
}
for (auto it = newJobs.constBegin(), end = newJobs.constEnd(); it != end; ++it) {
if (ItemRetrievalJob *j = qobject_cast<ItemRetrievalJob *>((*it).first)) {
j->setInterface(resourceInterface((*it).second));
locker.unlock();
for (auto *job : newJobs) {
if (ItemRetrievalJob *j = qobject_cast<ItemRetrievalJob *>(job)) {
j->setInterface(resourceInterface(j->request().resourceId));
}
(*it).first->start();
job->start();
}
}
void ItemRetrievalManager::retrievalJobFinished(ItemRetrievalRequest *request, const QString &errorMsg)
void ItemRetrievalManager::retrievalJobFinished(AbstractItemRetrievalJob *job)
{
if (errorMsg.isEmpty()) {
qCInfo(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request << "finished";
const auto &request = job->request();
const auto &result = job->result();
if (result.errorMsg.has_value()) {
qCWarning(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished with error:" << *result.errorMsg;
} else {
qCWarning(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request << "finished with error:" << errorMsg;
qCInfo(AKONADISERVER_LOG) << "ItemRetrievalJob for request" << request.id << "finished";
}
QWriteLocker locker(&mLock);
request->errorMsg = errorMsg;
request->processed = true;
Q_ASSERT(mCurrentJobs.contains(request->resourceId));
mCurrentJobs.remove(request->resourceId);
// TODO check if (*it)->parts is a subset of currentRequest->parts
for (QList<ItemRetrievalRequest *>::Iterator it = mPendingRequests[request->resourceId].begin(); it != mPendingRequests[request->resourceId].end();) {
if ((*it)->ids == request->ids) {
qCDebug(AKONADISERVER_LOG) << "someone else requested item" << request->ids << "as well, marking as processed";
(*it)->errorMsg = errorMsg;
(*it)->processed = true;
Q_EMIT requestFinished(*it);
it = mPendingRequests[request->resourceId].erase(it);
Q_ASSERT(mCurrentJobs.contains(request.resourceId));
mCurrentJobs.remove(request.resourceId);
auto &requests = mPendingRequests[request.resourceId];
for (auto it = requests.begin(); it != requests.end();) {
if (it->ids == request.ids) {
qCDebug(AKONADISERVER_LOG) << "Someone else requested items " << request.ids << "as well, marking as processed.";
ItemRetrievalResult otherResult{std::move(*it)};
otherResult.errorMsg = result.errorMsg;
Q_EMIT requestFinished(otherResult);
it = requests.erase(it);
} else {
++it;
}
}
locker.unlock();
Q_EMIT requestFinished(request);
Q_EMIT requestFinished(result);
Q_EMIT requestAdded(); // trigger processRequest() again, in case there is more in the queues
}
......
......@@ -21,6 +21,7 @@
#define AKONADI_ITEMRETRIEVALMANAGER_H
#include "itemretriever.h"
#include "itemretrievalrequest.h"
#include "akthread.h"
#include <shared/akstd.h>
......@@ -40,16 +41,14 @@ namespace Server
class Collection;
class ItemRetrievalJob;
class ItemRetrievalRequest;
class AbstractItemRetrievalJob;
class AbstractItemRetrievalJobFactory
{
public:
explicit AbstractItemRetrievalJobFactory() {}
virtual ~AbstractItemRetrievalJobFactory() {}
virtual ~AbstractItemRetrievalJobFactory() = default;
virtual AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest *request, QObject *parent) = 0;
virtual AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest request, QObject *parent) = 0;
};
/** Manages and processes item retrieval requests. */
......@@ -65,13 +64,13 @@ public:
* Added for convenience. ItemRetrievalManager takes ownership over the
* pointer and deletes it when the request is processed.
*/
virtual void requestItemDelivery(ItemRetrievalRequest *request);
virtual void requestItemDelivery(ItemRetrievalRequest request);
void triggerCollectionSync(const QString &resource, qint64 colId);
void triggerCollectionTreeSync(const QString &resource);
Q_SIGNALS:
void requestFinished(ItemRetrievalRequest *request);
void requestFinished(const Akonadi::Server::ItemRetrievalResult &result);
void requestAdded();
private:
......@@ -82,7 +81,7 @@ private Q_SLOTS:
void serviceOwnerChanged(const QString &serviceName, const QString &oldOwner, const QString &newOwner);
void processRequest();
void retrievalJobFinished(ItemRetrievalRequest *request, const QString &errorMsg);
void retrievalJobFinished(AbstractItemRetrievalJob *job);
protected:
std::unique_ptr<AbstractItemRetrievalJobFactory> mJobFactory;
......@@ -91,8 +90,9 @@ protected:
QReadWriteLock mLock;
/// Used to let requesting threads wait until the request has been processed
QWaitCondition mWaitCondition;
/// Pending requests queues, one per resource
QHash<QString, QList<ItemRetrievalRequest *> > mPendingRequests;
std::unordered_map<QString, std::list<ItemRetrievalRequest>> mPendingRequests;
/// Currently running jobs, one per resource
QHash<QString, AbstractItemRetrievalJob *> mCurrentJobs;
......
/*
Copyright (c) 2020 Daniel Vrátil <dvratil@kde.org>
This library is free software; you can redistribute it and/or modify it
under the terms of the GNU Library General Public License as published by
the Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
License for more details.
You should have received a copy of the GNU Library General Public License
along with this library; see the file COPYING.LIB. If not, write to the
Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
*/
#include "itemretrievalrequest.h"
using namespace Akonadi::Server;
ItemRetrievalRequest::Id ItemRetrievalRequest::lastId{0};
ItemRetrievalRequest::ItemRetrievalRequest()
: id(lastId.next())
{}
......@@ -22,33 +22,64 @@
#include <QByteArray>
#include <QString>
#include <QList>
#include <QVector>
#include <QDebug>
#include <shared/akoptional.h>
namespace Akonadi
{
namespace Server
{
class ItemRetrievalRequest;
/// Details of a single item retrieval request