diff --git a/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java b/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java index 26e9a0b527f1..8d41f10e74fb 100644 --- a/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java +++ b/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java @@ -28,6 +28,11 @@ */ public class LoggingMetricsReporter implements MetricsReporter { private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class); + private static final LoggingMetricsReporter INSTANCE = new LoggingMetricsReporter(); + + public static LoggingMetricsReporter instance() { + return INSTANCE; + } @Override public void report(MetricsReport report) { diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index 95fe6a074c0a..c2490ee3ea83 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -29,6 +29,7 @@ private CatalogProperties() {} public static final String WAREHOUSE_LOCATION = "warehouse"; public static final String TABLE_DEFAULT_PREFIX = "table-default."; public static final String TABLE_OVERRIDE_PREFIX = "table-override."; + public static final String METRICS_REPORTER_IMPL = "metrics-reporter-impl"; /** * Controls whether the catalog will cache table entries upon load. diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java index c0c5078a3f34..ef4d17c24997 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -35,6 +35,7 @@ import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -400,4 +401,42 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) { setConf.invoke(conf); } + + /** + * Load a custom {@link MetricsReporter} implementation. + * + *

The implementation must have a no-arg constructor. + * + * @param impl full class name of a custom {@link MetricsReporter} implementation + * @return An initialized {@link MetricsReporter}. + * @throws IllegalArgumentException if class path not found or right constructor not found or the + * loaded class cannot be cast to the given interface type + */ + public static MetricsReporter loadMetricsReporter(String impl) { + LOG.info("Loading custom MetricsReporter implementation: {}", impl); + DynConstructors.Ctor ctor; + try { + ctor = + DynConstructors.builder(MetricsReporter.class) + .loader(CatalogUtil.class.getClassLoader()) + .impl(impl) + .buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize MetricsReporter, missing no-arg constructor: %s", impl), + e); + } + + MetricsReporter reporter; + try { + reporter = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format( + "Cannot initialize MetricsReporter, %s does not implement MetricsReporter.", impl), + e); + } + + return reporter; + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 0b4edc9a84f0..d4757c4ed529 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -177,7 +177,11 @@ public void initialize(String name, Map unresolved) { this.io = CatalogUtil.loadFileIO( ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), mergedProps, conf); - this.reporter = new LoggingMetricsReporter(); + String metricsReporterImpl = mergedProps.get(CatalogProperties.METRICS_REPORTER_IMPL); + this.reporter = + null != metricsReporterImpl + ? CatalogUtil.loadMetricsReporter(metricsReporterImpl) + : LoggingMetricsReporter.instance(); super.initialize(name, mergedProps); } @@ -316,8 +320,8 @@ private void reportMetrics( TableIdentifier tableIdentifier, MetricsReport report, Supplier> headers) { - reporter.report(report); try { + reporter.report(report); client.post( paths.metrics(tableIdentifier), ReportMetricsRequest.of(report), diff --git a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java index a31cf51b685a..86e4a2c83729 100644 --- a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java +++ b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java @@ -29,6 +29,8 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.assertj.core.api.Assertions; import org.junit.Assert; @@ -187,6 +189,32 @@ public void buildCustomCatalog_withTypeSet() { () -> CatalogUtil.buildIcebergCatalog(name, options, hadoopConf)); } + @Test + public void loadCustomMetricsReporter_noArg() { + Map properties = Maps.newHashMap(); + properties.put("key", "val"); + + MetricsReporter metricsReporter = + CatalogUtil.loadMetricsReporter(TestMetricsReporterDefault.class.getName()); + Assertions.assertThat(metricsReporter).isInstanceOf(TestMetricsReporterDefault.class); + } + + @Test + public void loadCustomMetricsReporter_badArg() { + Assertions.assertThatThrownBy( + () -> CatalogUtil.loadMetricsReporter(TestMetricsReporterBadArg.class.getName())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("missing no-arg constructor"); + } + + @Test + public void loadCustomMetricsReporter_badClass() { + Assertions.assertThatThrownBy( + () -> CatalogUtil.loadMetricsReporter(TestFileIONotImpl.class.getName())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("does not implement MetricsReporter"); + } + public static class TestCatalog extends BaseMetastoreCatalog { private String catalogName; @@ -399,4 +427,21 @@ public String getArg() { public static class TestFileIONotImpl { public TestFileIONotImpl() {} } + + public static class TestMetricsReporterBadArg implements MetricsReporter { + private final String arg; + + public TestMetricsReporterBadArg(String arg) { + this.arg = arg; + } + + @Override + public void report(MetricsReport report) {} + } + + public static class TestMetricsReporterDefault implements MetricsReporter { + + @Override + public void report(MetricsReport report) {} + } } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 9703b4c4776c..2d1133e9c48f 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -31,16 +31,23 @@ import java.nio.file.Path; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.CatalogTests; import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.jdbc.JdbcCatalog; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod; import org.apache.iceberg.rest.responses.ConfigResponse; @@ -1075,4 +1082,54 @@ public void testCatalogRefreshedTokenIsUsed() throws Exception { eq(refreshedCatalogHeader), any()); } + + @Test + public void testCatalogWithCustomMetricsReporter() throws IOException { + this.restCatalog = + new RESTCatalog( + new SessionCatalog.SessionContext( + UUID.randomUUID().toString(), + "user", + ImmutableMap.of("credential", "user:12345"), + ImmutableMap.of()), + (config) -> HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build()); + restCatalog.setConf(new Configuration()); + restCatalog.initialize( + "prod", + ImmutableMap.of( + CatalogProperties.URI, + "http://localhost:8181/", + "credential", + "catalog:12345", + CatalogProperties.METRICS_REPORTER_IMPL, + CustomMetricsReporter.class.getName())); + + restCatalog.buildTable(TABLE, SCHEMA).create(); + Table table = restCatalog.loadTable(TABLE); + table + .newFastAppend() + .appendFile( + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withRecordCount(2) + .build()) + .commit(); + + try (CloseableIterable tasks = table.newScan().planFiles()) { + assertThat(tasks.iterator()).hasNext(); + } + + // counter of custom metrics reporter should have been increased + assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(1); + } + + public static class CustomMetricsReporter implements MetricsReporter { + static final AtomicInteger COUNTER = new AtomicInteger(0); + + @Override + public void report(MetricsReport report) { + COUNTER.incrementAndGet(); + } + } }