From 4195a3adb929c2e63a8590d190ae93e82c667ca4 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 20 Jan 2021 14:24:19 +0000 Subject: [PATCH 1/5] Improve HadoopCatalog performance/scalability #2124 Move to RemoteIterator for scanning directories. It's not as elegant as using the Java 8 streams API, but it works with the prefetching that the s3a and (soon) abfs connectors do, as well as bailing out more efficiently. Also extended the access-error check to look for AccessDeniedException --- .../apache/iceberg/hadoop/HadoopCatalog.java | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java index 68797927e269..d3b03bf6c8dd 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java @@ -23,18 +23,19 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.file.AccessDeniedException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.BaseMetastoreCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ -170,7 +171,8 @@ public String name() { private boolean shouldSuppressPermissionError(IOException ioException) { if (suppressPermissionError) { - return ioException.getMessage() != null && ioException.getMessage().contains("AuthorizationPermissionMismatch"); + return 
ioException instanceof AccessDeniedException + || ioException.getMessage() != null && ioException.getMessage().contains("AuthorizationPermissionMismatch"); } return false; } @@ -180,8 +182,17 @@ private boolean isTableDir(Path path) { // Only the path which contains metadata is the path for table, otherwise it could be // still a namespace. try { - return fs.listStatus(metadataPath, TABLE_FILTER).length >= 1; - } catch (FileNotFoundException e) { + // using the iterator listing allows for paged downloads + // from HDFS and prefetching from object storage. + RemoteIterator it = fs.listStatusIterator(path); + while (it.hasNext()) { + if (TABLE_FILTER.accept(it.next().getPath())) { + return true; + } + } + // no match + return false; + } catch (FileNotFoundException e) { return false; } catch (IOException e) { if (shouldSuppressPermissionError(e)) { @@ -220,8 +231,9 @@ public List listTables(Namespace namespace) { if (!isDirectory(nsPath)) { throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); } - - for (FileStatus s : fs.listStatus(nsPath)) { + RemoteIterator it = fs.listStatusIterator(nsPath); + while (it.hasNext()) { + FileStatus s = it.next(); if (!s.isDirectory()) { // Ignore the path which is not a directory. continue; @@ -329,11 +341,17 @@ public List listNamespaces(Namespace namespace) { } try { - return Stream.of(fs.listStatus(nsPath)) - .map(FileStatus::getPath) - .filter(this::isNamespace) - .map(path -> append(namespace, path.getName())) - .collect(Collectors.toList()); + // using the iterator listing allows for paged downloads + // from HDFS and prefetching from object storage. 
+ List namespaces = new ArrayList<>(); + RemoteIterator it = fs.listStatusIterator(nsPath); + while (it.hasNext()) { + Path path = it.next().getPath(); + if (isNamespace(path)) { + namespaces.add(append(namespace, path.getName())); + } + } + return namespaces; } catch (IOException ioe) { throw new RuntimeIOException(ioe, "Failed to list namespace under: %s", namespace); } From 576e89bcdde0176de731e9fb9e1f5694bf6d1e87 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 21 Jan 2021 15:44:21 +0000 Subject: [PATCH 2/5] Fix build/test * corrected the assertEquals() argument order (expected, actual) so that failures report the correct error messages * only using listStatusIterator on the two operations that do nested file I/O underneath; this is where speedups could be observed. --- .../org/apache/iceberg/hadoop/HadoopCatalog.java | 15 ++++----------- .../apache/iceberg/hadoop/TestHadoopCatalog.java | 14 +++++++------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java index d3b03bf6c8dd..84cc854fddb0 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -172,7 +174,7 @@ public String name() { private boolean shouldSuppressPermissionError(IOException ioException) { if (suppressPermissionError) { return ioException instanceof AccessDeniedException - || ioException.getMessage() != null && ioException.getMessage().contains("AuthorizationPermissionMismatch"); + || (ioException.getMessage() != null && ioException.getMessage().contains("AuthorizationPermissionMismatch")); } return false; } @@ -182,16 +184,7 @@ 
private boolean isTableDir(Path path) { // Only the path which contains metadata is the path for table, otherwise it could be // still a namespace. try { - // using the iterator listing allows for paged downloads - // from HDFS and prefetching from object storage. - RemoteIterator it = fs.listStatusIterator(path); - while (it.hasNext()) { - if (TABLE_FILTER.accept(it.next().getPath())) { - return true; - } - } - // no match - return false; + return fs.listStatus(metadataPath, TABLE_FILTER).length >= 1; } catch (FileNotFoundException e) { return false; } catch (IOException e) { diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java index 64ae92e77caa..08fa022a0ab4 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java @@ -262,13 +262,13 @@ public void testListTables() throws Exception { List tbls1 = catalog.listTables(Namespace.of("db")); Set tblSet = Sets.newHashSet(tbls1.stream().map(t -> t.name()).iterator()); - Assert.assertEquals(tblSet.size(), 2); + Assert.assertEquals(2, tblSet.size()); Assert.assertTrue(tblSet.contains("tbl1")); Assert.assertTrue(tblSet.contains("tbl2")); List tbls2 = catalog.listTables(Namespace.of("db", "ns1")); - Assert.assertEquals(tbls2.size(), 1); - Assert.assertTrue(tbls2.get(0).name().equals("tbl3")); + Assert.assertEquals("table identifiers", 1, tbls2.size()); + Assert.assertEquals("table name", "tbl3", tbls2.get(0).name()); AssertHelpers.assertThrows("should throw exception", NoSuchNamespaceException.class, "Namespace does not exist: ", () -> { @@ -337,24 +337,24 @@ public void testListNamespace() throws Exception { List nsp1 = catalog.listNamespaces(Namespace.of("db")); Set tblSet = Sets.newHashSet(nsp1.stream().map(t -> t.toString()).iterator()); - Assert.assertEquals(tblSet.size(), 3); + Assert.assertEquals(3, tblSet.size()); 
Assert.assertTrue(tblSet.contains("db.ns1")); Assert.assertTrue(tblSet.contains("db.ns2")); Assert.assertTrue(tblSet.contains("db.ns3")); List nsp2 = catalog.listNamespaces(Namespace.of("db", "ns1")); - Assert.assertEquals(nsp2.size(), 1); + Assert.assertEquals(1, nsp2.size()); Assert.assertTrue(nsp2.get(0).toString().equals("db.ns1.ns2")); List nsp3 = catalog.listNamespaces(); Set tblSet2 = Sets.newHashSet(nsp3.stream().map(t -> t.toString()).iterator()); - Assert.assertEquals(tblSet2.size(), 2); + Assert.assertEquals(2, tblSet2.size()); Assert.assertTrue(tblSet2.contains("db")); Assert.assertTrue(tblSet2.contains("db2")); List nsp4 = catalog.listNamespaces(); Set tblSet3 = Sets.newHashSet(nsp4.stream().map(t -> t.toString()).iterator()); - Assert.assertEquals(tblSet3.size(), 2); + Assert.assertEquals(2, tblSet3.size()); Assert.assertTrue(tblSet3.contains("db")); Assert.assertTrue(tblSet3.contains("db2")); From 46a56fa1d9c519f748c2c681ca70028268268714 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 25 Jan 2021 14:34:43 +0000 Subject: [PATCH 3/5] checkstyle --- .../java/org/apache/iceberg/hadoop/HadoopCatalog.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java index 84cc854fddb0..4bb451fb8b94 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java @@ -173,8 +173,9 @@ public String name() { private boolean shouldSuppressPermissionError(IOException ioException) { if (suppressPermissionError) { - return ioException instanceof AccessDeniedException - || (ioException.getMessage() != null && ioException.getMessage().contains("AuthorizationPermissionMismatch")); + return ioException instanceof AccessDeniedException || + (ioException.getMessage() != null && + 
ioException.getMessage().contains("AuthorizationPermissionMismatch")); } return false; } @@ -226,13 +227,13 @@ public List listTables(Namespace namespace) { } RemoteIterator it = fs.listStatusIterator(nsPath); while (it.hasNext()) { - FileStatus s = it.next(); - if (!s.isDirectory()) { + FileStatus status = it.next(); + if (!status.isDirectory()) { // Ignore the path which is not a directory. continue; } - Path path = s.getPath(); + Path path = status.getPath(); if (isTableDir(path)) { TableIdentifier tblIdent = TableIdentifier.of(namespace, path.getName()); tblIdents.add(tblIdent); From 05799906c808c4d8bf19c7fc6a533c11d9adc187 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 27 Jan 2021 12:08:23 +0000 Subject: [PATCH 4/5] removed unused imports --- core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java index 4bb451fb8b94..85b174a674c0 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java @@ -29,8 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; From 6698e420a4f1b1bcaab45f194efa1efc74fcaaf7 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 28 Jan 2021 16:09:59 +0000 Subject: [PATCH 5/5] review comments: fix whitespace error --- core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java index 85b174a674c0..97e1ecaacadc 100644 --- 
a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java @@ -184,7 +184,7 @@ private boolean isTableDir(Path path) { // still a namespace. try { return fs.listStatus(metadataPath, TABLE_FILTER).length >= 1; - } catch (FileNotFoundException e) { + } catch (FileNotFoundException e) { return false; } catch (IOException e) { if (shouldSuppressPermissionError(e)) {