Skip to content

Commit 33ad098

Browse files
committed
[FLINK-23450][avro-confluent-registry] Set properties map for DebeziumAvroFormat
1 parent c8f4d80 commit 33ad098

File tree

5 files changed

+78
-25
lines changed

5 files changed

+78
-25
lines changed

flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java

+16-19
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import org.apache.flink.table.types.DataType;
4343
import org.apache.flink.table.types.logical.RowType;
4444

45+
import javax.annotation.Nullable;
46+
4547
import java.util.HashMap;
4648
import java.util.HashSet;
4749
import java.util.Map;
@@ -84,14 +86,10 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
8486
final TypeInformation<RowData> rowDataTypeInfo =
8587
context.createTypeInformation(producedDataType);
8688
return new AvroRowDataDeserializationSchema(
87-
optionalPropertiesMap.isEmpty()
88-
? ConfluentRegistryAvroDeserializationSchema.forGeneric(
89-
AvroSchemaConverter.convertToSchema(rowType),
90-
schemaRegistryURL)
91-
: ConfluentRegistryAvroDeserializationSchema.forGeneric(
92-
AvroSchemaConverter.convertToSchema(rowType),
93-
schemaRegistryURL,
94-
optionalPropertiesMap),
89+
ConfluentRegistryAvroDeserializationSchema.forGeneric(
90+
AvroSchemaConverter.convertToSchema(rowType),
91+
schemaRegistryURL,
92+
optionalPropertiesMap),
9593
AvroToRowDataConverters.createRowConverter(rowType),
9694
rowDataTypeInfo);
9795
}
@@ -126,16 +124,11 @@ public SerializationSchema<RowData> createRuntimeEncoder(
126124
final RowType rowType = (RowType) consumedDataType.getLogicalType();
127125
return new AvroRowDataSerializationSchema(
128126
rowType,
129-
optionalPropertiesMap.isEmpty()
130-
? ConfluentRegistryAvroSerializationSchema.forGeneric(
131-
subject.get(),
132-
AvroSchemaConverter.convertToSchema(rowType),
133-
schemaRegistryURL)
134-
: ConfluentRegistryAvroSerializationSchema.forGeneric(
135-
subject.get(),
136-
AvroSchemaConverter.convertToSchema(rowType),
137-
schemaRegistryURL,
138-
optionalPropertiesMap),
127+
ConfluentRegistryAvroSerializationSchema.forGeneric(
128+
subject.get(),
129+
AvroSchemaConverter.convertToSchema(rowType),
130+
schemaRegistryURL,
131+
optionalPropertiesMap),
139132
RowDataToAvroConverters.createConverter(rowType));
140133
}
141134

@@ -173,7 +166,8 @@ public Set<ConfigOption<?>> optionalOptions() {
173166
return options;
174167
}
175168

176-
private Map<String, String> buildOptionalPropertiesMap(ReadableConfig formatOptions) {
169+
public static @Nullable Map<String, String> buildOptionalPropertiesMap(
170+
ReadableConfig formatOptions) {
177171
final Map<String, String> properties = new HashMap<>();
178172

179173
formatOptions
@@ -201,6 +195,9 @@ private Map<String, String> buildOptionalPropertiesMap(ReadableConfig formatOpti
201195
.getOptional(BEARER_AUTH_TOKEN)
202196
.ifPresent(v -> properties.put("bearer.auth.token", v));
203197

198+
if (properties.isEmpty()) {
199+
return null;
200+
}
204201
return properties;
205202
}
206203
}

flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@
3434
import org.apache.flink.types.RowKind;
3535
import org.apache.flink.util.Collector;
3636

37+
import javax.annotation.Nullable;
38+
3739
import java.io.IOException;
40+
import java.util.Map;
3841
import java.util.Objects;
3942

4043
import static java.lang.String.format;
@@ -75,14 +78,23 @@ public final class DebeziumAvroDeserializationSchema implements DeserializationS
7578

7679
public DebeziumAvroDeserializationSchema(
7780
RowType rowType, TypeInformation<RowData> producedTypeInfo, String schemaRegistryUrl) {
81+
this(rowType, producedTypeInfo, schemaRegistryUrl, null);
82+
}
83+
84+
public DebeziumAvroDeserializationSchema(
85+
RowType rowType,
86+
TypeInformation<RowData> producedTypeInfo,
87+
String schemaRegistryUrl,
88+
@Nullable Map<String, ?> registryConfigs) {
7889
this.producedTypeInfo = producedTypeInfo;
7990
RowType debeziumAvroRowType = createDebeziumAvroRowType(fromLogicalToDataType(rowType));
8091

8192
this.avroDeserializer =
8293
new AvroRowDataDeserializationSchema(
8394
ConfluentRegistryAvroDeserializationSchema.forGeneric(
8495
AvroSchemaConverter.convertToSchema(debeziumAvroRowType),
85-
schemaRegistryUrl),
96+
schemaRegistryUrl,
97+
registryConfigs),
8698
AvroToRowDataConverters.createRowConverter(debeziumAvroRowType),
8799
producedTypeInfo);
88100
}

flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactory.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,21 @@
3939
import org.apache.flink.types.RowKind;
4040

4141
import java.util.HashSet;
42+
import java.util.Map;
4243
import java.util.Optional;
4344
import java.util.Set;
4445

46+
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory.buildOptionalPropertiesMap;
47+
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BASIC_AUTH_CREDENTIALS_SOURCE;
48+
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BASIC_AUTH_USER_INFO;
49+
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BEARER_AUTH_CREDENTIALS_SOURCE;
50+
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.BEARER_AUTH_TOKEN;
4551
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SCHEMA_REGISTRY_SUBJECT;
4652
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SCHEMA_REGISTRY_URL;
53+
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_KEYSTORE_LOCATION;
54+
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_KEYSTORE_PASSWORD;
55+
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_TRUSTSTORE_LOCATION;
56+
import static org.apache.flink.formats.avro.registry.confluent.RegistryAvroOptions.SSL_TRUSTSTORE_PASSWORD;
4757

4858
/**
4959
* Format factory for providing configured instances of Debezium Avro to RowData {@link
@@ -59,7 +69,9 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
5969
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
6070

6171
FactoryUtil.validateFactoryOptions(this, formatOptions);
72+
6273
String schemaRegistryURL = formatOptions.get(SCHEMA_REGISTRY_URL);
74+
Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions);
6375

6476
return new DecodingFormat<DeserializationSchema<RowData>>() {
6577
@Override
@@ -69,7 +81,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
6981
final TypeInformation<RowData> producedTypeInfo =
7082
context.createTypeInformation(producedDataType);
7183
return new DebeziumAvroDeserializationSchema(
72-
rowType, producedTypeInfo, schemaRegistryURL);
84+
rowType, producedTypeInfo, schemaRegistryURL, optionalPropertiesMap);
7385
}
7486

7587
@Override
@@ -89,8 +101,11 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
89101
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
90102

91103
FactoryUtil.validateFactoryOptions(this, formatOptions);
104+
92105
String schemaRegistryURL = formatOptions.get(SCHEMA_REGISTRY_URL);
93106
Optional<String> subject = formatOptions.getOptional(SCHEMA_REGISTRY_SUBJECT);
107+
Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions);
108+
94109
if (!subject.isPresent()) {
95110
throw new ValidationException(
96111
String.format(
@@ -114,7 +129,7 @@ public SerializationSchema<RowData> createRuntimeEncoder(
114129
DynamicTableSink.Context context, DataType consumedDataType) {
115130
final RowType rowType = (RowType) consumedDataType.getLogicalType();
116131
return new DebeziumAvroSerializationSchema(
117-
rowType, schemaRegistryURL, subject.get());
132+
rowType, schemaRegistryURL, subject.get(), optionalPropertiesMap);
118133
}
119134
};
120135
}
@@ -135,6 +150,14 @@ public Set<ConfigOption<?>> requiredOptions() {
135150
public Set<ConfigOption<?>> optionalOptions() {
136151
Set<ConfigOption<?>> options = new HashSet<>();
137152
options.add(SCHEMA_REGISTRY_SUBJECT);
153+
options.add(SSL_KEYSTORE_LOCATION);
154+
options.add(SSL_KEYSTORE_PASSWORD);
155+
options.add(SSL_TRUSTSTORE_LOCATION);
156+
options.add(SSL_TRUSTSTORE_PASSWORD);
157+
options.add(BASIC_AUTH_CREDENTIALS_SOURCE);
158+
options.add(BASIC_AUTH_USER_INFO);
159+
options.add(BEARER_AUTH_CREDENTIALS_SOURCE);
160+
options.add(BEARER_AUTH_TOKEN);
138161
return options;
139162
}
140163
}

flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerializationSchema.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.formats.avro.registry.confluent.debezium;
2020

21+
import org.apache.flink.annotation.Internal;
2122
import org.apache.flink.annotation.VisibleForTesting;
2223
import org.apache.flink.api.common.serialization.SerializationSchema;
2324
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
@@ -31,6 +32,9 @@
3132
import org.apache.flink.table.types.DataType;
3233
import org.apache.flink.table.types.logical.RowType;
3334

35+
import javax.annotation.Nullable;
36+
37+
import java.util.Map;
3438
import java.util.Objects;
3539

3640
import static java.lang.String.format;
@@ -40,6 +44,7 @@
4044
* Serialization schema from Flink Table/SQL internal data structure {@link RowData} to Debezium
4145
* Avro.
4246
*/
47+
@Internal
4348
public class DebeziumAvroSerializationSchema implements SerializationSchema<RowData> {
4449
private static final long serialVersionUID = 1L;
4550

@@ -55,6 +60,14 @@ public class DebeziumAvroSerializationSchema implements SerializationSchema<RowD
5560

5661
public DebeziumAvroSerializationSchema(
5762
RowType rowType, String schemaRegistryUrl, String schemaRegistrySubject) {
63+
this(rowType, schemaRegistryUrl, schemaRegistrySubject, null);
64+
}
65+
66+
public DebeziumAvroSerializationSchema(
67+
RowType rowType,
68+
String schemaRegistryUrl,
69+
String schemaRegistrySubject,
70+
@Nullable Map<String, ?> registryConfigs) {
5871
RowType debeziumAvroRowType = createDebeziumAvroRowType(fromLogicalToDataType(rowType));
5972

6073
this.avroSerializer =
@@ -63,7 +76,8 @@ public DebeziumAvroSerializationSchema(
6376
ConfluentRegistryAvroSerializationSchema.forGeneric(
6477
schemaRegistrySubject,
6578
AvroSchemaConverter.convertToSchema(debeziumAvroRowType),
66-
schemaRegistryUrl),
79+
schemaRegistryUrl,
80+
registryConfigs),
6781
RowDataToAvroConverters.createConverter(debeziumAvroRowType));
6882
}
6983

flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroFormatFactoryTest.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,19 @@ public class DebeziumAvroFormatFactoryTest extends TestLogger {
6868
public void testSeDeSchema() {
6969
final Map<String, String> options = getAllOptions();
7070

71+
final Map<String, String> registryConfigs = new HashMap<>();
72+
registryConfigs.put("basic.auth.user.info", "something1");
73+
registryConfigs.put("basic.auth.credentials.source", "something2");
74+
7175
DebeziumAvroDeserializationSchema expectedDeser =
7276
new DebeziumAvroDeserializationSchema(
73-
ROW_TYPE, InternalTypeInfo.of(ROW_TYPE), REGISTRY_URL);
77+
ROW_TYPE, InternalTypeInfo.of(ROW_TYPE), REGISTRY_URL, registryConfigs);
7478
DeserializationSchema<RowData> actualDeser = createDeserializationSchema(options);
7579
assertEquals(expectedDeser, actualDeser);
7680

7781
DebeziumAvroSerializationSchema expectedSer =
78-
new DebeziumAvroSerializationSchema(ROW_TYPE, REGISTRY_URL, SUBJECT);
82+
new DebeziumAvroSerializationSchema(
83+
ROW_TYPE, REGISTRY_URL, SUBJECT, registryConfigs);
7984
SerializationSchema<RowData> actualSer = createSerializationSchema(options);
8085
Assert.assertEquals(expectedSer, actualSer);
8186
}
@@ -89,6 +94,8 @@ private Map<String, String> getAllOptions() {
8994
options.put("format", DebeziumAvroFormatFactory.IDENTIFIER);
9095
options.put("debezium-avro-confluent.schema-registry.url", REGISTRY_URL);
9196
options.put("debezium-avro-confluent.schema-registry.subject", SUBJECT);
97+
options.put("debezium-avro-confluent.basic-auth.user-info", "something1");
98+
options.put("debezium-avro-confluent.basic-auth.credentials-source", "something2");
9299
return options;
93100
}
94101

0 commit comments

Comments
 (0)