diff --git a/.github/labeler.yml b/.github/labeler.yml index d11c68264cb5..6afc3141ee31 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -130,12 +130,6 @@ MR: 'mr/**/*' ] -PIG: - - changed-files: - - any-glob-to-any-file: [ - 'pig/**/*' - ] - AWS: - changed-files: - any-glob-to-any-file: [ diff --git a/.github/workflows/delta-conversion-ci.yml b/.github/workflows/delta-conversion-ci.yml index 9326d9d533fd..521d061f6552 100644 --- a/.github/workflows/delta-conversion-ci.yml +++ b/.github/workflows/delta-conversion-ci.yml @@ -53,7 +53,6 @@ on: - 'hive-runtime/**' - 'flink/**' - 'kafka-connect/**' - - 'pig/**' - 'docs/**' - 'site/**' - 'open-api/**' diff --git a/.github/workflows/flink-ci.yml b/.github/workflows/flink-ci.yml index 8ed555847861..22f4f008a215 100644 --- a/.github/workflows/flink-ci.yml +++ b/.github/workflows/flink-ci.yml @@ -53,7 +53,6 @@ on: - 'hive-runtime/**' - 'kafka-connect/**' - 'spark/**' - - 'pig/**' - 'docs/**' - 'site/**' - 'open-api/**' diff --git a/.github/workflows/hive-ci.yml b/.github/workflows/hive-ci.yml index bcaf62cc07f8..d95ca1bd5c6a 100644 --- a/.github/workflows/hive-ci.yml +++ b/.github/workflows/hive-ci.yml @@ -51,7 +51,6 @@ on: - 'spark/**' - 'flink/**' - 'kafka-connect/**' - - 'pig/**' - 'docs/**' - 'site/**' - 'open-api/**' diff --git a/.github/workflows/kafka-connect-ci.yml b/.github/workflows/kafka-connect-ci.yml index 98ec18a77953..60cd9188b61d 100644 --- a/.github/workflows/kafka-connect-ci.yml +++ b/.github/workflows/kafka-connect-ci.yml @@ -53,7 +53,6 @@ on: - 'hive3-orc-bundle/**' - 'hive-runtime/**' - 'spark/**' - - 'pig/**' - 'docs/**' - 'site/**' - 'open-api/**' diff --git a/.github/workflows/spark-ci.yml b/.github/workflows/spark-ci.yml index b5d91d3cc76c..0d7bd2d3d3e7 100644 --- a/.github/workflows/spark-ci.yml +++ b/.github/workflows/spark-ci.yml @@ -54,7 +54,6 @@ on: - 'hive-runtime/**' - 'flink/**' - 'kafka-connect/**' - - 'pig/**' - 'docs/**' - 'open-api/**' - 'format/**' diff --git a/README.md b/README.md index 7d2056077804..5c6e5fd96d35 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,6 @@ Iceberg also has modules for adding Iceberg support to processing engines: * `iceberg-spark` is an implementation of Spark's Datasource V2 API for Iceberg with submodules for each spark versions (use runtime jars for a shaded version) * `iceberg-flink` contains classes for integrating with Apache Flink (use iceberg-flink-runtime for a shaded version) * `iceberg-mr` contains an InputFormat and other classes for integrating with Apache Hive -* `iceberg-pig` is an implementation of Pig's LoadFunc API for Iceberg --- **NOTE** diff --git a/build.gradle b/build.gradle index 7990ffbadcb2..81daf14a357f 100644 --- a/build.gradle +++ b/build.gradle @@ -838,39 +838,6 @@ project(':iceberg-arrow') { } } -project(':iceberg-pig') { - test { - useJUnitPlatform() - } - - dependencies { - implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') - api project(':iceberg-api') - implementation project(':iceberg-common') - implementation project(':iceberg-core') - implementation project(':iceberg-parquet') - - implementation(libs.parquet.avro) { - exclude group: 'org.apache.avro', module: 'avro' - // already shaded by Parquet - exclude group: 'it.unimi.dsi' - exclude group: 'org.codehaus.jackson' - } - - compileOnly(libs.pig) { - exclude group: "junit", module: "junit" - } - compileOnly(libs.hadoop2.mapreduce.client.core) - compileOnly(libs.hadoop2.client) { - exclude group: 'org.apache.avro', module: 'avro' - } - - 
testImplementation(libs.hadoop2.minicluster) { - exclude group: 'org.apache.avro', module: 'avro' - } - } -} - project(':iceberg-nessie') { test { useJUnitPlatform() diff --git a/docs/docs/api.md b/docs/docs/api.md index 286f7bd2254d..e4ea1b1043b4 100644 --- a/docs/docs/api.md +++ b/docs/docs/api.md @@ -251,6 +251,5 @@ This project Iceberg also has modules for adding Iceberg support to processing e * `iceberg-mr` is an implementation of MapReduce and Hive InputFormats and SerDes for Iceberg (use iceberg-hive-runtime for a shaded version for use with Hive) * `iceberg-nessie` is a module used to integrate Iceberg table metadata history and operations with [Project Nessie](https://projectnessie.org/) * `iceberg-data` is a client library used to read Iceberg tables from JVM applications -* `iceberg-pig` is an implementation of Pig's LoadFunc API for Iceberg * `iceberg-runtime` generates a shaded runtime jar for Spark to integrate with iceberg tables diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 85fecabcae1a..57d1810bd23c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -76,7 +76,6 @@ netty-buffer-compat = "4.1.114.Final" object-client-bundle = "3.3.2" orc = "1.9.4" parquet = "1.13.1" -pig = "0.17.0" roaringbitmap = "1.3.0" scala-collection-compat = "2.12.0" slf4j = "2.0.16" @@ -169,7 +168,6 @@ orc-core = { module = "org.apache.orc:orc-core", version.ref = "orc" } parquet-avro = { module = "org.apache.parquet:parquet-avro", version.ref = "parquet" } parquet-column = { module = "org.apache.parquet:parquet-column", version.ref = "parquet" } parquet-hadoop = { module = "org.apache.parquet:parquet-hadoop", version.ref = "parquet" } -pig = { module = "org.apache.pig:pig", version.ref = "pig" } roaringbitmap = { module = "org.roaringbitmap:RoaringBitmap", version.ref = "roaringbitmap" } scala-collection-compat = { module = "org.scala-lang.modules:scala-collection-compat_2.13", version.ref = "scala-collection-compat"} slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } diff --git a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java index 185617aec258..415eb8c9b858 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java +++ b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java @@ -77,7 +77,6 @@ private InputFormatConfig() {} public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog."; public enum InMemoryDataModel { - PIG, HIVE, GENERIC // Default data model is of Iceberg Generics } @@ -169,11 +168,6 @@ public ConfigBuilder useHiveRows() { return this; } - public ConfigBuilder usePigTuples() { - conf.set(IN_MEMORY_DATA_MODEL, InMemoryDataModel.PIG.name()); - return this; - } - /** * Compute platforms pass down filters to data sources. If the data source cannot apply some * filters, or only partially applies the filter, it will return the residual filter back. 
If diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java index 7ea2d26891f8..9b8d4e9247a2 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java +++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java @@ -166,9 +166,7 @@ private List planInputSplits( Table serializableTable = SerializableTable.copyOf(table); tasksIterable.forEach( task -> { - if (applyResidual - && (model == InputFormatConfig.InMemoryDataModel.HIVE - || model == InputFormatConfig.InMemoryDataModel.PIG)) { + if (applyResidual && (model == InputFormatConfig.InMemoryDataModel.HIVE)) { // TODO: We do not support residual evaluation for HIVE and PIG in memory data model // yet checkResiduals(task); @@ -347,9 +345,6 @@ private CloseableIterable openTask(FileScanTask currentTask, Schema readSchem @SuppressWarnings("unchecked") private CloseableIterable open(FileScanTask currentTask, Schema readSchema) { switch (inMemoryDataModel) { - case PIG: - // TODO: Support Pig and Hive object models for IcebergInputFormat - throw new UnsupportedOperationException("Pig and Hive object models are not supported."); case HIVE: return openTask(currentTask, readSchema); case GENERIC: @@ -390,7 +385,6 @@ private CloseableIterable newAvroIterable( } switch (inMemoryDataModel) { - case PIG: case HIVE: // TODO implement value readers for Pig and Hive throw new UnsupportedOperationException( @@ -413,8 +407,6 @@ private CloseableIterable newParquetIterable( CloseableIterable parquetIterator = null; switch (inMemoryDataModel) { - case PIG: - throw new UnsupportedOperationException("Parquet support not yet supported for Pig"); case HIVE: if (HiveVersion.min(HiveVersion.HIVE_3)) { parquetIterator = @@ -459,9 +451,6 @@ private CloseableIterable newOrcIterable( CloseableIterable orcIterator = null; // ORC does not support reuse containers yet switch (inMemoryDataModel) { - case PIG: - // TODO: implement value readers for Pig - throw new UnsupportedOperationException("ORC support not yet supported for Pig"); case HIVE: if (HiveVersion.min(HiveVersion.HIVE_3)) { orcIterator = diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java index 2b93b276ad94..668703cc5d92 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java +++ b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java @@ -223,8 +223,6 @@ public void testFailedResidualFiltering() throws Exception { .hasMessage( "Filter expression ref(name=\"id\") == 0 is not completely satisfied. Additional rows can be returned not satisfied by the filter expression"); - builder.usePigTuples(); - assertThatThrownBy(() -> testInputFormat.create(builder.conf())) .isInstanceOf(UnsupportedOperationException.class) .hasMessage( diff --git a/pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java b/pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java deleted file mode 100644 index 932de72ac8c0..000000000000 --- a/pig/src/main/java/org/apache/iceberg/pig/IcebergPigInputFormat.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.pig; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableScan; -import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.hadoop.HadoopInputFile; -import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.parquet.Parquet; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.ByteBuffers; -import org.apache.iceberg.util.SerializationUtil; -import org.apache.pig.data.DataByteArray; -import org.apache.pig.impl.util.ObjectSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @deprecated will be removed in 1.8.0 - */ -@Deprecated -public class IcebergPigInputFormat extends InputFormat { - private static final Logger LOG = LoggerFactory.getLogger(IcebergPigInputFormat.class); - - static final String ICEBERG_SCHEMA = "iceberg.schema"; - static final String ICEBERG_PROJECTED_FIELDS = "iceberg.projected.fields"; - static final String ICEBERG_FILTER_EXPRESSION = "iceberg.filter.expression"; - - private final Table table; - private final String signature; - private List splits; - - IcebergPigInputFormat(Table table, String signature) { - LOG.warn("Iceberg Pig is deprecated and will be removed in Iceberg 1.8.0"); - this.table = table; - this.signature = signature; - } - - @Override - @SuppressWarnings("unchecked") - public List getSplits(JobContext context) throws IOException { - if (splits != null) { - LOG.info("Returning cached splits: {}", splits.size()); - return splits; - } - - splits = Lists.newArrayList(); - - TableScan scan = table.newScan(); - - // Apply Filters - Expression filterExpression = - (Expression) - ObjectSerializer.deserialize( - context.getConfiguration().get(scope(ICEBERG_FILTER_EXPRESSION))); - LOG.info("[{}]: iceberg filter expressions: {}", signature, filterExpression); - - if (filterExpression != null) { - LOG.info("Filter Expression: {}", filterExpression); - scan = scan.filter(filterExpression); - } - - // Wrap in Splits 
- try (CloseableIterable tasks = scan.planTasks()) { - tasks.forEach(scanTask -> splits.add(new IcebergSplit(scanTask))); - } - - return splits; - } - - @Override - public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { - return new IcebergRecordReader<>(); - } - - private static class IcebergSplit extends InputSplit implements Writable { - private static final String[] ANYWHERE = new String[] {"*"}; - - private CombinedScanTask task; - - IcebergSplit(CombinedScanTask task) { - this.task = task; - } - - @Override - public long getLength() { - return task.files().stream().mapToLong(FileScanTask::length).sum(); - } - - @Override - public String[] getLocations() { - return ANYWHERE; - } - - @Override - public void write(DataOutput out) throws IOException { - byte[] data = SerializationUtil.serializeToBytes(this.task); - out.writeInt(data.length); - out.write(data); - } - - @Override - public void readFields(DataInput in) throws IOException { - byte[] data = new byte[in.readInt()]; - in.readFully(data); - - this.task = SerializationUtil.deserializeFromBytes(data); - } - } - - private String scope(String key) { - return key + '.' + signature; - } - - public class IcebergRecordReader extends RecordReader { - private TaskAttemptContext context; - - private Iterator tasks; - - private CloseableIterable reader; - private Iterator recordIterator; - private T currentRecord; - - @Override - public void initialize(InputSplit split, TaskAttemptContext initContext) throws IOException { - this.context = initContext; - - CombinedScanTask task = ((IcebergSplit) split).task; - this.tasks = task.files().iterator(); - - advance(); - } - - @SuppressWarnings("unchecked") - private boolean advance() throws IOException { - if (reader != null) { - reader.close(); - } - - if (!tasks.hasNext()) { - return false; - } - - FileScanTask currentTask = tasks.next(); - - Schema tableSchema = - (Schema) - ObjectSerializer.deserialize(context.getConfiguration().get(scope(ICEBERG_SCHEMA))); - LOG.debug("[{}]: Task table schema: {}", signature, tableSchema); - - List projectedFields = - (List) - ObjectSerializer.deserialize( - context.getConfiguration().get(scope(ICEBERG_PROJECTED_FIELDS))); - LOG.debug("[{}]: Task projected fields: {}", signature, projectedFields); - - Schema projectedSchema = - projectedFields != null ? 
SchemaUtil.project(tableSchema, projectedFields) : tableSchema; - - PartitionSpec spec = currentTask.asFileScanTask().spec(); - DataFile file = currentTask.file(); - InputFile inputFile = HadoopInputFile.fromLocation(file.path(), context.getConfiguration()); - - Set idColumns = spec.identitySourceIds(); - - // schema needed for the projection and filtering - boolean hasJoinedPartitionColumns = !idColumns.isEmpty(); - - switch (file.format()) { - case PARQUET: - Map partitionValueMap = Maps.newHashMap(); - - if (hasJoinedPartitionColumns) { - - Schema readSchema = TypeUtil.selectNot(projectedSchema, idColumns); - Schema projectedPartitionSchema = TypeUtil.select(projectedSchema, idColumns); - - Map partitionSpecFieldIndexMap = Maps.newHashMap(); - for (int i = 0; i < spec.fields().size(); i++) { - partitionSpecFieldIndexMap.put(spec.fields().get(i).name(), i); - } - - for (Types.NestedField field : projectedPartitionSchema.columns()) { - int partitionIndex = partitionSpecFieldIndexMap.get(field.name()); - - Object partitionValue = file.partition().get(partitionIndex, Object.class); - partitionValueMap.put( - field.fieldId(), convertPartitionValue(field.type(), partitionValue)); - } - - reader = - Parquet.read(inputFile) - .project(readSchema) - .split(currentTask.start(), currentTask.length()) - .filter(currentTask.residual()) - .createReaderFunc( - fileSchema -> - PigParquetReader.buildReader( - fileSchema, projectedSchema, partitionValueMap)) - .build(); - } else { - reader = - Parquet.read(inputFile) - .project(projectedSchema) - .split(currentTask.start(), currentTask.length()) - .filter(currentTask.residual()) - .createReaderFunc( - fileSchema -> - PigParquetReader.buildReader( - fileSchema, projectedSchema, partitionValueMap)) - .build(); - } - - recordIterator = reader.iterator(); - - break; - default: - throw new UnsupportedOperationException("Unsupported file format: " + file.format()); - } - - return true; - } - - private Object convertPartitionValue(Type type, Object value) { - if (type.typeId() == Types.BinaryType.get().typeId()) { - return new DataByteArray(ByteBuffers.toByteArray((ByteBuffer) value)); - } - - return value; - } - - @Override - public boolean nextKeyValue() throws IOException { - if (recordIterator.hasNext()) { - currentRecord = recordIterator.next(); - return true; - } - - while (advance()) { - if (recordIterator.hasNext()) { - currentRecord = recordIterator.next(); - return true; - } - } - - return false; - } - - @Override - public Void getCurrentKey() { - return null; - } - - @Override - public T getCurrentValue() { - return currentRecord; - } - - @Override - public float getProgress() { - return 0; - } - - @Override - public void close() {} - } -} diff --git a/pig/src/main/java/org/apache/iceberg/pig/IcebergStorage.java b/pig/src/main/java/org/apache/iceberg/pig/IcebergStorage.java deleted file mode 100644 index 0ce23c39913a..000000000000 --- a/pig/src/main/java/org/apache/iceberg/pig/IcebergStorage.java +++ /dev/null @@ -1,348 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.pig; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.Tables; -import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.pig.IcebergPigInputFormat.IcebergRecordReader; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.NaNUtil; -import org.apache.pig.Expression; -import org.apache.pig.Expression.BetweenExpression; -import org.apache.pig.Expression.BinaryExpression; -import org.apache.pig.Expression.Column; -import org.apache.pig.Expression.Const; -import org.apache.pig.Expression.InExpression; -import org.apache.pig.Expression.OpType; -import org.apache.pig.Expression.UnaryExpression; -import org.apache.pig.LoadFunc; -import org.apache.pig.LoadMetadata; -import org.apache.pig.LoadPredicatePushdown; -import org.apache.pig.LoadPushDown; -import org.apache.pig.ResourceSchema; -import org.apache.pig.ResourceStatistics; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.logicalLayer.FrontendException; -import org.apache.pig.impl.util.ObjectSerializer; -import org.apache.pig.impl.util.UDFContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @deprecated will be removed in 1.8.0 - */ -@Deprecated -public class IcebergStorage extends LoadFunc - implements LoadMetadata, LoadPredicatePushdown, LoadPushDown { - private static final Logger LOG = LoggerFactory.getLogger(IcebergStorage.class); - - public static final String PIG_ICEBERG_TABLES_IMPL = "pig.iceberg.tables.impl"; - private static Tables iceberg; - private static final Map TABLES = Maps.newConcurrentMap(); - private static final Map LOCATIONS = Maps.newConcurrentMap(); - - private String signature; - - private IcebergRecordReader reader; - - public IcebergStorage() { - LOG.warn("Iceberg Pig is deprecated and will be removed in Iceberg 1.8.0"); - } - - @Override - public void setLocation(String location, Job job) { - LOG.info("[{}]: setLocation() -> {}", signature, location); - - LOCATIONS.put(signature, location); - - Configuration conf = job.getConfiguration(); - - copyUDFContextToScopedConfiguration(conf, IcebergPigInputFormat.ICEBERG_SCHEMA); - copyUDFContextToScopedConfiguration(conf, IcebergPigInputFormat.ICEBERG_PROJECTED_FIELDS); - 
copyUDFContextToScopedConfiguration(conf, IcebergPigInputFormat.ICEBERG_FILTER_EXPRESSION); - } - - @Override - public InputFormat getInputFormat() { - LOG.info("[{}]: getInputFormat()", signature); - String location = LOCATIONS.get(signature); - - return new IcebergPigInputFormat(TABLES.get(location), signature); - } - - @Override - public Tuple getNext() throws IOException { - if (!reader.nextKeyValue()) { - return null; - } - - return (Tuple) reader.getCurrentValue(); - } - - @Override - public void prepareToRead(RecordReader newReader, PigSplit split) { - LOG.info("[{}]: prepareToRead() -> {}", signature, split); - - this.reader = (IcebergRecordReader) newReader; - } - - @Override - public ResourceSchema getSchema(String location, Job job) throws IOException { - LOG.info("[{}]: getSchema() -> {}", signature, location); - - Schema schema = load(location, job).schema(); - storeInUDFContext(IcebergPigInputFormat.ICEBERG_SCHEMA, schema); - - return SchemaUtil.convert(schema); - } - - @Override - public ResourceStatistics getStatistics(String location, Job job) { - LOG.info("[{}]: getStatistics() -> : {}", signature, location); - - return null; - } - - @Override - public String[] getPartitionKeys(String location, Job job) { - LOG.info("[{}]: getPartitionKeys()", signature); - return new String[0]; - } - - @Override - public void setPartitionFilter(Expression partitionFilter) { - LOG.info("[{}]: setPartitionFilter() -> {}", signature, partitionFilter); - } - - @Override - public List getPredicateFields(String location, Job job) throws IOException { - LOG.info("[{}]: getPredicateFields() -> {}", signature, location); - Schema schema = load(location, job).schema(); - - List result = Lists.newArrayList(); - - for (Types.NestedField nf : schema.columns()) { - switch (nf.type().typeId()) { - case MAP: - case LIST: - case STRUCT: - continue; - default: - result.add(nf.name()); - } - } - - return result; - } - - @Override - public ImmutableList getSupportedExpressionTypes() { - LOG.info("[{}]: getSupportedExpressionTypes()", signature); - return ImmutableList.of( - OpType.OP_AND, - OpType.OP_OR, - OpType.OP_EQ, - OpType.OP_NE, - OpType.OP_NOT, - OpType.OP_GE, - OpType.OP_GT, - OpType.OP_LE, - OpType.OP_LT, - OpType.OP_BETWEEN, - OpType.OP_IN, - OpType.OP_NULL); - } - - @Override - public void setPushdownPredicate(Expression predicate) throws IOException { - LOG.info("[{}]: setPushdownPredicate()", signature); - LOG.info("[{}]: Pig predicate expression: {}", signature, predicate); - - org.apache.iceberg.expressions.Expression icebergExpression = convert(predicate); - - LOG.info("[{}]: Iceberg predicate expression: {}", signature, icebergExpression); - - storeInUDFContext(IcebergPigInputFormat.ICEBERG_FILTER_EXPRESSION, icebergExpression); - } - - private org.apache.iceberg.expressions.Expression convert(Expression expression) - throws IOException { - OpType op = expression.getOpType(); - - if (expression instanceof BinaryExpression) { - Expression lhs = ((BinaryExpression) expression).getLhs(); - Expression rhs = ((BinaryExpression) expression).getRhs(); - - switch (op) { - case OP_AND: - return Expressions.and(convert(lhs), convert(rhs)); - case OP_OR: - return Expressions.or(convert(lhs), convert(rhs)); - case OP_BETWEEN: - BetweenExpression between = (BetweenExpression) rhs; - return Expressions.and( - convert(OpType.OP_GE, (Column) lhs, (Const) between.getLower()), - convert(OpType.OP_LE, (Column) lhs, (Const) between.getUpper())); - case OP_IN: - return ((InExpression) rhs) - 
.getValues().stream() - .map(value -> convert(OpType.OP_EQ, (Column) lhs, (Const) value)) - .reduce(Expressions.alwaysFalse(), Expressions::or); - default: - if (lhs instanceof Column && rhs instanceof Const) { - return convert(op, (Column) lhs, (Const) rhs); - } else if (lhs instanceof Const && rhs instanceof Column) { - throw new FrontendException("Invalid expression ordering " + expression); - } - } - - } else if (expression instanceof UnaryExpression) { - Expression unary = ((UnaryExpression) expression).getExpression(); - - switch (op) { - case OP_NOT: - return Expressions.not(convert(unary)); - case OP_NULL: - return Expressions.isNull(((Column) unary).getName()); - default: - throw new FrontendException("Unsupported unary operator" + op); - } - } - - throw new FrontendException("Failed to pushdown expression " + expression); - } - - private org.apache.iceberg.expressions.Expression convert(OpType op, Column col, Const constant) { - String name = col.getName(); - Object value = constant.getValue(); - - switch (op) { - case OP_GE: - return Expressions.greaterThanOrEqual(name, value); - case OP_GT: - return Expressions.greaterThan(name, value); - case OP_LE: - return Expressions.lessThanOrEqual(name, value); - case OP_LT: - return Expressions.lessThan(name, value); - case OP_EQ: - return NaNUtil.isNaN(value) ? Expressions.isNaN(name) : Expressions.equal(name, value); - case OP_NE: - return NaNUtil.isNaN(value) ? Expressions.notNaN(name) : Expressions.notEqual(name, value); - } - - throw new RuntimeException( - String.format( - "[%s]: Failed to pushdown expression: %s %s %s", signature, col, op, constant)); - } - - @Override - public List getFeatures() { - return Collections.singletonList(OperatorSet.PROJECTION); - } - - @Override - public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) { - LOG.info("[{}]: pushProjection() -> {}", signature, requiredFieldList); - - try { - List projection = - requiredFieldList.getFields().stream() - .map(RequiredField::getAlias) - .collect(Collectors.toList()); - - storeInUDFContext(IcebergPigInputFormat.ICEBERG_PROJECTED_FIELDS, (Serializable) projection); - } catch (IOException e) { - throw new RuntimeException(e); - } - - return new RequiredFieldResponse(true); - } - - @Override - public void setUDFContextSignature(String newSignature) { - this.signature = newSignature; - } - - private void storeInUDFContext(String key, Serializable value) throws IOException { - Properties properties = - UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] {signature}); - - properties.setProperty(key, ObjectSerializer.serialize(value)); - } - - private void copyUDFContextToScopedConfiguration(Configuration conf, String key) { - String value = - UDFContext.getUDFContext() - .getUDFProperties(this.getClass(), new String[] {signature}) - .getProperty(key); - - if (value != null) { - conf.set(key + '.' 
+ signature, value); - } - } - - @Override - public String relativeToAbsolutePath(String location, Path curDir) throws IOException { - return location; - } - - private Table load(String location, Job job) throws IOException { - if (iceberg == null) { - Class tablesImpl = - job.getConfiguration().getClass(PIG_ICEBERG_TABLES_IMPL, HadoopTables.class); - LOG.info("Initializing iceberg tables implementation: {}", tablesImpl); - iceberg = (Tables) ReflectionUtils.newInstance(tablesImpl, job.getConfiguration()); - } - - Table result = TABLES.get(location); - - if (result == null) { - try { - LOG.info("[{}]: Loading table for location: {}", signature, location); - result = iceberg.load(location); - TABLES.put(location, result); - } catch (Exception e) { - throw new FrontendException("Failed to instantiate tables implementation", e); - } - } - - return result; - } -} diff --git a/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java b/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java deleted file mode 100644 index 15ba9068caf5..000000000000 --- a/pig/src/main/java/org/apache/iceberg/pig/PigParquetReader.java +++ /dev/null @@ -1,462 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg.pig; - -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.time.temporal.ChronoUnit; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import org.apache.iceberg.Schema; -import org.apache.iceberg.parquet.ParquetSchemaUtil; -import org.apache.iceberg.parquet.ParquetValueReader; -import org.apache.iceberg.parquet.ParquetValueReaders; -import org.apache.iceberg.parquet.ParquetValueReaders.BinaryAsDecimalReader; -import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader; -import org.apache.iceberg.parquet.ParquetValueReaders.IntAsLongReader; -import org.apache.iceberg.parquet.ParquetValueReaders.IntegerAsDecimalReader; -import org.apache.iceberg.parquet.ParquetValueReaders.LongAsDecimalReader; -import org.apache.iceberg.parquet.ParquetValueReaders.PrimitiveReader; -import org.apache.iceberg.parquet.ParquetValueReaders.RepeatedKeyValueReader; -import org.apache.iceberg.parquet.ParquetValueReaders.RepeatedReader; -import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry; -import org.apache.iceberg.parquet.ParquetValueReaders.StringReader; -import org.apache.iceberg.parquet.ParquetValueReaders.StructReader; -import org.apache.iceberg.parquet.ParquetValueReaders.UnboxedReader; -import org.apache.iceberg.parquet.TypeWithSchemaVisitor; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.types.Type.TypeID; -import org.apache.iceberg.types.Types; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataByteArray; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @deprecated will be removed in 1.8.0 - */ -@Deprecated -public class PigParquetReader { - - private static final Logger LOG = LoggerFactory.getLogger(PigParquetReader.class); - - private PigParquetReader() { - LOG.warn("Iceberg Pig is deprecated and will be removed in Iceberg 1.8.0"); - } - - @SuppressWarnings("unchecked") - public static ParquetValueReader buildReader( - MessageType fileSchema, Schema expectedSchema, Map partitionValues) { - - if (ParquetSchemaUtil.hasIds(fileSchema)) { - return (ParquetValueReader) - TypeWithSchemaVisitor.visit( - expectedSchema.asStruct(), fileSchema, new ReadBuilder(fileSchema, partitionValues)); - } else { - return (ParquetValueReader) - TypeWithSchemaVisitor.visit( - expectedSchema.asStruct(), - fileSchema, - new FallbackReadBuilder(fileSchema, partitionValues)); - } - } - - private static class FallbackReadBuilder extends ReadBuilder { - FallbackReadBuilder(MessageType type, Map partitionValues) { - super(type, partitionValues); - } - - @Override - public ParquetValueReader message( - Types.StructType expected, MessageType message, List> fieldReaders) { - // the top level matches by ID, but the remaining IDs are missing - return 
super.struct(expected, message, fieldReaders); - } - - @Override - public ParquetValueReader struct( - Types.StructType ignored, GroupType struct, List> fieldReaders) { - // the expected struct is ignored because nested fields are never found when the - List> newFields = - Lists.newArrayListWithExpectedSize(fieldReaders.size()); - List types = Lists.newArrayListWithExpectedSize(fieldReaders.size()); - List fields = struct.getFields(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = fields.get(i); - int fieldD = getMessageType().getMaxDefinitionLevel(path(fieldType.getName())) - 1; - newFields.add(ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i))); - types.add(fieldType); - } - - return new TupleReader(types, newFields); - } - } - - private static class ReadBuilder extends TypeWithSchemaVisitor> { - private final MessageType type; - private final Map partitionValues; - - ReadBuilder(MessageType type, Map partitionValues) { - this.type = type; - this.partitionValues = partitionValues; - } - - MessageType getMessageType() { - return this.type; - } - - @Override - public ParquetValueReader message( - Types.StructType expected, MessageType message, List> fieldReaders) { - return struct(expected, message.asGroupType(), fieldReaders); - } - - @Override - public ParquetValueReader struct( - Types.StructType expected, GroupType struct, List> fieldReaders) { - // match the expected struct's order - Map> readersById = Maps.newHashMap(); - Map typesById = Maps.newHashMap(); - Map maxDefinitionLevelsById = Maps.newHashMap(); - List fields = struct.getFields(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = fields.get(i); - int fieldD = type.getMaxDefinitionLevel(path(fieldType.getName())) - 1; - int id = fieldType.getId().intValue(); - readersById.put(id, ParquetValueReaders.option(fieldType, fieldD, fieldReaders.get(i))); - typesById.put(id, fieldType); - if (partitionValues.containsKey(id)) { - maxDefinitionLevelsById.put(id, fieldD); - } - } - - List expectedFields = - expected != null ? 
expected.fields() : ImmutableList.of(); - List> reorderedFields = - Lists.newArrayListWithExpectedSize(expectedFields.size()); - List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); - // Defaulting to parent max definition level - int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); - for (Types.NestedField field : expectedFields) { - int id = field.fieldId(); - if (partitionValues.containsKey(id)) { - // the value may be null so containsKey is used to check for a partition value - int fieldMaxDefinitionLevel = - maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel); - reorderedFields.add( - ParquetValueReaders.constant(partitionValues.get(id), fieldMaxDefinitionLevel)); - types.add(null); - } else { - ParquetValueReader reader = readersById.get(id); - if (reader != null) { - reorderedFields.add(reader); - types.add(typesById.get(id)); - } else { - reorderedFields.add(ParquetValueReaders.nulls()); - types.add(null); - } - } - } - - return new TupleReader(types, reorderedFields); - } - - @Override - public ParquetValueReader list( - Types.ListType expectedList, GroupType array, ParquetValueReader elementReader) { - String[] repeatedPath = currentPath(); - - int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1; - int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1; - - Type elementType = ParquetSchemaUtil.determineListElementType(array); - int elementD = type.getMaxDefinitionLevel(path(elementType.getName())) - 1; - - return new ArrayReader<>( - repeatedD, repeatedR, ParquetValueReaders.option(elementType, elementD, elementReader)); - } - - @Override - public ParquetValueReader map( - Types.MapType expectedMap, - GroupType map, - ParquetValueReader keyReader, - ParquetValueReader valueReader) { - GroupType repeatedKeyValue = map.getFields().get(0).asGroupType(); - String[] repeatedPath = currentPath(); - - int repeatedD = type.getMaxDefinitionLevel(repeatedPath) - 1; - int repeatedR = type.getMaxRepetitionLevel(repeatedPath) - 1; - - Type keyType = repeatedKeyValue.getType(0); - int keyD = type.getMaxDefinitionLevel(path(keyType.getName())) - 1; - Type valueType = repeatedKeyValue.getType(1); - int valueD = type.getMaxDefinitionLevel(path(valueType.getName())) - 1; - - return new MapReader<>( - repeatedD, - repeatedR, - ParquetValueReaders.option(keyType, keyD, keyReader), - ParquetValueReaders.option(valueType, valueD, valueReader)); - } - - @Override - public ParquetValueReader primitive( - org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) { - ColumnDescriptor desc = type.getColumnDescription(currentPath()); - - if (primitive.getOriginalType() != null) { - switch (primitive.getOriginalType()) { - case ENUM: - case JSON: - case UTF8: - return new StringReader(desc); - case DATE: - return new DateReader(desc); - case INT_8: - case INT_16: - case INT_32: - if (expected != null && expected.typeId() == Types.LongType.get().typeId()) { - return new IntAsLongReader(desc); - } else { - return new UnboxedReader(desc); - } - case INT_64: - return new UnboxedReader<>(desc); - case TIMESTAMP_MILLIS: - return new TimestampMillisReader(desc); - case TIMESTAMP_MICROS: - return new TimestampMicrosReader(desc); - case DECIMAL: - DecimalLogicalTypeAnnotation decimal = - (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation(); - switch (primitive.getPrimitiveTypeName()) { - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - return new BinaryAsDecimalReader(desc, decimal.getScale()); - case INT32: - 
return new IntegerAsDecimalReader(desc, decimal.getScale()); - case INT64: - return new LongAsDecimalReader(desc, decimal.getScale()); - default: - throw new UnsupportedOperationException( - "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); - } - default: - throw new UnsupportedOperationException( - "Unsupported type: " + primitive.getOriginalType()); - } - } - - switch (primitive.getPrimitiveTypeName()) { - case FIXED_LEN_BYTE_ARRAY: - case BINARY: - return new BytesReader(desc); - case INT32: - if (expected != null && expected.typeId() == TypeID.LONG) { - return new IntAsLongReader(desc); - } else { - return new UnboxedReader<>(desc); - } - case FLOAT: - if (expected != null && expected.typeId() == TypeID.DOUBLE) { - return new FloatAsDoubleReader(desc); - } else { - return new UnboxedReader<>(desc); - } - case BOOLEAN: - case INT64: - case DOUBLE: - return new UnboxedReader<>(desc); - default: - throw new UnsupportedOperationException("Unsupported type: " + primitive); - } - } - } - - private static class DateReader extends PrimitiveReader { - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - - DateReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public String read(String reuse) { - OffsetDateTime day = EPOCH.plusDays(column.nextInteger()); - return String.format( - Locale.ROOT, - "%04d-%02d-%02d", - day.getYear(), - day.getMonth().getValue(), - day.getDayOfMonth()); - } - } - - private static class BytesReader extends PrimitiveReader { - BytesReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public DataByteArray read(DataByteArray reuse) { - byte[] bytes = column.nextBinary().getBytes(); - return new DataByteArray(bytes); - } - } - - private static class TimestampMicrosReader extends UnboxedReader { - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - - TimestampMicrosReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public String read(String ignored) { - return ChronoUnit.MICROS.addTo(EPOCH, column.nextLong()).toString(); - } - } - - private static class TimestampMillisReader extends UnboxedReader { - private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - - TimestampMillisReader(ColumnDescriptor desc) { - super(desc); - } - - @Override - public String read(String ignored) { - return ChronoUnit.MILLIS.addTo(EPOCH, column.nextLong()).toString(); - } - } - - private static class MapReader extends RepeatedKeyValueReader, Map, K, V> { - private final ReusableEntry nullEntry = new ReusableEntry<>(); - - MapReader( - int definitionLevel, - int repetitionLevel, - ParquetValueReader keyReader, - ParquetValueReader valueReader) { - super(definitionLevel, repetitionLevel, keyReader, valueReader); - } - - @Override - protected Map newMapData(Map reuse) { - return new LinkedHashMap<>(); - } - - @Override - protected Map.Entry getPair(Map reuse) { - return nullEntry; - } - - @Override - protected void addPair(Map map, K key, V value) { - map.put(key, value); - } - - @Override - protected Map buildMap(Map map) { - return map; - } - } - - private static class ArrayReader extends RepeatedReader { - private final BagFactory bagFactory = BagFactory.getInstance(); - private final TupleFactory tupleFactory = TupleFactory.getInstance(); - - ArrayReader(int definitionLevel, int repetitionLevel, ParquetValueReader reader) { - super(definitionLevel, repetitionLevel, reader); - } - - @Override - 
protected DataBag newListData(DataBag reuse) { - return bagFactory.newDefaultBag(); - } - - @Override - protected T getElement(DataBag list) { - return null; - } - - @Override - protected void addElement(DataBag bag, T element) { - bag.add(tupleFactory.newTuple(element)); - } - - @Override - protected DataBag buildList(DataBag bag) { - return bag; - } - } - - private static class TupleReader extends StructReader { - private static final TupleFactory TF = TupleFactory.getInstance(); - private final int numColumns; - - TupleReader(List types, List> readers) { - super(types, readers); - this.numColumns = readers.size(); - } - - @Override - protected Tuple newStructData(Tuple reuse) { - return TF.newTuple(numColumns); - } - - @Override - protected Object getField(Tuple tuple, int pos) { - return null; - } - - @Override - protected Tuple buildStruct(Tuple tuple) { - return tuple; - } - - @Override - protected void set(Tuple tuple, int pos, Object value) { - try { - tuple.set(pos, value); - } catch (ExecException e) { - throw new RuntimeException( - String.format( - Locale.ROOT, "Error setting tuple value for pos: %d, value: %s", pos, value), - e); - } - } - } -} diff --git a/pig/src/main/java/org/apache/iceberg/pig/SchemaUtil.java b/pig/src/main/java/org/apache/iceberg/pig/SchemaUtil.java deleted file mode 100644 index 4602a5effa97..000000000000 --- a/pig/src/main/java/org/apache/iceberg/pig/SchemaUtil.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg.pig; - -import java.io.IOException; -import java.util.List; -import org.apache.iceberg.Schema; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; -import org.apache.pig.ResourceSchema; -import org.apache.pig.ResourceSchema.ResourceFieldSchema; -import org.apache.pig.data.DataType; -import org.apache.pig.impl.logicalLayer.FrontendException; - -/** - * @deprecated will be removed in 1.8.0 - */ -@Deprecated -public class SchemaUtil { - - private SchemaUtil() {} - - public static ResourceSchema convert(Schema icebergSchema) throws IOException { - ResourceSchema result = new ResourceSchema(); - result.setFields(convertFields(icebergSchema.columns())); - return result; - } - - private static ResourceFieldSchema convert(Types.NestedField field) throws IOException { - ResourceFieldSchema result = convert(field.type()); - result.setName(field.name()); - result.setDescription(String.format("FieldId: %s", field.fieldId())); - - return result; - } - - private static ResourceFieldSchema convert(Type type) throws IOException { - ResourceFieldSchema result = new ResourceFieldSchema(); - result.setType(convertType(type)); - - if (!type.isPrimitiveType()) { - result.setSchema(convertComplex(type)); - } - - return result; - } - - private static ResourceFieldSchema[] convertFields(List fields) - throws IOException { - List result = Lists.newArrayList(); - - for (Types.NestedField nf : fields) { - result.add(convert(nf)); - } - - return result.toArray(new ResourceFieldSchema[0]); - } - - private static byte convertType(Type type) throws IOException { - switch (type.typeId()) { - case BOOLEAN: - return DataType.BOOLEAN; - case INTEGER: - return DataType.INTEGER; - case LONG: - return DataType.LONG; - case FLOAT: - return DataType.FLOAT; - case DOUBLE: - return DataType.DOUBLE; - case TIMESTAMP: - return DataType.CHARARRAY; - case DATE: - return DataType.CHARARRAY; - case STRING: - return DataType.CHARARRAY; - case FIXED: - return DataType.BYTEARRAY; - case BINARY: - return DataType.BYTEARRAY; - case DECIMAL: - return DataType.BIGDECIMAL; - case STRUCT: - return DataType.TUPLE; - case LIST: - return DataType.BAG; - case MAP: - return DataType.MAP; - default: - throw new FrontendException("Unsupported primitive type:" + type); - } - } - - private static ResourceSchema convertComplex(Type type) throws IOException { - ResourceSchema result = new ResourceSchema(); - - switch (type.typeId()) { - case STRUCT: - Types.StructType structType = type.asStructType(); - - List fields = Lists.newArrayList(); - - for (Types.NestedField f : structType.fields()) { - fields.add(convert(f)); - } - - result.setFields(fields.toArray(new ResourceFieldSchema[0])); - - return result; - case LIST: - Types.ListType listType = type.asListType(); - - ResourceFieldSchema[] elementFieldSchemas = - new ResourceFieldSchema[] {convert(listType.elementType())}; - - if (listType.elementType().isStructType()) { - result.setFields(elementFieldSchemas); - } else { - // Wrap non-struct types in tuples - ResourceSchema elementSchema = new ResourceSchema(); - elementSchema.setFields(elementFieldSchemas); - - ResourceFieldSchema tupleSchema = new ResourceFieldSchema(); - tupleSchema.setType(DataType.TUPLE); - tupleSchema.setSchema(elementSchema); - - result.setFields(new ResourceFieldSchema[] {tupleSchema}); - } - - return result; - case MAP: - Types.MapType mapType = type.asMapType(); - - if (mapType.keyType().typeId() != 
Type.TypeID.STRING) { - throw new FrontendException("Unsupported map key type: " + mapType.keyType()); - } - result.setFields(new ResourceFieldSchema[] {convert(mapType.valueType())}); - - return result; - default: - throw new FrontendException("Unsupported complex type: " + type); - } - } - - public static Schema project(Schema schema, List requiredFields) { - List columns = Lists.newArrayList(); - - for (String column : requiredFields) { - columns.add(schema.findField(column)); - } - - return new Schema(columns); - } -} diff --git a/pig/src/test/java/org/apache/iceberg/pig/SchemaUtilTest.java b/pig/src/test/java/org/apache/iceberg/pig/SchemaUtilTest.java deleted file mode 100644 index f7136045a1f5..000000000000 --- a/pig/src/test/java/org/apache/iceberg/pig/SchemaUtilTest.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.pig; - -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import java.io.IOException; -import org.apache.iceberg.Schema; -import org.apache.iceberg.types.Types.BinaryType; -import org.apache.iceberg.types.Types.BooleanType; -import org.apache.iceberg.types.Types.DecimalType; -import org.apache.iceberg.types.Types.DoubleType; -import org.apache.iceberg.types.Types.FloatType; -import org.apache.iceberg.types.Types.IntegerType; -import org.apache.iceberg.types.Types.ListType; -import org.apache.iceberg.types.Types.LongType; -import org.apache.iceberg.types.Types.MapType; -import org.apache.iceberg.types.Types.StringType; -import org.apache.iceberg.types.Types.StructType; -import org.apache.pig.ResourceSchema; -import org.apache.pig.impl.logicalLayer.FrontendException; -import org.junit.jupiter.api.Test; - -public class SchemaUtilTest { - - @Test - public void testPrimitive() throws IOException { - Schema icebergSchema = - new Schema( - optional(1, "b", BooleanType.get()), - optional(2, "i", IntegerType.get()), - optional(3, "l", LongType.get()), - optional(4, "f", FloatType.get()), - optional(5, "d", DoubleType.get()), - optional(6, "dec", DecimalType.of(0, 2)), - optional(7, "s", StringType.get()), - optional(8, "bi", BinaryType.get())); - - ResourceSchema pigSchema = SchemaUtil.convert(icebergSchema); - assertThat(pigSchema.toString()) - .isEqualTo( - "b:boolean,i:int,l:long,f:float,d:double,dec:bigdecimal,s:chararray,bi:bytearray"); - } - - @Test - public void testComplex() throws IOException { - convertToPigSchema( - new Schema( - optional(1, "bag", ListType.ofOptional(2, BooleanType.get())), - optional(3, "map", MapType.ofOptional(4, 5, 
StringType.get(), DoubleType.get())), - optional( - 6, - "tuple", - StructType.of( - optional(7, "i", IntegerType.get()), optional(8, "f", FloatType.get())))), - "bag:{(boolean)},map:[double],tuple:(i:int,f:float)", - null); - } - - @Test - public void invalidMap() { - assertThatThrownBy( - () -> - convertToPigSchema( - new Schema( - optional( - 1, - "invalid", - MapType.ofOptional(2, 3, IntegerType.get(), DoubleType.get()))), - "", - "")) - .isInstanceOf(FrontendException.class) - .hasMessageContaining("Unsupported map key type: int"); - } - - @Test - public void nestedMaps() throws IOException { - convertToPigSchema( - new Schema( - optional( - 1, - "nested", - MapType.ofOptional( - 2, - 3, - StringType.get(), - MapType.ofOptional( - 4, - 5, - StringType.get(), - MapType.ofOptional(6, 7, StringType.get(), DecimalType.of(10, 2)))))), - "nested:[[[bigdecimal]]]", - ""); - } - - @Test - public void nestedBags() throws IOException { - convertToPigSchema( - new Schema( - optional( - 1, - "nested", - ListType.ofOptional( - 2, ListType.ofOptional(3, ListType.ofOptional(4, DoubleType.get()))))), - "nested:{({({(double)})})}", - ""); - } - - @Test - public void nestedTuples() throws IOException { - convertToPigSchema( - new Schema( - optional( - 1, - "first", - StructType.of( - optional( - 2, - "second", - StructType.of( - optional( - 3, - "third", - StructType.of(optional(4, "val", StringType.get())))))))), - "first:(second:(third:(val:chararray)))", - ""); - } - - @Test - public void complexNested() throws IOException { - convertToPigSchema( - new Schema( - optional( - 1, - "t", - StructType.of( - optional( - 2, - "b", - ListType.ofOptional( - 3, - StructType.of( - optional(4, "i", IntegerType.get()), - optional(5, "s", StringType.get())))))), - optional( - 6, - "m1", - MapType.ofOptional( - 7, - 8, - StringType.get(), - StructType.of( - optional(9, "b", ListType.ofOptional(10, BinaryType.get())), - optional( - 11, - "m2", - MapType.ofOptional(12, 13, StringType.get(), IntegerType.get()))))), - optional( - 14, - "b1", - ListType.ofOptional( - 15, - MapType.ofOptional( - 16, 17, StringType.get(), ListType.ofOptional(18, FloatType.get()))))), - "t:(b:{(i:int,s:chararray)}),m1:[(b:{(bytearray)},m2:[int])],b1:{([{(float)}])}", - ""); - } - - @Test - public void mapConversions() throws IOException { - // consistent behavior for maps conversions. The below test case, correctly does not specify map - // key types - convertToPigSchema( - new Schema( - required( - 1, - "a", - MapType.ofRequired( - 2, - 3, - StringType.get(), - ListType.ofRequired( - 4, - StructType.of( - required(5, "b", LongType.get()), - required(6, "c", StringType.get())))))), - "a:[{(b:long,c:chararray)}]", - "We do not specify the map key type here"); - // struct>> -> (a:[[double]]) - // As per https://pig.apache.org/docs/latest/basic.html#map-schema. 
It seems that - // we only need to specify value type as keys are always of type chararray - convertToPigSchema( - new Schema( - StructType.of( - required( - 1, - "a", - MapType.ofRequired( - 2, - 3, - StringType.get(), - MapType.ofRequired(4, 5, StringType.get(), DoubleType.get())))) - .fields()), - "a:[[double]]", - "A map key type does not need to be specified"); - } - - @Test - public void testTupleInMap() throws IOException { - Schema icebergSchema = - new Schema( - optional( - 1, - "nested_list", - MapType.ofOptional( - 2, - 3, - StringType.get(), - ListType.ofOptional( - 4, - StructType.of( - required(5, "id", LongType.get()), - optional(6, "data", StringType.get())))))); - - ResourceSchema pigSchema = SchemaUtil.convert(icebergSchema); - // The output should contain a nested struct within a list within a map, I think. - assertThat(pigSchema.toString()).isEqualTo("nested_list:[{(id:long,data:chararray)}]"); - } - - @Test - public void testLongInBag() throws IOException { - Schema icebergSchema = - new Schema( - optional( - 1, - "nested_list", - MapType.ofOptional( - 2, 3, StringType.get(), ListType.ofRequired(5, LongType.get())))); - SchemaUtil.convert(icebergSchema); - } - - @Test - public void doubleWrappingTuples() throws IOException { - // struct>> -> (a:{(b:chararray)}) - convertToPigSchema( - new Schema( - StructType.of( - required( - 1, - "a", - ListType.ofRequired(2, StructType.of(required(3, "b", StringType.get()))))) - .fields()), - "a:{(b:chararray)}", - "A tuple inside a bag should not be double wrapped"); - // struct> -> "(a:{(boolean)}) - convertToPigSchema( - new Schema( - StructType.of(required(1, "a", ListType.ofRequired(2, BooleanType.get()))).fields()), - "a:{(boolean)}", - "boolean (or anything non-tuple) element inside a bag should be wrapped inside a tuple"); - } - - private static void convertToPigSchema( - Schema icebergSchema, String expectedPigSchema, String assertMessage) throws IOException { - ResourceSchema pigSchema = SchemaUtil.convert(icebergSchema); - assertThat(pigSchema.toString()).as(assertMessage).isEqualTo(expectedPigSchema); - } -} diff --git a/settings.gradle b/settings.gradle index 56a68c384c5f..103741389a26 100644 --- a/settings.gradle +++ b/settings.gradle @@ -33,7 +33,6 @@ include 'arrow' include 'parquet' include 'bundled-guava' include 'spark' -include 'pig' include 'hive-metastore' include 'nessie' include 'gcp' @@ -58,7 +57,6 @@ project(':arrow').name = 'iceberg-arrow' project(':parquet').name = 'iceberg-parquet' project(':bundled-guava').name = 'iceberg-bundled-guava' project(':spark').name = 'iceberg-spark' -project(':pig').name = 'iceberg-pig' project(':hive-metastore').name = 'iceberg-hive-metastore' project(':nessie').name = 'iceberg-nessie' project(':gcp').name = 'iceberg-gcp' diff --git a/site/docs/contribute.md b/site/docs/contribute.md index 94b4679da78c..a12936a7bc49 100644 --- a/site/docs/contribute.md +++ b/site/docs/contribute.md @@ -119,7 +119,6 @@ This project Iceberg also has modules for adding Iceberg support to processing e * `iceberg-spark` is an implementation of Spark's Datasource V2 API for Iceberg with submodules for each spark versions (use runtime jars for a shaded version) * `iceberg-flink` contains classes for integrating with Apache Flink (use iceberg-flink-runtime for a shaded version) * `iceberg-mr` contains an InputFormat and other classes for integrating with Apache Hive -* `iceberg-pig` is an implementation of Pig's LoadFunc API for Iceberg ## Setting up IDE and Code Style