Commit 34fdee06 authored by Daniel Vrátil's avatar Daniel Vrátil 🤖
Browse files

Make the dependencies between service threads explicit

parent 48b2190d
......@@ -46,7 +46,7 @@ private Q_SLOTS:
void shouldInitializeSyncIntervals()
{
// WHEN
FakeIntervalCheck sched(mAkonadi);
FakeIntervalCheck sched(mAkonadi.itemRetrievalManager());
sched.waitForInit();
const TimePoint now(std::chrono::steady_clock::now());
// THEN
......@@ -66,7 +66,7 @@ private Q_SLOTS:
void shouldObeyMinimumInterval()
{
// GIVEN
FakeIntervalCheck sched(mAkonadi);
FakeIntervalCheck sched(mAkonadi.itemRetrievalManager());
// WHEN
sched.setMinimumInterval(10);
sched.waitForInit();
......@@ -82,7 +82,7 @@ private Q_SLOTS:
void shouldRemoveAndAddCollectionFromSchedule()
{
// GIVEN
FakeIntervalCheck sched(mAkonadi);
FakeIntervalCheck sched(mAkonadi.itemRetrievalManager());
sched.waitForInit();
const auto timeForRoot = sched.nextScheduledTime(1);
const auto timeForColB = sched.nextScheduledTime(3);
......@@ -112,7 +112,7 @@ private Q_SLOTS:
void shouldHonourIntervalChange()
{
// GIVEN
FakeIntervalCheck sched(mAkonadi);
FakeIntervalCheck sched(mAkonadi.itemRetrievalManager());
sched.waitForInit();
const auto timeForColB = sched.nextScheduledTime(3);
Collection colA = Collection::retrieveByName(QStringLiteral("Collection A"));
......
......@@ -26,6 +26,11 @@
#include "inspectablenotificationcollector.h"
#include "fakeintervalcheck.h"
#include "storage/collectionstatistics.h"
#include "cachecleaner.h"
#include "storagejanitor.h"
#include "resourcemanager.h"
#include "debuginterface.h"
#include "search/searchtaskmanager.h"
#include <QSettings>
#include <QCoreApplication>
......@@ -222,16 +227,20 @@ void FakeAkonadiServer::initFake()
}
mTracer = std::make_unique<Tracer>();
mPreprocessorManager = std::make_unique<PreprocessorManager>(*this);
mPreprocessorManager->setEnabled(false);
mSearchManager = std::make_unique<FakeSearchManager>(*this);
mCollectionStats = std::make_unique<CollectionStatistics>();
mCacheCleaner = std::make_unique<CacheCleaner>();
if (!mDisableItemRetrievalManager) {
mItemRetrieval = std::make_unique<FakeItemRetrievalManager>();
}
mAgentSearchManager = std::make_unique<SearchTaskManager>();
mIntervalCheck = std::make_unique<FakeIntervalCheck>(*this);
mDebugInterface = std::make_unique<DebugInterface>(*mTracer.get());
mResourceManager = std::make_unique<ResourceManager>(*mTracer.get());
mPreprocessorManager = std::make_unique<PreprocessorManager>(*mTracer.get());
mPreprocessorManager->setEnabled(false);
mIntervalCheck = std::make_unique<FakeIntervalCheck>(*mItemRetrieval.get());
mSearchManager = std::make_unique<FakeSearchManager>(*mAgentSearchManager.get());
mStorageJanitor = std::make_unique<StorageJanitor>(*this);
qDebug() << "==== Fake Akonadi Server started ====";
}
......@@ -255,21 +264,23 @@ bool FakeAkonadiServer::quit()
mConnection.reset();
mClient.reset();
mPreprocessorManager.reset();
mCollectionStats.reset();
mStorageJanitor.reset();
mSearchManager.reset();
mItemRetrieval.reset();
mIntervalCheck.reset();
mPreprocessorManager.reset();
mResourceManager.reset();
mDebugInterface.reset();
mAgentSearchManager.reset();
mItemRetrieval.reset();
mCacheCleaner.reset();
mCollectionStats.reset();
mTracer.reset();
if (mDataStore) {
mDataStore->close();
}
mIntervalCheck.reset();
// Tracer should go last
mTracer.reset();
qDebug() << "==== Fake Akonadi Server shut down ====";
return true;
}
......
......@@ -18,12 +18,11 @@
*/
#include "fakeintervalcheck.h"
#include "fakeakonadiserver.h"
using namespace Akonadi::Server;
FakeIntervalCheck::FakeIntervalCheck(FakeAkonadiServer &akonadi)
: IntervalCheck(akonadi)
FakeIntervalCheck::FakeIntervalCheck(ItemRetrievalManager &retrievalManager)
: IntervalCheck(retrievalManager)
{
}
......
......@@ -27,13 +27,13 @@
namespace Akonadi {
namespace Server {
class FakeAkonadiServer;
class ItemRetrievalManager;
class FakeIntervalCheck : public IntervalCheck
{
Q_OBJECT
public:
FakeIntervalCheck(FakeAkonadiServer &akonadi);
FakeIntervalCheck(ItemRetrievalManager &retrievalManager);
void waitForInit();
protected:
......
......@@ -18,14 +18,13 @@
*/
#include "fakesearchmanager.h"
#include "fakeakonadiserver.h"
#include "entities.h"
using namespace Akonadi::Server;
FakeSearchManager::FakeSearchManager(FakeAkonadiServer &akonadi)
: SearchManager(QStringList(), akonadi)
FakeSearchManager::FakeSearchManager(SearchTaskManager &agentSearchManager)
: SearchManager(QStringList(), agentSearchManager)
{
}
......
......@@ -25,7 +25,7 @@
namespace Akonadi {
namespace Server {
class FakeAkonadiServer;
class SearchTaskManager;
/**
* Subclass of SearchManager that does nothing.
......@@ -35,7 +35,7 @@ class FakeSearchManager : public SearchManager
Q_OBJECT
public:
explicit FakeSearchManager(FakeAkonadiServer &akonadi);
explicit FakeSearchManager(SearchTaskManager &searchTaskManager);
~FakeSearchManager() override;
void registerInstance(const QString &id) override;
......
......@@ -108,135 +108,50 @@ bool AkonadiServer::init()
// Restrict permission to 600, as the file might contain database password in plaintext
QFile::setPermissions(serverConfigFile, QFile::ReadOwner | QFile::WriteOwner);
if (!DbConfig::configuredDatabase()) {
quit();
return false;
}
if (DbConfig::configuredDatabase()->useInternalServer()) {
if (!startDatabaseProcess()) {
quit();
return false;
}
} else {
if (!createDatabase()) {
quit();
return false;
}
}
DbConfig::configuredDatabase()->setup();
const QString connectionSettingsFile = StandardDirs::connectionConfigFile(StandardDirs::WriteOnly);
QSettings connectionSettings(connectionSettingsFile, QSettings::IniFormat);
mCmdServer = std::make_unique<AkLocalServer>(this);
connect(mCmdServer.get(), QOverload<quintptr>::of(&AkLocalServer::newConnection), this, &AkonadiServer::newCmdConnection);
mNotificationManager = std::make_unique<NotificationManager>();
mNtfServer = std::make_unique<AkLocalServer>(this);
// Note: this is a queued connection, as NotificationManager lives in its
// own thread
connect(mNtfServer.get(), QOverload<quintptr>::of(&AkLocalServer::newConnection),
mNotificationManager.get(), &NotificationManager::registerConnection);
// TODO: share socket setup with client
#ifdef Q_OS_WIN
// use the installation prefix as uid
QString suffix;
if (Instance::hasIdentifier()) {
suffix = QStringLiteral("%1-").arg(Instance::identifier());
}
suffix += QString::fromUtf8(QUrl::toPercentEncoding(qApp->applicationDirPath()));
const QString defaultCmdPipe = QStringLiteral("Akonadi-Cmd-") % suffix;
const QString cmdPipe = settings.value(QStringLiteral("Connection/NamedPipe"), defaultCmdPipe).toString();
if (!mCmdServer->listen(cmdPipe)) {
qCCritical(AKONADISERVER_LOG) << "Unable to listen on Named Pipe" << cmdPipe << ":" << mCmdServer->errorString();
quit();
return false;
const QByteArray dbusAddress = qgetenv("DBUS_SESSION_BUS_ADDRESS");
if (!dbusAddress.isEmpty()) {
connectionSettings.setValue(QStringLiteral("DBUS/Address"), QLatin1String(dbusAddress));
}
const QString defaultNtfPipe = QStringLiteral("Akonadi-Ntf-") % suffix;
const QString ntfPipe = settings.value(QStringLiteral("Connection/NtfNamedPipe"), defaultNtfPipe).toString();
if (!mNtfServer->listen(ntfPipe)) {
qCCritical(AKONADISERVER_LOG) << "Unable to listen on Named Pipe" << ntfPipe << ":" << mNtfServer->errorString();
quit();
return false;
}
connectionSettings.setValue(QStringLiteral("Data/Method"), QStringLiteral("NamedPipe"));
connectionSettings.setValue(QStringLiteral("Data/NamedPipe"), cmdPipe);
connectionSettings.setValue(QStringLiteral("Notifications/Method"), QStringLiteral("NamedPipe"));
connectionSettings.setValue(QStringLiteral("Notifications/NamedPipe"), ntfPipe);
#else
const QString cmdSocketName = QStringLiteral("akonadiserver-cmd.socket");
const QString ntfSocketName = QStringLiteral("akonadiserver-ntf.socket");
const QString socketDir = Utils::preferredSocketDirectory(StandardDirs::saveDir("data"),
qMax(cmdSocketName.length(), ntfSocketName.length()));
const QString cmdSocketFile = socketDir % QLatin1Char('/') % cmdSocketName;
QFile::remove(cmdSocketFile);
if (!mCmdServer->listen(cmdSocketFile)) {
qCCritical(AKONADISERVER_LOG) << "Unable to listen on Unix socket" << cmdSocketFile << ":" << mCmdServer->errorString();
// Setup database
if (!setupDatabase()) {
quit();
return false;
}
const QString ntfSocketFile = socketDir % QLatin1Char('/') % ntfSocketName;
QFile::remove(ntfSocketFile);
if (!mNtfServer->listen(ntfSocketFile)) {
qCCritical(AKONADISERVER_LOG) << "Unable to listen on Unix socket" << ntfSocketFile << ":" << mNtfServer->errorString();
// Create local servers and start listening
if (!createServers(connectionSettings)) {
quit();
return false;
}
connectionSettings.setValue(QStringLiteral("Data/Method"), QStringLiteral("UnixPath"));
connectionSettings.setValue(QStringLiteral("Data/UnixPath"), cmdSocketFile);
connectionSettings.setValue(QStringLiteral("Notifications/Method"), QStringLiteral("UnixPath"));
connectionSettings.setValue(QStringLiteral("Notifications/UnixPath"), ntfSocketFile);
#endif
// initialize the database
DataStore *db = DataStore::self();
if (!db->database().isOpen()) {
qCCritical(AKONADISERVER_LOG) << "Unable to open database" << db->database().lastError().text();
quit();
return false;
}
if (!db->init()) {
qCCritical(AKONADISERVER_LOG) << "Unable to initialize database.";
quit();
return false;
}
const auto searchManagers = settings.value(QStringLiteral("Search/Manager"),
QStringList{QStringLiteral("Agent")}).toStringList();
mTracer = std::make_unique<Tracer>();
mDebugInterface = std::make_unique<DebugInterface>(*this);
mResourceManager = std::make_unique<ResourceManager>(*this);
mCollectionStats = std::make_unique<CollectionStatistics>();
mPreprocessorManager = std::make_unique<PreprocessorManager>(*this);
// Forcibly disable it if configuration says so
if (settings.value(QStringLiteral("General/DisablePreprocessing"), false).toBool()) {
mPreprocessorManager->setEnabled(false);
}
mCacheCleaner = std::make_unique<CacheCleaner>();
mIntervalCheck = std::make_unique<IntervalCheck>(*this);
mStorageJanitor = std::make_unique<StorageJanitor>(*this);
mItemRetrieval = std::make_unique<ItemRetrievalManager>();
mAgentSearchManager = std::make_unique<SearchTaskManager>();
const QStringList searchManagers = settings.value(QStringLiteral("Search/Manager"),
QStringList() << QStringLiteral("Agent")).toStringList();
mSearchManager = std::make_unique<SearchManager>(searchManagers, *this);
mDebugInterface = std::make_unique<DebugInterface>(*mTracer.get());
mResourceManager = std::make_unique<ResourceManager>(*mTracer.get());
mPreprocessorManager = std::make_unique<PreprocessorManager>(*mTracer.get());
mIntervalCheck = std::make_unique<IntervalCheck>(*mItemRetrieval.get());
mSearchManager = std::make_unique<SearchManager>(searchManagers, *mAgentSearchManager.get());
mStorageJanitor = std::make_unique<StorageJanitor>(*this);
if (settings.value(QStringLiteral("General/DisablePreprocessing"), false).toBool()) {
mPreprocessorManager->setEnabled(false);
}
new ServerAdaptor(this);
QDBusConnection::sessionBus().registerObject(QStringLiteral("/Server"), this);
const QByteArray dbusAddress = qgetenv("DBUS_SESSION_BUS_ADDRESS");
if (!dbusAddress.isEmpty()) {
connectionSettings.setValue(QStringLiteral("DBUS/Address"), QLatin1String(dbusAddress));
}
mControlWatcher = std::make_unique<QDBusServiceWatcher>(
DBus::serviceName(DBus::Control), QDBusConnection::sessionBus(),
QDBusServiceWatcher::WatchForUnregistration);
......@@ -251,7 +166,7 @@ bool AkonadiServer::init()
// server quit. We don't attempt to resume preprocessing
// for the items as we don't actually know at which stage the
// operation was interrupted...
db->unhideAllPimItems();
DataStore::self()->unhideAllPimItems();
// We are ready, now register org.freedesktop.Akonadi service to DBus and
// the fun can begin
......@@ -277,17 +192,18 @@ bool AkonadiServer::quit()
mConnections.clear();
qCDebug(AKONADISERVER_LOG) << "terminating service threads";
mDebugInterface.reset();
mResourceManager.reset();
mCacheCleaner.reset();
mIntervalCheck.reset();
// Keep this order in sync (reversed) with the order of initialization
mStorageJanitor.reset();
mItemRetrieval.reset();
mAgentSearchManager.reset();
mSearchManager.reset();
mNotificationManager.reset();
mCollectionStats.reset();
mIntervalCheck.reset();
mPreprocessorManager.reset();
mResourceManager.reset();
mDebugInterface.reset();
mAgentSearchManager.reset();
mItemRetrieval.reset();
mCacheCleaner.reset();
mCollectionStats.reset();
mTracer.reset();
if (DbConfig::isConfigured()) {
......@@ -335,6 +251,38 @@ void AkonadiServer::connectionDisconnected()
mConnections.erase(it);
}
bool AkonadiServer::setupDatabase()
{
if (!DbConfig::configuredDatabase()) {
return false;
}
if (DbConfig::configuredDatabase()->useInternalServer()) {
if (!startDatabaseProcess()) {
return false;
}
} else {
if (!createDatabase()) {
return false;
}
}
DbConfig::configuredDatabase()->setup();
// initialize the database
DataStore *db = DataStore::self();
if (!db->database().isOpen()) {
qCCritical(AKONADISERVER_LOG) << "Unable to open database" << db->database().lastError().text();
return false;
}
if (!db->init()) {
qCCritical(AKONADISERVER_LOG) << "Unable to initialize database.";
return false;
}
return true;
}
bool AkonadiServer::startDatabaseProcess()
{
if (!DbConfig::configuredDatabase()->useInternalServer()) {
......@@ -400,6 +348,72 @@ void AkonadiServer::stopDatabaseProcess()
DbConfig::configuredDatabase()->stopInternalServer();
}
bool AkonadiServer::createServers(QSettings &connectionSettings)
{
mCmdServer = std::make_unique<AkLocalServer>(this);
connect(mCmdServer.get(), QOverload<quintptr>::of(&AkLocalServer::newConnection), this, &AkonadiServer::newCmdConnection);
mNotificationManager = std::make_unique<NotificationManager>();
mNtfServer = std::make_unique<AkLocalServer>(this);
// Note: this is a queued connection, as NotificationManager lives in its
// own thread
connect(mNtfServer.get(), QOverload<quintptr>::of(&AkLocalServer::newConnection),
mNotificationManager.get(), &NotificationManager::registerConnection);
// TODO: share socket setup with client
#ifdef Q_OS_WIN
// use the installation prefix as uid
QString suffix;
if (Instance::hasIdentifier()) {
suffix = QStringLiteral("%1-").arg(Instance::identifier());
}
suffix += QString::fromUtf8(QUrl::toPercentEncoding(qApp->applicationDirPath()));
const QString defaultCmdPipe = QStringLiteral("Akonadi-Cmd-") % suffix;
const QString cmdPipe = settings.value(QStringLiteral("Connection/NamedPipe"), defaultCmdPipe).toString();
if (!mCmdServer->listen(cmdPipe)) {
qCCritical(AKONADISERVER_LOG) << "Unable to listen on Named Pipe" << cmdPipe << ":" << mCmdServer->errorString();
return false;
}
const QString defaultNtfPipe = QStringLiteral("Akonadi-Ntf-") % suffix;
const QString ntfPipe = settings.value(QStringLiteral("Connection/NtfNamedPipe"), defaultNtfPipe).toString();
if (!mNtfServer->listen(ntfPipe)) {
qCCritical(AKONADISERVER_LOG) << "Unable to listen on Named Pipe" << ntfPipe << ":" << mNtfServer->errorString();
return false;
}
connectionSettings.setValue(QStringLiteral("Data/Method"), QStringLiteral("NamedPipe"));
connectionSettings.setValue(QStringLiteral("Data/NamedPipe"), cmdPipe);
connectionSettings.setValue(QStringLiteral("Notifications/Method"), QStringLiteral("NamedPipe"));
connectionSettings.setValue(QStringLiteral("Notifications/NamedPipe"), ntfPipe);
#else
const QString cmdSocketName = QStringLiteral("akonadiserver-cmd.socket");
const QString ntfSocketName = QStringLiteral("akonadiserver-ntf.socket");
const QString socketDir = Utils::preferredSocketDirectory(StandardDirs::saveDir("data"),
qMax(cmdSocketName.length(), ntfSocketName.length()));
const QString cmdSocketFile = socketDir % QLatin1Char('/') % cmdSocketName;
QFile::remove(cmdSocketFile);
if (!mCmdServer->listen(cmdSocketFile)) {
qCCritical(AKONADISERVER_LOG) << "Unable to listen on Unix socket" << cmdSocketFile << ":" << mCmdServer->errorString();
return false;
}
const QString ntfSocketFile = socketDir % QLatin1Char('/') % ntfSocketName;
QFile::remove(ntfSocketFile);
if (!mNtfServer->listen(ntfSocketFile)) {
qCCritical(AKONADISERVER_LOG) << "Unable to listen on Unix socket" << ntfSocketFile << ":" << mNtfServer->errorString();
return false;
}
connectionSettings.setValue(QStringLiteral("Data/Method"), QStringLiteral("UnixPath"));
connectionSettings.setValue(QStringLiteral("Data/UnixPath"), cmdSocketFile);
connectionSettings.setValue(QStringLiteral("Notifications/Method"), QStringLiteral("UnixPath"));
connectionSettings.setValue(QStringLiteral("Notifications/UnixPath"), ntfSocketFile);
#endif
return true;
}
CacheCleaner *AkonadiServer::cacheCleaner()
{
return mCacheCleaner.get();
......
......@@ -29,6 +29,7 @@
class QProcess;
class QDBusServiceWatcher;
class QSettings;
namespace Akonadi
{
......@@ -111,6 +112,8 @@ private:
bool startDatabaseProcess();
bool createDatabase();
void stopDatabaseProcess();
bool createServers(QSettings &connectionSettings);
bool setupDatabase();
uint userId() const;
protected:
......
......@@ -20,15 +20,14 @@
#include "debuginterface.h"
#include "debuginterfaceadaptor.h"
#include "tracer.h"
#include "akonadi.h"
#include <QDBusConnection>
using namespace Akonadi::Server;
DebugInterface::DebugInterface(AkonadiServer &akonadi)
DebugInterface::DebugInterface(Tracer &tracer)
: QObject()
, m_akonadi(akonadi)
, m_tracer(tracer)
{
new DebugInterfaceAdaptor(this);
QDBusConnection::sessionBus().registerObject(QStringLiteral("/debug"),
......@@ -37,10 +36,10 @@ DebugInterface::DebugInterface(AkonadiServer &akonadi)
QString DebugInterface::tracer() const
{
return m_akonadi.tracer().currentTracer();
return m_tracer.currentTracer();
}
void DebugInterface::setTracer(const QString &tracer)
{
m_akonadi.tracer().activateTracer(tracer);
m_tracer.activateTracer(tracer);
}
......@@ -27,7 +27,7 @@ namespace Akonadi
namespace Server
{
class AkonadiServer;
class Tracer;
/**
* Interface to configure and query debugging options.
......@@ -38,14 +38,14 @@ class DebugInterface : public QObject
Q_CLASSINFO("D-Bus Interface", "org.freedesktop.Akonadi.DebugInterface")
public:
explicit DebugInterface(AkonadiServer &akonadi);
explicit DebugInterface(Tracer &tracer);
public Q_SLOTS:
Q_SCRIPTABLE QString tracer() const;
Q_SCRIPTABLE void setTracer(const QString &tracer);
private:
AkonadiServer &m_akonadi;
Tracer &m_tracer;
};
} // namespace Server
......
......@@ -71,7 +71,7 @@ bool SearchHandler::parseStream()
mItemFetchScope = cmd.itemFetchScope();
mTagFetchScope = cmd.tagFetchScope();
SearchRequest request(connection()->sessionId(), akonadi());
SearchRequest request(connection()->sessionId(), akonadi().searchManager(), akonadi().agentSearchManager());
request.setCollections(collections);
request.setMimeTypes(cmd.mimeTypes());
request.setQuery(cmd.query());
......
......@@ -19,7 +19,6 @@
*/
#include "intervalcheck.h"
#include "akonadi.h"
#include "storage/datastore.h"
#include "storage/itemretrievalmanager.h"
#include "storage/entity.h"
......@@ -29,9 +28,9 @@ using namespace Akonadi::Server;
static const int MINIMUM_AUTOSYNC_INTERVAL = 5; // minutes
static const int MINIMUM_COLTREESYNC_INTERVAL = 5; // minutes
IntervalCheck::IntervalCheck(AkonadiServer &akonadi)
IntervalCheck::IntervalCheck(ItemRetrievalManager &itemRetrievalManager)
: CollectionScheduler(QStringLiteral("IntervalCheck"), QThread::IdlePriority)
, mAkonadi(akonadi)
, mItemRetrievalManager(itemRetrievalManager)
{
}
......@@ -75,7 +74,7 @@ void IntervalCheck::collectionExpired(const Collection &collection)
const QDateTime lastExpectedCheck = now.addSecs(interval * -60);
if (!mLastCollectionTreeSyncs.contains(resourceName) || mLastCollectionTreeSyncs.value(resourceName) < lastExpectedCheck) {
mLastCollectionTreeSyncs.insert(resourceName, now);
mAkonadi.itemRetrievalManager().triggerCollectionTreeSync(resourceName);
mItemRetrievalManager.triggerCollectionTreeSync(resourceName);
}
}
......@@ -87,5 +86,5 @@ void IntervalCheck::collectionExpired(const Collection &collection)
return;
}
mLastChecks.insert(collection.id(), now);
mAkonadi.itemRetrievalManager().triggerCollectionSync(collection.resource().name(), collection.id());
mItemRetrievalManager.triggerCollectionSync(collection.resource().name(), collection.id());
}