Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/libstore-tests/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ if get_option('benchmarks')
benchmark_sources = files(
'bench-main.cc',
'derivation-parser-bench.cc',
'readline-bench.cc',
'ref-scan-bench.cc',
)

Expand Down
52 changes: 52 additions & 0 deletions src/libstore-tests/readline-bench.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include <benchmark/benchmark.h>

#include "nix/util/file-descriptor.hh"
#include "nix/util/file-system.hh"

#ifndef _WIN32

# include <fcntl.h>
# include <unistd.h>

using namespace nix;

static void BM_ReadLineFile(benchmark::State & state)
{
const int lineCount = state.range(0);
const std::string line = std::string(80, 'x') + "\n";
std::string payload;
payload.reserve(line.size() * lineCount);
for (int i = 0; i < lineCount; ++i)
payload += line;

auto [file, path] = createTempFile();
writeFull(file.get(), payload, /*allowInterrupts=*/false);
file.close();

for (auto _ : state) {
state.PauseTiming();

int flags = O_RDONLY;
# ifdef O_CLOEXEC
flags |= O_CLOEXEC;
# endif
AutoCloseFD in(open(path.c_str(), flags));
if (!in)
throw SysError("opening file");

state.ResumeTiming();

for (int i = 0; i < lineCount; ++i) {
auto s = readLine(in.get());
benchmark::DoNotOptimize(s);
}
}

deletePath(path);

state.SetItemsProcessed(state.iterations() * lineCount);
}

