@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive.legacy;

import com.google.common.collect.Lists;
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;


public class HiveTypeToIcebergType extends HiveTypeUtil.HiveSchemaVisitor<Type> {
private int nextId = 1;

@Override
public Type struct(StructTypeInfo struct, List<String> names, List<Type> fieldResults) {
List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(fieldResults.size());
for (int i = 0; i < names.size(); i++) {
fields.add(Types.NestedField.optional(allocateId(), names.get(i), fieldResults.get(i)));
}
return Types.StructType.of(fields);
}

@Override
public Type map(MapTypeInfo map, Type keyResult, Type valueResult) {
return Types.MapType.ofOptional(allocateId(), allocateId(), keyResult, valueResult);
}

@Override
public Type list(ListTypeInfo list, Type elementResult) {
return Types.ListType.ofOptional(allocateId(), elementResult);
}

@Override
public Type primitive(PrimitiveTypeInfo primitive) {
switch (primitive.getPrimitiveCategory()) {
case FLOAT:
return Types.FloatType.get();
case DOUBLE:
return Types.DoubleType.get();
case BOOLEAN:
return Types.BooleanType.get();
case BYTE:
case SHORT:
case INT:
return Types.IntegerType.get();
case LONG:
return Types.LongType.get();
case CHAR:
case VARCHAR:
case STRING:
return Types.StringType.get();
case BINARY:
return Types.BinaryType.get();
case DECIMAL:
DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) primitive;
return Types.DecimalType.of(decimalTypeInfo.precision(), decimalTypeInfo.scale());
case TIMESTAMP:
return Types.TimestampType.withoutZone();
case DATE:
return Types.DateType.get();
default:
throw new UnsupportedOperationException("Unsupported primitive type " + primitive);
}
}

private int allocateId() {
int current = nextId;
nextId += 1;
return current;
}
}
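A minimal usage sketch, for context only: it assumes the companion HiveTypeUtil dispatcher added in the next file, and uses Hive's standard TypeInfoUtils helper to parse a made-up type string before handing it to this visitor.

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.iceberg.hive.legacy.HiveTypeToIcebergType;
import org.apache.iceberg.hive.legacy.HiveTypeUtil;
import org.apache.iceberg.types.Type;

public class HiveTypeConversionExample {
  public static void main(String[] args) {
    // parse a Hive type string into Hive's TypeInfo tree
    TypeInfo typeInfo =
        TypeInfoUtils.getTypeInfoFromTypeString("struct<id:bigint,tags:array<string>>");

    // walk the tree and build the equivalent Iceberg type; field ids are
    // assigned sequentially starting from 1 as the visitor encounters them
    Type icebergType = HiveTypeUtil.visit(typeInfo, new HiveTypeToIcebergType());
    System.out.println(icebergType);
  }
}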
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive.legacy;

import com.google.common.collect.Lists;
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class HiveTypeUtil {
private HiveTypeUtil() {
}

public static <T> T visit(TypeInfo typeInfo, HiveSchemaVisitor<T> visitor) {
switch (typeInfo.getCategory()) {
case STRUCT:
final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
List<String> names = structTypeInfo.getAllStructFieldNames();
List<T> results = Lists.newArrayListWithExpectedSize(names.size());
for (String name : names) {
results.add(visit(structTypeInfo.getStructFieldTypeInfo(name), visitor));
}
return visitor.struct(structTypeInfo, names, results);

case UNION:
throw new UnsupportedOperationException("Union data type not supported: " + typeInfo);

case LIST:
ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
return visitor.list(listTypeInfo, visit(listTypeInfo.getListElementTypeInfo(), visitor));

case MAP:
final MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
return visitor.map(mapTypeInfo,
visit(mapTypeInfo.getMapKeyTypeInfo(), visitor),
visit(mapTypeInfo.getMapValueTypeInfo(), visitor));

default:
final PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
return visitor.primitive(primitiveTypeInfo);
}
}

public static class HiveSchemaVisitor<T> {
public T struct(StructTypeInfo struct, List<String> names, List<T> fieldResults) {
return null;
}

public T list(ListTypeInfo list, T elementResult) {
return null;
}

public T map(MapTypeInfo map, T keyResult, T valueResult) {
return null;
}

public T primitive(PrimitiveTypeInfo primitive) {
return null;
}
}
}
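Every hook on the visitor base class returns null, so subclasses override only the cases they care about. As a purely hypothetical illustration of that pattern (not part of this diff), a visitor that reports the maximum nesting depth of a Hive type:

import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.iceberg.hive.legacy.HiveTypeUtil;

public class TypeDepthVisitor extends HiveTypeUtil.HiveSchemaVisitor<Integer> {
  @Override
  public Integer struct(StructTypeInfo struct, List<String> names, List<Integer> fieldResults) {
    // a struct is one level deeper than its deepest field
    return 1 + fieldResults.stream().mapToInt(Integer::intValue).max().orElse(0);
  }

  @Override
  public Integer list(ListTypeInfo list, Integer elementResult) {
    return 1 + elementResult;
  }

  @Override
  public Integer map(MapTypeInfo map, Integer keyResult, Integer valueResult) {
    return 1 + Math.max(keyResult, valueResult);
  }

  @Override
  public Integer primitive(PrimitiveTypeInfo primitive) {
    return 1;
  }
}

For example, HiveTypeUtil.visit(typeInfo, new TypeDepthVisitor()) would return 2 for a type parsed from "array<string>".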
@@ -112,10 +112,11 @@ Iterable<Iterable<DataFile>> getFilesByFilter(Expression expression) {
matchingDirectories = getDirectoryInfosByFilter(expression);
}

Iterable<Iterable<DataFile>> filesPerDirectory = Iterables.transform(matchingDirectories, directory -> {
return Iterables.transform(FileSystemUtils.listFiles(directory.location(), conf),
file -> createDataFile(file, current().spec(), directory.partitionData(), directory.format()));
});
Iterable<Iterable<DataFile>> filesPerDirectory = Iterables.transform(
matchingDirectories,
directory -> Iterables.transform(
FileSystemUtils.listFiles(directory.location(), conf),
file -> createDataFile(file, current().spec(), directory.partitionData(), directory.format())));

// Note that we return an Iterable of Iterables here so that the TableScan can process iterables of individual
// directories in parallel, hence resulting in a parallel file listing
@@ -167,7 +168,7 @@ private List<DirectoryInfo> getDirectoryInfosByFilter(Expression expression) {
client -> client.listPartitionsByFilter(databaseName, tableName, partitionFilterString, (short) -1));
}

return LegacyHiveTableUtils.toDirectoryInfos(partitions);
return LegacyHiveTableUtils.toDirectoryInfos(partitions, current().spec());
} catch (TException e) {
String errMsg = String.format("Failed to get partition info for %s.%s + expression %s from metastore",
databaseName, tableName, expression);
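For context, the reshaped statement above is the usual Guava lazy-transform pattern: the outer transform maps each directory to an inner iterable of its files, and no listing happens until an inner iterable is actually consumed. A self-contained sketch with made-up stand-ins for the directory listing and DataFile creation:

import com.google.common.collect.Iterables;
import java.util.Arrays;
import java.util.List;

public class NestedTransformExample {
  public static void main(String[] args) {
    List<String> directories = Arrays.asList("dir1", "dir2");

    // outer transform: one lazily-evaluated inner iterable per directory
    Iterable<Iterable<String>> filesPerDirectory = Iterables.transform(
        directories,
        dir -> Iterables.transform(
            Arrays.asList("part-0", "part-1"),   // stand-in for listing the directory
            file -> dir + "/" + file));          // stand-in for createDataFile(...)

    // consuming the nested iterable triggers the per-directory work
    filesPerDirectory.forEach(files -> files.forEach(System.out::println));
  }
}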
@@ -73,8 +73,12 @@ public CloseableIterable<FileScanTask> planFiles() {
String specString = PartitionSpecParser.toJson(spec);
ResidualEvaluator residuals = ResidualEvaluator.of(spec, filter(), isCaseSensitive());

Iterable<Iterable<FileScanTask>> tasks = Iterables.transform(hiveOps.getFilesByFilter(filter()), fileIterable ->
Iterables.transform(fileIterable, file -> new BaseFileScanTask(file, schemaString, specString, residuals)));
Iterable<Iterable<FileScanTask>> tasks = Iterables.transform(
hiveOps.getFilesByFilter(filter()),
fileIterable ->
Iterables.transform(
fileIterable,
file -> new BaseFileScanTask(file, schemaString, specString, residuals)));

return new ParallelIterable<>(tasks, ThreadPools.getWorkerPool());
}
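The Iterable-of-Iterables shape is what lets ParallelIterable fan the per-directory work out over the worker pool. Roughly speaking, the consumption looks like the plain-JDK sketch below (illustration only; the real ParallelIterable streams results incrementally rather than materializing lists):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class ParallelConsumeSketch {
  // materialize each inner iterable on its own worker thread, then concatenate the results
  static <T> List<T> consumeInParallel(Iterable<? extends Iterable<T>> nested, ExecutorService pool)
      throws InterruptedException, ExecutionException {
    List<Future<List<T>>> futures = new ArrayList<>();
    for (Iterable<T> inner : nested) {
      futures.add(pool.submit(() ->
          StreamSupport.stream(inner.spliterator(), false).collect(Collectors.toList())));
    }
    List<T> all = new ArrayList<>();
    for (Future<List<T>> future : futures) {
      all.addAll(future.get());
    }
    return all;
  }
}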
@@ -19,62 +19,73 @@

package org.apache.iceberg.hive.legacy;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


class LegacyHiveTableUtils {

private LegacyHiveTableUtils() {}
private LegacyHiveTableUtils() {
}

private static final Logger LOG = LoggerFactory.getLogger(LegacyHiveTableUtils.class);

static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
Map<String, String> props = getTableProperties(table);
String schemaStr = props.get("avro.schema.literal");
Schema schema;
if (schemaStr == null) {
LOG.warn("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. The schema will not" +
" have case sensitivity and nullability information", table.getDbName(), table.getTableName());
" have case sensitivity and nullability information", table.getDbName(), table.getTableName());
// TODO: Add support for tables without avro.schema.literal
throw new UnsupportedOperationException("Reading tables without avro.schema.literal not implemented yet");
} else {
schema = AvroSchemaUtil.toIceberg(new org.apache.avro.Schema.Parser().parse(schemaStr));
}

List<String> partCols = table.getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList());
return addPartitionColumnsIfRequired(schema, partCols);
Schema schema = AvroSchemaUtil.toIceberg(new org.apache.avro.Schema.Parser().parse(schemaStr));
Types.StructType dataStructType = schema.asStruct();
List<Types.NestedField> fields = Lists.newArrayList(dataStructType.fields());

Schema partitionSchema = partitionSchema(table.getPartitionKeys(), schema);
Types.StructType partitionStructType = partitionSchema.asStruct();
fields.addAll(partitionStructType.fields());
return new Schema(fields);
}

private static Schema addPartitionColumnsIfRequired(Schema schema, List<String> partitionColumns) {
List<Types.NestedField> fields = new ArrayList<>(schema.columns());
private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema) {
AtomicInteger fieldId = new AtomicInteger(10000);
partitionColumns.stream().forEachOrdered(column -> {
Types.NestedField field = schema.findField(column);
if (field == null) {
// TODO: Support partition fields with non-string types
fields.add(Types.NestedField.required(fieldId.incrementAndGet(), column, Types.StringType.get()));
} else {
Preconditions.checkArgument(field.type().equals(Types.StringType.get()),
"Tables with non-string partition columns not supported yet");
List<Types.NestedField> partitionFields = Lists.newArrayList();
partitionKeys.forEach(f -> {
Types.NestedField field = dataSchema.findField(f.getName());
if (field != null) {
throw new IllegalStateException(String.format("Partition field %s also present in data", field.name()));
}
partitionFields.add(
Types.NestedField.optional(
fieldId.incrementAndGet(), f.getName(), icebergType(f.getType()), f.getComment()));
});
return new Schema(fields);
return new Schema(partitionFields);
}

private static Type icebergType(String hiveTypeString) {
PrimitiveTypeInfo primitiveTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo(hiveTypeString);
return HiveTypeUtil.visit(primitiveTypeInfo, new HiveTypeToIcebergType());
}

static Map<String, String> getTableProperties(org.apache.hadoop.hive.metastore.api.Table table) {
@@ -87,40 +98,39 @@ static Map<String, String> getTableProperties(org.apache.hadoop.hive.metastore.a

static PartitionSpec getPartitionSpec(org.apache.hadoop.hive.metastore.api.Table table, Schema schema) {
PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);

table.getPartitionKeys().forEach(fieldSchema -> {
// TODO: Support partition fields with non-string types
Preconditions.checkArgument(fieldSchema.getType().equals("string"),
"Tables with non-string partition columns not supported yet");
builder.identity(fieldSchema.getName());
});

table.getPartitionKeys().forEach(fieldSchema -> builder.identity(fieldSchema.getName()));
return builder.build();
}

static DirectoryInfo toDirectoryInfo(org.apache.hadoop.hive.metastore.api.Table table) {
return new DirectoryInfo(table.getSd().getLocation(),
serdeToFileFormat(table.getSd().getSerdeInfo().getSerializationLib()), null);
serdeToFileFormat(table.getSd().getSerdeInfo().getSerializationLib()), null);
}

static List<DirectoryInfo> toDirectoryInfos(List<Partition> partitions) {
return partitions.stream().map(p -> {
return new DirectoryInfo(p.getSd().getLocation(),
serdeToFileFormat(p.getSd().getSerdeInfo().getSerializationLib()), buildPartitionStructLike(p.getValues()));
}).collect(Collectors.toList());
static List<DirectoryInfo> toDirectoryInfos(List<Partition> partitions, PartitionSpec spec) {
return partitions.stream().map(
p -> new DirectoryInfo(
p.getSd().getLocation(),
serdeToFileFormat(
p.getSd().getSerdeInfo().getSerializationLib()),
buildPartitionStructLike(p.getValues(), spec))
).collect(Collectors.toList());
}

private static StructLike buildPartitionStructLike(List<String> partitionValues) {
private static StructLike buildPartitionStructLike(List<String> partitionValues, PartitionSpec spec) {
List<Types.NestedField> fields = spec.partitionType().fields();
return new StructLike() {

@Override
public int size() {
return partitionValues.size();
}

@Override
public <T> T get(int pos, Class<T> javaClass) {
return javaClass.cast(partitionValues.get(pos));
final Object partitionValue = Conversions.fromPartitionString(
fields.get(pos).type(),
partitionValues.get(pos));
return javaClass.cast(partitionValue);
}

@Override
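To make the partition handling above concrete: the metastore hands back partition values as strings, so the StructLike now converts each value through the Iceberg type derived from the Hive column type. A small sketch, with a made-up column type and value:

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.iceberg.hive.legacy.HiveTypeToIcebergType;
import org.apache.iceberg.hive.legacy.HiveTypeUtil;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;

public class PartitionValueConversionExample {
  public static void main(String[] args) {
    // a hypothetical partition column declared as "int" in the Hive metastore
    Type columnType = HiveTypeUtil.visit(
        TypeInfoFactory.getPrimitiveTypeInfo("int"), new HiveTypeToIcebergType());

    // the metastore returns partition values as strings; Iceberg parses them
    // into the column type's Java representation for use inside the StructLike
    Object value = Conversions.fromPartitionString(columnType, "42");
    System.out.println(columnType + " -> " + value);  // prints: int -> 42
  }
}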
@@ -111,7 +111,7 @@ private HiveConf newHiveConf(int port) {
HiveConf newHiveConf = new HiveConf(new Configuration(), TestHiveMetastore.class);
newHiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port);
newHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + hiveLocalDir.getAbsolutePath());
newHiveConf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
newHiveConf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "true");
newHiveConf.set("iceberg.hive.client-pool-size", "2");
return newHiveConf;
}