diff --git a/presto-native-execution/presto_cpp/main/tests/TaskInfoTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskInfoTest.cpp index c54a281462a3c..77f7cf3222945 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskInfoTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskInfoTest.cpp @@ -18,40 +18,45 @@ #include "presto_cpp/main/common/tests/test_json.h" #include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" -using namespace facebook; -using namespace facebook::presto::protocol; +using namespace facebook::presto; -class TaskInfoTest : public ::testing::Test {}; +class TaskInfoTest : public ::testing::Test { + protected: + void SetUp() override { + registerPrestoToVeloxConnector( + std::make_unique("hive")); + } + + void TearDown() override { + unregisterPrestoToVeloxConnector("hive"); + } +}; const std::string BASE_DATA_PATH = "/github/presto-trunk/presto-native-execution/presto_cpp/main/tests/data/"; TEST_F(TaskInfoTest, duration) { double thrift = 0; - facebook::presto::thrift::toThrift(Duration(123, TimeUnit::MILLISECONDS), thrift); + thrift::toThrift(protocol::Duration(123, protocol::TimeUnit::MILLISECONDS), thrift); ASSERT_EQ(thrift, 123); } TEST_F(TaskInfoTest, binaryMetadataUpdates) { std::string str = slurp(getDataPath(BASE_DATA_PATH, "MetadataUpdates.json")); json j = json::parse(str); - registerPrestoToVeloxConnector(std::make_unique("hive")); - MetadataUpdates metadataUpdates = j; + protocol::MetadataUpdates metadataUpdates = j; std::unique_ptr thriftMetadataUpdates = std::make_unique(); - facebook::presto::thrift::toThrift(metadataUpdates, *thriftMetadataUpdates); + thrift::toThrift(metadataUpdates, *thriftMetadataUpdates); json thriftJson = json::parse(*thriftMetadataUpdates); ASSERT_EQ(j, thriftJson); - - presto::unregisterPrestoToVeloxConnector("hive"); } TEST_F(TaskInfoTest, taskInfo) { std::string str = slurp(getDataPath(BASE_DATA_PATH, "TaskInfo.json")); json j = json::parse(str); - registerPrestoToVeloxConnector(std::make_unique("hive")); - TaskInfo taskInfo = j; - facebook::presto::thrift::TaskInfo thriftTaskInfo; - facebook::presto::thrift::toThrift(taskInfo, thriftTaskInfo); + protocol::TaskInfo taskInfo = j; + thrift::TaskInfo thriftTaskInfo; + thrift::toThrift(taskInfo, thriftTaskInfo); json thriftJson = json::parse(*thriftTaskInfo.metadataUpdates()->metadataUpdates()); ASSERT_EQ(taskInfo.metadataUpdates, thriftJson); @@ -59,18 +64,16 @@ TEST_F(TaskInfoTest, taskInfo) { ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()->size(), 2); ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()[0].bufferId()->id(), 100); ASSERT_EQ(thriftTaskInfo.outputBuffers()->buffers()[1].bufferId()->id(), 200); - ASSERT_EQ(thriftTaskInfo.stats()->blockedReasons()->count(facebook::presto::thrift::BlockedReason::WAITING_FOR_MEMORY), 1); + ASSERT_EQ(thriftTaskInfo.stats()->blockedReasons()->count(thrift::BlockedReason::WAITING_FOR_MEMORY), 1); ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()->size(), 2); ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()["test_metric1"].sum(), 123); ASSERT_EQ(thriftTaskInfo.stats()->runtimeStats()->metrics()["test_metric2"].name(), "test_metric2"); - - presto::unregisterPrestoToVeloxConnector("hive"); } TEST_F(TaskInfoTest, taskId) { - TaskId taskId = "queryId.1.2.3.4"; - facebook::presto::thrift::TaskId thriftTaskId; - facebook::presto::thrift::toThrift(taskId, thriftTaskId); + protocol::TaskId taskId = "queryId.1.2.3.4"; + thrift::TaskId thriftTaskId; + thrift::toThrift(taskId, thriftTaskId); ASSERT_EQ(thriftTaskId.stageExecutionId()->stageId()->queryId(), "queryId"); ASSERT_EQ(thriftTaskId.stageExecutionId()->stageId()->id(), 1); @@ -83,9 +86,9 @@ TEST_F(TaskInfoTest, taskId) { TEST_F(TaskInfoTest, operatorStatsEmptyBlockedReason) { std::string str = slurp(getDataPath(BASE_DATA_PATH, "OperatorStatsEmptyBlockedReason.json")); json j = json::parse(str); - OperatorStats operatorStats = j; - facebook::presto::thrift::OperatorStats thriftOperatorStats; - facebook::presto::thrift::toThrift(operatorStats, thriftOperatorStats); + protocol::OperatorStats operatorStats = j; + thrift::OperatorStats thriftOperatorStats; + thrift::toThrift(operatorStats, thriftOperatorStats); ASSERT_EQ(thriftOperatorStats.blockedReason().has_value(), false); ASSERT_EQ(thriftOperatorStats.blockedWall(), 80); @@ -95,9 +98,9 @@ TEST_F(TaskInfoTest, operatorStatsEmptyBlockedReason) { TEST_F(TaskInfoTest, operatorStats) { std::string str = slurp(getDataPath(BASE_DATA_PATH, "OperatorStats.json")); json j = json::parse(str); - OperatorStats operatorStats = j; - facebook::presto::thrift::OperatorStats thriftOperatorStats; - facebook::presto::thrift::toThrift(operatorStats, thriftOperatorStats); + protocol::OperatorStats operatorStats = j; + thrift::OperatorStats thriftOperatorStats; + thrift::toThrift(operatorStats, thriftOperatorStats); - ASSERT_EQ(thriftOperatorStats.blockedReason(), facebook::presto::thrift::BlockedReason::WAITING_FOR_MEMORY); + ASSERT_EQ(thriftOperatorStats.blockedReason(), thrift::BlockedReason::WAITING_FOR_MEMORY); } diff --git a/presto-native-execution/presto_cpp/main/tests/TaskStatusTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskStatusTest.cpp index d74d1e9ed89ea..00aba3d3ea1ae 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskStatusTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskStatusTest.cpp @@ -16,8 +16,7 @@ #include "presto_cpp/main/thrift/ProtocolToThrift.h" #include "presto_cpp/main/common/tests/test_json.h" -using namespace facebook; -using namespace facebook::presto::protocol; +using namespace facebook::presto; class TaskStatusTest : public ::testing::Test {}; @@ -25,9 +24,9 @@ TEST_F(TaskStatusTest, lifeSpan) { std::string str = R"("Group1001")"; json j = json::parse(str); - Lifespan lifeSpan = j; - facebook::presto::thrift::Lifespan thriftLifespan; - facebook::presto::thrift::toThrift(lifeSpan, thriftLifespan); + protocol::Lifespan lifeSpan = j; + thrift::Lifespan thriftLifespan; + thrift::toThrift(lifeSpan, thriftLifespan); ASSERT_EQ(thriftLifespan.grouped(), true); ASSERT_EQ(thriftLifespan.groupId(), 1001); @@ -42,13 +41,13 @@ TEST_F(TaskStatusTest, errorCode) { })"; json j = json::parse(str); - ErrorCode errorCode = j; - facebook::presto::thrift::ErrorCode thriftErrorCode; - facebook::presto::thrift::toThrift(errorCode, thriftErrorCode); + protocol::ErrorCode errorCode = j; + thrift::ErrorCode thriftErrorCode; + thrift::toThrift(errorCode, thriftErrorCode); ASSERT_EQ(thriftErrorCode.code(), 1234); ASSERT_EQ(thriftErrorCode.name(), "name"); - ASSERT_EQ(thriftErrorCode.type(), facebook::presto::thrift::ErrorType::INTERNAL_ERROR); + ASSERT_EQ(thriftErrorCode.type(), thrift::ErrorType::INTERNAL_ERROR); ASSERT_EQ(thriftErrorCode.retriable(), false); } @@ -73,16 +72,16 @@ TEST_F(TaskStatusTest, executionFailureInfoOptionalFieldsEmpty) { })"; json j = json::parse(str); - ExecutionFailureInfo executionFailureInfo = j; - facebook::presto::thrift::ExecutionFailureInfo thriftExecutionFailureInfo; - facebook::presto::thrift::toThrift(executionFailureInfo, thriftExecutionFailureInfo); + protocol::ExecutionFailureInfo executionFailureInfo = j; + thrift::ExecutionFailureInfo thriftExecutionFailureInfo; + thrift::toThrift(executionFailureInfo, thriftExecutionFailureInfo); ASSERT_EQ(thriftExecutionFailureInfo.type(), "type"); ASSERT_EQ(thriftExecutionFailureInfo.errorLocation()->columnNumber(), 2); ASSERT_EQ(thriftExecutionFailureInfo.remoteHost()->hostPortString(), "localhost:8080"); - ASSERT_EQ(thriftExecutionFailureInfo.errorCode()->type(), facebook::presto::thrift::ErrorType::INTERNAL_ERROR); + ASSERT_EQ(thriftExecutionFailureInfo.errorCode()->type(), thrift::ErrorType::INTERNAL_ERROR); ASSERT_EQ(thriftExecutionFailureInfo.errorCode()->retriable(), false); - ASSERT_EQ(thriftExecutionFailureInfo.errorCause(), facebook::presto::thrift::ErrorCause::EXCEEDS_BROADCAST_MEMORY_LIMIT); + ASSERT_EQ(thriftExecutionFailureInfo.errorCause(), thrift::ErrorCause::EXCEEDS_BROADCAST_MEMORY_LIMIT); ASSERT_EQ(thriftExecutionFailureInfo.cause(), nullptr); ASSERT_EQ(thriftExecutionFailureInfo.suppressed()->size(), 0); } @@ -91,18 +90,18 @@ TEST_F(TaskStatusTest, executionFailureInfoOptionalFieldsNonempty) { std::string str = slurp(getDataPath("/github/presto-trunk/presto-native-execution/presto_cpp/main/tests/data/", "ExecutionFailureInfo.json")); json j = json::parse(str); - ExecutionFailureInfo executionFailureInfo = j; - facebook::presto::thrift::ExecutionFailureInfo thriftExecutionFailureInfo; - facebook::presto::thrift::toThrift(executionFailureInfo, thriftExecutionFailureInfo); + protocol::ExecutionFailureInfo executionFailureInfo = j; + thrift::ExecutionFailureInfo thriftExecutionFailureInfo; + thrift::toThrift(executionFailureInfo, thriftExecutionFailureInfo); ASSERT_EQ((*thriftExecutionFailureInfo.cause()).type(), "cause"); - ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCause(), facebook::presto::thrift::ErrorCause::UNKNOWN); - ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCode()->type(), facebook::presto::thrift::ErrorType::INSUFFICIENT_RESOURCES); + ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCause(), thrift::ErrorCause::UNKNOWN); + ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCode()->type(), thrift::ErrorType::INSUFFICIENT_RESOURCES); ASSERT_EQ((*thriftExecutionFailureInfo.cause()).errorCode()->retriable(), true); ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].type(), "suppressed1"); - ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].errorCause(), facebook::presto::thrift::ErrorCause::LOW_PARTITION_COUNT); - ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].errorCode()->type(), facebook::presto::thrift::ErrorType::EXTERNAL); + ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].errorCause(), thrift::ErrorCause::LOW_PARTITION_COUNT); + ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[0].errorCode()->type(), thrift::ErrorType::EXTERNAL); ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].type(), "suppressed2"); - ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].errorCause(), facebook::presto::thrift::ErrorCause::EXCEEDS_BROADCAST_MEMORY_LIMIT); - ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].errorCode()->type(), facebook::presto::thrift::ErrorType::INTERNAL_ERROR); + ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].errorCause(), thrift::ErrorCause::EXCEEDS_BROADCAST_MEMORY_LIMIT); + ASSERT_EQ((*thriftExecutionFailureInfo.suppressed())[1].errorCode()->type(), thrift::ErrorType::INTERNAL_ERROR); } diff --git a/presto-native-execution/presto_cpp/main/tests/TaskUpdateRequestTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskUpdateRequestTest.cpp index f2f5d2b8a32d3..0bf4223423d21 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskUpdateRequestTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskUpdateRequestTest.cpp @@ -17,65 +17,74 @@ #include "presto_cpp/main/common/tests/test_json.h" #include "presto_cpp/main/connectors/PrestoToVeloxConnector.h" -using namespace facebook; -using namespace facebook::presto::protocol; +using namespace facebook::presto; -class TaskUpdateRequestTest : public ::testing::Test {}; +class TaskUpdateRequestTest : public ::testing::Test { + protected: + void SetUp() override { + registerPrestoToVeloxConnector( + std::make_unique("hive")); + } + + void TearDown() override { + unregisterPrestoToVeloxConnector("hive"); + } +}; const std::string BASE_DATA_PATH = "/github/presto-trunk/presto-native-execution/presto_cpp/main/tests/data/"; TEST_F(TaskUpdateRequestTest, connectorId) { - ConnectorId connectorId; - facebook::presto::thrift::ConnectorId thriftConnectorId; + protocol::ConnectorId connectorId; + thrift::ConnectorId thriftConnectorId; thriftConnectorId.catalogName_ref() = "test"; - facebook::presto::thrift::fromThrift(thriftConnectorId, connectorId); + thrift::fromThrift(thriftConnectorId, connectorId); ASSERT_EQ(connectorId, "test"); } TEST_F(TaskUpdateRequestTest, optionalField) { - ResourceEstimates resourceEstimates; - facebook::presto::thrift::ResourceEstimates thriftResourceEstimates; + protocol::ResourceEstimates resourceEstimates; + thrift::ResourceEstimates thriftResourceEstimates; thriftResourceEstimates.executionTime_ref() = 100; thriftResourceEstimates.peakMemory_ref() = 1024 * 1024 * 1024; - facebook::presto::thrift::fromThrift(thriftResourceEstimates, resourceEstimates); - ASSERT_EQ(*resourceEstimates.executionTime, Duration(100, TimeUnit::MILLISECONDS)); + thrift::fromThrift(thriftResourceEstimates, resourceEstimates); + ASSERT_EQ(*resourceEstimates.executionTime, protocol::Duration(100, protocol::TimeUnit::MILLISECONDS)); ASSERT_EQ(resourceEstimates.cpuTime, nullptr); - ASSERT_EQ(*resourceEstimates.peakMemory, DataSize(1024 * 1024 * 1024, DataUnit::BYTE)); + ASSERT_EQ(*resourceEstimates.peakMemory, protocol::DataSize(1024 * 1024 * 1024, protocol::DataUnit::BYTE)); ASSERT_EQ(resourceEstimates.peakTaskMemory, nullptr); } TEST_F(TaskUpdateRequestTest, qualifiedObjectName) { - QualifiedObjectName qualifiedObjectName; - facebook::presto::thrift::QualifiedObjectName thriftQualifiedObjectName; + protocol::QualifiedObjectName qualifiedObjectName; + thrift::QualifiedObjectName thriftQualifiedObjectName; thriftQualifiedObjectName.catalogName_ref() = "test_catalog"; thriftQualifiedObjectName.schemaName_ref() = "test_schema"; thriftQualifiedObjectName.objectName_ref() = "test_object"; - facebook::presto::thrift::fromThrift(thriftQualifiedObjectName, qualifiedObjectName); + thrift::fromThrift(thriftQualifiedObjectName, qualifiedObjectName); ASSERT_EQ(qualifiedObjectName, "test_catalog.test_schema.test_object"); } TEST_F(TaskUpdateRequestTest, routineCharacteristics) { - RoutineCharacteristics routineCharacteristics; - facebook::presto::thrift::RoutineCharacteristics thriftRroutineCharacteristics; - facebook::presto::thrift::Language thriftLanguage; + protocol::RoutineCharacteristics routineCharacteristics; + thrift::RoutineCharacteristics thriftRroutineCharacteristics; + thrift::Language thriftLanguage; thriftLanguage.language_ref() = "English"; thriftRroutineCharacteristics.language_ref() = std::move(thriftLanguage); - thriftRroutineCharacteristics.determinism_ref() = facebook::presto::thrift::Determinism::NOT_DETERMINISTIC; - thriftRroutineCharacteristics.nullCallClause_ref() = facebook::presto::thrift::NullCallClause::RETURNS_NULL_ON_NULL_INPUT; - facebook::presto::thrift::fromThrift(thriftRroutineCharacteristics, routineCharacteristics); + thriftRroutineCharacteristics.determinism_ref() = thrift::Determinism::NOT_DETERMINISTIC; + thriftRroutineCharacteristics.nullCallClause_ref() = thrift::NullCallClause::RETURNS_NULL_ON_NULL_INPUT; + thrift::fromThrift(thriftRroutineCharacteristics, routineCharacteristics); ASSERT_EQ((*routineCharacteristics.language).language, "English"); - ASSERT_EQ(*routineCharacteristics.determinism, Determinism::NOT_DETERMINISTIC); - ASSERT_EQ(*routineCharacteristics.nullCallClause, NullCallClause::RETURNS_NULL_ON_NULL_INPUT); + ASSERT_EQ(*routineCharacteristics.determinism, protocol::Determinism::NOT_DETERMINISTIC); + ASSERT_EQ(*routineCharacteristics.nullCallClause, protocol::NullCallClause::RETURNS_NULL_ON_NULL_INPUT); } TEST_F(TaskUpdateRequestTest, mapOutputBuffers) { - OutputBuffers outputBuffers; - facebook::presto::thrift::OutputBuffers thriftOutputBuffers; - thriftOutputBuffers.type_ref() = facebook::presto::thrift::BufferType::ARBITRARY; + protocol::OutputBuffers outputBuffers; + thrift::OutputBuffers thriftOutputBuffers; + thriftOutputBuffers.type_ref() = thrift::BufferType::ARBITRARY; thriftOutputBuffers.version_ref() = 1; thriftOutputBuffers.noMoreBufferIds_ref() = true; - facebook::presto::thrift::OutputBufferId outputBufferId1; - facebook::presto::thrift::OutputBufferId outputBufferId2; + thrift::OutputBufferId outputBufferId1; + thrift::OutputBufferId outputBufferId2; outputBufferId1.id_ref() = 1; outputBufferId2.id_ref() = 2; thriftOutputBuffers.buffers_ref() = { @@ -83,39 +92,49 @@ TEST_F(TaskUpdateRequestTest, mapOutputBuffers) { {outputBufferId2, 20} }; - facebook::presto::thrift::fromThrift(thriftOutputBuffers, outputBuffers); - ASSERT_EQ(outputBuffers.type, BufferType::ARBITRARY); + thrift::fromThrift(thriftOutputBuffers, outputBuffers); + ASSERT_EQ(outputBuffers.type, protocol::BufferType::ARBITRARY); ASSERT_EQ(outputBuffers.version, 1); ASSERT_EQ(outputBuffers.buffers.size(), 2); ASSERT_EQ(outputBuffers.buffers["1"], 10); ASSERT_EQ(outputBuffers.buffers["2"], 20); } -TEST_F(TaskUpdateRequestTest, binarySplit) { - std::string str = slurp(getDataPath(BASE_DATA_PATH, "Split.json")); - Split split; +TEST_F(TaskUpdateRequestTest, binarySplitFromThrift) { + thrift::Split thriftSplit; + thriftSplit.connectorId()->catalogName_ref() = "hive"; + thriftSplit.transactionHandle()->jsonValue_ref() = R"({ + "@type": "hive", + "uuid": "8a4d6c83-60ee-46de-9715-bc91755619fa" + })"; + thriftSplit.connectorSplit()->jsonValue_ref() = slurp(getDataPath(BASE_DATA_PATH, "HiveSplit.json")); + + protocol::Split split; + thrift::fromThrift(thriftSplit, split); - registerPrestoToVeloxConnector(std::make_unique("hive")); - facebook::presto::thrift::fromThrift(str, split); - auto hiveSplit = std::dynamic_pointer_cast(split.connectorSplit); - ASSERT_EQ(split.connectorId, "hive"); + // Verify that connector specific fields are set correctly with json codec + auto transactionHandle = + std::dynamic_pointer_cast( + split.transactionHandle); + ASSERT_EQ(transactionHandle->uuid, "8a4d6c83-60ee-46de-9715-bc91755619fa"); + + auto hiveSplit = + std::dynamic_pointer_cast( + split.connectorSplit); ASSERT_EQ(hiveSplit->database, "tpch"); - ASSERT_EQ(hiveSplit->nodeSelectionStrategy, NodeSelectionStrategy::NO_PREFERENCE); - - presto::unregisterPrestoToVeloxConnector("hive"); + ASSERT_EQ( + hiveSplit->nodeSelectionStrategy, + protocol::NodeSelectionStrategy::NO_PREFERENCE); } TEST_F(TaskUpdateRequestTest, binaryTableWriteInfo) { std::string str = slurp(getDataPath(BASE_DATA_PATH, "TableWriteInfo.json")); - TableWriteInfo tableWriteInfo; + protocol::TableWriteInfo tableWriteInfo; - registerPrestoToVeloxConnector(std::make_unique("hive")); - facebook::presto::thrift::fromThrift(str, tableWriteInfo); - auto hiveTableHandle = std::dynamic_pointer_cast((*tableWriteInfo.analyzeTableHandle).connectorHandle); + thrift::fromThrift(str, tableWriteInfo); + auto hiveTableHandle = std::dynamic_pointer_cast((*tableWriteInfo.analyzeTableHandle).connectorHandle); ASSERT_EQ(hiveTableHandle->tableName, "test_table"); ASSERT_EQ(hiveTableHandle->analyzePartitionValues->size(), 2); - - presto::unregisterPrestoToVeloxConnector("hive"); } TEST_F(TaskUpdateRequestTest, fragment) { @@ -125,33 +144,30 @@ TEST_F(TaskUpdateRequestTest, fragment) { str.erase(strEnd + 1); } - registerPrestoToVeloxConnector(std::make_unique("hive")); - PlanFragment f = json::parse(velox::encoding::Base64::decode(str)); + protocol::PlanFragment f = json::parse(facebook::velox::encoding::Base64::decode(str)); ASSERT_EQ(f.root->_type, ".AggregationNode"); - std::shared_ptr root = - std::static_pointer_cast(f.root); + std::shared_ptr root = + std::static_pointer_cast(f.root); ASSERT_EQ(root->id, "211"); ASSERT_NE(root->source, nullptr); ASSERT_EQ(root->source->_type, ".ProjectNode"); - std::shared_ptr proj = - std::static_pointer_cast(root->source); + std::shared_ptr proj = + std::static_pointer_cast(root->source); ASSERT_EQ(proj->id, "233"); ASSERT_NE(proj->source, nullptr); ASSERT_EQ(proj->source->_type, ".TableScanNode"); - std::shared_ptr scan = - std::static_pointer_cast(proj->source); + std::shared_ptr scan = + std::static_pointer_cast(proj->source); ASSERT_EQ(scan->id, "0"); - - presto::unregisterPrestoToVeloxConnector("hive"); } TEST_F(TaskUpdateRequestTest, sessionRepresentation) { - SessionRepresentation sessionRepresentation; - facebook::presto::thrift::SessionRepresentation thriftSessionRepresentation; + protocol::SessionRepresentation sessionRepresentation; + thrift::SessionRepresentation thriftSessionRepresentation; std::map> thriftMap; thriftMap["Person1"] = { {"Name", "John Doe"}, @@ -170,7 +186,7 @@ TEST_F(TaskUpdateRequestTest, sessionRepresentation) { }; thriftSessionRepresentation.unprocessedCatalogProperties_ref() = std::move(thriftMap); - facebook::presto::thrift::fromThrift(thriftSessionRepresentation, sessionRepresentation); + thrift::fromThrift(thriftSessionRepresentation, sessionRepresentation); ASSERT_EQ(sessionRepresentation.unprocessedCatalogProperties.size(), 3); ASSERT_EQ(sessionRepresentation.unprocessedCatalogProperties["Person1"]["City"], "New York"); ASSERT_EQ(sessionRepresentation.unprocessedCatalogProperties["Person2"]["Age"], "25"); diff --git a/presto-native-execution/presto_cpp/main/tests/data/HiveSplit.json b/presto-native-execution/presto_cpp/main/tests/data/HiveSplit.json new file mode 100644 index 0000000000000..ccd0ee9b6abd6 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/tests/data/HiveSplit.json @@ -0,0 +1,51 @@ +{ + "@type": "hive", + "nodeSelectionStrategy": "NO_PREFERENCE", + "addresses": [], + "database": "tpch", + "partitionDataColumnCount": 148, + "cacheQuota": { + "cacheQuotaScope": "TABLE" + }, + "fileSplit": { + "path": "test_path", + "start": 1, + "length": 2, + "fileSize": 3, + "fileModifiedTime": 4, + "customSplitInfo": {}, + "affinitySchedulingFileSectionIndex": 5 + }, + "tableToPartitionMapping": { + "partitionSchemaDifference": {} + }, + "redundantColumnDomains": [], + "splitWeight": 1, + "partitionKeys": [ + { + "name": "attributed_ds", + "value": "2019-12-06" + }, + { + "name": "experiment_id", + "value": "3" + } + ], + "partitionName": "test_partition", + "partitionSchemaDifference": {}, + "path": "/test_path/test.orc", + "s3SelectPushdownEnabled": false, + "start": 301989888, + "storage": { + "location": "/test_path", + "serdeParameters": {}, + "parameters": {}, + "skewed": false, + "storageFormat": { + "inputFormat": "com.facebook.hive.orc.OrcInputFormat", + "outputFormat": "com.facebook.hive.orc.OrcOutputFormat", + "serDe": "com.facebook.hive.orc.OrcSerde" + } + }, + "table": "test_table" +} diff --git a/presto-native-execution/presto_cpp/main/tests/data/Split.json b/presto-native-execution/presto_cpp/main/tests/data/Split.json index 12a253142981e..926bd5f8b7f32 100644 --- a/presto-native-execution/presto_cpp/main/tests/data/Split.json +++ b/presto-native-execution/presto_cpp/main/tests/data/Split.json @@ -5,8 +5,6 @@ "nodeSelectionStrategy": "NO_PREFERENCE", "addresses": [], "database": "tpch", - "fileSize": 396036393, - "length": 33554432, "partitionDataColumnCount": 148, "cacheQuota": {"cacheQuotaScope": "TABLE"}, "fileSplit": { diff --git a/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift-cpp.mustache b/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift-cpp.mustache index 152749fd5f01d..ecd14288721df 100644 --- a/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift-cpp.mustache +++ b/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift-cpp.mustache @@ -20,6 +20,7 @@ {{/.}} #include "presto_cpp/main/thrift/ProtocolToThrift.h" +#include "presto_cpp/presto_protocol/core/ConnectorProtocol.h" namespace facebook::presto::thrift { @@ -227,4 +228,3 @@ void fromThrift(const {{class_name}}& thrift, facebook::presto::protocol::{{clas {{/.}} } - diff --git a/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift.cpp b/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift.cpp index f347e088a4b7a..99ae525d05d5b 100644 --- a/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift.cpp +++ b/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift.cpp @@ -16,6 +16,7 @@ // This file is generated DO NOT EDIT @generated #include "presto_cpp/main/thrift/ProtocolToThrift.h" +#include "presto_cpp/presto_protocol/core/ConnectorProtocol.h" namespace facebook::presto::thrift { @@ -328,27 +329,29 @@ void fromThrift( } void toThrift( - const facebook::presto::protocol::Split& split, - SplitWrapper& thriftSplitWrapper) { - toThrift(split, *thriftSplitWrapper.split_ref()); + const facebook::presto::protocol::MetadataUpdates& metadataUpdates, + MetadataUpdatesWrapper& thriftMetadataUpdatesWrapper) { + toThrift( + metadataUpdates, *thriftMetadataUpdatesWrapper.metadataUpdates_ref()); } void toThrift( - const facebook::presto::protocol::Split& split, - std::string& thriftSplit) { - json jsonSplit = split; - std::string str = jsonSplit.dump(); - toThrift(str, thriftSplit); + const facebook::presto::protocol::MetadataUpdates& metadataUpdates, + std::string& thriftMetadataUpdates) { + json jsonMetadataUpdates = metadataUpdates; + std::string str = jsonMetadataUpdates.dump(); + toThrift(str, thriftMetadataUpdates); } void fromThrift( - const SplitWrapper& thriftSplitWrapper, - facebook::presto::protocol::Split& split) { - fromThrift(*thriftSplitWrapper.split_ref(), split); + const MetadataUpdatesWrapper& thriftMetadataUpdatesWrapper, + facebook::presto::protocol::MetadataUpdates& metadataUpdates) { + fromThrift( + *thriftMetadataUpdatesWrapper.metadataUpdates_ref(), metadataUpdates); } void fromThrift( - const std::string& thriftSplit, - facebook::presto::protocol::Split& split) { - json j = json::parse(thriftSplit); - split = j; + const std::string& thriftMetadataUpdates, + facebook::presto::protocol::MetadataUpdates& metadataUpdates) { + json j = json::parse(thriftMetadataUpdates); + metadataUpdates = j; } void toThrift( const facebook::presto::protocol::TableWriteInfo& tableWriteInfo, @@ -374,29 +377,38 @@ void fromThrift( tableWriteInfo = j; } void toThrift( - const facebook::presto::protocol::MetadataUpdates& metadataUpdates, - MetadataUpdatesWrapper& thriftMetadataUpdatesWrapper) { - toThrift( - metadataUpdates, *thriftMetadataUpdatesWrapper.metadataUpdates_ref()); -} -void toThrift( - const facebook::presto::protocol::MetadataUpdates& metadataUpdates, - std::string& thriftMetadataUpdates) { - json jsonMetadataUpdates = metadataUpdates; - std::string str = jsonMetadataUpdates.dump(); - toThrift(str, thriftMetadataUpdates); -} + const std::shared_ptr& proto, + ConnectorSplitWrapper& thrift) {} void fromThrift( - const MetadataUpdatesWrapper& thriftMetadataUpdatesWrapper, - facebook::presto::protocol::MetadataUpdates& metadataUpdates) { - fromThrift( - *thriftMetadataUpdatesWrapper.metadataUpdates_ref(), metadataUpdates); + const ConnectorSplitWrapper& thrift, + std::shared_ptr& proto) { + if (thrift.connectorId().has_value() && + thrift.customSerializedValue().has_value()) { + facebook::presto::protocol::getConnectorProtocol( + thrift.connectorId().value()) + .deserialize(thrift.customSerializedValue().value(), proto); + } else if (thrift.jsonValue().has_value()) { + json j = json::parse(thrift.jsonValue().value()); + from_json(j, proto); + } } +void toThrift( + const std::shared_ptr< + facebook::presto::protocol::ConnectorTransactionHandle>& proto, + ConnectorTransactionHandleWrapper& thrift) {} void fromThrift( - const std::string& thriftMetadataUpdates, - facebook::presto::protocol::MetadataUpdates& metadataUpdates) { - json j = json::parse(thriftMetadataUpdates); - metadataUpdates = j; + const ConnectorTransactionHandleWrapper& thrift, + std::shared_ptr& + proto) { + if (thrift.connectorId().has_value() && + thrift.customSerializedValue().has_value()) { + facebook::presto::protocol::getConnectorProtocol( + thrift.connectorId().value()) + .deserialize(thrift.customSerializedValue().value(), proto); + } else if (thrift.jsonValue().has_value()) { + json j = json::parse(thrift.jsonValue().value()); + from_json(j, proto); + } } void toThrift( const facebook::presto::protocol::Lifespan& proto, @@ -765,18 +777,14 @@ void fromThrift( } void toThrift( - const facebook::presto::protocol::ScheduledSplit& proto, - ScheduledSplit& thrift) { - toThrift(proto.sequenceId, *thrift.sequenceId_ref()); - toThrift(proto.planNodeId, *thrift.planNodeId_ref()); - toThrift(proto.split, *thrift.split_ref()); + const facebook::presto::protocol::SplitContext& proto, + SplitContext& thrift) { + toThrift(proto.cacheable, *thrift.cacheable_ref()); } void fromThrift( - const ScheduledSplit& thrift, - facebook::presto::protocol::ScheduledSplit& proto) { - fromThrift(*thrift.sequenceId_ref(), proto.sequenceId); - fromThrift(*thrift.planNodeId_ref(), proto.planNodeId); - fromThrift(*thrift.split_ref(), proto.split); + const SplitContext& thrift, + facebook::presto::protocol::SplitContext& proto) { + fromThrift(*thrift.cacheable_ref(), proto.cacheable); } void toThrift( @@ -1340,6 +1348,21 @@ void fromThrift( *thrift.longVariableConstraints_ref(), proto.longVariableConstraints); } +void toThrift(const facebook::presto::protocol::Split& proto, Split& thrift) { + toThrift(proto.connectorId, *thrift.connectorId_ref()); + toThrift(proto.transactionHandle, *thrift.transactionHandle_ref()); + toThrift(proto.connectorSplit, *thrift.connectorSplit_ref()); + toThrift(proto.lifespan, *thrift.lifespan_ref()); + toThrift(proto.splitContext, *thrift.splitContext_ref()); +} +void fromThrift(const Split& thrift, facebook::presto::protocol::Split& proto) { + fromThrift(*thrift.connectorId_ref(), proto.connectorId); + fromThrift(*thrift.transactionHandle_ref(), proto.transactionHandle); + fromThrift(*thrift.connectorSplit_ref(), proto.connectorSplit); + fromThrift(*thrift.lifespan_ref(), proto.lifespan); + fromThrift(*thrift.splitContext_ref(), proto.splitContext); +} + void toThrift( const facebook::presto::protocol::OutputBuffers& proto, OutputBuffers& thrift) { @@ -1458,6 +1481,21 @@ void fromThrift( fromThrift(*thrift.functionId_ref(), proto.functionId); } +void toThrift( + const facebook::presto::protocol::ScheduledSplit& proto, + ScheduledSplit& thrift) { + toThrift(proto.sequenceId, *thrift.sequenceId_ref()); + toThrift(proto.planNodeId, *thrift.planNodeId_ref()); + toThrift(proto.split, *thrift.split_ref()); +} +void fromThrift( + const ScheduledSplit& thrift, + facebook::presto::protocol::ScheduledSplit& proto) { + fromThrift(*thrift.sequenceId_ref(), proto.sequenceId); + fromThrift(*thrift.planNodeId_ref(), proto.planNodeId); + fromThrift(*thrift.split_ref(), proto.split); +} + void toThrift( const facebook::presto::protocol::TaskInfo& proto, TaskInfo& thrift) { diff --git a/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift.h b/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift.h index a5eddbd85b449..9040667b3ad8a 100644 --- a/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift.h +++ b/presto-native-execution/presto_cpp/main/thrift/ProtocolToThrift.h @@ -115,17 +115,17 @@ void fromThrift( facebook::presto::protocol::BufferType& proto); void toThrift( - const facebook::presto::protocol::Split& split, - SplitWrapper& thriftSplitWrapper); + const facebook::presto::protocol::MetadataUpdates& metadataUpdates, + MetadataUpdatesWrapper& thriftMetadataUpdatesWrapper); void toThrift( - const facebook::presto::protocol::Split& split, - std::string& thriftSplit); + const facebook::presto::protocol::MetadataUpdates& metadataUpdates, + std::string& thriftMetadataUpdates); void fromThrift( - const SplitWrapper& thriftSplitWrapper, - facebook::presto::protocol::Split& split); + const MetadataUpdatesWrapper& thriftMetadataUpdatesWrapper, + facebook::presto::protocol::MetadataUpdates& metadataUpdates); void fromThrift( - const std::string& thriftSplit, - facebook::presto::protocol::Split& split); + const std::string& thriftMetadataUpdates, + facebook::presto::protocol::MetadataUpdates& metadataUpdates); void toThrift( const facebook::presto::protocol::TableWriteInfo& tableWriteInfo, TableWriteInfoWrapper& thriftTableWriteInfoWrapper); @@ -139,17 +139,19 @@ void fromThrift( const std::string& thriftTableWriteInfo, facebook::presto::protocol::TableWriteInfo& tableWriteInfo); void toThrift( - const facebook::presto::protocol::MetadataUpdates& metadataUpdates, - MetadataUpdatesWrapper& thriftMetadataUpdatesWrapper); -void toThrift( - const facebook::presto::protocol::MetadataUpdates& metadataUpdates, - std::string& thriftMetadataUpdates); + const std::shared_ptr& proto, + ConnectorSplitWrapper& thrift); void fromThrift( - const MetadataUpdatesWrapper& thriftMetadataUpdatesWrapper, - facebook::presto::protocol::MetadataUpdates& metadataUpdates); + const ConnectorSplitWrapper& thrift, + std::shared_ptr& proto); +void toThrift( + const std::shared_ptr< + facebook::presto::protocol::ConnectorTransactionHandle>& proto, + ConnectorTransactionHandleWrapper& thrift); void fromThrift( - const std::string& thriftMetadataUpdates, - facebook::presto::protocol::MetadataUpdates& metadataUpdates); + const ConnectorTransactionHandleWrapper& thrift, + std::shared_ptr& + proto); void toThrift( const facebook::presto::protocol::Lifespan& proto, Lifespan& thrift); @@ -298,11 +300,11 @@ void fromThrift( facebook::presto::protocol::TaskSource& proto); void toThrift( - const facebook::presto::protocol::ScheduledSplit& proto, - ScheduledSplit& thrift); + const facebook::presto::protocol::SplitContext& proto, + SplitContext& thrift); void fromThrift( - const ScheduledSplit& thrift, - facebook::presto::protocol::ScheduledSplit& proto); + const SplitContext& thrift, + facebook::presto::protocol::SplitContext& proto); void toThrift( const facebook::presto::protocol::TaskStatus& proto, @@ -388,6 +390,9 @@ void fromThrift( const Signature& thrift, facebook::presto::protocol::Signature& proto); +void toThrift(const facebook::presto::protocol::Split& proto, Split& thrift); +void fromThrift(const Split& thrift, facebook::presto::protocol::Split& proto); + void toThrift( const facebook::presto::protocol::OutputBuffers& proto, OutputBuffers& thrift); @@ -427,6 +432,13 @@ void fromThrift( const SqlInvokedFunction& thrift, facebook::presto::protocol::SqlInvokedFunction& proto); +void toThrift( + const facebook::presto::protocol::ScheduledSplit& proto, + ScheduledSplit& thrift); +void fromThrift( + const ScheduledSplit& thrift, + facebook::presto::protocol::ScheduledSplit& proto); + void toThrift( const facebook::presto::protocol::TaskInfo& proto, TaskInfo& thrift); diff --git a/presto-native-execution/presto_cpp/main/thrift/presto_protocol-to-thrift-json.py b/presto-native-execution/presto_cpp/main/thrift/presto_protocol-to-thrift-json.py index f935177a84131..534d3ebc42e3a 100755 --- a/presto-native-execution/presto_cpp/main/thrift/presto_protocol-to-thrift-json.py +++ b/presto-native-execution/presto_cpp/main/thrift/presto_protocol-to-thrift-json.py @@ -43,7 +43,7 @@ def parse_args(): def special_file(filename, special, thrift_item, key): if os.path.isfile(filename): (status, stdout, stderr) = util.run( - "../../../velox/scripts/license-header.py --header ../../../license.header --remove " + "../../../velox/scripts/checks/license-header.py --header ../../../license.header --remove " + filename ) thrift_item[key] = stdout @@ -66,17 +66,24 @@ def main(): comment = "// This file is generated DO NOT EDIT @" + "generated" result = [{"comment": comment}] + # Skip structs that are not used in cpp thrift = [item for item in thrift if "class_name" in item and item.class_name not in config.SkipStruct] for thrift_item in thrift: if "class_name" not in thrift_item: continue + # For structs that are defined in presto_protocol_core.h + if thrift_item.class_name in config.StructInProtocolCore: + thrift_item["core"] = "true" + + # For structs that have a single field in IDL but defined using type aliases in cpp if thrift_item.class_name in config.WrapperStruct: thrift_item["wrapper"] = "true" del thrift_item["struct"] continue config_item = None + # For structs that need special implementations if thrift_item.class_name in config.Special: hfile = "./special/" + thrift_item.class_name + ".hpp.inc" special = special_file(hfile, special, thrift_item, "hinc") @@ -89,6 +96,7 @@ def main(): special = False if "struct" in thrift_item: + # For structs that have different field names in cpp and IDL if thrift_item.class_name in config.StructMap: config_item = config.StructMap[thrift_item.class_name] thrift_item["proto_name"] = config_item.class_name diff --git a/presto-native-execution/presto_cpp/main/thrift/presto_protocol-to-thrift-json.yml b/presto-native-execution/presto_cpp/main/thrift/presto_protocol-to-thrift-json.yml index b4e44171674e0..34dafa19fadb3 100644 --- a/presto-native-execution/presto_cpp/main/thrift/presto_protocol-to-thrift-json.yml +++ b/presto-native-execution/presto_cpp/main/thrift/presto_protocol-to-thrift-json.yml @@ -53,12 +53,16 @@ WrapperStruct: TimeZoneKey: Special: + ConnectorTransactionHandleWrapper: + ConnectorSplitWrapper: QualifiedObjectName: MetadataUpdatesWrapper: OperatorInfoUnion: OutputBufferId: - SplitWrapper: TableWriteInfoWrapper: TaskId: Type: TypeSignature: + +StructInProtocolCore: + dummy: diff --git a/presto-native-execution/presto_cpp/main/thrift/presto_thrift.thrift b/presto-native-execution/presto_cpp/main/thrift/presto_thrift.thrift index 2623225df0c11..4600783a33d98 100644 --- a/presto-native-execution/presto_cpp/main/thrift/presto_thrift.thrift +++ b/presto-native-execution/presto_cpp/main/thrift/presto_thrift.thrift @@ -80,14 +80,21 @@ enum BufferType { DISCARDING = 3, SPOOLING = 4, } -struct SplitWrapper { - 1: string split; +struct MetadataUpdatesWrapper { + 1: string metadataUpdates; } struct TableWriteInfoWrapper { 1: string tableWriteInfo; } -struct MetadataUpdatesWrapper { - 1: string metadataUpdates; +struct ConnectorSplitWrapper { + 1: optional string connectorId; + 2: optional binary customSerializedValue; + 3: optional string jsonValue; +} +struct ConnectorTransactionHandleWrapper { + 1: optional string connectorId; + 2: optional binary customSerializedValue; + 3: optional string jsonValue; } struct Lifespan { 1: bool grouped; @@ -279,10 +286,8 @@ struct TaskSource { 3: set noMoreSplitsForLifespan; 4: bool noMoreSplits; } -struct ScheduledSplit { - 1: i64 sequenceId; - 2: PlanNodeId planNodeId; - 3: SplitWrapper split; +struct SplitContext { + 1: bool cacheable; } struct TaskStatus { 1: i64 taskInstanceIdLeastSignificantBits; @@ -476,6 +481,13 @@ struct Signature { 6: list typeVariableConstraints; 7: list longVariableConstraints; } +struct Split { + 1: ConnectorId connectorId; + 2: ConnectorTransactionHandleWrapper transactionHandle; + 3: ConnectorSplitWrapper connectorSplit; + 4: Lifespan lifespan; + 5: SplitContext splitContext; +} struct OutputBuffers { 1: BufferType type; 2: i64 version; @@ -527,6 +539,11 @@ struct SqlInvokedFunction { 6: Signature signature; 7: SqlFunctionId functionId; } +struct ScheduledSplit { + 1: i64 sequenceId; + 2: PlanNodeId planNodeId; + 3: Split split; +} struct TaskInfo { 1: TaskId taskId; 2: TaskStatus taskStatus; diff --git a/presto-native-execution/presto_cpp/main/thrift/special/ConnectorSplitWrapper.cpp.inc b/presto-native-execution/presto_cpp/main/thrift/special/ConnectorSplitWrapper.cpp.inc new file mode 100644 index 0000000000000..479a9bebef0ec --- /dev/null +++ b/presto-native-execution/presto_cpp/main/thrift/special/ConnectorSplitWrapper.cpp.inc @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +void toThrift(const std::shared_ptr& proto, ConnectorSplitWrapper& thrift) { +} +void fromThrift(const ConnectorSplitWrapper& thrift, std::shared_ptr& proto) { + if (thrift.connectorId().has_value() && thrift.customSerializedValue().has_value()) { + facebook::presto::protocol::getConnectorProtocol(thrift.connectorId().value()) + .deserialize(thrift.customSerializedValue().value(), proto); + } else if (thrift.jsonValue().has_value()) { + json j = json::parse(thrift.jsonValue().value()); + from_json(j, proto); + } +} diff --git a/presto-native-execution/presto_cpp/main/thrift/special/SplitWrapper.hpp.inc b/presto-native-execution/presto_cpp/main/thrift/special/ConnectorSplitWrapper.hpp.inc similarity index 59% rename from presto-native-execution/presto_cpp/main/thrift/special/SplitWrapper.hpp.inc rename to presto-native-execution/presto_cpp/main/thrift/special/ConnectorSplitWrapper.hpp.inc index 8602093461f7c..9e033358058d3 100644 --- a/presto-native-execution/presto_cpp/main/thrift/special/SplitWrapper.hpp.inc +++ b/presto-native-execution/presto_cpp/main/thrift/special/ConnectorSplitWrapper.hpp.inc @@ -12,7 +12,5 @@ * limitations under the License. */ -void toThrift(const facebook::presto::protocol::Split& split, SplitWrapper& thriftSplitWrapper); -void toThrift(const facebook::presto::protocol::Split& split, std::string& thriftSplit); -void fromThrift(const SplitWrapper& thriftSplitWrapper, facebook::presto::protocol::Split& split); -void fromThrift(const std::string& thriftSplit, facebook::presto::protocol::Split& split); +void toThrift(const std::shared_ptr& proto, ConnectorSplitWrapper& thrift); +void fromThrift(const ConnectorSplitWrapper& thrift, std::shared_ptr& proto); diff --git a/presto-native-execution/presto_cpp/main/thrift/special/ConnectorTransactionHandleWrapper.cpp.inc b/presto-native-execution/presto_cpp/main/thrift/special/ConnectorTransactionHandleWrapper.cpp.inc new file mode 100644 index 0000000000000..9f915ada88ded --- /dev/null +++ b/presto-native-execution/presto_cpp/main/thrift/special/ConnectorTransactionHandleWrapper.cpp.inc @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +void toThrift(const std::shared_ptr& proto, ConnectorTransactionHandleWrapper& thrift) { +} +void fromThrift(const ConnectorTransactionHandleWrapper& thrift, std::shared_ptr& proto) { + if (thrift.connectorId().has_value() && thrift.customSerializedValue().has_value()) { + facebook::presto::protocol::getConnectorProtocol(thrift.connectorId().value()) + .deserialize(thrift.customSerializedValue().value(), proto); + } else if (thrift.jsonValue().has_value()) { + json j = json::parse(thrift.jsonValue().value()); + from_json(j, proto); + } +} diff --git a/presto-native-execution/presto_cpp/main/thrift/special/ConnectorTransactionHandleWrapper.hpp.inc b/presto-native-execution/presto_cpp/main/thrift/special/ConnectorTransactionHandleWrapper.hpp.inc new file mode 100644 index 0000000000000..10ccee7345183 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/thrift/special/ConnectorTransactionHandleWrapper.hpp.inc @@ -0,0 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +void toThrift(const std::shared_ptr& proto, ConnectorTransactionHandleWrapper& thrift); +void fromThrift(const ConnectorTransactionHandleWrapper& thrift, std::shared_ptr& proto); diff --git a/presto-native-execution/presto_cpp/main/thrift/special/SplitWrapper.cpp.inc b/presto-native-execution/presto_cpp/main/thrift/special/SplitWrapper.cpp.inc deleted file mode 100644 index 5411d2769da49..0000000000000 --- a/presto-native-execution/presto_cpp/main/thrift/special/SplitWrapper.cpp.inc +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -void toThrift(const facebook::presto::protocol::Split& split, SplitWrapper& thriftSplitWrapper) { - toThrift(split, *thriftSplitWrapper.split_ref()); -} -void toThrift(const facebook::presto::protocol::Split& split, std::string& thriftSplit) { - json jsonSplit = split; - std::string str = jsonSplit.dump(); - toThrift(str, thriftSplit); -} -void fromThrift(const SplitWrapper& thriftSplitWrapper, facebook::presto::protocol::Split& split) { - fromThrift(*thriftSplitWrapper.split_ref(), split); -} -void fromThrift(const std::string& thriftSplit, facebook::presto::protocol::Split& split) { - json j = json::parse(thriftSplit); - split = j; -} diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/ConnectorProtocol.h b/presto-native-execution/presto_cpp/presto_protocol/core/ConnectorProtocol.h index 0cb4aa5c88597..3b87dd58f83fe 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/ConnectorProtocol.h +++ b/presto-native-execution/presto_cpp/presto_protocol/core/ConnectorProtocol.h @@ -103,6 +103,20 @@ class ConnectorProtocol { virtual void from_json( const json& j, std::shared_ptr& p) const = 0; + + virtual void serialize( + const std::shared_ptr& proto, + std::string& thrift) const = 0; + virtual void deserialize( + const std::string& thrift, + std::shared_ptr& proto) const = 0; + + virtual void serialize( + const std::shared_ptr& proto, + std::string& thrift) const = 0; + virtual void deserialize( + const std::string& thrift, + std::shared_ptr& proto) const = 0; }; namespace { @@ -222,6 +236,29 @@ class ConnectorProtocolTemplate final : public ConnectorProtocol { from_json_template(j, p); } + void serialize( + const std::shared_ptr& proto, + std::string& thrift) const final { + serializeTemplate(proto, thrift); + } + + void deserialize( + const std::string& thrift, + std::shared_ptr& proto) const final { + deserializeTemplate(thrift, proto); + } + + void serialize( + const std::shared_ptr& proto, + std::string& thrift) const final { + serializeTemplate(proto, thrift); + } + void deserialize( + const std::string& thrift, + std::shared_ptr& proto) const final { + deserializeTemplate(thrift, proto); + } + private: template static void to_json_template( @@ -262,6 +299,26 @@ class ConnectorProtocolTemplate final : public ConnectorProtocol { BASE>::type* = 0) { VELOX_NYI("Not implemented: {}", typeid(BASE).name()); } + + template + static void serializeTemplate( + const std::shared_ptr& proto, + std::string& thrift, + typename std::enable_if::value, BASE>:: + type* = 0) { + auto derived = *std::static_pointer_cast(proto); + thrift = derived.serialize(derived); + } + + template + static void deserializeTemplate( + const std::string& thrift, + std::shared_ptr& proto, + typename std::enable_if::value, BASE>:: + type* = 0) { + std::shared_ptr derived; + proto = derived->deserialize(thrift, derived); + } }; using SystemConnectorProtocol = ConnectorProtocolTemplate< SystemTableHandle, diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-hpp.mustache b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-hpp.mustache index 1e3ba8304cf99..886735f96963c 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-hpp.mustache +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol-json-hpp.mustache @@ -35,6 +35,7 @@ #include "presto_cpp/external/json/nlohmann/json.hpp" #include "presto_cpp/presto_protocol/core/DataSize.h" #include "presto_cpp/presto_protocol/core/Duration.h" +#include "velox/common/base/Exceptions.h" using nlohmann::json; @@ -241,6 +242,7 @@ template struct adl_serializer& p); void from_json(const json& j, std::shared_ptr<{{&class_name}}>& p); } +{{/hinc}} {{/abstract}} {{/.}} diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h index deeb58df22711..29f06baad49d6 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h @@ -31,6 +31,7 @@ #include "presto_cpp/external/json/nlohmann/json.hpp" #include "presto_cpp/presto_protocol/core/DataSize.h" #include "presto_cpp/presto_protocol/core/Duration.h" +#include "velox/common/base/Exceptions.h" #include "velox/common/encode/Base64.h" using nlohmann::json; @@ -290,21 +291,11 @@ void to_json(json& j, const std::shared_ptr& p); void from_json(const json& j, std::shared_ptr& p); } // namespace facebook::presto::protocol namespace facebook::presto::protocol { -struct ConnectorTransactionHandle : public JsonEncodedSubclass {}; -void to_json(json& j, const std::shared_ptr& p); -void from_json(const json& j, std::shared_ptr& p); -} // namespace facebook::presto::protocol -namespace facebook::presto::protocol { struct ExecutionWriterTarget : public JsonEncodedSubclass {}; void to_json(json& j, const std::shared_ptr& p); void from_json(const json& j, std::shared_ptr& p); } // namespace facebook::presto::protocol namespace facebook::presto::protocol { -struct ConnectorSplit : public JsonEncodedSubclass {}; -void to_json(json& j, const std::shared_ptr& p); -void from_json(const json& j, std::shared_ptr& p); -} // namespace facebook::presto::protocol -namespace facebook::presto::protocol { struct ConnectorOutputTableHandle : public JsonEncodedSubclass {}; void to_json(json& j, const std::shared_ptr& p); void from_json(const json& j, std::shared_ptr& p); @@ -501,6 +492,20 @@ void to_json(json& j, const AllOrNoneValueSet& p); void from_json(const json& j, AllOrNoneValueSet& p); } // namespace facebook::presto::protocol namespace facebook::presto::protocol { +struct ConnectorTransactionHandle : public JsonEncodedSubclass { + static std::string serialize(ConnectorTransactionHandle& p) { + VELOX_NYI("Serialization not implemented for ConnectorTransactionHandle"); + } + static std::shared_ptr deserialize( + const std::string& data, + std::shared_ptr p) { + VELOX_NYI("Deserialization not implemented for ConnectorTransactionHandle"); + } +}; +void to_json(json& j, const std::shared_ptr& p); +void from_json(const json& j, std::shared_ptr& p); +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { struct AnalyzeTableHandle { ConnectorId connectorId = {}; std::shared_ptr transactionHandle = {}; @@ -739,6 +744,20 @@ struct Lifespan { void to_json(json& j, const Lifespan& p); void from_json(const json& j, Lifespan& p); +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +struct ConnectorSplit : public JsonEncodedSubclass { + static std::string serialize(ConnectorSplit& p) { + VELOX_NYI("Serialization not implemented for ConnectorSplit"); + } + static std::shared_ptr deserialize( + const std::string& data, + std::shared_ptr p) { + VELOX_NYI("Deserialization not implemented for ConnectorSplit"); + } +}; +void to_json(json& j, const std::shared_ptr& p); +void from_json(const json& j, std::shared_ptr& p); } // namespace facebook::presto::protocol namespace facebook::presto::protocol { struct SplitContext { diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/special/ConnectorSplit.hpp.inc b/presto-native-execution/presto_cpp/presto_protocol/core/special/ConnectorSplit.hpp.inc new file mode 100644 index 0000000000000..7a75c8b6bac8a --- /dev/null +++ b/presto-native-execution/presto_cpp/presto_protocol/core/special/ConnectorSplit.hpp.inc @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +namespace facebook::presto::protocol { +struct ConnectorSplit : public JsonEncodedSubclass { + static std::string serialize(ConnectorSplit& p) { + VELOX_NYI("Serialization not implemented for ConnectorSplit"); + } + static std::shared_ptr deserialize(const std::string& data, std::shared_ptr p) { + VELOX_NYI("Deserialization not implemented for ConnectorSplit"); + } +}; +void to_json(json& j, const std::shared_ptr& p); +void from_json(const json& j, std::shared_ptr& p); +} // namespace facebook::presto::protocol diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/special/ConnectorTransactionHandle.hpp.inc b/presto-native-execution/presto_cpp/presto_protocol/core/special/ConnectorTransactionHandle.hpp.inc new file mode 100644 index 0000000000000..a0010e7c878c5 --- /dev/null +++ b/presto-native-execution/presto_cpp/presto_protocol/core/special/ConnectorTransactionHandle.hpp.inc @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +namespace facebook::presto::protocol { +struct ConnectorTransactionHandle : public JsonEncodedSubclass { + static std::string serialize(ConnectorTransactionHandle& p) { + VELOX_NYI("Serialization not implemented for ConnectorTransactionHandle"); + } + static std::shared_ptr deserialize(const std::string& data, std::shared_ptr p) { + VELOX_NYI("Deserialization not implemented for ConnectorTransactionHandle"); + } +}; +void to_json(json& j, const std::shared_ptr& p); +void from_json(const json& j, std::shared_ptr& p); +} // namespace facebook::presto::protocol