BENCHMARK(BM_ReadLineFile)->Arg(1'000)->Arg(10'000)->Arg(100'000);

#endif
58 changes: 58 additions & 0 deletions src/libutil-tests/file-descriptor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#include <gtest/gtest.h>

#include "nix/util/file-descriptor.hh"

#ifndef _WIN32

# include <unistd.h>

namespace nix {

TEST(ReadLine, ReadsLinesFromPipe)
{
int fds[2];
ASSERT_EQ(pipe(fds), 0);

AutoCloseFD readSide{fds[0]};
AutoCloseFD writeSide{fds[1]};

writeFull(writeSide.get(), "hello\nworld\n", /*allowInterrupts=*/false);
writeSide.close();

EXPECT_EQ(readLine(readSide.get()), "hello");
EXPECT_EQ(readLine(readSide.get()), "world");
EXPECT_EQ(readLine(readSide.get(), /*eofOk=*/true), "");
EXPECT_EQ(readLine(readSide.get(), /*eofOk=*/true), "");
}

TEST(ReadLine, ReturnsPartialLineOnEofWhenAllowed)
{
int fds[2];
ASSERT_EQ(pipe(fds), 0);

AutoCloseFD readSide{fds[0]};
AutoCloseFD writeSide{fds[1]};

writeFull(writeSide.get(), "partial", /*allowInterrupts=*/false);
writeSide.close();

EXPECT_EQ(readLine(readSide.get(), /*eofOk=*/true), "partial");
EXPECT_EQ(readLine(readSide.get(), /*eofOk=*/true), "");
}

TEST(ReadLine, ThrowsOnEofWhenNotAllowed)
{
int fds[2];
ASSERT_EQ(pipe(fds), 0);

AutoCloseFD readSide{fds[0]};
AutoCloseFD writeSide{fds[1]};

writeSide.close();

EXPECT_THROW(readLine(readSide.get()), EndOfFile);
}

} // namespace nix

#endif
1 change: 1 addition & 0 deletions src/libutil-tests/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ sources = files(
'config.cc',
'executable-path.cc',
'file-content-address.cc',
'file-descriptor.cc',
'file-system.cc',
'git.cc',
'hash.cc',
Expand Down
2 changes: 2 additions & 0 deletions src/libutil/file-descriptor.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "nix/util/file-descriptor.hh"
#include "nix/util/serialise.hh"
#include "nix/util/util.hh"

Expand Down Expand Up @@ -77,6 +78,7 @@ Descriptor AutoCloseFD::get() const
void AutoCloseFD::close()
{
if (fd != INVALID_DESCRIPTOR) {
clearReadLineCache(fd);
if (
#ifdef _WIN32
::CloseHandle(fd)
Expand Down
7 changes: 7 additions & 0 deletions src/libutil/include/nix/util/file-descriptor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ void writeFull(Descriptor fd, std::string_view s, bool allowInterrupts = true);
*/
std::string readLine(Descriptor fd, bool eofOk = false);

/**
* Discard any buffered state associated with `readLine`.
*
* This is intended for internal use by file descriptor wrappers like `AutoCloseFD`.
*/
void clearReadLineCache(Descriptor fd);

/**
* Write a line to a file descriptor.
*/
Expand Down
68 changes: 55 additions & 13 deletions src/libutil/unix/file-descriptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <unistd.h>
#include <poll.h>

#include <unordered_map>

#if defined(__linux__) && defined(__NR_openat2)
# define HAVE_OPENAT2 1
# include <sys/syscall.h>
Expand All @@ -24,6 +26,14 @@ namespace nix {

namespace {

struct ReadLineState
{
std::string buffer;
size_t offset = 0;
};

thread_local std::unordered_map<int, ReadLineState> readLineStates;

// This function is needed to handle non-blocking reads/writes. This is needed in the buildhook, because
// somehow the json logger file descriptor ends up being non-blocking and breaks remote-building.
// TODO: get rid of buildhook and remove this function again (https://github.com/NixOS/nix/issues/12688)
Expand All @@ -39,6 +49,11 @@ void pollFD(int fd, int events)
}
} // namespace

void clearReadLineCache(int fd)
{
readLineStates.erase(fd);
}

std::string readFile(int fd)
{
struct stat st;
Expand Down Expand Up @@ -93,32 +108,59 @@ void writeFull(int fd, std::string_view s, bool allowInterrupts)

std::string readLine(int fd, bool eofOk)
{
std::string s;
while (1) {
auto & state = readLineStates[fd];

while (true) {
if (state.offset < state.buffer.size()) {
auto newlinePos = state.buffer.find('\n', state.offset);
if (newlinePos != std::string::npos) {
std::string line(state.buffer, state.offset, newlinePos - state.offset);
state.offset = newlinePos + 1;

if (state.offset == state.buffer.size()) {
// Clear the buffer (no more data)
state.buffer.clear();
state.offset = 0;
} else if (state.offset > 64 * 1024 && state.offset > state.buffer.size() / 2) {
// Compact the buffer (has more data)
state.buffer.erase(0, state.offset);
state.offset = 0;
}

return line;
}
}

checkInterrupt();
char ch;
// FIXME: inefficient
ssize_t rd = read(fd, &ch, 1);

char buf[8192];
ssize_t rd = read(fd, buf, sizeof(buf));
if (rd == -1) {
switch (errno) {
case EINTR:
continue;
case EAGAIN: {
case EAGAIN:
pollFD(fd, POLLIN);
continue;
}
default:
throw SysError("reading a line");
}
} else if (rd == 0) {
std::string line;
if (state.offset < state.buffer.size()) {
// Consume the rest of the buffer
line.assign(state.buffer, state.offset, state.buffer.size() - state.offset);
state.buffer.clear();
state.offset = 0;
}

clearReadLineCache(fd);

if (eofOk)
return s;
else
throw EndOfFile("unexpected EOF reading a line");
return line;
throw EndOfFile("unexpected EOF reading a line");
} else {
if (ch == '\n')
return s;
s += ch;
state.buffer.append(buf, rd);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/libutil/windows/file-descriptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ std::string readLine(HANDLE handle, bool eofOk)
}
}

void clearReadLineCache(HANDLE) {}

void drainFD(HANDLE handle, Sink & sink /*, bool block*/)
{
std::vector<unsigned char> buf(64 * 1024);
Expand Down
Loading