Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
*/
public class LoggingMetricsReporter implements MetricsReporter {
private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class);
private static final LoggingMetricsReporter INSTANCE = new LoggingMetricsReporter();

public static LoggingMetricsReporter instance() {
return INSTANCE;
}

@Override
public void report(MetricsReport report) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ private CatalogProperties() {}
public static final String WAREHOUSE_LOCATION = "warehouse";
public static final String TABLE_DEFAULT_PREFIX = "table-default.";
public static final String TABLE_OVERRIDE_PREFIX = "table-override.";
public static final String METRICS_REPORTER_IMPL = "metrics-reporter-impl";

/**
* Controls whether the catalog will cache table entries upon load.
Expand Down
39 changes: 39 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -400,4 +401,42 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {

setConf.invoke(conf);
}

/**
* Load a custom {@link MetricsReporter} implementation.
*
* <p>The implementation must have a no-arg constructor.
*
* @param impl full class name of a custom {@link MetricsReporter} implementation
* @return An initialized {@link MetricsReporter}.
* @throws IllegalArgumentException if class path not found or right constructor not found or the
* loaded class cannot be cast to the given interface type
*/
public static MetricsReporter loadMetricsReporter(String impl) {
LOG.info("Loading custom MetricsReporter implementation: {}", impl);
DynConstructors.Ctor<MetricsReporter> ctor;
try {
ctor =
DynConstructors.builder(MetricsReporter.class)
.loader(CatalogUtil.class.getClassLoader())
.impl(impl)
.buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(
String.format("Cannot initialize MetricsReporter, missing no-arg constructor: %s", impl),
e);
}

MetricsReporter reporter;
try {
reporter = ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format(
"Cannot initialize MetricsReporter, %s does not implement MetricsReporter.", impl),
e);
}

return reporter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ public void initialize(String name, Map<String, String> unresolved) {
this.io =
CatalogUtil.loadFileIO(
ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), mergedProps, conf);
this.reporter = new LoggingMetricsReporter();
String metricsReporterImpl = mergedProps.get(CatalogProperties.METRICS_REPORTER_IMPL);
this.reporter =
null != metricsReporterImpl
? CatalogUtil.loadMetricsReporter(metricsReporterImpl)
: LoggingMetricsReporter.instance();

super.initialize(name, mergedProps);
}
Expand Down Expand Up @@ -316,8 +320,8 @@ private void reportMetrics(
TableIdentifier tableIdentifier,
MetricsReport report,
Supplier<Map<String, String>> headers) {
reporter.report(report);
try {
reporter.report(report);
client.post(
paths.metrics(tableIdentifier),
ReportMetricsRequest.of(report),
Expand Down
45 changes: 45 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestCatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
Expand Down Expand Up @@ -187,6 +189,32 @@ public void buildCustomCatalog_withTypeSet() {
() -> CatalogUtil.buildIcebergCatalog(name, options, hadoopConf));
}

@Test
public void loadCustomMetricsReporter_noArg() {
Map<String, String> properties = Maps.newHashMap();
properties.put("key", "val");

MetricsReporter metricsReporter =
CatalogUtil.loadMetricsReporter(TestMetricsReporterDefault.class.getName());
Assertions.assertThat(metricsReporter).isInstanceOf(TestMetricsReporterDefault.class);
}

@Test
public void loadCustomMetricsReporter_badArg() {
Assertions.assertThatThrownBy(
() -> CatalogUtil.loadMetricsReporter(TestMetricsReporterBadArg.class.getName()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("missing no-arg constructor");
}

@Test
public void loadCustomMetricsReporter_badClass() {
Assertions.assertThatThrownBy(
() -> CatalogUtil.loadMetricsReporter(TestFileIONotImpl.class.getName()))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("does not implement MetricsReporter");
}

public static class TestCatalog extends BaseMetastoreCatalog {

private String catalogName;
Expand Down Expand Up @@ -399,4 +427,21 @@ public String getArg() {
public static class TestFileIONotImpl {
public TestFileIONotImpl() {}
}

public static class TestMetricsReporterBadArg implements MetricsReporter {
private final String arg;

public TestMetricsReporterBadArg(String arg) {
this.arg = arg;
}

@Override
public void report(MetricsReport report) {}
}

public static class TestMetricsReporterDefault implements MetricsReporter {

@Override
public void report(MetricsReport report) {}
}
}
57 changes: 57 additions & 0 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,23 @@
import java.nio.file.Path;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
import org.apache.iceberg.rest.responses.ConfigResponse;
Expand Down Expand Up @@ -1075,4 +1082,54 @@ public void testCatalogRefreshedTokenIsUsed() throws Exception {
eq(refreshedCatalogHeader),
any());
}

@Test
public void testCatalogWithCustomMetricsReporter() throws IOException {
this.restCatalog =
new RESTCatalog(
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(),
"user",
ImmutableMap.of("credential", "user:12345"),
ImmutableMap.of()),
(config) -> HTTPClient.builder().uri(config.get(CatalogProperties.URI)).build());
restCatalog.setConf(new Configuration());
restCatalog.initialize(
"prod",
ImmutableMap.of(
CatalogProperties.URI,
"http://localhost:8181/",
"credential",
"catalog:12345",
CatalogProperties.METRICS_REPORTER_IMPL,
CustomMetricsReporter.class.getName()));

restCatalog.buildTable(TABLE, SCHEMA).create();
Table table = restCatalog.loadTable(TABLE);
table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.commit();

try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
assertThat(tasks.iterator()).hasNext();
}

// counter of custom metrics reporter should have been increased
assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(1);
}

public static class CustomMetricsReporter implements MetricsReporter {
static final AtomicInteger COUNTER = new AtomicInteger(0);

@Override
public void report(MetricsReport report) {
COUNTER.incrementAndGet();
}
}
}