/***************************************************************************
 *   SPDX-FileCopyrightText: 2006 Till Adam <adam@kde.org>                 *
 *   SPDX-FileCopyrightText: 2013 Volker Krause <vkrause@kde.org>          *
 *                                                                         *
 *   SPDX-License-Identifier: LGPL-2.0-or-later                            *
 ***************************************************************************/
7
#include "connection.h"
#include "akonadiserver_debug.h"

#include <QDebug>
#include <QEventLoop>
#include <QSettings>
#include <QThreadStorage>

#include "storage/datastore.h"
#include "storage/dbdeadlockcatcher.h"
#include "handler.h"
#include "notificationmanager.h"

#include "tracer.h"

#include <cassert>

#ifndef Q_OS_WIN
#include <cxxabi.h>
#endif

#include <private/protocol_p.h>
#include <private/datastream_p_p.h>
#include <private/standarddirs_p.h>
32 33

using namespace Akonadi;
34
using namespace Akonadi::Server;
Till Adam's avatar
Till Adam committed
35

36 37
// Interval, in milliseconds, with which the idle timer is armed after a batch
// of commands has been processed (3 minutes).
static constexpr int IDLE_TIMER_TIMEOUT = 3 * 60 * 1000;

38
/// Returns the address of @p c formatted with "%p"; used as the initial
/// identifier of a connection.
static QString connectionIdentifier(Connection *c)
{
    return QString::asprintf("%p", static_cast<void *>(c));
}

43 44 45
/// Constructs a connection without a socket descriptor.
/// The thread base is named after the address-based connection identifier
/// and the server reference is stored for later use.
Connection::Connection(AkonadiServer &akonadi)
    : AkThread(connectionIdentifier(this), QThread::InheritPriority)
    , m_akonadi(akonadi)
{
}

49 50 51
/// Constructs a connection for the given socket descriptor.
/// The descriptor is only remembered here; the socket itself is created in init().
/// Also reads the "Cache/VerifyOnRetrieval" flag from the server configuration
/// file, keeping the current member value as the default.
Connection::Connection(quintptr socketDescriptor, AkonadiServer &akonadi)
    : AkThread(connectionIdentifier(this), QThread::InheritPriority)
    , m_akonadi(akonadi)
{
    m_socketDescriptor = socketDescriptor;
    // Same value that was passed to AkThread as the object name.
    m_identifier = connectionIdentifier(this);

    const QSettings serverConfig(Akonadi::StandardDirs::serverConfigFile(), QSettings::IniFormat);
    const auto verifyOnRetrieval = serverConfig.value(QStringLiteral("Cache/VerifyOnRetrieval"), m_verifyCacheOnRetrieval);
    m_verifyCacheOnRetrieval = verifyOnRetrieval.toBool();
}

void Connection::init()
{
    AkThread::init();
Till Adam's avatar
Till Adam committed
63

Daniel Vrátil's avatar
Daniel Vrátil committed
64
    auto socket = std::make_unique<QLocalSocket>();
65
    if (!socket->setSocketDescriptor(m_socketDescriptor)) {
Laurent Montel's avatar
Laurent Montel committed
66
        qCWarning(AKONADISERVER_LOG) << "Connection(" << m_identifier
Laurent Montel's avatar
Laurent Montel committed
67
                                     << ")::run: failed to set socket descriptor: "
Laurent Montel's avatar
Laurent Montel committed
68 69
                                     << socket->error()
                                     << "(" << socket->errorString() << ")";
Till Adam's avatar
Till Adam committed
70 71 72
        return;
    }

Daniel Vrátil's avatar
Daniel Vrátil committed
73 74
    m_socket = std::move(socket);
    connect(m_socket.get(), &QLocalSocket::disconnected, this, &Connection::slotSocketDisconnected);
75

Daniel Vrátil's avatar
Daniel Vrátil committed
76 77
    m_idleTimer = std::make_unique<QTimer>();
    connect(m_idleTimer.get(), &QTimer::timeout, this, &Connection::slotConnectionIdle);
Till Adam's avatar
Till Adam committed
78

79
    storageBackend()->notificationCollector()->setConnection(this);
80

Daniel Vrátil's avatar
Daniel Vrátil committed
81
    if (m_socket->state() == QLocalSocket::ConnectedState) {
82 83
        QTimer::singleShot(0, this, &Connection::handleIncomingData);
    } else {
Daniel Vrátil's avatar
Daniel Vrátil committed
84
        connect(m_socket.get(), &QLocalSocket::connected, this, &Connection::handleIncomingData,
85 86 87 88 89 90
                Qt::QueuedConnection);
    }

    try {
        slotSendHello();
    } catch (const ProtocolException &e) {
91
        qCWarning(AKONADISERVER_LOG) << "Protocol Exception sending \"hello\" on connection" << m_identifier << ":" << e.what();
92 93
        m_socket->disconnectFromServer();
    }
94 95 96 97
}

