DNAction.java
@@ -40,7 +40,8 @@ public enum DNAction implements AuditAction {
CLOSE_CONTAINER,
GET_COMMITTED_BLOCK_LENGTH,
STREAM_INIT,
ECHO;
ECHO,
RECOVER_EC_BLOCK;

@Override
public String getAction() {
ECReconstructionCoordinator.java
@@ -43,6 +43,13 @@
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.audit.AuditLoggerType;
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
import org.apache.hadoop.ozone.audit.DNAction;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
@@ -95,11 +102,12 @@
* - PutBlock
* - Close RECOVERING containers in TargetDNs
*/
public class ECReconstructionCoordinator implements Closeable {
public class ECReconstructionCoordinator implements Closeable, Auditor {

static final Logger LOG =
LoggerFactory.getLogger(ECReconstructionCoordinator.class);

private static final AuditLogger AUDIT =
new AuditLogger(AuditLoggerType.DNLOGGER);
private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;

private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;
@@ -274,6 +282,8 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
ECBlockOutputStream[] emptyBlockStreams =
new ECBlockOutputStream[notReconstructIndexes.size()];
ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()];
Map<String, String> ecAuditLogMap = new HashMap<>();
ecAuditLogMap.put("blockLocationInfo", blockLocationInfo.toString());
try {
      // Create streams and buffers for all indexes that need to be reconstructed
for (int i = 0; i < toReconstructIndexes.size(); i++) {
@@ -341,6 +351,10 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
targetStream.executePutBlock(true, true, blockLocationInfo.getLength(), blockDataGroup);
checkFailures(targetStream, targetStream.getCurrentPutBlkResponseFuture());
}
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(DNAction.RECOVER_EC_BLOCK, ecAuditLogMap));
} catch (Exception e) {
AUDIT.logWriteFailure(buildAuditMessageForFailure(DNAction.RECOVER_EC_BLOCK, ecAuditLogMap, e));
throw e;
} finally {
for (ByteBuffer buf : bufs) {
byteBufferPool.putBuffer(buf);
@@ -607,4 +621,25 @@ private static ExecutorService createThreadPoolExecutor(
new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}

@Override
public AuditMessage buildAuditMessageForSuccess(
AuditAction op, Map<String, String> auditMap) {
return new AuditMessage.Builder()
.forOperation(op)
.withParams(auditMap)
.withResult(AuditEventStatus.SUCCESS)
.build();
}

@Override
public AuditMessage buildAuditMessageForFailure(
AuditAction op, Map<String, String> auditMap, Throwable throwable) {
return new AuditMessage.Builder()
.forOperation(op)
.withParams(auditMap)
.withResult(AuditEventStatus.FAILURE)
.withException(throwable)
.build();
}
}
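
Taken together, the hunks above make ECReconstructionCoordinator an Auditor and wrap the block-group reconstruction so that a RECOVER_EC_BLOCK entry is written to the datanode audit log on both success and failure. Below is a condensed, standalone sketch of that pattern for reviewers; only the org.apache.hadoop.ozone.audit types and DNAction.RECOVER_EC_BLOCK come from the patch, while the class name RecoverEcBlockAuditSketch, recoverWithAudit and the doRecover placeholder are illustrative assumptions, not code in this PR.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.AuditLoggerType;
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.audit.DNAction;

/** Illustrative sketch only; mirrors the audit wiring added to ECReconstructionCoordinator. */
public class RecoverEcBlockAuditSketch implements Auditor {

  private static final AuditLogger AUDIT =
      new AuditLogger(AuditLoggerType.DNLOGGER);

  /** Hypothetical stand-in for reconstructECBlockGroup: audit success or failure of the recovery body. */
  public void recoverWithAudit(String blockLocationInfo) throws Exception {
    Map<String, String> auditMap = new HashMap<>();
    auditMap.put("blockLocationInfo", blockLocationInfo);
    try {
      doRecover();  // placeholder for the actual read/decode/write work
      AUDIT.logWriteSuccess(
          buildAuditMessageForSuccess(DNAction.RECOVER_EC_BLOCK, auditMap));
    } catch (Exception e) {
      AUDIT.logWriteFailure(
          buildAuditMessageForFailure(DNAction.RECOVER_EC_BLOCK, auditMap, e));
      throw e;
    }
  }

  private void doRecover() {
    // no-op placeholder for the EC reconstruction itself
  }

  @Override
  public AuditMessage buildAuditMessageForSuccess(AuditAction op,
      Map<String, String> auditMap) {
    return new AuditMessage.Builder()
        .forOperation(op)
        .withParams(auditMap)
        .withResult(AuditEventStatus.SUCCESS)
        .build();
  }

  @Override
  public AuditMessage buildAuditMessageForFailure(AuditAction op,
      Map<String, String> auditMap, Throwable throwable) {
    return new AuditMessage.Builder()
        .forOperation(op)
        .withParams(auditMap)
        .withResult(AuditEventStatus.FAILURE)
        .withException(throwable)
        .build();
  }
}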