diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index 68ce449394b5..5e3680a74eae 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -85,6 +85,9 @@ dependencies { // "kafka-clients" has to be provided since user can use its own version. exclude group: "org.apache.kafka", module: "kafka-clients" } + // everit_json is needed for Kafka Read SchemaTransform tests that rely on JSON-schema translation. + permitUnusedDeclared library.java.everit_json_schema + provided library.java.everit_json_schema testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation project(":sdks:java:io:synthetic") testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java similarity index 88% rename from sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java rename to sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java index 9280de6c13f6..b930e7baa460 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadConfiguration.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java @@ -36,7 +36,7 @@ @Experimental @DefaultSchema(AutoValueSchema.class) @AutoValue -public abstract class KafkaSchemaTransformReadConfiguration { +public abstract class KafkaReadSchemaTransformConfiguration { public static final Set VALID_START_OFFSET_VALUES = Sets.newHashSet("earliest", "latest"); public static final Set VALID_DATA_FORMATS = Sets.newHashSet("AVRO", "JSON"); @@ -50,9 +50,9 @@ public void validate() { : "Valid data formats are " + 
VALID_DATA_FORMATS; } - /** Instantiates a {@link KafkaSchemaTransformReadConfiguration.Builder} instance. */ + /** Instantiates a {@link KafkaReadSchemaTransformConfiguration.Builder} instance. */ public static Builder builder() { - return new AutoValue_KafkaSchemaTransformReadConfiguration.Builder(); + return new AutoValue_KafkaReadSchemaTransformConfiguration.Builder(); } /** Sets the bootstrap servers for the Kafka consumer. */ @@ -69,7 +69,7 @@ public static Builder builder() { public abstract String getConfluentSchemaRegistrySubject(); @Nullable - public abstract String getAvroSchema(); + public abstract String getSchema(); @Nullable public abstract String getAutoOffsetResetConfig(); @@ -80,7 +80,7 @@ public static Builder builder() { /** Sets the topic from which to read. */ public abstract String getTopic(); - /** Builder for the {@link KafkaSchemaTransformReadConfiguration}. */ + /** Builder for the {@link KafkaReadSchemaTransformConfiguration}. */ @AutoValue.Builder public abstract static class Builder { @@ -91,7 +91,7 @@ public abstract static class Builder { public abstract Builder setConfluentSchemaRegistrySubject(String subject); - public abstract Builder setAvroSchema(String schema); + public abstract Builder setSchema(String schema); public abstract Builder setDataFormat(String dataFormat); @@ -102,7 +102,7 @@ public abstract static class Builder { /** Sets the topic from which to read. */ public abstract Builder setTopic(String value); - /** Builds a {@link KafkaSchemaTransformReadConfiguration} instance. */ - public abstract KafkaSchemaTransformReadConfiguration build(); + /** Builds a {@link KafkaReadSchemaTransformConfiguration} instance. 
*/ + public abstract KafkaReadSchemaTransformConfiguration build(); } } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java similarity index 86% rename from sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java rename to sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index a13c54c22aa4..5c84922db201 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -19,6 +19,7 @@ import com.google.auto.service.AutoService; import java.util.List; +import java.util.Objects; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.schemas.Schema; @@ -27,6 +28,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.utils.JsonUtils; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -41,22 +43,22 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; @AutoService(SchemaTransformProvider.class) -public class KafkaSchemaTransformReadProvider - extends TypedSchemaTransformProvider { +public class KafkaReadSchemaTransformProvider + extends TypedSchemaTransformProvider { @Override - protected Class configurationClass() { - return KafkaSchemaTransformReadConfiguration.class; + protected Class configurationClass() { + return KafkaReadSchemaTransformConfiguration.class; } @Override - protected 
SchemaTransform from(KafkaSchemaTransformReadConfiguration configuration) { + protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) { return new KafkaReadSchemaTransform(configuration); } @Override public String identifier() { - return "kafka:read"; + return "beam:schematransform:org.apache.beam:kafka_read:v1"; } @Override @@ -70,29 +72,33 @@ public List outputCollectionNames() { } private static class KafkaReadSchemaTransform implements SchemaTransform { - private final KafkaSchemaTransformReadConfiguration configuration; + private final KafkaReadSchemaTransformConfiguration configuration; - KafkaReadSchemaTransform(KafkaSchemaTransformReadConfiguration configuration) { + KafkaReadSchemaTransform(KafkaReadSchemaTransformConfiguration configuration) { configuration.validate(); this.configuration = configuration; } @Override public PTransform buildTransform() { - final String avroSchema = configuration.getAvroSchema(); + final String inputSchema = configuration.getSchema(); final Integer groupId = configuration.hashCode() % Integer.MAX_VALUE; final String autoOffsetReset = configuration.getAutoOffsetResetConfig() == null ? "latest" : configuration.getAutoOffsetResetConfig(); - if (avroSchema != null) { + if (inputSchema != null) { assert configuration.getConfluentSchemaRegistryUrl() == null : "To read from Kafka, a schema must be provided directly or through Confluent " + "Schema Registry, but not both."; final Schema beamSchema = - AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(avroSchema)); + Objects.equals(configuration.getDataFormat(), "JSON") + ? JsonUtils.beamSchemaFromJsonSchema(inputSchema) + : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema)); SerializableFunction valueMapper = - AvroUtils.getAvroBytesToRowFunction(beamSchema); + Objects.equals(configuration.getDataFormat(), "JSON") + ? 
JsonUtils.getJsonBytesToRowFunction(beamSchema) + : AvroUtils.getAvroBytesToRowFunction(beamSchema); return new PTransform() { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java similarity index 66% rename from sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java rename to sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index c8f76ecf3cc6..0ac42c38462e 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaSchemaTransformReadProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -20,20 +20,24 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Objects; import java.util.ServiceLoader; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Tests for {@link KafkaSchemaTransformReadProvider}. */ +/** Tests for {@link KafkaReadSchemaTransformProvider}. 
*/ @RunWith(JUnit4.class) -public class KafkaSchemaTransformReadProviderTest { +public class KafkaReadSchemaTransformProviderTest { private static final String AVRO_SCHEMA = "{\"type\":\"record\",\"namespace\":\"com.example\"," + "\"name\":\"FullName\",\"fields\":[{\"name\":\"first\",\"type\":\"string\"}," @@ -44,7 +48,7 @@ public void testValidConfigurations() { assertThrows( AssertionError.class, () -> { - KafkaSchemaTransformReadConfiguration.builder() + KafkaReadSchemaTransformConfiguration.builder() .setDataFormat("UNUSUAL_FORMAT") .setTopic("a_valid_topic") .setBootstrapServers("a_valid_server") @@ -55,7 +59,7 @@ public void testValidConfigurations() { assertThrows( IllegalStateException.class, () -> { - KafkaSchemaTransformReadConfiguration.builder() + KafkaReadSchemaTransformConfiguration.builder() .setDataFormat("UNUSUAL_FORMAT") // .setTopic("a_valid_topic") // Topic is mandatory .setBootstrapServers("a_valid_server") @@ -66,7 +70,7 @@ public void testValidConfigurations() { assertThrows( IllegalStateException.class, () -> { - KafkaSchemaTransformReadConfiguration.builder() + KafkaReadSchemaTransformConfiguration.builder() .setDataFormat("UNUSUAL_FORMAT") .setTopic("a_valid_topic") // .setBootstrapServers("a_valid_server") // Bootstrap server is mandatory @@ -81,7 +85,7 @@ public void testFindTransformAndMakeItWork() { ServiceLoader.load(SchemaTransformProvider.class); List providers = StreamSupport.stream(serviceLoader.spliterator(), false) - .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class) + .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class) .collect(Collectors.toList()); SchemaTransformProvider kafkaProvider = providers.get(0); assertEquals(kafkaProvider.outputCollectionNames(), Lists.newArrayList("OUTPUT")); @@ -91,7 +95,7 @@ public void testFindTransformAndMakeItWork() { Sets.newHashSet( "bootstrapServers", "topic", - "avroSchema", + "schema", "autoOffsetResetConfig", 
"consumerConfigUpdates", "dataFormat", @@ -108,16 +112,43 @@ public void testBuildTransformWithAvroSchema() { ServiceLoader.load(SchemaTransformProvider.class); List providers = StreamSupport.stream(serviceLoader.spliterator(), false) - .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class) + .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class) .collect(Collectors.toList()); - KafkaSchemaTransformReadProvider kafkaProvider = - (KafkaSchemaTransformReadProvider) providers.get(0); + KafkaReadSchemaTransformProvider kafkaProvider = + (KafkaReadSchemaTransformProvider) providers.get(0); kafkaProvider .from( - KafkaSchemaTransformReadConfiguration.builder() + KafkaReadSchemaTransformConfiguration.builder() .setTopic("anytopic") .setBootstrapServers("anybootstrap") - .setAvroSchema(AVRO_SCHEMA) + .setSchema(AVRO_SCHEMA) + .build()) + .buildTransform(); + } + + @Test + public void testBuildTransformWithJsonSchema() throws IOException { + ServiceLoader serviceLoader = + ServiceLoader.load(SchemaTransformProvider.class); + List providers = + StreamSupport.stream(serviceLoader.spliterator(), false) + .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class) + .collect(Collectors.toList()); + KafkaReadSchemaTransformProvider kafkaProvider = + (KafkaReadSchemaTransformProvider) providers.get(0); + kafkaProvider + .from( + KafkaReadSchemaTransformConfiguration.builder() + .setTopic("anytopic") + .setBootstrapServers("anybootstrap") + .setDataFormat("JSON") + .setSchema( + new String( + ByteStreams.toByteArray( + Objects.requireNonNull( + getClass() + .getResourceAsStream("/json-schema/basic_json_schema.json"))), + StandardCharsets.UTF_8)) .build()) .buildTransform(); }