void Connection::quit()
{
98 99 100 101 102 103
    if (QThread::currentThread()->loopLevel() > 1) {
        m_connectionClosing = true;
        Q_EMIT connectionClosing();
        return;
    }

104
    m_akonadi.tracer().endConnection(m_identifier, QString());
105

Daniel Vrátil's avatar
Daniel Vrátil committed
106 107
    m_socket.reset();
    m_idleTimer.reset();
108

109
    AkThread::quit();
110
}
111

112 113
void Connection::slotSendHello()
{
David Faure's avatar
David Faure committed
114
    SchemaVersion version = SchemaVersion::retrieveAll().at(0);
115

116 117 118 119 120 121
    Protocol::HelloResponse hello;
    hello.setServerName(QStringLiteral("Akonadi"));
    hello.setMessage(QStringLiteral("Not Really IMAP server"));
    hello.setProtocolVersion(Protocol::version());
    hello.setGeneration(version.generation());
    sendResponse(0, std::move(hello));
122 123
}

124 125
/// Returns the data store used by this connection, fetching it from
/// DataStore::self() and caching it on first use.
DataStore *Connection::storageBackend()
{
    if (m_backend) {
        return m_backend;
    }

    m_backend = DataStore::self();
    return m_backend;
}
131

132 133
/// Stops the thread and, when time reporting is enabled, prints the
/// accumulated time report.
Connection::~Connection()
{
    quitThread();

    if (!m_reportTime) {
        return;
    }
    reportTime();
}

void Connection::slotConnectionIdle()
{
Laurent Montel's avatar
Laurent Montel committed
143
    Q_ASSERT(m_currentHandler == nullptr);
Laurent Montel's avatar
Laurent Montel committed
144
    if (m_backend && m_backend->isOpened()) {
145 146 147 148 149
        if (m_backend->inTransaction()) {
            // This is a programming error, the timer should not have fired.
            // But it is safer to abort and leave the connection open, until
            // a later operation causes the idle timer to fire (than crash
            // the akonadi server).
150
            qCInfo(AKONADISERVER_LOG) << m_sessionId << "NOT Closing idle db connection; we are in transaction";
151 152 153 154
            return;
        }
        m_backend->close();
    }
Till Adam's avatar
Till Adam committed
155 156
}

157 158 159 160 161 162 163 164 165 166 167
/// Reacts to the socket being disconnected.
/// While a handler is still active nothing is emitted here; in that case
/// handleIncomingData() emits disconnected() once the handler has finished.
void Connection::slotSocketDisconnected()
{
    if (!m_currentHandler) {
        Q_EMIT disconnected();
    }
}

168 169 170 171 172 173 174 175 176 177 178 179 180
/// Lets the current handler process its command.
/// If the handler reports failure, a failure response is sent (flagging the
/// connection as closing when even that throws) and a warning is logged.
/// @param cmd the command being handled; only used for the log message.
void Connection::parseStream(const Protocol::CommandPtr &cmd)
{
    if (m_currentHandler->parseStream()) {
        return;
    }

    try {
        m_currentHandler->failureResponse("Error while handling a command");
    } catch (...) {
        m_connectionClosing = true;
    }
    qCWarning(AKONADISERVER_LOG) << "Error while handling command" << cmd->type()
                                 << "on connection" << m_identifier;
}

