Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduced memory overhead of preparing LZ4-compressed data for server. #110

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions clickhouse/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ SET ( clickhouse-cpp-lib-src
base/output.cpp
base/platform.cpp
base/socket.cpp
base/wire_format.cpp

columns/array.cpp
columns/date.cpp
Expand Down
102 changes: 16 additions & 86 deletions clickhouse/base/coded.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,97 +4,27 @@

namespace clickhouse {

static const int MAX_VARINT_BYTES = 10;
//static const int MAX_VARINT_BYTES = 10;
Enmk marked this conversation as resolved.
Show resolved Hide resolved

CodedInputStream::CodedInputStream(ZeroCopyInput* input)
: input_(input)
{
}

bool CodedInputStream::ReadRaw(void* buffer, size_t size) {
uint8_t* p = static_cast<uint8_t*>(buffer);

while (size > 0) {
const void* ptr;
size_t len = input_->Next(&ptr, size);

memcpy(p, ptr, len);

p += len;
size -= len;
}

return true;
}

bool CodedInputStream::Skip(size_t count) {
while (count > 0) {
const void* ptr;
size_t len = input_->Next(&ptr, count);

if (len == 0) {
return false;
}

count -= len;
}

return true;
}

bool CodedInputStream::ReadVarint64(uint64_t* value) {
*value = 0;
//CodedInputStream::CodedInputStream(ZeroCopyInput* input)
// : input_(input)
//{
//}

for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) {
uint8_t byte;
//bool CodedInputStream::ReadRaw(void* buffer, size_t size) {
// uint8_t* p = static_cast<uint8_t*>(buffer);

if (!input_->ReadByte(&byte)) {
return false;
} else {
*value |= uint64_t(byte & 0x7F) << (7 * i);
// while (size > 0) {
// const void* ptr;
// size_t len = input_->Next(&ptr, size);

if (!(byte & 0x80)) {
return true;
}
}
}
// memcpy(p, ptr, len);

// TODO skip invalid
return false;
}


CodedOutputStream::CodedOutputStream(ZeroCopyOutput* output)
: output_(output)
{
}
// p += len;
// size -= len;
// }

void CodedOutputStream::Flush() {
output_->Flush();
}

void CodedOutputStream::WriteRaw(const void* buffer, int size) {
output_->Write(buffer, size);
}

void CodedOutputStream::WriteVarint64(uint64_t value) {
uint8_t bytes[MAX_VARINT_BYTES];
int size = 0;

for (size_t i = 0; i < MAX_VARINT_BYTES; ++i) {
uint8_t byte = value & 0x7F;
if (value > 0x7F)
byte |= 0x80;

bytes[size++] = byte;

value >>= 7;
if (!value) {
break;
}
}

WriteRaw(bytes, size);
}
// return true;
//}

}
55 changes: 0 additions & 55 deletions clickhouse/base/coded.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,59 +7,4 @@

namespace clickhouse {

/**
Enmk marked this conversation as resolved.
Show resolved Hide resolved
* Class which reads and decodes binary data which is composed of varint-
* encoded integers and fixed-width pieces.
*/
class CodedInputStream {
public:
/// Create a CodedInputStream that reads from the given ZeroCopyInput.
explicit CodedInputStream(ZeroCopyInput* input);

// Read an unsigned integer with Varint encoding, truncating to 32 bits.
// Reading a 32-bit value is equivalent to reading a 64-bit one and casting
// it to uint32, but may be more efficient.
bool ReadVarint32(uint32_t* value);

// Read an unsigned integer with Varint encoding.
bool ReadVarint64(uint64_t* value);

// Read raw bytes, copying them into the given buffer.
bool ReadRaw(void* buffer, size_t size);

// Like ReadRaw, but reads into a string.
//
// Implementation Note: ReadString() grows the string gradually as it
// reads in the data, rather than allocating the entire requested size
// upfront. This prevents denial-of-service attacks in which a client
// could claim that a string is going to be MAX_INT bytes long in order to
// crash the server because it can't allocate this much space at once.
bool ReadString(std::string* buffer, int size);

// Skips a number of bytes. Returns false if an underlying read error
// occurs.
bool Skip(size_t count);

private:
ZeroCopyInput* input_;
};


class CodedOutputStream {
public:
/// Create a CodedInputStream that writes to the given ZeroCopyOutput.
explicit CodedOutputStream(ZeroCopyOutput* output);

void Flush();

// Write raw bytes, copying them from the given buffer.
void WriteRaw(const void* buffer, int size);

/// Write an unsigned integer with Varint encoding.
void WriteVarint64(const uint64_t value);

private:
ZeroCopyOutput* output_;
};

}
76 changes: 72 additions & 4 deletions clickhouse/base/compressed.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
#include "compressed.h"
#include "wire_format.h"
#include "output.h"

#include <cityhash/city.h>
#include <lz4/lz4.h>
#include <stdexcept>
#include <system_error>

#include <iostream>

namespace {
static const size_t HEADER_SIZE = 9;
static const size_t EXTRA_PREALLOCATE_COMPRESS_BUFFER = 15;
static const uint8_t COMPRESSION_METHOD = 0x82;
#define DBMS_MAX_COMPRESSED_SIZE 0x40000000ULL // 1GB
}
Enmk marked this conversation as resolved.
Show resolved Hide resolved

