diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestKeyRangeLookupTree.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestKeyRangeLookupTree.java index 10232ca81ec27..012d0dfa35910 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestKeyRangeLookupTree.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestKeyRangeLookupTree.java @@ -77,7 +77,7 @@ public void testFileGroupLookUpManyEntriesWithSameStartValue() { } /** - * Tests for many duplicte entries in the tree. + * Tests for many duplicate entries in the tree. */ @Test public void testFileGroupLookUpManyDulicateEntries() { diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 4d453ff2dfb0c..c8a58d5a588b9 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -261,22 +261,22 @@ io.confluent kafka-avro-serializer - 3.0.0 + ${confluent.version} io.confluent common-config - 3.0.0 + ${confluent.version} io.confluent common-utils - 3.0.0 + ${confluent.version} io.confluent kafka-schema-registry-client - 3.0.0 + ${confluent.version} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index 47c4c2f81a790..32ef60967ae8a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -28,8 +28,14 @@ import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Collections; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Obtains latest schema from the Confluent/Kafka schema-registry. @@ -48,19 +54,49 @@ public static class Config { "hoodie.deltastreamer.schemaprovider.registry.targetUrl"; } - private static String fetchSchemaFromRegistry(String registryUrl) throws IOException { - URL registry = new URL(registryUrl); + /** + * The method takes the provided url {@code registryUrl} and gets the schema from the schema registry using that url. + * If the caller provides userInfo credentials in the url (e.g "https://foo:bar@schemaregistry.org") then the credentials + * are extracted the url using the Matcher and the extracted credentials are set on the request as an Authorization + * header. + * @param registryUrl + * @return the Schema in String form. + * @throws IOException + */ + public String fetchSchemaFromRegistry(String registryUrl) throws IOException { + URL registry; + HttpURLConnection connection; + Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl); + if (matcher.find()) { + String creds = matcher.group(1); + String urlWithoutCreds = registryUrl.replace(creds + "@", ""); + registry = new URL(urlWithoutCreds); + connection = (HttpURLConnection) registry.openConnection(); + setAuthorizationHeader(matcher.group(1), connection); + } else { + registry = new URL(registryUrl); + connection = (HttpURLConnection) registry.openConnection(); + } ObjectMapper mapper = new ObjectMapper(); - JsonNode node = mapper.readTree(registry.openStream()); + JsonNode node = mapper.readTree(getStream(connection)); return node.get("schema").asText(); } + protected void setAuthorizationHeader(String creds, HttpURLConnection connection) { + String encodedAuth = Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8)); + connection.setRequestProperty("Authorization", "Basic " + encodedAuth); + } + + protected InputStream getStream(HttpURLConnection connection) throws IOException { + return connection.getInputStream(); + } + public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP)); } - private static Schema getSchema(String registryUrl) throws IOException { + private Schema getSchema(String registryUrl) throws IOException { return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl)); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java index 14c5f01079d88..ad1acd8d9b589 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java @@ -96,7 +96,7 @@ private IndexedRecord createExtendUserRecord() { } /** - * Tests {@link KafkaAvroSchemaDeserializer#deserialize(boolean, String, Boolean, byte[], Schema)}. + * Tests {@link KafkaAvroSchemaDeserializer#deserialize(Boolean, String, Boolean, byte[], Schema)}. */ @Test public void testKafkaAvroSchemaDeserializer() { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java new file mode 100644 index 0000000000000..79769ad03c178 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.schema; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; +import org.apache.hudi.common.config.TypedProperties; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +class TestSchemaRegistryProvider { + + private final String basicAuth = "foo:bar"; + + private final String json = "{\"schema\":\"{\\\"type\\\": \\\"record\\\", \\\"namespace\\\": \\\"example\\\", " + + "\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\": \\\"first\\\", \\\"type\\\": " + + "\\\"string\\\" }]}\"}"; + + private TypedProperties getProps() { + return new TypedProperties() {{ + put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + basicAuth + "@localhost"); + put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value"); + put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost"); + put("hoodie.deltastreamer.source.kafka.topic", "foo"); + }}; + } + + private Schema getExpectedSchema(String response) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(new ByteArrayInputStream(response.getBytes(StandardCharsets.UTF_8))); + return (new Schema.Parser()).parse(node.get("schema").asText()); + } + + private SchemaRegistryProvider getUnderTest(TypedProperties props) throws IOException { + InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); + SchemaRegistryProvider spyUnderTest = Mockito.spy(new SchemaRegistryProvider(props, null)); + Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + return spyUnderTest; + } + + @Test + public void testGetSourceSchemaShouldRequestSchemaWithCreds() throws IOException { + SchemaRegistryProvider spyUnderTest = getUnderTest(getProps()); + Schema actual = spyUnderTest.getSourceSchema(); + assertNotNull(actual); + assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(basicAuth), + Mockito.any(HttpURLConnection.class)); + } + + @Test + public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws IOException { + SchemaRegistryProvider spyUnderTest = getUnderTest(getProps()); + Schema actual = spyUnderTest.getTargetSchema(); + assertNotNull(actual); + assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(basicAuth), + Mockito.any(HttpURLConnection.class)); + } + + @Test + public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException { + TypedProperties props = getProps(); + props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost"); + SchemaRegistryProvider spyUnderTest = getUnderTest(props); + Schema actual = spyUnderTest.getSourceSchema(); + assertNotNull(actual); + assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any()); + } + + @Test + public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException { + TypedProperties props = getProps(); + props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost"); + SchemaRegistryProvider spyUnderTest = getUnderTest(props); + Schema actual = spyUnderTest.getTargetSchema(); + assertNotNull(actual); + assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any()); + } +} \ No newline at end of file diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml index 51fe1433a8121..4fff5258fba01 100644 --- a/packaging/hudi-integ-test-bundle/pom.xml +++ b/packaging/hudi-integ-test-bundle/pom.xml @@ -615,25 +615,25 @@ io.confluent kafka-avro-serializer - 3.0.0 + ${confluent.version} io.confluent common-config - 3.0.0 + ${confluent.version} io.confluent common-utils - 3.0.0 + ${confluent.version} io.confluent kafka-schema-registry-client - 3.0.0 + ${confluent.version} diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index e884e6dc29a86..2c3699240a6be 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -93,6 +93,7 @@ com.twitter:bijection-avro_${scala.binary.version} com.twitter:bijection-core_${scala.binary.version} io.confluent:kafka-avro-serializer + io.confluent:kafka-schema-serializer io.confluent:common-config io.confluent:common-utils io.confluent:kafka-schema-registry-client diff --git a/pom.xml b/pom.xml index c1b4a9940f28b..e479401e592ee 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,7 @@ 2.7.4 2.10.0 2.0.0 + 5.3.4 2.17 1.10.1 5.7.0-M1