From 5362b9e23b657474ed5fa5a71c9d935fa3454b7e Mon Sep 17 00:00:00 2001 From: Jack Moseley Date: Tue, 13 Jun 2023 11:10:39 -0700 Subject: [PATCH 1/3] Add timers to GobblinMCEWriter --- .../iceberg/writer/GobblinMCEWriter.java | 79 +++++++++++++------ 1 file changed, 54 insertions(+), 25 deletions(-) diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java index d2d72a969ea..84082e6914e 100644 --- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java +++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java @@ -38,6 +38,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; import org.apache.commons.collections.CollectionUtils; + +import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; @@ -69,6 +71,7 @@ import org.apache.gobblin.metadata.DataFile; import org.apache.gobblin.metadata.GobblinMetadataChangeEvent; import org.apache.gobblin.metadata.OperationType; +import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.metrics.event.EventSubmitter; @@ -131,6 +134,9 @@ public class GobblinMCEWriter implements DataWriter { protected EventSubmitter eventSubmitter; private final Set transientExceptionMessages; private final Set nonTransientExceptionMessages; + private final Map metadataWriterWriteTimers = new HashMap<>(); + private final Map metadataWriterFlushTimers = new HashMap<>(); + private final ContextAwareTimer hiveSpecComputationTimer; @AllArgsConstructor static class TableStatus { @@ -150,19 +156,22 @@ public GobblinMCEWriter(DataWriterBuilder builder, State acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES, ClustersNames.getInstance().getClusterName()); state = properties; maxErrorDataset = state.getPropAsInt(GMCE_METADATA_WRITER_MAX_ERROR_DATASET, DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET); + List> tags = Lists.newArrayList(); + String clusterIdentifier = ClustersNames.getInstance().getClusterName(); + tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier)); + MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags); + eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build(); for (String className : state.getPropAsList(GMCE_METADATA_WRITER_CLASSES, IcebergMetadataWriter.class.getName())) { metadataWriters.add(closer.register(GobblinConstructorUtils.invokeConstructor(MetadataWriter.class, className, state))); + metadataWriterWriteTimers.put(className, metricContext.contextAwareTimer(className + ".write", 1, TimeUnit.HOURS)); + metadataWriterFlushTimers.put(className, metricContext.contextAwareTimer(className + ".flush", 1, TimeUnit.HOURS)); } + hiveSpecComputationTimer = metricContext.contextAwareTimer("hiveSpec.computation", 1, TimeUnit.HOURS); tableOperationTypeMap = new HashMap<>(); parallelRunner = closer.register(new ParallelRunner(state.getPropAsInt(METADATA_REGISTRATION_THREADS, 20), FileSystem.get(HadoopUtils.getConfFromState(properties)))); parallelRunnerTimeoutMills = state.getPropAsInt(METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS, DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS); - List> tags = Lists.newArrayList(); - String clusterIdentifier = ClustersNames.getInstance().getClusterName(); - tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier)); - MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags); - eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build(); transientExceptionMessages = new HashSet<>(properties.getPropAsList(TRANSIENT_EXCEPTION_MESSAGES_KEY, "")); nonTransientExceptionMessages = new HashSet<>(properties.getPropAsList(NON_TRANSIENT_EXCEPTION_MESSAGES_KEY, "")); } @@ -187,26 +196,28 @@ public void write(GenericRecord record) throws IOException { */ private void computeSpecMap(List files, ConcurrentHashMap> specsMap, Cache> cache, State registerState, boolean isPrefix) throws IOException { - HiveRegistrationPolicy policy = HiveRegistrationPolicyBase.getPolicy(registerState); - for (String file : files) { - parallelRunner.submitCallable(new Callable() { - @Override - public Void call() throws Exception { - try { - Path regPath = isPrefix ? new Path(file) : new Path(file).getParent(); - //Use raw path to comply with HDFS federation setting. - Path rawPath = new Path(regPath.toUri().getRawPath()); - specsMap.put(regPath.toString(), cache.get(regPath.toString(), () -> policy.getHiveSpecs(rawPath))); - } catch (Throwable e) { - //todo: Emit failed GMCE in the future to easily track the error gmce and investigate the reason for that. - log.warn("Cannot get Hive Spec for {} using policy {} due to:", file, policy.toString()); - log.warn(e.getMessage()); + try (Timer.Context context = hiveSpecComputationTimer.time()) { + HiveRegistrationPolicy policy = HiveRegistrationPolicyBase.getPolicy(registerState); + for (String file : files) { + parallelRunner.submitCallable(new Callable() { + @Override + public Void call() throws Exception { + try { + Path regPath = isPrefix ? new Path(file) : new Path(file).getParent(); + //Use raw path to comply with HDFS federation setting. + Path rawPath = new Path(regPath.toUri().getRawPath()); + specsMap.put(regPath.toString(), cache.get(regPath.toString(), () -> policy.getHiveSpecs(rawPath))); + } catch (Throwable e) { + //todo: Emit failed GMCE in the future to easily track the error gmce and investigate the reason for that. + log.warn("Cannot get Hive Spec for {} using policy {} due to:", file, policy.toString()); + log.warn(e.getMessage()); + } + return null; } - return null; - } - }, file); + }, file); + } + parallelRunner.waitForTasks(parallelRunnerTimeoutMills); } - parallelRunner.waitForTasks(parallelRunnerTimeoutMills); } @Override @@ -341,7 +352,10 @@ void writeWithMetadataWriters( writer.reset(dbName, tableName); } else { try { - writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec); + Timer timer = metadataWriterWriteTimers.get(writer.getClass().getName()); + try (Timer.Context context = timer.time()) { + writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec); + } } catch (Exception e) { if (exceptionMatches(e, transientExceptionMessages)) { throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e); @@ -419,7 +433,10 @@ private void flush(String dbName, String tableName) throws IOException { writer.reset(dbName, tableName); } else { try { - writer.flush(dbName, tableName); + Timer timer = metadataWriterFlushTimers.get(writer.getClass().getName()); + try (Timer.Context context = timer.time()) { + writer.flush(dbName, tableName); + } } catch (IOException e) { if (exceptionMatches(e, transientExceptionMessages)) { throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e); @@ -480,6 +497,7 @@ public void flush() throws IOException { } entry.getValue().clear(); } + logTimers(); } @Override @@ -565,4 +583,15 @@ private List getFailedWriterList(MetadataWriter failedWriter) { List failedWriters = metadataWriters.subList(metadataWriters.indexOf(failedWriter), metadataWriters.size()); return failedWriters.stream().map(writer -> writer.getClass().getName()).collect(Collectors.toList()); } + + private void logTimers() { + metadataWriterWriteTimers.values().forEach(this::logTimer); + metadataWriterFlushTimers.values().forEach(this::logTimer); + logTimer(hiveSpecComputationTimer); + } + + private void logTimer(ContextAwareTimer timer) { + log.info("Timer {} 1 hour mean duration: {} ms", timer.getName(), TimeUnit.NANOSECONDS.toMillis((long) timer.getSnapshot().getMean())); + log.info("Timer {} 1 hour 99th percentile duration: {} ms", timer.getName(), TimeUnit.NANOSECONDS.toMillis((long) timer.getSnapshot().get99thPercentile())); + } } From ae7c9b1245f4f5005a24bc058cc243a0ec77fdaf Mon Sep 17 00:00:00 2001 From: Jack Moseley Date: Thu, 22 Jun 2023 11:39:00 -0700 Subject: [PATCH 2/3] Add dataset level timers and more logs in flush --- .../hive/writer/HiveMetadataWriter.java | 10 +++++++-- .../iceberg/writer/GobblinMCEWriter.java | 22 ++++++++++++++----- .../iceberg/writer/IcebergMetadataWriter.java | 18 ++++++++++----- 3 files changed, 37 insertions(+), 13 deletions(-) diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java index 8400dd94b05..02c0c5895c8 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java @@ -17,6 +17,7 @@ package org.apache.gobblin.hive.writer; +import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Optional; @@ -160,8 +161,9 @@ public void flush(String dbName, String tableName) throws IOException { HashMap, ListenableFuture> executionMap = this.currentExecutionMap.get(tableKey); //iterator all execution to get the result to make sure they all succeeded for (HashMap.Entry, ListenableFuture> execution : executionMap.entrySet()) { - try { + try (Timer.Context context = new Timer().time()) { execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS); + log.info("Time taken to add partition to table {} is {} ms", tableKey, TimeUnit.NANOSECONDS.toMillis(context.stop())); } catch (TimeoutException e) { // Since TimeoutException should always be a transient issue, throw RuntimeException which will fail/retry container throw new RuntimeException("Timeout waiting for result of registration for table " + tableKey, e); @@ -177,7 +179,11 @@ public void flush(String dbName, String tableName) throws IOException { if (cache != null) { HiveSpec hiveSpec = cache.getIfPresent(execution.getKey()); if (hiveSpec != null) { - eventSubmitter.submit(buildCommitEvent(dbName, tableName, execution.getKey(), hiveSpec, HivePartitionOperation.ADD_OR_MODIFY)); + try (Timer.Context context = new Timer().time()) { + eventSubmitter.submit(buildCommitEvent(dbName, tableName, execution.getKey(), hiveSpec, + HivePartitionOperation.ADD_OR_MODIFY)); + log.info("Time taken to submit event for table {} is {} ms", tableKey, TimeUnit.NANOSECONDS.toMillis(context.stop())); + } } } } diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java index 84082e6914e..83a1217f89f 100644 --- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java +++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java @@ -131,12 +131,14 @@ public class GobblinMCEWriter implements DataWriter { private final Set currentErrorDatasets = new HashSet<>(); @Setter private int maxErrorDataset; + private final MetricContext metricContext; protected EventSubmitter eventSubmitter; private final Set transientExceptionMessages; private final Set nonTransientExceptionMessages; private final Map metadataWriterWriteTimers = new HashMap<>(); private final Map metadataWriterFlushTimers = new HashMap<>(); private final ContextAwareTimer hiveSpecComputationTimer; + private final Map datasetTimers = new HashMap<>(); @AllArgsConstructor static class TableStatus { @@ -159,7 +161,7 @@ public GobblinMCEWriter(DataWriterBuilder builder, State List> tags = Lists.newArrayList(); String clusterIdentifier = ClustersNames.getInstance().getClusterName(); tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier)); - MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags); + metricContext = Instrumented.getMetricContext(state, this.getClass(), tags); eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build(); for (String className : state.getPropAsList(GMCE_METADATA_WRITER_CLASSES, IcebergMetadataWriter.class.getName())) { metadataWriters.add(closer.register(GobblinConstructorUtils.invokeConstructor(MetadataWriter.class, className, state))); @@ -306,6 +308,9 @@ public void writeEnvelope(RecordEnvelope recordEnvelope) throws I String tableName = spec.getTable().getTableName(); String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName); partitionKeysMap.put(tableString, spec.getTable().getPartitionKeys()); + if (!datasetTimers.containsKey(tableName)) { + datasetTimers.put(tableName, metricContext.contextAwareTimer(tableName, 1, TimeUnit.HOURS)); + } if (!tableOperationTypeMap.containsKey(tableString)) { tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(), gmce.getDatasetIdentifier().getNativeName(), watermark.getSource(), @@ -352,8 +357,10 @@ void writeWithMetadataWriters( writer.reset(dbName, tableName); } else { try { - Timer timer = metadataWriterWriteTimers.get(writer.getClass().getName()); - try (Timer.Context context = timer.time()) { + Timer writeTimer = metadataWriterWriteTimers.get(writer.getClass().getName()); + Timer datasetTimer = datasetTimers.get(tableName); + try (Timer.Context writeContext = writeTimer.time(); + Timer.Context datasetContext = datasetTimer.time()) { writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec); } } catch (Exception e) { @@ -433,8 +440,10 @@ private void flush(String dbName, String tableName) throws IOException { writer.reset(dbName, tableName); } else { try { - Timer timer = metadataWriterFlushTimers.get(writer.getClass().getName()); - try (Timer.Context context = timer.time()) { + Timer flushTimer = metadataWriterFlushTimers.get(writer.getClass().getName()); + Timer datasetTimer = datasetTimers.get(tableName); + try (Timer.Context flushContext = flushTimer.time(); + Timer.Context datasetContext = datasetTimer.time()) { writer.flush(dbName, tableName); } } catch (IOException e) { @@ -585,9 +594,10 @@ private List getFailedWriterList(MetadataWriter failedWriter) { } private void logTimers() { + logTimer(hiveSpecComputationTimer); metadataWriterWriteTimers.values().forEach(this::logTimer); metadataWriterFlushTimers.values().forEach(this::logTimer); - logTimer(hiveSpecComputationTimer); + datasetTimers.values().forEach(this::logTimer); } private void logTimer(ContextAwareTimer timer) { diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java index 9c3d30e9e50..e128a703a74 100644 --- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java +++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java @@ -826,7 +826,10 @@ public void flush(String dbName, String tableName) throws IOException { String topicName = getTopicName(tid, tableMetadata); if (tableMetadata.appendFiles.isPresent()) { tableMetadata.appendFiles.get().commit(); - sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps); + try (Timer.Context context = new Timer().time()) { + sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps); + log.info("Sending audit counts for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop())); + } if (tableMetadata.completenessEnabled) { checkAndUpdateCompletenessWatermark(tableMetadata, topicName, tableMetadata.datePartitions, props); } @@ -869,20 +872,25 @@ public void flush(String dbName, String tableName) throws IOException { UpdateProperties updateProperties = transaction.updateProperties(); props.forEach(updateProperties::set); updateProperties.commit(); - try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName)) { + try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName); + Timer.Context context = new Timer().time()) { transaction.commitTransaction(); + log.info("Committing transaction for table {} took {} ms", tid, TimeUnit.NANOSECONDS.toMillis(context.stop())); } // Emit GTE for snapshot commits Snapshot snapshot = tableMetadata.table.get().currentSnapshot(); Map currentProps = tableMetadata.table.get().properties(); - submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark); + try (Timer.Context context = new Timer().time()) { + submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark); + log.info("Sending snapshot commit event for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop())); + } //Reset the table metadata for next accumulation period tableMetadata.reset(currentProps, highWatermark); - log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString())); + log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid)); } else { - log.info("There's no transaction initiated for the table {}", tid.toString()); + log.info("There's no transaction initiated for the table {}", tid); } } catch (RuntimeException e) { throw new IOException(String.format("Fail to flush table %s %s", dbName, tableName), e); From 9032fe24c6b49fa4c30a920863f0e32dd8f709e3 Mon Sep 17 00:00:00 2001 From: Jack Moseley Date: Tue, 27 Jun 2023 14:45:54 -0700 Subject: [PATCH 3/3] Fix unit tests --- .../gobblin/iceberg/writer/GobblinMCEWriter.java | 16 ++++++++-------- .../writer/IcebergMetadataWriterTest.java | 4 ++++ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java index 83a1217f89f..55987405670 100644 --- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java +++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java @@ -131,12 +131,15 @@ public class GobblinMCEWriter implements DataWriter { private final Set currentErrorDatasets = new HashSet<>(); @Setter private int maxErrorDataset; - private final MetricContext metricContext; + @VisibleForTesting + public final MetricContext metricContext; protected EventSubmitter eventSubmitter; private final Set transientExceptionMessages; private final Set nonTransientExceptionMessages; - private final Map metadataWriterWriteTimers = new HashMap<>(); - private final Map metadataWriterFlushTimers = new HashMap<>(); + @VisibleForTesting + public final Map metadataWriterWriteTimers = new HashMap<>(); + @VisibleForTesting + public final Map metadataWriterFlushTimers = new HashMap<>(); private final ContextAwareTimer hiveSpecComputationTimer; private final Map datasetTimers = new HashMap<>(); @@ -308,9 +311,6 @@ public void writeEnvelope(RecordEnvelope recordEnvelope) throws I String tableName = spec.getTable().getTableName(); String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName); partitionKeysMap.put(tableString, spec.getTable().getPartitionKeys()); - if (!datasetTimers.containsKey(tableName)) { - datasetTimers.put(tableName, metricContext.contextAwareTimer(tableName, 1, TimeUnit.HOURS)); - } if (!tableOperationTypeMap.containsKey(tableString)) { tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(), gmce.getDatasetIdentifier().getNativeName(), watermark.getSource(), @@ -358,7 +358,7 @@ void writeWithMetadataWriters( } else { try { Timer writeTimer = metadataWriterWriteTimers.get(writer.getClass().getName()); - Timer datasetTimer = datasetTimers.get(tableName); + Timer datasetTimer = datasetTimers.computeIfAbsent(tableName, k -> metricContext.contextAwareTimer(k, 1, TimeUnit.HOURS)); try (Timer.Context writeContext = writeTimer.time(); Timer.Context datasetContext = datasetTimer.time()) { writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec); @@ -441,7 +441,7 @@ private void flush(String dbName, String tableName) throws IOException { } else { try { Timer flushTimer = metadataWriterFlushTimers.get(writer.getClass().getName()); - Timer datasetTimer = datasetTimers.get(tableName); + Timer datasetTimer = datasetTimers.computeIfAbsent(tableName, k -> metricContext.contextAwareTimer(k, 1, TimeUnit.HOURS)); try (Timer.Context flushContext = flushTimer.time(); Timer.Context datasetContext = datasetTimer.time()) { writer.flush(dbName, tableName); diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java index bc1c8ca5706..fe13e82a2ab 100644 --- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java +++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java @@ -350,6 +350,10 @@ public void testFaultTolerant() throws Exception { MetadataWriter mockWriter = Mockito.mock(MetadataWriter.class); Mockito.doThrow(new IOException("Test failure")).when(mockWriter).writeEnvelope(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); gobblinMCEWriter.metadataWriters.add(0, mockWriter); + gobblinMCEWriter.metadataWriterWriteTimers.put(mockWriter.getClass().getName(), gobblinMCEWriter.metricContext + .contextAwareTimer(mockWriter.getClass().getName() + ".write", 1, TimeUnit.HOURS)); + gobblinMCEWriter.metadataWriterFlushTimers.put(mockWriter.getClass().getName(), gobblinMCEWriter.metricContext + .contextAwareTimer(mockWriter.getClass().getName() + ".flush", 1, TimeUnit.HOURS)); GobblinMetadataChangeEvent gmceWithMockWriter = SpecificData.get().deepCopy(gmce.getSchema(), gmce); gmceWithMockWriter.setAllowedMetadataWriters(Arrays.asList(IcebergMetadataWriter.class.getName(), mockWriter.getClass().getName()));