Verified Commit 2bf3a5c9 authored by Daniel Vrátil's avatar Daniel Vrátil 🤖
Browse files

Implement CompressionStream

It's a QIODevice that can be placed on top of another
QIODevice. When opened in read-only mode, it will read
compressed data from the underlying device and output
decompressed data. When opened in write-only mode, it
will take the data written into it and write it
compressed into the underlying device.
parent b57db19f
......@@ -104,6 +104,14 @@ if (${AccountsQt5_FOUND} AND ${KAccounts_FOUND})
set(WITH_ACCOUNTS TRUE)
endif()
set(LibLZMA_MINIMUM_VERSION "5.0.0")
find_package(LibLZMA ${LibLZMA_MINIMUM_VERSION})
set_package_properties(LibLZMA PROPERTIES
DESCRIPTION "LZMA compression library"
URL "https://tukaani.org/xz/"
TYPE REQUIRED
)
if(BUILD_TESTING)
set(AKONADI_TESTS_EXPORT AKONADICORE_EXPORT)
......
......@@ -33,3 +33,4 @@ if (NOT MSVC)
endif()
add_unit_test(imapparsertest.cpp)
add_unit_test(imapsettest.cpp)
add_unit_test(compressionstreamtest.cpp)
/*
SPDX-FileCopyrightText: 2020 Daniel Vrátil <dvratil@kde.org>
SPDX-License-Identifier: LGPL-2.0-or-later
*/
#include "compressionstream_p.h"
#include <QObject>
#include <QTest>
#include <QBuffer>
#include <QRandomGenerator>
using namespace Akonadi;
/**
 * Unit tests for CompressionStream.
 *
 * The compression tests round-trip data through a stream opened for writing
 * and then through a second stream opened for reading, and compare the result
 * with the input. The detection tests feed byte sequences to
 * CompressionStream::isCompressed().
 */
class CompressionStreamTest : public QObject
{
    Q_OBJECT

private Q_SLOTS:
    /// Data rows for testCompression(): a null, an empty and a short payload.
    void testCompression_data()
    {
        QTest::addColumn<QByteArray>("testData");

        QTest::newRow("Null") << QByteArray{};
        QTest::newRow("Empty") << QByteArray("");
        QTest::newRow("Hello world") << QByteArray("Hello world");
    }

    /// Compresses the payload in one write() and decompresses it with readAll().
    void testCompression()
    {
        QFETCH(QByteArray, testData);

        QByteArray compressedData;
        QBuffer compressedBuffer(&compressedData);
        // Fail early with a clear message if the backing buffer cannot be opened.
        QVERIFY(compressedBuffer.open(QIODevice::WriteOnly));
        {
            CompressionStream stream(&compressedBuffer);
            QVERIFY(stream.open(QIODevice::WriteOnly));
            QCOMPARE(stream.write(testData), testData.size());
            stream.close();
            QVERIFY(!stream.error());
        }
        compressedBuffer.close();

        QVERIFY(compressedBuffer.open(QIODevice::ReadOnly));
        QByteArray decompressedData;
        {
            CompressionStream stream(&compressedBuffer);
            QVERIFY(stream.open(QIODevice::ReadOnly));
            decompressedData = stream.readAll();
            stream.close();
            QVERIFY(!stream.error());
        }

        QCOMPARE(decompressedData.size(), testData.size());
        QCOMPARE(decompressedData, testData);
    }

