Verified Commit 37f9fcae authored by Daniel Vrátil's avatar Daniel Vrátil 🤖
Browse files

(De)compress payload data during (de)serialization

The compression is completely transparent to clients, serializers and
Akonadi. The idea is that when serializing a payload, we can compress
the serialized data using LZMA compression to save space. The data are
usually large enough to benefit from the compression and at the same
time small enough for the compression to not cause any significant
performance overhead.

In my local experiment, compressing each file in file_db_data reduced
the overall size by ~30%.

The only place where the compression "leaks" to the user is in the
Item and part sizes stored in Akonadi, which reflect the compressed data.

The change is backwards compatible, so it can handle uncompressed
payloads created before this change just fine. All newly created
or updated payloads will get compressed automatically. Eventually
a StorageJanitor task to compress the entire storage may be introduced,
but we may need some proper progress reporting for that, since it
may take a lot of time, even on a fast SSD, to compress all the
files in file_db_data (depending on the size of the database).
parent 2bf3a5c9
......@@ -50,9 +50,8 @@ void InvalidateCacheJobTest::shouldClearPayload()
auto *fetchJob = new ItemFetchJob(Item(itemId), this);
fetchJob->fetchScope().fetchFullPayload();
AKVERIFYEXEC(fetchJob);
QCOMPARE(fetchJob->items().first().payloadData(), "testmailbody2");
QCOMPARE(fetchJob->items().first().payload<QByteArray>(), "testmailbody2");
// Invalidate cache
auto *invCacheJob = new InvalidateCacheJob(Collection(colId), this);
AKVERIFYEXEC(invCacheJob);
......@@ -61,13 +60,13 @@ void InvalidateCacheJobTest::shouldClearPayload()
fetchFromCacheJob->fetchScope().fetchFullPayload();
fetchFromCacheJob->fetchScope().setCacheOnly(true);
AKVERIFYEXEC(fetchFromCacheJob);
QVERIFY(fetchFromCacheJob->items().first().payloadData().isEmpty());
QVERIFY(fetchFromCacheJob->items().first().payload<QByteArray>().isEmpty());
// Fetch item from resource again
auto *fetchAgainJob = new ItemFetchJob(Item(itemId), this);
fetchAgainJob->fetchScope().fetchFullPayload();
AKVERIFYEXEC(fetchAgainJob);
QCOMPARE(fetchAgainJob->items().first().payloadData(), "testmailbody2");
QCOMPARE(fetchAgainJob->items().first().payload<QByteArray>(), "testmailbody2");
}
QTEST_AKONADIMAIN(InvalidateCacheJobTest)
......
......@@ -228,11 +228,11 @@ void ItemAppendTest::testItemSize_data()
Item i(QStringLiteral("application/octet-stream"));
i.setPayload(QByteArray("ABCD"));
QTest::newRow("auto size") << i << 4LL;
QTest::newRow("auto size") << i << 56LL;
i.setSize(3);
QTest::newRow("too small") << i << 4LL;
i.setSize(10);
QTest::newRow("too large") << i << 10LL;
QTest::newRow("too small") << i << 56LL;
i.setSize(100);
QTest::newRow("too large") << i << 100LL;
}
void ItemAppendTest::testItemSize()
......@@ -264,7 +264,7 @@ void ItemAppendTest::testItemMerge_data()
{
Item i1(QStringLiteral("application/octet-stream"));
i1.setPayload(QByteArray("ABCD"));
i1.setSize(4);
i1.setSize(56); // take compression into account
i1.setRemoteId(QStringLiteral("XYZ"));
i1.setGid(QStringLiteral("XYZ"));
i1.setFlag("TestFlag1");
......@@ -272,7 +272,7 @@ void ItemAppendTest::testItemMerge_data()
Item i2(QStringLiteral("application/octet-stream"));
i2.setPayload(QByteArray("DEFGH"));
i2.setSize(5);
i2.setSize(60); // take compression into account
i2.setRemoteId(QStringLiteral("XYZ"));
i2.setGid(QStringLiteral("XYZ"));
i2.setFlag("TestFlag2");
......@@ -287,7 +287,7 @@ void ItemAppendTest::testItemMerge_data()
{
Item i1(QStringLiteral("application/octet-stream"));
i1.setPayload(QByteArray("ABCD"));
i1.setSize(4);
i1.setSize(56); // take compression into account
i1.setRemoteId(QStringLiteral("RID2"));
i1.setGid(QStringLiteral("GID2"));
i1.setFlag("TestFlag1");
......
......@@ -102,25 +102,27 @@ void ItemStoreTest::testFlagChange()
void ItemStoreTest::testDataChange_data()
{
QTest::addColumn<QByteArray>("data");
QTest::newRow("simple") << QByteArray("testbody");
QTest::newRow("null") << QByteArray();
QTest::newRow("empty") << QByteArray("");
QTest::newRow("nullbyte") << QByteArray("\0", 1);
QTest::newRow("nullbyte2") << QByteArray("\0X", 2);
QTest::newRow("linebreaks") << QByteArray("line1\nline2\n\rline3\rline4\r\n");
QTest::newRow("linebreaks2") << QByteArray("line1\r\nline2\r\n\r\n");
QTest::newRow("linebreaks3") << QByteArray("line1\nline2");
QTest::addColumn<qint64>("expectedSize");
QTest::newRow("simple") << QByteArray("testbody") << 60LL;
QTest::newRow("null") << QByteArray() << 0LL;
QTest::newRow("empty") << QByteArray("") << 0LL;
QTest::newRow("nullbyte") << QByteArray("\0", 1) << 56LL;
QTest::newRow("nullbyte2") << QByteArray("\0X", 2) << 56LL;
QTest::newRow("linebreaks") << QByteArray("line1\nline2\n\rline3\rline4\r\n") << 80LL;
QTest::newRow("linebreaks2") << QByteArray("line1\r\nline2\r\n\r\n") << 68LL;
QTest::newRow("linebreaks3") << QByteArray("line1\nline2") << 64LL;
QByteArray b;
QTest::newRow("big") << b.fill('a', 1 << 20);
QTest::newRow("bignull") << b.fill('\0', 1 << 20);
QTest::newRow("bigcr") << b.fill('\r', 1 << 20);
QTest::newRow("biglf") << b.fill('\n', 1 << 20);
QTest::newRow("big") << b.fill('a', 1 << 20) << 280LL;
QTest::newRow("bignull") << b.fill('\0', 1 << 20) << 280LL;
QTest::newRow("bigcr") << b.fill('\r', 1 << 20) << 280LL;
QTest::newRow("biglf") << b.fill('\n', 1 << 20) << 280LL;
}
void ItemStoreTest::testDataChange()
{
QFETCH(QByteArray, data);
QFETCH(qint64, expectedSize);
Item item;
ItemFetchJob *prefetchjob = new ItemFetchJob(Item(1));
......@@ -144,7 +146,8 @@ void ItemStoreTest::testDataChange()
QCOMPARE(item.payload<QByteArray>(), data);
QEXPECT_FAIL("null", "STORE will not update item size on 0 sizes", Continue);
QEXPECT_FAIL("empty", "STORE will not update item size on 0 sizes", Continue);
QCOMPARE(item.size(), static_cast<qint64>(data.size()));
// Cannot compare with data.size() due to payload compression
QCOMPARE(item.size(), expectedSize);
}
void ItemStoreTest::testRemoteId_data()
......
......@@ -12,6 +12,7 @@
#include "protocolhelper_p.h"
#include "private/externalpartstorage_p.h"
#include "private/compressionstream_p.h"
#include "akonadicore_debug.h"
......@@ -79,14 +80,13 @@ void ItemSerializer::deserialize(Item &item, const QByteArray &label, const QByt
QBuffer buffer;
buffer.setData(data);
buffer.open(QIODevice::ReadOnly);
buffer.seek(0);
deserialize(item, label, buffer, version);
buffer.close();
} else {
QFile file;
if (storage == External) {
file.setFileName(ExternalPartStorage::resolveAbsolutePath(data));
} else {
} else if (storage == Foreign) {
file.setFileName(QString::fromUtf8(data));
}
......@@ -104,10 +104,36 @@ void ItemSerializer::deserialize(Item &item, const QByteArray &label, const QByt
/*static*/
void ItemSerializer::deserialize(Item &item, const QByteArray &label, QIODevice &data, int version)
{
if (!TypePluginLoader::defaultPluginForMimeType(item.mimeType())->deserialize(item, label, data, version)) {
auto *plugin = TypePluginLoader::defaultPluginForMimeType(item.mimeType());
const auto handleError = [&](QIODevice &device, bool compressed) {
device.seek(0);
QByteArray data;
if (compressed) {
CompressionStream decompressor(&device);
decompressor.open(QIODevice::ReadOnly);
data = decompressor.readAll();
} else {
data = device.readAll();
}
qCWarning(AKONADICORE_LOG) << "Unable to deserialize payload part:" << label << "in item" << item.id() << "collection" << item.parentCollection().id();
data.seek(0);
qCWarning(AKONADICORE_LOG) << "Payload data was: " << data.readAll();
qCWarning(AKONADICORE_LOG) << (compressed ? "Decompressed" : "") << "payload data was: " << data;
};
if (CompressionStream::isCompressed(&data)) {
CompressionStream decompressor(&data);
decompressor.open(QIODevice::ReadOnly);
if (!plugin->deserialize(item, label, decompressor, version)) {
handleError(decompressor, true);
}
if (decompressor.error()) {
qCWarning(AKONADICORE_LOG) << "Deserialization failed due to decompression error:" << QString::fromStdString(decompressor.error().message());
}
} else {
if (!plugin->deserialize(item, label, data, version)) {
handleError(data, false);
}
}
}
......@@ -129,7 +155,10 @@ void ItemSerializer::serialize(const Item &item, const QByteArray &label, QIODev
return;
}
ItemSerializerPlugin *plugin = TypePluginLoader::pluginForMimeTypeAndClass(item.mimeType(), item.availablePayloadMetaTypeIds());
plugin->serialize(item, label, data, version);
CompressionStream compressor(&data);
compressor.open(QIODevice::WriteOnly);
plugin->serialize(item, label, compressor, version);
}
void ItemSerializer::apply(Item &item, const Item &other)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment