diff --git a/CMakeLists.txt b/CMakeLists.txt index 7270f8eebe55641c857c69ad0fbbed78489464f1..8ab2e528ee2df715e0982d0d626ddd6e51678818 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,8 +6,7 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(Qt5_MIN_VERSION 5.12) -# Use ecm_generate_dbus_service_file + ecm_install_configured_files once bumped to 5.73+ -set(KF5_MIN_VERSION 5.66) +set(KF5_MIN_VERSION 5.76) find_package(ECM ${KF5_MIN_VERSION} CONFIG REQUIRED) set(CMAKE_MODULE_PATH ${ECM_MODULE_PATH} ${CMAKE_SOURCE_DIR}/cmake) @@ -42,6 +41,8 @@ set(KIOFUSE_SOURCES kiofusevfs.h kiofuseservice.cpp kiofuseservice.h + filejobpool.cpp + filejobpool.h kiofusenode.h) ecm_setup_version(PROJECT diff --git a/filejobpool.cpp b/filejobpool.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0f7b09718a38a8ac85d724175dca0dda4c04d54e --- /dev/null +++ b/filejobpool.cpp @@ -0,0 +1,307 @@ +/* + SPDX-FileCopyrightText: 2020 Alexander Saoutkin + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#include +#include + +#include "filejobpool.h" +#include "debug.h" + +void FileJobPool::acquireFileJob(QObject *obj, const QUrl &url, const fuse_ino_t ino, QIODevice::OpenModeFlag flags, KIO::filesize_t offset, std::function callback) +{ + qDebug(KIOFUSE_LOG) << "Acquiring job for " << url << "with ino:" << ino; + auto lookup = m_pool.find(url.scheme()); + if(lookup == m_pool.end()) + { + qDebug(KIOFUSE_LOG) << "Creating map for " << url.scheme(); + const auto& [it, slots] = m_pool.insert(std::make_pair(url.scheme(), SlotsForScheme())); + if (!slots) + return callback(nullptr); + + lookup = it; + } + + if(getSpaceForJob(obj, url, ino, flags, offset, lookup->second, callback)) + return; + + // Using a unique_ptr here to let the lambda disconnect the connection itself + auto connection = std::make_unique(); + auto &conn = *connection; + conn = connect(this, &FileJobPool::jobReleased, [=, connection = std::move(connection)](auto &scheme, auto *fileJob) { + 
Q_UNUSED(fileJob); + if(scheme != url.scheme()) + return; // Definitely can't take a slot here. + + if(getSpaceForJob(obj, url, ino, flags, offset, lookup->second, callback)) + { + disconnect(*connection); + return; + } + }); +} + +void FileJobPool::releaseFileJob(const QString &scheme, KIO::FileJob * fileJob) +{ + // Disconnection is necessary as client is not expected to disconnect + // connections it has made to the FileJob. Seeming as they've "released" it, + // we don't want the slots they've previously connected to be called when + // the FileJob is reused. + auto lookup = m_pool.find(scheme); + if(lookup == m_pool.end()) + { + Q_UNREACHABLE(); + return; + } + + for(JobSlot &slot : lookup->second) + { + if(slot.job == fileJob) + { + fileJob->disconnect(slot.receiver); + fileJob->disconnect(this); + initConnections(slot); + slot.status = JobStatus::Idle; + qDebug(KIOFUSE_LOG) << "Releasing job" << slot.url << "with ino" << slot.ino; + emit jobReleased(scheme, fileJob); + return; + } + } + + Q_UNREACHABLE(); +} + +void FileJobPool::slotJobFinished(const QString &scheme, KIO::FileJob *fileJob) +{ + auto lookup = m_pool.find(scheme); + if(lookup == m_pool.end()) + { + Q_UNREACHABLE(); + return; + } + + for(JobSlot &slot : lookup->second) + { + if(slot.job == fileJob) + { + resetJobSlot(slot); + emit jobReleased(scheme, fileJob); + return; + } + } + + Q_UNREACHABLE(); +} + +void FileJobPool::initConnections(JobSlot &slot) +{ + connect(slot.job, &KIO::FileJob::result, this, [=] { + slotJobFinished(slot.url.scheme(), slot.job); + }); + connect(slot.job, &KIO::FileJob::data, this, [=, &slot] (KIO::Job* job, const QByteArray& data) { + Q_UNUSED(job); + slot.offset += data.size(); + qDebug(KIOFUSE_LOG) << "Adjusted offset to" << slot.offset << "for" << slot.url << "with ino: " << slot.ino; + }); + connect(slot.job, &KIO::FileJob::written, this, [=, &slot] (KIO::Job* job, KIO::filesize_t written) { + Q_UNUSED(job); + slot.offset += written; + qDebug(KIOFUSE_LOG) << 
"Adjusted offset to" << slot.offset << "for" << slot.url << "with ino: " << slot.ino; + }); +} + + +void FileJobPool::initJob(QObject *obj, const QUrl &url, const fuse_ino_t ino, QIODevice::OpenModeFlag flags, KIO::filesize_t offset, JobSlot &element, std::function callback) +{ + element.status = JobStatus::Busy; + KIO::FileJob *fileJob = KIO::open(url, flags); + // In case opening fails + auto resConnection = std::make_unique(); + auto &resConn = *resConnection; + resConn = connect(fileJob, &KIO::FileJob::result, this, [=, &element, connection=std::move(resConnection)] (KJob *job) { + Q_UNUSED(job); + disconnect(*connection); + resetJobSlot(element); + callback(fileJob); // Send back job so that client can call job->error() + }); + // If opening succeeds + auto openConnection = std::make_unique(); + auto &openConn = *openConnection; + openConn = connect(fileJob, &KIO::FileJob::open, this, [=, &element, connection=std::move(openConnection)] (KIO::Job *job) { + Q_UNUSED(job); + disconnect(*connection); + element.job = fileJob; + element.receiver = obj; + element.url = url; + element.ino = ino; + element.flags = flags; + if (element.offset == offset) + { + initConnections(element); + return callback(element.job); + } + fileJob->seek(offset); + auto posConnection = std::make_unique(); + auto &posConn = *posConnection; + posConn = connect(fileJob, &KIO::FileJob::position, this, [=, &element, connection=std::move(posConnection)] (KIO::Job *job, KIO::filesize_t offset) { + Q_UNUSED(job); + qDebug(KIOFUSE_LOG) << "Adjusted offset to" << offset << "for " << element.url << "with" << element.ino; + fileJob->disconnect(*connection); // Remove open/position connection + initConnections(element); + element.offset = offset; + callback(element.job); + }); + }); +} + +bool FileJobPool::getSpaceForJob(QObject *obj, const QUrl &url, const fuse_ino_t ino, QIODevice::OpenModeFlag flags, KIO::filesize_t offset, SlotsForScheme &jobList, std::function callback) +{ + // We wish to select 
a job with the following priority + // 1. A job that is idle, with the same ino, open mode flags and offset as desired. + // 2. A job that is idle, with the same ino, open mode flags as desired. + // 3. A job that is free. + // 4. A job that is idle. + // Otherwise we return nothing + std::optional> slot = std::nullopt; + qDebug(KIOFUSE_LOG) << url << flags << ino << offset; + for(JobSlot &element : jobList) + { + qDebug(KIOFUSE_LOG) << element.url << element.flags << element.ino << element.offset; + if(element.status == JobStatus::Idle) + { + if(element.ino == ino && element.flags & flags) + { + slot.emplace(element); + if(element.offset == offset) + break; + } + else if(!slot.has_value()) + slot.emplace(element); + } + else if(element.status == JobStatus::Free) + { + if (slot.has_value()) + { + auto &jobSlot = slot.value().get(); + if (jobSlot.status == JobStatus::Idle && jobSlot.ino != ino) + slot.emplace(element); + } + else + slot.emplace(element); + } + } + + if(!slot.has_value()) + return false; + + JobSlot &jobSlot = slot.value().get(); + if(jobSlot.status == JobStatus::Idle && (jobSlot.ino != ino || !(jobSlot.flags & flags))) + { + // It's idle but it's not open for the correct inode. Time to replace! 
+ qDebug(KIOFUSE_LOG) << "Closing idle job to be replaced for " << url << "with ino:" << ino; + jobSlot.job->disconnect(jobSlot.receiver); // Don't want to call slotJobFinished() + jobSlot.job->disconnect(this); // Don't want to call slotJobFinished() + jobSlot.job->close(); + resetJobSlot(jobSlot); + } + + if(jobSlot.status == JobStatus::Free) + { + qDebug(KIOFUSE_LOG) << "Creating new job in free slot for " << url << "with ino:" << ino; + initJob(obj, url, ino, flags, offset, jobSlot, callback); + } + else if(jobSlot.status == JobStatus::Idle) + { + qDebug(KIOFUSE_LOG) << "Using existing job for" << url << "with ino:" << ino; + jobSlot.status = JobStatus::Busy; + if (jobSlot.offset == offset) + { + callback(jobSlot.job); + return true; + } + + jobSlot.job->seek(offset); + auto posConnection = std::make_unique(); + auto &posConn = *posConnection; + posConn = connect(jobSlot.job, &KIO::FileJob::position, [=, &jobSlot, posConnection = std::move(posConnection)] (KIO::Job *job, KIO::filesize_t offset) { + Q_UNUSED(job); + disconnect(*posConnection); + qDebug(KIOFUSE_LOG) << "Adjusted offset to" << offset << "for " << url << "with" << ino; + jobSlot.offset = offset; + callback(jobSlot.job); + }); + } + + return true; +} + +void FileJobPool::resetJobSlot(FileJobPool::JobSlot &slot) +{ + slot.job = nullptr; + slot.receiver = nullptr; + slot.url = QUrl(); + slot.ino = 0; + slot.status = JobStatus::Free; + slot.flags = QIODevice::NotOpen; + slot.offset = 0; +} + +void FileJobPool::closeJob(const QString &scheme, const fuse_ino_t ino) +{ + std::vector> slotList = jobsByIno(scheme, ino); + for(JobSlot &slot : slotList) + { + if(slot.status == JobStatus::Busy) + continue; + + slot.status = JobStatus::Busy; + slot.job->close(); + return; + } +} + +std::vector> FileJobPool::jobsByIno(const QString &scheme, const fuse_ino_t ino) +{ + std::vector> slotList; + auto lookup = m_pool.find(scheme); + if(lookup == m_pool.end()) + return slotList; + + // Prevent UB to make sure no 
reallocs happen: + // https://stackoverflow.com/questions/48471683/push-backs-to-a-stdvectorstdreference-wrappertype + slotList.reserve(lookup->second.size()); + for(JobSlot &slot : lookup->second) + { + if(slot.ino == ino) + slotList.push_back(std::ref(slot)); + } + + return slotList; +} + +void FileJobPool::closeAllJobs(std::function callback) +{ + const auto numSlots = std::make_shared(0); + for(auto &pair : m_pool) + { + for(JobSlot &slot : pair.second) + { + if(slot.job) + { + (*numSlots)++; + // TODO: Do we need to explicitly disconnect this? + connect(slot.job, qOverload(&KIO::FileJob::close), this, [=] () { + (*numSlots)--; + if(*numSlots == 0) + callback(0); + }); + slot.status = JobStatus::Busy; + slot.job->close(); + } + } + } + + if(*numSlots == 0) + callback(0); +} diff --git a/filejobpool.h b/filejobpool.h new file mode 100644 index 0000000000000000000000000000000000000000..8aad09a9726083e396b943c3bc75ec29dc1e2b7b --- /dev/null +++ b/filejobpool.h @@ -0,0 +1,105 @@ +/* + SPDX-FileCopyrightText: 2020 Alexander Saoutkin + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#pragma once + +#include +#include +#include +#include + + +#include +#include + +#include +#include + +/** + * This class stores a pool of FileJob objects, allowing clients to both acquire + * and release FileJob objects back into the pool. The client is only permitted + * to use the read()/write()/truncate() operations of the FileJob. + * + * DESIGN + * ------ + * When a client performs a read, write or truncation, it will attempt to + * acquire an available FileJob for the inode requested from the FileJobPool. + * The reason why the unique identifier is an inode, not a URL, is to not have + * special handling of renames. Acquisition only requires a URL as a parameter + * for when a FileJob needs to be opened to service the acquisition request. 
+ * Otherwise, all other methods only require the URL scheme to avoid having to + * iterate all the keys (and subsequently, the values) of std::map, the data + * structure that represents the FileJob pool. + * + * If the client has completed its I/O operation successfully, it will release + * the FileJob back to the pool. Releasing of the FileJob is unnecessary if + * the FileJob reports an error whilst the client is using it or if acquisition + * failed. + * + * The class also makes connections to various signals of the FileJob to manage + * the state of the pool, such as maintaining the correct offset and + * automatically releasing the FileJob in case of failure whilst the client is + * using it. + * + * There are also some additional helper methods: + * + * closeAllJobs() is used to flush all data before closing the application. + * + * closeJob() is to make sure not too many FileJobs are hogging slaves that + * might be used by other apps or operations. + */ + +class FileJobPool : public QObject { + Q_OBJECT + +public: + /** Acquires a FileJob ready to use for a given inode, received via a callback. */ + void acquireFileJob (QObject *obj, const QUrl &url, const fuse_ino_t ino, QIODevice::OpenModeFlag flags, KIO::filesize_t offset, std::function callback); + /** Releases a still running (i.e. not finished) FileJob back to the pool. */ + void releaseFileJob (const QString &scheme, KIO::FileJob *fileJob); + /** Closes a job for a given inode, if at least one exists. */ + void closeJob(const QString &scheme, const fuse_ino_t ino); + /** Flushes (by closing) all open FileJobs. 
*/ + void closeAllJobs(std::function callback); + +private: + /** Possible states of a JobSlot */ + enum class JobStatus { + Busy, + Free, + Idle + }; + /** A slot that stores a FileJob and relevant metadata */ + struct JobSlot { + KIO::FileJob *job = nullptr; + QObject *receiver = nullptr; + std::vector connections = {}; + JobStatus status = JobStatus::Free; + QUrl url = {}; + fuse_ino_t ino = 0; // == KIOFuseIno::Invalid in kiofusevfs.h + QIODevice::OpenModeFlag flags = QIODevice::NotOpen; + KIO::filesize_t offset = 0; + }; + /** The pool of jobs, with a max of 5 jobs per URL scheme */ + using SlotsForScheme = std::array; + std::map m_pool; + + /** Invokes callback on error or when a FileJob has been successfully created in the given slot. */ + void initJob(QObject *obj, const QUrl &url, const fuse_ino_t ino, QIODevice::OpenModeFlag flags, KIO::filesize_t offset, JobSlot &element, std::function callback); + /** Establishes necessary connections for a slot's FileJob */ + void initConnections(JobSlot& slot); + /** Resets a job slot.*/ + void resetJobSlot(JobSlot& slot); + /** Returns true if space has been acquired for a given ino, false otherwise. 
*/ + bool getSpaceForJob(QObject *obj, const QUrl &url, const fuse_ino_t ino, QIODevice::OpenModeFlag flags, KIO::filesize_t offset, SlotsForScheme &jobList, std::function callback); + /** Returns a list of references to slots containing FileJobs for a given inode */ + std::vector> jobsByIno(const QString &scheme, const fuse_ino_t ino); +Q_SIGNALS: + /** Emitted when there is a slot with a newly free or idle job */ + void jobReleased(const QString &scheme, KIO::FileJob *fileJob); +private Q_SLOTS: + /** Use this slot to cleanup when a job is finished */ + void slotJobFinished(const QString &scheme, KIO::FileJob *fileJob); +}; diff --git a/kiofusevfs.cpp b/kiofusevfs.cpp index c9908c7d14c3ad1c6793c11ac01da10469a46e9b..53331169a2d6978fa1001265df407452e090ac49 100644 --- a/kiofusevfs.cpp +++ b/kiofusevfs.cpp @@ -255,6 +255,13 @@ void KIOFuseVFS::stop() needEventLoop = true; } + + if(m_useFileJob) + { + auto lockerPointer = std::make_shared(&loop); + m_fileJobPool.closeAllJobs([lp = std::move(lockerPointer)](int) {}); + needEventLoop = true; + } if(needEventLoop) loop.exec(); // Wait until all QEventLoopLockers got destroyed @@ -589,23 +596,27 @@ void KIOFuseVFS::setattr(fuse_req_t req, fuse_ino_t ino, struct stat *attr, int } else if ((to_set & FUSE_SET_ATTR_SIZE) && fileJobBasedFileNode) { - auto *fileJob = KIO::open(that->remoteUrl(fileJobBasedFileNode), QIODevice::ReadWrite); - connect(fileJob, &KIO::FileJob::result, [=] (auto *job) { - // All errors come through this signal, so error-handling is done here - if(job->error()) + const QUrl& url = that->remoteUrl(fileJobBasedFileNode); + that->m_fileJobPool.acquireFileJob(that, url, ino, QIODevice::ReadWrite, 0, [=] (KIO::FileJob * fileJob) { + if (!fileJob || fileJob->error()) { - sharedState->error = kioErrorToFuseError(job->error()); + sharedState->error = fileJob ? 
kioErrorToFuseError(fileJob->error()) : EIO; markOperationCompleted(FUSE_SET_ATTR_SIZE); + return; } - }); - connect(fileJob, &KIO::FileJob::open, [=] { - fileJob->truncate(sharedState->value.st_size); - connect(fileJob, &KIO::FileJob::truncated, [=] { - fileJob->close(); - connect(fileJob, qOverload(&KIO::FileJob::close), [=] { - fileJobBasedFileNode->m_stat.st_size = sharedState->value.st_size; + connect(fileJob, &KIO::FileJob::result, that, [=] (auto *job) { + // All errors come through this signal, so error-handling is done here + if(job->error()) + { + sharedState->error = kioErrorToFuseError(job->error()); markOperationCompleted(FUSE_SET_ATTR_SIZE); - }); + } + }); + fileJob->truncate(sharedState->value.st_size); + connect(fileJob, &KIO::FileJob::truncated, that, [=] { + that->m_fileJobPool.releaseFileJob(url.scheme(), fileJob); + fileJobBasedFileNode->m_stat.st_size = sharedState->value.st_size; + markOperationCompleted(FUSE_SET_ATTR_SIZE); }); }); } @@ -1218,52 +1229,36 @@ void KIOFuseVFS::read(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, fu return; } - auto *fileJob = KIO::open(that->remoteUrl(remoteNode), QIODevice::ReadOnly); - connect(fileJob, &KIO::FileJob::result, [=] (auto *job) { - // All errors come through this signal, so error-handling is done here - if(job->error()) - fuse_reply_err(req, kioErrorToFuseError(job->error())); - }); - connect(fileJob, &KIO::FileJob::open, [=] { - fileJob->seek(off); - connect(fileJob, &KIO::FileJob::position, [=] (auto *job, KIO::filesize_t offset) { - Q_UNUSED(job); - if(off_t(offset) != off) + QUrl remoteUrl = that->remoteUrl(remoteNode); + that->m_fileJobPool.acquireFileJob(that, remoteUrl, ino, that->unixFlagtoQtFlag(fi->flags), off, [=] (KIO::FileJob * fileJob) { + if(!fileJob || fileJob->error()) + { + fuse_reply_err(req, fileJob ? 
kioErrorToFuseError(fileJob->error()) : EIO); + return; + } + connect(fileJob, &KIO::FileJob::result, that, [=] { + fuse_reply_err(req, kioErrorToFuseError(fileJob->error())); + }); + // Make sure to not read off the end + KIO::filesize_t actualSize = std::min(remoteNode->m_stat.st_size - off, off_t(size)); + fileJob->read(actualSize); + QByteArray buffer; + fileJob->connect(fileJob, &KIO::FileJob::data, that, [=] (auto *readJob, const QByteArray &data) mutable { + Q_UNUSED(readJob); + QByteArray truncatedData = data.left(actualSize); + buffer.append(truncatedData); + actualSize -= truncatedData.size(); + + if(actualSize > 0) { - fileJob->close(); - fileJob->connect(fileJob, qOverload(&KIO::FileJob::close), [=] { - fuse_reply_err(req, EIO); - }); + // Keep reading until we get all the data we need. + fileJob->read(actualSize); return; } - auto actualSize = remoteNode->m_stat.st_size = fileJob->size(); - // Reading over the end - if(off >= off_t(actualSize)) - actualSize = 0; - else - actualSize = std::min(off_t(actualSize) - off, off_t(size)); - fileJob->read(actualSize); - QByteArray buffer; - fileJob->connect(fileJob, &KIO::FileJob::data, [=] (auto *readJob, const QByteArray &data) mutable { - Q_UNUSED(readJob); - QByteArray truncatedData = data.left(actualSize); - buffer.append(truncatedData); - actualSize -= truncatedData.size(); - - if(actualSize > 0) - { - // Keep reading until we get all the data we need. 
- fileJob->read(actualSize); - return; - } - fileJob->close(); - fileJob->connect(fileJob, qOverload(&KIO::FileJob::close), [=] { - fuse_reply_buf(req, buffer.constData(), buffer.size()); - }); - }); + that->m_fileJobPool.releaseFileJob(remoteUrl.scheme(), fileJob); + fuse_reply_buf(req, buffer.constData(), buffer.size()); }); }); - return; } @@ -1340,47 +1335,36 @@ void KIOFuseVFS::write(fuse_req_t req, fuse_ino_t ino, const char *buf, size_t s } QByteArray data(buf, size); // Copy data - auto *fileJob = KIO::open(that->remoteUrl(remoteNode), QIODevice::ReadWrite); - connect(fileJob, &KIO::FileJob::result, [=] (auto *job) { - // All errors come through this signal, so error-handling is done here - if(job->error()) - fuse_reply_err(req, kioErrorToFuseError(job->error())); - }); - connect(fileJob, &KIO::FileJob::open, [=, fi_flags=fi->flags] { - off_t offset = (fi_flags & O_APPEND) ? fileJob->size() : off; - fileJob->seek(offset); - connect(fileJob, &KIO::FileJob::position, [=] (auto *job, KIO::filesize_t offset) { - Q_UNUSED(job); - if (off_t(offset) != off) { - fileJob->close(); - fileJob->connect(fileJob, qOverload(&KIO::FileJob::close), [=] { - fuse_reply_err(req, EIO); - }); + QUrl remoteUrl = that->remoteUrl(remoteNode); + off_t offset = (fi->flags & O_APPEND) ? remoteNode->m_stat.st_size : off; + that->m_fileJobPool.acquireFileJob(that, remoteUrl, ino, that->unixFlagtoQtFlag(fi->flags), offset, [=] (KIO::FileJob * fileJob) { + if (!fileJob || fileJob->error()) + { + fuse_reply_err(req, fileJob ? kioErrorToFuseError(fileJob->error()) : EIO); + return; + } + + connect(fileJob, &KIO::FileJob::result, that, [=] { + fuse_reply_err(req, kioErrorToFuseError(fileJob->error())); + }); + // Limit write to avoid killing the slave. 
+ // @see https://phabricator.kde.org/D15448 + fileJob->write(data.left(0xFFFFFF)); + off_t bytesLeft = size; + fileJob->connect(fileJob, &KIO::FileJob::written, that, [=] (auto *writeJob, KIO::filesize_t written) mutable { + Q_UNUSED(writeJob); + bytesLeft -= written; + if (bytesLeft > 0) + { + // Keep writing until we write all the data we need. + fileJob->write(data.mid(size - bytesLeft, 0xFFFFFF)); return; } - // Limit write to avoid killing the slave. - // @see https://phabricator.kde.org/D15448 - fileJob->write(data.left(0xFFFFFF)); - off_t bytesLeft = size; - fileJob->connect(fileJob, &KIO::FileJob::written, [=] (auto *writeJob, KIO::filesize_t written) mutable { - Q_UNUSED(writeJob); - bytesLeft -= written; - if (bytesLeft > 0) - { - // Keep writing until we write all the data we need. - fileJob->write(data.mid(size - bytesLeft, 0xFFFFFF)); - return; - } - fileJob->close(); - fileJob->connect(fileJob, qOverload(&KIO::FileJob::close), [=] { - // Wait till we've flushed first... - remoteNode->m_stat.st_size = std::max(off_t(offset + data.size()), remoteNode->m_stat.st_size); - fuse_reply_write(req, data.size()); - }); - }); + that->m_fileJobPool.releaseFileJob(remoteUrl.scheme(), fileJob); + remoteNode->m_stat.st_size = std::max(off_t(offset + data.size()), remoteNode->m_stat.st_size); + fuse_reply_write(req, data.size()); }); }); - return; } @@ -1412,8 +1396,18 @@ void KIOFuseVFS::release(fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi) fuse_reply_err(req, 0); // Ignored anyway auto remoteFileNode = std::dynamic_pointer_cast(node); - if(node->m_openCount || !remoteFileNode || !remoteFileNode->m_localCache) + auto fileJobNode = std::dynamic_pointer_cast(node); + if(node->m_openCount) + return; + else if(fileJobNode) + { + // Make sure we don't leave too many open FileJobs + that->m_fileJobPool.closeJob(that->remoteUrl(node).scheme(), ino); + return; + } + else if(!remoteFileNode || !remoteFileNode->m_localCache) return; // Nothing to do + // When the cache 
is not dirty, remove the cache file. that->awaitNodeFlushed(remoteFileNode, [=](int error) { @@ -2496,3 +2490,18 @@ int KIOFuseVFS::kioErrorToFuseError(const int kioError) { default : return EIO; } } + +QIODevice::OpenModeFlag KIOFuseVFS::unixFlagtoQtFlag(const int flag) +{ + switch(flag & O_ACCMODE) + { + case O_RDONLY: + return QIODevice::ReadOnly; + case O_WRONLY: + return QIODevice::WriteOnly; + case O_RDWR: + return QIODevice::ReadWrite; + default: + return QIODevice::NotOpen; + } +} diff --git a/kiofusevfs.h b/kiofusevfs.h index 013144a1fb0fca100916fcb8a73a37bd49d2a209..5a7db79b5ca01ae6645d2468e0d5cb3735659814 100644 --- a/kiofusevfs.h +++ b/kiofusevfs.h @@ -18,6 +18,7 @@ #include #include "kiofusenode.h" +#include "filejobpool.h" // Forward declarations namespace KIO { class UDSEntry; } @@ -161,6 +162,8 @@ private: /** Returns the corresponding FUSE error to the given KIO Job error */ static int kioErrorToFuseError(const int kioError); + /** Returns the corresponding Qt open mode flag for the given Unix access mode flags */ + QIODevice::OpenModeFlag unixFlagtoQtFlag(const int flag); /** Prevent the Application from quitting. */ std::unique_ptr m_eventLoopLocker; @@ -191,6 +194,8 @@ private: std::unordered_map> m_nodes; /** Set of all nodes with a dirty cache. */ std::set m_dirtyNodes; + /** Pool of KIO::FileJob */ + FileJobPool m_fileJobPool; /** @see setUseFileJob() */ bool m_useFileJob;