Members of the KDE Community are recommended to subscribe to the kde-community mailing list at https://mail.kde.org/mailman/listinfo/kde-community to allow them to participate in important discussions and receive other important announcements

Commit 86c83fa5 authored by Daniel Vrátil's avatar Daniel Vrátil 🤖

Pass incoming Commands from SessionThread to main thread via CommandBuffer

Instead of using signals/slots to pass the incoming Commands from
SessionThread to main thread where Session or Monitor handle them, we
use a CommandBuffer object which is a simple thead-safe queue which
will automatically notify the client when there are new commands
available and at the same time compresses the signals so that the
client is not overwhelmed by "new data" notifications.
parent ccdfc58c
......@@ -126,8 +126,8 @@ class AKONADITESTFAKE_EXPORT FakeNotificationConnection : public Akonadi::Connec
Q_OBJECT
public:
explicit FakeNotificationConnection(QObject *parent = nullptr)
: Connection(Connection::NotificationConnection, "testConn", parent)
explicit FakeNotificationConnection(Akonadi::Session *session, Akonadi::CommandBuffer *buffer)
: Connection(Connection::NotificationConnection, "", buffer, session)
{}
virtual ~FakeNotificationConnection()
......@@ -155,8 +155,9 @@ public:
{
}
Akonadi::Connection *createNotificationConnection(Akonadi::Session *parent) override {
return new FakeNotificationConnection(parent);
Akonadi::Connection *createNotificationConnection(Akonadi::Session *parent,
Akonadi::CommandBuffer *buffer) override {
return new FakeNotificationConnection(parent, buffer);
}
Akonadi::CollectionCache *createCollectionCache(int maxCapacity, Akonadi::Session *session) override {
......
......@@ -29,13 +29,15 @@
using namespace Akonadi;
Connection *ChangeNotificationDependenciesFactory::createNotificationConnection(Session *session)
Connection *ChangeNotificationDependenciesFactory::createNotificationConnection(Session *session,
CommandBuffer *commandBuffer)
{
if (!Akonadi::ServerManager::self()->isRunning()) {
return nullptr;
}
return session->d->sessionThread()->createConnection(Connection::NotificationConnection, session->sessionId());
return session->d->sessionThread()->createConnection(Connection::NotificationConnection,
session->sessionId(), commandBuffer);
}
QObject *ChangeNotificationDependenciesFactory::createChangeMediator(QObject *parent)
......
......@@ -27,6 +27,7 @@ namespace Akonadi
{
class Connection;
class CommandBuffer;
/**
* This class exists so that we can create a fake notification source in
......@@ -38,7 +39,9 @@ public:
virtual ~ChangeNotificationDependenciesFactory()
{
}
virtual Connection *createNotificationConnection(Session *parent);
virtual Connection *createNotificationConnection(Session *parent, CommandBuffer *commandBuffer);
virtual QObject *createChangeMediator(QObject *parent);
virtual Akonadi::CollectionCache *createCollectionCache(int maxCapacity, Session *session);
......
/*
Copyright (c) 2018 Daniel Vrátil <dvratil@kde.org>
This library is free software; you can redistribute it and/or modify it
under the terms of the GNU Library General Public License as published by
the Free Software Foundation; either version 2 of the License, or (at your
option) any later version.
This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
License for more details.
You should have received a copy of the GNU Library General Public License
along with this library; see the file COPYING.LIB. If not, write to the
Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
*/
#ifndef AKONADICORE_COMMANDBUFFER_H_
#define AKONADICORE_COMMANDBUFFER_H_
class QObject;
#include <QMutex>
#include <QQueue>
#include <private/protocol_p.h>
#include "akonadicore_debug.h"
namespace Akonadi {
class CommandBufferLocker;
class CommandBufferNotifyBlocker;
class CommandBuffer
{
friend class CommandBufferLocker;
friend class CommandBufferNotifyBlocker;
public:
struct Command {
qint64 tag;
Protocol::CommandPtr command;
};
CommandBuffer(QObject *parent, const char *notifySlot)
: mParent(parent)
, mNotifySlot(notifySlot)
{
}
void enqueue(qint64 tag, const Protocol::CommandPtr &command)
{
mCommands.enqueue({ tag, command });
if (mNotify) {
const bool ok = QMetaObject::invokeMethod(mParent, mNotifySlot.constData(), Qt::QueuedConnection);
Q_ASSERT(ok); Q_UNUSED(ok);
}
}
inline Command dequeue()
{
return mCommands.dequeue();
}
inline bool isEmpty() const
{
return mCommands.isEmpty();
}
private:
QObject *mParent = nullptr;
QByteArray mNotifySlot;
QQueue<Command> mCommands;
QMutex mLock;
bool mNotify = true;
};
class CommandBufferLocker
{
public:
explicit CommandBufferLocker(CommandBuffer *buffer)
: mBuffer(buffer)
{
relock();
}
~CommandBufferLocker()
{
unlock();
}
inline void unlock()
{
mBuffer->mLock.unlock();
}
inline void relock()
{
mBuffer->mLock.lock();
}
private:
CommandBuffer *mBuffer;
};
class CommandBufferNotifyBlocker
{
public:
explicit CommandBufferNotifyBlocker(CommandBuffer *buffer)
: mBuffer(buffer)
{
mBuffer->mNotify = false;
}
~CommandBufferNotifyBlocker()
{
mBuffer->mNotify = true;
}
private:
CommandBuffer *mBuffer;
};
} // namespace
#endif
......@@ -21,6 +21,7 @@
#include "session_p.h"
#include "servermanager_p.h"
#include "akonadicore_debug.h"
#include "commandbuffer_p.h"
#include <QDataStream>
#include <QFile>
......@@ -37,12 +38,12 @@
using namespace Akonadi;
Connection::Connection(ConnectionType connType, const QByteArray &sessionId, QObject *parent)
Connection::Connection(ConnectionType connType, const QByteArray &sessionId,
CommandBuffer *commandBuffer, QObject *parent)
: QObject(parent)
, mConnectionType(connType)
, mSocket(nullptr)
, mLogFile(nullptr)
, mSessionId(sessionId)
, mCommandBuffer(commandBuffer)
{
qRegisterMetaType<Protocol::CommandPtr>();
qRegisterMetaType<QAbstractSocket::SocketState>();
......@@ -265,27 +266,9 @@ void Connection::dataReceived()
Q_ASSERT(cmd->isResponse());
}
Q_EMIT commandReceived(tag, cmd);
/*
if (!handleCommand(tag, cmd)) {
break;
}
*/
// FIXME: It happens often that data are arriving from the server faster
// than we Jobs can process them which means, that we often process all
// responses in single dataReceived() call and thus not returning to back
// to QEventLoop, which breaks batch-delivery of ItemFetchJob (among other
// things). To workaround that we force processing of events every
// now and then.
//
// Longterm we want something better, like processing and parsing in
// separate thread which would only post the parsed Protocol::Commands
// to the jobs through event loop. That will be overall slower but should
// result in much more responsive applications.
if (timer.elapsed() > 50) {
thread()->eventDispatcher()->processEvents(QEventLoop::ExcludeSocketNotifiers);
timer.restart();
{
CommandBufferLocker locker(mCommandBuffer);
mCommandBuffer->enqueue(tag, cmd);
}
}
}
......
......@@ -35,6 +35,8 @@ namespace Akonadi
{
class SessionThread;
class SessionPrivate;
class CommandBuffer;
class AKONADICORE_EXPORT Connection : public QObject
{
......@@ -47,7 +49,8 @@ public:
};
Q_ENUM(ConnectionType)
explicit Connection(ConnectionType connType, const QByteArray &sessionId, QObject *parent = nullptr);
explicit Connection(ConnectionType connType, const QByteArray &sessionId,
CommandBuffer *commandBuffer, QObject *parent = nullptr);
~Connection();
Q_INVOKABLE void reconnect();
......@@ -78,12 +81,7 @@ private:
QLocalSocket *mSocket = nullptr;
QFile *mLogFile = nullptr;
QByteArray mSessionId;
QMutex mLock;
struct Command {
qint64 tag;
Protocol::CommandPtr cmd;
};
QQueue<Command> mOutQueue;
CommandBuffer *mCommandBuffer;
friend class Akonadi::SessionThread;
};
......
......@@ -821,7 +821,7 @@ private:
Q_PRIVATE_SLOT(d_ptr, void slotStatisticsChangedFinished(KJob *))
Q_PRIVATE_SLOT(d_ptr, void slotFlushRecentlyChangedCollections())
Q_PRIVATE_SLOT(d_ptr, void slotUpdateSubscription())
Q_PRIVATE_SLOT(d_ptr, void commandReceived(qint64 tag, const Akonadi::Protocol::CommandPtr &))
Q_PRIVATE_SLOT(d_ptr, void handleCommands())
Q_PRIVATE_SLOT(d_ptr, void dataAvailable())
Q_PRIVATE_SLOT(d_ptr, void serverStateChanged(Akonadi::ServerManager::State))
Q_PRIVATE_SLOT(d_ptr, void invalidateCollectionCache(qint64))
......
......@@ -49,6 +49,7 @@ MonitorPrivate::MonitorPrivate(ChangeNotificationDependenciesFactory *dependenci
, collectionCache(nullptr)
, itemCache(nullptr)
, tagCache(nullptr)
, mCommandBuffer(parent, "handleCommands")
, pendingModificationTimer(nullptr)
, monitorReady(false)
, fetchCollection(false)
......@@ -99,12 +100,10 @@ bool MonitorPrivate::connectToNotificationManager()
return false;
}
ntfConnection = dependenciesFactory->createNotificationConnection(session);
ntfConnection = dependenciesFactory->createNotificationConnection(session, &mCommandBuffer);
if (!ntfConnection) {
return false;
}
q_ptr->connect(ntfConnection, SIGNAL(commandReceived(qint64,Akonadi::Protocol::CommandPtr)),
q_ptr, SLOT(commandReceived(qint64,Akonadi::Protocol::CommandPtr)));
pendingModification = Protocol::ModifySubscriptionCommand();
for (const auto &col : qAsConst(collections)) {
......@@ -721,60 +720,70 @@ int MonitorPrivate::translateAndCompress(QQueue<Protocol::ChangeNotificationPtr>
return 0;
}
void MonitorPrivate::commandReceived(qint64 tag, const Protocol::CommandPtr &command)
void MonitorPrivate::handleCommands()
{
Q_Q(Monitor);
Q_UNUSED(tag);
if (command->isResponse()) {
switch (command->type()) {
case Protocol::Command::Hello: {
qCDebug(AKONADICORE_LOG) << q_ptr << "Connected to notification bus";
QByteArray subname;
if (!q->objectName().isEmpty()) {
subname = q->objectName().toLatin1();
} else {
subname = session->sessionId();
}
subname += " - " + QByteArray::number(quintptr(q));
qCDebug(AKONADICORE_LOG) << q_ptr << "Subscribing as \"" << subname << "\"";
auto subCmd = Protocol::CreateSubscriptionCommandPtr::create(subname, session->sessionId());
ntfConnection->sendCommand(2, subCmd);
break;
}
case Protocol::Command::CreateSubscription: {
auto msubCmd = Protocol::ModifySubscriptionCommandPtr::create(pendingModification);
pendingModification = Protocol::ModifySubscriptionCommand();
ntfConnection->sendCommand(3, msubCmd);
break;
}
CommandBufferLocker lock(&mCommandBuffer);
CommandBufferNotifyBlocker notify(&mCommandBuffer);
while (!mCommandBuffer.isEmpty()) {
const auto cmd = mCommandBuffer.dequeue();
lock.unlock();
const auto command = cmd.command;
if (command->isResponse()) {
switch (command->type()) {
case Protocol::Command::Hello: {
qCDebug(AKONADICORE_LOG) << q_ptr << "Connected to notification bus";
QByteArray subname;
if (!q->objectName().isEmpty()) {
subname = q->objectName().toLatin1();
} else {
subname = session->sessionId();
}
subname += " - " + QByteArray::number(quintptr(q));
qCDebug(AKONADICORE_LOG) << q_ptr << "Subscribing as \"" << subname << "\"";
auto subCmd = Protocol::CreateSubscriptionCommandPtr::create(subname, session->sessionId());
ntfConnection->sendCommand(2, subCmd);
break;
}
case Protocol::Command::ModifySubscription:
// TODO: Handle errors
if (!monitorReady) {
monitorReady = true;
Q_EMIT q_ptr->monitorReady();
case Protocol::Command::CreateSubscription: {
auto msubCmd = Protocol::ModifySubscriptionCommandPtr::create(pendingModification);
pendingModification = Protocol::ModifySubscriptionCommand();
ntfConnection->sendCommand(3, msubCmd);
break;
}
break;
default:
qCWarning(AKONADICORE_LOG) << "Received an unexpected response on Notification stream: " << Protocol::debugString(command);
break;
}
} else {
switch (command->type()) {
case Protocol::Command::ItemChangeNotification:
case Protocol::Command::CollectionChangeNotification:
case Protocol::Command::TagChangeNotification:
case Protocol::Command::RelationChangeNotification:
case Protocol::Command::SubscriptionChangeNotification:
case Protocol::Command::DebugChangeNotification:
slotNotify(command.staticCast<Protocol::ChangeNotification>());
break;
default:
qCWarning(AKONADICORE_LOG) << "Received an unexpected message on Notification stream:" << Protocol::debugString(command);
break;
case Protocol::Command::ModifySubscription:
// TODO: Handle errors
if (!monitorReady) {
monitorReady = true;
Q_EMIT q_ptr->monitorReady();
}
break;
default:
qCWarning(AKONADICORE_LOG) << "Received an unexpected response on Notification stream: " << Protocol::debugString(command);
break;
}
} else {
switch (command->type()) {
case Protocol::Command::ItemChangeNotification:
case Protocol::Command::CollectionChangeNotification:
case Protocol::Command::TagChangeNotification:
case Protocol::Command::RelationChangeNotification:
case Protocol::Command::SubscriptionChangeNotification:
case Protocol::Command::DebugChangeNotification:
slotNotify(command.staticCast<Protocol::ChangeNotification>());
break;
default:
qCWarning(AKONADICORE_LOG) << "Received an unexpected message on Notification stream:" << Protocol::debugString(command);
break;
}
}
lock.relock();
}
}
......
......@@ -33,6 +33,7 @@
#include "servermanager.h"
#include "changenotificationdependenciesfactory_p.h"
#include "connection_p.h"
#include "commandbuffer_p.h"
#include "private/protocol_p.h"
......@@ -81,6 +82,8 @@ public:
TagListCache *tagCache = nullptr;
QMimeDatabase mimeDatabase;
CommandBuffer mCommandBuffer;
Protocol::ModifySubscriptionCommand pendingModification;
QTimer *pendingModificationTimer;
bool monitorReady;
......@@ -141,7 +144,7 @@ public:
*/
int translateAndCompress(QQueue<Protocol::ChangeNotificationPtr> &notificationQueue, const Protocol::ChangeNotificationPtr &msg);
void commandReceived(qint64 tag, const Protocol::CommandPtr &command);
void handleCommands();
virtual void slotNotify(const Protocol::ChangeNotificationPtr &msg);
......
......@@ -60,12 +60,9 @@ void SessionPrivate::startNext()
void SessionPrivate::reconnect()
{
if (!connection) {
connection = sessionThread()->createConnection(Connection::CommandConnection, sessionId);
connection = sessionThread()->createConnection(Connection::CommandConnection, sessionId, &mCommandBuffer);
mParent->connect(connection, &Connection::reconnected, mParent, &Session::reconnected,
Qt::QueuedConnection);
mParent->connect(connection, SIGNAL(commandReceived(qint64,Akonadi::Protocol::CommandPtr)),
mParent, SLOT(handleCommand(qint64,Akonadi::Protocol::CommandPtr)),
Qt::QueuedConnection);
mParent->connect(connection, SIGNAL(socketDisconnected()), mParent, SLOT(socketDisconnected()),
Qt::QueuedConnection);
mParent->connect(connection, SIGNAL(socketError(QString)), mParent, SLOT(socketError(QString)),
......@@ -94,49 +91,52 @@ void SessionPrivate::socketDisconnected()
connected = false;
}
bool SessionPrivate::handleCommand(qint64 tag, const Protocol::CommandPtr &cmd)
bool SessionPrivate::handleCommands()
{
// Handle Hello response -> send Login
if (cmd->type() == Protocol::Command::Hello) {
const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd);
if (hello.isError()) {
qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage();
connection->closeConnection();
QTimer::singleShot(1000, connection, &Connection::reconnect);
return false;
}
qCDebug(AKONADICORE_LOG) << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion();
qCDebug(AKONADICORE_LOG) << "Server generation:" << hello.generation();
qCDebug(AKONADICORE_LOG) << "Server says:" << hello.message();
// Version mismatch is handled in SessionPrivate::startJob() so that
// we can report the error out via KJob API
protocolVersion = hello.protocolVersion();
Internal::setServerProtocolVersion(protocolVersion);
Internal::setGeneration(hello.generation());
sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId));
return true;
}
// Login response
if (cmd->type() == Protocol::Command::Login) {
const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd);
if (login.isError()) {
qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage();
connection->closeConnection();
QTimer::singleShot(1000, mParent, SLOT(reconnect()));
return false;
CommandBufferLocker lock(&mCommandBuffer);
CommandBufferNotifyBlocker notify(&mCommandBuffer);
while (!mCommandBuffer.isEmpty()) {
const auto command = mCommandBuffer.dequeue();
lock.unlock();
const auto cmd = command.command;
const auto tag = command.tag;
// Handle Hello response -> send Login
if (cmd->type() == Protocol::Command::Hello) {
const auto &hello = Protocol::cmdCast<Protocol::HelloResponse>(cmd);
if (hello.isError()) {
qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage();
connection->closeConnection();
QTimer::singleShot(1000, connection, &Connection::reconnect);
return false;
}
qCDebug(AKONADICORE_LOG) << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion();
qCDebug(AKONADICORE_LOG) << "Server generation:" << hello.generation();
qCDebug(AKONADICORE_LOG) << "Server says:" << hello.message();
// Version mismatch is handled in SessionPrivate::startJob() so that
// we can report the error out via KJob API
protocolVersion = hello.protocolVersion();
Internal::setServerProtocolVersion(protocolVersion);
Internal::setGeneration(hello.generation());
sendCommand(nextTag(), Protocol::LoginCommandPtr::create(sessionId));
} else if (cmd->type() == Protocol::Command::Login) {
const auto &login = Protocol::cmdCast<Protocol::LoginResponse>(cmd);
if (login.isError()) {
qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage();
connection->closeConnection();
QTimer::singleShot(1000, mParent, SLOT(reconnect()));
return false;
}
connected = true;
startNext();
} else if (currentJob) {
currentJob->d_ptr->handleResponse(tag, cmd);
}
connected = true;
startNext();
return true;
}
// work for the current job
if (currentJob) {
currentJob->d_ptr->handleResponse(tag, cmd);
lock.relock();
}
return true;
......@@ -292,6 +292,7 @@ SessionPrivate::SessionPrivate(Session *parent)
, mSessionThread(new SessionThread)
, connection(nullptr)
, protocolVersion(0)
, mCommandBuffer(parent, "handleCommands")
, currentJob(nullptr)
{
// Shutdown the thread before QApplication event loop quits - the
......
......@@ -134,7 +134,7 @@ private:
Q_PRIVATE_SLOT(d, void reconnect())
Q_PRIVATE_SLOT(d, void socketError(const QString &error))
Q_PRIVATE_SLOT(d, void socketDisconnected())
Q_PRIVATE_SLOT(d, bool handleCommand(qint64 tag, const Akonadi::Protocol::CommandPtr &cmd))
Q_PRIVATE_SLOT(d, bool handleCommands())
Q_PRIVATE_SLOT(d, void doStartNext())
Q_PRIVATE_SLOT(d, void jobDone(KJob *))
Q_PRIVATE_SLOT(d, void jobWriteFinished(Akonadi::Job *))
......
......@@ -24,6 +24,7 @@
#include "session.h"
#include "item.h"
#include "servermanager.h"
#include "commandbuffer_p.h"
#include <QQueue>
......@@ -67,7 +68,7 @@ public:
void socketDisconnected();
void socketError(const QString &error);
void dataReceived();
virtual bool handleCommand(qint64 tag, const Protocol::CommandPtr &cmd);
virtual bool handleCommands();
void doStartNext();
void startJob(Job *job);
......@@ -135,6 +136,8 @@ public:
qint64 theNextTag;
int protocolVersion;
CommandBuffer mCommandBuffer;
// job management
QQueue<Job *> queue;
QQueue<Job *> pipeline;
......
......@@ -18,12 +18,14 @@
*/
#include "sessionthread_p.h"
#include "session_p.h"
#include "akonadicore_debug.h"
#include <QThread>
Q_DECLARE_METATYPE(Akonadi::Connection::ConnectionType)
Q_DECLARE_METATYPE(Akonadi::Connection *)
Q_DECLARE_METATYPE(Akonadi::CommandBuffer *)
using namespace Akonadi;
......@@ -32,6 +34,7 @@ SessionThread::SessionThread(QObject *parent)
{
qRegisterMetaType<Connection::ConnectionType>();
qRegisterMetaType<Connection *>();
qRegisterMetaType<CommandBuffer *>();
QThread *thread = new QThread();
moveToThread(thread);
......@@ -54,24 +57,27 @@ SessionThread::~SessionThread()
}
Connection *SessionThread::createConnection(Connection::ConnectionType connectionType,
const QByteArray &sessionId)
const QByteArray &sessionId,
CommandBuffer *commandBuffer)
{
Connection *conn = nullptr;
const bool invoke = QMetaObject::invokeMethod(this, "doCreateConnection",
Qt::BlockingQueuedConnection,
Q_RETURN_ARG(Akonadi::Connection*, conn),
Q_ARG(Akonadi::Connection::ConnectionType, connectionType),
Q_ARG(QByteArray, sessionId));
Q_ARG(QByteArray, sessionId),
Q_ARG(Akonadi::CommandBuffer*, commandBuffer));
Q_ASSERT(invoke); Q_UNUSED(invoke);
return conn;
}
Connection *SessionThread::doCreateConnection(Connection::ConnectionType connType,
const QByteArray &sessionId)
const QByteArray &sessionId,
CommandBuffer *commandBuffer)
{
Q_ASSERT(thread() == QThread::currentThread());
Connection *conn = new Connection(connType, sessionId);
Connection *conn = new Connection(connType, sessionId, commandBuffer);
conn->moveToThread(thread());
connect(conn, &QObject::destroyed,
this, [this](QObject * obj) {
......
......@@ -27,7 +27,7 @@
namespace Akonadi
{
class CommandBuffer;
class SessionThread : public QObject
{
Q_OBJECT
......@@ -36,13 +36,14 @@ public:
explicit SessionThread(QObject *parent = nullptr);
~SessionThread();
Connection *createConnection(Connection::ConnectionType connType, const QByteArray &sessionId);
private:
Q_INVOKABLE Akonadi::Connection *doCreateConnection(Akonadi::Connection::ConnectionType connType,
const QByteArray &sessionId);
Connection *createConnection(Connection::ConnectionType connType, const QByteArray &sessionId,