Browse Source

Merge branch 'dynamic_chunking'

Christian Kamm 9 years ago
parent
commit
53c5f03c32

+ 43 - 4
src/gui/folder.cpp

@@ -645,19 +645,58 @@ void Folder::startSync(const QStringList &pathList)
     }
 
     setDirtyNetworkLimits();
+    setSyncOptions();
 
+    _engine->setIgnoreHiddenFiles(_definition.ignoreHiddenFiles);
+
+    QMetaObject::invokeMethod(_engine.data(), "startSync", Qt::QueuedConnection);
+
+    emit syncStarted();
+}
+
+void Folder::setSyncOptions()
+{
     SyncOptions opt;
     ConfigFile cfgFile;
+
     auto newFolderLimit = cfgFile.newBigFolderSizeLimit();
     opt._newBigFolderSizeLimit = newFolderLimit.first ? newFolderLimit.second * 1000LL * 1000LL : -1; // convert from MB to B
     opt._confirmExternalStorage = cfgFile.confirmExternalStorage();
-    _engine->setSyncOptions(opt);
 
-    _engine->setIgnoreHiddenFiles(_definition.ignoreHiddenFiles);
+    QByteArray chunkSizeEnv = qgetenv("OWNCLOUD_CHUNK_SIZE");
+    if (!chunkSizeEnv.isEmpty()) {
+        opt._initialChunkSize = chunkSizeEnv.toUInt();
+    } else {
+        opt._initialChunkSize = cfgFile.chunkSize();
+    }
+    QByteArray minChunkSizeEnv = qgetenv("OWNCLOUD_MIN_CHUNK_SIZE");
+    if (!minChunkSizeEnv.isEmpty()) {
+        opt._minChunkSize = minChunkSizeEnv.toUInt();
+    } else {
+        opt._minChunkSize = cfgFile.minChunkSize();
+    }
+    QByteArray maxChunkSizeEnv = qgetenv("OWNCLOUD_MAX_CHUNK_SIZE");
+    if (!maxChunkSizeEnv.isEmpty()) {
+        opt._maxChunkSize = maxChunkSizeEnv.toUInt();
+    } else {
+        opt._maxChunkSize = cfgFile.maxChunkSize();
+    }
 
-    QMetaObject::invokeMethod(_engine.data(), "startSync", Qt::QueuedConnection);
+    // Previously min/max chunk size values didn't exist, so users might
+    // have setups where the chunk size exceeds the new min/max default
+    // values. To cope with this, adjust min/max to always include the
+    // initial chunk size value.
+    opt._minChunkSize = qMin(opt._minChunkSize, opt._initialChunkSize);
+    opt._maxChunkSize = qMax(opt._maxChunkSize, opt._initialChunkSize);
 
-    emit syncStarted();
+    QByteArray targetChunkUploadDurationEnv = qgetenv("OWNCLOUD_TARGET_CHUNK_UPLOAD_DURATION");
+    if (!targetChunkUploadDurationEnv.isEmpty()) {
+        opt._targetChunkUploadDuration = targetChunkUploadDurationEnv.toUInt();
+    } else {
+        opt._targetChunkUploadDuration = cfgFile.targetChunkUploadDuration();
+    }
+
+    _engine->setSyncOptions(opt);
 }
 
 void Folder::setDirtyNetworkLimits()

+ 2 - 0
src/gui/folder.h

@@ -307,6 +307,8 @@ private:
 
     void checkLocalPath();
 
+    void setSyncOptions();
+
     enum LogStatus {
         LogStatusRemove,
         LogStatusRename,

+ 21 - 0
src/libsync/configfile.cpp

@@ -53,6 +53,9 @@ static const char updateCheckIntervalC[] = "updateCheckInterval";
 static const char geometryC[] = "geometry";
 static const char timeoutC[] = "timeout";
 static const char chunkSizeC[] = "chunkSize";
+static const char minChunkSizeC[] = "minChunkSize";
+static const char maxChunkSizeC[] = "maxChunkSize";
+static const char targetChunkUploadDurationC[] = "targetChunkUploadDuration";
 
 static const char proxyHostC[] = "Proxy/host";
 static const char proxyTypeC[] = "Proxy/type";
@@ -130,6 +133,24 @@ quint64 ConfigFile::chunkSize() const
     return settings.value(QLatin1String(chunkSizeC), 10*1000*1000).toLongLong(); // default to 10 MB
 }
 
