Ver código fonte

schedule files upload after all other jobs have been completed

should allow smarter policy dedicated to optimizing files upload

Signed-off-by: Matthieu Gallien <matthieu.gallien@nextcloud.com>
Matthieu Gallien 4 anos atrás
pai
commit
87c583dcb6

+ 167 - 57
src/libsync/owncloudpropagator.cpp

@@ -48,6 +48,7 @@ namespace OCC {
 
 Q_LOGGING_CATEGORY(lcPropagator, "nextcloud.sync.propagator", QtInfoMsg)
 Q_LOGGING_CATEGORY(lcDirectory, "nextcloud.sync.propagator.directory", QtInfoMsg)
+Q_LOGGING_CATEGORY(lcRootDirectory, "nextcloud.sync.propagator.root.directory", QtInfoMsg)
 Q_LOGGING_CATEGORY(lcCleanupPolls, "nextcloud.sync.propagator.cleanuppolls", QtInfoMsg)
 
 qint64 criticalFreeSpaceLimit()
@@ -359,15 +360,13 @@ PropagateItemJob *OwncloudPropagator::createJob(const SyncFileItemPtr &item)
             job->setDeleteExistingFolder(deleteExisting);
             return job;
         } else {
-            PropagateUploadFileCommon *job = nullptr;
-            if (item->_size > syncOptions()._initialChunkSize && account()->capabilities().chunkingNg()) {
-                // Item is above _initialChunkSize, thus will be classified as to be chunked
-                job = new PropagateUploadFileNG(this, item);
+            if (deleteExisting || !isDelayedUploadItem(item)) {
+                auto job = createUploadJob(item, deleteExisting);
+                return job.release();
             } else {
-                job = new PropagateUploadFileV1(this, item);
+                pushDelayedUploadTask(item);
+                return nullptr;
             }
-            job->setDeleteExisting(deleteExisting);
-            return job;
         }
     case CSYNC_INSTRUCTION_RENAME:
         if (item->_direction == SyncFileItem::Up) {
@@ -384,6 +383,33 @@ PropagateItemJob *OwncloudPropagator::createJob(const SyncFileItemPtr &item)
     return nullptr;
 }
 
+std::unique_ptr<PropagateUploadFileCommon> OwncloudPropagator::createUploadJob(SyncFileItemPtr item, bool deleteExisting)
+{
+    auto job = std::unique_ptr<PropagateUploadFileCommon>{};
+
+    if (item->_size > syncOptions()._initialChunkSize && account()->capabilities().chunkingNg()) {
+        // Item is above _initialChunkSize, thus will be classified as to be chunked
+        job = std::make_unique<PropagateUploadFileNG>(this, item);
+    } else {
+        job = std::make_unique<PropagateUploadFileV1>(this, item);
+    }
+
+    job->setDeleteExisting(deleteExisting);
+
+    return job;
+}
+
+void OwncloudPropagator::pushDelayedUploadTask(SyncFileItemPtr item)
+{
+    _delayedTasks.push_back(item);
+}
+
+void OwncloudPropagator::resetDelayedUploadTasks()
+{
+    _scheduleDelayedTasks = false;
+    _delayedTasks.clear();
+}
+
 qint64 OwncloudPropagator::smallFileSize()
 {
     const qint64 smallFileSize = 100 * 1024; //default to 1 MB. Not dynamic right now.
@@ -419,6 +445,7 @@ void OwncloudPropagator::start(SyncFileItemVector &&items)
             items.end());
     }
 
+    resetDelayedUploadTasks();
     _rootJob.reset(new PropagateRootDirectory(this));
     QStack<QPair<QString /* directory name */, PropagateDirectory * /* job */>> directories;
     directories.push(qMakePair(QString(), _rootJob.data()));
@@ -474,56 +501,17 @@ void OwncloudPropagator::start(SyncFileItemVector &&items)
         }
 
         if (item->isDirectory()) {
-            auto *dir = new PropagateDirectory(this, item);
-
-            if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE
-                && item->_direction == SyncFileItem::Up) {
-                // Skip all potential uploads to the new folder.
-                // Processing them now leads to problems with permissions:
-                // checkForPermissions() has already run and used the permissions
-                // of the file we're about to delete to decide whether uploading
-                // to the new dir is ok...
-                foreach (const SyncFileItemPtr &item2, items) {
-                    if (item2->destination().startsWith(item->destination() + "/")) {
-                        item2->_instruction = CSYNC_INSTRUCTION_NONE;
-                        _anotherSyncNeeded = true;
-                    }
-                }
-            }
-
-            if (item->_instruction == CSYNC_INSTRUCTION_REMOVE) {
-                // We do the removal of directories at the end, because there might be moves from
-                // these directories that will happen later.
-                directoriesToRemove.prepend(dir);
-                removedDirectory = item->_file + "/";
-
-                // We should not update the etag of parent directories of the removed directory
-                // since it would be done before the actual remove (issue #1845)
-                // NOTE: Currently this means that we don't update those etag at all in this sync,
-                //       but it should not be a problem, they will be updated in the next sync.
-                for (int i = 0; i < directories.size(); ++i) {
-                    if (directories[i].second->_item->_instruction == CSYNC_INSTRUCTION_UPDATE_METADATA)
-                        directories[i].second->_item->_instruction = CSYNC_INSTRUCTION_NONE;
-                }
-            } else {
-                PropagateDirectory *currentDirJob = directories.top().second;
-                currentDirJob->appendJob(dir);
-            }
-            directories.push(qMakePair(item->destination() + "/", dir));
+            startDirectoryPropagation(item,
+                                      directories,
+                                      directoriesToRemove,
+                                      removedDirectory,
+                                      items);
         } else {
-            if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
-                // will delete directories, so defer execution
-                directoriesToRemove.prepend(createJob(item));
-                removedDirectory = item->_file + "/";
-            } else {
-                directories.top().second->appendTask(item);
-            }
-
-            if (item->_instruction == CSYNC_INSTRUCTION_CONFLICT) {
-                // This might be a file or a directory on the local side. If it's a
-                // directory we want to skip processing items inside it.
-                maybeConflictDirectory = item->_file + "/";
-            }
+            startFilePropagation(item,
+                                 directories,
+                                 directoriesToRemove,
+                                 removedDirectory,
+                                 maybeConflictDirectory);
         }
     }
 
@@ -537,6 +525,75 @@ void OwncloudPropagator::start(SyncFileItemVector &&items)
     scheduleNextJob();
 }
 
+void OwncloudPropagator::startDirectoryPropagation(const SyncFileItemPtr &item,
+                                                   QStack<QPair<QString, PropagateDirectory *>> &directories,
+                                                   QVector<PropagatorJob *> &directoriesToRemove,
+                                                   QString &removedDirectory,
+                                                   const SyncFileItemVector &items)
+{
+    auto directoryPropagationJob = std::make_unique<PropagateDirectory>(this, item);
+
+    if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE
+        && item->_direction == SyncFileItem::Up) {
+        // Skip all potential uploads to the new folder.
+        // Processing them now leads to problems with permissions:
+        // checkForPermissions() has already run and used the permissions
+        // of the file we're about to delete to decide whether uploading
+        // to the new dir is ok...
+        foreach (const SyncFileItemPtr &dirItem, items) {
+            if (dirItem->destination().startsWith(item->destination() + "/")) {
+                dirItem->_instruction = CSYNC_INSTRUCTION_NONE;
+                _anotherSyncNeeded = true;
+            }
+        }
+    }
+
+    if (item->_instruction == CSYNC_INSTRUCTION_REMOVE) {
+        // We do the removal of directories at the end, because there might be moves from
+        // these directories that will happen later.
+        directoriesToRemove.prepend(directoryPropagationJob.get());
+        removedDirectory = item->_file + "/";
+
+        // We should not update the etag of parent directories of the removed directory
+        // since it would be done before the actual remove (issue #1845)
+        // NOTE: Currently this means that we don't update those etag at all in this sync,
+        //       but it should not be a problem, they will be updated in the next sync.
+        for (int i = 0; i < directories.size(); ++i) {
+            if (directories[i].second->_item->_instruction == CSYNC_INSTRUCTION_UPDATE_METADATA) {
+                directories[i].second->_item->_instruction = CSYNC_INSTRUCTION_NONE;
+            }
+        }
+    } else {
+        const auto currentDirJob = directories.top().second;
+        currentDirJob->appendJob(directoryPropagationJob.get());
+    }
+    directories.push(qMakePair(item->destination() + "/", directoryPropagationJob.release()));
+}
+
+void OwncloudPropagator::startFilePropagation(const SyncFileItemPtr &item,
+                                              QStack<QPair<QString, PropagateDirectory *> > &directories,
+                                              QVector<PropagatorJob *> &directoriesToRemove,
+                                              QString &removedDirectory,
+                                              QString &maybeConflictDirectory)
+{
+    if (item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) {
+        // will delete directories, so defer execution
+        auto job = createJob(item);
+        if (job) {
+            directoriesToRemove.prepend(job);
+        }
+        removedDirectory = item->_file + "/";
+    } else {
+        directories.top().second->appendTask(item);
+    }
+
+    if (item->_instruction == CSYNC_INSTRUCTION_CONFLICT) {
+        // This might be a file or a directory on the local side. If it's a
+        // directory we want to skip processing items inside it.
+        maybeConflictDirectory = item->_file + "/";
+    }
+}
+
 const SyncOptions &OwncloudPropagator::syncOptions() const
 {
     return _syncOptions;
@@ -802,6 +859,21 @@ Result<Vfs::ConvertToPlaceholderResult, QString> OwncloudPropagator::staticUpdat
     return Vfs::ConvertToPlaceholderResult::Ok;
 }
 
