diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java index d7d79796aec89..5b95bc60312d2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java @@ -173,4 +173,14 @@ public static String removeSuffixBy(String input, int ch) { } return input.substring(0, i); } + + public static String truncate(String str, int headLength, int tailLength) { + if (isNullOrEmpty(str) || str.length() <= headLength + tailLength) { + return str; + } + String head = str.substring(0, headLength); + String tail = str.substring(str.length() - tailLength); + + return head + "..." + tail; + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java index 3bdf6d48b395c..54985056bf08e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestStringUtils.java @@ -114,4 +114,11 @@ private static String toHexString(byte[] bytes) { } return sb.toString(); } + + @Test + public void testTruncate() { + assertNull(StringUtils.truncate(null, 10, 10)); + assertEquals("http://use...ons/latest", StringUtils.truncate("http://username:password@myregistry.com:5000/versions/latest", 10, 10)); + assertEquals("http://abc.com", StringUtils.truncate("http://abc.com", 10, 10)); + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java index e7b44cf912140..007d36fc70423 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java @@ -108,7 +108,7 @@ public class CloudSourceConfig extends HoodieConfig { public static final ConfigProperty DATAFILE_FORMAT = ConfigProperty .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.datafile.format") - .defaultValue("parquet") + .defaultValue(HoodieIncrSourceConfig.SOURCE_FILE_FORMAT.defaultValue()) .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.datafile.format") .markAdvanced() .withDocumentation("Format of the data file. By default, this will be the same as hoodie.streamer.source.hoodieincr.file.format"); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java index 780fbb9dc0add..110c8cc2fb139 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java @@ -195,7 +195,10 @@ public Schema getSourceSchema() { try { return parseSchemaFromRegistry(registryUrl); } catch (Exception e) { - throw new HoodieSchemaFetchException("Error reading source schema from registry :" + registryUrl, e); + throw new HoodieSchemaFetchException(String.format( + "Error reading source schema from registry. Please check %s is configured correctly. Truncated URL: %s", + Config.SRC_SCHEMA_REGISTRY_URL_PROP, + StringUtils.truncate(registryUrl, 10, 10)), e); } } @@ -207,7 +210,11 @@ public Schema getTargetSchema() { try { return parseSchemaFromRegistry(targetRegistryUrl); } catch (Exception e) { - throw new HoodieSchemaFetchException("Error reading target schema from registry :" + targetRegistryUrl, e); + throw new HoodieSchemaFetchException(String.format( + "Error reading target schema from registry. Please check %s is configured correctly. If that is not configured then check %s. Truncated URL: %s", + Config.SRC_SCHEMA_REGISTRY_URL_PROP, + Config.TARGET_SCHEMA_REGISTRY_URL_PROP, + StringUtils.truncate(targetRegistryUrl, 10, 10)), e); } } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java index 4b9be847c756e..69cc59b1ecec6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -35,6 +35,7 @@ import org.apache.hudi.utilities.sources.helpers.QueryInfo; import org.apache.hudi.utilities.sources.helpers.QueryRunner; +import org.apache.parquet.Strings; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -51,6 +52,7 @@ import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; +import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT; import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK; import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH; import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH; @@ -70,6 +72,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { private static final Logger LOG = LoggerFactory.getLogger(S3EventsHoodieIncrSource.class); + private static final String EMPTY_STRING = ""; private final String srcPath; private final int numInstantsPerFetch; private final boolean checkIfFileExists; @@ -133,7 +136,13 @@ public S3EventsHoodieIncrSource( this.srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH); this.numInstantsPerFetch = getIntWithAltKeys(props, NUM_INSTANTS_PER_FETCH); this.checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK); - this.fileFormat = getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true); + + // This is to ensure backward compatibility where we were using the + // config SOURCE_FILE_FORMAT for file format in previous versions. + this.fileFormat = Strings.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING)) + ? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true) + : getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING); + this.missingCheckpointStrategy = getMissingCheckpointStrategy(props); this.queryRunner = queryRunner; this.cloudDataFetcher = cloudDataFetcher;