diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index a882f7360c7b..f35aa29df030 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -25,6 +25,8 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.metrics.LoggingMetricsReporter; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -36,6 +38,8 @@ public abstract class BaseMetastoreCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreCatalog.class); + private MetricsReporter metricsReporter; + @Override public Table loadTable(TableIdentifier identifier) { Table result; @@ -51,7 +55,7 @@ public Table loadTable(TableIdentifier identifier) { } } else { - result = new BaseTable(ops, fullTableName(name(), identifier)); + result = new BaseTable(ops, fullTableName(name(), identifier), metricsReporter()); } } else if (isValidMetadataIdentifier(identifier)) { @@ -301,4 +305,16 @@ protected static String fullTableName(String catalogName, TableIdentifier identi return sb.toString(); } + + private MetricsReporter metricsReporter() { + if (metricsReporter == null) { + metricsReporter = + properties().containsKey(CatalogProperties.METRICS_REPORTER_IMPL) + ? CatalogUtil.loadMetricsReporter( + properties().get(CatalogProperties.METRICS_REPORTER_IMPL)) + : LoggingMetricsReporter.instance(); + } + + return metricsReporter; + } } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 56b49f9cbaec..8ecbfa5373fd 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -89,7 +89,7 @@ public void initialize(String name, Map properties) { "Cannot initialize JDBCCatalog because warehousePath must not be null or empty"); this.warehouseLocation = LocationUtil.stripTrailingSlash(inputWarehouseLocation); - this.catalogProperties = properties; + this.catalogProperties = ImmutableMap.copyOf(properties); if (name != null) { this.catalogName = name; diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java index 268391bf436a..b0fdea458d29 100644 --- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; @@ -41,6 +42,8 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -57,6 +60,9 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -801,4 +807,43 @@ public void testRegisterExistingTable() { .hasMessage("Table already exists: a.t1"); Assertions.assertThat(catalog.dropTable(identifier)).isTrue(); } + + @Test + public void testCatalogWithCustomMetricsReporter() throws IOException { + JdbcCatalog catalogWithCustomReporter = + initCatalog( + "test_jdbc_catalog_with_custom_reporter", + ImmutableMap.of( + CatalogProperties.METRICS_REPORTER_IMPL, CustomMetricsReporter.class.getName())); + try { + catalogWithCustomReporter.buildTable(TABLE, SCHEMA).create(); + Table table = catalogWithCustomReporter.loadTable(TABLE); + table + .newFastAppend() + .appendFile( + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(FileFormat.PARQUET.addExtension(UUID.randomUUID().toString())) + .withFileSizeInBytes(10) + .withRecordCount(2) + .build()) + .commit(); + try (CloseableIterable tasks = table.newScan().planFiles()) { + Assertions.assertThat(tasks.iterator()).hasNext(); + } + } finally { + catalogWithCustomReporter.dropTable(TABLE); + } + // counter of custom metrics reporter should have been increased + // 1x for commit metrics / 1x for scan metrics + Assertions.assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(2); + } + + public static class CustomMetricsReporter implements MetricsReporter { + static final AtomicInteger COUNTER = new AtomicInteger(0); + + @Override + public void report(MetricsReport report) { + COUNTER.incrementAndGet(); + } + } }