Commit 1bdde3b9 authored by Daniel Vrátil's avatar Daniel Vrátil 🤖
Browse files

Make COPY and MOVE handlers more interactive

When copying or moving large batch of items they all have to
go through ItemRetriever first to make sure all payload parts
are locally cached. This can take a lot of time, so to make
the operation seem more responsive, we don't wait for
ItemRetriever to finish retrieval for all the items, but
processes every batch as soon as the ItemRetriever reports
it as finished.
parent 39bcc7d5
......@@ -63,6 +63,30 @@ public:
intervals = other.intervals;
}
template<typename T>
void add(const T &values)
{
T vals = values;
qSort(vals);
for (int i = 0; i < vals.count(); ++i) {
const int begin = vals[i];
Q_ASSERT(begin >= 0);
if (i == vals.count() - 1) {
intervals << ImapInterval(begin, begin);
break;
}
do {
++i;
Q_ASSERT(vals[i] >= 0);
if (vals[i] != (vals[i - 1] + 1)) {
--i;
break;
}
} while (i < vals.count() - 1);
intervals << ImapInterval(begin, vals[i]);
}
}
ImapInterval::List intervals;
};
......@@ -187,6 +211,12 @@ ImapSet::ImapSet(const QVector<qint64> &ids)
add(ids);
}
ImapSet::ImapSet(const QList<qint64> &ids)
: d(new Private)
{
add(ids);
}
ImapSet::ImapSet(const ImapInterval &interval)
:d (new Private)
{
......@@ -225,25 +255,12 @@ bool ImapSet::operator==(const ImapSet &other) const
void ImapSet::add(const QVector<Id> &values)
{
QVector<Id> vals = values;
qSort(vals);
for (int i = 0; i < vals.count(); ++i) {
const int begin = vals[i];
Q_ASSERT(begin >= 0);
if (i == vals.count() - 1) {
d->intervals << ImapInterval(begin, begin);
break;
}
do {
++i;
Q_ASSERT(vals[i] >= 0);
if (vals[i] != (vals[i - 1] + 1)) {
--i;
break;
}
} while (i < vals.count() - 1);
d->intervals << ImapInterval(begin, vals[i]);
}
d->add(values);
}
void ImapSet::add(const QList<Id> &values)
{
d->add(values);
}
void ImapSet::add(const QSet<Id> &values)
......
......@@ -158,6 +158,8 @@ public:
ImapSet(const QVector<qint64> &ids);
ImapSet(const QList<qint64> &ids);
ImapSet(const ImapInterval &interval);
/**
......@@ -189,6 +191,7 @@ public:
@param values List of positive integer numbers in arbitrary order
*/
void add(const QVector<Id> &values);
void add(const QList<Id> &values);
/**
* @overload
......
......@@ -36,8 +36,6 @@ using namespace Akonadi::Server;
bool Copy::copyItem(const PimItem &item, const Collection &target)
{
// qCDebug(AKONADISERVER_LOG) << "Copy::copyItem";
PimItem newItem = item;
newItem.setId(-1);
newItem.setRev(0);
......@@ -67,6 +65,34 @@ bool Copy::copyItem(const PimItem &item, const Collection &target)
return true;
}
void Copy::itemsRetrieved(const QList<qint64>& ids)
{
SelectQueryBuilder<PimItem> qb;
ItemQueryHelper::itemSetToQuery(ImapSet(ids), qb);
if (!qb.exec()) {
failureResponse("Unable to retrieve items");
return;
}
PimItem::List items = qb.result();
qb.query().finish();
DataStore *store = connection()->storageBackend();
Transaction transaction(store);
Q_FOREACH (const PimItem &item, items) {
if (!copyItem(item, mTargetCollection)) {
failureResponse("Unable to copy item");
return;
}
}
if (!transaction.commit()) {
failureResponse("Cannot commit transaction.");
return;
}
}
bool Copy::parseStream()
{
Protocol::CopyItemsCommand cmd(m_command);
......@@ -79,44 +105,25 @@ bool Copy::parseStream()
return failureResponse("No items specified");
}
mTargetCollection = HandlerHelper::collectionFromScope(cmd.destination(), connection());
if (!mTargetCollection.isValid()) {
return failureResponse("No valid target specified");
}
if (mTargetCollection.isVirtual()) {
return failureResponse("Copying items into virtual collections is not allowed");
}
CacheCleanerInhibitor inhibitor;
ItemRetriever retriever(connection());
retriever.setItemSet(cmd.items().uidSet());
retriever.setRetrieveFullPayload(true);
connect(&retriever, &ItemRetriever::itemsRetrieved,
this, &Copy::itemsRetrieved);
if (!retriever.exec()) {
return failureResponse(retriever.lastError());
}
const Collection targetCollection = HandlerHelper::collectionFromScope(cmd.destination(), connection());
if (!targetCollection.isValid()) {
return failureResponse("No valid target specified");
}
if (targetCollection.isVirtual()) {
return failureResponse("Copying items into virtual collections is not allowed");
}
SelectQueryBuilder<PimItem> qb;
ItemQueryHelper::itemSetToQuery(cmd.items().uidSet(), qb);
if (!qb.exec()) {
return failureResponse("Unable to retrieve items");
}
PimItem::List items = qb.result();
qb.query().finish();
DataStore *store = connection()->storageBackend();
Transaction transaction(store);
Q_FOREACH (const PimItem &item, items) {
if (!copyItem(item, targetCollection)) {
return failureResponse("Unable to copy item");
}
}
if (!transaction.commit()) {
return failureResponse("Cannot commit transaction.");
}
return successResponse<Protocol::CopyItemsResponse>();
return true;
}
......@@ -62,6 +62,12 @@ protected:
The changes mentioned above are applied.
*/
bool copyItem(const PimItem &item, const Collection &target);
private Q_SLOTS:
void itemsRetrieved(const QList<qint64> &ids);
private:
Collection mTargetCollection;
};
} // namespace Server
......
......@@ -32,48 +32,23 @@
using namespace Akonadi;
using namespace Akonadi::Server;
bool Move::parseStream()
void Move::itemsRetrieved(const QList<qint64> &ids)
{
Protocol::MoveItemsCommand cmd(m_command);
const Collection destination = HandlerHelper::collectionFromScope(cmd.destination(), connection());
if (destination.isVirtual()) {
return failureResponse("Moving items into virtual collection is not allowed");
}
if (!destination.isValid()) {
return failureResponse("Invalid destination collection");
}
connection()->context()->setScopeContext(cmd.itemsContext());
if (cmd.items().scope() == Scope::Rid) {
if (!connection()->context()->collection().isValid()) {
return failureResponse("RID move requires valid source collection");
}
}
CacheCleanerInhibitor inhibitor;
// make sure all the items we want to move are in the cache
ItemRetriever retriever(connection());
retriever.setScope(cmd.items());
retriever.setRetrieveFullPayload(true);
if (!retriever.exec()) {
return failureResponse(retriever.lastError());
}
DataStore *store = connection()->storageBackend();
Transaction transaction(store);
SelectQueryBuilder<PimItem> qb;
ItemQueryHelper::scopeToQuery(cmd.items(), connection()->context(), qb);
qb.addValueCondition(PimItem::collectionIdFullColumnName(), Query::NotEquals, destination.id());
ItemQueryHelper::itemSetToQuery(ImapSet(ids), qb);
qb.addValueCondition(PimItem::collectionIdFullColumnName(), Query::NotEquals, mDestination.id());
const QDateTime mtime = QDateTime::currentDateTime();
if (qb.exec()) {
const QVector<PimItem> items = qb.result();
if (items.isEmpty()) {
return successResponse<Protocol::MoveItemsResponse>();
successResponse<Protocol::MoveItemsResponse>();
return;
}
// Split the list by source collection
......@@ -82,22 +57,24 @@ bool Move::parseStream()
Q_FOREACH (/*sic!*/ PimItem item, items) {
const Collection source = items.at(0).collection();
if (!source.isValid()) {
return failureResponse("Item without collection found!?");
failureResponse("Item without collection found!?");
return;
}
if (!sources.contains(source.id())) {
sources.insert(source.id(), source);
}
if (!item.isValid()) {
return failureResponse("Invalid item in result set!?");
failureResponse("Invalid item in result set!?");
return;
}
Q_ASSERT(item.collectionId() != destination.id());
Q_ASSERT(item.collectionId() != mDestination.id());
item.setCollectionId(destination.id());
item.setCollectionId(mDestination.id());
item.setAtime(mtime);
item.setDatetime(mtime);
// if the resource moved itself, we assume it did so because the change happend in the backend
if (connection()->context()->resource().id() != destination.resourceId()) {
if (connection()->context()->resource().id() != mDestination.resourceId()) {
item.setDirty(true);
}
......@@ -113,7 +90,7 @@ bool Move::parseStream()
}
const Collection &source = sources.value(sourceId);
store->notificationCollector()->itemsMoved(itemsToMove, source, destination);
store->notificationCollector()->itemsMoved(itemsToMove, source, mDestination);
for (auto iter = toMove.find(sourceId), end = toMove.end(); iter != end; ++iter) {
// reset RID on inter-resource moves, but only after generating the change notification
......@@ -125,16 +102,52 @@ bool Move::parseStream()
// FIXME Could we aggregate the changes to a single SQL query?
if (!(*iter).update()) {
return failureResponse("Unable to update item");
failureResponse("Unable to update item");
return;
}
}
}
} else {
return failureResponse("Unable to execute query");
failureResponse("Unable to execute query");
return;
}
if (!transaction.commit()) {
return failureResponse("Unable to commit transaction.");
failureResponse("Unable to commit transaction.");
return;
}
}
bool Move::parseStream()
{
Protocol::MoveItemsCommand cmd(m_command);
mDestination = HandlerHelper::collectionFromScope(cmd.destination(), connection());
if (mDestination.isVirtual()) {
return failureResponse("Moving items into virtual collection is not allowed");
}
if (!mDestination.isValid()) {
return failureResponse("Invalid destination collection");
}
connection()->context()->setScopeContext(cmd.itemsContext());
if (cmd.items().scope() == Scope::Rid) {
if (!connection()->context()->collection().isValid()) {
return failureResponse("RID move requires valid source collection");
}
}
CacheCleanerInhibitor inhibitor;
// make sure all the items we want to move are in the cache
ItemRetriever retriever(connection());
retriever.setScope(cmd.items());
retriever.setRetrieveFullPayload(true);
connect(&retriever, &ItemRetriever::itemsRetrieved,
this, &Move::itemsRetrieved);
if (!retriever.exec()) {
return failureResponse(retriever.lastError());
}
return successResponse<Protocol::MoveItemsResponse>();
......
......@@ -41,8 +41,15 @@ namespace Server {
class Move : public Handler
{
Q_OBJECT
public:
bool parseStream() Q_DECL_OVERRIDE;
private Q_SLOTS:
void itemsRetrieved(const QList<qint64> &ids);
private:
Collection mDestination;
};
} // namespace Server
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment