From 353da0eed91dc9af1df78cdfad3b939b4bd0c14a Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Thu, 22 Jun 2017 13:02:45 -0700 Subject: [PATCH 01/11] FIFO Compaction with TTL. - Added a new TableProperty, creation_time, to keep track of when the SST file is created. - Creation_time: - On Flush: Set to the time of flush. - On Compaction: Set to the max creation_time of all the files involved in the compaction. - Added a new TTL option to FIFO compaction options. --- db/builder.cc | 11 +++-- db/builder.h | 8 ++-- db/compaction.cc | 13 ++++++ db/compaction.h | 2 + db/compaction_job.cc | 7 ++- db/compaction_picker.cc | 73 ++++++++++++++++++++++++------ db/flush_job.cc | 8 +++- db/flush_job.h | 2 +- include/rocksdb/advanced_options.h | 10 +++- include/rocksdb/listener.h | 4 ++ include/rocksdb/table_properties.h | 4 ++ options/options.cc | 2 + table/block_based_table_builder.cc | 11 +++-- table/block_based_table_builder.h | 3 +- table/block_based_table_factory.cc | 3 +- table/meta_blocks.cc | 3 ++ table/table_builder.h | 7 ++- table/table_properties.cc | 3 ++ tools/db_bench_tool.cc | 3 +- 19 files changed, 139 insertions(+), 38 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 0c0bbb236b6..83a425d3a98 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -47,9 +47,9 @@ TableBuilder* NewTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, - int level, - const std::string* compression_dict, const bool skip_filters) { + const CompressionOptions& compression_opts, int level, + const std::string* compression_dict, const bool skip_filters, + const uint64_t creation_time) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); @@ -57,7 +57,7 @@ TableBuilder* NewTableBuilder( TableBuilderOptions(ioptions, internal_comparator, 
int_tbl_prop_collector_factories, compression_type, compression_opts, compression_dict, skip_filters, - column_family_name, level), + column_family_name, level, creation_time), column_family_id, file); } @@ -76,7 +76,8 @@ Status BuildTable( const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, - TableProperties* table_properties, int level) { + TableProperties* table_properties, int level, + const uint64_t creation_time) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); diff --git a/db/builder.h b/db/builder.h index b438aad8f9e..1f8102df1d9 100644 --- a/db/builder.h +++ b/db/builder.h @@ -50,10 +50,9 @@ TableBuilder* NewTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, - int level, + const CompressionOptions& compression_opts, int level, const std::string* compression_dict = nullptr, - const bool skip_filters = false); + const bool skip_filters = false, const uint64_t creation_time = 0); // Build a Table file from the contents of *iter. The generated file // will be named according to number specified in meta. 
On success, the rest of @@ -79,6 +78,7 @@ extern Status BuildTable( InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, - TableProperties* table_properties = nullptr, int level = -1); + TableProperties* table_properties = nullptr, int level = -1, + const uint64_t creation_time = 0); } // namespace rocksdb diff --git a/db/compaction.cc b/db/compaction.cc index 5c382d16324..0d5253814f0 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -461,4 +461,17 @@ bool Compaction::ShouldFormSubcompactions() const { } } +uint64_t Compaction::MaxInputFileCreationTime() const { + uint64_t max_creation_time = 0; + if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO) { + for (const auto& file : inputs_[0].files) { + uint64_t creation_time = + file->fd.table_reader->GetTableProperties()->creation_time; + max_creation_time = + creation_time > max_creation_time ? creation_time : max_creation_time; + } + } + return max_creation_time; +} + } // namespace rocksdb diff --git a/db/compaction.h b/db/compaction.h index 457c2cd075d..0167b16f4c4 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -243,6 +243,8 @@ class Compaction { uint64_t max_compaction_bytes() const { return max_compaction_bytes_; } + uint64_t MaxInputFileCreationTime() const; + private: // mark (or clear) all files that are being compacted void MarkFilesBeingCompacted(bool mark_as_compacted); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 41765f19420..4778bc2b46c 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1025,7 +1025,6 @@ Status CompactionJob::FinishCompactionOutputFile( uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber(); assert(output_number != 0); - TableProperties table_properties; // Check for iterator errors Status s = input_status; auto meta = &sub_compact->current_output()->meta; @@ -1263,14 +1262,14 @@ Status 
CompactionJob::OpenCompactionOutputFile( // data is going to be found bool skip_filters = cfd->ioptions()->optimize_filters_for_hits && bottommost_level_; + sub_compact->builder.reset(NewTableBuilder( *cfd->ioptions(), cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(), sub_compact->compaction->output_compression(), cfd->ioptions()->compression_opts, - sub_compact->compaction->output_level(), - &sub_compact->compression_dict, - skip_filters)); + sub_compact->compaction->output_level(), &sub_compact->compression_dict, + skip_filters, sub_compact->compaction->MaxInputFileCreationTime())); LogFlush(db_options_.info_log); return s; } diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index c5d2d94c029..e7897d1a576 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1457,27 +1457,74 @@ Compaction* FIFOCompactionPicker::PickCompaction( return nullptr; } + auto reason = CompactionReason::kFIFOTtl; std::vector inputs; inputs.emplace_back(); inputs[0].level = 0; - // delete old files (FIFO) - for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { - auto f = *ritr; - total_size -= f->compensated_file_size; - inputs[0].files.push_back(f); - char tmp_fsize[16]; - AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); - ROCKS_LOG_BUFFER(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64 - " with size %s for deletion", - cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); - if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) { - break; + + // Delete files based on TTL + const bool ttl_enabled = ioptions_.compaction_options_fifo.ttl > 0; + if (ttl_enabled) { + int64_t _current_time; + auto status = ioptions_.env->GetCurrentTime(&_current_time); + const uint64_t current_time = static_cast(_current_time); + if (status.ok()) { + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); + ++ritr) { + auto f = 
*ritr; + auto props = f->fd.table_reader->GetTableProperties(); + auto creation_time = props->creation_time; + if (creation_time == 0) { + // Newer files that we might encounter later might have a + // creation_time embedded in them. But we don't want to proceed to + // them before dropping all the old files with the default '0' + // timestamp. Hence, stop. + break; + } + if (creation_time < + (current_time - ioptions_.compaction_options_fifo.ttl)) { + inputs[0].files.push_back(f); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with creation time %" PRIu64 " for deletion", + cf_name.c_str(), f->fd.GetNumber(), creation_time); + } + } + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: Couldn't get current time: %s. " + "Falling back to size-based table file deletions. ", + cf_name.c_str(), status.ToString().c_str()); } } + + // If TTL-based-deletion is not enabled, or enabled but the table files have + // a creation time of 0 (old files), then we fall back to deleting oldest + // files based on size. 
+ const bool should_proceed_to_size_based_deletion = inputs[0].files.empty(); + if (should_proceed_to_size_based_deletion) { + reason = CompactionReason::kFIFOMaxSize; + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + auto f = *ritr; + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); + char tmp_fsize[16]; + AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with size %s for deletion", + cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); + if (total_size <= + ioptions_.compaction_options_fifo.max_table_files_size) { + break; + } + } + } + Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), - /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); + /* is deletion compaction */ true, reason); RegisterCompaction(c); return c; } diff --git a/db/flush_job.cc b/db/flush_job.cc index 5c4645eeb42..adeb7051df2 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -243,6 +243,7 @@ Status FlushJob::WriteLevel0Table() { ThreadStatus::STAGE_FLUSH_WRITE_L0); db_mutex_->AssertHeld(); const uint64_t start_micros = db_options_.env->NowMicros(); + Status s; { db_mutex_->Unlock(); @@ -298,6 +299,11 @@ Status FlushJob::WriteLevel0Table() { &output_compression_); EnvOptions optimized_env_options = db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_); + + int64_t _current_time; + db_options_.env->GetCurrentTime(&_current_time); + const uint64_t current_time = static_cast(_current_time); + s = BuildTable( dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, optimized_env_options, cfd_->table_cache(), iter.get(), @@ -308,7 +314,7 @@ Status FlushJob::WriteLevel0Table() { cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), 
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, - Env::IO_HIGH, &table_properties_, 0 /* level */); + Env::IO_HIGH, &table_properties_, 0 /* level */, current_time); LogFlush(db_options_.info_log); } ROCKS_LOG_INFO(db_options_.info_log, diff --git a/db/flush_job.h b/db/flush_job.h index b06654d7731..6a685c09f89 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -70,7 +70,7 @@ class FlushJob { ~FlushJob(); // Require db_mutex held. - // Once PickMemTable() is called, either Run() or Cancel() has to be call. + // Once PickMemTable() is called, either Run() or Cancel() has to be called. void PickMemTable(); Status Run(FileMetaData* file_meta = nullptr); void Cancel(); diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 1e271483628..07f4c8a3cd1 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -62,6 +62,13 @@ struct CompactionOptionsFIFO { // Default: 1GB uint64_t max_table_files_size; + // Drop files older than TTL. TTL based deletion will take precedence over + // size based deletion if ttl > 0. + // delete if sst_file_creation_time < (current_time - ttl) + // unit: seconds. Ex: 1 day = 1 * 24 * 60 * 60 + // Default: 0 (disabled) + uint64_t ttl = 0; + // If true, try to do compaction to compact smaller files into larger ones. 
// Minimum files to compact follows options.level0_file_num_compaction_trigger // and compaction won't trigger if average compact bytes per del file is @@ -71,9 +78,10 @@ struct CompactionOptionsFIFO { bool allow_compaction = false; CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {} - CompactionOptionsFIFO(uint64_t _max_table_files_size, + CompactionOptionsFIFO(uint64_t _max_table_files_size, uint64_t _ttl, bool _allow_compaction) : max_table_files_size(_max_table_files_size), + ttl(_ttl), allow_compaction(_allow_compaction) {} }; diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 17fee59844d..065fc9fd0af 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -27,6 +27,8 @@ enum class TableFileCreationReason { kFlush, kCompaction, kRecovery, + kBulkLoading, + kUnknown, }; struct TableFileCreationBriefInfo { @@ -71,6 +73,8 @@ enum class CompactionReason { kFIFOMaxSize, // [FIFO] reduce number of files. kFIFOReduceNumFiles, + // [FIFO] files with creation time < (current_time - interval) + kFIFOTtl, // Manual compaction kManualCompaction, // DB::SuggestCompactRange() marked files for compaction diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 6559b1f3add..08360d1794a 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -48,6 +48,7 @@ struct TablePropertiesNames { static const std::string kPrefixExtractorName; static const std::string kPropertyCollectors; static const std::string kCompression; + static const std::string kCreationTime; }; extern const std::string kPropertiesBlock; @@ -158,6 +159,9 @@ struct TableProperties { // by column_family_name. uint64_t column_family_id = rocksdb::TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; + // The time when the SST file was created. + // Since SST files are immutable, this is equivalent to last modified time. 
+ uint64_t creation_time = 0; // Name of the column family with which this SST file is associated. // If column family is unknown, `column_family_name` will be an empty string. diff --git a/options/options.cc b/options/options.cc index 4aaedefda73..3f9fb3027e0 100644 --- a/options/options.cc +++ b/options/options.cc @@ -367,6 +367,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, "Options.compaction_options_fifo.allow_compaction: %d", compaction_options_fifo.allow_compaction); + ROCKS_LOG_HEADER(log, "Options.compaction_options_fifo.ttl: %" PRIu64, + compaction_options_fifo.ttl); std::string collector_names; for (const auto& collector_factory : table_properties_collector_factories) { collector_names.append(collector_factory->Name()); diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 910a70fb251..88258994c33 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -269,6 +269,7 @@ struct BlockBasedTableBuilder::Rep { std::unique_ptr flush_block_policy; uint32_t column_family_id; const std::string& column_family_name; + uint64_t creation_time = 0; std::vector> table_properties_collectors; @@ -281,7 +282,7 @@ struct BlockBasedTableBuilder::Rep { const CompressionType _compression_type, const CompressionOptions& _compression_opts, const std::string* _compression_dict, const bool skip_filters, - const std::string& _column_family_name) + const std::string& _column_family_name, const uint64_t _creation_time) : ioptions(_ioptions), table_options(table_opt), internal_comparator(icomparator), @@ -297,7 +298,8 @@ struct BlockBasedTableBuilder::Rep { table_options.flush_block_policy_factory->NewFlushBlockPolicy( table_options, data_block)), column_family_id(_column_family_id), - column_family_name(_column_family_name) { + column_family_name(_column_family_name), + creation_time(_creation_time) { if (table_options.index_type == BlockBasedTableOptions::kTwoLevelIndexSearch) { 
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( @@ -336,7 +338,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( const CompressionType compression_type, const CompressionOptions& compression_opts, const std::string* compression_dict, const bool skip_filters, - const std::string& column_family_name) { + const std::string& column_family_name, const uint64_t creation_time) { BlockBasedTableOptions sanitized_table_options(table_options); if (sanitized_table_options.format_version == 0 && sanitized_table_options.checksum != kCRC32c) { @@ -352,7 +354,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator, int_tbl_prop_collector_factories, column_family_id, file, compression_type, compression_opts, compression_dict, - skip_filters, column_family_name); + skip_filters, column_family_name, creation_time); if (rep_->filter_builder != nullptr) { rep_->filter_builder->StartBlock(0); @@ -728,6 +730,7 @@ Status BlockBasedTableBuilder::Finish() { r->props.top_level_index_size = r->p_index_builder_->EstimateTopLevelIndexSize(r->offset); } + r->props.creation_time = r->creation_time; // Add basic properties property_block_builder.AddTableProperty(r->props); diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 6f7f494c62d..3b351443acd 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -17,6 +17,7 @@ #include #include "rocksdb/flush_block_policy.h" +#include "rocksdb/listener.h" #include "rocksdb/options.h" #include "rocksdb/status.h" #include "table/table_builder.h" @@ -48,7 +49,7 @@ class BlockBasedTableBuilder : public TableBuilder { const CompressionType compression_type, const CompressionOptions& compression_opts, const std::string* compression_dict, const bool skip_filters, - const std::string& column_family_name); + const std::string& column_family_name, const uint64_t creation_time = 0); // REQUIRES: Either Finish() 
or Abandon() has been called. ~BlockBasedTableBuilder(); diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 8db76ea38ed..8745027a6d1 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -72,7 +72,8 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder( table_builder_options.compression_opts, table_builder_options.compression_dict, table_builder_options.skip_filters, - table_builder_options.column_family_name); + table_builder_options.column_family_name, + table_builder_options.creation_time); return table_builder; } diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 6af536fbcd4..229b7a7cfaa 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -77,6 +77,7 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) { Add(TablePropertiesNames::kFormatVersion, props.format_version); Add(TablePropertiesNames::kFixedKeyLen, props.fixed_key_len); Add(TablePropertiesNames::kColumnFamilyId, props.column_family_id); + Add(TablePropertiesNames::kCreationTime, props.creation_time); if (!props.filter_policy_name.empty()) { Add(TablePropertiesNames::kFilterPolicy, props.filter_policy_name); @@ -208,6 +209,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, &new_table_properties->fixed_key_len}, {TablePropertiesNames::kColumnFamilyId, &new_table_properties->column_family_id}, + {TablePropertiesNames::kCreationTime, + &new_table_properties->creation_time}, }; std::string last_key; diff --git a/table/table_builder.h b/table/table_builder.h index cacb2a65dd8..4e413b41110 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -56,7 +56,8 @@ struct TableBuilderOptions { CompressionType _compression_type, const CompressionOptions& _compression_opts, const std::string* _compression_dict, bool _skip_filters, - const std::string& _column_family_name, int _level) + const std::string& _column_family_name, int _level, + const 
uint64_t _creation_time = 0) : ioptions(_ioptions), internal_comparator(_internal_comparator), int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories), @@ -65,7 +66,8 @@ struct TableBuilderOptions { compression_dict(_compression_dict), skip_filters(_skip_filters), column_family_name(_column_family_name), - level(_level) {} + level(_level), + creation_time(_creation_time) {} const ImmutableCFOptions& ioptions; const InternalKeyComparator& internal_comparator; const std::vector>* @@ -77,6 +79,7 @@ struct TableBuilderOptions { bool skip_filters; // only used by BlockBasedTableBuilder const std::string& column_family_name; int level; // what level this table/file is on, -1 for "not set, don't know" + const uint64_t creation_time; }; // TableBuilder provides the interface used to build a Table diff --git a/table/table_properties.cc b/table/table_properties.cc index b03928e8868..f3373ba539d 100644 --- a/table/table_properties.cc +++ b/table/table_properties.cc @@ -139,6 +139,8 @@ std::string TableProperties::ToString( compression_name.empty() ? 
std::string("N/A") : compression_name, prop_delim, kv_delim); + AppendProperty(result, "creation time", creation_time, prop_delim, kv_delim); + return result; } @@ -190,6 +192,7 @@ const std::string TablePropertiesNames::kPrefixExtractorName = const std::string TablePropertiesNames::kPropertyCollectors = "rocksdb.property.collectors"; const std::string TablePropertiesNames::kCompression = "rocksdb.compression"; +const std::string TablePropertiesNames::kCreationTime = "rocksdb.creation.time"; extern const std::string kPropertiesBlock = "rocksdb.properties"; // Old property block name for backward compatibility diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 1ecae9a49d6..497c28a082d 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -638,6 +638,7 @@ DEFINE_uint64(fifo_compaction_max_table_files_size_mb, 0, "The limit of total table file sizes to trigger FIFO compaction"); DEFINE_bool(fifo_compaction_allow_compaction, true, "Allow compaction in FIFO compaction."); +DEFINE_uint64(fifo_compaction_ttl, 0, "TTL for the SST Files in seconds."); #endif // ROCKSDB_LITE DEFINE_bool(report_bg_io_stats, false, @@ -2864,7 +2865,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { #ifndef ROCKSDB_LITE options.compaction_options_fifo = CompactionOptionsFIFO( FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024, - FLAGS_fifo_compaction_allow_compaction); + FLAGS_fifo_compaction_ttl, FLAGS_fifo_compaction_allow_compaction); #endif // ROCKSDB_LITE if (FLAGS_prefix_size != 0) { options.prefix_extractor.reset( From 2d87ed4b8e54537a0d165e6b14f3c46770444dbf Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Thu, 22 Jun 2017 13:49:34 -0700 Subject: [PATCH 02/11] Remove accidental additions to TableFileCreationReason enum. 
--- include/rocksdb/listener.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 065fc9fd0af..40d318e0941 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -27,8 +27,6 @@ enum class TableFileCreationReason { kFlush, kCompaction, kRecovery, - kBulkLoading, - kUnknown, }; struct TableFileCreationBriefInfo { From 5d8f1844978f97ad9e773a483aa5338612c82565 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Sun, 25 Jun 2017 12:58:29 -0700 Subject: [PATCH 03/11] Cleaned up FIFO compaction with TTL logic, and added tests. - Added unit tests. - Split PickCompaction into PickTTLCompaction and PickSizeCompaction. - Fixed a bug in BuildTable where time wasn't being passed to NewTableBuilder. --- db/builder.cc | 3 +- db/compaction_picker.cc | 159 +++++++++++++++++++------------- db/compaction_picker.h | 13 +++ db/db_test.cc | 200 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 309 insertions(+), 66 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 83a425d3a98..6d34c9efe96 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -126,7 +126,8 @@ Status BuildTable( builder = NewTableBuilder( ioptions, internal_comparator, int_tbl_prop_collector_factories, column_family_id, column_family_name, file_writer.get(), compression, - compression_opts, level); + compression_opts, level, nullptr /* compression_dict */, + false /* skip_filters */, creation_time); } MergeHelper merge(env, internal_comparator.user_comparator(), diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index e7897d1a576..a0c177b5ad6 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1405,17 +1405,79 @@ bool FIFOCompactionPicker::NeedsCompaction( return vstorage->CompactionScore(kLevel0) >= 1; } -Compaction* FIFOCompactionPicker::PickCompaction( +uint64_t FIFOCompactionPicker::GetTotalFilesSize( + const std::vector& files) { + uint64_t total_size = 0; + for (const auto& f : files) { + 
total_size += f->fd.file_size; + } + return total_size; +} + +Compaction* FIFOCompactionPicker::PickTTLCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, LogBuffer* log_buffer) { - assert(vstorage->num_levels() == 1); + assert(ioptions_.compaction_options_fifo.ttl > 0); + const int kLevel0 = 0; const std::vector& level_files = vstorage->LevelFiles(kLevel0); - uint64_t total_size = 0; - for (const auto& file : level_files) { - total_size += file->fd.file_size; + uint64_t total_size = GetTotalFilesSize(level_files); + + int64_t _current_time; + auto status = ioptions_.env->GetCurrentTime(&_current_time); + if (!status.ok()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: Couldn't get current time: %s. " + "Not doing compactions based on TTL. ", + cf_name.c_str(), status.ToString().c_str()); + return nullptr; + } + const uint64_t current_time = static_cast(_current_time); + + std::vector inputs; + inputs.emplace_back(); + inputs[0].level = 0; + + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + auto f = *ritr; + auto props = f->fd.table_reader->GetTableProperties(); + auto creation_time = props->creation_time; + if (creation_time == 0) { + continue; + } else if (creation_time < + (current_time - ioptions_.compaction_options_fifo.ttl)) { + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with creation time %" PRIu64 " for deletion", + cf_name.c_str(), f->fd.GetNumber(), creation_time); + } } + // Return a nullptr and proceed to size-based FIFO compaction if: + // 1. there are no files older than ttl OR + // 2. there are a few files older than ttl, but deleting them will not bring + // the total size to be less than max_table_files_size threshold. 
+ if (inputs[0].files.empty() || + total_size > ioptions_.compaction_options_fifo.max_table_files_size) { + return nullptr; + } + + Compaction* c = new Compaction( + vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, + kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), + /* is deletion compaction */ true, CompactionReason::kFIFOTtl); + return c; +} + +Compaction* FIFOCompactionPicker::PickSizeCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + const int kLevel0 = 0; + const std::vector& level_files = vstorage->LevelFiles(kLevel0); + uint64_t total_size = GetTotalFilesSize(level_files); + if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size || level_files.size() == 0) { // total size not exceeded @@ -1435,7 +1497,6 @@ Compaction* FIFOCompactionPicker::PickCompaction( /* is manual */ false, vstorage->CompactionScore(0), /* is deletion compaction */ false, CompactionReason::kFIFOReduceNumFiles); - RegisterCompaction(c); return c; } } @@ -1457,74 +1518,44 @@ Compaction* FIFOCompactionPicker::PickCompaction( return nullptr; } - auto reason = CompactionReason::kFIFOTtl; std::vector inputs; inputs.emplace_back(); inputs[0].level = 0; - // Delete files based on TTL - const bool ttl_enabled = ioptions_.compaction_options_fifo.ttl > 0; - if (ttl_enabled) { - int64_t _current_time; - auto status = ioptions_.env->GetCurrentTime(&_current_time); - const uint64_t current_time = static_cast(_current_time); - if (status.ok()) { - for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); - ++ritr) { - auto f = *ritr; - auto props = f->fd.table_reader->GetTableProperties(); - auto creation_time = props->creation_time; - if (creation_time == 0) { - // Newer files that we might encounter later might have a - // creation_time embedded in them. 
But we don't want to proceed to - // them before dropping all the old files with the default '0' - // timestamp. Hence, stop. - break; - } - if (creation_time < - (current_time - ioptions_.compaction_options_fifo.ttl)) { - inputs[0].files.push_back(f); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] FIFO compaction: picking file %" PRIu64 - " with creation time %" PRIu64 " for deletion", - cf_name.c_str(), f->fd.GetNumber(), creation_time); - } - } - } else { - ROCKS_LOG_BUFFER(log_buffer, - "[%s] FIFO compaction: Couldn't get current time: %s. " - "Falling back to size-based table file deletions. ", - cf_name.c_str(), status.ToString().c_str()); - } - } - - // If TTL-based-deletion is not enabled, or enabled but the table files have - // a creation time of 0 (old files), then we fall back to deleting oldest - // files based on size. - const bool should_proceed_to_size_based_deletion = inputs[0].files.empty(); - if (should_proceed_to_size_based_deletion) { - reason = CompactionReason::kFIFOMaxSize; - for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { - auto f = *ritr; - total_size -= f->compensated_file_size; - inputs[0].files.push_back(f); - char tmp_fsize[16]; - AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] FIFO compaction: picking file %" PRIu64 - " with size %s for deletion", - cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); - if (total_size <= - ioptions_.compaction_options_fifo.max_table_files_size) { - break; - } + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + auto f = *ritr; + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); + char tmp_fsize[16]; + AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with size %s for deletion", + cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); + if (total_size <= 
ioptions_.compaction_options_fifo.max_table_files_size) { + break; } } Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), - /* is deletion compaction */ true, reason); + /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); + return c; +} + +Compaction* FIFOCompactionPicker::PickCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + assert(vstorage->num_levels() == 1); + + Compaction* c = nullptr; + if (ioptions_.compaction_options_fifo.ttl > 0) { + c = PickTTLCompaction(cf_name, mutable_cf_options, vstorage, log_buffer); + } + if (c == nullptr) { + c = PickSizeCompaction(cf_name, mutable_cf_options, vstorage, log_buffer); + } RegisterCompaction(c); return c; } diff --git a/db/compaction_picker.h b/db/compaction_picker.h index ae829cb4961..c289fcb37a0 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -244,6 +244,19 @@ class FIFOCompactionPicker : public CompactionPicker { virtual bool NeedsCompaction( const VersionStorageInfo* vstorage) const override; + + private: + Compaction* PickTTLCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* version, + LogBuffer* log_buffer); + + Compaction* PickSizeCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* version, + LogBuffer* log_buffer); + + uint64_t GetTotalFilesSize(const std::vector& files); }; class NullCompactionPicker : public CompactionPicker { diff --git a/db/db_test.cc b/db/db_test.cc index 52ee7306842..e88bfeb3b7b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2792,7 +2792,7 @@ TEST_F(DBTest, FIFOCompactionTestWithCompaction) { ASSERT_EQ(NumTableFilesAtLevel(0), 10); for (int i = 0; i < 60; i++) { - // Generate and flush a file about 10KB. 
+ // Generate and flush a file about 20KB. for (int j = 0; j < 20; j++) { ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980))); } @@ -2807,6 +2807,204 @@ TEST_F(DBTest, FIFOCompactionTestWithCompaction) { ASSERT_LE(SizeAtLevel(0), options.compaction_options_fifo.max_table_files_size); } + +TEST_F(DBTest, FIFOCompactionTestWithTTL) { + Options options; + options.compaction_style = kCompactionStyleFIFO; + options.write_buffer_size = 10 << 10; // 10KB + options.arena_block_size = 4096; + options.compression = kNoCompression; + options.create_if_missing = true; + + // Test to make sure that all files with expired ttl are deleted on next + // manual compaction. + { + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = false; + options.compaction_options_fifo.ttl = 600; // seconds + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 10; i++) { + // Generate and flush a file about 10KB. + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + // sleep for 5 seconds + env_->SleepForMicroseconds(5 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + // change ttl to 1 sec. So all files should be deleted on next compaction. + options.compaction_options_fifo.ttl = 1; + Reopen(options); + + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + } + + // Test to make sure that all files with expired ttl are deleted on compaction + // that is triggerred by size going beyond max_table_files_size threshold. 
+ { + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = false; + options.compaction_options_fifo.ttl = 5; // seconds + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 10; i++) { + // Generate and flush a file about 10KB. + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + env_->SleepForMicroseconds(6 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + // Create 10 more files. The old 10 files are dropped. + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + + // Only the new 10 files remain. + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } + + // Test that shows the fall back to size-based FIFO compaction if TTL-based + // deletion doesn't move the total size to be less than max_table_files_size. + { + options.write_buffer_size = 110 << 10; // 10KB + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = false; + options.compaction_options_fifo.ttl = 5; // seconds + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 3; i++) { + // Generate and flush a file about 10KB. 
+ for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 3); + + env_->SleepForMicroseconds(6 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 3); + + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 140; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + // Size limit is still guaranteed. + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } + + // Test with TTL + Intra-L0 compactions. + { + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = true; + options.compaction_options_fifo.ttl = 5; // seconds + options.level0_file_num_compaction_trigger = 6; + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 10; i++) { + // Generate and flush a file about 10KB. + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + // With Intra-L0 compaction, out of 10 files, 6 files will be compacted to 1 + // (due to level0_file_num_compaction_trigger = 6). + // So total files = 1 + remaining 4 = 5. + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 5); + + // Sleep for a little over ttl time. + env_->SleepForMicroseconds(6 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 5); + + // Create 10 more files. The old 5 files are dropped as their ttl expired. 
+ for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 5); + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } + + // Test with large TTL + Intra-L0 compactions. + // Files dropped based on size, as ttl doesn't kick in. + { + options.write_buffer_size = 20 << 10; // 20K + options.compaction_options_fifo.max_table_files_size = 1500 << 10; // 1.5MB + options.compaction_options_fifo.allow_compaction = true; + options.compaction_options_fifo.ttl = 60 * 60; // 1 hour + options.level0_file_num_compaction_trigger = 6; + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 60; i++) { + // Generate and flush a file about 20KB. + for (int j = 0; j < 20; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + // It should be compacted to 10 files. + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + for (int i = 0; i < 60; i++) { + // Generate and flush a file about 20KB. + for (int j = 0; j < 20; j++) { + ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + // It should be compacted to no more than 20 files. + ASSERT_GT(NumTableFilesAtLevel(0), 10); + ASSERT_LT(NumTableFilesAtLevel(0), 18); + // Size limit is still guaranteed. + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } +} #endif // ROCKSDB_LITE #ifndef ROCKSDB_LITE From 3a90501f687efcf1d92356d900665b048d648be0 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Sun, 25 Jun 2017 13:02:15 -0700 Subject: [PATCH 04/11] Added creation_time to the tables built during repair and recovery. 
Also handled the return status of GetCurrentTime, so that a garbage value is not used when the returned status is not ok. The time is set to 0 instead. --- db/compaction_job.cc | 13 ++++++++++++- db/db_impl_open.cc | 18 +++++++++++++++++- db/flush_job.cc | 6 ++++-- db/repair.cc | 12 +++++++++++- 4 files changed, 44 insertions(+), 5 deletions(-) diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 4778bc2b46c..af83532a1e8 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1263,13 +1263,24 @@ Status CompactionJob::OpenCompactionOutputFile( bool skip_filters = cfd->ioptions()->optimize_filters_for_hits && bottommost_level_; + uint64_t output_file_creation_time = + sub_compact->compaction->MaxInputFileCreationTime(); + if (output_file_creation_time == 0) { + int64_t _current_time; + auto status = db_options_.env->GetCurrentTime(&_current_time); + if (!status.ok()) { + _current_time = 0; + } + output_file_creation_time = static_cast<uint64_t>(_current_time); + } + sub_compact->builder.reset(NewTableBuilder( *cfd->ioptions(), cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(), sub_compact->compaction->output_compression(), cfd->ioptions()->compression_opts, sub_compact->compaction->output_level(), &sub_compact->compression_dict, - skip_filters, sub_compact->compaction->MaxInputFileCreationTime())); + skip_filters, output_file_creation_time)); LogFlush(db_options_.info_log); return s; } diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 995b329bfa6..4a81ff4b9f6 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -18,6 +18,7 @@ #include "db/builder.h" #include "options/options_helper.h" #include "rocksdb/wal_filter.h" +#include "table/block_based_table_factory.h" #include "util/rate_limiter.h" #include "util/sst_file_manager_impl.h" #include "util/sync_point.h" @@ -164,6 +165,12 @@ static Status ValidateOptions( "universal and level compaction styles. 
"); } } + if (cfd.options.compaction_options_fifo.ttl > 0 && + cfd.options.table_factory->Name() != BlockBasedTableFactory().Name()) { + return Status::NotSupported( + "FIFO Compaction with TTL is only supported in " + "Block-Based Table format. "); + } } if (db_options.db_paths.size() > 4) { @@ -832,6 +839,14 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, *cfd->GetLatestMutableCFOptions(); bool paranoid_file_checks = cfd->GetLatestMutableCFOptions()->paranoid_file_checks; + + int64_t _current_time; + s = env_->GetCurrentTime(&_current_time); + if (!s.ok()) { + _current_time = 0; + } + const uint64_t current_time = static_cast(_current_time); + { mutex_.Unlock(); @@ -851,7 +866,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), cfd->ioptions()->compression_opts, paranoid_file_checks, cfd->internal_stats(), TableFileCreationReason::kRecovery, - &event_logger_, job_id); + &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */, + -1 /* level */, current_time); LogFlush(immutable_db_options_.info_log); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" diff --git a/db/flush_job.cc b/db/flush_job.cc index adeb7051df2..d93e2c64e81 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -243,7 +243,6 @@ Status FlushJob::WriteLevel0Table() { ThreadStatus::STAGE_FLUSH_WRITE_L0); db_mutex_->AssertHeld(); const uint64_t start_micros = db_options_.env->NowMicros(); - Status s; { db_mutex_->Unlock(); @@ -301,7 +300,10 @@ Status FlushJob::WriteLevel0Table() { db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_); int64_t _current_time; - db_options_.env->GetCurrentTime(&_current_time); + auto status = db_options_.env->GetCurrentTime(&_current_time); + if (!status.ok()) { + _current_time = 0; + } const uint64_t current_time = static_cast(_current_time); s = BuildTable( diff --git 
a/db/repair.cc b/db/repair.cc index 1f9e344e130..da6e0f958d0 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -382,6 +382,14 @@ class Repairer { ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); EnvOptions optimized_env_options = env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_); + + int64_t _current_time; + status = env_->GetCurrentTime(&_current_time); + if (!status.ok()) { + _current_time = 0; + } + const uint64_t current_time = static_cast(_current_time); + status = BuildTable( dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), optimized_env_options, table_cache_, iter.get(), @@ -389,7 +397,9 @@ class Repairer { &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false, - nullptr /* internal_stats */, TableFileCreationReason::kRecovery); + nullptr /* internal_stats */, TableFileCreationReason::kRecovery, + nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, + nullptr /* table_properties */, -1 /* level */, current_time); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), From 20514a969f223a5a5d0f70ac30382c9a345314a8 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Sun, 25 Jun 2017 13:05:51 -0700 Subject: [PATCH 05/11] Add ttl param to the end in CompactionOptionsFIFO constructor. with a default value of 0. 
--- include/rocksdb/advanced_options.h | 4 ++-- tools/db_bench_tool.cc | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 07f4c8a3cd1..701bcb320a9 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -78,8 +78,8 @@ struct CompactionOptionsFIFO { bool allow_compaction = false; CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {} - CompactionOptionsFIFO(uint64_t _max_table_files_size, uint64_t _ttl, - bool _allow_compaction) + CompactionOptionsFIFO(uint64_t _max_table_files_size, bool _allow_compaction, + uint64_t _ttl = 0) : max_table_files_size(_max_table_files_size), ttl(_ttl), allow_compaction(_allow_compaction) {} diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 497c28a082d..6193e603fd4 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -2865,7 +2865,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { #ifndef ROCKSDB_LITE options.compaction_options_fifo = CompactionOptionsFIFO( FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024, - FLAGS_fifo_compaction_ttl, FLAGS_fifo_compaction_allow_compaction); + FLAGS_fifo_compaction_allow_compaction, FLAGS_fifo_compaction_ttl); #endif // ROCKSDB_LITE if (FLAGS_prefix_size != 0) { options.prefix_extractor.reset( From ee95191856e14488f91979767ba0c2e856999b8f Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Mon, 26 Jun 2017 00:39:38 -0700 Subject: [PATCH 06/11] Add to HISTORY.md and update score based on TTL. 
--- HISTORY.md | 1 + db/compaction.cc | 9 +++++---- db/compaction_picker.cc | 25 +++++++++++++------------ db/db_test.cc | 11 ++++++----- db/version_set.cc | 30 ++++++++++++++++++++++++++++++ 5 files changed, 55 insertions(+), 21 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index cabf93b73e2..d13fa01f98e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ ### New Features * Measure estimated number of reads per file. The information can be accessed through DB::GetColumnFamilyMetaData or "rocksdb.sstables" DB property. * RateLimiter support for throttling background reads, or throttling the sum of background reads and writes. This can give more predictable I/O usage when compaction reads more data than it writes, e.g., due to lots of deletions. +* [Experimental] FIFO compaction with TTL support. It can be enabled by setting CompactionOptionsFIFO.ttl > 0. ## 5.6.0 (06/06/2017) ### Public API Change diff --git a/db/compaction.cc b/db/compaction.cc index 0d5253814f0..e7dfb529cff 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -465,10 +465,11 @@ uint64_t Compaction::MaxInputFileCreationTime() const { uint64_t max_creation_time = 0; if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO) { for (const auto& file : inputs_[0].files) { - uint64_t creation_time = - file->fd.table_reader->GetTableProperties()->creation_time; - max_creation_time = - creation_time > max_creation_time ? 
creation_time : max_creation_time; + if (file->fd.table_reader != nullptr) { + uint64_t creation_time = + file->fd.table_reader->GetTableProperties()->creation_time; + max_creation_time = std::max(max_creation_time, creation_time); + } } } return max_creation_time; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index a0c177b5ad6..aac7db967f1 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1440,18 +1440,19 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { auto f = *ritr; - auto props = f->fd.table_reader->GetTableProperties(); - auto creation_time = props->creation_time; - if (creation_time == 0) { - continue; - } else if (creation_time < - (current_time - ioptions_.compaction_options_fifo.ttl)) { - total_size -= f->compensated_file_size; - inputs[0].files.push_back(f); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] FIFO compaction: picking file %" PRIu64 - " with creation time %" PRIu64 " for deletion", - cf_name.c_str(), f->fd.GetNumber(), creation_time); + if (f->fd.table_reader != nullptr) { + auto creation_time = + f->fd.table_reader->GetTableProperties()->creation_time; + if (creation_time > 0 && + creation_time < + (current_time - ioptions_.compaction_options_fifo.ttl)) { + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with creation time %" PRIu64 " for deletion", + cf_name.c_str(), f->fd.GetNumber(), creation_time); + } } } diff --git a/db/db_test.cc b/db/db_test.cc index e88bfeb3b7b..2dac10f984b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2849,8 +2849,8 @@ TEST_F(DBTest, FIFOCompactionTestWithTTL) { ASSERT_EQ(NumTableFilesAtLevel(0), 0); } - // Test to make sure that all files with expired ttl are deleted on compaction - // that is triggerred by size going beyond max_table_files_size threshold. 
+ // Test to make sure that all files with expired ttl are deleted on next + // automatic compaction. { options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB options.compaction_options_fifo.allow_compaction = false; @@ -2873,16 +2873,17 @@ TEST_F(DBTest, FIFOCompactionTestWithTTL) { ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ(NumTableFilesAtLevel(0), 10); - // Create 10 more files. The old 10 files are dropped. - for (int i = 0; i < 10; i++) { + // Create 1 more file to trigger TTL compaction. The old files are dropped. + for (int i = 0; i < 1; i++) { for (int j = 0; j < 10; j++) { ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); } Flush(); } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); // Only the new 10 files remain. - ASSERT_EQ(NumTableFilesAtLevel(0), 10); + ASSERT_EQ(NumTableFilesAtLevel(0), 1); ASSERT_LE(SizeAtLevel(0), options.compaction_options_fifo.max_table_files_size); } diff --git a/db/version_set.cc b/db/version_set.cc index 6c220b5ef82..008e22c127b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1318,6 +1318,31 @@ void VersionStorageInfo::EstimateCompactionBytesNeeded( } } +namespace { +uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions, + const std::vector& files) { + uint32_t ttl_expired_files_count = 0; + + int64_t _current_time; + auto status = ioptions.env->GetCurrentTime(&_current_time); + if (status.ok()) { + const uint64_t current_time = static_cast(_current_time); + for (auto f : files) { + if (!f->being_compacted && f->fd.table_reader != nullptr) { + auto creation_time = + f->fd.table_reader->GetTableProperties()->creation_time; + if (creation_time > 0 && + creation_time < + (current_time - ioptions.compaction_options_fifo.ttl)) { + ttl_expired_files_count++; + } + } + } + } + return ttl_expired_files_count; +} +} // anonymous namespace + void VersionStorageInfo::ComputeCompactionScore( const ImmutableCFOptions& immutable_cf_options, const MutableCFOptions& 
mutable_cf_options) { @@ -1364,6 +1389,11 @@ void VersionStorageInfo::ComputeCompactionScore( mutable_cf_options.level0_file_num_compaction_trigger, score); } + if (immutable_cf_options.compaction_options_fifo.ttl > 0) { + score = std::max(static_cast(GetExpiredTtlFilesCount( + immutable_cf_options, files_[level])), + score); + } } else { score = static_cast(num_sorted_runs) / From e6fc920860db84005e25fa4f848768718a6aa5b2 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Mon, 26 Jun 2017 11:48:30 -0700 Subject: [PATCH 07/11] Return MaxInputFileCreationTime irrespective of compaction. Also moved GetTotalFilesSize into anonymouse namespace from FIFOCompactionPicker class. --- db/compaction.cc | 12 +++++------- db/compaction_picker.cc | 4 +++- db/compaction_picker.h | 2 -- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index e7dfb529cff..e4690fc4b5a 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -463,13 +463,11 @@ bool Compaction::ShouldFormSubcompactions() const { uint64_t Compaction::MaxInputFileCreationTime() const { uint64_t max_creation_time = 0; - if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO) { - for (const auto& file : inputs_[0].files) { - if (file->fd.table_reader != nullptr) { - uint64_t creation_time = - file->fd.table_reader->GetTableProperties()->creation_time; - max_creation_time = std::max(max_creation_time, creation_time); - } + for (const auto& file : inputs_[0].files) { + if (file->fd.table_reader != nullptr) { + uint64_t creation_time = + file->fd.table_reader->GetTableProperties()->creation_time; + max_creation_time = std::max(max_creation_time, creation_time); } } return max_creation_time; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index aac7db967f1..04038f83b47 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1405,7 +1405,8 @@ bool FIFOCompactionPicker::NeedsCompaction( return vstorage->CompactionScore(kLevel0) >= 1; } 
-uint64_t FIFOCompactionPicker::GetTotalFilesSize( +namespace { +uint64_t GetTotalFilesSize( const std::vector& files) { uint64_t total_size = 0; for (const auto& f : files) { @@ -1413,6 +1414,7 @@ uint64_t FIFOCompactionPicker::GetTotalFilesSize( } return total_size; } +} // anonymous namespace Compaction* FIFOCompactionPicker::PickTTLCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, diff --git a/db/compaction_picker.h b/db/compaction_picker.h index c289fcb37a0..eb5f06819b6 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -255,8 +255,6 @@ class FIFOCompactionPicker : public CompactionPicker { const MutableCFOptions& mutable_cf_options, VersionStorageInfo* version, LogBuffer* log_buffer); - - uint64_t GetTotalFilesSize(const std::vector& files); }; class NullCompactionPicker : public CompactionPicker { From 7c096ef84275ec0626f993b3a83fe6a353338678 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Mon, 26 Jun 2017 14:47:17 -0700 Subject: [PATCH 08/11] Move log message to the right place. Since there is a potential chance that we could discard the files picked in the main loop in PickTTLCompaction, move the log message to the right place, to be right before creating the Compaction object. 
--- db/compaction_picker.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 04038f83b47..17f223353e9 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1450,10 +1450,6 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( (current_time - ioptions_.compaction_options_fifo.ttl)) { total_size -= f->compensated_file_size; inputs[0].files.push_back(f); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] FIFO compaction: picking file %" PRIu64 - " with creation time %" PRIu64 " for deletion", - cf_name.c_str(), f->fd.GetNumber(), creation_time); } } } @@ -1467,6 +1463,14 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( return nullptr; } + for (const auto& f : inputs[0].files) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with creation time %" PRIu64 " for deletion", + cf_name.c_str(), f->fd.GetNumber(), + f->fd.table_reader->GetTableProperties()->creation_time); + } + Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), From a87140a1c50b776f51c05cac8101b69e3a91876a Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Mon, 26 Jun 2017 16:30:35 -0700 Subject: [PATCH 09/11] Support FIFO-Compaction-with-TTL only with unlimited open files. Support FIFO-compaction-with-TTL only when max_open_files=-1, as the creation_time embedded in every file's table properties need to be consulted to figure out the files that need to be deleted. table_reader embedded in a FileDescriptor could potentially get deleted if max_open_files is not set to -1. We could initialize the table_reader again to get the table properties again, but it would involve a performance penalty due to reading new blocks from disk. We could potentially support it in the future, but may be not in the first version. 
--- db/db_impl_open.cc | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 4a81ff4b9f6..9641d492dbc 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -165,11 +165,18 @@ static Status ValidateOptions( "universal and level compaction styles. "); } } - if (cfd.options.compaction_options_fifo.ttl > 0 && - cfd.options.table_factory->Name() != BlockBasedTableFactory().Name()) { - return Status::NotSupported( - "FIFO Compaction with TTL is only supported in " - "Block-Based Table format. "); + if (cfd.options.compaction_options_fifo.ttl > 0) { + if (db_options.max_open_files != -1) { + return Status::NotSupported( + "FIFO Compaction with TTL is only supported when files are always " + "kept open (set max_open_files = -1). "); + } + if (cfd.options.table_factory->Name() != + BlockBasedTableFactory().Name()) { + return Status::NotSupported( + "FIFO Compaction with TTL is only supported in " + "Block-Based Table format. "); + } } } From 05bf810a78a387495b868beb1d483217fd270776 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Mon, 26 Jun 2017 23:35:20 -0700 Subject: [PATCH 10/11] Add tests to check the compatibility of FIFO-with-TTL with other options. 1. It is only supported with max_open_files = -1. 2. It is only supported with Block based table format. --- db/db_test.cc | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/db/db_test.cc b/db/db_test.cc index 2dac10f984b..2a3546defd0 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2808,7 +2808,47 @@ TEST_F(DBTest, FIFOCompactionTestWithCompaction) { options.compaction_options_fifo.max_table_files_size); } -TEST_F(DBTest, FIFOCompactionTestWithTTL) { +// Check that FIFO-with-TTL is not supported with max_open_files != -1. 
+TEST_F(DBTest, FIFOCompactionWithTTLAndMaxOpenFilesTest) { + Options options; + options.compaction_style = kCompactionStyleFIFO; + options.create_if_missing = true; + options.compaction_options_fifo.ttl = 600; // seconds + + // Check that it is not supported with max_open_files != -1. + options.max_open_files = 100; + options = CurrentOptions(options); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); + + options.max_open_files = -1; + ASSERT_OK(TryReopen(options)); +} + +// Check that FIFO-with-TTL is supported only with BlockBasedTableFactory. +TEST_F(DBTest, FIFOCompactionWithTTLAndVariousTableFormatsTest) { + Options options; + options.compaction_style = kCompactionStyleFIFO; + options.create_if_missing = true; + options.compaction_options_fifo.ttl = 600; // seconds + + options = CurrentOptions(options); + options.table_factory.reset(NewBlockBasedTableFactory()); + ASSERT_OK(TryReopen(options)); + + Destroy(options); + options.table_factory.reset(NewPlainTableFactory()); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); + + Destroy(options); + options.table_factory.reset(NewCuckooTableFactory()); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); + + Destroy(options); + options.table_factory.reset(NewAdaptiveTableFactory()); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); +} + +TEST_F(DBTest, FIFOCompactionWithTTLTest) { Options options; options.compaction_style = kCompactionStyleFIFO; options.write_buffer_size = 10 << 10; // 10KB From 12a5edac7042b3b800cbc5fa47767032489b1f06 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Tue, 27 Jun 2017 16:27:31 -0700 Subject: [PATCH 11/11] Pick only contiguous files for deletion based on TTL. Addressing review comments: 1. Pick only contiguous files for deletion in FIFO-with-TTL. 2. Add a check to make sure that GetTableProperties() does not return a null pointer, before accessing fields further. 
--- db/compaction.cc | 3 ++- db/compaction_picker.cc | 12 +++++++----- db/version_set.cc | 3 ++- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index e4690fc4b5a..bb2384a3598 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -464,7 +464,8 @@ bool Compaction::ShouldFormSubcompactions() const { uint64_t Compaction::MaxInputFileCreationTime() const { uint64_t max_creation_time = 0; for (const auto& file : inputs_[0].files) { - if (file->fd.table_reader != nullptr) { + if (file->fd.table_reader != nullptr && + file->fd.table_reader->GetTableProperties() != nullptr) { uint64_t creation_time = file->fd.table_reader->GetTableProperties()->creation_time; max_creation_time = std::max(max_creation_time, creation_time); diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 17f223353e9..fc6a8a8da86 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1442,15 +1442,17 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { auto f = *ritr; - if (f->fd.table_reader != nullptr) { + if (f->fd.table_reader != nullptr && + f->fd.table_reader->GetTableProperties() != nullptr) { auto creation_time = f->fd.table_reader->GetTableProperties()->creation_time; - if (creation_time > 0 && - creation_time < + if (creation_time == 0 || + creation_time >= (current_time - ioptions_.compaction_options_fifo.ttl)) { - total_size -= f->compensated_file_size; - inputs[0].files.push_back(f); + break; } + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); } } diff --git a/db/version_set.cc b/db/version_set.cc index 008e22c127b..9251cbd6d14 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1328,7 +1328,8 @@ uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions, if (status.ok()) { const uint64_t current_time = static_cast(_current_time); for (auto f : files) { - if (!f->being_compacted && 
f->fd.table_reader != nullptr) { + if (!f->being_compacted && f->fd.table_reader != nullptr && + f->fd.table_reader->GetTableProperties() != nullptr) { auto creation_time = f->fd.table_reader->GetTableProperties()->creation_time; if (creation_time > 0 &&