Commit dd8d63ff authored by Krzysztof Nowicki's avatar Krzysztof Nowicki Committed by Laurent Montel
Browse files

Split multiple-item modification jobs into chunks



The EWS server is only able to handle around 100 items inside a
request at a time. This applies to all requests that accept multiple
items. To allow mass item updates it is necessary to split large
series into chunks.

This change applies this modification to the mail item update job.

Updates are split into 10-item chunks. While it is well under the
limit, such a small chunk size does not increase overhead too much and
it allows more responsive progress reporting.

The modification also adds progress reporting, now that the
infrastructure is there. This should additionally fix the Akonadi
socket timeout errors that could have been caused by the resource
being busy for long periods of time without any message sent to/from
the resource.

A dedicated helper template class has been added to help reduce
boilerplate when adding chunk splitting into more request types.
Signed-off-by: Krzysztof Nowicki's avatarKrzysztof Nowicki <krissn@op.pl>
parent 1ca2dfff
/*
SPDX-FileCopyrightText: 2019 Krzysztof Nowicki <krissn@op.pl>
SPDX-License-Identifier: LGPL-2.0-or-later
*/
#pragma once
#include <QVector>
#include "ewsjob.h"
template<typename Req, typename ReqItem, typename RespItem>
class EwsAbstractChunkedJob
{
public:
EwsAbstractChunkedJob(unsigned int chunkSize);
~EwsAbstractChunkedJob() = default;
typedef QVector<ReqItem> ReqItemList;
typedef QVector<RespItem> RespItemList;
void setItems(const ReqItemList &items);
const RespItemList &responses() const;
template<typename ReqSetupFn, typename RespGetFn, typename ProgressFn, typename ResultFn>
void start(ReqSetupFn reqSetupFn, RespGetFn respGetFn, ProgressFn progressFn, ResultFn resultFn);
private:
unsigned int mChunkSize;
unsigned int mItemsDone;
Req *mRequest = nullptr;
ReqItemList mItems;
RespItemList mResponses;
};
template<typename Req, typename ReqItem, typename RespItem>
EwsAbstractChunkedJob<Req, ReqItem, RespItem>::EwsAbstractChunkedJob(unsigned int chunkSize)
: mChunkSize(chunkSize)
, mItemsDone(0)
{
}
template<typename Req, typename ReqItem, typename RespItem>
void EwsAbstractChunkedJob<Req, ReqItem, RespItem>::setItems(const ReqItemList &items)
{
mItems = items;
}
template<typename Req, typename ReqItem, typename RespItem>
const QVector<RespItem> &EwsAbstractChunkedJob<Req, ReqItem, RespItem>::responses() const
{
return mResponses;
}
template<typename Req, typename ReqItem, typename RespItem>
template<typename ReqSetupFn, typename RespGetFn, typename ProgressFn, typename ResultFn>
void EwsAbstractChunkedJob<Req, ReqItem, RespItem>::start(ReqSetupFn reqSetupFn, RespGetFn respGetFn, ProgressFn progressFn, ResultFn resultFn)
{
int itemsToDo = qMin(mItems.size() - mItemsDone, mChunkSize);
if (itemsToDo == 0) {
resultFn(true, QString());
return;
}
mRequest = reqSetupFn(mItems.cbegin() + mItemsDone, mItems.cbegin() + mItemsDone + itemsToDo);
if (!mRequest) {
resultFn(false, QStringLiteral("Failed to set-up request"));
return;
}
mItemsDone += itemsToDo;
QObject::connect(mRequest, &KJob::result, [this, reqSetupFn, respGetFn, progressFn, resultFn, itemsToDo](KJob *job) {
if (job->error()) {
resultFn(false, job->errorString());
return;
}
Req *req = qobject_cast<Req *>(job);
if (!req) {
resultFn(false, QStringLiteral("Incorrect request object type"));
return;
}
auto responses = respGetFn(req);
Q_ASSERT(responses.size() == itemsToDo);
mResponses += responses;
progressFn(mItemsDone * 100 / mItems.size());
start(reqSetupFn, respGetFn, progressFn, resultFn);
});
mRequest->start();
}
/*
SPDX-FileCopyrightText: 2015-2016 Krzysztof Nowicki <krissn@op.pl>
SPDX-FileCopyrightText: 2015-2019 Krzysztof Nowicki <krissn@op.pl>
SPDX-License-Identifier: LGPL-2.0-or-later
*/
......@@ -73,6 +73,13 @@ void EwsModifyItemFlagsJob::start()
EwsItemHandler *handler = EwsItemHandler::itemHandler(static_cast<EwsItemType>(type));
EwsModifyItemJob *job = handler->modifyItemJob(mClient, items[type], QSet<QByteArray>() << "FLAGS", this);
connect(job, &EwsModifyItemJob::result, this, &EwsModifyItemFlagsJob::itemModifyFinished);
connect(job, &EwsModifyItemJob::status, this, [this](int s, const QString &message) {
Q_EMIT status(s, message);
});
connect(job, &EwsModifyItemJob::percent, this, [this](int p) {
Q_EMIT percent(p);
});
addSubjob(job);
job->start();
started = true;
......
/*
SPDX-FileCopyrightText: 2015-2016 Krzysztof Nowicki <krissn@op.pl>
SPDX-FileCopyrightText: 2015-2019 Krzysztof Nowicki <krissn@op.pl>
SPDX-License-Identifier: LGPL-2.0-or-later
*/
......@@ -31,6 +31,10 @@ public:
void start() override;
Q_SIGNALS:
void status(int status, const QString &message = QString());
void percent(int progress);
protected:
Akonadi::Item::List mItems;
Akonadi::Item::List mResultItems;
......
/*
SPDX-FileCopyrightText: 2015-2016 Krzysztof Nowicki <krissn@op.pl>
SPDX-FileCopyrightText: 2015-2019 Krzysztof Nowicki <krissn@op.pl>
SPDX-License-Identifier: LGPL-2.0-or-later
*/
......@@ -21,6 +21,9 @@ public:
void setModifiedFlags(const QSet<QByteArray> &addedFlags, const QSet<QByteArray> &removedFlags);
const Akonadi::Item::List &items() const;
Q_SIGNALS:
void status(int status, const QString &message = QString());
void percent(int progress);
protected:
Akonadi::Item::List mItems;
......
......@@ -521,6 +521,7 @@ void EwsResource::itemChanged(const Akonadi::Item &item, const QSet<QByteArray>
} else {
EwsModifyItemJob *job = EwsItemHandler::itemHandler(type)->modifyItemJob(mEwsClient, Item::List() << item, partIdentifiers, this);
connect(job, &KJob::result, this, &EwsResource::itemChangeRequestFinished);
connectStatusSignals(job);
job->start();
}
}
......@@ -533,6 +534,7 @@ void EwsResource::itemsFlagsChanged(const Akonadi::Item::List &items, const QSet
auto job = new EwsModifyItemFlagsJob(mEwsClient, this, items, addedFlags, removedFlags);
connect(job, &EwsModifyItemFlagsJob::result, this, &EwsResource::itemModifyFlagsRequestFinished);
connectStatusSignals(job);
job->start();
}
......
/*
SPDX-FileCopyrightText: 2015-2016 Krzysztof Nowicki <krissn@op.pl>
SPDX-FileCopyrightText: 2015-2019 Krzysztof Nowicki <krissn@op.pl>
SPDX-License-Identifier: LGPL-2.0-or-later
*/
......@@ -8,6 +8,8 @@
#include <Akonadi/KMime/MessageFlags>
#include <KLocalizedString>
#include "ewsmailhandler.h"
#include "ewsupdateitemrequest.h"
......@@ -15,8 +17,11 @@
using namespace Akonadi;
constexpr unsigned ChunkSize = 10;
EwsModifyMailJob::EwsModifyMailJob(EwsClient &client, const Akonadi::Item::List &items, const QSet<QByteArray> &parts, QObject *parent)
: EwsModifyItemJob(client, items, parts, parent)
, mChunkedJob(ChunkSize)
{
}
......@@ -27,9 +32,10 @@ EwsModifyMailJob::~EwsModifyMailJob()
void EwsModifyMailJob::start()
{
bool doSubmit = false;
auto req = new EwsUpdateItemRequest(mClient, this);
EwsId itemId;
EwsUpdateItemRequest::ItemChange::List itemChanges;
itemChanges.reserve(mItems.size());
for (const Item &item : std::as_const(mItems)) {
itemId = EwsId(item.remoteId(), item.remoteRevision());
......@@ -47,48 +53,61 @@ void EwsModifyMailJob::start()
ic.addUpdate(upd);
}
req->addItemChange(ic);
itemChanges.append(ic);
doSubmit = true;
}
}
if (doSubmit) {
connect(req, &KJob::result, this, &EwsModifyMailJob::updateItemFinished);
req->start();
mChunkedJob.setItems(itemChanges);
mChunkedJob.start(
[this](EwsUpdateItemRequest::ItemChange::List::const_iterator firstChange, EwsUpdateItemRequest::ItemChange::List::const_iterator lastChange) {
auto req = new EwsUpdateItemRequest(mClient, this);
for (auto it = firstChange; it != lastChange; ++it) {
req->addItemChange(*it);
}
return req;
},
[](EwsUpdateItemRequest *req) {
return req->responses();
},
[this](unsigned int progress) {
Q_EMIT percent(progress);
},
[this](bool success, const QString &error) {
updateItemsFinished(success, error);
});
} else {
delete req;
emitResult();
}
}
void EwsModifyMailJob::updateItemFinished(KJob *job)
void EwsModifyMailJob::updateItemsFinished(bool success, const QString &error)
{
if (job->error()) {
setErrorText(job->errorString());
emitResult();
return;
}
auto req = qobject_cast<EwsUpdateItemRequest *>(job);
if (!req) {
setErrorText(QStringLiteral("Invalid EwsUpdateItemRequest job object"));
if (!success) {
setErrorText(error);
emitResult();
return;
}
Q_ASSERT(req->responses().size() == mItems.size());
Item::List::iterator it = mItems.begin();
const auto responses{req->responses()};
for (const EwsUpdateItemRequest::Response &resp : responses) {
for (const auto &resp : mChunkedJob.responses()) {
if (!resp.isSuccess()) {
setErrorText(QStringLiteral("Item update failed: ") + resp.responseMessage());
emitResult();
return;
}
it->setRemoteRevision(resp.itemId().changeKey());
/* In general EWS guarantees that the order of response items will match the order of request items.
* It is therefore safe to iterate these in parallel. */
if (it->remoteId() == resp.itemId().id()) {
it->setRemoteRevision(resp.itemId().changeKey());
++it;
} else {
setErrorText(i18nc("@info:status", "Item out of order while processing update item response."));
emitResult();
return;
}
}
emitResult();
}
/*
SPDX-FileCopyrightText: 2015-2016 Krzysztof Nowicki <krissn@op.pl>
SPDX-FileCopyrightText: 2015-2019 Krzysztof Nowicki <krissn@op.pl>
SPDX-License-Identifier: LGPL-2.0-or-later
*/
#pragma once
#include "ewsabstractchunkedjob.h"
#include "ewsmodifyitemjob.h"
#include "ewsupdateitemrequest.h"
class EwsModifyMailJob : public EwsModifyItemJob
{
......@@ -15,7 +17,10 @@ public:
EwsModifyMailJob(EwsClient &client, const Akonadi::Item::List &items, const QSet<QByteArray> &parts, QObject *parent);
~EwsModifyMailJob() override;
void start() override;
private Q_SLOTS:
void updateItemFinished(KJob *job);
private:
void updateItemsFinished(bool success, const QString &error);
EwsAbstractChunkedJob<EwsUpdateItemRequest, EwsUpdateItemRequest::ItemChange, EwsUpdateItemRequest::Response> mChunkedJob;
};
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment