Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/libfetchers/git-lfs-fetch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ std::vector<nlohmann::json> Fetch::fetchUrls(const std::vector<Pointer> & pointe
nlohmann::json oidList = pointerToPayload(pointers);
nlohmann::json data = {{"operation", "download"}};
data["objects"] = oidList;
request.data = data.dump();
auto payload = data.dump();
StringSource source{payload};
request.data = {source};

FileTransferResult result = getFileTransfer()->upload(request);
auto responseString = result.data;
Expand Down
20 changes: 14 additions & 6 deletions src/libstore/binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ std::optional<std::string> BinaryCacheStore::getNixCacheInfo()
void BinaryCacheStore::upsertFile(
const std::string & path, std::string && data, const std::string & mimeType, uint64_t sizeHint)
{
upsertFile(path, std::make_shared<std::stringstream>(std::move(data)), mimeType, sizeHint);
auto source = restartableSourceFromFactory([data = std::move(data)]() { return make_unique<StringSource>(data); });
upsertFile(path, *source, mimeType, sizeHint);
}

void BinaryCacheStore::getFile(const std::string & path, Callback<std::optional<std::string>> callback) noexcept
Expand Down Expand Up @@ -271,12 +272,19 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(

/* Atomically write the NAR file. */
if (repair || !fileExists(narInfo->url)) {
auto source = restartableSourceFromFactory([fnTemp]() {
struct AutoCloseFDSource : AutoCloseFD, FdSource
{
AutoCloseFDSource(AutoCloseFD fd)
: AutoCloseFD(std::move(fd))
, FdSource(get())
{
}
};
return std::make_unique<AutoCloseFDSource>(toDescriptor(open(fnTemp.c_str(), O_RDONLY)));
});
stats.narWrite++;
upsertFile(
narInfo->url,
std::make_shared<std::fstream>(fnTemp, std::ios_base::in | std::ios_base::binary),
"application/x-nix-nar",
narInfo->fileSize);
upsertFile(narInfo->url, *source, "application/x-nix-nar", narInfo->fileSize);
} else
stats.narWriteAverted++;

Expand Down
42 changes: 22 additions & 20 deletions src/libstore/filetransfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,20 +295,17 @@ struct curlFileTransfer : public FileTransfer
return 0;
}

size_t readOffset = 0;

size_t readCallback(char * buffer, size_t size, size_t nitems)
{
if (readOffset == request.data->length())
return 0;
auto count = std::min(size * nitems, request.data->length() - readOffset);
assert(count);
memcpy(buffer, request.data->data() + readOffset, count);
readOffset += count;
return count;
size_t readCallback(char * buffer, size_t size, size_t nitems) noexcept
try {
auto data = request.data;
return data->source->read(buffer, nitems * size);
} catch (EndOfFile &) {
return 0;
} catch (...) {
return CURL_READFUNC_ABORT;
}

static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp)
static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) noexcept
{
return ((TransferItem *) userp)->readCallback(buffer, size, nitems);
}
Expand All @@ -322,19 +319,24 @@ struct curlFileTransfer : public FileTransfer
}
#endif

size_t seekCallback(curl_off_t offset, int origin)
{
size_t seekCallback(curl_off_t offset, int origin) noexcept
try {
auto source = request.data->source;
if (origin == SEEK_SET) {
readOffset = offset;
source->restart();
source->skip(offset);
} else if (origin == SEEK_CUR) {
readOffset += offset;
source->skip(offset);
} else if (origin == SEEK_END) {
readOffset = request.data->length() + offset;
NullSink sink{};
source->drainInto(sink);
}
return CURL_SEEKFUNC_OK;
} catch (...) {
return CURL_SEEKFUNC_FAIL;
}

static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin)
static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) noexcept
{
return ((TransferItem *) clientp)->seekCallback(offset, origin);
}
Expand Down Expand Up @@ -393,10 +395,10 @@ struct curlFileTransfer : public FileTransfer
if (request.data) {
if (request.method == HttpMethod::POST) {
curl_easy_setopt(req, CURLOPT_POST, 1L);
curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->length());
curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->sizeHint);
} else if (request.method == HttpMethod::PUT) {
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->sizeHint);
} else {
unreachable();
}
Expand Down
21 changes: 13 additions & 8 deletions src/libstore/http-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,23 +135,28 @@ bool HttpBinaryCacheStore::fileExists(const std::string & path)
}

void HttpBinaryCacheStore::upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint)
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
{
auto req = makeRequest(path);
req.method = HttpMethod::PUT;
auto data = StreamToSourceAdapter(istream).drain();

auto compressionMethod = getCompressionMethod(path);

std::string data;
std::optional<StringSource> stringSource{};

if (compressionMethod) {
data = compress(*compressionMethod, data);
StringSink sink{};
auto compressionSink = makeCompressionSink(*compressionMethod, sink);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So future work is makeCompressionSource? :) Or should we use sinkToSource?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sinkToSource don't think we can. Can't use coroutines easily in curl callbacks. I have a feeling that this will be fine for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now I just wanted to avoid copying stuff twice in memory (like logs) and stuffing it straight into a compression sink. That should be slightly more optimal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the boost coroutine stuff here smells like bad luck. Fine to keep this case memory buffered for now.

source.drainInto(*compressionSink);
compressionSink->finish();
data = std::move(sink.s);
req.headers.emplace_back("Content-Encoding", *compressionMethod);
stringSource = StringSource{data};
req.data = {*stringSource};
} else {
req.data = {sizeHint, source};
}

req.data = std::move(data);
req.mimeType = mimeType;

try {
Expand Down
5 changes: 1 addition & 4 deletions src/libstore/include/nix/store/binary-cache-store.hh
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ public:
virtual bool fileExists(const std::string & path) = 0;

virtual void upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint) = 0;
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) = 0;

void upsertFile(
const std::string & path,
Expand Down
21 changes: 20 additions & 1 deletion src/libstore/include/nix/store/filetransfer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,26 @@ struct FileTransferRequest
unsigned int baseRetryTimeMs = RETRY_TIME_MS_DEFAULT;
ActivityId parentAct;
bool decompress = true;
std::optional<std::string> data;

struct UploadData
{
UploadData(StringSource & s)
: sizeHint(s.s.length())
, source(&s)
{
}

UploadData(std::size_t sizeHint, RestartableSource & source)
: sizeHint(sizeHint)
, source(&source)
{
}

std::size_t sizeHint = 0;
RestartableSource * source = nullptr;
};

std::optional<UploadData> data;
std::string mimeType;
std::function<void(std::string_view data)> dataCallback;
/**
Expand Down
5 changes: 1 addition & 4 deletions src/libstore/include/nix/store/http-binary-cache-store.hh
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,7 @@ protected:
bool fileExists(const std::string & path) override;

void upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint) override;
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override;

FileTransferRequest makeRequest(std::string_view path);

Expand Down
6 changes: 1 addition & 5 deletions src/libstore/local-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,12 @@ struct LocalBinaryCacheStore : virtual BinaryCacheStore
bool fileExists(const std::string & path) override;

void upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint) override
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override
{
auto path2 = config->binaryCacheDir + "/" + path;
static std::atomic<int> counter{0};
Path tmp = fmt("%s.tmp.%d.%d", path2, getpid(), ++counter);
AutoDelete del(tmp, false);
StreamToSourceAdapter source(istream);
writeFile(tmp, source);
std::filesystem::rename(tmp, path2);
del.cancel();
Expand Down
21 changes: 9 additions & 12 deletions src/libstore/s3-binary-cache-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
}

void upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint) override;
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override;

private:
ref<S3BinaryCacheStoreConfig> s3Config;
Expand Down Expand Up @@ -70,12 +67,9 @@ class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
};

void S3BinaryCacheStore::upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint)
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
{
HttpBinaryCacheStore::upsertFile(path, istream, mimeType, sizeHint);
HttpBinaryCacheStore::upsertFile(path, source, mimeType, sizeHint);
}

std::string S3BinaryCacheStore::createMultipartUpload(
Expand All @@ -92,7 +86,8 @@ std::string S3BinaryCacheStore::createMultipartUpload(
req.uri = VerbatimURL(url);

req.method = HttpMethod::POST;
req.data = "";
StringSource payload{std::string_view("")};
req.data = {payload};
req.mimeType = mimeType;

if (contentEncoding) {
Expand Down Expand Up @@ -122,7 +117,8 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId,
url.query["partNumber"] = std::to_string(partNumber);
url.query["uploadId"] = uploadId;
req.uri = VerbatimURL(url);
req.data = std::move(data);
StringSource payload{data};
req.data = {payload};
req.mimeType = "application/octet-stream";

auto result = getFileTransfer()->enqueueFileTransfer(req).get();
Expand Down Expand Up @@ -169,7 +165,8 @@ void S3BinaryCacheStore::completeMultipartUpload(

debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml);

req.data = xml;
StringSource payload{xml};
req.data = {payload};
req.mimeType = "text/xml";

getFileTransfer()->enqueueFileTransfer(req).get();
Expand Down
24 changes: 23 additions & 1 deletion src/libutil/include/nix/util/serialise.hh
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,18 @@ struct StringSink : Sink
void operator()(std::string_view data) override;
};

/**
* Source type that can be restarted.
*/
struct RestartableSource : Source
{
virtual void restart() = 0;
};

/**
* A source that reads data from a string.
*/
struct StringSource : Source
struct StringSource : RestartableSource
{
std::string_view s;
size_t pos;
Expand All @@ -257,8 +265,22 @@ struct StringSource : Source
size_t read(char * data, size_t len) override;

void skip(size_t len) override;

void restart() override
{
pos = 0;
}
};

/**
* Create a restartable Source from a factory function.
*
* @param factory Factory function that returns a fresh instance of the Source. Gets
* called for each source restart.
* @pre factory must return an equivalent source for each invocation.
*/
std::unique_ptr<RestartableSource> restartableSourceFromFactory(std::function<std::unique_ptr<Source>()> factory);

/**
* A sink that writes all incoming data to two other sinks.
*/
Expand Down
37 changes: 37 additions & 0 deletions src/libutil/serialise.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,4 +513,41 @@ size_t ChainSource::read(char * data, size_t len)
}
}

std::unique_ptr<RestartableSource> restartableSourceFromFactory(std::function<std::unique_ptr<Source>()> factory)
{
struct RestartableSourceImpl : RestartableSource
{
RestartableSourceImpl(decltype(factory) factory_)
: factory_(std::move(factory_))
, impl(this->factory_())
{
}

decltype(factory) factory_;
std::unique_ptr<Source> impl = factory_();

size_t read(char * data, size_t len) override
{
return impl->read(data, len);
}

bool good() override
{
return impl->good();
}

void skip(size_t len) override
{
return impl->skip(len);
}

void restart() override
{
impl = factory_();
}
};

return std::make_unique<RestartableSourceImpl>(std::move(factory));
}

} // namespace nix
Loading