Commit 0e085b86 authored by David Faure's avatar David Faure
Browse files

Implement buffering in the DataStream class, to improve performance on Windows.

Summary:
The internal QWindowsPipeWriter in Qt doesn't do any buffering, so the first 8
bytes (the tag) were sent first, and the rest only later.

This solution uses an explicit flush() method, while doing the flushing
in the destructor would have been much more convenient and less
error-prone, but the flushing can throw an exception, so we need
to do it inside the try/catch -- and certainly not when the stream
is deleted because another exception happened. This would all be
so much simpler without the use of exceptions :-)

Test Plan: all tests pass (on Linux)

Reviewers: dvratil, kfunk

Reviewed By: dvratil

Subscribers: kde-pim

Tags: #kde_pim

Differential Revision: https://phabricator.kde.org/D29265
parent dd761e0d
......@@ -66,8 +66,10 @@ private:
{
QBuffer buf;
buf.open(QIODevice::ReadWrite);
Akonadi::Protocol::DataStream stream(&buf);
Akonadi::Protocol::serialize(&buf, in);
Akonadi::Protocol::serialize(stream, in);
stream.flush();
buf.seek(0);
return Akonadi::Protocol::deserialize(&buf).staticCast<T>();
......@@ -82,6 +84,7 @@ private:
Akonadi::Protocol::DataStream stream(&buf);
stream << in;
stream.flush();
buf.seek(0);
T out;
stream >> out;
......
......@@ -66,14 +66,15 @@ TestScenario TestScenario::create(qint64 tag, TestScenario::Action action,
QBuffer buffer(&sc.data);
buffer.open(QIODevice::ReadWrite);
{
QDataStream stream(&buffer);
Protocol::DataStream stream(&buffer);
stream << tag;
Protocol::serialize(&buffer, response);
Protocol::serialize(stream, response);
stream.flush();
}
{
buffer.seek(0);
QDataStream os(&buffer);
Protocol::DataStream os(&buffer);
qint64 cmpTag;
os >> cmpTag;
Q_ASSERT(cmpTag == tag);
......
......@@ -24,6 +24,7 @@
#include <private/protocol_p.h>
#include <private/datastream_p_p.h>
#include <QBuffer>
#include <QTest>
#include <QMutexLocker>
#include <QLocalSocket>
......@@ -97,7 +98,9 @@ void FakeClient::readServerPart()
Protocol::deserialize(mStream.device());
}
} else {
QDataStream expectedStream(scenario.data);
QBuffer buffer(&scenario.data);
buffer.open(QIODevice::ReadOnly);
Protocol::DataStream expectedStream(&buffer);
qint64 expectedTag, actualTag;
expectedStream >> expectedTag;
......
......@@ -22,7 +22,7 @@
#include <QThread>
#include <QMutex>
#include <QDataStream>
#include "datastream_p_p.h"
#include "fakeakonadiserver.h"
......@@ -57,7 +57,7 @@ private:
TestScenario::List mScenarios;
QLocalSocket *mSocket = nullptr;
QDataStream mStream;
Protocol::DataStream mStream;
};
}
}
......
......@@ -316,7 +316,8 @@ void Connection::doSendCommand(qint64 tag, const Protocol::CommandPtr &cmd)
Protocol::DataStream stream(mSocket.data());
try {
stream << tag;
Protocol::serialize(mSocket.data(), cmd);
Protocol::serialize(stream, cmd);
stream.flush();
} catch (const Akonadi::ProtocolException &e) {
qCWarning(AKONADICORE_LOG) << "Protocol Exception:" << QString::fromUtf8(e.what());
mSocket->close();
......
......@@ -42,6 +42,20 @@ DataStream::DataStream(QIODevice *device)
DataStream::~DataStream()
{
// No flush() here. Throwing an exception in a destructor would go badly. The caller MUST call flush after writing.
}
void DataStream::flush()
{
if (!mWriteBuffer.isEmpty()) {
const int len = mWriteBuffer.size();
int ret = mDev->write(mWriteBuffer);
if (ret != len) {
// TODO: Try to write data again unless ret is -1?
throw ProtocolException("Failed to write all data");
}
mWriteBuffer.clear();
}
}
void DataStream::waitForData(QIODevice *device, int timeoutMs)
......@@ -114,11 +128,7 @@ void DataStream::writeRawData(const char *data, int len)
{
checkDevice();
int ret = mDev->write(data, len);
if (ret != len) {
// TODO: Try to write data again unless ret is -1?
throw ProtocolException("Failed to write all data");
}
mWriteBuffer += QByteArray::fromRawData(data, len);
}
void DataStream::writeBytes(const char *bytes, int len)
......
......@@ -50,6 +50,8 @@ public:
int waitTimeout() const;
void setWaitTimeout(int timeout);
void flush();
template<typename T>
inline typename std::enable_if<std::is_integral<T>::value, DataStream>::type
&operator<<(T val);
......@@ -88,6 +90,7 @@ private:
}
QIODevice *mDev;
QByteArray mWriteBuffer;
int mWaitTimeout;
};
......@@ -96,9 +99,7 @@ inline typename std::enable_if<std::is_integral<T>::value, DataStream>::type
&DataStream::operator<<(T val)
{
checkDevice();
if (mDev->write((char *)&val, sizeof(T)) != sizeof(T)) {
throw Akonadi::ProtocolException("Failed to write data to stream");
}
writeRawData((char *)&val, sizeof(T));
return *this;
}
......
......@@ -153,7 +153,7 @@ QDebug operator<<(QDebug _dbg, Command::Type type)
template<typename T>
DataStream &operator<<(DataStream &stream, const QSharedPointer<T> &ptr)
{
Protocol::serialize(stream.device(), ptr);
Protocol::serialize(stream, ptr);
return stream;
}
......
......@@ -269,7 +269,7 @@ private:
friend AKONADIPRIVATE_EXPORT CommandPtr deserialize(QIODevice *device);
};
AKONADIPRIVATE_EXPORT void serialize(QIODevice *device, const CommandPtr &command);
AKONADIPRIVATE_EXPORT void serialize(DataStream &stream, const CommandPtr &command);
AKONADIPRIVATE_EXPORT CommandPtr deserialize(QIODevice *device);
AKONADIPRIVATE_EXPORT QString debugString(const Command &command);
AKONADIPRIVATE_EXPORT inline QString debugString(const CommandPtr &command)
......
......@@ -126,9 +126,8 @@ bool CppGenerator::generateDocument(DocumentNode const *node)
void CppGenerator::writeImplSerializer(DocumentNode const *node)
{
mImpl << "void serialize(QIODevice *device, const CommandPtr &cmd)\n"
mImpl << "void serialize(DataStream &stream, const CommandPtr &cmd)\n"
"{\n"
" DataStream stream(device);\n"
" switch (static_cast<int>(cmd->type() | (cmd->isResponse() ? Command::_ResponseBit : 0))) {\n"
" case Command::Invalid:\n"
" stream << cmdCast<Command>(cmd);\n"
......
......@@ -481,7 +481,8 @@ void Connection::sendResponse(qint64 tag, const Protocol::CommandPtr &response)
}
Protocol::DataStream stream(m_socket.get());
stream << tag;
Protocol::serialize(m_socket.get(), response);
Protocol::serialize(stream, response);
stream.flush();
if (!m_socket->waitForBytesWritten()) {
if (m_socket->state() == QLocalSocket::ConnectedState) {
throw ProtocolException("Server write timeout");
......
......@@ -161,6 +161,7 @@ Connection::sendResponse(qint64 tag, T &&response)
Protocol::DataStream stream(m_socket.get());
stream << tag;
stream << std::move(response);
stream.flush();
if (!m_socket->waitForBytesWritten()) {
if (m_socket->state() == QLocalSocket::ConnectedState) {
throw ProtocolException("Server write timeout");
......
......@@ -675,7 +675,8 @@ void NotificationSubscriber::writeCommand(qint64 tag, const Protocol::CommandPtr
Protocol::DataStream stream(mSocket);
stream << tag;
try {
Protocol::serialize(mSocket, cmd);
Protocol::serialize(stream, cmd);
stream.flush();
if (!mSocket->waitForBytesWritten()) {
if (mSocket->state() == QLocalSocket::ConnectedState) {
qCWarning(AKONADISERVER_LOG) << "NotificationSubscriber for" << mSubscriber << ": timeout writing into stream";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment