Skip to content

Commit

Permalink
YDB: support pushdown for optional types in Query Service mode (#204)
Browse files Browse the repository at this point in the history
* YDB: test both connector modes

* YDB: test optional pushdown

* YDB: validate optional
  • Loading branch information
vitalyisaev2 authored Oct 14, 2024
1 parent f76af43 commit c2f4c59
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 27 deletions.
2 changes: 2 additions & 0 deletions app/server/datasource/rdbms/ydb/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ func (c *connectionManager) Make(
case config.TYdbConfig_MODE_UNSPECIFIED:
fallthrough
case config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE:
logger.Debug("YDB Connector will use Native SDK over Query Service")
ydbConn = newConnectionNative(ctx, c.QueryLoggerFactory.Make(logger), dsi, ydbDriver)
case config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES:
logger.Debug("YDB Connector will use database/sql SDK with scan queries over Table Service")
ydbConn, err = newConnectionDatabaseSQL(ctx, logger, c.QueryLoggerFactory.Make(logger), c.cfg, dsi, ydbDriver)
default:
return nil, fmt.Errorf("unknown mode: %v", c.cfg.Mode)
Expand Down
45 changes: 30 additions & 15 deletions app/server/datasource/rdbms/ydb/connection_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,27 +228,28 @@ func (c *connectionNative) Close() error {
return nil
}

func newConnectionNative(
ctx context.Context,
queryLogger common.QueryLogger,
dsi *api_common.TDataSourceInstance,
driver *ydb_sdk.Driver,
) ydbConnection {
return &connectionNative{
ctx: ctx,
driver: driver,
queryLogger: queryLogger,
dsi: dsi,
}
}

func (c *connectionNative) rewriteQuery(params *rdbms_utils.QueryParams) (string, error) {
var buf bytes.Buffer

buf.WriteString(fmt.Sprintf("PRAGMA TablePathPrefix(\"%s\");", c.dsi.Database)) //nolint:revive

for i, arg := range params.QueryArgs.GetAll() {
typeName, err := primitiveYqlTypeName(arg.YdbType.GetTypeId())
var primitiveTypeID Ydb.Type_PrimitiveTypeId

if arg.YdbType.GetOptionalType() != nil {
internalType := arg.YdbType.GetOptionalType().GetItem()

switch t := internalType.GetType().(type) {
case *Ydb.Type_TypeId:
primitiveTypeID = t.TypeId
default:
return "", fmt.Errorf("optional type contains no primitive type: %v", arg.YdbType)
}
} else {
primitiveTypeID = arg.YdbType.GetTypeId()
}

typeName, err := primitiveYqlTypeName(primitiveTypeID)
if err != nil {
return "", fmt.Errorf("get YQL type name from value %v: %w", arg, err)
}
Expand All @@ -260,3 +261,17 @@ func (c *connectionNative) rewriteQuery(params *rdbms_utils.QueryParams) (string

return buf.String(), nil
}

func newConnectionNative(
ctx context.Context,
queryLogger common.QueryLogger,
dsi *api_common.TDataSourceInstance,
driver *ydb_sdk.Driver,
) ydbConnection {
return &connectionNative{
ctx: ctx,
driver: driver,
queryLogger: queryLogger,
dsi: dsi,
}
}
54 changes: 51 additions & 3 deletions tests/infra/datasource/ydb/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

api_common "github.com/ydb-platform/fq-connector-go/api/common"
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
"github.com/ydb-platform/fq-connector-go/app/config"
"github.com/ydb-platform/fq-connector-go/common"
"github.com/ydb-platform/fq-connector-go/tests/infra/datasource"
"github.com/ydb-platform/fq-connector-go/tests/suite"
Expand All @@ -15,7 +16,8 @@ import (

type Suite struct {
*suite.Base[int32, *array.Int32Builder]
dataSource *datasource.DataSource
dataSource *datasource.DataSource
connectorMode config.TYdbConfig_Mode
}

func (s *Suite) TestSelect() {
Expand Down Expand Up @@ -85,6 +87,10 @@ func (s *Suite) TestPushdownComparisonEQ() {
}

func (s *Suite) TestPushdownComparisonEQNull() {
if s.connectorMode == config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE {
s.T().Skip("Skipping test in QUERY_SERVICE_NATIVE mode")
}

s.ValidateTable(
s.dataSource,
tables["pushdown_comparison_EQ_NULL"],
Expand Down Expand Up @@ -260,6 +266,20 @@ func (s *Suite) TestPushdownStringsUtf8() {
)
}

func (s *Suite) TestPushdownStringsUtf8Optional() {
s.ValidateTable(
s.dataSource,
tables["pushdown_strings_utf8"],
suite.WithPredicate(&api_service_protos.TPredicate{
Payload: tests_utils.MakePredicateComparisonColumn(
"col_02_utf8",
api_service_protos.TPredicate_TComparison_EQ,
common.MakeTypedValue(common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_UTF8)), "a"),
),
}),
)
}

func (s *Suite) TestPushdownStringsString() {
s.ValidateTable(
s.dataSource,
Expand All @@ -274,6 +294,20 @@ func (s *Suite) TestPushdownStringsString() {
)
}

func (s *Suite) TestPushdownStringsStringOptional() {
s.ValidateTable(
s.dataSource,
tables["pushdown_strings_string"],
suite.WithPredicate(&api_service_protos.TPredicate{
Payload: tests_utils.MakePredicateComparisonColumn(
"col_03_string",
api_service_protos.TPredicate_TComparison_EQ,
common.MakeTypedValue(common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_STRING)), []byte("b")),
),
}),
)
}

func (s *Suite) TestLargeTable() {
// For tables larger than 1000 rows, scan queries must be used,
// otherwise output will be truncated.
Expand All @@ -300,6 +334,10 @@ func (s *Suite) TestPositiveStats() {
}

func (s *Suite) TestMissingDataSource() {
if s.connectorMode == config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE {
s.T().Skip("Skipping test in QUERY_SERVICE_NATIVE mode")
}

dsi := &api_common.TDataSourceInstance{
Kind: api_common.EDataSourceKind_YDB,
Endpoint: &api_common.TEndpoint{Host: "www.google.com", Port: 2136},
Expand All @@ -320,26 +358,36 @@ func (s *Suite) TestMissingDataSource() {
}

func (s *Suite) TestInvalidLogin() {
if s.connectorMode == config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE {
s.T().Skip("Skipping test in QUERY_SERVICE_NATIVE mode")
}

for _, dsi := range s.dataSource.Instances {
suite.TestInvalidLogin(s.Base, dsi, tables["simple"])
}
}

func (s *Suite) TestInvalidPassword() {
if s.connectorMode == config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE {
s.T().Skip("Skipping test in QUERY_SERVICE_NATIVE mode")
}

for _, dsi := range s.dataSource.Instances {
suite.TestInvalidPassword(s.Base, dsi, tables["simple"])
}
}

func NewSuite(
baseSuite *suite.Base[int32, *array.Int32Builder],
connectorMode config.TYdbConfig_Mode,
) *Suite {
ds, err := deriveDataSourceFromDockerCompose(baseSuite.EndpointDeterminer)
baseSuite.Require().NoError(err)

result := &Suite{
Base: baseSuite,
dataSource: ds,
Base: baseSuite,
dataSource: ds,
connectorMode: connectorMode,
}

return result
Expand Down
18 changes: 17 additions & 1 deletion tests/main_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package tests

import (
"fmt"
"log"
"testing"

"github.com/apache/arrow/go/v13/arrow/array"
testify_suite "github.com/stretchr/testify/suite"

"github.com/ydb-platform/fq-connector-go/app/config"
"github.com/ydb-platform/fq-connector-go/app/server"
"github.com/ydb-platform/fq-connector-go/tests/infra/datasource/clickhouse"
"github.com/ydb-platform/fq-connector-go/tests/infra/datasource/greenplum"
"github.com/ydb-platform/fq-connector-go/tests/infra/datasource/ms_sql_server"
Expand Down Expand Up @@ -40,7 +43,20 @@ func TestPostgreSQL(t *testing.T) {
}

func TestYDB(t *testing.T) {
testify_suite.Run(t, ydb.NewSuite(suite.NewBase[int32, *array.Int32Builder](t, state, "YDB")))
modes := []config.TYdbConfig_Mode{
config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES,
config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE,
}

for _, mode := range modes {
suiteName := fmt.Sprintf("YDB_%v", config.TYdbConfig_Mode_name[int32(mode)])
option := suite.WithEmbeddedOptions(server.WithYdbConnectorMode(mode))

testify_suite.Run(
t,
ydb.NewSuite(suite.NewBase[int32, *array.Int32Builder](t, state, suiteName, option), mode),
)
}
}

func TestGreenplum(t *testing.T) {
Expand Down
52 changes: 44 additions & 8 deletions tests/suite/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ type Base[ID test_utils.TableIDTypes, IDBUILDER test_utils.ArrowIDBuilder[ID]] s
testify_suite.Suite
*State
Connector common.TestingServer
name string
cfg *baseConfig
}

type baseConfig struct {
name string // suite name
embeddedOptions []server.EmbeddedOption // additional launching options for Connector service
}

func (b *Base[_, _]) BeforeTest(_, testName string) {
fmt.Printf("\n>>>>>>>>>> TEST STARTED: %s/%s <<<<<<<<<<\n\n", b.name, testName)
fmt.Printf("\n>>>>>>>>>> TEST STARTED: %s/%s <<<<<<<<<<\n\n", b.cfg.name, testName)
}

func (b *Base[_, _]) TearDownTest() {
Expand All @@ -38,14 +43,14 @@ func (b *Base[_, _]) TearDownTest() {
}

func (b *Base[_, _]) BeforeSuite(_ string) {
fmt.Printf("\n>>>>>>>>>> SUITE STARTED: %s <<<<<<<<<<\n", b.name)
fmt.Printf("\n>>>>>>>>>> SUITE STARTED: %s <<<<<<<<<<\n", b.cfg.name)
}

func (b *Base[_, _]) SetupSuite() {
// We want to run a distinct instance of Connector for every suite
var err error

b.Connector, err = server.NewEmbedded(
options := []server.EmbeddedOption{
server.WithLoggerConfig(
&config.TLoggerConfig{
LogLevel: config.ELogLevel_DEBUG,
Expand All @@ -66,7 +71,12 @@ func (b *Base[_, _]) SetupSuite() {
},
),
server.WithConnectionTimeouts("2s", "1s"),
server.WithYdbConnectorMode(config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES),
}

options = append(options, b.cfg.embeddedOptions...)

b.Connector, err = server.NewEmbedded(
options...,
)
b.Require().NoError(err)
b.Connector.Start()
Expand All @@ -75,7 +85,7 @@ func (b *Base[_, _]) SetupSuite() {
func (b *Base[_, _]) TearDownSuite() {
b.Connector.Stop()

fmt.Printf("\n>>>>>>>>>> Suite stopped: %s <<<<<<<<<<\n", b.name)
fmt.Printf("\n>>>>>>>>>> Suite stopped: %s <<<<<<<<<<\n", b.cfg.name)
}

type validateTableOptions struct {
Expand Down Expand Up @@ -202,13 +212,39 @@ func (b *Base[ID, IDBUILDER]) doValidateTable(
table.MatchRecords(b.T(), records, schema)
}

type BaseOption interface {
apply(cfg *baseConfig)
}

type embeddedOption struct {
options []server.EmbeddedOption
}

func (o *embeddedOption) apply(cfg *baseConfig) {
cfg.embeddedOptions = append(cfg.embeddedOptions, o.options...)
}

func WithEmbeddedOptions(options ...server.EmbeddedOption) BaseOption {
return &embeddedOption{
options: options,
}
}

func NewBase[
ID test_utils.TableIDTypes,
IDBUILDER test_utils.ArrowIDBuilder[ID],
](t *testing.T, state *State, name string) *Base[ID, IDBUILDER] {
](t *testing.T, state *State, name string, suiteOptions ...BaseOption) *Base[ID, IDBUILDER] {
cfg := &baseConfig{
name: name,
}

for _, option := range suiteOptions {
option.apply(cfg)
}

b := &Base[ID, IDBUILDER]{
State: state,
name: name,
cfg: cfg,
}

b.SetT(t)
Expand Down

0 comments on commit c2f4c59

Please sign in to comment.