+bool OwncloudPropagator::isDelayedUploadItem(const SyncFileItemPtr &item) const
+{
+    return !_scheduleDelayedTasks && !item->_isEncrypted;
+}
+
+void OwncloudPropagator::setScheduleDelayedTasks(bool active)
+{
+    _scheduleDelayedTasks = active;
+}
+
+void OwncloudPropagator::clearDelayedTasks()
+{
+    _delayedTasks.clear();
+}
+
 // ================================================================================
 
 PropagatorJob::PropagatorJob(OwncloudPropagator *propagator)
@@ -1012,6 +1084,7 @@ void PropagateDirectory::slotFirstJobFinished(SyncFileItem::Status status)
             // Synchronously abort
             abort(AbortType::Synchronous);
             _state = Finished;
+            qCInfo(lcPropagator) << "PropagateDirectory::slotFirstJobFinished" << "emit finished" << status;
             emit finished(status);
         }
         return;
@@ -1054,6 +1127,7 @@ void PropagateDirectory::slotSubJobsFinished(SyncFileItem::Status status)
         }
     }
     _state = Finished;
+    qCInfo(lcPropagator) << "PropagateDirectory::slotSubJobsFinished" << "emit finished" << status;
     emit finished(status);
 }
 
@@ -1106,11 +1180,13 @@ qint64 PropagateRootDirectory::committedDiskSpace() const
 
 bool PropagateRootDirectory::scheduleSelfOrChild()
 {
+    qCInfo(lcRootDirectory()) << "scheduleSelfOrChild" << _state << "pending uploads" << propagator()->delayedTasks().size() << "subjobs state" << _subJobs._state;
+
     if (_state == Finished) {
         return false;
     }
 
-    if (PropagateDirectory::scheduleSelfOrChild()) {
+    if (PropagateDirectory::scheduleSelfOrChild() && propagator()->delayedTasks().empty()) {
         return true;
     }
 
@@ -1119,11 +1195,22 @@ bool PropagateRootDirectory::scheduleSelfOrChild()
         return false;
     }
 
+    if (!propagator()->delayedTasks().empty()) {
+        return scheduleDelayedJobs();
+    }
+
     return _dirDeletionJobs.scheduleSelfOrChild();
 }
 
 void PropagateRootDirectory::slotSubJobsFinished(SyncFileItem::Status status)
 {
+    qCInfo(lcRootDirectory()) << status << "slotSubJobsFinished" << _state << "pending uploads" << propagator()->delayedTasks().size() << "subjobs state" << _subJobs._state;
+
+    if (!propagator()->delayedTasks().empty()) {
+        scheduleDelayedJobs();
+        return;
+    }
+
     if (status != SyncFileItem::Success
         && status != SyncFileItem::Restoration
         && status != SyncFileItem::Conflict) {
@@ -1131,6 +1218,7 @@ void PropagateRootDirectory::slotSubJobsFinished(SyncFileItem::Status status)
             // Synchronously abort
             abort(AbortType::Synchronous);
             _state = Finished;
+            qCInfo(lcPropagator) << "PropagateRootDirectory::slotSubJobsFinished" << "emit finished" << status;
             emit finished(status);
         }
         return;
@@ -1142,9 +1230,21 @@ void PropagateRootDirectory::slotSubJobsFinished(SyncFileItem::Status status)
 void PropagateRootDirectory::slotDirDeletionJobsFinished(SyncFileItem::Status status)
 {
     _state = Finished;
+    qCInfo(lcPropagator) << "PropagateRootDirectory::slotDirDeletionJobsFinished" << "emit finished" << status;
     emit finished(status);
 }
 
+bool PropagateRootDirectory::scheduleDelayedJobs()
+{
+    qCInfo(lcPropagator) << "PropagateRootDirectory::scheduleDelayedJobs";
+    propagator()->setScheduleDelayedTasks(true);
+    auto bulkPropagatorJob = std::make_unique<BulkPropagatorJob>(propagator(), propagator()->delayedTasks());
+    propagator()->clearDelayedTasks();
+    _subJobs.appendJob(bulkPropagatorJob.release());
+    _subJobs._state = Running;
+    return _subJobs.scheduleSelfOrChild();
+}
+
 // ================================================================================
 
 CleanupPollsJob::~CleanupPollsJob() = default;
@@ -1203,4 +1303,14 @@ QString OwncloudPropagator::remotePath() const
 {
     return _remoteFolder;
 }
+
+BulkPropagatorJob::BulkPropagatorJob(OwncloudPropagator *propagator, const QVector<SyncFileItemPtr> &items)
+    : PropagatorCompositeJob(propagator)
+    , _items(items)
+{
+    for(const auto &oneItemJob : _items) {
+        appendTask(oneItemJob);
+    }
+    _items.clear();
+}
 }

