Verified Commit ade84d0b authored by Daniel Vrátil's avatar Daniel Vrátil 🤖
Browse files

Minor ItemRetriever refactoring/cleanup

parent b94b8115
......@@ -127,12 +127,9 @@ void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest req)
Q_EMIT requestAdded();
}
// called within the retrieval thread
void ItemRetrievalManager::processRequest()
QVector<AbstractItemRetrievalJob *> ItemRetrievalManager::scheduleJobsForIdleResourcesLocked()
{
QVector<AbstractItemRetrievalJob *> newJobs;
QWriteLocker locker(&mLock);
// look for idle resources
for (auto it = mPendingRequests.begin(); it != mPendingRequests.end();) {
if (it->second.empty()) {
it = mPendingRequests.erase(it);
......@@ -154,13 +151,22 @@ void ItemRetrievalManager::processRequest()
++it;
}
return newJobs;
}
// called within the retrieval thread
void ItemRetrievalManager::processRequest()
{
QWriteLocker locker(&mLock);
// look for idle resources
auto newJobs = scheduleJobsForIdleResourcesLocked();
// someone asked as to process requests although everything is done already, he might still be waiting
if (mPendingRequests.empty() && mCurrentJobs.isEmpty() && newJobs.isEmpty()) {
return;
}
locker.unlock();
// Start the jobs
for (auto *job : newJobs) {
if (ItemRetrievalJob *j = qobject_cast<ItemRetrievalJob *>(job)) {
j->setInterface(resourceInterface(j->request().resourceId));
......@@ -169,6 +175,17 @@ void ItemRetrievalManager::processRequest()
}
}
namespace {
bool isSubsetOf(const QByteArrayList &superset, const QByteArrayList &subset)
{
// For very small lists like these, this is faster than copy, sort and std::include
return std::all_of(subset.cbegin(), subset.cend(),
[&superset](const auto &val) { return superset.contains(val); });
}
}
void ItemRetrievalManager::retrievalJobFinished(AbstractItemRetrievalJob *job)
{
const auto &request = job->request();
......@@ -183,9 +200,11 @@ void ItemRetrievalManager::retrievalJobFinished(AbstractItemRetrievalJob *job)
QWriteLocker locker(&mLock);
Q_ASSERT(mCurrentJobs.contains(request.resourceId));
mCurrentJobs.remove(request.resourceId);
// Check if there are any pending requests that are satisfied by this retrieval job
auto &requests = mPendingRequests[request.resourceId];
for (auto it = requests.begin(); it != requests.end();) {
if (it->ids == request.ids) {
// TODO: also complete requests that are subset of the completed one
if (it->ids == request.ids && isSubsetOf(request.parts, it->parts)) {
qCDebug(AKONADISERVER_LOG) << "Someone else requested items " << request.ids << "as well, marking as processed.";
ItemRetrievalResult otherResult{std::move(*it)};
otherResult.errorMsg = result.errorMsg;
......
......@@ -75,6 +75,7 @@ Q_SIGNALS:
private:
OrgFreedesktopAkonadiResourceInterface *resourceInterface(const QString &id);
QVector<AbstractItemRetrievalJob *> scheduleJobsForIdleResourcesLocked();
private Q_SLOTS:
void init() override;
......
......@@ -41,6 +41,7 @@
using namespace Akonadi;
using namespace Akonadi::Server;
using namespace AkRanges;
Q_DECLARE_METATYPE(ItemRetrievalResult)
......@@ -211,22 +212,64 @@ static bool hasAllParts(const ItemRetrievalRequest &req, const QSet<QByteArray>
}
}
bool ItemRetriever::exec()
bool ItemRetriever::runItemRetrievalRequests(std::list<ItemRetrievalRequest> requests)
{
if (mParts.isEmpty() && !mFullPayload) {
return true;
QEventLoop eventLoop;
QVector<ItemRetrievalRequest::Id> pendingRequests;
connect(&mItemRetrievalManager, &ItemRetrievalManager::requestFinished,
this, [this, &eventLoop, &pendingRequests](const ItemRetrievalResult &result) {
if (pendingRequests.contains(result.request.id)) {
if (mCanceled) {
eventLoop.exit(1);
} else if (result.errorMsg.has_value()) {
mLastError = result.errorMsg->toUtf8();
eventLoop.exit(1);
} else {
Q_EMIT itemsRetrieved(result.request.ids);
pendingRequests.removeOne(result.request.id);
if (pendingRequests.empty()) {
eventLoop.quit();
}
}
}
}, Qt::UniqueConnection);
if (mConnection) {
connect(mConnection, &Connection::connectionClosing,
&eventLoop, [&eventLoop]() { eventLoop.exit(1); });
}
verifyCache();
for (auto &&request : requests) {
if ((!mFullPayload && request.parts.isEmpty()) || request.ids.isEmpty()) {
continue;
}
QSqlQuery query = buildQuery();
QByteArrayList parts;
for (const QByteArray &part : qAsConst(mParts)) {
if (part.startsWith(AKONADI_PARAM_PLD)) {
parts << part.mid(4);
// TODO: how should we handle retrieval errors here? so far they have been ignored,
// which makes sense in some cases, do we need a command parameter for this?
try {
// Request is deleted inside ItemRetrievalManager, so we need to take
// a copy here
//const auto ids = request->ids;
pendingRequests.push_back(request.id);
mItemRetrievalManager.requestItemDelivery(std::move(request));
} catch (const ItemRetrieverException &e) {
qCCritical(AKONADISERVER_LOG) << e.type() << ": " << e.what();
mLastError = e.what();
return false;
}
}
if (!pendingRequests.empty()) {
if (eventLoop.exec()) {
return false;
}
}
return true;
}
akOptional<ItemRetriever::PreparedRequests> ItemRetriever::prepareRequests(QSqlQuery &query, const QByteArrayList &parts)
{
QHash<qint64, QString> resourceIdNameCache;
std::list<ItemRetrievalRequest> requests;
QHash<qint64 /* collection */, decltype(requests)::iterator> colRequests;
......@@ -242,7 +285,7 @@ bool ItemRetriever::exec()
const auto itemIter = itemRequests.constFind(pimItemId);
if (Q_UNLIKELY(mCanceled)) {
return false;
return nullopt;
}
if (pimItemId == prevPimItemId) {
......@@ -324,61 +367,33 @@ bool ItemRetriever::exec()
// No need to update the hashtable at this point
}
//qCDebug(AKONADISERVER_LOG) << "Closing queries and sending out requests.";
return PreparedRequests{std::move(requests), std::move(readyItems)};
}
if (!readyItems.isEmpty()) {
Q_EMIT itemsRetrieved(readyItems);
bool ItemRetriever::exec()
{
if (mParts.isEmpty() && !mFullPayload) {
return true;
}
QEventLoop eventLoop;
QVector<ItemRetrievalRequest::Id> pendingRequests;
connect(&mItemRetrievalManager, &ItemRetrievalManager::requestFinished,
this, [this, &eventLoop, &pendingRequests](const ItemRetrievalResult &result) {
if (pendingRequests.contains(result.request.id)) {
if (mCanceled) {
eventLoop.exit(1);
} else if (result.errorMsg.has_value()) {
mLastError = result.errorMsg->toUtf8();
eventLoop.exit(1);
} else {
Q_EMIT itemsRetrieved(result.request.ids);
pendingRequests.removeOne(result.request.id);
if (pendingRequests.empty()) {
eventLoop.quit();
}
}
}
}, Qt::UniqueConnection);
verifyCache();
if (mConnection) {
connect(mConnection, &Connection::connectionClosing,
&eventLoop, [&eventLoop]() { eventLoop.exit(1); });
}
QSqlQuery query = buildQuery();
const auto parts = mParts | Views::filter([](const auto &part) { return part.startsWith(AKONADI_PARAM_PLD); })
| Views::transform([](const auto &part) { return part.mid(4); })
| Actions::toQList;
for (auto &&request : requests) {
if ((!mFullPayload && request.parts.isEmpty()) || request.ids.isEmpty()) {
continue;
}
const auto requests = prepareRequests(query, parts);
if (!requests.has_value()) {
return false;
}
// TODO: how should we handle retrieval errors here? so far they have been ignored,
// which makes sense in some cases, do we need a command parameter for this?
try {
// Request is deleted inside ItemRetrievalManager, so we need to take
// a copy here
//const auto ids = request->ids;
pendingRequests.push_back(request.id);
mItemRetrievalManager.requestItemDelivery(std::move(request));
} catch (const ItemRetrieverException &e) {
qCCritical(AKONADISERVER_LOG) << e.type() << ": " << e.what();
mLastError = e.what();
return false;
}
if (!requests->readyItems.isEmpty()) {
Q_EMIT itemsRetrieved(requests->readyItems);
}
if (!pendingRequests.empty()) {
if (eventLoop.exec()) {
return false;
}
if (!runItemRetrievalRequests(std::move(requests->requests))) {
return false;
}
// retrieve items in child collections if requested
......
......@@ -29,6 +29,8 @@
#include <private/scope_p.h>
#include <private/imapset_p.h>
#include <shared/akoptional.h>
AKONADI_EXCEPTION_MAKE_INSTANCE(ItemRetrieverException);
......@@ -40,6 +42,7 @@ namespace Server
class Connection;
class CommandContext;
class ItemRetrievalManager;
class ItemRetrievalRequest;
/**
Helper class for retrieving missing items parts from remote resources.
......@@ -87,6 +90,14 @@ private:
*/
void verifyCache();
/// Execute the retrieval
bool runItemRetrievalRequests(std::list<ItemRetrievalRequest> requests);
struct PreparedRequests {
std::list<ItemRetrievalRequest> requests;
QVector<qint64> readyItems;
};
akOptional<PreparedRequests> prepareRequests(QSqlQuery &query, const QByteArrayList &parts);
Akonadi::ImapSet mItemSet;
Collection mCollection;
Scope mScope;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment