Browse Source

Split the subjob logic out of PropagateDirectory

Jocelyn Turcotte 9 years ago
parent
commit
072698d606
2 changed files with 149 additions and 74 deletions
  1. 101 52
      src/libsync/owncloudpropagator.cpp
  2. 48 22
      src/libsync/owncloudpropagator.h

+ 101 - 52
src/libsync/owncloudpropagator.cpp

@@ -349,7 +349,6 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
 
         if (item->_isDirectory) {
             PropagateDirectory *dir = new PropagateDirectory(this, item);
-            dir->_firstJob.reset(createJob(item));
 
             if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE
                     && item->_direction == SyncFileItem::Up) {
@@ -582,23 +581,21 @@ OwncloudPropagator *PropagatorJob::propagator() const
     return qobject_cast<OwncloudPropagator*>(parent());
 }
 
-PropagatorJob::JobParallelism PropagateDirectory::parallelism()
+// ================================================================================
+
+PropagatorJob::JobParallelism PropagatorCompositeJob::parallelism()
 {
     // If any of the non-finished sub jobs is not parallel, we have to wait
-    if (_firstJob && _firstJob->parallelism() != FullParallelism) {
-        return WaitForFinished;
-    }
-
     for (int i = 0; i < _subJobs.count(); ++i) {
         if (_subJobs.at(i)->parallelism() != FullParallelism) {
-            return WaitForFinished;
+            return _subJobs.at(i)->parallelism();
         }
     }
     return FullParallelism;
 }
 
 