181
void Connection::handleIncomingData()
Till Adam's avatar
Till Adam committed
182
{
183
    Q_FOREVER {
184

185
        if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
186 187
            break;
        }
188

189 190 191 192
        // Blocks with event loop until some data arrive, allows us to still use QTimers
        // and similar while waiting for some data to arrive
        if (m_socket->bytesAvailable() < int(sizeof(qint64))) {
            QEventLoop loop;
Daniel Vrátil's avatar
Daniel Vrátil committed
193 194
            connect(m_socket.get(), &QLocalSocket::readyRead, &loop, &QEventLoop::quit);
            connect(m_socket.get(), &QLocalSocket::stateChanged, &loop, &QEventLoop::quit);
195 196 197
            connect(this, &Connection::connectionClosing, &loop, &QEventLoop::quit);
            loop.exec();
        }
198

199
        if (m_connectionClosing || !m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
200 201
            break;
        }
202

203
        m_idleTimer->stop();
204

205 206 207 208
        // will only open() a previously idle backend.
        // Otherwise, a new backend could lazily be constructed by later calls.
        if (!storageBackend()->isOpened()) {
            m_backend->open();
209 210
        }

211 212
        QString currentCommand;
        while (m_socket->bytesAvailable() >= int(sizeof(qint64))) {
Daniel Vrátil's avatar
Daniel Vrátil committed
213
            Protocol::DataStream stream(m_socket.get());
214 215 216 217 218 219
            qint64 tag = -1;
            stream >> tag;
            // TODO: Check tag is incremental sequence

            Protocol::CommandPtr cmd;
            try {
Daniel Vrátil's avatar
Daniel Vrátil committed
220
                cmd = Protocol::deserialize(m_socket.get());
221
            } catch (const Akonadi::ProtocolException &e) {
222 223
                qCWarning(AKONADISERVER_LOG) << "ProtocolException while deserializing incoming data on connection"
                                             << m_identifier << ":" <<  e.what();
Daniel Vrátil's avatar
Daniel Vrátil committed
224
                setState(Server::LoggingOut);
225 226
                return;
            } catch (const std::exception &e) {
227 228
                qCWarning(AKONADISERVER_LOG) << "Unknown exception while deserializing incoming data on connection"
                                             << m_identifier << ":" << e.what();
Daniel Vrátil's avatar
Daniel Vrátil committed
229
                setState(Server::LoggingOut);
230 231 232
                return;
            }
            if (cmd->type() == Protocol::Command::Invalid) {
233 234
                qCWarning(AKONADISERVER_LOG) << "Received an invalid command on connection" << m_identifier
                                             << ": resetting connection";
Daniel Vrátil's avatar
Daniel Vrátil committed
235
                setState(Server::LoggingOut);
236 237
                return;
            }
238

239
            // Tag context and collection context is not persistent.
240
            m_context.setTag(std::nullopt);
241
            m_context.setCollection({});
242 243
            if (m_akonadi.tracer().currentTracer() != QLatin1String("null")) {
                m_akonadi.tracer().connectionInput(m_identifier, tag, cmd);
244
            }
245

Daniel Vrátil's avatar
Daniel Vrátil committed
246
            m_currentHandler = findHandlerForCommand(cmd->type());
247
            if (!m_currentHandler) {
248 249
                qCWarning(AKONADISERVER_LOG) << "Invalid command: no such handler for" << cmd->type()
                                             << "on connection" << m_identifier;
Daniel Vrátil's avatar
Daniel Vrátil committed
250
                setState(Server::LoggingOut);
251
                return;
252
            }
253 254
            if (m_reportTime) {
                startTime();
255
            }
256 257 258 259 260

            m_currentHandler->setConnection(this);
            m_currentHandler->setTag(tag);
            m_currentHandler->setCommand(cmd);
            try {
261
                DbDeadlockCatcher catcher([this, &cmd]() { parseStream(cmd); });
262 263 264 265 266 267 268
            } catch (const Akonadi::Server::HandlerException &e) {
                if (m_currentHandler) {
                    try {
                        m_currentHandler->failureResponse(e.what());
                    } catch (...) {
                        m_connectionClosing = true;
                    }
269 270
                    qCWarning(AKONADISERVER_LOG) << "Handler exception when handling command" << cmd->type()
                                                 << "on connection" << m_identifier << ":" << e.what();
271 272 273 274 275 276 277 278
                }
            } catch (const Akonadi::Server::Exception &e) {
                if (m_currentHandler) {
                    try {
                        m_currentHandler->failureResponse(QString::fromUtf8(e.type()) + QLatin1String(": ") + QString::fromUtf8(e.what()));
                    } catch (...) {
                        m_connectionClosing = true;
                    }
279 280
                    qCWarning(AKONADISERVER_LOG) << "General exception when handling command" << cmd->type()
                                                 << "on connection" << m_identifier << ":" << e.what();
281 282 283 284
                }
            } catch (const Akonadi::ProtocolException &e) {
                // No point trying to send anything back to client, the connection is
                // already messed up
285 286
                qCWarning(AKONADISERVER_LOG) << "Protocol exception when handling command" << cmd->type()
                                             << "on connection" << m_identifier << ":" << e.what();
287
                m_connectionClosing = true;
288
#if defined(Q_OS_LINUX) && !defined(_LIBCPP_VERSION)
289 290 291 292 293 294 295 296 297
            } catch (abi::__forced_unwind&) {
                // HACK: NPTL throws __forced_unwind during thread cancellation and
                // we *must* rethrow it otherwise the program aborts. Due to the issue
                // described in #376385 we might end up destroying (cancelling) the
                // thread from a nested loop executed inside parseStream() above,
                // so the exception raised in there gets caught by this try..catch
                // statement and it must be rethrown at all cost. Remove this hack
                // once the root problem is fixed.
                throw;
298
#endif
299
            } catch (...) {
300 301
                qCCritical(AKONADISERVER_LOG) << "Unknown exception while handling command" << cmd->type()
                                              << "on connection" << m_identifier;
302 303 304 305 306 307
                if (m_currentHandler) {
                    try {
                        m_currentHandler->failureResponse("Unknown exception caught");
                    } catch (...) {
                        m_connectionClosing = true;
                    }
308
                }
309
            }
310 311 312
            if (m_reportTime) {
                stopTime(currentCommand);
            }
Daniel Vrátil's avatar
Daniel Vrátil committed
313
            m_currentHandler.reset();
314

315
            if (!m_socket || m_socket->state() != QLocalSocket::ConnectedState) {
316 317 318 319 320 321 322
                Q_EMIT disconnected();
                return;
            }

            if (m_connectionClosing) {
                break;
            }
323
        }
324

325 326 327
        // reset, arm the timer
        m_idleTimer->start(IDLE_TIMER_TIMEOUT);

328
        if (m_connectionClosing) {
329
            break;
330
        }
Guy Maurel's avatar
Guy Maurel committed
331
    }
