Skip to content

Commit

Permalink
Fixed data source test
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalyisaev2 committed Dec 26, 2024
1 parent 0fb2d7f commit b8dbf9b
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 41 deletions.
2 changes: 1 addition & 1 deletion app/server/datasource/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type DataSource[T paging.Acceptor] interface {
logger *zap.Logger,
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
sinkFactory *paging.SinkFactory[T],
sinkFactory paging.SinkFactory[T],
) error
}

Expand Down
2 changes: 1 addition & 1 deletion app/server/datasource/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (m *DataSourceMock[T]) ReadSplit(
_ *zap.Logger,
_ *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
sinkFactory *paging.SinkFactory[T],
sinkFactory paging.SinkFactory[T],
) error {
return m.Called(split, sinkFactory).Error(0)
}
14 changes: 2 additions & 12 deletions app/server/datasource/rdbms/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func (ds *dataSourceImpl) DescribeTable(
return &api_service_protos.TDescribeTableResponse{Schema: schema}, nil
}

func (ds *dataSourceImpl) doReadSplit(
func (ds *dataSourceImpl) ReadSplit(
ctx context.Context,
logger *zap.Logger,
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
sinkFactory *paging.SinkFactory[any],
sinkFactory paging.SinkFactory[any],
) error {
// Make connection(s) to the data source.
var cs []rdbms_utils.Connection
Expand Down Expand Up @@ -219,16 +219,6 @@ func (ds *dataSourceImpl) doReadSplitSingleConn(
return nil
}

// ReadSplit implements the DataSource interface for the RDBMS data source.
// It is a thin public wrapper that delegates all work to doReadSplit,
// passing the request, the split description, and the sink factory through unchanged.
func (ds *dataSourceImpl) ReadSplit(
	ctx context.Context,
	logger *zap.Logger,
	request *api_service_protos.TReadSplitsRequest,
	split *api_service_protos.TSplit,
	sinkFactory *paging.SinkFactory[any],
) error {
	return ds.doReadSplit(ctx, logger, request, split, sinkFactory)
}

func NewDataSource(
logger *zap.Logger,
preset *Preset,
Expand Down
34 changes: 22 additions & 12 deletions app/server/datasource/rdbms/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

api_common "github.com/ydb-platform/fq-connector-go/api/common"
Expand Down Expand Up @@ -67,8 +68,10 @@ func TestReadSplit(t *testing.T) {
}

connection := &rdbms_utils.ConnectionMock{}
connectionManager.On("Make", split.Select.DataSourceInstance).Return(connection, nil).Once()
connectionManager.On("Release", connection).Return().Once()
connection.On("From").Return("", "example_1").Twice()

connectionManager.On("Make", split.Select.DataSourceInstance).Return([]rdbms_utils.Connection{connection}, nil).Once()
connectionManager.On("Release", []rdbms_utils.Connection{connection}).Return().Once()

rows := &rdbms_utils.RowsMock{
PredefinedData: [][]any{
Expand Down Expand Up @@ -99,10 +102,15 @@ func TestReadSplit(t *testing.T) {
sink.On("AddRow", transformer).Return(nil).Times(2)
sink.On("Finish").Return().Once()

sinkFactory := &paging.SinkFactoryMock{}
sinkFactory.On("MakeSinks", 1).Return([]paging.Sink[any]{sink}, nil).Once()

dataSource := NewDataSource(logger, preset, converterCollection)
dataSource.ReadSplit(ctx, logger, readSplitsRequest, split, sink)

mock.AssertExpectationsForObjects(t, connectionManager, connection, rows, sink)
err := dataSource.ReadSplit(ctx, logger, readSplitsRequest, split, sinkFactory)
require.NoError(t, err)

mock.AssertExpectationsForObjects(t, connectionManager, connection, rows, sink, sinkFactory)
})

t.Run("scan error", func(t *testing.T) {
Expand All @@ -116,8 +124,10 @@ func TestReadSplit(t *testing.T) {
}

connection := &rdbms_utils.ConnectionMock{}
connectionManager.On("Make", split.Select.DataSourceInstance).Return(connection, nil).Once()
connectionManager.On("Release", connection).Return().Once()
connection.On("From").Return("", "example_1").Twice()

connectionManager.On("Make", split.Select.DataSourceInstance).Return([]rdbms_utils.Connection{connection}, nil).Once()
connectionManager.On("Release", []rdbms_utils.Connection{connection}).Return().Once()

rows := &rdbms_utils.RowsMock{
PredefinedData: [][]any{
Expand Down Expand Up @@ -149,15 +159,15 @@ func TestReadSplit(t *testing.T) {

sink := &paging.SinkMock{}
sink.On("AddRow", transformer).Return(nil).Once()
sink.On("AddError", mock.MatchedBy(func(err error) bool {
return errors.Is(err, scanErr)
})).Return().Once()
sink.On("Finish").Return().Once()

sinkFactory := &paging.SinkFactoryMock{}
sinkFactory.On("MakeSinks", 1).Return([]paging.Sink[any]{sink}, nil).Once()

datasource := NewDataSource(logger, preset, converterCollection)

datasource.ReadSplit(ctx, logger, readSplitsRequest, split, sink)
err := datasource.ReadSplit(ctx, logger, readSplitsRequest, split, sinkFactory)
require.True(t, errors.Is(err, scanErr))

mock.AssertExpectationsForObjects(t, connectionManager, connection, rows, sink)
mock.AssertExpectationsForObjects(t, connectionManager, connection, rows, sink, sinkFactory)
})
}
4 changes: 2 additions & 2 deletions app/server/datasource/s3/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ func (ds *dataSource) ReadSplit(
logger *zap.Logger,
_ *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
sinkFactory *paging.SinkFactory[string]) error {
sinkFactory paging.SinkFactory[string]) error {
return ds.doReadSplit(ctx, logger, split, sinkFactory)
}

func (*dataSource) doReadSplit(
ctx context.Context,
_ *zap.Logger,
split *api_service_protos.TSplit,
sinkFactory *paging.SinkFactory[string]) error {
sinkFactory paging.SinkFactory[string]) error {
conn := makeConnection()

var (
Expand Down
10 changes: 10 additions & 0 deletions app/server/paging/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,13 @@ type Sink[T Acceptor] interface {
// Finish reports the successful completion of data stream reading.
Finish()
}

// SinkFactory should be instantiated once for each ReadSplits request.
// It owns some structures that are shared across multiple Sink instances.
type SinkFactory[T Acceptor] interface {
	// MakeSinks generates Sink objects, one per data source connection;
	// the concrete implementation allows only a single call per factory.
	MakeSinks(totalSinks int) ([]Sink[T], error)
	// ResultQueue returns a channel with columnar buffers generated by all sinks.
	ResultQueue() <-chan *ReadResult[T]
	// FinalStats returns the overall statistics collected during the request processing.
	FinalStats() *api_service_protos.TReadSplitsResponse_TStats
}
20 changes: 20 additions & 0 deletions app/server/paging/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,26 @@ func (m *SinkMock) ResultQueue() <-chan *ReadResult[any] {
return m.Called().Get(0).(chan *ReadResult[any])
}

// Compile-time check that the mock satisfies the SinkFactory interface.
var _ SinkFactory[any] = (*SinkFactoryMock)(nil)

// SinkFactoryMock is a testify-based mock of SinkFactory used in unit tests.
type SinkFactoryMock struct {
	mock.Mock
}

// MakeSinks returns the sink slice and error preconfigured on the mock.
func (m *SinkFactoryMock) MakeSinks(totalSinks int) ([]Sink[any], error) {
	call := m.Called(totalSinks)

	return call.Get(0).([]Sink[any]), call.Error(1)
}

// ResultQueue returns the result channel preconfigured on the mock.
func (m *SinkFactoryMock) ResultQueue() <-chan *ReadResult[any] {
	call := m.Called()

	return call.Get(0).(chan *ReadResult[any])
}

// FinalStats returns the stats object preconfigured on the mock.
func (m *SinkFactoryMock) FinalStats() *api_service_protos.TReadSplitsResponse_TStats {
	call := m.Called()

	return call.Get(0).(*api_service_protos.TReadSplitsResponse_TStats)
}

var _ ColumnarBuffer[any] = (*ColumnarBufferMock)(nil)

type ColumnarBufferMock struct {
Expand Down
4 changes: 0 additions & 4 deletions app/server/paging/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,6 @@ func (s *sinkImpl[T]) Finish() {
}
}

// ResultQueue returns the channel on which this sink publishes its read results.
func (s *sinkImpl[T]) ResultQueue() <-chan *ReadResult[T] {
	return s.resultQueue
}

func (s *sinkImpl[T]) respondWith(
buf ColumnarBuffer[T],
stats *api_service_protos.TReadSplitsResponse_TStats,
Expand Down
14 changes: 7 additions & 7 deletions app/server/paging/sink_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (

// SinkFactory should be instantiated once for each ReadSplits request.
// It owns some structures that are shared across multiple Sink instances.
type SinkFactory[T Acceptor] struct {
type sinkFactoryImpl[T Acceptor] struct {
ctx context.Context
logger *zap.Logger
cfg *config.TPagingConfig
Expand All @@ -37,7 +37,7 @@ type SinkFactory[T Acceptor] struct {

// MakeSinks is used to generate Sink objects, one per each data source connection.
// This method can be called only once.
func (f *SinkFactory[T]) MakeSinks(totalSinks int) ([]Sink[T], error) {
func (f *sinkFactoryImpl[T]) MakeSinks(totalSinks int) ([]Sink[T], error) {
if f.state != sinkFactoryIdle {
return nil, fmt.Errorf("sink factory is already in use")
}
Expand Down Expand Up @@ -84,12 +84,12 @@ func (f *SinkFactory[T]) MakeSinks(totalSinks int) ([]Sink[T], error) {
}

// ResultQueue returns a channel with columnar buffers generated by all sinks.
func (f *sinkFactoryImpl[T]) ResultQueue() <-chan *ReadResult[T] {
	return f.resultQueue
}

// FinalStats returns the overall statistics collected during the request processing.
func (f *SinkFactory[T]) FinalStats() *api_service_protos.TReadSplitsResponse_TStats {
func (f *sinkFactoryImpl[T]) FinalStats() *api_service_protos.TReadSplitsResponse_TStats {
overallStats := &api_service_protos.TReadSplitsResponse_TStats{}
for _, tracker := range f.trafficTrackers {

Check failure on line 94 in app/server/paging/sink_factory.go — GitHub Actions / lint (wsl): "ranges should only be cuddled with assignments used in the iteration". (View the workflow job for this annotation.)
partialStats := tracker.DumpStats(true)
Expand All @@ -101,7 +101,7 @@ func (f *SinkFactory[T]) FinalStats() *api_service_protos.TReadSplitsResponse_TS
return overallStats
}

func (f *SinkFactory[T]) sinkTerminationHandler(terminateChan <-chan struct{}) {
func (f *sinkFactoryImpl[T]) sinkTerminationHandler(terminateChan <-chan struct{}) {
terminatedSinks := 0

for {
Expand Down Expand Up @@ -135,8 +135,8 @@ func NewSinkFactory[T Acceptor](
cfg *config.TPagingConfig,
columnarBufferFactory ColumnarBufferFactory[T],
readLimiter ReadLimiter,
) *SinkFactory[T] {
sf := &SinkFactory[T]{
) SinkFactory[T] {
sf := &sinkFactoryImpl[T]{
state: sinkFactoryIdle,
bufferFactory: columnarBufferFactory,
readLimiter: readLimiter,
Expand Down
4 changes: 2 additions & 2 deletions app/server/streaming/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Streamer[T paging.Acceptor] struct {
dataSource datasource.DataSource[T]
request *api_service_protos.TReadSplitsRequest
split *api_service_protos.TSplit
sinkFactory *paging.SinkFactory[T]
sinkFactory paging.SinkFactory[T]
logger *zap.Logger
errorChan chan error // notifies about errors happened during reading process
ctx context.Context // clone of a stream context
Expand Down Expand Up @@ -135,7 +135,7 @@ func NewStreamer[T paging.Acceptor](
stream api_service.Connector_ReadSplitsServer,
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
sinkFactory *paging.SinkFactory[T],
sinkFactory paging.SinkFactory[T],
dataSource datasource.DataSource[T],
) *Streamer[T] {
ctx, cancel := context.WithCancel(stream.Context())
Expand Down

0 comments on commit b8dbf9b

Please sign in to comment.