    /// Round-trips ~10 KiB of random words, writing one byte and reading three bytes at a time.
    void testUnbufferedCompressionOfLargeText()
    {
        std::array<std::string, 101> loremIpsum = {
            "Lorem", "ipsum", "dolor", "sit", "amet,", "consectetur", "adipiscing", "elit.", "Integer",
            "dictum", "massa", "orci,", "eget", "tempor", "neque", "euismod", "a.", "Suspendisse", "mi",
            "arcu,", "facilisis", "eu", "risus", "at,", "varius", "vehicula", "mi.", "Proin", "tristique",
            "eros", "nisl,", "vel", "porttitor", "erat", "elementum", "at.", "Quisque", "et", "ex", "id",
            "risus", "hendrerit", "rhoncus", "eu", "vel", "enim.", "Vivamus", "at", "lorem", "laoreet", "ex",
            "mattis", "feugiat", "vitae", "sit", "amet", "sem.", "Vestibulum", "in", "ante", "sagittis,",
            "venenatis", "nibh", "et,", "consectetur", "est.", "Donec", "cursus", "enim", "ac", "pellentesque",
            "euismod.", "Nullam", "interdum", "metus", "sed", "blandit", "dapibus.", "Ut", "nec", "euismod",
            "magna.", "Aenean", "gravida", "elit", "metus,", "eget", "vehicula", "nibh", "euismod", "ut.",
            "Vestibulum", "risus", "lectus,", "molestie", "elementum", "lobortis", "at,", "finibus", "a", "quam."
        };

        QByteArray testData;
        QRandomGenerator *generator = QRandomGenerator::system();
        // bounded() excludes its argument, so pass the array size to make every
        // word (including the last one) eligible.
        const int wordCount = static_cast<int>(loremIpsum.size());
        while (testData.size() < 10 * 1024) {
            testData += QByteArray(" ") + loremIpsum[generator->bounded(wordCount)].c_str();
        }

        QByteArray compressedData;
        QBuffer compressedBuffer(&compressedData);
        QVERIFY(compressedBuffer.open(QIODevice::WriteOnly));
        {
            CompressionStream stream(&compressedBuffer);
            QVERIFY(stream.open(QIODevice::WriteOnly | QIODevice::Unbuffered));
            qint64 written = 0;
            for (int i = 0; i < testData.size(); ++i) {
                written += stream.write(testData.constData() + i, 1);
            }
            QCOMPARE(written, testData.size());
            stream.close();
            QVERIFY(!stream.error());
        }
        compressedBuffer.close();

        QVERIFY(compressedBuffer.open(QIODevice::ReadOnly));
        QByteArray decompressedData;
        {
            CompressionStream stream(&compressedBuffer);
            QVERIFY(stream.open(QIODevice::ReadOnly | QIODevice::Unbuffered));
            while (!stream.atEnd()) {
                char buf[3] = {};
                const auto read = stream.read(buf, sizeof(buf));
                // A negative result signals a read error; bail out instead of
                // appending with a negative length and looping forever.
                QVERIFY(read >= 0);
                decompressedData.append(buf, static_cast<int>(read));
            }
            stream.close();
            QVERIFY(!stream.error());
        }

        QCOMPARE(decompressedData.size(), testData.size());
        QCOMPARE(decompressedData, testData);
    }

    /// Data rows for testDetection(): byte sequences and whether isCompressed() should accept them.
    void testDetection_data()
    {
        QTest::addColumn<QVector<uint8_t>>("data");
        QTest::addColumn<bool>("isValid");

        QTest::newRow("Too short - random") << QVector<uint8_t>{0x65, 0x66} << false;
        QTest::newRow("Too short - valid start") << QVector<uint8_t>{0xfd, 0x37, 0x7a, 0x58, 0x5a} << false;
        QTest::newRow("Valid magic only") << QVector<uint8_t>{0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00} << true;
        QTest::newRow("Valid input") << QVector<uint8_t>{
            0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00, 0x00, 0x04, 0xe6, 0xd6,
            0xb4, 0x46, 0x02, 0x00, 0x21, 0x01, 0x16, 0x00, 0x00, 0x00,
            0x74, 0x2f, 0xe5, 0xa3, 0x01, 0x00, 0x01, 0x41, 0x0a, 0x00,
            0x00, 0x00, 0x8f, 0xe8, 0x69, 0xe6, 0x2b, 0x6a, 0xcd, 0x94,
            0x00, 0x01, 0x1a, 0x02, 0xdc, 0x2e, 0xa5, 0x7e, 0x1f, 0xb6,
            0xf3, 0x7d, 0x01, 0x00, 0x00, 0x00, 0x00, 0x04, 0x59, 0x5a } << true;
    }

