Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for LZ4 compression. #74

Merged
merged 1 commit into from
Feb 10, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion build_tools/build_detect_platform
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
#
# -DLEVELDB_PLATFORM_POSIX if cstdatomic is present
# -DLEVELDB_PLATFORM_NOATOMIC if it is not
# -DSNAPPY if the Snappy library is present
# -DSNAPPY if the Snappy library is present
# -DLZ4 if the LZ4 library is present
#
# Using gflags in rocksdb:
# Our project depends on gflags, which requires users to take some extra steps
Expand Down Expand Up @@ -244,6 +245,17 @@ EOF
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lbz2"
fi

# Test whether lz4 library is installed
$CXX $CFLAGS $COMMON_FLAGS -x c++ - -o /dev/null 2>/dev/null <<EOF
#include <lz4.h>
#include <lz4hc.h>
int main() {}
EOF
if [ "$?" = 0 ]; then
COMMON_FLAGS="$COMMON_FLAGS -DLZ4"
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -llz4"
fi

# Test whether tcmalloc is available
$CXX $CFLAGS -x c++ - -o /dev/null -ltcmalloc 2>/dev/null <<EOF
int main() {}
Expand Down
138 changes: 118 additions & 20 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ DEFINE_string(benchmarks,
"randomwithverify,"
"fill100K,"
"crc32c,"
"snappycomp,"
"snappyuncomp,"
"compress,"
"uncompress,"
"acquireload,"
"fillfromstdin,",

Expand Down Expand Up @@ -338,6 +338,10 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
return rocksdb::kZlibCompression;
else if (!strcasecmp(ctype, "bzip2"))
return rocksdb::kBZip2Compression;
else if (!strcasecmp(ctype, "lz4"))
return rocksdb::kLZ4Compression;
else if (!strcasecmp(ctype, "lz4hc"))
return rocksdb::kLZ4HCCompression;

fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
return rocksdb::kSnappyCompression; //default value
Expand Down Expand Up @@ -841,7 +845,13 @@ class Benchmark {
case rocksdb::kBZip2Compression:
fprintf(stdout, "Compression: bzip2\n");
break;
}
case rocksdb::kLZ4Compression:
fprintf(stdout, "Compression: lz4\n");
break;
case rocksdb::kLZ4HCCompression:
fprintf(stdout, "Compression: lz4hc\n");
break;
}

