diff --git a/CMake/resolve_dependency_modules/clp.cmake b/CMake/resolve_dependency_modules/clp.cmake index c9579fbc314b..779c5239910f 100644 --- a/CMake/resolve_dependency_modules/clp.cmake +++ b/CMake/resolve_dependency_modules/clp.cmake @@ -16,7 +16,7 @@ include_guard(GLOBAL) FetchContent_Declare( clp GIT_REPOSITORY https://github.com/y-scope/clp.git - GIT_TAG f82e6114160a6addd4727259906bcf621ac9912c + GIT_TAG a91e5f71f0715d7d6b3ea7c177e1b39b3e6a24a6 ) set(CLP_BUILD_CLP_REGEX_UTILS OFF CACHE BOOL "Build CLP regex utils") diff --git a/velox/connectors/clp/search_lib/ClpTimestampsUtils.h b/velox/connectors/clp/search_lib/ClpTimestampsUtils.h index 4e49af389a0c..06359fb9a74c 100644 --- a/velox/connectors/clp/search_lib/ClpTimestampsUtils.h +++ b/velox/connectors/clp/search_lib/ClpTimestampsUtils.h @@ -16,6 +16,7 @@ #pragma once +#include "clp_s/Defs.hpp" #include "velox/type/Timestamp.h" namespace facebook::velox::connector::clp::search_lib { @@ -121,4 +122,19 @@ inline auto convertToVeloxTimestamp(int64_t timestamp) -> Timestamp { return Timestamp(seconds, static_cast(nanoseconds)); } +/// Converts a nanosecond precision epochtime_t into a Velox timestamp. +/// +/// @param timestamp the input timestamp as an integer +/// @return the corresponding Velox timestamp +inline auto convertNanosecondEpochToVeloxTimestamp(clp_s::epochtime_t timestamp) + -> Timestamp { + int64_t seconds{timestamp / Timestamp::kNanosInSecond}; + int64_t nanoseconds{timestamp % Timestamp::kNanosInSecond}; + if (nanoseconds < 0) { + seconds -= 1; + nanoseconds += Timestamp::kNanosInSecond; + } + return Timestamp(seconds, static_cast(nanoseconds)); +} + } // namespace facebook::velox::connector::clp::search_lib diff --git a/velox/connectors/clp/search_lib/archive/ClpArchiveCursor.cpp b/velox/connectors/clp/search_lib/archive/ClpArchiveCursor.cpp index 4953c585f379..27b9320956b2 100644 --- a/velox/connectors/clp/search_lib/archive/ClpArchiveCursor.cpp +++ b/velox/connectors/clp/search_lib/archive/ClpArchiveCursor.cpp @@ -17,9 +17,12 @@ #include #include "clp_s/ArchiveReader.hpp" +#include "clp_s/SingleFileArchiveDefs.hpp" #include "clp_s/search/EvaluateTimestampIndex.hpp" #include "clp_s/search/ast/EmptyExpr.hpp" #include "clp_s/search/ast/SearchUtils.hpp" +#include "clp_s/search/ast/SetTimestampLiteralPrecision.hpp" +#include "clp_s/search/ast/TimestampLiteral.hpp" #include "velox/connectors/clp/ClpColumnHandle.h" #include "velox/connectors/clp/search_lib/archive/ClpArchiveCursor.h" #include "velox/connectors/clp/search_lib/archive/ClpArchiveJsonStringVectorLoader.h" @@ -136,6 +139,14 @@ ErrorCode ClpArchiveCursor::loadSplit() { auto schemaTree = archiveReader_->get_schema_tree(); auto schemaMap = archiveReader_->get_schema_map(); + auto const defaultTimestampPrecision{ + archiveReader_->has_deprecated_timestamp_format() + ? TimestampLiteral::Precision::Milliseconds + : TimestampLiteral::Precision::Nanoseconds}; + SetTimestampLiteralPrecision timestampPrecisionPass{ + defaultTimestampPrecision}; + expr_ = timestampPrecisionPass.run(expr_); + EvaluateTimestampIndex timestampIndex(timestampDict); if (clp_s::EvaluatedValue::False == timestampIndex.run(expr_)) { VLOG(2) << "No matching timestamp ranges for query '" << query_ << "'"; diff --git a/velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.cpp b/velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.cpp index eb2434726ae0..7e291dea83a8 100644 --- a/velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.cpp +++ b/velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.cpp @@ -73,6 +73,7 @@ void ClpArchiveVectorLoader::populateTimestampData( case clp_s::NodeType::DictionaryFloat: case clp_s::NodeType::Integer: case clp_s::NodeType::DeprecatedDateString: + case clp_s::NodeType::Timestamp: supportedNodeType = true; break; default: @@ -88,7 +89,13 @@ void ClpArchiveVectorLoader::populateTimestampData( for (int vectorIndex : rows) { auto messageIndex = filteredRowIndices_->at(vectorIndex); - if (clp_s::NodeType::Float == Type) { + if (clp_s::NodeType::Timestamp == Type) { + auto reader{static_cast(columnReader_)}; + vector->set( + vectorIndex, + convertNanosecondEpochToVeloxTimestamp( + reader->get_encoded_time(messageIndex))); + } else if (clp_s::NodeType::Float == Type) { auto reader = static_cast(columnReader_); vector->set( vectorIndex, @@ -211,7 +218,12 @@ void ClpArchiveVectorLoader::loadInternal( } case ColumnType::Timestamp: { auto timestampVector = vector->asFlatVector(); - if (nullptr != dynamic_cast(columnReader_)) { + if (nullptr != + dynamic_cast(columnReader_)) { + populateTimestampData( + rows, timestampVector); + } else if ( + nullptr != dynamic_cast(columnReader_)) { populateTimestampData(rows, timestampVector); } else if ( nullptr != @@ -256,10 +268,22 @@ template void ClpArchiveVectorLoader::populateData( RowSet rows, FlatVector* vector); template void +ClpArchiveVectorLoader::populateTimestampData( + RowSet rows, + FlatVector* vector); +template void ClpArchiveVectorLoader::populateTimestampData( RowSet rows, FlatVector* vector); template void +ClpArchiveVectorLoader::populateTimestampData( + RowSet rows, + FlatVector* vector); +template void +ClpArchiveVectorLoader::populateTimestampData( + RowSet rows, + FlatVector* vector); +template void ClpArchiveVectorLoader::populateTimestampData( RowSet rows, FlatVector* vector); diff --git a/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp b/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp index 78ecee7fde2d..ece5c435cecf 100644 --- a/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp +++ b/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp @@ -16,6 +16,8 @@ #include "clp_s/ColumnReader.hpp" #include "clp_s/InputConfig.hpp" +#include "clp_s/search/ast/SetTimestampLiteralPrecision.hpp" +#include "clp_s/search/ast/TimestampLiteral.hpp" #include "ffi/ir_stream/search/QueryHandler.hpp" #include "velox/connectors/clp/ClpColumnHandle.h" @@ -75,6 +77,10 @@ ErrorCode ClpIrCursor::loadSplit() { ? NetworkAuthOption{.method = AuthMethod::None} : NetworkAuthOption{.method = AuthMethod::S3PresignedUrlV4}; + search::ast::SetTimestampLiteralPrecision timestampPrecisionPass{ + search::ast::TimestampLiteral::Precision::Milliseconds}; + expr_ = timestampPrecisionPass.run(expr_); + auto projections = splitFieldsToNamesAndTypes(); auto queryHandlerResult{QueryHandlerType::create( projectionResolutionCallback_, diff --git a/velox/connectors/clp/tests/ClpConnectorTest.cpp b/velox/connectors/clp/tests/ClpConnectorTest.cpp index 114d5e50cbdd..d11d46502a0b 100644 --- a/velox/connectors/clp/tests/ClpConnectorTest.cpp +++ b/velox/connectors/clp/tests/ClpConnectorTest.cpp @@ -580,6 +580,10 @@ TEST_F(ClpConnectorTest, test4IrTimestampNoPushdown) { TEST_F(ClpConnectorTest, test4IrTimestampPushdown) { // Only the second event meet the condition, the first event is a date string // which is not supported yet so the value will be NULL. + // This test can not use the `timestamp()` literal, since the integer + // timestamps are in microsecond precision, and we currently assume all IR + // timestamps are millisecond precision when comparing against timestamp + // literals. const std::shared_ptr kqlQuery = std::make_shared("(timestamp < 1756003005000000)"); auto plan = @@ -661,11 +665,9 @@ TEST_F(ClpConnectorTest, test5FloatTimestampNoPushdown) { TEST_F(ClpConnectorTest, test5FloatTimestampPushdown) { // Test filtering rows with a timestamp parsed from a date string and floats - // in various formats. Because KQL doesn’t automatically interpret the unit of - // the timestamp, the returned result differs slightly from the one without - // pushdown. + // in various formats. const std::shared_ptr kqlQuery = std::make_shared( - "(timestamp < 1746003005.127 and timestamp >= 1746003005.124)"); + R"(timestamp < timestamp("1746003070000", "\L") and timestamp >= timestamp("1746003005124", "\L"))"); auto plan = PlanBuilder(pool_.get()) .startTableScan() @@ -695,12 +697,67 @@ TEST_F(ClpConnectorTest, test5FloatTimestampPushdown) { {Timestamp(1746003005, 124000000), Timestamp(1746003005, 124100000), Timestamp(1746003005, 125000000), - Timestamp(1746003005, 126000000)}), + Timestamp(1746003005, 126000000), + Timestamp(1746003005, 127000000), + Timestamp(1746003060, 0), + Timestamp(1746003065, 0)}), makeFlatVector( - {1.234567891234500E9, + {1.2345678912345E9, 1E16, 1.234567891234567E9, - 1.234567891234567E9})}); + 1.234567891234567E9, + -1.234567891234567E-9, + 1234567891.234567, + -1234567891.234567})}); + test::assertEqualVectors(expected, output); +} + +TEST_F(ClpConnectorTest, test5NewTimestampFormatFloatTimestampPushdown) { + // Test filtering rows with a timestamp parsed from a date string and floats + // in various formats. + const std::shared_ptr kqlQuery = std::make_shared( + R"(timestamp < timestamp("1746003070000", "\L") and timestamp >= timestamp("1746003005124", "\L"))"); + auto plan = + PlanBuilder(pool_.get()) + .startTableScan() + .outputType(ROW({"timestamp", "floatValue"}, {TIMESTAMP(), DOUBLE()})) + .tableHandle( + std::make_shared(kClpConnectorId, "test_5")) + .assignments( + {{"timestamp", + std::make_shared( + "timestamp", "timestamp", TIMESTAMP())}, + {"floatValue", + std::make_shared( + "floatValue", "floatValue", DOUBLE())}}) + .endTableScan() + .orderBy({"\"timestamp\" ASC"}, false) + .planNode(); + + auto output = getResults( + plan, + {makeClpSplit( + getExampleFilePath("test_5.v0.5.0.clps"), + ClpConnectorSplit::SplitType::kArchive, + kqlQuery)}); + auto expected = makeRowVector( + {// timestamp + makeFlatVector( + {Timestamp(1746003005, 124000000), + Timestamp(1746003005, 124100000), + Timestamp(1746003005, 125000000), + Timestamp(1746003005, 126000000), + Timestamp(1746003005, 127000001), + Timestamp(1746003060, 0), + Timestamp(1746003065, 0)}), + makeFlatVector( + {1.2345678912345E9, + 1E16, + 1.234567891234567E9, + 1.234567891234567E9, + -1.234567891234567E-9, + 1234567891.234567, + -1234567891.234567})}); test::assertEqualVectors(expected, output); } diff --git a/velox/connectors/clp/tests/examples/test_5.v0.5.0.clps b/velox/connectors/clp/tests/examples/test_5.v0.5.0.clps new file mode 100644 index 000000000000..169d042dba2b Binary files /dev/null and b/velox/connectors/clp/tests/examples/test_5.v0.5.0.clps differ