Commit 39bcc7d5 authored by Daniel Vrátil's avatar Daniel Vrátil 🤖

Fix a loop in ItemRetriever and make it interactive, add a unit test

Fix a loop in ItemRetriever when we have more parts available
than requested as well as when only some requested parts need
to be retrieved.

Make ItemRetriever interactive by emitting a signal with list
of already retrieved items and using QEventLoop to block inside
ItemRetriever::exec() instead of blocking on a QWaitCondition
inside ItemRetrievalManager.

Add a unit-test with several different scenarios.
parent 77da41c2
......@@ -21,6 +21,7 @@
#include <storage/querybuilder.h>
#include <storage/datastore.h>
#include <storage/parttypehelper.h>
using namespace Akonadi;
using namespace Akonadi::Server;
......@@ -76,6 +77,22 @@ PimItem DbInitializer::createItem(const char *name, const Collection &parent)
return item;
}
Part DbInitializer::createPart(qint64 pimItem, const QByteArray &partName, const QByteArray &partData)
{
auto partType = PartTypeHelper::parseFqName(QString::fromLatin1(partName));
PartType type = PartType::retrieveByFQNameOrCreate(partType.first, partType.second);
Part part;
part.setPimItemId(pimItem);
part.setPartTypeId(type.id());
part.setData(partData);
part.setDatasize(partData.size());
const bool ret = part.insert();
Q_ASSERT(ret);
Q_UNUSED(ret);
return part;
}
QByteArray DbInitializer::toByteArray(bool enabled)
{
if (enabled) {
......@@ -178,7 +195,7 @@ void DbInitializer::cleanup()
if (DataStore::self()->database().isOpen()) {
{
QueryBuilder qb( Relation::tableName(), QueryBuilder::Delete );
QueryBuilder qb(Relation::tableName(), QueryBuilder::Delete);
qb.exec();
}
{
......@@ -191,6 +208,9 @@ void DbInitializer::cleanup()
}
}
Q_FOREACH(Part part, Part::retrieveAll()) {
part.remove();
}
Q_FOREACH(PimItem item, PimItem::retrieveAll()) {
item.remove();
}
......
......@@ -30,6 +30,7 @@ public:
Akonadi::Server::Collection createCollection(const char *name,
const Akonadi::Server::Collection &parent = Akonadi::Server::Collection());
Akonadi::Server::PimItem createItem(const char *name, const Akonadi::Server::Collection &parent);
Akonadi::Server::Part createPart(qint64 pimitemId, const QByteArray &partname, const QByteArray &data);
QByteArray toByteArray(bool enabled);
QByteArray toByteArray(Akonadi::Tristate tristate);
Akonadi::Protocol::FetchCollectionsResponse listResponse(const Akonadi::Server::Collection &col,
......
This diff is collapsed.
......@@ -27,21 +27,31 @@
using namespace Akonadi::Server;
AbstractItemRetrievalJob::AbstractItemRetrievalJob(ItemRetrievalRequest *req, QObject *parent)
: QObject(parent)
, m_request(req)
{
}
AbstractItemRetrievalJob::~AbstractItemRetrievalJob()
{
}
ItemRetrievalJob::~ItemRetrievalJob()
{
Q_ASSERT(!m_active);
}
void ItemRetrievalJob::start(org::freedesktop::Akonadi::Resource *interface)
void ItemRetrievalJob::start()
{
Q_ASSERT(m_request);
qCDebug(AKONADISERVER_LOG) << "processing retrieval request for item" << m_request->ids << " parts:" << m_request->parts << " of resource:" << m_request->resourceId;
m_interface = interface;
// call the resource
if (interface) {
if (m_interface) {
m_active = true;
auto reply = interface->requestItemDelivery(m_request->ids, m_request->parts);
auto reply = m_interface->requestItemDelivery(m_request->ids, m_request->parts);
QDBusPendingCallWatcher *watcher = new QDBusPendingCallWatcher(reply, this);
connect(watcher, &QDBusPendingCallWatcher::finished,
this, &ItemRetrievalJob::callFinished);
......
......@@ -31,30 +31,49 @@ namespace Server {
class ItemRetrievalRequest;
class AbstractItemRetrievalJob : public QObject
{
Q_OBJECT
public:
AbstractItemRetrievalJob(ItemRetrievalRequest *req, QObject *parent);
virtual ~AbstractItemRetrievalJob();
virtual void start() = 0;
virtual void kill() = 0;
Q_SIGNALS:
void requestCompleted(ItemRetrievalRequest *request, const QString &errorMsg);
protected:
ItemRetrievalRequest *m_request;
};
/// Async D-Bus retrieval, no modification of the request (thus no need for locking)
class ItemRetrievalJob : public QObject
class ItemRetrievalJob : public AbstractItemRetrievalJob
{
Q_OBJECT
public:
ItemRetrievalJob(ItemRetrievalRequest *req, QObject *parent)
: QObject(parent)
, m_request(req)
: AbstractItemRetrievalJob(req, parent)
, m_active(false)
, m_interface(0)
{
}
~ItemRetrievalJob();
void start(OrgFreedesktopAkonadiResourceInterface *interface);
void kill();
Q_SIGNALS:
void requestCompleted(ItemRetrievalRequest *req, const QString &errorMsg);
void setInterface(OrgFreedesktopAkonadiResourceInterface *interface)
{
m_interface = interface;
}
~ItemRetrievalJob() Q_DECL_OVERRIDE;
void start() Q_DECL_OVERRIDE;
void kill() Q_DECL_OVERRIDE;
private Q_SLOTS:
void callFinished(QDBusPendingCallWatcher *watcher);
private:
ItemRetrievalRequest *m_request;
bool m_active;
OrgFreedesktopAkonadiResourceInterface *m_interface;
......
......@@ -39,8 +39,23 @@ using namespace Akonadi::Server;
ItemRetrievalManager *ItemRetrievalManager::sInstance = 0;
class ItemRetrievalJobFactory : public AbstractItemRetrievalJobFactory
{
AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest *request, QObject *parent) Q_DECL_OVERRIDE
{
return new ItemRetrievalJob(request, parent);
}
};
ItemRetrievalManager::ItemRetrievalManager(QObject *parent)
: ItemRetrievalManager(new ItemRetrievalJobFactory, parent)
{
}
ItemRetrievalManager::ItemRetrievalManager(AbstractItemRetrievalJobFactory *factory, QObject *parent)
: AkThread(QThread::HighPriority, parent)
, mJobFactory(factory)
{
qDBusRegisterMetaType<QByteArrayList>();
......@@ -126,16 +141,6 @@ org::freedesktop::Akonadi::Resource *ItemRetrievalManager::resourceInterface(con
}
// called from any thread
void ItemRetrievalManager::requestItemDelivery(qint64 uid, const QString &resource, const QVector<QByteArray> &parts)
{
ItemRetrievalRequest *req = new ItemRetrievalRequest();
req->ids << uid;
req->resourceId = resource;
req->parts = parts.toList();
requestItemDelivery(req);
}
void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest *req)
{
mLock->lockForWrite();
......@@ -146,7 +151,7 @@ void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest *req)
mLock->unlock();
Q_EMIT requestAdded();
#if 0
mLock->lockForRead();
Q_FOREVER {
//qCDebug(AKONADISERVER_LOG) << "checking if request for item" << req->id << "has been processed...";
......@@ -170,16 +175,16 @@ void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest *req)
}
throw ItemRetrieverException("WTF?");
#endif
}
// called within the retrieval thread
void ItemRetrievalManager::processRequest()
{
QVector<QPair<ItemRetrievalJob *, QString> > newJobs;
QVector<QPair<AbstractItemRetrievalJob *, QString> > newJobs;
mLock->lockForWrite();
// look for idle resources
for (QHash< QString, QList< ItemRetrievalRequest *> >::iterator it = mPendingRequests.begin(); it != mPendingRequests.end();) {
for (auto it = mPendingRequests.begin(); it != mPendingRequests.end();) {
if (it.value().isEmpty()) {
it = mPendingRequests.erase(it);
continue;
......@@ -188,11 +193,12 @@ void ItemRetrievalManager::processRequest()
// TODO: check if there is another one for the same uid with more parts requested
ItemRetrievalRequest *req = it.value().takeFirst();
Q_ASSERT(req->resourceId == it.key());
ItemRetrievalJob *job = new ItemRetrievalJob(req, this);
connect(job, &ItemRetrievalJob::requestCompleted, this, &ItemRetrievalManager::retrievalJobFinished);
AbstractItemRetrievalJob *job = mJobFactory->retrievalJob(req, this);
connect(job, &AbstractItemRetrievalJob::requestCompleted, this, &ItemRetrievalManager::retrievalJobFinished);
mCurrentJobs.insert(req->resourceId, job);
// delay job execution until after we unlocked the mutex, since the job can emit the finished signal immediately in some cases
newJobs.append(qMakePair(job, req->resourceId));
qCDebug(AKONADISERVER_LOG) << "ItemRetrievalJob" << job << "started for request" << req;
}
++it;
}
......@@ -201,17 +207,20 @@ void ItemRetrievalManager::processRequest()
mLock->unlock();
if (nothingGoingOn) { // someone asked as to process requests although everything is done already, he might still be waiting
mWaitCondition->wakeAll();
return;
}
for (QVector<QPair<ItemRetrievalJob *, QString> >::const_iterator it = newJobs.constBegin(); it != newJobs.constEnd(); ++it) {
(*it).first->start(resourceInterface((*it).second));
for (auto it = newJobs.constBegin(), end = newJobs.constEnd(); it != end; ++it) {
if (ItemRetrievalJob *j = qobject_cast<ItemRetrievalJob*>((*it).first)) {
j->setInterface(resourceInterface((*it).second));
}
(*it).first->start();
}
}
void ItemRetrievalManager::retrievalJobFinished(ItemRetrievalRequest *request, const QString &errorMsg)
{
qCDebug(AKONADISERVER_LOG) << "ItemRetrievalJob finished for request" << request << ", error:" << errorMsg;
mLock->lockForWrite();
request->errorMsg = errorMsg;
request->processed = true;
......@@ -228,8 +237,8 @@ void ItemRetrievalManager::retrievalJobFinished(ItemRetrievalRequest *request, c
++it;
}
}
mWaitCondition->wakeAll();
mLock->unlock();
Q_EMIT requestFinished(request);
Q_EMIT requestAdded(); // trigger processRequest() again, in case there is more in the queues
}
......
......@@ -38,6 +38,16 @@ namespace Server {
class Collection;
class ItemRetrievalJob;
class ItemRetrievalRequest;
class AbstractItemRetrievalJob;
class AbstractItemRetrievalJobFactory
{
public:
explicit AbstractItemRetrievalJobFactory() {}
virtual ~AbstractItemRetrievalJobFactory() {}
virtual AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest *request, QObject *parent) = 0;
};
/** Manages and processes item retrieval requests. */
class ItemRetrievalManager : public AkThread
......@@ -45,10 +55,9 @@ class ItemRetrievalManager : public AkThread
Q_OBJECT
public:
ItemRetrievalManager(QObject *parent = Q_NULLPTR);
ItemRetrievalManager(AbstractItemRetrievalJobFactory *factory, QObject *parent = Q_NULLPTR);
~ItemRetrievalManager();
void requestItemDelivery(qint64 uid, const QString &resource, const QVector<QByteArray> &parts);
/**
* Added for convenience. ItemRetrievalManager takes ownership over the
* pointer and deletes it when the request is processed.
......@@ -58,6 +67,7 @@ public:
static ItemRetrievalManager *instance();
Q_SIGNALS:
void requestFinished(ItemRetrievalRequest *request);
void requestAdded();
private:
......@@ -74,6 +84,9 @@ private Q_SLOTS:
private:
static ItemRetrievalManager *sInstance;
AbstractItemRetrievalJobFactory *mJobFactory;
/// Protects mPendingRequests and every Request object posted to it
QReadWriteLock *mLock;
/// Used to let requesting threads wait until the request has been processed
......@@ -81,7 +94,7 @@ private:
/// Pending requests queues, one per resource
QHash<QString, QList<ItemRetrievalRequest *> > mPendingRequests;
/// Currently running jobs, one per resource
QHash<QString, ItemRetrievalJob *> mCurrentJobs;
QHash<QString, AbstractItemRetrievalJob *> mCurrentJobs;
// resource dbus interface cache
QHash<QString, OrgFreedesktopAkonadiResourceInterface *> mResourceInterfaces;
......
......@@ -33,6 +33,9 @@
#include <private/protocol_p.h>
#include <QMetaObject>
#include <QEventLoop>
#include "akonadiserver_debug.h"
using namespace Akonadi;
......@@ -187,6 +190,18 @@ QSqlQuery ItemRetriever::buildQuery() const
return qb.query();
}
namespace {
static bool hasAllParts(ItemRetrievalRequest *req, const QSet<QByteArray> &availableParts)
{
Q_FOREACH (const auto &part, req->parts) {
if (!availableParts.contains(part)) {
return false;
}
}
return true;
}
}
bool ItemRetriever::exec()
{
if (mParts.isEmpty() && !mFullPayload) {
......@@ -207,22 +222,38 @@ bool ItemRetriever::exec()
QVector<ItemRetrievalRequest *> requests;
QHash<qint64 /* collection */, ItemRetrievalRequest*> colRequests;
QHash<qint64 /* item */, ItemRetrievalRequest*> itemRequests;
QVector<qint64> readyItems;
qint64 prevPimItemId = -1;
QSet<QByteArray> availableParts;
ItemRetrievalRequest *lastRequest = Q_NULLPTR;
while (query.isValid()) {
const qint64 pimItemId = query.value(PimItemIdColumn).toLongLong();
const qint64 collectionId = query.value(CollectionIdColumn).toLongLong();
const qint64 resourceId = query.value(ResourceIdColumn).toLongLong();
ItemRetrievalRequest *lastRequest = Q_NULLPTR;
const auto itemIter = itemRequests.constFind(pimItemId);
if (pimItemId == prevPimItemId && query.value(PartTypeNameColumn).isNull()) {
// This is not the first part of the Item we saw, but LEFT JOIN PartTable
// returned a null row - that means the row is an ATR part
// which we don't care about
query.next();
continue;
if (pimItemId == prevPimItemId) {
if (query.value(PartTypeNameColumn).isNull()) {
// This is not the first part of the Item we saw, but LEFT JOIN PartTable
// returned a null row - that means the row is an ATR part
// which we don't care about
query.next();
continue;
}
} else {
if (lastRequest) {
if (hasAllParts(lastRequest, availableParts)) {
// We went through all parts of a single item, if we have all
// parts available in the DB and they are not expired, then
// exclude this item from the retrieval
lastRequest->ids.removeOne(prevPimItemId);
itemRequests.remove(prevPimItemId);
readyItems.push_back(prevPimItemId);
}
}
availableParts.clear();
prevPimItemId = pimItemId;
}
prevPimItemId = pimItemId;
if (itemIter != itemRequests.constEnd()) {
lastRequest = *itemIter;
......@@ -261,36 +292,69 @@ bool ItemRetriever::exec()
lastRequest->parts << partName;
}
} else {
// This particular item already has this particular part. If that's
// the only part we are requesting so far, then just remove the item
// from the queue.
if (lastRequest->parts.size() == 1 && lastRequest->parts.at(0) == partName) {
// TODO: Is this too expensive? Should we use a QSet?
lastRequest->ids.removeOne(pimItemId);
itemRequests.remove(pimItemId);
}
// add the part to list of available parts, we will compare it with
// the list of request parts once we handle all parts of this item
availableParts.insert(partName);
}
query.next();
}
//qCDebug(AKONADISERVER_LOG) << "Closing queries and sending out requests.";
// Post-check in case we only queried one item thus did not reach the check
// at the beginning of the while() loop above
if (lastRequest && hasAllParts(lastRequest, availableParts)) {
lastRequest->ids.removeOne(prevPimItemId);
readyItems.push_back(prevPimItemId);
// No need to update the hashtable at this point
}
//qCDebug(AKONADISERVER_LOG) << "Closing queries and sending out requests.";
query.finish();
Q_FOREACH (ItemRetrievalRequest *request, requests) {
if (!readyItems.isEmpty()) {
Q_EMIT itemsRetrieved(readyItems.toList());
}
QEventLoop eventLoop;
connect(ItemRetrievalManager::instance(), &ItemRetrievalManager::requestFinished,
this, [&](ItemRetrievalRequest *finishedRequest) {
if (!finishedRequest->errorMsg.isEmpty()) {
mLastError = finishedRequest->errorMsg.toUtf8();
eventLoop.exit(1);
} else {
requests.removeOne(finishedRequest);
Q_EMIT itemsRetrieved(finishedRequest->ids);
if (requests.isEmpty()) {
eventLoop.quit();
}
}
}, Qt::UniqueConnection);
auto it = requests.begin();
while (it != requests.end()) {
auto request = (*it);
if ((!mFullPayload && request->parts.isEmpty()) || request->ids.isEmpty()) {
it = requests.erase(it);
delete request;
continue;
}
// TODO: how should we handle retrieval errors here? so far they have been ignored,
// which makes sense in some cases, do we need a command parameter for this?
try {
// Request is deleted inside ItemRetrievalManager, so we need to take
// a copy here
const auto ids = request->ids;
ItemRetrievalManager::instance()->requestItemDelivery(request);
} catch (const ItemRetrieverException &e) {
qCCritical(AKONADISERVER_LOG) << e.type() << ": " << e.what();
mLastError = e.what();
return false;
}
++it;
}
if (!requests.isEmpty()) {
if (eventLoop.exec(QEventLoop::ExcludeSocketNotifiers)) {
return false;
}
}
// retrieve items in child collections if requested
......@@ -301,6 +365,8 @@ bool ItemRetriever::exec()
retriever.setCollection(col, mRecursive);
retriever.setRetrieveParts(mParts);
retriever.setRetrieveFullPayload(mFullPayload);
connect(&retriever, &ItemRetriever::itemsRetrieved,
this, &ItemRetriever::itemsRetrieved);
result = retriever.exec();
if (!result) {
break;
......@@ -313,7 +379,7 @@ bool ItemRetriever::exec()
void ItemRetriever::verifyCache()
{
if (!connection()->verifyCacheOnRetrieval()) {
if (!connection() || !connection()->verifyCacheOnRetrieval()) {
return;
}
......
......@@ -20,6 +20,8 @@
#ifndef ITEMRETRIEVER_H
#define ITEMRETRIEVER_H
#include <QObject>
#include "../exception.h"
#include "entities.h"
......@@ -43,10 +45,12 @@ class QueryBuilder;
@todo make usable for Fetch by allowing to share queries
*/
class ItemRetriever
class ItemRetriever : public QObject
{
Q_OBJECT
public:
ItemRetriever(Connection *connection);
ItemRetriever(Connection *connection = Q_NULLPTR);
Connection *connection() const;
......@@ -68,6 +72,9 @@ public:
QByteArray lastError() const;
Q_SIGNALS:
void itemsRetrieved(const QList<qint64> &ids);
private:
QSqlQuery buildQuery() const;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment