diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index b96fd084d9c..353c3d098ad 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -282,12 +282,22 @@ struct curlFileTransfer : public FileTransfer if (request.head) curl_easy_setopt(req, CURLOPT_NOBODY, 1); + else if (request.post) + curl_easy_setopt(req, CURLOPT_POST, 1); if (request.data) { - curl_easy_setopt(req, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper); - curl_easy_setopt(req, CURLOPT_READDATA, this); - curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length()); + if (request.post) { + // based off of https://curl.haxx.se/libcurl/c/postit2.html + curl_mime *form = curl_mime_init(req); + curl_mimepart *field = curl_mime_addpart(form); + curl_mime_data(field, request.data->data(), request.data->length()); + curl_easy_setopt(req, CURLOPT_MIMEPOST, form); + } else { + curl_easy_setopt(req, CURLOPT_UPLOAD, 1L); + curl_easy_setopt(req, CURLOPT_READFUNCTION, readCallbackWrapper); + curl_easy_setopt(req, CURLOPT_READDATA, this); + curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length()); + } } if (request.verifyTLS) { diff --git a/src/libstore/filetransfer.hh b/src/libstore/filetransfer.hh index 0fbda4c22ab..6f05c929c08 100644 --- a/src/libstore/filetransfer.hh +++ b/src/libstore/filetransfer.hh @@ -39,6 +39,7 @@ struct FileTransferRequest std::string expectedETag; bool verifyTLS = true; bool head = false; + bool post = false; size_t tries = fileTransferSettings.tries; unsigned int baseRetryTimeMs = 250; ActivityId parentAct; diff --git a/src/libstore/ipfs-binary-cache-store.cc b/src/libstore/ipfs-binary-cache-store.cc index 5a2d4b591cf..8de3b035934 100644 --- a/src/libstore/ipfs-binary-cache-store.cc +++ b/src/libstore/ipfs-binary-cache-store.cc @@ -1,9 +1,9 @@ #include +#include #include "binary-cache-store.hh" #include "filetransfer.hh" #include "nar-info-disk-cache.hh" -#include "ipfs.hh" namespace nix { @@ -15,28 +15,19 @@ class IPFSBinaryCacheStore : public BinaryCacheStore private: std::string cacheUri; + std::string daemonUri; - /* Host where a IPFS API can be reached (usually localhost) */ - std::string ipfsAPIHost; - /* Port where a IPFS API can be reached (usually 5001) */ - uint16_t ipfsAPIPort; - /* Whether to use a IPFS Gateway instead of the API */ - bool useIpfsGateway; - /* Where to find a IPFS Gateway */ - std::string ipfsGatewayURL; - - std::string constructIPFSRequest(const std::string & path) { - std::string uri; - std::string ipfsPath = cacheUri + "/" + path; - if (useIpfsGateway == false) { - uri = ipfs::buildAPIURL(ipfsAPIHost, ipfsAPIPort) + - "/cat" + - ipfs::buildQuery({{"arg", ipfsPath}}); - } else { - uri = ipfsGatewayURL + ipfsPath; - } - return uri; + std::string getIpfsPath() { + auto state(_state.lock()); + return state->ipfsPath; } + std::optional optIpnsPath; + + struct State + { + std::string ipfsPath; + }; + Sync _state; public: @@ -44,19 +35,46 @@ class IPFSBinaryCacheStore : public BinaryCacheStore const Params & params, const Path & _cacheUri) : BinaryCacheStore(params) , cacheUri(_cacheUri) - , ipfsAPIHost(get(params, "host").value_or("127.0.0.1")) - , ipfsAPIPort(std::stoi(get(params, "port").value_or("5001"))) - , useIpfsGateway(get(params, "use_gateway").value_or("0") == "1") - , ipfsGatewayURL(get(params, "gateway").value_or("https://ipfs.io")) { + auto state(_state.lock()); + if (cacheUri.back() == '/') cacheUri.pop_back(); - /* - * A cache is still useful since the IPFS API or - * gateway may have a higher latency when not running on - * localhost - */ - diskCache = getNarInfoDiskCache(); + + if (hasPrefix(cacheUri, "ipfs://")) + state->ipfsPath = "/ipfs/" + std::string(cacheUri, 7); + else if (hasPrefix(cacheUri, "ipns://")) + optIpnsPath = "/ipns/" + std::string(cacheUri, 7); + else + throw Error("unknown IPNS URI '%s'", cacheUri); + + std::string ipfsAPIHost(get(params, "host").value_or("127.0.0.1")); + std::string ipfsAPIPort(get(params, "port").value_or("5001")); + daemonUri = "http://" + ipfsAPIHost + ":" + ipfsAPIPort; + + // Check the IPFS daemon is running + FileTransferRequest request(daemonUri + "/api/v0/version"); + request.post = true; + request.tries = 1; + auto res = getFileTransfer()->download(request); + auto versionInfo = nlohmann::json::parse(*res.data); + if (versionInfo.find("Version") == versionInfo.end()) + throw Error("daemon for IPFS is not running properly"); + + // Resolve the IPNS name to an IPFS object + if (optIpnsPath) { + auto ipnsPath = *optIpnsPath; + debug("Resolving IPFS object of '%s', this could take a while.", ipnsPath); + auto uri = daemonUri + "/api/v0/name/resolve?offline=true&arg=" + getFileTransfer()->urlEncode(ipnsPath); + FileTransferRequest request(uri); + request.post = true; + request.tries = 1; + auto res = getFileTransfer()->download(request); + auto json = nlohmann::json::parse(*res.data); + if (json.find("Path") == json.end()) + throw Error("daemon for IPFS is not running properly"); + state->ipfsPath = json["Path"]; + } } std::string getUri() override @@ -66,61 +84,96 @@ class IPFSBinaryCacheStore : public BinaryCacheStore void init() override { - if (auto cacheInfo = diskCache->cacheExists(getUri())) { - wantMassQuery.setDefault(cacheInfo->wantMassQuery ? "true" : "false"); - priority.setDefault(fmt("%d", cacheInfo->priority)); - } else { - try { - BinaryCacheStore::init(); - } catch (UploadToIPFS &) { - throw Error(format("ā€˜%s’ does not appear to be a binary cache") % cacheUri); - } - diskCache->createCache(cacheUri, storeDir, wantMassQuery, priority); - } + std::string cacheInfoFile = "nix-cache-info"; + if (!fileExists(cacheInfoFile)) + upsertFile(cacheInfoFile, "StoreDir: " + storeDir + "\n", "text/x-nix-cache-info"); + BinaryCacheStore::init(); } protected: bool fileExists(const std::string & path) override { - /* - * TODO: Try a local mount first, best to share code with - * LocalBinaryCacheStore - */ + auto uri = daemonUri + "/api/v0/object/stat?arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path); - /* TODO: perform ipfs ls instead instead of trying to fetch it */ - auto uri = constructIPFSRequest(path); + FileTransferRequest request(uri); + request.post = true; + request.tries = 1; try { - FileTransferRequest request(uri); - //request.showProgress = FileTransferRequest::no; - request.tries = 5; - if (useIpfsGateway) - request.head = true; - getFileTransfer()->download(request); - return true; + auto res = getFileTransfer()->download(request); + auto json = nlohmann::json::parse(*res.data); + + return json.find("Hash") != json.end(); } catch (FileTransferError & e) { - if (e.error == FileTransfer::NotFound) - return false; - throw; + // probably should verify this is a not found error but + // ipfs gives us a 500 + return false; } } + // IPNS publish can be slow, we try to do it rarely. + void sync() override + { + if (!optIpnsPath) + return; + auto ipnsPath = *optIpnsPath; + + auto state(_state.lock()); + + debug("Publishing '%s' to '%s', this could take a while.", state->ipfsPath, ipnsPath); + + auto uri = daemonUri + "/api/v0/name/publish?offline=true&arg=" + getFileTransfer()->urlEncode(state->ipfsPath); + uri += "&key=" + std::string(ipnsPath, 6); + + auto req = FileTransferRequest(uri); + req.post = true; + req.tries = 1; + getFileTransfer()->download(req); + } + + void addLink(std::string name, std::string ipfsObject) + { + auto state(_state.lock()); + + auto uri = daemonUri + "/api/v0/object/patch/add-link?create=true"; + uri += "&arg=" + getFileTransfer()->urlEncode(state->ipfsPath); + uri += "&arg=" + getFileTransfer()->urlEncode(name); + uri += "&arg=" + getFileTransfer()->urlEncode(ipfsObject); + + auto req = FileTransferRequest(uri); + req.post = true; + req.tries = 1; + auto res = getFileTransfer()->download(req); + auto json = nlohmann::json::parse(*res.data); + + state->ipfsPath = "/ipfs/" + (std::string) json["Hash"]; + } + void upsertFile(const std::string & path, const std::string & data, const std::string & mimeType) override { - throw UploadToIPFS("uploading to an IPFS binary cache is not supported"); + // TODO: use callbacks + + auto req = FileTransferRequest(daemonUri + "/api/v0/add"); + req.data = std::make_shared(data); + req.post = true; + req.tries = 1; + try { + auto res = getFileTransfer()->upload(req); + auto json = nlohmann::json::parse(*res.data); + addLink(path, "/ipfs/" + (std::string) json["Hash"]); + } catch (FileTransferError & e) { + throw UploadToIPFS("while uploading to IPFS binary cache at '%s': %s", cacheUri, e.msg()); + } } void getFile(const std::string & path, Callback> callback) noexcept override { - /* - * TODO: Try local mount first, best to share code with - * LocalBinaryCacheStore - */ - auto uri = constructIPFSRequest(path); + auto uri = daemonUri + "/api/v0/cat?arg=" + getFileTransfer()->urlEncode(getIpfsPath() + "/" + path); + FileTransferRequest request(uri); - //request.showProgress = FileTransferRequest::no; - request.tries = 8; + request.post = true; + request.tries = 1; auto callbackPtr = std::make_shared(std::move(callback)); @@ -129,9 +182,7 @@ class IPFSBinaryCacheStore : public BinaryCacheStore try { (*callbackPtr)(result.get().data); } catch (FileTransferError & e) { - if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden) - return (*callbackPtr)(std::shared_ptr()); - callbackPtr->rethrow(); + return (*callbackPtr)(std::shared_ptr()); } catch (...) { callbackPtr->rethrow(); } @@ -145,12 +196,8 @@ static RegisterStoreImplementation regStore([]( const std::string & uri, const Store::Params & params) -> std::shared_ptr { - /* - * TODO: maybe use ipfs:/ fs:/ipfs/ - * https://github.com/ipfs/go-ipfs/issues/1678#issuecomment-157478515 - */ - if (uri.substr(0, strlen("/ipfs/")) != "/ipfs/" && - uri.substr(0, strlen("/ipns/")) != "/ipns/") + if (uri.substr(0, strlen("ipfs://")) != "ipfs://" && + uri.substr(0, strlen("ipns://")) != "ipns://") return 0; auto store = std::make_shared(params, uri); store->init(); diff --git a/src/libstore/ipfs.hh b/src/libstore/ipfs.hh deleted file mode 100644 index 00e99dbbf09..00000000000 --- a/src/libstore/ipfs.hh +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include -#include - -#include "types.hh" -#include "filetransfer.hh" - -namespace nix { -namespace ipfs { - -MakeError (CommandError, Error); - -inline std::string buildAPIURL(const std::string & host, - uint16_t port = 5001, - const std::string & version = "v0") -{ - return "http://" + host + ":" + std::to_string(port) + "/api/" + version; -} - -inline std::string buildQuery(const std::vector> & params = {}) { - std::string query = "?stream-channels=true&json=true&encoding=json"; - for (auto& param : params) { - std::string key = getFileTransfer()->urlEncode(param.first); - std::string value = getFileTransfer()->urlEncode(param.second); - query += "&" + key + "=" + value; - } - return query; -} - -} -} diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 095363d0c9b..5f86f4dba14 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -680,6 +680,8 @@ void copyPaths(ref srcStore, ref dstStore, const StorePathSet & st nrDone++; showProgress(); }); + + dstStore->sync(); } diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh index b1e25fc7d66..7c6ee3595b4 100644 --- a/src/libstore/store-api.hh +++ b/src/libstore/store-api.hh @@ -680,6 +680,10 @@ public: virtual void createUser(const std::string & userName, uid_t userId) { } + /* Sync writes to commits written data, usually a no-op. */ + virtual void sync() + { }; + protected: Stats stats; diff --git a/src/nix-store/nix-store.cc b/src/nix-store/nix-store.cc index 3a3060ad82d..bdde54dcff5 100644 --- a/src/nix-store/nix-store.cc +++ b/src/nix-store/nix-store.cc @@ -167,6 +167,8 @@ static void opAdd(Strings opFlags, Strings opArgs) for (auto & i : opArgs) cout << fmt("%s\n", store->printStorePath(store->addToStore(std::string(baseNameOf(i)), i))); + + store->sync(); } @@ -188,6 +190,8 @@ static void opAddFixed(Strings opFlags, Strings opArgs) for (auto & i : opArgs) cout << fmt("%s\n", store->printStorePath(store->addToStore(std::string(baseNameOf(i)), i, recursive, hashAlgo))); + + store->sync(); } @@ -952,6 +956,7 @@ static void opServe(Strings opFlags, Strings opArgs) SizedSource sizedSource(in, info.narSize); store->addToStore(info, sizedSource, NoRepair, NoCheckSigs); + store->sync(); // consume all the data that has been sent before continuing. sizedSource.drainAll(); diff --git a/src/nix/add-to-store.cc b/src/nix/add-to-store.cc index f43f774c1c8..8b7a5997a11 100644 --- a/src/nix/add-to-store.cc +++ b/src/nix/add-to-store.cc @@ -53,6 +53,7 @@ struct CmdAddToStore : MixDryRun, StoreCommand if (!dryRun) { auto source = StringSource { *sink.s }; store->addToStore(info, source); + store->sync(); } logger->stdout("%s", store->printStorePath(info.path)); diff --git a/src/nix/make-content-addressable.cc b/src/nix/make-content-addressable.cc index 3e7ff544d6d..96bc119b32f 100644 --- a/src/nix/make-content-addressable.cc +++ b/src/nix/make-content-addressable.cc @@ -100,6 +100,8 @@ struct CmdMakeContentAddressable : StorePathsCommand, MixJSON remappings.insert_or_assign(std::move(path), std::move(info.path)); } + + store->sync(); } };