-bool PropagateDirectory::scheduleNextJob()
+bool PropagatorCompositeJob::scheduleNextJob()
 {
     if (_state == Finished) {
         return false;
@@ -607,25 +604,14 @@ bool PropagateDirectory::scheduleNextJob()
     if (_state == NotYetStarted) {
         _state = Running;
 
-        if (!_firstJob && _subJobs.isEmpty()) {
-            finalize();
+        if (_subJobs.isEmpty()) {
+            _state = Finished;
+            emit finished(SyncFileItem::Success);
             return true;
         }
     }
 
-    if (_firstJob && _firstJob->_state == NotYetStarted) {
-        return possiblyRunNextJob(_firstJob.data());
-    }
-
-    if (_firstJob && _firstJob->_state == Running) {
-        return false;
-    }
-
     for (int i = 0; i < _subJobs.size(); ++i) {
-        if (_subJobs.at(i)->_state == Finished) {
-            continue;
-        }
-
         if (possiblyRunNextJob(_subJobs.at(i))) {
             return true;
         }
@@ -640,25 +626,18 @@ bool PropagateDirectory::scheduleNextJob()
     return false;
 }
 
-void PropagateDirectory::slotSubJobFinished(SyncFileItem::Status status)
+void PropagatorCompositeJob::slotSubJobFinished(SyncFileItem::Status status)
 {
     PropagatorJob *subJob = static_cast<PropagatorJob *>(sender());
     ASSERT(subJob);
 
     // Delete the job and remove it from our list of jobs.
     subJob->deleteLater();
-    bool wasFirstJob = false;
-    if (subJob == _firstJob.data()) {
-        wasFirstJob = true;
-        _firstJob.take();
-    } else {
-        int i = _subJobs.indexOf(subJob);
-        ASSERT(i >= 0);
-        _subJobs.remove(i);
-    }
+    int i = _subJobs.indexOf(subJob);
+    ASSERT(i >= 0);
+    _subJobs.remove(i);
 
-    if (status == SyncFileItem::FatalError ||
-            (wasFirstJob && status != SyncFileItem::Success && status != SyncFileItem::Restoration)) {
+    if (status == SyncFileItem::FatalError) {
         abort();
         _state = Finished;
         emit finished(status);
@@ -667,19 +646,96 @@ void PropagateDirectory::slotSubJobFinished(SyncFileItem::Status status)
         _hasError = status;
     }
 
-    // We finished processing all the jobs
-    // check if we finished
-    if (!_firstJob && _subJobs.isEmpty()) {
-        finalize();
+    // Check if we finished processing all the jobs.
+    if (_subJobs.isEmpty()) {
+        _state = Finished;
+        emit finished(_hasError == SyncFileItem::NoStatus ?  SyncFileItem::Success : _hasError);
     } else {
         emit ready();
     }
 }
 
-void PropagateDirectory::finalize()
+qint64 PropagatorCompositeJob::committedDiskSpace() const
+{
+    qint64 needed = 0;
+    foreach (PropagatorJob* job, _subJobs) {
+        needed += job->committedDiskSpace();
+    }
+    return needed;
+}
+
+// ================================================================================
+
+PropagateDirectory::PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item)
+    : PropagatorJob(propagator)
+    , _item(item)
+    , _firstJob(propagator->createJob(item))
+    , _subJobs(propagator)
+{
+    if (_firstJob) {
+        connect(_firstJob.data(), SIGNAL(finished(SyncFileItem::Status)), this, SLOT(slotFirstJobFinished(SyncFileItem::Status)));
+        connect(_firstJob.data(), SIGNAL(itemCompleted(const SyncFileItemPtr &)), this, SIGNAL(itemCompleted(const SyncFileItemPtr &)));
+        connect(_firstJob.data(), SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
+        connect(_firstJob.data(), SIGNAL(ready()), this, SIGNAL(ready()));
+    }
+    connect(&_subJobs, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(slotSubJobsFinished(SyncFileItem::Status)));
+    connect(&_subJobs, SIGNAL(itemCompleted(const SyncFileItemPtr &)), this, SIGNAL(itemCompleted(const SyncFileItemPtr &)));
+    connect(&_subJobs, SIGNAL(progress(const SyncFileItem &,quint64)), this, SIGNAL(progress(const SyncFileItem &,quint64)));
+    connect(&_subJobs, SIGNAL(ready()), this, SIGNAL(ready()));
+}
+
+PropagatorJob::JobParallelism PropagateDirectory::parallelism()
+{
+    // If any of the non-finished sub jobs is not parallel, we have to wait
+    if (_firstJob && _firstJob->parallelism() != FullParallelism) {
+        return WaitForFinished;
+    }
+    if (_subJobs.parallelism() != FullParallelism) {
+        return WaitForFinished;
+    }
+    return FullParallelism;
+}
+
+
+bool PropagateDirectory::scheduleNextJob()
+{
+    if (_state == Finished) {
+        return false;
+    }
+
+    if (_state == NotYetStarted) {
+        _state = Running;
+    }
+
+    if (_firstJob && _firstJob->_state == NotYetStarted) {
+        return _firstJob->scheduleNextJob();
+    }
+
+    if (_firstJob && _firstJob->_state == Running) {
+        return false;
+    }
+
+    return _subJobs.scheduleNextJob();
+
+}
+
+void PropagateDirectory::slotFirstJobFinished(SyncFileItem::Status status)
 {
-    bool ok = true;
-    if (!_item->isEmpty() && _hasError == SyncFileItem::NoStatus) {
+    _firstJob.take()->deleteLater();
+
+    if (status != SyncFileItem::Success && status != SyncFileItem::Restoration) {
+        abort();
+        _state = Finished;
+        emit finished(status);
+        return;
+    }
+
+    emit ready();
+}
+
+void PropagateDirectory::slotSubJobsFinished(SyncFileItem::Status status)
+{
+    if (!_item->isEmpty() && status == SyncFileItem::Success) {
         if( !_item->_renameTarget.isEmpty() ) {
             if(_item->_instruction == CSYNC_INSTRUCTION_RENAME
                     && _item->_originalFile != _item->_renameTarget) {
@@ -703,26 +759,19 @@ void PropagateDirectory::finalize()
                 }
             }
             SyncJournalFileRecord record(*_item,  propagator()->_localDir + _item->_file);
-            ok = propagator()->_journal->setFileRecordMetadata(record);
+            bool ok = propagator()->_journal->setFileRecordMetadata(record);
             if (!ok) {
-                _hasError = _item->_status = SyncFileItem::FatalError;
+                status = _item->_status = SyncFileItem::FatalError;
                 _item->_errorString = tr("Error writing metadata to the database");
                 qWarning() << "Error writing to the database for file" << _item->_file;
             }
         }
     }
     _state = Finished;
-    emit finished(_hasError == SyncFileItem::NoStatus ?  SyncFileItem::Success : _hasError);
+    emit finished(status);
 }
 
-qint64 PropagateDirectory::committedDiskSpace() const
-{
-    qint64 needed = 0;
-    foreach (PropagatorJob* job, _subJobs) {
-        needed += job->committedDiskSpace();
-    }
-    return needed;
-}
+// ================================================================================
 
 CleanupPollsJob::~CleanupPollsJob()
 {}

+ 48 - 22
src/libsync/owncloudpropagator.h

@@ -170,30 +170,22 @@ public slots:
     virtual void start() = 0;
 };
 
-
 /**
- * @brief Propagate a directory, and all its sub entries.
+ * @brief Job that runs subjobs. It becomes finished only when all subjobs are finished.
  * @ingroup libsync
  */
-class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob {
+class PropagatorCompositeJob : public PropagatorJob {
     Q_OBJECT
 public:
-    // e.g: create the directory
-    QScopedPointer<PropagateItemJob>_firstJob;
-
-    // all the sub files or sub directories.
     QVector<PropagatorJob *> _subJobs;
-
-    SyncFileItemPtr _item;
-
     SyncFileItem::Status _hasError;  // NoStatus,  or NormalError / SoftError if there was an error
 
-    explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item = SyncFileItemPtr(new SyncFileItem))
+    explicit PropagatorCompositeJob(OwncloudPropagator *propagator)
         : PropagatorJob(propagator)
-        , _item(item), _hasError(SyncFileItem::NoStatus)
+        , _hasError(SyncFileItem::NoStatus)
     { }
 
-    virtual ~PropagateDirectory() {
+    virtual ~PropagatorCompositeJob() {
         qDeleteAll(_subJobs);
     }
 
@@ -204,18 +196,10 @@ public:
     virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
     virtual JobParallelism parallelism() Q_DECL_OVERRIDE;
     virtual void abort() Q_DECL_OVERRIDE {
-        if (_firstJob)
-            _firstJob->abort();
         foreach (PropagatorJob *j, _subJobs)
             j->abort();
     }
 
-    void increaseAffectedCount() {
-        _firstJob->_item->_affectedItems++;
-    }
-
-    void finalize();
-
     qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
 
 private slots:
@@ -232,6 +216,48 @@ private slots:
     void slotSubJobFinished(SyncFileItem::Status status);
 };
 
+/**
+ * @brief Propagate a directory, and all its sub entries.
+ * @ingroup libsync
+ */
+class OWNCLOUDSYNC_EXPORT PropagateDirectory : public PropagatorJob {
+    Q_OBJECT
+public:
+    SyncFileItemPtr _item;
+    // e.g: create the directory
+    QScopedPointer<PropagateItemJob>_firstJob;
+
+    PropagatorCompositeJob _subJobs;
+
+    explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItemPtr &item = SyncFileItemPtr(new SyncFileItem));
+
+    void append(PropagatorJob *subJob) {
+        _subJobs.append(subJob);
+    }
+
+    virtual bool scheduleNextJob() Q_DECL_OVERRIDE;
+    virtual JobParallelism parallelism() Q_DECL_OVERRIDE;
+    virtual void abort() Q_DECL_OVERRIDE {
+        if (_firstJob)
+            _firstJob->abort();
+        _subJobs.abort();
+    }
+
+    void increaseAffectedCount() {
+        _firstJob->_item->_affectedItems++;
+    }
+
+
+    qint64 committedDiskSpace() const Q_DECL_OVERRIDE {
+        return _subJobs.committedDiskSpace();
+    }
+
+private slots:
+
+    void slotFirstJobFinished(SyncFileItem::Status status);
+    void slotSubJobsFinished(SyncFileItem::Status status);
+};
+
 
 /**
  * @brief Dummy job that just mark it as completed and ignored
@@ -251,7 +277,6 @@ public:
 class OwncloudPropagator : public QObject {
     Q_OBJECT
 
-    PropagateItemJob *createJob(const SyncFileItemPtr& item);
     QScopedPointer<PropagateDirectory> _rootJob;
 
 public:
@@ -304,6 +329,7 @@ public:
     bool localFileNameClash(const QString& relfile);
     QString getFilePath(const QString& tmp_file_name) const;
 
+    PropagateItemJob *createJob(const SyncFileItemPtr& item);
     void abort() {
         _abortRequested.fetchAndStoreOrdered(true);
         if (_rootJob) {