Skip to content

Commit

Permalink
MySQL: correctly handle empty tables (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalyisaev2 authored Oct 3, 2024
1 parent 839259a commit 14f6a3b
Show file tree
Hide file tree
Showing 17 changed files with 68 additions and 19 deletions.
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/clickhouse/connection_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (r *rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (
return transformer, nil
}

func (c *connectionHTTP) Query(ctx context.Context, query string, args ...any) (rdbms_utils.Rows, error) {
func (c *connectionHTTP) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)

out, err := c.DB.QueryContext(ctx, query, args...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (r *rowsNative) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collect
return transformer, nil
}

func (c *connectionNative) Query(ctx context.Context, query string, args ...any) (rdbms_utils.Rows, error) {
func (c *connectionNative) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)

out, err := c.Conn.Query(ctx, query, args...)
Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (ds *dataSourceImpl) doReadSplit(
func() error {
var queryErr error

if rows, queryErr = conn.Query(ctx, query, args...); queryErr != nil {
if rows, queryErr = conn.Query(ctx, logger, query, args...); queryErr != nil {
return fmt.Errorf("query '%s' error: %w", query, queryErr)
}

Expand Down
3 changes: 2 additions & 1 deletion app/server/datasource/rdbms/ms_sql_server/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"

_ "github.com/denisenkom/go-mssqldb"
"go.uber.org/zap"

rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/common"
Expand All @@ -21,7 +22,7 @@ func (c Connection) Close() error {
return c.db.Close()
}

func (c Connection) Query(ctx context.Context, query string, args ...any) (rdbms_utils.Rows, error) {
func (c Connection) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)

out, err := c.db.QueryContext(ctx, query, args...)
Expand Down
18 changes: 12 additions & 6 deletions app/server/datasource/rdbms/mysql/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,37 @@ import (

"github.com/go-mysql-org/go-mysql/client"
"github.com/go-mysql-org/go-mysql/mysql"
"go.uber.org/zap"

"github.com/ydb-platform/fq-connector-go/app/config"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/common"
)

var _ rdbms_utils.Connection = (*Connection)(nil)

type Connection struct {
logger common.QueryLogger
conn *client.Conn
rowBufferCapacity uint64
logger common.QueryLogger
conn *client.Conn
cfg *config.TMySQLConfig
}

func (c *Connection) Close() error {
return c.conn.Close()
}

func (c *Connection) Query(ctx context.Context, query string, args ...any) (rdbms_utils.Rows, error) {
func (c *Connection) Query(ctx context.Context, logger *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)

results := make(chan rowData, c.rowBufferCapacity)
results := make(chan rowData, c.cfg.ResultChanCapacity)
result := &mysql.Result{}

r := &rows{
ctx: ctx,
cfg: c.cfg,
logger: logger,
rowChan: results,
errChan: make(chan error, 1),
lastRow: nil,
transformerInitChan: make(chan []uint8, 1),
transformerInitFinished: atomic.Uint32{},
Expand All @@ -46,8 +51,9 @@ func (c *Connection) Query(ctx context.Context, query string, args ...any) (rdbm

go func() {
defer close(r.rowChan)
defer close(r.errChan)

err = stmt.ExecuteSelectStreaming(
r.errChan <- stmt.ExecuteSelectStreaming(
result,
// In per-row handler copy entire row. The driver re-uses memory allocated for single row,
// so we need either to lock the row until the reader is done its reading and processing
Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/mysql/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *connectionManager) Make(
return nil, fmt.Errorf("set time zone: %w", err)
}

return &Connection{queryLogger, conn, c.cfg.GetResultChanCapacity()}, nil
return &Connection{queryLogger, conn, c.cfg}, nil
}

func (*connectionManager) Release(_ context.Context, logger *zap.Logger, conn rdbms_utils.Connection) {
Expand Down
16 changes: 15 additions & 1 deletion app/server/datasource/rdbms/mysql/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"go.uber.org/zap"

"github.com/ydb-platform/fq-connector-go/app/config"
"github.com/ydb-platform/fq-connector-go/app/server/conversion"
"github.com/ydb-platform/fq-connector-go/app/server/paging"
"github.com/ydb-platform/fq-connector-go/common"
Expand All @@ -26,7 +28,10 @@ type rowData struct {
}

type rows struct {
ctx context.Context
ctx context.Context
logger *zap.Logger

errChan chan error
rowChan chan rowData
lastRow *rowData
inputFinished bool
Expand All @@ -35,6 +40,7 @@ type rows struct {
// it's used to initialize transformer with column types (which are encoded with uint8 values)
transformerInitChan chan []uint8
transformerInitFinished atomic.Uint32
cfg *config.TMySQLConfig
}

func (*rows) Close() error { return nil }
Expand Down Expand Up @@ -278,6 +284,14 @@ func (r *rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (
if !ok {
return nil, fmt.Errorf("mysql types are not ready")
}
case err := <-r.errChan:
if err != nil {
return nil, fmt.Errorf("error occurred during async reading: %w", err)
}

// nil error means that asynchronous reading was successfully finished
// before the first line was received - the case of empty table
r.logger.Warn("table seems to be empty")
case <-r.ctx.Done():
return nil, r.ctx.Err()
}
Expand Down
3 changes: 2 additions & 1 deletion app/server/datasource/rdbms/oracle/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

go_ora "github.com/sijms/go-ora/v2"
"go.uber.org/zap"

rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/common"
Expand All @@ -22,7 +23,7 @@ func (c Connection) Close() error {
return c.conn.Close()
}

func (c Connection) Query(ctx context.Context, query string, args ...any) (rdbms_utils.Rows, error) {
func (c Connection) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)

valueArgs := make([]driver.NamedValue, len(args))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c Connection) Close() error {
return c.Conn.Close(context.TODO())
}

func (c Connection) Query(ctx context.Context, query string, args ...any) (rdbms_utils.Rows, error) {
func (c Connection) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)

out, err := c.Conn.Query(ctx, query, args...)
Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/utils/schema_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (f *DefaultSchemaProvider) GetSchema(
) (*api_service_protos.TSchema, error) {
query, args := f.getArgsAndQuery(request)

rows, err := conn.Query(ctx, query, args...)
rows, err := conn.Query(ctx, logger, query, args...)
if err != nil {
return nil, fmt.Errorf("query builder error: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/utils/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type Connection interface {
Query(ctx context.Context, query string, args ...any) (Rows, error)
Query(ctx context.Context, logger *zap.Logger, query string, args ...any) (Rows, error)
Close() error
}

Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/utils/sql_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type ConnectionMock struct {
mock.Mock
}

func (m *ConnectionMock) Query(_ context.Context, query string, params ...any) (Rows, error) {
func (m *ConnectionMock) Query(_ context.Context, _ *zap.Logger, query string, params ...any) (Rows, error) {
called := []any{query}
called = append(called, params...)
args := m.Called(called...)
Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/rdbms/ydb/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (p
return transformer, nil
}

func (c *Connection) Query(ctx context.Context, query string, args ...any) (rdbms_utils.Rows, error) {
func (c *Connection) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)

out, err := c.DB.QueryContext(ydb_sdk.WithQueryMode(ctx, ydb_sdk.ScanQueryMode), query, args...)
Expand Down
6 changes: 6 additions & 0 deletions tests/infra/datasource/mysql/init/init_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,9 @@ INSERT INTO pushdown VALUES
(2, 20, 'b'),
(3, 30, 'c'),
(4, NULL, NULL);

DROP TABLE IF EXISTS empty_table;
CREATE TABLE empty_table (
id INT NOT NULL,
col1 VARCHAR(7)
);
7 changes: 7 additions & 0 deletions tests/infra/datasource/mysql/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,13 @@ func (s *Suite) TestPushdownNegation() {
)
}

func (s *Suite) TestEmptyTable() {
s.ValidateTable(
s.dataSource,
tables["empty_table"],
)
}

// TODO: fix error mapping in `common/errors.go`
func (s *Suite) TestInvalidLogin() {
s.T().Skip()
Expand Down
13 changes: 13 additions & 0 deletions tests/infra/datasource/mysql/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,19 @@ var tables = map[string]*test_utils.Table[int32, *array.Int32Builder]{
},
},
},
"empty_table": {
Name: "empty_table",
IDArrayBuilderFactory: newInt32IDArrayBuilder(memPool),
Schema: &test_utils.TableSchema{
Columns: map[string]*Ydb.Type{
"id": common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_INT32)),
"col1": common.MakeOptionalType(common.MakePrimitiveType(Ydb.Type_UTF8)),
},
},
Records: []*test_utils.Record[int32, *array.Int32Builder]{
// no items
},
},
}

func pushdownSchemaYdb() *test_utils.TableSchema {
Expand Down
3 changes: 2 additions & 1 deletion tests/suite/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ func (b *Base[ID, IDBUILDER]) doValidateTable(
splits := common.ListSplitsResponsesToSplits(listSplitsResponses)
readSplitsResponses, err := b.Connector.ClientBuffering().ReadSplits(ctx, splits)
b.Require().NoError(err)
b.Require().Len(readSplitsResponses, 1)
// either no blocks (empty table), either single block (tables are small)
b.Require().Contains([]int{0, 1}, len(readSplitsResponses))

records, err := common.ReadResponsesToArrowRecords(readSplitsResponses)
b.Require().NoError(err)
Expand Down

0 comments on commit 14f6a3b

Please sign in to comment.