From b0bd45ffe4449a7f1f98b696f33ea8387d7ec165 Mon Sep 17 00:00:00 2001 From: rboyle Date: Thu, 17 Jun 2021 00:31:09 +0100 Subject: [PATCH 01/13] adding support for basic auth with confluent cloud schema registry --- hudi-utilities/pom.xml | 8 +- .../deser/KafkaAvroSchemaDeserializer.java | 4 +- .../schema/SchemaRegistryProvider.java | 28 +++++- .../TestKafkaAvroSchemaDeserializer.java | 8 +- .../schema/SchemaRegistryProviderTest.java | 97 +++++++++++++++++++ packaging/hudi-integ-test-bundle/pom.xml | 8 +- 6 files changed, 135 insertions(+), 18 deletions(-) create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 4d453ff2dfb0c..ad1887dae5490 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -261,22 +261,22 @@ io.confluent kafka-avro-serializer - 3.0.0 + 6.1.1 io.confluent common-config - 3.0.0 + 6.1.1 io.confluent common-utils - 3.0.0 + 6.1.1 io.confluent kafka-schema-registry-client - 3.0.0 + 6.1.1 diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java index 5d0a116cab945..b0d0d08e73128 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java @@ -62,7 +62,6 @@ public void configure(Map configs, boolean isKey) { /** * We need to inject sourceSchema instead of reader schema during deserialization or later stages of the pipeline. 
* - * @param includeSchemaAndVersion * @param topic * @param isKey * @param payload @@ -72,13 +71,12 @@ public void configure(Map configs, boolean isKey) { */ @Override protected Object deserialize( - boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException { - return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema); + return super.deserialize(topic, isKey, payload, sourceSchema); } protected TypedProperties getConvertToTypedProperties(Map configs) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index 47c4c2f81a790..80e5752780e49 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -18,6 +18,8 @@ package org.apache.hudi.utilities.schema; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringUtils; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.exception.HoodieIOException; @@ -28,8 +30,13 @@ import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Obtains latest schema from the Confluent/Kafka schema-registry. 
@@ -48,19 +55,34 @@ public static class Config { "hoodie.deltastreamer.schemaprovider.registry.targetUrl"; } - private static String fetchSchemaFromRegistry(String registryUrl) throws IOException { + public String fetchSchemaFromRegistry(String registryUrl) throws IOException { URL registry = new URL(registryUrl); + HttpURLConnection connection = (HttpURLConnection) registry.openConnection(); + Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl); + if (matcher.find()) { + setAuthorizationHeader(matcher.group(1), connection); + } ObjectMapper mapper = new ObjectMapper(); - JsonNode node = mapper.readTree(registry.openStream()); + InputStream is = getStream(connection); + JsonNode node = mapper.readTree(is); return node.get("schema").asText(); } + public void setAuthorizationHeader(String creds, HttpURLConnection connection) { + byte[] encodedAuth = Base64.encodeBase64(creds.getBytes(StandardCharsets.UTF_8)); + connection.setRequestProperty("Authorization", "Basic " + new String(encodedAuth)); + } + + public InputStream getStream(HttpURLConnection connection) throws IOException { + return connection.getInputStream(); + } + public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) { super(props, jssc); DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP)); } - private static Schema getSchema(String registryUrl) throws IOException { + private Schema getSchema(String registryUrl) throws IOException { return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl)); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java index 14c5f01079d88..71b88edc0f35a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java +++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java @@ -96,7 +96,7 @@ private IndexedRecord createExtendUserRecord() { } /** - * Tests {@link KafkaAvroSchemaDeserializer#deserialize(boolean, String, Boolean, byte[], Schema)}. + * Tests {@link KafkaAvroSchemaDeserializer#deserialize(String, Boolean, byte[], Schema)}. */ @Test public void testKafkaAvroSchemaDeserializer() { @@ -107,7 +107,7 @@ public void testKafkaAvroSchemaDeserializer() { avroDeserializer.configure(new HashMap(defaultConfig), false); bytesOrigRecord = avroSerializer.serialize(topic, avroRecord); // record is serialized in orig schema and deserialized using same schema. - assertEquals(avroRecord, avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema)); + assertEquals(avroRecord, avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema)); IndexedRecord avroRecordWithAllField = createExtendUserRecord(); byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField); @@ -116,12 +116,12 @@ public void testKafkaAvroSchemaDeserializer() { avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(defaultConfig)); avroDeserializer.configure(new HashMap(defaultConfig), false); // record is serialized w/ evolved schema, and deserialized w/ evolved schema - IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesExtendedRecord, evolSchema); + IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesExtendedRecord, evolSchema); assertEquals(avroRecordWithAllField, avroRecordWithAllFieldActual); assertEquals(avroRecordWithAllFieldActual.getSchema(), evolSchema); // read old record w/ evolved schema. 
- IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema); + IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema); // record won't be equal to original record as we read w/ evolved schema. "age" will be added w/ default value of null assertNotEquals(avroRecord, actualRec); GenericRecord genericRecord = (GenericRecord) actualRec; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java new file mode 100644 index 0000000000000..cab0a62ad1cc1 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java @@ -0,0 +1,97 @@ +package org.apache.hudi.utilities.schema; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; +import org.apache.hudi.common.config.TypedProperties; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +class SchemaRegistryProviderTest { + + String basicAuth = "foo:bar"; + + private final String json = "{\"schema\":\"{\\\"type\\\": \\\"record\\\", \\\"namespace\\\": \\\"example\\\", " + + "\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\": \\\"first\\\", \\\"type\\\": " + + "\\\"string\\\" }]}\"}"; + + private TypedProperties getProps() { + return new TypedProperties() {{ + put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + basicAuth + 
"@localhost"); + put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value"); + put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost"); + put("hoodie.deltastreamer.source.kafka.topic", "foo"); + }}; + } + + Schema getExpectedSchema(String response) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(new ByteArrayInputStream(response.getBytes(StandardCharsets.UTF_8))); + return (new Schema.Parser()).parse(node.get("schema").asText()); + } + + @Test + public void testGetSourceSchemaShouldRequestSchemaWithCreds() throws IOException { + InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); + SchemaRegistryProvider underTest = new SchemaRegistryProvider(getProps(), null); + SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); + Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + Schema actual = spyUnderTest.getSourceSchema(); + assertNotNull(actual); + assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(basicAuth), + Mockito.any(HttpURLConnection.class)); + } + + @Test + public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws IOException { + InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); + SchemaRegistryProvider underTest = new SchemaRegistryProvider(getProps(), null); + SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); + Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + Schema actual = spyUnderTest.getTargetSchema(); + assertNotNull(actual); + assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(basicAuth), + Mockito.any(HttpURLConnection.class)); + } + + @Test + public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException { + TypedProperties props = getProps(); + props.put("hoodie.deltastreamer.schemaprovider.registry.url", 
"http://localhost"); + InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); + SchemaRegistryProvider underTest = new SchemaRegistryProvider(props, null); + SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); + Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + Schema actual = spyUnderTest.getSourceSchema(); + assertNotNull(actual); + assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any()); + } + + @Test + public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException { + TypedProperties props = getProps(); + props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost"); + InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); + SchemaRegistryProvider underTest = new SchemaRegistryProvider(props, null); + SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); + Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + Schema actual = spyUnderTest.getTargetSchema(); + assertNotNull(actual); + assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any()); + } +} \ No newline at end of file diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml index 51fe1433a8121..adca3a7734dad 100644 --- a/packaging/hudi-integ-test-bundle/pom.xml +++ b/packaging/hudi-integ-test-bundle/pom.xml @@ -615,25 +615,25 @@ io.confluent kafka-avro-serializer - 3.0.0 + 6.1.1 io.confluent common-config - 3.0.0 + 6.1.1 io.confluent common-utils - 3.0.0 + 6.1.1 io.confluent kafka-schema-registry-client - 3.0.0 + 6.1.1 From 4e29d4f8876a5a1bc56b19b8df5a291f6e099ba5 Mon Sep 17 00:00:00 2001 From: rboyle Date: Thu, 17 Jun 2021 01:33:23 +0100 Subject: [PATCH 02/13] adding license --- .../schema/SchemaRegistryProviderTest.java | 18 ++++++++++++++++++ 1 file changed, 18 
insertions(+) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java index cab0a62ad1cc1..be31168974a70 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hudi.utilities.schema; import com.fasterxml.jackson.databind.JsonNode; From 7f9268b5643c654879fb9b843cf7c2fa603693f5 Mon Sep 17 00:00:00 2001 From: rboyle Date: Thu, 24 Jun 2021 10:19:58 +0100 Subject: [PATCH 03/13] fixing indentation and illegal imports --- .../schema/SchemaRegistryProvider.java | 7 +- .../schema/SchemaRegistryProviderTest.java | 137 +++++++++--------- 2 files changed, 72 insertions(+), 72 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index 80e5752780e49..9d2027754f1d9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -18,8 +18,6 @@ package org.apache.hudi.utilities.schema; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.lang3.StringUtils; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.exception.HoodieIOException; @@ -34,6 +32,7 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Collections; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -69,8 +68,8 @@ public String fetchSchemaFromRegistry(String registryUrl) throws IOException { } public void setAuthorizationHeader(String creds, HttpURLConnection connection) { - byte[] encodedAuth = Base64.encodeBase64(creds.getBytes(StandardCharsets.UTF_8)); - connection.setRequestProperty("Authorization", "Basic " + new String(encodedAuth)); + String encodedAuth = Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8)); + connection.setRequestProperty("Authorization", "Basic " + encodedAuth); } public InputStream getStream(HttpURLConnection connection) 
throws IOException { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java index be31168974a70..06c200015e816 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java @@ -31,85 +31,86 @@ import java.net.HttpURLConnection; import java.nio.charset.StandardCharsets; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; class SchemaRegistryProviderTest { - String basicAuth = "foo:bar"; + private final String basicAuth = "foo:bar"; - private final String json = "{\"schema\":\"{\\\"type\\\": \\\"record\\\", \\\"namespace\\\": \\\"example\\\", " + - "\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\": \\\"first\\\", \\\"type\\\": " + - "\\\"string\\\" }]}\"}"; + private final String json = "{\"schema\":\"{\\\"type\\\": \\\"record\\\", \\\"namespace\\\": \\\"example\\\", " + + "\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\": \\\"first\\\", \\\"type\\\": " + + "\\\"string\\\" }]}\"}"; - private TypedProperties getProps() { - return new TypedProperties() {{ - put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + basicAuth + "@localhost"); - put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value"); - put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost"); - put("hoodie.deltastreamer.source.kafka.topic", "foo"); - }}; - } + private TypedProperties getProps() { + return new TypedProperties() {{ + 
put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + basicAuth + "@localhost"); + put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value"); + put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost"); + put("hoodie.deltastreamer.source.kafka.topic", "foo"); + }}; + } - Schema getExpectedSchema(String response) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - JsonNode node = mapper.readTree(new ByteArrayInputStream(response.getBytes(StandardCharsets.UTF_8))); - return (new Schema.Parser()).parse(node.get("schema").asText()); - } + Schema getExpectedSchema(String response) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(new ByteArrayInputStream(response.getBytes(StandardCharsets.UTF_8))); + return (new Schema.Parser()).parse(node.get("schema").asText()); + } - @Test - public void testGetSourceSchemaShouldRequestSchemaWithCreds() throws IOException { - InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); - SchemaRegistryProvider underTest = new SchemaRegistryProvider(getProps(), null); - SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); - Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); - Schema actual = spyUnderTest.getSourceSchema(); - assertNotNull(actual); - assertEquals(actual, getExpectedSchema(json)); - verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(basicAuth), - Mockito.any(HttpURLConnection.class)); - } + @Test + public void testGetSourceSchemaShouldRequestSchemaWithCreds() throws IOException { + InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); + SchemaRegistryProvider underTest = new SchemaRegistryProvider(getProps(), null); + SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); + Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + Schema actual = spyUnderTest.getSourceSchema(); + assertNotNull(actual); + 
assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(basicAuth), + Mockito.any(HttpURLConnection.class)); + } - @Test - public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws IOException { - InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); - SchemaRegistryProvider underTest = new SchemaRegistryProvider(getProps(), null); - SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); - Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); - Schema actual = spyUnderTest.getTargetSchema(); - assertNotNull(actual); - assertEquals(actual, getExpectedSchema(json)); - verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(basicAuth), - Mockito.any(HttpURLConnection.class)); - } + @Test + public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws IOException { + InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); + SchemaRegistryProvider underTest = new SchemaRegistryProvider(getProps(), null); + SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); + Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + Schema actual = spyUnderTest.getTargetSchema(); + assertNotNull(actual); + assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(1)).setAuthorizationHeader(eq(basicAuth), + Mockito.any(HttpURLConnection.class)); + } - @Test - public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException { - TypedProperties props = getProps(); - props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost"); - InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); - SchemaRegistryProvider underTest = new SchemaRegistryProvider(props, null); - SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); - Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); - Schema actual = spyUnderTest.getSourceSchema(); - 
assertNotNull(actual); - assertEquals(actual, getExpectedSchema(json)); - verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any()); - } + @Test + public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException { + TypedProperties props = getProps(); + props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost"); + InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); + SchemaRegistryProvider underTest = new SchemaRegistryProvider(props, null); + SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); + Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + Schema actual = spyUnderTest.getSourceSchema(); + assertNotNull(actual); + assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any()); + } - @Test - public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException { - TypedProperties props = getProps(); - props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost"); - InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); - SchemaRegistryProvider underTest = new SchemaRegistryProvider(props, null); - SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); - Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); - Schema actual = spyUnderTest.getTargetSchema(); - assertNotNull(actual); - assertEquals(actual, getExpectedSchema(json)); - verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any()); - } + @Test + public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException { + TypedProperties props = getProps(); + props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost"); + InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); + SchemaRegistryProvider underTest = new SchemaRegistryProvider(props, null); 
+ SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); + Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + Schema actual = spyUnderTest.getTargetSchema(); + assertNotNull(actual); + assertEquals(actual, getExpectedSchema(json)); + verify(spyUnderTest, times(0)).setAuthorizationHeader(Mockito.any(), Mockito.any()); + } } \ No newline at end of file From 686539191a516b704e43e4a5edc1d3e353285371 Mon Sep 17 00:00:00 2001 From: rboyle Date: Thu, 24 Jun 2021 11:29:43 +0100 Subject: [PATCH 04/13] fixing indentation once again --- .../utilities/schema/SchemaRegistryProviderTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java index 06c200015e816..4f7eb7237c247 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java @@ -47,11 +47,11 @@ class SchemaRegistryProviderTest { private TypedProperties getProps() { return new TypedProperties() {{ - put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + basicAuth + "@localhost"); - put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value"); - put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost"); - put("hoodie.deltastreamer.source.kafka.topic", "foo"); - }}; + put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + basicAuth + "@localhost"); + put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value"); + put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost"); + put("hoodie.deltastreamer.source.kafka.topic", "foo"); + }}; } Schema getExpectedSchema(String response) throws IOException { From 
3e3d2c2b01fbfbf0d1906d647187cb8313f3ca94 Mon Sep 17 00:00:00 2001 From: rboyle Date: Thu, 24 Jun 2021 14:14:26 +0100 Subject: [PATCH 05/13] making changes work with password with / in it --- .../hudi/utilities/schema/SchemaRegistryProvider.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index 9d2027754f1d9..57bad44baf9fe 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -55,11 +55,18 @@ public static class Config { } public String fetchSchemaFromRegistry(String registryUrl) throws IOException { - URL registry = new URL(registryUrl); - HttpURLConnection connection = (HttpURLConnection) registry.openConnection(); + URL registry; + HttpURLConnection connection; Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl); if (matcher.find()) { + String creds = matcher.group(1); + String urlWithoutCreds = registryUrl.replace(creds + "@", ""); + registry = new URL(urlWithoutCreds); + connection = (HttpURLConnection) registry.openConnection(); setAuthorizationHeader(matcher.group(1), connection); + } else { + registry = new URL(registryUrl); + connection = (HttpURLConnection) registry.openConnection(); } ObjectMapper mapper = new ObjectMapper(); InputStream is = getStream(connection); From 5cfa5f10483f81338814037045dc8974cd3a50f8 Mon Sep 17 00:00:00 2001 From: rboyle Date: Thu, 24 Jun 2021 14:51:16 +0100 Subject: [PATCH 06/13] including kafka-schema-serializer in bundle --- packaging/hudi-utilities-bundle/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index e884e6dc29a86..2c3699240a6be 100644 --- 
a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -93,6 +93,7 @@ com.twitter:bijection-avro_${scala.binary.version} com.twitter:bijection-core_${scala.binary.version} io.confluent:kafka-avro-serializer + io.confluent:kafka-schema-serializer io.confluent:common-config io.confluent:common-utils io.confluent:kafka-schema-registry-client From 75d4a7114e7541fd23297a0c4aab381327da5a69 Mon Sep 17 00:00:00 2001 From: rboyle Date: Thu, 24 Jun 2021 18:48:45 +0100 Subject: [PATCH 07/13] bumping avro version for compatibility with new schema registry version --- .../apache/hudi/utilities/schema/SchemaRegistryProvider.java | 3 +-- pom.xml | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index 57bad44baf9fe..dcefc7a8dbd6a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -69,8 +69,7 @@ public String fetchSchemaFromRegistry(String registryUrl) throws IOException { connection = (HttpURLConnection) registry.openConnection(); } ObjectMapper mapper = new ObjectMapper(); - InputStream is = getStream(connection); - JsonNode node = mapper.readTree(is); + JsonNode node = mapper.readTree(getStream(connection)); return node.get("schema").asText(); } diff --git a/pom.xml b/pom.xml index c1b4a9940f28b..21759834a37a4 100644 --- a/pom.xml +++ b/pom.xml @@ -115,7 +115,7 @@ 3 hudi-spark2 - 1.8.2 + 1.10.0 2.11.12 2.12.10 ${scala11.version} From 10c79acd28075b0e7b7351e5ffb1378c522527aa Mon Sep 17 00:00:00 2001 From: rboyle Date: Thu, 24 Jun 2021 20:46:39 +0100 Subject: [PATCH 08/13] reverting schema registry to remain on avro 1.8.2 as per the current version used by hudi --- hudi-utilities/pom.xml | 8 ++++---- 
.../hudi/utilities/deser/KafkaAvroSchemaDeserializer.java | 4 +++- .../utilities/deser/TestKafkaAvroSchemaDeserializer.java | 8 ++++---- packaging/hudi-integ-test-bundle/pom.xml | 8 ++++---- pom.xml | 2 +- 5 files changed, 16 insertions(+), 14 deletions(-) diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index ad1887dae5490..1e3d0227e6f7a 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -261,22 +261,22 @@ io.confluent kafka-avro-serializer - 6.1.1 + 5.3.4 io.confluent common-config - 6.1.1 + 5.3.4 io.confluent common-utils - 6.1.1 + 5.3.4 io.confluent kafka-schema-registry-client - 6.1.1 + 5.3.4 diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java index b0d0d08e73128..5d0a116cab945 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java @@ -62,6 +62,7 @@ public void configure(Map configs, boolean isKey) { /** * We need to inject sourceSchema instead of reader schema during deserialization or later stages of the pipeline. 
* + * @param includeSchemaAndVersion * @param topic * @param isKey * @param payload @@ -71,12 +72,13 @@ public void configure(Map configs, boolean isKey) { */ @Override protected Object deserialize( + boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException { - return super.deserialize(topic, isKey, payload, sourceSchema); + return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema); } protected TypedProperties getConvertToTypedProperties(Map configs) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java index 71b88edc0f35a..ad1acd8d9b589 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java @@ -96,7 +96,7 @@ private IndexedRecord createExtendUserRecord() { } /** - * Tests {@link KafkaAvroSchemaDeserializer#deserialize(String, Boolean, byte[], Schema)}. + * Tests {@link KafkaAvroSchemaDeserializer#deserialize(Boolean, String, Boolean, byte[], Schema)}. */ @Test public void testKafkaAvroSchemaDeserializer() { @@ -107,7 +107,7 @@ public void testKafkaAvroSchemaDeserializer() { avroDeserializer.configure(new HashMap(defaultConfig), false); bytesOrigRecord = avroSerializer.serialize(topic, avroRecord); // record is serialized in orig schema and deserialized using same schema. 
- assertEquals(avroRecord, avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema)); + assertEquals(avroRecord, avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema)); IndexedRecord avroRecordWithAllField = createExtendUserRecord(); byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField); @@ -116,12 +116,12 @@ public void testKafkaAvroSchemaDeserializer() { avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(defaultConfig)); avroDeserializer.configure(new HashMap(defaultConfig), false); // record is serialized w/ evolved schema, and deserialized w/ evolved schema - IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesExtendedRecord, evolSchema); + IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesExtendedRecord, evolSchema); assertEquals(avroRecordWithAllField, avroRecordWithAllFieldActual); assertEquals(avroRecordWithAllFieldActual.getSchema(), evolSchema); // read old record w/ evolved schema. - IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema); + IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema); // record won't be equal to original record as we read w/ evolved schema. 
"age" will be added w/ default value of null assertNotEquals(avroRecord, actualRec); GenericRecord genericRecord = (GenericRecord) actualRec; diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml index adca3a7734dad..f630670db9d3f 100644 --- a/packaging/hudi-integ-test-bundle/pom.xml +++ b/packaging/hudi-integ-test-bundle/pom.xml @@ -615,25 +615,25 @@ io.confluent kafka-avro-serializer - 6.1.1 + 5.3.4 io.confluent common-config - 6.1.1 + 5.3.4 io.confluent common-utils - 6.1.1 + 5.3.4 io.confluent kafka-schema-registry-client - 6.1.1 + 5.3.4 diff --git a/pom.xml b/pom.xml index 21759834a37a4..c1b4a9940f28b 100644 --- a/pom.xml +++ b/pom.xml @@ -115,7 +115,7 @@ 3 hudi-spark2 - 1.10.0 + 1.8.2 2.11.12 2.12.10 ${scala11.version} From 8ff24679b6e2b7c31013b149b2b3663b4b59f873 Mon Sep 17 00:00:00 2001 From: rboyle Date: Tue, 29 Jun 2021 13:19:38 +0100 Subject: [PATCH 09/13] addressing PR comments --- .../schema/SchemaRegistryProvider.java | 13 +++++++-- .../schema/SchemaRegistryProviderTest.java | 29 ++++++++----------- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index dcefc7a8dbd6a..2c4d9eb52245e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -54,6 +54,15 @@ public static class Config { "hoodie.deltastreamer.schemaprovider.registry.targetUrl"; } + /** + * The method takes the provided url {@code registryUrl} and gets the schema from the schema registry using that url. 
+ * If the caller provides userInfo credentials in the url (e.g "https://foo:bar@schemaregistry.org") then the credentials + * are extracted the url using the Matcher and the extracted credentials are set on the request as an Authorization + * header. + * @param registryUrl + * @return the schema in String form + * @throws IOException + */ public String fetchSchemaFromRegistry(String registryUrl) throws IOException { URL registry; HttpURLConnection connection; @@ -73,12 +82,12 @@ public String fetchSchemaFromRegistry(String registryUrl) throws IOException { return node.get("schema").asText(); } - public void setAuthorizationHeader(String creds, HttpURLConnection connection) { + void setAuthorizationHeader(String creds, HttpURLConnection connection) { String encodedAuth = Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8)); connection.setRequestProperty("Authorization", "Basic " + encodedAuth); } - public InputStream getStream(HttpURLConnection connection) throws IOException { + InputStream getStream(HttpURLConnection connection) throws IOException { return connection.getInputStream(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java index 4f7eb7237c247..e2d61e668a72a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java @@ -54,18 +54,22 @@ private TypedProperties getProps() { }}; } - Schema getExpectedSchema(String response) throws IOException { + private Schema getExpectedSchema(String response) throws IOException { ObjectMapper mapper = new ObjectMapper(); JsonNode node = mapper.readTree(new ByteArrayInputStream(response.getBytes(StandardCharsets.UTF_8))); return (new Schema.Parser()).parse(node.get("schema").asText()); } - @Test - 
public void testGetSourceSchemaShouldRequestSchemaWithCreds() throws IOException { + private SchemaRegistryProvider getUnderTest(TypedProperties props) throws IOException { InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); - SchemaRegistryProvider underTest = new SchemaRegistryProvider(getProps(), null); - SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); + SchemaRegistryProvider spyUnderTest = Mockito.spy(new SchemaRegistryProvider(props, null)); Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + return spyUnderTest; + } + + @Test + public void testGetSourceSchemaShouldRequestSchemaWithCreds() throws IOException { + SchemaRegistryProvider spyUnderTest = getUnderTest(getProps()); Schema actual = spyUnderTest.getSourceSchema(); assertNotNull(actual); assertEquals(actual, getExpectedSchema(json)); @@ -75,10 +79,7 @@ public void testGetSourceSchemaShouldRequestSchemaWithCreds() throws IOException @Test public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws IOException { - InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); - SchemaRegistryProvider underTest = new SchemaRegistryProvider(getProps(), null); - SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); - Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + SchemaRegistryProvider spyUnderTest = getUnderTest(getProps()); Schema actual = spyUnderTest.getTargetSchema(); assertNotNull(actual); assertEquals(actual, getExpectedSchema(json)); @@ -90,10 +91,7 @@ public void testGetTargetSchemaShouldRequestSchemaWithCreds() throws IOException public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException { TypedProperties props = getProps(); props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost"); - InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); - SchemaRegistryProvider underTest = new SchemaRegistryProvider(props, 
null); - SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); - Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + SchemaRegistryProvider spyUnderTest = getUnderTest(props); Schema actual = spyUnderTest.getSourceSchema(); assertNotNull(actual); assertEquals(actual, getExpectedSchema(json)); @@ -104,10 +102,7 @@ public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOExcept public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException { TypedProperties props = getProps(); props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost"); - InputStream is = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)); - SchemaRegistryProvider underTest = new SchemaRegistryProvider(props, null); - SchemaRegistryProvider spyUnderTest = Mockito.spy(underTest); - Mockito.doReturn(is).when(spyUnderTest).getStream(Mockito.any()); + SchemaRegistryProvider spyUnderTest = getUnderTest(props); Schema actual = spyUnderTest.getTargetSchema(); assertNotNull(actual); assertEquals(actual, getExpectedSchema(json)); From 3d16c8db9d63986ecf9cba5836cf4584a4d52bb2 Mon Sep 17 00:00:00 2001 From: rboyle Date: Tue, 29 Jun 2021 15:23:45 +0100 Subject: [PATCH 10/13] changing accessors to protected so mockito does not complain --- .../apache/hudi/utilities/schema/SchemaRegistryProvider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index 2c4d9eb52245e..413a75fda6b8a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -82,12 +82,12 @@ public String fetchSchemaFromRegistry(String registryUrl) throws IOException { return node.get("schema").asText(); } - void 
setAuthorizationHeader(String creds, HttpURLConnection connection) { + protected void setAuthorizationHeader(String creds, HttpURLConnection connection) { String encodedAuth = Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8)); connection.setRequestProperty("Authorization", "Basic " + encodedAuth); } - InputStream getStream(HttpURLConnection connection) throws IOException { + protected InputStream getStream(HttpURLConnection connection) throws IOException { return connection.getInputStream(); } From 488e268814777240c24b35e596024e799a6ff521 Mon Sep 17 00:00:00 2001 From: rboyle Date: Wed, 30 Jun 2021 09:54:11 +0100 Subject: [PATCH 11/13] fixing typo in documentation for testFileGroupLookUpManyDulicateEntries --- .../org/apache/hudi/index/bloom/TestKeyRangeLookupTree.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestKeyRangeLookupTree.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestKeyRangeLookupTree.java index 10232ca81ec27..012d0dfa35910 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestKeyRangeLookupTree.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestKeyRangeLookupTree.java @@ -77,7 +77,7 @@ public void testFileGroupLookUpManyEntriesWithSameStartValue() { } /** - * Tests for many duplicte entries in the tree. + * Tests for many duplicate entries in the tree. 
*/ @Test public void testFileGroupLookUpManyDulicateEntries() { From ca83cba83a91f21d857f1c61cd2c5a7c6d18c8da Mon Sep 17 00:00:00 2001 From: rboyle Date: Wed, 30 Jun 2021 13:29:52 +0100 Subject: [PATCH 12/13] fixing typo to nudge CICD --- .../apache/hudi/utilities/schema/SchemaRegistryProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index 413a75fda6b8a..32ef60967ae8a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -60,7 +60,7 @@ public static class Config { * are extracted the url using the Matcher and the extracted credentials are set on the request as an Authorization * header. * @param registryUrl - * @return the schema in String form + * @return the Schema in String form. 
 * @throws IOException */ public String fetchSchemaFromRegistry(String registryUrl) throws IOException { From 10ded8447f9f188adec7ba4b43d06944c3d9edc2 Mon Sep 17 00:00:00 2001 From: rboyle Date: Tue, 6 Jul 2021 00:05:35 +0100 Subject: [PATCH 13/13] setting confluent version on parent pom and correcting test class name to align with other test classes --- hudi-utilities/pom.xml | 8 ++++---- ...yProviderTest.java => TestSchemaRegistryProvider.java} | 2 +- packaging/hudi-integ-test-bundle/pom.xml | 8 ++++---- pom.xml | 1 + 4 files changed, 10 insertions(+), 9 deletions(-) rename hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/{SchemaRegistryProviderTest.java => TestSchemaRegistryProvider.java} (99%) diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 1e3d0227e6f7a..c8a58d5a588b9 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -261,22 +261,22 @@ io.confluent kafka-avro-serializer - 5.3.4 + ${confluent.version} io.confluent common-config - 5.3.4 + ${confluent.version} io.confluent common-utils - 5.3.4 + ${confluent.version} io.confluent kafka-schema-registry-client - 5.3.4 + ${confluent.version} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java similarity index 99% rename from hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java rename to hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java index e2d61e668a72a..79769ad03c178 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/SchemaRegistryProviderTest.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java @@ -37,7 +37,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -class 
TestSchemaRegistryProvider { private final String basicAuth = "foo:bar"; diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml index f630670db9d3f..4fff5258fba01 100644 --- a/packaging/hudi-integ-test-bundle/pom.xml +++ b/packaging/hudi-integ-test-bundle/pom.xml @@ -615,25 +615,25 @@ io.confluent kafka-avro-serializer - 5.3.4 + ${confluent.version} io.confluent common-config - 5.3.4 + ${confluent.version} io.confluent common-utils - 5.3.4 + ${confluent.version} io.confluent kafka-schema-registry-client - 5.3.4 + ${confluent.version} diff --git a/pom.xml b/pom.xml index c1b4a9940f28b..e479401e592ee 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,7 @@ 2.7.4 2.10.0 2.0.0 + 5.3.4 2.17 1.10.1 5.7.0-M1