@@ -30,8 +30,8 @@ public class Actions {
// disable classloader check as Avro may cache class/object in the serializers.
.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);

-private StreamExecutionEnvironment env;
-private Table table;
+private final StreamExecutionEnvironment env;
+private final Table table;

private Actions(StreamExecutionEnvironment env, Table table) {
this.env = env;
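Note: the recurring change in this diff marks constructor-assigned fields as final. A minimal sketch of the pattern (hypothetical class, not from this PR):

public class Holder {
  // final documents that the dependency is assigned exactly once, and the
  // compiler rejects any later reassignment.
  private final String name;

  public Holder(String name) {
    this.name = name;
  }

  public String name() {
    return name;
  }
}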
@@ -31,7 +31,7 @@

public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {

-private StreamExecutionEnvironment env;
+private final StreamExecutionEnvironment env;
private int maxParallelism;

public RewriteDataFilesAction(StreamExecutionEnvironment env, Table table) {
@@ -55,7 +55,7 @@ public RowData map(GenericRecord genericRecord) throws Exception {
public static AvroGenericRecordToRowDataMapper forAvroSchema(Schema avroSchema) {
DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
-RowType rowType = RowType.of(logicalType.getChildren().stream().toArray(LogicalType[]::new));
+RowType rowType = RowType.of(logicalType.getChildren().toArray(new LogicalType[0]));
return new AvroGenericRecordToRowDataMapper(rowType);
}
}
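The toArray change here (repeated in RowDataToAvroGenericRecordConverter below) drops an unnecessary stream. A small self-contained sketch of the two forms:

import java.util.List;

public class ToArrayDemo {
  public static void main(String[] args) {
    List<String> items = List.of("a", "b", "c");
    // Before: builds a stream only to collect it back into an array.
    String[] viaStream = items.stream().toArray(String[]::new);
    // After: Collection.toArray does the same work directly; the
    // zero-length array form is also commonly the fastest variant on
    // modern JVMs.
    String[] direct = items.toArray(new String[0]);
    System.out.println(viaStream.length == direct.length); // true
  }
}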
@@ -39,7 +39,7 @@ class MapDataStatistics implements DataStatistics<MapDataStatistics, Map<SortKey

@Override
public boolean isEmpty() {
-return statistics.size() == 0;
+return statistics.isEmpty();
}

@Override
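isEmpty() states the intent directly and, for some collection types, avoids computing a full size. A minimal sketch:

import java.util.HashMap;
import java.util.Map;

public class IsEmptyDemo {
  public static void main(String[] args) {
    Map<String, Integer> statistics = new HashMap<>();
    // Equivalent results, but isEmpty() reads as the question being asked,
    // and some collections (e.g. ConcurrentLinkedQueue) answer it in O(1)
    // while their size() is O(n).
    System.out.println(statistics.size() == 0); // true
    System.out.println(statistics.isEmpty());   // true
  }
}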
@@ -64,7 +64,7 @@ public static RowDataToAvroGenericRecordConverter fromIcebergSchema(
public static RowDataToAvroGenericRecordConverter fromAvroSchema(Schema avroSchema) {
DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema.toString());
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
-RowType rowType = RowType.of(logicalType.getChildren().stream().toArray(LogicalType[]::new));
+RowType rowType = RowType.of(logicalType.getChildren().toArray(new LogicalType[0]));
return new RowDataToAvroGenericRecordConverter(rowType, avroSchema);
}
}
@@ -121,7 +121,7 @@ private ContinuousEnumerationResult discoverIncrementalSplits(
LOG.info("Current table snapshot is already enumerated: {}", currentSnapshot.snapshotId());
return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
} else {
-Long lastConsumedSnapshotId = lastPosition != null ? lastPosition.snapshotId() : null;
+Long lastConsumedSnapshotId = lastPosition.snapshotId();
Snapshot toSnapshotInclusive =
toSnapshotInclusive(
lastConsumedSnapshotId, currentSnapshot, scanContext.maxPlanningSnapshotCount());
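The ternary removed here appears redundant because the enclosing branches have already handled a null lastPosition, so the guard was dead code. A sketch of the pattern with hypothetical names:

public class NullGuardDemo {
  static Long lastConsumedSnapshotId(Position lastPosition) {
    if (lastPosition == null) {
      return null;
    }
    // lastPosition is provably non-null on this path, so a former
    // "lastPosition != null ? lastPosition.snapshotId() : null"
    // collapses to a plain call.
    return lastPosition.snapshotId();
  }

  static class Position { // hypothetical stand-in type
    Long snapshotId() {
      return 42L;
    }
  }
}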
@@ -29,7 +29,7 @@
public class IcebergEnumeratorState implements Serializable {
@Nullable private final IcebergEnumeratorPosition lastEnumeratedPosition;
private final Collection<IcebergSourceSplitState> pendingSplits;
-private int[] enumerationSplitCountHistory;
+private final int[] enumerationSplitCountHistory;

public IcebergEnumeratorState(Collection<IcebergSourceSplitState> pendingSplits) {
this(null, pendingSplits);
@@ -174,10 +174,8 @@ private static Collection<IcebergSourceSplitState> deserializePendingSplits(
private static void serializeEnumerationSplitCountHistory(
DataOutputSerializer out, int[] enumerationSplitCountHistory) throws IOException {
out.writeInt(enumerationSplitCountHistory.length);
-if (enumerationSplitCountHistory.length > 0) {
-for (int enumerationSplitCount : enumerationSplitCountHistory) {
-out.writeInt(enumerationSplitCount);
-}
+for (int enumerationSplitCount : enumerationSplitCountHistory) {
+out.writeInt(enumerationSplitCount);
}
}

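The dropped length check was a no-op: an enhanced for loop over an empty array simply executes zero iterations. Sketch:

public class EmptyLoopDemo {
  public static void main(String[] args) {
    int[] history = new int[0];
    // Wrapping this loop in "if (history.length > 0)" changes nothing:
    // the body never runs for an empty array.
    for (int count : history) {
      System.out.println(count); // unreachable here
    }
    System.out.println("done");
  }
}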
@@ -129,7 +129,7 @@ public static String readLongUTF(DataInputDeserializer in) throws IOException {
if (count > utflen) {
throw new UTFDataFormatException("malformed input: partial character at end");
}
-char2 = (int) bytearr[count - 1];
+char2 = bytearr[count - 1];
if ((char2 & 0xC0) != 0x80) {
throw new UTFDataFormatException("malformed input around byte " + count);
}
@@ -141,8 +141,8 @@ public static String readLongUTF(DataInputDeserializer in) throws IOException {
if (count > utflen) {
throw new UTFDataFormatException("malformed input: partial character at end");
}
-char2 = (int) bytearr[count - 2];
-char3 = (int) bytearr[count - 1];
+char2 = bytearr[count - 2];
+char3 = bytearr[count - 1];
if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
throw new UTFDataFormatException("malformed input around byte " + (count - 1));
}
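Assigning a byte to an int widens implicitly, so the explicit (int) casts were redundant; sign extension and the subsequent 0xC0 mask behave identically either way. Sketch:

public class WideningDemo {
  public static void main(String[] args) {
    byte[] bytearr = {(byte) 0x80};
    int char2 = bytearr[0]; // implicit widening; "(int)" adds nothing
    // Sign extension yields 0xFFFFFF80, but the continuation-byte check
    // masks the high bits off, so the result is unchanged:
    System.out.println((char2 & 0xC0) == 0x80); // true
  }
}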
@@ -32,7 +32,7 @@ private FlinkPackage() {}
/** Returns Flink version string like x.y.z */
public static String version() {
if (null == VERSION.get()) {
-String detectedVersion = null;
+String detectedVersion;
try {
detectedVersion = versionFromJar();
// use unknown version in case exact implementation version can't be found from the jar
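Dropping the "= null" initializer lets javac's definite-assignment analysis verify that every path through the try/catch assigns the variable before use. A sketch under that assumption (the versionFromJar body below is a hypothetical stand-in):

public class DefiniteAssignmentDemo {
  static String version() {
    String detectedVersion; // no "= null": the compiler now proves assignment
    try {
      detectedVersion = versionFromJar();
    } catch (RuntimeException e) {
      detectedVersion = "unknown";
    }
    return detectedVersion;
  }

  static String versionFromJar() { // hypothetical stand-in body
    return "1.17.1";
  }

  public static void main(String[] args) {
    System.out.println(version());
  }
}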
@@ -39,7 +39,7 @@

public class TestFlinkCatalogTablePartitions extends CatalogTestBase {

private String tableName = "test_table";
private final String tableName = "test_table";

@Parameter(index = 2)
private FileFormat format;
@@ -23,7 +23,7 @@
import org.apache.iceberg.TestTables;

public class TestTableLoader implements TableLoader {
-private File dir;
+private final File dir;

public static TableLoader of(String dir) {
return new TestTableLoader(dir);
@@ -122,7 +122,7 @@ public void testTimeUnit() throws IOException {
new ColumnStatsWatermarkExtractor(SCHEMA, columnName, TimeUnit.MICROSECONDS);

assertThat(extractor.extractWatermark(split(0)))
-.isEqualTo(MIN_VALUES.get(0).get(columnName).longValue() / 1000L);
+.isEqualTo(MIN_VALUES.get(0).get(columnName) / 1000L);
}

@TestTemplate
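The ".longValue()" call was redundant: a boxed Long auto-unboxes in arithmetic. Sketch:

public class UnboxingDemo {
  public static void main(String[] args) {
    Long micros = 1_234_000L;
    // Division auto-unboxes the Long; ".longValue()" was noise.
    long millis = micros / 1000L;
    System.out.println(millis); // 1234
  }
}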
@@ -29,7 +29,7 @@

class RandomGeneratingUDF implements Serializable {
private final long uniqueValues;
-private Random rand = new Random();
+private final Random rand = new Random();

RandomGeneratingUDF(long uniqueValues) {
this.uniqueValues = uniqueValues;
@@ -43,8 +43,7 @@ UserDefinedFunction randomLongUDF() {

UserDefinedFunction randomString() {
return udf(
-() -> (String) RandomUtil.generatePrimitive(Types.StringType.get(), rand),
-DataTypes.StringType)
+() -> RandomUtil.generatePrimitive(Types.StringType.get(), rand), DataTypes.StringType)
.asNondeterministic()
.asNonNullable();
}
@@ -199,7 +199,7 @@ public Type map(Types.MapType map, Supplier<Type> keyResult, Supplier<Type> valu
"Cannot project a map of optional values as required values: %s",
map);
Preconditions.checkArgument(
-StringType.class.isInstance(requestedMap.keyType()),
+requestedMap.keyType() instanceof StringType,
"Invalid map key type (not string): %s",
requestedMap.keyType());

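Class.isInstance is the reflective form, needed only when the Class object isn't known at compile time; for a literal class, the instanceof operator is equivalent and clearer. Sketch:

public class InstanceOfDemo {
  public static void main(String[] args) {
    Object keyType = "string-key";
    System.out.println(String.class.isInstance(keyType)); // true
    System.out.println(keyType instanceof String);        // true (same check)
  }
}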
@@ -885,7 +885,7 @@ public static List<SparkPartition> getPartitions(
JavaConverters.collectionAsScalaIterableConverter(ImmutableList.of(rootPath))
.asScala()
.toSeq(),
-scala.collection.immutable.Map$.MODULE$.<String, String>empty(),
+scala.collection.immutable.Map$.MODULE$.empty(),
userSpecifiedSchema,
fileStatusCache,
Option.empty(),
@@ -182,7 +182,7 @@ public static List<SparkPartition> getPartitions(
Option<scala.collection.immutable.Map<String, String>> scalaPartitionFilter;
if (partitionFilter != null && !partitionFilter.isEmpty()) {
Builder<Tuple2<String, String>, scala.collection.immutable.Map<String, String>> builder =
-Map$.MODULE$.<String, String>newBuilder();
+Map$.MODULE$.newBuilder();
partitionFilter.forEach((key, value) -> builder.$plus$eq(Tuple2.apply(key, value)));
scalaPartitionFilter = Option.apply(builder.result());
} else {
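Both Scala-interop changes above drop explicit type witnesses that Java's target typing has inferred reliably since Java 8. A pure-Java sketch of the same idea:

import java.util.Collections;
import java.util.Map;

public class TypeWitnessDemo {
  public static void main(String[] args) {
    // Explicit witness, as in the removed lines:
    Map<String, String> explicit = Collections.<String, String>emptyMap();
    // Inferred from the assignment target, as in the replacements:
    Map<String, String> inferred = Collections.emptyMap();
    System.out.println(explicit.equals(inferred)); // true
  }
}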
@@ -116,7 +116,7 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
private Map<String, String> equalSchemes = flattenMap(EQUAL_SCHEMES_DEFAULT);
private Map<String, String> equalAuthorities = Collections.emptyMap();
private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
-private String location = null;
+private String location;
private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
private Dataset<Row> compareToFileList;
private Consumer<String> deleteFunc = null;
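Reference fields are zero-initialized to null by the JVM, so "= null" on a field declaration is redundant (unlike for locals, which have no default value). Sketch:

public class FieldDefaultDemo {
  private String location; // defaults to null; "= null" would be redundant

  public static void main(String[] args) {
    FieldDefaultDemo demo = new FieldDefaultDemo();
    System.out.println(demo.location == null); // true
  }
}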
@@ -367,7 +367,7 @@ private Result doExecuteWithPartialProgress(
Stream<RewriteFileGroup> toGroupStream(
RewriteExecutionContext ctx, Map<StructLike, List<List<FileScanTask>>> groupsByPartition) {
return groupsByPartition.entrySet().stream()
-.filter(e -> e.getValue().size() != 0)
+.filter(e -> !e.getValue().isEmpty())
.flatMap(
e -> {
StructLike partition = e.getKey();
@@ -93,9 +93,9 @@ public class RewriteManifestsSparkAction
private final long targetManifestSizeBytes;
private final boolean shouldStageManifests;

-private PartitionSpec spec = null;
+private PartitionSpec spec;
private Predicate<ManifestFile> predicate = manifest -> true;
-private String outputLocation = null;
+private String outputLocation;

RewriteManifestsSparkAction(SparkSession spark, Table table) {
super(spark);
@@ -309,7 +309,7 @@ private Result doExecuteWithPartialProgress(
// stop commit service
commitService.close();
List<RewritePositionDeletesGroup> commitResults = commitService.results();
-if (commitResults.size() == 0) {
+if (commitResults.isEmpty()) {
LOG.error(
"{} is true but no rewrite commits succeeded. Check the logs to determine why the individual "
+ "commits failed. If this is persistent it may help to increase {} which will break the rewrite operation "
@@ -331,7 +331,7 @@ private Stream<RewritePositionDeletesGroup> toGroupStream(
RewriteExecutionContext ctx,
Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) {
return groupsByPartition.entrySet().stream()
-.filter(e -> e.getValue().size() != 0)
+.filter(e -> !e.getValue().isEmpty())
.flatMap(
e -> {
StructLike partition = e.getKey();
@@ -561,7 +561,7 @@ private static class InternalRowWriter extends ParquetValueWriters.StructWriter<

private InternalRowWriter(List<ParquetValueWriter<?>> writers, List<DataType> types) {
super(writers);
-this.types = types.toArray(new DataType[types.size()]);
+this.types = types.toArray(new DataType[0]);
}

@Override
@@ -93,7 +93,7 @@ public class SparkScanBuilder
private final SparkReadConf readConf;
private final List<String> metaColumns = Lists.newArrayList();

-private Schema schema = null;
+private Schema schema;
private boolean caseSensitive;
private List<Expression> filterExpressions = null;
private Filter[] pushedFilters = NO_FILTERS;
@@ -44,7 +44,7 @@ class SparkStagedScanBuilder implements ScanBuilder, SupportsPushDownRequiredCol
private final SparkReadConf readConf;
private final List<String> metaColumns = Lists.newArrayList();

-private Schema schema = null;
+private Schema schema;

SparkStagedScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
this.spark = spark;
@@ -138,7 +138,7 @@ public static Object[][] parameters() {

@Rule public TemporaryFolder temp = new TemporaryFolder();

private String baseTableName = "baseTable";
private final String baseTableName = "baseTable";
private File tableDir;
private String tableLocation;
private final String type;