Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.hudi;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
Expand All @@ -41,6 +39,9 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.metadata.HoodieTableMetadata;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -453,6 +454,10 @@ private static FileStatus[] flatMap(Collection<FileStatus[]> arrays) {
return arrays.stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
}

/**
* Partition path information containing the relative partition path
* and values of partition columns.
*/
public static final class PartitionPath {

final String path;
Expand Down Expand Up @@ -480,6 +485,9 @@ public int hashCode() {
}
}

/**
* APIs for caching {@link FileStatus}.
*/
protected interface FileStatusCache {
Option<FileStatus[]> get(Path path);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

import static org.apache.hudi.common.util.ValidationUtils.checkState;

/**
* Utils for Avro Schema.
*/
public class AvroSchemaUtils {

private AvroSchemaUtils() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;

import org.apache.hadoop.util.VersionUtil;

import java.io.ByteArrayInputStream;
Expand All @@ -71,12 +70,12 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

package org.apache.hudi.avro;

import org.apache.avro.Schema;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.schema.MessageType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

package org.apache.hudi.avro;

import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;

import java.io.IOException;
import java.io.Serializable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)

/**
* Annotation for superclasses of {@link HoodieConfig} that includes the
* human-readable name of the config class, the config group ({@link ConfigGroupName})
* human-readable name of the config class, the config group ({@link ConfigGroups})
* it belongs to (e.g., spark/ flink/ write)
* and the description of the config class.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface ConfigClassProperty {
String name();

ConfigGroups.Names groupName();

String description();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
* This class maintains the human readable name and description of each config group.
*/
public class ConfigGroups {
/**
* Config group names.
*/
public enum Names {
SPARK_DATASOURCE("Spark Datasource Configs"),
FLINK_SQL("Flink Sql Configs"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import java.util.Locale;
import java.util.Properties;

/**
* Hudi configs used across engines.
*/
@ConfigClassProperty(name = "Common Configurations",
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "The following set of configurations are common across Hudi.")
Expand All @@ -45,8 +48,8 @@ public class HoodieCommonConfig extends HoodieConfig {
.key("hoodie.datasource.write.reconcile.schema")
.defaultValue(false)
.withDocumentation("When a new batch of write has records with old schema, but latest table schema got "
+ "evolved, this config will upgrade the records to leverage latest table schema(default values will be "
+ "injected to missing fields). If not, the write batch would fail.");
+ "evolved, this config will upgrade the records to leverage latest table schema(default values will be "
+ "injected to missing fields). If not, the write batch would fail.");

public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.common.spillable.diskmap.type")
Expand Down Expand Up @@ -76,6 +79,9 @@ public static HoodieCommonConfig.Builder newBuilder() {
return new HoodieCommonConfig.Builder();
}

/**
* Builder for {@link HoodieCommonConfig}.
*/
public static class Builder {

private final HoodieCommonConfig commonConfig = new HoodieCommonConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ public boolean getUseLogRecordReaderScanV2() {
return getBoolean(USE_LOG_RECORD_READER_SCAN_V2);
}

/**
* Builder for {@link HoodieMetadataConfig}.
*/
public static class Builder {

private EngineType engineType = EngineType.SPARK;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.common.config;

import javax.annotation.concurrent.Immutable;

import java.util.Properties;

/**
Expand Down Expand Up @@ -72,6 +73,9 @@ public int getConnectionRetryDelay() {
return getIntOrDefault(METASTORE_CONNECTION_RETRY_DELAY);
}

/**
* Builder for {@link HoodieMetastoreConfig}.
*/
public static class Builder {
private final HoodieMetastoreConfig config = new HoodieMetastoreConfig();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Data representation of either a stream or a list of objects with Type T.
*
* @param <T> Object value type.
*/
public abstract class HoodieBaseListData<T> {

protected final Either<Stream<T>, List<T>> data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.hudi.common.engine;

import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
Expand All @@ -31,10 +29,11 @@
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;

import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.hadoop.conf.Configuration;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

import java.util.function.Supplier;

/**
* Supplier of task context using local Java engine.
*/
public final class LocalTaskContextSupplier extends TaskContextSupplier {
@Override
public Supplier<Integer> getPartitionIdSupplier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.io.IOException;
import java.io.InputStream;

/**
* Implementation of {@link FSDataInputStream} with bound check based on file size.
*/
public class BoundedFsDataInputStream extends FSDataInputStream {
private FileSystem fs;
private Path file;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,12 @@ public static List<FileStatus> getFileStatusAtLevel(
return result;
}

/**
* Serializable function interface.
*
* @param <T> Input value type.
* @param <R> Output value type.
*/
public interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hudi.common.fs;

import org.apache.hudi.common.util.RetryHelper;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
Expand All @@ -31,13 +33,15 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.hudi.common.util.RetryHelper;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.EnumSet;

/**
* Extending {@link FileSystem} implementation of I/O operations with retries.
*/
public class HoodieRetryWrapperFileSystem extends FileSystem {

private FileSystem fileSystem;
Expand Down Expand Up @@ -139,7 +143,7 @@ public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<Create
short replication, long blockSize, Progressable progress, Options.ChecksumOpt checksumOpt) throws IOException {
return (FSDataOutputStream) new RetryHelper(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptionsList)
.tryWith(() -> fileSystem.create(f, permission, flags, bufferSize, replication,
blockSize, progress, checksumOpt)).start();
blockSize, progress, checksumOpt)).start();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public class HoodieWrapperFileSystem extends FileSystem {

private static final String TMP_PATH_POSTFIX = ".tmp";

/**
* Names for metrics.
*/
protected enum MetricName {
create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles, read, write
}
Expand All @@ -91,6 +94,11 @@ public static void setMetricsRegistry(Registry registry, Registry registryMeta)
private URI uri;
private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard();

/**
* Checked function interface.
*
* @param <R> Type of return value.
*/
@FunctionalInterface
public interface CheckedFunction<R> {
R get() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

import java.io.Serializable;

/**
* Serializable supplier interface.
*
* @param <T> Type of return value.
*/
@FunctionalInterface
public interface SerializableSupplier<T> extends Serializable {
T get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@

package org.apache.hudi.common.metrics;

import org.apache.hudi.common.util.ReflectionUtils;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hudi.common.util.ReflectionUtils;


/**
* Interface which defines a lightweight Metrics Registry to track Hudi events.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

package org.apache.hudi.common.model;

import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.util.Properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

package org.apache.hudi.common.model;

import org.apache.hudi.hadoop.CachingPath;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.hadoop.CachingPath;

import java.io.Serializable;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

package org.apache.hudi.common.model;

import java.io.Serializable;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;

import java.io.Serializable;

/**
* POJO storing (partitionPath, hoodieFileId) -> external base file path.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

package org.apache.hudi.common.model;

import org.apache.hudi.avro.model.HoodieFileStatus;

import java.io.Serializable;
import java.util.Objects;
import org.apache.hudi.avro.model.HoodieFileStatus;

/**
* Value stored in the bootstrap index.
Expand Down
Loading