@@ -173,4 +173,14 @@ public static String removeSuffixBy(String input, int ch) {
}
return input.substring(0, i);
}

public static String truncate(String str, int headLength, int tailLength) {
if (isNullOrEmpty(str) || str.length() <= headLength + tailLength) {
return str;
}
String head = str.substring(0, headLength);
String tail = str.substring(str.length() - tailLength);

return head + "..." + tail;
}
}
@@ -114,4 +114,11 @@ private static String toHexString(byte[] bytes) {
}
return sb.toString();
}

@Test
public void testTruncate() {
assertNull(StringUtils.truncate(null, 10, 10));
assertEquals("http://use...ons/latest", StringUtils.truncate("http://username:[email protected]:5000/versions/latest", 10, 10));
assertEquals("http://abc.com", StringUtils.truncate("http://abc.com", 10, 10));
}
}
@@ -108,7 +108,7 @@ public class CloudSourceConfig extends HoodieConfig {

public static final ConfigProperty<String> DATAFILE_FORMAT = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.cloud.data.datafile.format")
.defaultValue("parquet")
.defaultValue(HoodieIncrSourceConfig.SOURCE_FILE_FORMAT.defaultValue())
.withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.cloud.data.datafile.format")
.markAdvanced()
.withDocumentation("Format of the data file. By default, this will be the same as hoodie.streamer.source.hoodieincr.file.format");
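The change above replaces the hard-coded "parquet" default with the default declared on HoodieIncrSourceConfig.SOURCE_FILE_FORMAT, so the two configs can no longer drift apart. A minimal sketch (not part of this PR) of a test that would pin that coupling down; the test class name is hypothetical and it assumes JUnit 5 assertions (adjust the imports for JUnit 4 if that is what the module uses):

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.hudi.utilities.config.CloudSourceConfig;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.junit.jupiter.api.Test;

public class TestCloudSourceConfigDefaults {

  @Test
  public void dataFileFormatDefaultFollowsSourceFileFormat() {
    // DATAFILE_FORMAT no longer hard-codes "parquet"; it inherits whatever
    // SOURCE_FILE_FORMAT declares as its default, as documented above.
    assertEquals(HoodieIncrSourceConfig.SOURCE_FILE_FORMAT.defaultValue(),
        CloudSourceConfig.DATAFILE_FORMAT.defaultValue());
  }
}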
@@ -195,7 +195,10 @@ public Schema getSourceSchema() {
try {
return parseSchemaFromRegistry(registryUrl);
} catch (Exception e) {
throw new HoodieSchemaFetchException("Error reading source schema from registry :" + registryUrl, e);
throw new HoodieSchemaFetchException(String.format(
"Error reading source schema from registry. Please check %s is configured correctly. Truncated URL: %s",
Config.SRC_SCHEMA_REGISTRY_URL_PROP,
StringUtils.truncate(registryUrl, 10, 10)), e);
}
}

@@ -207,7 +210,11 @@ public Schema getTargetSchema() {
try {
return parseSchemaFromRegistry(targetRegistryUrl);
} catch (Exception e) {
throw new HoodieSchemaFetchException("Error reading target schema from registry :" + targetRegistryUrl, e);
throw new HoodieSchemaFetchException(String.format(
"Error reading target schema from registry. Please check %s is configured correctly. If that is not configured then check %s. Truncated URL: %s",
Config.SRC_SCHEMA_REGISTRY_URL_PROP,
Config.TARGET_SCHEMA_REGISTRY_URL_PROP,
StringUtils.truncate(targetRegistryUrl, 10, 10)), e);
}
}
}
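The reworked messages above name the relevant config keys and include only a truncated registry URL, since schema registry URLs frequently embed credentials. A minimal sketch (not part of this PR) of what the new truncation does to such a URL, assuming truncate lives in org.apache.hudi.common.util.StringUtils alongside isNullOrEmpty; the demo class and the sample URL are illustrative only:

import org.apache.hudi.common.util.StringUtils;

public class TruncateRegistryUrlDemo {
  public static void main(String[] args) {
    // Hypothetical registry URL with embedded credentials.
    String registryUrl = "http://user:[email protected]:5000/subjects/test/versions/latest";
    // Only the first 10 and the last 10 characters survive, so the password
    // is not echoed back in the HoodieSchemaFetchException message.
    System.out.println(StringUtils.truncate(registryUrl, 10, 10));
    // prints: http://use...ons/latest
  }
}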
@@ -35,6 +35,7 @@
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.sources.helpers.QueryRunner;

import org.apache.parquet.Strings;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -51,6 +52,7 @@
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.utilities.config.CloudSourceConfig.DATAFILE_FORMAT;
import static org.apache.hudi.utilities.config.CloudSourceConfig.ENABLE_EXISTS_CHECK;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH;
import static org.apache.hudi.utilities.config.HoodieIncrSourceConfig.NUM_INSTANTS_PER_FETCH;
@@ -70,6 +72,7 @@
public class S3EventsHoodieIncrSource extends HoodieIncrSource {

private static final Logger LOG = LoggerFactory.getLogger(S3EventsHoodieIncrSource.class);
private static final String EMPTY_STRING = "";
Review comment (Contributor Author): StringUtils already contains EMPTY_STRING.
private final String srcPath;
private final int numInstantsPerFetch;
private final boolean checkIfFileExists;
@@ -133,7 +136,13 @@ public S3EventsHoodieIncrSource(
this.srcPath = getStringWithAltKeys(props, HOODIE_SRC_BASE_PATH);
this.numInstantsPerFetch = getIntWithAltKeys(props, NUM_INSTANTS_PER_FETCH);
this.checkIfFileExists = getBooleanWithAltKeys(props, ENABLE_EXISTS_CHECK);
this.fileFormat = getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true);

// This is to ensure backward compatibility: previous versions used the
// SOURCE_FILE_FORMAT config for the file format.
this.fileFormat = Strings.isNullOrEmpty(getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING))
? getStringWithAltKeys(props, SOURCE_FILE_FORMAT, true)
: getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);
Review comment (Contributor Author): For the last one,

getStringWithAltKeys(props, DATAFILE_FORMAT, EMPTY_STRING);

you can ignore the last arg:

getStringWithAltKeys(props, DATAFILE_FORMAT);

The default value will not be picked.

this.missingCheckpointStrategy = getMissingCheckpointStrategy(props);
this.queryRunner = queryRunner;
this.cloudDataFetcher = cloudDataFetcher;
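To make the precedence in the constructor above easy to see, here is a minimal sketch (not part of this PR) of the same resolution order as a standalone helper. The method name and plain-String parameters are simplifications of the TypedProperties/ConfigUtils plumbing, and, per the review comment above, the legacy value could equally be fetched without the explicit empty-string default:

import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;

public class FileFormatResolutionSketch {

  // dataFileFormat:   value of CloudSourceConfig.DATAFILE_FORMAT, or "" when unset
  // sourceFileFormat: value of HoodieIncrSourceConfig.SOURCE_FILE_FORMAT (with its default applied)
  static String resolveFileFormat(String dataFileFormat, String sourceFileFormat) {
    // The new DATAFILE_FORMAT config wins when it is explicitly set; otherwise
    // the legacy SOURCE_FILE_FORMAT config keeps working for older setups.
    return isNullOrEmpty(dataFileFormat) ? sourceFileFormat : dataFileFormat;
  }
}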