332

333 334 335 336 337
    if (m_connectionClosing) {
        m_socket->disconnect(this);
        m_socket->close();
        QTimer::singleShot(0, this, &Connection::quit);
    }
Till Adam's avatar
Till Adam committed
338 339
}

340
/// Read-only access to the command context stored on this connection.
const CommandContext &Connection::context() const
{
    return m_context;
}

/// Replaces the command context stored on this connection with a copy of @p context.
void Connection::setContext(const CommandContext &context)
{
    m_context = context;
}

Daniel Vrátil's avatar
Daniel Vrátil committed
350
/// Looks up the handler for @p command.
/// Handlers that are always allowed take precedence; otherwise the lookup
/// depends on the current connection state. No handler is returned while
/// logging out.
/// @return the handler, or a null pointer when none matches.
std::unique_ptr<Handler> Connection::findHandlerForCommand(Protocol::Command::Type command)
{
    auto handler = Handler::findHandlerForCommandAlwaysAllowed(command, m_akonadi);
    if (handler) {
        return handler;
    }

    switch (m_connectionState) {
    case NonAuthenticated:
        return Handler::findHandlerForCommandNonAuthenticated(command, m_akonadi);
    case Authenticated:
        return Handler::findHandlerForCommandAuthenticated(command, m_akonadi);
    case LoggingOut:
        break;
    }

    // Still the empty result of the "always allowed" lookup.
    return handler;
}

371 372 373 374 375
/// Returns the tag of the handler that is currently active.
/// The handler pointer is dereferenced unconditionally, so this must only be
/// called while a handler is set.
qint64 Connection::currentTag() const
{
    return m_currentHandler->tag();
}

Daniel Vrátil's avatar
Daniel Vrátil committed
376
/// Switches the connection to @p state; a no-op if the state is unchanged.
/// Entering LoggingOut disconnects the socket from the server. Going back to
/// NonAuthenticated is not expected: it is only the initial state.
void Connection::setState(ConnectionState state)
{
    if (m_connectionState == state) {
        return;
    }

    m_connectionState = state;

    if (m_connectionState == LoggingOut) {
        m_socket->disconnectFromServer();
    } else if (m_connectionState == NonAuthenticated) {
        assert(0);   // can't happen, it's only the initial state, we can't go back to it
    }
    // Authenticated: nothing to do.
}

