Skip to content

Commit b72249c

Browse files
committed
add: phaker -> values test
1 parent b91b3a9 commit b72249c

File tree

2 files changed

+62
-3
lines changed

2 files changed

+62
-3
lines changed

build.sbt

+3-1
Original file line numberDiff line numberDiff line change
// NOTE(review): the diff hunk (@@ -13,5 +13,7 @@) shows only the tail of the
// dependency list; entries on lines 1-12 of build.sbt are outside this view.
// Reconstructed post-commit state of the visible hunk:
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-cdc-runtime" % flinkCdcVersion,
  "org.scalatest" %% "scalatest" % "3.2.19" % Test,
  "org.apache.flink" % "flink-clients" % flinkVersion % Test,
  "org.apache.flink" % "flink-streaming-java" % flinkVersion % Test,
  // Added by this commit: composer builds/runs CDC pipeline definitions in tests,
  // and the "values" pipeline connector supplies an in-process sink for them.
  "org.apache.flink" % "flink-cdc-composer" % flinkCdcVersion % Test,
  "org.apache.flink" % "flink-cdc-pipeline-connector-values" % flinkCdcVersion % Test
)

src/test/scala/PhakerTest.scala

+59-2
Original file line numberDiff line numberDiff line change
package io.github.yuxiqian.phaker

import factory.PhakerDataFactory
import source.{PhakerDataSourceOptions, PhakerSourceFunction}

import org.apache.flink.cdc.common.configuration.Configuration
import org.apache.flink.cdc.common.event.TableId
import org.apache.flink.cdc.common.pipeline.PipelineOptions
import org.apache.flink.cdc.composer.definition.{PipelineDef, RouteDef, SinkDef, SourceDef, TransformDef}
import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer
import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory
import org.apache.flink.cdc.connectors.values.sink.{ValuesDataSink, ValuesDataSinkOptions}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.scalatest.funsuite.AnyFunSuite

import java.util.Collections

/** Integration-style tests for the Phaker data source.
  *
  * Covers two paths:
  *   - running [[source.PhakerSourceFunction]] directly on a local Flink
  *     streaming environment and printing its output;
  *   - composing a full Flink CDC pipeline (Phaker source -> "values" sink)
  *     through [[FlinkPipelineComposer]] on a mini-cluster.
  *
  * NOTE(review): both tests execute a real (mini-cluster) Flink job and assert
  * nothing about the emitted records — they only verify the job runs to
  * completion without throwing. Consider capturing the values-sink output and
  * asserting on it.
  */
class PhakerTest extends AnyFunSuite {

  test("Phaker source test") {
    // Arguments are positional; their meanings are not visible from this file.
    // NOTE(review): assumed (tableId, schemaEvolve?, maxColumnCount, batchCount,
    // sleepTime) from the options used in the pipeline test below — confirm
    // against PhakerSourceFunction's constructor.
    val source = new PhakerSourceFunction(
      TableId.tableId("default_namespace", "default_schema", "default_table"),
      true,
      17,
      17,
      1000
    )
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Print emitted events; parallelism 1 keeps the output ordered/readable.
    env.addSource(source).print().setParallelism(1)
    env.execute("Let's Test Phaker Source...")
  }

  test("Phaker to Values test") {
    // Side-effecting factory call: keep explicit parens.
    val composer = FlinkPipelineComposer.ofMiniCluster()

    // Setup value source
    val sourceConfig = new Configuration
    sourceConfig
      .set(PhakerDataSourceOptions.NAMESPACE_NAME, "default_namespace")
      .set(PhakerDataSourceOptions.SCHEMA_NAME, "default_schema")
      .set(PhakerDataSourceOptions.TABLE_NAME, "default_table")
      // Option values are java.lang.Integer; the explicit type argument avoids
      // scala.Int/java.lang.Integer overload ambiguity.
      .set[java.lang.Integer](PhakerDataSourceOptions.BATCH_COUNT, 1)
      .set[java.lang.Integer](PhakerDataSourceOptions.MAX_COLUMN_COUNT, 50)
      .set[java.lang.Integer](PhakerDataSourceOptions.SLEEP_TIME, 1000)

    val sourceDef =
      new SourceDef(PhakerDataFactory.IDENTIFIER, "Value Source", sourceConfig)

    // Setup value sink
    val sinkConfig = new Configuration
    sinkConfig.set(
      ValuesDataSinkOptions.SINK_API,
      ValuesDataSink.SinkApi.SINK_V2
    )
    val sinkDef =
      new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig)

    // Setup pipeline: no routes, no transforms, parallelism 1.
    val pipelineConfig = new Configuration
    pipelineConfig
      .set[java.lang.Integer](PipelineOptions.PIPELINE_PARALLELISM, 1)
    val pipelineDef = new PipelineDef(
      sourceDef,
      sinkDef,
      Collections.emptyList[RouteDef],
      Collections.emptyList[TransformDef],
      pipelineConfig
    )

    // Execute the pipeline; parens mark the side-effecting call.
    val execution = composer.compose(pipelineDef)
    execution.execute()
  }
}

0 commit comments

Comments
 (0)