Commit 8694d2b7 authored by Krzysztof Nowicki's avatar Krzysztof Nowicki Committed by Laurent Montel
Browse files

Use a queue to serialize collection synchronizations

When collections were synchronized through the resource scheduler they
were naturally serialized by the queues implemented there. Enabling
direct collection synchronization in response to server notifications
introduced a problem - the synchronizations were not serialized in any
way and could execute in parallel. It could even happen, tha the same
collection was synchronized by two parallel requests, which would
result in double items appearing in the message list.

Solve this problem by introducing an internal collection
synchronization queue.
parent 243beb57
......@@ -319,13 +319,79 @@ void EwsResource::connectionError()
void EwsResource::retrieveItems(const Collection &collection)
{
QString rid = collection.remoteId();
auto job = new EwsFetchItemsJob(collection, mEwsClient, getCollectionSyncState(collection), mItemsToCheck.value(rid), mTagStore, this);
job->setQueuedUpdates(mQueuedUpdates.value(collection.remoteId()));
mQueuedUpdates.remove(collection.remoteId());
connect(job, &EwsFetchItemsJob::result, this, &EwsResource::itemFetchJobFinished);
connectStatusSignals(job);
job->start();
queueFetchItemsJob(collection, [this](EwsFetchItemsJob *fetchJob) {
auto col = fetchJob->collection();
if (fetchJob->error()) {
qCWarningNC(EWSRES_LOG) << QStringLiteral("Item fetch error:") << fetchJob->errorString() << fetchJob->error();
const auto syncState = getCollectionSyncState(fetchJob->collection());
if (!syncState.isEmpty()) {
qCDebugNC(EWSRES_LOG) << QStringLiteral("Retrying with empty state.");
// Retry with a clear sync state.
saveCollectionSyncState(col, QString());
retrieveItems(col);
} else {
qCDebugNC(EWSRES_LOG) << QStringLiteral("Clean sync failed.");
// No more hope
cancelTask(i18nc("@info:status", "Failed to retrieve items"));
return;
}
} else {
saveCollectionSyncState(col, fetchJob->syncState());
itemsRetrievedIncremental(fetchJob->newItems() + fetchJob->changedItems(), fetchJob->deletedItems());
}
saveState();
mItemsToCheck.remove(fetchJob->collection().remoteId());
emitReadyStatus();
});
}
void EwsResource::queueFetchItemsJob(const Akonadi::Collection &col, std::function<void(EwsFetchItemsJob *)> startFn)
{
qCDebugNC(EWSRES_LOG) << QStringLiteral("Enqueuing sync for collection ") << col;
const auto queueEmpty = mFetchItemsJobQueue.empty();
if (!queueEmpty) {
for (const auto &item : std::as_const(mFetchItemsJobQueue)) {
// Don't enqueue the same collection twice, unless it's for the currently synced collection.
if (item.col != mFetchItemsJobQueue.head().col && item.col == col) {
qCDebugNC(EWSRES_LOG) << QStringLiteral("Sync already queued - skipping");
return;
}
}
}
mFetchItemsJobQueue.enqueue({col, startFn});
if (queueEmpty) {
startFetchItemsJob(col, startFn);
}
}
void EwsResource::dequeueFetchItemsJob()
{
qCDebugNC(EWSRES_LOG) << QStringLiteral("Finished queued sync");
mFetchItemsJobQueue.dequeue();
if (!mFetchItemsJobQueue.empty()) {
const auto &head = mFetchItemsJobQueue.head();
startFetchItemsJob(head.col, head.startFn);
}
}
void EwsResource::startFetchItemsJob(const Akonadi::Collection &col, std::function<void(EwsFetchItemsJob *)> startFn)
{
qCDebugNC(EWSRES_LOG) << QStringLiteral("Starting queued sync for collection ") << col;
auto fetchJob = new EwsFetchItemsJob(col, mEwsClient, getCollectionSyncState(col), mItemsToCheck.value(col.remoteId()), mTagStore, this);
fetchJob->setQueuedUpdates(mQueuedUpdates.value(col.remoteId()));
mQueuedUpdates.remove(col.remoteId());
connect(fetchJob, &EwsFetchItemsJob::result, this, [this, startFn, fetchJob](KJob *) {
startFn(fetchJob);
dequeueFetchItemsJob();
});
connectStatusSignals(fetchJob);
fetchJob->start();
}
bool EwsResource::retrieveItems(const Item::List &items, const QSet<QByteArray> &parts)
......@@ -1028,11 +1094,7 @@ void EwsResource::foldersModifiedCollectionSyncFinished(KJob *job)
auto fetchColJob = qobject_cast<CollectionFetchJob *>(job);
const auto collection = fetchColJob->collections().at(0);
auto fetchJob =
new EwsFetchItemsJob(collection, mEwsClient, getCollectionSyncState(collection), mItemsToCheck.value(collection.remoteId()), mTagStore, this);
fetchJob->setQueuedUpdates(mQueuedUpdates.value(collection.remoteId()));
mQueuedUpdates.remove(collection.remoteId());
connect(fetchJob, &EwsFetchItemsJob::result, this, [this, fetchJob](KJob *) {
queueFetchItemsJob(collection, [this](EwsFetchItemsJob *fetchJob) {
auto collection = fetchJob->collection();
if (fetchJob->error()) {
qCWarningNC(EWSRES_LOG) << QStringLiteral("Item fetch error:") << fetchJob->errorString() << fetchJob->error();
......@@ -1052,8 +1114,6 @@ void EwsResource::foldersModifiedCollectionSyncFinished(KJob *job)
emitReadyStatus();
}
});
connectStatusSignals(fetchJob);
fetchJob->start();
}
void EwsResource::folderTreeModifiedEvent()
......
......@@ -6,6 +6,7 @@
#pragma once
#include <QQueue>
#include <QScopedPointer>
#include <AkonadiAgentBase/ResourceBase>
......@@ -143,6 +144,10 @@ private:
static QString getCollectionSyncState(const Akonadi::Collection &col);
static void saveCollectionSyncState(Akonadi::Collection &col, const QString &state);
void queueFetchItemsJob(const Akonadi::Collection &col, std::function<void(EwsFetchItemsJob *)> startFn);
void startFetchItemsJob(const Akonadi::Collection &col, std::function<void(EwsFetchItemsJob *)> startFn);
void dequeueFetchItemsJob();
template<class Job>
void connectStatusSignals(Job *job);
......@@ -162,5 +167,11 @@ private:
int mInitialReconnectTimeout;
EwsTagStore *mTagStore = nullptr;
QScopedPointer<EwsSettings> mSettings;
struct QueuedFetchItemsJob {
Akonadi::Collection col;
std::function<void(EwsFetchItemsJob *)> startFn;
};
QQueue<QueuedFetchItemsJob> mFetchItemsJobQueue;
};
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment