Skip to content

Commit 87143d5

Browse files
author
Ryan Murray
authored
Nessie: Add Nessie catalog module (#1587)
1 parent e09970a commit 87143d5

File tree

13 files changed

+1623
-0
lines changed

13 files changed

+1623
-0
lines changed

build.gradle

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ buildscript {
3838

3939
plugins {
4040
id 'nebula.dependency-recommender' version '9.0.2'
41+
id 'org.projectnessie' version '0.2.1'
4142
}
4243

4344
try {
@@ -1076,6 +1077,53 @@ project(':iceberg-pig') {
10761077
}
10771078
}
10781079

1080+
// Build definition for the Nessie catalog module.
project(':iceberg-nessie') {
  apply plugin: 'org.projectnessie'

  // Declares an additional 'clientSupport' feature backed by the main source set; the
  // clientSupportImplementation dependencies below are attached to that feature.
  java {
    registerFeature('clientSupport') {
      usingSourceSet(sourceSets.main)
    }
  }

  dependencies {
    compile project(':iceberg-core')
    compile project(path: ':iceberg-bundled-guava', configuration: 'shadow')
    compile "org.projectnessie:nessie-client"

    // Nessie server artifact handed to the org.projectnessie plugin's app-runner configuration.
    quarkusAppRunnerConfig "org.projectnessie:nessie-quarkus:0.2.1"

    clientSupportImplementation("io.quarkus:quarkus-rest-client") {
      exclude group: "org.jboss.logging", module: "commons-logging-jboss-logging"
    }
    clientSupportImplementation("io.quarkus:quarkus-resteasy-jackson") {
      exclude group: "org.codehaus.mojo", module: "animal-sniffer-annotations"
    }

    compileOnly("org.apache.hadoop:hadoop-common") {
      exclude group: 'org.apache.avro', module: 'avro'
      exclude group: 'org.apache.httpcomponents', module: 'httpclient'
      exclude group: 'org.apache.httpcomponents', module: 'httpcore'
      exclude group: 'commons-httpclient', module: 'commons-httpclient'
      exclude group: "com.sun.jersey", module: 'jersey-core'
      exclude group: "com.sun.jersey", module: 'jersey-json'
      exclude group: "com.sun.jersey", module: 'jersey-server'
    }

    testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
    // Tests consume this same project through its client-support capability.
    testCompile(project(":iceberg-nessie")) {
      capabilities {
        requireCapability("org.apache.iceberg:iceberg-nessie-client-support")
      }
    }
  }

  // NOTE(review): test-port 0 presumably asks the server for a random free port -- confirm.
  quarkusAppRunnerProperties {
    props = ["quarkus.http.test-port": 0]
  }

  // Task wiring: quarkus-start runs after test compilation; the test task depends on
  // quarkus-start and is always finalized by quarkus-stop.
  tasks.getByName("quarkus-start").dependsOn("compileTestJava")
  tasks.test.dependsOn("quarkus-start")
  tasks.test.finalizedBy("quarkus-stop")
}
1126+
10791127
@Memoized
10801128
boolean isVersionFileExists() {
10811129
return file('version.txt').exists()
Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.nessie;
21+
22+
import com.dremio.nessie.api.TreeApi;
23+
import com.dremio.nessie.client.NessieClient;
24+
import com.dremio.nessie.error.BaseNessieClientServerException;
25+
import com.dremio.nessie.error.NessieConflictException;
26+
import com.dremio.nessie.error.NessieNotFoundException;
27+
import com.dremio.nessie.model.Contents;
28+
import com.dremio.nessie.model.IcebergTable;
29+
import com.dremio.nessie.model.ImmutableDelete;
30+
import com.dremio.nessie.model.ImmutableOperations;
31+
import com.dremio.nessie.model.ImmutablePut;
32+
import com.dremio.nessie.model.Operations;
33+
import com.dremio.nessie.model.Reference;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.Set;
37+
import java.util.function.Function;
38+
import java.util.stream.Collectors;
39+
import java.util.stream.Stream;
40+
import org.apache.hadoop.conf.Configurable;
41+
import org.apache.hadoop.conf.Configuration;
42+
import org.apache.iceberg.BaseMetastoreCatalog;
43+
import org.apache.iceberg.CatalogProperties;
44+
import org.apache.iceberg.CatalogUtil;
45+
import org.apache.iceberg.TableOperations;
46+
import org.apache.iceberg.catalog.Namespace;
47+
import org.apache.iceberg.catalog.SupportsNamespaces;
48+
import org.apache.iceberg.catalog.TableIdentifier;
49+
import org.apache.iceberg.exceptions.AlreadyExistsException;
50+
import org.apache.iceberg.exceptions.CommitFailedException;
51+
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
52+
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
53+
import org.apache.iceberg.exceptions.NoSuchTableException;
54+
import org.apache.iceberg.hadoop.HadoopFileIO;
55+
import org.apache.iceberg.io.FileIO;
56+
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
57+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
58+
import org.apache.iceberg.util.Tasks;
59+
import org.slf4j.Logger;
60+
import org.slf4j.LoggerFactory;
61+
62+
/**
63+
* Nessie implementation of Iceberg Catalog.
64+
*
65+
* <p>
66+
* A note on namespaces: Nessie namespaces are implicit and do not need to be explicitly created or deleted.
67+
* The create and delete namespace methods are no-ops for the NessieCatalog. One can still list namespaces that have
68+
* objects stored in them to assist with namespace-centric catalog exploration.
69+
* </p>
70+
*/
71+
public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable, SupportsNamespaces, Configurable {
72+
private static final Logger logger = LoggerFactory.getLogger(NessieCatalog.class);
73+
private static final Joiner SLASH = Joiner.on("/");
74+
private NessieClient client;
75+
private String warehouseLocation;
76+
private Configuration config;
77+
private UpdateableReference reference;
78+
private String name;
79+
private FileIO fileIO;
80+
81+
/**
 * Creates an unconfigured catalog. State is populated afterwards through
 * {@link #setConf(Configuration)} and {@link #initialize(String, Map)}.
 */
public NessieCatalog() {
  // intentionally empty
}
83+
84+
@Override
85+
public void initialize(String inputName, Map<String, String> options) {
86+
String fileIOImpl = options.get(CatalogProperties.FILE_IO_IMPL);
87+
this.fileIO = fileIOImpl == null ? new HadoopFileIO(config) : CatalogUtil.loadFileIO(fileIOImpl, options, config);
88+
this.name = inputName == null ? "nessie" : inputName;
89+
// remove nessie prefix
90+
final Function<String, String> removePrefix = x -> x.replace("nessie.", "");
91+
92+
this.client = NessieClient.withConfig(x -> options.get(removePrefix.apply(x)));
93+
94+
this.warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION);
95+
if (warehouseLocation == null) {
96+
throw new IllegalStateException("Parameter warehouse not set, nessie can't store data.");
97+
}
98+
final String requestedRef = options.get(removePrefix.apply(NessieClient.CONF_NESSIE_REF));
99+
this.reference = loadReference(requestedRef);
100+
}
101+
102+
@Override
103+
public void close() {
104+
client.close();
105+
}
106+
107+
@Override
108+
public String name() {
109+
return name;
110+
}
111+
112+
@Override
113+
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
114+
TableReference pti = TableReference.parse(tableIdentifier);
115+
UpdateableReference newReference = this.reference;
116+
if (pti.reference() != null) {
117+
newReference = loadReference(pti.reference());
118+
}
119+
return new NessieTableOperations(NessieUtil.toKey(pti.tableIdentifier()), newReference, client, fileIO);
120+
}
121+
122+
@Override
123+
protected String defaultWarehouseLocation(TableIdentifier table) {
124+
if (table.hasNamespace()) {
125+
return SLASH.join(warehouseLocation, table.namespace().toString(), table.name());
126+
}
127+
return SLASH.join(warehouseLocation, table.name());
128+
}
129+
130+
@Override
131+
public List<TableIdentifier> listTables(Namespace namespace) {
132+
return tableStream(namespace).collect(Collectors.toList());
133+
}
134+
135+
@Override
136+
public boolean dropTable(TableIdentifier identifier, boolean purge) {
137+
reference.checkMutable();
138+
139+
IcebergTable existingTable = table(identifier);
140+
if (existingTable == null) {
141+
return false;
142+
}
143+
144+
// We try to drop the table. Simple retry after ref update.
145+
boolean threw = true;
146+
try {
147+
Tasks.foreach(identifier)
148+
.retry(5)
149+
.stopRetryOn(NessieNotFoundException.class)
150+
.throwFailureWhenFinished()
151+
.run(this::dropTableInner, BaseNessieClientServerException.class);
152+
threw = false;
153+
} catch (NessieConflictException e) {
154+
logger.error("Cannot drop table: failed after retry (update ref and retry)", e);
155+
} catch (NessieNotFoundException e) {
156+
logger.error("Cannot drop table: ref is no longer valid.", e);
157+
} catch (BaseNessieClientServerException e) {
158+
logger.error("Cannot drop table: unknown error", e);
159+
}
160+
return !threw;
161+
}
162+
163+
@Override
164+
public void renameTable(TableIdentifier from, TableIdentifier toOriginal) {
165+
reference.checkMutable();
166+
167+
TableIdentifier to = NessieUtil.removeCatalogName(toOriginal, name());
168+
169+
IcebergTable existingFromTable = table(from);
170+
if (existingFromTable == null) {
171+
throw new NoSuchTableException("table %s doesn't exists", from.name());
172+
}
173+
IcebergTable existingToTable = table(to);
174+
if (existingToTable != null) {
175+
throw new AlreadyExistsException("table %s already exists", to.name());
176+
}
177+
178+
Operations contents = ImmutableOperations.builder()
179+
.addOperations(ImmutablePut.builder().key(NessieUtil.toKey(to)).contents(existingFromTable).build(),
180+
ImmutableDelete.builder().key(NessieUtil.toKey(from)).build())
181+
.build();
182+
183+
try {
184+
Tasks.foreach(contents)
185+
.retry(5)
186+
.stopRetryOn(NessieNotFoundException.class)
187+
.throwFailureWhenFinished()
188+
.run(c -> {
189+
client.getTreeApi().commitMultipleOperations(reference.getAsBranch().getName(), reference.getHash(),
190+
"iceberg rename table", c);
191+
refresh();
192+
}, BaseNessieClientServerException.class);
193+
194+
} catch (NessieNotFoundException e) {
195+
// important note: the NotFoundException refers to the ref only. If a table was not found it would imply that the
196+
// another commit has deleted the table from underneath us. This would arise as a Conflict exception as opposed to
197+
// a not found exception. This is analogous to a merge conflict in git when a table has been changed by one user
198+
// and removed by another.
199+
throw new RuntimeException("Failed to drop table as ref is no longer valid.", e);
200+
} catch (BaseNessieClientServerException e) {
201+
throw new CommitFailedException(e, "Failed to rename table: the current reference is not up to date.");
202+
}
203+
}
204+
205+
/**
206+
* creating namespaces in nessie is implicit, therefore this is a no-op. Metadata is ignored.
207+
*
208+
* @param namespace a multi-part namespace
209+
* @param metadata a string Map of properties for the given namespace
210+
*/
211+
@Override
212+
public void createNamespace(Namespace namespace, Map<String, String> metadata) {
213+
}
214+
215+
@Override
216+
public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
217+
return tableStream(namespace)
218+
.map(TableIdentifier::namespace)
219+
.filter(n -> !n.isEmpty())
220+
.distinct()
221+
.collect(Collectors.toList());
222+
}
223+
224+
/**
225+
* namespace metadata is not supported in Nessie and we return an empty map.
226+
*
227+
* @param namespace a namespace. {@link Namespace}
228+
* @return an empty map
229+
*/
230+
@Override
231+
public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
232+
return ImmutableMap.of();
233+
}
234+
235+
/**
236+
* Namespaces in Nessie are implicit and deleting them results in a no-op.
237+
*
238+
* @param namespace a namespace. {@link Namespace}
239+
* @return always false.
240+
*/
241+
@Override
242+
public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
243+
return false;
244+
}
245+
246+
@Override
247+
public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
248+
throw new UnsupportedOperationException(
249+
"Cannot set namespace properties " + namespace + " : setProperties is not supported");
250+
}
251+
252+
@Override
253+
public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
254+
throw new UnsupportedOperationException(
255+
"Cannot remove properties " + namespace + " : removeProperties is not supported");
256+
}
257+
258+
@Override
259+
public void setConf(Configuration conf) {
260+
this.config = conf;
261+
}
262+
263+
@Override
264+
public Configuration getConf() {
265+
return config;
266+
}
267+
268+
TreeApi getTreeApi() {
269+
return client.getTreeApi();
270+
}
271+
272+
public void refresh() throws NessieNotFoundException {
273+
reference.refresh();
274+
}
275+
276+
public String currentHash() {
277+
return reference.getHash();
278+
}
279+
280+
String currentRefName() {
281+
return reference.getName();
282+
}
283+
284+
private IcebergTable table(TableIdentifier tableIdentifier) {
285+
try {
286+
Contents table = client.getContentsApi().getContents(NessieUtil.toKey(tableIdentifier), reference.getHash());
287+
return table.unwrap(IcebergTable.class).orElse(null);
288+
} catch (NessieNotFoundException e) {
289+
return null;
290+
}
291+
}
292+
293+
private UpdateableReference loadReference(String requestedRef) {
294+
try {
295+
Reference ref = requestedRef == null ? client.getTreeApi().getDefaultBranch()
296+
: client.getTreeApi().getReferenceByName(requestedRef);
297+
return new UpdateableReference(ref, client.getTreeApi());
298+
} catch (NessieNotFoundException ex) {
299+
if (requestedRef != null) {
300+
throw new IllegalArgumentException(String.format("Nessie ref '%s' does not exist. " +
301+
"This ref must exist before creating a NessieCatalog.", requestedRef), ex);
302+
}
303+
304+
throw new IllegalArgumentException(String.format("Nessie does not have an existing default branch." +
305+
"Either configure an alternative ref via %s or create the default branch on the server.",
306+
NessieClient.CONF_NESSIE_REF), ex);
307+
}
308+
}
309+
310+
311+
public void dropTableInner(TableIdentifier identifier) throws NessieConflictException, NessieNotFoundException {
312+
try {
313+
client.getContentsApi().deleteContents(NessieUtil.toKey(identifier),
314+
reference.getAsBranch().getName(),
315+
reference.getHash(),
316+
String.format("delete table %s", identifier));
317+
318+
} finally {
319+
// TODO: fix this so we don't depend on it in tests. and move refresh into catch clause.
320+
refresh();
321+
}
322+
}
323+
324+
private Stream<TableIdentifier> tableStream(Namespace namespace) {
325+
try {
326+
return client.getTreeApi()
327+
.getEntries(reference.getHash())
328+
.getEntries()
329+
.stream()
330+
.filter(NessieUtil.namespacePredicate(namespace))
331+
.map(NessieUtil::toIdentifier);
332+
} catch (NessieNotFoundException ex) {
333+
throw new NoSuchNamespaceException(ex, "Unable to list tables due to missing ref. %s", reference.getName());
334+
}
335+
}
336+
}

0 commit comments

Comments
 (0)