Commit f257797f authored by Daniel Vrátil's avatar Daniel Vrátil 🤖
Browse files

Implement notification subscription management via Protocol

NotificationManager now lives in separete thread. It creates
NotificationSubscriber object for each new connection on NotificationServer.
Each connection does not have its own thread as in case of Connection,
because the communication there is minimal and we don't mind if the
NotificationManager thread gets blocked for a short moment.

Each Subscriber keeps state of the subscription. When a notification is
delivered to NotificationManager, it will pass them to all subscribers
which will then decide whether they accept the notification and eventually
send it to subscribed client.

Currently this breaks notification and subscriber debugging because we
completely removed the DBus introspection. The plan is to introduce a
subscription change notification (a type of Protocol::ChangeNotification)
that clients like AkonadiConsole could subscribe to to get information about
new or removed subscribers and their subscription state.
parent 0e478e2b
......@@ -22,13 +22,12 @@
#include "monitor_p.h"
#include "notificationsource_p.h"
#include "notificationbus_p.h"
#include "collectionfetchscope.h"
#include "itemfetchscope.h"
#include "akonaditestfake_export.h"
#include "private/protocol_p.h"
template<typename T, typename Cache>
class FakeEntityCache : public Cache
{
......@@ -122,25 +121,27 @@ public Q_SLOTS:
}
};
class AKONADITESTFAKE_EXPORT FakeNotificationBus : public QObject
class AKONADITESTFAKE_EXPORT FakeNotificationConnection : public Akonadi::Connection
{
Q_OBJECT
public:
explicit FakeNotificationBus(QObject *parent = Q_NULLPTR)
: QObject(parent)
explicit FakeNotificationConnection(QObject *parent = Q_NULLPTR)
: Connection(Connection::NotificationConnection, "testConn", parent)
{}
virtual ~FakeNotificationBus()
virtual ~FakeNotificationConnection()
{}
void emitNotify(const Akonadi::Protocol::ChangeNotification &ntf)
{
Q_EMIT notify(ntf);
Q_EMIT commandReceived(3, ntf);
}
/*
Q_SIGNALS:
void notify(const Akonadi::Protocol::ChangeNotification &ntf);
*/
};
class FakeMonitorDependeciesFactory : public Akonadi::ChangeNotificationDependenciesFactory
......@@ -154,13 +155,8 @@ public:
{
}
Akonadi::NotificationSource *createNotificationSource(QObject *parent) Q_DECL_OVERRIDE {
return new Akonadi::NotificationSource(new FakeNotificationSource(parent));
}
QObject *createNotificationBus(QObject *parent, Akonadi::NotificationSource *source) Q_DECL_OVERRIDE {
Q_UNUSED(source);
return new FakeNotificationBus(parent);
Akonadi::Connection *createNotificationConnection(Akonadi::Session *parent) Q_DECL_OVERRIDE {
return new FakeNotificationConnection(parent);
}
Akonadi::CollectionCache *createCollectionCache(int maxCapacity, Akonadi::Session *session) Q_DECL_OVERRIDE {
......
......@@ -50,14 +50,9 @@ class AKONADITESTFAKE_EXPORT InspectableChangeRecorder : public Akonadi::ChangeR
public:
InspectableChangeRecorder(FakeMonitorDependeciesFactory *dependenciesFactory, QObject *parent = Q_NULLPTR);
FakeNotificationSource *notifier() const
FakeNotificationConnection *notificationConnection() const
{
return qobject_cast<FakeNotificationSource *>(d_ptr->notificationSource->source());
}
FakeNotificationBus *notificationBus() const
{
return qobject_cast<FakeNotificationBus *>(d_ptr->notificationBus);
return qobject_cast<FakeNotificationConnection *>(d_ptr->ntfConnection);
}
QQueue<Akonadi::Protocol::ChangeNotification> pendingNotifications() const
......
......@@ -50,14 +50,9 @@ class AKONADITESTFAKE_EXPORT InspectableMonitor : public Akonadi::Monitor
public:
InspectableMonitor(FakeMonitorDependeciesFactory *dependenciesFactory, QObject *parent = Q_NULLPTR);
FakeNotificationSource *notifier() const
FakeNotificationConnection *notificationConnection() const
{
return qobject_cast<FakeNotificationSource *>(d_ptr->notificationSource->source());
}
FakeNotificationBus *notificationBus() const
{
return qobject_cast<FakeNotificationBus *>(d_ptr->notificationBus);
return qobject_cast<FakeNotificationConnection *>(d_ptr->ntfConnection);
}
QQueue<Akonadi::Protocol::ChangeNotification> pendingNotifications() const
......
......@@ -93,6 +93,9 @@ void MonitorNotificationTest::testSingleMessage_impl(MonitorImpl *monitor, FakeC
{
Q_UNUSED(itemCache)
// Workaround for the QTimer::singleShot() in fake monitors to happen
QTest::qWait(10);
monitor->setSession(m_fakeSession);
monitor->fetchCollection(true);
......@@ -116,7 +119,7 @@ void MonitorNotificationTest::testSingleMessage_impl(MonitorImpl *monitor, FakeC
QVERIFY(monitor->pipeline().isEmpty());
QVERIFY(monitor->pendingNotifications().isEmpty());
monitor->notificationBus()->emitNotify(msg);
monitor->notificationConnection()->emitNotify(msg);
QCOMPARE(monitor->pipeline().size(), 1);
QVERIFY(monitor->pendingNotifications().isEmpty());
......@@ -185,7 +188,7 @@ void MonitorNotificationTest::testFillPipeline_impl(MonitorImpl *monitor, FakeCo
QVERIFY(monitor->pendingNotifications().isEmpty());
Q_FOREACH (const Protocol::ChangeNotification &ntf, list) {
monitor->notificationBus()->emitNotify(ntf);
monitor->notificationConnection()->emitNotify(ntf);
}
QCOMPARE(monitor->pipeline().size(), 5);
......@@ -272,7 +275,7 @@ void MonitorNotificationTest::testMonitor_impl(MonitorImpl *monitor, FakeCollect
QVERIFY(monitor->pendingNotifications().isEmpty());
Q_FOREACH (const Protocol::ChangeNotification &ntf, list) {
monitor->notificationBus()->emitNotify(ntf);
monitor->notificationConnection()->emitNotify(ntf);
}
// Collection 6 is not notified, because Collection 5 has held up the pipeline
......
......@@ -488,13 +488,11 @@ void ProtocolTest::testLoginCommand()
QVERIFY(!in.isResponse());
QVERIFY(in.isValid());
in.setSessionId("MySession-123-notifications");
in.setSessionMode(LoginCommand::NotificationBus);
const LoginCommand out = serializeAndDeserialize(in);
QVERIFY(out.isValid());
QVERIFY(!out.isResponse());
QCOMPARE(out.sessionId(), QByteArray("MySession-123-notifications"));
QCOMPARE(out.sessionMode(), LoginCommand::NotificationBus);
QCOMPARE(out, in);
const bool notEquals = (out != in);
QVERIFY(!notEquals);
......
......@@ -21,7 +21,7 @@
#include "entities.h"
#include "notificationmanager.h"
#include "notificationsource.h"
#include "notificationsubscriber.h"
#include <QtCore/QObject>
#include <QtTest/QTest>
......@@ -33,11 +33,77 @@ using namespace Akonadi::Server;
Q_DECLARE_METATYPE(QVector<QString>)
class TestableNotificationSubscriber : public NotificationSubscriber
{
public:
TestableNotificationSubscriber()
: NotificationSubscriber()
{
}
void setAllMonitored(bool allMonitored)
{
mAllMonitored = allMonitored;
}
void setMonitoredCollection(qint64 collection, bool monitored)
{
if (monitored) {
mMonitoredCollections.insert(collection);
} else {
mMonitoredCollections.remove(collection);
}
}
void setMonitoredItem(qint64 item, bool monitored)
{
if (monitored) {
mMonitoredItems.insert(item);
} else {
mMonitoredItems.remove(item);
}
}
void setMonitoredResource(const QByteArray &resource, bool monitored)
{
if (monitored) {
mMonitoredResources.insert(resource);
} else {
mMonitoredResources.remove(resource);
}
}
void setMonitoredMimeType(const QString &mimeType, bool monitored)
{
if (monitored) {
mMonitoredMimeTypes.insert(mimeType);
} else {
mMonitoredMimeTypes.remove(mimeType);
}
}
void setIgnoredSession(const QByteArray &session, bool ignored)
{
if (ignored) {
mIgnoredSessions.insert(session);
} else {
mIgnoredSessions.remove(session);
}
}
void writeNotification(const Protocol::ChangeNotification &notification) Q_DECL_OVERRIDE
{
emittedNotifications << notification;
}
Protocol::ChangeNotification::List emittedNotifications;
};
class NotificationManagerTest : public QObject
{
Q_OBJECT
typedef QList<NotificationSource *> NSList;
typedef QList<NotificationSubscriber *> NSList;
private Q_SLOTS:
void testSourceFilter_data()
......@@ -267,37 +333,33 @@ private Q_SLOTS:
QFETCH(Protocol::ChangeNotification, notification);
QFETCH(bool, accepted);
NotificationManager mgr;
NotificationSource source(QStringLiteral("testSource"), QString(), &mgr);
mgr.registerSource(&source);
TestableNotificationSubscriber subscriber;
source.setAllMonitored(allMonitored);
subscriber.setAllMonitored(allMonitored);
Q_FOREACH (Entity::Id id, monitoredCollections) {
source.setMonitoredCollection(id, true);
subscriber.setMonitoredCollection(id, true);
}
Q_FOREACH (Entity::Id id, monitoredItems) {
source.setMonitoredItem(id, true);
subscriber.setMonitoredItem(id, true);
}
Q_FOREACH (const QByteArray &res, monitoredResources) {
source.setMonitoredResource(res, true);
subscriber.setMonitoredResource(res, true);
}
Q_FOREACH (const QString &mimeType, monitoredMimeTypes) {
source.setMonitoredMimeType(mimeType, true);
subscriber.setMonitoredMimeType(mimeType, true);
}
Q_FOREACH (const QByteArray &session, ignoredSessions) {
source.setIgnoredSession(session, true);
subscriber.setIgnoredSession(session, true);
}
QSignalSpy spy(&source, SIGNAL(notify(Akonadi::Protocol::Command)));
Protocol::ChangeNotification::List list;
list << notification;
mgr.slotNotify(list);
mgr.emitPendingNotifications();
subscriber.notify(list);
QCOMPARE(spy.count(), accepted ? 1 : 0);
QCOMPARE(subscriber.emittedNotifications.count(), accepted ? 1 : 0);
if (accepted) {
Protocol::ChangeNotification ntf = spy.at(0).at(0).value<Protocol::Command>();
const Protocol::ChangeNotification ntf = subscriber.emittedNotifications.at(0);
QVERIFY(ntf.isValid());
}
}
......
......@@ -11,7 +11,7 @@ set(akonadicore_base_SRCS
changenotificationdependenciesfactory.cpp
changerecorder.cpp
changerecorder_p.cpp
connectionthread.cpp
connection.cpp
collection.cpp
collectioncolorattribute.cpp
collectionfetchscope.cpp
......@@ -46,7 +46,6 @@ set(akonadicore_base_SRCS
monitor.cpp
monitor_p.cpp
newmailnotifierattribute.cpp
notificationbus_p.cpp
notificationsource_p.cpp
partfetcher.cpp
pastehelper.cpp
......@@ -59,6 +58,7 @@ set(akonadicore_base_SRCS
searchquery.cpp
servermanager.cpp
session.cpp
sessionthread.cpp
specialcollectionattribute.cpp
specialcollections.cpp
tag.cpp
......
......@@ -18,70 +18,25 @@
*/
#include "changenotificationdependenciesfactory_p.h"
#include "KDBusConnectionPool"
#include "notificationsource_p.h"
#include "notificationbus_p.h"
#include "notificationsourceinterface.h"
#include "notificationmanagerinterface.h"
#include "sessionthread_p.h"
#include "connection_p.h"
#include "changemediator_p.h"
#include "servermanager.h"
#include "akonadicore_debug.h"
#include "session_p.h"
#include <KRandom>
#include <qdbusextratypes.h>
using namespace Akonadi;
NotificationSource *ChangeNotificationDependenciesFactory::createNotificationSource(QObject *parent)
Connection *ChangeNotificationDependenciesFactory::createNotificationConnection(Session *session)
{
if (!Akonadi::ServerManager::self()->isRunning()) {
return 0;
}
org::freedesktop::Akonadi::NotificationManager *manager =
new org::freedesktop::Akonadi::NotificationManager(
ServerManager::serviceName(Akonadi::ServerManager::Server),
QStringLiteral("/notifications"),
KDBusConnectionPool::threadConnection());
if (!manager) {
// :TODO: error handling
return 0;
}
const QString name =
QStringLiteral("%1_%2_%3").arg(
QCoreApplication::applicationName(),
QString::number(QCoreApplication::applicationPid()),
KRandom::randomString(6));
QDBusObjectPath p = manager->subscribe(name, false);
const bool validError = manager->lastError().isValid();
if (validError) {
qCWarning(AKONADICORE_LOG) << manager->lastError().name() << manager->lastError().message();
// :TODO: What to do?
delete manager;
return 0;
}
delete manager;
org::freedesktop::Akonadi::NotificationSource *notificationSource =
new org::freedesktop::Akonadi::NotificationSource(
ServerManager::serviceName(Akonadi::ServerManager::Server),
p.path(),
KDBusConnectionPool::threadConnection(), parent);
if (!notificationSource) {
// :TODO: error handling
return 0;
}
return new NotificationSource(notificationSource);
}
QObject *ChangeNotificationDependenciesFactory::createNotificationBus(QObject *parent, NotificationSource *source)
{
NotificationBusPrivate *priv = new NotificationBusPrivate;
Session *session = new Session(priv, source->identifier().toLatin1(), parent);
priv->setParent(session);
return priv;
return session->d->sessionThread()->createConnection(Connection::NotificationConnection, session->sessionId());
}
QObject *ChangeNotificationDependenciesFactory::createChangeMediator(QObject *parent)
......
......@@ -26,7 +26,7 @@
namespace Akonadi
{
class NotificationSource;
class Connection;
/**
* This class exists so that we can create a fake notification source in
......@@ -38,8 +38,7 @@ public:
virtual ~ChangeNotificationDependenciesFactory()
{
}
virtual NotificationSource *createNotificationSource(QObject *parent);
virtual QObject *createNotificationBus(QObject *parent, NotificationSource *source);
virtual Connection *createNotificationConnection(Session *parent);
virtual QObject *createChangeMediator(QObject *parent);
virtual Akonadi::CollectionCache *createCollectionCache(int maxCapacity, Session *session);
......
......@@ -17,7 +17,7 @@
* 02110-1301, USA.
*/
#include "connectionthread_p.h"
#include "connection_p.h"
#include "session_p.h"
#include "servermanager_p.h"
#include "akonadicore_debug.h"
......@@ -37,8 +37,9 @@
using namespace Akonadi;
ConnectionThread::ConnectionThread(const QByteArray &sessionId, QObject *parent)
Connection::Connection(ConnectionType connType, const QByteArray &sessionId, QObject *parent)
: QObject(parent)
, mConnectionType(connType)
, mSocket(Q_NULLPTR)
, mLogFile(Q_NULLPTR)
, mSessionId(sessionId)
......@@ -46,10 +47,6 @@ ConnectionThread::ConnectionThread(const QByteArray &sessionId, QObject *parent)
qRegisterMetaType<Protocol::Command>();
qRegisterMetaType<QAbstractSocket::SocketState>();
QThread *thread = new QThread();
moveToThread(thread);
thread->start();
const QByteArray sessionLogFile = qgetenv("AKONADI_SESSION_LOGFILE");
if (!sessionLogFile.isEmpty()) {
mLogFile = new QFile(QStringLiteral("%1.%2.%3").arg(QString::fromLatin1(sessionLogFile),
......@@ -63,44 +60,24 @@ ConnectionThread::ConnectionThread(const QByteArray &sessionId, QObject *parent)
}
}
ConnectionThread::~ConnectionThread()
Connection::~Connection()
{
if (QCoreApplication::instance()) {
QMetaObject::invokeMethod(this, "doThreadQuit");
} else {
// QCoreApplication already destroyed -> invokeMethod would just not get the message delivered
// We leak the socket, but at least we don't block for 10s
qWarning() << "Akonadi ConnectionThread deleted after QCoreApplication is destroyed. Clean up your sessions earlier!";
thread()->quit();
}
if (!thread()->wait(10 * 1000)) {
thread()->terminate();
// Make sure to wait until it's done, otherwise it can crash when the pthread callback is called
thread()->wait();
}
delete mLogFile;
delete thread();
}
void ConnectionThread::doThreadQuit()
{
Q_ASSERT(QThread::currentThread() == thread());
if (mSocket) {
mSocket->disconnect(this);
mSocket->close();
delete mSocket;
}
thread()->quit();
}
void ConnectionThread::reconnect()
void Connection::reconnect()
{
const bool ok = QMetaObject::invokeMethod(this, "doReconnect", Qt::QueuedConnection);
Q_ASSERT(ok);
Q_UNUSED(ok)
}
void ConnectionThread::doReconnect()
void Connection::doReconnect()
{
Q_ASSERT(QThread::currentThread() == thread());
......@@ -146,9 +123,15 @@ void ConnectionThread::doReconnect()
<< XdgBaseDirs::systemPathList("config");
}
const QSettings connectionSettings(connectionConfigFile, QSettings::IniFormat);
const QString defaultSocketDir = StandardDirs::saveDir("data");
serverAddress = connectionSettings.value(QStringLiteral("Data/UnixPath"), QString(defaultSocketDir + QStringLiteral("/akonadiserver.socket"))).toString();
if (mConnectionType == CommandConnection) {
const QString defaultSocketPath = defaultSocketDir % QStringLiteral("akonadiserver-cmd.socket");
serverAddress = connectionSettings.value(QStringLiteral("Data/UnixPath"), defaultSocketPath).toString();
} else if (mConnectionType == NotificationConnection) {
const QString defaultSocketPath = defaultSocketDir % QStringLiteral("akonadiserver-ntf.socket");
serverAddress = connectionSettings.value(QStringLiteral("Notifications/UnixPath"), defaultSocketPath).toString();
}
}
// create sockets if not yet done, note that this does not yet allow changing socket types on the fly
......@@ -157,11 +140,12 @@ void ConnectionThread::doReconnect()
mSocket = new QLocalSocket(this);
connect(mSocket, static_cast<void(QLocalSocket::*)(QLocalSocket::LocalSocketError)>(&QLocalSocket::error), this,
[this](QLocalSocket::LocalSocketError) {
qCWarning(AKONADICORE_LOG) << mSocket->errorString();
Q_EMIT socketError(mSocket->errorString());
Q_EMIT socketDisconnected();
});
connect(mSocket, &QLocalSocket::disconnected, this, &ConnectionThread::socketDisconnected);
connect(mSocket, &QLocalSocket::readyRead, this, &ConnectionThread::dataReceived);
connect(mSocket, &QLocalSocket::disconnected, this, &Connection::socketDisconnected);
connect(mSocket, &QLocalSocket::readyRead, this, &Connection::dataReceived);
}
// actually do connect
......@@ -171,7 +155,7 @@ void ConnectionThread::doReconnect()
Q_EMIT reconnected();
}
void ConnectionThread::forceReconnect()
void Connection::forceReconnect()
{
const bool ok = QMetaObject::invokeMethod(this, "doForceReconnect",
Qt::QueuedConnection);
......@@ -179,7 +163,7 @@ void ConnectionThread::forceReconnect()
Q_UNUSED(ok)
}
void ConnectionThread::doForceReconnect()
void Connection::doForceReconnect()
{
Q_ASSERT(QThread::currentThread() == thread());
......@@ -191,14 +175,14 @@ void ConnectionThread::doForceReconnect()
mSocket = Q_NULLPTR;
}
void ConnectionThread::disconnect()
void Connection::closeConnection()
{
const bool ok = QMetaObject::invokeMethod(this, "doDisconnect", Qt::QueuedConnection);
const bool ok = QMetaObject::invokeMethod(this, "doCloseConnection", Qt::QueuedConnection);
Q_ASSERT(ok);
Q_UNUSED(ok)
}
void ConnectionThread::doDisconnect()
void Connection::doCloseConnection()
{
Q_ASSERT(QThread::currentThread() == thread());
......@@ -208,13 +192,12 @@ void ConnectionThread::doDisconnect()
}
}
void ConnectionThread::dataReceived()
void Connection::dataReceived()
{
Q_ASSERT(QThread::currentThread() == thread());
QElapsedTimer timer;
timer.start();
while (mSocket->bytesAvailable() > 0) {
QDataStream stream(mSocket);
qint64 tag;
......@@ -263,13 +246,13 @@ void ConnectionThread::dataReceived()
// to the jobs through event loop. That will be overall slower but should
// result in much more responsive applications.
if (timer.elapsed() > 50) {
QThread::currentThread()->eventDispatcher()->processEvents(QEventLoop::ExcludeSocketNotifiers);
thread()->eventDispatcher()->processEvents(QEventLoop::ExcludeSocketNotifiers);
timer.restart();
}
}
}
void ConnectionThread::sendCommand(qint64 tag, const Protocol::Command &cmd)
void Connection::sendCommand(qint64 tag, const Protocol::Command &cmd)
{
const bool ok = QMetaObject::invokeMethod(this, "doSendCommand",
Qt::QueuedConnection,
......@@ -279,7 +262,7 @@ void ConnectionThread::sendCommand(qint64 tag, const Protocol::Command &cmd)
Q_UNUSED(ok)
}
void ConnectionThread::doSendCommand(qint64 tag, const Protocol::Command &cmd)
void Connection::doSendCommand(qint64 tag, const Protocol::Command &cmd)
{
Q_ASSERT(QThread::currentThread() == thread());
......
......@@ -27,23 +27,33 @@
#include <private/protocol_p.h>
#include "akonadicore_export.h"
class QAbstractSocket;
class QFile;
namespace Akonadi
{
class ConnectionThread : public QObject
class SessionThread;
class AKONADICORE_EXPORT Connection : public QObject
<