diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 1aab4fc28dd5..146c02e6b7b5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -539,6 +539,11 @@ public final class OzoneConsts {
   public static final String COMPACTION_LOG_TABLE =
       "compactionLogTable";
 
+  /**
+   * DB completed request info table name. Referenced in RDBStore.
+   */
+  public static final String COMPLETED_REQUEST_INFO_TABLE = "completedRequestInfoTable";
+
   /**
    * S3G multipart upload request's ETag header key.
    */
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NotificationCheckpointStrategy.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NotificationCheckpointStrategy.java
new file mode 100644
index 000000000000..e7be27796e7e
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/NotificationCheckpointStrategy.java
@@ -0,0 +1,15 @@
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import java.io.IOException;
+
+/**
+ * Interface for implementations which load/save the current checkpoint
+ * position representing the last known notification sent by an event
+ * notification plugin.
+ */
+public interface NotificationCheckpointStrategy {
+
+  String load() throws IOException;
+
+  void save(String val) throws IOException;
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
new file mode 100644
index 000000000000..a9407ebbb86c
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java
@@ -0,0 +1,15 @@
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+/**
+ * Lifecycle interface implemented by OM event notification plugins.
+ */
+public interface OMEventListener {
+
+  void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext);
+
+  void start();
+
+  void shutdown();
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
new file mode 100644
index 000000000000..352745f2da6e
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java
@@ -0,0 +1,24 @@
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+
+/**
+ * A narrow set of functionality that is safe to expose to plugin
+ * implementations.
+ */
+public interface OMEventListenerPluginContext {
+
+  boolean isLeaderReady();
+
+  // TODO: should we allow plugins to pass in maxResults or just limit
+  // them to some predefined value for safety? e.g. 10K
+  List<OmCompletedRequestInfo> listCompletedRequestInfo(String startKey, int maxResults) throws IOException;
+
+  // XXX: this probably doesn't belong here
+  String getThreadNamePrefix();
+
+  NotificationCheckpointStrategy getOzoneNotificationCheckpointStrategy();
+
+}
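For orientation, a minimal plugin built against these interfaces could look like the sketch below. This is illustrative only and not part of the change; the LoggingEventListener class name and its single-shot read in start() are hypothetical:

    package org.apache.hadoop.ozone.om.eventlistener;

    import java.io.IOException;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;

    public class LoggingEventListener implements OMEventListener {
      private OMEventListenerPluginContext context;

      @Override
      public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) {
        this.context = pluginContext;
      }

      @Override
      public void start() {
        // A real plugin would poll from a background thread (see
        // OMEventListenerLedgerPoller later in this change); this sketch reads once.
        try {
          String checkpoint = context.getOzoneNotificationCheckpointStrategy().load();
          for (OmCompletedRequestInfo info : context.listCompletedRequestInfo(checkpoint, 100)) {
            System.out.println("completed request: " + info);
          }
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void shutdown() {
      }
    }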
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmCompletedRequestInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmCompletedRequestInfo.java
new file mode 100644
index 000000000000..5612374f5180
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmCompletedRequestInfo.java
@@ -0,0 +1,524 @@
+package org.apache.hadoop.ozone.om.helpers;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.CopyObject;
+import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
+import org.apache.hadoop.hdds.utils.db.Proto2Codec;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.Auditable;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import java.time.format.DateTimeFormatter;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.ZoneId;
+
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.Map;
+import java.util.LinkedHashMap;
+
+import static org.apache.hadoop.hdds.HddsUtils.fromProtobuf;
+import static org.apache.hadoop.hdds.HddsUtils.toProtobuf;
+
+/**
+ * This class is used for storing info related to completed operations.
+ *
+ * Each successfully completed operation has an associated
+ * OmCompletedRequestInfo entry recording the trxLogIndex, op, volumeName,
+ * bucketName, keyName and creationTime.
+ */
+public final class OmCompletedRequestInfo implements Auditable, CopyObject<OmCompletedRequestInfo> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OmCompletedRequestInfo.class);
+
+  private static final Codec<OmCompletedRequestInfo> CODEC = new DelegatedCodec<>(
+      Proto2Codec.get(OzoneManagerProtocolProtos.CompletedRequestInfo.getDefaultInstance()),
+      OmCompletedRequestInfo::getFromProtobuf,
+      OmCompletedRequestInfo::getProtobuf,
+      OmCompletedRequestInfo.class);
+
+  /**
+   * OperationType enum.
+   */
+  public enum OperationType {
+    CREATE_KEY,
+    RENAME_KEY,
+    DELETE_KEY,
+    COMMIT_KEY,
+    CREATE_DIRECTORY,
+    CREATE_FILE;
+  }
+
+  private static final long INVALID_TIMESTAMP = -1;
+
+  private long trxLogIndex;
+  private final String volumeName;
+  private final String bucketName;
+  private final String keyName;
+  private final long creationTime;
+  private final OperationArgs opArgs;
+
+  /**
+   * Private constructor, constructed via builder.
+   * @param trxLogIndex - transaction log index of the completed request.
+   * @param volumeName - volume name.
+   * @param bucketName - bucket name.
+   * @param keyName - key name.
+   * @param creationTime - completed request creation time.
+   * @param opArgs - operation-specific arguments.
+   */
+  private OmCompletedRequestInfo(long trxLogIndex,
+      String volumeName,
+      String bucketName,
+      String keyName,
+      long creationTime,
+      OperationArgs opArgs) {
+    this.trxLogIndex = trxLogIndex;
+    this.volumeName = volumeName;
+    this.bucketName = bucketName;
+    this.keyName = keyName;
+    this.creationTime = creationTime;
+    this.opArgs = opArgs;
+  }
+
+  public static Codec<OmCompletedRequestInfo> getCodec() {
+    return CODEC;
+  }
+
+  public void setTrxLogIndex(long trxLogIndex) {
+    this.trxLogIndex = trxLogIndex;
+  }
+
+  // the db version of the key is left padded with 0s so that it can be
+  // "seeked" in lexicographical order
+  // TODO: is this an appropriate key?
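+  // For example (illustrative): trxLogIndex 42 becomes the key
+  // "00000000000000000042", so iterating the table in RocksDB's byte-wise
+  // key order visits entries in transaction-log order.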
+ public String getDbKey() { + return StringUtils.leftPad(String.valueOf(trxLogIndex), 20, '0'); + } + + public long getTrxLogIndex() { + return trxLogIndex; + } + + public String getVolumeName() { + return volumeName; + } + + public String getBucketName() { + return bucketName; + } + + public String getKeyName() { + return keyName; + } + + public long getCreationTime() { + return creationTime; + } + + public OperationArgs getOpArgs() { + return opArgs; + } + + public static org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.Builder + newBuilder() { + return new org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.Builder(); + } + + public OmCompletedRequestInfo.Builder toBuilder() { + return new Builder() + .setTrxLogIndex(trxLogIndex) + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setCreationTime(creationTime) + .setOpArgs(opArgs); + } + + /** + * Builder of OmCompletedRequestInfo. + */ + public static class Builder { + private long trxLogIndex; + private String volumeName; + private String bucketName; + private String keyName; + private long creationTime; + private OperationArgs opArgs; + + public Builder() { + // default values + } + + public Builder setTrxLogIndex(long trxLogIndex) { + this.trxLogIndex = trxLogIndex; + return this; + } + + public Builder setVolumeName(String volumeName) { + this.volumeName = volumeName; + return this; + } + + public Builder setBucketName(String bucketName) { + this.bucketName = bucketName; + return this; + } + + public Builder setKeyName(String keyName) { + this.keyName = keyName; + return this; + } + + public Builder setCreationTime(long crTime) { + this.creationTime = crTime; + return this; + } + + public Builder setOpArgs(OperationArgs opArgs) { + this.opArgs = opArgs; + return this; + } + + public OmCompletedRequestInfo build() { + //Preconditions.checkNotNull(name); + return new OmCompletedRequestInfo( + trxLogIndex, + volumeName, + bucketName, + keyName, + creationTime, + opArgs + ); + } + } + + /** + * Creates OmCompletedRequestInfo protobuf from OmCompletedRequestInfo. 
+ */ + public OzoneManagerProtocolProtos.CompletedRequestInfo getProtobuf() { + OzoneManagerProtocolProtos.CompletedRequestInfo.Builder sib = + OzoneManagerProtocolProtos.CompletedRequestInfo.newBuilder() + .setTrxLogIndex(trxLogIndex) + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setCreationTime(creationTime); + + switch (opArgs.getOperationType()) { + case CREATE_KEY: + sib.setCmdType(OzoneManagerProtocolProtos.Type.CreateKey); + sib.setCreateKeyArgs(OzoneManagerProtocolProtos.CreateKeyOperationArgs.newBuilder() + .build()); + break; + case RENAME_KEY: + sib.setCmdType(OzoneManagerProtocolProtos.Type.RenameKey); + sib.setRenameKeyArgs(OzoneManagerProtocolProtos.RenameKeyOperationArgs.newBuilder() + .setToKeyName(((OperationArgs.RenameKeyArgs) opArgs).getToKeyName()) + .build()); + break; + case DELETE_KEY: + sib.setCmdType(OzoneManagerProtocolProtos.Type.DeleteKey); + sib.setDeleteKeyArgs(OzoneManagerProtocolProtos.DeleteKeyOperationArgs.newBuilder() + .build()); + break; + case COMMIT_KEY: + sib.setCmdType(OzoneManagerProtocolProtos.Type.CommitKey); + sib.setCommitKeyArgs(OzoneManagerProtocolProtos.CommitKeyOperationArgs.newBuilder() + .build()); + break; + case CREATE_DIRECTORY: + sib.setCmdType(OzoneManagerProtocolProtos.Type.CreateDirectory); + sib.setCreateDirectoryArgs(OzoneManagerProtocolProtos.CreateDirectoryOperationArgs.newBuilder() + .build()); + break; + case CREATE_FILE: + sib.setCmdType(OzoneManagerProtocolProtos.Type.CreateFile); + sib.setCreateFileArgs(OzoneManagerProtocolProtos.CreateFileOperationArgs.newBuilder() + .setIsRecursive(((OperationArgs.CreateFileArgs) opArgs).isRecursive()) + .setIsOverwrite(((OperationArgs.CreateFileArgs) opArgs).isOverwrite()) + .build()); + break; + default: + LOG.error("Unexpected operationType={}", opArgs.getOperationType()); + break; + } + + return sib.build(); + } + + /** + * Parses OmCompletedRequestInfo protobuf and creates OmCompletedRequestInfo. 
+   * @param completedRequestInfoProto protobuf
+   * @return instance of OmCompletedRequestInfo
+   */
+  public static OmCompletedRequestInfo getFromProtobuf(
+      OzoneManagerProtocolProtos.CompletedRequestInfo completedRequestInfoProto) {
+
+    OmCompletedRequestInfo.Builder osib = OmCompletedRequestInfo.newBuilder()
+        .setTrxLogIndex(completedRequestInfoProto.getTrxLogIndex())
+        .setVolumeName(completedRequestInfoProto.getVolumeName())
+        .setBucketName(completedRequestInfoProto.getBucketName())
+        .setKeyName(completedRequestInfoProto.getKeyName())
+        .setCreationTime(completedRequestInfoProto.getCreationTime());
+
+    switch (completedRequestInfoProto.getCmdType()) {
+    case CreateKey:
+      osib.setOpArgs(new OperationArgs.CreateKeyArgs());
+      break;
+    case RenameKey:
+      OzoneManagerProtocolProtos.RenameKeyOperationArgs renameArgs
+          = completedRequestInfoProto.getRenameKeyArgs();
+
+      osib.setOpArgs(new OperationArgs.RenameKeyArgs(renameArgs.getToKeyName()));
+      break;
+    case DeleteKey:
+      osib.setOpArgs(new OperationArgs.DeleteKeyArgs());
+      break;
+    case CommitKey:
+      osib.setOpArgs(new OperationArgs.CommitKeyArgs());
+      break;
+    case CreateDirectory:
+      osib.setOpArgs(new OperationArgs.CreateDirectoryArgs());
+      break;
+    case CreateFile:
+      OzoneManagerProtocolProtos.CreateFileOperationArgs createFileArgs
+          = completedRequestInfoProto.getCreateFileArgs();
+
+      // CreateFileArgs takes (recursive, overwrite) - keep argument order consistent
+      osib.setOpArgs(new OperationArgs.CreateFileArgs(createFileArgs.getIsRecursive(), createFileArgs.getIsOverwrite()));
+      break;
+    default:
+      LOG.error("Unexpected cmdType={}", completedRequestInfoProto.getCmdType());
+      break;
+    }
+
+    return osib.build();
+  }
+
+  @Override
+  public Map<String, String> toAuditMap() {
+    Map<String, String> auditMap = new LinkedHashMap<>();
+    //auditMap.put(OzoneConsts.VOLUME, getVolumeName());
+    //auditMap.put(OzoneConsts.BUCKET, getBucketName());
+    return auditMap;
+  }
+
+  /**
+   * Factory for making standard instance.
+   */
+  /*
+  public static OmCompletedRequestInfo newInstance(long trxLogIndex,
+      Operation op,
+      long creationTime) {
+    OmCompletedRequestInfo.Builder builder = new OmCompletedRequestInfo.Builder();
+    builder.setTrxLogIndex(trxLogIndex)
+        .setOp(op)
+        .setCreationTime(creationTime);
+
+    return builder.build();
+  }
+  */
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OmCompletedRequestInfo that = (OmCompletedRequestInfo) o;
+    return trxLogIndex == that.trxLogIndex &&
+        creationTime == that.creationTime &&
+        volumeName.equals(that.volumeName) &&
+        bucketName.equals(that.bucketName) &&
+        keyName.equals(that.keyName) &&
+        opArgs.equals(that.opArgs);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(trxLogIndex, volumeName, bucketName,
+        keyName, creationTime, opArgs);
+  }
+
+  /**
+   * Return a new copy of the object.
+ */ + @Override + public OmCompletedRequestInfo copyObject() { + return new Builder() + .setTrxLogIndex(trxLogIndex) + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setCreationTime(creationTime) + .setOpArgs(opArgs) + .build(); + } + + @Override + public String toString() { + return "OmCompletedRequestInfo{" + + "trxLogIndex: '" + trxLogIndex + '\'' + + ", volumeName: '" + volumeName + '\'' + + ", bucketName: '" + bucketName + '\'' + + ", keyName: '" + keyName + '\'' + + ", creationTime: '" + creationTime + '\'' + + ", opArgs : '" + opArgs + '\'' + + '}'; + } + + public static abstract class OperationArgs { + + public abstract OperationType getOperationType(); + + public static class CreateKeyArgs extends OperationArgs { + + @Override + public OperationType getOperationType() { + return OperationType.CREATE_KEY; + } + + @Override + public String toString() { + return "CreateKeyArgs{}"; + } + } + + public static class RenameKeyArgs extends OperationArgs { + private final String toKeyName; + + public RenameKeyArgs(String toKeyName) { + this.toKeyName = toKeyName; + } + + @Override + public OperationType getOperationType() { + return OperationType.RENAME_KEY; + } + + public String getToKeyName() { + return toKeyName; + } + + @Override + public String toString() { + return "RenameKeyArgs{" + + "toKeyName: '" + toKeyName + '\'' + + '}'; + } + } + + public static class DeleteKeyArgs extends OperationArgs { + + @Override + public OperationType getOperationType() { + return OperationType.DELETE_KEY; + } + + @Override + public String toString() { + return "DeleteKeyArgs{}"; + } + } + + public static class CommitKeyArgs extends OperationArgs { + + @Override + public OperationType getOperationType() { + return OperationType.COMMIT_KEY; + } + + @Override + public String toString() { + return "CommitKeyArgs{}"; + } + } + + public static class CreateDirectoryArgs extends OperationArgs { + + @Override + public OperationType getOperationType() { + return OperationType.CREATE_DIRECTORY; + } + + @Override + public String toString() { + return "CreateDirectoryArgs{}"; + } + } + + public static class CreateFileArgs extends OperationArgs { + // hsync? + private final boolean recursive; + private final boolean overwrite; + + public CreateFileArgs(boolean recursive, boolean overwrite) { + this.recursive = recursive; + this.overwrite = overwrite; + } + + @Override + public OperationType getOperationType() { + return OperationType.CREATE_FILE; + } + + public boolean isRecursive() { + return recursive; + } + + public boolean isOverwrite() { + return overwrite; + } + + @Override + public String toString() { + return "CreateFileArgs{" + + "recursive: '" + recursive + '\'' + + ", overwrite: '" + overwrite + '\'' + + '}'; + } + } + } +} diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/.env b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/.env new file mode 100644 index 000000000000..6507664fad7f --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/.env @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +HDDS_VERSION=${hdds.version} +HADOOP_IMAGE=apache/hadoop +OZONE_RUNNER_VERSION=${docker.ozone-runner.version} +OZONE_RUNNER_IMAGE=apache/ozone-runner +OZONE_OPTS= diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-compose.yaml new file mode 100644 index 000000000000..a18699349ce0 --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-compose.yaml @@ -0,0 +1,176 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# reusable fragments (see https://docs.docker.com/compose/compose-file/#extension-fields) +x-common-config: + &common-config + image: ${OZONE_RUNNER_IMAGE}:${OZONE_RUNNER_VERSION} + volumes: + - ../..:/opt/hadoop + env_file: + - docker-config + +x-replication: + &replication + OZONE-SITE.XML_ozone.server.default.replication: ${OZONE_REPLICATION_FACTOR:-1} + +services: + datanode: + <<: *common-config + ports: + - 19864 + - 9882 + environment: + <<: *replication + command: ["ozone","datanode"] + om1: + <<: *common-config + environment: + WAITFOR: scm3:9894 + ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION + <<: *replication + ports: + - 9874:9874 + - 9862 + hostname: om1 + command: ["ozone","om"] + om2: + <<: *common-config + environment: + WAITFOR: scm3:9894 + ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION + <<: *replication + ports: + - 9874 + - 9862 + hostname: om2 + command: ["ozone","om"] + om3: + <<: *common-config + environment: + WAITFOR: scm3:9894 + ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION + <<: *replication + ports: + - 9874 + - 9862 + hostname: om3 + command: ["ozone","om"] + scm1: + <<: *common-config + ports: + - 9876:9876 + environment: + ENSURE_SCM_INITIALIZED: /data/metadata/scm/current/VERSION + OZONE-SITE.XML_hdds.scm.safemode.min.datanode: ${OZONE_SAFEMODE_MIN_DATANODES:-1} + <<: *replication + command: ["ozone","scm"] + scm2: + <<: *common-config + ports: + - 9876 + environment: + WAITFOR: scm1:9894 + ENSURE_SCM_BOOTSTRAPPED: /data/metadata/scm/current/VERSION + OZONE-SITE.XML_hdds.scm.safemode.min.datanode: ${OZONE_SAFEMODE_MIN_DATANODES:-1} + <<: *replication + command: ["ozone","scm"] + scm3: + <<: *common-config + ports: + - 9876 + environment: + WAITFOR: scm2:9894 + ENSURE_SCM_BOOTSTRAPPED: /data/metadata/scm/current/VERSION + 
OZONE-SITE.XML_hdds.scm.safemode.min.datanode: ${OZONE_SAFEMODE_MIN_DATANODES:-1} + <<: *replication + command: ["ozone","scm"] + httpfs: + <<: *common-config + environment: + OZONE-SITE.XML_hdds.scm.safemode.min.datanode: ${OZONE_SAFEMODE_MIN_DATANODES:-1} + <<: *replication + ports: + - 14000:14000 + command: [ "ozone","httpfs" ] + s3g: + <<: *common-config + environment: + <<: *replication + ports: + - 9878:9878 + command: ["ozone","s3g"] + recon: + <<: *common-config + ports: + - 9888:9888 + environment: + <<: *replication + command: ["ozone","recon"] + zookeeper: + image: confluentinc/cp-zookeeper:7.6.0 + hostname: zookeeper + container_name: zookeeper + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka-1: + image: confluentinc/cp-kafka:7.6.0 + hostname: kafka-1 + container_name: kafka-1 + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + + kafka-2: + image: confluentinc/cp-kafka:7.6.0 + hostname: kafka-2 + container_name: kafka-2 + ports: + - "9093:9093" + environment: + KAFKA_BROKER_ID: 2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,PLAINTEXT_HOST://localhost:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9093 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + + kafka-3: + image: confluentinc/cp-kafka:7.6.0 + hostname: kafka-3 + container_name: kafka-3 + ports: + - "9094:9094" + environment: + KAFKA_BROKER_ID: 3 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:29092,PLAINTEXT_HOST://localhost:9094 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9094 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-config new file mode 100644 index 000000000000..87c569fcb04e --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/docker-config @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# For HttpFS service it is required to enable proxying users. 
+CORE-SITE.XML_hadoop.proxyuser.hadoop.hosts=* +CORE-SITE.XML_hadoop.proxyuser.hadoop.groups=* + +CORE-SITE.XML_fs.defaultFS=ofs://omservice/ +CORE-SITE.XML_fs.trash.interval=1 + +OZONE-SITE.XML_ozone.om.service.ids=omservice +OZONE-SITE.XML_ozone.om.nodes.omservice=om1,om2,om3 +OZONE-SITE.XML_ozone.om.address.omservice.om1=om1 +OZONE-SITE.XML_ozone.om.address.omservice.om2=om2 +OZONE-SITE.XML_ozone.om.address.omservice.om3=om3 + +OZONE-SITE.XML_ozone.scm.service.ids=scmservice +OZONE-SITE.XML_ozone.scm.nodes.scmservice=scm1,scm2,scm3 +OZONE-SITE.XML_ozone.scm.address.scmservice.scm1=scm1 +OZONE-SITE.XML_ozone.scm.address.scmservice.scm2=scm2 +OZONE-SITE.XML_ozone.scm.address.scmservice.scm3=scm3 +OZONE-SITE.XML_ozone.scm.datanode.id.dir=/data/metadata +OZONE-SITE.XML_ozone.scm.container.size=1GB +OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB +OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata +OZONE-SITE.XML_hdds.datanode.dir=/data/hdds +OZONE-SITE.XML_hdds.datanode.volume.min.free.space=100MB +OZONE-SITE.XML_ozone.datanode.pipeline.limit=1 +OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s +OZONE-SITE.XML_ozone.scm.primordial.node.id=scm1 +OZONE-SITE.XML_hdds.container.report.interval=60s +OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true +OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon +OZONE-SITE.XML_ozone.recon.address=recon:9891 +OZONE-SITE.XML_ozone.recon.http-address=0.0.0.0:9888 +OZONE-SITE.XML_ozone.recon.https-address=0.0.0.0:9889 +OZONE-SITE.XML_hdds.container.ratis.datastream.enabled=true +OZONE-SITE.XML_ozone.http.basedir=/tmp/ozone_http + +OZONE-SITE.XML_ozone.om.plugin.destination.kafka=true +OZONE-SITE.XML_ozone.om.plugin.destination.kafka.classname=org.apache.hadoop.ozone.om.eventlistener.OMEventListenerKafkaPublisher +OZONE-SITE.XML_ozone.notify.kafka.topic=test123 +OZONE-SITE.XML_ozone.notify.kafka.bootstrap.servers=kafka-3:29092,kafka-1:29092,kafka-2:29092 +OZONE-SITE.XML_ozone.notify.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer +OZONE-SITE.XML_ozone.notify.kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer + +OZONE_CONF_DIR=/etc/hadoop +OZONE_LOG_DIR=/var/log/hadoop + +no_proxy=om1,om2,om3,scm,s3g,recon,kdc,localhost,127.0.0.1 diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test-hadoop.sh b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test-hadoop.sh new file mode 100755 index 000000000000..a0d19fd673c0 --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test-hadoop.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +#suite:MR + +set -u -o pipefail + +COMPOSE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +export COMPOSE_DIR + +export SECURITY_ENABLED=false +export SCM=scm1 +export OM_SERVICE_ID=omservice + +source "$COMPOSE_DIR/../common/hadoop-test.sh" diff --git a/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test.sh b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test.sh new file mode 100755 index 000000000000..6c09e7b76158 --- /dev/null +++ b/hadoop-ozone/dist/src/main/compose/ozone-ha-with-kafka/test.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#suite:HA-unsecure + +set -u -o pipefail + +COMPOSE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +export COMPOSE_DIR + +export SECURITY_ENABLED=false +export OZONE_REPLICATION_FACTOR=3 +export SCM=scm1 +export OM_SERVICE_ID=omservice + +# shellcheck source=/dev/null +source "$COMPOSE_DIR/../testlib.sh" + +start_docker_env 5 + +execute_robot_test ${SCM} basic/ozone-shell-single.robot +execute_robot_test ${SCM} basic/links.robot + +execute_robot_test ${SCM} -v SCHEME:ofs -v BUCKET_TYPE:link -N ozonefs-ofs-link ozonefs/ozonefs.robot + +## Exclude virtual-host tests. This is tested separately as it requires additional config. 
+exclude="--exclude virtual-host"
+for bucket in generated; do
+  for layout in OBJECT_STORE LEGACY FILE_SYSTEM_OPTIMIZED; do
+    execute_robot_test ${SCM} -v BUCKET:${bucket} -v BUCKET_LAYOUT:${layout} -N s3-${layout}-${bucket} ${exclude} s3
+    # some tests are independent of the bucket type, only need to be run once
+    exclude="--exclude virtual-host --exclude no-bucket-type"
+  done
+done
+
+execute_robot_test ${SCM} freon
+execute_robot_test ${SCM} -v USERNAME:httpfs httpfs
+
+execute_robot_test ${SCM} omha/om-roles.robot
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index d6fc15a45a44..b07d398fa31d 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -915,6 +915,48 @@ message SnapshotDiffJobProto {
   optional double keysProcessedPct = 13;
 }
 
+message CreateKeyOperationArgs {
+}
+
+message RenameKeyOperationArgs {
+  required string toKeyName = 1;
+}
+
+message DeleteKeyOperationArgs {
+}
+
+message CommitKeyOperationArgs {
+}
+
+message CreateDirectoryOperationArgs {
+}
+
+message CreateFileOperationArgs {
+  required bool isRecursive = 2;
+  required bool isOverwrite = 3;
+}
+
+
+/**
+ * CompletedRequestInfo table entry.
+ */
+message CompletedRequestInfo {
+
+  optional int64 trxLogIndex = 1;
+  required Type cmdType = 2; // Type of the command
+  optional string volumeName = 3;
+  optional string bucketName = 4;
+  optional string keyName = 5;
+  optional uint64 creationTime = 6;
+
+  optional CreateKeyOperationArgs createKeyArgs = 7;
+  optional RenameKeyOperationArgs renameKeyArgs = 8;
+  optional DeleteKeyOperationArgs deleteKeyArgs = 9;
+  optional CommitKeyOperationArgs commitKeyArgs = 10;
+  optional CreateDirectoryOperationArgs createDirectoryArgs = 11;
+  optional CreateFileOperationArgs createFileArgs = 12;
+}
+
 message OzoneObj {
   enum ObjectType {
     VOLUME = 1;
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index ec9e34cec720..8f83b77e55b4 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
 import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
 import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
@@ -319,6 +320,17 @@ ListSnapshotResponse listSnapshot(
   List<OmVolumeArgs> listVolumes(String userName, String prefix,
       String startKey, int maxKeys) throws IOException;
 
+  /**
+   * Returns a list of completed request info objects.
+   *
+   * @param startKey the start key determines where to start listing
+   * from, this key is excluded from the result.
+   * @param maxResults the maximum number of results to return.
+ * @return a list of {@link OmCompletedRequestInfo} + * @throws IOException + */ + List listCompletedRequestInfo(String startKey, int maxResults) throws IOException; + /** * Get total open key count (estimated, due to the nature of RocksDB impl) * of both OpenKeyTable and OpenFileTable. @@ -467,6 +479,8 @@ String getMultipartKeyFSO(String volume, String bucket, String key, String Table getCompactionLogTable(); + Table getCompletedRequestInfoTable(); + /** * Gets the OM Meta table. * @return meta table reference. diff --git a/hadoop-ozone/ozone-manager-plugins/pom.xml b/hadoop-ozone/ozone-manager-plugins/pom.xml new file mode 100644 index 000000000000..47393cbc06e2 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/pom.xml @@ -0,0 +1,89 @@ + + + + 4.0.0 + + org.apache.ozone + ozone + 2.1.0-SNAPSHOT + + ozone-manager-plugins + 2.1.0-SNAPSHOT + jar + Apache Ozone Manager Plugins + + false + UTF-8 + + + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.google.guava + guava + + + joda-time + joda-time + + + org.apache.commons + commons-lang3 + + + org.apache.hadoop + hadoop-common + + + org.apache.kafka + kafka-clients + + + org.apache.ozone + hdds-common + + + org.apache.ozone + ozone-common + + + org.slf4j + slf4j-api + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + none + + + + + diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java new file mode 100644 index 000000000000..486781d6fb0a --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.eventlistener;
+
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.eventlistener.s3.S3EventNotificationStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * This is an implementation of OMEventListener which uses the
+ * OMEventListenerLedgerPoller as a building block to periodically poll/consume
+ * completed operations, serialize them to an S3 schema and produce them
+ * to a Kafka topic.
+ */
+public class OMEventListenerKafkaPublisher implements OMEventListener {
+  public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class);
+
+  private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka.";
+  private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1;
+
+  private OMEventListenerLedgerPoller ledgerPoller;
+  private KafkaClientWrapper kafkaClient;
+  private OMEventListenerNotificationStrategy notificationStrategy;
+  private OMEventListenerLedgerPollerSeekPosition seekPosition;
+
+  @Override
+  public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) {
+    Map<String, String> kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX);
+    Properties kafkaProps = new Properties();
+    kafkaProps.putAll(kafkaPropsMap);
+
+    this.kafkaClient = new KafkaClientWrapper(kafkaProps);
+
+    // TODO: these constants should be read from config
+    long kafkaServiceInterval = 2 * 1000;
+    long kafkaServiceTimeout = 300 * 1000;
+
+    this.notificationStrategy = new S3EventNotificationStrategy();
+    this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(
+        pluginContext.getOzoneNotificationCheckpointStrategy());
+
+    // log after seekPosition is initialized so its value is meaningful
+    LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}, " +
+        "serviceTimeout={}, kafkaProps={}, seekPosition={}",
+        kafkaServiceInterval, kafkaServiceTimeout, kafkaProps,
+        seekPosition);
+
+    this.ledgerPoller = new OMEventListenerLedgerPoller(
+        kafkaServiceInterval, TimeUnit.MILLISECONDS,
+        COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE,
+        kafkaServiceTimeout, pluginContext, conf,
+        seekPosition,
+        this::handleCompletedRequest);
+  }
+
+  @Override
+  public void start() {
+    ledgerPoller.start();
+
+    try {
+      kafkaClient.initialize();
+    } catch (IOException ex) {
+      LOG.error("Failure initializing kafka client", ex);
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    try {
+      kafkaClient.shutdown();
+    } catch (IOException ex) {
+      LOG.error("Failure shutting down kafka client", ex);
+    }
+
+    ledgerPoller.shutdown();
+  }
+
+  // callback called by OMEventListenerLedgerPoller
+  public void handleCompletedRequest(OmCompletedRequestInfo completedRequestInfo) {
+    List<String> eventsToSend =
notificationStrategy.determineEventsForOperation(completedRequestInfo); + + for (String event : eventsToSend) { + try { + kafkaClient.send(event); + } catch (IOException ex) { + LOG.error("Failure to send event {}", event, ex); + return; + } + } + + // no errors so we can update the seek position + seekPosition.set(completedRequestInfo.getDbKey()); + } + + static class KafkaClientWrapper { + public static final Logger LOG = LoggerFactory.getLogger(KafkaClientWrapper.class); + + private final String topic; + private final Properties kafkaProps; + + private KafkaProducer producer; + + public KafkaClientWrapper(Properties kafkaProps) { + this.topic = (String) kafkaProps.get("topic"); + this.kafkaProps = kafkaProps; + } + + public void initialize() throws IOException { + LOG.info("Initializing with properties {}", kafkaProps); + this.producer = new KafkaProducer<>(kafkaProps); + + ensureTopicExists(); + } + + public void shutdown() throws IOException { + producer.close(); + } + + public void send(String message) throws IOException { + if (producer != null) { + LOG.info("Producing event {}", message); + ProducerRecord producerRecord = + new ProducerRecord<>(topic, message); + producer.send(producerRecord); + } else { + LOG.warn("Producing event {} [KAFKA DOWN]", message); + } + } + + private void ensureTopicExists() { + try (AdminClient adminClient = AdminClient.create(kafkaProps)) { + LOG.info("Creating kafka topic: {}", this.topic); + NewTopic newTopic = new NewTopic(this.topic, 1, (short) 1); + adminClient.createTopics(Collections.singleton(newTopic)).all().get(); + adminClient.close(); + } catch (Exception ex) { + LOG.error("Failed to create topic: {}", this.topic, ex); + } + } + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java new file mode 100644 index 000000000000..1264b9b471e4 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om.eventlistener; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +/** + * This is a helper class which can be used by implementations of + * OMEventListener which uses a background service to read the latest + * completed operations and hand them to a callback method + */ +public class OMEventListenerLedgerPoller extends BackgroundService { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerLedgerPoller.class); + + private static final int MAX_RESULTS = 10_000; + + private final AtomicBoolean suspended; + private final AtomicLong runCount; + private final AtomicLong successRunCount; + private final OMEventListenerPluginContext pluginContext; + private final OMEventListenerLedgerPollerSeekPosition seekPosition; + private final Consumer callback; + + public OMEventListenerLedgerPoller(long interval, TimeUnit unit, + int poolSize, long serviceTimeout, + OMEventListenerPluginContext pluginContext, + OzoneConfiguration configuration, + OMEventListenerLedgerPollerSeekPosition seekPosition, + Consumer callback) { + + super("OMEventListenerLedgerPoller", + interval, + TimeUnit.MILLISECONDS, + poolSize, + serviceTimeout, pluginContext.getThreadNamePrefix()); + + this.suspended = new AtomicBoolean(false); + this.runCount = new AtomicLong(0); + this.successRunCount = new AtomicLong(0); + this.pluginContext = pluginContext; + this.seekPosition = seekPosition; + this.callback = callback; + } + + private boolean shouldRun() { + return pluginContext.isLeaderReady() && !suspended.get(); + } + + /** + * Suspend the service. + */ + @VisibleForTesting + public void suspend() { + suspended.set(true); + } + + /** + * Resume the service if suspended. 
+ */ + @VisibleForTesting + public void resume() { + suspended.set(false); + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + queue.add(new OMEventListenerLedgerPoller.CompletedRequestInfoConsumerTask()); + return queue; + } + + public AtomicLong getRunCount() { + return runCount; + } + + private class CompletedRequestInfoConsumerTask implements BackgroundTask { + + @Override + public int getPriority() { + return 0; + } + + @Override + public BackgroundTaskResult call() { + if (shouldRun()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Running OMEventListenerLedgerPoller"); + } + if (runCount.get() == 0) { + seekPosition.initSeekPosition(); + } + getRunCount().incrementAndGet(); + + try { + for (OmCompletedRequestInfo requestInfo : pluginContext.listCompletedRequestInfo(seekPosition.get(), MAX_RESULTS)) { + callback.accept(requestInfo); + } + } catch (IOException e) { + LOG.error("Error while running completed operation consumer " + + "background task. Will retry at next run.", e); + } + } else { + runCount.set(0); + } + + // place holder by returning empty results of this call back. + return BackgroundTaskResult.EmptyTaskResult.newResult(); + } + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java new file mode 100644 index 000000000000..b2965fdc2f1e --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a helper class to get/set the seek position used by the + * OMEventListenerLedgerPoller + */ +public class OMEventListenerLedgerPollerSeekPosition { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerLedgerPollerSeekPosition.class); + + private final AtomicReference seekPosition; + private NotificationCheckpointStrategy seekPositionSaver; + + public OMEventListenerLedgerPollerSeekPosition(NotificationCheckpointStrategy seekPositionSaver) { + this.seekPositionSaver = seekPositionSaver; + this.seekPosition = new AtomicReference(initSeekPosition()); + } + + public String initSeekPosition() { + try { + String savedVal = seekPositionSaver.load(); + LOG.info("Loaded seek position {}", savedVal); + return savedVal; + } catch (IOException ex) { + LOG.error("Error loading seek position", ex); + return null; + } + } + + public String get() { + return seekPosition.get(); + } + + public void set(String val) { + LOG.debug("Setting seek position {}", val); + // TODO: strictly we don't need to persist this for each event - we + // could get away with doing so every X events and have a tolerance + // for replaying a few events on a crash + try { + seekPositionSaver.save(val); + } catch (IOException ex) { + LOG.error("Error saving seek position", ex); + } + // NOTE: this in-memory view of the seek position needs to be kept + // up to date because the OMEventListenerLedgerPoller has a + // reference to it + seekPosition.set(val); + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerNotificationStrategy.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerNotificationStrategy.java new file mode 100644 index 000000000000..85c05002039a --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerNotificationStrategy.java @@ -0,0 +1,9 @@ +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.List; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; + +public interface OMEventListenerNotificationStrategy { + + List determineEventsForOperation(OmCompletedRequestInfo completedRequestInfo); +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/DateTimeJsonSerializer.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/DateTimeJsonSerializer.java new file mode 100644 index 000000000000..266255017cc3 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/DateTimeJsonSerializer.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2016. Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. 
+ * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener.s3; + +import java.io.IOException; +import java.util.Date; + +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; +import org.joda.time.tz.FixedDateTimeZone; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +/** + * A Jackson serializer for Joda {@code DateTime}s. + */ +public final class DateTimeJsonSerializer extends JsonSerializer { + + @Override + public void serialize( + DateTime value, + JsonGenerator jgen, + SerializerProvider provider) throws IOException { + + jgen.writeString(DateUtils.formatISO8601Date(value)); + } + + private static class DateUtils { + private static final DateTimeZone GMT = new FixedDateTimeZone("GMT", "GMT", 0, 0); + + /** ISO 8601 format */ + protected static final DateTimeFormatter iso8601DateFormat = + ISODateTimeFormat.dateTime().withZone(GMT); + + /** + * Formats the specified date as an ISO 8601 string. + * + * @param date the date to format + * @return the ISO-8601 string representing the specified date + */ + public static String formatISO8601Date(DateTime date) { + return iso8601DateFormat.print(date); + } + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotification.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotification.java new file mode 100644 index 000000000000..2bc978ec09d7 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotification.java @@ -0,0 +1,630 @@ +/* + * Copyright 2014-2025 Amazon Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://aws.amazon.com/apache2.0 + * + * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + * OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.eventlistener.s3; + +/* copy of + * software.amazon.awssdk.eventnotifications.s3.model.S3EventNotification + * class taken from AWS SDK (1.x) with minor changes for build issues + * and removed usage of unnecessary AWS specific extension entiies: + * + * - GlacierEventDataEntity + * - LifecycleEventDataEntity + * - IntelligentTieringEventDataEntity + * - ReplicationEventDataEntity + * + * NOTE: We may not need to fork this class if we can use the SDK one directly + * but conversely we may want to make our own customizations. 
+ */ + +import java.util.List; +import org.joda.time.DateTime; + +//import com.amazonaws.internal.DateTimeJsonSerializer; +//import com.amazonaws.util.json.Jackson; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.util.Map; + +/** +* A helper class that represents a strongly typed S3 EventNotification item sent +* to SQS, SNS, or Lambda. + * + *
+ * Migrating to the AWS SDK for Java v2
+ *
+ * The v2 equivalent of this class is S3EventNotification.
+ *
+ * See Migration Guide + * for more information. +*/ +public class S3EventNotification { + + private final List records; + + @JsonCreator + public S3EventNotification( + @JsonProperty(value = "Records") List records) + { + this.records = records; + } + + /** + *
+     * Parse the JSON string into a S3EventNotification object.
+     *
+     * The function will try its best to parse the input JSON string.
+     * It will not fail even if the JSON string contains unknown properties.
+     * The function will throw SdkClientException if the input JSON string is
+     * not valid JSON.
+     *
+ * @param json + * JSON string to parse. Typically this is the body of your SQS + * notification message body. + * + * @return The resulting S3EventNotification object. + */ + //public static S3EventNotification parseJson(String json) { + // return Jackson.fromJsonString(json, S3EventNotification.class); + //} + + /** + * @return the records in this notification + */ + @JsonProperty(value = "Records") + public List getRecords() { + return records; + } + + /** + * @return a JSON representation of this object + */ + //public String toJson() { + // return Jackson.toJsonString(this); + //} + + public static class UserIdentityEntity { + + private final String principalId; + + @JsonCreator + public UserIdentityEntity( + @JsonProperty(value = "principalId") String principalId) { + this.principalId = principalId; + } + + public String getPrincipalId() { + return principalId; + } + } + + public static class S3BucketEntity { + + private final String name; + private final UserIdentityEntity ownerIdentity; + private final String arn; + + @JsonCreator + public S3BucketEntity( + @JsonProperty(value = "name") String name, + @JsonProperty(value = "ownerIdentity") UserIdentityEntity ownerIdentity, + @JsonProperty(value = "arn") String arn) + { + this.name = name; + this.ownerIdentity = ownerIdentity; + this.arn = arn; + } + + public String getName() { + return name; + } + + public UserIdentityEntity getOwnerIdentity() { + return ownerIdentity; + } + + public String getArn() { + return arn; + } + } + + public static class S3ObjectEntity { + + private final String key; + private final Long size; + private final String eTag; + private final String versionId; + private final String sequencer; + + @Deprecated + public S3ObjectEntity( + String key, + Integer size, + String eTag, + String versionId) + { + this.key = key; + this.size = size == null ? null : size.longValue(); + this.eTag = eTag; + this.versionId = versionId; + this.sequencer = null; + } + + @Deprecated + public S3ObjectEntity( + String key, + Long size, + String eTag, + String versionId) + { + this(key, size, eTag, versionId, null); + } + + @JsonCreator + public S3ObjectEntity( + @JsonProperty(value = "key") String key, + @JsonProperty(value = "size") Long size, + @JsonProperty(value = "eTag") String eTag, + @JsonProperty(value = "versionId") String versionId, + @JsonProperty(value = "sequencer") String sequencer) + { + this.key = key; + this.size = size; + this.eTag = eTag; + this.versionId = versionId; + this.sequencer = sequencer; + } + + public String getKey() { + return key; + } + + /** + * S3 URL encodes the key of the object involved in the event. This is + * a convenience method to automatically URL decode the key. + * @return The URL decoded object key. + */ + //public String getUrlDecodedKey() { + // return SdkHttpUtils.urlDecode(getKey()); + //} + + /** + * @deprecated use {@link #getSizeAsLong()} instead. + */ + @Deprecated + @JsonIgnore + public Integer getSize() { + return size == null ? 
null : size.intValue(); + } + + @JsonProperty(value = "size") + public Long getSizeAsLong() { + return size; + } + + public String geteTag() { + return eTag; + } + + public String getVersionId() { + return versionId; + } + + public String getSequencer() { + return sequencer; + } + } + + public static class S3Entity { + + private final String configurationId; + private final S3BucketEntity bucket; + private final S3ObjectEntity object; + private final String s3SchemaVersion; + + @JsonCreator + public S3Entity( + @JsonProperty(value = "configurationId") String configurationId, + @JsonProperty(value = "bucket") S3BucketEntity bucket, + @JsonProperty(value = "object") S3ObjectEntity object, + @JsonProperty(value = "s3SchemaVersion") String s3SchemaVersion) + { + this.configurationId = configurationId; + this.bucket = bucket; + this.object = object; + this.s3SchemaVersion = s3SchemaVersion; + } + + public String getConfigurationId() { + return configurationId; + } + + public S3BucketEntity getBucket() { + return bucket; + } + + public S3ObjectEntity getObject() { + return object; + } + + public String getS3SchemaVersion() { + return s3SchemaVersion; + } + } + + public static class RequestParametersEntity { + + private final String sourceIPAddress; + + @JsonCreator + public RequestParametersEntity( + @JsonProperty(value = "sourceIPAddress") String sourceIPAddress) + { + this.sourceIPAddress = sourceIPAddress; + } + + public String getSourceIPAddress() { + return sourceIPAddress; + } + } + + public static class ResponseElementsEntity { + + private final String xAmzId2; + private final String xAmzRequestId; + + @JsonCreator + public ResponseElementsEntity( + @JsonProperty(value = "x-amz-id-2") String xAmzId2, + @JsonProperty(value = "x-amz-request-id") String xAmzRequestId) + { + this.xAmzId2 = xAmzId2; + this.xAmzRequestId = xAmzRequestId; + } + + @JsonProperty("x-amz-id-2") + public String getxAmzId2() { + return xAmzId2; + } + + @JsonProperty("x-amz-request-id") + public String getxAmzRequestId() { + return xAmzRequestId; + } + } + + public static class GlacierEventDataEntity { + private final RestoreEventDataEntity restoreEventData; + + @JsonCreator + public GlacierEventDataEntity( + @JsonProperty(value = "restoreEventData") RestoreEventDataEntity restoreEventData) + { + this.restoreEventData = restoreEventData; + } + + public RestoreEventDataEntity getRestoreEventData() { + return restoreEventData; + } + } + + public static class LifecycleEventDataEntity { + + private final TransitionEventDataEntity transitionEventData; + + @JsonCreator + public LifecycleEventDataEntity( + @JsonProperty(value = "transitionEventData") TransitionEventDataEntity transitionEventData) + { + + this.transitionEventData = transitionEventData; + } + + public TransitionEventDataEntity getTransitionEventData() { + return transitionEventData; + } + } + + public static class IntelligentTieringEventDataEntity { + + private final String destinationAccessTier; + + @JsonCreator + public IntelligentTieringEventDataEntity( + @JsonProperty(value = "destinationAccessTier") String destinationAccessTier) + { + this.destinationAccessTier = destinationAccessTier; + } + + @JsonProperty("destinationAccessTier") + public String getDestinationAccessTier() { + return destinationAccessTier; + } + } + + public static class ReplicationEventDataEntity { + + private final String replicationRuleId; + private final String destinationBucket; + private final String s3Operation; + private final String requestTime; + private final String 
failureReason; + private final String threshold; + private final String replicationTime; + + @JsonCreator + public ReplicationEventDataEntity( + @JsonProperty(value = "replicationRuleId") String replicationRuleId, + @JsonProperty(value = "destinationBucket") String destinationBucket, + @JsonProperty(value = "s3Operation") String s3Operation, + @JsonProperty(value = "requestTime") String requestTime, + @JsonProperty(value = "failureReason") String failureReason, + @JsonProperty(value = "threshold") String threshold, + @JsonProperty(value = "replicationTime") String replicationTime) + { + this.replicationRuleId = replicationRuleId; + this.destinationBucket = destinationBucket; + this.s3Operation = s3Operation; + this.requestTime = requestTime; + this.failureReason = failureReason; + this.threshold = threshold; + this.replicationTime = replicationTime; + } + + @JsonProperty("replicationRuleId") + public String getReplicationRuleId() { + return replicationRuleId; + } + @JsonProperty("destinationBucket") + public String getDestinationBucket() { + return destinationBucket; + } + @JsonProperty("s3Operation") + public String getS3Operation() { + return s3Operation; + } + @JsonProperty("requestTime") + public String getRequestTime() { + return requestTime; + } + @JsonProperty("failureReason") + public String getFailureReason() { + return failureReason; + } + @JsonProperty("threshold") + public String getThreshold() { + return threshold; + } + @JsonProperty("replicationTime") + public String getReplicationTime() { + return replicationTime; + } + } + + public static class TransitionEventDataEntity { + private final String destinationStorageClass; + + @JsonCreator + public TransitionEventDataEntity( + @JsonProperty("destinationStorageClass") String destinationStorageClass) + { + this.destinationStorageClass = destinationStorageClass; + } + + public String getDestinationStorageClass() { + return destinationStorageClass; + } + } + + public static class RestoreEventDataEntity { + private DateTime lifecycleRestorationExpiryTime; + private final String lifecycleRestoreStorageClass; + + @JsonCreator + public RestoreEventDataEntity( + @JsonProperty("lifecycleRestorationExpiryTime") String lifecycleRestorationExpiryTime, + @JsonProperty("lifecycleRestoreStorageClass") String lifecycleRestoreStorageClass) + { + if (lifecycleRestorationExpiryTime != null) { + this.lifecycleRestorationExpiryTime = DateTime.parse(lifecycleRestorationExpiryTime); + } + this.lifecycleRestoreStorageClass = lifecycleRestoreStorageClass; + } + + @JsonSerialize(using=DateTimeJsonSerializer.class) + public DateTime getLifecycleRestorationExpiryTime() { + return lifecycleRestorationExpiryTime; + } + + public String getLifecycleRestoreStorageClass() { + return lifecycleRestoreStorageClass; + } + } + + public static class S3EventNotificationRecord { + + private final String awsRegion; + private final String eventName; + private final String eventSource; + private DateTime eventTime; + private final String eventVersion; + private final RequestParametersEntity requestParameters; + private final ResponseElementsEntity responseElements; + private final S3Entity s3; + private final UserIdentityEntity userIdentity; + private final Map eventData; + //private final GlacierEventDataEntity glacierEventData; + //private final LifecycleEventDataEntity lifecycleEventData; + //private final IntelligentTieringEventDataEntity intelligentTieringEventData; + //private final ReplicationEventDataEntity replicationEventDataEntity; + + /* + @Deprecated + public 
S3EventNotificationRecord( + String awsRegion, + String eventName, + String eventSource, + String eventTime, + String eventVersion, + RequestParametersEntity requestParameters, + ResponseElementsEntity responseElements, + S3Entity s3, + UserIdentityEntity userIdentity) + { + this(awsRegion, + eventName, + eventSource, + eventTime, + eventVersion, + requestParameters, + responseElements, + s3, + userIdentity, + null, + null, + null, + null); + } + + @Deprecated + public S3EventNotificationRecord( + String awsRegion, + String eventName, + String eventSource, + String eventTime, + String eventVersion, + RequestParametersEntity requestParameters, + ResponseElementsEntity responseElements, + S3Entity s3, + UserIdentityEntity userIdentity, + GlacierEventDataEntity glacierEventData) + { + this(awsRegion, + eventName, + eventSource, + eventTime, + eventVersion, + requestParameters, + responseElements, + s3, + userIdentity, + glacierEventData, + null, + null, + null); + } + */ + + @JsonCreator + public S3EventNotificationRecord( + @JsonProperty(value = "awsRegion") String awsRegion, + @JsonProperty(value = "eventName") String eventName, + @JsonProperty(value = "eventSource") String eventSource, + @JsonProperty(value = "eventTime") String eventTime, + @JsonProperty(value = "eventVersion") String eventVersion, + @JsonProperty(value = "requestParameters") RequestParametersEntity requestParameters, + @JsonProperty(value = "responseElements") ResponseElementsEntity responseElements, + @JsonProperty(value = "s3") S3Entity s3, + @JsonProperty(value = "userIdentity") UserIdentityEntity userIdentity, + @JsonProperty(value = "eventData") Map eventData) + //@JsonProperty(value = "glacierEventData") GlacierEventDataEntity glacierEventData, + //@JsonProperty(value = "lifecycleEventData") LifecycleEventDataEntity lifecycleEventData, + //@JsonProperty(value = "intelligentTieringEventData") IntelligentTieringEventDataEntity intelligentTieringEventData, + //@JsonProperty(value = "replicationEventData") ReplicationEventDataEntity replicationEventData) + { + this.awsRegion = awsRegion; + this.eventName = eventName; + this.eventSource = eventSource; + + if (eventTime != null) + { + this.eventTime = DateTime.parse(eventTime); + } + + this.eventVersion = eventVersion; + this.requestParameters = requestParameters; + this.responseElements = responseElements; + this.s3 = s3; + this.userIdentity = userIdentity; + this.eventData = eventData; + //this.glacierEventData = glacierEventData; + //this.lifecycleEventData = lifecycleEventData; + //this.intelligentTieringEventData = intelligentTieringEventData; + //this.replicationEventDataEntity = replicationEventData; + } + + public String getAwsRegion() { + return awsRegion; + } + + public String getEventName() { + return eventName; + } + + //@JsonIgnore + //public S3Event getEventNameAsEnum() { + // return S3Event.fromValue(eventName); + //} + + public String getEventSource() { + return eventSource; + } + + @JsonSerialize(using=DateTimeJsonSerializer.class) + public DateTime getEventTime() { + return eventTime; + } + + public String getEventVersion() { + return eventVersion; + } + + public RequestParametersEntity getRequestParameters() { + return requestParameters; + } + + public ResponseElementsEntity getResponseElements() { + return responseElements; + } + + public S3Entity getS3() { + return s3; + } + + public UserIdentityEntity getUserIdentity() { + return userIdentity; + } + + // Ozone extension + public Map getEventData() { + return eventData; + } + + //public 
GlacierEventDataEntity getGlacierEventData() { + // return glacierEventData; + //} + + //public LifecycleEventDataEntity getLifecycleEventData() { return lifecycleEventData; } + + //public IntelligentTieringEventDataEntity getIntelligentTieringEventData() { return intelligentTieringEventData; } + + //public ReplicationEventDataEntity getReplicationEventDataEntity() { return replicationEventDataEntity; } + + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationBuilder.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationBuilder.java new file mode 100644 index 000000000000..4d8059cb1629 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationBuilder.java @@ -0,0 +1,109 @@ +package org.apache.hadoop.ozone.om.eventlistener.s3; + +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.ozone.om.eventlistener.s3.S3EventNotification.S3BucketEntity; +import org.apache.hadoop.ozone.om.eventlistener.s3.S3EventNotification.S3EventNotificationRecord; +import org.apache.hadoop.ozone.om.eventlistener.s3.S3EventNotification.S3ObjectEntity; +import org.apache.hadoop.ozone.om.eventlistener.s3.S3EventNotification.UserIdentityEntity; + +/** + * This is a builder for the AWS event notification class + * com.amazonaws.services.s3.event.S3EventNotification which is part of + * AWS SDK 1.x + * + * NOTE: the above class is designed for deserialization which is why it + * requires this builder wrapper. + * + * XXX: we may need to fork these classes so that we can customize it to + * our needs. 
+ */ +public class S3EventNotificationBuilder { + + private static final String REGION = "us-east-1"; + private static final String BUCKET_ARN_PREFIX = "arn:aws:s3:" + REGION; + private static final String EVENT_SOURCE = "ozone:s3"; + private static final String EVENT_VERSION = "2.1"; + private static final String USER_IDENTITY = "some-principalId"; + + private static final String SCHEMA_VERSION = "1.0"; + private static final String CONFIGURATION_ID = "mynotif1"; + + private final String objectKey; + private final String bucketName; + private final String bucketArn; + private final String eventName; + private final Instant eventTime; + private final String etag; + private final Map eventData; + + // mutable fields defaulting to null + private Long objectSize; + private String objectVersionId; + private String objectSequencer; + + public S3EventNotificationBuilder(String objectKey, String bucketName, String bucketArn, String eventName, Instant eventTime, String etag) { + this.objectKey = objectKey; + this.bucketName = bucketName; + this.bucketArn = bucketArn; + this.eventName = eventName; + this.eventTime = eventTime; + this.etag = etag; + this.eventData = new HashMap<>(); + } + + public S3EventNotificationBuilder setObjectSize(long objectSize) { + this.objectSize = objectSize; + return this; + } + + public S3EventNotificationBuilder setObjectVersionId(String objectVersionId) { + this.objectVersionId = objectVersionId; + return this; + } + + public S3EventNotificationBuilder setObjectSequencer(String objectSequencer) { + this.objectSequencer = objectSequencer; + return this; + } + + public S3EventNotificationBuilder addAllEventData(Map eventData) { + this.eventData.putAll(eventData); + return this; + } + + public S3EventNotification build() { + UserIdentityEntity userIdentity = new UserIdentityEntity(USER_IDENTITY); + S3BucketEntity s3BucketEntity = new S3BucketEntity(bucketName, userIdentity, bucketArn); + + S3EventNotification.S3ObjectEntity s3ObjectEntity = new S3ObjectEntity( + objectKey, + objectSize, + etag, + objectVersionId, + objectSequencer); + + S3EventNotification.S3Entity s3Entity = new S3EventNotification.S3Entity( + CONFIGURATION_ID, + s3BucketEntity, + s3ObjectEntity, + SCHEMA_VERSION); + + S3EventNotificationRecord eventRecord = new S3EventNotificationRecord( + REGION, + eventName, + EVENT_SOURCE, + eventTime.toString(), + EVENT_VERSION, + new S3EventNotification.RequestParametersEntity(""), + new S3EventNotification.ResponseElementsEntity("", ""), + s3Entity, + new S3EventNotification.UserIdentityEntity("tester"), + eventData); + + return new S3EventNotification(Collections.singletonList(eventRecord)); + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationStrategy.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationStrategy.java new file mode 100644 index 000000000000..6cc15ff0f9b6 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/s3/S3EventNotificationStrategy.java @@ -0,0 +1,156 @@ +package org.apache.hadoop.ozone.om.eventlistener.s3; + +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import 
org.apache.hadoop.ozone.om.eventlistener.OMEventListenerNotificationStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
+/**
+ * This is a notification strategy to generate events according to S3
+ * notification semantics.
+ */
+public class S3EventNotificationStrategy implements OMEventListenerNotificationStrategy {
+  public static final Logger LOG = LoggerFactory.getLogger(S3EventNotificationStrategy.class);
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  public List<String> determineEventsForOperation(OmCompletedRequestInfo requestInfo) {
+
+    switch (requestInfo.getOpArgs().getOperationType()) {
+    case CREATE_KEY:
+    case COMMIT_KEY: {
+      return Collections.singletonList(createS3Event("ObjectCreated:Put",
+          requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          requestInfo.getKeyName(),
+          Collections.emptyMap())
+      );
+    }
+    case CREATE_FILE: {
+      OmCompletedRequestInfo.OperationArgs.CreateFileArgs createFileArgs
+          = (OmCompletedRequestInfo.OperationArgs.CreateFileArgs) requestInfo.getOpArgs();
+
+      // XXX: eventData is an Ozone extension. It is unclear if this
+      // schema makes sense, but the general S3 schema is somewhat
+      // freeform. These arguments are informational rather than
+      // required, so their necessity is unclear.
+      Map<String, String> eventData = new HashMap<>();
+      eventData.put("isDirectory", "false");
+      eventData.put("isRecursive", String.valueOf(createFileArgs.isRecursive()));
+      eventData.put("isOverwrite", String.valueOf(createFileArgs.isOverwrite()));
+
+      return Collections.singletonList(createS3Event("ObjectCreated:Put",
+          requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          requestInfo.getKeyName(),
+          eventData)
+      );
+    }
+    case CREATE_DIRECTORY: {
+      OmCompletedRequestInfo.OperationArgs.CreateDirectoryArgs createDirArgs
+          = (OmCompletedRequestInfo.OperationArgs.CreateDirectoryArgs) requestInfo.getOpArgs();
+
+      // XXX: eventData is an Ozone extension. It is unclear if this
+      // schema makes sense, but the general S3 schema is somewhat
+      // freeform. These arguments are informational rather than
+      // required, so their necessity is unclear.
+      Map<String, String> eventData = new HashMap<>();
+      eventData.put("isDirectory", "true");
+
+      return Collections.singletonList(createS3Event("ObjectCreated:Put",
+          requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          requestInfo.getKeyName(),
+          eventData)
+      );
+    }
+    case DELETE_KEY: {
+      return Collections.singletonList(createS3Event("ObjectRemoved:Delete",
+          requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          requestInfo.getKeyName(),
+          Collections.emptyMap())
+      );
+    }
+    case RENAME_KEY: {
+      OmCompletedRequestInfo.OperationArgs.RenameKeyArgs renameKeyArgs
+          = (OmCompletedRequestInfo.OperationArgs.RenameKeyArgs) requestInfo.getOpArgs();
+
+      String renameFromKey = OzoneKeyUtil.getOzoneKey(requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          requestInfo.getKeyName());
+
+      // XXX: it would be good to be able to convey that this was a
+      // file vs directory rename
+      Map<String, String> eventData = new HashMap<>();
+      eventData.put("renameFromKey", renameFromKey);
+
+      // NOTE: ObjectRenamed:Rename is an Ozone extension, as is the
+      // eventData map in the S3 event schema.
+      return Collections.singletonList(createS3Event("ObjectRenamed:Rename",
+          requestInfo.getVolumeName(),
+          requestInfo.getBucketName(),
+          renameKeyArgs.getToKeyName(),
+          eventData)
+      );
+    }
+    default:
+      LOG.info("No events for operation {} on {}",
+          requestInfo.getOpArgs().getOperationType(),
+          requestInfo.getKeyName());
+      return Collections.emptyList();
+    }
+  }
+
+  static String createS3Event(String eventName,
+      String volumeName,
+      String bucketName,
+      String keyName,
+      Map<String, String> eventData) {
+    try {
+      String objectKey = OzoneKeyUtil.getOzoneKey(volumeName, bucketName, keyName);
+      String bucketArn = "arn:aws:s3:::" + volumeName + "." + bucketName;
+      Instant eventTime = Instant.now();
+      String etag = UUID.randomUUID().toString();
+
+      S3EventNotification event = new S3EventNotificationBuilder(objectKey, bucketName, bucketArn, eventName, eventTime, etag)
+          .addAllEventData(eventData)
+          .build();
+
+      return MAPPER.writer().writeValueAsString(event);
+    } catch (Exception ex) {
+      LOG.error("Failed to serialize S3 event {} for key {}", eventName, keyName, ex);
+      return null;
+    }
+  }
+
+  // stub: taken from metadataManager. Should we just pass in
+  // metadataManager?
+  private static class OzoneKeyUtil {
+    public static String getOzoneKey(String volume, String bucket, String key) {
+      StringBuilder builder = new StringBuilder()
+          .append(volume);
+      // TODO : Throw if the Bucket is null?
+      builder.append(OM_KEY_PREFIX).append(bucket);
+      if (StringUtils.isNotBlank(key)) {
+        builder.append(OM_KEY_PREFIX);
+        if (!key.equals(OM_KEY_PREFIX)) {
+          builder.append(key);
+        }
+      }
+      return builder.toString();
+    }
+  }
+}
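For orientation, the snippet below hand-drives createS3Event for a key create in vol1/bucket1 and shows, abridged in a comment, the kind of JSON it returns. This is a hand-written illustration, not captured output; field order and the generated eventTime/eTag values will differ in practice.

```java
import java.util.Collections;

// Illustration only (assumes same package as the strategy, since
// createS3Event is package-private).
class S3EventJsonDemo {
  public static void main(String[] args) {
    String json = S3EventNotificationStrategy.createS3Event(
        "ObjectCreated:Put", "vol1", "bucket1", "some/key",
        Collections.emptyMap());
    System.out.println(json);
    // Abridged, hand-written shape of the output:
    // {"Records":[{"awsRegion":"us-east-1","eventName":"ObjectCreated:Put",
    //   "eventSource":"ozone:s3","eventTime":"...","eventVersion":"2.1",
    //   "s3":{"configurationId":"mynotif1",
    //         "bucket":{"name":"bucket1",
    //                   "ownerIdentity":{"principalId":"some-principalId"},
    //                   "arn":"arn:aws:s3:::vol1.bucket1"},
    //         "object":{"key":"vol1/bucket1/some/key","eTag":"<uuid>",...},
    //         "s3SchemaVersion":"1.0"},...}]}
  }
}
```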
diff --git a/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
new file mode 100644
index 000000000000..4723b4c3a474
--- /dev/null
+++ b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs;
+import org.apache.hadoop.util.Time;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockedConstruction;
+import org.mockito.ArgumentCaptor;
+import org.mockito.junit.jupiter.MockitoExtension;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests {@link OMEventListenerKafkaPublisher}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestOMEventListenerKafkaPublisher {
+
+  private static final String VOLUME_NAME = "vol1";
+  private static final String BUCKET_NAME = "bucket1";
+
+  @Mock
+  private OMEventListenerPluginContext pluginContext;
+
+  @Mock
+  private NotificationCheckpointStrategy checkpointStrategy;
+
+  // Helper to create a JSON key/value string for non-exhaustive JSON
+  // attribute checking.
+  private static String toJsonKeyVal(String key, String val) {
+    return new StringBuilder()
+        .append('\"')
+        .append(key)
+        .append('\"')
+        .append(':')
+        .append('\"')
+        .append(val)
+        .append('\"')
+        .toString();
+  }
+
+  private static OmCompletedRequestInfo buildCompletedRequestInfo(long trxLogIndex, String keyName, OperationArgs opArgs) {
+    return new OmCompletedRequestInfo.Builder()
+        .setTrxLogIndex(trxLogIndex)
+        .setVolumeName(VOLUME_NAME)
+        .setBucketName(BUCKET_NAME)
+        .setKeyName(keyName)
+        .setCreationTime(Time.now())
+        .setOpArgs(opArgs)
+        .build();
+  }
+
+  private List<String> captureEventsProducedByOperation(OmCompletedRequestInfo op, int expectEvents) throws IOException {
+    when(pluginContext.getOzoneNotificationCheckpointStrategy()).thenReturn(checkpointStrategy);
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set("ozone.notify.kafka.topic", "abc");
+
+    List<String> events = new ArrayList<>();
+
+    OMEventListenerKafkaPublisher plugin = new OMEventListenerKafkaPublisher();
+    try (MockedConstruction<OMEventListenerKafkaPublisher.KafkaClientWrapper> mockedKafkaClientWrapper =
+             mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) {
+
+      plugin.initialize(conf, pluginContext);
+      plugin.handleCompletedRequest(op);
+
+      OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockedKafkaClientWrapper.constructed().get(0);
+      ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+      verify(mock, times(expectEvents)).send(argument.capture());
+
+      events.addAll(argument.getAllValues());
+    }
+
+    return events;
+  }
+
+  @Test
+  public void testCreateKeyRequestProducesS3CreatedEvent() throws InterruptedException, IOException {
+    OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(123L, "some/key",
+        new OperationArgs.CreateKeyArgs());
+
+    List<String> events = captureEventsProducedByOperation(createRequest, 1);
+    assertThat(events).hasSize(1);
+
+    assertThat(events.get(0))
+        .contains(toJsonKeyVal("eventName", "ObjectCreated:Put"))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key"));
+  }
+
+  @Test
+  public void testCreateFileRequestProducesS3CreatedEvent() throws InterruptedException, IOException {
+    boolean recursive = false;
+    boolean overwrite = true;
+
+    OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(123L, "some/key",
+        new OperationArgs.CreateFileArgs(recursive, overwrite));
+
+    List<String> events = captureEventsProducedByOperation(createRequest, 1);
+    assertThat(events).hasSize(1);
+
+    assertThat(events.get(0))
+        .contains(toJsonKeyVal("eventName", "ObjectCreated:Put"))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key"))
+        .contains(toJsonKeyVal("isDirectory", "false"))
+        .contains(toJsonKeyVal("isRecursive", "false"))
+        .contains(toJsonKeyVal("isOverwrite", "true"));
+  }
+
+  @Test
+  public void testCreateDirectoryRequestProducesS3CreatedEvent() throws InterruptedException, IOException {
+    OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(123L, "some/key",
+        new OperationArgs.CreateDirectoryArgs());
+
+    List<String> events = captureEventsProducedByOperation(createRequest, 1);
+    assertThat(events).hasSize(1);
+
+    assertThat(events.get(0))
+        .contains(toJsonKeyVal("eventName", "ObjectCreated:Put"))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key"))
+        .contains(toJsonKeyVal("isDirectory", "true"));
+  }
+
+  @Test
+  public void testRenameRequestProducesS3RenameEvent() throws InterruptedException, IOException {
+    OmCompletedRequestInfo renameRequest = buildCompletedRequestInfo(123L, "some/key",
+        new OperationArgs.RenameKeyArgs("some/key_RENAMED"));
+
+    List<String> events = captureEventsProducedByOperation(renameRequest, 1);
+    assertThat(events).hasSize(1);
+
+    assertThat(events.get(0))
+        .contains(toJsonKeyVal("eventName", "ObjectRenamed:Rename"))
+        .contains(toJsonKeyVal("key", "vol1/bucket1/some/key_RENAMED"))
+        .contains(toJsonKeyVal("renameFromKey", "vol1/bucket1/some/key"));
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/pom.xml b/hadoop-ozone/ozone-manager/pom.xml
index 6347ee2722bf..933f67a5a21c 100644
--- a/hadoop-ozone/ozone-manager/pom.xml
+++ b/hadoop-ozone/ozone-manager/pom.xml
@@ -312,6 +312,11 @@
       <artifactId>netty-tcnative-boringssl-static</artifactId>
       <scope>runtime</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>ozone-manager-plugins</artifactId>
+      <scope>runtime</scope>
+    </dependency>
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 97af4c77af76..10440e88ca92 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -138,6 +138,7 @@
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.eventlistener.OMEventListenerPluginManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
@@ -211,6 +212,9 @@ public class KeyManagerImpl implements KeyManager {
   private DNSToSwitchMapping dnsToSwitchMapping;
   private CompactionService compactionService;
 
+  // XXX: probably doesn't belong here
+  private OMEventListenerPluginManager eventListenerPluginManager;
+
   public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
       OzoneConfiguration conf, OMPerformanceMetrics metrics) {
     this(om, scmClient, om.getMetadataManager(), conf,
@@ -344,6 +348,15 @@ public void start(OzoneConfiguration configuration) {
       multipartUploadCleanupService.start();
     }
 
+    // TODO:
+    //  * this is probably the wrong place for this but adding it here
+    //    for now just to get the plumbing fleshed out
+    //
+    if (eventListenerPluginManager == null) {
+      eventListenerPluginManager = new OMEventListenerPluginManager(ozoneManager, configuration);
+      eventListenerPluginManager.startAll();
+    }
+
     Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
         configuration.getClass(
             ScmConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@@ -446,6 +459,10 @@ public void stop() throws IOException {
       compactionService.shutdown();
       compactionService = null;
     }
+    if (eventListenerPluginManager != null) {
+      eventListenerPluginManager.shutdownAll();
+      eventListenerPluginManager = null;
+    }
   }
 
   private OmBucketInfo getBucketInfo(String volumeName, String bucketName)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 456e37367be7..b047e1690a1e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -86,6 +86,7 @@
 import org.apache.hadoop.ozone.om.helpers.ListKeysResult;
 import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
 import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
@@ -161,6 +162,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
   private TypedTable<String, SnapshotInfo> snapshotInfoTable;
   private TypedTable<String, String> snapshotRenamedTable;
   private TypedTable<String, CompactionLogEntry> compactionLogTable;
+  private TypedTable<String, OmCompletedRequestInfo> completedRequestInfoTable;
 
   private OzoneManager ozoneManager;
 
@@ -463,6 +465,8 @@ protected void initializeOmTables(CacheType cacheType,
     // TODO: [SNAPSHOT] Initialize table lock for snapshotRenamedTable.
 
     compactionLogTable = initializer.get(OMDBDefinition.COMPACTION_LOG_TABLE_DEF);
+
+    completedRequestInfoTable = initializer.get(OMDBDefinition.COMPLETED_REQUEST_INFO_TABLE_DEF);
   }
 
   /**
@@ -1273,6 +1277,58 @@ private List<OmVolumeArgs> listAllVolumes(String prefix, String startKey,
     return result;
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<OmCompletedRequestInfo> listCompletedRequestInfo(final String startKey,
+      final int maxResults)
+      throws IOException {
+    List<OmCompletedRequestInfo> results = new ArrayList<>();
+
+    Table.KeyValue<String, OmCompletedRequestInfo> completedRequestInfoRow;
+    try (TableIterator<String, ? extends Table.KeyValue<String, OmCompletedRequestInfo>>
+             tableIterator = getCompletedRequestInfoTable().iterator()) {
+
+      boolean skipFirst = false;
+      if (StringUtils.isNotBlank(startKey)) {
+        // TODO: what happens if the seek position is no longer
+        //       available? Do we go to the end of the list
+        //       or the first key > startKey
+        tableIterator.seek(startKey);
+        skipFirst = true;
+      }
+
+      while (tableIterator.hasNext() && results.size() < maxResults) {
+        completedRequestInfoRow = tableIterator.next();
+        // this is the first loop iteration after the seek so we
+        // need to skip the record we seeked to (if it is still
+        // present)
+        if (skipFirst) {
+          skipFirst = false;
+          // NOTE: I'm assuming that we need to conditionally do this
+          // only if it is equal to what we wanted to seek to (hence
+          // the equality check below)
+          if (!Objects.equals(startKey, completedRequestInfoRow.getKey())) {
+            // when we have a startKey we expect the first result
+            // to be that startKey. If it is not, then we can infer that
+            // the startKey was already cleaned up, and therefore we have
+            // missed some records somehow; this needs to be flagged to the
+            // caller.
+            // TODO: we should throw a custom exception here (instead of
+            //       IOException) that needs to be handled appropriately by
+            //       callers
+            throw new IOException(
+                "Missing rows - start key not found (startKey=" + startKey +
+                ", foundKey=" + completedRequestInfoRow.getKey() + ")");
+          }
+        } else {
+          results.add(completedRequestInfoRow.getValue());
+        }
+      }
+    }
+    return results;
+  }
+
   private PersistedUserVolumeInfo getVolumesByUser(String userNameKey)
       throws OMException {
     try {
@@ -1632,6 +1688,11 @@ public Table<String, CompactionLogEntry> getCompactionLogTable() {
     return compactionLogTable;
   }
 
+  @Override
+  public Table<String, OmCompletedRequestInfo> getCompletedRequestInfoTable() {
+    return completedRequestInfoTable;
+  }
+
   /**
    * Get Snapshot Chain Manager.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index c8bc0954d3e8..a2cb0fed65a7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -233,6 +233,8 @@
 import org.apache.hadoop.ozone.audit.OMAction;
 import org.apache.hadoop.ozone.audit.OMSystemAction;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
+import org.apache.hadoop.ozone.om.eventlistener.NotificationCheckpointStrategy;
+import org.apache.hadoop.ozone.om.eventlistener.OzoneFileCheckpointStrategy;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
@@ -479,6 +481,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private final OzoneLockProvider ozoneLockProvider;
   private final OMPerformanceMetrics perfMetrics;
   private final BucketUtilizationMetrics bucketUtilizationMetrics;
+  private final OzoneFileCheckpointStrategy ozoneFileCheckpointStrategy;
 
   private boolean fsSnapshotEnabled;
 
@@ -728,6 +731,8 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
     bucketUtilizationMetrics = BucketUtilizationMetrics.create(metadataManager);
     omHostName = HddsUtils.getHostName(conf);
+    ozoneFileCheckpointStrategy = new OzoneFileCheckpointStrategy(this, omMetadataReader);
+
   }
 
   public void initializeEdekCache(OzoneConfiguration conf) {
@@ -3500,6 +3505,10 @@ public void transferLeadership(String newLeaderId)
     }
   }
 
+  public NotificationCheckpointStrategy getNotificationCheckpointStrategy() {
+    return ozoneFileCheckpointStrategy;
+  }
+
   @Override
   public boolean triggerRangerBGSync(boolean noWait) throws IOException {
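The seek-based scan in listCompletedRequestInfo above only works because the table's string keys sort in transaction order. The OMDBDefinition entry that follows notes that the txid is left-zero-padded; here is a minimal sketch of such a key scheme. The 20-digit width is an assumption — the real OmCompletedRequestInfo.getDbKey() implementation is not part of this diff.

```java
// Hypothetical sketch of a left-zero-padded transaction-index key. Padding to
// a fixed width makes RocksDB's lexicographic iteration order match numeric
// transaction order, which is exactly what seek(startKey) relies on.
final class CompletedRequestKeySketch {
  static String toDbKey(long trxLogIndex) {
    return String.format("%020d", trxLogIndex);
  }

  public static void main(String[] args) {
    System.out.println(toDbKey(123L)); // 00000000000000000123
    System.out.println(toDbKey(124L)); // 00000000000000000124 (sorts after)
  }
}
```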
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
index 9894e8f5d6bf..3a4dbfbc2ed5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
@@ -315,6 +316,15 @@ public final class OMDBDefinition extends DBDefinition.WithMap {
           StringCodec.get(),
           CompactionLogEntry.getCodec());
 
+  public static final String COMPLETED_REQUEST_INFO_TABLE = "completedRequestInfoTable";
+  /** completedRequestInfoTable: txId -> OmCompletedRequestInfo. */
+  public static final DBColumnFamilyDefinition<String, OmCompletedRequestInfo> COMPLETED_REQUEST_INFO_TABLE_DEF
+      = new DBColumnFamilyDefinition<>(COMPLETED_REQUEST_INFO_TABLE,
+          StringCodec.get(), // txid (left zero-padded)
+          OmCompletedRequestInfo.getCodec());
+
+
+  //---------------------------------------------------------------------------
   private static final Map<String, DBColumnFamilyDefinition<?, ?>> COLUMN_FAMILIES
       = DBColumnFamilyDefinition.newUnmodifiableMap(
@@ -339,7 +349,8 @@ public final class OMDBDefinition extends DBDefinition.WithMap {
           TENANT_STATE_TABLE_DEF,
           TRANSACTION_INFO_TABLE_DEF,
           USER_TABLE_DEF,
-          VOLUME_TABLE_DEF);
+          VOLUME_TABLE_DEF,
+          COMPLETED_REQUEST_INFO_TABLE_DEF);
 
   private static final OMDBDefinition INSTANCE = new OMDBDefinition();
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/LocalFileCheckpointStrategy.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/LocalFileCheckpointStrategy.java
new file mode 100644
index 000000000000..ccf29aa80b88
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/LocalFileCheckpointStrategy.java
@@ -0,0 +1,52 @@
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.List;
+
+/**
+ * An implementation of NotificationCheckpointStrategy which loads/saves
+ * the last known notification sent by an event notification plugin
+ * to a local file.
+ *
+ * NOTE: this is a flawed approach and mainly exists for testing. In
+ * the event of a leadership change another OM will have no context on
+ * where the former leader had left off.
+ *
+ * It is better to use {@link OzoneFileCheckpointStrategy}, which
+ * loads/saves the checkpoint position in a distributed manner.
+ */
+public class LocalFileCheckpointStrategy implements NotificationCheckpointStrategy {
+
+  private final Path path;
+
+  public LocalFileCheckpointStrategy(Path path) {
+    this.path = path;
+  }
+
+  public String load() throws IOException {
+    try {
+      List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
+      if (!lines.isEmpty()) {
+        return lines.get(0);
+      }
+    } catch (NoSuchFileException ex) {
+      // assume no existing file
+    }
+    return null;
+  }
+
+  public void save(String val) throws IOException {
+    Path tmpFile = Paths.get(path.toString() + "-" + System.currentTimeMillis());
+    // Write to a temp file and atomically rename it to avoid corrupting
+    // the file if we are interrupted by a restart while in the middle of
+    // writing
+    Files.write(tmpFile, val.getBytes(StandardCharsets.UTF_8));
+    Files.move(tmpFile, path, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java
new file mode 100644
index 000000000000..ad5dd0509556
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java
@@ -0,0 +1,41 @@
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+
+/**
+ * A narrow set of functionality we are ok with exposing to plugin
+ * implementations.
+ */
+public final class OMEventListenerPluginContextImpl implements OMEventListenerPluginContext {
+  private final OzoneManager ozoneManager;
+
+  public OMEventListenerPluginContextImpl(OzoneManager ozoneManager) {
+    this.ozoneManager = ozoneManager;
+  }
+
+  @Override
+  public boolean isLeaderReady() {
+    return ozoneManager.isLeaderReady();
+  }
+
+  // TODO: should we allow plugins to pass in maxResults or just limit
+  //       them to some predefined value for safety? e.g. 10K
+  @Override
+  public List<OmCompletedRequestInfo> listCompletedRequestInfo(String startKey, int maxResults) throws IOException {
+    return ozoneManager.getMetadataManager().listCompletedRequestInfo(startKey, maxResults);
+  }
+
+  // TODO: it feels like this doesn't belong here
+  @Override
+  public String getThreadNamePrefix() {
+    return ozoneManager.getThreadNamePrefix();
+  }
+
+  @Override
+  public NotificationCheckpointStrategy getOzoneNotificationCheckpointStrategy() {
+    return ozoneManager.getNotificationCheckpointStrategy();
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java
new file mode 100644
index 000000000000..e7f3c4b639ab
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.service.CompletedRequestInfoCleanupService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is a manager for plugins which implement OMEventListener; it
+ * manages the lifecycle of constructing, starting, and stopping the
+ * configured plugins.
+ */
+public class OMEventListenerPluginManager {
+  public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerPluginManager.class);
+
+  public static final String PLUGIN_DEST_BASE = "ozone.om.plugin.destination";
+
+  private final CompletedRequestInfoCleanupService completedRequestDeletionService;
+  private final List<OMEventListener> plugins;
+
+  public OMEventListenerPluginManager(OzoneManager ozoneManager, OzoneConfiguration conf) {
+    // TODO: make timeouts configurable
+    this.completedRequestDeletionService = new CompletedRequestInfoCleanupService(
+        30L, TimeUnit.SECONDS, 30L, ozoneManager, conf);
+    this.plugins = loadAll(ozoneManager, conf);
+  }
+
+  public List<OMEventListener> getLoaded() {
+    return plugins;
+  }
+
+  public void startAll() {
+    completedRequestDeletionService.start();
+
+    for (OMEventListener plugin : plugins) {
+      plugin.start();
+    }
+  }
+
+  public void shutdownAll() {
+    if (completedRequestDeletionService != null) {
+      completedRequestDeletionService.shutdown();
+    }
+
+    for (OMEventListener plugin : plugins) {
+      plugin.shutdown();
+    }
+  }
+
+  static List<OMEventListener> loadAll(OzoneManager ozoneManager, OzoneConfiguration conf) {
+    List<OMEventListener> plugins = new ArrayList<>();
+
+    Map<String, String> props = conf.getPropsMatchPrefixAndTrimPrefix(PLUGIN_DEST_BASE);
+    List<String> destNameList = new ArrayList<>();
+    for (Map.Entry<String, String> entry : props.entrySet()) {
+      String destName = entry.getKey();
+      String value = entry.getValue();
+      LOG.info("Found event listener plugin with name={} and value={}", destName, value);
+
+      if (value.equalsIgnoreCase("enable") || value.equalsIgnoreCase("enabled") || value.equalsIgnoreCase("true")) {
+        destNameList.add(destName);
+        LOG.info("Event listener plugin {}{} is set to {}", PLUGIN_DEST_BASE, destName, value);
+      }
+    }
+
+    OMEventListenerPluginContext pluginContext = new OMEventListenerPluginContextImpl(ozoneManager);
+
+    for (String destName : destNameList) {
+      try {
+        Class<? extends OMEventListener> cls = resolvePluginClass(conf, destName);
+        LOG.info("Event listener plugin class is {}", cls);
+
+        OMEventListener impl = cls.newInstance();
+        impl.initialize(conf, pluginContext);
+
+        plugins.add(impl);
+      } catch (Exception ex) {
+        LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex);
+      }
+    }
+
+    return plugins;
+  }
+
+  private static Class<? extends OMEventListener> resolvePluginClass(OzoneConfiguration conf,
+      String destName) {
+    String classnameProp = PLUGIN_DEST_BASE + destName + ".classname";
+    LOG.info("Getting classname for {} with property {}", destName, classnameProp);
+    Class<? extends OMEventListener> cls = conf.getClass(classnameProp, null, OMEventListener.class);
+    if (null == cls) {
+      throw new RuntimeException(String.format(
+          "Unable to load plugin %s, classname property %s is missing or does not implement OMEventListener",
+          destName, classnameProp));
+    }
+    return cls;
+  }
+}
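To make the property shapes in loadAll() and resolvePluginClass() concrete, here is a hedged sketch of enabling a single destination. The "kafka" destination name is illustrative; in a real deployment these would be ozone-site.xml entries rather than programmatic conf.set() calls.

```java
import org.apache.hadoop.hdds.conf.OzoneConfiguration;

// Sketch: any suffix under ozone.om.plugin.destination works, as long as the
// base property is set to enable/enabled/true and the matching .classname
// property names a class implementing OMEventListener.
class PluginConfigSketch {
  static OzoneConfiguration kafkaEnabledConf() {
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set("ozone.om.plugin.destination.kafka", "enabled");
    conf.set("ozone.om.plugin.destination.kafka.classname",
        "org.apache.hadoop.ozone.om.eventlistener.OMEventListenerKafkaPublisher");
    return conf;
  }
}
```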
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OzoneFileCheckpointStrategy.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OzoneFileCheckpointStrategy.java
new file mode 100644
index 000000000000..b0ca1ec9b510
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OzoneFileCheckpointStrategy.java
@@ -0,0 +1,166 @@
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserInfo;
+
+import org.apache.hadoop.ozone.om.OmMetadataReader;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An implementation of NotificationCheckpointStrategy which loads/saves
+ * the last known notification sent by an event notification plugin
+ * to the Ozone filesystem itself.
+ *
+ * This allows another OM to pick up from the appropriate place in the
+ * event of a leadership change.
+ */
+public class OzoneFileCheckpointStrategy implements NotificationCheckpointStrategy {
+
+  public static final Logger LOG = LoggerFactory.getLogger(OzoneFileCheckpointStrategy.class);
+
+  private static final String VOLUME = "notifications";
+  private static final String BUCKET = "checkpoint";
+  private static final String KEY = "test";
+
+  // TODO: this seems like the wrong thing to do but in a docker
+  //       environment the createKey request chokes unless this is set to
+  //       some value
+  private static final UserInfo USER_INFO = UserInfo.newBuilder()
+      .setUserName("user")
+      .setHostName("localhost")
+      .setRemoteAddress("127.0.0.1")
+      .build();
+
+  private final AtomicLong callId = new AtomicLong(0);
+  private final ClientId clientId = ClientId.randomId();
+  private final OzoneManager ozoneManager;
+  private final OmMetadataReader omMetadataReader;
+  private final AtomicLong saveCount = new AtomicLong(0);
+
+  public OzoneFileCheckpointStrategy(OzoneManager ozoneManager, final OmMetadataReader omMetadataReader) {
+    this.ozoneManager = ozoneManager;
+    this.omMetadataReader = omMetadataReader;
+  }
+
+  public String load() throws IOException {
+    try {
+      OmKeyArgs omKeyArgs = new OmKeyArgs.Builder().setVolumeName(VOLUME).setBucketName(BUCKET).setKeyName(KEY).build();
+      String result =
+          omMetadataReader.getFileStatus(omKeyArgs).getKeyInfo().getMetadata().get("notification-checkpoint");
+      return result;
+    } catch (IOException ex) {
+      LOG.info("Error loading notification checkpoint", ex);
+      return null;
+    }
+  }
+
+  public void save(String val) throws IOException {
+    long previousSaveCount = saveCount.getAndIncrement();
+    if (previousSaveCount == 0 || previousSaveCount % 100 == 0) {
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put("notification-checkpoint", val);
+      writeCheckpointFile(VOLUME, BUCKET, KEY, metadata);
+      LOG.info("Persisted notification-checkpoint {} to /{}/{}/{}", val, VOLUME, BUCKET, KEY);
+    }
+  }
+
+  private void writeCheckpointFile(String volumeName,
+      String bucketName,
+      String keyName,
+      Map<String, String> metadata) throws IOException {
+
+    CreateKeyResponse createResponse = submitRequest(
+        buildCreateKeyRequest(volumeName, bucketName, keyName, metadata)).getCreateKeyResponse();
+
+    // NOTE: we use the sessionId from the createKey response in the
+    // commitKey request (to indicate which session we are committing)
+    long sessionId = createResponse.getID();
+    submitRequest(buildCommitKeyRequest(volumeName, bucketName, keyName, metadata, sessionId));
+  }
+
+  private OMRequest buildCreateKeyRequest(String volumeName,
+      String bucketName,
+      String keyName,
+      Map<String, String> metadata) {
+
+    CreateKeyRequest createKeyRequest = CreateKeyRequest.newBuilder()
+        .setKeyArgs(createKeyArgs(volumeName, bucketName, keyName, metadata))
+        .build();
+
+    return OMRequest.newBuilder()
+        .setCmdType(Type.CreateKey)
+        .setClientId(clientId.toString())
+        .setCreateKeyRequest(createKeyRequest)
+        .setUserInfo(USER_INFO)
+        .build();
+  }
+
+  private OMRequest buildCommitKeyRequest(String volumeName,
+      String bucketName,
+      String keyName,
+      Map<String, String> metadata,
+      long sessionId) {
+
+    CommitKeyRequest commitKeyRequest = CommitKeyRequest.newBuilder()
+        .setKeyArgs(createKeyArgs(volumeName, bucketName, keyName, metadata))
+        .setClientID(sessionId)
+        .build();
+
+    return OMRequest.newBuilder()
+        .setCmdType(Type.CommitKey)
+        .setClientId(clientId.toString())
+        .setCommitKeyRequest(commitKeyRequest)
+        .build();
+  }
+
+  private static KeyArgs.Builder createKeyArgs(String volumeName,
+      String bucketName,
+      String keyName,
+      Map<String, String> metadata) {
+
+    KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName);
+
+    // Include metadata, if provided
+    if (metadata != null && !metadata.isEmpty()) {
+      metadata.forEach((key, value) -> keyArgs.addMetadata(HddsProtos.KeyValue.newBuilder()
+          .setKey(key)
+          .setValue(value)
+          .build()));
+    }
+
+    return keyArgs;
+  }
+
+  private OMResponse submitRequest(OMRequest omRequest) {
+    try {
+      return OzoneManagerRatisUtils.submitRequest(ozoneManager, omRequest, clientId, callId.incrementAndGet());
+    } catch (ServiceException e) {
+      LOG.error("Checkpoint {} request failed. Will retry at next run.",
+          omRequest.getCmdType(), e);
+    }
+    return null;
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index a304e39dec35..0d8070e8e0f2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -95,6 +95,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   private final boolean isTracingEnabled;
   private final AtomicInteger statePausedCount = new AtomicInteger(0);
   private final String threadPrefix;
+  private final OzoneManagerSuccessfulRequestHandler successfulRequestHandler;
 
   /** The last {@link TermIndex} received from {@link #notifyTermIndexUpdated(long, long)}. */
   private volatile TermIndex lastNotifiedTermIndex = TermIndex.valueOf(0, RaftLog.INVALID_LOG_INDEX);
@@ -124,6 +125,7 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer,
     this.installSnapshotExecutor =
         HadoopExecutors.newSingleThreadExecutor(installSnapshotThreadFactory);
     this.nettyMetrics = NettyMetrics.create();
+    this.successfulRequestHandler = new OzoneManagerSuccessfulRequestHandler(ozoneManager);
   }
 
   /**
@@ -392,13 +394,13 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
       ozoneManagerDoubleBuffer.acquireUnFlushedTransactions(1);
 
       return CompletableFuture.supplyAsync(() -> runCommand(request, termIndex), executorService)
-          .thenApply(this::processResponse);
+          .thenApply(resp -> processResponse(request, resp, termIndex));
     } catch (Exception e) {
       return completeExceptionally(e);
     }
   }
 
-  private Message processResponse(OMResponse omResponse) {
+  private Message processResponse(OMRequest request, OMResponse omResponse, TermIndex termIndex) {
     if (!omResponse.getSuccess()) {
       // INTERNAL_ERROR or METADATA_ERROR are considered as critical errors.
       // In such cases, OM must be terminated instead of completing the future exceptionally,
@@ -408,6 +410,10 @@ private Message processResponse(OMResponse omResponse) {
       } else if (omResponse.getStatus() == METADATA_ERROR) {
         terminate(omResponse, OMException.ResultCodes.METADATA_ERROR);
       }
+    } else {
+      // The operation completed successfully - hand off the request
+      // so we can perform some post-actions
+      successfulRequestHandler.handle(termIndex.getIndex(), request);
     }
 
     // For successful response and non-critical errors, convert the response.
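With the state machine now recording every successful request (the handler is below), a listener plugin can drain the table on its own schedule. The publisher exercised by the tests (OMEventListenerKafkaPublisher) is not included in this diff, so the following is only a hedged sketch of the poll cycle such a plugin might run on top of the plugin context; the batch size and the publish step are assumptions.

```java
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.ozone.om.eventlistener.NotificationCheckpointStrategy;
import org.apache.hadoop.ozone.om.eventlistener.OMEventListenerNotificationStrategy;
import org.apache.hadoop.ozone.om.eventlistener.OMEventListenerPluginContext;
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;

// Hedged sketch of a listener plugin's poll cycle (not part of this patch).
final class PollLoopSketch {
  private static final int BATCH_SIZE = 1000; // assumed limit

  static void pollOnce(OMEventListenerPluginContext ctx,
      OMEventListenerNotificationStrategy strategy) throws IOException {
    if (!ctx.isLeaderReady()) {
      return; // only the ready leader OM should publish notifications
    }
    NotificationCheckpointStrategy checkpoint =
        ctx.getOzoneNotificationCheckpointStrategy();
    // null on first run; otherwise records strictly after this key come back
    String startKey = checkpoint.load();
    List<OmCompletedRequestInfo> batch =
        ctx.listCompletedRequestInfo(startKey, BATCH_SIZE);
    String lastKey = null;
    for (OmCompletedRequestInfo info : batch) {
      for (String event : strategy.determineEventsForOperation(info)) {
        // publish(event), e.g. hand the JSON to a Kafka producer
      }
      lastKey = info.getDbKey();
    }
    if (lastKey != null) {
      checkpoint.save(lastKey); // persist progress for leader failover
    }
  }
}
```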
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerSuccessfulRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerSuccessfulRequestHandler.java new file mode 100644 index 000000000000..b74263f29c29 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerSuccessfulRequestHandler.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+import java.io.IOException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationType;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is a simple hook on a successful write operation. Its
+ * only purpose at the moment is to write an OmCompletedRequestInfo record to the DB.
+ */
+public final class OzoneManagerSuccessfulRequestHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManagerSuccessfulRequestHandler.class);
+
+  private final OzoneManager ozoneManager;
+
+  public OzoneManagerSuccessfulRequestHandler(OzoneManager ozoneManager) {
+    this.ozoneManager = ozoneManager;
+  }
+
+  public void handle(long trxLogIndex, OzoneManagerProtocolProtos.OMRequest omRequest) {
+
+    switch (omRequest.getCmdType()) {
+    case CreateKey:
+      logRequest("CreateKey", omRequest);
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          omRequest.getCreateKeyRequest().getKeyArgs(),
+          new OperationArgs.CreateKeyArgs()));
+      break;
+    case RenameKey:
+      logRequest("RenameKey", omRequest);
+      OzoneManagerProtocolProtos.RenameKeyRequest renameReq = omRequest.getRenameKeyRequest();
+
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          omRequest.getRenameKeyRequest().getKeyArgs(),
+          new OperationArgs.RenameKeyArgs(renameReq.getToKeyName())));
+      break;
+    case DeleteKey:
+      logRequest("DeleteKey", omRequest);
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          omRequest.getDeleteKeyRequest().getKeyArgs(),
+          new OperationArgs.DeleteKeyArgs()));
+      break;
+    case CommitKey:
+      logRequest("CommitKey", omRequest);
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          omRequest.getCommitKeyRequest().getKeyArgs(),
+          new OperationArgs.CommitKeyArgs()));
+      break;
+    case CreateDirectory:
+      logRequest("CreateDirectory", omRequest);
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          omRequest.getCreateDirectoryRequest().getKeyArgs(),
+          new OperationArgs.CreateDirectoryArgs()));
+      break;
+    case CreateFile:
+      logRequest("CreateFile", omRequest);
+
+      OzoneManagerProtocolProtos.CreateFileRequest createFileReq = omRequest.getCreateFileRequest();
+
+      storeCompletedRequestInfo(buildOmCompletedRequestInfo(trxLogIndex,
+          omRequest.getCreateFileRequest().getKeyArgs(),
+          new OperationArgs.CreateFileArgs(createFileReq.getIsRecursive(),
+              createFileReq.getIsOverwrite())));
+      break;
+    default:
+      LOG.error("Unhandled cmdType={}", omRequest.getCmdType());
+      break;
+    }
+  }
+
+  private static void logRequest(String label, OzoneManagerProtocolProtos.OMRequest omRequest) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("---> {} {}", label, omRequest);
+    }
+  }
+
+  private OmCompletedRequestInfo buildOmCompletedRequestInfo(long trxLogIndex,
+      OzoneManagerProtocolProtos.KeyArgs keyArgs,
+      OperationArgs opArgs) {
+    return OmCompletedRequestInfo.newBuilder()
+        .setTrxLogIndex(trxLogIndex)
+        .setVolumeName(keyArgs.getVolumeName())
+        .setBucketName(keyArgs.getBucketName())
+        .setKeyName(keyArgs.getKeyName())
+        .setCreationTime(System.currentTimeMillis())
+        .setOpArgs(opArgs)
+        .build();
+  }
+
+  private void storeCompletedRequestInfo(OmCompletedRequestInfo requestInfo) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing request info {}", requestInfo);
+    }
+
+    String key = requestInfo.getDbKey();
+
+    // XXX: should this be part of an atomic db txn that happens at the end
+    // of each replayed event (so that the ledger is consistent with the
+    // processed ratis events)?
+
+    try {
+      ozoneManager.getMetadataManager().getCompletedRequestInfoTable().put(key, requestInfo);
+    } catch (IOException ex) {
+      LOG.error("Unable to write operation {}", requestInfo, ex);
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompletedRequestInfoCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompletedRequestInfoCleanupService.java
new file mode 100644
index 000000000000..e0633c5a44ce
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompletedRequestInfoCleanupService.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the background service to delete operation info records
+ * according to the retention strategy.
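+ *
+ * Records are appended by OzoneManagerSuccessfulRequestHandler on every
+ * successful write request; this service periodically trims the oldest
+ * rows so the completed request info table stays bounded.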
+ *
+ * NOTE: this is a crude strawman draft approach and needs revision.
+ */
+public class CompletedRequestInfoCleanupService extends BackgroundService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CompletedRequestInfoCleanupService.class);
+
+  // Use only a single thread for completed request info cleanup. Multiple
+  // threads would read from the same table and could send deletion requests
+  // for the same key multiple times.
+  private static final int OPERATION_INFO_DELETING_CORE_POOL_SIZE = 1;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final AtomicBoolean suspended;
+
+  public CompletedRequestInfoCleanupService(long interval, TimeUnit unit, long timeout,
+      OzoneManager ozoneManager,
+      OzoneConfiguration conf) {
+    super("CompletedRequestInfoCleanupService", interval, unit,
+        OPERATION_INFO_DELETING_CORE_POOL_SIZE, timeout,
+        ozoneManager.getThreadNamePrefix());
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = this.ozoneManager.getMetadataManager();
+    this.suspended = new AtomicBoolean(false);
+  }
+
+  /**
+   * Suspend the service (for testing).
+   */
+  @VisibleForTesting
+  public void suspend() {
+    suspended.set(true);
+  }
+
+  /**
+   * Resume the service if suspended (for testing).
+   */
+  @VisibleForTesting
+  public void resume() {
+    suspended.set(false);
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new CompletedRequestInfoCleanupTask());
+    return queue;
+  }
+
+  // This runs on all OMs.
+  private boolean shouldRun() {
+    return !suspended.get();
+  }
+
+  private class CompletedRequestInfoCleanupTask implements BackgroundTask {
+
+    // TODO: the number of rows is a crude/flawed criterion for deletion;
+    // do something better.
+    private static final long MAX_KEYS = 20;
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      if (!shouldRun()) {
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
+      }
+      LOG.debug("Running CompletedRequestInfoCleanupTask");
+      long estimatedKeyCount;
+
+      // XXX: this is intended to short-circuit the need for the
+      // lock/table iteration below, but unless I'm missing something the
+      // row estimate is not very accurate and can produce false
+      // positives. Is there a better way to do this?
+      try {
+        estimatedKeyCount = metadataManager.countRowsInTable(
+            metadataManager.getCompletedRequestInfoTable());
+
+        LOG.debug("cleanupCompletedRequestInfoIfNecessary - estimatedKeyCount={}, maxKeys={}",
+            estimatedKeyCount, MAX_KEYS);
+        if (estimatedKeyCount <= MAX_KEYS) {
+          LOG.debug("cleanupCompletedRequestInfoIfNecessary - nothing to do");
+          return BackgroundTaskResult.EmptyTaskResult.newResult();
+        }
+      } catch (IOException e) {
+        LOG.error("Error while running completed operation consumer " +
+            "background task. Will retry at next run.", e);
+        return BackgroundTaskResult.EmptyTaskResult.newResult();
+      }
+
+      // XXX: do we need a write lock here?
+
+      try (TableIterator<String, ? extends Table.KeyValue<String, OmCompletedRequestInfo>>
+          tableIterator = metadataManager.getCompletedRequestInfoTable().iterator()) {
+
+        tableIterator.seekToFirst();
+
+        // TODO: it seems like we can't trust the key estimate, so we need
+        // to iterate the whole collection? Or could we iterate backwards
+        // until we have enough? It feels like there is a better way to do
+        // this.
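+        // One alternative (a sketch, not implemented here): maintain a
+        // persistent row count updated on every put/delete, so that a
+        // single deleteRange over the oldest keys suffices without a
+        // full table scan.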
+
+        long actualKeyCount = 0;
+        while (tableIterator.hasNext()) {
+          tableIterator.next();
+          actualKeyCount++;
+        }
+
+        if (actualKeyCount <= MAX_KEYS) {
+          LOG.info("cleanupCompletedRequestInfoIfNecessary - nothing to do");
+          return BackgroundTaskResult.EmptyTaskResult.newResult();
+        }
+
+        long toDelete = actualKeyCount - MAX_KEYS;
+        String firstKeyToDelete = null;
+        String lastKeyToDelete = null;
+        LOG.debug("cleanupCompletedRequestInfoIfNecessary - actualKeyCount={}, maxKeys={}, toDelete={}",
+            actualKeyCount, MAX_KEYS, toDelete);
+
+        tableIterator.seekToFirst();
+
+        while (tableIterator.hasNext() && toDelete > 0) {
+          Table.KeyValue<String, OmCompletedRequestInfo> requestInfoRow = tableIterator.next();
+
+          if (firstKeyToDelete == null) {
+            firstKeyToDelete = requestInfoRow.getKey();
+          }
+
+          lastKeyToDelete = requestInfoRow.getKey();
+          toDelete--;
+        }
+
+        LOG.info("cleanupCompletedRequestInfoIfNecessary - firstKeyToDelete={}, lastKeyToDelete={}",
+            firstKeyToDelete, lastKeyToDelete);
+
+        // XXX: do we need a lock here?
+        if (Objects.equals(firstKeyToDelete, lastKeyToDelete)) {
+          metadataManager.getCompletedRequestInfoTable().delete(firstKeyToDelete);
+        } else {
+          // deleteRange is exclusive of the end key, so delete the last
+          // key explicitly as well.
+          metadataManager.getCompletedRequestInfoTable().deleteRange(firstKeyToDelete, lastKeyToDelete);
+          metadataManager.getCompletedRequestInfoTable().delete(lastKeyToDelete);
+        }
+
+      } catch (IOException e) {
+        LOG.error("Error while running completed operation consumer " +
+            "background task. Will retry at next run.", e);
+      }
+
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmCompletedRequestInfo.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmCompletedRequestInfo.java
new file mode 100644
index 000000000000..2f384a2eb0d3
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmCompletedRequestInfo.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs;
+import org.apache.hadoop.util.Time;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
+
+/**
+ * Tests the OmCompletedRequestInfo OM database table for Ozone object
+ * storage operations.
+ */
+public class TestOmCompletedRequestInfo {
+
+  private OMMetadataManager omMetadataManager;
+  private static final long EXPECTED_OPERATION_ID = 123L;
+  private static final String EXPECTED_OPERATION_KEY = "123";
+
+  private static final String VOLUME_NAME = "vol1";
+  private static final String BUCKET_NAME = "bucket1";
+  private static final String KEY_NAME = "key1";
+
+  @TempDir
+  private Path folder;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OZONE_OM_DB_DIRS,
+        folder.toAbsolutePath().toString());
+    omMetadataManager = new OmMetadataManagerImpl(conf, null);
+  }
+
+  private OmCompletedRequestInfo createRequestInfo() {
+    return new OmCompletedRequestInfo.Builder()
+        .setTrxLogIndex(123L)
+        .setVolumeName(VOLUME_NAME)
+        .setBucketName(BUCKET_NAME)
+        .setKeyName(KEY_NAME)
+        .setCreationTime(Time.now())
+        .setOpArgs(new OperationArgs.CreateKeyArgs())
+        .build();
+  }
+
+  @Test
+  public void testTableExists() throws Exception {
+    Table<String, OmCompletedRequestInfo> requestInfo =
+        omMetadataManager.getCompletedRequestInfoTable();
+    Assertions.assertTrue(requestInfo.isEmpty());
+  }
+
+  @Test
+  public void testAddNewOperation() throws Exception {
+    Table<String, OmCompletedRequestInfo> requestInfo =
+        omMetadataManager.getCompletedRequestInfoTable();
+    requestInfo.put(EXPECTED_OPERATION_KEY, createRequestInfo());
+    Assertions.assertEquals(EXPECTED_OPERATION_ID,
+        requestInfo.get(EXPECTED_OPERATION_KEY).getTrxLogIndex());
+  }
+
+  @Test
+  public void testDeleteOmCompletedRequestInfo() throws Exception {
+    Table<String, OmCompletedRequestInfo> requestInfo =
+        omMetadataManager.getCompletedRequestInfoTable();
+
+    Assertions.assertFalse(requestInfo.isExist(EXPECTED_OPERATION_KEY));
+    requestInfo.put(EXPECTED_OPERATION_KEY, createRequestInfo());
+    Assertions.assertTrue(requestInfo.isExist(EXPECTED_OPERATION_KEY));
+    requestInfo.delete(EXPECTED_OPERATION_KEY);
+    Assertions.assertFalse(requestInfo.isExist(EXPECTED_OPERATION_KEY));
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/BarPlugin.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/BarPlugin.java
new file mode 100644
index 000000000000..ba4e7e5b9a08
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/BarPlugin.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+/**
+ * Minimal OMEventListener implementation used as a test plugin.
+ */
+public class BarPlugin implements OMEventListener {
+
+  private boolean initialized = false;
+  private boolean started = false;
+  private boolean shutdown = false;
+
+  @Override
+  public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) {
+    initialized = true;
+  }
+
+  @Override
+  public void start() {
+    started = true;
+  }
+
+  @Override
+  public void shutdown() {
+    shutdown = true;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/FooPlugin.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/FooPlugin.java
new file mode 100644
index 000000000000..ac28f17b87f2
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/FooPlugin.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+/**
+ * Minimal OMEventListener implementation used as a test plugin.
+ */
+public class FooPlugin implements OMEventListener {
+
+  private boolean initialized = false;
+  private boolean started = false;
+  private boolean shutdown = false;
+
+  @Override
+  public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) {
+    initialized = true;
+  }
+
+  @Override
+  public void start() {
+    started = true;
+  }
+
+  @Override
+  public void shutdown() {
+    shutdown = true;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestLocalFileCheckpointStrategy.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestLocalFileCheckpointStrategy.java
new file mode 100644
index 000000000000..7b4ae6fd206e
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestLocalFileCheckpointStrategy.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests {@link LocalFileCheckpointStrategy}.
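+ *
+ * Checkpoint values are opaque strings to the strategy; these tests use
+ * the 20-digit zero-padded transaction index format seen elsewhere in
+ * this patch (e.g. "00000000000000000017").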
+ */
+public class TestLocalFileCheckpointStrategy {
+
+  @TempDir
+  private Path tempDir;
+
+  @Test
+  public void testSeekPositionIsInitiallyNull() throws InterruptedException, IOException {
+    Path filePath = Paths.get(tempDir.toString(), UUID.randomUUID().toString());
+
+    OMEventListenerLedgerPollerSeekPosition seekPosition = new OMEventListenerLedgerPollerSeekPosition(
+        new LocalFileCheckpointStrategy(filePath));
+
+    assertThat(seekPosition.get()).isNull();
+  }
+
+  @Test
+  public void testSeekPositionCanBeUpdatedAndReadBack() throws InterruptedException, IOException {
+    Path filePath = Paths.get(tempDir.toString(), UUID.randomUUID().toString());
+
+    OMEventListenerLedgerPollerSeekPosition seekPosition = new OMEventListenerLedgerPollerSeekPosition(
+        new LocalFileCheckpointStrategy(filePath));
+
+    String value1 = "00000000000000000017";
+    seekPosition.set(value1);
+    assertThat(seekPosition.get()).isEqualTo(value1);
+
+    String value2 = "00000000000000000020";
+    seekPosition.set(value2);
+    assertThat(seekPosition.get()).isEqualTo(value2);
+  }
+
+  @Test
+  public void testSeekPositionCanBeUpdatedAndReloaded() throws InterruptedException, IOException {
+    Path filePath = Paths.get(tempDir.toString(), UUID.randomUUID().toString());
+
+    OMEventListenerLedgerPollerSeekPosition seekPosition = new OMEventListenerLedgerPollerSeekPosition(
+        new LocalFileCheckpointStrategy(filePath));
+
+    String value = "00000000000000000025";
+    seekPosition.set(value);
+    assertThat(seekPosition.get()).isEqualTo(value);
+
+    // reload
+    seekPosition = new OMEventListenerLedgerPollerSeekPosition(
+        new LocalFileCheckpointStrategy(filePath));
+
+    assertThat(seekPosition.get()).isEqualTo(value);
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java
new file mode 100644
index 000000000000..da6c28ebf386
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests {@link OMEventListenerPluginManager}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestOMEventListenerPluginManager {
+
+  @Mock
+  private OzoneManager ozoneManager;
+
+  static List<String> getLoadedPlugins(OMEventListenerPluginManager pluginManager) {
+    List<String> loadedClasses = new ArrayList<>();
+    for (OMEventListener plugin : pluginManager.getLoaded()) {
+      loadedClasses.add(plugin.getClass().getName());
+    }
+
+    // Normalize ordering for assertions.
+    Collections.sort(loadedClasses);
+
+    return loadedClasses;
+  }
+
+  // Deliberately does not implement OMEventListener.
+  private static class BrokenFooPlugin {
+  }
+
+  @Test
+  public void testLoadSinglePlugin() throws InterruptedException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set("ozone.om.plugin.destination.foo", "enabled");
+    conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin");
+
+    OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf);
+
+    Assertions.assertEquals(Collections.singletonList("org.apache.hadoop.ozone.om.eventlistener.FooPlugin"),
+        getLoadedPlugins(pluginManager));
+  }
+
+  @Test
+  public void testLoadMultiplePlugins() throws InterruptedException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set("ozone.om.plugin.destination.foo", "enabled");
+    conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin");
+    conf.set("ozone.om.plugin.destination.bar", "enabled");
+    conf.set("ozone.om.plugin.destination.bar.classname", "org.apache.hadoop.ozone.om.eventlistener.BarPlugin");
+
+    OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf);
+
+    Assertions.assertEquals(Arrays.asList("org.apache.hadoop.ozone.om.eventlistener.BarPlugin",
+        "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"),
+        getLoadedPlugins(pluginManager));
+  }
+
+  @Test
+  public void testPluginMissingClassname() throws InterruptedException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set("ozone.om.plugin.destination.foo", "enabled");
+
+    OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf);
+
+    Assertions.assertEquals(Collections.emptyList(),
+        getLoadedPlugins(pluginManager));
+  }
+
+  @Test
+  public void testPluginClassDoesNotExist() throws InterruptedException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set("ozone.om.plugin.destination.foo", "enabled");
+    conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.NotExistingPlugin");
+
+    OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf);
+
+    Assertions.assertEquals(Collections.emptyList(),
+        getLoadedPlugins(pluginManager));
+  }
+
+  @Test
+  public void testPluginClassDoesNotImplementInterface() throws InterruptedException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set("ozone.om.plugin.destination.foo", "enabled");
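+    // BrokenFooPlugin is a nested class, so it is referenced by its
+    // binary name (with a '$' separator); it deliberately does not
+    // implement OMEventListener.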
+    conf.set("ozone.om.plugin.destination.foo.classname",
+        "org.apache.hadoop.ozone.om.eventlistener.TestOMEventListenerPluginManager$BrokenFooPlugin");
+
+    OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf);
+
+    Assertions.assertEquals(Collections.emptyList(),
+        getLoadedPlugins(pluginManager));
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOzoneFileCheckpointStrategy.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOzoneFileCheckpointStrategy.java
new file mode 100644
index 000000000000..d1a0bee2220a
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOzoneFileCheckpointStrategy.java
@@ -0,0 +1,105 @@
+package org.apache.hadoop.ozone.om.eventlistener;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.ozone.om.OmMetadataReader;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**
+ * Tests {@link OzoneFileCheckpointStrategy}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestOzoneFileCheckpointStrategy {
+
+  @Mock
+  OzoneManager mockOzoneManager;
+  @Mock
+  OmMetadataReader mockOmMetadataReader;
+  @Mock
+  OzoneManagerProtocolProtos.OMResponse mockOmResponse;
+  @Mock
+  OzoneManagerProtocolProtos.CreateKeyResponse mockOmCreateResponse;
+  OzoneFileStatus fileStatus;
+  OzoneFileCheckpointStrategy ozoneFileCheckpointStrategy;
+
+  @BeforeEach
+  public void setup() {
+    ozoneFileCheckpointStrategy = new OzoneFileCheckpointStrategy(mockOzoneManager, mockOmMetadataReader);
+  }
+
+  @Test
+  public void testSaveStrategy() throws IOException, ServiceException {
+
+    when(mockOmCreateResponse.getID()).thenReturn(123L);
+    when(mockOmResponse.getCreateKeyResponse()).thenReturn(mockOmCreateResponse);
+    try (MockedStatic<OzoneManagerRatisUtils> utils = mockStatic(OzoneManagerRatisUtils.class)) {
+
+      utils.when(() -> OzoneManagerRatisUtils.submitRequest(any(OzoneManager.class),
+          any(OzoneManagerProtocolProtos.OMRequest.class),
+          any(ClientId.class), any(Long.class))).thenReturn(mockOmResponse);
+
+      // Check it is saved on the first call (a single save submits two requests).
+      ozoneFileCheckpointStrategy.save("00000000000000000001");
+      utils.verify(() -> OzoneManagerRatisUtils.submitRequest(any(OzoneManager.class),
+          any(OzoneManagerProtocolProtos.OMRequest.class),
+          any(ClientId.class), any(Long.class)), Mockito.times(2));
+      // But not on the second.
+      ozoneFileCheckpointStrategy.save("00000000000000000002");
+      utils.verify(() -> OzoneManagerRatisUtils.submitRequest(any(OzoneManager.class),
+          any(OzoneManagerProtocolProtos.OMRequest.class),
+          any(ClientId.class), any(Long.class)), Mockito.times(2));
+
+      for (int i = 0; i <= 100; i++) {
+        String val = String.format("%020d", i);
+        ozoneFileCheckpointStrategy.save(val);
+      }
+
+      // Check submit has only run twice more (4 times in total).
+      utils.verify(() -> OzoneManagerRatisUtils.submitRequest(any(OzoneManager.class),
+          any(OzoneManagerProtocolProtos.OMRequest.class),
+          any(ClientId.class), any(Long.class)), Mockito.times(4));
+    }
+  }
+
+  @Test
+  public void testLoadStrategyWhenMetadataNotSet() throws IOException {
+    OmKeyInfo omKeyInfo = new OmKeyInfo.Builder().build();
+    fileStatus = new OzoneFileStatus(omKeyInfo, 1L, false);
+    when(mockOmMetadataReader.getFileStatus(any())).thenReturn(fileStatus);
+    Assertions.assertNull(ozoneFileCheckpointStrategy.load());
+  }
+
+  @Test
+  public void testLoadStrategyWhenFileDoesNotExist() throws IOException {
+    when(mockOmMetadataReader.getFileStatus(any())).thenThrow(FileNotFoundException.class);
+    Assertions.assertNull(ozoneFileCheckpointStrategy.load());
+  }
+
+  @Test
+  public void testLoadStrategyWithValidMetaData() throws IOException {
+    OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
+        .addMetadata("notification-checkpoint", "00000000000000000017").build();
+    fileStatus = new OzoneFileStatus(omKeyInfo, 1L, false);
+    when(mockOmMetadataReader.getFileStatus(any())).thenReturn(fileStatus);
+    Assertions.assertEquals("00000000000000000017", ozoneFileCheckpointStrategy.load());
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerSuccessfulRequestHandler.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerSuccessfulRequestHandler.java
new file mode 100644
index 000000000000..11577c127d96
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerSuccessfulRequestHandler.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.ratis;
+
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
+import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeyRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.ArgumentCaptor;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Testing OzoneManagerSuccessfulRequestHandler class.
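+ *
+ * Verifies that each handled request results in a single put to the
+ * completedRequestInfoTable, keyed by the 20-digit zero-padded
+ * transaction log index (e.g. trxLogIndex 123 -> "00000000000000000123").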
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestOzoneManagerSuccessfulRequestHandler {
+
+  private static final String TEST_VOLUME_NAME = "testVol";
+  private static final String TEST_BUCKET_NAME = "testBucket";
+  private static final String TEST_KEY = "/foo/bar/baz/key";
+  private static final String TEST_KEY_RENAMED = TEST_KEY + "_RENAMED";
+
+  private static final KeyArgs TEST_KEY_ARGS = KeyArgs.newBuilder()
+      .setKeyName(TEST_KEY)
+      .setVolumeName(TEST_VOLUME_NAME)
+      .setBucketName(TEST_BUCKET_NAME)
+      .build();
+
+  @Mock
+  private OzoneManager ozoneManager;
+
+  @Mock
+  private OMMetadataManager omMetadataManager;
+
+  @Mock
+  private Table<String, OmCompletedRequestInfo> completedRequestInfoTable;
+
+  protected OMRequest createCreateKeyRequest() {
+    CreateKeyRequest createKeyRequest = CreateKeyRequest.newBuilder()
+        .setKeyArgs(TEST_KEY_ARGS).build();
+
+    return OMRequest.newBuilder()
+        .setClientId(UUID.randomUUID().toString())
+        .setCreateKeyRequest(createKeyRequest)
+        .setCmdType(Type.CreateKey).build();
+  }
+
+  protected OMRequest createRenameKeyRequest() {
+    RenameKeyRequest renameKeyRequest = RenameKeyRequest.newBuilder()
+        .setKeyArgs(TEST_KEY_ARGS).setToKeyName(TEST_KEY_RENAMED).build();
+
+    return OMRequest.newBuilder()
+        .setClientId(UUID.randomUUID().toString())
+        .setRenameKeyRequest(renameKeyRequest)
+        .setCmdType(Type.RenameKey).build();
+  }
+
+  @Test
+  public void testCreateKeyRequest() throws IOException {
+
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    when(omMetadataManager.getCompletedRequestInfoTable()).thenReturn(completedRequestInfoTable);
+
+    OzoneManagerSuccessfulRequestHandler requestHandler
+        = new OzoneManagerSuccessfulRequestHandler(ozoneManager);
+
+    requestHandler.handle(123L, createCreateKeyRequest());
+
+    ArgumentCaptor<String> arg1 = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<OmCompletedRequestInfo> arg2 = ArgumentCaptor.forClass(OmCompletedRequestInfo.class);
+    verify(completedRequestInfoTable, times(1)).put(arg1.capture(), arg2.capture());
+
+    String key = arg1.getValue();
+    assertThat(key).isEqualTo("00000000000000000123");
+
+    OmCompletedRequestInfo requestInfo = arg2.getValue();
+    assertThat(requestInfo.getVolumeName()).isEqualTo(TEST_VOLUME_NAME);
+    assertThat(requestInfo.getBucketName()).isEqualTo(TEST_BUCKET_NAME);
+    assertThat(requestInfo.getKeyName()).isEqualTo(TEST_KEY);
+    assertThat(requestInfo.getOpArgs()).isInstanceOf(OperationArgs.CreateKeyArgs.class);
+  }
+
+  @Test
+  public void testRenameKeyRequest() throws IOException {
+
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    when(omMetadataManager.getCompletedRequestInfoTable()).thenReturn(completedRequestInfoTable);
+
+    OzoneManagerSuccessfulRequestHandler requestHandler
+        = new OzoneManagerSuccessfulRequestHandler(ozoneManager);
+
+    requestHandler.handle(123L, createRenameKeyRequest());
+
+    ArgumentCaptor<String> arg1 = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<OmCompletedRequestInfo> arg2 = ArgumentCaptor.forClass(OmCompletedRequestInfo.class);
+    verify(completedRequestInfoTable, times(1)).put(arg1.capture(), arg2.capture());
+
+    String key = arg1.getValue();
+    assertThat(key).isEqualTo("00000000000000000123");
+
+    OmCompletedRequestInfo requestInfo = arg2.getValue();
+    assertThat(requestInfo.getVolumeName()).isEqualTo(TEST_VOLUME_NAME);
+    assertThat(requestInfo.getBucketName()).isEqualTo(TEST_BUCKET_NAME);
+    assertThat(requestInfo.getKeyName()).isEqualTo(TEST_KEY);
+    assertThat(requestInfo.getOpArgs()).isInstanceOf(OperationArgs.RenameKeyArgs.class);
+    OperationArgs.RenameKeyArgs opArgs = (OperationArgs.RenameKeyArgs) requestInfo.getOpArgs();
+    assertThat(opArgs.getToKeyName()).isEqualTo(TEST_KEY_RENAMED);
+  }
+}
diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml
index 33ab6235f291..154aadba1003 100644
--- a/hadoop-ozone/pom.xml
+++ b/hadoop-ozone/pom.xml
@@ -43,6 +43,7 @@
     <module>interface-storage</module>
     <module>mini-cluster</module>
     <module>ozone-manager</module>
+    <module>ozone-manager-plugins</module>
     <module>ozonefs</module>
     <module>ozonefs-common</module>
     <module>recon</module>
diff --git a/pom.xml b/pom.xml
index 9d2ca2aa4286..13bff8dfeddf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,6 +133,7 @@
     2.1
     1.1.1
     5.13.4
+    <kafka.version>2.8.2</kafka.version>
    1.0.1
    1.9.25
    2.6.0
@@ -973,6 +974,11 @@
        <artifactId>httpcore-nio</artifactId>
        <version>${httpcore.version}</version>
      </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>${kafka.version}</version>
+      </dependency>
      <dependency>
        <groupId>org.apache.kerby</groupId>
        <artifactId>kerb-core</artifactId>
@@ -1223,6 +1229,11 @@
        <version>${ozone.version}</version>
        <type>test-jar</type>
      </dependency>
+      <dependency>
+        <groupId>org.apache.ozone</groupId>
+        <artifactId>ozone-manager-plugins</artifactId>
+        <version>${ozone.version}</version>
+      </dependency>
      <dependency>
        <groupId>org.apache.ozone</groupId>
        <artifactId>ozone-mini-cluster</artifactId>
@@ -2216,6 +2227,7 @@
            -->
            com.fasterxml.jackson.core:jackson-databind:jar
            org.apache.commons:commons-compress:jar
+            org.apache.hadoop:hadoop-common:jar
            org.apache.ozone:hdds-client:jar
            org.apache.ozone:ozone-interface-client:jar
            org.glassfish.jersey.core:jersey-common:jar