diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index a763f9565631..515904ed5140 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -19,6 +19,7 @@ package org.apache.iceberg; +import java.io.Serializable; import java.util.List; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.BinaryType; @@ -35,7 +36,7 @@ /** * Interface for data files listed in a table manifest. */ -public interface DataFile extends ContentFile { +public interface DataFile extends ContentFile, Serializable { // fields for adding delete data files Types.NestedField CONTENT = optional(134, "content", IntegerType.get(), "Contents of the file: 0=data, 1=position deletes, 2=equality deletes"); diff --git a/api/src/main/java/org/apache/iceberg/catalog/Namespace.java b/api/src/main/java/org/apache/iceberg/catalog/Namespace.java index e08dd6ffe537..06c0eed00140 100644 --- a/api/src/main/java/org/apache/iceberg/catalog/Namespace.java +++ b/api/src/main/java/org/apache/iceberg/catalog/Namespace.java @@ -19,13 +19,14 @@ package org.apache.iceberg.catalog; +import java.io.Serializable; import java.util.Arrays; import org.apache.iceberg.relocated.com.google.common.base.Joiner; /** * A namespace in a {@link Catalog}. */ -public class Namespace { +public class Namespace implements Serializable { private static final Namespace EMPTY_NAMESPACE = new Namespace(new String[] {}); private static final Joiner DOT = Joiner.on('.'); diff --git a/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java b/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java index 05de7fd22ec8..5f439862486e 100644 --- a/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java +++ b/api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java @@ -19,6 +19,7 @@ package org.apache.iceberg.catalog; +import java.io.Serializable; import java.util.Arrays; import java.util.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -28,7 +29,7 @@ /** * Identifies a table in iceberg catalog. */ -public class TableIdentifier { +public class TableIdentifier implements Serializable { private static final Splitter DOT = Splitter.on('.'); diff --git a/beam/src/main/java/org/apache/iceberg/beam/BeamAppenderFactory.java b/beam/src/main/java/org/apache/iceberg/beam/BeamAppenderFactory.java new file mode 100644 index 000000000000..a8901d03f567 --- /dev/null +++ b/beam/src/main/java/org/apache/iceberg/beam/BeamAppenderFactory.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.beam; + +import java.io.IOException; +import java.util.Map; +import org.apache.avro.generic.GenericRecord; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.GenericAvroWriter; +import org.apache.iceberg.beam.writers.GenericAvroOrcWriter; +import org.apache.iceberg.beam.writers.GenericAvroParquetWriter; +import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PositionDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; + +public class BeamAppenderFactory implements FileAppenderFactory { + private final Schema schema; + private final Map properties; + private final PartitionSpec spec; + + BeamAppenderFactory(Schema schema, Map properties, PartitionSpec spec) { + this.schema = schema; + this.properties = properties; + this.spec = spec; + } + + @Override + public FileAppender newAppender(OutputFile file, FileFormat fileFormat) { + MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties); + try { + switch (fileFormat) { + case AVRO: + return Avro + .write(file) + .createWriterFunc(GenericAvroWriter::new) + .setAll(properties) + .schema(schema) + .overwrite() + .build(); + + case PARQUET: + return Parquet.write(file) + .createWriterFunc(GenericAvroParquetWriter::buildWriter) + .setAll(properties) + .metricsConfig(metricsConfig) + .schema(schema) + .overwrite() + .build(); + + case ORC: + return ORC.write(file) + .createWriterFunc(GenericAvroOrcWriter::buildWriter) + .setAll(properties) + .metricsConfig(metricsConfig) + .schema(schema) + .overwrite() + .build(); + + default: + throw new UnsupportedOperationException("Cannot write unknown format: " + fileFormat); + } + } catch (IOException e) { + throw new RuntimeIOException(e); + } + } + + @Override + public DataWriter newDataWriter(EncryptedOutputFile file, FileFormat format, StructLike partition) { + return new DataWriter<>(newAppender(file.encryptingOutputFile(), format), format, + file.encryptingOutputFile().location(), spec, partition, file.keyMetadata()); + } + + @Override + public EqualityDeleteWriter newEqDeleteWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + return null; + } + + @Override + public PositionDeleteWriter newPosDeleteWriter(EncryptedOutputFile file, FileFormat format, + StructLike partition) { + return null; + } +} diff --git a/beam/src/main/java/org/apache/iceberg/beam/FileWriter.java b/beam/src/main/java/org/apache/iceberg/beam/FileWriter.java new file mode 100644 index 000000000000..8a87fadfba9f --- /dev/null +++ b/beam/src/main/java/org/apache/iceberg/beam/FileWriter.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.beam; + +import java.io.IOException; +import java.util.Locale; +import java.util.Map; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitionedWriter; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FileWriter extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class); + + private final PartitionSpec spec; + private final TableIdentifier tableIdentifier; + private final String hiveMetastoreUrl; + + private Map properties; + private Schema schema; + private LocationProvider locations; + private FileIO io; + private EncryptionManager encryptionManager; + private HiveCatalog catalog; + private FileFormat fileFormat; + + private transient TaskWriter writer; + private transient BoundedWindow lastSeenWindow; + + public FileWriter( + TableIdentifier tableIdentifier, + Schema schema, + PartitionSpec spec, + String hiveMetastoreUrl, + Map properties + ) { + this.tableIdentifier = tableIdentifier; + this.spec = spec; + this.hiveMetastoreUrl = hiveMetastoreUrl; + this.schema = schema; + this.properties = properties; + } + + @StartBundle + public void startBundle(StartBundleContext sbc) { + start(); + } + + @ProcessElement + public void processElement(ProcessContext context, BoundedWindow window) { + appendRecord( + context.element(), + window, + (int) context.pane().getIndex(), + context.pane().getIndex() + ); + } + + @FinishBundle + public void finishBundle(FinishBundleContext fbc) { + DataFile[] files = finish(); + for (DataFile file : files) { + fbc.output(file, Instant.now(), lastSeenWindow); + } + } + + @VisibleForTesting + public void start() { + Configuration conf = new Configuration(); + for (String key : this.properties.keySet()) { + conf.set(key, this.properties.get(key)); + } + catalog = new HiveCatalog( + HiveCatalog.DEFAULT_NAME, + this.hiveMetastoreUrl, + 1, + conf + ); + Table table = HiveCatalogHelper.loadOrCreateTable( + catalog, + tableIdentifier, + schema, + spec, + 
properties + ); + this.schema = table.schema(); + this.locations = table.locationProvider(); + this.properties = table.properties(); + this.io = table.io(); + this.encryptionManager = table.encryption(); + String formatString = table.properties().getOrDefault( + TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT); + this.fileFormat = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH)); + } + + @VisibleForTesting + public void appendRecord(T element, BoundedWindow window, int partitionId, long taskId) { + if (writer == null) { + LOG.info("Setting up the writer"); + // We would rather do this in the startBundle, but we don't know the pane + + BeamAppenderFactory appenderFactory = new BeamAppenderFactory<>(schema, properties, spec); + OutputFileFactory fileFactory = new OutputFileFactory( + spec, fileFormat, locations, io, encryptionManager, partitionId, taskId); + + if (spec.isUnpartitioned()) { + writer = new UnpartitionedWriter<>(spec, fileFormat, appenderFactory, fileFactory, io, Long.MAX_VALUE); + } else { + writer = new PartitionedWriter( + spec, fileFormat, appenderFactory, fileFactory, io, Long.MAX_VALUE) { + @Override + protected PartitionKey partition(T row) { + return new PartitionKey(spec, schema); + } + }; + } + } + try { + lastSeenWindow = window; + writer.write(element); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @VisibleForTesting + public DataFile[] finish() { + LOG.info("Closing the writer"); + try { + writer.close(); + return writer.dataFiles(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + writer = null; + catalog.close(); + catalog = null; + } + } +} diff --git a/beam/src/main/java/org/apache/iceberg/beam/HiveCatalogHelper.java b/beam/src/main/java/org/apache/iceberg/beam/HiveCatalogHelper.java new file mode 100644 index 000000000000..928ff8de3cb4 --- /dev/null +++ b/beam/src/main/java/org/apache/iceberg/beam/HiveCatalogHelper.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.iceberg.beam;
+
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hive.HiveCatalog;
+
+public class HiveCatalogHelper {
+
+  private HiveCatalogHelper() {
+  }
+
+  public static synchronized Table loadOrCreateTable(
+      HiveCatalog hiveCatalog,
+      TableIdentifier tableIdentifier,
+      Schema schema,
+      PartitionSpec spec,
+      Map<String, String> properties) {
+    try {
+      return hiveCatalog.loadTable(tableIdentifier);
+    } catch (NoSuchTableException noSuchTableException) {
+      try {
+        // If it doesn't exist, we just create the table
+        return hiveCatalog.createTable(
+            tableIdentifier,
+            schema,
+            spec,
+            properties
+        );
+      } catch (AlreadyExistsException alreadyExistsException) {
+        // Another worker may have created the table concurrently (race condition),
+        // so fall back to loading it
+        return hiveCatalog.loadTable(tableIdentifier);
+      }
+    }
+  }
+}
diff --git a/beam/src/main/java/org/apache/iceberg/beam/IcebergDataFileCommitter.java b/beam/src/main/java/org/apache/iceberg/beam/IcebergDataFileCommitter.java
new file mode 100644
index 000000000000..7ec2fdb2497d
--- /dev/null
+++ b/beam/src/main/java/org/apache/iceberg/beam/IcebergDataFileCommitter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.beam;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hive.HiveCatalog;
+
+class IcebergDataFileCommitter extends Combine.CombineFn<DataFile, List<DataFile>, Snapshot> {
+  private final TableIdentifier tableIdentifier;
+  private final String hiveMetastoreUrl;
+  private final Map<String, String> properties;
+
+  IcebergDataFileCommitter(TableIdentifier table, String hiveMetastoreUrl, Map<String, String> properties) {
+    this.tableIdentifier = table;
+    this.hiveMetastoreUrl = hiveMetastoreUrl;
+    this.properties = properties;
+  }
+
+  @Override
+  public List<DataFile> createAccumulator() {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public List<DataFile> addInput(List<DataFile> mutableAccumulator, DataFile input) {
+    mutableAccumulator.add(input);
+    return mutableAccumulator;
+  }
+
+  @Override
+  public List<DataFile> mergeAccumulators(Iterable<List<DataFile>> accumulators) {
+    Iterator<List<DataFile>> itr = accumulators.iterator();
+    if (itr.hasNext()) {
+      List<DataFile> first = itr.next();
+      while (itr.hasNext()) {
+        first.addAll(itr.next());
+      }
+      return first;
+    } else {
+      return new ArrayList<>();
+    }
+  }
+
+  @Override
+  public Snapshot extractOutput(List<DataFile> datafiles) {
+    Configuration conf = new Configuration();
+    for (String key : this.properties.keySet()) {
+      conf.set(key, this.properties.get(key));
+    }
+    try (HiveCatalog catalog = new HiveCatalog(
+        HiveCatalog.DEFAULT_NAME,
+        this.hiveMetastoreUrl,
+        1,
+        conf
+    )) {
+      Table table = catalog.loadTable(tableIdentifier);
+      if (!datafiles.isEmpty()) {
+        final AppendFiles app = table.newAppend();
+        for (DataFile datafile : datafiles) {
+          app.appendFile(datafile);
+        }
+        app.commit();
+      }
+      return table.currentSnapshot();
+    }
+  }
+}
diff --git a/beam/src/main/java/org/apache/iceberg/beam/IcebergIO.java b/beam/src/main/java/org/apache/iceberg/beam/IcebergIO.java
new file mode 100644
index 000000000000..ef254785d61d
--- /dev/null
+++ b/beam/src/main/java/org/apache/iceberg/beam/IcebergIO.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.beam;
+
+import java.util.Map;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+public class IcebergIO {
+  private IcebergIO() {
+  }
+
+  public static final class Builder {
+    private TableIdentifier tableIdentifier;
+    private Schema schema;
+    private String hiveMetaStoreUrl;
+
+    private final Map<String, String> properties = Maps.newHashMap();
+
+    public Builder withTableIdentifier(TableIdentifier newTableIdentifier) {
+      this.tableIdentifier = newTableIdentifier;
+      return this;
+    }
+
+    public Builder withSchema(Schema newSchema) {
+      this.schema = newSchema;
+      return this;
+    }
+
+    public Builder withHiveMetastoreUrl(String newHiveMetaStoreUrl) {
+      assert newHiveMetaStoreUrl.startsWith("thrift://");
+      this.hiveMetaStoreUrl = newHiveMetaStoreUrl;
+      return this;
+    }
+
+    public Builder conf(String key, String value) {
+      this.properties.put(key, value);
+      return this;
+    }
+
+    public PCollection<Snapshot> build(PCollection<GenericRecord> avroRecords) {
+      return IcebergIO.write(
+          avroRecords,
+          this.tableIdentifier,
+          this.schema,
+          this.hiveMetaStoreUrl,
+          this.properties
+      );
+    }
+  }
+
+  private static PCollection<Snapshot> write(
+      PCollection<GenericRecord> avroRecords,
+      TableIdentifier table,
+      Schema schema,
+      String hiveMetastoreUrl,
+      Map<String, String> properties
+  ) {
+    // Write the records and collect the DataFiles emitted by the FileWriter
+    final PCollection<DataFile> dataFiles = avroRecords
+        .apply(
+            "Write DataFiles",
+            ParDo.of(new FileWriter<>(table, schema, PartitionSpec.unpartitioned(), hiveMetastoreUrl, properties))
+        )
+        .setCoder(SerializableCoder.of(DataFile.class));
+
+    // We use a combiner to combine all the files into a single commit in
+    // the Iceberg log
+    final IcebergDataFileCommitter combiner = new IcebergDataFileCommitter(table, hiveMetastoreUrl, properties);
+    final Combine.Globally<DataFile, Snapshot> combined = Combine.globally(combiner).withoutDefaults();
+
+    // We return the latest snapshot, which can be used to notify downstream consumers.
+    return dataFiles.apply("Commit DataFiles", combined);
+  }
+}
+
diff --git a/beam/src/main/java/org/apache/iceberg/beam/writers/GenericAvroOrcWriter.java b/beam/src/main/java/org/apache/iceberg/beam/writers/GenericAvroOrcWriter.java
new file mode 100644
index 000000000000..7ab2a99848f7
--- /dev/null
+++ b/beam/src/main/java/org/apache/iceberg/beam/writers/GenericAvroOrcWriter.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.beam.writers;
+
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.orc.GenericOrcWriters;
+import org.apache.iceberg.orc.OrcRowWriter;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+
+public class GenericAvroOrcWriter implements OrcRowWriter<GenericRecord> {
+  private final OrcValueWriter<?> writer;
+
+  private GenericAvroOrcWriter(Schema expectedSchema, TypeDescription orcSchema) {
+    Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+        "Top level must be a struct " + orcSchema);
+
+    writer = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder());
+  }
+
+  public static OrcRowWriter<GenericRecord> buildWriter(Schema expectedSchema, TypeDescription fileSchema) {
+    return new GenericAvroOrcWriter(expectedSchema, fileSchema);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void write(GenericRecord value, VectorizedRowBatch output) {
+    Preconditions.checkArgument(writer instanceof RecordWriter, "writer must be a RecordWriter.");
+
+    int row = output.size;
+    output.size += 1;
+    List<OrcValueWriter<?>> writers = ((RecordWriter) writer).writers();
+    for (int c = 0; c < writers.size(); ++c) {
+      OrcValueWriter child = writers.get(c);
+      Object fieldValue = value.get(c);
+      if (fieldValue instanceof Utf8) {
+        fieldValue = ((Utf8) fieldValue).toString();
+      }
+      child.write(row, fieldValue, output.cols[c]);
+    }
+  }
+
+  private static class WriteBuilder extends OrcSchemaWithTypeVisitor<OrcValueWriter<?>> {
+    private WriteBuilder() {
+    }
+
+    @Override
+    public OrcValueWriter<GenericRecord> record(Types.StructType iStruct, TypeDescription record,
+                                                List<String> names, List<OrcValueWriter<?>> fields) {
+      return new RecordWriter(fields);
+    }
+
+    @Override
+    public OrcValueWriter<?> list(Types.ListType iList, TypeDescription array,
+                                  OrcValueWriter<?> element) {
+      return GenericOrcWriters.list(element);
+    }
+
+    @Override
+    public OrcValueWriter<?> map(Types.MapType iMap, TypeDescription map,
+                                 OrcValueWriter<?> key, OrcValueWriter<?> value) {
+      return GenericOrcWriters.map(key, value);
+    }
+
+    @Override
+    public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+      switch (iPrimitive.typeId()) {
+        case BOOLEAN:
+          return GenericOrcWriters.booleans();
+        case INTEGER:
+          return GenericOrcWriters.ints();
+        case LONG:
+          return GenericOrcWriters.longs();
+        case FLOAT:
+          return GenericOrcWriters.floats();
+        case DOUBLE:
+          return GenericOrcWriters.doubles();
+        case DATE:
+          return GenericOrcWriters.dates();
+        case TIME:
+          return GenericOrcWriters.times();
+        case TIMESTAMP:
+          Types.TimestampType timestampType = (Types.TimestampType) iPrimitive;
+          if (timestampType.shouldAdjustToUTC()) {
+            return GenericOrcWriters.timestampTz();
+          } else {
+            return GenericOrcWriters.timestamp();
+          }
+        case STRING:
+          return GenericOrcWriters.strings();
+        case UUID:
+          return GenericOrcWriters.uuids();
+        case FIXED:
+          return GenericOrcWriters.byteArrays();
+        case BINARY:
+          return GenericOrcWriters.byteBuffers();
+        case DECIMAL:
+          Types.DecimalType decimalType = (Types.DecimalType) iPrimitive;
+          return GenericOrcWriters.decimal(decimalType.precision(), decimalType.scale());
+        default:
+          throw new IllegalArgumentException(String.format("Invalid iceberg type %s corresponding to ORC type %s",
+              iPrimitive, primitive));
+      }
+    }
+  }
+
+  private static class RecordWriter implements OrcValueWriter<GenericRecord> {
+    private final List<OrcValueWriter<?>> writers;
+
+    RecordWriter(List<OrcValueWriter<?>> writers) {
+      this.writers = writers;
+    }
+
+    List<OrcValueWriter<?>> writers() {
+      return writers;
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+      return GenericRecord.class;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void nonNullWrite(int rowId, GenericRecord data, ColumnVector output) {
+      StructColumnVector cv = (StructColumnVector) output;
+      for (int c = 0; c < writers.size(); ++c) {
+        OrcValueWriter child = writers.get(c);
+        child.write(rowId, data.get(c), cv.fields[c]);
+      }
+    }
+  }
+}
diff --git a/beam/src/main/java/org/apache/iceberg/beam/writers/GenericAvroParquetWriter.java b/beam/src/main/java/org/apache/iceberg/beam/writers/GenericAvroParquetWriter.java
new file mode 100644
index 000000000000..97006cb2adc2
--- /dev/null
+++ b/beam/src/main/java/org/apache/iceberg/beam/writers/GenericAvroParquetWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.beam.writers;
+
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.iceberg.data.parquet.BaseParquetWriter;
+import org.apache.iceberg.parquet.ParquetValueWriter;
+import org.apache.iceberg.parquet.ParquetValueWriters;
+import org.apache.parquet.schema.MessageType;
+
+public class GenericAvroParquetWriter extends BaseParquetWriter<GenericRecord> {
+  private static final GenericAvroParquetWriter INSTANCE = new GenericAvroParquetWriter();
+
+  private GenericAvroParquetWriter() {
+  }
+
+  public static ParquetValueWriter<GenericRecord> buildWriter(MessageType type) {
+    return INSTANCE.createWriter(type);
+  }
+
+  @Override
+  protected ParquetValueWriters.StructWriter<GenericRecord> createStructWriter(
+      List<ParquetValueWriter<?>> writers
+  ) {
+    return new RecordWriter(writers);
+  }
+
+  private static class RecordWriter extends ParquetValueWriters.StructWriter<GenericRecord> {
+    private RecordWriter(List<ParquetValueWriter<?>> writers) {
+      super(writers);
+    }
+
+    @Override
+    protected Object get(GenericRecord struct, int index) {
+      return struct.get(index);
+    }
+  }
+}
diff --git a/beam/src/test/avro/Cat.avsc b/beam/src/test/avro/Cat.avsc
new file mode 100644
index 000000000000..ced91dfa5ae2
--- /dev/null
+++ b/beam/src/test/avro/Cat.avsc
@@ -0,0 +1,8 @@
+{
+  "name": "Cat",
+  "namespace": "org.apache.iceberg.test",
+  "type": "record",
+  "fields" : [
+    {"name": "breed", "type": "string"}
+  ]
+}
\ No newline at end of file
diff --git a/beam/src/test/java/org/apache/iceberg/beam/BaseTest.java b/beam/src/test/java/org/apache/iceberg/beam/BaseTest.java
new file mode 100644
index 000000000000..67dcd8bc5920
--- /dev/null
+++ b/beam/src/test/java/org/apache/iceberg/beam/BaseTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.iceberg.beam; + +import java.util.Arrays; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.beam.util.TestHiveMetastore; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.test.Cat; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; + +public abstract class BaseTest { + public static final List FILEFORMATS = Arrays.asList( + FileFormat.AVRO, + FileFormat.PARQUET, + FileFormat.ORC + ); + + protected static final Instant START_TIME = new Instant(0); + protected static final List specificCats = Arrays.asList( + Cat.newBuilder().setBreed("Ragdoll").build(), + Cat.newBuilder().setBreed("Oriental").build(), + Cat.newBuilder().setBreed("Birman").build(), + Cat.newBuilder().setBreed("Sphynx").build() + ); + protected static final List genericCats = Arrays.asList( + Cat.newBuilder().setBreed("Ragdoll").build(), + Cat.newBuilder().setBreed("Oriental").build(), + Cat.newBuilder().setBreed("Birman").build(), + Cat.newBuilder().setBreed("Sphynx").build() + ); + + protected static final PipelineOptions options = TestPipeline.testingPipelineOptions(); + + private static TestHiveMetastore metastore; + + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + protected final String hiveMetastoreUrl = "thrift://localhost:9083/default"; + + @BeforeClass + public static void startMetastore() { + metastore = new TestHiveMetastore(); + metastore.start(); + } + + @AfterClass + public static void stopMetastore() { + metastore.stop(); + } + + protected TimestampedValue event(Object word, Long timestamp) { + return TimestampedValue.of(word, START_TIME.plus(new Duration(timestamp))); + } +} diff --git a/beam/src/test/java/org/apache/iceberg/beam/BatchTest.java b/beam/src/test/java/org/apache/iceberg/beam/BatchTest.java new file mode 100644 index 000000000000..8f308e82fa10 --- /dev/null +++ b/beam/src/test/java/org/apache/iceberg/beam/BatchTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.beam; + +import java.util.Arrays; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.test.Cat; +import org.junit.Test; + +public class BatchTest extends BaseTest { + + @Test + public void testWriteFiles() { + for (FileFormat format : FILEFORMATS) { + runPipeline(format); + } + } + + public void runPipeline(FileFormat fileFormat) { + final Pipeline p = Pipeline.create(options); + + p.getCoderRegistry().registerCoderForClass(GenericRecord.class, AvroCoder.of(Cat.SCHEMA$)); + + PCollection records = p.apply(Create.of(genericCats)); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(Cat.getClassSchema()); + TableIdentifier name = TableIdentifier.of("default", "test_batch_" + fileFormat.name()); + + new IcebergIO.Builder() + .withSchema(icebergSchema) + .withTableIdentifier(name) + .withHiveMetastoreUrl(hiveMetastoreUrl) + .conf(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name()) + .build(records); + + p.run(); + } + + @Test + public void testWriteFilesSpecific() { + for (FileFormat format : FILEFORMATS) { + runPipelineSpecific(format); + } + } + + public void runPipelineSpecific(FileFormat fileFormat) { + final Pipeline p = Pipeline.create(options); + + p.getCoderRegistry().registerCoderForClass(Cat.class, AvroCoder.of(Cat.SCHEMA$)); + + PCollection cats = p.apply(Create.of(specificCats)); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(Cat.getClassSchema()); + TableIdentifier name = TableIdentifier.of("default", "test_specific_batch_" + fileFormat.name()); + + new IcebergIO.Builder() + .withSchema(icebergSchema) + .withTableIdentifier(name) + .withHiveMetastoreUrl(hiveMetastoreUrl) + .conf(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name()) + .build(cats); + + p.run(); + } +} diff --git a/beam/src/test/java/org/apache/iceberg/beam/FileWriterTest.java b/beam/src/test/java/org/apache/iceberg/beam/FileWriterTest.java new file mode 100644 index 000000000000..c3dbd22a9c38 --- /dev/null +++ b/beam/src/test/java/org/apache/iceberg/beam/FileWriterTest.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.beam; + +import java.util.Map; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.test.Cat; +import org.junit.Test; + +public class FileWriterTest extends BaseTest { + + @Test + public void testWriteFiles() { + for (FileFormat format : FILEFORMATS) { + writeFile(format); + } + } + + public void writeFile(FileFormat fileFormat) { + Map properties = Maps.newHashMap(); + properties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name()); + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(Cat.getClassSchema()); + TableIdentifier name = TableIdentifier.of("default", "test_file_writer_" + fileFormat.name()); + + FileWriter f = new FileWriter<>( + name, + icebergSchema, + PartitionSpec.unpartitioned(), + hiveMetastoreUrl, + Maps.newHashMap() + ); + + f.start(); + + for (Cat cat : specificCats) { + f.appendRecord(cat, null, 0, 0); + } + + DataFile[] files = f.finish(); + + assert (files.length == 1); + } +} diff --git a/beam/src/test/java/org/apache/iceberg/beam/StreamingTest.java b/beam/src/test/java/org/apache/iceberg/beam/StreamingTest.java new file mode 100644 index 000000000000..e1bfd2b12fb8 --- /dev/null +++ b/beam/src/test/java/org/apache/iceberg/beam/StreamingTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.beam; + +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.test.Cat; +import org.joda.time.Duration; +import org.junit.Test; + +public class StreamingTest extends BaseTest { + private static final Duration WINDOW_DURATION = Duration.standardMinutes(1); + + @Test + public void testWriteFiles() { + for (FileFormat format : FILEFORMATS) { + runPipeline(format); + } + } + + public void runPipeline(FileFormat fileFormat) { + // We should see four commits in the log + TestStream records = + TestStream.create(AvroCoder.of(Cat.getClassSchema())) + .advanceWatermarkTo(START_TIME) + .addElements(event(genericCats.get(0), 2L)) + .advanceWatermarkTo(START_TIME.plus(Duration.standardSeconds(60L))) + .addElements(event(genericCats.get(1), 62L)) + .advanceWatermarkTo(START_TIME.plus(Duration.standardSeconds(120L))) + .addElements(event(genericCats.get(2), 122L)) + .advanceWatermarkTo(START_TIME.plus(Duration.standardSeconds(180L))) + .addElements(event(genericCats.get(3), 182L)) + .advanceWatermarkToInfinity(); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(Cat.getClassSchema()); + TableIdentifier name = TableIdentifier.of("default", "test_streaming_" + fileFormat.name()); + + PCollection windowed = pipeline + .apply(records) + .apply(Window.into(FixedWindows.of(WINDOW_DURATION))); + + new IcebergIO.Builder() + .withSchema(icebergSchema) + .withTableIdentifier(name) + .withHiveMetastoreUrl(hiveMetastoreUrl) + .conf(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name()) + .build(windowed); + + pipeline.run(options).waitUntilFinish(); + } +} diff --git a/beam/src/test/java/org/apache/iceberg/beam/util/ScriptRunner.java b/beam/src/test/java/org/apache/iceberg/beam/util/ScriptRunner.java new file mode 100644 index 000000000000..12be3ca45b0d --- /dev/null +++ b/beam/src/test/java/org/apache/iceberg/beam/util/ScriptRunner.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.beam.util; + +import java.io.IOException; +import java.io.LineNumberReader; +import java.io.PrintWriter; +import java.io.Reader; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +public class ScriptRunner { + + private static final String DEFAULT_DELIMITER = ";"; + + private final Connection connection; + + private final boolean stopOnError; + private final boolean autoCommit; + + private PrintWriter logWriter = new PrintWriter(System.out); + private PrintWriter errorLogWriter = new PrintWriter(System.err); + + private String delimiter = DEFAULT_DELIMITER; + private boolean fullLineDelimiter = false; + + /** + * Default constructor + */ + public ScriptRunner(Connection connection, boolean autoCommit, + boolean stopOnError) { + this.connection = connection; + this.autoCommit = autoCommit; + this.stopOnError = stopOnError; + } + + public void setDelimiter(String newDelimiter, boolean newFullLineDelimiter) { + this.delimiter = newDelimiter; + this.fullLineDelimiter = newFullLineDelimiter; + } + + /** + * Setter for logWriter property + * + * @param logWriter - the new value of the logWriter property + */ + public void setLogWriter(PrintWriter logWriter) { + this.logWriter = logWriter; + } + + /** + * Setter for errorLogWriter property + * + * @param errorLogWriter - the new value of the errorLogWriter property + */ + public void setErrorLogWriter(PrintWriter errorLogWriter) { + this.errorLogWriter = errorLogWriter; + } + + /** + * Runs an SQL script (read in using the Reader parameter) + * + * @param reader - the source of the script + */ + public void runScript(Reader reader) throws IOException, SQLException { + try { + boolean originalAutoCommit = connection.getAutoCommit(); + try { + if (originalAutoCommit != this.autoCommit) { + connection.setAutoCommit(this.autoCommit); + } + runScript(connection, reader); + } finally { + connection.setAutoCommit(originalAutoCommit); + } + } catch (IOException | SQLException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException("Error running script. 
Cause: " + e, e); + } + } + + /** + * Runs an SQL script (read in using the Reader parameter) using the connection passed in + * + * @param conn - the connection to use for the script + * @param reader - the source of the script + * @throws SQLException if any SQL errors occur + * @throws IOException if there is an error reading from the Reader + */ + @SuppressWarnings("checkstyle:CyclomaticComplexity") + private void runScript(Connection conn, Reader reader) throws IOException, SQLException { + StringBuilder command = null; + try { + LineNumberReader lineReader = new LineNumberReader(reader); + String line = null; + while ((line = lineReader.readLine()) != null) { + if (command == null) { + command = new StringBuilder(); + } + String trimmedLine = line.trim(); + if (trimmedLine.startsWith("--")) { + println(trimmedLine); + } else if (trimmedLine.length() < 1 || trimmedLine.startsWith("//")) { + // Do nothing + } else if (trimmedLine.length() < 1 || trimmedLine.startsWith("--")) { + // Do nothing + } else if (!fullLineDelimiter && trimmedLine.endsWith(getDelimiter()) || + fullLineDelimiter && trimmedLine.equals(getDelimiter())) { + command.append(line, 0, line + .lastIndexOf(getDelimiter())); + command.append(" "); + Statement statement = conn.createStatement(); + + println(command); + + boolean hasResults = false; + if (stopOnError) { + hasResults = statement.execute(command.toString()); + } else { + try { + statement.execute(command.toString()); + } catch (SQLException e) { + e.fillInStackTrace(); + printlnError("Error executing: " + command); + printlnError(e); + } + } + + if (autoCommit && !conn.getAutoCommit()) { + conn.commit(); + } + + ResultSet rs = statement.getResultSet(); + if (hasResults && rs != null) { + ResultSetMetaData md = rs.getMetaData(); + int cols = md.getColumnCount(); + for (int i = 0; i < cols; i++) { + String name = md.getColumnLabel(i); + print(name + "\t"); + } + println(""); + while (rs.next()) { + for (int i = 0; i < cols; i++) { + String value = rs.getString(i); + print(value + "\t"); + } + println(""); + } + } + + command = null; + try { + statement.close(); + } catch (Exception e) { + // Ignore to workaround a bug in Jakarta DBCP + } + Thread.yield(); + } else { + command.append(line); + command.append(" "); + } + } + if (!autoCommit) { + conn.commit(); + } + } catch (SQLException e) { + e.fillInStackTrace(); + printlnError("Error executing: " + command); + printlnError(e); + throw e; + } catch (IOException e) { + e.fillInStackTrace(); + printlnError("Error executing: " + command); + printlnError(e); + throw e; + } finally { + conn.rollback(); + flush(); + } + } + + private String getDelimiter() { + return delimiter; + } + + private void print(Object obj) { + if (logWriter != null) { + System.out.print(obj); + } + } + + private void println(Object obj) { + if (logWriter != null) { + logWriter.println(obj); + } + } + + private void printlnError(Object obj) { + if (errorLogWriter != null) { + errorLogWriter.println(obj); + } + } + + private void flush() { + if (logWriter != null) { + logWriter.flush(); + } + if (errorLogWriter != null) { + errorLogWriter.flush(); + } + } +} diff --git a/beam/src/test/java/org/apache/iceberg/beam/util/TestHiveMetastore.java b/beam/src/test/java/org/apache/iceberg/beam/util/TestHiveMetastore.java new file mode 100644 index 000000000000..057b4bd429ba --- /dev/null +++ b/beam/src/test/java/org/apache/iceberg/beam/util/TestHiveMetastore.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.beam.util; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.RetryingHMSHandler; +import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.hive.HiveClientPool; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TTransportFactory; + +import static java.nio.file.Files.createTempDirectory; +import static java.nio.file.attribute.PosixFilePermissions.asFileAttribute; +import static java.nio.file.attribute.PosixFilePermissions.fromString; + +public class TestHiveMetastore { + + private static final String DEFAULT_DATABASE_NAME = "default"; + private static final int DEFAULT_POOL_SIZE = 5; + + // create the metastore handlers based on whether we're working with Hive2 or Hive3 dependencies + // we need to do this because there is a breaking API change between Hive2 and Hive3 + private static final DynConstructors.Ctor HMS_HANDLER_CTOR = DynConstructors.builder() + .impl(HiveMetaStore.HMSHandler.class, String.class, Configuration.class) + .impl(HiveMetaStore.HMSHandler.class, String.class, HiveConf.class) + .build(); + + private static final DynMethods.StaticMethod GET_BASE_HMS_HANDLER = DynMethods.builder("getProxy") + .impl(RetryingHMSHandler.class, Configuration.class, IHMSHandler.class, boolean.class) + .impl(RetryingHMSHandler.class, HiveConf.class, IHMSHandler.class, boolean.class) + .buildStatic(); + + // Hive3 introduces background metastore tasks (MetastoreTaskThread) for performing various cleanup duties. These + // threads are scheduled and executed in a static thread pool (org.apache.hadoop.hive.metastore.ThreadPool). 
+ // This thread pool is shut down normally as part of the JVM shutdown hook, but since we're creating and tearing down + // multiple metastore instances within the same JVM, we have to call this cleanup method manually, otherwise + // threads from our previous test suite will be stuck in the pool with stale config, and keep on being scheduled. + // This can lead to issues, e.g. accidental Persistence Manager closure by ScheduledQueryExecutionsMaintTask. + private static final DynMethods.StaticMethod METASTORE_THREADS_SHUTDOWN = DynMethods.builder("shutdown") + .impl("org.apache.hadoop.hive.metastore.ThreadPool") + .orNoop() + .buildStatic(); + + private File hiveLocalDir; + private HiveConf hiveConf; + private ExecutorService executorService; + private TServer server; + private HiveMetaStore.HMSHandler baseHandler; + private HiveClientPool clientPool; + + /** + * Starts a TestHiveMetastore with the default connection pool size (5). + */ + public void start() { + start(DEFAULT_POOL_SIZE); + } + + /** + * Starts a TestHiveMetastore with a provided connection pool size. + * + * @param poolSize The number of threads in the executor pool + */ + public void start(int poolSize) { + try { + this.hiveLocalDir = createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile(); + File derbyLogFile = new File(hiveLocalDir, "derby.log"); + System.setProperty("derby.stream.error.file", derbyLogFile.getAbsolutePath()); + setupMetastoreDB("jdbc:derby:" + getDerbyPath() + ";create=true"); + + TServerSocket socket = new TServerSocket(0); + int port = socket.getServerSocket().getLocalPort(); + this.hiveConf = newHiveConf(port); + this.server = newThriftServer(socket, poolSize, hiveConf); + this.executorService = Executors.newSingleThreadExecutor(); + this.executorService.submit(() -> server.serve()); + + // in Hive3, setting this as a system prop ensures that it will be picked up whenever a new HiveConf is created + System.setProperty(HiveConf.ConfVars.METASTOREURIS.varname, hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS)); + + this.clientPool = new HiveClientPool(1, hiveConf); + } catch (Exception e) { + throw new RuntimeException("Cannot start TestHiveMetastore", e); + } + } + + public void stop() { + if (clientPool != null) { + clientPool.close(); + } + if (server != null) { + server.stop(); + } + if (executorService != null) { + executorService.shutdown(); + } + if (hiveLocalDir != null) { + hiveLocalDir.delete(); + } + if (baseHandler != null) { + baseHandler.shutdown(); + } + METASTORE_THREADS_SHUTDOWN.invoke(); + } + + public HiveConf hiveConf() { + return hiveConf; + } + + public HiveClientPool clientPool() { + return clientPool; + } + + public String getDatabasePath(String dbName) { + File dbDir = new File(hiveLocalDir, dbName + ".db"); + return dbDir.getPath(); + } + + public void reset() throws Exception { + for (String dbName : clientPool.run(HiveMetaStoreClient::getAllDatabases)) { + for (String tblName : clientPool.run(client -> client.getAllTables(dbName))) { + clientPool.run(client -> { + client.dropTable(dbName, tblName, true, true, true); + return null; + }); + } + + if (!DEFAULT_DATABASE_NAME.equals(dbName)) { + // Drop cascade, functions dropped by cascade + clientPool.run(client -> { + client.dropDatabase(dbName, true, true, true); + return null; + }); + } + } + + Path warehouseRoot = new Path(hiveLocalDir.getAbsolutePath()); + FileSystem fs = Util.getFs(warehouseRoot, hiveConf); + for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) { + if 
(!fileStatus.getPath().getName().equals("derby.log") && + !fileStatus.getPath().getName().equals("metastore_db")) { + fs.delete(fileStatus.getPath(), true); + } + } + } + + private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf conf) throws Exception { + HiveConf serverConf = new HiveConf(conf); + serverConf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, "jdbc:derby:" + getDerbyPath() + ";create=true"); + baseHandler = HMS_HANDLER_CTOR.newInstance("new db based metaserver", serverConf); + IHMSHandler handler = GET_BASE_HMS_HANDLER.invoke(serverConf, baseHandler, false); + + TThreadPoolServer.Args args = new TThreadPoolServer.Args(socket) + .processor(new TSetIpAddressProcessor<>(handler)) + .transportFactory(new TTransportFactory()) + .protocolFactory(new TBinaryProtocol.Factory()) + .minWorkerThreads(poolSize) + .maxWorkerThreads(poolSize); + + return new TThreadPoolServer(args); + } + + private HiveConf newHiveConf(int port) { + HiveConf newHiveConf = new HiveConf(new Configuration(), TestHiveMetastore.class); + newHiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://localhost:" + port); + newHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, "file:" + hiveLocalDir.getAbsolutePath()); + newHiveConf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false"); + newHiveConf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false"); + newHiveConf.set("iceberg.hive.client-pool-size", "2"); + return newHiveConf; + } + + private void setupMetastoreDB(String dbURL) throws SQLException, IOException { + Connection connection = DriverManager.getConnection(dbURL); + ScriptRunner scriptRunner = new ScriptRunner(connection, true, true); + + ClassLoader classLoader = ClassLoader.getSystemClassLoader(); + + try (InputStream inputStream = classLoader.getResourceAsStream("hive-schema-3.1.0.derby.sql"); + Reader reader = new InputStreamReader(inputStream)) { + scriptRunner.runScript(reader); + } + } + + private String getDerbyPath() { + File metastoreDB = new File(hiveLocalDir, "metastore_db"); + return metastoreDB.getPath(); + } +} diff --git a/beam/src/test/resources/hive-schema-3.1.0.derby.sql b/beam/src/test/resources/hive-schema-3.1.0.derby.sql new file mode 100644 index 000000000000..cf26b9117635 --- /dev/null +++ b/beam/src/test/resources/hive-schema-3.1.0.derby.sql @@ -0,0 +1,1255 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +-- This file was copied from Apache Hive, at: +-- https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-3.1.0.derby.sql +-- +-- This has been modified slightly for compatibility with older Hive versions. 
+-- +-- Timestamp: 2011-09-22 15:32:02.024 +-- Source database is: /home/carl/Work/repos/hive1/metastore/scripts/upgrade/derby/mdb +-- Connection URL is: jdbc:derby:/home/carl/Work/repos/hive1/metastore/scripts/upgrade/derby/mdb +-- Specified schema is: APP +-- appendLogs: false + +-- ---------------------------------------------- +-- DDL Statements for functions +-- ---------------------------------------------- + +CREATE FUNCTION "APP"."NUCLEUS_ASCII"(C CHAR (1)) RETURNS INTEGER + LANGUAGE JAVA PARAMETER STYLE JAVA READS SQL DATA CALLED ON NULL INPUT EXTERNAL NAME 'org.datanucleus.store.rdbms.adapter.DerbySQLFunction.ascii'; + +CREATE FUNCTION "APP"."NUCLEUS_MATCHES"(TEXT VARCHAR (8000), PATTERN VARCHAR (8000)) RETURNS INTEGER + LANGUAGE JAVA PARAMETER STYLE JAVA READS SQL DATA CALLED ON NULL INPUT EXTERNAL NAME 'org.datanucleus.store.rdbms.adapter.DerbySQLFunction.matches'; + +-- ---------------------------------------------- +-- DDL Statements for tables +-- ---------------------------------------------- +CREATE TABLE "APP"."DBS" +( + "DB_ID" BIGINT NOT NULL, + "DESC" VARCHAR(4000), + "DB_LOCATION_URI" VARCHAR(4000) NOT NULL, + "NAME" VARCHAR(128), + "OWNER_NAME" VARCHAR(128), + "OWNER_TYPE" VARCHAR(10), + "CTLG_NAME" VARCHAR(256) +); + +CREATE TABLE "APP"."TBL_PRIVS" +( + "TBL_GRANT_ID" BIGINT NOT NULL, + "CREATE_TIME" INTEGER NOT NULL, + "GRANT_OPTION" SMALLINT NOT NULL, + "GRANTOR" VARCHAR(128), + "GRANTOR_TYPE" VARCHAR(128), + "PRINCIPAL_NAME" VARCHAR(128), + "PRINCIPAL_TYPE" VARCHAR(128), + "TBL_PRIV" VARCHAR(128), + "TBL_ID" BIGINT, + "AUTHORIZER" VARCHAR(128) +); + +CREATE TABLE "APP"."DATABASE_PARAMS" +( + "DB_ID" BIGINT NOT NULL, + "PARAM_KEY" VARCHAR(180) NOT NULL, + "PARAM_VALUE" VARCHAR(4000) +); + +CREATE TABLE "APP"."TBL_COL_PRIVS" +( + "TBL_COLUMN_GRANT_ID" BIGINT NOT NULL, + "COLUMN_NAME" VARCHAR(767), + "CREATE_TIME" INTEGER NOT NULL, + "GRANT_OPTION" SMALLINT NOT NULL, + "GRANTOR" VARCHAR(128), + "GRANTOR_TYPE" VARCHAR(128), + "PRINCIPAL_NAME" VARCHAR(128), + "PRINCIPAL_TYPE" VARCHAR(128), + "TBL_COL_PRIV" VARCHAR(128), + "TBL_ID" BIGINT, + "AUTHORIZER" VARCHAR(128) +); + +CREATE TABLE "APP"."SERDE_PARAMS" +( + "SERDE_ID" BIGINT NOT NULL, + "PARAM_KEY" VARCHAR(256) NOT NULL, + "PARAM_VALUE" CLOB +); + +CREATE TABLE "APP"."COLUMNS_V2" +( + "CD_ID" BIGINT NOT NULL, + "COMMENT" VARCHAR(4000), + "COLUMN_NAME" VARCHAR(767) NOT NULL, + "TYPE_NAME" CLOB, + "INTEGER_IDX" INTEGER NOT NULL +); + +CREATE TABLE "APP"."SORT_COLS" +( + "SD_ID" BIGINT NOT NULL, + "COLUMN_NAME" VARCHAR(767), + "ORDER" INTEGER NOT NULL, + "INTEGER_IDX" INTEGER NOT NULL +); + +CREATE TABLE "APP"."CDS" +( + "CD_ID" BIGINT NOT NULL +); + +CREATE TABLE "APP"."PARTITION_KEY_VALS" +( + "PART_ID" BIGINT NOT NULL, + "PART_KEY_VAL" VARCHAR(256), + "INTEGER_IDX" INTEGER NOT NULL +); + +CREATE TABLE "APP"."DB_PRIVS" +( + "DB_GRANT_ID" BIGINT NOT NULL, + "CREATE_TIME" INTEGER NOT NULL, + "DB_ID" BIGINT, + "GRANT_OPTION" SMALLINT NOT NULL, + "GRANTOR" VARCHAR(128), + "GRANTOR_TYPE" VARCHAR(128), + "PRINCIPAL_NAME" VARCHAR(128), + "PRINCIPAL_TYPE" VARCHAR(128), + "DB_PRIV" VARCHAR(128), + "AUTHORIZER" VARCHAR(128) +); + +CREATE TABLE "APP"."IDXS" +( + "INDEX_ID" BIGINT NOT NULL, + "CREATE_TIME" INTEGER NOT NULL, + "DEFERRED_REBUILD" CHAR(1) NOT NULL, + "INDEX_HANDLER_CLASS" VARCHAR(4000), + "INDEX_NAME" VARCHAR(128), + "INDEX_TBL_ID" BIGINT, + "LAST_ACCESS_TIME" INTEGER NOT NULL, + "ORIG_TBL_ID" BIGINT, + "SD_ID" BIGINT +); + +CREATE TABLE "APP"."INDEX_PARAMS" +( + "INDEX_ID" BIGINT NOT NULL, + "PARAM_KEY" 
VARCHAR(256) NOT NULL, + "PARAM_VALUE" VARCHAR(4000) +); + +CREATE TABLE "APP"."PARTITIONS" +( + "PART_ID" BIGINT NOT NULL, + "CREATE_TIME" INTEGER NOT NULL, + "LAST_ACCESS_TIME" INTEGER NOT NULL, + "PART_NAME" VARCHAR(767), + "SD_ID" BIGINT, + "TBL_ID" BIGINT +); + +CREATE TABLE "APP"."SERDES" +( + "SERDE_ID" BIGINT NOT NULL, + "NAME" VARCHAR(128), + "SLIB" VARCHAR(4000), + "DESCRIPTION" VARCHAR(4000), + "SERIALIZER_CLASS" VARCHAR(4000), + "DESERIALIZER_CLASS" VARCHAR(4000), + SERDE_TYPE INTEGER +); + +CREATE TABLE "APP"."PART_PRIVS" +( + "PART_GRANT_ID" BIGINT NOT NULL, + "CREATE_TIME" INTEGER NOT NULL, + "GRANT_OPTION" SMALLINT NOT NULL, + "GRANTOR" VARCHAR(128), + "GRANTOR_TYPE" VARCHAR(128), + "PART_ID" BIGINT, + "PRINCIPAL_NAME" VARCHAR(128), + "PRINCIPAL_TYPE" VARCHAR(128), + "PART_PRIV" VARCHAR(128), + "AUTHORIZER" VARCHAR(128) +); + +CREATE TABLE "APP"."ROLE_MAP" +( + "ROLE_GRANT_ID" BIGINT NOT NULL, + "ADD_TIME" INTEGER NOT NULL, + "GRANT_OPTION" SMALLINT NOT NULL, + "GRANTOR" VARCHAR(128), + "GRANTOR_TYPE" VARCHAR(128), + "PRINCIPAL_NAME" VARCHAR(128), + "PRINCIPAL_TYPE" VARCHAR(128), + "ROLE_ID" BIGINT +); + +CREATE TABLE "APP"."TYPES" +( + "TYPES_ID" BIGINT NOT NULL, + "TYPE_NAME" VARCHAR(128), + "TYPE1" VARCHAR(767), + "TYPE2" VARCHAR(767) +); + +CREATE TABLE "APP"."GLOBAL_PRIVS" +( + "USER_GRANT_ID" BIGINT NOT NULL, + "CREATE_TIME" INTEGER NOT NULL, + "GRANT_OPTION" SMALLINT NOT NULL, + "GRANTOR" VARCHAR(128), + "GRANTOR_TYPE" VARCHAR(128), + "PRINCIPAL_NAME" VARCHAR(128), + "PRINCIPAL_TYPE" VARCHAR(128), + "USER_PRIV" VARCHAR(128), + "AUTHORIZER" VARCHAR(128) +); + +CREATE TABLE "APP"."PARTITION_PARAMS" +( + "PART_ID" BIGINT NOT NULL, + "PARAM_KEY" VARCHAR(256) NOT NULL, + "PARAM_VALUE" VARCHAR(4000) +); + +CREATE TABLE "APP"."PARTITION_EVENTS" +( + "PART_NAME_ID" BIGINT NOT NULL, + "CAT_NAME" VARCHAR(256), + "DB_NAME" VARCHAR(128), + "EVENT_TIME" BIGINT NOT NULL, + "EVENT_TYPE" INTEGER NOT NULL, + "PARTITION_NAME" VARCHAR(767), + "TBL_NAME" VARCHAR(256) +); + +CREATE TABLE "APP"."COLUMNS" +( + "SD_ID" BIGINT NOT NULL, + "COMMENT" VARCHAR(256), + "COLUMN_NAME" VARCHAR(128) NOT NULL, + "TYPE_NAME" VARCHAR(4000) NOT NULL, + "INTEGER_IDX" INTEGER NOT NULL +); + +CREATE TABLE "APP"."ROLES" +( + "ROLE_ID" BIGINT NOT NULL, + "CREATE_TIME" INTEGER NOT NULL, + "OWNER_NAME" VARCHAR(128), + "ROLE_NAME" VARCHAR(128) +); + +CREATE TABLE "APP"."TBLS" +( + "TBL_ID" BIGINT NOT NULL, + "CREATE_TIME" INTEGER NOT NULL, + "DB_ID" BIGINT, + "LAST_ACCESS_TIME" INTEGER NOT NULL, + "OWNER" VARCHAR(767), + "OWNER_TYPE" VARCHAR(10), + "RETENTION" INTEGER NOT NULL, + "SD_ID" BIGINT, + "TBL_NAME" VARCHAR(256), + "TBL_TYPE" VARCHAR(128), + "VIEW_EXPANDED_TEXT" LONG VARCHAR, + "VIEW_ORIGINAL_TEXT" LONG VARCHAR, + "IS_REWRITE_ENABLED" CHAR(1) NOT NULL DEFAULT 'N' +); + +CREATE TABLE "APP"."PARTITION_KEYS" +( + "TBL_ID" BIGINT NOT NULL, + "PKEY_COMMENT" VARCHAR(4000), + "PKEY_NAME" VARCHAR(128) NOT NULL, + "PKEY_TYPE" VARCHAR(767) NOT NULL, + "INTEGER_IDX" INTEGER NOT NULL +); + +CREATE TABLE "APP"."PART_COL_PRIVS" +( + "PART_COLUMN_GRANT_ID" BIGINT NOT NULL, + "COLUMN_NAME" VARCHAR(767), + "CREATE_TIME" INTEGER NOT NULL, + "GRANT_OPTION" SMALLINT NOT NULL, + "GRANTOR" VARCHAR(128), + "GRANTOR_TYPE" VARCHAR(128), + "PART_ID" BIGINT, + "PRINCIPAL_NAME" VARCHAR(128), + "PRINCIPAL_TYPE" VARCHAR(128), + "PART_COL_PRIV" VARCHAR(128), + "AUTHORIZER" VARCHAR(128) +); + +CREATE TABLE "APP"."SDS" +( + "SD_ID" BIGINT NOT NULL, + "INPUT_FORMAT" VARCHAR(4000), + "IS_COMPRESSED" CHAR(1) NOT NULL, + "LOCATION" 
VARCHAR(4000), + "NUM_BUCKETS" INTEGER NOT NULL, + "OUTPUT_FORMAT" VARCHAR(4000), + "SERDE_ID" BIGINT, + "CD_ID" BIGINT, + "IS_STOREDASSUBDIRECTORIES" CHAR(1) NOT NULL +); + +CREATE TABLE "APP"."SEQUENCE_TABLE" +( + "SEQUENCE_NAME" VARCHAR(256) NOT NULL, + "NEXT_VAL" BIGINT NOT NULL +); + +CREATE TABLE "APP"."TAB_COL_STATS" +( + "CAT_NAME" VARCHAR(256) NOT NULL, + "DB_NAME" VARCHAR(128) NOT NULL, + "TABLE_NAME" VARCHAR(256) NOT NULL, + "COLUMN_NAME" VARCHAR(767) NOT NULL, + "COLUMN_TYPE" VARCHAR(128) NOT NULL, + "LONG_LOW_VALUE" BIGINT, + "LONG_HIGH_VALUE" BIGINT, + "DOUBLE_LOW_VALUE" DOUBLE, + "DOUBLE_HIGH_VALUE" DOUBLE, + "BIG_DECIMAL_LOW_VALUE" VARCHAR(4000), + "BIG_DECIMAL_HIGH_VALUE" VARCHAR(4000), + "NUM_DISTINCTS" BIGINT, + "NUM_NULLS" BIGINT NOT NULL, + "AVG_COL_LEN" DOUBLE, + "MAX_COL_LEN" BIGINT, + "NUM_TRUES" BIGINT, + "NUM_FALSES" BIGINT, + "LAST_ANALYZED" BIGINT, + "CS_ID" BIGINT NOT NULL, + "TBL_ID" BIGINT NOT NULL, + "BIT_VECTOR" BLOB +); + +CREATE TABLE "APP"."TABLE_PARAMS" +( + "TBL_ID" BIGINT NOT NULL, + "PARAM_KEY" VARCHAR(256) NOT NULL, + "PARAM_VALUE" CLOB +); + +CREATE TABLE "APP"."BUCKETING_COLS" +( + "SD_ID" BIGINT NOT NULL, + "BUCKET_COL_NAME" VARCHAR(256), + "INTEGER_IDX" INTEGER NOT NULL +); + +CREATE TABLE "APP"."TYPE_FIELDS" +( + "TYPE_NAME" BIGINT NOT NULL, + "COMMENT" VARCHAR(256), + "FIELD_NAME" VARCHAR(128) NOT NULL, + "FIELD_TYPE" VARCHAR(767) NOT NULL, + "INTEGER_IDX" INTEGER NOT NULL +); + +CREATE TABLE "APP"."NUCLEUS_TABLES" +( + "CLASS_NAME" VARCHAR(128) NOT NULL, + "TABLE_NAME" VARCHAR(128) NOT NULL, + "TYPE" VARCHAR(4) NOT NULL, + "OWNER" VARCHAR(2) NOT NULL, + "VERSION" VARCHAR(20) NOT NULL, + "INTERFACE_NAME" VARCHAR(256) DEFAULT NULL +); + +CREATE TABLE "APP"."SD_PARAMS" +( + "SD_ID" BIGINT NOT NULL, + "PARAM_KEY" VARCHAR(256) NOT NULL, + "PARAM_VALUE" CLOB +); + +CREATE TABLE "APP"."SKEWED_STRING_LIST" +( + "STRING_LIST_ID" BIGINT NOT NULL +); + +CREATE TABLE "APP"."SKEWED_STRING_LIST_VALUES" +( + "STRING_LIST_ID" BIGINT NOT NULL, + "STRING_LIST_VALUE" VARCHAR(256), + "INTEGER_IDX" INTEGER NOT NULL +); + +CREATE TABLE "APP"."SKEWED_COL_NAMES" +( + "SD_ID" BIGINT NOT NULL, + "SKEWED_COL_NAME" VARCHAR(256), + "INTEGER_IDX" INTEGER NOT NULL +); + +CREATE TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" +( + "SD_ID" BIGINT NOT NULL, + "STRING_LIST_ID_KID" BIGINT NOT NULL, + "LOCATION" VARCHAR(4000) +); + +CREATE TABLE "APP"."SKEWED_VALUES" +( + "SD_ID_OID" BIGINT NOT NULL, + "STRING_LIST_ID_EID" BIGINT NOT NULL, + "INTEGER_IDX" INTEGER NOT NULL +); + +CREATE TABLE "APP"."MASTER_KEYS" +( + "KEY_ID" INTEGER NOT NULL generated always as identity (start with 1), + "MASTER_KEY" VARCHAR(767) +); + +CREATE TABLE "APP"."DELEGATION_TOKENS" +( + "TOKEN_IDENT" VARCHAR(767) NOT NULL, + "TOKEN" VARCHAR(767) +); + +CREATE TABLE "APP"."PART_COL_STATS" +( + "CAT_NAME" VARCHAR(256) NOT NULL, + "DB_NAME" VARCHAR(128) NOT NULL, + "TABLE_NAME" VARCHAR(256) NOT NULL, + "PARTITION_NAME" VARCHAR(767) NOT NULL, + "COLUMN_NAME" VARCHAR(767) NOT NULL, + "COLUMN_TYPE" VARCHAR(128) NOT NULL, + "LONG_LOW_VALUE" BIGINT, + "LONG_HIGH_VALUE" BIGINT, + "DOUBLE_LOW_VALUE" DOUBLE, + "DOUBLE_HIGH_VALUE" DOUBLE, + "BIG_DECIMAL_LOW_VALUE" VARCHAR(4000), + "BIG_DECIMAL_HIGH_VALUE" VARCHAR(4000), + "NUM_DISTINCTS" BIGINT, + "BIT_VECTOR" BLOB, + "NUM_NULLS" BIGINT NOT NULL, + "AVG_COL_LEN" DOUBLE, + "MAX_COL_LEN" BIGINT, + "NUM_TRUES" BIGINT, + "NUM_FALSES" BIGINT, + "LAST_ANALYZED" BIGINT, + "CS_ID" BIGINT NOT NULL, + "PART_ID" BIGINT NOT NULL +); + +CREATE TABLE "APP"."VERSION" +( + "VER_ID" BIGINT 
NOT NULL, + "SCHEMA_VERSION" VARCHAR(127) NOT NULL, + "VERSION_COMMENT" VARCHAR(255) +); + +CREATE TABLE "APP"."FUNCS" +( + "FUNC_ID" BIGINT NOT NULL, + "CLASS_NAME" VARCHAR(4000), + "CREATE_TIME" INTEGER NOT NULL, + "DB_ID" BIGINT, + "FUNC_NAME" VARCHAR(128), + "FUNC_TYPE" INTEGER NOT NULL, + "OWNER_NAME" VARCHAR(128), + "OWNER_TYPE" VARCHAR(10) +); + +CREATE TABLE "APP"."FUNC_RU" +( + "FUNC_ID" BIGINT NOT NULL, + "RESOURCE_TYPE" INTEGER NOT NULL, + "RESOURCE_URI" VARCHAR(4000), + "INTEGER_IDX" INTEGER NOT NULL +); + +CREATE TABLE "APP"."NOTIFICATION_LOG" +( + "NL_ID" BIGINT NOT NULL, + "CAT_NAME" VARCHAR(256), + "DB_NAME" VARCHAR(128), + "EVENT_ID" BIGINT NOT NULL, + "EVENT_TIME" INTEGER NOT NULL, + "EVENT_TYPE" VARCHAR(32) NOT NULL, + "MESSAGE" CLOB, + "TBL_NAME" VARCHAR(256), + "MESSAGE_FORMAT" VARCHAR(16) +); + +CREATE TABLE "APP"."NOTIFICATION_SEQUENCE" +( + "NNI_ID" BIGINT NOT NULL, + "NEXT_EVENT_ID" BIGINT NOT NULL +); + +CREATE TABLE "APP"."KEY_CONSTRAINTS" +( + "CHILD_CD_ID" BIGINT, + "CHILD_INTEGER_IDX" INTEGER, + "CHILD_TBL_ID" BIGINT, + "PARENT_CD_ID" BIGINT, + "PARENT_INTEGER_IDX" INTEGER, + "PARENT_TBL_ID" BIGINT NOT NULL, + "POSITION" BIGINT NOT NULL, + "CONSTRAINT_NAME" VARCHAR(400) NOT NULL, + "CONSTRAINT_TYPE" SMALLINT NOT NULL, + "UPDATE_RULE" SMALLINT, + "DELETE_RULE" SMALLINT, + "ENABLE_VALIDATE_RELY" SMALLINT NOT NULL, + "DEFAULT_VALUE" VARCHAR(400) +); + +CREATE TABLE "APP"."METASTORE_DB_PROPERTIES" +( + "PROPERTY_KEY" VARCHAR(255) NOT NULL, + "PROPERTY_VALUE" VARCHAR(1000) NOT NULL, + "DESCRIPTION" VARCHAR(1000) +); + +CREATE TABLE "APP"."WM_RESOURCEPLAN" +( + RP_ID BIGINT NOT NULL, + NAME VARCHAR(128) NOT NULL, + QUERY_PARALLELISM INTEGER, + STATUS VARCHAR(20) NOT NULL, + DEFAULT_POOL_ID BIGINT +); + +CREATE TABLE "APP"."WM_POOL" +( + POOL_ID BIGINT NOT NULL, + RP_ID BIGINT NOT NULL, + PATH VARCHAR(1024) NOT NULL, + ALLOC_FRACTION DOUBLE, + QUERY_PARALLELISM INTEGER, + SCHEDULING_POLICY VARCHAR(1024) +); + +CREATE TABLE "APP"."WM_TRIGGER" +( + TRIGGER_ID BIGINT NOT NULL, + RP_ID BIGINT NOT NULL, + NAME VARCHAR(128) NOT NULL, + TRIGGER_EXPRESSION VARCHAR(1024), + ACTION_EXPRESSION VARCHAR(1024), + IS_IN_UNMANAGED INTEGER NOT NULL DEFAULT 0 +); + +CREATE TABLE "APP"."WM_POOL_TO_TRIGGER" +( + POOL_ID BIGINT NOT NULL, + TRIGGER_ID BIGINT NOT NULL +); + +CREATE TABLE "APP"."WM_MAPPING" +( + MAPPING_ID BIGINT NOT NULL, + RP_ID BIGINT NOT NULL, + ENTITY_TYPE VARCHAR(128) NOT NULL, + ENTITY_NAME VARCHAR(128) NOT NULL, + POOL_ID BIGINT, + ORDERING INTEGER +); + +CREATE TABLE "APP"."MV_CREATION_METADATA" +( + "MV_CREATION_METADATA_ID" BIGINT NOT NULL, + "CAT_NAME" VARCHAR(256) NOT NULL, + "DB_NAME" VARCHAR(128) NOT NULL, + "TBL_NAME" VARCHAR(256) NOT NULL, + "TXN_LIST" CLOB, + "MATERIALIZATION_TIME" BIGINT NOT NULL +); + +CREATE TABLE "APP"."MV_TABLES_USED" +( + "MV_CREATION_METADATA_ID" BIGINT NOT NULL, + "TBL_ID" BIGINT NOT NULL +); + +CREATE TABLE "APP"."CTLGS" +( + "CTLG_ID" BIGINT NOT NULL, + "NAME" VARCHAR(256) UNIQUE, + "DESC" VARCHAR(4000), + "LOCATION_URI" VARCHAR(4000) NOT NULL +); + +-- ---------------------------------------------- +-- DML Statements +-- ---------------------------------------------- + +INSERT INTO "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID", "NEXT_EVENT_ID") +SELECT * +FROM (VALUES (1, 1)) tmp_table +WHERE NOT EXISTS(SELECT "NEXT_EVENT_ID" FROM "APP"."NOTIFICATION_SEQUENCE"); + +INSERT INTO "APP"."SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") +SELECT * +FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MNotificationLog', 1)) tmp_table +WHERE 
NOT EXISTS(SELECT "NEXT_VAL" + FROM "APP"."SEQUENCE_TABLE" + WHERE "SEQUENCE_NAME" = 'org.apache.hadoop.hive.metastore.model.MNotificationLog'); + +-- ---------------------------------------------- +-- DDL Statements for indexes +-- ---------------------------------------------- + +CREATE +UNIQUE INDEX "APP"."UNIQUEINDEX" ON "APP"."IDXS" ("INDEX_NAME", "ORIG_TBL_ID"); + +CREATE +INDEX "APP"."TABLECOLUMNPRIVILEGEINDEX" ON "APP"."TBL_COL_PRIVS" ("AUTHORIZER", "TBL_ID", "COLUMN_NAME", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "TBL_COL_PRIV", "GRANTOR", "GRANTOR_TYPE"); + +CREATE +UNIQUE INDEX "APP"."DBPRIVILEGEINDEX" ON "APP"."DB_PRIVS" ("AUTHORIZER", "DB_ID", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "DB_PRIV", "GRANTOR", "GRANTOR_TYPE"); + +CREATE +INDEX "APP"."PCS_STATS_IDX" ON "APP"."PART_COL_STATS" ("CAT_NAME", "DB_NAME","TABLE_NAME","COLUMN_NAME","PARTITION_NAME"); + +CREATE +INDEX "APP"."TAB_COL_STATS_IDX" ON "APP"."TAB_COL_STATS" ("CAT_NAME", "DB_NAME", "TABLE_NAME", "COLUMN_NAME"); + +CREATE +INDEX "APP"."PARTPRIVILEGEINDEX" ON "APP"."PART_PRIVS" ("AUTHORIZER", "PART_ID", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "PART_PRIV", "GRANTOR", "GRANTOR_TYPE"); + +CREATE +UNIQUE INDEX "APP"."ROLEENTITYINDEX" ON "APP"."ROLES" ("ROLE_NAME"); + +CREATE +INDEX "APP"."TABLEPRIVILEGEINDEX" ON "APP"."TBL_PRIVS" ("AUTHORIZER", "TBL_ID", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "TBL_PRIV", "GRANTOR", "GRANTOR_TYPE"); + +CREATE +UNIQUE INDEX "APP"."UNIQUETABLE" ON "APP"."TBLS" ("TBL_NAME", "DB_ID"); + +CREATE +UNIQUE INDEX "APP"."UNIQUE_DATABASE" ON "APP"."DBS" ("NAME", "CTLG_NAME"); + +CREATE +UNIQUE INDEX "APP"."USERROLEMAPINDEX" ON "APP"."ROLE_MAP" ("PRINCIPAL_NAME", "ROLE_ID", "GRANTOR", "GRANTOR_TYPE"); + +CREATE +UNIQUE INDEX "APP"."GLOBALPRIVILEGEINDEX" ON "APP"."GLOBAL_PRIVS" ("AUTHORIZER", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "USER_PRIV", "GRANTOR", "GRANTOR_TYPE"); + +CREATE +UNIQUE INDEX "APP"."UNIQUE_TYPE" ON "APP"."TYPES" ("TYPE_NAME"); + +CREATE +INDEX "APP"."PARTITIONCOLUMNPRIVILEGEINDEX" ON "APP"."PART_COL_PRIVS" ("AUTHORIZER", "PART_ID", "COLUMN_NAME", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "PART_COL_PRIV", "GRANTOR", "GRANTOR_TYPE"); + +CREATE +UNIQUE INDEX "APP"."UNIQUEPARTITION" ON "APP"."PARTITIONS" ("PART_NAME", "TBL_ID"); + +CREATE +UNIQUE INDEX "APP"."UNIQUEFUNCTION" ON "APP"."FUNCS" ("FUNC_NAME", "DB_ID"); + +CREATE +INDEX "APP"."FUNCS_N49" ON "APP"."FUNCS" ("DB_ID"); + +CREATE +INDEX "APP"."FUNC_RU_N49" ON "APP"."FUNC_RU" ("FUNC_ID"); + +CREATE +INDEX "APP"."CONSTRAINTS_PARENT_TBL_ID_INDEX" ON "APP"."KEY_CONSTRAINTS"("PARENT_TBL_ID"); + +CREATE +INDEX "APP"."CONSTRAINTS_CONSTRAINT_TYPE_INDEX" ON "APP"."KEY_CONSTRAINTS"("CONSTRAINT_TYPE"); + +CREATE +UNIQUE INDEX "APP"."UNIQUE_WM_RESOURCEPLAN" ON "APP"."WM_RESOURCEPLAN" ("NAME"); + +CREATE +UNIQUE INDEX "APP"."UNIQUE_WM_POOL" ON "APP"."WM_POOL" ("RP_ID", "PATH"); + +CREATE +UNIQUE INDEX "APP"."UNIQUE_WM_TRIGGER" ON "APP"."WM_TRIGGER" ("RP_ID", "NAME"); + +CREATE +UNIQUE INDEX "APP"."UNIQUE_WM_MAPPING" ON "APP"."WM_MAPPING" ("RP_ID", "ENTITY_TYPE", "ENTITY_NAME"); + +CREATE +UNIQUE INDEX "APP"."MV_UNIQUE_TABLE" ON "APP"."MV_CREATION_METADATA" ("TBL_NAME", "DB_NAME"); + +CREATE +UNIQUE INDEX "APP"."UNIQUE_CATALOG" ON "APP"."CTLGS" ("NAME"); + + +-- ---------------------------------------------- +-- DDL Statements for keys +-- ---------------------------------------------- + +-- primary/unique +ALTER TABLE "APP"."IDXS" + ADD CONSTRAINT "IDXS_PK" PRIMARY KEY ("INDEX_ID"); + +ALTER TABLE "APP"."TBL_COL_PRIVS" + ADD CONSTRAINT "TBL_COL_PRIVS_PK" PRIMARY KEY 
("TBL_COLUMN_GRANT_ID"); + +ALTER TABLE "APP"."CDS" + ADD CONSTRAINT "SQL110922153006460" PRIMARY KEY ("CD_ID"); + +ALTER TABLE "APP"."DB_PRIVS" + ADD CONSTRAINT "DB_PRIVS_PK" PRIMARY KEY ("DB_GRANT_ID"); + +ALTER TABLE "APP"."INDEX_PARAMS" + ADD CONSTRAINT "INDEX_PARAMS_PK" PRIMARY KEY ("INDEX_ID", "PARAM_KEY"); + +ALTER TABLE "APP"."PARTITION_KEYS" + ADD CONSTRAINT "PARTITION_KEY_PK" PRIMARY KEY ("TBL_ID", "PKEY_NAME"); + +ALTER TABLE "APP"."SEQUENCE_TABLE" + ADD CONSTRAINT "SEQUENCE_TABLE_PK" PRIMARY KEY ("SEQUENCE_NAME"); + +ALTER TABLE "APP"."PART_PRIVS" + ADD CONSTRAINT "PART_PRIVS_PK" PRIMARY KEY ("PART_GRANT_ID"); + +ALTER TABLE "APP"."SDS" + ADD CONSTRAINT "SDS_PK" PRIMARY KEY ("SD_ID"); + +ALTER TABLE "APP"."SERDES" + ADD CONSTRAINT "SERDES_PK" PRIMARY KEY ("SERDE_ID"); + +ALTER TABLE "APP"."COLUMNS" + ADD CONSTRAINT "COLUMNS_PK" PRIMARY KEY ("SD_ID", "COLUMN_NAME"); + +ALTER TABLE "APP"."PARTITION_EVENTS" + ADD CONSTRAINT "PARTITION_EVENTS_PK" PRIMARY KEY ("PART_NAME_ID"); + +ALTER TABLE "APP"."TYPE_FIELDS" + ADD CONSTRAINT "TYPE_FIELDS_PK" PRIMARY KEY ("TYPE_NAME", "FIELD_NAME"); + +ALTER TABLE "APP"."ROLES" + ADD CONSTRAINT "ROLES_PK" PRIMARY KEY ("ROLE_ID"); + +ALTER TABLE "APP"."TBL_PRIVS" + ADD CONSTRAINT "TBL_PRIVS_PK" PRIMARY KEY ("TBL_GRANT_ID"); + +ALTER TABLE "APP"."SERDE_PARAMS" + ADD CONSTRAINT "SERDE_PARAMS_PK" PRIMARY KEY ("SERDE_ID", "PARAM_KEY"); + +ALTER TABLE "APP"."NUCLEUS_TABLES" + ADD CONSTRAINT "NUCLEUS_TABLES_PK" PRIMARY KEY ("CLASS_NAME"); + +ALTER TABLE "APP"."TBLS" + ADD CONSTRAINT "TBLS_PK" PRIMARY KEY ("TBL_ID"); + +ALTER TABLE "APP"."SD_PARAMS" + ADD CONSTRAINT "SD_PARAMS_PK" PRIMARY KEY ("SD_ID", "PARAM_KEY"); + +ALTER TABLE "APP"."DATABASE_PARAMS" + ADD CONSTRAINT "DATABASE_PARAMS_PK" PRIMARY KEY ("DB_ID", "PARAM_KEY"); + +ALTER TABLE "APP"."DBS" + ADD CONSTRAINT "DBS_PK" PRIMARY KEY ("DB_ID"); + +ALTER TABLE "APP"."ROLE_MAP" + ADD CONSTRAINT "ROLE_MAP_PK" PRIMARY KEY ("ROLE_GRANT_ID"); + +ALTER TABLE "APP"."GLOBAL_PRIVS" + ADD CONSTRAINT "GLOBAL_PRIVS_PK" PRIMARY KEY ("USER_GRANT_ID"); + +ALTER TABLE "APP"."BUCKETING_COLS" + ADD CONSTRAINT "BUCKETING_COLS_PK" PRIMARY KEY ("SD_ID", "INTEGER_IDX"); + +ALTER TABLE "APP"."SORT_COLS" + ADD CONSTRAINT "SORT_COLS_PK" PRIMARY KEY ("SD_ID", "INTEGER_IDX"); + +ALTER TABLE "APP"."PARTITION_KEY_VALS" + ADD CONSTRAINT "PARTITION_KEY_VALS_PK" PRIMARY KEY ("PART_ID", "INTEGER_IDX"); + +ALTER TABLE "APP"."TYPES" + ADD CONSTRAINT "TYPES_PK" PRIMARY KEY ("TYPES_ID"); + +ALTER TABLE "APP"."COLUMNS_V2" + ADD CONSTRAINT "SQL110922153006740" PRIMARY KEY ("CD_ID", "COLUMN_NAME"); + +ALTER TABLE "APP"."PART_COL_PRIVS" + ADD CONSTRAINT "PART_COL_PRIVS_PK" PRIMARY KEY ("PART_COLUMN_GRANT_ID"); + +ALTER TABLE "APP"."PARTITION_PARAMS" + ADD CONSTRAINT "PARTITION_PARAMS_PK" PRIMARY KEY ("PART_ID", "PARAM_KEY"); + +ALTER TABLE "APP"."PARTITIONS" + ADD CONSTRAINT "PARTITIONS_PK" PRIMARY KEY ("PART_ID"); + +ALTER TABLE "APP"."TABLE_PARAMS" + ADD CONSTRAINT "TABLE_PARAMS_PK" PRIMARY KEY ("TBL_ID", "PARAM_KEY"); + +ALTER TABLE "APP"."SKEWED_STRING_LIST" + ADD CONSTRAINT "SKEWED_STRING_LIST_PK" PRIMARY KEY ("STRING_LIST_ID"); + +ALTER TABLE "APP"."SKEWED_STRING_LIST_VALUES" + ADD CONSTRAINT "SKEWED_STRING_LIST_VALUES_PK" PRIMARY KEY ("STRING_LIST_ID", "INTEGER_IDX"); + +ALTER TABLE "APP"."SKEWED_COL_NAMES" + ADD CONSTRAINT "SKEWED_COL_NAMES_PK" PRIMARY KEY ("SD_ID", "INTEGER_IDX"); + +ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" + ADD CONSTRAINT "SKEWED_COL_VALUE_LOC_MAP_PK" PRIMARY KEY ("SD_ID", "STRING_LIST_ID_KID"); + +ALTER 
TABLE "APP"."SKEWED_VALUES" + ADD CONSTRAINT "SKEWED_VALUES_PK" PRIMARY KEY ("SD_ID_OID", "INTEGER_IDX"); + +ALTER TABLE "APP"."TAB_COL_STATS" + ADD CONSTRAINT "TAB_COL_STATS_PK" PRIMARY KEY ("CS_ID"); + +ALTER TABLE "APP"."PART_COL_STATS" + ADD CONSTRAINT "PART_COL_STATS_PK" PRIMARY KEY ("CS_ID"); + +ALTER TABLE "APP"."FUNCS" + ADD CONSTRAINT "FUNCS_PK" PRIMARY KEY ("FUNC_ID"); + +ALTER TABLE "APP"."FUNC_RU" + ADD CONSTRAINT "FUNC_RU_PK" PRIMARY KEY ("FUNC_ID", "INTEGER_IDX"); + +ALTER TABLE "APP"."NOTIFICATION_LOG" + ADD CONSTRAINT "NOTIFICATION_LOG_PK" PRIMARY KEY ("NL_ID"); + +ALTER TABLE "APP"."NOTIFICATION_SEQUENCE" + ADD CONSTRAINT "NOTIFICATION_SEQUENCE_PK" PRIMARY KEY ("NNI_ID"); + +ALTER TABLE "APP"."KEY_CONSTRAINTS" + ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY ("CONSTRAINT_NAME", "POSITION"); + +ALTER TABLE "APP"."METASTORE_DB_PROPERTIES" + ADD CONSTRAINT "PROPERTY_KEY_PK" PRIMARY KEY ("PROPERTY_KEY"); + +ALTER TABLE "APP"."MV_CREATION_METADATA" + ADD CONSTRAINT "MV_CREATION_METADATA_PK" PRIMARY KEY ("MV_CREATION_METADATA_ID"); + +ALTER TABLE "APP"."CTLGS" + ADD CONSTRAINT "CTLG_PK" PRIMARY KEY ("CTLG_ID"); + + +-- foreign +ALTER TABLE "APP"."IDXS" + ADD CONSTRAINT "IDXS_FK1" FOREIGN KEY ("ORIG_TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."IDXS" + ADD CONSTRAINT "IDXS_FK2" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."IDXS" + ADD CONSTRAINT "IDXS_FK3" FOREIGN KEY ("INDEX_TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."TBL_COL_PRIVS" + ADD CONSTRAINT "TBL_COL_PRIVS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."DB_PRIVS" + ADD CONSTRAINT "DB_PRIVS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."INDEX_PARAMS" + ADD CONSTRAINT "INDEX_PARAMS_FK1" FOREIGN KEY ("INDEX_ID") REFERENCES "APP"."IDXS" ("INDEX_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."PARTITION_KEYS" + ADD CONSTRAINT "PARTITION_KEYS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."PART_PRIVS" + ADD CONSTRAINT "PART_PRIVS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."SDS" + ADD CONSTRAINT "SDS_FK1" FOREIGN KEY ("SERDE_ID") REFERENCES "APP"."SERDES" ("SERDE_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."SDS" + ADD CONSTRAINT "SDS_FK2" FOREIGN KEY ("CD_ID") REFERENCES "APP"."CDS" ("CD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."COLUMNS" + ADD CONSTRAINT "COLUMNS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."TYPE_FIELDS" + ADD CONSTRAINT "TYPE_FIELDS_FK1" FOREIGN KEY ("TYPE_NAME") REFERENCES "APP"."TYPES" ("TYPES_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."TBL_PRIVS" + ADD CONSTRAINT "TBL_PRIVS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."SERDE_PARAMS" + ADD CONSTRAINT "SERDE_PARAMS_FK1" FOREIGN KEY ("SERDE_ID") REFERENCES "APP"."SERDES" ("SERDE_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."TBLS" + ADD CONSTRAINT "TBLS_FK2" FOREIGN KEY ("SD_ID") REFERENCES 
"APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."TBLS" + ADD CONSTRAINT "TBLS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."DBS" + ADD CONSTRAINT "DBS_FK1" FOREIGN KEY ("CTLG_NAME") REFERENCES "APP"."CTLGS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."SD_PARAMS" + ADD CONSTRAINT "SD_PARAMS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."DATABASE_PARAMS" + ADD CONSTRAINT "DATABASE_PARAMS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."ROLE_MAP" + ADD CONSTRAINT "ROLE_MAP_FK1" FOREIGN KEY ("ROLE_ID") REFERENCES "APP"."ROLES" ("ROLE_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."BUCKETING_COLS" + ADD CONSTRAINT "BUCKETING_COLS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."SORT_COLS" + ADD CONSTRAINT "SORT_COLS_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."PARTITION_KEY_VALS" + ADD CONSTRAINT "PARTITION_KEY_VALS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."COLUMNS_V2" + ADD CONSTRAINT "COLUMNS_V2_FK1" FOREIGN KEY ("CD_ID") REFERENCES "APP"."CDS" ("CD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."PART_COL_PRIVS" + ADD CONSTRAINT "PART_COL_PRIVS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."PARTITION_PARAMS" + ADD CONSTRAINT "PARTITION_PARAMS_FK1" FOREIGN KEY ("PART_ID") REFERENCES "APP"."PARTITIONS" ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."PARTITIONS" + ADD CONSTRAINT "PARTITIONS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."PARTITIONS" + ADD CONSTRAINT "PARTITIONS_FK2" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."TABLE_PARAMS" + ADD CONSTRAINT "TABLE_PARAMS_FK1" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."SKEWED_STRING_LIST_VALUES" + ADD CONSTRAINT "SKEWED_STRING_LIST_VALUES_FK1" FOREIGN KEY ("STRING_LIST_ID") REFERENCES "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."SKEWED_COL_NAMES" + ADD CONSTRAINT "SKEWED_COL_NAMES_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" + ADD CONSTRAINT "SKEWED_COL_VALUE_LOC_MAP_FK1" FOREIGN KEY ("SD_ID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."SKEWED_COL_VALUE_LOC_MAP" + ADD CONSTRAINT "SKEWED_COL_VALUE_LOC_MAP_FK2" FOREIGN KEY ("STRING_LIST_ID_KID") REFERENCES "APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."SKEWED_VALUES" + ADD CONSTRAINT "SKEWED_VALUES_FK1" FOREIGN KEY ("SD_ID_OID") REFERENCES "APP"."SDS" ("SD_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."SKEWED_VALUES" + ADD CONSTRAINT "SKEWED_VALUES_FK2" FOREIGN KEY ("STRING_LIST_ID_EID") REFERENCES 
"APP"."SKEWED_STRING_LIST" ("STRING_LIST_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."TAB_COL_STATS" + ADD CONSTRAINT "TAB_COL_STATS_FK" FOREIGN KEY ("TBL_ID") REFERENCES TBLS ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."PART_COL_STATS" + ADD CONSTRAINT "PART_COL_STATS_FK" FOREIGN KEY ("PART_ID") REFERENCES PARTITIONS ("PART_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."VERSION" + ADD CONSTRAINT "VERSION_PK" PRIMARY KEY ("VER_ID"); + +ALTER TABLE "APP"."FUNCS" + ADD CONSTRAINT "FUNCS_FK1" FOREIGN KEY ("DB_ID") REFERENCES "APP"."DBS" ("DB_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."FUNC_RU" + ADD CONSTRAINT "FUNC_RU_FK1" FOREIGN KEY ("FUNC_ID") REFERENCES "APP"."FUNCS" ("FUNC_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."WM_RESOURCEPLAN" + ADD CONSTRAINT "WM_RESOURCEPLAN_PK" PRIMARY KEY ("RP_ID"); + +ALTER TABLE "APP"."WM_POOL" + ADD CONSTRAINT "WM_POOL_PK" PRIMARY KEY ("POOL_ID"); + +ALTER TABLE "APP"."WM_POOL" + ADD CONSTRAINT "WM_POOL_FK1" FOREIGN KEY ("RP_ID") REFERENCES "APP"."WM_RESOURCEPLAN" ("RP_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."WM_RESOURCEPLAN" + ADD CONSTRAINT "WM_RESOURCEPLAN_FK1" FOREIGN KEY ("DEFAULT_POOL_ID") REFERENCES "APP"."WM_POOL" ("POOL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."WM_TRIGGER" + ADD CONSTRAINT "WM_TRIGGER_PK" PRIMARY KEY ("TRIGGER_ID"); + +ALTER TABLE "APP"."WM_TRIGGER" + ADD CONSTRAINT "WM_TRIGGER_FK1" FOREIGN KEY ("RP_ID") REFERENCES "APP"."WM_RESOURCEPLAN" ("RP_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."WM_POOL_TO_TRIGGER" + ADD CONSTRAINT "WM_POOL_TO_TRIGGER_FK1" FOREIGN KEY ("POOL_ID") REFERENCES "APP"."WM_POOL" ("POOL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."WM_POOL_TO_TRIGGER" + ADD CONSTRAINT "WM_POOL_TO_TRIGGER_FK2" FOREIGN KEY ("TRIGGER_ID") REFERENCES "APP"."WM_TRIGGER" ("TRIGGER_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."WM_MAPPING" + ADD CONSTRAINT "WM_MAPPING_PK" PRIMARY KEY ("MAPPING_ID"); + +ALTER TABLE "APP"."WM_MAPPING" + ADD CONSTRAINT "WM_MAPPING_FK1" FOREIGN KEY ("RP_ID") REFERENCES "APP"."WM_RESOURCEPLAN" ("RP_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."WM_MAPPING" + ADD CONSTRAINT "WM_MAPPING_FK2" FOREIGN KEY ("POOL_ID") REFERENCES "APP"."WM_POOL" ("POOL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."MV_TABLES_USED" + ADD CONSTRAINT "MV_TABLES_USED_FK1" FOREIGN KEY ("MV_CREATION_METADATA_ID") REFERENCES "APP"."MV_CREATION_METADATA" ("MV_CREATION_METADATA_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."MV_TABLES_USED" + ADD CONSTRAINT "MV_TABLES_USED_FK2" FOREIGN KEY ("TBL_ID") REFERENCES "APP"."TBLS" ("TBL_ID") ON DELETE NO ACTION ON UPDATE NO ACTION; + +ALTER TABLE "APP"."DBS" + ADD CONSTRAINT "DBS_CTLG_FK" FOREIGN KEY ("CTLG_NAME") REFERENCES "APP"."CTLGS" ("NAME") ON DELETE NO ACTION ON UPDATE NO ACTION; + +-- ---------------------------------------------- +-- DDL Statements for checks +-- ---------------------------------------------- + +ALTER TABLE "APP"."IDXS" + ADD CONSTRAINT "SQL110318025504980" CHECK (DEFERRED_REBUILD IN ('Y', 'N')); + +ALTER TABLE "APP"."SDS" + ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED IN ('Y', 'N')); + +-- ---------------------------- +-- Transaction and Lock Tables +-- ---------------------------- +CREATE TABLE TXNS +( + TXN_ID bigint PRIMARY KEY, + TXN_STATE char(1) NOT NULL, 
+ TXN_STARTED bigint NOT NULL, + TXN_LAST_HEARTBEAT bigint NOT NULL, + TXN_USER varchar(128) NOT NULL, + TXN_HOST varchar(128) NOT NULL, + TXN_AGENT_INFO varchar(128), + TXN_META_INFO varchar(128), + TXN_HEARTBEAT_COUNT integer, + TXN_TYPE integer +); + +CREATE TABLE TXN_COMPONENTS +( + TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID), + TC_DATABASE varchar(128) NOT NULL, + TC_TABLE varchar(128), + TC_PARTITION varchar(767), + TC_OPERATION_TYPE char(1) NOT NULL, + TC_WRITEID bigint +); + +CREATE +INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID); + +CREATE TABLE COMPLETED_TXN_COMPONENTS +( + CTC_TXNID bigint NOT NULL, + CTC_DATABASE varchar(128) NOT NULL, + CTC_TABLE varchar(256), + CTC_PARTITION varchar(767), + CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL, + CTC_WRITEID bigint, + CTC_UPDATE_DELETE char(1) NOT NULL +); + +CREATE +INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION); + +CREATE TABLE NEXT_TXN_ID +( + NTXN_NEXT bigint NOT NULL +); +INSERT INTO NEXT_TXN_ID +VALUES (1); + +CREATE TABLE HIVE_LOCKS +( + HL_LOCK_EXT_ID bigint NOT NULL, + HL_LOCK_INT_ID bigint NOT NULL, + HL_TXNID bigint NOT NULL, + HL_DB varchar(128) NOT NULL, + HL_TABLE varchar(128), + HL_PARTITION varchar(767), + HL_LOCK_STATE char(1) NOT NULL, + HL_LOCK_TYPE char(1) NOT NULL, + HL_LAST_HEARTBEAT bigint NOT NULL, + HL_ACQUIRED_AT bigint, + HL_USER varchar(128) NOT NULL, + HL_HOST varchar(128) NOT NULL, + HL_HEARTBEAT_COUNT integer, + HL_AGENT_INFO varchar(128), + HL_BLOCKEDBY_EXT_ID bigint, + HL_BLOCKEDBY_INT_ID bigint, + PRIMARY KEY (HL_LOCK_EXT_ID, HL_LOCK_INT_ID) +); + +CREATE +INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID); + +CREATE TABLE NEXT_LOCK_ID +( + NL_NEXT bigint NOT NULL +); +INSERT INTO NEXT_LOCK_ID +VALUES (1); + +CREATE TABLE COMPACTION_QUEUE +( + CQ_ID bigint PRIMARY KEY, + CQ_DATABASE varchar(128) NOT NULL, + CQ_TABLE varchar(128) NOT NULL, + CQ_PARTITION varchar(767), + CQ_STATE char(1) NOT NULL, + CQ_TYPE char(1) NOT NULL, + CQ_TBLPROPERTIES varchar(2048), + CQ_WORKER_ID varchar(128), + CQ_START bigint, + CQ_RUN_AS varchar(128), + CQ_HIGHEST_WRITE_ID bigint, + CQ_META_INFO varchar(2048) for bit data, + CQ_HADOOP_JOB_ID varchar(32) +); + +CREATE TABLE NEXT_COMPACTION_QUEUE_ID +( + NCQ_NEXT bigint NOT NULL +); +INSERT INTO NEXT_COMPACTION_QUEUE_ID +VALUES (1); + +CREATE TABLE COMPLETED_COMPACTIONS +( + CC_ID bigint PRIMARY KEY, + CC_DATABASE varchar(128) NOT NULL, + CC_TABLE varchar(128) NOT NULL, + CC_PARTITION varchar(767), + CC_STATE char(1) NOT NULL, + CC_TYPE char(1) NOT NULL, + CC_TBLPROPERTIES varchar(2048), + CC_WORKER_ID varchar(128), + CC_START bigint, + CC_END bigint, + CC_RUN_AS varchar(128), + CC_HIGHEST_WRITE_ID bigint, + CC_META_INFO varchar(2048) for bit data, + CC_HADOOP_JOB_ID varchar(32) +); + +CREATE TABLE AUX_TABLE +( + MT_KEY1 varchar(128) NOT NULL, + MT_KEY2 bigint NOT NULL, + MT_COMMENT varchar(255), + PRIMARY KEY (MT_KEY1, MT_KEY2) +); + +--1st 4 cols make up a PK but since WS_PARTITION is nullable we can't declare such PK +--This is a good candidate for Index orgainzed table +CREATE TABLE WRITE_SET +( + WS_DATABASE varchar(128) NOT NULL, + WS_TABLE varchar(128) NOT NULL, + WS_PARTITION varchar(767), + WS_TXNID bigint NOT NULL, + WS_COMMIT_ID bigint NOT NULL, + WS_OPERATION_TYPE char(1) NOT NULL +); + +CREATE TABLE TXN_TO_WRITE_ID +( + T2W_TXNID bigint NOT NULL, + T2W_DATABASE varchar(128) NOT NULL, + T2W_TABLE varchar(256) NOT NULL, + T2W_WRITEID bigint NOT NULL +); + +CREATE +UNIQUE INDEX 
TBL_TO_TXN_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID); +CREATE +UNIQUE INDEX TBL_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_WRITEID); + +CREATE TABLE NEXT_WRITE_ID +( + NWI_DATABASE varchar(128) NOT NULL, + NWI_TABLE varchar(256) NOT NULL, + NWI_NEXT bigint NOT NULL +); + +CREATE +UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); + +CREATE TABLE MIN_HISTORY_LEVEL +( + MHL_TXNID bigint NOT NULL, + MHL_MIN_OPEN_TXNID bigint NOT NULL, + PRIMARY KEY (MHL_TXNID) +); + +CREATE +INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID); + +CREATE TABLE MATERIALIZATION_REBUILD_LOCKS +( + MRL_TXN_ID BIGINT NOT NULL, + MRL_DB_NAME VARCHAR(128) NOT NULL, + MRL_TBL_NAME VARCHAR(256) NOT NULL, + MRL_LAST_HEARTBEAT BIGINT NOT NULL, + PRIMARY KEY (MRL_TXN_ID) +); + +CREATE TABLE "APP"."I_SCHEMA" +( + "SCHEMA_ID" bigint primary key, + "SCHEMA_TYPE" integer not null, + "NAME" varchar(256) unique, + "DB_ID" bigint references "APP"."DBS" ("DB_ID"), + "COMPATIBILITY" integer not null, + "VALIDATION_LEVEL" integer not null, + "CAN_EVOLVE" char(1) not null, + "SCHEMA_GROUP" varchar(256), + "DESCRIPTION" varchar(4000) +); + +CREATE TABLE "APP"."SCHEMA_VERSION" +( + "SCHEMA_VERSION_ID" bigint primary key, + "SCHEMA_ID" bigint references "APP"."I_SCHEMA" ("SCHEMA_ID"), + "VERSION" integer not null, + "CREATED_AT" bigint not null, + "CD_ID" bigint references "APP"."CDS" ("CD_ID"), + "STATE" integer not null, + "DESCRIPTION" varchar(4000), + "SCHEMA_TEXT" clob, + "FINGERPRINT" varchar(256), + "SCHEMA_VERSION_NAME" varchar(256), + "SERDE_ID" bigint references "APP"."SERDES" ("SERDE_ID") +); + +CREATE +UNIQUE INDEX "APP"."UNIQUE_SCHEMA_VERSION" ON "APP"."SCHEMA_VERSION" ("SCHEMA_ID", "VERSION"); + +CREATE TABLE REPL_TXN_MAP +( + RTM_REPL_POLICY varchar(256) NOT NULL, + RTM_SRC_TXN_ID bigint NOT NULL, + RTM_TARGET_TXN_ID bigint NOT NULL, + PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID) +); + +CREATE TABLE "APP"."RUNTIME_STATS" +( + "RS_ID" bigint primary key, + "CREATE_TIME" integer not null, + "WEIGHT" integer not null, + "PAYLOAD" BLOB +); + +CREATE +INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME); + +-- ----------------------------------------------------------------- +-- Record schema version. Should be the last step in the init script +-- ----------------------------------------------------------------- +INSERT INTO "APP"."VERSION" (VER_ID, SCHEMA_VERSION, VERSION_COMMENT) +VALUES (1, '3.1.0', 'Hive release version 3.1.0'); diff --git a/build.gradle b/build.gradle index c6bf29735ce8..e49701527305 100644 --- a/build.gradle +++ b/build.gradle @@ -17,6 +17,8 @@ * under the License. 
*/ + +import com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask import groovy.transform.Memoized buildscript { @@ -37,6 +39,7 @@ buildscript { } plugins { + id "com.commercehub.gradle.plugin.avro" version "0.22.0" id 'nebula.dependency-recommender' version '9.0.2' id 'org.projectnessie' version '0.3.0' } @@ -307,6 +310,88 @@ project(':iceberg-aws') { } } +project(':iceberg-beam') { + dependencies { + compile project(':iceberg-api') + compile project(':iceberg-common') + compile project(':iceberg-core') + compile project(':iceberg-data') + compile project(':iceberg-arrow') + compile project(':iceberg-orc') + compile project(':iceberg-hive-metastore') + compile "org.apache.beam:beam-sdks-java-core" + testCompile "org.apache.beam:beam-runners-direct-java" + testCompile "org.hamcrest:hamcrest" + compileOnly("org.apache.hive:hive-metastore") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'org.pentaho' // missing dependency + exclude group: 'org.apache.hbase' + exclude group: 'org.apache.logging.log4j' + exclude group: 'co.cask.tephra' + exclude group: 'com.google.code.findbugs', module: 'jsr305' + exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all' + exclude group: 'org.eclipse.jetty.orbit', module: 'javax.servlet' + exclude group: 'org.apache.parquet', module: 'parquet-hadoop-bundle' + exclude group: 'com.tdunning', module: 'json' + exclude group: 'javax.transaction', module: 'transaction-api' + exclude group: 'com.zaxxer', module: 'HikariCP' + } + + // By default, hive-exec is a fat/uber jar and it exports a guava library + // that's really old. We use the core classifier to be able to override our guava + // version. Luckily, hive-exec seems to work okay so far with this version of guava + // See: https://github.com/apache/hive/blob/master/ql/pom.xml#L911 for more context. 
+ testCompile("org.apache.hive:hive-exec::core") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'org.pentaho' // missing dependency + exclude group: 'org.apache.hive', module: 'hive-llap-tez' + exclude group: 'org.apache.logging.log4j' + exclude group: 'com.google.protobuf', module: 'protobuf-java' + exclude group: 'org.apache.calcite' + exclude group: 'org.apache.calcite.avatica' + exclude group: 'com.google.code.findbugs', module: 'jsr305' + } + + testCompile("org.apache.hive:hive-metastore") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'org.pentaho' // missing dependency + exclude group: 'org.apache.hbase' + exclude group: 'org.apache.logging.log4j' + exclude group: 'co.cask.tephra' + exclude group: 'com.google.code.findbugs', module: 'jsr305' + exclude group: 'org.eclipse.jetty.aggregate', module: 'jetty-all' + exclude group: 'org.eclipse.jetty.orbit', module: 'javax.servlet' + exclude group: 'org.apache.parquet', module: 'parquet-hadoop-bundle' + exclude group: 'com.tdunning', module: 'json' + exclude group: 'javax.transaction', module: 'transaction-api' + exclude group: 'com.zaxxer', module: 'HikariCP' + } + + compileOnly("org.apache.hadoop:hadoop-client") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + } + + testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') + + dependencies { + implementation "org.apache.avro:avro:1.10.1" + } + + def generateAvro = tasks.register("generateAvro", GenerateAvroJavaTask) { + source("src/test/avro") + outputDir = file("build/generated/sources/annotationProcessor/java/test") + } + + tasks.named("compileJava").configure { + source(generateAvro) + } + } +} + project(':iceberg-flink') { dependencies { compile project(':iceberg-api') @@ -1142,4 +1227,3 @@ String getJavadocVersion() { apply from: 'baseline.gradle' apply from: 'deploy.gradle' apply from: 'tasks.gradle' - diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java index 0498069a6bfb..e1b7ded235ab 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java @@ -28,10 +28,10 @@ import org.apache.avro.io.Encoder; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -class GenericAvroWriter implements DatumWriter { +public class GenericAvroWriter implements DatumWriter { private ValueWriter writer = null; - GenericAvroWriter(Schema schema) { + public GenericAvroWriter(Schema schema) { setSchema(schema); } diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index f0788361aae8..c5fe23401277 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -26,6 +26,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; /** * Factory responsible for generating unique but recognizable data file names. 
diff --git a/dev/.rat-excludes b/dev/.rat-excludes index 101c0e1dd3af..883c8aa4a313 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -2,6 +2,7 @@ version.txt versions.lock versions.props books.json +Cat.avsc new-books.json build .gitignore diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 818fb8154a7a..7a39ba95ff40 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -63,6 +63,8 @@ public class HiveCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + public static final String DEFAULT_NAME = "hive"; + private String name; private HiveClientPool clients; private Configuration conf; @@ -74,7 +76,7 @@ public HiveCatalog() { } public HiveCatalog(Configuration conf) { - this.name = "hive"; + this.name = DEFAULT_NAME; this.clients = new HiveClientPool(conf); this.conf = conf; this.createStack = Thread.currentThread().getStackTrace(); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java index c57b80f7960c..4c6539382790 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStore; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IHMSHandler; import org.apache.hadoop.hive.metastore.RetryingHMSHandler; import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor; @@ -162,7 +163,7 @@ public String getDatabasePath(String dbName) { } public void reset() throws Exception { - for (String dbName : clientPool.run(client -> client.getAllDatabases())) { + for (String dbName : clientPool.run(HiveMetaStoreClient::getAllDatabases)) { for (String tblName : clientPool.run(client -> client.getAllTables(dbName))) { clientPool.run(client -> { client.dropTable(dbName, tblName, true, true, true); @@ -222,8 +223,9 @@ private void setupMetastoreDB(String dbURL) throws SQLException, IOException { ScriptRunner scriptRunner = new ScriptRunner(connection, true, true); ClassLoader classLoader = ClassLoader.getSystemClassLoader(); - InputStream inputStream = classLoader.getResourceAsStream("hive-schema-3.1.0.derby.sql"); - try (Reader reader = new InputStreamReader(inputStream)) { + + try (InputStream inputStream = classLoader.getResourceAsStream("hive-schema-3.1.0.derby.sql"); + Reader reader = new InputStreamReader(inputStream)) { scriptRunner.runScript(reader); } } diff --git a/settings.gradle b/settings.gradle index 037bdf802385..6b28d9de14aa 100644 --- a/settings.gradle +++ b/settings.gradle @@ -17,12 +17,24 @@ * under the License. 
*/ +pluginManagement { + repositories { + gradlePluginPortal() + jcenter() + maven { + name "JCenter Gradle Plugins" + url "https://dl.bintray.com/gradle/gradle-plugins" + } + } +} + rootProject.name = 'iceberg' include 'api' include 'common' include 'core' include 'data' include 'aws' +include 'beam' include 'flink' include 'flink-runtime' include 'mr' @@ -44,6 +56,7 @@ project(':common').name = 'iceberg-common' project(':core').name = 'iceberg-core' project(':data').name = 'iceberg-data' project(':aws').name = 'iceberg-aws' +project(':beam').name = 'iceberg-beam' project(':flink').name = 'iceberg-flink' project(':flink-runtime').name = 'iceberg-flink-runtime' project(':mr').name = 'iceberg-mr' diff --git a/site/docs/beam.md b/site/docs/beam.md new file mode 100644 index 000000000000..96a0a229bc9d --- /dev/null +++ b/site/docs/beam.md @@ -0,0 +1,39 @@ + + +# Apache Beam + +!!! Warning + The Beam API is experimental and in an early alpha stage. + +## Write records + +When sinking Avro files to a distributed file system using `FileIO`, we can rely on the filenames downstream. We append these filenames to an Iceberg table incrementally. + +```java +final String hiveMetastoreUrl = "thrift://localhost:9083/default"; +final FileIO.Write avroFileIO = FileIO.write() + .via(AvroIO.sink(avroSchema)) + .to("gs://.../../") + .withSuffix(".avro"); + +final WriteFilesResult filesWritten = records.apply(avroFileIO); +final org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); +final TableIdentifier name = TableIdentifier.of("default", "test"); + +IcebergIO.write(name, icebergSchema, hiveMetastoreUrl, filesWritten); +``` \ No newline at end of file diff --git a/site/mkdocs.yml b/site/mkdocs.yml index 2b0396aa42bc..68faefc245d5 100644 --- a/site/mkdocs.yml +++ b/site/mkdocs.yml @@ -63,6 +63,7 @@ nav: - Trino: https://trino.io/docs/current/connector/iceberg.html - Flink: flink.md - Hive: hive.md + - Beam: beam.md - Integrations: - AWS: aws.md - API: diff --git a/versions.props b/versions.props index 5e2251a218bb..600d5cb286a2 100644 --- a/versions.props +++ b/versions.props @@ -1,6 +1,8 @@ org.slf4j:* = 1.7.25 org.apache.avro:avro = 1.9.2 org.apache.flink:* = 1.11.0 +org.apache.beam:beam-sdks-java-core = 2.27.0 +org.apache.beam:beam-runners-direct-java = 2.27.0 org.apache.hadoop:* = 2.7.3 org.apache.hive:hive-metastore = 2.3.7 org.apache.hive:hive-serde = 2.3.7 @@ -30,3 +32,4 @@ org.apache.hive:hive-service = 2.3.7 org.apache.tez:tez-dag = 0.8.4 org.apache.tez:tez-mapreduce = 0.8.4 com.adobe.testing:s3mock-junit4 = 2.1.28 +org.hamcrest:* = 2.2 \ No newline at end of file
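For context on the write path documented in `site/docs/beam.md` above: that snippet assumes a pre-existing `records` PCollection and `avroSchema`. Below is a minimal end-to-end sketch of how the same path might be wired together and run on Beam's DirectRunner (pulled in as `beam-runners-direct-java` in the build). The `org.apache.iceberg.beam.IcebergIO` import, the toy `Book` schema, and the local output path are assumptions for illustration, not part of this patch; only the `IcebergIO.write(table, schema, metastoreUri, writeResult)` call shape comes from the doc.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.beam.IcebergIO;  // assumed package for the new sink in this module
import org.apache.iceberg.catalog.TableIdentifier;

public class IcebergBeamExample {

  public static void main(String[] args) {
    // Toy Avro schema and record for illustration; any GenericRecord source would do.
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Book\",\"fields\":["
            + "{\"name\":\"title\",\"type\":\"string\"}]}");
    GenericRecord book = new GenericData.Record(avroSchema);
    book.put("title", "Iceberg on Beam");

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<GenericRecord> records =
        p.apply(Create.of(book).withCoder(AvroCoder.of(avroSchema)));

    // Write Avro files with Beam's FileIO; the resulting filenames are what the
    // Iceberg commit appends to the table downstream.
    WriteFilesResult<Void> filesWritten = records.apply(
        FileIO.<GenericRecord>write()
            .via(AvroIO.sink(avroSchema))
            .to("/tmp/warehouse/default/test/data")  // illustrative local path
            .withSuffix(".avro"));

    org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
    TableIdentifier table = TableIdentifier.of("default", "test");

    // Register the written files against the table tracked by the Hive metastore.
    IcebergIO.write(table, icebergSchema, "thrift://localhost:9083/default", filesWritten);

    p.run().waitUntilFinish();
  }
}
```

The design point worth noting is that `WriteFilesResult` carries the filenames produced by `FileIO`, so the Iceberg side can commit exactly those files as an incremental append without rewriting any data.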