-
Notifications
You must be signed in to change notification settings - Fork 588
HDDS-11650. ContainerId list to track all containers created in a datanode #7402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 19 commits
4d2ea78
87a4809
5603b56
c048a5e
f4c538f
f22f0d1
c94734a
e579d0e
2c376bc
4b77481
5027029
c5392d0
7c4837a
1ae494b
bdc2e50
06ca347
8f98ab9
7d7f078
108bf82
5b3d27a
af0f757
b97d874
09b2dfe
082cfc9
3766bc3
46ee375
564ae17
af144a2
79e5de8
b0ffe5d
7a0e341
730d75e
5446f4a
9cbc45f
261f8fc
4320d50
ac3918c
3d9431a
50b27bf
827bc86
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.hadoop.hdds.utils.db; | ||
|
|
||
| import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum; | ||
| import jakarta.annotation.Nonnull; | ||
|
|
||
| import java.io.IOException; | ||
| import java.lang.reflect.InvocationTargetException; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentMap; | ||
|
|
||
| /** | ||
| * Codecs to serialize/deserialize Protobuf v2 enums. | ||
| */ | ||
| public final class Proto2EnumCodec<M extends ProtocolMessageEnum> | ||
| implements Codec<M> { | ||
| private static final ConcurrentMap<Class<? extends ProtocolMessageEnum>, | ||
| Codec<? extends ProtocolMessageEnum>> CODECS | ||
| = new ConcurrentHashMap<>(); | ||
| private static final IntegerCodec INTEGER_CODEC = IntegerCodec.get(); | ||
|
|
||
| /** | ||
| * @return the {@link Codec} for the given class. | ||
| */ | ||
| public static <T extends ProtocolMessageEnum> Codec<T> get(T t) { | ||
| final Codec<?> codec = CODECS.computeIfAbsent(t.getClass(), | ||
| key -> new Proto2EnumCodec<>(t)); | ||
| return (Codec<T>) codec; | ||
| } | ||
|
|
||
| private final Class<M> clazz; | ||
|
|
||
| private Proto2EnumCodec(M m) { | ||
| this.clazz = (Class<M>) m.getClass(); | ||
| } | ||
|
|
||
| @Override | ||
| public Class<M> getTypeClass() { | ||
| return clazz; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean supportCodecBuffer() { | ||
| return INTEGER_CODEC.supportCodecBuffer(); | ||
| } | ||
|
|
||
| @Override | ||
| public CodecBuffer toCodecBuffer(@Nonnull M value, | ||
| CodecBuffer.Allocator allocator) throws IOException { | ||
| return INTEGER_CODEC.toCodecBuffer(value.getNumber(), allocator); | ||
| } | ||
|
|
||
| private M parseFrom(Integer value) throws IOException { | ||
| try { | ||
| return (M) this.clazz.getDeclaredMethod("forNumber", int.class).invoke(null, value); | ||
| } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { | ||
| throw new IOException(e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public M fromCodecBuffer(@Nonnull CodecBuffer buffer) | ||
| throws IOException { | ||
| return parseFrom(INTEGER_CODEC.fromCodecBuffer(buffer)); | ||
| } | ||
|
|
||
| @Override | ||
| public byte[] toPersistedFormat(M value) { | ||
| return INTEGER_CODEC.toPersistedFormat(value.getNumber()); | ||
| } | ||
|
|
||
| @Override | ||
| public M fromPersistedFormat(byte[] bytes) throws IOException { | ||
| return parseFrom(INTEGER_CODEC.fromPersistedFormat(bytes)); | ||
| } | ||
|
|
||
| @Override | ||
| public M copyObject(M message) { | ||
| // proto messages are immutable | ||
| return message; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,12 +23,16 @@ | |||||||
| import com.google.common.collect.ImmutableMap; | ||||||||
| import com.google.protobuf.Message; | ||||||||
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; | ||||||||
| import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; | ||||||||
|
|
||||||||
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; | ||||||||
| import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; | ||||||||
| import org.apache.hadoop.hdds.utils.db.Table; | ||||||||
| import org.apache.hadoop.ozone.container.common.interfaces.Container; | ||||||||
| import org.apache.hadoop.ozone.container.common.statemachine.StateContext; | ||||||||
| import org.apache.hadoop.ozone.container.common.utils.ContainerLogger; | ||||||||
| import org.apache.hadoop.ozone.container.common.volume.HddsVolume; | ||||||||
|
|
||||||||
| import org.slf4j.Logger; | ||||||||
| import org.slf4j.LoggerFactory; | ||||||||
|
|
||||||||
|
|
@@ -65,10 +69,19 @@ public class ContainerSet implements Iterable<Container<?>> { | |||||||
| new ConcurrentSkipListMap<>(); | ||||||||
| private Clock clock; | ||||||||
| private long recoveringTimeout; | ||||||||
| private final Table<Long, State> containerIdsTable; | ||||||||
|
|
||||||||
| public ContainerSet(Table<Long, State> continerIdsTable, long recoveringTimeout) { | ||||||||
| this(continerIdsTable, recoveringTimeout, false); | ||||||||
| } | ||||||||
|
|
||||||||
| public ContainerSet(long recoveringTimeout) { | ||||||||
| public ContainerSet(Table<Long, State> continerIdsTable, long recoveringTimeout, boolean readOnly) { | ||||||||
| this.clock = Clock.system(ZoneOffset.UTC); | ||||||||
| this.containerIdsTable = continerIdsTable; | ||||||||
| this.recoveringTimeout = recoveringTimeout; | ||||||||
| if (!readOnly && containerIdsTable == null) { | ||||||||
| throw new IllegalArgumentException("Container table cannot be null when container set is not read only"); | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| public long getCurrentTime() { | ||||||||
|
|
@@ -85,22 +98,46 @@ public void setRecoveringTimeout(long recoveringTimeout) { | |||||||
| this.recoveringTimeout = recoveringTimeout; | ||||||||
| } | ||||||||
|
|
||||||||
| public boolean addContainer(Container<?> container) throws StorageContainerException { | ||||||||
| return addContainer(container, false); | ||||||||
| } | ||||||||
|
|
||||||||
| public void validateContainerIsMissing(long containerId, State state) throws StorageContainerException { | ||||||||
kerneltime marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||
| if (missingContainerSet.contains(containerId)) { | ||||||||
| throw new StorageContainerException(String.format("Container with container Id %d with state : %s is missing in" + | ||||||||
| " the DN.", containerId, state), | ||||||||
| ContainerProtos.Result.CONTAINER_MISSING); | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
| * Add Container to container map. | ||||||||
| * @param container container to be added | ||||||||
| * @return If container is added to containerMap returns true, otherwise | ||||||||
| * false | ||||||||
| */ | ||||||||
| public boolean addContainer(Container<?> container) throws | ||||||||
| public boolean addContainer(Container<?> container, boolean overwriteMissingContainers) throws | ||||||||
|
||||||||
| public boolean addContainer(Container<?> container) throws StorageContainerException { | |
| return addContainer(container, false); | |
| } |
kerneltime marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -175,7 +175,8 @@ private boolean canIgnoreException(Result result) { | |
| case CONTAINER_UNHEALTHY: | ||
| case CLOSED_CONTAINER_IO: | ||
| case DELETE_ON_OPEN_CONTAINER: | ||
| case UNSUPPORTED_REQUEST: // Blame client for sending unsupported request. | ||
| case UNSUPPORTED_REQUEST: | ||
| case CONTAINER_MISSING:// Blame client for sending unsupported request. | ||
|
||
| return true; | ||
| default: | ||
| return false; | ||
|
|
@@ -276,7 +277,8 @@ private ContainerCommandResponseProto dispatchRequest( | |
| getMissingContainerSet().remove(containerID); | ||
| } | ||
| } | ||
| if (getMissingContainerSet().contains(containerID)) { | ||
| if (cmdType != Type.CreateContainer && !HddsUtils.isReadOnly(msg) | ||
kerneltime marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| && getMissingContainerSet().contains(containerID)) { | ||
| StorageContainerException sce = new StorageContainerException( | ||
| "ContainerID " + containerID | ||
| + " has been lost and cannot be recreated on this DataNode", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,24 +17,24 @@ | |
| */ | ||
| package org.apache.hadoop.ozone.container.common.interfaces; | ||
|
|
||
| import org.apache.hadoop.ozone.container.metadata.DatanodeStore; | ||
| import org.apache.hadoop.ozone.container.metadata.AbstractStore; | ||
|
|
||
| import java.io.Closeable; | ||
|
|
||
| /** | ||
| * DB handle abstract class. | ||
| */ | ||
| public abstract class DBHandle implements Closeable { | ||
| public abstract class DBHandle<STORE extends AbstractStore> implements Closeable { | ||
|
||
|
|
||
| private final DatanodeStore store; | ||
| private final STORE store; | ||
| private final String containerDBPath; | ||
|
|
||
| public DBHandle(DatanodeStore store, String containerDBPath) { | ||
| public DBHandle(STORE store, String containerDBPath) { | ||
| this.store = store; | ||
| this.containerDBPath = containerDBPath; | ||
| } | ||
|
|
||
| public DatanodeStore getStore() { | ||
| public STORE getStore() { | ||
| return this.store; | ||
| } | ||
|
|
||
|
|
@@ -45,4 +45,12 @@ public String getContainerDBPath() { | |
| public boolean cleanup() { | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "DBHandle{" + | ||
| "containerDBPath='" + containerDBPath + '\'' + | ||
| ", store=" + store + | ||
| '}'; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.