394
void Connection::setSessionId(const QByteArray &id)
Volker Krause's avatar
Volker Krause committed
395
{
Laurent Montel's avatar
Laurent Montel committed
396
    m_identifier = QString::asprintf("%s (%p)", id.data(), static_cast<void *>(this));
397
    m_akonadi.tracer().beginConnection(m_identifier, QString());
398
    //m_streamParser->setTracerIdentifier(m_identifier);
399 400 401

    m_sessionId = id;
    setObjectName(QString::fromLatin1(id));
402 403
    // this races with the use of objectName() in QThreadPrivate::start
    //thread()->setObjectName(objectName() + QStringLiteral("-Thread"));
404
    storageBackend()->setSessionId(id);
Volker Krause's avatar
Volker Krause committed
405 406
}

407
/// Returns the session id previously stored via setSessionId().
QByteArray Connection::sessionId() const
{
    return m_sessionId;
}

412
bool Connection::isOwnerResource(const PimItem &item) const
413
{
414
    if (context().resource().isValid() && item.collection().resourceId() == context().resource().id()) {
415 416 417 418 419 420 421
        return true;
    }
    // fallback for older resources
    if (sessionId() == item.collection().resource().name().toUtf8()) {
        return true;
    }
    return false;
422
}
423

424
bool Connection::isOwnerResource(const Collection &collection) const
425
{
426
    if (context().resource().isValid() && collection.resourceId() == context().resource().id()) {
427 428 429 430 431 432
        return true;
    }
    if (sessionId() == collection.resource().name().toUtf8()) {
        return true;
    }
    return false;
433 434
}

435
bool Connection::verifyCacheOnRetrieval() const
436
{
437
    return m_verifyCacheOnRetrieval;
438
}
439 440 441 442 443 444 445 446 447 448 449 450

/// Starts the timer used to measure how long a command takes.
void Connection::startTime()
{
    m_time.start();
}

void Connection::stopTime(const QString &identifier)
{
    int elapsed = m_time.elapsed();
    m_totalTime += elapsed;
    m_totalTimeByHandler[identifier] += elapsed;
    m_executionsByHandler[identifier]++;
Laurent Montel's avatar
Laurent Montel committed
451
    qCDebug(AKONADISERVER_LOG) << identifier << " time : " << elapsed << " total: " << m_totalTime;
452 453 454 455
}

void Connection::reportTime() const
{
Laurent Montel's avatar
Laurent Montel committed
456 457
    qCDebug(AKONADISERVER_LOG) << "===== Time report for " << m_identifier << " =====";
    qCDebug(AKONADISERVER_LOG) << " total: " << m_totalTime;
458 459
    for (auto it = m_totalTimeByHandler.cbegin(), end = m_totalTimeByHandler.cend(); it != end; ++it) {
        const QString &handler = it.key();
Laurent Montel's avatar
Laurent Montel committed
460
        qCDebug(AKONADISERVER_LOG) << "handler : " << handler << " time: " << m_totalTimeByHandler.value(handler) << " executions " << m_executionsByHandler.value(handler) << " avg: " << m_totalTimeByHandler.value(handler) / m_executionsByHandler.value(handler);
461
    }
462 463
}

464
/// Writes @p tag followed by the serialized @p response to the socket and
/// waits for the bytes to be written. The response is reported to the tracer
/// first unless the "null" tracer is active.
/// @throws ProtocolException if the write did not complete although the
///         socket is still connected.
void Connection::sendResponse(qint64 tag, const Protocol::CommandPtr &response)
{
    const bool tracingEnabled = m_akonadi.tracer().currentTracer() != QLatin1String("null");
    if (tracingEnabled) {
        m_akonadi.tracer().connectionOutput(m_identifier, tag, response);
    }

    Protocol::DataStream out(m_socket.get());
    out << tag;
    Protocol::serialize(out, response);
    out.flush();

    if (m_socket->waitForBytesWritten()) {
        return;
    }

    // A failed write on a socket that is no longer connected just means the
    // client went away before the response was sent, which is not an error.
    if (m_socket->state() == QLocalSocket::ConnectedState) {
        throw ProtocolException("Server write timeout");
    }
}

483

484
/// Reads the next command from the socket.
/// Waits (30 seconds per attempt, in case the client is busy) until at least
/// a tag is available, consumes the tag and returns the deserialized command.
Protocol::CommandPtr Connection::readCommand()
{
    QLocalSocket *const sock = m_socket.get();

    while (sock->bytesAvailable() < static_cast<int>(sizeof(qint64))) {
        Protocol::DataStream::waitForData(sock, 30000);
    }

    // The tag is read only to advance the stream; its value is not used.
    Protocol::DataStream in(sock);
    qint64 tag;
    in >> tag;

    // TODO: compare tag with m_currentHandler->tag() ?
    return Protocol::deserialize(sock);
}