+quint64 ConfigFile::maxChunkSize() const
+{
+    QSettings settings(configFile(), QSettings::IniFormat);
+    return settings.value(QLatin1String(maxChunkSizeC), 100*1000*1000).toLongLong(); // default to 100 MB
+}
+
+quint64 ConfigFile::minChunkSize() const
+{
+    QSettings settings(configFile(), QSettings::IniFormat);
+    return settings.value(QLatin1String(minChunkSizeC), 1000*1000).toLongLong(); // default to 1 MB
+}
+
+quint64 ConfigFile::targetChunkUploadDuration() const
+{
+    QSettings settings(configFile(), QSettings::IniFormat);
+    return settings.value(QLatin1String(targetChunkUploadDurationC), 60*1000).toLongLong(); // default to 1 minute
+}
+
 void ConfigFile::setOptionalDesktopNotifications(bool show)
 {
     QSettings settings(configFile(), QSettings::IniFormat);

+ 3 - 0
src/libsync/configfile.h

@@ -115,6 +115,9 @@ public:
 
     int timeout() const;
     quint64 chunkSize() const;
+    quint64 maxChunkSize() const;
+    quint64 minChunkSize() const;
+    quint64 targetChunkUploadDuration() const;
 
     void saveGeometry(QWidget *w);
     void restoreGeometry(QWidget *w);

+ 30 - 1
src/libsync/discoveryphase.h

@@ -35,12 +35,41 @@ class Account;
  */
 
 struct SyncOptions {
-    SyncOptions() : _newBigFolderSizeLimit(-1), _confirmExternalStorage(false) {}
+    SyncOptions()
+        : _newBigFolderSizeLimit(-1)
+        , _confirmExternalStorage(false)
+        , _initialChunkSize(10 * 1000 * 1000) // 10 MB
+        , _minChunkSize(1 * 1000 * 1000) // 1 MB
+        , _maxChunkSize(100 * 1000 * 1000) // 100 MB
+        , _targetChunkUploadDuration(60 * 1000) // 1 minute
+    {}
+
     /** Maximum size (in Bytes) a folder can have without asking for confirmation.
      * -1 means infinite */
     qint64 _newBigFolderSizeLimit;
+
     /** If a confirmation should be asked for external storages */
     bool _confirmExternalStorage;
+
+    /** The initial un-adjusted chunk size in bytes for chunked uploads
+     *
+     * When dynamic chunk size adjustments are done, this is the
+     * starting value and is then gradually adjusted within the
+     * minChunkSize / maxChunkSize bounds.
+     */
+    quint64 _initialChunkSize;
+
+    /** The minimum chunk size in bytes for chunked uploads */
+    quint64 _minChunkSize;
+
+    /** The maximum chunk size in bytes for chunked uploads */
+    quint64 _maxChunkSize;
+
+    /** The target duration of chunk uploads for dynamic chunk sizing.
+     *
+     * Set to 0 it will disable dynamic chunk sizing.
+     */
+    quint64 _targetChunkUploadDuration;
 };
 
 

+ 13 - 16
src/libsync/owncloudpropagator.cpp

@@ -369,7 +369,7 @@ PropagateItemJob* OwncloudPropagator::createJob(const SyncFileItemPtr &item) {
                 return job;
             } else {
                 PropagateUploadFileCommon *job = 0;
-                if (item->_size > chunkSize() && account()->capabilities().chunkingNg()) {
+                if (item->_size > _chunkSize && account()->capabilities().chunkingNg()) {
                     job = new PropagateUploadFileNG(this, item);
                 } else {
                     job = new PropagateUploadFileV1(this, item);
@@ -503,6 +503,17 @@ void OwncloudPropagator::start(const SyncFileItemVector& items)
     scheduleNextJob();
 }
 
+const SyncOptions& OwncloudPropagator::syncOptions() const
+{
+    return _syncOptions;
+}
+
+void OwncloudPropagator::setSyncOptions(const SyncOptions& syncOptions)
+{
+    _syncOptions = syncOptions;
+    _chunkSize = syncOptions._initialChunkSize;
+}
+
 // ownCloud server  < 7.0 did not had permissions so we need some other euristics
 // to detect wrong doing in a Shared directory
 bool OwncloudPropagator::isInSharedDirectory(const QString& file)
@@ -522,7 +533,7 @@ bool OwncloudPropagator::isInSharedDirectory(const QString& file)
 
 int OwncloudPropagator::httpTimeout()
 {
-    static int timeout;
+    static int timeout = 0;
     if (!timeout) {
         timeout = qgetenv("OWNCLOUD_TIMEOUT").toUInt();
         if (timeout == 0) {
@@ -534,20 +545,6 @@ int OwncloudPropagator::httpTimeout()
     return timeout;
 }
 
-quint64 OwncloudPropagator::chunkSize()
-{
-    static uint chunkSize;
-    if (!chunkSize) {
-        chunkSize = qgetenv("OWNCLOUD_CHUNK_SIZE").toUInt();
-        if (chunkSize == 0) {
-            ConfigFile cfg;
-            chunkSize = cfg.chunkSize();
-        }
-    }
-    return chunkSize;
-}
-
-
 bool OwncloudPropagator::localFileNameClash( const QString& relFile )
 {
     bool re = false;

+ 15 - 4
src/libsync/owncloudpropagator.h

@@ -29,6 +29,7 @@
 #include "syncjournaldb.h"
 #include "bandwidthmanager.h"
 #include "accountfwd.h"
+#include "discoveryphase.h"
 
 namespace OCC {
 
@@ -279,7 +280,6 @@ public:
     SyncJournalDb * const _journal;
     bool _finishedEmited; // used to ensure that finished is only emitted once
 
-
 public:
     OwncloudPropagator(AccountPtr account, const QString &localDir,
                        const QString &remoteFolder, SyncJournalDb *progressDb)
@@ -289,6 +289,7 @@ public:
             , _finishedEmited(false)
             , _bandwidthManager(this)
             , _anotherSyncNeeded(false)
+            , _chunkSize(10 * 1000 * 1000) // 10 MB, overridden in setSyncOptions
             , _account(account)
     { }
 
@@ -296,6 +297,9 @@ public:
 
     void start(const SyncFileItemVector &_syncedItems);
 
+    const SyncOptions& syncOptions() const;
+    void setSyncOptions(const SyncOptions& syncOptions);
+
     QAtomicInt _downloadLimit;
     QAtomicInt _uploadLimit;
     BandwidthManager _bandwidthManager;
@@ -315,6 +319,15 @@ public:
 
     /* the maximum number of jobs using bandwidth (uploads or downloads, in parallel) */
     int maximumActiveTransferJob();
+
+    /** The size to use for upload chunks.
+     *
+     * Will be dynamically adjusted after each chunk upload finishes
+     * if Capabilities::desiredChunkUploadDuration has a target
+     * chunk-upload duration set.
+     */
+    quint64 _chunkSize;
+
     /* The maximum number of active jobs in parallel  */
     int hardMaximumActiveJob();
 
@@ -355,9 +368,6 @@ public:
     // timeout in seconds
     static int httpTimeout();
 
-    /** returns the size of chunks in bytes  */
-    static quint64 chunkSize();
-
     AccountPtr account() const;
 
     enum DiskSpaceResult
@@ -404,6 +414,7 @@ private:
 
     AccountPtr _account;
     QScopedPointer<PropagateDirectory> _rootJob;
+    SyncOptions _syncOptions;
 
 #if QT_VERSION < QT_VERSION_CHECK(5, 0, 0)
     // access to signals which are protected in Qt4

+ 1 - 0
src/libsync/propagateupload.cpp

@@ -97,6 +97,7 @@ void PUTFileJob::start() {
         connect(_device, SIGNAL(wasReset()), this, SLOT(slotSoftAbort()));
 #endif
 
+    _requestTimer.start();
     AbstractNetworkJob::start();
 }
 

+ 10 - 4
src/libsync/propagateupload.h

@@ -19,6 +19,7 @@
 #include <QBuffer>
 #include <QFile>
 #include <QDebug>
+#include <QElapsedTimer>
 
 
 namespace OCC {
@@ -90,6 +91,7 @@ private:
     QMap<QByteArray, QByteArray> _headers;
     QString _errorString;
     QUrl _url;
+    QElapsedTimer _requestTimer;
 
 public:
     // Takes ownership of the device
@@ -123,6 +125,10 @@ public:
 
     virtual void slotTimeout() Q_DECL_OVERRIDE;
 
+    quint64 msSinceStart() const {
+        return _requestTimer.elapsed();
+    }
+
 
 signals:
     void finishedSignal();
@@ -201,7 +207,6 @@ protected:
     QByteArray _transmissionChecksum;
     QByteArray _transmissionChecksumType;
 
-
 public:
     PropagateUploadFileCommon(OwncloudPropagator* propagator,const SyncFileItemPtr& item)
         : PropagateItemJob(propagator, item), _finished(false), _deleteExisting(false) {}
@@ -276,7 +281,7 @@ private:
     int _chunkCount; /// Total number of chunks for this file
     int _transferId; /// transfer id (part of the url)
 
-    quint64 chunkSize() const { return propagator()->chunkSize(); }
+    quint64 chunkSize() const { return propagator()->syncOptions()._initialChunkSize; }
 
 
 public:
@@ -303,6 +308,7 @@ private:
     quint64 _sent; /// amount of data (bytes) that was already sent
     uint _transferId; /// transfer id (part of the url)
     int _currentChunk; /// Id of the next chunk that will be sent
+    quint64 _currentChunkSize; /// current chunk size
     bool _removeJobError; /// If not null, there was an error removing the job
 
     // Map chunk number with its size  from the PROPFIND on resume.
@@ -310,7 +316,6 @@ private:
     struct ServerChunkInfo { quint64 size; QString originalName; };
     QMap<int, ServerChunkInfo> _serverChunks;
 
-    quint64 chunkSize() const { return propagator()->chunkSize(); }
     /**
      * Return the URL of a chunk.
      * If chunk == -1, returns the URL of the parent folder containing the chunks
@@ -319,9 +324,10 @@ private:
 
 public:
     PropagateUploadFileNG(OwncloudPropagator* propagator,const SyncFileItemPtr& item) :
-        PropagateUploadFileCommon(propagator,item) {}
+        PropagateUploadFileCommon(propagator,item), _currentChunkSize(0) {}
 
     void doStartUpload() Q_DECL_OVERRIDE;
+
 private:
     void startNewUpload();
     void startNextChunk();

+ 37 - 5
src/libsync/propagateuploadng.cpp

@@ -272,10 +272,11 @@ void PropagateUploadFileNG::startNextChunk()
     quint64 fileSize = _item->_size;
     ENFORCE(fileSize >= _sent, "Sent data exceeds file size");
 
-    quint64 currentChunkSize = qMin(chunkSize(), fileSize - _sent);
+    // prevent situation that chunk size is bigger then required one to send
+    _currentChunkSize = qMin(propagator()->_chunkSize, fileSize - _sent);
 
-    if (currentChunkSize == 0) {
-        ASSERT(_jobs.isEmpty());
+    if (_currentChunkSize == 0) {
+        Q_ASSERT(_jobs.isEmpty()); // There should be no running job anymore
         _finished = true;
         // Finish with a MOVE
         QString destination = QDir::cleanPath(propagator()->account()->url().path() + QLatin1Char('/')
@@ -305,7 +306,7 @@ void PropagateUploadFileNG::startNextChunk()
     auto device = new UploadDevice(&propagator()->_bandwidthManager);
     const QString fileName = propagator()->getFilePath(_item->_file);
 
-    if (! device->prepareAndOpen(fileName, _sent, currentChunkSize)) {
+    if (! device->prepareAndOpen(fileName, _sent, _currentChunkSize)) {
         qDebug() << "ERR: Could not prepare upload device: " << device->errorString();
 
         // If the file is currently locked, we want to retry the sync
@@ -321,7 +322,7 @@ void PropagateUploadFileNG::startNextChunk()
     QMap<QByteArray, QByteArray> headers;
     headers["OC-Chunk-Offset"] = QByteArray::number(_sent);
 
-    _sent += currentChunkSize;
+    _sent += _currentChunkSize;
     QUrl url = chunkUrl(_currentChunk);
 
     // job takes ownership of device via a QScopedPointer. Job deletes itself when finishing
@@ -394,6 +395,37 @@ void PropagateUploadFileNG::slotPutFinished()
     }
 
     ENFORCE(_sent <= _item->_size, "can't send more than size");
+
+    // Adjust the chunk size for the time taken.
+    //
+    // Dynamic chunk sizing is enabled if the server configured a
+    // target duration for each chunk upload.
+    double targetDuration = propagator()->syncOptions()._targetChunkUploadDuration;
+    if (targetDuration > 0) {
+        double uploadTime = job->msSinceStart();
+
+        auto predictedGoodSize = static_cast<quint64>(
+                _currentChunkSize / uploadTime * targetDuration);
+
+        // The whole targeting is heuristic. The predictedGoodSize will fluctuate
+        // quite a bit because of external factors (like available bandwidth)
+        // and internal factors (like number of parallel uploads).
+        //
+        // We use an exponential moving average here as a cheap way of smoothing
+        // the chunk sizes a bit.
+        quint64 targetSize = (propagator()->_chunkSize + predictedGoodSize) / 2;
+
+        propagator()->_chunkSize = qBound(
+                propagator()->syncOptions()._minChunkSize,
+                targetSize,
+                propagator()->syncOptions()._maxChunkSize);
+
+        qDebug() << "Chunked upload of" << _currentChunkSize << "bytes took" << uploadTime
+                 << "ms, desired is" << targetDuration << "ms, expected good chunk size is"
+                 << predictedGoodSize << "bytes and nudged next chunk size to "
+                 << propagator()->_chunkSize << "bytes";
+    }
+
     bool finished = _sent == _item->_size;
 
     // Check if the file still exists

+ 1 - 0
src/libsync/syncengine.cpp

@@ -1004,6 +1004,7 @@ void SyncEngine::slotDiscoveryJobFinished(int discoveryResult)
 
     _propagator = QSharedPointer<OwncloudPropagator>(
         new OwncloudPropagator (_account, _localPath, _remotePath, _journal));
+    _propagator->setSyncOptions(_syncOptions);
     connect(_propagator.data(), SIGNAL(itemCompleted(const SyncFileItemPtr &)),
             this, SLOT(slotItemCompleted(const SyncFileItemPtr &)));
     connect(_propagator.data(), SIGNAL(progress(const SyncFileItem &,quint64)),