Merged
4 changes: 2 additions & 2 deletions api/src/main/java/org/apache/iceberg/Metrics.java
@@ -24,8 +24,8 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ByteBuffers;

/**
@@ -230,7 +230,7 @@ private static Map<Integer, ByteBuffer> readByteBufferMap(ObjectInputStream in)
return null;

} else {
Map<Integer, ByteBuffer> result = new HashMap<>(size);
Map<Integer, ByteBuffer> result = Maps.newHashMapWithExpectedSize(size);

for (int i = 0; i < size; ++i) {
Integer key = (Integer) in.readObject();
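A brief aside on this recurring swap (an illustration, not part of the diff): java.util.HashMap treats its constructor argument as an initial capacity, so inserting that many entries can still trigger a resize once the default 0.75 load factor is crossed, whereas Guava's relocated helper pads the capacity so the expected number of entries fits without rehashing.

// Illustration only; assumes java.util.Map, java.util.HashMap, java.nio.ByteBuffer and the
// relocated Guava Maps are imported as in the file above. The numbers are arbitrary.
Map<Integer, ByteBuffer> plain = new HashMap<>(16);                   // capacity 16: resizes after the 12th entry
Map<Integer, ByteBuffer> sized = Maps.newHashMapWithExpectedSize(16); // capacity padded so 16 entries fit without rehashing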
4 changes: 4 additions & 0 deletions build.gradle
@@ -1099,11 +1099,15 @@ project(":iceberg-spark3-extensions") {
exclude group: 'org.apache.arrow'
}

testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-orc', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-spark', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-spark3', configuration: 'testArtifacts')

testCompile "org.apache.avro:avro"
Contributor Author:
In the original PR #3273, this was in spark/v3.0/build.gradle. But we're not including the structural refactor in the point release.

Additionally, this was testImplementation in the original PR (where the above dependencies were also changed to testImplementation). Since this branch keeps them as testCompile, I chose to be consistent and use testCompile.


spark31Implementation("org.apache.spark:spark-hive_2.12:${project.ext.Spark31Version}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -636,4 +636,12 @@ public <D> AvroIterable<D> build() {
}
}

/**
* Returns the number of rows in the specified Avro file.
* @param file Avro file
* @return number of rows in file
*/
public static long rowCount(InputFile file) {
return AvroIO.findStartingRowPos(file::newStream, Long.MAX_VALUE);
}
}
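A minimal usage sketch of the new helper (not part of this PR); the wrapper method is invented for illustration, and the Metrics call mirrors the import path further below.

// Sketch only. Assumes the imports used elsewhere in this PR: org.apache.hadoop.conf.Configuration,
// org.apache.iceberg.Metrics, org.apache.iceberg.avro.Avro, org.apache.iceberg.hadoop.HadoopInputFile,
// and org.apache.iceberg.io.InputFile.
static Metrics metricsForImportedAvroFile(String location, Configuration conf) {
  InputFile file = HadoopInputFile.fromLocation(location, conf);
  long rowCount = Avro.rowCount(file);            // sums record counts from the file's Avro block metadata
  return new Metrics(rowCount, null, null, null); // record count only; no column sizes, value counts, or bounds
}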
Changes in another file (path not captured):
@@ -36,7 +36,9 @@
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.orc.OrcMetrics;
import org.apache.iceberg.parquet.ParquetUtil;
@@ -91,7 +93,9 @@ private static List<DataFile> listAvroPartition(Map<String, String> partitionPat
return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
.filter(FileStatus::isFile)
.map(stat -> {
Metrics metrics = new Metrics(-1L, null, null, null);
InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
long rowCount = Avro.rowCount(file);
Metrics metrics = new Metrics(rowCount, null, null, null);
String partitionKey = spec.fields().stream()
.map(PartitionField::name)
.map(name -> String.format("%s=%s", name, partitionPath.get(name)))
Changes in another file (path not captured):
@@ -288,8 +288,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
throw new RuntimeException("Interrupted during commit", e);

} finally {
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId);
tableLevelMutex.unlock();
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId, tableLevelMutex);
}
}

@@ -471,7 +470,8 @@ long acquireLock() throws UnknownHostException, TException, InterruptedException
return lockId;
}

private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId) {
private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId,
ReentrantLock tableLevelMutex) {
try {
if (commitStatus == CommitStatus.FAILURE) {
// If we are sure the commit failed, clean up the uncommitted metadata file
Expand All @@ -482,6 +482,7 @@ private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadata
throw e;
} finally {
unlock(lockId);
tableLevelMutex.unlock();
}
}

18 changes: 12 additions & 6 deletions mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -19,7 +19,6 @@

package org.apache.iceberg.mr;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -39,6 +38,7 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;

/**
@@ -150,7 +150,7 @@ public static Table createTable(Configuration conf, Properties props) {
String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);

// Create a table property map without the controlling properties
Map<String, String> map = new HashMap<>(props.size());
Map<String, String> map = Maps.newHashMapWithExpectedSize(props.size());
for (Object key : props.keySet()) {
if (!PROPERTIES_TO_REMOVE.contains(key)) {
map.put(key.toString(), props.get(key).toString());
Expand Down Expand Up @@ -202,7 +202,15 @@ public static boolean dropTable(Configuration conf, Properties props) {
*/
public static boolean hiveCatalog(Configuration conf, Properties props) {
String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(getCatalogType(conf, catalogName));
String catalogType = getCatalogType(conf, catalogName);
if (catalogType != null) {
return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType);
}
catalogType = getCatalogType(conf, ICEBERG_DEFAULT_CATALOG_NAME);
if (catalogType != null) {
return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(catalogType);
}
return getCatalogProperties(conf, catalogName, catalogType).get(CatalogProperties.CATALOG_IMPL) == null;
@kbendick (Contributor Author) commented on Oct 31, 2021:
This whole block didn't cherry-pick cleanly, because we didn't pick up PR #3162 (Core: Throw an exception if both catalog type and catalog-impl are set).

This is the PR these changes came from: Hive: Fix Catalogs.hiveCatalog method for default catalogs (#3338).

I don't think we should pick up #3162, as it could break existing users' currently valid configs, but I'm very much open to discussion on that.

A Contributor replied:
Agreed. Let's not port #3162.

}

@VisibleForTesting
@@ -279,9 +287,7 @@ private static String getCatalogType(Configuration conf, String catalogName) {
}
} else {
String catalogType = conf.get(InputFormatConfig.CATALOG);
if (catalogType == null) {
return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
} else if (catalogType.equals(LOCATION)) {
if (catalogType != null && catalogType.equals(LOCATION)) {
return NO_CATALOG_TYPE;
} else {
return catalogType;
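To make the new fallback concrete, a small hedged example that mirrors the tests further below: with no catalog type configured for the named catalog (or no catalog name at all) and no catalog-impl set, hiveCatalog now reports a Hive catalog.

// Hypothetical call, not part of the PR; testLegacyLoadCatalogDefault below asserts the same behavior.
Configuration conf = new Configuration();             // no Iceberg catalog settings at all
Properties props = new Properties();                  // no InputFormatConfig.CATALOG_NAME either
boolean usesHive = Catalogs.hiveCatalog(conf, props); // true: no type anywhere, and no CatalogProperties.CATALOG_IMPL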
Changes in another file (path not captured):
@@ -33,6 +33,7 @@
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.hive.serde.objectinspector.WriteObjectInspector;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types.ListType;
@@ -232,7 +233,7 @@ private static class FixNameMappingObjectInspectorPair extends ObjectInspectorPa
FixNameMappingObjectInspectorPair(Schema schema, ObjectInspectorPair pair) {
super(pair.writerInspector(), pair.sourceInspector());

this.sourceNameMap = new HashMap<>(schema.columns().size());
this.sourceNameMap = Maps.newHashMapWithExpectedSize(schema.columns().size());

List<? extends StructField> fields = ((StructObjectInspector) sourceInspector()).getAllStructFieldRefs();
for (int i = 0; i < schema.columns().size(); ++i) {
Changes in another file (path not captured):
@@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -46,6 +45,7 @@
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -55,7 +55,7 @@ public class HiveIcebergSerDe extends AbstractSerDe {

private ObjectInspector inspector;
private Schema tableSchema;
private Map<ObjectInspector, Deserializer> deserializers = new HashMap<>(1);
private Map<ObjectInspector, Deserializer> deserializers = Maps.newHashMapWithExpectedSize(1);
private Container<Record> row = new Container<>();

@Override
23 changes: 22 additions & 1 deletion mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
@@ -197,6 +197,7 @@ public void testLegacyLoadCatalogDefault() {
Optional<Catalog> defaultCatalog = Catalogs.loadCatalog(conf, null);
Assert.assertTrue(defaultCatalog.isPresent());
Assertions.assertThat(defaultCatalog.get()).isInstanceOf(HiveCatalog.class);
Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties()));
}

@Test
@@ -205,6 +206,7 @@ public void testLegacyLoadCatalogHive() {
Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, null);
Assert.assertTrue(hiveCatalog.isPresent());
Assertions.assertThat(hiveCatalog.get()).isInstanceOf(HiveCatalog.class);
Assert.assertTrue(Catalogs.hiveCatalog(conf, new Properties()));
}

@Test
@@ -214,6 +216,7 @@ public void testLegacyLoadCatalogHadoop() {
Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, null);
Assert.assertTrue(hadoopCatalog.isPresent());
Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class);
Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties()));
}

@Test
@@ -223,12 +226,14 @@ public void testLegacyLoadCatalogCustom() {
Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, null);
Assert.assertTrue(customHadoopCatalog.isPresent());
Assertions.assertThat(customHadoopCatalog.get()).isInstanceOf(CustomHadoopCatalog.class);
Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties()));
}

@Test
public void testLegacyLoadCatalogLocation() {
conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent());
Assert.assertFalse(Catalogs.hiveCatalog(conf, new Properties()));
}

@Test
@@ -241,9 +246,13 @@ public void testLegacyLoadCatalogUnknown() {

@Test
public void testLoadCatalogDefault() {
Optional<Catalog> defaultCatalog = Catalogs.loadCatalog(conf, "barCatalog");
String catalogName = "barCatalog";
Optional<Catalog> defaultCatalog = Catalogs.loadCatalog(conf, catalogName);
Assert.assertTrue(defaultCatalog.isPresent());
Assertions.assertThat(defaultCatalog.get()).isInstanceOf(HiveCatalog.class);
Properties properties = new Properties();
properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
Assert.assertTrue(Catalogs.hiveCatalog(conf, properties));
}

@Test
@@ -254,6 +263,9 @@ public void testLoadCatalogHive() {
Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, catalogName);
Assert.assertTrue(hiveCatalog.isPresent());
Assertions.assertThat(hiveCatalog.get()).isInstanceOf(HiveCatalog.class);
Properties properties = new Properties();
properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
Assert.assertTrue(Catalogs.hiveCatalog(conf, properties));
}

@Test
@@ -267,6 +279,9 @@ public void testLoadCatalogHadoop() {
Assert.assertTrue(hadoopCatalog.isPresent());
Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class);
Assert.assertEquals("HadoopCatalog{name=barCatalog, location=/tmp/mylocation}", hadoopCatalog.get().toString());
Properties properties = new Properties();
properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
Assert.assertFalse(Catalogs.hiveCatalog(conf, properties));
}

@Test
@@ -279,6 +294,9 @@ public void testLoadCatalogHadoopWithLegacyWarehouseLocation() {
Assert.assertTrue(hadoopCatalog.isPresent());
Assertions.assertThat(hadoopCatalog.get()).isInstanceOf(HadoopCatalog.class);
Assert.assertEquals("HadoopCatalog{name=barCatalog, location=/tmp/mylocation}", hadoopCatalog.get().toString());
Properties properties = new Properties();
properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
Assert.assertFalse(Catalogs.hiveCatalog(conf, properties));
}

@Test
@@ -291,6 +309,9 @@ public void testLoadCatalogCustom() {
Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
Assert.assertTrue(customHadoopCatalog.isPresent());
Assertions.assertThat(customHadoopCatalog.get()).isInstanceOf(CustomHadoopCatalog.class);
Properties properties = new Properties();
properties.put(InputFormatConfig.CATALOG_NAME, catalogName);
Assert.assertFalse(Catalogs.hiveCatalog(conf, properties));
}

@Test
37 changes: 26 additions & 11 deletions orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -44,7 +44,6 @@
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
@@ -196,11 +195,17 @@ private static Optional<ByteBuffer> fromOrcMin(Type type, ColumnStatistics colum
min = Math.toIntExact((long) min);
}
} else if (columnStats instanceof DoubleColumnStatistics) {
// since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
// we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
Preconditions.checkNotNull(fieldMetrics,
"[BUG] Float or double type columns should have metrics being tracked by Iceberg Orc writers");
min = fieldMetrics.lowerBound();
if (fieldMetrics != null) {
// since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
// we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
min = fieldMetrics.lowerBound();
} else {
// imported files will not have metrics that were tracked by Iceberg, so fall back to the file's metrics.
min = replaceNaN(((DoubleColumnStatistics) columnStats).getMinimum(), Double.NEGATIVE_INFINITY);
if (type.typeId() == Type.TypeID.FLOAT) {
min = ((Double) min).floatValue();
}
}
} else if (columnStats instanceof StringColumnStatistics) {
min = ((StringColumnStatistics) columnStats).getMinimum();
} else if (columnStats instanceof DecimalColumnStatistics) {
@@ -234,11 +239,17 @@ private static Optional<ByteBuffer> fromOrcMax(Type type, ColumnStatistics colum
max = Math.toIntExact((long) max);
}
} else if (columnStats instanceof DoubleColumnStatistics) {
// since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
// we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
Preconditions.checkNotNull(fieldMetrics,
"[BUG] Float or double type columns should have metrics being tracked by Iceberg Orc writers");
max = fieldMetrics.upperBound();
if (fieldMetrics != null) {
// since Orc includes NaN for upper/lower bounds of floating point columns, and we don't want this behavior,
// we have tracked metrics for such columns ourselves and thus do not need to rely on Orc's column statistics.
max = fieldMetrics.upperBound();
} else {
// imported files will not have metrics that were tracked by Iceberg, so fall back to the file's metrics.
max = replaceNaN(((DoubleColumnStatistics) columnStats).getMaximum(), Double.POSITIVE_INFINITY);
if (type.typeId() == Type.TypeID.FLOAT) {
max = ((Double) max).floatValue();
}
}
} else if (columnStats instanceof StringColumnStatistics) {
max = ((StringColumnStatistics) columnStats).getMaximum();
} else if (columnStats instanceof DecimalColumnStatistics) {
@@ -262,6 +273,10 @@ private static Optional<ByteBuffer> fromOrcMax(Type type, ColumnStatistics colum
return Optional.ofNullable(Conversions.toByteBuffer(type, truncateIfNeeded(Bound.UPPER, type, max, metricsMode)));
}

private static Object replaceNaN(double value, double replacement) {
return Double.isNaN(value) ? replacement : value;
}

private static Object truncateIfNeeded(Bound bound, Type type, Object value, MetricsMode metricsMode) {
// Out of the two types which could be truncated, string or binary, ORC only supports string bounds.
// Therefore, truncation will be applied if needed only on string type.
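A tiny illustration of the new fallback path in fromOrcMin/fromOrcMax (hypothetical values, not from the PR): imported ORC files carry no Iceberg-tracked float/double metrics, so the file's own statistics are used, with NaN bounds widened to infinities and double statistics narrowed for FLOAT columns.

// Mirrors the replaceNaN(...) logic above with made-up numbers.
double orcMin = Double.NaN;   // ORC can report NaN as a bound
double orcMax = 42.5d;
Object lower = Double.isNaN(orcMin) ? Double.NEGATIVE_INFINITY : orcMin; // replaceNaN(orcMin, NEGATIVE_INFINITY)
Object upper = Double.isNaN(orcMax) ? Double.POSITIVE_INFINITY : orcMax; // replaceNaN(orcMax, POSITIVE_INFINITY)
Object upperForFloat = ((Double) upper).floatValue();                    // FLOAT columns narrow the double statistic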
Changes in another file (path not captured):
@@ -34,6 +34,7 @@ public static void registerBucketUDF(SparkSession session, String funcName, Data
SparkTypeToType typeConverter = new SparkTypeToType();
Type sourceIcebergType = typeConverter.atomic(sourceType);
Transform<Object, Integer> bucket = Transforms.bucket(sourceIcebergType, numBuckets);
session.udf().register(funcName, bucket::apply, DataTypes.IntegerType);
session.udf().register(funcName,
value -> bucket.apply(SparkValueConverter.convert(sourceIcebergType, value)), DataTypes.IntegerType);
}
}
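A hedged usage sketch (not from this PR): the UDF name, table, and column are invented, and the truncated parameter in the hunk header above is assumed to be a Spark DataType. With this fix, the Spark value is first converted to Iceberg's representation, so the registered UDF should agree with Iceberg's bucket partition transform.

// Sketch only; assumes "spark" is an active SparkSession and that registerBucketUDF (shown above)
// is in scope, e.g. via a static import from its enclosing class.
registerBucketUDF(spark, "iceberg_bucket16", DataTypes.TimestampType, 16);
spark.sql("SELECT iceberg_bucket16(event_ts) AS bucket, event_ts FROM events").show();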
Changes in another file (path not captured):
@@ -79,8 +79,9 @@ public static Object convert(Type type, Object object) {
return DateTimeUtils.fromJavaTimestamp((Timestamp) object);
case BINARY:
return ByteBuffer.wrap((byte[]) object);
case BOOLEAN:
case INTEGER:
return ((Number) object).intValue();
case BOOLEAN:
case LONG:
case FLOAT:
case DOUBLE:
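A small illustration of what the reordered case labels fix (hypothetical call, assuming this is the SparkValueConverter referenced in the previous file): previously case BOOLEAN fell through into the INTEGER branch, so a Boolean argument was cast to Number and threw a ClassCastException; it is now grouped with LONG, FLOAT, and DOUBLE, whose handling lies just outside this hunk.

// Hypothetical call; Types is org.apache.iceberg.types.Types.
Object converted = SparkValueConverter.convert(Types.BooleanType.get(), Boolean.TRUE);
// Before this change: ClassCastException (java.lang.Boolean cannot be cast to java.lang.Number).
// After: BOOLEAN no longer goes through ((Number) object).intValue(); it is handled with LONG/FLOAT/DOUBLE below this hunk.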