Commit f43ab1cf authored by Daniel Vrátil's avatar Daniel Vrátil 🤖
Browse files

NtfManager: process outgoing notifications in parallel

Use a QThreadPool to run NotificationSubcriber::notify() for multiple
instances in parallel. This might provide a small speed up with large
amount of subscribers, but nothing big. It will be more visible once
we start sending notification payloads.
parent ce107127
......@@ -30,6 +30,8 @@
#include <QLocalSocket>
#include <QSettings>
#include <QCoreApplication>
#include <QThreadPool>
#include <QPointer>
using namespace Akonadi;
using namespace Akonadi::Server;
......@@ -55,6 +57,9 @@ void NotificationManager::init()
mTimer->setSingleShot(true);
connect(mTimer, &QTimer::timeout,
this, &NotificationManager::emitPendingNotifications);
mNotifyThreadPool = new QThreadPool(this);
mNotifyThreadPool->setMaxThreadCount(5);
}
void NotificationManager::quit()
......@@ -99,6 +104,32 @@ void NotificationManager::slotNotify(const Protocol::ChangeNotification::List &m
}
}
class NotifyRunnable : public QRunnable
{
public:
explicit NotifyRunnable(NotificationSubscriber *subscriber,
const Protocol::ChangeNotification::List &notifications)
: mSubscriber(subscriber)
, mNotifications(notifications)
{
}
~NotifyRunnable()
{
}
void run() Q_DECL_OVERRIDE
{
if (mSubscriber) {
mSubscriber->notify(mNotifications);
}
}
private:
QPointer<NotificationSubscriber> mSubscriber;
Protocol::ChangeNotification::List mNotifications;
};
void NotificationManager::emitPendingNotifications()
{
if (mNotifications.isEmpty()) {
......@@ -106,7 +137,7 @@ void NotificationManager::emitPendingNotifications()
}
Q_FOREACH (NotificationSubscriber *subscriber, mSubscribers) {
subscriber->notify(mNotifications);
mNotifyThreadPool->start(new NotifyRunnable(subscriber, mNotifications));
}
mNotifications.clear();
......
......@@ -28,6 +28,7 @@
class NotificationManagerTest;
class QLocalSocket;
class QThreadPool;
namespace Akonadi {
namespace Server {
......@@ -61,6 +62,7 @@ private:
Protocol::ChangeNotification::List mNotifications;
QTimer *mTimer;
QThreadPool *mNotifyThreadPool;
QVector<NotificationSubscriber *> mSubscribers;
friend class NotificationSubscriber;
......
......@@ -122,6 +122,8 @@ void NotificationSubscriber::socketDisconnected()
void NotificationSubscriber::disconnectSubscriber()
{
QMutexLocker locker(&mLock);
if (mManager) {
Protocol::SubscriptionChangeNotification changeNtf;
changeNtf.setSubscriber(mSubscriber);
......@@ -140,6 +142,8 @@ void NotificationSubscriber::disconnectSubscriber()
void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscriptionCommand &command)
{
QMutexLocker locker(&mLock);
qDebug() << "Subscriber" << this << "identified as" << command.subscriberName();
mSubscriber = command.subscriberName();
......@@ -154,6 +158,8 @@ void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscripti
void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscriptionCommand &command)
{
QMutexLocker locker(&mLock);
const auto modifiedParts = command.modifiedParts();
#define START_MONITORING(type) \
......@@ -245,6 +251,8 @@ void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscripti
Protocol::ChangeNotification NotificationSubscriber::toChangeNotification() const
{
// Assumes mLock being locked by caller
Protocol::SubscriptionChangeNotification ntf;
ntf.setSessionId(mSession);
ntf.setSubscriber(mSubscriber);
......@@ -265,6 +273,8 @@ Protocol::ChangeNotification NotificationSubscriber::toChangeNotification() cons
bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const
{
// Assumes mLock being locked by caller
if (id < 0) {
return false;
} else if (mMonitoredCollections.contains(id)) {
......@@ -277,6 +287,8 @@ bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const
bool NotificationSubscriber::isMimeTypeMonitored(const QString &mimeType) const
{
// Assumes mLock being locked by caller
const QMimeType mt = sMimeDatabase.mimeTypeForName(mimeType);
if (mMonitoredMimeTypes.contains(mimeType)) {
return true;
......@@ -293,6 +305,8 @@ bool NotificationSubscriber::isMimeTypeMonitored(const QString &mimeType) const
bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::ItemChangeNotification &msg) const
{
// Assumes mLock being locked by caller
if (msg.operation() != Protocol::ItemChangeNotification::Move) {
return false;
}
......@@ -301,6 +315,8 @@ bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::
bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const
{
// Assumes mLock being locked by caller
if (msg.operation() != Protocol::CollectionChangeNotification::Move) {
return false;
}
......@@ -310,13 +326,15 @@ bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::
bool NotificationSubscriber::acceptsItemNotification(const Protocol::ItemChangeNotification &notification) const
{
// Assumes mLock being locked by caller
if (notification.items().count() == 0) {
return false;
}
//We always want notifications that affect the parent resource (like an item added to a referenced collection)
const bool notificationForParentResource = (mSession == notification.resource());
if (CollectionReferenceManager::instance()->isReferenced(notification.parentCollection())) {
//We always want notifications that affect the parent resource (like an item added to a referenced collection)
const bool notificationForParentResource = (mSession == notification.resource());
return (mExclusive
|| isCollectionMonitored(notification.parentCollection())
|| isMoveDestinationResourceMonitored(notification)
......@@ -364,6 +382,8 @@ bool NotificationSubscriber::acceptsItemNotification(const Protocol::ItemChangeN
bool NotificationSubscriber::acceptsCollectionNotification(const Protocol::CollectionChangeNotification &notification) const
{
// Assumes mLock being locked by caller
if (notification.id() < 0) {
return false;
}
......@@ -435,6 +455,8 @@ bool NotificationSubscriber::acceptsCollectionNotification(const Protocol::Colle
bool NotificationSubscriber::acceptsTagNotification(const Protocol::TagChangeNotification &notification) const
{
// Assumes mLock being locked by caller
if (notification.id() < 0) {
return false;
}
......@@ -491,6 +513,8 @@ bool NotificationSubscriber::acceptsTagNotification(const Protocol::TagChangeNot
bool NotificationSubscriber::acceptsRelationNotification(const Protocol::RelationChangeNotification &notification) const
{
// Assumes mLock being locked by caller
Q_UNUSED(notification);
if (mAllMonitored) {
......@@ -506,6 +530,8 @@ bool NotificationSubscriber::acceptsRelationNotification(const Protocol::Relatio
bool NotificationSubscriber::acceptsSubscriptionNotification(const Protocol::SubscriptionChangeNotification &notification) const
{
// Assumes mLock being locked by caller
Q_UNUSED(notification);
// Unlike other types, subscription notifications must be explicitly enabled
......@@ -515,6 +541,8 @@ bool NotificationSubscriber::acceptsSubscriptionNotification(const Protocol::Sub
bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotification &notification) const
{
// Assumes mLock being locked
// Uninitialized subscriber gets nothing
if (mSubscriber.isEmpty()) {
return false;
......@@ -544,9 +572,12 @@ bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotificat
void NotificationSubscriber::notify(const Protocol::ChangeNotification::List &notifications)
{
QMutexLocker locker(&mLock);
Q_FOREACH (const auto &notification, notifications) {
if (acceptsNotification(notification)) {
writeNotification(notification);
QMetaObject::invokeMethod(this, "writeNotification", Qt::QueuedConnection,
Q_ARG(Akonadi::Protocol::ChangeNotification, notification));
}
}
}
......
......@@ -23,6 +23,7 @@
#include <QObject>
#include <QByteArray>
#include <QMimeDatabase>
#include <QMutex>
#include <private/protocol_p.h>
#include "entities.h"
......@@ -67,12 +68,16 @@ private:
bool isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const;
Protocol::ChangeNotification toChangeNotification() const;
protected Q_SLOTS:
virtual void writeNotification(const Akonadi::Protocol::ChangeNotification &notification);
protected:
explicit NotificationSubscriber(NotificationManager *manager = Q_NULLPTR);
virtual void writeNotification(const Protocol::ChangeNotification &notification);
void writeCommand(qint64 tag, const Protocol::Command &cmd);
mutable QMutex mLock;
NotificationManager *mManager;
QLocalSocket *mSocket;
QByteArray mSubscriber;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment