Commit 46b5fb42 authored by Harald Sitter's avatar Harald Sitter 🏳🌈
Browse files

smb: fast copy

Summary:
see https://bugs.kde.org/show_bug.cgi?id=291835#c57 for background

- reading now happens inside a future. should be safe since we don't have
  any other threads doing anything while we wait.
- the future feeds into a buffer from which the main thread will
  take file segments and write them to disk
- buffer has 4 segments and synchronizes the threads via wait conditions
- the size of a segment is determined somewhat dynamically between 64kb
  and 4mb. the larger a file is the more it benefits from larger
  read requests

under perfect conditions this yields approximately mount-level copy
performance, unfortunately those are hard to hit so on average it's usually
less (somewhere in the range of 10 to 20% depending on the actual file
size and server type).

for many tiny files performance is about where it was before. the larger
the files get the greater the gains from this diff though.

specifically here's some samples I've taken:

- for downloads from windows10
  - 1G & 4G file
    - compared to 20.04 is ~77% faster
    - within 10% of windows
  - 8G file
    - compared to 20.04 is ~79% faster
    - within 5% of windows
- uploads to windows10
  - all sizes
    - compared to 20.04 is ~50% faster
    - now comparable performance to windows
- for remote-to-remote file copies from windows10 to smbd 4.11.6
  - 1000 x 5K files
    - no change, dreadfully slow, likely problem in KIO internals
  - 1G file
    - compared to 20.04 is ~45% faster
    - within 8% of windows
  - 4G file
    - compared to 20.04 is ~95% faster
    - and somehow 18% faster than windows (could be a fluke)

I've done transfers for 128M, 256M, 512M, 1G, 4G and partially 8G.
Differences not mentioned are either unchanged, negligible or in line with
documented trends.

BUG: 291835
FIXED-IN: 20.08

Test Plan:
- fallocate -l 1G file
- copy around
- copy kio-extras around

Reviewers: ngraham, cfeck, #frameworks, #dolphin

Subscribers: mmustac, meven, hallas, anthonyfieroni, asturmlechner, kde-frameworks-devel, kfm-devel

Tags: #dolphin, #frameworks

