Преглед изворни кода

Implement callback between wrapper and vfs object for hydration requests

This comes with a test simulating an open request coming from another
process (although in our case it's really just a thread). The actual
hydration works as expected by cfapi, handling of encrypted files is for
now missing.

Signed-off-by: Kevin Ottens <kevin.ottens@nextcloud.com>
Kevin Ottens пре 5 година
родитељ
комит
90fbb7d322

+ 1 - 0
src/libsync/CMakeLists.txt

@@ -64,6 +64,7 @@ set(libsync_SRCS
 if (WIN32)
     set(libsync_SRCS ${libsync_SRCS}
         vfs/cfapi/cfapiwrapper.cpp
+        vfs/cfapi/hydrationjob.cpp
         vfs/cfapi/vfs_cfapi.cpp
     )
     add_definitions(-D_WIN32_WINNT=_WIN32_WINNT_WIN10)

+ 108 - 3
src/libsync/vfs/cfapi/cfapiwrapper.cpp

@@ -15,21 +15,126 @@
 #include "cfapiwrapper.h"
 
 #include "common/utility.h"
+#include "vfs_cfapi.h"
 
 #include <QDir>
 #include <QFileInfo>
+#include <QLocalSocket>
 #include <QLoggingCategory>
 
 #include <cfapi.h>
 #include <comdef.h>
+#include <ntstatus.h>
 
 Q_LOGGING_CATEGORY(lcCfApiWrapper, "nextcloud.sync.vfs.cfapi.wrapper", QtInfoMsg)
 
+#define FIELD_SIZE( type, field ) ( sizeof( ( (type*)0 )->field ) )
+#define CF_SIZE_OF_OP_PARAM( field )                                           \
+    ( FIELD_OFFSET( CF_OPERATION_PARAMETERS, field ) +                         \
+      FIELD_SIZE( CF_OPERATION_PARAMETERS, field ) )
+
 namespace {
-void CALLBACK cfApiFetchDataCallback(_In_ CONST CF_CALLBACK_INFO* callbackInfo, _In_ CONST CF_CALLBACK_PARAMETERS* callbackParameters)
+void cfApiSendTransferInfo(const CF_CONNECTION_KEY &connectionKey, const CF_TRANSFER_KEY &transferKey, NTSTATUS status, void *buffer, qint64 offset, qint64 length)
+{
+
+    CF_OPERATION_INFO opInfo = { 0 };
+    CF_OPERATION_PARAMETERS opParams = { 0 };
+
+    opInfo.StructSize = sizeof(opInfo);
+    opInfo.Type = CF_OPERATION_TYPE_TRANSFER_DATA;
+    opInfo.ConnectionKey = connectionKey;
+    opInfo.TransferKey = transferKey;
+    opParams.ParamSize = CF_SIZE_OF_OP_PARAM(TransferData);
+    opParams.TransferData.CompletionStatus = status;
+    opParams.TransferData.Buffer = buffer;
+    opParams.TransferData.Offset.QuadPart = offset;
+    opParams.TransferData.Length.QuadPart = length;
+
+    const qint64 result = CfExecute(&opInfo, &opParams);
+    Q_ASSERT(result == S_OK);
+    if (result != S_OK) {
+        qCCritical(lcCfApiWrapper) << "Couldn't send transfer info" << QString::number(transferKey.QuadPart, 16) << ":" << _com_error(result).ErrorMessage();
+    }
+}
+
+
+void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const CF_CALLBACK_PARAMETERS *callbackParameters)
 {
-    qCCritical(lcCfApiWrapper()) << "Got in!";
-    Q_ASSERT(false);
+    const auto sendTransferError = [=] {
+        cfApiSendTransferInfo(callbackInfo->ConnectionKey,
+                              callbackInfo->TransferKey,
+                              STATUS_UNSUCCESSFUL,
+                              nullptr,
+                              callbackParameters->FetchData.RequiredFileOffset.QuadPart,
+                              callbackParameters->FetchData.RequiredLength.QuadPart);
+    };
+
+    const auto sendTransferInfo = [=](QByteArray &data, qint64 offset) {
+        cfApiSendTransferInfo(callbackInfo->ConnectionKey,
+                              callbackInfo->TransferKey,
+                              STATUS_SUCCESS,
+                              data.data(),
+                              offset,
+                              data.length());
+    };
+
+    auto vfs = reinterpret_cast<OCC::VfsCfApi *>(callbackInfo->CallbackContext);
+    Q_ASSERT(vfs->metaObject()->className() == QByteArrayLiteral("OCC::VfsCfApi"));
+    const auto path = QString(QString::fromWCharArray(callbackInfo->VolumeDosName) + QString::fromWCharArray(callbackInfo->NormalizedPath));
+    const auto requestId = QString::number(callbackInfo->TransferKey.QuadPart, 16);
+
+    const auto invokeResult = QMetaObject::invokeMethod(vfs, [=] { vfs->requestHydration(requestId, path); }, Qt::QueuedConnection);
+    if (!invokeResult) {
+        qCCritical(lcCfApiWrapper) << "Failed to trigger hydration for" << path << requestId;
+        sendTransferError();
+        return;
+    }
+
+    // Block and wait for vfs to signal back the hydration is ready
+    bool hydrationRequestResult = false;
+    QEventLoop loop;
+    QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestReady, &loop, [&](const QString &id) {
+        if (requestId == id) {
+            hydrationRequestResult = true;
+            loop.quit();
+        }
+    });
+    QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestFailed, &loop, [&](const QString &id) {
+        if (requestId == id) {
+            hydrationRequestResult = false;
+            loop.quit();
+        }
+    });
+    loop.exec();
+    qCInfo(lcCfApiWrapper) << "VFS replied for hydration of" << path << requestId << "status was:" << hydrationRequestResult;
+
+    if (!hydrationRequestResult) {
+        sendTransferError();
+        return;
+    }
+
+    QLocalSocket socket;
+    socket.connectToServer(requestId);
+    const auto connectResult = socket.waitForConnected();
+    if (!connectResult) {
+        qCWarning(lcCfApiWrapper) << "Couldn't connect the socket" << requestId << socket.error() << socket.errorString();
+        sendTransferError();
+        return;
+    }
+
+    qint64 offset = 0;
+    while (socket.waitForReadyRead()) {
+        auto data = socket.readAll();
+        if (data.isEmpty()) {
+            qCWarning(lcCfApiWrapper) << "Unexpected empty data received" << requestId;
+            sendTransferError();
+            break;
+        }
+        sendTransferInfo(data, offset);
+        offset += data.length();
+    }
+
+    qCInfo(lcCfApiWrapper) << "Hydration done for" << path << requestId;
 }
 
 CF_CALLBACK_REGISTRATION cfApiCallbacks[] = {

+ 156 - 0
src/libsync/vfs/cfapi/hydrationjob.cpp

@@ -0,0 +1,156 @@
+/*
+ * Copyright (C) by Kevin Ottens <kevin.ottens@nextcloud.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include "hydrationjob.h"
+
+#include "common/syncjournaldb.h"
+#include "propagatedownload.h"
+
+#include <QLocalServer>
+#include <QLocalSocket>
+
+Q_LOGGING_CATEGORY(lcHydration, "nextcloud.sync.vfs.hydrationjob", QtInfoMsg)
+
+OCC::HydrationJob::HydrationJob(QObject *parent)
+    : QObject(parent)
+{
+    connect(this, &HydrationJob::finished, this, &HydrationJob::deleteLater);
+}
+
+OCC::AccountPtr OCC::HydrationJob::account() const
+{
+    return _account;
+}
+
+void OCC::HydrationJob::setAccount(const AccountPtr &account)
+{
+    _account = account;
+}
+
+QString OCC::HydrationJob::remotePath() const
+{
+    return _remotePath;
+}
+
+void OCC::HydrationJob::setRemotePath(const QString &remotePath)
+{
+    _remotePath = remotePath;
+}
+
+QString OCC::HydrationJob::localPath() const
+{
+    return _localPath;
+}
+
+void OCC::HydrationJob::setLocalPath(const QString &localPath)
+{
+    _localPath = localPath;
+}
+
+OCC::SyncJournalDb *OCC::HydrationJob::journal() const
+{
+    return _journal;
+}
+
+void OCC::HydrationJob::setJournal(SyncJournalDb *journal)
+{
+    _journal = journal;
+}
+
+QString OCC::HydrationJob::requestId() const
+{
+    return _requestId;
+}
+
+void OCC::HydrationJob::setRequestId(const QString &requestId)
+{
+    _requestId = requestId;
+}
+
+QString OCC::HydrationJob::folderPath() const
+{
+    return _folderPath;
+}
+
+void OCC::HydrationJob::setFolderPath(const QString &folderPath)
+{
+    _folderPath = folderPath;
+}
+
+void OCC::HydrationJob::start()
+{
+    Q_ASSERT(_account);
+    Q_ASSERT(_journal);
+    Q_ASSERT(!_remotePath.isEmpty() && !_localPath.isEmpty());
+    Q_ASSERT(!_requestId.isEmpty() && !_folderPath.isEmpty());
+
+    Q_ASSERT(_remotePath.endsWith('/'));
+    Q_ASSERT(_localPath.endsWith('/'));
+    Q_ASSERT(!_folderPath.startsWith('/'));
+
+    _server = new QLocalServer(this);
+    const auto listenResult = _server->listen(_requestId);
+    if (!listenResult) {
+        qCCritical(lcHydration) << "Couldn't get server to listen" << _requestId << _localPath << _folderPath;
+        emitFinished();
+        return;
+    }
+
+    qCInfo(lcHydration) << "Server ready, waiting for connections" << _requestId << _localPath << _folderPath;
+    connect(_server, &QLocalServer::newConnection, this, &HydrationJob::onNewConnection);
+}
+
+void OCC::HydrationJob::emitFinished()
+{
+    _socket->disconnectFromServer();
+    connect(_socket, &QLocalSocket::disconnected, this, [=]{
+        _socket->close();
+        emit finished(this);
+    });
+}
+
+void OCC::HydrationJob::onNewConnection()
+{
+    Q_ASSERT(!_socket);
+    Q_ASSERT(!_job);
+
+    qCInfo(lcHydration) << "Got new connection starting GETFileJob" << _requestId << _folderPath;
+    _socket = _server->nextPendingConnection();
+    _job = new GETFileJob(_account, _remotePath + _folderPath, _socket, {}, {}, 0, this);
+    connect(_job, &GETFileJob::finishedSignal, this, &HydrationJob::onGetFinished);
+    _job->start();
+}
+
+void OCC::HydrationJob::onGetFinished()
+{
+    qCInfo(lcHydration) << "GETFileJob finished" << _requestId << _folderPath << _job->errorStatus() << _job->errorString();
+
+    if (_job->errorStatus() != SyncFileItem::NoStatus && _job->errorStatus() != SyncFileItem::Success) {
+        emitFinished();
+        return;
+    }
+
+    SyncJournalFileRecord record;
+    _journal->getFileRecord(_folderPath, &record);
+    Q_ASSERT(record.isValid());
+    if (!record.isValid()) {
+        qCWarning(lcHydration) << "Couldn't find record to update after hydration" << _requestId << _folderPath;
+        emitFinished();
+        return;
+    }
+
+    record._type = ItemTypeFile;
+    _journal->setFileRecord(record);
+    emitFinished();
+}

+ 75 - 0
src/libsync/vfs/cfapi/hydrationjob.h

@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) by Kevin Ottens <kevin.ottens@nextcloud.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+#pragma once
+
+#include <QObject>
+
+#include "account.h"
+
+class QLocalServer;
+class QLocalSocket;
+
+namespace OCC {
+class GETFileJob;
+class SyncJournalDb;
+
+class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject
+{
+    Q_OBJECT
+public:
+    explicit HydrationJob(QObject *parent = nullptr);
+
+    AccountPtr account() const;
+    void setAccount(const AccountPtr &account);
+
+    QString remotePath() const;
+    void setRemotePath(const QString &remotePath);
+
+    QString localPath() const;
+    void setLocalPath(const QString &localPath);
+
+    SyncJournalDb *journal() const;
+    void setJournal(SyncJournalDb *journal);
+
+    QString requestId() const;
+    void setRequestId(const QString &requestId);
+
+    QString folderPath() const;
+    void setFolderPath(const QString &folderPath);
+
+    void start();
+
+signals:
+    void finished(HydrationJob *job);
+
+private:
+    void emitFinished();
+
+    void onNewConnection();
+    void onGetFinished();
+
+    AccountPtr _account;
+    QString _remotePath;
+    QString _localPath;
+    SyncJournalDb *_journal = nullptr;
+
+    QString _requestId;
+    QString _folderPath;
+
+    QLocalServer *_server = nullptr;
+    QLocalSocket *_socket = nullptr;
+    GETFileJob *_job = nullptr;
+};
+
+} // namespace OCC

+ 72 - 1
src/libsync/vfs/cfapi/vfs_cfapi.cpp

@@ -18,6 +18,7 @@
 #include <QFile>
 
 #include "cfapiwrapper.h"
+#include "hydrationjob.h"
 #include "syncfileitem.h"
 #include "filesystem.h"
 #include "common/syncjournaldb.h"
@@ -36,6 +37,7 @@ namespace OCC {
 class VfsCfApiPrivate
 {
 public:
+    QList<HydrationJob *> hydrationJobs;
     cfapi::ConnectionKey connectionKey;
 };
 
@@ -100,7 +102,7 @@ bool VfsCfApi::socketApiPinStateActionsShown() const
 
 bool VfsCfApi::isHydrating() const
 {
-    return false;
+    return !d->hydrationJobs.isEmpty();
 }
 
 Result<void, QString> VfsCfApi::updateMetadata(const QString &filePath, time_t modtime, qint64 size, const QByteArray &fileId)
@@ -260,12 +262,81 @@ Vfs::AvailabilityResult VfsCfApi::availability(const QString &folderPath)
     return AvailabilityError::NoSuchItem;
 }
 
+void VfsCfApi::requestHydration(const QString &requestId, const QString &path)
+{
+    qCInfo(lcCfApi) << "Received request to hydrate" << path << requestId;
+    const auto root = QDir::toNativeSeparators(params().filesystemPath);
+    Q_ASSERT(path.startsWith(root));
+
+    const auto relativePath = QDir::fromNativeSeparators(path.mid(root.length()));
+    const auto journal = params().journal;
+
+    // Set in the database that we should download the file
+    SyncJournalFileRecord record;
+    journal->getFileRecord(relativePath, &record);
+    if (!record.isValid()) {
+        qCInfo(lcCfApi) << "Couldn't hydrate, did not find file in db";
+        emit hydrationRequestFailed(requestId);
+        return;
+    }
+
+    if (!record.isVirtualFile()) {
+        qCInfo(lcCfApi) << "Couldn't hydrate, the file is not virtual";
+        emit hydrationRequestFailed(requestId);
+        return;
+    }
+
+    // This is impossible to handle with CfAPI since the file size is generally different
+    // between the encrypted and the decrypted file which would make CfAPI reject the hydration
+    // of the placeholder with decrypted data
+    if (record._isE2eEncrypted || !record._e2eMangledName.isEmpty()) {
+        qCInfo(lcCfApi) << "Couldn't hydrate, the file is E2EE this is not supported";
+        emit hydrationRequestFailed(requestId);
+        return;
+    }
+
+    // All good, let's hydrate now
+    scheduleHydrationJob(requestId, relativePath);
+}
+
 void VfsCfApi::fileStatusChanged(const QString &systemFileName, SyncFileStatus fileStatus)
 {
     Q_UNUSED(systemFileName);
     Q_UNUSED(fileStatus);
 }
 
+void VfsCfApi::scheduleHydrationJob(const QString &requestId, const QString &folderPath)
+{
+    Q_ASSERT(std::none_of(std::cbegin(d->hydrationJobs), std::cend(d->hydrationJobs), [=](HydrationJob *job) {
+        return job->requestId() == requestId || job->folderPath() == folderPath;
+    }));
+
+    if (d->hydrationJobs.isEmpty()) {
+        emit beginHydrating();
+    }
+
+    auto job = new HydrationJob(this);
+    job->setAccount(params().account);
+    job->setRemotePath(params().remotePath);
+    job->setLocalPath(params().filesystemPath);
+    job->setJournal(params().journal);
+    job->setRequestId(requestId);
+    job->setFolderPath(folderPath);
+    connect(job, &HydrationJob::finished, this, &VfsCfApi::onHydrationJobFinished);
+    d->hydrationJobs << job;
+    job->start();
+    emit hydrationRequestReady(requestId);
+}
+
+void VfsCfApi::onHydrationJobFinished(HydrationJob *job)
+{
+    Q_ASSERT(d->hydrationJobs.contains(job));
+    d->hydrationJobs.removeAll(job);
+    if (d->hydrationJobs.isEmpty()) {
+        emit doneHydrating();
+    }
+}
+
 VfsCfApi::HydratationAndPinStates VfsCfApi::computeRecursiveHydrationAndPinStates(const QString &folderPath, const Optional<PinState> &basePinState)
 {
     Q_ASSERT(!folderPath.endsWith('/'));

+ 9 - 0
src/libsync/vfs/cfapi/vfs_cfapi.h

@@ -19,6 +19,7 @@
 #include "common/vfs.h"
 
 namespace OCC {
+class HydrationJob;
 class VfsCfApiPrivate;
 
 class VfsCfApi : public Vfs
@@ -53,12 +54,20 @@ public:
     AvailabilityResult availability(const QString &folderPath) override;
 
 public slots:
+    void requestHydration(const QString &requestId, const QString &path);
     void fileStatusChanged(const QString &systemFileName, SyncFileStatus fileStatus) override;
 
+signals:
+    void hydrationRequestReady(const QString &requestId);
+    void hydrationRequestFailed(const QString &requestId);
+
 protected:
     void startImpl(const VfsSetupParams &params) override;
 
 private:
+    void scheduleHydrationJob(const QString &requestId, const QString &folderPath);
+    void onHydrationJobFinished(HydrationJob *job);
+
     struct HasHydratedDehydrated {
         bool hasHydrated = false;
         bool hasDehydrated = false;

+ 42 - 0
test/testsynccfapi.cpp

@@ -1097,6 +1097,48 @@ private slots:
         CFVERIFY_NONVIRTUAL(fakeFolder, "online/file1");
         CFVERIFY_VIRTUAL(fakeFolder, "local/file1");
     }
+
+    void testOpeningOnlineFileTriggersDownload()
+    {
+        FakeFolder fakeFolder{ FileInfo() };
+        setupVfs(fakeFolder);
+        QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
+
+        fakeFolder.remoteModifier().mkdir("online");
+        fakeFolder.remoteModifier().mkdir("online/sub");
+        QVERIFY(fakeFolder.syncOnce());
+        QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
+
+        setPinState(fakeFolder.localPath() + "online", PinState::OnlineOnly, cfapi::Recurse);
+
+        fakeFolder.remoteModifier().insert("online/sub/file1", 10 * 1024 * 1024);
+        QVERIFY(fakeFolder.syncOnce());
+
+        CFVERIFY_VIRTUAL(fakeFolder, "online/sub/file1");
+
+        // Simulate another process requesting the open
+        QEventLoop loop;
+        bool openResult = false;
+        bool readResult = false;
+        std::thread t([&] {
+            QFile file(fakeFolder.localPath() + "online/sub/file1");
+            openResult = file.open(QFile::ReadOnly);
+            readResult = !file.readAll().isEmpty();
+            file.close();
+            QMetaObject::invokeMethod(&loop, &QEventLoop::quit, Qt::QueuedConnection);
+        });
+        loop.exec();
+        t.join();
+
+        CFVERIFY_NONVIRTUAL(fakeFolder, "online/sub/file1");
+
+        // Nothing should change
+        ItemCompletedSpy completeSpy(fakeFolder);
+        QVERIFY(fakeFolder.syncOnce());
+        QVERIFY(completeSpy.isEmpty());
+
+        CFVERIFY_NONVIRTUAL(fakeFolder, "online/sub/file1");
+    }
 };
 
 QTEST_GUILESS_MAIN(TestSyncCfApi)