-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: Add remaining View APIs and support for InMemoryCatalog #7880
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5f03a39
852cae5
87cbd19
4c2d734
305e62f
7c17686
b776333
371e615
252154e
b4d1573
64cdef2
2e0f8a0
d1f587a
ef8299a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,220 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.view; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Map; | ||
| import org.apache.iceberg.BaseMetastoreCatalog; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.catalog.Namespace; | ||
| import org.apache.iceberg.catalog.TableIdentifier; | ||
| import org.apache.iceberg.catalog.ViewCatalog; | ||
| import org.apache.iceberg.exceptions.AlreadyExistsException; | ||
| import org.apache.iceberg.exceptions.CommitFailedException; | ||
| import org.apache.iceberg.exceptions.NoSuchViewException; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
|
|
||
| public abstract class BaseMetastoreViewCatalog extends BaseMetastoreCatalog implements ViewCatalog { | ||
| protected abstract ViewOperations newViewOps(TableIdentifier identifier); | ||
|
|
||
| @Override | ||
| public void initialize(String name, Map<String, String> properties) { | ||
| super.initialize(name, properties); | ||
| } | ||
|
|
||
| @Override | ||
| public String name() { | ||
| return super.name(); | ||
| } | ||
|
|
||
| @Override | ||
| public View loadView(TableIdentifier identifier) { | ||
| if (isValidIdentifier(identifier)) { | ||
| ViewOperations ops = newViewOps(identifier); | ||
| if (ops.current() == null) { | ||
| throw new NoSuchViewException("View does not exist: %s", identifier); | ||
| } else { | ||
| return new BaseView(newViewOps(identifier), ViewUtil.fullViewName(name(), identifier)); | ||
| } | ||
| } | ||
|
|
||
| throw new NoSuchViewException("Invalid view identifier: %s", identifier); | ||
| } | ||
|
|
||
| @Override | ||
| public ViewBuilder buildView(TableIdentifier identifier) { | ||
| return new BaseViewBuilder(identifier); | ||
| } | ||
|
|
||
| protected class BaseViewBuilder implements ViewBuilder { | ||
| private final TableIdentifier identifier; | ||
| private final Map<String, String> properties = Maps.newHashMap(); | ||
| private final List<ViewRepresentation> representations = Lists.newArrayList(); | ||
| private Namespace defaultNamespace = null; | ||
| private String defaultCatalog = null; | ||
| private Schema schema = null; | ||
|
|
||
| protected BaseViewBuilder(TableIdentifier identifier) { | ||
| Preconditions.checkArgument( | ||
| isValidIdentifier(identifier), "Invalid view identifier: %s", identifier); | ||
| this.identifier = identifier; | ||
| } | ||
|
|
||
| @Override | ||
| public ViewBuilder withSchema(Schema newSchema) { | ||
| this.schema = newSchema; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public ViewBuilder withQuery(String dialect, String sql) { | ||
| representations.add( | ||
| ImmutableSQLViewRepresentation.builder().dialect(dialect).sql(sql).build()); | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public ViewBuilder withDefaultCatalog(String catalog) { | ||
| this.defaultCatalog = catalog; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public ViewBuilder withDefaultNamespace(Namespace namespace) { | ||
| this.defaultNamespace = namespace; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public ViewBuilder withProperties(Map<String, String> newProperties) { | ||
| this.properties.putAll(newProperties); | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public ViewBuilder withProperty(String key, String value) { | ||
| this.properties.put(key, value); | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public View create() { | ||
| return create(newViewOps(identifier)); | ||
| } | ||
|
|
||
| @Override | ||
| public View replace() { | ||
| return replace(newViewOps(identifier)); | ||
| } | ||
|
|
||
| @Override | ||
| public View createOrReplace() { | ||
| ViewOperations ops = newViewOps(identifier); | ||
| if (null == ops.current()) { | ||
| return create(ops); | ||
| } else { | ||
| return replace(ops); | ||
| } | ||
| } | ||
|
|
||
| private View create(ViewOperations ops) { | ||
| if (null != ops.current()) { | ||
| throw new AlreadyExistsException("View already exists: %s", identifier); | ||
| } | ||
|
|
||
| Preconditions.checkState( | ||
| !representations.isEmpty(), "Cannot create view without specifying a query"); | ||
| Preconditions.checkState(null != schema, "Cannot create view without specifying schema"); | ||
| Preconditions.checkState( | ||
| null != defaultNamespace, "Cannot create view without specifying a default namespace"); | ||
|
|
||
| ViewVersion viewVersion = | ||
| ImmutableViewVersion.builder() | ||
| .versionId(1) | ||
| .schemaId(schema.schemaId()) | ||
| .addAllRepresentations(representations) | ||
| .defaultNamespace(defaultNamespace) | ||
| .defaultCatalog(defaultCatalog) | ||
| .timestampMillis(System.currentTimeMillis()) | ||
| .putSummary("operation", "create") | ||
| .build(); | ||
|
|
||
| ViewMetadata viewMetadata = | ||
| ViewMetadata.builder() | ||
| .setProperties(properties) | ||
| .setLocation(defaultWarehouseLocation(identifier)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why isn't it possible to set the location? I think I asked about this elsewhere, but I don't see the reply. Sorry if it's a duplicate.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry in case my answer got hidden by GH, but I did reply in #7880 (comment). I'll move my answer here, so that we can continue the discussion here. Do you mean the fact that there's no method on the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it seems like an oversight that you can't set the location where a view is stored. I'd expect the builder to allow this, although it can be done in a follow up.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll address this in a separate PR
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've handled this in #8648
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is an unresolved issue, I'm reopening the thread so that people can follow the link to the follow up PR. |
||
| .setCurrentVersion(viewVersion, schema) | ||
| .build(); | ||
|
|
||
| try { | ||
| ops.commit(null, viewMetadata); | ||
| } catch (CommitFailedException ignored) { | ||
| throw new AlreadyExistsException("View was created concurrently: %s", identifier); | ||
| } | ||
|
|
||
| return new BaseView(ops, ViewUtil.fullViewName(name(), identifier)); | ||
| } | ||
|
|
||
| private View replace(ViewOperations ops) { | ||
| if (null == ops.current()) { | ||
| throw new NoSuchViewException("View does not exist: %s", identifier); | ||
| } | ||
|
|
||
| Preconditions.checkState( | ||
| !representations.isEmpty(), "Cannot replace view without specifying a query"); | ||
| Preconditions.checkState(null != schema, "Cannot replace view without specifying schema"); | ||
| Preconditions.checkState( | ||
| null != defaultNamespace, "Cannot replace view without specifying a default namespace"); | ||
|
|
||
| ViewMetadata metadata = ops.current(); | ||
| int maxVersionId = | ||
| metadata.versions().stream() | ||
| .map(ViewVersion::versionId) | ||
| .max(Integer::compareTo) | ||
| .orElseGet(metadata::currentVersionId); | ||
|
|
||
| ViewVersion viewVersion = | ||
| ImmutableViewVersion.builder() | ||
| .versionId(maxVersionId + 1) | ||
| .schemaId(schema.schemaId()) | ||
| .addAllRepresentations(representations) | ||
| .defaultNamespace(defaultNamespace) | ||
| .defaultCatalog(defaultCatalog) | ||
| .timestampMillis(System.currentTimeMillis()) | ||
| .putSummary("operation", "replace") | ||
| .build(); | ||
|
|
||
| ViewMetadata replacement = | ||
| ViewMetadata.buildFrom(metadata) | ||
| .setProperties(properties) | ||
| .setCurrentVersion(viewVersion, schema) | ||
| .build(); | ||
|
|
||
| try { | ||
| ops.commit(metadata, replacement); | ||
| } catch (CommitFailedException ignored) { | ||
| throw new AlreadyExistsException("View was updated concurrently: %s", identifier); | ||
| } | ||
|
|
||
| return new BaseView(ops, ViewUtil.fullViewName(name(), identifier)); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.view; | ||
nastra marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| import java.io.Serializable; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import org.apache.iceberg.Schema; | ||
|
|
||
| public class BaseView implements View, Serializable { | ||
|
|
||
| private final ViewOperations ops; | ||
| private final String name; | ||
|
|
||
| public BaseView(ViewOperations ops, String name) { | ||
| this.ops = ops; | ||
| this.name = name; | ||
| } | ||
|
|
||
| @Override | ||
| public String name() { | ||
| return name; | ||
| } | ||
|
|
||
| public ViewOperations operations() { | ||
| return ops; | ||
| } | ||
|
|
||
| @Override | ||
| public Schema schema() { | ||
| return operations().current().schema(); | ||
| } | ||
|
|
||
| @Override | ||
| public Map<Integer, Schema> schemas() { | ||
| return operations().current().schemasById(); | ||
| } | ||
|
|
||
| @Override | ||
| public ViewVersion currentVersion() { | ||
| return operations().current().currentVersion(); | ||
| } | ||
|
|
||
| @Override | ||
| public Iterable<ViewVersion> versions() { | ||
| return operations().current().versions(); | ||
| } | ||
|
|
||
| @Override | ||
| public ViewVersion version(int versionId) { | ||
| return operations().current().version(versionId); | ||
| } | ||
|
|
||
| @Override | ||
| public List<ViewHistoryEntry> history() { | ||
| return operations().current().history(); | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, String> properties() { | ||
| return operations().current().properties(); | ||
| } | ||
|
|
||
| @Override | ||
| public UpdateViewProperties updateProperties() { | ||
| return new PropertiesUpdate(ops); | ||
| } | ||
|
|
||
| @Override | ||
| public ReplaceViewVersion replaceVersion() { | ||
| return new ViewVersionReplace(ops); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't set
defaultCatalog, which looks like a bug to me. In addition, thewithDefaultCatalogmethod above sets the default catalog on aviewVersionBuilderthat is unused. It looks like this wasn't updated after originally using a builder?Also double check that this is covered by the test for full view metadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
withDefaultCatalog()sets it on theviewVersionBuilderthat is being used in L151 for creating a view version and then also in L194.We do have tests that check for the default catalog when parsing view metadata. Additionally,
ViewCatalogTests#replaceViewVersion()was setting + testing the default catalog (and now alsoViewCatalogTests#completeCreateView()).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I see that the view version builder is reused, and I guess the test was just in the wrong place.
Why are some settings added directly to the builder and some are set in the final operation? This seems confusing and I'd like for it to be consistent.