    /// Wraps the row's bytes in a read-only QBuffer and checks isCompressed().
    void testDetection()
    {
        QFETCH(QVector<uint8_t>, data);
        QFETCH(bool, isValid);

        QByteArray ba(reinterpret_cast<const char *>(data.constData()), data.size());
        QBuffer buffer(&ba);
        QVERIFY(buffer.open(QIODevice::ReadOnly));
        QCOMPARE(CompressionStream::isCompressed(&buffer), isValid);
    }
};
QTEST_GUILESS_MAIN(CompressionStreamTest)
#include "compressionstreamtest.moc"
......@@ -19,6 +19,7 @@ set(akonadiprivate_SRCS
imapparser.cpp
imapset.cpp
instance.cpp
compressionstream.cpp
datastream_p.cpp
externalpartstorage.cpp
protocol.cpp
......@@ -32,6 +33,8 @@ set(akonadiprivate_LIBS
PUBLIC
Qt5::Core
Qt5::DBus
PRIVATE
LibLZMA::LibLZMA
)
if (WIN32)
set(akonadiprivate_LIBS
......
/*
SPDX-FileCopyrightText: 2020 Daniel Vrátil <dvratil@kde.org>
SPDX-License-Identifier: LGPL-2.0-or-later
*/
#include "compressionstream_p.h"
#include "akonadiprivate_debug.h"
#include <QByteArray>
#include <lzma.h>
#include <system_error>
using namespace Akonadi;
namespace {
/**
 * std::error_category describing liblzma return codes (lzma_ret), so that
 * they can be carried in std::error_code / std::error_condition.
 */
class LZMAErrorCategory : public std::error_category
{
public:
    /// Short identifier of this category.
    const char *name() const noexcept override
    {
        return "lzma";
    }

    /**
     * Returns a human-readable description of the lzma_ret value @p ev.
     *
     * Values that are not listed (e.g. codes introduced by a newer liblzma
     * than this switch knows about) yield a generic message rather than
     * hitting an unreachable marker, which would be undefined behaviour.
     */
    std::string message(int ev) const noexcept override
    {
        switch (static_cast<lzma_ret>(ev)) {
        case LZMA_OK:
            return "Operation completed successfully";
        case LZMA_STREAM_END:
            return "End of stream was reached";
        case LZMA_NO_CHECK:
            return "Input stream has no integrity check";
        case LZMA_UNSUPPORTED_CHECK:
            return "Cannot calculate the integrity check";
        case LZMA_GET_CHECK:
            return "Integrity check type is now available";
        case LZMA_MEM_ERROR:
            return "Cannot allocate memory";
        case LZMA_MEMLIMIT_ERROR:
            return "Memory usage limit was reached";
        case LZMA_FORMAT_ERROR:
            return "File format not recognized";
        case LZMA_OPTIONS_ERROR:
            return "Invalid or unsupported options";
        case LZMA_DATA_ERROR:
            return "Data is corrupt";
        case LZMA_BUF_ERROR:
            return "No progress is possible";
        case LZMA_PROG_ERROR:
            return "Programming error";
        default:
            break;
        }
        return "Unknown error";
    }
};
/// Accessor for the single LZMAErrorCategory instance, created on first use.
const LZMAErrorCategory &lzmaErrorCategory()
{
    static const LZMAErrorCategory instance{};
    return instance;
}
} // namespace
// Glue between lzma_ret and <system_error>, plus QDebug output for std::string.
// NOTE(review): presumably declared inside namespace std so that
// argument-dependent lookup finds them - confirm before moving.
namespace std
{
/// Flags lzma_ret as an error-code enum, enabling implicit conversion to std::error_code.
template<>
struct is_error_code_enum<lzma_ret> : std::true_type {
};

/// Wraps @p ret in an error_condition belonging to the LZMA category.
std::error_condition make_error_condition(lzma_ret ret)
{
    const int value = static_cast<int>(ret);
    return std::error_condition(value, lzmaErrorCategory());
}

/// Streams a std::string to QDebug by converting it to a QString first.
QDebug operator<<(QDebug dbg, const std::string &str)
{
    const QString text = QString::fromStdString(str);
    dbg << text;
    return dbg;
}
} // namespace std
/// Builds a std::error_code in the LZMA category from the liblzma return code @p e.
std::error_code make_error_code(lzma_ret e)
{
    const int value = static_cast<int>(e);
    return std::error_code(value, lzmaErrorCategory());
}
/**
 * Thin wrapper around a liblzma lzma_stream.
 *
 * The object owns the coder state held by the stream: it is released either
 * explicitly through finalize() or, at the latest, by the destructor, so that
 * destroying a Compressor that was never finalized does not leak it.
 */
class Akonadi::Compressor
{
public:
    Compressor() = default;

    /// Releases the coder state; harmless if finalize() already did so.
    ~Compressor()
    {
        lzma_end(&mStream);
    }

    // The lzma_stream must have exactly one owner.
    Compressor(const Compressor &) = delete;
    Compressor &operator=(const Compressor &) = delete;

    /**
     * Sets the stream up as a decoder when @p openMode equals
     * QIODevice::ReadOnly, otherwise as an encoder.
     *
     * @return the liblzma result of the initialization call.
     */
    std::error_code initialize(QIODevice::OpenMode openMode)
    {
        if (openMode == QIODevice::ReadOnly) {
            return lzma_auto_decoder(&mStream, 100 * 1024 * 1024 /* 100 MiB */, 0);
        } else {
            return lzma_easy_encoder(&mStream, LZMA_PRESET_DEFAULT, LZMA_CHECK_CRC32);
        }
    }

    /// Points the coder at @p size bytes of input starting at @p data.
    void setInputBuffer(const char *data, qint64 size)
    {
        mStream.next_in = reinterpret_cast<const uint8_t *>(data);
        mStream.avail_in = size;
    }

    /// Points the coder at an output area of @p maxSize bytes starting at @p data.
    void setOutputBuffer(char *data, qint64 maxSize)
    {
        mStream.next_out = reinterpret_cast<uint8_t *>(data);
        mStream.avail_out = maxSize;
    }

    /// Number of input bytes the coder has not consumed yet.
    int inputBufferAvailable() const
    {
        return mStream.avail_in;
    }

    /// Number of bytes still free in the output area.
    int outputBufferAvailable() const
    {
        return mStream.avail_out;
    }

    /// Releases the coder state. Always reports LZMA_OK.
    std::error_code finalize()
    {
        lzma_end(&mStream);
        return LZMA_OK;
    }

    /// Runs one decoding step over the current input/output buffers.
    std::error_code inflate()
    {
        return lzma_code(&mStream, LZMA_RUN);
    }

    /// Runs one encoding step; @p finish asks the encoder to finish the stream.
    std::error_code deflate(bool finish)
    {
        return lzma_code(&mStream, finish ? LZMA_FINISH : LZMA_RUN);
    }

protected:
    lzma_stream mStream = LZMA_STREAM_INIT;
};
CompressionStream::CompressionStream(QIODevice *stream, QObject *parent)
: QIODevice(parent)
, mStream(stream)
, mResult(LZMA_OK)
{}
/// Closes the stream on destruction (close() is a no-op when it is not open).
CompressionStream::~CompressionStream()
{
    // Inside the destructor the call resolves to this class's close() anyway;
    // spelling it out makes that explicit.
    CompressionStream::close();
}
/**
 * Opens the stream for either reading (decompression) or writing (compression).
 *
 * @param mode requested open mode; asking for ReadOnly and WriteOnly together is rejected.
 * @return false if the mode is invalid or the LZMA coder cannot be initialized,
 *         otherwise whatever QIODevice::open() returns.
 */
bool CompressionStream::open(OpenMode mode)
{
    const bool wantsRead = mode.testFlag(QIODevice::ReadOnly);
    const bool wantsWrite = mode.testFlag(QIODevice::WriteOnly);

    if (wantsRead && wantsWrite) {
        qCWarning(AKONADIPRIVATE_LOG) << "Invalid open mode for CompressionStream.";
        return false;
    }

    mCompressor = std::make_unique<Compressor>();
    // Anything that is not a read request is set up as an encoder.
    const auto err = mCompressor->initialize(wantsRead ? QIODevice::ReadOnly : QIODevice::WriteOnly);
    if (err != LZMA_OK) {
        qCWarning(AKONADIPRIVATE_LOG) << "Failed to initialize LZMA stream coder:" << err.message();
        return false;
    }

    if (wantsWrite) {
        // The encoder writes into mBuffer; writeData() forwards it to the underlying device.
        mBuffer.resize(BUFSIZ);
        mCompressor->setOutputBuffer(mBuffer.data(), mBuffer.size());
    }

    return QIODevice::open(mode);
}
/**
 * Closes the stream. Does nothing when the stream is not open.
 *
 * When open for writing with no error recorded, a null write is issued first,
 * which writeData() interprets as the request to finish the compressed stream.
 * Afterwards mResult is replaced by the result of Compressor::finalize().
 */
void CompressionStream::close()
{
    if (isOpen()) {
        const bool writing = openMode().testFlag(QIODevice::WriteOnly);
        if (writing && mResult == LZMA_OK) {
            write(nullptr, 0);
        }

        mResult = mCompressor->finalize();
        setOpenMode(QIODevice::NotOpen);
    }
}
/// Returns the last coder result; reaching the end of the stream is reported as LZMA_OK.
std::error_code CompressionStream::error() const
{
    if (mResult == LZMA_STREAM_END) {
        return LZMA_OK;
    }
    return mResult;
}
/**
 * True only when the decoder has reported the end of the stream, the
 * QIODevice base class reports being at the end, and the underlying device
 * is at its end as well.
 */
bool CompressionStream::atEnd() const
{
    if (mResult != LZMA_STREAM_END) {
        return false;
    }
    if (!QIODevice::atEnd()) {
        return false;
    }
    return mStream->atEnd();
}
/**
 * Decompresses up to @p dataSize bytes into @p data.
 *
 * Compressed input is pulled from the underlying device in BUFSIZ-sized
 * chunks whenever the decoder has consumed everything it was given.
 *
 * @return the number of decompressed bytes stored in @p data; 0 once the end
 *         of the compressed stream was reached or no input is available;
 *         -1 if the stream is in an error state, or if decoding or reading
 *         from the underlying device failed before any byte was produced.
 */
qint64 CompressionStream::readData(char *data, qint64 dataSize)
{
    if (mResult == LZMA_STREAM_END) {
        return 0;
    }
    if (mResult != LZMA_OK) {
        return -1;
    }

    qint64 dataRead = 0;
    bool failed = false;

    mCompressor->setOutputBuffer(data, dataSize);
    while (dataSize > 0) {
        if (mCompressor->inputBufferAvailable() == 0) {
            // The decoder is out of input: fetch the next compressed chunk.
            mBuffer.resize(BUFSIZ);
            const auto compressedDataRead = mStream->read(mBuffer.data(), mBuffer.size());
            if (compressedDataRead < 0) {
                qCWarning(AKONADIPRIVATE_LOG) << "Failed to read compressed data from input device:" << mStream->errorString();
                setErrorString(QStringLiteral("Failed to read compressed data from input device."));
                failed = true;
                break;
            }
            if (compressedDataRead == 0) {
                break;
            }
            mCompressor->setInputBuffer(mBuffer.data(), compressedDataRead);
        }

        mResult = mCompressor->inflate();
        if (mResult != LZMA_OK && mResult != LZMA_STREAM_END) {
            qCWarning(AKONADIPRIVATE_LOG) << "Error while decompressing LZMA stream:" << mResult.message();
            setErrorString(QString::fromStdString(mResult.message()));
            failed = true;
            break;
        }

        const auto decompressedDataRead = dataSize - mCompressor->outputBufferAvailable();
        dataRead += decompressedDataRead;
        dataSize -= decompressedDataRead;

        if (mResult == LZMA_STREAM_END) {
            if (mStream->atEnd()) {
                break;
            }
        }

        mCompressor->setOutputBuffer(data + dataRead, dataSize);
    }

    // Report a failure as such instead of as "0 bytes read". Data decoded
    // before the failure is still delivered; for a decoder error the next
    // call then returns -1 through the guard at the top.
    if (failed && dataRead == 0) {
        return -1;
    }
    return dataRead;
}
/**
 * Compresses @p dataSize bytes from @p data and forwards the compressed
 * output to the underlying device whenever the internal buffer fills up.
 *
 * A null @p data pointer is the request to finish the stream (this is how
 * close() uses it): the encoder is driven until it reports the end of the
 * stream and all remaining output is flushed.
 *
 * @return the number of input bytes consumed, or -1 if the stream is already
 *         finished or in an error state, if the encoder fails, or if the
 *         underlying device does not accept the compressed data.
 */
qint64 CompressionStream::writeData(const char *data, qint64 dataSize)
{
    if (mResult != LZMA_OK) {
        return -1;
    }

    const bool finish = (data == nullptr);
    if (!finish) {
        mCompressor->setInputBuffer(data, dataSize);
    }

    qint64 dataWritten = 0;
    while (dataSize > 0 || finish) {
        mResult = mCompressor->deflate(finish);
        if (mResult != LZMA_OK && mResult != LZMA_STREAM_END) {
            qCWarning(AKONADIPRIVATE_LOG) << "Error while compressing LZMA stream:" << mResult.message();
            setErrorString(QString::fromStdString(mResult.message()));
            return dataWritten > 0 ? dataWritten : -1;
        }

        if (mCompressor->inputBufferAvailable() == 0 || (mResult == LZMA_STREAM_END)) {
            const auto wrote = dataSize - mCompressor->inputBufferAvailable();
            dataWritten += wrote;
            dataSize -= wrote;
            if (dataSize > 0) {
                mCompressor->setInputBuffer(data + dataWritten, dataSize);
            }
        }

        if (mCompressor->outputBufferAvailable() == 0 || (mResult == LZMA_STREAM_END) || finish) {
            const auto toWrite = mBuffer.size() - mCompressor->outputBufferAvailable();
            if (toWrite > 0) {
                const auto writtenSize = mStream->write(mBuffer.constData(), toWrite);
                if (writtenSize != toWrite) {
                    qCWarning(AKONADIPRIVATE_LOG) << "Failed to write compressed data to output device:" << mStream->errorString();
                    setErrorString(QStringLiteral("Failed to write compressed data to output device."));
                    // Part of the compressed stream is lost; remember the failure
                    // so further writes are refused instead of producing a
                    // corrupt stream.
                    mResult = std::make_error_code(std::errc::io_error);
                    return -1;
                }
            }

            if (mResult == LZMA_STREAM_END) {
                Q_ASSERT(finish);
                break;
            }

            // Hand the (now flushed) buffer back to the encoder.
            mBuffer.resize(BUFSIZ);
            mCompressor->setOutputBuffer(mBuffer.data(), mBuffer.size());
        }
    }

    return dataWritten;
}
/**
 * Checks whether the data available on @p data starts with the six-byte
 * magic sequence 0xfd 0x37 0x7a 0x58 0x5a 0x00.
 *
 * The bytes are only peeked, not consumed.
 *
 * @param data device to inspect; a null or non-readable device yields false.
 * @return true if six bytes could be peeked and they match the magic sequence.
 */
bool CompressionStream::isCompressed(QIODevice *data)
{
    constexpr std::array<uchar, 6> magic = {0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00};

    // isReadable() is false for a closed device as well, so this single check
    // covers "not open" and "open, but not for reading".
    if (!data || !data->isReadable()) {
        return false;
    }

    char buf[6] = {};
    static_assert(sizeof(buf) == magic.size(), "peek buffer must match the magic length");
    if (data->peek(buf, sizeof(buf)) != static_cast<qint64>(sizeof(buf))) {
        return false;
    }

    return memcmp(magic.data(), buf, sizeof(buf)) == 0;
}
/*
SPDX-FileCopyrightText: 2020 Daniel Vrátil <dvratil@kde.org>
SPDX-License-Identifier: LGPL-2.0-or-later
*/
#ifndef AKONADI_COMPRESSIONSTREAM_H_
#define AKONADI_COMPRESSIONSTREAM_H_
#include "akonadiprivate_export.h"
#include <QIODevice>
#include <memory>
namespace Akonadi
{
class Compressor;
/**
 * A QIODevice layered on top of another QIODevice.
 *
 * Opened for reading, it reads compressed data from the underlying device and
 * hands out decompressed data; opened for writing, it compresses what is
 * written to it and writes the result to the underlying device.
 */
class AKONADIPRIVATE_EXPORT CompressionStream : public QIODevice
{
Q_OBJECT
public:
/// Creates a stream operating on @p stream, the underlying device.
explicit CompressionStream(QIODevice *stream, QObject *parent = nullptr);
/// Closes the stream.
~CompressionStream() override;
/// Opens for reading or for writing; requesting both at once fails.
bool open(QIODevice::OpenMode mode) override;
/// When writing without a recorded error, finishes the compressed stream; then releases the coder.
void close() override;
/// True once the decoder reported the end of the stream and both this device and the underlying one are at their end.
bool atEnd() const override;
/// Last coder result; the end-of-stream result is reported as success.
std::error_code error() const;
/// Peeks at @p data and reports whether it starts with the six-byte magic sequence the implementation compares against.
static bool isCompressed(QIODevice *data);
protected:
/// Decompresses up to @p maxSize bytes into @p data.
qint64 readData(char *data, qint64 maxSize) override;
/// Compresses @p maxSize bytes from @p data; a null @p data finishes the stream.
qint64 writeData(const char *data, qint64 maxSize) override;
private:
// Underlying device holding the compressed data.
QIODevice *mStream = nullptr;
// Scratch buffer for compressed data travelling between the coder and mStream.
QByteArray mBuffer;
// Result of the most recent coder operation.
std::error_code mResult;
// LZMA coder wrapper; created in open().
std::unique_ptr<Compressor> mCompressor;
};
} // namespace Akonadi
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment