diff --git a/api/src/main/java/org/apache/iceberg/catalog/ViewCatalog.java b/api/src/main/java/org/apache/iceberg/catalog/ViewCatalog.java new file mode 100644 index 000000000000..461d3ab2a72c --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/catalog/ViewCatalog.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.catalog; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.view.View; +import org.apache.iceberg.view.ViewDefinition; +import org.apache.iceberg.view.ViewRepresentation; + +/** + * A Catalog API for view create, drop, and load operations. + */ +public interface ViewCatalog { + + /** + * Return the name for this catalog. + * + * @return this catalog's name + */ + default String name() { + return toString(); + } + + /** + * Return all the identifiers under this namespace. + * + * @param namespace a namespace + * @return a list of identifiers for view + * @throws NotFoundException if the namespace is not found + */ + List listViews(Namespace namespace); + + /** + * Load a view. + * + * @param identifier a view identifier + * @return instance of {@link View} implementation referred by {@code tableIdentifier} + * @throws NoSuchViewException if the view does not exist + */ + View loadView(TableIdentifier identifier); + + /** + * Check whether view exists. + * + * @param identifier a view identifier + * @return true if the table exists, false otherwise + */ + default boolean viewExists(TableIdentifier identifier) { + try { + loadView(identifier); + return true; + } catch (NoSuchViewException e) { + return false; + } + } + + /** + * Create a view. + * + * @param identifier a view identifier + * @param representations a list of view representations + * @param properties a string map of view properties + */ + View createView( + TableIdentifier identifier, + List representations, + Map properties); + + /** + * Create a view with SQL definition. + * + * @param identifier a view identifier + * @param definition a view definition + * @param properties a string map of view properties + */ + default View createView( + TableIdentifier identifier, + ViewDefinition definition, + Map properties) { + return createView(identifier, Collections.singletonList(definition), properties); + } + + /** + * Drop a view and delete all data and metadata files. + * + * @param identifier a view identifier + * @return true if the view was dropped, false if the view did not exist + */ + default boolean dropView(TableIdentifier identifier) { + return dropView(identifier, true /* drop data and metadata files */); + } + + /** + * Drop a view; optionally delete data and metadata files. + *

+ * If purge is set to true the implementation should delete all data and metadata files. + * + * @param identifier a view identifier + * @param purge if true, delete all data and metadata files in the view + * @return true if the view was dropped, false if the view did not exist + */ + boolean dropView(TableIdentifier identifier, boolean purge); + + /** + * Rename a view. + * + * @param from identifier of the view to rename + * @param to new view identifier + * @throws NoSuchViewException if the "from" view does not exist + * @throws AlreadyExistsException if the "to" view already exists + */ + void renameView(TableIdentifier from, TableIdentifier to); + + /** + * Invalidate cached view metadata from current catalog. + *

+ * If the view is already loaded or cached, drop cached data. If the view does not exist or is not cached, do + * nothing. + * + * @param identifier a view identifier + */ + default void invalidateView(TableIdentifier identifier) { + } + + /** + * Initialize a view catalog given a custom name and a map of catalog properties. + *

+ * A custom view catalog implementation must have a no-arg constructor. + * A compute engine like Spark or Flink will first initialize the catalog without any arguments, + * and then call this method to complete catalog initialization with properties passed into the engine. + * + * @param name a custom name for the catalog + * @param properties catalog properties + */ + default void initialize(String name, Map properties) { + } +} diff --git a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchViewException.java b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchViewException.java new file mode 100644 index 000000000000..f137562b2b78 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchViewException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.exceptions; + +import com.google.errorprone.annotations.FormatMethod; + +/** + * Exception raised when attempting to load a view that does not exist. + */ +public class NoSuchViewException extends RuntimeException { + @FormatMethod + public NoSuchViewException(String message, Object... args) { + super(String.format(message, args)); + } + + @FormatMethod + public NoSuchViewException(Throwable cause, String message, Object... args) { + super(String.format(message, args), cause); + } +} diff --git a/api/src/main/java/org/apache/iceberg/view/View.java b/api/src/main/java/org/apache/iceberg/view/View.java new file mode 100644 index 000000000000..9a3e106f56a1 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/view/View.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.List; +import java.util.Map; + +/** + * Interface for view definition. + */ +public interface View { + + String name(); + + /** + * Get the current version for this view, or null if there are no versions. + * + * @return the current view version. + */ + ViewVersion currentVersion(); + + /** + * Get the versions of this view. + * + * @return an Iterable of versions of this view. + */ + Iterable versions(); + + /** + * Get a version in this view by ID. + * + * @param versionId version ID + * @return a version, or null if the ID cannot be found + */ + ViewVersion version(int versionId); + + /** + * Get the version history of this table. + * + * @return a list of {@link ViewHistoryEntry} + */ + List history(); + + /** + * Return a map of string properties for this view. + * + * @return this view's properties map + */ + Map properties(); + + /** + * Update view properties and commit the changes. + * + * @return a new {@link ViewUpdateProperties} + */ + ViewUpdateProperties updateProperties(); +} diff --git a/api/src/main/java/org/apache/iceberg/view/ViewDefinition.java b/api/src/main/java/org/apache/iceberg/view/ViewDefinition.java new file mode 100644 index 000000000000..ac774c8033c4 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/view/ViewDefinition.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.List; +import org.apache.iceberg.Schema; + +public interface ViewDefinition extends ViewRepresentation { + @Override + default Type type() { + return Type.SQL; + } + + /** + * Returns the view query SQL text. + * + * @return the view query SQL text + */ + String sql(); + + /** + * Returns the SQL dialect of the query SQL text. + * + * @return the SQL dialect of the query SQL text + */ + String dialect(); + + /** + * Returns the view query output schema. + * + * @return the view query output schema + */ + Schema schema(); + + /** + * Returns the default catalog when the view is created. + * + * @return the default catalog + */ + String defaultCatalog(); + + /** + * Returns the default namespace when the view is created. + * + * @return the default namespace + */ + List defaultNamespace(); + + /** + * Returns the field aliases specified when creating the view. + * + * @return the field aliases + */ + List fieldAliases(); + + /** + * Returns the field comments specified when creating the view. + * + * @return the field comments + */ + List fieldComments(); +} diff --git a/api/src/main/java/org/apache/iceberg/view/ViewHistoryEntry.java b/api/src/main/java/org/apache/iceberg/view/ViewHistoryEntry.java new file mode 100644 index 000000000000..5a426db173ee --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/view/ViewHistoryEntry.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +/** + * View history entry. + *

+ * An entry contains a change to the view state. + * At the given timestamp, the current version was set to the given version ID. + */ +public interface ViewHistoryEntry { + /** + * Returns the timestamp in milliseconds of the change + */ + long timestampMillis(); + + /** + * Returns ID of the new current version + */ + int versionId(); +} diff --git a/api/src/main/java/org/apache/iceberg/view/ViewRepresentation.java b/api/src/main/java/org/apache/iceberg/view/ViewRepresentation.java new file mode 100644 index 000000000000..48b3d66072a3 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/view/ViewRepresentation.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.Locale; + +public interface ViewRepresentation { + + enum Type { + SQL; + + public static Type parse(String typeName) { + return valueOf(typeName.toUpperCase(Locale.ENGLISH)); + } + + public String typeName() { + return name().toLowerCase(Locale.ENGLISH); + } + } + + Type type(); +} diff --git a/api/src/main/java/org/apache/iceberg/view/ViewUpdateProperties.java b/api/src/main/java/org/apache/iceberg/view/ViewUpdateProperties.java new file mode 100644 index 000000000000..6dc75d785b2f --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/view/ViewUpdateProperties.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.Map; +import org.apache.iceberg.PendingUpdate; + +/** + * API for updating view properties. + *

+ * Apply returns the updated view properties as a map for validation. + *

+ * When committing, these changes will be applied to the current view metadata. + * Commit conflicts will be resolved by applying the pending changes to the new view metadata. + */ +public interface ViewUpdateProperties extends PendingUpdate> { + + /** + * Add a key/value property to the view. + * + * @param key a String key + * @param value a String value + * @return this for method chaining + * @throws NullPointerException If either the key or value is null + */ + ViewUpdateProperties set(String key, String value); + + /** + * Remove the given property key from the view. + * + * @param key a String key + * @return this for method chaining + * @throws NullPointerException If the key is null + */ + ViewUpdateProperties remove(String key); +} diff --git a/api/src/main/java/org/apache/iceberg/view/ViewVersion.java b/api/src/main/java/org/apache/iceberg/view/ViewVersion.java new file mode 100644 index 000000000000..40e01ce61ad0 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/view/ViewVersion.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.List; +import java.util.Map; + +/** + * A version of the view at a point in time. + *

+ * A version consists of a view metadata file. + *

+ * Versions are created by view operations, like Create and Replace. + */ +public interface ViewVersion { + /** + * Return this version's ID. + * + * @return a long ID + */ + int versionId(); + + /** + * Return this version's parent ID or null. + * + * @return a long ID for this version's parent, or null if it has no parent + */ + Integer parentId(); + + /** + * Return this version's timestamp. + *

+ * This timestamp is the same as those produced by {@link System#currentTimeMillis()}. + * + * @return a long timestamp in milliseconds + */ + long timestampMillis(); + + /** + * Returns the version summary such as the name and genie-id of the operation that created that version of the view + * + * @return a version summary + */ + Map summary(); + + /** + * Returns the list of view representations + *

+ * Must contain at least one representation. + * + * @return the list of view representations + */ + List representations(); +} diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java index ff4a185d8aa3..789d925daae1 100644 --- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -210,6 +211,62 @@ public static void writeLongFieldIf(boolean condition, String key, Long value, J } } + @FunctionalInterface + public interface JsonWriter { + void write(T object, JsonGenerator generator) throws IOException; + } + + public static void writeObjectList( + String property, + Iterable objectList, + JsonWriter writer, + JsonGenerator generator) + throws IOException { + generator.writeArrayFieldStart(property); + for (T object : objectList) { + writer.write(object, generator); + } + generator.writeEndArray(); + } + + public static void writeStringList(String property, List stringList, JsonGenerator generator) + throws IOException { + generator.writeArrayFieldStart(property); + for (String s : stringList) { + generator.writeString(s); + } + generator.writeEndArray(); + } + + public static void writeStringMap(String property, Map map, JsonGenerator generator) + throws IOException { + generator.writeObjectFieldStart(property); + for (Map.Entry entry : map.entrySet()) { + generator.writeStringField(entry.getKey(), entry.getValue()); + } + generator.writeEndObject(); + } + + @FunctionalInterface + public interface JsonReader { + T read(JsonNode node); + } + + public static T getObject(String property, JsonNode node, JsonReader reader) { + Preconditions.checkArgument(node.has(property), "Cannot parse missing object %s", property); + JsonNode pNode = node.get(property); + Preconditions.checkArgument(pNode.isObject(), + "Cannot parse %s from non-object value: %s", property, pNode); + return reader.read(pNode); + } + + public static List getObjectList(String property, JsonNode node, Function reader) { + Preconditions.checkArgument(node.has(property), "Cannot parse missing list %s", property); + return ImmutableList.builder() + .addAll(objectArrayIterator(property, node, reader)) + .build(); + } + abstract static class JsonArrayIterator implements Iterator { private final Iterator elements; @@ -285,8 +342,24 @@ Long convert(JsonNode element) { @Override void validate(JsonNode element) { - Preconditions.checkArgument(element.isIntegralNumber() && element.canConvertToLong(), - "Cannot parse long from non-long value: %s", element); + Preconditions.checkArgument( + element.isIntegralNumber() && element.canConvertToLong(), + "Cannot parse long from non-long value: %s", + element); } } + + static Iterator objectArrayIterator(String property, JsonNode node, Function reader) { + return new JsonArrayIterator(property, node) { + protected T convert(JsonNode element) { + return reader.apply(element); + } + + protected void validate(JsonNode element) { + Preconditions.checkArgument( + element.isObject(), + "Cannot parse %s from non-object value: %s", property, element); + } + }; + } } diff --git a/core/src/main/java/org/apache/iceberg/view/BaseView.java b/core/src/main/java/org/apache/iceberg/view/BaseView.java new file mode 100644 index 000000000000..f13365451517 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/BaseView.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.List; +import java.util.Map; + +/** + * Base {@link View} implementation. + *

+ * This can be extended by providing a {@link ViewOperations} to the constructor. + */ +public class BaseView implements View, HasViewOperations { + private final ViewOperations ops; + private final String name; + + public BaseView(ViewOperations ops, String name) { + this.ops = ops; + this.name = name; + } + + @Override + public String name() { + return name; + } + + @Override + public ViewOperations operations() { + return ops; + } + + @Override + public ViewVersion currentVersion() { + return ops.current().currentVersion(); + } + + @Override + public ViewVersion version(int versionId) { + return ops.current().version(versionId); + } + + @Override + public Iterable versions() { + return ops.current().versions(); + } + + @Override + public List history() { + return ops.current().history(); + } + + @Override + public ViewUpdateProperties updateProperties() { + return new ViewPropertiesUpdate(ops); + } + + @Override + public Map properties() { + return ops.current().properties(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/BaseViewDefinition.java b/core/src/main/java/org/apache/iceberg/view/BaseViewDefinition.java new file mode 100644 index 000000000000..0243c21ef85b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/BaseViewDefinition.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * SQL definition for a view + */ +public class BaseViewDefinition implements ViewDefinition { + private final String sql; + private final String dialect; + private final Schema schema; + private final String defaultCatalog; + private final List defaultNamespace; + private final List fieldAliases; + private final List fieldComments; + + public static Builder builder() { + return new Builder(); + } + + public static Builder buildFrom(ViewDefinition that) { + return builder() + .sql(that.sql()) + .dialect(that.dialect()) + .schema(that.schema()) + .defaultCatalog(that.defaultCatalog()) + .defaultNamespace(that.defaultNamespace()) + .fieldAliases(that.fieldAliases()) + .fieldComments(that.fieldComments()); + } + + private BaseViewDefinition( + String sql, String dialect, Schema schema, String defaultCatalog, List defaultNamespace, + List fieldAliases, List fieldComments) { + this.sql = Preconditions.checkNotNull(sql, "sql should not be null"); + this.dialect = Preconditions.checkNotNull(dialect, "dialect should not be null"); + this.schema = schema; + this.defaultCatalog = Preconditions.checkNotNull(defaultCatalog, "default catalog should not null"); + this.defaultNamespace = Preconditions.checkNotNull(defaultNamespace, "default namespace should not be null"); + this.fieldAliases = Preconditions.checkNotNull(fieldAliases, "field aliases should not be null"); + this.fieldComments = Preconditions.checkNotNull(fieldComments, "field comments should not be null"); + } + + @Override + public String sql() { + return sql; + } + + @Override + public String dialect() { + return dialect; + } + + @Override + public Schema schema() { + return schema; + } + + @Override + public String defaultCatalog() { + return defaultCatalog; + } + + @Override + public List defaultNamespace() { + return defaultNamespace; + } + + @Override + public List fieldAliases() { + return fieldAliases; + } + + @Override + public List fieldComments() { + return fieldComments; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BaseViewDefinition that = (BaseViewDefinition) o; + return Objects.equals(sql, that.sql) && + Objects.equals(dialect, that.dialect) && + Objects.equals(schema.asStruct(), that.schema.asStruct()) && + Objects.equals(defaultCatalog, that.defaultCatalog) && + Objects.equals(defaultNamespace, that.defaultNamespace) && + Objects.equals(fieldAliases, that.fieldAliases) && + Objects.equals(fieldComments, that.fieldComments); + } + + @Override + public int hashCode() { + return Objects.hash(sql, dialect, schema.asStruct(), defaultCatalog, defaultNamespace, fieldAliases, fieldComments); + } + + @Override + public String toString() { + return "BaseViewDefinition{" + + "sql='" + sql + '\'' + + ", dialect=" + dialect + + ", schema=" + schema + + ", defaultCatalog='" + defaultCatalog + '\'' + + ", defaultNamespace=" + defaultNamespace + + ", fieldAliases=" + fieldAliases + + ", fieldComments=" + fieldComments + + '}'; + } + + public static final class Builder { + + private String sql; + private String dialect = ""; + private Schema schema = new Schema(); + private String defaultCatalog = ""; + private List defaultNamespace = Collections.emptyList(); + private List fieldAliases = Collections.emptyList(); + private List fieldComments = Collections.emptyList(); + + private Builder() { + } + + public Builder sql(String value) { + sql = value; + return this; + } + + public Builder dialect(String value) { + dialect = value; + return this; + } + + public Builder schema(Schema value) { + schema = value; + return this; + } + + public Builder defaultCatalog(String value) { + defaultCatalog = value; + return this; + } + + public Builder defaultNamespace(List value) { + defaultNamespace = value; + return this; + } + + public Builder fieldAliases(List value) { + fieldAliases = value; + return this; + } + + public Builder fieldComments(List value) { + fieldComments = value; + return this; + } + + public BaseViewDefinition build() { + return new BaseViewDefinition( + sql, + dialect, + schema, + defaultCatalog, + defaultNamespace, + fieldAliases, + fieldComments); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/BaseViewHistoryEntry.java b/core/src/main/java/org/apache/iceberg/view/BaseViewHistoryEntry.java new file mode 100644 index 000000000000..74e56809db89 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/BaseViewHistoryEntry.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.Objects; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +public class BaseViewHistoryEntry implements ViewHistoryEntry { + private final long timestampMillis; + private final int versionId; + + static ViewHistoryEntry of(long timestampMillis, int versionId) { + return new BaseViewHistoryEntry(timestampMillis, versionId); + } + + private BaseViewHistoryEntry(long timestampMillis, int versionId) { + this.timestampMillis = timestampMillis; + this.versionId = versionId; + } + + @Override + public long timestampMillis() { + return timestampMillis; + } + + @Override + public int versionId() { + return versionId; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + BaseViewHistoryEntry that = (BaseViewHistoryEntry) other; + return timestampMillis == that.timestampMillis && versionId == that.versionId; + } + + @Override + public int hashCode() { + return Objects.hash(timestampMillis, versionId); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("timestampMillis", timestampMillis) + .add("versionId", versionId) + .toString(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/BaseViewVersion.java b/core/src/main/java/org/apache/iceberg/view/BaseViewVersion.java new file mode 100644 index 000000000000..775f46354952 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/BaseViewVersion.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class BaseViewVersion implements ViewVersion { + private final int versionId; + private final Integer parentId; + private final long timestampMillis; + private final Map summary; + private final List representations; + + public static Builder builder() { + return new Builder(); + } + + private BaseViewVersion( + int versionId, + Integer parentId, + long timestampMillis, + Map summary, + List representations) { + this.versionId = versionId; + this.parentId = parentId; + this.timestampMillis = timestampMillis; + this.summary = summary; + Preconditions.checkArgument(representations.size() > 0); + this.representations = representations; + } + + @Override + public int versionId() { + return versionId; + } + + @Override + public Integer parentId() { + return parentId; + } + + @Override + public long timestampMillis() { + return timestampMillis; + } + + @Override + public Map summary() { + return summary; + } + + @Override + public List representations() { + return representations; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + BaseViewVersion that = (BaseViewVersion) o; + + if (versionId != that.versionId) { + return false; + } + if (timestampMillis != that.timestampMillis) { + return false; + } + if (!Objects.equals(parentId, that.parentId)) { + return false; + } + if (!Objects.equals(summary, that.summary)) { + return false; + } + return Objects.equals(representations, that.representations); + } + + @Override + public int hashCode() { + int result = versionId; + result = 31 * result + (parentId != null ? parentId.hashCode() : 0); + result = 31 * result + (int) (timestampMillis ^ (timestampMillis >>> 32)); + result = 31 * result + (summary != null ? summary.hashCode() : 0); + result = 31 * result + (representations != null ? representations.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "BaseViewVersion{" + + "versionId=" + versionId + + ", parentId=" + parentId + + ", timestampMillis=" + timestampMillis + + ", summary=" + summary + + ", representations=" + representations + + '}'; + } + + public static final class Builder { + private int versionId; + private Integer parentId; + private long timestampMillis; + private Map summary = Maps.newHashMap(); + private List representations = Lists.newArrayList(); + + private Builder() { + } + + public Builder versionId(int value) { + versionId = value; + return this; + } + + public Builder parentId(Integer value) { + parentId = value; + return this; + } + + public Builder timestampMillis(long value) { + timestampMillis = value; + return this; + } + + public Builder summary(Map value) { + summary = value; + return this; + } + + public Builder representations(List value) { + representations = value; + return this; + } + + public Builder addRepresentation(ViewRepresentation representation) { + representations.add(representation); + return this; + } + + public BaseViewVersion build() { + return new BaseViewVersion( + versionId, + parentId, + timestampMillis, + summary, + representations); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/HadoopViewCatalog.java b/core/src/main/java/org/apache/iceberg/view/HadoopViewCatalog.java new file mode 100644 index 000000000000..67397031e9ec --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/HadoopViewCatalog.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.ViewCatalog; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.RuntimeIOException; + +/** + * The Views implementation Based on FileIO. + */ +public class HadoopViewCatalog implements ViewCatalog, Configurable { + private Configuration conf; + + public HadoopViewCatalog(Configuration conf) { + this.conf = conf; + } + + @Override + public List listViews(Namespace namespace) { + throw new UnsupportedOperationException("TODO"); + } + + @Override + public View loadView(TableIdentifier identifier) { + String location = identifier.name(); + ViewOperations ops = newViewOps(location); + if (ops.current() == null) { + throw new NoSuchTableException("View does not exist at location: %s", location); + } + return new BaseView(ops, location); + } + + @Override + public View createView(TableIdentifier identifier, List representations, + Map properties) { + String location = identifier.name(); + ViewOperations ops = newViewOps(location); + if (ops.current() != null) { + throw new AlreadyExistsException("View already exists at location: %s", location); + } + + int parentId = -1; + + ViewUtil.doCommit(ViewDDLOperation.CREATE, 1, parentId, representations, properties, location, ops, null); + return new BaseView(ops, location); + } + + @Override + public boolean dropView(TableIdentifier identifier, boolean purge) { + String location = identifier.name(); + Path path = new Path(location); + try { + FileSystem fs = path.getFileSystem(conf); + return fs.delete(path, true); + } catch (IOException e) { + throw new RuntimeIOException("Failed to delete view metadata."); + } + } + + @Override + public void renameView(TableIdentifier from, TableIdentifier to) { + throw new UnsupportedOperationException("TODO"); + } + + private ViewOperations newViewOps(String location) { + return new HadoopViewOperations(new Path(location), conf); + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/HadoopViewOperations.java b/core/src/main/java/org/apache/iceberg/view/HadoopViewOperations.java new file mode 100644 index 000000000000..149996481d60 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/HadoopViewOperations.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Map; +import java.util.UUID; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * The Views implementation Based on FileIO. + */ +public class HadoopViewOperations implements ViewOperations { + private final Configuration conf; + private final Path location; + private ViewMetadata currentMetadata = null; + private Integer version = null; + private boolean shouldRefresh = true; + private HadoopFileIO defaultFileIo = null; + + protected HadoopViewOperations(Path location, Configuration conf) { + this.conf = conf; + this.location = location; + } + + @Override + public ViewMetadata current() { + if (shouldRefresh) { + return refresh(); + } + return currentMetadata; + } + + private Path metadataFile(int otherVersion) { + return metadataPath("v" + otherVersion + ".json"); + } + + @Override + public ViewMetadata refresh() { + int ver = version != null ? version : readVersionHint(); + Path metadataFile = metadataFile(ver); + FileSystem fs = getFS(metadataFile, conf); + try { + // don't check if the file exists if version is non-null because it was already checked + if (version == null && !fs.exists(metadataFile)) { + if (ver == 0) { + // no v0 metadata means the table doesn't exist yet + return null; + } + throw new ValidationException("Metadata file is missing: %s", metadataFile); + } + + while (fs.exists(metadataFile(ver + 1))) { + ver += 1; + metadataFile = metadataFile(ver); + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to get file system for path: %s", metadataFile); + } + this.version = ver; + this.currentMetadata = ViewMetadataParser.read( + io().newInputFile(metadataFile.toString())); + this.shouldRefresh = false; + return currentMetadata; + } + + @Override + public FileIO io() { + if (defaultFileIo == null) { + defaultFileIo = new HadoopFileIO(conf); + } + return defaultFileIo; + } + + @Override + public void commit(ViewMetadata base, ViewMetadata metadata, Map properties) { + if (!base.equals(current())) { + throw new CommitFailedException("Cannot commit changes based on stale table metadata"); + } + + if (!base.equals(metadata)) { + return; + } + + Preconditions.checkArgument( + base == null || base.location().equals(metadata.location()), + "Hadoop path-based tables cannot be relocated"); + Preconditions.checkArgument( + !metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION), + "Hadoop path-based tables cannot relocate metadata"); + + Path tempMetadataFile = metadataPath(UUID.randomUUID() + ".json"); + ViewMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString())); + + int nextVersion = (version != null ? version + 1 : 1); + Path finalMetadataFile = metadataFile(nextVersion); + FileSystem fs = getFS(tempMetadataFile, conf); + + try { + if (fs.exists(finalMetadataFile)) { + throw new CommitFailedException( + "Version %d already exists: %s", nextVersion, finalMetadataFile); + } + } catch (IOException e) { + throw new RuntimeIOException(e, + "Failed to check if next version exists: %s", finalMetadataFile); + } + + try { + // this rename operation is the atomic commit operation + if (!fs.rename(tempMetadataFile, finalMetadataFile)) { + throw new CommitFailedException( + "Failed to commit changes using rename: %s", finalMetadataFile); + } + } catch (IOException e) { + throw new CommitFailedException(e, + "Failed to commit changes using rename: %s", finalMetadataFile); + } + + // update the best-effort version pointer + writeVersionHint(nextVersion); + + this.shouldRefresh = true; + } + + private Path metadataPath(String filename) { + return new Path(new Path(location, "metadata"), filename); + } + + private Path versionHintFile() { + return metadataPath("version-hint.text"); + } + + public static FileSystem getFS(Path path, Configuration conf) { + try { + return path.getFileSystem(conf); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to get file system for path: %s", path); + } + } + + private int readVersionHint() { + Path versionHintFile = versionHintFile(); + try { + FileSystem fs = getFS(versionHintFile, conf); + if (!fs.exists(versionHintFile)) { + return 0; + } + + try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(versionHintFile), "UTF-8"))) { + String versionStr = in.readLine(); + if (versionStr != null) { + return Integer.parseInt(versionStr.replace("\n", "")); + } + return 0; + } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to get file system for path: %s", versionHintFile); + } + } + + private void writeVersionHint(int otherVersion) { + Path versionHintFile = versionHintFile(); + FileSystem fs = getFS(versionHintFile, conf); + + try (FSDataOutputStream out = fs.create(versionHintFile, true /* overwrite */)) { + out.write(String.valueOf(otherVersion).getBytes("UTF-8")); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to write version hint"); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/HasViewOperations.java b/core/src/main/java/org/apache/iceberg/view/HasViewOperations.java new file mode 100644 index 000000000000..d50a598eedd6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/HasViewOperations.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +/** + * Used to expose a view's ViewOperations. + */ +public interface HasViewOperations { + ViewOperations operations(); +} diff --git a/core/src/main/java/org/apache/iceberg/view/MetastoreViewCatalog.java b/core/src/main/java/org/apache/iceberg/view/MetastoreViewCatalog.java new file mode 100644 index 000000000000..e07b016948fb --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/MetastoreViewCatalog.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.ViewCatalog; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public abstract class MetastoreViewCatalog implements ViewCatalog { + + private final Configuration conf; + + public MetastoreViewCatalog(Configuration conf) { + this.conf = conf; + } + + protected abstract MetastoreViewOperations newViewOps(TableIdentifier viewName); + + protected String defaultWarehouseLocation(TableIdentifier viewIdentifier) { + throw new UnsupportedOperationException("Implementation for 'defaultWarehouseLocation' not provided."); + } + + @Override + public View createView( + TableIdentifier identifier, List representations, Map properties) { + ViewOperations ops = newViewOps(identifier); + if (ops.current() != null) { + throw new AlreadyExistsException("View already exists: %s", identifier); + } + + String location = defaultWarehouseLocation(identifier); + int parentId = -1; + + ViewUtil.doCommit(ViewDDLOperation.CREATE, 1, parentId, representations, properties, location, ops, null); + return new BaseView(ops, identifier.toString()); + } + + @Override + public View loadView(TableIdentifier identifier) { + ViewOperations ops = newViewOps(identifier); + if (ops.current() == null) { + throw new NotFoundException("View does not exist: %s", identifier); + } + return new BaseView(ops, identifier.toString()); + } + + protected TableIdentifier toCatalogTableIdentifier(String tableIdentifier) { + List namespace = Lists.newArrayList(); + Iterable parts = Splitter.on(".").split(tableIdentifier); + + String lastPart = ""; + for (String part : parts) { + if (!lastPart.isEmpty()) { + namespace.add(lastPart); + } + lastPart = part; + } + + Preconditions.checkState(namespace.size() >= 2, "namespace should have catalog and schema"); + + return TableIdentifier.of(Namespace.of(namespace.toArray(new String[0])), lastPart); + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/MetastoreViewOperations.java b/core/src/main/java/org/apache/iceberg/view/MetastoreViewOperations.java new file mode 100644 index 000000000000..d1367bfede96 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/MetastoreViewOperations.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class MetastoreViewOperations implements ViewOperations { + private static final Logger LOG = LoggerFactory.getLogger(MetastoreViewOperations.class); + + public static final String METADATA_LOCATION_PROP = "metadata_location"; + public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location"; + + public static final String METADATA_FOLDER_NAME = "metadata"; + + private ViewMetadata currentMetadata = null; + private String currentMetadataLocation = null; + private boolean shouldRefresh = true; + private int version = -1; + + @Override + public ViewMetadata current() { + if (shouldRefresh) { + return refresh(); + } + return currentMetadata; + } + + public String currentMetadataLocation() { + return currentMetadataLocation; + } + + public int currentVersion() { + return version; + } + + protected void requestRefresh() { + this.shouldRefresh = true; + } + + protected String writeNewMetadata(ViewMetadata metadata, int newVersion) { + String newViewMetadataFilePath = newViewMetadataFilePath(metadata, newVersion); + OutputFile newMetadataLocation = io().newOutputFile(newViewMetadataFilePath); + + // write the new metadata + // use overwrite to avoid negative caching in S3. this is safe because the metadata location is + // always unique because it includes a UUID. + ViewMetadataParser.overwrite(metadata, newMetadataLocation); + return newViewMetadataFilePath; + } + + protected void refreshFromMetadataLocation( + String newLocation, Predicate shouldRetry, int numRetries) { + refreshFromMetadataLocation(newLocation, shouldRetry, numRetries, + metadataLocation -> ViewMetadataParser.read(io().newInputFile(metadataLocation))); + } + + protected void refreshFromMetadataLocation( + String newLocation, Predicate shouldRetry, + int numRetries, Function metadataLoader) { + // use null-safe equality check because new tables have a null metadata location + if (!Objects.equals(currentMetadataLocation, newLocation)) { + LOG.info("Refreshing table metadata from new version: {}", newLocation); + + AtomicReference newMetadata = new AtomicReference<>(); + Tasks.foreach(newLocation) + .retry(numRetries).exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */) + .throwFailureWhenFinished() + .shouldRetryTest(shouldRetry) + .run(metadataLocation -> newMetadata.set(metadataLoader.apply(metadataLocation))); + this.currentMetadata = newMetadata.get(); + this.currentMetadataLocation = newLocation; + this.version = parseVersion(newLocation); + } + this.shouldRefresh = false; + } + + private String metadataFileLocation(ViewMetadata metadata, String filename) { + return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename); + } + + private String newViewMetadataFilePath(ViewMetadata meta, int newVersion) { + return metadataFileLocation( + meta, + String.format("%05d-%s%s", newVersion, UUID.randomUUID(), ".metadata.json")); + } + + private static int parseVersion(String metadataLocation) { + int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0 + int versionEnd = metadataLocation.indexOf('-', versionStart); + try { + return Integer.parseInt(metadataLocation.substring(versionStart, versionEnd)); + } catch (NumberFormatException e) { + LOG.warn("Unable to parse version from metadata location: {}", metadataLocation, e); + return -1; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewConstants.java b/core/src/main/java/org/apache/iceberg/view/ViewConstants.java new file mode 100644 index 000000000000..57cf80ca36fd --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewConstants.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +public class ViewConstants { + public static final String ENGINE_VERSION = "engine_version"; + public static final String OPERATION = "operation"; + public static final String OWNER = "owner"; + + /** + * All the properties except 'common_view' are stored in the View's Version Summary. + * 'operation' is supplied by the library and hence does not need to appear in the enum below. If you add a new + * constant that is specific to a + * version of the view, make sure to add it to the enum below. + */ + protected enum SummaryConstants { + engine_version + } + + private ViewConstants() { + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewDDLOperation.java b/core/src/main/java/org/apache/iceberg/view/ViewDDLOperation.java new file mode 100644 index 000000000000..9dcb00b93db1 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewDDLOperation.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.Locale; + +/** + * View operations that lead to a new version of view getting created. + *

+ * A version can return the operation that resulted in creating that version of the view. + * Users can inspect the operation to get more information in case a rollback is desired. + */ +public enum ViewDDLOperation { + + CREATE, + REPLACE; + + public String operationName() { + return name().toLowerCase(Locale.ENGLISH); + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewDefinitionParser.java b/core/src/main/java/org/apache/iceberg/view/ViewDefinitionParser.java new file mode 100644 index 000000000000..e899036c136b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewDefinitionParser.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.util.JsonUtil; + +class ViewDefinitionParser { + private enum Field { + SQL("sql"), + DIALECT("dialect"), + SCHEMA("schema"), + DEFAULT_CATALOG("default-catalog"), + DEFAULT_NAMESPACE("default-namespace"), + FIELD_ALIASES("field-aliases"), + FIELD_COMMENTS("field-comments"); + + private final String name; + + Field(String name) { + this.name = name; + } + } + + static void toJson(ViewDefinition view, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeStringField(ViewRepresentationParser.Field.TYPE.fieldName(), view.type().typeName()); + generator.writeStringField(ViewDefinitionParser.Field.SQL.name, view.sql()); + generator.writeStringField(ViewDefinitionParser.Field.DIALECT.name, view.dialect()); + generator.writeFieldName(ViewDefinitionParser.Field.SCHEMA.name); + SchemaParser.toJson(view.schema(), generator); + generator.writeStringField(ViewDefinitionParser.Field.DEFAULT_CATALOG.name, view.defaultCatalog()); + JsonUtil.writeStringList(ViewDefinitionParser.Field.DEFAULT_NAMESPACE.name, view.defaultNamespace(), generator); + JsonUtil.writeStringList(ViewDefinitionParser.Field.FIELD_ALIASES.name, view.fieldAliases(), generator); + JsonUtil.writeStringList(ViewDefinitionParser.Field.FIELD_COMMENTS.name, view.fieldComments(), generator); + generator.writeEndObject(); + } + + static ViewDefinition fromJson(JsonNode node) { + return BaseViewDefinition.builder() + .sql(JsonUtil.getString(ViewDefinitionParser.Field.SQL.name, node)) + .dialect(JsonUtil.getString(ViewDefinitionParser.Field.DIALECT.name, node)) + .schema(JsonUtil.getObject(ViewDefinitionParser.Field.SCHEMA.name, node, SchemaParser::fromJson)) + .defaultCatalog(JsonUtil.getString(ViewDefinitionParser.Field.DEFAULT_CATALOG.name, node)) + .defaultNamespace(JsonUtil.getStringList(ViewDefinitionParser.Field.DEFAULT_NAMESPACE.name, node)) + .fieldAliases(JsonUtil.getStringList(ViewDefinitionParser.Field.FIELD_ALIASES.name, node)) + .fieldComments(JsonUtil.getStringList(ViewDefinitionParser.Field.FIELD_COMMENTS.name, node)) + .build(); + } + + private ViewDefinitionParser() { + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewHistoryEntryParser.java b/core/src/main/java/org/apache/iceberg/view/ViewHistoryEntryParser.java new file mode 100644 index 000000000000..6f7ad16039e8 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewHistoryEntryParser.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import org.apache.iceberg.util.JsonUtil; + +class ViewHistoryEntryParser { + + // visible for testing + static final String VERSION_ID = "version-id"; + static final String TIMESTAMP_MS = "timestamp-ms"; + + static void toJson(ViewHistoryEntry entry, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(TIMESTAMP_MS, entry.timestampMillis()); + generator.writeNumberField(VERSION_ID, entry.versionId()); + generator.writeEndObject(); + } + + static ViewHistoryEntry fromJson(JsonNode node) { + return BaseViewHistoryEntry.of(JsonUtil.getLong(TIMESTAMP_MS, node), JsonUtil.getInt(VERSION_ID, node)); + } + + private ViewHistoryEntryParser() { + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java new file mode 100644 index 000000000000..81d1bdd18528 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewMetadata.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Ordering; + +/** + * Metadata for versioning a view. + */ +public class ViewMetadata implements Serializable { + static final int DEFAULT_VIEW_FORMAT_VERSION = 1; + static final int SUPPORTED_VIEW_FORMAT_VERSION = 1; + + // stored metadata + private final int formatVersion; + private final String location; + private final Map properties; + private final int currentVersionId; + private final List versions; + private final Map versionsById; + private final List versionLog; + + public static Builder builder() { + return new Builder(); + } + + public static Builder buildFrom(ViewMetadata metadata) { + return builder() + .location(metadata.location()) + .properties(metadata.properties()) + .currentVersionId(metadata.currentVersionId()) + .versions(metadata.versions()) + .history(metadata.history()); + } + + // Creates a new view version metadata by simply assigning variables + private ViewMetadata( + String location, + Map properties, + int currentVersionId, + List versions, + List versionLog) { + this.formatVersion = DEFAULT_VIEW_FORMAT_VERSION; + this.location = location; + this.properties = properties; + this.currentVersionId = currentVersionId; + Preconditions.checkState(versions.size() > 0); + this.versions = versions; + checkVersionLog(versionLog); + this.versionLog = versionLog; + this.versionsById = indexVersions(versions); + } + + public int formatVersion() { + return formatVersion; + } + + public String location() { + return location; + } + + public Map properties() { + return properties; + } + + public ViewVersion version(int versionId) { + return versionsById.get(versionId); + } + + public ViewVersion currentVersion() { + return versionsById.get(currentVersionId); + } + + public int currentVersionId() { + return currentVersionId; + } + + public List versions() { + return versions; + } + + public List history() { + return versionLog; + } + + @Override + public String toString() { + return "ViewMetadata{" + + "formatVersion=" + formatVersion + + ", location='" + location + '\'' + + ", properties=" + properties + + ", currentVersionId=" + currentVersionId + + ", versions=" + versions + + ", versionsById=" + versionsById + + ", versionLog=" + versionLog + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ViewMetadata that = (ViewMetadata) o; + + if (formatVersion != that.formatVersion) { + return false; + } + if (currentVersionId != that.currentVersionId) { + return false; + } + if (!location.equals(that.location)) { + return false; + } + if (!properties.equals(that.properties)) { + return false; + } + if (!versions.equals(that.versions)) { + return false; + } + return versionLog.equals(that.versionLog); + } + + @Override + public int hashCode() { + int result = formatVersion; + result = 31 * result + location.hashCode(); + result = 31 * result + properties.hashCode(); + result = 31 * result + currentVersionId; + result = 31 * result + versions.hashCode(); + result = 31 * result + versionLog.hashCode(); + return result; + } + + private static void checkVersionLog(List versionLog) { + Preconditions.checkState(versionLog.size() > 0); + Preconditions.checkState( + Ordering.from(Comparator.comparing(ViewHistoryEntry::timestampMillis)).isOrdered(versionLog), + "[BUG] Expected sorted version log entries."); + } + + private static Map indexVersions(List versions) { + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (ViewVersion version : versions) { + builder.put(version.versionId(), version); + } + return builder.build(); + } + + public ViewMetadata replaceProperties(Map newProperties) { + ValidationException.check(newProperties != null, "Cannot set properties to null"); + + return buildFrom(this).properties(newProperties).build(); + } + + public static final class Builder { + private String location; + private Map properties = Maps.newHashMap(); + private int currentVersionId; + private List versions = Lists.newArrayList(); + private List versionLog = Lists.newArrayList(); + + private Builder() { + } + + public Builder location(String value) { + location = value; + return this; + } + + public Builder properties(Map value) { + properties = value; + return this; + } + + public Builder currentVersionId(int value) { + currentVersionId = value; + return this; + } + + public Builder versions(List value) { + versions = value; + return this; + } + + public Builder history(List value) { + versionLog = value; + return this; + } + + public Builder addVersion(ViewVersion version) { + versions.add(version); + versionLog.add(BaseViewHistoryEntry.of(version.timestampMillis(), version.versionId())); + return this; + } + + public Builder keepVersions(int numVersionsToKeep) { + if (versions.size() > numVersionsToKeep) { + versions = versions.subList(versions.size() - numVersionsToKeep, numVersionsToKeep); + } + return this; + } + + public Builder keepHistory(int numVersionsToKeep) { + if (versionLog.size() > numVersionsToKeep) { + versionLog = versionLog.subList(versionLog.size() - numVersionsToKeep, numVersionsToKeep); + } + return this; + } + + public ViewMetadata build() { + return new ViewMetadata(location, properties, currentVersionId, versions, versionLog); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java b/core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java new file mode 100644 index 000000000000..d470195cb820 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewMetadataParser.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, ViewVersion 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; +import org.apache.iceberg.util.PropertyUtil; + +public class ViewMetadataParser { + + // visible for testing + static final String FORMAT_VERSION = "format-version"; + static final String LOCATION = "location"; + static final String CURRENT_VERSION_ID = "current-version-id"; + static final String VERSIONS = "versions"; + static final String VERSION_LOG = "version-log"; + static final String PROPERTIES = "properties"; + + public static void overwrite(ViewMetadata metadata, OutputFile outputFile) { + internalWrite(metadata, outputFile, true); + } + + public static void write(ViewMetadata metadata, OutputFile outputFile) { + internalWrite(metadata, outputFile, false); + } + + public static void internalWrite( + ViewMetadata metadata, OutputFile outputFile, boolean overwrite) { + OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create(); + try (OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8)) { + JsonGenerator generator = JsonUtil.factory().createGenerator(writer); + generator.useDefaultPrettyPrinter(); + toJson(metadata, generator); + generator.flush(); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to write json to file: %s", outputFile); + } + } + + public static void toJson(ViewMetadata metadata, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + + generator.writeNumberField(FORMAT_VERSION, metadata.formatVersion()); + generator.writeStringField(LOCATION, metadata.location()); + + JsonUtil.writeStringMap(PROPERTIES, metadata.properties(), generator); + + generator.writeNumberField(CURRENT_VERSION_ID, metadata.currentVersionId()); + JsonUtil.writeObjectList(VERSIONS, metadata.versions(), ViewVersionParser::toJson, generator); + + JsonUtil.writeObjectList(VERSION_LOG, metadata.history(), ViewHistoryEntryParser::toJson, generator); + + generator.writeEndObject(); + } + + public static ViewMetadata read(InputFile file) { + try (InputStream is = file.newStream()) { + return fromJson(JsonUtil.mapper().readValue(is, JsonNode.class)); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to read file: %s", file); + } + } + + public static ViewMetadata fromJson(JsonNode node) { + Preconditions.checkArgument(node.isObject(), + "Cannot parse metadata from a non-object: %s", node); + + int formatVersion = JsonUtil.getInt(FORMAT_VERSION, node); + Preconditions.checkArgument(formatVersion <= ViewMetadata.SUPPORTED_VIEW_FORMAT_VERSION, + "Cannot read unsupported version %d", formatVersion); + + String location = JsonUtil.getString(LOCATION, node); + + int currentVersionId = JsonUtil.getInt(CURRENT_VERSION_ID, node); + + Map properties = JsonUtil.getStringMap(PROPERTIES, node); + + List versions = JsonUtil.getObjectList(VERSIONS, node, ViewVersionParser::fromJson); + + List history = JsonUtil.getObjectList(VERSION_LOG, node, ViewHistoryEntryParser::fromJson); + + int numVersionsToKeep = PropertyUtil.propertyAsInt(properties, + ViewProperties.VERSION_HISTORY_SIZE, ViewProperties.VERSION_HISTORY_SIZE_DEFAULT); + + return ViewMetadata.builder() + .location(location) + .currentVersionId(currentVersionId) + .properties(properties) + .versions(versions) + .keepVersions(numVersionsToKeep) + .history(history) + .keepHistory(numVersionsToKeep) + .build(); + } + + private ViewMetadataParser() { + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewOperations.java b/core/src/main/java/org/apache/iceberg/view/ViewOperations.java new file mode 100644 index 000000000000..207e7e95661c --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewOperations.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.Map; +import org.apache.iceberg.io.FileIO; + +/** + * SPI interface to abstract view metadata access and updates. + */ +public interface ViewOperations { + + /** + * Returns the currently loaded view metadata, without checking for updates. + * + * @return view version metadata + */ + ViewMetadata current(); + + /** + * Returns the current view metadata after checking for updates. + * + * @return view version metadata + */ + ViewMetadata refresh(); + + /** + * Replace the base metadata with a new version. + *

+ * This method should implement and document atomicity guarantees. + *

+ * Implementations must check that the base metadata is current to avoid overwriting updates. Once the atomic commit + * operation succeeds, implementations must not perform any operations that may fail because failure in this method + * cannot be distinguished from commit failure. + * + * @param base view metadata on which changes were based + * @param metadata new view metadata with updates + * @param properties view properties. + */ + void commit(ViewMetadata base, ViewMetadata metadata, Map properties); + + /** + * Returns a {@link FileIO} to read and write table data and metadata files + */ + FileIO io(); +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewProperties.java b/core/src/main/java/org/apache/iceberg/view/ViewProperties.java new file mode 100644 index 000000000000..8734edc0a2ba --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewProperties.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +/** + * View properties that can be set during CREATE/REPLACE view or using updateProperties API. + */ +public class ViewProperties { + public static final String COMMIT_NUM_RETRIES = "commit.retry.num-retries"; + public static final int COMMIT_NUM_RETRIES_DEFAULT = 4; + + public static final String COMMIT_MIN_RETRY_WAIT_MS = "commit.retry.min-wait-ms"; + public static final int COMMIT_MIN_RETRY_WAIT_MS_DEFAULT = 100; + + public static final String COMMIT_MAX_RETRY_WAIT_MS = "commit.retry.max-wait-ms"; + public static final int COMMIT_MAX_RETRY_WAIT_MS_DEFAULT = 60000; // 1 minute + + public static final String COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms"; + public static final int COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 1800000; // 30 minutes + + public static final String VERSION_HISTORY_SIZE = "version.history.num-entries"; + public static final int VERSION_HISTORY_SIZE_DEFAULT = 10; + + public static final String TABLE_COMMENT = "comment"; + + private ViewProperties() { + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewPropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/view/ViewPropertiesUpdate.java new file mode 100644 index 000000000000..23710e65f054 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewPropertiesUpdate.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.Tasks; + +class ViewPropertiesUpdate implements ViewUpdateProperties { + private final ViewOperations ops; + private final Map updates = Maps.newHashMap(); + private final Set removals = Sets.newHashSet(); + private ViewMetadata base; + + ViewPropertiesUpdate(ViewOperations ops) { + this.ops = ops; + this.base = ops.current(); + } + + @Override + public ViewUpdateProperties set(String key, String value) { + Preconditions.checkNotNull(key, "Key cannot be null"); + Preconditions.checkNotNull(key, "Value cannot be null"); + Preconditions.checkArgument(!removals.contains(key), + "Cannot remove and update the same key: %s", key); + + updates.put(key, value); + + return this; + } + + @Override + public ViewUpdateProperties remove(String key) { + Preconditions.checkNotNull(key, "Key cannot be null"); + Preconditions.checkArgument(!updates.containsKey(key), + "Cannot remove and update the same key: %s", key); + + removals.add(key); + + return this; + } + + @Override + public Map apply() { + this.base = ops.refresh(); + + Map newProperties = Maps.newHashMap(); + for (Map.Entry entry : base.properties().entrySet()) { + if (!removals.contains(entry.getKey())) { + newProperties.put(entry.getKey(), entry.getValue()); + } + } + + newProperties.putAll(updates); + + return newProperties; + } + + @Override + public void commit() { + Tasks.foreach(ops) + .retry( + PropertyUtil.propertyAsInt(base.properties(), + ViewProperties.COMMIT_NUM_RETRIES, + ViewProperties.COMMIT_NUM_RETRIES_DEFAULT)) + .exponentialBackoff( + PropertyUtil.propertyAsInt(base.properties(), + ViewProperties.COMMIT_MIN_RETRY_WAIT_MS, + ViewProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + PropertyUtil.propertyAsInt(base.properties(), + ViewProperties.COMMIT_MAX_RETRY_WAIT_MS, + ViewProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + PropertyUtil.propertyAsInt(base.properties(), + ViewProperties.COMMIT_TOTAL_RETRY_TIME_MS, + ViewProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), + 2.0 /* exponential */) + .onlyRetryOn(CommitFailedException.class) + .run(taskOps -> { + Map newProperties = apply(); + ViewMetadata updated = base.replaceProperties(newProperties); + taskOps.commit(base, updated, Maps.newHashMap()); + }); + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewRepresentationParser.java b/core/src/main/java/org/apache/iceberg/view/ViewRepresentationParser.java new file mode 100644 index 000000000000..374fc58d7965 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewRepresentationParser.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.Locale; + +class ViewRepresentationParser { + + enum Field { + TYPE; + + public String fieldName() { + return name().toLowerCase(Locale.ENGLISH); + } + } + + public static void toJson(ViewRepresentation representation, JsonGenerator generator) throws IOException { + switch (representation.type()) { + case SQL: + ViewDefinitionParser.toJson((ViewDefinition) representation, generator); + break; + + default: + throw new IllegalArgumentException(String.format("Unknown view representation type '%s' to serialize", + representation.type())); + } + } + + public static ViewRepresentation fromJson(JsonNode node) { + String typeName = node.get(Field.TYPE.fieldName()).asText(); + ViewRepresentation.Type type = ViewRepresentation.Type.parse(typeName); + + switch (type) { + case SQL: + return ViewDefinitionParser.fromJson(node); + + default: + throw new IllegalStateException(String.format("Unknown view representation type '%s' to deserialize", type)); + } + } + + private ViewRepresentationParser() { + } +} + diff --git a/core/src/main/java/org/apache/iceberg/view/ViewUtil.java b/core/src/main/java/org/apache/iceberg/view/ViewUtil.java new file mode 100644 index 000000000000..0a13b1a93f0f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewUtil.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Utility methods for operating on common views + */ +public class ViewUtil { + /** + * Method picks and returns the 'summary' properties from the map of table properties. Summary properties are recorded + * in the 'summary' portion of 'Version' in metadata json file. + * + * @param operation The view operation that results in alteration of the view + * @param properties Map of all table properties + * @param prevProperties Properties previously set + * @return A map of summary properties to be recorded in the metadata json file. These are all previously set + * properties overlaid with the new properties. + */ + public static Map buildSummaryProperties( + ViewDDLOperation operation, + Map properties, + Map prevProperties) { + Map props = Maps.newHashMap(); + for (ViewConstants.SummaryConstants key : + ViewConstants.SummaryConstants.values()) { + String val = properties.get(key.name()); + if (val != null) { + props.put(String.valueOf(key), val); + } else if (prevProperties != null) { + val = prevProperties.get(key.name()); + if (val != null) { + props.put(String.valueOf(key), val); + } + } + } + props.put(ViewConstants.OPERATION, operation.operationName()); + return props; + } + + /** + * Method picks and returns common view specific properties from the map of table properties. These properties are + * recorded in the 'properties' section of the view version metadata file. Any properties that were previously set and + * are not being overridden are persisted. + * + * @param properties Map of all table properties + * @param prevProperties Properties that were previously set + * @param summaryProperties 'summary' portion of 'Version' in metadata json file. + * @return A map of properties to be recorded in the metadata json file. + */ + public static Map getViewVersionMetadataProperties( + Map properties, + Map prevProperties, Map summaryProperties) { + Map props = Maps.newHashMap(prevProperties); + props.putAll(properties); + props.keySet().removeAll(summaryProperties.keySet()); + return props; + } + + /** + * The method prepares the arguments to perform the commit and then proceeds to commit. + * + * @param operation View operation causing the commit + * @param versionId Current version id. + * @param parentId Version id of the parent version. + * @param representations View definition + * @param properties View properties + * @param location Location of view metadata + * @param ops View operations object needed to perform the commit + * @param prevViewMetadata Previous view version metadata + */ + public static void doCommit( + ViewDDLOperation operation, + int versionId, + int parentId, + List representations, + Map properties, + String location, + ViewOperations ops, + ViewMetadata prevViewMetadata) { + Map prevSummaryProps; + Map prevViewVersionMetadataProps; + + if (prevViewMetadata != null) { + prevSummaryProps = prevViewMetadata.currentVersion().summary(); + prevViewVersionMetadataProps = prevViewMetadata.properties(); + } else { + prevSummaryProps = Maps.newHashMap(); + prevViewVersionMetadataProps = Maps.newHashMap(); + } + + // The input set of view properties need to be classified in three sets of properties: + // 1) Summary properties: these are recorded with a particular version of the view + // (Defined in ViewConstants.java) + // 2) View version metadata properties: these are not versioned. These are all the other table properties + // that do not belong in 1) or 2) above. + Map summary = ViewUtil.buildSummaryProperties(operation, properties, prevSummaryProps); + + Map viewVersionMetadataProperties = + ViewUtil.getViewVersionMetadataProperties(properties, prevViewVersionMetadataProps, summary); + // add the owner field to the view version metadata props as it doesn't belong to the other 2 categories. + if (properties.get(ViewConstants.OWNER) != null) { + viewVersionMetadataProperties.put(ViewConstants.OWNER, properties.get(ViewConstants.OWNER)); + } + + BaseViewVersion version = BaseViewVersion.builder() + .versionId(versionId) + .parentId(parentId) + .timestampMillis(System.currentTimeMillis()) + .summary(summary) + .representations(representations) + .build(); + + ViewMetadata.Builder viewVersionMetadataBuilder = ViewMetadata.builder() + .location(location) + .properties(viewVersionMetadataProperties); + if (prevViewMetadata != null) { + viewVersionMetadataBuilder.versions(prevViewMetadata.versions()) + .history(prevViewMetadata.history()); + } + viewVersionMetadataBuilder.addVersion(version); + ViewMetadata viewMetadata = viewVersionMetadataBuilder.build(); + + ops.commit(prevViewMetadata, viewMetadata, viewVersionMetadataProperties); + } + + private ViewUtil() { + } +} diff --git a/core/src/main/java/org/apache/iceberg/view/ViewVersionParser.java b/core/src/main/java/org/apache/iceberg/view/ViewVersionParser.java new file mode 100644 index 000000000000..f39dbed86e74 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/view/ViewVersionParser.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.JsonUtil; + +class ViewVersionParser { + + private static final String VERSION_ID = "version-id"; + private static final String PARENT_VERSION_ID = "parent-version-id"; + private static final String TIMESTAMP_MS = "timestamp-ms"; + private static final String SUMMARY = "summary"; + private static final String OPERATION = "operation"; + private static final String REPRESENTATIONS = "representations"; + + static void toJson(ViewVersion version, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + + generator.writeNumberField(VERSION_ID, version.versionId()); + if (version.parentId() != null) { + generator.writeNumberField(PARENT_VERSION_ID, version.parentId()); + } + generator.writeNumberField(TIMESTAMP_MS, version.timestampMillis()); + JsonUtil.writeStringMap(SUMMARY, version.summary(), generator); + JsonUtil.writeObjectList(REPRESENTATIONS, version.representations(), ViewRepresentationParser::toJson, generator); + + generator.writeEndObject(); + } + + static ViewVersion fromJson(JsonNode node) { + Preconditions.checkArgument(node.isObject(), + "Cannot parse table version from a non-object: %s", node); + + int versionId = JsonUtil.getInt(VERSION_ID, node); + Integer parentId = JsonUtil.getIntOrNull(PARENT_VERSION_ID, node); + long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node); + Map summary = JsonUtil.getStringMap(SUMMARY, node); + List representations = + JsonUtil.getObjectList(REPRESENTATIONS, node, ViewRepresentationParser::fromJson); + + return BaseViewVersion.builder() + .versionId(versionId) + .parentId(parentId) + .timestampMillis(timestamp) + .summary(summary) + .representations(representations) + .build(); + } + + private ViewVersionParser() { + } +} diff --git a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java new file mode 100644 index 000000000000..a2879fd659ce --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.util; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.io.StringWriter; +import java.io.UncheckedIOException; +import java.io.Writer; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class TestJsonUtil { + + @FunctionalInterface + public interface JsonStringWriter { + String write(T object); + } + + public static JsonStringWriter jsonStringWriter(JsonUtil.JsonWriter writer) { + return entry -> { + Writer jsonWriter = new StringWriter(); + try { + JsonGenerator generator = JsonUtil.factory().createGenerator(jsonWriter); + writer.write(entry, generator); + generator.flush(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return jsonWriter.toString(); + }; + } + + public static String toJsonString(T entry, JsonUtil.JsonWriter writer) { + return jsonStringWriter(writer).write(entry); + } + + public static JsonNode fromJsonString(String json) { + try { + return JsonUtil.mapper().readValue(json, JsonNode.class); + } catch (JsonProcessingException e) { + throw new UncheckedIOException(e); + } + } + + @FunctionalInterface + public interface JsonStringReader { + T read(String json); + } + + public static JsonStringReader jsonStringReader(JsonUtil.JsonReader reader) { + return json -> reader.read(fromJsonString(json)); + } + + public static T fromJsonString(String json, JsonUtil.JsonReader reader) { + return jsonStringReader(reader).read(json); + } + + public static Collector joiningJsonArray() { + return Collectors.joining(",", "[", "]"); + } + + public static Collector joiningJsonObject() { + return Collectors.joining(",", "{", "}"); + } + + public static String arrayString(String... strings) { + return Stream.of(strings).collect(joiningJsonArray()); + } + + public static String objectString(String... strings) { + return Stream.of(strings).collect(joiningJsonObject()); + } + + private TestJsonUtil() { + } +} diff --git a/core/src/test/java/org/apache/iceberg/view/ParserTestBase.java b/core/src/test/java/org/apache/iceberg/view/ParserTestBase.java new file mode 100644 index 000000000000..2b06f436f5bd --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/view/ParserTestBase.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.io.IOException; +import org.apache.iceberg.util.JsonUtil; +import org.apache.iceberg.util.TestJsonUtil; +import org.junit.Assert; +import org.junit.Test; + +public abstract class ParserTestBase { + + private final T entry; + private final String json; + private final JsonUtil.JsonWriter writer; + private final JsonUtil.JsonReader reader; + + public ParserTestBase(T entry, String json, JsonUtil.JsonWriter writer, JsonUtil.JsonReader reader) { + this.entry = entry; + this.json = json; + this.writer = writer; + this.reader = reader; + } + + @Test + public void toJson() throws IOException { + + String expected = TestJsonUtil.fromJsonString(json).toString(); + Assert.assertEquals(expected, TestJsonUtil.toJsonString(entry, writer)); + } + + @Test + public void fromJson() throws IOException { + Assert.assertEquals(entry, TestJsonUtil.fromJsonString(json, reader)); + } +} diff --git a/core/src/test/java/org/apache/iceberg/view/TestViewDefinitionParser.java b/core/src/test/java/org/apache/iceberg/view/TestViewDefinitionParser.java new file mode 100644 index 000000000000..6a19cc3d66a8 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/view/TestViewDefinitionParser.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.Arrays; +import java.util.Collection; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.TestJsonUtil; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +@RunWith(Parameterized.class) +public class TestViewDefinitionParser extends ParserTestBase { + + @Parameterized.Parameters + public static Collection parameters() { + return Arrays.asList(new Object[][] { + { + BaseViewDefinition.builder() + .sql("SELECT 'foo' foo") + .schema(new Schema(optional(1, "foo", Types.StringType.get()))) + .build(), + TestJsonUtil.objectString( + "\"type\":\"sql\"", + "\"sql\":\"SELECT 'foo' foo\"", + "\"dialect\":\"\"", + "\"schema\":" + TestJsonUtil.objectString( + "\"type\":\"struct\"", + "\"schema-id\":0", + "\"fields\":" + TestJsonUtil.arrayString( + TestJsonUtil.objectString( + "\"id\":1", + "\"name\":\"foo\"", + "\"required\":false", + "\"type\":\"string\""))), + "\"default-catalog\":\"\"", + "\"default-namespace\":[]", + "\"field-aliases\":[]", + "\"field-comments\":[]") + }, + { + BaseViewDefinition.builder() + .sql("SELECT 'foo' foo, 'foo2' foo2") + .schema(new Schema( + optional(1, "col1", Types.StringType.get(), "Comment col1"), + optional(2, "col2", Types.StringType.get(), "Comment col2"))) + .defaultCatalog("cat") + .defaultNamespace(Arrays.asList("part1", "part2")) + .fieldAliases(Arrays.asList("col1", "col2")) + .fieldComments(Arrays.asList("Comment col1", "Comment col2")) + .build(), + TestJsonUtil.objectString( + "\"type\":\"sql\"", + "\"sql\":\"SELECT 'foo' foo, 'foo2' foo2\"", + "\"dialect\":\"\"", + "\"schema\":" + TestJsonUtil.objectString( + "\"type\":\"struct\"", + "\"schema-id\":0", + "\"fields\":" + TestJsonUtil.arrayString( + TestJsonUtil.objectString( + "\"id\":1", + "\"name\":\"col1\"", + "\"required\":false", + "\"type\":\"string\"", + "\"doc\":\"Comment col1\""), + TestJsonUtil.objectString( + "\"id\":2", + "\"name\":\"col2\"", + "\"required\":false", + "\"type\":\"string\"", + "\"doc\":\"Comment col2\""))), + "\"default-catalog\":\"cat\"", + "\"default-namespace\":[\"part1\",\"part2\"]", + "\"field-aliases\":[\"col1\",\"col2\"]", + "\"field-comments\":[\"Comment col1\",\"Comment col2\"]") + }, + { + BaseViewDefinition.builder() + .sql("SELECT 'foo' foo") + .build(), + TestJsonUtil.objectString( + "\"type\":\"sql\"", + "\"sql\":\"SELECT 'foo' foo\"", + "\"dialect\":\"\"", + "\"schema\":" + TestJsonUtil.objectString( + "\"type\":\"struct\"", + "\"schema-id\":0", + "\"fields\":[]"), + "\"default-catalog\":\"\"", + "\"default-namespace\":[]", + "\"field-aliases\":[]", + "\"field-comments\":[]") + } + }); + } + + public TestViewDefinitionParser(ViewDefinition entry, String json) { + super(entry, json, ViewDefinitionParser::toJson, ViewDefinitionParser::fromJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/view/TestViewHistoryEntryParser.java b/core/src/test/java/org/apache/iceberg/view/TestViewHistoryEntryParser.java new file mode 100644 index 000000000000..3e7471d43d2d --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/view/TestViewHistoryEntryParser.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.Arrays; +import java.util.Collection; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestViewHistoryEntryParser extends ParserTestBase { + + @Parameterized.Parameters + public static Collection parameters() { + return Arrays.asList(new Object[][] { + { + BaseViewHistoryEntry.of(4353L, 1), + "{\"timestamp-ms\":4353,\"version-id\":1}" + } + }); + } + + public TestViewHistoryEntryParser(ViewHistoryEntry entry, String json) { + super(entry, json, ViewHistoryEntryParser::toJson, ViewHistoryEntryParser::fromJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java b/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java new file mode 100644 index 000000000000..991beb777875 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/view/TestViewMetadataParser.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.util.TestJsonUtil; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestViewMetadataParser extends ParserTestBase { + + private static ViewVersion version1 = BaseViewVersion.builder() + .versionId(1) + .timestampMillis(4353L) + .addRepresentation(BaseViewDefinition.builder() + .sql("select 'foo' foo") + .build()) + .build(); + private static ViewHistoryEntry historyEntry1 = BaseViewHistoryEntry.of(4353L, 1); + + private static ViewVersion version2 = BaseViewVersion.builder() + .versionId(2) + .timestampMillis(5555L) + .addRepresentation(BaseViewDefinition.builder() + .sql("select 1 id, 'abc' data") + .build()) + .build(); + private static ViewHistoryEntry historyEntry2 = BaseViewHistoryEntry.of(5555L, 2); + + private static TestJsonUtil.JsonStringWriter versionJsonStringWriter = + TestJsonUtil.jsonStringWriter(ViewVersionParser::toJson); + + private static TestJsonUtil.JsonStringWriter historyEntryJsonStringWriter = + TestJsonUtil.jsonStringWriter(ViewHistoryEntryParser::toJson); + + @Parameterized.Parameters + public static Collection parameters() { + return Arrays.asList(new Object[][] { + { + ViewMetadata.builder() + .location("/location/dummy") + .currentVersionId(2) + .versions(Stream.of(version1, version2).collect(Collectors.toList())) + .history(Stream.of(historyEntry1, historyEntry2).collect(Collectors.toList())) + .build(), + TestJsonUtil.objectString( + "\"format-version\":1", + "\"location\":\"/location/dummy\"", + "\"properties\":{}", + "\"current-version-id\":2", + "\"versions\":" + + Stream.of(version1, version2) + .map(versionJsonStringWriter::write) + .collect(TestJsonUtil.joiningJsonArray()), + "\"version-log\":" + + Stream.of(historyEntry1, historyEntry2) + .map(historyEntryJsonStringWriter::write) + .collect(TestJsonUtil.joiningJsonArray())) + } + }); + } + + public TestViewMetadataParser(ViewMetadata entry, String json) { + super(entry, json, ViewMetadataParser::toJson, ViewMetadataParser::fromJson); + } +} diff --git a/core/src/test/java/org/apache/iceberg/view/TestViewRepresentationParser.java b/core/src/test/java/org/apache/iceberg/view/TestViewRepresentationParser.java new file mode 100644 index 000000000000..7decd7d3d926 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/view/TestViewRepresentationParser.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.view; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.TestJsonUtil; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +@RunWith(Parameterized.class) +public class TestViewRepresentationParser { + + private static ViewDefinition sql1 = BaseViewDefinition.builder() + .sql("SELECT 'foo' foo") + .schema(new Schema(optional(1, "foo", Types.StringType.get()))) + .build(); + + private ViewRepresentation viewRepresentation; + + @Parameterized.Parameters + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {sql1} + }); + } + + public TestViewRepresentationParser(ViewRepresentation viewRepresentation) { + this.viewRepresentation = viewRepresentation; + } + + @Test + public void typeField() throws IOException { + String json = TestJsonUtil.toJsonString(viewRepresentation, ViewRepresentationParser::toJson); + JsonNode jsonNode = TestJsonUtil.fromJsonString(json); + String actual = jsonNode.get(ViewRepresentationParser.Field.TYPE.fieldName()).asText(); + Assert.assertEquals(viewRepresentation.type().typeName(), actual); + } + + @Test + public void roundTrip() throws IOException { + String json = TestJsonUtil.toJsonString(viewRepresentation, ViewRepresentationParser::toJson); + JsonNode jsonNode = TestJsonUtil.fromJsonString(json); + ViewRepresentation actual = ViewRepresentationParser.fromJson(jsonNode); + Assert.assertEquals(viewRepresentation, actual); + } +} diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java index 43868298944c..408abfd956db 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java @@ -33,7 +33,6 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hadoop.HadoopFileIO; @@ -43,10 +42,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.projectnessie.client.NessieClientBuilder; import org.projectnessie.client.NessieConfigConstants; import org.projectnessie.client.api.NessieApiV1; -import org.projectnessie.client.http.HttpClientBuilder; import org.projectnessie.model.ContentKey; import org.projectnessie.model.TableReference; import org.slf4j.Logger; @@ -85,9 +82,11 @@ public void initialize(String name, Map options) { final Function removePrefix = x -> x.replace(NessieUtil.NESSIE_CONFIG_PREFIX, ""); final String requestedRef = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF)); String requestedHash = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF_HASH)); - NessieApiV1 api = createNessieClientBuilder(options.get(NessieConfigConstants.CONF_NESSIE_CLIENT_BUILDER_IMPL)) - .fromConfig(x -> options.get(removePrefix.apply(x))) - .build(NessieApiV1.class); + NessieApiV1 api = + NessieUtil.createNessieClientBuilder( + options.get(NessieConfigConstants.CONF_NESSIE_CLIENT_BUILDER_IMPL)) + .fromConfig(x -> options.get(removePrefix.apply(x))) + .build(NessieApiV1.class); initialize(name, new NessieIcebergClient(api, requestedRef, requestedHash, catalogOptions), @@ -109,54 +108,13 @@ public void initialize(String name, NessieIcebergClient client, FileIO fileIO, M this.client = Preconditions.checkNotNull(client, "client must be non-null"); this.fileIO = Preconditions.checkNotNull(fileIO, "fileIO must be non-null"); this.catalogOptions = Preconditions.checkNotNull(catalogOptions, "catalogOptions must be non-null"); - this.warehouseLocation = validateWarehouseLocation(name, catalogOptions); + this.warehouseLocation = NessieUtil.validateWarehouseLocation(name, catalogOptions); this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(client); closeableGroup.addCloseable(fileIO); closeableGroup.setSuppressCloseFailure(true); } - @SuppressWarnings("checkstyle:HiddenField") - private String validateWarehouseLocation(String name, Map catalogOptions) { - String warehouseLocation = catalogOptions.get(CatalogProperties.WAREHOUSE_LOCATION); - if (warehouseLocation == null) { - // Explicitly log a warning, otherwise the thrown exception can get list in the "silent-ish catch" - // in o.a.i.spark.Spark3Util.catalogAndIdentifier(o.a.s.sql.SparkSession, List, - // o.a.s.sql.connector.catalog.CatalogPlugin) - // in the code block - // Pair catalogIdentifier = SparkUtil.catalogAndIdentifier(nameParts, - // catalogName -> { - // try { - // return catalogManager.catalog(catalogName); - // } catch (Exception e) { - // return null; - // } - // }, - // Identifier::of, - // defaultCatalog, - // currentNamespace - // ); - LOG.warn("Catalog creation for inputName={} and options {} failed, because parameter " + - "'warehouse' is not set, Nessie can't store data.", name, catalogOptions); - throw new IllegalStateException("Parameter 'warehouse' not set, Nessie can't store data."); - } - return warehouseLocation; - } - - private static NessieClientBuilder createNessieClientBuilder(String customBuilder) { - NessieClientBuilder clientBuilder; - if (customBuilder != null) { - try { - clientBuilder = DynMethods.builder("builder").impl(customBuilder).build().asStatic().invoke(); - } catch (Exception e) { - throw new RuntimeException(String.format("Failed to use custom NessieClientBuilder '%s'.", customBuilder), e); - } - } else { - clientBuilder = HttpClientBuilder.builder(); - } - return clientBuilder; - } - @Override public void close() throws IOException { if (null != closeableGroup) { diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java index 39adeba671e1..f4a34ce5836f 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java @@ -53,6 +53,7 @@ import org.projectnessie.model.EntriesResponse; import org.projectnessie.model.GetNamespacesResponse; import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.IcebergView; import org.projectnessie.model.Operation; import org.projectnessie.model.Reference; import org.projectnessie.model.Tag; @@ -118,6 +119,14 @@ private UpdateableReference loadReference(String requestedRef, String hash) { } public List listTables(Namespace namespace) { + return listContentTypeFor(namespace, Content.Type.ICEBERG_TABLE); + } + + public List listViews(Namespace namespace) { + return listContentTypeFor(namespace, Content.Type.ICEBERG_VIEW); + } + + public List listContentTypeFor(Namespace namespace, Content.Type type) { try { return api.getEntries() .reference(getRef().getReference()) @@ -125,11 +134,11 @@ public List listTables(Namespace namespace) { .getEntries() .stream() .filter(namespacePredicate(namespace)) - .filter(e -> Content.Type.ICEBERG_TABLE == e.getType()) + .filter(e -> type == e.getType()) .map(this::toIdentifier) .collect(Collectors.toList()); } catch (NessieNotFoundException ex) { - throw new NoSuchNamespaceException(ex, "Unable to list tables due to missing ref '%s'", getRef().getName()); + throw new NoSuchNamespaceException(ex, "Unable to list %s due to missing ref '%s'", type, getRef().getName()); } } @@ -165,6 +174,16 @@ public IcebergTable table(TableIdentifier tableIdentifier) { } } + public IcebergView view(TableIdentifier tableIdentifier) { + try { + ContentKey key = NessieUtil.toKey(tableIdentifier); + Content view = api.getContent().key(key).reference(getRef().getReference()).get().get(key); + return view != null ? view.unwrap(IcebergView.class).orElse(null) : null; + } catch (NessieNotFoundException e) { + return null; + } + } + public void createNamespace(Namespace namespace, Map metadata) { try { getRef().checkMutable(); @@ -290,6 +309,33 @@ public void renameTable(TableIdentifier from, TableIdentifier to) { .operation(Operation.Put.of(NessieUtil.toKey(to), existingFromTable, existingFromTable)) .operation(Operation.Delete.of(NessieUtil.toKey(from))); + renameTableOrView(from, to, operations, false); + } + + public void renameView(TableIdentifier from, TableIdentifier to) { + getRef().checkMutable(); + + IcebergView existingFromView = view(from); + if (existingFromView == null) { + throw new NoSuchTableException("View does not exist: %s", from.name()); + } + IcebergView existingToView = view(to); + if (existingToView != null) { + throw new AlreadyExistsException("View already exists: %s", to.name()); + } + + CommitMultipleOperationsBuilder operations = getApi().commitMultipleOperations() + .commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg rename view from '%s' to '%s'", + from, to), catalogOptions)) + .operation(Operation.Put.of(NessieUtil.toKey(to), existingFromView, existingFromView)) + .operation(Operation.Delete.of(NessieUtil.toKey(from))); + + renameTableOrView(from, to, operations, false); + } + + private void renameTableOrView(TableIdentifier from, TableIdentifier to, + CommitMultipleOperationsBuilder operations, boolean isView) { + String item = isView ? "view" : "table"; try { Tasks.foreach(operations) .retry(5) @@ -307,11 +353,11 @@ public void renameTable(TableIdentifier from, TableIdentifier to) { // another commit has deleted the table from underneath us. This would arise as a Conflict exception as opposed to // a not found exception. This is analogous to a merge conflict in git when a table has been changed by one user // and removed by another. - throw new RuntimeException(String.format("Cannot rename table '%s' to '%s': " + - "ref '%s' no longer exists.", from.name(), to.name(), getRef().getName()), e); + throw new RuntimeException(String.format("Cannot rename %s '%s' to '%s': " + + "ref '%s' no longer exists.", item, from.name(), to.name(), getRef().getName()), e); } catch (BaseNessieClientServerException e) { - throw new CommitFailedException(e, "Cannot rename table '%s' to '%s': " + - "the current reference is not up to date.", from.name(), to.name()); + throw new CommitFailedException(e, "Cannot rename %s '%s' to '%s': " + + "the current reference is not up to date.", item, from.name(), to.name()); } catch (HttpClientException ex) { // Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant // to catch all kinds of network errors (e.g. connection reset). Network code implementation @@ -326,19 +372,35 @@ public void renameTable(TableIdentifier from, TableIdentifier to) { } public boolean dropTable(TableIdentifier identifier, boolean purge) { + return dropTableOrView(identifier, purge, false); + } + + public boolean dropView(TableIdentifier identifier, boolean purge) { + return dropTableOrView(identifier, purge, true); + } + + private boolean dropTableOrView(TableIdentifier identifier, boolean purge, boolean isView) { getRef().checkMutable(); - IcebergTable existingTable = table(identifier); - if (existingTable == null) { - return false; + String item = "table"; + + if (isView) { + item = "view"; + if (view(identifier) == null) { + return false; + } + } else { + if (table(identifier) == null) { + return false; + } } if (purge) { - LOG.info("Purging data for table {} was set to true but is ignored", identifier.toString()); + LOG.info("Purging data for {} {} was set to true but is ignored", item, identifier.toString()); } CommitMultipleOperationsBuilder commitBuilderBase = getApi().commitMultipleOperations() - .commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg delete table %s", identifier), + .commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg delete %s %s", item, identifier), catalogOptions)) .operation(Operation.Delete.of(NessieUtil.toKey(identifier))); @@ -358,11 +420,11 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { }, BaseNessieClientServerException.class); threw = false; } catch (NessieConflictException e) { - LOG.error("Cannot drop table: failed after retry (update ref '{}' and retry)", getRef().getName(), e); + LOG.error("Cannot drop {}: failed after retry (update ref '{}' and retry)", item, getRef().getName(), e); } catch (NessieNotFoundException e) { - LOG.error("Cannot drop table: ref '{}' is no longer valid.", getRef().getName(), e); + LOG.error("Cannot drop {}: ref '{}' is no longer valid.", item, getRef().getName(), e); } catch (BaseNessieClientServerException e) { - LOG.error("Cannot drop table: unknown error", e); + LOG.error("Cannot drop {}: unknown error", item, e); } return !threw; } diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java index d53d34c12272..ed48de052ed8 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java @@ -34,17 +34,24 @@ import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.util.JsonUtil; +import org.projectnessie.client.NessieClientBuilder; +import org.projectnessie.client.http.HttpClientBuilder; import org.projectnessie.model.CommitMeta; import org.projectnessie.model.ContentKey; import org.projectnessie.model.IcebergTable; import org.projectnessie.model.ImmutableCommitMeta; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class NessieUtil { + private static final Logger LOG = LoggerFactory.getLogger(NessieUtil.class); + public static final String NESSIE_CONFIG_PREFIX = "nessie."; static final String APPLICATION_TYPE = "application-type"; @@ -131,4 +138,44 @@ static JsonNode tableMetadataAsJsonNode(TableMetadata metadata) { } return newMetadata; } + + static NessieClientBuilder createNessieClientBuilder(String customBuilder) { + NessieClientBuilder clientBuilder; + if (customBuilder != null) { + try { + clientBuilder = DynMethods.builder("builder").impl(customBuilder).build().asStatic().invoke(); + } catch (Exception e) { + throw new RuntimeException(String.format("Failed to use custom NessieClientBuilder '%s'.", customBuilder), e); + } + } else { + clientBuilder = HttpClientBuilder.builder(); + } + return clientBuilder; + } + + static String validateWarehouseLocation(String name, Map catalogOptions) { + String warehouseLocation = catalogOptions.get(CatalogProperties.WAREHOUSE_LOCATION); + if (warehouseLocation == null) { + // Explicitly log a warning, otherwise the thrown exception can get list in the "silent-ish catch" + // in o.a.i.spark.Spark3Util.catalogAndIdentifier(o.a.s.sql.SparkSession, List, + // o.a.s.sql.connector.catalog.CatalogPlugin) + // in the code block + // Pair catalogIdentifier = SparkUtil.catalogAndIdentifier(nameParts, + // catalogName -> { + // try { + // return catalogManager.catalog(catalogName); + // } catch (Exception e) { + // return null; + // } + // }, + // Identifier::of, + // defaultCatalog, + // currentNamespace + // ); + LOG.warn("Catalog creation for inputName={} and options {} failed, because parameter " + + "'warehouse' is not set, Nessie can't store data.", name, catalogOptions); + throw new IllegalStateException("Parameter 'warehouse' not set, Nessie can't store data."); + } + return warehouseLocation; + } } diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewCatalog.java new file mode 100644 index 000000000000..8befa3aa8ce1 --- /dev/null +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewCatalog.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.nessie; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.view.MetastoreViewCatalog; +import org.apache.iceberg.view.MetastoreViewOperations; +import org.projectnessie.client.NessieConfigConstants; +import org.projectnessie.client.api.NessieApiV1; +import org.projectnessie.model.ContentKey; +import org.projectnessie.model.TableReference; + +public class NessieViewCatalog extends MetastoreViewCatalog implements AutoCloseable { + + private static final Joiner SLASH = Joiner.on("/"); + private NessieIcebergClient client; + private String warehouseLocation; + private final Configuration config; + private String name; + private FileIO fileIO; + private Map catalogOptions; + private CloseableGroup closeableGroup; + + public NessieViewCatalog(Configuration conf) { + super(conf); + this.config = conf; + } + + @SuppressWarnings("checkstyle:HiddenField") + @Override + public void initialize(String name, Map options) { + Map catalogOptions = ImmutableMap.copyOf(options); + String fileIOImpl = options.get(CatalogProperties.FILE_IO_IMPL); + // remove nessie prefix + final Function removePrefix = x -> x.replace(NessieUtil.NESSIE_CONFIG_PREFIX, ""); + final String requestedRef = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF)); + String requestedHash = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF_HASH)); + NessieApiV1 api = + NessieUtil.createNessieClientBuilder( + options.get(NessieConfigConstants.CONF_NESSIE_CLIENT_BUILDER_IMPL)) + .fromConfig(x -> options.get(removePrefix.apply(x))) + .build(NessieApiV1.class); + + initialize(name, + new NessieIcebergClient(api, requestedRef, requestedHash, catalogOptions), + fileIOImpl == null ? new HadoopFileIO(config) : CatalogUtil.loadFileIO(fileIOImpl, options, config), + catalogOptions); + } + + /** + * An alternative way to initialize the catalog using a pre-configured {@link NessieIcebergClient} and {@link FileIO} + * instance. + * @param name The name of the catalog, defaults to "nessie" if null + * @param client The pre-configured {@link NessieIcebergClient} instance to use + * @param fileIO The {@link FileIO} instance to use + * @param catalogOptions The catalog options to use + */ + @SuppressWarnings("checkstyle:HiddenField") + public void initialize(String name, NessieIcebergClient client, FileIO fileIO, Map catalogOptions) { + this.name = name == null ? "nessie" : name; + this.client = Preconditions.checkNotNull(client, "client must be non-null"); + this.fileIO = Preconditions.checkNotNull(fileIO, "fileIO must be non-null"); + this.catalogOptions = Preconditions.checkNotNull(catalogOptions, "catalogOptions must be non-null"); + this.warehouseLocation = NessieUtil.validateWarehouseLocation(name, catalogOptions); + this.closeableGroup = new CloseableGroup(); + closeableGroup.addCloseable(client); + closeableGroup.addCloseable(fileIO); + closeableGroup.setSuppressCloseFailure(true); + } + + @Override + public String name() { + return name; + } + + @Override + protected String defaultWarehouseLocation(TableIdentifier view) { + if (view.hasNamespace()) { + return SLASH.join(warehouseLocation, view.namespace().toString(), view.name()); + } + return SLASH.join(warehouseLocation, view.name()); + } + + @Override + public List listViews(Namespace namespace) { + return client.listViews(namespace); + } + + @Override + public boolean dropView(TableIdentifier identifier) { + return client.dropView(identifier, false); + } + + @Override + public boolean dropView(TableIdentifier identifier, boolean purge) { + return client.dropView(identifier, purge); + } + + @Override + public void renameView(TableIdentifier from, TableIdentifier to) { + client.renameView(from, NessieUtil.removeCatalogName(to, name())); + } + + @Override + protected MetastoreViewOperations newViewOps(TableIdentifier viewName) { + TableReference tr = TableReference.parse(viewName.name()); + Preconditions.checkArgument(!tr.hasTimestamp(), "Invalid view name: # is only allowed for hashes (reference by " + + "timestamp is not supported)"); + return new NessieViewOperations( + ContentKey.of(org.projectnessie.model.Namespace.of(viewName.namespace().levels()), tr.getName()), + client.withReference(tr.getReference(), tr.getHash()), + fileIO, + catalogOptions); + } + + @Override + public void close() throws Exception { + if (null != closeableGroup) { + closeableGroup.close(); + } + } +} diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java new file mode 100644 index 000000000000..b6c0d37992f0 --- /dev/null +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieViewOperations.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.nessie; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.view.BaseViewVersion; +import org.apache.iceberg.view.MetastoreViewOperations; +import org.apache.iceberg.view.ViewHistoryEntry; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.iceberg.view.ViewMetadataParser; +import org.apache.iceberg.view.ViewVersion; +import org.projectnessie.client.http.HttpClientException; +import org.projectnessie.error.NessieConflictException; +import org.projectnessie.error.NessieNotFoundException; +import org.projectnessie.model.Branch; +import org.projectnessie.model.Content; +import org.projectnessie.model.ContentKey; +import org.projectnessie.model.IcebergView; +import org.projectnessie.model.ImmutableCommitMeta; +import org.projectnessie.model.ImmutableIcebergView; +import org.projectnessie.model.Operation; +import org.projectnessie.model.Reference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NessieViewOperations extends MetastoreViewOperations { + + private static final Logger LOG = LoggerFactory.getLogger(NessieViewOperations.class); + + /** + * Name of the `{@link TableMetadata} property that holds the Nessie commit-ID from which the + * metadata has been loaded. + */ + public static final String NESSIE_COMMIT_ID_PROPERTY = "nessie.commit.id"; + + private final NessieIcebergClient client; + private final ContentKey key; + private final FileIO fileIO; + private final Map catalogOptions; + private IcebergView icebergView; + + NessieViewOperations( + ContentKey key, + NessieIcebergClient client, + FileIO fileIO, + Map catalogOptions) { + this.key = key; + this.client = client; + this.fileIO = fileIO; + this.catalogOptions = catalogOptions; + } + + + @Override + public ViewMetadata refresh() { + try { + client.refresh(); + } catch (NessieNotFoundException e) { + throw new RuntimeException(String.format("Failed to refresh as ref '%s' " + + "is no longer valid.", client.getRef().getName()), e); + } + String metadataLocation = null; + Reference reference = client.getRef().getReference(); + try { + Content content = + client + .getApi() + .getContent() + .key(key) + .reference(reference) + .get() + .get(key); + LOG.debug("Content '{}' at '{}': {}", key, reference, content); + if (content == null) { + if (currentMetadataLocation() != null) { + throw new NoSuchTableException("No such view %s in %s", key, reference); + } + } else { + this.icebergView = + content + .unwrap(IcebergView.class) + .orElseThrow( + () -> + new IllegalStateException(String.format( + "Cannot refresh iceberg view: Nessie points to a non-Iceberg object for path: %s.", key))); + metadataLocation = icebergView.getMetadataLocation(); + } + } catch (NessieNotFoundException ex) { + if (currentMetadataLocation() != null) { + throw new NoSuchTableException(ex, "No such view %s", key); + } + } + refreshFromMetadataLocation(metadataLocation, null, 2, l -> loadViewMetadata(l, reference)); + return current(); + } + + private ViewMetadata loadViewMetadata(String metadataLocation, Reference reference) { + ViewMetadata metadata = + ViewMetadataParser.read(io().newInputFile(metadataLocation)); + Optional viewVersion = + metadata.versions().stream() + .filter(version -> version.versionId() == icebergView.getVersionId()) + .findFirst(); + if (viewVersion.isPresent()) { + ViewVersion version = viewVersion.get(); + BaseViewVersion baseViewVersion = + BaseViewVersion.builder() + .versionId(version.versionId()) + .parentId(version.parentId()) + .timestampMillis(version.timestampMillis()) + .summary(version.summary()) + .representations(version.representations()) + .build(); + + List versions = getVersionsUntil(metadata, icebergView.getVersionId()); + List history = getHistoryEntriesUntil(metadata, icebergView.getVersionId()); + metadata = + ViewMetadata.builder() + .location(metadata.location()) + .addVersion(baseViewVersion) + .properties(metadata.properties()) + .versions(versions) + .history(history) + .currentVersionId(baseViewVersion.versionId()) + .build(); + } + + return metadata; + } + + private List getHistoryEntriesUntil(ViewMetadata metadata, int versionId) { + List history = Lists.newArrayList(); + for (ViewHistoryEntry entry : metadata.history()) { + if (entry.versionId() == versionId) { + break; + } + history.add(entry); + } + return history; + } + + private List getVersionsUntil(ViewMetadata metadata, int versionId) { + List versions = Lists.newArrayList(); + for (ViewVersion version : metadata.versions()) { + if (version.versionId() == versionId) { + break; + } + versions.add(version); + } + return versions; + } + + @Override + public void commit(ViewMetadata base, ViewMetadata metadata, Map properties) { + UpdateableReference updateableReference = client.getRef(); + + updateableReference.checkMutable(); + + Branch current = updateableReference.getAsBranch(); + Branch expectedHead = current; + // TODO +// if (base != null) { +// String metadataCommitId = base.property(NESSIE_COMMIT_ID_PROPERTY, expectedHead.getHash()); +// if (metadataCommitId != null) { +// expectedHead = Branch.of(expectedHead.getName(), metadataCommitId); +// } +// } + + String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); + + boolean delete = true; + try { + ImmutableIcebergView.Builder viewBuilder = ImmutableIcebergView.builder(); + if (icebergView != null) { + viewBuilder.id(icebergView.getId()); + } + + // TODO: read schema/dialect/sql from view defintion + IcebergView newView = + viewBuilder + .metadataLocation(newMetadataLocation) + .versionId(metadata.currentVersionId()) + .schemaId(23) + .dialect("TODO") + //.sqlText(metadata.currentVersion().representations().get(0.definition().sql()) + .build(); + + + LOG.debug("Committing '{}' against '{}', current is '{}': {}", key, expectedHead, + current.getHash(), newView); + ImmutableCommitMeta.Builder builder = ImmutableCommitMeta.builder(); + builder.message(buildCommitMsg(base, metadata)); + Branch branch = client.getApi().commitMultipleOperations() + .operation(Operation.Put.of(key, newView, icebergView)) + .commitMeta(NessieUtil.catalogOptions(builder, catalogOptions).build()) + .branch(expectedHead) + .commit(); + LOG.info("Committed '{}' against '{}', expected commit-id was '{}'", key, branch, + expectedHead.getHash()); + updateableReference.updateReference(branch); + + delete = false; + } catch (NessieConflictException ex) { + throw new CommitFailedException(ex, "Cannot commit: Reference hash is out of date. " + + "Update the reference '%s' and try again", updateableReference.getName()); + } catch (HttpClientException ex) { + // Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant + // to catch all kinds of network errors (e.g. connection reset). Network code implementation + // details and all kinds of network devices can induce unexpected behavior. So better be + // safe than sorry. + delete = false; + throw new CommitStateUnknownException(ex); + } catch (NessieNotFoundException ex) { + throw new RuntimeException( + String.format("Cannot commit: Reference '%s' no longer exists", + updateableReference.getName()), ex); + } finally { + if (delete) { + io().deleteFile(newMetadataLocation); + } + } + } + + private String buildCommitMsg(ViewMetadata base, ViewMetadata metadata) { + if (base != null && metadata.currentVersionId() != base.currentVersionId()) { + return "Iceberg schema change against view "; + } + return "Iceberg commit against view %s"; + } + + @Override + public FileIO io() { + return fileIO; + } +} diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java index 9c6c98058536..680de01730f9 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java +++ b/nessie/src/test/java/org/apache/iceberg/nessie/BaseTestIceberg.java @@ -84,6 +84,7 @@ public abstract class BaseTestIceberg { public Path temp; protected NessieCatalog catalog; + protected NessieViewCatalog viewCatalog; protected NessieApiV1 api; protected Configuration hadoopConfig; protected final String branch; @@ -119,6 +120,7 @@ public void beforeEach(@NessieUri URI nessieUri) throws IOException { hadoopConfig = new Configuration(); catalog = initCatalog(branch); + viewCatalog = initViewCatalog(branch); } NessieCatalog initCatalog(String ref) { @@ -140,6 +142,25 @@ NessieCatalog initCatalog(String ref, String hash) { return newCatalog; } + NessieViewCatalog initViewCatalog(String ref) { + return initViewCatalog(ref, null); + } + + NessieViewCatalog initViewCatalog(String ref, String hash) { + NessieViewCatalog newCatalog = new NessieViewCatalog(hadoopConfig); + ImmutableMap.Builder options = ImmutableMap.builder() + .put("ref", ref) + .put(CatalogProperties.URI, uri) + .put("auth-type", "NONE") + .put(CatalogProperties.WAREHOUSE_LOCATION, temp.toUri().toString()); + if (null != hash) { + options.put("ref.hash", hash); + } + newCatalog.initialize("nessie", options.build()); + return newCatalog; + } + + protected Table createTable(TableIdentifier tableIdentifier, int count) { try { return catalog.createTable(tableIdentifier, schema(count));