From 8d940942063bc4ca7238f8745e08f5e420754910 Mon Sep 17 00:00:00 2001 From: Colm Dougan Date: Wed, 19 Nov 2025 14:16:10 +0000 Subject: [PATCH 1/7] Added protobufs and entity class for storing the ledger of completed requests --- .../org/apache/hadoop/ozone/OzoneConsts.java | 5 + .../om/helpers/OmCompletedRequestInfo.java | 524 ++++++++++++++++++ .../src/main/proto/OmClientProtocol.proto | 42 ++ .../hadoop/ozone/om/OMMetadataManager.java | 3 + .../ozone/om/OmMetadataManagerImpl.java | 9 + .../hadoop/ozone/om/codec/OMDBDefinition.java | 13 +- .../ozone/om/TestOmCompletedRequestInfo.java | 101 ++++ 7 files changed, 696 insertions(+), 1 deletion(-) create mode 100644 hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmCompletedRequestInfo.java create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmCompletedRequestInfo.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index aecbdfae615d..1ece2f6d4445 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -552,6 +552,11 @@ public final class OzoneConsts { public static final String COMPACTION_LOG_TABLE = "compactionLogTable"; + /** + * DB completed request info table name. Referenced in RDBStore. + */ + public static final String COMPLETED_REQUEST_INFO_TABLE = "completedRequestInfoTable"; + /** * S3G multipart upload request's ETag header key. */ diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmCompletedRequestInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmCompletedRequestInfo.java new file mode 100644 index 000000000000..5612374f5180 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmCompletedRequestInfo.java @@ -0,0 +1,524 @@ +package org.apache.hadoop.ozone.om.helpers; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.CopyObject;
+import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
+import org.apache.hadoop.hdds.utils.db.Proto2Codec;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.Auditable;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used for storing info related to completed operations.
+ *
+ * Each successfully completed operation has an associated
+ * OmCompletedRequestInfo entry recording the trxLogIndex, the operation
+ * type and arguments, volumeName, bucketName, keyName and creationTime.
+ */
+public final class OmCompletedRequestInfo
+    implements Auditable, CopyObject<OmCompletedRequestInfo> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OmCompletedRequestInfo.class);
+
+  private static final Codec<OmCompletedRequestInfo> CODEC = new DelegatedCodec<>(
+      Proto2Codec.get(OzoneManagerProtocolProtos.CompletedRequestInfo.getDefaultInstance()),
+      OmCompletedRequestInfo::getFromProtobuf,
+      OmCompletedRequestInfo::getProtobuf,
+      OmCompletedRequestInfo.class);
+
+  /**
+   * Type of the completed operation.
+   */
+  public enum OperationType {
+    CREATE_KEY,
+    RENAME_KEY,
+    DELETE_KEY,
+    COMMIT_KEY,
+    CREATE_DIRECTORY,
+    CREATE_FILE;
+  }
+
+  private long trxLogIndex;
+  private final String volumeName;
+  private final String bucketName;
+  private final String keyName;
+  private final long creationTime;
+  private final OperationArgs opArgs;
+
+  /**
+   * Private constructor, constructed via builder.
+   * @param trxLogIndex - transaction log index of the completed request.
+   * @param volumeName - volume name.
+   * @param bucketName - bucket name.
+   * @param keyName - key name.
+   * @param creationTime - time the ledger entry was created.
+   * @param opArgs - operation type and per-operation arguments.
+   */
+  private OmCompletedRequestInfo(long trxLogIndex,
+                                 String volumeName,
+                                 String bucketName,
+                                 String keyName,
+                                 long creationTime,
+                                 OperationArgs opArgs) {
+    this.trxLogIndex = trxLogIndex;
+    this.volumeName = volumeName;
+    this.bucketName = bucketName;
+    this.keyName = keyName;
+    this.creationTime = creationTime;
+    this.opArgs = opArgs;
+  }
+
+  public static Codec<OmCompletedRequestInfo> getCodec() {
+    return CODEC;
+  }
+
+  public void setTrxLogIndex(long trxLogIndex) {
+    this.trxLogIndex = trxLogIndex;
+  }
+
+  // The db version of the key is left-padded with 0s so that it can be
+  // "seeked" in lexicographical order.
+  // TODO: is this an appropriate key?
+  public String getDbKey() {
+    return StringUtils.leftPad(String.valueOf(trxLogIndex), 20, '0');
+  }
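+
+  // For illustration: trxLogIndex 42 maps to the db key
+  // "00000000000000000042". Because all keys share this fixed width, a
+  // RocksDB iterator returns ledger entries in transaction-log order,
+  // and a consumer can seek back to the last key it processed.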
+
+  public long getTrxLogIndex() {
+    return trxLogIndex;
+  }
+
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  public String getBucketName() {
+    return bucketName;
+  }
+
+  public String getKeyName() {
+    return keyName;
+  }
+
+  public long getCreationTime() {
+    return creationTime;
+  }
+
+  public OperationArgs getOpArgs() {
+    return opArgs;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public Builder toBuilder() {
+    return new Builder()
+        .setTrxLogIndex(trxLogIndex)
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setCreationTime(creationTime)
+        .setOpArgs(opArgs);
+  }
+
+  /**
+   * Builder of OmCompletedRequestInfo.
+   */
+  public static class Builder {
+    private long trxLogIndex;
+    private String volumeName;
+    private String bucketName;
+    private String keyName;
+    private long creationTime;
+    private OperationArgs opArgs;
+
+    public Builder() {
+      // default values
+    }
+
+    public Builder setTrxLogIndex(long trxLogIndex) {
+      this.trxLogIndex = trxLogIndex;
+      return this;
+    }
+
+    public Builder setVolumeName(String volumeName) {
+      this.volumeName = volumeName;
+      return this;
+    }
+
+    public Builder setBucketName(String bucketName) {
+      this.bucketName = bucketName;
+      return this;
+    }
+
+    public Builder setKeyName(String keyName) {
+      this.keyName = keyName;
+      return this;
+    }
+
+    public Builder setCreationTime(long crTime) {
+      this.creationTime = crTime;
+      return this;
+    }
+
+    public Builder setOpArgs(OperationArgs opArgs) {
+      this.opArgs = opArgs;
+      return this;
+    }
+
+    public OmCompletedRequestInfo build() {
+      return new OmCompletedRequestInfo(
+          trxLogIndex,
+          volumeName,
+          bucketName,
+          keyName,
+          creationTime,
+          opArgs
+      );
+    }
+  }
+
+  /**
+   * Creates OmCompletedRequestInfo protobuf from OmCompletedRequestInfo.
+ */ + public OzoneManagerProtocolProtos.CompletedRequestInfo getProtobuf() { + OzoneManagerProtocolProtos.CompletedRequestInfo.Builder sib = + OzoneManagerProtocolProtos.CompletedRequestInfo.newBuilder() + .setTrxLogIndex(trxLogIndex) + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setCreationTime(creationTime); + + switch (opArgs.getOperationType()) { + case CREATE_KEY: + sib.setCmdType(OzoneManagerProtocolProtos.Type.CreateKey); + sib.setCreateKeyArgs(OzoneManagerProtocolProtos.CreateKeyOperationArgs.newBuilder() + .build()); + break; + case RENAME_KEY: + sib.setCmdType(OzoneManagerProtocolProtos.Type.RenameKey); + sib.setRenameKeyArgs(OzoneManagerProtocolProtos.RenameKeyOperationArgs.newBuilder() + .setToKeyName(((OperationArgs.RenameKeyArgs) opArgs).getToKeyName()) + .build()); + break; + case DELETE_KEY: + sib.setCmdType(OzoneManagerProtocolProtos.Type.DeleteKey); + sib.setDeleteKeyArgs(OzoneManagerProtocolProtos.DeleteKeyOperationArgs.newBuilder() + .build()); + break; + case COMMIT_KEY: + sib.setCmdType(OzoneManagerProtocolProtos.Type.CommitKey); + sib.setCommitKeyArgs(OzoneManagerProtocolProtos.CommitKeyOperationArgs.newBuilder() + .build()); + break; + case CREATE_DIRECTORY: + sib.setCmdType(OzoneManagerProtocolProtos.Type.CreateDirectory); + sib.setCreateDirectoryArgs(OzoneManagerProtocolProtos.CreateDirectoryOperationArgs.newBuilder() + .build()); + break; + case CREATE_FILE: + sib.setCmdType(OzoneManagerProtocolProtos.Type.CreateFile); + sib.setCreateFileArgs(OzoneManagerProtocolProtos.CreateFileOperationArgs.newBuilder() + .setIsRecursive(((OperationArgs.CreateFileArgs) opArgs).isRecursive()) + .setIsOverwrite(((OperationArgs.CreateFileArgs) opArgs).isOverwrite()) + .build()); + break; + default: + LOG.error("Unexpected operationType={}", opArgs.getOperationType()); + break; + } + + return sib.build(); + } + + /** + * Parses OmCompletedRequestInfo protobuf and creates OmCompletedRequestInfo. 
+   * @param completedRequestInfoProto protobuf
+   * @return instance of OmCompletedRequestInfo
+   */
+  public static OmCompletedRequestInfo getFromProtobuf(
+      OzoneManagerProtocolProtos.CompletedRequestInfo completedRequestInfoProto) {
+
+    OmCompletedRequestInfo.Builder osib = OmCompletedRequestInfo.newBuilder()
+        .setTrxLogIndex(completedRequestInfoProto.getTrxLogIndex())
+        .setVolumeName(completedRequestInfoProto.getVolumeName())
+        .setBucketName(completedRequestInfoProto.getBucketName())
+        .setKeyName(completedRequestInfoProto.getKeyName())
+        .setCreationTime(completedRequestInfoProto.getCreationTime());
+
+    switch (completedRequestInfoProto.getCmdType()) {
+    case CreateKey:
+      osib.setOpArgs(new OperationArgs.CreateKeyArgs());
+      break;
+    case RenameKey:
+      OzoneManagerProtocolProtos.RenameKeyOperationArgs renameArgs
+          = completedRequestInfoProto.getRenameKeyArgs();
+
+      osib.setOpArgs(new OperationArgs.RenameKeyArgs(renameArgs.getToKeyName()));
+      break;
+    case DeleteKey:
+      osib.setOpArgs(new OperationArgs.DeleteKeyArgs());
+      break;
+    case CommitKey:
+      osib.setOpArgs(new OperationArgs.CommitKeyArgs());
+      break;
+    case CreateDirectory:
+      osib.setOpArgs(new OperationArgs.CreateDirectoryArgs());
+      break;
+    case CreateFile:
+      OzoneManagerProtocolProtos.CreateFileOperationArgs createFileArgs
+          = completedRequestInfoProto.getCreateFileArgs();
+
+      osib.setOpArgs(new OperationArgs.CreateFileArgs(
+          createFileArgs.getIsRecursive(), createFileArgs.getIsOverwrite()));
+      break;
+    default:
+      LOG.error("Unexpected cmdType={}", completedRequestInfoProto.getCmdType());
+      break;
+    }
+
+    return osib.build();
+  }
+
+  @Override
+  public Map<String, String> toAuditMap() {
+    Map<String, String> auditMap = new LinkedHashMap<>();
+    auditMap.put(OzoneConsts.VOLUME, getVolumeName());
+    auditMap.put(OzoneConsts.BUCKET, getBucketName());
+    auditMap.put(OzoneConsts.KEY, getKeyName());
+    return auditMap;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OmCompletedRequestInfo that = (OmCompletedRequestInfo) o;
+    return trxLogIndex == that.trxLogIndex &&
+        creationTime == that.creationTime &&
+        Objects.equals(volumeName, that.volumeName) &&
+        Objects.equals(bucketName, that.bucketName) &&
+        Objects.equals(keyName, that.keyName) &&
+        Objects.equals(opArgs, that.opArgs);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(trxLogIndex, volumeName, bucketName,
+        keyName, creationTime, opArgs);
+  }
+
+  /**
+   * Return a new copy of the object.
+ */ + @Override + public OmCompletedRequestInfo copyObject() { + return new Builder() + .setTrxLogIndex(trxLogIndex) + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setCreationTime(creationTime) + .setOpArgs(opArgs) + .build(); + } + + @Override + public String toString() { + return "OmCompletedRequestInfo{" + + "trxLogIndex: '" + trxLogIndex + '\'' + + ", volumeName: '" + volumeName + '\'' + + ", bucketName: '" + bucketName + '\'' + + ", keyName: '" + keyName + '\'' + + ", creationTime: '" + creationTime + '\'' + + ", opArgs : '" + opArgs + '\'' + + '}'; + } + + public static abstract class OperationArgs { + + public abstract OperationType getOperationType(); + + public static class CreateKeyArgs extends OperationArgs { + + @Override + public OperationType getOperationType() { + return OperationType.CREATE_KEY; + } + + @Override + public String toString() { + return "CreateKeyArgs{}"; + } + } + + public static class RenameKeyArgs extends OperationArgs { + private final String toKeyName; + + public RenameKeyArgs(String toKeyName) { + this.toKeyName = toKeyName; + } + + @Override + public OperationType getOperationType() { + return OperationType.RENAME_KEY; + } + + public String getToKeyName() { + return toKeyName; + } + + @Override + public String toString() { + return "RenameKeyArgs{" + + "toKeyName: '" + toKeyName + '\'' + + '}'; + } + } + + public static class DeleteKeyArgs extends OperationArgs { + + @Override + public OperationType getOperationType() { + return OperationType.DELETE_KEY; + } + + @Override + public String toString() { + return "DeleteKeyArgs{}"; + } + } + + public static class CommitKeyArgs extends OperationArgs { + + @Override + public OperationType getOperationType() { + return OperationType.COMMIT_KEY; + } + + @Override + public String toString() { + return "CommitKeyArgs{}"; + } + } + + public static class CreateDirectoryArgs extends OperationArgs { + + @Override + public OperationType getOperationType() { + return OperationType.CREATE_DIRECTORY; + } + + @Override + public String toString() { + return "CreateDirectoryArgs{}"; + } + } + + public static class CreateFileArgs extends OperationArgs { + // hsync? 
+ private final boolean recursive; + private final boolean overwrite; + + public CreateFileArgs(boolean recursive, boolean overwrite) { + this.recursive = recursive; + this.overwrite = overwrite; + } + + @Override + public OperationType getOperationType() { + return OperationType.CREATE_FILE; + } + + public boolean isRecursive() { + return recursive; + } + + public boolean isOverwrite() { + return overwrite; + } + + @Override + public String toString() { + return "CreateFileArgs{" + + "recursive: '" + recursive + '\'' + + ", overwrite: '" + overwrite + '\'' + + '}'; + } + } + } +} diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 1e5675f612e6..ab23bb0a3a00 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -917,6 +917,48 @@ message SnapshotDiffJobProto { optional double keysProcessedPct = 13; } +message CreateKeyOperationArgs { +} + +message RenameKeyOperationArgs { + required string toKeyName = 1; +} + +message DeleteKeyOperationArgs { +} + +message CommitKeyOperationArgs { +} + +message CreateDirectoryOperationArgs { +} + +message CreateFileOperationArgs { + required bool isRecursive = 2; + required bool isOverwrite = 3; +} + + +/** + * CompletedRequestInfo table entry + */ +message CompletedRequestInfo { + + optional int64 trxLogIndex = 1; + required Type cmdType = 2; // Type of the command + optional string volumeName = 3; + optional string bucketName = 4; + optional string keyName = 5; + optional uint64 creationTime = 6; + + optional CreateKeyOperationArgs createKeyArgs = 7; + optional RenameKeyOperationArgs renameKeyArgs = 8; + optional DeleteKeyOperationArgs deleteKeyArgs = 9; + optional CommitKeyOperationArgs commitKeyArgs = 10; + optional CreateDirectoryOperationArgs createDirectoryArgs = 11; + optional CreateFileOperationArgs createFileArgs = 12; +} + message OzoneObj { enum ObjectType { VOLUME = 1; diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java index baac362da741..3f16db308061 100644 --- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java +++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java @@ -41,6 +41,7 @@ import org.apache.hadoop.ozone.om.helpers.ListKeysResult; import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo; import org.apache.hadoop.ozone.om.helpers.OmDBTenantState; import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo; @@ -478,6 +479,8 @@ String getMultipartKeyFSO(String volume, String bucket, String key, String Table getCompactionLogTable(); + Table getCompletedRequestInfoTable(); + /** * Gets the OM Meta table. * @return meta table reference. 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index e7826708b895..31fe6bd0594a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -101,6 +101,7 @@
 import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
 import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
 import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
@@ -180,6 +181,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
   private TypedTable<String, SnapshotInfo> snapshotInfoTable;
   private TypedTable<String, String> snapshotRenamedTable;
   private TypedTable<String, CompactionLogEntry> compactionLogTable;
+  private TypedTable<String, OmCompletedRequestInfo> completedRequestInfoTable;
 
   private OzoneManager ozoneManager;
 
@@ -486,6 +488,8 @@ protected void initializeOmTables(CacheType cacheType,
     // TODO: [SNAPSHOT] Initialize table lock for snapshotRenamedTable.
 
     compactionLogTable = initializer.get(OMDBDefinition.COMPACTION_LOG_TABLE_DEF);
+
+    completedRequestInfoTable = initializer.get(OMDBDefinition.COMPLETED_REQUEST_INFO_TABLE_DEF);
   }
 
 /**
@@ -1683,6 +1687,11 @@ public Table<String, CompactionLogEntry> getCompactionLogTable() {
     return compactionLogTable;
   }
 
+  @Override
+  public Table<String, OmCompletedRequestInfo> getCompletedRequestInfoTable() {
+    return completedRequestInfoTable;
+  }
+
   /**
    * Get Snapshot Chain Manager.
   *
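Since the new table is keyed by the zero-padded transaction index, a reader can scan it in commit order. The snippet below is a minimal sketch of such a scan, written against the Table/TableIterator API used elsewhere in OM; `lastProcessedKey` and `process` are illustrative names, not part of the patch:

    // Sketch only: resume a scan of the ledger after a saved position.
    // (IOException handling elided for brevity.)
    Table<String, OmCompletedRequestInfo> ledger =
        omMetadataManager.getCompletedRequestInfoTable();
    try (TableIterator<String, ? extends Table.KeyValue<String, OmCompletedRequestInfo>>
             iter = ledger.iterator()) {
      iter.seek(lastProcessedKey);   // zero-padded keys sort by trxLogIndex
      while (iter.hasNext()) {
        Table.KeyValue<String, OmCompletedRequestInfo> kv = iter.next();
        if (kv.getKey().compareTo(lastProcessedKey) <= 0) {
          continue;                  // treat the saved position as exclusive
        }
        process(kv.getValue());      // e.g. publish a notification
      }
    }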
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
index 9894e8f5d6bf..3a4dbfbc2ed5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
@@ -315,6 +316,14 @@ public final class OMDBDefinition extends DBDefinition.WithMap {
       StringCodec.get(),
       CompactionLogEntry.getCodec());
 
+  public static final String COMPLETED_REQUEST_INFO_TABLE = "completedRequestInfoTable";
+
+  /** completedRequestInfoTable: txId -> OmCompletedRequestInfo. */
+  public static final DBColumnFamilyDefinition<String, OmCompletedRequestInfo> COMPLETED_REQUEST_INFO_TABLE_DEF
+      = new DBColumnFamilyDefinition<>(COMPLETED_REQUEST_INFO_TABLE,
+          StringCodec.get(), // txid (left zero-padded)
+          OmCompletedRequestInfo.getCodec());
+
   //---------------------------------------------------------------------------
   private static final Map<String, DBColumnFamilyDefinition<?, ?>> COLUMN_FAMILIES =
       DBColumnFamilyDefinition.newUnmodifiableMap(
@@ -339,7 +348,8 @@ public final class OMDBDefinition extends DBDefinition.WithMap {
           TENANT_STATE_TABLE_DEF,
           TRANSACTION_INFO_TABLE_DEF,
           USER_TABLE_DEF,
-          VOLUME_TABLE_DEF);
+          VOLUME_TABLE_DEF,
+          COMPLETED_REQUEST_INFO_TABLE_DEF);
 
   private static final OMDBDefinition INSTANCE = new OMDBDefinition();
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmCompletedRequestInfo.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmCompletedRequestInfo.java
new file mode 100644
index 000000000000..2f384a2eb0d3
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmCompletedRequestInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs;
+import org.apache.hadoop.util.Time;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
+
+/**
+ * Tests the completedRequestInfo OM database table.
+ */
+public class TestOmCompletedRequestInfo {
+
+  private OMMetadataManager omMetadataManager;
+  private static final long EXPECTED_OPERATION_ID = 123L;
+  private static final String EXPECTED_OPERATION_KEY = "123";
+
+  private static final String VOLUME_NAME = "vol1";
+  private static final String BUCKET_NAME = "bucket1";
+  private static final String KEY_NAME = "key1";
+
+  @TempDir
+  private Path folder;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OZONE_OM_DB_DIRS,
+        folder.toAbsolutePath().toString());
+    omMetadataManager = new OmMetadataManagerImpl(conf, null);
+  }
+
+  private OmCompletedRequestInfo createRequestInfo() {
+    return new OmCompletedRequestInfo.Builder()
+        .setTrxLogIndex(EXPECTED_OPERATION_ID)
+        .setVolumeName(VOLUME_NAME)
+        .setBucketName(BUCKET_NAME)
+        .setKeyName(KEY_NAME)
+        .setCreationTime(Time.now())
+        .setOpArgs(new OperationArgs.CreateKeyArgs())
+        .build();
+  }
+
+  @Test
+  public void testTableExists() throws Exception {
+    Table<String, OmCompletedRequestInfo> requestInfo =
+        omMetadataManager.getCompletedRequestInfoTable();
+    Assertions.assertTrue(requestInfo.isEmpty());
+  }
+
+  @Test
+  public void testAddNewOperation() throws Exception {
+    Table<String, OmCompletedRequestInfo> requestInfo =
+        omMetadataManager.getCompletedRequestInfoTable();
+    requestInfo.put(EXPECTED_OPERATION_KEY, createRequestInfo());
+    Assertions.assertEquals(EXPECTED_OPERATION_ID,
+        requestInfo.get(EXPECTED_OPERATION_KEY).getTrxLogIndex());
+  }
+
+  @Test
+  public void testDeleteOmCompletedRequestInfo() throws Exception {
+    Table<String, OmCompletedRequestInfo> requestInfo =
+        omMetadataManager.getCompletedRequestInfoTable();
+
+    Assertions.assertFalse(requestInfo.isExist(EXPECTED_OPERATION_KEY));
+    requestInfo.put(EXPECTED_OPERATION_KEY, createRequestInfo());
+    Assertions.assertTrue(requestInfo.isExist(EXPECTED_OPERATION_KEY));
+    requestInfo.delete(EXPECTED_OPERATION_KEY);
+    Assertions.assertFalse(requestInfo.isExist(EXPECTED_OPERATION_KEY));
+  }
+}

From 11e57d4f88b89e74e0ffcf0668d31038283eea19 Mon Sep 17 00:00:00 2001
From: Colm Dougan
Date: Wed, 19 Nov 2025 14:24:29 +0000
Subject: [PATCH 2/7] Capture data to the completed operation ledger table

---
 .../om/ratis/OzoneManagerStateMachine.java    |  10 +-
 .../OzoneManagerSuccessfulRequestHandler.java | 184 ++++++++++++++++++
 ...tOzoneManagerSuccessfulRequestHandler.java | 167 ++++++++++++++++
 3 files changed, 359 insertions(+), 2 deletions(-)
 create
mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerSuccessfulRequestHandler.java create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerSuccessfulRequestHandler.java diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index a3ad217ceef7..c68a70cc4c04 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -104,6 +104,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine { private final boolean isTracingEnabled; private final AtomicInteger statePausedCount = new AtomicInteger(0); private final String threadPrefix; + private final OzoneManagerSuccessfulRequestHandler successfulRequestHandler; /** The last {@link TermIndex} received from {@link #notifyTermIndexUpdated(long, long)}. */ private volatile TermIndex lastNotifiedTermIndex = TermIndex.valueOf(0, RaftLog.INVALID_LOG_INDEX); @@ -133,6 +134,7 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer, this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor(installSnapshotThreadFactory); this.nettyMetrics = NettyMetrics.create(); + this.successfulRequestHandler = new OzoneManagerSuccessfulRequestHandler(ozoneManager); } /** @@ -413,13 +415,13 @@ public CompletableFuture applyTransaction(TransactionContext trx) { ozoneManagerDoubleBuffer.acquireUnFlushedTransactions(1); return CompletableFuture.supplyAsync(() -> runCommand(request, termIndex), executorService) - .thenApply(this::processResponse); + .thenApply(resp -> processResponse(request, resp, termIndex)); } catch (Exception e) { return completeExceptionally(e); } } - private Message processResponse(OMResponse omResponse) { + private Message processResponse(OMRequest request, OMResponse omResponse, TermIndex termIndex) { if (!omResponse.getSuccess()) { // INTERNAL_ERROR or METADATA_ERROR are considered as critical errors. // In such cases, OM must be terminated instead of completing the future exceptionally, @@ -429,6 +431,10 @@ private Message processResponse(OMResponse omResponse) { } else if (omResponse.getStatus() == METADATA_ERROR) { terminate(omResponse, OMException.ResultCodes.METADATA_ERROR); } + } else { + // The operation completed successfully - hand off the request + // so we can perform some post-actions + successfulRequestHandler.handle(termIndex.getIndex(), request); } // For successful response and non-critical errors, convert the response. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerSuccessfulRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerSuccessfulRequestHandler.java new file mode 100644 index 000000000000..6d358035acdd --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerSuccessfulRequestHandler.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is a simple hook on a successful write operation. Its only
+ * purpose at the moment is to write an OmCompletedRequestInfo record to
+ * the DB.
+ */
+public final class OzoneManagerSuccessfulRequestHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManagerSuccessfulRequestHandler.class);
+
+  private final OMMetadataManager omMetadataManager;
+
+  public OzoneManagerSuccessfulRequestHandler(OzoneManager ozoneManager) {
+    this.omMetadataManager = ozoneManager.getMetadataManager();
+  }
+
+  public void handle(long trxLogIndex, OzoneManagerProtocolProtos.OMRequest omRequest) {
+
+    switch (omRequest.getCmdType()) {
+    case CreateKey:
+      logRequest("CreateKey", omRequest);
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          omRequest.getCreateKeyRequest().getKeyArgs(),
+          new OperationArgs.CreateKeyArgs()));
+      break;
+    case RenameKey:
+      logRequest("RenameKey", omRequest);
+      OzoneManagerProtocolProtos.RenameKeyRequest renameReq
+          = omRequest.getRenameKeyRequest();
+
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          renameReq.getKeyArgs(),
+          new OperationArgs.RenameKeyArgs(renameReq.getToKeyName())));
+
+      break;
+    case DeleteKey:
+      logRequest("DeleteKey", omRequest);
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          omRequest.getDeleteKeyRequest().getKeyArgs(),
+          new OperationArgs.DeleteKeyArgs()));
+      break;
+    case CommitKey:
+      logRequest("CommitKey", omRequest);
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          omRequest.getCommitKeyRequest().getKeyArgs(),
+          new OperationArgs.CommitKeyArgs()));
+      break;
+    case CreateDirectory:
+      logRequest("CreateDirectory", omRequest);
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          omRequest.getCreateDirectoryRequest().getKeyArgs(),
+          new OperationArgs.CreateDirectoryArgs()));
+      break;
+    case CreateFile:
+      logRequest("CreateFile", omRequest);
+
+      OzoneManagerProtocolProtos.CreateFileRequest createFileReq
+          = omRequest.getCreateFileRequest();
+
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          createFileReq.getKeyArgs(),
+          new OperationArgs.CreateFileArgs(createFileReq.getIsRecursive(),
+              createFileReq.getIsOverwrite())));
+      break;
+    default:
+      // Request types not tracked in the ledger (e.g. reads) are
+      // expected to land here, so this is not an error.
+      LOG.debug("Ignoring cmdType={}", omRequest.getCmdType());
+      break;
+    }
+  }
+
+  private static void logRequest(String label, OzoneManagerProtocolProtos.OMRequest omRequest) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("---> {} {}", label, omRequest);
+    }
+  }
+
+  private OmCompletedRequestInfo buildOmCompletedRequestInfo(long trxLogIndex,
+      OzoneManagerProtocolProtos.KeyArgs keyArgs,
+      OperationArgs opArgs) {
+    return OmCompletedRequestInfo.newBuilder()
+        .setTrxLogIndex(trxLogIndex)
+        .setVolumeName(keyArgs.getVolumeName())
+        .setBucketName(keyArgs.getBucketName())
+        .setKeyName(keyArgs.getKeyName())
+        .setCreationTime(System.currentTimeMillis())
+        .setOpArgs(opArgs)
+        .build();
+  }
+
+  private void storeCompletedRequestInfo(OmCompletedRequestInfo requestInfo) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing request info {}", requestInfo);
+    }
+
+    // XXX: not sure if this string key is necessary. I added it as an
+    // identifier which consumers of the ledger could use as an efficient
+    // (lexicographically sortable) key which could serve as a "seek
+    // position" to continue reading where they left off (and to
+    // persist to remember where they needed to carry on from). But
+    // that may be unnecessary. TODO: can we just use a plain integer
+    // key?
+    String key = requestInfo.getDbKey();
+
+    // XXX: should this be part of an atomic db txn that happens at the end
+    // of each replayed event or batch of events so that the completed
+    // request info "ledger" table is consistent with the processed
+    // Ratis events? e.g. OzoneManagerDoubleBuffer?
+
+    try (BatchOperation batchOperation = omMetadataManager.getStore()
+        .initBatchOperation()) {
+
+      omMetadataManager.getCompletedRequestInfoTable().putWithBatch(batchOperation, key, requestInfo);
+
+      // TODO: cap the size of the table to some configured limit.
+      //
+      // The following code is taken from
+      // https://github.com/apache/ozone/pull/8779/files#r2510853726
+      //
+      // ... as a suggested approach, but I think it will need to be
+      // amended to work here because that code is predicated on all
+      // txnids being held (and therefore we can count the number of IDs
+      // to delete by subtracting the new txnid from the first) whereas
+      // CompletedRequestInfoTable only holds the details of a subset of
+      // "interesting" write requests and therefore there are gaps in
+      // the IDs.
+      //
+      // I'm not sure how best to approach this. A couple of (strawman)
+      // ideas:
+      //
+      // 1. we could store every operation whether interesting or not (or
+      //    we add dummy rows for the "non interesting" requests).
+      // 2. we store an in-memory count of the CompletedRequestInfoTable
+      //    which is initialized on startup and updated as rows are
+      //    added/cycled out. Therefore we know how many entries to cap
+      //    the table size at.
+      //
+      // TODO: revisit this
+      //
+      //omMetadataManager.getCompletedRequestInfoTable().deleteRangeWithBatch(batchOperation, 0L,
+      //    Math.max(lastTransaction.getIndex() - maxFlushedTransactionGap, 0L));
+
+      omMetadataManager.getStore().commitBatchOperation(batchOperation);
+    } catch (Exception ex) {
+      LOG.error("Unable to write operation {}", requestInfo, ex);
+    }
+  }
+}
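For what the second strawman above could look like in practice, here is a rough sketch. It is hypothetical: `ledgerSize` and `trimLedger` do not exist in the patch, and it assumes the Table#deleteWithBatch and iterator APIs:

    // Hypothetical sketch of strawman 2: keep an in-memory row count,
    // initialized at startup, and trim the oldest ledger entries once a
    // configured cap is exceeded.
    private long ledgerSize; // initialized by counting rows at startup

    private void trimLedger(BatchOperation batch, long cap) throws IOException {
      Table<String, OmCompletedRequestInfo> ledger =
          omMetadataManager.getCompletedRequestInfoTable();
      try (TableIterator<String, ? extends Table.KeyValue<String, OmCompletedRequestInfo>>
               iter = ledger.iterator()) {
        // Zero-padded keys sort oldest-first, so deleting from the front
        // of the iterator removes the lowest transaction indexes.
        while (ledgerSize > cap && iter.hasNext()) {
          ledger.deleteWithBatch(batch, iter.next().getKey());
          ledgerSize--;
        }
      }
    }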
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerSuccessfulRequestHandler.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerSuccessfulRequestHandler.java
new file mode 100644
index 000000000000..f12bb4ed1c7a
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerSuccessfulRequestHandler.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om.ratis; + +import java.io.IOException; +import java.util.UUID; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeyRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockedConstruction; +import org.mockito.ArgumentCaptor; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; + +/** + * Testing OzoneManagerSuccessfulRequestHandler class. 
+ */ +@ExtendWith(MockitoExtension.class) +public class TestOzoneManagerSuccessfulRequestHandler { + + private static final String TEST_VOLUME_NAME = "testVol"; + private static final String TEST_BUCKET_NAME = "testBucket"; + private static final String TEST_KEY = "/foo/bar/baz/key"; + private static final String TEST_KEY_RENAMED = TEST_KEY + "_RENAMED"; + + private static final KeyArgs TEST_KEY_ARGS = KeyArgs.newBuilder() + .setKeyName(TEST_KEY) + .setVolumeName(TEST_VOLUME_NAME) + .setBucketName(TEST_BUCKET_NAME) + .build(); + + @Mock + private OzoneManager ozoneManager; + + @Mock + private OMMetadataManager omMetadataManager; + + @Mock + private OzoneConfiguration configuration; + + @Mock + private DBStore dbStore; + + @Mock + private BatchOperation batchOperation; + + @Mock + private Table completedRequestInfoTable; + + protected OMRequest createCreateKeyRequest() { + CreateKeyRequest createKeyRequest = CreateKeyRequest.newBuilder() + .setKeyArgs(TEST_KEY_ARGS).build(); + + return OMRequest.newBuilder() + .setClientId(UUID.randomUUID().toString()) + .setCreateKeyRequest(createKeyRequest) + .setCmdType(Type.CreateKey).build(); + } + + protected OMRequest createRenameKeyRequest() { + RenameKeyRequest renameKeyRequest = RenameKeyRequest.newBuilder() + .setKeyArgs(TEST_KEY_ARGS).setToKeyName(TEST_KEY_RENAMED).build(); + + return OMRequest.newBuilder() + .setClientId(UUID.randomUUID().toString()) + .setRenameKeyRequest(renameKeyRequest) + .setCmdType(Type.RenameKey).build(); + } + + @Test + public void testCreateKeyRequest() throws IOException { + + when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); + when(omMetadataManager.getCompletedRequestInfoTable()).thenReturn(completedRequestInfoTable); + when(omMetadataManager.getStore()).thenReturn(dbStore); + when(dbStore.initBatchOperation()).thenReturn(batchOperation); + + OzoneManagerSuccessfulRequestHandler requestHandler + = new OzoneManagerSuccessfulRequestHandler(ozoneManager); + + requestHandler.handle(123L, createCreateKeyRequest()); + + ArgumentCaptor arg1 = ArgumentCaptor.forClass(BatchOperation.class); + ArgumentCaptor arg2 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor arg3 = ArgumentCaptor.forClass(OmCompletedRequestInfo.class); + + verify(completedRequestInfoTable, times(1)).putWithBatch(arg1.capture(), arg2.capture(), arg3.capture()); + assertThat(arg1.getValue()).isEqualTo(batchOperation); + + String key = arg2.getValue(); + assertThat(key).isEqualTo("00000000000000000123"); + + OmCompletedRequestInfo requestInfo = arg3.getValue(); + assertThat(requestInfo.getVolumeName()).isEqualTo(TEST_VOLUME_NAME); + assertThat(requestInfo.getBucketName()).isEqualTo(TEST_BUCKET_NAME); + assertThat(requestInfo.getKeyName()).isEqualTo(TEST_KEY); + assertThat(requestInfo.getOpArgs()).isInstanceOf(OperationArgs.CreateKeyArgs.class); + } + + @Test + public void testRenameKeyRequest() throws IOException { + + when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); + when(omMetadataManager.getCompletedRequestInfoTable()).thenReturn(completedRequestInfoTable); + when(omMetadataManager.getStore()).thenReturn(dbStore); + when(dbStore.initBatchOperation()).thenReturn(batchOperation); + + OzoneManagerSuccessfulRequestHandler requestHandler + = new OzoneManagerSuccessfulRequestHandler(ozoneManager); + + requestHandler.handle(124L, createRenameKeyRequest()); + + ArgumentCaptor arg1 = ArgumentCaptor.forClass(BatchOperation.class); + ArgumentCaptor arg2 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor arg3 
= ArgumentCaptor.forClass(OmCompletedRequestInfo.class); + + verify(completedRequestInfoTable, times(1)).putWithBatch(arg1.capture(), arg2.capture(), arg3.capture()); + assertThat(arg1.getValue()).isEqualTo(batchOperation); + + String key = arg2.getValue(); + assertThat(key).isEqualTo("00000000000000000124"); + + OmCompletedRequestInfo requestInfo = arg3.getValue(); + assertThat(requestInfo.getVolumeName()).isEqualTo(TEST_VOLUME_NAME); + assertThat(requestInfo.getBucketName()).isEqualTo(TEST_BUCKET_NAME); + assertThat(requestInfo.getKeyName()).isEqualTo(TEST_KEY); + assertThat(requestInfo.getOpArgs()).isInstanceOf(OperationArgs.RenameKeyArgs.class); + OperationArgs.RenameKeyArgs opArgs = (OperationArgs.RenameKeyArgs) requestInfo.getOpArgs(); + assertThat(opArgs.getToKeyName()).isEqualTo(TEST_KEY_RENAMED); + } +} From 599dc1562f7748ef162c751aed811943880b7645 Mon Sep 17 00:00:00 2001 From: Colm Dougan Date: Thu, 20 Nov 2025 14:40:51 +0000 Subject: [PATCH 3/7] Added the concept of dynamically configurable EventListener plugins The intent is that implementations of these plugins would read from the ledger of completed events and publish event notifications (e.g. via kafka) --- .../om/eventlistener/OMEventListener.java | 15 +++ .../OMEventListenerPluginContext.java | 9 ++ .../OMEventListenerPluginContextImpl.java | 18 +++ .../OMEventListenerPluginManager.java | 120 +++++++++++++++++ .../ozone/om/eventlistener/BarPlugin.java | 42 ++++++ .../ozone/om/eventlistener/FooPlugin.java | 42 ++++++ .../TestOMEventListenerPluginManager.java | 121 ++++++++++++++++++ 7 files changed, 367 insertions(+) create mode 100644 hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java create mode 100644 hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/BarPlugin.java create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/FooPlugin.java create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java new file mode 100644 index 000000000000..e932d9c0f2b7 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java @@ -0,0 +1,15 @@ +package org.apache.hadoop.ozone.om.eventlistener; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; + +/** + * Interface for event listener plugin implementations + */ +public interface OMEventListener { + + void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext); + + void start(); + + void shutdown(); +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java new file mode 100644 index 000000000000..cbd7d1ec0912 --- /dev/null +++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java @@ -0,0 +1,9 @@ +package org.apache.hadoop.ozone.om.eventlistener; + +/** + * A narrow set of functionality we are ok with exposing to plugin + * implementations + */ +public interface OMEventListenerPluginContext { + +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java new file mode 100644 index 000000000000..178ed5b4b7f6 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java @@ -0,0 +1,18 @@ +package org.apache.hadoop.ozone.om.eventlistener; + +import org.apache.hadoop.ozone.om.OzoneManager; + +/** + * A narrow set of functionality we are ok with exposing to plugin + * implementations + */ +public final class OMEventListenerPluginContextImpl implements OMEventListenerPluginContext { + private final OzoneManager ozoneManager; + + public OMEventListenerPluginContextImpl(OzoneManager ozoneManager) { + this.ozoneManager = ozoneManager; + } + + // TODO: fill this out with capabilities we would like to expose to + // plugin implementations. +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java new file mode 100644 index 000000000000..e8f8527a8af9 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is a manager for plugins which implement OMEventListener. It
+ * handles the lifecycle of constructing, starting and stopping the
+ * configured plugins.
+ */
+public class OMEventListenerPluginManager {
+  private static final Logger LOG = LoggerFactory.getLogger(OMEventListenerPluginManager.class);
+
+  public static final String PLUGIN_DEST_BASE = "ozone.om.plugin.destination";
+
+  private final List<OMEventListener> plugins;
+
+  public OMEventListenerPluginManager(OzoneManager ozoneManager, OzoneConfiguration conf) {
+    this.plugins = loadAll(ozoneManager, conf);
+  }
+
+  public List<OMEventListener> getLoaded() {
+    return plugins;
+  }
+
+  public void startAll() {
+    for (OMEventListener plugin : plugins) {
+      plugin.start();
+    }
+  }
+
+  public void shutdownAll() {
+    for (OMEventListener plugin : plugins) {
+      plugin.shutdown();
+    }
+  }
+
+  // Configuration is based on ranger plugins.
+  //
+  // For example, a plugin named FooPlugin would be configured via
+  // OzoneConfiguration properties as follows:
+  //
+  //   conf.set("ozone.om.plugin.destination.foo", "enabled");
+  //   conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin");
+  //
+  static List<OMEventListener> loadAll(OzoneManager ozoneManager, OzoneConfiguration conf) {
+    List<OMEventListener> plugins = new ArrayList<>();
+
+    Map<String, String> props = conf.getPropsMatchPrefixAndTrimPrefix(PLUGIN_DEST_BASE);
+    List<String> destNameList = new ArrayList<>();
+    for (Map.Entry<String, String> entry : props.entrySet()) {
+      String destName = entry.getKey();
+      String value = entry.getValue();
+      LOG.info("Found event listener plugin with name={} and value={}", destName, value);
+
+      if (value.equalsIgnoreCase("enable") || value.equalsIgnoreCase("enabled") || value.equalsIgnoreCase("true")) {
+        destNameList.add(destName);
+        LOG.info("Event listener plugin {}{} is set to {}", PLUGIN_DEST_BASE, destName, value);
+      }
+    }
+
+    OMEventListenerPluginContext pluginContext = new OMEventListenerPluginContextImpl(ozoneManager);
+
+    for (String destName : destNameList) {
+      try {
+        Class<? extends OMEventListener> cls = resolvePluginClass(conf, destName);
+        LOG.info("Event listener plugin class is {}", cls);
+
+        OMEventListener impl = cls.getDeclaredConstructor().newInstance();
+        impl.initialize(conf, pluginContext);
+
+        plugins.add(impl);
+      } catch (Exception ex) {
+        LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex);
+      }
+    }
+
+    return plugins;
+  }
+
+  private static Class<? extends OMEventListener> resolvePluginClass(OzoneConfiguration conf,
+      String destName) {
+    String classnameProp = PLUGIN_DEST_BASE + destName + ".classname";
+    LOG.info("Getting classname for {} with property {}", destName, classnameProp);
+    Class<? extends OMEventListener> cls = conf.getClass(classnameProp, null, OMEventListener.class);
+    if (null == cls) {
+      throw
new RuntimeException(String.format( + "Unable to load plugin %s, classname property %s is missing or does not implement OMEventListener", + destName, classnameProp)); + } + return cls; + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/BarPlugin.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/BarPlugin.java new file mode 100644 index 000000000000..ba4e7e5b9a08 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/BarPlugin.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.eventlistener; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; + +public class BarPlugin implements OMEventListener { + + private boolean initialized = false; + private boolean started = false; + private boolean shutdown = false; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + initialized = true; + } + + @Override + public void start() { + started = true; + } + + @Override + public void shutdown() { + shutdown = true; + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/FooPlugin.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/FooPlugin.java new file mode 100644 index 000000000000..ac28f17b87f2 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/FooPlugin.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ozone.om.eventlistener; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; + +public class FooPlugin implements OMEventListener { + + private boolean initialized = false; + private boolean started = false; + private boolean shutdown = false; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + initialized = true; + } + + @Override + public void start() { + started = true; + } + + @Override + public void shutdown() { + shutdown = true; + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java new file mode 100644 index 000000000000..da6c28ebf386 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.eventlistener; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.ArrayList; +import java.util.List; + +/** + * Tests {@link OMEventListenerPluginManager}. 
+ */ +@ExtendWith(MockitoExtension.class) +public class TestOMEventListenerPluginManager { + + @Mock + private OzoneManager ozoneManager; + + static List getLoadedPlugins(OMEventListenerPluginManager pluginManager) { + List loadedClasses = new ArrayList<>(); + for (OMEventListener plugin : pluginManager.getLoaded()) { + loadedClasses.add(plugin.getClass().getName()); + } + + // normalize + Collections.sort(loadedClasses); + + return loadedClasses; + } + + private static class BrokenFooPlugin { + + } + + @Test + public void testLoadSinglePlugin() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.om.plugin.destination.foo", "enabled"); + conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"); + + OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf); + + Assertions.assertEquals(Arrays.asList("org.apache.hadoop.ozone.om.eventlistener.FooPlugin"), + getLoadedPlugins(pluginManager)); + } + + @Test + public void testLoadMultiplePlugins() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.om.plugin.destination.foo", "enabled"); + conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"); + conf.set("ozone.om.plugin.destination.bar", "enabled"); + conf.set("ozone.om.plugin.destination.bar.classname", "org.apache.hadoop.ozone.om.eventlistener.BarPlugin"); + + OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf); + + Assertions.assertEquals(Arrays.asList("org.apache.hadoop.ozone.om.eventlistener.BarPlugin", + "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"), + + getLoadedPlugins(pluginManager)); + } + + @Test + public void testPluginMissingClassname() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.om.plugin.destination.foo", "enabled"); + + OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf); + + Assertions.assertEquals(Arrays.asList(), + getLoadedPlugins(pluginManager)); + } + + @Test + public void testPluginClassDoesNotExist() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.om.plugin.destination.foo", "enabled"); + conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.NotExistingPlugin"); + + OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf); + + Assertions.assertEquals(Arrays.asList(), + getLoadedPlugins(pluginManager)); + } + + @Test + public void testPluginClassDoesNotImplementInterface() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.om.plugin.destination.foo", "enabled"); + conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.BrokenFooPlugin"); + + OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf); + + Assertions.assertEquals(Arrays.asList(), + getLoadedPlugins(pluginManager)); + } +} From 5598d0fda8c7c07652b4fe1886216d6502941f1e Mon Sep 17 00:00:00 2001 From: Colm Dougan Date: Thu, 20 Nov 2025 16:28:49 +0000 Subject: [PATCH 4/7] Created a OMEventListener plugin implementation which publishes events to kafka --- .../OMEventListenerPluginContext.java | 12 ++ .../hadoop/ozone/om/OMMetadataManager.java | 11 ++ 
 hadoop-ozone/ozone-manager-plugins/pom.xml    |  69 +++++++
 .../OMEventListenerKafkaPublisher.java        | 176 ++++++++++++++++++
 .../OMEventListenerLedgerPoller.java          | 142 ++++++++++++++
 ...EventListenerLedgerPollerSeekPosition.java |  56 ++++++
 .../TestOMEventListenerKafkaPublisher.java    | 156 ++++++++++++++++
 hadoop-ozone/ozone-manager/pom.xml            |   5 +
 .../ozone/om/OmMetadataManagerImpl.java       |  52 ++++++
 .../OMEventListenerPluginContextImpl.java     |  22 ++-
 hadoop-ozone/pom.xml                          |   1 +
 pom.xml                                       |  12 ++
 12 files changed, 712 insertions(+), 2 deletions(-)
 create mode 100644 hadoop-ozone/ozone-manager-plugins/pom.xml
 create mode 100644 hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java
 create mode 100644 hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java
 create mode 100644 hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java
 create mode 100644 hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
index cbd7d1ec0912..a679a08930ed 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
@@ -1,9 +1,21 @@
 package org.apache.hadoop.ozone.om.eventlistener;

+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+
 /**
  * A narrow set of functionality we are ok with exposing to plugin
  * implementations
  */
 public interface OMEventListenerPluginContext {
+  boolean isLeaderReady();
+
+  // TODO: should we allow plugins to pass in maxResults or just limit
+  // them to some predefined value for safety? e.g. 10K
+  List<OmCompletedRequestInfo> listCompletedRequestInfo(String startKey, int maxResults) throws IOException;
+
+  // XXX: this probably doesn't belong here
+  String getThreadNamePrefix();
 }
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 3f16db308061..a6115eacd1bb 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -337,6 +337,17 @@ ListSnapshotResponse listSnapshot(
   List<OmVolumeArgs> listVolumes(String userName, String prefix, String startKey, int maxKeys) throws IOException;

+  /**
+   * Returns a list of completed request info objects.
+   *
+   * @param startKey the key to start listing from; this key itself is
+   * excluded from the result.
+   * @param maxResults the maximum number of results to return.
+   * @return a list of {@link OmCompletedRequestInfo}
+   * @throws IOException if the underlying table cannot be read.
+   */
+  List<OmCompletedRequestInfo> listCompletedRequestInfo(String startKey, int maxResults) throws IOException;
+
   /**
    * Returns the names of up to {@code count} open keys whose age is
    * greater than or equal to {@code expireThreshold}.
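The intended consumption pattern is to page through the ledger, feeding the
last row key of each batch back in as the next startKey. A minimal sketch
(the batch size is arbitrary, handle() is a placeholder for plugin-specific
work, and getDbKey() is the OmCompletedRequestInfo row-key accessor used
later in this series):

    String startKey = null;  // null/blank starts from the head of the ledger
    List<OmCompletedRequestInfo> batch;
    do {
      batch = metadataManager.listCompletedRequestInfo(startKey, 1000);
      for (OmCompletedRequestInfo info : batch) {
        handle(info);                // placeholder per-record callback
        startKey = info.getDbKey();  // this key is excluded from the next batch
      }
    } while (!batch.isEmpty());
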
diff --git a/hadoop-ozone/ozone-manager-plugins/pom.xml b/hadoop-ozone/ozone-manager-plugins/pom.xml new file mode 100644 index 000000000000..82f7fe5751fc --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/pom.xml @@ -0,0 +1,69 @@ + + + + 4.0.0 + + org.apache.ozone + ozone + 2.2.0-SNAPSHOT + + ozone-manager-plugins + 2.2.0-SNAPSHOT + jar + Apache Ozone Manager Plugins + + false + UTF-8 + + + + + com.google.guava + guava + + + org.apache.hadoop + hadoop-common + + + org.apache.kafka + kafka-clients + + + org.apache.ozone + hdds-common + + + org.apache.ozone + ozone-common + + + org.slf4j + slf4j-api + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + none + + + + + diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java new file mode 100644 index 000000000000..ecca094b3a43 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is an implementation of OMEventListener which uses the
+ * OMEventListenerLedgerPoller as a building block to periodically poll/consume
+ * completed operations, serialize them to an S3 schema and produce them
+ * to a Kafka topic.
+ */
+public class OMEventListenerKafkaPublisher implements OMEventListener {
+  public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class);
+
+  private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka.";
+  private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1;
+
+  private OMEventListenerLedgerPoller ledgerPoller;
+  private KafkaClientWrapper kafkaClient;
+  private OMEventListenerLedgerPollerSeekPosition seekPosition;
+
+  @Override
+  public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) {
+    Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX);
+    Properties kafkaProps = new Properties();
+    kafkaProps.putAll(kafkaPropsMap);
+
+    this.kafkaClient = new KafkaClientWrapper(kafkaProps);
+
+    // TODO: these constants should be read from config
+    long kafkaServiceInterval = 2 * 1000;
+    long kafkaServiceTimeout = 300 * 1000;
+
+    LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}, " +
+        "serviceTimeout={}, kafkaProps={}, seekPosition={}",
+        kafkaServiceInterval, kafkaServiceTimeout, kafkaProps,
+        seekPosition);
+
+    this.seekPosition = new OMEventListenerLedgerPollerSeekPosition();
+
+    this.ledgerPoller = new OMEventListenerLedgerPoller(
+        kafkaServiceInterval, TimeUnit.MILLISECONDS,
+        COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE,
+        kafkaServiceTimeout, pluginContext, conf,
+        seekPosition,
+        this::handleCompletedRequest);
+  }
+
+  @Override
+  public void start() {
+    ledgerPoller.start();
+
+    try {
+      kafkaClient.initialize();
+    } catch (IOException ex) {
+      LOG.error("Failed to initialize kafka client", ex);
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    try {
+      kafkaClient.shutdown();
+    } catch (IOException ex) {
+      LOG.error("Failed to shut down kafka client", ex);
+    }
+
+    ledgerPoller.shutdown();
+  }
+
+  // callback called by OMEventListenerLedgerPoller
+  public void handleCompletedRequest(OmCompletedRequestInfo completedRequestInfo) {
+    LOG.info("Processing {}", completedRequestInfo);
+
+    // stub event until we implement a strategy to convert the events to
+    // a user facing schema (e.g. S3)
+    String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}",
+        completedRequestInfo.getVolumeName(),
+        completedRequestInfo.getBucketName(),
+        completedRequestInfo.getKeyName(),
+        String.valueOf(completedRequestInfo.getOpArgs().getOperationType()));
+
+    LOG.info("Sending {}", event);
+
+    try {
+      kafkaClient.send(event);
+    } catch (IOException ex) {
+      LOG.error("Failure to send event {}", event, ex);
+      return;
+    }
+
+    // we can update the seek position
+    seekPosition.set(completedRequestInfo.getDbKey());
+  }
+
+  static class KafkaClientWrapper {
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaClientWrapper.class);
+
+    private final String topic;
+    private final Properties kafkaProps;
+
+    private KafkaProducer<String, String> producer;
+
+    public KafkaClientWrapper(Properties kafkaProps) {
+      this.topic = (String) kafkaProps.get("topic");
+      this.kafkaProps = kafkaProps;
+    }
+
+    public void initialize() throws IOException {
+      LOG.info("Initializing with properties {}", kafkaProps);
+      this.producer = new KafkaProducer<>(kafkaProps);
+
+      ensureTopicExists();
+    }
+
+    public void shutdown() throws IOException {
+      if (producer != null) {
+        producer.close();
+      }
+    }
+
+    public void send(String message) throws IOException {
+      if (producer != null) {
+        LOG.info("Producing event {}", message);
+        ProducerRecord<String, String> producerRecord =
+            new ProducerRecord<>(topic, message);
+        producer.send(producerRecord);
+      } else {
+        LOG.warn("Producing event {} [KAFKA DOWN]", message);
+      }
+    }
+
+    private void ensureTopicExists() {
+      // try-with-resources closes the admin client, so no explicit close()
+      try (AdminClient adminClient = AdminClient.create(kafkaProps)) {
+        LOG.info("Creating kafka topic: {}", this.topic);
+        NewTopic newTopic = new NewTopic(this.topic, 1, (short) 1);
+        adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+      } catch (Exception ex) {
+        LOG.error("Failed to create topic: {}", this.topic, ex);
+      }
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java
new file mode 100644
index 000000000000..1264b9b471e4
--- /dev/null
+++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om.eventlistener; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +/** + * This is a helper class which can be used by implementations of + * OMEventListener which uses a background service to read the latest + * completed operations and hand them to a callback method + */ +public class OMEventListenerLedgerPoller extends BackgroundService { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerLedgerPoller.class); + + private static final int MAX_RESULTS = 10_000; + + private final AtomicBoolean suspended; + private final AtomicLong runCount; + private final AtomicLong successRunCount; + private final OMEventListenerPluginContext pluginContext; + private final OMEventListenerLedgerPollerSeekPosition seekPosition; + private final Consumer callback; + + public OMEventListenerLedgerPoller(long interval, TimeUnit unit, + int poolSize, long serviceTimeout, + OMEventListenerPluginContext pluginContext, + OzoneConfiguration configuration, + OMEventListenerLedgerPollerSeekPosition seekPosition, + Consumer callback) { + + super("OMEventListenerLedgerPoller", + interval, + TimeUnit.MILLISECONDS, + poolSize, + serviceTimeout, pluginContext.getThreadNamePrefix()); + + this.suspended = new AtomicBoolean(false); + this.runCount = new AtomicLong(0); + this.successRunCount = new AtomicLong(0); + this.pluginContext = pluginContext; + this.seekPosition = seekPosition; + this.callback = callback; + } + + private boolean shouldRun() { + return pluginContext.isLeaderReady() && !suspended.get(); + } + + /** + * Suspend the service. + */ + @VisibleForTesting + public void suspend() { + suspended.set(true); + } + + /** + * Resume the service if suspended. 
+ */ + @VisibleForTesting + public void resume() { + suspended.set(false); + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + queue.add(new OMEventListenerLedgerPoller.CompletedRequestInfoConsumerTask()); + return queue; + } + + public AtomicLong getRunCount() { + return runCount; + } + + private class CompletedRequestInfoConsumerTask implements BackgroundTask { + + @Override + public int getPriority() { + return 0; + } + + @Override + public BackgroundTaskResult call() { + if (shouldRun()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Running OMEventListenerLedgerPoller"); + } + if (runCount.get() == 0) { + seekPosition.initSeekPosition(); + } + getRunCount().incrementAndGet(); + + try { + for (OmCompletedRequestInfo requestInfo : pluginContext.listCompletedRequestInfo(seekPosition.get(), MAX_RESULTS)) { + callback.accept(requestInfo); + } + } catch (IOException e) { + LOG.error("Error while running completed operation consumer " + + "background task. Will retry at next run.", e); + } + } else { + runCount.set(0); + } + + // place holder by returning empty results of this call back. + return BackgroundTaskResult.EmptyTaskResult.newResult(); + } + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java new file mode 100644 index 000000000000..99f917caac83 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a helper class to get/set the seek position used by the
+ * OMEventListenerLedgerPoller.
+ *
+ * XXX: the seek position should be persisted (and ideally distributed to
+ * all OMs) but at the moment it only lives in memory
+ */
+public class OMEventListenerLedgerPollerSeekPosition {
+  public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerLedgerPollerSeekPosition.class);
+
+  private final AtomicReference<String> seekPosition;
+
+  public OMEventListenerLedgerPollerSeekPosition() {
+    this.seekPosition = new AtomicReference<>(initSeekPosition());
+  }
+
+  // TODO: load this from persistent storage
+  public String initSeekPosition() {
+    return null;
+  }
+
+  public String get() {
+    return seekPosition.get();
+  }
+
+  public void set(String val) {
+    LOG.debug("Setting seek position {}", val);
+    // NOTE: this in-memory view of the seek position needs to be kept
+    // up to date because the OMEventListenerLedgerPoller has a
+    // reference to it
+    seekPosition.set(val);
+  }
+}
diff --git a/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
new file mode 100644
index 000000000000..6020cf77744f
--- /dev/null
+++ b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.ozone.om.eventlistener; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs; +import org.apache.hadoop.util.Time; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockedConstruction; +import org.mockito.ArgumentCaptor; +import org.mockito.junit.jupiter.MockitoExtension; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; + +/** + * Tests {@link OMEventListenerPluginManager}. + */ +@ExtendWith(MockitoExtension.class) +public class TestOMEventListenerKafkaPublisher { + + private static final String VOLUME_NAME = "vol1"; + private static final String BUCKET_NAME = "bucket1"; + + @Mock + private OMEventListenerPluginContext pluginContext; + + // helper to create json key/val string for non exhaustive JSON + // attribute checking + private static String toJsonKeyVal(String key, String val) { + return new StringBuilder() + .append('\"') + .append(key) + .append('\"') + .append(':') + .append('\"') + .append(val) + .append('\"') + .toString(); + } + + private static OmCompletedRequestInfo buildCompletedRequestInfo(long trxLogIndex, String keyName, OperationArgs opArgs) { + return new OmCompletedRequestInfo.Builder() + .setTrxLogIndex(trxLogIndex) + .setVolumeName(VOLUME_NAME) + .setBucketName(BUCKET_NAME) + .setKeyName(keyName) + .setCreationTime(Time.now()) + .setOpArgs(opArgs) + .build(); + } + + private List captureEventsProducedByOperation(OmCompletedRequestInfo op, int expectEvents) throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.notify.kafka.topic", "abc"); + + List events = new ArrayList<>(); + + OMEventListenerKafkaPublisher plugin = new OMEventListenerKafkaPublisher(); + try (MockedConstruction mockeKafkaClientWrapper = + mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) { + + plugin.initialize(conf, pluginContext); + plugin.handleCompletedRequest(op); + + OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockeKafkaClientWrapper.constructed().get(0); + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + verify(mock, times(expectEvents)).send(argument.capture()); + + events.addAll(argument.getAllValues()); + } + + return events; + } + + @Test + public void testCreateKeyRequestProducesS3CreatedEvent() throws InterruptedException, IOException { + OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(1L, "some/key1", + new OperationArgs.CreateKeyArgs()); + + List events = captureEventsProducedByOperation(createRequest, 1); + assertThat(events).hasSize(1); + + assertThat(events.get(0)) + .contains(toJsonKeyVal("key", "vol1/bucket1/some/key1")) + .contains(toJsonKeyVal("type", "CREATE_KEY")); + } + + @Test + public void testCreateFileRequestProducesS3CreatedEvent() throws InterruptedException, IOException { + boolean recursive = false; + boolean overwrite = true; + + OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(2L, "some/key2", + new OperationArgs.CreateFileArgs(recursive, 
overwrite));
+
+    List<String> events = captureEventsProducedByOperation(createRequest, 1);
+    assertThat(events).hasSize(1);
+
+    assertThat(events.get(0))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key2"))
+        .contains(toJsonKeyVal("type", "CREATE_FILE"));
+  }
+
+  @Test
+  public void testCreateDirectoryRequestProducesS3CreatedEvent() throws InterruptedException, IOException {
+    OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(3L, "some/key3",
+        new OperationArgs.CreateDirectoryArgs());
+
+    List<String> events = captureEventsProducedByOperation(createRequest, 1);
+    assertThat(events).hasSize(1);
+
+    assertThat(events.get(0))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key3"))
+        .contains(toJsonKeyVal("type", "CREATE_DIRECTORY"));
+  }
+
+  @Test
+  public void testRenameRequestProducesS3CreateAndDeleteEvents() throws InterruptedException, IOException {
+    OmCompletedRequestInfo renameRequest = buildCompletedRequestInfo(4L, "some/key4",
+        new OperationArgs.RenameKeyArgs("some/key_RENAMED"));
+
+    List<String> events = captureEventsProducedByOperation(renameRequest, 1);
+    assertThat(events).hasSize(1);
+
+    assertThat(events.get(0))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key4"))
+        .contains(toJsonKeyVal("type", "RENAME_KEY"));
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/pom.xml b/hadoop-ozone/ozone-manager/pom.xml
index 923b1c02cbeb..401d92d07ced 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -239,6 +239,11 @@
       <artifactId>netty-tcnative-boringssl-static</artifactId>
       <scope>runtime</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>ozone-manager-plugins</artifactId>
+      <scope>runtime</scope>
+    </dependency>
   </dependencies>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 31fe6bd0594a..443c80103ca9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -1323,6 +1323,58 @@ private List<OmVolumeArgs> listAllVolumes(String prefix, String startKey,
     return result;
   }

+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<OmCompletedRequestInfo> listCompletedRequestInfo(final String startKey,
+      final int maxResults)
+      throws IOException {
+    List<OmCompletedRequestInfo> results = new ArrayList<>();
+
+    Table.KeyValue<String, OmCompletedRequestInfo> completedRequestInfoRow;
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmCompletedRequestInfo>>
+        tableIterator = getCompletedRequestInfoTable().iterator()) {
+
+      boolean skipFirst = false;
+      if (StringUtils.isNotBlank(startKey)) {
+        // TODO: what happens if the seek position is no longer
+        // available? Do we go to the end of the list
+        // or the first key > startKey
+        tableIterator.seek(startKey);
+        skipFirst = true;
+      }
+
+      while (tableIterator.hasNext() && results.size() < maxResults) {
+        completedRequestInfoRow = tableIterator.next();
+        // this is the first loop iteration after the seek so we
+        // need to skip the record we seeked to (if it is still
+        // present)
+        if (skipFirst) {
+          skipFirst = false;
+          // NOTE: I'm assuming that we need to conditionally do this
+          // only if it is equal to what we wanted to seek to (hence
+          // the equality check below)
+          if (!Objects.equals(startKey, completedRequestInfoRow.getKey())) {
+            // when we have a startKey we expect the first result
+            // to be that startKey. If it is not then we can infer that
+            // the startKey was already cleaned up and therefore we have
+            // missed some records somehow and this needs to be flagged
+            // to the caller.
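+            // For example: with ledger rows k1..k5 and startKey "k3" the
+            // seek lands on k3, which is skipped, and k4/k5 are returned.
+            // If k3 had already been cleaned up the seek would land on k4
+            // instead, and without this check k4 would be silently
+            // swallowed as the "skipped" row.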
+ // TODO: we should throw a custom exception here (instead of + // IOException) that needs to be handled appropriately by + // callers + throw new IOException( + "Missing rows - start key not found (startKey=" + startKey + + ", foundKey=" + completedRequestInfoRow.getKey() + ")"); + } + } else { + results.add(completedRequestInfoRow.getValue()); + } + } + } + return results; + } + private PersistedUserVolumeInfo getVolumesByUser(String userNameKey) throws OMException { try { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java index 178ed5b4b7f6..2fa6242af7ff 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java @@ -1,6 +1,9 @@ package org.apache.hadoop.ozone.om.eventlistener; +import java.io.IOException; +import java.util.List; import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; /** * A narrow set of functionality we are ok with exposing to plugin @@ -13,6 +16,21 @@ public OMEventListenerPluginContextImpl(OzoneManager ozoneManager) { this.ozoneManager = ozoneManager; } - // TODO: fill this out with capabilities we would like to expose to - // plugin implementations. + @Override + public boolean isLeaderReady() { + return ozoneManager.isLeaderReady(); + } + + // TODO: should we allow plugins to pass in maxResults or just limit + // them to some predefined value for safety? e.g. 10K + @Override + public List listCompletedRequestInfo(String startKey, int maxResults) throws IOException { + return ozoneManager.getMetadataManager().listCompletedRequestInfo(startKey, maxResults); + } + + // TODO: it feels like this doesn't belong here + @Override + public String getThreadNamePrefix() { + return ozoneManager.getThreadNamePrefix(); + } } diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml index 32041971cbeb..8e6bb742b232 100644 --- a/hadoop-ozone/pom.xml +++ b/hadoop-ozone/pom.xml @@ -44,6 +44,7 @@ mini-cluster multitenancy-ranger ozone-manager + ozone-manager-plugins ozonefs ozonefs-common recon diff --git a/pom.xml b/pom.xml index d6e46d73b0c1..c799e1f7454d 100644 --- a/pom.xml +++ b/pom.xml @@ -138,6 +138,7 @@ 2.1 1.1.1 5.14.1 + 2.8.2 1.0.1 1.9.25 2.7.0 @@ -984,6 +985,11 @@ httpcore-nio ${httpcore.version} + + org.apache.kafka + kafka-clients + ${kafka.version} + org.apache.kerby kerb-core @@ -1234,6 +1240,11 @@ ${ozone.version} test-jar + + org.apache.ozone + ozone-manager-plugins + ${ozone.version} + org.apache.ozone ozone-mini-cluster @@ -2254,6 +2265,7 @@ --> com.fasterxml.jackson.core:jackson-databind:jar org.apache.commons:commons-compress:jar + org.apache.hadoop:hadoop-common:jar org.apache.ozone:hdds-client:jar org.apache.ozone:ozone-interface-client:jar org.glassfish.jersey.core:jersey-common:jar From b642e95e48cb85c2751f8b62510be145757d6028 Mon Sep 17 00:00:00 2001 From: Colm Dougan Date: Thu, 20 Nov 2025 16:42:10 +0000 Subject: [PATCH 5/7] Plumbed in event listener plugins to run end to end --- .../apache/hadoop/ozone/om/KeyManagerImpl.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index f4a900435e7e..5a38122150cf 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -144,6 +144,7 @@ import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeletedBlock; import org.apache.hadoop.ozone.om.PendingKeysDeletion.PurgedKey; +import org.apache.hadoop.ozone.om.eventlistener.OMEventListenerPluginManager; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo; @@ -219,6 +220,9 @@ public class KeyManagerImpl implements KeyManager { private DNSToSwitchMapping dnsToSwitchMapping; private CompactionService compactionService; + // XXX: probably doesn't belong here + private OMEventListenerPluginManager eventListenerPluginManager; + public KeyManagerImpl(OzoneManager om, ScmClient scmClient, OzoneConfiguration conf, OMPerformanceMetrics metrics) { this(om, scmClient, om.getMetadataManager(), conf, @@ -357,6 +361,15 @@ public void start(OzoneConfiguration configuration) { multipartUploadCleanupService.start(); } + // TODO: + // * this is probably the wrong place for this but adding it here + // for now just to get the plumbing fleshed out + // + if (eventListenerPluginManager == null) { + eventListenerPluginManager = new OMEventListenerPluginManager(ozoneManager, configuration); + eventListenerPluginManager.startAll(); + } + Class dnsToSwitchMappingClass = configuration.getClass( ScmConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, @@ -499,6 +512,10 @@ public void stop() { compactionService.shutdown(); compactionService = null; } + if (eventListenerPluginManager != null) { + eventListenerPluginManager.shutdownAll(); + eventListenerPluginManager= null; + } } /** From c259fe75a6c60101e4d2c7cca67531f0ba63485b Mon Sep 17 00:00:00 2001 From: Colm Dougan Date: Thu, 20 Nov 2025 16:43:10 +0000 Subject: [PATCH 6/7] Cloned ozone-ha docker setup with additional kafka install for end to end testing --- .../src/main/compose/ozone-ha-with-kafka/.env | 21 +++ .../ozone-ha-with-kafka/docker-compose.yaml | 176 ++++++++++++++++++ .../compose/ozone-ha-with-kafka/docker-config | 63 +++++++ .../ozone-ha-with-kafka/test-hadoop.sh | 29 +++ .../main/compose/ozone-ha-with-kafka/test.sh | 53 ++++++ 5 files changed, 342 insertions(+) create mode 100644 hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/.env create mode 100644 hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-compose.yaml create mode 100644 hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-config create mode 100755 hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test-hadoop.sh create mode 100755 hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test.sh diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/.env b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/.env new file mode 100644 index 000000000000..2186e05a2709 --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/.env @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +HDDS_VERSION=${hdds.version} +HADOOP_IMAGE=${docker.hadoop.image} +OZONE_RUNNER_VERSION=${docker.ozone-runner.version} +OZONE_RUNNER_IMAGE=apache/ozone-runner +OZONE_OPTS= diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-compose.yaml new file mode 100644 index 000000000000..a18699349ce0 --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-compose.yaml @@ -0,0 +1,176 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# reusable fragments (see https://docs.docker.com/compose/compose-file/#extension-fields) +x-common-config: + &common-config + image: ${OZONE_RUNNER_IMAGE}:${OZONE_RUNNER_VERSION} + volumes: + - ../..:/opt/hadoop + env_file: + - docker-config + +x-replication: + &replication + OZONE-SITE.XML_ozone.server.default.replication: ${OZONE_REPLICATION_FACTOR:-1} + +services: + datanode: + <<: *common-config + ports: + - 19864 + - 9882 + environment: + <<: *replication + command: ["ozone","datanode"] + om1: + <<: *common-config + environment: + WAITFOR: scm3:9894 + ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION + <<: *replication + ports: + - 9874:9874 + - 9862 + hostname: om1 + command: ["ozone","om"] + om2: + <<: *common-config + environment: + WAITFOR: scm3:9894 + ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION + <<: *replication + ports: + - 9874 + - 9862 + hostname: om2 + command: ["ozone","om"] + om3: + <<: *common-config + environment: + WAITFOR: scm3:9894 + ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION + <<: *replication + ports: + - 9874 + - 9862 + hostname: om3 + command: ["ozone","om"] + scm1: + <<: *common-config + ports: + - 9876:9876 + environment: + ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION + OZONE-SITE.XML_hdds.scm.safemode.min.datanode: ${OZONE_SAFEMODE_MIN_DATANODES:-1} + <<: *replication + command: ["ozone","scm"] + scm2: + <<: *common-config + ports: + - 9876 + environment: + WAITFOR: scm1:9894 + ENSURE_SCM_BOOTSTRAPPED: /data/metadata/scm/current/VERSION + OZONE-SITE.XML_hdds.scm.safemode.min.datanode: ${OZONE_SAFEMODE_MIN_DATANODES:-1} + <<: *replication + command: ["ozone","scm"] + scm3: + <<: *common-config + ports: + - 9876 + environment: + WAITFOR: scm2:9894 + ENSURE_SCM_BOOTSTRAPPED: /data/metadata/scm/current/VERSION + OZONE-SITE.XML_hdds.scm.safemode.min.datanode: ${OZONE_SAFEMODE_MIN_DATANODES:-1} + <<: *replication + command: ["ozone","scm"] + httpfs: + <<: *common-config + environment: + OZONE-SITE.XML_hdds.scm.safemode.min.datanode: ${OZONE_SAFEMODE_MIN_DATANODES:-1} + <<: *replication + ports: + - 14000:14000 + command: [ "ozone","httpfs" ] + s3g: + <<: *common-config + environment: + <<: *replication + ports: + - 9878:9878 + command: ["ozone","s3g"] + recon: + <<: *common-config + ports: + - 9888:9888 + environment: + <<: *replication + command: ["ozone","recon"] + zookeeper: + image: confluentinc/cp-zookeeper:7.6.0 + hostname: zookeeper + container_name: zookeeper + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka-1: + image: confluentinc/cp-kafka:7.6.0 + hostname: kafka-1 + container_name: kafka-1 + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + + kafka-2: + image: confluentinc/cp-kafka:7.6.0 + hostname: kafka-2 + container_name: kafka-2 + ports: + - "9093:9093" + environment: + KAFKA_BROKER_ID: 2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,PLAINTEXT_HOST://localhost:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_LISTENERS: 
PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9093 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + + kafka-3: + image: confluentinc/cp-kafka:7.6.0 + hostname: kafka-3 + container_name: kafka-3 + ports: + - "9094:9094" + environment: + KAFKA_BROKER_ID: 3 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:29092,PLAINTEXT_HOST://localhost:9094 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9094 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-config new file mode 100644 index 000000000000..87c569fcb04e --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-config @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# For HttpFS service it is required to enable proxying users. 
+CORE-SITE.XML_hadoop.proxyuser.hadoop.hosts=* +CORE-SITE.XML_hadoop.proxyuser.hadoop.groups=* + +CORE-SITE.XML_fs.defaultFS=ofs://omservice/ +CORE-SITE.XML_fs.trash.interval=1 + +OZONE-SITE.XML_ozone.om.service.ids=omservice +OZONE-SITE.XML_ozone.om.nodes.omservice=om1,om2,om3 +OZONE-SITE.XML_ozone.om.address.omservice.om1=om1 +OZONE-SITE.XML_ozone.om.address.omservice.om2=om2 +OZONE-SITE.XML_ozone.om.address.omservice.om3=om3 + +OZONE-SITE.XML_ozone.scm.service.ids=scmservice +OZONE-SITE.XML_ozone.scm.nodes.scmservice=scm1,scm2,scm3 +OZONE-SITE.XML_ozone.scm.address.scmservice.scm1=scm1 +OZONE-SITE.XML_ozone.scm.address.scmservice.scm2=scm2 +OZONE-SITE.XML_ozone.scm.address.scmservice.scm3=scm3 +OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata +OZONE-SITE.XML_ozone.scm.container.size=1GB +OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB +OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata +OZONE-SITE.XML_hdds.datanode.dir=/data/hdds +OZONE-SITE.XML_hdds.datanode.volume.min.free.space=100MB +OZONE-SITE.XML_ozone.datanode.pipeline.limit=1 +OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s +OZONE-SITE.XML_ozone.scm.primordial.node.id=scm1 +OZONE-SITE.XML_hdds.container.report.interval=60s +OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true +OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon +OZONE-SITE.XML_ozone.recon.address=recon:9891 +OZONE-SITE.XML_ozone.recon.http-address=0.0.0.0:9888 +OZONE-SITE.XML_ozone.recon.https-address=0.0.0.0:9889 +OZONE-SITE.XML_hdds.container.ratis.datastream.enabled=true +OZONE-SITE.XML_ozone.http.basedir=/tmp/ozone_http + +OZONE-SITE.XML_ozone.om.plugin.destination.kafka=true +OZONE-SITE.XML_ozone.om.plugin.destination.kafka.classname=org.apache.hadoop.ozone.om.eventlistener.OMEventListenerKafkaPublisher +OZONE-SITE.XML_ozone.notify.kafka.topic=test123 +OZONE-SITE.XML_ozone.notify.kafka.bootstrap.servers=kafka-3:29092,kafka-1:29092,kafka-2:29092 +OZONE-SITE.XML_ozone.notify.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer +OZONE-SITE.XML_ozone.notify.kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer + +OZONE_CONF_DIR=/etc/hadoop +OZONE_LOG_DIR=/var/log/hadoop + +no_proxy=om1,om2,om3,scm,s3g,recon,kdc,localhost,127.0.0.1 diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test-hadoop.sh b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test-hadoop.sh new file mode 100755 index 000000000000..a0d19fd673c0 --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test-hadoop.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +#suite:MR + +set -u -o pipefail + +COMPOSE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +export COMPOSE_DIR + +export SECURITY_ENABLED=false +export SCM=scm1 +export OM_SERVICE_ID=omservice + +source "$COMPOSE_DIR/../common/hadoop-test.sh" diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test.sh b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test.sh new file mode 100755 index 000000000000..6c09e7b76158 --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#suite:HA-unsecure + +set -u -o pipefail + +COMPOSE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +export COMPOSE_DIR + +export SECURITY_ENABLED=false +export OZONE_REPLICATION_FACTOR=3 +export SCM=scm1 +export OM_SERVICE_ID=omservice + +# shellcheck source=/dev/null +source "$COMPOSE_DIR/../testlib.sh" + +start_docker_env 5 + +execute_robot_test ${SCM} basic/ozone-shell-single.robot +execute_robot_test ${SCM} basic/links.robot + +execute_robot_test ${SCM} -v SCHEME:ofs -v BUCKET_TYPE:link -N ozonefs-ofs-link ozonefs/ozonefs.robot + +## Exclude virtual-host tests. This is tested separately as it requires additional config. 
+exclude="--exclude virtual-host" +for bucket in generated; do + for layout in OBJECT_STORE LEGACY FILE_SYSTEM_OPTIMIZED; do + execute_robot_test ${SCM} -v BUCKET:${bucket} -v BUCKET_LAYOUT:${layout} -N s3-${layout}-${bucket} ${exclude} s3 + # some tests are independent of the bucket type, only need to be run once + exclude="--exclude virtual-host --exclude no-bucket-type" + done +done + +execute_robot_test ${SCM} freon +execute_robot_test ${SCM} -v USERNAME:httpfs httpfs + +execute_robot_test ${SCM} omha/om-roles.robot From 146ca31485b7b26a809758ece8d2ae142b095d0e Mon Sep 17 00:00:00 2001 From: Colm Dougan Date: Thu, 20 Nov 2025 16:55:12 +0000 Subject: [PATCH 7/7] Generate events according to S3 schema/strategy --- hadoop-ozone/ozone-manager-plugins/pom.xml | 20 + .../OMEventListenerKafkaPublisher.java | 32 +- .../OMEventListenerNotificationStrategy.java | 9 + .../s3/DateTimeJsonSerializer.java | 62 ++ .../eventlistener/s3/S3EventNotification.java | 630 ++++++++++++++++++ .../s3/S3EventNotificationBuilder.java | 109 +++ .../s3/S3EventNotificationStrategy.java | 156 +++++ .../TestOMEventListenerKafkaPublisher.java | 17 +- 8 files changed, 1010 insertions(+), 25 deletions(-) create mode 100644 hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerNotificationStrategy.java create mode 100644 hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/DateTimeJsonSerializer.java create mode 100644 hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotification.java create mode 100644 hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationBuilder.java create mode 100644 hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationStrategy.java diff --git a/hadoop-ozone/ozone-manager-plugins/pom.xml b/hadoop-ozone/ozone-manager-plugins/pom.xml index 82f7fe5751fc..45c8fd8766d6 100644 --- a/hadoop-ozone/ozone-manager-plugins/pom.xml +++ b/hadoop-ozone/ozone-manager-plugins/pom.xml @@ -29,10 +29,30 @@ + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + com.google.guava guava + + joda-time + joda-time + + + org.apache.commons + commons-lang3 + org.apache.hadoop hadoop-common diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java index ecca094b3a43..bbd913cb8d39 100644 --- a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.eventlistener.s3.S3EventNotificationStrategy; import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; import org.slf4j.Logger; @@ -50,6 +51,7 @@ public class OMEventListenerKafkaPublisher implements OMEventListener { private OMEventListenerLedgerPoller ledgerPoller; private KafkaClientWrapper kafkaClient; + private OMEventListenerNotificationStrategy 
notificationStrategy; private OMEventListenerLedgerPollerSeekPosition seekPosition; @Override @@ -69,6 +71,7 @@ public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext plu kafkaServiceInterval, kafkaServiceTimeout, kafkaProps, seekPosition); + this.notificationStrategy = new S3EventNotificationStrategy(); this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(); this.ledgerPoller = new OMEventListenerLedgerPoller( @@ -103,27 +106,18 @@ public void shutdown() { // callback called by OMEventListenerLedgerPoller public void handleCompletedRequest(OmCompletedRequestInfo completedRequestInfo) { - LOG.info("Processing {}", completedRequestInfo); - - // stub event until we implement a strategy to convert the events to - // a user facing schema (e.g. S3) - String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}", - completedRequestInfo.getVolumeName(), - completedRequestInfo.getBucketName(), - completedRequestInfo.getKeyName(), - String.valueOf(completedRequestInfo.getOpArgs().getOperationType())); - - - LOG.info("Sending {}", event); - - try { - kafkaClient.send(event); - } catch (IOException ex) { - LOG.error("Failure to send event {}", event, ex); - return; + List eventsToSend = notificationStrategy.determineEventsForOperation(completedRequestInfo); + + for (String event : eventsToSend) { + try { + kafkaClient.send(event); + } catch (IOException ex) { + LOG.error("Failure to send event {}", event, ex); + return; + } } - // we can update the seek position + // no errors so we can update the seek position seekPosition.set(completedRequestInfo.getDbKey()); } diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerNotificationStrategy.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerNotificationStrategy.java new file mode 100644 index 000000000000..85c05002039a --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerNotificationStrategy.java @@ -0,0 +1,9 @@ +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.List; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; + +public interface OMEventListenerNotificationStrategy { + + List determineEventsForOperation(OmCompletedRequestInfo completedRequestInfo); +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/DateTimeJsonSerializer.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/DateTimeJsonSerializer.java new file mode 100644 index 000000000000..266255017cc3 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/DateTimeJsonSerializer.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2016. Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.om.eventlistener.s3; + +import java.io.IOException; +import java.util.Date; + +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; +import org.joda.time.tz.FixedDateTimeZone; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +/** + * A Jackson serializer for Joda {@code DateTime}s. + */ +public final class DateTimeJsonSerializer extends JsonSerializer { + + @Override + public void serialize( + DateTime value, + JsonGenerator jgen, + SerializerProvider provider) throws IOException { + + jgen.writeString(DateUtils.formatISO8601Date(value)); + } + + private static class DateUtils { + private static final DateTimeZone GMT = new FixedDateTimeZone("GMT", "GMT", 0, 0); + + /** ISO 8601 format */ + protected static final DateTimeFormatter iso8601DateFormat = + ISODateTimeFormat.dateTime().withZone(GMT); + + /** + * Formats the specified date as an ISO 8601 string. + * + * @param date the date to format + * @return the ISO-8601 string representing the specified date + */ + public static String formatISO8601Date(DateTime date) { + return iso8601DateFormat.print(date); + } + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotification.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotification.java new file mode 100644 index 000000000000..2bc978ec09d7 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotification.java @@ -0,0 +1,630 @@ +/* + * Copyright 2014-2025 Amazon Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://aws.amazon.com/apache2.0 + * + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + * OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.eventlistener.s3; + +/* copy of + * software.amazon.awssdk.eventnotifications.s3.model.S3EventNotification + * class taken from AWS SDK (1.x) with minor changes for build issues + * and removed usage of unnecessary AWS specific extension entiies: + * + * - GlacierEventDataEntity + * - LifecycleEventDataEntity + * - IntelligentTieringEventDataEntity + * - ReplicationEventDataEntity + * + * NOTE: We may not need to fork this class if we can use the SDK one directly + * but conversely we may want to make our own customizations. + */ + +import java.util.List; +import org.joda.time.DateTime; + +//import com.amazonaws.internal.DateTimeJsonSerializer; +//import com.amazonaws.util.json.Jackson; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.util.Map; + +/** +* A helper class that represents a strongly typed S3 EventNotification item sent +* to SQS, SNS, or Lambda. + * + *
+ * <p>
+ * <b>Migrating to the AWS SDK for Java v2</b>
+ * <p>
+ * The v2 equivalent of this class is
+ * {@code software.amazon.awssdk.eventnotifications.s3.model.S3EventNotification}.
+ * <p>
+ * See the Migration Guide
+ * for more information.
+ */
+public class S3EventNotification {
+
+  private final List<S3EventNotificationRecord> records;
+
+  @JsonCreator
+  public S3EventNotification(
+      @JsonProperty(value = "Records") List<S3EventNotificationRecord> records)
+  {
+    this.records = records;
+  }
+
+  /**
+   * <p>
+   * Parse the JSON string into a S3EventNotification object.
+   * <p>
+   * The function will try its best to parse input JSON string as best as it can.
+   * It will not fail even if the JSON string contains unknown properties.
+   * The function will throw SdkClientException if the input JSON string is
+   * not valid JSON.
+   * <p>
+ * @param json + * JSON string to parse. Typically this is the body of your SQS + * notification message body. + * + * @return The resulting S3EventNotification object. + */ + //public static S3EventNotification parseJson(String json) { + // return Jackson.fromJsonString(json, S3EventNotification.class); + //} + + /** + * @return the records in this notification + */ + @JsonProperty(value = "Records") + public List getRecords() { + return records; + } + + /** + * @return a JSON representation of this object + */ + //public String toJson() { + // return Jackson.toJsonString(this); + //} + + public static class UserIdentityEntity { + + private final String principalId; + + @JsonCreator + public UserIdentityEntity( + @JsonProperty(value = "principalId") String principalId) { + this.principalId = principalId; + } + + public String getPrincipalId() { + return principalId; + } + } + + public static class S3BucketEntity { + + private final String name; + private final UserIdentityEntity ownerIdentity; + private final String arn; + + @JsonCreator + public S3BucketEntity( + @JsonProperty(value = "name") String name, + @JsonProperty(value = "ownerIdentity") UserIdentityEntity ownerIdentity, + @JsonProperty(value = "arn") String arn) + { + this.name = name; + this.ownerIdentity = ownerIdentity; + this.arn = arn; + } + + public String getName() { + return name; + } + + public UserIdentityEntity getOwnerIdentity() { + return ownerIdentity; + } + + public String getArn() { + return arn; + } + } + + public static class S3ObjectEntity { + + private final String key; + private final Long size; + private final String eTag; + private final String versionId; + private final String sequencer; + + @Deprecated + public S3ObjectEntity( + String key, + Integer size, + String eTag, + String versionId) + { + this.key = key; + this.size = size == null ? null : size.longValue(); + this.eTag = eTag; + this.versionId = versionId; + this.sequencer = null; + } + + @Deprecated + public S3ObjectEntity( + String key, + Long size, + String eTag, + String versionId) + { + this(key, size, eTag, versionId, null); + } + + @JsonCreator + public S3ObjectEntity( + @JsonProperty(value = "key") String key, + @JsonProperty(value = "size") Long size, + @JsonProperty(value = "eTag") String eTag, + @JsonProperty(value = "versionId") String versionId, + @JsonProperty(value = "sequencer") String sequencer) + { + this.key = key; + this.size = size; + this.eTag = eTag; + this.versionId = versionId; + this.sequencer = sequencer; + } + + public String getKey() { + return key; + } + + /** + * S3 URL encodes the key of the object involved in the event. This is + * a convenience method to automatically URL decode the key. + * @return The URL decoded object key. + */ + //public String getUrlDecodedKey() { + // return SdkHttpUtils.urlDecode(getKey()); + //} + + /** + * @deprecated use {@link #getSizeAsLong()} instead. + */ + @Deprecated + @JsonIgnore + public Integer getSize() { + return size == null ? 
null : size.intValue(); + } + + @JsonProperty(value = "size") + public Long getSizeAsLong() { + return size; + } + + public String geteTag() { + return eTag; + } + + public String getVersionId() { + return versionId; + } + + public String getSequencer() { + return sequencer; + } + } + + public static class S3Entity { + + private final String configurationId; + private final S3BucketEntity bucket; + private final S3ObjectEntity object; + private final String s3SchemaVersion; + + @JsonCreator + public S3Entity( + @JsonProperty(value = "configurationId") String configurationId, + @JsonProperty(value = "bucket") S3BucketEntity bucket, + @JsonProperty(value = "object") S3ObjectEntity object, + @JsonProperty(value = "s3SchemaVersion") String s3SchemaVersion) + { + this.configurationId = configurationId; + this.bucket = bucket; + this.object = object; + this.s3SchemaVersion = s3SchemaVersion; + } + + public String getConfigurationId() { + return configurationId; + } + + public S3BucketEntity getBucket() { + return bucket; + } + + public S3ObjectEntity getObject() { + return object; + } + + public String getS3SchemaVersion() { + return s3SchemaVersion; + } + } + + public static class RequestParametersEntity { + + private final String sourceIPAddress; + + @JsonCreator + public RequestParametersEntity( + @JsonProperty(value = "sourceIPAddress") String sourceIPAddress) + { + this.sourceIPAddress = sourceIPAddress; + } + + public String getSourceIPAddress() { + return sourceIPAddress; + } + } + + public static class ResponseElementsEntity { + + private final String xAmzId2; + private final String xAmzRequestId; + + @JsonCreator + public ResponseElementsEntity( + @JsonProperty(value = "x-amz-id-2") String xAmzId2, + @JsonProperty(value = "x-amz-request-id") String xAmzRequestId) + { + this.xAmzId2 = xAmzId2; + this.xAmzRequestId = xAmzRequestId; + } + + @JsonProperty("x-amz-id-2") + public String getxAmzId2() { + return xAmzId2; + } + + @JsonProperty("x-amz-request-id") + public String getxAmzRequestId() { + return xAmzRequestId; + } + } + + public static class GlacierEventDataEntity { + private final RestoreEventDataEntity restoreEventData; + + @JsonCreator + public GlacierEventDataEntity( + @JsonProperty(value = "restoreEventData") RestoreEventDataEntity restoreEventData) + { + this.restoreEventData = restoreEventData; + } + + public RestoreEventDataEntity getRestoreEventData() { + return restoreEventData; + } + } + + public static class LifecycleEventDataEntity { + + private final TransitionEventDataEntity transitionEventData; + + @JsonCreator + public LifecycleEventDataEntity( + @JsonProperty(value = "transitionEventData") TransitionEventDataEntity transitionEventData) + { + + this.transitionEventData = transitionEventData; + } + + public TransitionEventDataEntity getTransitionEventData() { + return transitionEventData; + } + } + + public static class IntelligentTieringEventDataEntity { + + private final String destinationAccessTier; + + @JsonCreator + public IntelligentTieringEventDataEntity( + @JsonProperty(value = "destinationAccessTier") String destinationAccessTier) + { + this.destinationAccessTier = destinationAccessTier; + } + + @JsonProperty("destinationAccessTier") + public String getDestinationAccessTier() { + return destinationAccessTier; + } + } + + public static class ReplicationEventDataEntity { + + private final String replicationRuleId; + private final String destinationBucket; + private final String s3Operation; + private final String requestTime; + private final String 
failureReason; + private final String threshold; + private final String replicationTime; + + @JsonCreator + public ReplicationEventDataEntity( + @JsonProperty(value = "replicationRuleId") String replicationRuleId, + @JsonProperty(value = "destinationBucket") String destinationBucket, + @JsonProperty(value = "s3Operation") String s3Operation, + @JsonProperty(value = "requestTime") String requestTime, + @JsonProperty(value = "failureReason") String failureReason, + @JsonProperty(value = "threshold") String threshold, + @JsonProperty(value = "replicationTime") String replicationTime) + { + this.replicationRuleId = replicationRuleId; + this.destinationBucket = destinationBucket; + this.s3Operation = s3Operation; + this.requestTime = requestTime; + this.failureReason = failureReason; + this.threshold = threshold; + this.replicationTime = replicationTime; + } + + @JsonProperty("replicationRuleId") + public String getReplicationRuleId() { + return replicationRuleId; + } + @JsonProperty("destinationBucket") + public String getDestinationBucket() { + return destinationBucket; + } + @JsonProperty("s3Operation") + public String getS3Operation() { + return s3Operation; + } + @JsonProperty("requestTime") + public String getRequestTime() { + return requestTime; + } + @JsonProperty("failureReason") + public String getFailureReason() { + return failureReason; + } + @JsonProperty("threshold") + public String getThreshold() { + return threshold; + } + @JsonProperty("replicationTime") + public String getReplicationTime() { + return replicationTime; + } + } + + public static class TransitionEventDataEntity { + private final String destinationStorageClass; + + @JsonCreator + public TransitionEventDataEntity( + @JsonProperty("destinationStorageClass") String destinationStorageClass) + { + this.destinationStorageClass = destinationStorageClass; + } + + public String getDestinationStorageClass() { + return destinationStorageClass; + } + } + + public static class RestoreEventDataEntity { + private DateTime lifecycleRestorationExpiryTime; + private final String lifecycleRestoreStorageClass; + + @JsonCreator + public RestoreEventDataEntity( + @JsonProperty("lifecycleRestorationExpiryTime") String lifecycleRestorationExpiryTime, + @JsonProperty("lifecycleRestoreStorageClass") String lifecycleRestoreStorageClass) + { + if (lifecycleRestorationExpiryTime != null) { + this.lifecycleRestorationExpiryTime = DateTime.parse(lifecycleRestorationExpiryTime); + } + this.lifecycleRestoreStorageClass = lifecycleRestoreStorageClass; + } + + @JsonSerialize(using=DateTimeJsonSerializer.class) + public DateTime getLifecycleRestorationExpiryTime() { + return lifecycleRestorationExpiryTime; + } + + public String getLifecycleRestoreStorageClass() { + return lifecycleRestoreStorageClass; + } + } + + public static class S3EventNotificationRecord { + + private final String awsRegion; + private final String eventName; + private final String eventSource; + private DateTime eventTime; + private final String eventVersion; + private final RequestParametersEntity requestParameters; + private final ResponseElementsEntity responseElements; + private final S3Entity s3; + private final UserIdentityEntity userIdentity; + private final Map eventData; + //private final GlacierEventDataEntity glacierEventData; + //private final LifecycleEventDataEntity lifecycleEventData; + //private final IntelligentTieringEventDataEntity intelligentTieringEventData; + //private final ReplicationEventDataEntity replicationEventDataEntity; + + /* + @Deprecated + public 
S3EventNotificationRecord( + String awsRegion, + String eventName, + String eventSource, + String eventTime, + String eventVersion, + RequestParametersEntity requestParameters, + ResponseElementsEntity responseElements, + S3Entity s3, + UserIdentityEntity userIdentity) + { + this(awsRegion, + eventName, + eventSource, + eventTime, + eventVersion, + requestParameters, + responseElements, + s3, + userIdentity, + null, + null, + null, + null); + } + + @Deprecated + public S3EventNotificationRecord( + String awsRegion, + String eventName, + String eventSource, + String eventTime, + String eventVersion, + RequestParametersEntity requestParameters, + ResponseElementsEntity responseElements, + S3Entity s3, + UserIdentityEntity userIdentity, + GlacierEventDataEntity glacierEventData) + { + this(awsRegion, + eventName, + eventSource, + eventTime, + eventVersion, + requestParameters, + responseElements, + s3, + userIdentity, + glacierEventData, + null, + null, + null); + } + */ + + @JsonCreator + public S3EventNotificationRecord( + @JsonProperty(value = "awsRegion") String awsRegion, + @JsonProperty(value = "eventName") String eventName, + @JsonProperty(value = "eventSource") String eventSource, + @JsonProperty(value = "eventTime") String eventTime, + @JsonProperty(value = "eventVersion") String eventVersion, + @JsonProperty(value = "requestParameters") RequestParametersEntity requestParameters, + @JsonProperty(value = "responseElements") ResponseElementsEntity responseElements, + @JsonProperty(value = "s3") S3Entity s3, + @JsonProperty(value = "userIdentity") UserIdentityEntity userIdentity, + @JsonProperty(value = "eventData") Map eventData) + //@JsonProperty(value = "glacierEventData") GlacierEventDataEntity glacierEventData, + //@JsonProperty(value = "lifecycleEventData") LifecycleEventDataEntity lifecycleEventData, + //@JsonProperty(value = "intelligentTieringEventData") IntelligentTieringEventDataEntity intelligentTieringEventData, + //@JsonProperty(value = "replicationEventData") ReplicationEventDataEntity replicationEventData) + { + this.awsRegion = awsRegion; + this.eventName = eventName; + this.eventSource = eventSource; + + if (eventTime != null) + { + this.eventTime = DateTime.parse(eventTime); + } + + this.eventVersion = eventVersion; + this.requestParameters = requestParameters; + this.responseElements = responseElements; + this.s3 = s3; + this.userIdentity = userIdentity; + this.eventData = eventData; + //this.glacierEventData = glacierEventData; + //this.lifecycleEventData = lifecycleEventData; + //this.intelligentTieringEventData = intelligentTieringEventData; + //this.replicationEventDataEntity = replicationEventData; + } + + public String getAwsRegion() { + return awsRegion; + } + + public String getEventName() { + return eventName; + } + + //@JsonIgnore + //public S3Event getEventNameAsEnum() { + // return S3Event.fromValue(eventName); + //} + + public String getEventSource() { + return eventSource; + } + + @JsonSerialize(using=DateTimeJsonSerializer.class) + public DateTime getEventTime() { + return eventTime; + } + + public String getEventVersion() { + return eventVersion; + } + + public RequestParametersEntity getRequestParameters() { + return requestParameters; + } + + public ResponseElementsEntity getResponseElements() { + return responseElements; + } + + public S3Entity getS3() { + return s3; + } + + public UserIdentityEntity getUserIdentity() { + return userIdentity; + } + + // Ozone extension + public Map getEventData() { + return eventData; + } + + //public 
GlacierEventDataEntity getGlacierEventData() { + // return glacierEventData; + //} + + //public LifecycleEventDataEntity getLifecycleEventData() { return lifecycleEventData; } + + //public IntelligentTieringEventDataEntity getIntelligentTieringEventData() { return intelligentTieringEventData; } + + //public ReplicationEventDataEntity getReplicationEventDataEntity() { return replicationEventDataEntity; } + + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationBuilder.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationBuilder.java new file mode 100644 index 000000000000..4d8059cb1629 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationBuilder.java @@ -0,0 +1,109 @@ +package org.apache.hadoop.ozone.om.eventlistener.s3; + +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.ozone.om.eventlistener.s3.S3EventNotification.S3BucketEntity; +import org.apache.hadoop.ozone.om.eventlistener.s3.S3EventNotification.S3EventNotificationRecord; +import org.apache.hadoop.ozone.om.eventlistener.s3.S3EventNotification.S3ObjectEntity; +import org.apache.hadoop.ozone.om.eventlistener.s3.S3EventNotification.UserIdentityEntity; + +/** + * This is a builder for the AWS event notification class + * com.amazonaws.services.s3.event.S3EventNotification which is part of + * AWS SDK 1.x + * + * NOTE: the above class is designed for deserialization which is why it + * requires this builder wrapper. + * + * XXX: we may need to fork these classes so that we can customize it to + * our needs. 
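+ *
+ * A minimal usage sketch (all values hypothetical), assuming the caller
+ * already knows the object key, bucket and event name:
+ *
+ * <pre>
+ *   S3EventNotification event = new S3EventNotificationBuilder(
+ *       "vol1/bucket1/key1",            // object key
+ *       "bucket1",                      // bucket name
+ *       "arn:aws:s3:::vol1.bucket1",    // bucket ARN
+ *       "ObjectCreated:Put",            // event name
+ *       Instant.now(),                  // event time
+ *       "etag-123")                     // etag (placeholder)
+ *       .setObjectSize(1024L)
+ *       .build();
+ * </pre>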
+ */
+public class S3EventNotificationBuilder {
+
+  private static final String REGION = "us-east-1";
+  private static final String BUCKET_ARN_PREFIX = "arn:aws:s3:" + REGION;
+  private static final String EVENT_SOURCE = "ozone:s3";
+  private static final String EVENT_VERSION = "2.1";
+  private static final String USER_IDENTITY = "some-principalId";
+
+  private static final String SCHEMA_VERSION = "1.0";
+  private static final String CONFIGURATION_ID = "mynotif1";
+
+  private final String objectKey;
+  private final String bucketName;
+  private final String bucketArn;
+  private final String eventName;
+  private final Instant eventTime;
+  private final String etag;
+  private final Map<String, String> eventData;
+
+  // mutable fields defaulting to null
+  private Long objectSize;
+  private String objectVersionId;
+  private String objectSequencer;
+
+  public S3EventNotificationBuilder(String objectKey, String bucketName,
+      String bucketArn, String eventName, Instant eventTime, String etag) {
+    this.objectKey = objectKey;
+    this.bucketName = bucketName;
+    this.bucketArn = bucketArn;
+    this.eventName = eventName;
+    this.eventTime = eventTime;
+    this.etag = etag;
+    this.eventData = new HashMap<>();
+  }
+
+  public S3EventNotificationBuilder setObjectSize(long objectSize) {
+    this.objectSize = objectSize;
+    return this;
+  }
+
+  public S3EventNotificationBuilder setObjectVersionId(String objectVersionId) {
+    this.objectVersionId = objectVersionId;
+    return this;
+  }
+
+  public S3EventNotificationBuilder setObjectSequencer(String objectSequencer) {
+    this.objectSequencer = objectSequencer;
+    return this;
+  }
+
+  public S3EventNotificationBuilder addAllEventData(Map<String, String> eventData) {
+    this.eventData.putAll(eventData);
+    return this;
+  }
+
+  public S3EventNotification build() {
+    UserIdentityEntity userIdentity = new UserIdentityEntity(USER_IDENTITY);
+    S3BucketEntity s3BucketEntity = new S3BucketEntity(bucketName, userIdentity, bucketArn);
+
+    S3EventNotification.S3ObjectEntity s3ObjectEntity = new S3ObjectEntity(
+        objectKey,
+        objectSize,
+        etag,
+        objectVersionId,
+        objectSequencer);
+
+    S3EventNotification.S3Entity s3Entity = new S3EventNotification.S3Entity(
+        CONFIGURATION_ID,
+        s3BucketEntity,
+        s3ObjectEntity,
+        SCHEMA_VERSION);
+
+    S3EventNotificationRecord eventRecord = new S3EventNotificationRecord(
+        REGION,
+        eventName,
+        EVENT_SOURCE,
+        eventTime.toString(),
+        EVENT_VERSION,
+        new S3EventNotification.RequestParametersEntity(""),
+        new S3EventNotification.ResponseElementsEntity("", ""),
+        s3Entity,
+        userIdentity,
+        eventData);
+
+    return new S3EventNotification(Collections.singletonList(eventRecord));
+  }
+}
diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationStrategy.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationStrategy.java
new file mode 100644
index 000000000000..6cc15ff0f9b6
--- /dev/null
+++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationStrategy.java
@@ -0,0 +1,156 @@
+package org.apache.hadoop.ozone.om.eventlistener.s3;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.ozone.om.eventlistener.OMEventListenerNotificationStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * A notification strategy that generates events according to S3
+ * notification semantics.
+ */
+public class S3EventNotificationStrategy implements OMEventListenerNotificationStrategy {
+  public static final Logger LOG = LoggerFactory.getLogger(S3EventNotificationStrategy.class);
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @Override
+  public List<String> determineEventsForOperation(OmCompletedRequestInfo requestInfo) {
+
+    switch (requestInfo.getOpArgs().getOperationType()) {
+    case CREATE_KEY:
+    case COMMIT_KEY: {
+      return Collections.singletonList(createS3Event("ObjectCreated:Put",
+          requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          requestInfo.getKeyName(),
+          Collections.emptyMap())
+      );
+    }
+    case CREATE_FILE: {
+      OmCompletedRequestInfo.OperationArgs.CreateFileArgs createFileArgs
+          = (OmCompletedRequestInfo.OperationArgs.CreateFileArgs) requestInfo.getOpArgs();
+
+      // XXX: eventData is an Ozone extension. It is unclear if this
+      // schema makes sense, but the general S3 schema is somewhat
+      // freeform. These arguments are informational rather than
+      // required, so their necessity is also unclear.
+      Map<String, String> eventData = new HashMap<>();
+      eventData.put("isDirectory", "false");
+      eventData.put("isRecursive", String.valueOf(createFileArgs.isRecursive()));
+      eventData.put("isOverwrite", String.valueOf(createFileArgs.isOverwrite()));
+
+      return Collections.singletonList(createS3Event("ObjectCreated:Put",
+          requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          requestInfo.getKeyName(),
+          eventData)
+      );
+    }
+    case CREATE_DIRECTORY: {
+      OmCompletedRequestInfo.OperationArgs.CreateDirectoryArgs createDirArgs
+          = (OmCompletedRequestInfo.OperationArgs.CreateDirectoryArgs) requestInfo.getOpArgs();
+
+      // XXX: see the note on the eventData extension above; isDirectory
+      // is likewise informational rather than required.
+      Map<String, String> eventData = new HashMap<>();
+      eventData.put("isDirectory", "true");
+
+      return Collections.singletonList(createS3Event("ObjectCreated:Put",
+          requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          requestInfo.getKeyName(),
+          eventData)
+      );
+    }
+    case DELETE_KEY: {
+      return Collections.singletonList(createS3Event("ObjectRemoved:Delete",
+          requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          requestInfo.getKeyName(),
+          Collections.emptyMap())
+      );
+    }
+    case RENAME_KEY: {
+      OmCompletedRequestInfo.OperationArgs.RenameKeyArgs renameKeyArgs
+          = (OmCompletedRequestInfo.OperationArgs.RenameKeyArgs) requestInfo.getOpArgs();
+
+      String renameFromKey = OzoneKeyUtil.getOzoneKey(requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          requestInfo.getKeyName());
+
+      // XXX: it would be good to be able to convey whether this was a
+      // file or a directory rename
+      Map<String, String> eventData = new HashMap<>();
+      eventData.put("renameFromKey", renameFromKey);
+
+      // NOTE: ObjectRenamed:Rename is an Ozone extension, as is the
+      // eventData map in the S3 event schema.
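+      // (Stock S3 defines no rename event, since S3 itself has no rename
+      // operation, so standard S3 consumers may need to be taught about
+      // this event name explicitly.)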
+      return Collections.singletonList(createS3Event("ObjectRenamed:Rename",
+          requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          renameKeyArgs.getToKeyName(),
+          eventData)
+      );
+    }
+    default:
+      LOG.info("No events for operation {} on {}",
+          requestInfo.getOpArgs().getOperationType(),
+          requestInfo.getKeyName());
+      return Collections.emptyList();
+    }
+  }
+
+  static String createS3Event(String eventName,
+      String volumeName,
+      String bucketName,
+      String keyName,
+      Map<String, String> eventData) {
+    try {
+      String objectKey = OzoneKeyUtil.getOzoneKey(volumeName, bucketName, keyName);
+      String bucketArn = "arn:aws:s3:::" + volumeName + "." + bucketName;
+      Instant eventTime = Instant.now();
+      String etag = UUID.randomUUID().toString();
+
+      S3EventNotification event = new S3EventNotificationBuilder(objectKey,
+          bucketName, bucketArn, eventName, eventTime, etag)
+          .addAllEventData(eventData)
+          .build();
+
+      return MAPPER.writer().writeValueAsString(event);
+    } catch (Exception ex) {
+      LOG.error("Failed to serialize {} event for key {}/{}/{}",
+          eventName, volumeName, bucketName, keyName, ex);
+      // XXX: callers wrap this null in a singleton list; the publisher
+      // should probably skip or retry instead of sending it.
+      return null;
+    }
+  }
+
+  // stub: taken from metadataManager. Should we just pass in
+  // metadataManager?
+  private static class OzoneKeyUtil {
+    public static String getOzoneKey(String volume, String bucket, String key) {
+      StringBuilder builder = new StringBuilder()
+          .append(volume);
+      // TODO : Throw if the Bucket is null?
+      builder.append(OM_KEY_PREFIX).append(bucket);
+      if (StringUtils.isNotBlank(key)) {
+        builder.append(OM_KEY_PREFIX);
+        if (!key.equals(OM_KEY_PREFIX)) {
+          builder.append(key);
+        }
+      }
+      return builder.toString();
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
index 6020cf77744f..2a14d1073bca 100644
--- a/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
+++ b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
@@ -108,8 +108,8 @@ public void testCreateKeyRequestProducesS3CreatedEvent() throws InterruptedExcep
 
     assertThat(events).hasSize(1);
     assertThat(events.get(0))
-        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key1"))
-        .contains(toJsonKeyVal("type", "CREATE_KEY"));
+        .contains(toJsonKeyVal("eventName", "ObjectCreated:Put"))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key1"));
   }
 
   @Test
@@ -124,8 +124,11 @@ public void testCreateFileRequestProducesS3CreatedEvent() throws InterruptedExce
 
     assertThat(events).hasSize(1);
     assertThat(events.get(0))
+        .contains(toJsonKeyVal("eventName", "ObjectCreated:Put"))
         .contains(toJsonKeyVal("key", "vol1/bucket1/some/key2"))
-        .contains(toJsonKeyVal("type", "CREATE_FILE"));
+        .contains(toJsonKeyVal("isDirectory", "false"))
+        .contains(toJsonKeyVal("isRecursive", "false"))
+        .contains(toJsonKeyVal("isOverwrite", "true"));
   }
 
   @Test
@@ -137,8 +140,9 @@ public void testCreateDirectoryRequestProducesS3CreatedEvent() throws Interrupte
 
     assertThat(events).hasSize(1);
     assertThat(events.get(0))
+        .contains(toJsonKeyVal("eventName", "ObjectCreated:Put"))
         .contains(toJsonKeyVal("key", "vol1/bucket1/some/key3"))
-        .contains(toJsonKeyVal("type", "CREATE_DIRECTORY"));
+        .contains(toJsonKeyVal("isDirectory", "true"));
   }
 
   @Test
@@ -150,7 +154,8 @@ public void testRenameRequestProducesS3CreateAndDeleteEvents() throws Interrupte
assertThat(events).hasSize(1); assertThat(events.get(0)) - .contains(toJsonKeyVal("key", "vol1/bucket1/some/key4")) - .contains(toJsonKeyVal("type", "RENAME_KEY")); + .contains(toJsonKeyVal("eventName", "ObjectRenamed:Rename")) + .contains(toJsonKeyVal("key", "vol1/bucket1/some/key_RENAMED")) + .contains(toJsonKeyVal("renameFromKey", "vol1/bucket1/some/key4")); } }