-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathacceptance_test.go
93 lines (82 loc) · 2.68 KB
/
acceptance_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"strings"
"testing"
"time"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit-connector-kafka/source"
"github.com/conduitio/conduit-connector-kafka/test"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/uuid"
)
func TestAcceptance(t *testing.T) {
srcCfg := map[string]string{
"servers": "localhost:9092",
// source params
"readFromBeginning": "true",
}
destCfg := map[string]string{
"servers": "localhost:9092",
// destination params
"batchBytes": "1000012",
"acks": "all",
"compression": "snappy",
}
sdk.AcceptanceTest(t, AcceptanceTestDriver{
ConfigurableAcceptanceTestDriver: sdk.ConfigurableAcceptanceTestDriver{
Config: sdk.ConfigurableAcceptanceTestDriverConfig{
Connector: Connector,
SourceConfig: srcCfg,
DestinationConfig: destCfg,
BeforeTest: func(t *testing.T) {
lastSlash := strings.LastIndex(t.Name(), "/")
randomName := t.Name()[lastSlash+1:] + uuid.NewString()
srcCfg["topics"] = randomName
destCfg["topic"] = randomName
},
WriteTimeout: time.Second * 10,
ReadTimeout: time.Second * 10,
},
},
})
}
// AcceptanceTestDriver is the driver passed to sdk.AcceptanceTest in this
// file. It embeds sdk.ConfigurableAcceptanceTestDriver and replaces its
// ReadFromDestination method with the implementation defined in this file.
type AcceptanceTestDriver struct {
sdk.ConfigurableAcceptanceTestDriver
}
// ReadFromDestination replaces the embedded driver's implementation. The
// source connector reads through a consumer group, which makes reads slow, so
// the destination tests consume the topic directly via test.Consume instead.
//
// It parses the driver's source configuration, consumes len(records) records
// from the first configured topic, and converts each consumed record into an
// opencdc "create" record. The returned slice has one entry per consumed
// record, in the order test.Consume returned them.
func (d AcceptanceTestDriver) ReadFromDestination(t *testing.T, records []opencdc.Record) []opencdc.Record {
	srcCfg := test.ParseConfigMap[source.Config](t, d.SourceConfig(t))
	consumed := test.Consume(t, srcCfg.Servers, srcCfg.Topics[0], len(records))

	out := make([]opencdc.Record, 0, len(consumed))
	for _, kafkaRec := range consumed {
		// The topic doubles as the record's collection; the record timestamp
		// becomes its creation time.
		md := opencdc.Metadata{}
		md.SetCollection(kafkaRec.Topic)
		md.SetCreatedAt(kafkaRec.Timestamp)

		// The group ID is left empty because no consumer group is involved.
		pos := source.Position{
			GroupID:   "",
			Topic:     kafkaRec.Topic,
			Partition: kafkaRec.Partition,
			Offset:    kafkaRec.Offset,
		}

		created := sdk.Util.Source.NewRecordCreate(
			pos.ToSDKPosition(),
			md,
			opencdc.RawData(kafkaRec.Key),
			opencdc.RawData(kafkaRec.Value),
		)
		out = append(out, created)
	}
	return out
}