Commit 81652601 authored by Daniel Vrátil's avatar Daniel Vrátil 🤖
Browse files

Introduce batch Resource Item retrieval (ABI break)

One of the bottlenecks in the current design is the individual Item
retrieval via requestItemDelivery(). The Server does a request for
the first Item, then waits for the Resource to deliver before requesting
the next Item and so on. This patch changes the signature of the
requestItemDelivery() DBus method to support passing multiple Items
(Item IDs to be specific) at once. The patch also deprecates
ResourceBase::retrieveItem() in favor of a newly introduces
ResourceBase::retrieveItems() overload (which takes Item::List and
set of part names as arguments) allowing resource implementations to
process all the requested Items in a single batch and pass them back
to the Server. This should be much faster than querying the Items one
by one.

This change does not remove the retrieveItem() method, but only
deprecates it. When Resource only implements retrieveItem() and not
the new retrieveItems() then retrieveItems() will split the ResourceScheduler
Task into many single-Item Tasks and will call retrieveItem() for
each of them.

The next step is to get rid of DBus in here and use a dedicated
command bus where the Server could request the Items via the Protocol
and would be handed the Items back through the bus.

Once all resources are ported to the new API, the old retrieveItem()
method should be removed.
parent d576724f
......@@ -20,7 +20,7 @@ include(ECMQtDeclareLoggingCategory)
include(AkonadiMacros)
set(PIM_VERSION "5.3.40")
set(PIM_VERSION "5.3.41")
set(QT_REQUIRED_VERSION "5.5.0")
set(AKONADI_VERSION ${PIM_VERSION})
......
......@@ -156,6 +156,7 @@ void KnutResource::retrieveItems(const Akonadi::Collection &collection)
itemsRetrieved(items);
}
#ifdef DO_IT_THE_OLD_WAY
bool KnutResource::retrieveItem(const Item &item, const QSet<QByteArray> &parts)
{
Q_UNUSED(parts);
......@@ -171,6 +172,30 @@ bool KnutResource::retrieveItem(const Item &item, const QSet<QByteArray> &parts)
itemRetrieved(i);
return true;
}
#endif
bool KnutResource::retrieveItems(const Item::List &items, const QSet<QByteArray> &parts)
{
Q_UNUSED(parts);
Item::List results;
results.reserve(items.size());
for (const auto &item : items) {
const QDomElement itemElem = mDocument.itemElementByRemoteId(item.remoteId());
if (itemElem.isNull()) {
cancelTask(i18n("No item found for remoteid %1", item.remoteId()));
return false;
}
Item i = XmlReader::elementToItem(itemElem, true);
i.setParentCollection(item.parentCollection());
i.setId(item.id());
results.push_back(i);
}
itemsRetrieved(results);
return true;
}
void KnutResource::collectionAdded(const Akonadi::Collection &collection, const Akonadi::Collection &parent)
{
......
......@@ -50,7 +50,10 @@ public Q_SLOTS:
protected:
void retrieveCollections() Q_DECL_OVERRIDE;
void retrieveItems(const Akonadi::Collection &collection) Q_DECL_OVERRIDE;
#ifdef DO_IT_THE_OLD_WAY
bool retrieveItem(const Akonadi::Item &item, const QSet<QByteArray> &parts) Q_DECL_OVERRIDE;
#endif
bool retrieveItems(const Akonadi::Item::List &items, const QSet<QByteArray> &parts) Q_DECL_OVERRIDE;
void collectionAdded(const Akonadi::Collection &collection, const Akonadi::Collection &parent) Q_DECL_OVERRIDE;
void collectionChanged(const Akonadi::Collection &collection) Q_DECL_OVERRIDE;
......
......@@ -163,6 +163,9 @@ public:
void slotPrepareItemRetrieval(const Akonadi::Item &item);
void slotPrepareItemRetrievalResult(KJob *job);
void slotPrepareItemsRetrieval(const QVector<Akonadi::Item> &item);
void slotPrepareItemsRetrievalResult(KJob *job);
void changeCommittedResult(KJob *job);
void slotRecursiveMoveReplay(RecursiveMover *mover);
......@@ -505,6 +508,8 @@ ResourceBase::ResourceBase(const QString &id)
SLOT(slotSynchronizeRelations()));
connect(d->scheduler, SIGNAL(executeItemFetch(Akonadi::Item,QSet<QByteArray>)),
SLOT(slotPrepareItemRetrieval(Akonadi::Item)));
connect(d->scheduler, SIGNAL(executeItemsFetch(QVector<Akonadi::Item>,QSet<QByteArray>)),
SLOT(slotPrepareItemsRetrieval(QVector<Akonadi::Item>)));
connect(d->scheduler, SIGNAL(executeResourceCollectionDeletion()),
SLOT(slotDeleteResourceCollection()));
connect(d->scheduler, SIGNAL(executeCacheInvalidation(Akonadi::Collection)),
......@@ -620,8 +625,7 @@ void ResourceBase::itemRetrieved(const Item &item)
Q_D(ResourceBase);
Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::FetchItem);
if (!item.isValid()) {
d->scheduler->currentTask().sendDBusReplies(i18nc("@info", "Invalid item retrieved"));
d->scheduler->taskDone();
d->scheduler->itemFetchDone(i18nc("@info", "Invalid item retrieved"));
return;
}
......@@ -647,8 +651,7 @@ void ResourceBasePrivate::slotDeliveryDone(KJob *job)
if (job->error()) {
emit q->error(i18nc("@info", "Error while creating item: %1", job->errorString()));
}
scheduler->currentTask().sendDBusReplies(job->error() ? job->errorString() : QString());
scheduler->taskDone();
scheduler->itemFetchDone(QString());
}
void ResourceBase::collectionAttributesRetrieved(const Collection &collection)
......@@ -775,7 +778,7 @@ void ResourceBase::changeCommitted(const Tag &tag)
connect(job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)));
}
QString ResourceBase::requestItemDelivery(qint64 uid, const QString &remoteId, const QString &mimeType, const QByteArrayList &parts)
QString ResourceBase::requestItemDelivery(const QList<qint64> &uids, const QByteArrayList &parts)
{
Q_D(ResourceBase);
if (!isOnline()) {
......@@ -785,15 +788,15 @@ QString ResourceBase::requestItemDelivery(qint64 uid, const QString &remoteId, c
}
setDelayedReply(true);
// FIXME: we need at least the revision number too
Item item(uid);
item.setMimeType(mimeType);
item.setRemoteId(remoteId);
d->scheduler->scheduleItemFetch(item, QSet<QByteArray>::fromList(parts), message());
Item::List items;
items.reserve(uids.size());
for (auto uid : uids) {
items.push_back(Item(uid));
}
d->scheduler->scheduleItemsFetch(items, QSet<QByteArray>::fromList(parts), message());
return QString();
}
void ResourceBase::collectionsRetrieved(const Collection::List &collections)
......@@ -1005,16 +1008,16 @@ void ResourceBasePrivate::slotSynchronizeRelations()
QMetaObject::invokeMethod(q, "retrieveRelations");
}
void ResourceBasePrivate::slotPrepareItemRetrieval(const Akonadi::Item &item)
void ResourceBasePrivate::slotPrepareItemRetrieval(const Item &item)
{
Q_Q(ResourceBase);
ItemFetchJob *fetch = new ItemFetchJob(item, this);
auto fetch = new ItemFetchJob(item, this);
fetch->fetchScope().setAncestorRetrieval(q->changeRecorder()->itemFetchScope().ancestorRetrieval());
fetch->fetchScope().setCacheOnly(true);
// copy list of attributes to fetch
const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
foreach (const QByteArray &attribute, attributes) {
for (const auto &attribute : attributes) {
fetch->fetchScope().fetchAttribute(attribute);
}
......@@ -1032,13 +1035,44 @@ void ResourceBasePrivate::slotPrepareItemRetrievalResult(KJob *job)
return;
}
ItemFetchJob *fetch = qobject_cast<ItemFetchJob *>(job);
if (fetch->items().count() != 1) {
q->cancelTask(i18n("The requested item no longer exists"));
const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
if (!q->retrieveItem(fetch->items().at(0), parts)) {
q->cancelTask();
}
}
void ResourceBasePrivate::slotPrepareItemsRetrieval(const QVector<Item> &items)
{
Q_Q(ResourceBase);
ItemFetchJob *fetch = new ItemFetchJob(items, this);
fetch->fetchScope().setAncestorRetrieval(q->changeRecorder()->itemFetchScope().ancestorRetrieval());
fetch->fetchScope().setCacheOnly(true);
// It's possible that one or more items were removed before this task was
// executed, so ignore it and just handle the rest.
fetch->fetchScope().setIgnoreRetrievalErrors(true);
// copy list of attributes to fetch
const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
for (const auto &attribute : attributes) {
fetch->fetchScope().fetchAttribute(attribute);
}
q->connect(fetch, SIGNAL(result(KJob*)), SLOT(slotPrepareItemsRetrievalResult(KJob*)));
}
void ResourceBasePrivate::slotPrepareItemsRetrievalResult(KJob *job)
{
Q_Q(ResourceBase);
Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItems,
"ResourceBasePrivate::slotPrepareItemsRetrievalResult()",
"Preparing items retrieval although no items retrieval is in progress");
if (job->error()) {
q->cancelTask(job->errorText());
return;
}
const Item item = fetch->items().at(0);
ItemFetchJob *fetch = qobject_cast<ItemFetchJob *>(job);
const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
if (!q->retrieveItem(item, parts)) {
if (!q->retrieveItems(fetch->items(), parts)) {
q->cancelTask();
}
}
......@@ -1073,6 +1107,9 @@ void ResourceBase::itemsRetrievalDone()
if (d->mItemSyncer) {
d->mItemSyncer->deliveryDone();
} else {
if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
d->scheduler->currentTask().sendDBusReplies(QString());
}
// user did the sync himself, we are done now
d->scheduler->taskDone();
}
......@@ -1105,7 +1142,16 @@ Item ResourceBase::currentItem() const
Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItem,
"ResourceBase::currentItem()",
"Trying to access current item although no item retrieval is in progress");
return d->scheduler->currentTask().item;
return d->scheduler->currentTask().items[0];
}
Item::List ResourceBase::currentItems() const
{
Q_D(const ResourceBase);
Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItems,
"ResourceBase::currentItems()",
"Trying to access current items although no items retrieval is in progress");
return d->scheduler->currentTask().items;
}
void ResourceBase::synchronizeCollectionTree()
......@@ -1261,12 +1307,53 @@ void ResourceBase::setItemStreamingEnabled(bool enable)
}
}
namespace {
class UpdateItemsJob : public KCompositeJob
{
public:
explicit UpdateItemsJob(const Item::List &items, QObject *parent = Q_NULLPTR)
: KCompositeJob(parent)
, mItems(items)
{
}
void start() Q_DECL_OVERRIDE
{
Q_FOREACH (const Item &item, mItems) {
auto job = new ItemModifyJob(item, this);
addSubjob(job);
}
}
void slotResult(KJob *job) Q_DECL_OVERRIDE
{
KCompositeJob::slotResult(job);
if (!hasSubjobs()) {
emitResult();
}
}
private:
Item::List mItems;
};
}
void ResourceBase::itemsRetrieved(const Item::List &items)
{
Q_D(ResourceBase);
d->createItemSyncInstanceIfMissing();
if (d->mItemSyncer) {
d->mItemSyncer->setFullSyncItems(items);
if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
auto job = new UpdateItemsJob(items, this);
connect(job, SIGNAL(result(KJob*)),
this, SLOT(slotItemSyncDone(KJob*)));
job->start();
} else {
d->createItemSyncInstanceIfMissing();
if (d->mItemSyncer) {
d->mItemSyncer->setFullSyncItems(items);
}
}
}
......@@ -1286,6 +1373,8 @@ void ResourceBasePrivate::slotItemSyncDone(KJob *job)
Q_Q(ResourceBase);
if (job->error() && job->error() != Job::UserCanceled) {
emit q->error(job->errorString());
} else if (scheduler->currentTask().type == ResourceScheduler::FetchItems) {
scheduler->currentTask().sendDBusReplies(QString());
}
scheduler->taskDone();
}
......@@ -1361,6 +1450,39 @@ void ResourceBase::retrieveRelations()
d->scheduler->taskDone();
}
bool ResourceBase::retrieveItem(const Akonadi::Item &item, const QSet<QByteArray> &parts)
{
Q_UNUSED(item);
Q_UNUSED(parts);
// retrieveItem() can no longer be pure virtual, because then we could not mark
// it as deprecated (i.e. implementations would still be forced to implement it),
// so instead we assert here.
// NOTE: Don't change to Q_ASSERT_X here: while the macro can be disabled at
// compile time, we want to hit this assert *ALWAYS*.
qt_assert_x("Akonadi::ResourceBase::retrieveItem()",
"The base implementation of retrieveItem() must never be reached. "
"You must implement either retrieveItem() or retrieveItems(Akonadi::Item::List, QSet<QByteArray>) overload "
"to handle item retrieval requests.", __FILE__, __LINE__);
return false;
}
bool ResourceBase::retrieveItems(const Akonadi::Item::List& items, const QSet<QByteArray>& parts)
{
Q_D(ResourceBase);
// If we reach this implementation of retrieveItems() then it means that the
// resource is still using the deprecated retrieveItem() method, so we explode
// this to a myriad of tasks in scheduler and let them be processed one by one
const qint64 id = d->scheduler->currentTask().serial;
for (const auto &item : items) {
d->scheduler->scheduleItemFetch(item, parts, d->scheduler->currentTask().dbusMsgs, id);
}
taskDone();
return true;
}
void Akonadi::ResourceBase::abortActivity()
{
}
......
......@@ -347,8 +347,24 @@ protected Q_SLOTS:
* @param parts The item parts that should be retrieved.
* @return false if there is an immediate error when retrieving the item.
* @see itemRetrieved()
* @deprecated Use retrieveItems(const Akonadi::Item::List &, const QSet<QByteArray> &) instead.
*/
virtual bool retrieveItem(const Akonadi::Item &item, const QSet<QByteArray> &parts) = 0;
AKONADIAGENTBASE_DEPRECATED virtual bool retrieveItem(const Akonadi::Item &item, const QSet<QByteArray> &parts);
/**
* Retrieve given @p items from the backend.
* Add the requested payload parts and call itemsRetrieved() when done.
* @param items The items whose payload should be retrieved. Use those objects
* when delivering the result instead of creating new items to ensure conflict
* detection will work.
* @param parts The item parts that should be retrieved.
* @return false if there is an immeidate error when retrieving the items.
* @see itemsRetrieved()
* @since 5.4
*
* @todo: Make this method pure virtual once retrieveItem() is gone
*/
virtual bool retrieveItems(const Akonadi::Item::List &items, const QSet<QByteArray> &parts);
/**
* Abort any activity in progress in the backend. By default this method does nothing.
......@@ -643,7 +659,14 @@ protected:
* @note Calling this method is only allowed during fetching a single item, that
* is directly or indirectly from retrieveItem().
*/
Item currentItem() const;
AKONADIAGENTBASE_DEPRECATED Item currentItem() const;
/**
* Returns the items that are currently retrieved.
* @note Calling this method is only allowed during item fetch, that is
* directly or indirectly from retrieveItems(Akonadi::Item::List,QSet<QByteArray>)
*/
Item::List currentItems() const;
/**
* This method is called whenever the resource should start synchronize all data.
......@@ -820,7 +843,7 @@ private:
// dbus resource interface
friend class ::Akonadi__ResourceAdaptor;
QString requestItemDelivery(qint64 uid, const QString &remoteId, const QString &mimeType, const QByteArrayList &parts);
QString requestItemDelivery(const QList<qint64> &uids, const QByteArrayList &parts);
private:
Q_DECLARE_PRIVATE(ResourceBase)
......@@ -841,8 +864,10 @@ private:
Q_PRIVATE_SLOT(d_func(), void slotItemSyncDone(KJob *))
Q_PRIVATE_SLOT(d_func(), void slotPercent(KJob *, unsigned long))
Q_PRIVATE_SLOT(d_func(), void slotDelayedEmitProgress())
Q_PRIVATE_SLOT(d_func(), void slotPrepareItemRetrieval(const Akonadi::Item &item))
Q_PRIVATE_SLOT(d_func(), void slotPrepareItemRetrieval(const Akonadi::Item &items))
Q_PRIVATE_SLOT(d_func(), void slotPrepareItemRetrievalResult(KJob *))
Q_PRIVATE_SLOT(d_func(), void slotPrepareItemsRetrieval(const QVector<Akonadi::Item> &items))
Q_PRIVATE_SLOT(d_func(), void slotPrepareItemsRetrievalResult(KJob *))
Q_PRIVATE_SLOT(d_func(), void changeCommittedResult(KJob *))
Q_PRIVATE_SLOT(d_func(), void slotSessionReconnected())
Q_PRIVATE_SLOT(d_func(), void slotRecursiveMoveReplay(RecursiveMover *))
......
......@@ -124,11 +124,30 @@ void ResourceScheduler::scheduleAttributesSync(const Collection &collection)
scheduleNext();
}
void ResourceScheduler::scheduleItemFetch(const Item &item, const QSet<QByteArray> &parts, const QDBusMessage &msg)
void ResourceScheduler::scheduleItemFetch(const Akonadi::Item &item, const QSet<QByteArray> &parts,
const QList<QDBusMessage> &msgs, qint64 parentId)
{
Task t;
t.type = FetchItem;
t.item = item;
t.items << item;
t.itemParts = parts;
t.dbusMsgs = msgs;
t.argument = parentId;
TaskList &queue = queueForTaskType(t.type);
queue << t;
signalTaskToTracker(t, "FetchItem", QString::number(item.id()));
scheduleNext();
}
void ResourceScheduler::scheduleItemsFetch(const Item::List &items, const QSet<QByteArray> &parts, const QDBusMessage &msg)
{
Task t;
t.type = FetchItems;
t.items = items;
t.itemParts = parts;
// if the current task does already fetch the requested item, break here but
......@@ -148,7 +167,13 @@ void ResourceScheduler::scheduleItemFetch(const Item &item, const QSet<QByteArra
t.dbusMsgs << msg;
queue << t;
signalTaskToTracker(t, "FetchItem", QString::number(item.id()));
QStringList ids;
ids.reserve(items.size());
for (const auto &item : items) {
ids.push_back(QString::number(item.id()));
}
signalTaskToTracker(t, "FetchItems", ids.join(QStringLiteral(", ")));
scheduleNext();
}
......@@ -282,6 +307,44 @@ void ResourceScheduler::taskDone()
scheduleNext();
}
void ResourceScheduler::itemFetchDone(const QString &msg)
{
Q_ASSERT(mCurrentTask.type == FetchItem);
TaskList &queue = queueForTaskType(mCurrentTask.type);
const qint64 parentId = mCurrentTask.argument.toLongLong();
// msg is empty, there was no error
if (msg.isEmpty() && !queue.isEmpty()) {
Task &nextTask = queue[0];
// If the next task is FetchItem too...
if (nextTask.type != mCurrentTask.type || nextTask.argument.toLongLong() != parentId) {
// If the next task is not FetchItem or the next FetchItem task has
// different parentId then this was the last task in the series, so
// send the DBus replies.
mCurrentTask.sendDBusReplies(msg);
}
} else {
// msg was not empty, there was an error.
// remove all subsequent FetchItem tasks with the same parentId
auto iter = queue.begin();
while (iter != queue.end()) {
if (iter->type != mCurrentTask.type || iter->argument.toLongLong() == parentId) {
iter = queue.erase(iter);
continue;
} else {
break;
}
}
// ... and send DBus reply with the error message
mCurrentTask.sendDBusReplies(msg);
}
taskDone();
}
void ResourceScheduler::deferTask()
{
if (mCurrentTask.type == Invalid) {
......@@ -362,7 +425,10 @@ void ResourceScheduler::executeNext()
emit executeTagSync();
break;
case FetchItem:
emit executeItemFetch(mCurrentTask.item, mCurrentTask.itemParts);
emit executeItemFetch(mCurrentTask.items.at(0), mCurrentTask.itemParts);
break;
case FetchItems:
emit executeItemsFetch(mCurrentTask.items, mCurrentTask.itemParts);
break;
case DeleteResourceCollection:
emit executeResourceCollectionDeletion();
......@@ -415,6 +481,11 @@ ResourceScheduler::Task ResourceScheduler::currentTask() const
return mCurrentTask;
}
ResourceScheduler::Task &ResourceScheduler::currentTask()
{
return mCurrentTask;
}
void ResourceScheduler::setOnline(bool state)
{
if (mOnline == state) {
......@@ -432,9 +503,21 @@ void ResourceScheduler::setOnline(bool state)
}
// abort pending synchronous tasks, might take longer until the resource goes online again
TaskList &itemFetchQueue = queueForTaskType(FetchItem);
qint64 parentId = -1;
Task lastTask;
for (QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end();) {
if ((*it).type == FetchItem) {
(*it).sendDBusReplies(i18nc("@info", "Job canceled."));
qint64 idx = it->argument.toLongLong();
if (parentId == -1) {
parentId = idx;
}
if (idx != parentId) {
// Only emit the DBus reply once we reach the last taskwith the
// same "idx"
lastTask.sendDBusReplies(i18nc("@info", "Job canceled."));
parentId = idx;
}
lastTask = (*it);
it = itemFetchQueue.erase(it);
if (s_resourcetracker) {
QList<QVariant> argumentList;
......@@ -510,6 +593,7 @@ ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType(TaskType ty
case RecursiveMoveReplay:
return ChangeReplayQueue;
case FetchItem:
case FetchItems:
case SyncCollectionAttributes:
return UserActionQueue;
default:
......@@ -582,6 +666,7 @@ static const char s_taskTypes[][27] = {
"SyncCollectionAttributes",
"SyncTags",
"FetchItem",
"FetchItems",
"ChangeReplay",
"RecursiveMoveReplay",
"DeleteResourceCollection",
......@@ -599,8 +684,13 @@ QTextStream &Akonadi::operator<<(QTextStream &d, const ResourceScheduler::Task &
if (task.collection.isValid()) {
d << "collection " << task.collection.id() << " ";
}
if (task.item.id() != -1) {
d << "item " << task.item.id() << " ";
if (!task.items.isEmpty()) {
QStringList ids;
ids.reserve(task.items.size());
for (const auto &item : task.items) {
ids.push_back(QString::number(item.id()));
}
d << "items " << ids.join(QStringLiteral(", ")) << " ";
}
if (!task.methodName.isEmpty()) {
d << task.methodName << " " << task.argument.toString();
......
......@@ -57,6 +57,7 @@ public:
SyncCollectionAttributes,
SyncTags,
FetchItem,
FetchItems,
ChangeReplay,
RecursiveMoveReplay,
DeleteResourceCollection,
......@@ -81,7 +82,7 @@ public:
qint64 serial;
TaskType type;
Collection collection;
Item item;
QVector<Item> items;
QSet<QByteArray> itemParts;
QList<QDBusMessage> dbusMsgs;
QObject *receiver;
......@@ -94,7 +95,7 @@ public:
{
return type == other.type
&& (collection == other.collection || (!collection.isValid() && !other.collection.isValid()))
&& (item == other.item || (!item.isValid() && !other.item.isValid()))
&& items == other.items
&& itemParts == other.itemParts
&& receiver == other.receiver
&& methodName == other.methodName
......@@ -131,11 +132,28 @@ public:
/**
Schedules fetching of a single PIM item.
@param item The item to fetch.
This task is only ever used if the resource still uses the old deprecated
retrieveItem() (instead of retrieveItems(Item::List)) method. This task has
a special meaning to the scheduler and instead of replying to the DBus message
after the single @p item is retrieved, the items are accumulated until all
tasks from the same messages are fetched.
@param items The items to fetch.
@param parts List of names of the parts of the item to fetch.
@param msg The associated D-Bus message.
@param parentId ID of the original ItemsFetch task that this task was created from.
We can use this ID to group the tasks together