Differential Revision: https://phabricator.kde.org/D27504
parent 7e679b13
......@@ -19,6 +19,8 @@ add_feature_info("Internal KDSoapWSDiscoveryClient" INTERNAL_WSDCLIENT
add_feature_info("SMB DNS-SD Discovery" HAVE_KDNSSD_WITH_SIGNAL_RACE_PROTECTION
"Discover SMB hosts via DNS-SD/Avahi/Bonjour. KF5DNSSD >= 5.54 is required to support this.")
find_package(Threads REQUIRED)
add_definitions(-DTRANSLATION_DOMAIN=\"kio5_smb\")
include(CheckIncludeFile)
......@@ -47,6 +49,7 @@ set(kio_smb_PART_SRCS
wsdiscoverer.cpp
dnssddiscoverer.cpp
discovery.cpp
transfer.cpp
)
ecm_qt_declare_logging_category(kio_smb_PART_SRCS
......@@ -69,6 +72,7 @@ target_link_libraries(kio_smb_static
Qt5::Network
KF5::DNSSD
KDSoap::WSDiscoveryClient
Threads::Threads # std::async
)
# Final plugin target.
......
......@@ -11,6 +11,7 @@ include(ECMAddTests)
ecm_add_tests(
smburltest
transfertest
LINK_LIBRARIES
Qt5::Test
kio_smb_static
......
/*
SPDX-FileCopyrightText: 2020 Harald Sitter <sitter@kde.org>
SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
#include <QTest>
#include <future>
#include "transfer.h"
class TransferTest : public QObject
{
Q_OBJECT
private Q_SLOTS:
void testSegmentOnSmallFile()
{
// Files smaller than our minimal segment size ought to be transferred in one go
// otherwise we have a chance of degrading performance.
QCOMPARE(TransferSegment(1).buf.size(), 1);
}
void testMaxSegment()
{
// Large files may only use up to a given maximum.
QCOMPARE(TransferSegment(512 * 1024 * 1024).buf.size(), c_maxSegmentSize);
}
void testIdealSegmentSize()
{
QCOMPARE(TransferSegment(64 * 1024 * 1024).buf.size(), 1342177);
}
void testSegment()
{
TransferSegment s(8);
QCOMPARE(s.buf.size(), 8);
memset(s.buf.data(), 1, 8);
QCOMPARE(s.buf.data()[0], 1);
}
void testRing()
{
TransferRingBuffer ring(8);
for (auto i = 0; i <= 32; ++i) {
{
auto s = ring.nextFree();
memset(s->buf.data(), i, 8);
ring.push();
}
{
auto s = ring.pop();
QCOMPARE(s->buf.data()[0], static_cast<char>(i));
ring.unpop();
}
}
}
void testRingThreadedSlowPush()
{
const auto runs = 127;
const auto fileSize = 8;
TransferRingBuffer ring(fileSize);
std::atomic<bool> abort(false);
auto pullFuture = std::async(std::launch::async, [&ring, &abort]() -> bool {
for (auto i = 0; i <= runs && !abort; ++i) {
auto s = ring.pop();
if (!QTest::qCompare(s->buf.data()[0], static_cast<char>(i),
qPrintable(QStringLiteral("On pull iteration %1").arg(i)), "",
__FILE__, __LINE__)) {
abort = true;
return false;
}
ring.unpop();
}
return true;
});
auto pushFuture = std::async(std::launch::async, [&ring, &abort]() -> bool {
for (auto i = 0; i <= runs && !abort; ++i) {
auto s = ring.nextFree();
memset(s->buf.data(), i, fileSize);
ring.push();
if (abort) {
ring.done();
return false;
}
// Slow down this thread to simulate slow network reads.
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
ring.done();
return true;
});
pushFuture.wait();
pullFuture.wait();
QVERIFY(pushFuture.get());
QVERIFY(pullFuture.get());
}
void testRingThreadedSlowPull()
{
const auto runs = 127;
const auto fileSize = 8;
TransferRingBuffer ring(fileSize);
std::atomic<bool> abort(false);
auto pullFuture = std::async(std::launch::async, [&ring, &abort]() -> bool {
for (auto i = 0; i <= runs && !abort; ++i) {
auto s = ring.pop();
if (!QTest::qCompare(s->buf.data()[0], static_cast<char>(i),
qPrintable(QStringLiteral("On pull iteration %1").arg(i)), "",
__FILE__, __LINE__)) {
abort = true;
}
// Slow down this thread to simulate slow local writes.
std::this_thread::sleep_for(std::chrono::milliseconds(5));
ring.unpop();
}
return true;
});
auto pushFuture = std::async(std::launch::async, [&ring, &abort]() -> bool {
for (auto i = 0; i <= runs && !abort; ++i) {
auto s = ring.nextFree();
memset(s->buf.data(), i, fileSize);
if (abort) {
ring.done();
return false;
}
ring.push();
}
ring.done();
return true;
});
pushFuture.wait();
pullFuture.wait();
QVERIFY(pushFuture.get());
QVERIFY(pullFuture.get());
}
};
QTEST_GUILESS_MAIN(TransferTest)
#include "transfertest.moc"
......@@ -79,7 +79,6 @@ extern "C" {
//---------------------------
#include "smburl.h"
#define MAX_XFER_BUF_SIZE 65534
using namespace KIO;
......
......@@ -39,6 +39,10 @@
#include <kconfiggroup.h>
#include <kio/ioslave_defaults.h>
#include <future>
#include "transfer.h"
void SMBSlave::copy(const QUrl &src, const QUrl &dst, int permissions, KIO::JobFlags flags)
{
const bool isSourceLocal = src.isLocalFile();
......@@ -55,25 +59,14 @@ void SMBSlave::copy(const QUrl &src, const QUrl &dst, int permissions, KIO::JobF
void SMBSlave::smbCopy(const QUrl &ksrc, const QUrl &kdst, int permissions, KIO::JobFlags flags)
{
SMBUrl src;
SMBUrl dst;
mode_t initialmode;
ssize_t n;
int dstflags;
int srcfd = -1;
int dstfd = -1;
int errNum = 0;
KIO::filesize_t processed_size = 0;
unsigned char buf[MAX_XFER_BUF_SIZE];
qCDebug(KIO_SMB_LOG) << "SMBSlave::copy with src = " << ksrc << "and dest = " << kdst;
// setup urls
src = ksrc;
dst = kdst;
SMBUrl src = ksrc;
SMBUrl dst = kdst;
// Obtain information about source
errNum = cache_stat(src, &st);
int errNum = cache_stat(src, &st);
if (errNum != 0) {
if (errNum == EACCES) {
error(KIO::ERR_ACCESS_DENIED, src.toDisplayString());
......@@ -86,7 +79,8 @@ void SMBSlave::smbCopy(const QUrl &ksrc, const QUrl &kdst, int permissions, KIO:
error(KIO::ERR_IS_DIRECTORY, src.toDisplayString());
return;
}
totalSize(st.st_size);
const auto srcSize = st.st_size;
totalSize(srcSize);
// Check to se if the destination exists
errNum = cache_stat(dst, &st);
......@@ -102,7 +96,7 @@ void SMBSlave::smbCopy(const QUrl &ksrc, const QUrl &kdst, int permissions, KIO:
}
// Open the source file
srcfd = smbc_open(src.toSmbcUrl(), O_RDONLY, 0);
int srcfd = smbc_open(src.toSmbcUrl(), O_RDONLY, 0);
if (srcfd < 0) {
errNum = errno;
} else {
......@@ -118,6 +112,8 @@ void SMBSlave::smbCopy(const QUrl &ksrc, const QUrl &kdst, int permissions, KIO:
return;
}
mode_t initialmode = 0;
// Determine initial creation mode
if (permissions != -1) {
initialmode = permissions | S_IWUSR;
......@@ -126,11 +122,11 @@ void SMBSlave::smbCopy(const QUrl &ksrc, const QUrl &kdst, int permissions, KIO:
}
// Open the destination file
dstflags = O_CREAT | O_TRUNC | O_WRONLY;
int dstflags = O_CREAT | O_TRUNC | O_WRONLY;
if (!(flags & KIO::Overwrite)) {
dstflags |= O_EXCL;
}
dstfd = smbc_open(dst.toSmbcUrl(), dstflags, initialmode);
int dstfd = smbc_open(dst.toSmbcUrl(), dstflags, initialmode);
if (dstfd < 0) {
errNum = errno;
} else {
......@@ -151,10 +147,15 @@ void SMBSlave::smbCopy(const QUrl &ksrc, const QUrl &kdst, int permissions, KIO:
}
// Perform copy
// TODO: if and when smb_context becomes thread-safe, use two contexts connected with
// a ring buffer to optimize transfer speed (also see smbCopyGet)
// https://bugzilla.samba.org/show_bug.cgi?id=11413
KIO::filesize_t processed_size = 0;
TransferSegment segment(srcSize);
while (true) {
n = smbc_read(srcfd, buf, MAX_XFER_BUF_SIZE);
ssize_t n = smbc_read(srcfd, segment.buf.data(), segment.buf.size());
if (n > 0) {
n = smbc_write(dstfd, buf, n);
n = smbc_write(dstfd, segment.buf.data(), n);
if (n == -1) {
qCDebug(KIO_SMB_LOG) << "SMBSlave::copy copy now KIO::ERR_CANNOT_WRITE";
error(KIO::ERR_CANNOT_WRITE, dst.toDisplayString());
......@@ -316,30 +317,49 @@ void SMBSlave::smbCopyGet(const QUrl &ksrc, const QUrl &kdst, int permissions, K
return;
}
// Perform the copy
char buf[MAX_XFER_BUF_SIZE];
bool isErr = false;
std::atomic<bool> isErr(false);
TransferRingBuffer buffer(st.st_size);
auto future = std::async(std::launch::async, [&buffer, &srcfd, &isErr]() -> int {
while (!isErr) {
TransferSegment *segment = buffer.nextFree();
segment->size = smbc_read(srcfd, segment->buf.data(), segment->buf.capacity());
if (segment->size <= 0) {
buffer.push();
buffer.done();
if (segment->size < 0) {
return KIO::ERR_COULD_NOT_READ;
}
break;
}
buffer.push();
}
return KJob::NoError;
});
while (true) {
const ssize_t bytesRead = smbc_read(srcfd, buf, MAX_XFER_BUF_SIZE);
if (bytesRead <= 0) {
if (bytesRead < 0) {
error(KIO::ERR_CANNOT_READ, src.toDisplayString());
isErr = true;
}
TransferSegment *segment = buffer.pop();
if (!segment) { // done, no more segments pending
break;
}
const qint64 bytesWritten = file.write(buf, bytesRead);
const qint64 bytesWritten = file.write(segment->buf.data(), segment->size);
if (bytesWritten == -1) {
qCDebug(KIO_SMB_LOG) << "copy now KIO::ERR_CANNOT_WRITE";
error(KIO::ERR_CANNOT_WRITE, kdst.toDisplayString());
isErr = true;
buffer.unpop();
break;
}
processed_size += bytesWritten;
processedSize(processed_size);
buffer.unpop();
}
if (isErr) { // writing failed
future.wait();
} else if (future.get() != KJob::NoError) { // check if read had an error
error(future.get(), ksrc.toDisplayString());
isErr = true;
}
// FINISHED
......@@ -499,10 +519,9 @@ void SMBSlave::smbCopyPut(const QUrl &ksrc, const QUrl &kdst, int permissions, K
if (processed_size == 0 || srcFile.seek(processed_size)) {
// Perform the copy
char buf[MAX_XFER_BUF_SIZE];
TransferSegment segment(srcInfo.size());
while (true) {
const ssize_t bytesRead = srcFile.read(buf, MAX_XFER_BUF_SIZE);
const ssize_t bytesRead = srcFile.read(segment.buf.data(), segment.buf.size());
if (bytesRead <= 0) {
if (bytesRead < 0) {
error(KIO::ERR_CANNOT_READ, ksrc.toDisplayString());
......@@ -511,7 +530,7 @@ void SMBSlave::smbCopyPut(const QUrl &ksrc, const QUrl &kdst, int permissions, K
break;
}
const qint64 bytesWritten = smbc_write(dstfd, buf, bytesRead);
const qint64 bytesWritten = smbc_write(dstfd, segment.buf.data(), bytesRead);
if (bytesWritten == -1) {
error(KIO::ERR_CANNOT_WRITE, kdst.toDisplayString());
isErr = true;
......
......@@ -37,19 +37,12 @@
#include <QMimeType>
#include <QVarLengthArray>
#include <future>
#include "transfer.h"
void SMBSlave::get(const QUrl &kurl)
{
char buf[MAX_XFER_BUF_SIZE];
int filefd = 0;
int errNum = 0;
ssize_t bytesread = 0;
// time_t curtime = 0;
// time_t lasttime = 0; // Disabled durint port to Qt5/KF5. Seems to be unused.
// time_t starttime = 0; // Disabled durint port to Qt5/KF5. Seems to be unused.
KIO::filesize_t totalbytesread = 0;
QByteArray filedata;
SMBUrl url;
qCDebug(KIO_SMB_LOG) << kurl;
// check (correct) URL
......@@ -65,8 +58,8 @@ void SMBSlave::get(const QUrl &kurl)
return;
// Stat
url = kurl;
errNum = cache_stat(url, &st);
SMBUrl url = kurl;
int errNum = cache_stat(url, &st);
if (errNum != 0) {
if (errNum == EACCES)
error(KIO::ERR_ACCESS_DENIED, url.toDisplayString());
......@@ -83,45 +76,65 @@ void SMBSlave::get(const QUrl &kurl)
totalSize(st.st_size);
// Open and read the file
filefd = smbc_open(url.toSmbcUrl(), O_RDONLY, 0);
if (filefd >= 0) {
bool isFirstPacket = true;
// lasttime = starttime = time(NULL); // This seems to be unused..
int filefd = smbc_open(url.toSmbcUrl(), O_RDONLY, 0);
if (filefd < 0) {
error(KIO::ERR_CANNOT_OPEN_FOR_READING, url.toDisplayString());
return;
}
KIO::filesize_t totalbytesread = 0;
QByteArray filedata;
bool isFirstPacket = true;
TransferRingBuffer buffer(st.st_size);
auto future = std::async(std::launch::async, [&buffer, &filefd]() -> int {
while (true) {
bytesread = smbc_read(filefd, buf, MAX_XFER_BUF_SIZE);
if (bytesread == 0) {
// All done reading
TransferSegment *s = buffer.nextFree();
s->size = smbc_read(filefd, s->buf.data(), s->buf.capacity());
if (s->size <= 0) {
buffer.push();
buffer.done();
if (s->size < 0) {
return KIO::ERR_COULD_NOT_READ;
}
break;
} else if (bytesread < 0) {
error(KIO::ERR_CANNOT_READ, url.toDisplayString());
return;
}
filedata = QByteArray::fromRawData(buf, bytesread);
if (isFirstPacket) {
QMimeDatabase db;
QMimeType type = db.mimeTypeForFileNameAndData(url.fileName(), filedata);
mimeType(type.name());
isFirstPacket = false;
}
data(filedata);
filedata.clear();
buffer.push();
}
return KJob::NoError;
});
// increment total bytes read
totalbytesread += bytesread;
while (true) {
TransferSegment *s = buffer.pop();
if (!s) { // done, no more segments pending
break;
}
processedSize(totalbytesread);
filedata = QByteArray::fromRawData(s->buf.data(), s->size);
if (isFirstPacket) {
QMimeDatabase db;
QMimeType type = db.mimeTypeForFileNameAndData(url.fileName(), filedata);
mimeType(type.name());
isFirstPacket = false;
}
data(filedata);
filedata.clear();
smbc_close(filefd);
data(QByteArray());
processedSize(static_cast<KIO::filesize_t>(st.st_size));
// increment total bytes read
totalbytesread += s->size;
} else {
error(KIO::ERR_CANNOT_OPEN_FOR_READING, url.toDisplayString());
return;
processedSize(totalbytesread);
buffer.unpop();
}
if (future.get() != KJob::NoError) { // check if read had an error
error(future.get(), url.toDisplayString());
}
smbc_close(filefd);
data(QByteArray());
#warning fixme this is potentially a lie
processedSize(static_cast<KIO::filesize_t>(st.st_size));
finished();
}
......
/*
SPDX-FileCopyrightText: 2020 Harald Sitter <sitter@kde.org>
SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
#include "transfer.h"
#include <future>
TransferSegment::TransferSegment(const off_t fileSize)
: buf(segmentSizeForFileSize(fileSize))
{
}
off_t TransferSegment::segmentSizeForFileSize(const off_t fileSize_)
{
const off_t fileSize = qMax(0L, fileSize_);
// read() internally splits our read requests into multiple server
// requests and then assembles the responses into our buffer.
// The larger the chunks we request the better the performance.
// At the same time we'll want a semblence of progress reporting
// and also not eat too much RAM. It's a balancing act :|
off_t segmentSize = c_minSegmentSize;
// The segment size is largely arbitrary and sacrifices better throughput for
// greater memory use.
// This only goes up to a maxiumum because bigger transfer blobs directly
// translate to more RAM use. Mind that the effective RAM use will
// be (segmentSize * (segments + 1)). The +1 is because smbc internally will also
// allocate up to a full segment for one read() call.
//
// Unfortunately we have no way of knowing what size smbc will use for the
// network requests, so we can't use a multiple of that. Which means we'll
// almost never reach best performance.
//
// TODO: perhaps it would actually make sense to read at a multiple of
// the target drive's block size?
const off_t idealSegmentSize = qMin(fileSize / 50, c_maxSegmentSize);
segmentSize = qMax(segmentSize, idealSegmentSize);
// If the segment size is larger than the file size it appears we can
// actually degrade performance, so pick the smaller of the two.
if (fileSize != 0) {
segmentSize = qMin(segmentSize, fileSize);
}
return segmentSize;
}
TransferRingBuffer::TransferRingBuffer(const off_t fileSize)
{
for (size_t i = 0; i < m_capacity; ++i) {
m_buffer[i] = std::unique_ptr<TransferSegment>(new TransferSegment(fileSize));
}
}
TransferSegment *TransferRingBuffer::pop()
{
std::unique_lock<std::mutex> lock(m_mutex);
while (head == tail) {
if (!m_done) {
m_cond.wait(lock);
} else {
return nullptr;
}
}
auto segment = m_buffer[tail].get();
m_cond.notify_all();
return segment;
}
void TransferRingBuffer::unpop()
{
std::unique_lock<std::mutex> lock(m_mutex);
tail = ++tail % m_capacity;
m_cond.notify_all();
}
TransferSegment *TransferRingBuffer::nextFree()
{
// This does not require synchronization. As soon
// as we pushed the last item we gained exclusive lock
// on the new item.
m_cond.notify_all();
return m_buffer[head].get();
}
void TransferRingBuffer::push()
{
const auto newHead = (head + 1) % m_capacity;
std::unique_lock<std::mutex> lock(m_mutex);
while (newHead == tail) {
// do not move to the item the reading thread is on
m_cond.wait(lock);
}
head = newHead;
m_cond.notify_all();
}
void TransferRingBuffer::done()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_done = true;
m_cond.notify_all();
}
/*
SPDX-FileCopyrightText: 2020 Harald Sitter <sitter@kde.org>
SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
#ifndef TRANSFER_H
#define TRANSFER_H
#include <QtGlobal>
#include <QVarLengthArray>