ORC support integration for Spark 2.4.0 #139
Both files in this diff replace the Hortonworks copyright header with the standard ASF license header; the license terms are otherwise unchanged.

ORC.java

The class preamble gains imports for the new generic read/write path, plus a property for tuning the vectorized batch size:

```java
package org.apache.iceberg.orc;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;

import static org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch.DEFAULT_SIZE;

public class ORC {

  static final String VECTOR_ROW_BATCH_SIZE = "iceberg.orc.vectorbatch.size";

  private ORC() {
  }
```
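The builder reads this property with `conf.getInt(VECTOR_ROW_BATCH_SIZE, DEFAULT_SIZE)`, so callers can tune the batch size on the Configuration they hand to the builder. A minimal sketch, assuming a plain Hadoop Configuration:

```java
// Sketch: buffer 2048 rows per VectorizedRowBatch instead of ORC's
// DEFAULT_SIZE. WriteBuilder.build() reads this value back via
// conf.getInt("iceberg.orc.vectorbatch.size", DEFAULT_SIZE).
Configuration conf = new Configuration();
conf.setInt("iceberg.orc.vectorbatch.size", 2048);
```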
WriteBuilder picks up a writer-function field and setter, and build() becomes generic. Previously build() returned an OrcFileAppender constructed directly from the raw Iceberg Schema; it now requires the schema, converts it to an ORC TypeDescription up front, and returns a FileAppender<D>:

```java
  public static class WriteBuilder {
    private final OutputFile file;
    private final Configuration conf;
    private Schema schema = null;
    private Function<TypeDescription, OrcValueWriter<?>> createWriterFunc;
    private Map<String, byte[]> metadata = new HashMap<>();

    private WriteBuilder(OutputFile file) {
      // ...
    }

    // ...

    public WriteBuilder config(String property, String value) {
      conf.set(property, value);
      return this;
    }

    public WriteBuilder createWriterFunc(Function<TypeDescription, OrcValueWriter<?>> writerFunction) {
      this.createWriterFunc = writerFunction;
      return this;
    }

    public WriteBuilder setAll(Map<String, String> properties) {
      properties.forEach(conf::set);
      return this;
    }

    public WriteBuilder schema(Schema schema) {
      this.schema = schema;
      return this;
    }

    public <D> FileAppender<D> build() {
      Preconditions.checkNotNull(schema, "Schema is required");
      OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
      return new OrcFileAppender<>(TypeConversion.toOrc(schema, new ColumnIdMap()),
          this.file, createWriterFunc, options, metadata,
          conf.getInt(VECTOR_ROW_BATCH_SIZE, DEFAULT_SIZE));
    }
  }
```

Review discussion on the new build():

Contributor: This also addresses issue #127.

Contributor: But this does not actually add support for generics, right?

Contributor: Yes, you are right @rdblue.

Contributor: @rdblue I think Avro and Parquet store the Iceberg schema as a JSON string in their metadata. I'm not sure why that is required, but do you think it makes sense to do that here as well for ORC?

Contributor: It's probably a good idea, but not required. The requirement is that we can get the file's Iceberg schema from its metadata, and I prefer to build that from the file schema itself, plus the column IDs. That way, we don't have problems from a faulty conversion in an old version.
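End to end, writing through the new builder looks roughly like the sketch below. The Record type and the GenericOrcWriter::buildWriter factory are stand-ins, since no OrcValueWriter implementation is part of this diff, and ORC.write(...) is the builder factory implied by WriteBuilder's private constructor:

```java
// Hypothetical write path through the new generic API.
// GenericOrcWriter::buildWriter is an assumed
// Function<TypeDescription, OrcValueWriter<?>> for illustration.
FileAppender<Record> appender = ORC.write(outputFile)
    .schema(icebergSchema)                           // build() null-checks this
    .createWriterFunc(GenericOrcWriter::buildWriter)
    .build();
try {
  for (Record record : records) {
    appender.add(record);                            // buffered into a VectorizedRowBatch
  }
} finally {
  appender.close();                                  // flushes the last partial batch
}
```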
ReadBuilder gains the matching reader function, and build() now returns a CloseableIterable instead of the old OrcIterator. The previous build() opened an ORC Reader for the file path, converted the Iceberg schema with TypeConversion.toOrc, applied the optional start/length split range, and wrapped reader.rows(options) in an OrcIterator; all of that moves into the new OrcIterable:

```java
  public static class ReadBuilder {
    private Long start = null;
    private Long length = null;

    private Function<Schema, OrcValueReader<?>> readerFunction;

    private ReadBuilder(InputFile file) {
      Preconditions.checkNotNull(file, "Input file cannot be null");
      this.file = file;
      // ...
    }

    // ...

    public ReadBuilder caseSensitive(boolean caseSensitive) {
      OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(this.conf, caseSensitive);
      return this;
    }

    public ReadBuilder config(String property, String value) {
      conf.set(property, value);
      return this;
    }

    public ReadBuilder createReaderFunc(Function<Schema, OrcValueReader<?>> readerFunction) {
      this.readerFunction = readerFunction;
      return this;
    }

    public <D> CloseableIterable<D> build() {
      Preconditions.checkNotNull(schema, "Schema is required");
      return new OrcIterable<>(file, conf, schema, start, length, readerFunction);
    }
  }
}
```
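For symmetry, reading with the new builder might look like the following sketch; GenericOrcReader is an assumed reader factory, not part of this diff, and ORC.read(inputFile) is the factory implied by ReadBuilder's private constructor:

```java
// Hypothetical read path. GenericOrcReader::buildReader stands in for a
// Function<Schema, OrcValueReader<?>> supplied by the caller.
try (CloseableIterable<Record> rows = ORC.read(inputFile)
    .schema(icebergSchema)            // required, as in the write path
    .createReaderFunc(GenericOrcReader::buildReader)
    .caseSensitive(false)             // resolves columns case-insensitively via OrcConf
    .build()) {
  for (Record row : rows) {
    process(row);                     // process(...) stands in for caller logic
  }
}
```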
OrcFileAppender.java

The same license-header swap applies here. The imports reflect the move to a generic appender:

```java
package org.apache.iceberg.orc;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.orc.ColumnStatistics;
// ...
```
The appender itself becomes generic and drops its public modifier. Previously OrcFileAppender implemented FileAppender<VectorizedRowBatch>, converted the Iceberg Schema to an ORC schema in its constructor, created the Writer inline, and add() handed each incoming VectorizedRowBatch straight to writer.addRowBatch. Now it implements FileAppender<D>, takes a pre-converted TypeDescription plus a writer function and batch size, buffers rows into its own VectorizedRowBatch, and flushes whenever the batch fills. COLUMN_NUMBERS_ATTRIBUTE also changes from public to private:

```java
/**
 * Create a file appender for ORC.
 */
class OrcFileAppender<D> implements FileAppender<D> {
  private final int batchSize;
  private final TypeDescription orcSchema;
  private final ColumnIdMap columnIds = new ColumnIdMap();
  private final Path path;
  private final Writer writer;
  private final VectorizedRowBatch batch;
  private final OrcValueWriter<D> valueWriter;
  private boolean isClosed = false;

  private static final String COLUMN_NUMBERS_ATTRIBUTE = "iceberg.column.ids";

  OrcFileAppender(TypeDescription schema, OutputFile file,
                  Function<TypeDescription, OrcValueWriter<?>> createWriterFunc,
                  OrcFile.WriterOptions options, Map<String, byte[]> metadata,
                  int batchSize) {
    orcSchema = schema;
    path = new Path(file.location());
    this.batchSize = batchSize;
    batch = orcSchema.createRowBatch(this.batchSize);

    options.setSchema(orcSchema);
    writer = newOrcWriter(file, columnIds, options, metadata);
    valueWriter = newOrcValueWriter(orcSchema, createWriterFunc);
  }

  @Override
  public void add(D datum) {
    try {
      valueWriter.write(datum, batch);
      if (batch.size == this.batchSize) {
        writer.addRowBatch(batch);
        batch.reset();
      }
    } catch (IOException e) {
      throw new RuntimeException("Problem writing to ORC file " + path, e);
    }
  }
```
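The OrcValueWriter interface is not shown in this diff, but from its use here it takes a datum and appends it to the current VectorizedRowBatch. A hypothetical implementation for a one-column schema, to make the inferred contract concrete:

```java
// Hypothetical value writer for a schema `struct<id:bigint>`, with the
// interface shape inferred from how add() calls valueWriter.write(datum, batch).
// LongColumnVector comes from org.apache.orc.storage.ql.exec.vector, matching
// the storage-api package used by the DEFAULT_SIZE import above.
class LongOrcValueWriter implements OrcValueWriter<Long> {
  @Override
  public void write(Long datum, VectorizedRowBatch output) {
    int row = output.size++;                          // claim the next row slot
    LongColumnVector col = (LongColumnVector) output.cols[0];
    col.vector[row] = datum;                          // write the value in place
  }
}
```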
close() previously just set isClosed and closed the Writer; it now flushes any partially filled batch first. Writer construction and metadata handling move into static helpers, and the old public getSchema() accessor is removed:

```java
  @Override
  public void close() throws IOException {
    if (!isClosed) {
      try {
        if (batch.size > 0) {
          writer.addRowBatch(batch);
          batch.reset();
        }
      } finally {
        writer.close();
        this.isClosed = true;
      }
    }
  }

  private static Writer newOrcWriter(OutputFile file,
                                     ColumnIdMap columnIds,
                                     OrcFile.WriterOptions options, Map<String, byte[]> metadata) {
    final Path locPath = new Path(file.location());
    final Writer writer;

    try {
      writer = OrcFile.createWriter(locPath, options);
    } catch (IOException e) {
      throw new RuntimeException("Can't create file " + locPath, e);
    }

    writer.addUserMetadata(COLUMN_NUMBERS_ATTRIBUTE, columnIds.serialize());
    metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));

    return writer;
  }

  @SuppressWarnings("unchecked")
  private static <D> OrcValueWriter<D> newOrcValueWriter(
      TypeDescription schema, Function<TypeDescription, OrcValueWriter<?>> createWriterFunc) {
    return (OrcValueWriter<D>) createWriterFunc.apply(schema);
  }
}
```

Review discussion on the OrcFile.createWriter call:

Contributor: Is it possible to pass an output stream?

Contributor (author): Unfortunately no, I checked the …
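Because the column IDs ride along as user metadata, a reader can recover them without a side channel. A sketch of reading the attribute back, assuming ORC's standard Reader metadata accessors and a ColumnIdMap.deserialize method mirroring serialize():

```java
// Sketch: recover the Iceberg column-ID mapping written by newOrcWriter,
// using org.apache.orc.Reader's user-metadata API.
Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
if (reader.hasMetadataValue("iceberg.column.ids")) {
  ByteBuffer serialized = reader.getMetadataValue("iceberg.column.ids");
  // ColumnIdMap.deserialize(...) is assumed here for illustration.
  ColumnIdMap columnIds = ColumnIdMap.deserialize(serialized);
}
```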