Members of the KDE Community are recommended to subscribe to the kde-community mailing list at https://mail.kde.org/mailman/listinfo/kde-community to allow them to participate in important discussions and receive other important announcements

Simplify job management (only 1 parent job allowed), fixes some crashes and locks

parent 91d64a57
......@@ -1039,7 +1039,7 @@ void Bin::slotReloadClip()
// We need to set a temporary id before all outdated producers are replaced;
// TODO refac
// m_doc->getFileProperties(xml, currentItem->AbstractProjectItem::clipId(), 150, true);
pCore->jobManager()->startJob<LoadJob>({currentItem->AbstractProjectItem::clipId()}, {}, QString(), xml);
pCore->jobManager()->startJob<LoadJob>({currentItem->AbstractProjectItem::clipId()}, -1, QString(), xml);
}
}
}
......@@ -2105,7 +2105,7 @@ void Bin::gotProxy(const QString &id, const QString &path)
clip->setProducerProperty(QStringLiteral("kdenlive:proxy"), path);
QDomElement xml = clip->toXml(doc, true);
if (!xml.isNull()) {
pCore->jobManager()->startJob<LoadJob>({id}, {}, QString(), xml);
pCore->jobManager()->startJob<LoadJob>({id}, -1, QString(), xml);
}
}
}
......@@ -3015,7 +3015,7 @@ void Bin::reloadAllProducers()
clip->setClipStatus(AbstractProjectItem::StatusWaiting);
clip->discardAudioThumb();
// We need to set a temporary id before all outdated producers are replaced;
pCore->jobManager()->startJob<LoadJob>({clip->AbstractProjectItem::clipId()}, {}, QString(), xml);
pCore->jobManager()->startJob<LoadJob>({clip->AbstractProjectItem::clipId()}, -1, QString(), xml);
}
}
}
......@@ -3060,7 +3060,7 @@ bool Bin::addClip(QDomElement elem, const QString &clipId)
elem.setAttribute(QStringLiteral("checkProfile"), 1);
}
createClip(elem);
pCore->jobManager()->startJob<LoadJob>({producerId}, {}, QString(), elem);
pCore->jobManager()->startJob<LoadJob>({producerId}, -1, QString(), elem);
return true;
}
......
......@@ -289,16 +289,14 @@ void ProjectClip::reloadProducer(bool refreshOnly)
if (refreshOnly) {
// In that case, we only want a new thumbnail.
// We thus set up a thumb job. We must make sure that there is no pending LOADJOB
std::vector<int> parentJobs;
if (loadjobId != -1) parentJobs.push_back(loadjobId);
pCore->jobManager()->startJob<ThumbJob>({clipId()}, parentJobs, QString(), 150, -1, true);
pCore->jobManager()->startJob<ThumbJob>({clipId()}, loadjobId, QString(), 150, -1, true);
} else {
//TODO: check if another load job is running?
QDomDocument doc;
QDomElement xml = toXml(doc);
if (!xml.isNull()) {
pCore->jobManager()->startJob<LoadJob>({clipId()}, {}, QString(), xml);
pCore->jobManager()->startJob<LoadJob>({clipId()}, -1, QString(), xml);
}
}
}
......@@ -599,7 +597,7 @@ void ProjectClip::setProperties(const QMap<QString, QString> &properties, bool r
} else {
// A proxy was requested, make sure to keep original url
setProducerProperty(QStringLiteral("kdenlive:originalurl"), url());
pCore->jobManager()->startJob<ProxyJob>({clipId()}, {}, QString());
pCore->jobManager()->startJob<ProxyJob>({clipId()}, -1, QString());
}
} else if (properties.contains(QStringLiteral("resource")) || properties.contains(QStringLiteral("templatetext")) ||
properties.contains(QStringLiteral("autorotate"))) {
......
......@@ -527,9 +527,9 @@ bool ProjectItemModel::requestAddBinClip(QString &id, const QDomElement &descrip
bool res = addItem(new_clip, parentId, undo, redo);
qDebug() << "/////////// added " << res;
if (res) {
int loadJob = pCore->jobManager()->startJob<LoadJob>({id}, {}, QString(), description);
pCore->jobManager()->startJob<ThumbJob>({id}, {loadJob}, QString(), 150, 0, true);
pCore->jobManager()->startJob<AudioThumbJob>({id}, {loadJob}, QString());
int loadJob = pCore->jobManager()->startJob<LoadJob>({id}, -1, QString(), description);
pCore->jobManager()->startJob<ThumbJob>({id}, loadJob, QString(), 150, 0, true);
pCore->jobManager()->startJob<AudioThumbJob>({id}, loadJob, QString());
}
return res;
}
......@@ -558,8 +558,9 @@ bool ProjectItemModel::requestAddBinClip(QString &id, std::shared_ptr<Mlt::Produ
std::shared_ptr<ProjectClip> new_clip = ProjectClip::construct(id, m_blankThumb, std::static_pointer_cast<ProjectItemModel>(shared_from_this()), producer);
bool res = addItem(new_clip, parentId, undo, redo);
if (res) {
pCore->jobManager()->startJob<ThumbJob>({id}, {}, QString(), 150, -1, true);
pCore->jobManager()->startJob<AudioThumbJob>({id}, {}, QString());
int blocking = pCore->jobManager()->getBlockingJobId(id, AbstractClipJob::LOADJOB);
pCore->jobManager()->startJob<ThumbJob>({id}, blocking, QString(), 150, -1, true);
pCore->jobManager()->startJob<AudioThumbJob>({id}, blocking, QString());
}
return res;
}
......@@ -577,8 +578,8 @@ bool ProjectItemModel::requestAddBinSubClip(QString &id, int in, int out, const
std::shared_ptr<ProjectSubClip> new_clip = ProjectSubClip::construct(id, clip, std::static_pointer_cast<ProjectItemModel>(shared_from_this()), in, out, tc);
bool res = addItem(new_clip, parentId, undo, redo);
if (res) {
auto parentJobs = pCore->jobManager()->getPendingJobsIds(parentId, AbstractClipJob::LOADJOB);
pCore->jobManager()->startJob<ThumbJob>({id}, parentJobs, QString(), 150, -1, true);
int parentJob = pCore->jobManager()->getBlockingJobId(parentId, AbstractClipJob::LOADJOB);
pCore->jobManager()->startJob<ThumbJob>({id}, parentJob, QString(), 150, -1, true);
}
return res;
}
......
......@@ -229,7 +229,6 @@ void AudioThumbJob::updateFfmpegProgress()
bool AudioThumbJob::startJob()
{
qDebug() << "################### audiothumbjob START";
if (m_done) {
return true;
}
......
......@@ -43,6 +43,22 @@ JobManager::~JobManager()
slotCancelJobs();
}
int JobManager::getBlockingJobId(const QString &id, AbstractClipJob::JOBTYPE type)
{
READ_LOCK();
std::vector<int> result;
if (m_jobsByClip.count(id) > 0) {
for (int jobId : m_jobsByClip.at(id)) {
if (!m_jobs.at(jobId)->m_future.isFinished() && !m_jobs.at(jobId)->m_future.isCanceled()) {
if (type == AbstractClipJob::NOJOBTYPE || m_jobs.at(jobId)->m_type == type) {
return jobId;
}
}
}
}
return -1;
}
std::vector<int> JobManager::getPendingJobsIds(const QString &id, AbstractClipJob::JOBTYPE type)
{
READ_LOCK();
......@@ -189,18 +205,16 @@ void JobManager::slotCancelJobs()
}
}
void JobManager::createJob(std::shared_ptr<Job_t> job, const std::vector<int> &parents)
void JobManager::createJob(std::shared_ptr<Job_t> job)
{
qDebug() << "################### Created JOB" << job->m_id<<", TYPE: "<<job->m_type;
/*
// This thread wait mechanism was broken and caused a race condition locking the application
// so I switched to a simpler model
bool ok = false;
// wait for parents to finish
while (!ok) {
ok = true;
for (int p : parents) {
if (!m_jobs[p]->m_processed) {
ok = false;
break;
}
if (!m_jobs[p]->m_completionMutex.tryLock()) {
ok = false;
qDebug()<<"********\nWAITING FOR JOB COMPLETION MUTEX!!: "<<job->m_id<<" : "<<m_jobs[p]->m_id<<"="<<m_jobs[p]->m_type;
......@@ -213,9 +227,9 @@ void JobManager::createJob(std::shared_ptr<Job_t> job, const std::vector<int> &p
if (!ok) {
QThread::msleep(10);
}
}
qDebug() << "################### Create JOB STARTING" << job->m_id;
}*/
// connect progress signals
QReadLocker locker(&m_lock);
for (const auto &it : job->m_indices) {
size_t i = it.second;
auto binId = it.first;
......@@ -224,32 +238,26 @@ void JobManager::createJob(std::shared_ptr<Job_t> job, const std::vector<int> &p
pCore->projectItemModel()->onItemUpdated(binId, AbstractProjectItem::JobProgress);
});
}
QWriteLocker locker(&m_lock);
connect(&job->m_future, &QFutureWatcher<bool>::started, this, &JobManager::updateJobCount);
connect(&job->m_future, &QFutureWatcher<bool>::finished, [ this, id = job->m_id ]() { slotManageFinishedJob(id); });
connect(&job->m_future, &QFutureWatcher<bool>::canceled, [ this, id = job->m_id ]() { slotManageCanceledJob(id); });
qDebug() << "################### Create JOB READY TO EXEC" << job->m_id<<", JOBS: "<<job->m_job.size();
//AbstractClipJob::execute(job->m_job.front());
job->m_actualFuture = QtConcurrent::mapped(job->m_job, AbstractClipJob::execute);
job->m_future.setFuture(job->m_actualFuture);
qDebug() << "################### Create JOB READY EXEC DONE" << job->m_id;
//connect(&job->m_future, &QFutureWatcher<bool>::finished, this, &JobManager::updateJobCount);
//connect(&job->m_future, &QFutureWatcher<bool>::canceled, this, &JobManager::updateJobCount);
// In the unlikely event that the job finished before the signal connection was made, we check manually for finish and cancel
/*if (job->m_future.isFinished()) {
emit job->m_future.finished();
//emit job->m_future.finished();
slotManageFinishedJob(job->m_id);
}
if (job->m_future.isCanceled()) {
emit job->m_future.canceled();
//emit job->m_future.canceled();
slotManageCanceledJob(job->m_id);
}*/
}
void JobManager::slotManageCanceledJob(int id)
{
QWriteLocker locker(&m_lock);
QReadLocker locker(&m_lock);
Q_ASSERT(m_jobs.count(id) > 0);
if (m_jobs[id]->m_processed) return;
m_jobs[id]->m_processed = true;
......@@ -258,12 +266,13 @@ void JobManager::slotManageCanceledJob(int id)
for (const auto &it : m_jobs[id]->m_indices) {
pCore->projectItemModel()->onItemUpdated(it.first, AbstractProjectItem::JobStatus);
}
//TODO: delete child jobs
updateJobCount();
}
void JobManager::slotManageFinishedJob(int id)
{
qDebug() << "################### JOB finished" << id;
QWriteLocker locker(&m_lock);
QReadLocker locker(&m_lock);
Q_ASSERT(m_jobs.count(id) > 0);
if (m_jobs[id]->m_processed) return;
......@@ -275,7 +284,13 @@ void JobManager::slotManageFinishedJob(int id)
for (bool res : m_jobs[id]->m_future.future()) {
ok = ok && res;
}
if (!ok) return;
if (!ok) {
qDebug()<<" * * * ** * * *\nWARNING + + +\nJOB NOT CORRECT FINISH: "<<id<<"\n------------------------";
//TODO: delete child jobs
m_jobs[id]->m_completionMutex.unlock();
updateJobCount();
return;
}
Fun undo = []() { return true; };
Fun redo = []() { return true; };
for (const auto &j : m_jobs[id]->m_job) {
......@@ -286,10 +301,17 @@ void JobManager::slotManageFinishedJob(int id)
qDebug() << "ERROR: Job " << id << " failed";
m_jobs[id]->m_failed = true;
}
m_jobs[id]->m_completionMutex.unlock();
if (ok && !m_jobs[id]->m_undoString.isEmpty()) {
pCore->pushUndo(undo, redo, m_jobs[id]->m_undoString);
}
m_jobs[id]->m_completionMutex.unlock();
if (m_jobsByParents.count(id) > 0) {
std::vector<int> children = m_jobsByParents[id];
for (int cid : children) {
QtConcurrent::run(this, &JobManager::createJob, m_jobs[cid]);
}
m_jobsByParents.erase(id);
}
updateJobCount();
}
......
......@@ -83,15 +83,15 @@ public:
@param return the id of the created job
*/
template <typename T, typename... Args>
int startJob(const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString, Args &&... args);
int startJob(const std::vector<QString> &binIds, int parentId, QString undoString, Args &&... args);
// Same function, but we specify the function used to create a new job
template <typename T, typename... Args>
int startJob(const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString,
int startJob(const std::vector<QString> &binIds, int parentId, QString undoString,
std::function<std::shared_ptr<T>(const QString &, Args...)> createFn, Args &&... args);
// Same function, but do not call prepareJob
template <typename T, typename... Args>
int startJob_noprepare(const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString, Args &&... args);
int startJob_noprepare(const std::vector<QString> &binIds, int parentId, QString undoString, Args &&... args);
/** @brief Discard specific job type for a clip.
* @param binId the clip id
......@@ -111,6 +111,7 @@ public:
* @param type The type of job that you want to query. Leave to NOJOBTYPE to match all
*/
std::vector<int> getPendingJobsIds(const QString &binId, AbstractClipJob::JOBTYPE type = AbstractClipJob::NOJOBTYPE);
int getBlockingJobId(const QString &id, AbstractClipJob::JOBTYPE type);
/** @brief Get the list of finished or cancelled job ids for given clip.
* @param binId the clip id
......@@ -137,7 +138,7 @@ public:
protected:
// Helper function to launch a given job.
// This has to be launched asynchrnously since it blocks until all parents are finished
void createJob(std::shared_ptr<Job_t> job, const std::vector<int> &parents);
void createJob(std::shared_ptr<Job_t> job);
void updateJobCount();
......@@ -161,6 +162,7 @@ private:
std::map<int, std::shared_ptr<Job_t>> m_jobs;
/** @brief List of all the jobs by clip. */
std::unordered_map<QString, std::vector<int>> m_jobsByClip;
std::unordered_map<int, std::vector<int>> m_jobsByParents;
signals:
void jobCount(int);
......
......@@ -23,13 +23,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <QtConcurrent>
#include <type_traits>
template <typename T, typename... Args>
int JobManager::startJob(const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString,
int JobManager::startJob(const std::vector<QString> &binIds, int parentId, QString undoString,
std::function<std::shared_ptr<T>(const QString &, Args...)> createFn, Args &&... args)
{
static_assert(std::is_base_of<AbstractClipJob, T>::value, "Your job must inherit from AbstractClipJob");
QWriteLocker locker(&m_lock);
//QWriteLocker locker(&m_lock);
int jobId = m_currentId++;
std::shared_ptr<Job_t> job(new Job_t());
job->m_completionMutex.lock();
job->m_undoString = std::move(undoString);
job->m_id = jobId;
for (const auto &id : binIds) {
......@@ -39,13 +40,21 @@ int JobManager::startJob(const std::vector<QString> &binIds, const std::vector<i
job->m_type = job->m_job.back()->jobType();
m_jobsByClip[id].push_back(jobId);
}
job->m_completionMutex.lock();
m_lock.lockForWrite();
int insertionRow = static_cast<int>(m_jobs.size());
beginInsertRows(QModelIndex(), insertionRow, insertionRow);
Q_ASSERT(m_jobs.count(jobId) == 0);
m_jobs[jobId] = job;
endInsertRows();
QtConcurrent::run(this, &JobManager::createJob, job, parents);
m_lock.unlock();
if (parentId == -1 || m_jobs[parentId]->m_completionMutex.tryLock()) {
if (parentId != -1) {
m_jobs[parentId]->m_completionMutex.unlock();
}
QtConcurrent::run(this, &JobManager::createJob, job);
} else {
m_jobsByParents[parentId].push_back(jobId);
}
return jobId;
}
......@@ -82,31 +91,31 @@ struct dummy
template <typename T, bool Noprepare, typename... Args>
static typename std::enable_if<!Detect_prepareJob<T>::value || Noprepare, int>::type
exec(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString, Args &&... args)
exec(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, int parentId, QString undoString, Args &&... args)
{
auto defaultCreate = [](const QString &id, Args... local_args) { return AbstractClipJob::make<T>(id, std::forward<Args>(local_args)...); };
using local_createFn_t = std::function<std::shared_ptr<T>(const QString &, Args...)>;
return ptr->startJob<T, Args...>(binIds, parents, std::move(undoString), local_createFn_t(std::move(defaultCreate)), std::forward<Args>(args)...);
return ptr->startJob<T, Args...>(binIds, parentId, std::move(undoString), local_createFn_t(std::move(defaultCreate)), std::forward<Args>(args)...);
}
template <typename T, bool Noprepare, typename... Args>
static typename std::enable_if<Detect_prepareJob<T>::value && !Noprepare, int>::type
exec(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString, Args &&... args)
exec(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, int parentId, QString undoString, Args &&... args)
{
// For job stabilization, there is a custom preparation function
return T::prepareJob(ptr, binIds, parents, std::move(undoString), std::forward<Args>(args)...);
return T::prepareJob(ptr, binIds, parentId, std::move(undoString), std::forward<Args>(args)...);
}
};
} // namespace impl
template <typename T, typename... Args>
int JobManager::startJob(const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString, Args &&... args)
int JobManager::startJob(const std::vector<QString> &binIds, int parentId, QString undoString, Args &&... args)
{
return impl::dummy::exec<T, false, Args...>(shared_from_this(), binIds, parents, std::move(undoString), std::forward<Args>(args)...);
return impl::dummy::exec<T, false, Args...>(shared_from_this(), binIds, parentId, std::move(undoString), std::forward<Args>(args)...);
}
template <typename T, typename... Args>
int JobManager::startJob_noprepare(const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString, Args &&... args)
int JobManager::startJob_noprepare(const std::vector<QString> &binIds, int parentId, QString undoString, Args &&... args)
{
return impl::dummy::exec<T, true, Args...>(shared_from_this(), binIds, parents, std::move(undoString), std::forward<Args>(args)...);
return impl::dummy::exec<T, true, Args...>(shared_from_this(), binIds, parentId, std::move(undoString), std::forward<Args>(args)...);
}
......@@ -82,7 +82,7 @@ void SceneSplitJob::configureProfile()
}
// static
int SceneSplitJob::prepareJob(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString)
int SceneSplitJob::prepareJob(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, int parentId, QString undoString)
{
// Show config dialog
QScopedPointer<QDialog> d(new QDialog(QApplication::activeWindow()));
......@@ -102,7 +102,7 @@ int SceneSplitJob::prepareJob(std::shared_ptr<JobManager> ptr, const std::vector
int markersType = ui.add_markers->isChecked() ? ui.marker_type->currentIndex() : -1;
bool subclips = ui.cut_scenes->isChecked();
return ptr->startJob_noprepare<SceneSplitJob>(binIds, parents, std::move(undoString), subclips, markersType);
return ptr->startJob_noprepare<SceneSplitJob>(binIds, parentId, std::move(undoString), subclips, markersType);
}
bool SceneSplitJob::commitResult(Fun &undo, Fun &redo)
......
......@@ -48,7 +48,7 @@ public:
// This is a special function that prepares the stabilize job for a given list of clips.
// Namely, it displays the required UI to configure the job and call startJob with the right set of parameters
// Then the job is automatically put in queue. Its id is returned
static int prepareJob(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString);
static int prepareJob(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, int parentId, QString undoString);
bool commitResult(Fun &undo, Fun &redo) override;
const QString getDescription() const override;
......
......@@ -80,7 +80,7 @@ std::unordered_set<QString> StabilizeJob::supportedFilters()
}
// static
int StabilizeJob::prepareJob(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString,
int StabilizeJob::prepareJob(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, int parentId, QString undoString,
const QString &filterName)
{
Q_ASSERT(supportedFilters().count(filterName) > 0);
......@@ -113,7 +113,7 @@ int StabilizeJob::prepareJob(std::shared_ptr<JobManager> ptr, const std::vector<
// We are now all set to create the job. Note that we pass all the parameters directly through the lambda, hence there are no extra parameters to
// the function
using local_createFn_t = std::function<std::shared_ptr<StabilizeJob>(const QString &)>;
return ptr->startJob<StabilizeJob>(binIds, parents, std::move(undoString), local_createFn_t(std::move(createFn)));
return ptr->startJob<StabilizeJob>(binIds, parentId, std::move(undoString), local_createFn_t(std::move(createFn)));
}
}
return -1;
......
......@@ -49,7 +49,7 @@ public:
// This is a special function that prepares the stabilize job for a given list of clips.
// Namely, it displays the required UI to configure the job and call startJob with the right set of parameters
// Then the job is automatically put in queue. Its id is returned
static int prepareJob(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, const std::vector<int> &parents, QString undoString,
static int prepareJob(std::shared_ptr<JobManager> ptr, const std::vector<QString> &binIds, int parentId, QString undoString,
const QString &filterName);
// Return the list of stabilization filters that we support
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment