Commit 8d6e3879 authored by Daniel Vrátil's avatar Daniel Vrátil 🤖

Initial port of Session and ProtocolHelper to binary protocol

parent 019f2c37
......@@ -54,7 +54,7 @@ set(AKONADI_TESTS_EXPORT AKONADICORE_EXPORT)
configure_file(akonadiprivate_export.h.in "${CMAKE_CURRENT_BINARY_DIR}/akonadiprivate_export.h")
set(KF5AkonadiServer_MIN_VERSION "5.0.41")
set(KF5AkonadiServer_MIN_VERSION "5.0.50")
find_package(KF5AkonadiServer ${KF5AkonadiServer_MIN_VERSION} CONFIG REQUIRED)
find_package(Qt5Designer NO_MODULE)
......
This diff is collapsed.
......@@ -31,6 +31,7 @@
#include <akonadi/private/imapparser_p.h>
#include <akonadi/private/protocol_p.h>
#include <akonadi/private/scope_p.h>
#include <boost/bind.hpp>
#include <algorithm>
......@@ -67,22 +68,20 @@ public:
/**
Parse a cache policy definition.
@param data The input data.
@param policy The parsed cache policy.
@param start Start of the data, ie. postion after the label.
@returns Position in data after the cache policy description.
@returns Akonadi::CachePolicy
*/
static int parseCachePolicy(const QByteArray &data, CachePolicy &policy, int start = 0);
static CachePolicy parseCachePolicy(const Protocol::CachePolicy &policy);
/**
Convert a cache policy object into its protocol representation.
*/
static QByteArray cachePolicyToByteArray(const CachePolicy &policy);
static Protocol::CachePolicy cachePolicyToProtocol(const CachePolicy &policy);
/**
Convert a ancestor chain from its protocol representation into an Entity object.
*/
static void parseAncestors(const QByteArray &data, Entity *entity, int start = 0);
static void parseAncestors(const QVector<Protocol::Ancestor> &ancestors, Entity *entity);
/**
Convert a ancestor chain from its protocol representation into an Entity object.
......@@ -90,28 +89,31 @@ public:
This method allows to pass a @p valuePool which acts as cache, so ancestor paths for the
same @p parentCollection don't have to be parsed twice.
*/
static void parseAncestorsCached(const QByteArray &data, Entity *entity, Collection::Id parentCollection, ProtocolHelperValuePool *valuePool = 0, int start = 0);
static void parseAncestorsCached(const QVector<Protocol::Ancestor> &ancestors, Entity *entity, Collection::Id parentCollection, ProtocolHelperValuePool *valuePool = 0, int start = 0);
/**
Parse a collection description.
@param data The input data.
@param collection The parsed collection.
@param start Start of the data.
@param requireParent Whether or not we require a parent as part of the data.
@returns Position in data after the collection description.
@returns The parsed collection
*/
static int parseCollection(const QByteArray &data, Collection &collection, int start = 0, bool requireParent = true);
static Collection parseCollection(const Protocol::FetchCollectionsResponse &data, bool requireParent = true);
static void parseAttributes(const Protocol::Attributes &attributes, Entity *entity);
static void parseAttributes(const Protocol::Attributes &attributes, AttributeEntity *entity);
static CollectionStatistics parseCollectionStatistics(const Protocol::FetchCollectionStatsResponse &stats);
/**
Convert attributes to their protocol representation.
*/
static QByteArray attributesToByteArray(const Entity &entity, bool ns = false);
static QByteArray attributesToByteArray(const AttributeEntity &entity, bool ns = false);
static Protocol::Attributes attributesToProtocol(const Entity &entity, bool ns = false);
static Protocol::Attributes attributesToProtocol(const AttributeEntity &entity, bool ns = false);
/**
Encodes part label and namespace.
*/
static QByteArray encodePartIdentifier(PartNamespace ns, const QByteArray &label, int version = 0);
static QByteArray encodePartIdentifier(PartNamespace ns, const QByteArray &label);
/**
Decode part label and namespace.
......@@ -123,7 +125,7 @@ public:
@throws A Akonadi::Exception if the item set contains items with missing/invalid identifiers.
*/
template <typename T>
static QByteArray entitySetToByteArray(const QList<T> &_objects, const QByteArray &command)
static Scope entitySetToScope(const QVector<T> &_objects)
{
if (_objects.isEmpty()) {
throw Exception("No objects specified");
......@@ -134,20 +136,14 @@ public:
QByteArray rv;
std::sort(objects.begin(), objects.end(), boost::bind(&T::id, _1) < boost::bind(&T::id, _2));
if (objects.first().isValid()) {
// all items have a uid set
rv += " " AKONADI_CMD_UID " ";
if (!command.isEmpty()) {
rv += command;
rv += ' ';
}
QVector<typename T::Id> uids;
foreach (const T &object, objects) {
uids.reserve(objects.size());
for (const T &object : objects) {
uids << object.id();
}
ImapSet set;
set.add(uids);
rv += set.toImapSequenceSet();
return rv;
return Scope(set);
}
// check if all items have a remote id
......@@ -161,88 +157,65 @@ public:
if (std::find_if(objects.constBegin(), objects.constEnd(),
!boost::bind(static_cast<bool (*)(const T &)>(&CollectionUtils::hasValidHierarchicalRID), _1))
== objects.constEnd() && objects.size() == 1) { // ### HRID sets are not yet specified
// HRIDs
rv += " " AKONADI_CMD_HRID " ";
if (!command.isEmpty()) {
rv += command;
rv += ' ';
}
rv += '(' + hierarchicalRidToByteArray(objects.first()) + ')';
return rv;
return hierarchicalRidToScope(objects.first());
}
// RIDs
QList<QByteArray> rids;
foreach (const T &object, objects) {
rids << ImapParser::quote(object.remoteId().toUtf8());
QStringList rids;
rids.reserve(objects.size());
for (const T &object : objects) {
rids << object.remoteId();
}
rv += " " AKONADI_CMD_RID " ";
if (!command.isEmpty()) {
rv += command;
rv += ' ';
}
rv += '(';
rv += ImapParser::join(rids, " ");
rv += ')';
return rv;
return Scope(Scope::Rid, rids);
}
static QByteArray entitySetToByteArray(const QList<Akonadi::Item> &_objects, const QByteArray &command);
static QByteArray tagSetToImapSequenceSet(const Akonadi::Tag::List &_objects);
static QByteArray tagSetToByteArray(const Akonadi::Tag::List &_objects, const QByteArray &command);
static QByteArray commandContextToByteArray(const Akonadi::Collection &collection, const Akonadi::Tag &tag,
const Item::List &requestedItems, const QByteArray &command);
static Protocol::ScopeContext commandContextToProtocol(const Akonadi::Collection &collection, const Akonadi::Tag &tag,
const Item::List &requestedItems, const QByteArray &command);
/**
Converts the given object identifier into a protocol representation.
@throws A Akonadi::Exception if the item set contains items with missing/invalid identifiers.
*/
template <typename T>
static QByteArray entityIdToByteArray(const T &object, const QByteArray &command)
static Scope entityIdToScope(const T &object, const QByteArray &command)
{
return entitySetToByteArray(typename T::List() << object, command);
return entitySetToScope(typename T::List() << object, command);
}
/**
Converts the given collection's hierarchical RID into a protocol representation.
Assumes @p col has a valid hierarchical RID, so check that before!
*/
static QByteArray hierarchicalRidToByteArray(const Collection &col);
static Scope hierarchicalRidToScope(const Collection &col);
/**
Converts the HRID of the given item into an ASAP protocol representation.
Assumes @p item has a valid HRID.
*/
static QByteArray hierarchicalRidToByteArray(const Item &item);
static Scope hierarchicalRidToScope(const Item &item);
/**
Converts a given ItemFetchScope object into a protocol representation.
*/
static QByteArray itemFetchScopeToByteArray(const ItemFetchScope &fetchScope);
static Protocol::FetchScope itemFetchScopeToProtocol(const ItemFetchScope &fetchScope);
/**
Converts a given TagFetchScope object into a protocol representation.
*/
static QByteArray tagFetchScopeToByteArray(const TagFetchScope &fetchScope);
static QVector<QByteArray> tagFetchScopeToProtocol(const TagFetchScope &fetchScope);
/**
Parses a single line from an item fetch job result into an Item object.
*/
static void parseItemFetchResult(const QList<QByteArray> &lineTokens, Item &item, ProtocolHelperValuePool *valuePool = 0);
static void parseTagFetchResult(const QList<QByteArray> &lineTokens, Tag &tag);
static void parseRelationFetchResult(const QList<QByteArray> &lineTokens, Relation &tag);
static Item parseItemFetchResult(const Protocol::FetchItemsResponse &data, ProtocolHelperValuePool *valuePool = 0);
static Tag parseTagFetchResult(const Protocol::FetchTagsResponse &data);
static Relation parseRelationFetchResult(const Protocol::FetchRelationsResponse &data);
static QString akonadiStoragePath();
static QString absolutePayloadFilePath(const QString &fileName);
static bool streamPayloadToFile(const QByteArray &command, const QByteArray &data, QByteArray &error);
static QByteArray listPreference(Collection::ListPurpose purpose, Collection::ListPreference preference);
static QByteArray enabled(bool);
static QByteArray referenced(bool);
};
}
......
......@@ -26,6 +26,7 @@
#include "servermanager_p.h"
#include "protocolhelper_p.h"
#include <akonadi/private/xdgbasedirs_p.h>
#include <akonadi/private/protocol_p.h>
#include <QDebug>
#include <klocalizedstring.h>
......@@ -53,12 +54,6 @@ using namespace Akonadi;
//@cond PRIVATE
static const QList<QByteArray> sCapabilities = QList<QByteArray>()
<< "NOTIFY 3"
<< "NOPAYLOADPATH"
<< "AKAPPENDSTREAMING"
<< "SERVERSEARCH"
<< "DIRECTSTREAMING";
void SessionPrivate::startNext()
{
......@@ -187,64 +182,59 @@ void SessionPrivate::socketDisconnected()
void SessionPrivate::dataReceived()
{
while (socket->bytesAvailable() > 0) {
if (parser->continuationSize() > 1) {
const QByteArray data = socket->read(qMin(socket->bytesAvailable(), parser->continuationSize() - 1));
parser->parseBlock(data);
} else if (socket->canReadLine()) {
if (!parser->parseNextLine(socket->readLine())) {
continue; // response not yet completed
}
QDataStream stream(socket);
qint64 tag;
// TODO: Verify the tag matches the last tag we send
stream >> tag;
Protocol::Command cmd = Protocol::deserialize(socket);
if (cmd.type() == Protocol::Command::Invalid) {
qWarning() << "Invalid command, the world is going to end!";
continue;
}
if (logFile) {
logFile->write("S: " + parser->data());
logFile->flush();
}
// handle login response
if (parser->tag() == QByteArray("0")) {
if (parser->data().startsWith("OK")) { //krazy:exclude=strings
writeData("1 CAPABILITY (" + ImapParser::join(sCapabilities, " ") + ")");
} else {
qWarning() << "Unable to login to Akonadi server:" << parser->data();
socket->close();
QTimer::singleShot(1000, mParent, SLOT(reconnect()));
}
}
if (logFile) {
logFile->write("S: " + cmd.debugString());
logFile->flush();
}
// handle capability response
if (parser->tag() == QByteArray("1")) {
if (parser->data().startsWith("OK")) {
connected = true;
startNext();
} else {
qDebug() << "Unhandled server capability response:" << parser->data();
}
// Handle Hello response -> send Login
if (cmd.type() == Protocol::Command::Hello) {
Protocol::HelloResponse hello(cmd);
if (hello.isError()) {
qWarning() << "Error when establishing connection with Akonadi server:" << hello.errorMessage();
socket->close();
QTimer::singleShot(1000, mParent, &Session::reconnect);
break;
}
// send login command
if (parser->tag() == "*" && parser->data().startsWith("OK Akonadi")) {
const int pos = parser->data().indexOf("[PROTOCOL");
if (pos > 0) {
qint64 tmp = 0;
ImapParser::parseNumber(parser->data(), tmp, 0, pos + 9);
protocolVersion = tmp;
Internal::setServerProtocolVersion(tmp);
}
qDebug() << "Server protocol version is:" << protocolVersion;
writeData("0 LOGIN " + ImapParser::quote(sessionId) + '\n');
// work for the current job
} else {
if (currentJob) {
currentJob->d_ptr->handleResponse(parser->tag(), parser->data());
}
qDebug() << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion();
qDebug() << "Server says:" << hello.message();
// Version mismatch is handled in SessionPrivate::startJob() so that
// we can report the error out via KJob API
protocolVersion = hello.protocolVersion();
Protocol::LoginCommand login(sessionId);
sendCommand(login);
continue;
}
// Login response
if (cmd.type() == Protocol::Command::Login) {
Protocol::LoginResponse login(cmd);
if (login.isError()) {
qWarning() << "Unable to login to Akonadi server:" << login.errorMessage();
socket->close();
QTimer::singleShot(1000, mParent, &Session::reconnect);
break;
}
// reset parser stuff
parser->reset();
// work for the current job
} else {
break; // nothing we can do for now
if (currentJob) {
currentJob->d_ptr->handleResponse(tag, cmd);
}
}
}
}
......@@ -287,9 +277,21 @@ void SessionPrivate::doStartNext()
void SessionPrivate::startJob(Job *job)
{
if (protocolVersion < minimumProtocolVersion()) {
if (protocolVersion != clientProtocolVersion()) {
job->setError(Job::ProtocolVersionMismatch);
job->setErrorText(i18n("Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion()));
if (protocolVersion < SessionPrivate::clientProtocolVersion()) {
job->setErrorText(i18n("Protocol version mismatch. Server version is newer (%1) than ours (%2). "
"If you updated your system recently please restart the Akonadi server.",
protocolVersion, clientProtocolVersion()));
qWarning() << "Protocol version mismatch. Server version is newer (" << protocolVersion << ") than ours (" << clientProtocolVersion() << "). "
"If you updated your system recently please restart the Akonadi server.";
} else {
job->setErrorText(i18n("Protocol version mismatch. Server version is older (%1) than ours (%2). "
"If you updated your system recently please restart all KDE PIM applications.",
protocolVersion, clientProtocolVersion()));
qWarning() << "Protocol version mismatch. Server version is older (" << protocolVersion << ") than ours (" << clientProtocolVersion() << "). "
"If you updated your system recently please restart all KDE PIM applications.";
}
job->emitResult();
} else {
job->d_ptr->startQueued();
......@@ -349,20 +351,20 @@ int SessionPrivate::nextTag()
return theNextTag++;
}
void SessionPrivate::writeData(const QByteArray &data)
void SessionPrivate::sendCommand(qint64 tag, const Protocol::Command &command)
{
if (logFile) {
logFile->write("C: " + data);
if (!data.endsWith('\n')) {
logFile->write("\n");
}
logFile->write("C: " + command.debugString());
logFile->write("\n");
logFile->flush();
}
if (socket) {
socket->write(data);
if (socket && socket->isOpen()) {
QDataStream stream(socket);
stream << tag;
Protocol::serialize(socket, command);
} else {
//PORT QT5 qWarning() << "Trying to write while session is disconnected!" << kBacktrace();
// TODO: Queue the commands and resend on reconnect?
}
}
......@@ -396,15 +398,17 @@ SessionPrivate::SessionPrivate(Session *parent)
, socket(0)
, protocolVersion(0)
, currentJob(0)
, parser(0)
, logFile(0)
{
}
SessionPrivate::~SessionPrivate()
{
}
void SessionPrivate::init(const QByteArray &id)
{
qDebug() << id;
parser = new ImapParser();
if (!id.isEmpty()) {
sessionId = id;
......
......@@ -24,7 +24,6 @@
#include "session.h"
#include "item.h"
#include "servermanager.h"
#include <akonadi/private/imapparser_p.h>
#include <QtNetwork/QLocalSocket>
......@@ -36,6 +35,10 @@ class QIODevice;
namespace Akonadi {
namespace Protocol {
class Command;
}
/**
* @internal
*/
......@@ -44,10 +47,7 @@ class AKONADICORE_EXPORT SessionPrivate
public:
explicit SessionPrivate(Session *parent);
virtual ~SessionPrivate()
{
delete parser;
}
virtual ~SessionPrivate();
virtual void init(const QByteArray &sessionId);
......@@ -105,18 +105,18 @@ public:
int nextTag();
/**
Sends the given raw data.
Sends the given command to server
*/
void writeData(const QByteArray &data);
void sendCommand(qint64 tag, const Protocol::Command &command);
/**
* Propagate item revision changes to following jobs.
*/
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision);
static int minimumProtocolVersion()
static int clientProtocolVersion()
{
return 44;
return 50;
}
/**
......@@ -137,9 +137,6 @@ public:
Job *currentJob;
bool jobRunning;
// parser stuff
ImapParser *parser;
QFile *logFile;
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment