diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java index 3d6c9795b47..2e4329fe43d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java @@ -40,7 +40,8 @@ public enum DNAction implements AuditAction { CLOSE_CONTAINER, GET_COMMITTED_BLOCK_LENGTH, STREAM_INIT, - ECHO; + ECHO, + RECOVER_EC_BLOCK; @Override public String getAction() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java index 8fadd19b67d..3590f41def6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java @@ -43,6 +43,13 @@ import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.audit.AuditLogger; +import org.apache.hadoop.ozone.audit.Auditor; +import org.apache.hadoop.ozone.audit.AuditLoggerType; +import org.apache.hadoop.ozone.audit.AuditMessage; +import org.apache.hadoop.ozone.audit.AuditAction; +import org.apache.hadoop.ozone.audit.AuditEventStatus; +import org.apache.hadoop.ozone.audit.DNAction; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory; import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl; import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy; @@ -95,11 +102,12 @@ * - PutBlock * - Close RECOVERING containers in TargetDNs */ -public class ECReconstructionCoordinator implements Closeable { +public class ECReconstructionCoordinator implements Closeable, Auditor { static final Logger LOG = LoggerFactory.getLogger(ECReconstructionCoordinator.class); - + private static final AuditLogger AUDIT = + new AuditLogger(AuditLoggerType.DNLOGGER); private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3; private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5; @@ -274,6 +282,8 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, ECBlockOutputStream[] emptyBlockStreams = new ECBlockOutputStream[notReconstructIndexes.size()]; ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()]; + Map ecAuditLogMap = new HashMap<>(); + ecAuditLogMap.put("blockLocationInfo", blockLocationInfo.toString()); try { // Create streams and buffers for all indexes that need reconstructed for (int i = 0; i < toReconstructIndexes.size(); i++) { @@ -341,6 +351,10 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo, targetStream.executePutBlock(true, true, blockLocationInfo.getLength(), blockDataGroup); checkFailures(targetStream, targetStream.getCurrentPutBlkResponseFuture()); } + AUDIT.logWriteSuccess(buildAuditMessageForSuccess(DNAction.RECOVER_EC_BLOCK, ecAuditLogMap)); + } catch (Exception e) { + AUDIT.logWriteFailure(buildAuditMessageForFailure(DNAction.RECOVER_EC_BLOCK, ecAuditLogMap, e)); + throw e; } finally { for (ByteBuffer buf : bufs) { byteBufferPool.putBuffer(buf); @@ -607,4 +621,25 @@ private static ExecutorService createThreadPoolExecutor( new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(), new ThreadPoolExecutor.CallerRunsPolicy()); } + + @Override + public AuditMessage buildAuditMessageForSuccess( + AuditAction op, Map auditMap) { + return new AuditMessage.Builder() + .forOperation(op) + .withParams(auditMap) + .withResult(AuditEventStatus.SUCCESS) + .build(); + } + + @Override + public AuditMessage buildAuditMessageForFailure( + AuditAction op, Map auditMap, Throwable throwable) { + return new AuditMessage.Builder() + .forOperation(op) + .withParams(auditMap) + .withResult(AuditEventStatus.FAILURE) + .withException(throwable) + .build(); + } }