namespace clickhouse {

CompressedInput::CompressedInput(CodedInputStream* input)
CompressedInput::CompressedInput(InputStream* input)
: input_(input)
{
}
Expand Down Expand Up @@ -50,7 +58,7 @@ bool CompressedInput::Decompress() {
return false;
}

if (method != 0x82) {
if (method != COMPRESSION_METHOD) {
throw std::runtime_error("unsupported compression method " +
std::to_string(int(method)));
} else {
Expand All @@ -75,7 +83,7 @@ bool CompressedInput::Decompress() {
out.Write(&original, sizeof(original));
}

if (!WireFormat::ReadBytes(input_, tmp.data() + 9, compressed - 9)) {
if (!WireFormat::ReadBytes(input_, tmp.data() + HEADER_SIZE, compressed - HEADER_SIZE)) {
return false;
} else {
if (hash != CityHash128((const char*)tmp.data(), compressed)) {
Expand All @@ -85,7 +93,7 @@ bool CompressedInput::Decompress() {

data_ = Buffer(original);

if (LZ4_decompress_safe((const char*)tmp.data() + 9, (char*)data_.data(), compressed - 9, original) < 0) {
if (LZ4_decompress_safe((const char*)tmp.data() + HEADER_SIZE, (char*)data_.data(), compressed - HEADER_SIZE, original) < 0) {
throw std::runtime_error("can't decompress data");
} else {
mem_.Reset(data_.data(), original);
Expand All @@ -95,4 +103,64 @@ bool CompressedInput::Decompress() {
return true;
}


CompressedOutput::CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size)
: destination_(destination),
max_compressed_chunk_size_(max_compressed_chunk_size)
Enmk marked this conversation as resolved.
Show resolved Hide resolved
{
}

CompressedOutput::~CompressedOutput() {
Flush();
}
Enmk marked this conversation as resolved.
Show resolved Hide resolved

size_t CompressedOutput::DoWrite(const void* data, size_t len) {
const size_t original_len = len;
const size_t max_chunk_size = max_compressed_chunk_size_ ? max_compressed_chunk_size_ : len;
Enmk marked this conversation as resolved.
Show resolved Hide resolved

while (len > 0)
{
auto to_compress = std::min(len, max_chunk_size);
if (!Compress(data, to_compress))
break;

len -= to_compress;
data = reinterpret_cast<const char*>(data) + to_compress;
}

return original_len - len;
}

void CompressedOutput::DoFlush() {
destination_->Flush();
}

bool CompressedOutput::Compress(const void * data, size_t len) {

const size_t expected_out_size = LZ4_compressBound(len);
Enmk marked this conversation as resolved.
Show resolved Hide resolved
compressed_buffer_.resize(std::max(compressed_buffer_.size(), expected_out_size + HEADER_SIZE + EXTRA_PREALLOCATE_COMPRESS_BUFFER));

const int compressed_size = LZ4_compress_default(
(const char*)data,
(char*)compressed_buffer_.data() + HEADER_SIZE,
len,
compressed_buffer_.size() - HEADER_SIZE);
Enmk marked this conversation as resolved.
Show resolved Hide resolved

{
auto header = compressed_buffer_.data();
WriteUnaligned(header, COMPRESSION_METHOD);
// Compressed data size with header
WriteUnaligned(header + 1, static_cast<uint32_t>(compressed_size + HEADER_SIZE));
// Original data size
WriteUnaligned(header + 5, static_cast<uint32_t>(len));
}

WireFormat::WriteFixed(destination_, CityHash128(
(const char*)compressed_buffer_.data(), compressed_size + HEADER_SIZE));
WireFormat::WriteBytes(destination_, compressed_buffer_.data(), compressed_size + HEADER_SIZE);

destination_->Flush();
return true;
Enmk marked this conversation as resolved.
Show resolved Hide resolved
}

}
25 changes: 22 additions & 3 deletions clickhouse/base/compressed.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#pragma once

#include "coded.h"
#include "input.h"
#include "output.h"
#include "buffer.h"

namespace clickhouse {

class CompressedInput : public ZeroCopyInput {
public:
CompressedInput(CodedInputStream* input);
CompressedInput(InputStream* input);
Enmk marked this conversation as resolved.
Show resolved Hide resolved
~CompressedInput();

protected:
Expand All @@ -15,10 +17,27 @@ class CompressedInput : public ZeroCopyInput {
bool Decompress();

private:
CodedInputStream* const input_;
InputStream* const input_;

Buffer data_;
ArrayInput mem_;
};

class CompressedOutput : public OutputStream {
public:
CompressedOutput(OutputStream * destination, size_t max_compressed_chunk_size = 0);
~CompressedOutput();

protected:
size_t DoWrite(const void* data, size_t len) override;
void DoFlush() override;
bool Compress(const void * data, size_t len);


Enmk marked this conversation as resolved.
Show resolved Hide resolved
private:
OutputStream * destination_;
Buffer compressed_buffer_;
size_t max_compressed_chunk_size_;
};

}
15 changes: 15 additions & 0 deletions clickhouse/base/input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@

namespace clickhouse {

bool ZeroCopyInput::Skip(size_t bytes) {
while (bytes > 0) {
const void* ptr;
Enmk marked this conversation as resolved.
Show resolved Hide resolved
size_t len = Next(&ptr, bytes);

if (len == 0) {
return false;
}

bytes -= len;
}

return true;
}

size_t ZeroCopyInput::DoRead(void* buf, size_t len) {
const void* ptr;
size_t result = DoNext(&ptr, len);
Expand Down
5 changes: 5 additions & 0 deletions clickhouse/base/input.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class InputStream {
return DoRead(buf, len);
}

// Skips a number of bytes. Returns false if an underlying read error occurs.
virtual bool Skip(size_t bytes) = 0;

protected:
virtual size_t DoRead(void* buf, size_t len) = 0;
};
Expand All @@ -32,6 +35,8 @@ class ZeroCopyInput : public InputStream {
return DoNext(buf, len);
}

bool Skip(size_t bytes) override;

protected:
virtual size_t DoNext(const void** ptr, size_t len) = 0;

Expand Down
Loading