Commit feb520cb authored by Shashwat Jolly's avatar Shashwat Jolly
Browse files

Implement items retrieval and caching

- Sync token is set in EntriesFetchJob's mCollection's remoteRevision, which is then passed to CollectionModifyJob in slotItemsRetrieved()
- Changed CollectionCacheAttribute to EtebaseCache attribute, as it has to used for both collections and items
- Commented out item sync code in BaseHandler, which was used to handle conflict error (will see later)
parent 7721ea8e
......@@ -25,7 +25,7 @@ set(etesyncresource_SRCS
calendartaskbasehandler.cpp
calendarhandler.cpp
taskhandler.cpp
collectioncacheattribute.cpp
etebasecacheattribute.cpp
setupwizard.cpp
${etesyncconfig_SRCS}
......
......@@ -79,48 +79,48 @@ void BaseHandler::updateCollectionRevision(const EteSyncEntry *entry, const Coll
void BaseHandler::syncCollection(const QVariant &collectionVariant)
{
const Collection collection = collectionVariant.value<Collection>();
// const Collection collection = collectionVariant.value<Collection>();
qCDebug(ETESYNC_LOG) << "Syncing journal" << collection.remoteId();
// qCDebug(ETESYNC_LOG) << "Syncing journal" << collection.remoteId();
auto job = new EntriesFetchJob(mClientState->client(), collection, this);
connect(job, &EntriesFetchJob::finished, this, &BaseHandler::slotItemsRetrieved);
job->start();
// auto job = new EntriesFetchJob(mClientState->client(), collection, this);
// connect(job, &EntriesFetchJob::finished, this, &BaseHandler::slotItemsRetrieved);
// job->start();
}
void BaseHandler::slotItemsRetrieved(KJob *job)
{
if (job->error()) {
qCWarning(ETESYNC_LOG) << job->errorText();
return;
}
// if (job->error()) {
// qCWarning(ETESYNC_LOG) << job->errorText();
// return;
// }
std::vector<EteSyncEntryPtr> entries = qobject_cast<EntriesFetchJob *>(job)->getEntries();
// std::vector<EteSyncEntryPtr> entries = qobject_cast<EntriesFetchJob *>(job)->getEntries();
Collection collection = qobject_cast<EntriesFetchJob *>(job)->collection();
// Collection collection = qobject_cast<EntriesFetchJob *>(job)->collection();
qCDebug(ETESYNC_LOG) << "BaseHandler: Syncing items";
QString prevUid = collection.remoteRevision();
const QString journalUid = collection.remoteId();
// qCDebug(ETESYNC_LOG) << "BaseHandler: Syncing items";
// QString prevUid = collection.remoteRevision();
// const QString journalUid = collection.remoteId();
const bool isIncremental = (prevUid.isEmpty() || prevUid.isNull()) ? false : true;
// const bool isIncremental = (prevUid.isEmpty() || prevUid.isNull()) ? false : true;
Item::List changedItems;
Item::List removedItems;
// Item::List changedItems;
// Item::List removedItems;
getItemListFromEntries(entries, changedItems, removedItems, collection, journalUid, prevUid);
// getItemListFromEntries(entries, changedItems, removedItems, collection, journalUid, prevUid);
collection.setRemoteRevision(prevUid);
new CollectionModifyJob(collection, this);
// collection.setRemoteRevision(prevUid);
// new CollectionModifyJob(collection, this);
ItemSync *syncer = new ItemSync(collection);
// ItemSync *syncer = new ItemSync(collection);
if (isIncremental) {
syncer->setIncrementalSyncItems(changedItems, removedItems);
} else {
syncer->setFullSyncItems(changedItems);
}
connect(syncer, SIGNAL(result(KJob*)), this, SLOT(taskDone()));
// if (isIncremental) {
// syncer->setIncrementalSyncItems(changedItems, removedItems);
// } else {
// syncer->setFullSyncItems(changedItems);
// }
// connect(syncer, SIGNAL(result(KJob*)), this, SLOT(taskDone()));
}
bool BaseHandler::handleConflictError(const Collection &collection)
......
/*
* SPDX-FileCopyrightText: 2020 Shashwat Jolly <shashwat.jolly@gmail.com>
*
* SPDX-License-Identifier: GPL-2.0-or-later
*/
#include "etebaseadapter.h"
#include "collectioncacheattribute.h"
#include <AkonadiCore/Attribute>
CollectionCacheAttribute::CollectionCacheAttribute(QByteArray collectionCache)
: mCollectionCache(collectionCache)
{
}
void CollectionCacheAttribute::setCollectionCache(const QByteArray &collectionCache)
{
mCollectionCache = collectionCache;
}
QByteArray CollectionCacheAttribute::collectionCache() const
{
return mCollectionCache;
}
QByteArray CollectionCacheAttribute::type() const
{
static const QByteArray sType("etebasecollectioncache");
return sType;
}
Akonadi::Attribute *CollectionCacheAttribute::clone() const
{
return new CollectionCacheAttribute(mCollectionCache);
}
QByteArray CollectionCacheAttribute::serialized() const
{
return mCollectionCache;
}
void CollectionCacheAttribute::deserialize(const QByteArray &data)
{
mCollectionCache = data;
}
......@@ -5,17 +5,23 @@
*/
#include "entriesfetchjob.h"
#include "etebasecacheattribute.h"
#include <kcontacts/addressee.h>
#include <KCalendarCore/Event>
#include <KCalendarCore/Todo>
#include <AkonadiCore/CollectionModifyJob>
#include <QtConcurrent>
#include "etesync_debug.h"
#include "settings.h"
using namespace Akonadi;
using namespace EteSyncAPI;
EntriesFetchJob::EntriesFetchJob(const EteSync *client, const Akonadi::Collection &collection, QObject *parent)
EntriesFetchJob::EntriesFetchJob(const EtebaseAccount *account, const Akonadi::Collection &collection, QObject *parent)
: KJob(parent)
, mClient(client)
, mAccount(account)
, mCollection(collection)
{
}
......@@ -33,34 +39,108 @@ void EntriesFetchJob::start()
void EntriesFetchJob::fetchEntries()
{
const QString journalUid = mCollection.remoteId();
mPrevUid = mLastUid = mCollection.remoteRevision();
mEntryManager = etesync_entry_manager_new(mClient, journalUid);
EntriesFetchJob::Status status = FETCH_OK;
while (status != ERROR && status != ALL_ENTRIES_FETCHED) {
status = fetchNextBatch();
if (!mCollection.hasAttribute<EtebaseCacheAttribute>()) {
setError(UserDefinedError);
setErrorText(QStringLiteral("No cache for collection ") + mCollection.remoteId());
return;
}
EtebaseCollectionManagerPtr collectionManager(etebase_account_get_collection_manager(mAccount));
const QByteArray collectionCache = mCollection.attribute<EtebaseCacheAttribute>()->etebaseCache();
EtebaseCollectionPtr etesyncCollection(etebase_collection_manager_cache_load(collectionManager.get(), collectionCache.constData(), collectionCache.size()));
EtebaseCollectionMetadataPtr metaData(etebase_collection_get_meta(etesyncCollection.get()));
const QString type = QString::fromUtf8(etebase_collection_metadata_get_collection_type(metaData.get()));
qCDebug(ETESYNC_LOG) << "Type:" << type;
QString sToken = mCollection.remoteRevision();
bool done = 0;
EtebaseItemManagerPtr itemManager(etebase_collection_manager_get_item_manager(collectionManager.get(), etesyncCollection.get()));
while (!done) {
EtebaseFetchOptionsPtr fetchOptions(etebase_fetch_options_new());
etebase_fetch_options_set_stoken(fetchOptions.get(), sToken);
etebase_fetch_options_set_limit(fetchOptions.get(), 50);
EtebaseItemListResponsePtr itemList(etebase_item_manager_list(itemManager.get(), fetchOptions.get()));
if (!itemList) {
setError(int(etebase_error_get_code()));
const char *err = etebase_error_get_message();
setErrorText(QString::fromUtf8(err));
return;
}
sToken = QString::fromUtf8(etebase_item_list_response_get_stoken(itemList.get()));
done = etebase_item_list_response_is_done(itemList.get());
if (status == ERROR) {
qCDebug(ETESYNC_LOG) << "Returning error from entries fetch job";
setError(etesync_get_error_code());
CharPtr err(etesync_get_error_message());
setErrorText(QStringFromCharPtr(err));
uintptr_t dataLength = etebase_item_list_response_get_data_length(itemList.get());
qCDebug(ETESYNC_LOG) << "Retrieved item list length" << dataLength;
const EtebaseItem *etesyncItems[dataLength];
if (etebase_item_list_response_get_data(itemList.get(), etesyncItems)) {
setError(int(etesync_get_error_code()));
const char *err = etebase_error_get_message();
setErrorText(QString::fromUtf8(err));
}
Item item;
for (int i = 0; i < dataLength; i++) {
saveItemCache(itemManager.get(), etesyncItems[i], item);
setupItem(item, etesyncItems[i], type);
}
}
qCDebug(ETESYNC_LOG) << "Entries fetched";
mCollection.setRemoteRevision(QString::fromUtf8(etebase_collection_get_stoken(etesyncCollection.get())));
}
EntriesFetchJob::Status EntriesFetchJob::fetchNextBatch()
void EntriesFetchJob::setupItem(Akonadi::Item &item, const EtebaseItem *etesyncItem, const QString &type)
{
std::pair<std::vector<EteSyncEntryPtr>, bool> entries = etesync_entry_manager_list(mEntryManager.get(), mLastUid, 50);
if (entries.second) {
return ERROR;
qCDebug(ETESYNC_LOG) << "Setting up item" << etebase_item_get_uid(etesyncItem);
if (!etesyncItem) {
qCDebug(ETESYNC_LOG) << "Unable to setup item - etesyncItem is null";
return;
}
if (entries.first.empty()) {
return ALL_ENTRIES_FETCHED;
if (type == ETEBASE_COLLECTION_TYPE_ADDRESS_BOOK) {
item.setMimeType(KContacts::Addressee::mimeType());
} else if (type == ETEBASE_COLLECTION_TYPE_CALENDAR) {
item.setMimeType(KCalendarCore::Event::eventMimeType());
} else if (type == ETEBASE_COLLECTION_TYPE_TASKS) {
item.setMimeType(KCalendarCore::Todo::todoMimeType());
} else {
qCWarning(ETESYNC_LOG) << "Unknown item type. Cannot set item mime type.";
return;
}
const QString itemUid = QString::fromUtf8(etebase_item_get_uid(etesyncItem));
item.setParentCollection(mCollection);
item.setRemoteId(itemUid);
QByteArray content(2000, '\0');
auto const len = etebase_item_get_content(etesyncItem, content.data(), 2000);
if (len > 2000) {
QByteArray content(len, '\0');
etebase_item_get_content(etesyncItem, content.data(), len);
item.setPayloadFromData(content);
return;
}
item.setPayloadFromData(content);
if (etebase_item_is_deleted(etesyncItem)) {
mRemovedItems.push_back(item);
return;
}
mEntries.insert(mEntries.end(), std::make_move_iterator(entries.first.begin()), std::make_move_iterator(entries.first.end()));
mLastUid = QStringFromCharPtr(CharPtr(etesync_entry_get_uid(mEntries[mEntries.size() - 1].get())));
return FETCH_OK;
mItems.push_back(item);
}
void EntriesFetchJob::saveItemCache(const EtebaseItemManager *itemManager, const EtebaseItem *etebaseItem, Item &item)
{
qCDebug(ETESYNC_LOG) << "Saving cache for item" << etebase_item_get_uid(etebaseItem);
uintptr_t ret_size;
EtebaseCachePtr cache(etebase_item_manager_cache_save(itemManager, etebaseItem, &ret_size));
QByteArray cacheData((char *)cache.get(), ret_size);
EtebaseCacheAttribute *etebaseCacheAttribute = item.attribute<EtebaseCacheAttribute>(Item::AddIfMissing);
etebaseCacheAttribute->setEtebaseCache(cacheData);
}
......@@ -8,8 +8,10 @@
#define ETESYNCENTRIESFETCHJOB_H
#include <AkonadiCore/Collection>
#include <AkonadiCore/Item>
#include <KJob>
#include "etebaseadapter.h"
#include "etesyncadapter.h"
namespace EteSyncAPI {
......@@ -18,18 +20,18 @@ class EntriesFetchJob : public KJob
Q_OBJECT
public:
explicit EntriesFetchJob(const EteSync *client, const Akonadi::Collection &collection, QObject *parent = nullptr);
explicit EntriesFetchJob(const EtebaseAccount *account, const Akonadi::Collection &collection, QObject *parent = nullptr);
void start() override;
std::vector<EteSyncEntryPtr> getEntries()
Akonadi::Item::List items() const
{
return std::move(mEntries);
return mItems;
}
QString getPrevUid() const
Akonadi::Item::List removedItems() const
{
return mPrevUid;
return mRemovedItems;
}
Akonadi::Collection collection() const
......@@ -38,22 +40,15 @@ public:
}
protected:
enum Status
{
FETCH_OK,
ERROR,
ALL_ENTRIES_FETCHED
};
void fetchEntries();
Status fetchNextBatch();
void setupItem(Akonadi::Item &item, const EtebaseItem *etebaseItem, const QString &type);
void saveItemCache(const EtebaseItemManager *itemManager, const EtebaseItem *etebaseItem, Akonadi::Item &item);
private:
const EteSync *mClient = nullptr;
QString mPrevUid;
QString mLastUid;
EteSyncEntryManagerPtr mEntryManager;
std::vector<EteSyncEntryPtr> mEntries;
const EtebaseAccount *mAccount = nullptr;
Akonadi::Collection mCollection;
Akonadi::Item::List mItems;
Akonadi::Item::List mRemovedItems;
};
} // namespace EteSyncAPI
......
......@@ -53,6 +53,21 @@ struct EtebaseDeleter
etebase_collection_metadata_destroy(ptr);
}
void operator()(EtebaseItemManager *ptr)
{
etebase_item_manager_destroy(ptr);
}
void operator()(EtebaseItemListResponse *ptr)
{
etebase_item_list_response_destroy(ptr);
}
void operator()(EtebaseItemMetadata *ptr)
{
etebase_item_metadata_destroy(ptr);
}
void operator()(char *ptr)
{
std::free(ptr);
......@@ -71,6 +86,9 @@ using EtebaseCollectionListResponsePtr = std::unique_ptr<EtebaseCollectionListRe
using EtebaseCollectionManagerPtr = std::unique_ptr<EtebaseCollectionManager, EtebaseDeleter>;
using EtebaseCollectionPtr = std::unique_ptr<EtebaseCollection, EtebaseDeleter>;
using EtebaseCollectionMetadataPtr = std::unique_ptr<EtebaseCollectionMetadata, EtebaseDeleter>;
using EtebaseItemManagerPtr = std::unique_ptr<EtebaseItemManager, EtebaseDeleter>;
using EtebaseItemListResponsePtr = std::unique_ptr<EtebaseItemListResponse, EtebaseDeleter>;
using EtebaseItemMetadataPtr = std::unique_ptr<EtebaseItemMetadata, EtebaseDeleter>;
using EtebaseCachePtr = std::unique_ptr<void, EtebaseDeleter>;
using CharPtr = std::unique_ptr<char, EtebaseDeleter>;
......
/*
* SPDX-FileCopyrightText: 2020 Shashwat Jolly <shashwat.jolly@gmail.com>
*
* SPDX-License-Identifier: GPL-2.0-or-later
*/
#include "etebaseadapter.h"
#include "etebasecacheattribute.h"
#include <AkonadiCore/Attribute>
EtebaseCacheAttribute::EtebaseCacheAttribute(QByteArray etebaseCache)
: mEtebaseCache(etebaseCache)
{
}
void EtebaseCacheAttribute::setEtebaseCache(const QByteArray etebaseCache)
{
mEtebaseCache = etebaseCache;
}
QByteArray EtebaseCacheAttribute::etebaseCache() const
{
return mEtebaseCache;
}
QByteArray EtebaseCacheAttribute::type() const
{
static const QByteArray sType("etebasecache");
return sType;
}
Akonadi::Attribute *EtebaseCacheAttribute::clone() const
{
return new EtebaseCacheAttribute(mEtebaseCache);
}
QByteArray EtebaseCacheAttribute::serialized() const
{
return mEtebaseCache;
}
void EtebaseCacheAttribute::deserialize(const QByteArray &data)
{
mEtebaseCache = data;
}
......@@ -4,25 +4,27 @@
* SPDX-License-Identifier: GPL-2.0-or-later
*/
#ifndef COLLECTIONCACHEATTRIBUTE_H
#define COLLECTIONCACHEATTRIBUTE_H
#ifndef ETEBASECACHEATTRIBUTE_H
#define ETEBASECACHEATTRIBUTE_H
#include "etebaseadapter.h"
#include <AkonadiCore/Attribute>
class CollectionCacheAttribute : public Akonadi::Attribute
class EtebaseCacheAttribute : public Akonadi::Attribute
{
public:
explicit CollectionCacheAttribute(QByteArray collectionCache = nullptr);
void setCollectionCache(const QByteArray &collectionCache);
QByteArray collectionCache() const;
explicit EtebaseCacheAttribute(QByteArray etebaseCache = QByteArray());
void setEtebaseCache(const QByteArray etebaseCache);
QByteArray etebaseCache() const;
QByteArray type() const override;
Attribute *clone() const override;
QByteArray serialized() const override;
void deserialize(const QByteArray &data) override;
private:
QByteArray mCollectionCache;
QByteArray mEtebaseCache;
};
#endif
......@@ -10,6 +10,7 @@
#include <kcontacts/contactgroup.h>
#include <kwindowsystem.h>
#include <AkonadiCore/AttributeFactory>
#include <AkonadiCore/CachePolicy>
#include <AkonadiCore/ChangeRecorder>
#include <AkonadiCore/CollectionColorAttribute>
......@@ -22,6 +23,7 @@
#include <KMessageBox>
#include <QDBusConnection>
#include "etebasecacheattribute.h"
#include "entriesfetchjob.h"
#include "etesync_debug.h"
#include "etesyncadapter.h"
......@@ -70,6 +72,8 @@ EteSyncResource::EteSyncResource(const QString &id)
mCalendarHandler = CalendarHandler::Ptr(new CalendarHandler(this));
mTaskHandler = TaskHandler::Ptr(new TaskHandler(this));
AttributeFactory::registerAttribute<EtebaseCacheAttribute>();
connect(this, &Akonadi::AgentBase::reloadConfiguration, this, &EteSyncResource::onReloadConfiguration);
qCDebug(ETESYNC_LOG) << "Resource started";
......@@ -165,6 +169,8 @@ void EteSyncResource::slotCollectionsRetrieved(KJob *job)
collections.append(qobject_cast<JournalsFetchJob *>(job)->collections());
Collection::List removedCollections = qobject_cast<JournalsFetchJob *>(job)->removedCollections();
mJournalsCacheUpdateTime = QDateTime::currentDateTime();
collectionsRetrievedIncremental(collections, removedCollections);
qCDebug(ETESYNC_LOG) << "Collections retrieval done";
......@@ -318,10 +324,17 @@ void EteSyncResource::retrieveItems(const Akonadi::Collection &collection)
qCDebug(ETESYNC_LOG) << "Retrieving entries for journal" << collection.remoteId();
const int timeSinceLastCacheUpdate = mJournalsCacheUpdateTime.secsTo(QDateTime::currentDateTime());
if (timeSinceLastCacheUpdate <= 30) {
const QString journalUid = collection.remoteId();
const EteSyncJournalPtr &journal = getJournal(journalUid);
const QString lastEntryUid = QStringFromCharPtr(CharPtr(etesync_journal_get_last_uid(journal.get())));
if (lastEntryUid == collection.remoteRevision()) {
if (!collection.hasAttribute<EtebaseCacheAttribute>()) {
qCDebug(ETESYNC_LOG) << "No cache for collection" << collection.remoteId();
cancelTask(i18n("No cache for collection '%1'", collection.remoteId()));
return;
}
EtebaseCollectionManagerPtr collectionManager(etebase_account_get_collection_manager(mClientState->account()));
const QByteArray collectionCache = collection.attribute<EtebaseCacheAttribute>()->etebaseCache();
EtebaseCollectionPtr etesyncCollection(etebase_collection_manager_cache_load(collectionManager.get(), collectionCache.constData(), collectionCache.size()));
const QString sToken = QString::fromUtf8(etebase_collection_get_stoken(etesyncCollection.get()));
if (sToken == collection.remoteRevision()) {
itemsRetrievalDone();
return;
}
......@@ -332,7 +345,7 @@ void EteSyncResource::retrieveItems(const Akonadi::Collection &collection)
return;
}
auto job = new EntriesFetchJob(mClientState->client(), collection, this);
auto job = new EntriesFetchJob(mClientState->account(), collection, this);
connect(job, &EntriesFetchJob::finished, this, &EteSyncResource::slotItemsRetrieved);
......@@ -345,24 +358,18 @@ void EteSyncResource::slotItemsRetrieved(KJob *job)
qCDebug(ETESYNC_LOG) << "Error in fetching entries";
qCWarning(ETESYNC_LOG) << "EteSync error" << job->error() << job->errorText();
handleError(job->error());
return;
}
qCDebug(ETESYNC_LOG) << "Retrieving entries";
auto entries = qobject_cast<EntriesFetchJob *>(job)->getEntries();
Item::List items = qobject_cast<EntriesFetchJob *>(job)->items();
Item::List removedItems = qobject_cast<EntriesFetchJob *>(job)->removedItems();
Collection collection = qobject_cast<EntriesFetchJob *>(job)->collection();
itemsRetrievedIncremental(items, removedItems);
QString prevUid = qobject_cast<EntriesFetchJob *>(job)->getPrevUid();
auto handler = fetchHandlerForCollection(collection);
qCDebug(ETESYNC_LOG) << "Updating collection sync token";
Collection collection = qobject_cast<EntriesFetchJob *>(job)->collection();
new CollectionModifyJob(collection, this);
if (handler) {
handler->setupItems(entries, collection, prevUid);
} else {
qCWarning(ETESYNC_LOG) << "Unknown collection" << collection.name();
itemsRetrieved({});
}
qCDebug(ETESYNC_LOG) << "Items retrieval done";
}
void EteSyncResource::aboutToQuit()
......
......@@ -14,7 +14,7 @@
#include <KCalendarCore/Event>
#include <KCalendarCore/Todo>
#include "collectioncacheattribute.h"
#include "etebasecacheattribute.h"
#include "etesync_debug.h"
using namespace EteSyncAPI;
......@@ -75,8 +75,8 @@ void JournalsFetchJob::fetchJournals()
Collection collection;
for (int i = 0; i < dataLength; i++) {
setupCollection(collection, etesyncCollections[i]);
saveCollectionCache(collectionManager.get(), etesyncCollections[i], collection);
setupCollection(collection, etesyncCollections[i]);
}
int removedCollectionsLength = etebase_collection_list_response_get_removed_memberships_length(collectionList.get());
......@@ -147,21 +147,22 @@ void JournalsFetchJob::setupCollection(Akonadi::Collection &collection, const Et
collection.setRemoteId(journalUid);
collection.setName(journalUid);
collection.setContentMimeTypes(mimeTypes);
collection.setParentCollection(mResourceCollection);
if (etebase_collection_is_deleted(etesyncCollection)) {
mRemovedCollections.push_back(collection);
return;
}
collection.setParentCollection(mResourceCollection);
mCollections.push_back(collection);
}
void JournalsFetchJob::saveCollectionCache(const EtebaseCollectionManager *collectionManager, const EtebaseCollection *etebaseCollection, Collection &collection)
{
qCDebug(ETESYNC_LOG) << "Saving cache for collection" << etebase_collection_get_uid(etebaseCollection);
uintptr_t ret_size;
EtebaseCachePtr cache(etebase_collection_manager_cache_save(collectionManager, etebaseCollection, &ret_size));
QByteArray cacheData((char *)cache.get(), ret_size);