diff --git a/bolt/connectors/hive/HiveConnectorSplit.h b/bolt/connectors/hive/HiveConnectorSplit.h index 89e9c7044..fd2fecc95 100644 --- a/bolt/connectors/hive/HiveConnectorSplit.h +++ b/bolt/connectors/hive/HiveConnectorSplit.h @@ -41,6 +41,7 @@ inline const std::string KPaimonDeletionFilePath = "paimon.deletion.file.path"; inline const std::string KPaimonDeletionBinOffset = "paimon.deletion.bin.offset"; inline const std::string KPaimonDeletionBinSize = "paimon.deletion.bin.size"; +constexpr std::string_view kDefaultHiveConnectorId = "hive-default"; struct HiveConnectorSplitCacheLimit : public ISerializable { // record table is or not cache, and table partition range limit; @@ -205,4 +206,122 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { static void registerSerDe(); }; +/// Builder for HiveConnectorSplit. Allows constructing splits incrementally +/// without specifying every constructor argument. +class HiveConnectorSplitBuilder { + public: + explicit HiveConnectorSplitBuilder(std::string filePath) + : filePath_{std::move(filePath)} {} + + HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) { + connectorId_ = connectorId; + return *this; + } + + HiveConnectorSplitBuilder& start(uint64_t start) { + start_ = start; + return *this; + } + + HiveConnectorSplitBuilder& length(uint64_t length) { + length_ = length; + return *this; + } + + HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) { + fileFormat_ = format; + return *this; + } + + HiveConnectorSplitBuilder& partitionKey( + std::string name, + std::optional value) { + partitionKeys_.emplace(std::move(name), std::move(value)); + return *this; + } + + HiveConnectorSplitBuilder& tableBucketNumber(int32_t bucket) { + tableBucketNumber_ = bucket; + return *this; + } + + HiveConnectorSplitBuilder& cacheLimit(HiveConnectorSplitCacheLimit cl) { + cacheLimit_ = std::make_shared(std::move(cl)); + return *this; + } + + HiveConnectorSplitBuilder& customSplitInfo( + std::unordered_map info) { + customSplitInfo_ = std::move(info); + return *this; + } + + HiveConnectorSplitBuilder& extraFileInfo( + std::shared_ptr extraFileInfo) { + extraFileInfo_ = std::move(extraFileInfo); + return *this; + } + + HiveConnectorSplitBuilder& serdeParameters( + std::unordered_map params) { + serdeParameters_ = std::move(params); + return *this; + } + + HiveConnectorSplitBuilder& fileSize(uint64_t size) { + fileSize_ = size; + return *this; + } + + HiveConnectorSplitBuilder& rowIdProperties(RowIdProperties props) { + rowIdProperties_ = std::move(props); + return *this; + } + + HiveConnectorSplitBuilder& infoColumn( + const std::string& name, + const std::string& value) { + infoColumns_.emplace(name, value); + return *this; + } + + std::shared_ptr build() const { + const auto effectivePath = + filePath_.find('/') == 0 ? "file:" + filePath_ : filePath_; + return std::make_shared( + connectorId_, + effectivePath, + fileFormat_, + start_, + length_, + partitionKeys_, + tableBucketNumber_, + cacheLimit_ + ? std::make_unique(*cacheLimit_) + : nullptr, + customSplitInfo_, + extraFileInfo_, + serdeParameters_, + fileSize_, + rowIdProperties_, + infoColumns_); + } + + private: + std::string filePath_; + std::string connectorId_{kDefaultHiveConnectorId}; + dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::PARQUET}; + uint64_t start_{0}; + uint64_t length_{std::numeric_limits::max()}; + std::unordered_map> partitionKeys_; + std::optional tableBucketNumber_; + std::shared_ptr cacheLimit_; + std::unordered_map customSplitInfo_; + std::shared_ptr extraFileInfo_; + std::unordered_map serdeParameters_; + uint64_t fileSize_{0}; + std::optional rowIdProperties_; + std::unordered_map infoColumns_; +}; + } // namespace bytedance::bolt::connector::hive diff --git a/bolt/connectors/hive/HiveObjectFactory.cpp b/bolt/connectors/hive/HiveObjectFactory.cpp index 60b049468..cb54dc93a 100644 --- a/bolt/connectors/hive/HiveObjectFactory.cpp +++ b/bolt/connectors/hive/HiveObjectFactory.cpp @@ -39,93 +39,80 @@ std::shared_ptr HiveObjectFactory::makeConnectorSplit( auto* dyn = dynamic_cast(&optsBase); BOLT_CHECK(dyn != nullptr, "Expected DynamicConnectorOptions"); const auto& options = dyn->options; - dwio::common::FileFormat fileFormat = + HiveConnectorSplitBuilder builder(filePath); + builder.connectorId(connectorId()).start(start).length(length); + + builder.fileFormat( (!options.isNull() && options.count("fileFormat")) - ? static_cast(options["fileFormat"].asInt()) - : defaultFileFormat_; + ? static_cast(options["fileFormat"].asInt()) + : defaultFileFormat_); - std::unordered_map> partitionKeys; if (!options.isNull() && options.count("partitionKeys")) { for (auto& kv : options["partitionKeys"].items()) { - partitionKeys[kv.first.asString()] = kv.second.isNull() - ? std::nullopt - : std::optional(kv.second.asString()); + builder.partitionKey( + kv.first.asString(), + kv.second.isNull() + ? std::nullopt + : std::optional(kv.second.asString())); } } - std::optional tableBucketNumber; if (!options.isNull() && options.count("tableBucketNumber")) { - tableBucketNumber = options["tableBucketNumber"].asInt(); + builder.tableBucketNumber(options["tableBucketNumber"].asInt()); } - std::unordered_map customSplitInfo; if (!options.isNull() && options.count("customSplitInfo")) { + std::unordered_map info; for (auto& kv : options["customSplitInfo"].items()) { - customSplitInfo[kv.first.asString()] = kv.second.asString(); + info[kv.first.asString()] = kv.second.asString(); } + builder.customSplitInfo(std::move(info)); } - std::shared_ptr extraFileInfo; if (!options.isNull() && options.count("extraFileInfo")) { - extraFileInfo = options["extraFileInfo"].isNull() - ? std::shared_ptr() - : std::make_shared(options["extraFileInfo"].asString()); + if (!options["extraFileInfo"].isNull()) { + builder.extraFileInfo( + std::make_shared(options["extraFileInfo"].asString())); + } } - std::unordered_map serdeParameters; if (!options.isNull() && options.count("serdeParameters")) { + std::unordered_map params; for (auto& kv : options["serdeParameters"].items()) { - serdeParameters[kv.first.asString()] = kv.second.asString(); + params[kv.first.asString()] = kv.second.asString(); } + builder.serdeParameters(std::move(params)); } - std::unique_ptr hiveConnectorSplitCacheLimit; if (!options.isNull() && options.count("hiveConnectorSplitCacheLimit")) { - hiveConnectorSplitCacheLimit = HiveConnectorSplitCacheLimit::create( - options["hiveConnectorSplitCacheLimit"]); + builder.cacheLimit(*HiveConnectorSplitCacheLimit::create( + options["hiveConnectorSplitCacheLimit"])); } - uint64_t fileSize = 0; if (!options.isNull() && options.count("fileProperties")) { const auto& propertiesOption = options["fileProperties"]; if (propertiesOption.count("fileSize") && !propertiesOption["fileSize"].isNull()) { - fileSize = propertiesOption["fileSize"].asInt(); + builder.fileSize(propertiesOption["fileSize"].asInt()); } } - std::optional rowIdProperties; if (!options.isNull() && options.count("rowIdProperties")) { RowIdProperties props; const auto& rowIdPropertiesOption = options["rowIdProperties"]; props.metadataVersion = rowIdPropertiesOption["metadataVersion"].asInt(); props.partitionId = rowIdPropertiesOption["partitionId"].asInt(); props.tableGuid = rowIdPropertiesOption["tableGuid"].asString(); - rowIdProperties = props; + builder.rowIdProperties(std::move(props)); } - std::unordered_map infoColumns; if (!options.isNull() && options.count("infoColumns")) { for (auto& kv : options["infoColumns"].items()) { - infoColumns[kv.first.asString()] = kv.second.asString(); + builder.infoColumn(kv.first.asString(), kv.second.asString()); } } - return std::make_shared( - connectorId(), - filePath, - fileFormat, - start, - length, - partitionKeys, - tableBucketNumber, - std::move(hiveConnectorSplitCacheLimit), - customSplitInfo, - extraFileInfo, - serdeParameters, - fileSize, - rowIdProperties, - infoColumns); + return builder.build(); } std::shared_ptr HiveObjectFactory::makeColumnHandle( diff --git a/bolt/exec/tests/AssertQueryBuilderTest.cpp b/bolt/exec/tests/AssertQueryBuilderTest.cpp index 89d93032d..f73d6ed0b 100644 --- a/bolt/exec/tests/AssertQueryBuilderTest.cpp +++ b/bolt/exec/tests/AssertQueryBuilderTest.cpp @@ -34,6 +34,7 @@ #include "bolt/exec/tests/utils/PlanBuilder.h" #include "bolt/vector/fuzzer/VectorFuzzer.h" namespace bytedance::bolt::exec::test { +using connector::hive::HiveConnectorSplitBuilder; class AssertQueryBuilderTest : public HiveConnectorTestBase {}; @@ -121,6 +122,8 @@ TEST_F(AssertQueryBuilderTest, hiveSplits) { .planNode(), duckDbQueryRunner_) .split(HiveConnectorSplitBuilder(file->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .partitionKey("ds", "2022-05-10") .build()) .assertResults( diff --git a/bolt/exec/tests/HashJoinTest.cpp b/bolt/exec/tests/HashJoinTest.cpp index 819546b20..50b48cfa1 100644 --- a/bolt/exec/tests/HashJoinTest.cpp +++ b/bolt/exec/tests/HashJoinTest.cpp @@ -53,6 +53,7 @@ using namespace bytedance::bolt::exec; using namespace bytedance::bolt::exec::test; using namespace bytedance::bolt::common::testutil; +using bytedance::bolt::connector::hive::HiveConnectorSplitBuilder; using bytedance::bolt::test::BatchMaker; namespace { @@ -5366,6 +5367,8 @@ TEST_F(HashJoinTest, dynamicFiltersWithSkippedSplits) { // We add splits that have no rows. auto makeEmpty = [&]() { return exec::Split(HiveConnectorSplitBuilder(tempFiles.back()->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .start(10000000) .length(1) .build()); @@ -5570,6 +5573,8 @@ TEST_F(HashJoinTest, dynamicFiltersAppliedToPreloadedSplits) { tempFiles.push_back(TempFilePath::create()); writeToFile(tempFiles.back()->path, rowVector); auto split = HiveConnectorSplitBuilder(tempFiles.back()->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .partitionKey("p1", std::to_string(i)) .build(); probeSplits.push_back(exec::Split(split)); @@ -6179,10 +6184,11 @@ TEST_F(HashJoinTest, dynamicFilterOnPartitionKey) { std::vector buildVectors{ makeRowVector({"c0"}, {makeFlatVector({0, 1, 2})})}; createDuckDbTable("t", buildVectors); - auto split = - bytedance::bolt::exec::test::HiveConnectorSplitBuilder(filePaths[0]->path) - .partitionKey("k", "0") - .build(); + auto split = HiveConnectorSplitBuilder(filePaths[0]->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .partitionKey("k", "0") + .build(); auto outputType = ROW({"n1_0", "n1_1"}, {BIGINT(), BIGINT()}); ColumnHandleMap assignments = { {"n1_0", regularColumn("c0", BIGINT())}, diff --git a/bolt/exec/tests/TableScanTest.cpp b/bolt/exec/tests/TableScanTest.cpp index deb1f2ae8..eeb974ba9 100644 --- a/bolt/exec/tests/TableScanTest.cpp +++ b/bolt/exec/tests/TableScanTest.cpp @@ -196,6 +196,8 @@ class TableScanTest : public virtual HiveConnectorTestBase { const TypePtr& partitionType, const std::optional& partitionValue) { auto split = HiveConnectorSplitBuilder(filePath) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .partitionKey("pkey", partitionValue) .build(); auto outputType = @@ -474,6 +476,8 @@ TEST_F(TableScanTest, partitionKeyAlias) { {"ds_alias", partitionKey("ds", VARCHAR())}}; auto split = HiveConnectorSplitBuilder(filePath->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .partitionKey("ds", "2021-12-02") .build(); @@ -1762,7 +1766,10 @@ TEST_F(TableScanTest, splitOffsetAndLength) { } TEST_F(TableScanTest, fileNotFound) { - auto split = HiveConnectorSplitBuilder("/path/to/nowhere.orc").build(); + auto split = HiveConnectorSplitBuilder("/path/to/nowhere.orc") + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .build(); auto assertMissingFile = [&](bool ignoreMissingFiles) { AssertQueryBuilder(tableScanNode()) .connectorSessionProperty( @@ -1784,6 +1791,8 @@ TEST_F(TableScanTest, validFileNoData) { auto filePath = bytedance::bolt::test::getDataFilePath("data/emptyPresto.dwrf"); auto split = HiveConnectorSplitBuilder(filePath) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .start(0) .length(fs::file_size(filePath) / 2) .build(); @@ -1892,6 +1901,8 @@ TEST_F(TableScanTest, partitionedTableDateKey) { // Test partition filter on date column. { auto split = HiveConnectorSplitBuilder(filePath->getPath()) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .partitionKey("pkey", partitionValue) .build(); auto outputType = ROW({"pkey", "c0", "c1"}, {DATE(), BIGINT(), DOUBLE()}); @@ -2795,6 +2806,8 @@ TEST_F(TableScanTest, bucket) { rowVectors.emplace_back(rowVector); splits.emplace_back(HiveConnectorSplitBuilder(filePaths[i]->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .tableBucketNumber(bucket) .build()); } @@ -2821,6 +2834,8 @@ TEST_F(TableScanTest, bucket) { for (int i = 0; i < buckets.size(); ++i) { int bucketValue = buckets[i]; auto hsplit = HiveConnectorSplitBuilder(filePaths[i]->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .tableBucketNumber(bucketValue) .build(); @@ -2841,6 +2856,8 @@ TEST_F(TableScanTest, bucket) { // Filter on bucket column, but don't project it out auto rowTypes = ROW({"c0", "c1"}, {INTEGER(), BIGINT()}); hsplit = HiveConnectorSplitBuilder(filePaths[i]->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .tableBucketNumber(bucketValue) .build(); op = PlanBuilder() @@ -3686,7 +3703,10 @@ TEST_F(TableScanTest, reuseRowVector) { .tableScan(rowType, {}, "c0 < 5") .project({"c1.c0"}) .planNode(); - auto split = HiveConnectorSplitBuilder(file->path).build(); + auto split = HiveConnectorSplitBuilder(file->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .build(); auto expected = makeRowVector( {makeFlatVector(10, [](auto i) { return i % 5; })}); AssertQueryBuilder(plan).splits({split, split}).assertResults(expected); @@ -4232,6 +4252,8 @@ TEST_F(TableScanTest, varbinaryPartitionKey) { {"ds_alias", partitionKey("ds", VARBINARY())}}; auto split = HiveConnectorSplitBuilder(filePath->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .partitionKey("ds", "2021-12-02") .build(); @@ -4267,8 +4289,11 @@ TEST_F(TableScanTest, timestampPartitionKey) { ColumnHandleMap assignments = {{"t", partitionKey("t", TIMESTAMP())}}; std::vector> splits; for (auto& t : inputs) { - splits.push_back( - HiveConnectorSplitBuilder(filePath->path).partitionKey("t", t).build()); + splits.push_back(HiveConnectorSplitBuilder(filePath->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .partitionKey("t", t) + .build()); } auto plan = PlanBuilder() .startTableScan() @@ -4291,13 +4316,19 @@ TEST_F(TableScanTest, paimonDeletionVector) { .planNode(); auto deletionFilePath = bytedance::bolt::test::getDataFilePath("data/deletionFile"); - auto split1 = HiveConnectorSplitBuilder(file->path).build(); + auto split1 = HiveConnectorSplitBuilder(file->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .build(); // delete 3, 5, 9 rows split1->customSplitInfo.insert( {{connector::hive::KPaimonDeletionFilePath, deletionFilePath}, {connector::hive::KPaimonDeletionBinOffset, "61"}, {connector::hive::KPaimonDeletionBinSize, "26"}}); - auto split2 = HiveConnectorSplitBuilder(file->path).build(); + auto split2 = HiveConnectorSplitBuilder(file->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .build(); // delete 1, 5, 6, 9, 13, 14 rows split2->customSplitInfo.insert( {{connector::hive::KPaimonDeletionFilePath, deletionFilePath}, @@ -4568,7 +4599,10 @@ TEST_F(TableScanTest, ignoreCorruptFileWhenPrepareDisable) { .tableScan(rowType, {}, "c0 < 5") .project({"c1.c0"}) .planNode(); - auto split = HiveConnectorSplitBuilder(file->path).build(); + auto split = HiveConnectorSplitBuilder(file->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .build(); auto expected = makeRowVector( {makeFlatVector(10, [](auto i) { return i % 5; })}); AssertQueryBuilder(plan).splits({split, split}).assertResults(expected); @@ -4597,7 +4631,10 @@ TEST_F(TableScanTest, ignoreCorruptFileWhenPrepareAttempt3) { .tableScan(rowType, {}, "c0 < 5") .project({"c1.c0"}) .planNode(); - auto split = HiveConnectorSplitBuilder(file->path).build(); + auto split = HiveConnectorSplitBuilder(file->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .build(); auto expected = makeRowVector( {makeFlatVector(10, [](auto i) { return i % 5; })}); AssertQueryBuilder(plan) @@ -4632,7 +4669,10 @@ TEST_F(TableScanTest, ignoreCorruptFileWhenPrepareCanIgnore) { .tableScan(rowType, {}, "c0 < 5") .project({"c1.c0"}) .planNode(); - auto split = HiveConnectorSplitBuilder(file->path).build(); + auto split = HiveConnectorSplitBuilder(file->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .build(); auto expected = makeRowVector({makeFlatVector(0, [](auto i) { return i % 5; })}); AssertQueryBuilder(plan) @@ -4663,7 +4703,10 @@ TEST_F(TableScanTest, ignoreCorruptFileWhenNextDisable) { .tableScan(rowType, {}, "c0 < 5") .project({"c1.c0"}) .planNode(); - auto split = HiveConnectorSplitBuilder(file->path).build(); + auto split = HiveConnectorSplitBuilder(file->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .build(); auto expected = makeRowVector( {makeFlatVector(10, [](auto i) { return i % 5; })}); AssertQueryBuilder(plan) @@ -4694,7 +4737,10 @@ TEST_F(TableScanTest, ignoreCorruptFileWhenNextAttempt3) { .tableScan(rowType, {}, "c0 < 5") .project({"c1.c0"}) .planNode(); - auto split = HiveConnectorSplitBuilder(file->path).build(); + auto split = HiveConnectorSplitBuilder(file->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .build(); auto expected = makeRowVector( {makeFlatVector(10, [](auto i) { return i % 5; })}); AssertQueryBuilder(plan) @@ -4728,7 +4774,10 @@ TEST_F(TableScanTest, ignoreCorruptFileWhenNextCanIgnore) { .tableScan(rowType, {}, "c0 < 5") .project({"c1.c0"}) .planNode(); - auto split = HiveConnectorSplitBuilder(file->path).build(); + auto split = HiveConnectorSplitBuilder(file->path) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) + .build(); auto expected = makeRowVector({makeFlatVector(0, [](auto i) { return i % 5; })}); AssertQueryBuilder(plan) diff --git a/bolt/exec/tests/utils/HiveConnectorTestBase.cpp b/bolt/exec/tests/utils/HiveConnectorTestBase.cpp index 75c0c545a..8c1b3ba18 100644 --- a/bolt/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/bolt/exec/tests/utils/HiveConnectorTestBase.cpp @@ -37,6 +37,8 @@ #include "bolt/dwio/dwrf/writer/Writer.h" namespace bytedance::bolt::exec::test { +using connector::hive::HiveConnectorSplitBuilder; + HiveConnectorTestBase::HiveConnectorTestBase() { filesystems::registerLocalFileSystem(); } @@ -148,6 +150,7 @@ HiveConnectorTestBase::makeHiveConnectorSplits( // Add all the splits. for (int i = 0; i < splitCount; i++) { auto splitBuilder = HiveConnectorSplitBuilder(filePath) + .connectorId(kHiveConnectorId) .fileFormat(format) .start(i * splitSize) .length(splitSize); @@ -216,6 +219,8 @@ HiveConnectorTestBase::makeHiveConnectorSplit( uint64_t start, uint64_t length) { return HiveConnectorSplitBuilder(filePath) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .start(start) .length(length) .build(); @@ -229,6 +234,8 @@ HiveConnectorTestBase::makeHiveConnectorSplit( uint64_t start, uint64_t length) { return HiveConnectorSplitBuilder(filePath) + .connectorId(kHiveConnectorId) + .fileFormat(dwio::common::FileFormat::DWRF) .infoColumn("$file_size", fmt::format("{}", fileSize)) .infoColumn("$file_modified_time", fmt::format("{}", fileModifiedTime)) .start(start) diff --git a/bolt/exec/tests/utils/HiveConnectorTestBase.h b/bolt/exec/tests/utils/HiveConnectorTestBase.h index 75fafb3ae..07f7a7f2b 100644 --- a/bolt/exec/tests/utils/HiveConnectorTestBase.h +++ b/bolt/exec/tests/utils/HiveConnectorTestBase.h @@ -210,98 +210,4 @@ class HiveConnectorTestBase : public OperatorTestBase { } }; -class HiveConnectorSplitBuilder { - public: - HiveConnectorSplitBuilder(std::string filePath) - : filePath_{std::move(filePath)} {} - - HiveConnectorSplitBuilder& start(uint64_t start) { - start_ = start; - return *this; - } - - HiveConnectorSplitBuilder& length(uint64_t length) { - length_ = length; - return *this; - } - - HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) { - fileFormat_ = format; - return *this; - } - - HiveConnectorSplitBuilder& infoColumn( - const std::string& name, - const std::string& value) { - infoColumns_.emplace(std::move(name), std::move(value)); - return *this; - } - - HiveConnectorSplitBuilder& partitionKey( - std::string name, - std::optional value) { - partitionKeys_.emplace(std::move(name), std::move(value)); - return *this; - } - - HiveConnectorSplitBuilder& tableBucketNumber(int32_t bucket) { - tableBucketNumber_ = bucket; - return *this; - } - - HiveConnectorSplitBuilder& customSplitInfo( - const std::unordered_map& customSplitInfo) { - customSplitInfo_ = customSplitInfo; - return *this; - } - - HiveConnectorSplitBuilder& extraFileInfo( - const std::shared_ptr& extraFileInfo) { - extraFileInfo_ = extraFileInfo; - return *this; - } - - HiveConnectorSplitBuilder& serdeParameters( - const std::unordered_map& serdeParameters) { - serdeParameters_ = serdeParameters; - return *this; - } - - HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) { - connectorId_ = connectorId; - return *this; - } - - std::shared_ptr build() const { - return std::make_shared( - connectorId_, - filePath_.find("/") == 0 ? "file:" + filePath_ : filePath_, - fileFormat_, - start_, - length_, - partitionKeys_, - tableBucketNumber_, - nullptr, - customSplitInfo_, - extraFileInfo_, - serdeParameters_, - 0, - std::nullopt, - infoColumns_); - } - - private: - const std::string filePath_; - dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; - uint64_t start_{0}; - uint64_t length_{std::numeric_limits::max()}; - std::unordered_map> partitionKeys_; - std::optional tableBucketNumber_; - std::unordered_map customSplitInfo_ = {}; - std::shared_ptr extraFileInfo_ = {}; - std::unordered_map serdeParameters_ = {}; - std::unordered_map infoColumns_ = {}; - std::string connectorId_ = kHiveConnectorId; -}; - } // namespace bytedance::bolt::exec::test