switch (FLAGS_rep_factory) {
case kPrefixHash:
Expand Down Expand Up @@ -896,6 +906,16 @@ class Benchmark {
strlen(text), &compressed);
name = "BZip2";
break;
case kLZ4Compression:
result = port::LZ4_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "LZ4";
break;
case kLZ4HCCompression:
result = port::LZ4HC_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "LZ4HC";
break;
case kNoCompression:
assert(false); // cannot happen
break;
Expand Down Expand Up @@ -1146,10 +1166,10 @@ class Benchmark {
method = &Benchmark::Crc32c;
} else if (name == Slice("acquireload")) {
method = &Benchmark::AcquireLoad;
} else if (name == Slice("snappycomp")) {
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
method = &Benchmark::SnappyUncompress;
} else if (name == Slice("compress")) {
method = &Benchmark::Compress;
} else if (name == Slice("uncompress")) {
method = &Benchmark::Uncompress;
} else if (name == Slice("heapprofile")) {
HeapProfile();
} else if (name == Slice("stats")) {
Expand Down Expand Up @@ -1302,23 +1322,47 @@ class Benchmark {
if (ptr == nullptr) exit(1); // Disable unused variable warning.
}

void SnappyCompress(ThreadState* thread) {
void Compress(ThreadState *thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
int64_t bytes = 0;
int64_t produced = 0;
bool ok = true;
std::string compressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Compress(Options().compression_opts, input.data(),

// Compress 1G
while (ok && bytes < int64_t(1) << 30) {
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kZlibCompression:
ok = port::Zlib_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kBZip2Compression:
ok = port::BZip2_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4Compression:
ok = port::LZ4_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = port::LZ4HC_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
default:
ok = false;
}
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedSingleOp(nullptr);
}

if (!ok) {
thread->stats.AddMessage("(snappy failure)");
thread->stats.AddMessage("(compression failure)");
} else {
char buf[100];
snprintf(buf, sizeof(buf), "(output: %.1f%%)",
Expand All @@ -1328,24 +1372,78 @@ class Benchmark {
}
}

void SnappyUncompress(ThreadState* thread) {
void Uncompress(ThreadState *thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
std::string compressed;
bool ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);

bool ok;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kZlibCompression:
ok = port::Zlib_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kBZip2Compression:
ok = port::BZip2_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4Compression:
ok = port::LZ4_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = port::LZ4HC_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
default:
ok = false;
}

int64_t bytes = 0;
char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed);
int decompress_size;
while (ok && bytes < 1024 * 1048576) {
char *uncompressed = nullptr;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
// allocate here to make comparison fair
uncompressed = new char[input.size()];
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed);
break;
case rocksdb::kZlibCompression:
uncompressed = port::Zlib_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
case rocksdb::kBZip2Compression:
uncompressed = port::BZip2_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4Compression:
uncompressed = port::LZ4_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4HCCompression:
uncompressed = port::LZ4_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
default:
ok = false;
}
delete[] uncompressed;
bytes += input.size();
thread->stats.FinishedSingleOp(nullptr);
}
delete[] uncompressed;

if (!ok) {
thread->stats.AddMessage("(snappy failure)");
thread->stats.AddMessage("(compression failure)");
} else {
thread->stats.AddBytes(bytes);
}
Expand Down
22 changes: 21 additions & 1 deletion db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,19 @@ static bool BZip2CompressionSupported(const CompressionOptions& options) {
return port::BZip2_Compress(options, in.data(), in.size(), &out);
}

static std::string RandomString(Random* rnd, int len) {
static bool LZ4CompressionSupported(const CompressionOptions &options) {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::LZ4_Compress(options, in.data(), in.size(), &out);
}

static bool LZ4HCCompressionSupported(const CompressionOptions &options) {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::LZ4HC_Compress(options, in.data(), in.size(), &out);
}

static std::string RandomString(Random *rnd, int len) {
std::string r;
test::RandomString(rnd, len, &r);
return r;
Expand Down Expand Up @@ -2624,6 +2636,14 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
CompressionOptions(wbits, lev, strategy))) {
type = kBZip2Compression;
fprintf(stderr, "using bzip2\n");
} else if (LZ4CompressionSupported(
CompressionOptions(wbits, lev, strategy))) {
type = kLZ4Compression;
fprintf(stderr, "using lz4\n");
} else if (LZ4HCCompressionSupported(
CompressionOptions(wbits, lev, strategy))) {
type = kLZ4HCCompression;
fprintf(stderr, "using lz4hc\n");
} else {
fprintf(stderr, "skipping test, compression disabled\n");
return false;
Expand Down
6 changes: 4 additions & 2 deletions include/rocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,10 @@ extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*);
enum {
rocksdb_no_compression = 0,
rocksdb_snappy_compression = 1,
rocksdb_zlib_compression = 1,
rocksdb_bz2_compression = 1
rocksdb_zlib_compression = 2,
rocksdb_bz2_compression = 3,
rocksdb_lz4_compression = 4,
rocksdb_lz4hc_compression = 5
};
extern void rocksdb_options_set_compression(rocksdb_options_t*, int);

Expand Down
6 changes: 2 additions & 4 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ using std::shared_ptr;
enum CompressionType : char {
// NOTE: do not change the values of existing entries, as these are
// part of the persistent format on disk.
kNoCompression = 0x0,
kSnappyCompression = 0x1,
kZlibCompression = 0x2,
kBZip2Compression = 0x3
kNoCompression = 0x0, kSnappyCompression = 0x1, kZlibCompression = 0x2,
kBZip2Compression = 0x3, kLZ4Compression = 0x4, kLZ4HCCompression = 0x5
};

enum CompactionStyle : char {
Expand Down
68 changes: 65 additions & 3 deletions port/port_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
#include <bzlib.h>
#endif

#if defined(LZ4)
#include <lz4.h>
#include <lz4hc.h>
#endif

#include <stdint.h>
#include <string>
#include <string.h>
Expand Down Expand Up @@ -353,8 +358,8 @@ inline bool BZip2_Compress(const CompressionOptions& opts, const char* input,
return false;
}

inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) {
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) {
#ifdef BZIP2
bz_stream _stream;
memset(&_stream, 0, sizeof(bz_stream));
Expand Down Expand Up @@ -409,7 +414,64 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
return nullptr;
}

inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
inline bool LZ4_Compress(const CompressionOptions &opts, const char *input,
size_t length, ::std::string* output) {
#ifdef LZ4
int compressBound = LZ4_compressBound(length);
output->resize(8 + compressBound);
char *p = const_cast<char *>(output->c_str());
memcpy(p, &length, sizeof(length));
size_t outlen;
outlen = LZ4_compress_limitedOutput(input, p + 8, length, compressBound);
if (outlen == 0) {
return false;
}
output->resize(8 + outlen);
return true;
#endif
return false;
}

inline char* LZ4_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) {
#ifdef LZ4
if (input_length < 8) {
return nullptr;
}
int output_len;
memcpy(&output_len, input_data, sizeof(output_len));
char *output = new char[output_len];
*decompress_size = LZ4_decompress_safe_partial(
input_data + 8, output, input_length - 8, output_len, output_len);
if (*decompress_size < 0) {
delete[] output;
return nullptr;
}
return output;
#endif
return nullptr;
}

inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input,
size_t length, ::std::string* output) {
#ifdef LZ4
int compressBound = LZ4_compressBound(length);
output->resize(8 + compressBound);
char *p = const_cast<char *>(output->c_str());
memcpy(p, &length, sizeof(length));
size_t outlen;
outlen = LZ4_compressHC2_limitedOutput(input, p + 8, length, compressBound,
opts.level);
if (outlen == 0) {
return false;
}
output->resize(8 + outlen);
return true;
#endif
return false;
}

inline bool GetHeapProfile(void (*func)(void *, const char *, int), void *arg) {
return false;
}

Expand Down
Loading