Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMake/resolve_dependency_modules/clp.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ include_guard(GLOBAL)
FetchContent_Declare(
clp
GIT_REPOSITORY https://github.com/y-scope/clp.git
GIT_TAG f82e6114160a6addd4727259906bcf621ac9912c
GIT_TAG a91e5f71f0715d7d6b3ea7c177e1b39b3e6a24a6
)

set(CLP_BUILD_CLP_REGEX_UTILS OFF CACHE BOOL "Build CLP regex utils")
Expand Down
16 changes: 16 additions & 0 deletions velox/connectors/clp/search_lib/ClpTimestampsUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include "clp_s/Defs.hpp"
#include "velox/type/Timestamp.h"

namespace facebook::velox::connector::clp::search_lib {
Expand Down Expand Up @@ -121,4 +122,19 @@ inline auto convertToVeloxTimestamp(int64_t timestamp) -> Timestamp {
return Timestamp(seconds, static_cast<uint64_t>(nanoseconds));
}

/// Builds a Velox timestamp from an epochtime_t expressed in nanoseconds.
///
/// The value is split into whole seconds and a nanosecond remainder. When the
/// remainder is negative, one second is borrowed so that the nanosecond
/// component handed to `Timestamp` is never negative.
///
/// @param timestamp the input timestamp as an integer count of nanoseconds
/// @return the corresponding Velox timestamp
inline auto convertNanosecondEpochToVeloxTimestamp(clp_s::epochtime_t timestamp)
    -> Timestamp {
  int64_t const wholeSeconds{timestamp / Timestamp::kNanosInSecond};
  int64_t const remainderNanos{timestamp % Timestamp::kNanosInSecond};

  // Common case: the remainder is already non-negative.
  if (remainderNanos >= 0) {
    return Timestamp(wholeSeconds, static_cast<uint64_t>(remainderNanos));
  }

  // Negative remainder: move one second from the seconds part into the
  // nanoseconds part.
  return Timestamp(
      wholeSeconds - 1,
      static_cast<uint64_t>(remainderNanos + Timestamp::kNanosInSecond));
}

} // namespace facebook::velox::connector::clp::search_lib
11 changes: 11 additions & 0 deletions velox/connectors/clp/search_lib/archive/ClpArchiveCursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
#include <glog/logging.h>

#include "clp_s/ArchiveReader.hpp"
#include "clp_s/SingleFileArchiveDefs.hpp"
#include "clp_s/search/EvaluateTimestampIndex.hpp"
#include "clp_s/search/ast/EmptyExpr.hpp"
#include "clp_s/search/ast/SearchUtils.hpp"
#include "clp_s/search/ast/SetTimestampLiteralPrecision.hpp"
#include "clp_s/search/ast/TimestampLiteral.hpp"
#include "velox/connectors/clp/ClpColumnHandle.h"
#include "velox/connectors/clp/search_lib/archive/ClpArchiveCursor.h"
#include "velox/connectors/clp/search_lib/archive/ClpArchiveJsonStringVectorLoader.h"
Expand Down Expand Up @@ -136,6 +139,14 @@ ErrorCode ClpArchiveCursor::loadSplit() {
auto schemaTree = archiveReader_->get_schema_tree();
auto schemaMap = archiveReader_->get_schema_map();

auto const defaultTimestampPrecision{
archiveReader_->has_deprecated_timestamp_format()
? TimestampLiteral::Precision::Milliseconds
: TimestampLiteral::Precision::Nanoseconds};
SetTimestampLiteralPrecision timestampPrecisionPass{
defaultTimestampPrecision};
expr_ = timestampPrecisionPass.run(expr_);

EvaluateTimestampIndex timestampIndex(timestampDict);
if (clp_s::EvaluatedValue::False == timestampIndex.run(expr_)) {
VLOG(2) << "No matching timestamp ranges for query '" << query_ << "'";
Expand Down
28 changes: 26 additions & 2 deletions velox/connectors/clp/search_lib/archive/ClpArchiveVectorLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ void ClpArchiveVectorLoader::populateTimestampData(
case clp_s::NodeType::DictionaryFloat:
case clp_s::NodeType::Integer:
case clp_s::NodeType::DeprecatedDateString:
case clp_s::NodeType::Timestamp:
supportedNodeType = true;
break;
default:
Expand All @@ -88,7 +89,13 @@ void ClpArchiveVectorLoader::populateTimestampData(
for (int vectorIndex : rows) {
auto messageIndex = filteredRowIndices_->at(vectorIndex);

if (clp_s::NodeType::Float == Type) {
if (clp_s::NodeType::Timestamp == Type) {
auto reader{static_cast<clp_s::TimestampColumnReader*>(columnReader_)};
vector->set(
vectorIndex,
convertNanosecondEpochToVeloxTimestamp(
reader->get_encoded_time(messageIndex)));
} else if (clp_s::NodeType::Float == Type) {
auto reader = static_cast<clp_s::FloatColumnReader*>(columnReader_);
vector->set(
vectorIndex,
Expand Down Expand Up @@ -211,7 +218,12 @@ void ClpArchiveVectorLoader::loadInternal(
}
case ColumnType::Timestamp: {
auto timestampVector = vector->asFlatVector<Timestamp>();
if (nullptr != dynamic_cast<clp_s::Int64ColumnReader*>(columnReader_)) {
if (nullptr !=
dynamic_cast<clp_s::TimestampColumnReader*>(columnReader_)) {
populateTimestampData<clp_s::NodeType::Timestamp>(
rows, timestampVector);
} else if (
nullptr != dynamic_cast<clp_s::Int64ColumnReader*>(columnReader_)) {
populateTimestampData<clp_s::NodeType::Integer>(rows, timestampVector);
} else if (
nullptr !=
Expand Down Expand Up @@ -256,10 +268,22 @@ template void ClpArchiveVectorLoader::populateData<std::string>(
RowSet rows,
FlatVector<StringView>* vector);
template void
ClpArchiveVectorLoader::populateTimestampData<clp_s::NodeType::Timestamp>(
RowSet rows,
FlatVector<facebook::velox::Timestamp>* vector);
template void
ClpArchiveVectorLoader::populateTimestampData<clp_s::NodeType::Float>(
RowSet rows,
FlatVector<facebook::velox::Timestamp>* vector);
template void
ClpArchiveVectorLoader::populateTimestampData<clp_s::NodeType::FormattedFloat>(
RowSet rows,
FlatVector<facebook::velox::Timestamp>* vector);
template void
ClpArchiveVectorLoader::populateTimestampData<clp_s::NodeType::DictionaryFloat>(
RowSet rows,
FlatVector<facebook::velox::Timestamp>* vector);
template void
ClpArchiveVectorLoader::populateTimestampData<clp_s::NodeType::Integer>(
RowSet rows,
FlatVector<facebook::velox::Timestamp>* vector);
Expand Down
6 changes: 6 additions & 0 deletions velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "clp_s/ColumnReader.hpp"
#include "clp_s/InputConfig.hpp"
#include "clp_s/search/ast/SetTimestampLiteralPrecision.hpp"
#include "clp_s/search/ast/TimestampLiteral.hpp"

#include "ffi/ir_stream/search/QueryHandler.hpp"
#include "velox/connectors/clp/ClpColumnHandle.h"
Expand Down Expand Up @@ -75,6 +77,10 @@ ErrorCode ClpIrCursor::loadSplit() {
? NetworkAuthOption{.method = AuthMethod::None}
: NetworkAuthOption{.method = AuthMethod::S3PresignedUrlV4};

search::ast::SetTimestampLiteralPrecision timestampPrecisionPass{
search::ast::TimestampLiteral::Precision::Milliseconds};
expr_ = timestampPrecisionPass.run(expr_);

auto projections = splitFieldsToNamesAndTypes();
auto queryHandlerResult{QueryHandlerType::create(
projectionResolutionCallback_,
Expand Down
71 changes: 64 additions & 7 deletions velox/connectors/clp/tests/ClpConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,10 @@ TEST_F(ClpConnectorTest, test4IrTimestampNoPushdown) {
TEST_F(ClpConnectorTest, test4IrTimestampPushdown) {
// Only the second event meets the condition; the first event is a date
// string, which is not supported yet, so its value will be NULL.
// This test cannot use the `timestamp()` literal, since the integer
// timestamps are in microsecond precision, and we currently assume all IR
// timestamps are in millisecond precision when comparing against timestamp
// literals.
Comment on lines +583 to +586
Copy link

@coderabbitai coderabbitai bot Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

IR timestamp literal precision mismatch still implies incorrect filtering risk

Line 583-Line 586 documents that IR integer timestamps are microseconds, while timestamp() literal comparisons assume milliseconds. That is a correctness gap for IR pushdown predicates on microsecond data and can silently mis-filter rows. Please fix the precision normalization path and convert this test to assert timestamp() behaviour directly instead of relying on a raw integer literal.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@velox/connectors/clp/tests/ClpConnectorTest.cpp` around lines 583 - 586, The
IR currently treats integer timestamps as microseconds while timestamp()
literals are assumed milliseconds, causing incorrect predicate pushdown; update
the precision normalization code path so integer IR timestamps (microsecond
precision) are converted to millisecond precision before any comparison with
timestamp() literals, and change the test in ClpConnectorTest.cpp to use
timestamp() literals for assertions (verifying timestamp() behavior directly
rather than a raw integer literal) so the test validates the corrected
normalization and prevents silent mis-filtering. Ensure you modify the
normalization logic where IR timestamp literals are handled and update the test
expectations to use timestamp(...) calls.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably the issue is more with the existing test than the pushdown logic. Nominally, timestamps are in millisecond precision for existing kv-ir, though technically this isn't guaranteed.

Medium-term we'll fix for the kv-ir format like we did for the archive format by updating the format to support a timestamp encoding with well-defined precision.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gibber9809, thank you for the clarification! That makes sense—documenting the current limitation in the test is reasonable, and having a clear plan to update the kv-ir format with well-defined timestamp precision (similar to the archive format approach) is the right long-term solution. I appreciate you explaining the nominal millisecond precision assumption for existing kv-ir.

Looking forward to seeing that format update when you tackle it! 👍

🐰✨
[Taking notes for future reviews...]

const std::shared_ptr<std::string> kqlQuery =
std::make_shared<std::string>("(timestamp < 1756003005000000)");
auto plan =
Expand Down Expand Up @@ -661,11 +665,9 @@ TEST_F(ClpConnectorTest, test5FloatTimestampNoPushdown) {

TEST_F(ClpConnectorTest, test5FloatTimestampPushdown) {
// Test filtering rows with a timestamp parsed from a date string and floats
// in various formats. Because KQL doesn’t automatically interpret the unit of
// the timestamp, the returned result differs slightly from the one without
// pushdown.
// in various formats.
const std::shared_ptr<std::string> kqlQuery = std::make_shared<std::string>(
"(timestamp < 1746003005.127 and timestamp >= 1746003005.124)");
R"(timestamp < timestamp("1746003070000", "\L") and timestamp >= timestamp("1746003005124", "\L"))");
auto plan =
PlanBuilder(pool_.get())
.startTableScan()
Expand Down Expand Up @@ -695,12 +697,67 @@ TEST_F(ClpConnectorTest, test5FloatTimestampPushdown) {
{Timestamp(1746003005, 124000000),
Timestamp(1746003005, 124100000),
Timestamp(1746003005, 125000000),
Timestamp(1746003005, 126000000)}),
Timestamp(1746003005, 126000000),
Timestamp(1746003005, 127000000),
Timestamp(1746003060, 0),
Timestamp(1746003065, 0)}),
makeFlatVector<double>(
{1.234567891234500E9,
{1.2345678912345E9,
1E16,
1.234567891234567E9,
1.234567891234567E9})});
1.234567891234567E9,
-1.234567891234567E-9,
1234567891.234567,
-1234567891.234567})});
test::assertEqualVectors(expected, output);
}

TEST_F(ClpConnectorTest, test5NewTimestampFormatFloatTimestampPushdown) {
  // Pushes a `timestamp()`-literal range filter down to the archive split
  // `test_5.v0.5.0.clps`, projects the `timestamp` and `floatValue` columns,
  // and checks the surviving rows in ascending timestamp order.
  auto const kqlQuery = std::make_shared<std::string>(
      R"(timestamp < timestamp("1746003070000", "\L") and timestamp >= timestamp("1746003005124", "\L"))");

  // Inputs to the table scan.
  auto const scanOutputType =
      ROW({"timestamp", "floatValue"}, {TIMESTAMP(), DOUBLE()});
  auto const scanTableHandle =
      std::make_shared<ClpTableHandle>(kClpConnectorId, "test_5");
  auto const timestampColumn =
      std::make_shared<ClpColumnHandle>("timestamp", "timestamp", TIMESTAMP());
  auto const floatValueColumn = std::make_shared<ClpColumnHandle>(
      "floatValue", "floatValue", DOUBLE());

  auto plan = PlanBuilder(pool_.get())
                  .startTableScan()
                  .outputType(scanOutputType)
                  .tableHandle(scanTableHandle)
                  .assignments(
                      {{"timestamp", timestampColumn},
                       {"floatValue", floatValueColumn}})
                  .endTableScan()
                  .orderBy({"\"timestamp\" ASC"}, false)
                  .planNode();

  auto output = getResults(
      plan,
      {makeClpSplit(
          getExampleFilePath("test_5.v0.5.0.clps"),
          ClpConnectorSplit::SplitType::kArchive,
          kqlQuery)});

  // Expected `timestamp` column.
  auto expectedTimestamps = makeFlatVector<Timestamp>(
      {Timestamp(1746003005, 124000000),
       Timestamp(1746003005, 124100000),
       Timestamp(1746003005, 125000000),
       Timestamp(1746003005, 126000000),
       Timestamp(1746003005, 127000001),
       Timestamp(1746003060, 0),
       Timestamp(1746003065, 0)});
  // Expected `floatValue` column, row-aligned with the timestamps above.
  auto expectedFloatValues = makeFlatVector<double>(
      {1.2345678912345E9,
       1E16,
       1.234567891234567E9,
       1.234567891234567E9,
       -1.234567891234567E-9,
       1234567891.234567,
       -1234567891.234567});

  auto expected = makeRowVector({expectedTimestamps, expectedFloatValues});
  test::assertEqualVectors(expected, output);
}

Expand Down
Binary file not shown.