+ 52 - 0
src/libsync/owncloudpropagator.h

@@ -374,6 +374,23 @@ public:
 private slots:
     void slotSubJobsFinished(SyncFileItem::Status status) override;
     void slotDirDeletionJobsFinished(SyncFileItem::Status status);
+
+private:
+
+    bool scheduleDelayedJobs();
+};
+
+class BulkPropagatorJob : public PropagatorCompositeJob
+{
+    Q_OBJECT
+public:
+
+    explicit BulkPropagatorJob(OwncloudPropagator *propagator,
+                               const QVector<SyncFileItemPtr> &items);
+
+private:
+
+    QVector<SyncFileItemPtr> _items;
 };
 
 /**
@@ -403,6 +420,8 @@ public:
     }
 };
 
+class PropagateUploadFileCommon;
+
 class OWNCLOUDSYNC_EXPORT OwncloudPropagator : public QObject
 {
     Q_OBJECT
@@ -429,6 +448,18 @@ public:
 
     void start(SyncFileItemVector &&_syncedItems);
 
+    void startDirectoryPropagation(const SyncFileItemPtr &item,
+                                   QStack<QPair<QString, PropagateDirectory*>> &directories,
+                                   QVector<PropagatorJob *> &directoriesToRemove,
+                                   QString &removedDirectory,
+                                   const SyncFileItemVector &items);
+
+    void startFilePropagation(const SyncFileItemPtr &item,
+                              QStack<QPair<QString, PropagateDirectory*>> &directories,
+                              QVector<PropagatorJob *> &directoriesToRemove,
+                              QString &removedDirectory,
+                              QString &maybeConflictDirectory);
+
     const SyncOptions &syncOptions() const;
     void setSyncOptions(const SyncOptions &syncOptions);
 
@@ -578,6 +609,17 @@ public:
     static Result<Vfs::ConvertToPlaceholderResult, QString> staticUpdateMetadata(const SyncFileItem &item, const QString localDir,
                                                                                  Vfs *vfs, SyncJournalDb * const journal);
 
+    Q_REQUIRED_RESULT bool isDelayedUploadItem(const SyncFileItemPtr &item) const;
+
+    Q_REQUIRED_RESULT const QVector<SyncFileItemPtr>& delayedTasks() const
+    {
+        return _delayedTasks;
+    }
+
+    void setScheduleDelayedTasks(bool active);
+
+    void clearDelayedTasks();
+
 private slots:
 
     void abortTimeout()
@@ -617,6 +659,13 @@ signals:
     void insufficientRemoteStorage();
 
 private:
+    std::unique_ptr<PropagateUploadFileCommon> createUploadJob(SyncFileItemPtr item,
+                                                               bool deleteExisting);
+
+    void pushDelayedUploadTask(SyncFileItemPtr item);
+
+    void resetDelayedUploadTasks();
+
     AccountPtr _account;
     QScopedPointer<PropagateRootDirectory> _rootJob;
     SyncOptions _syncOptions;
@@ -624,6 +673,9 @@ private:
 
     const QString _localDir; // absolute path to the local directory. ends with '/'
     const QString _remoteFolder; // remote folder, ends with '/'
+
+    QVector<SyncFileItemPtr> _delayedTasks;
+    bool _scheduleDelayedTasks = false;
 };
 
 

+ 3 - 0
test/syncenginetestutils.cpp

@@ -1061,6 +1061,9 @@ OCC::SyncFileItemPtr ItemCompletedSpy::findItem(const QString &path) const
 
 OCC::SyncFileItemPtr ItemCompletedSpy::findItemWithExpectedRank(const QString &path, int rank) const
 {
+    Q_ASSERT(size() > rank);
+    Q_ASSERT(!(*this)[rank].isEmpty());
+
     auto item = (*this)[rank][0].value<OCC::SyncFileItemPtr>();
     if (item->destination() == path) {
         return item;