ContainerID.java
@@ -15,7 +15,6 @@
* the License.
*
*/

package org.apache.hadoop.hdds.scm.container;

import com.google.common.base.Preconditions;
@@ -24,14 +23,25 @@
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.LongCodec;

/**
* Container ID is an integer value between 1 and MAX_CONTAINER_ID.
* <p>
* We define a dedicated type to avoid mixing container IDs with plain
* integers in code.
* <p>
* This class is immutable.
*/
public final class ContainerID implements Comparable<ContainerID> {
private static final Codec<ContainerID> CODEC = new DelegatedCodec<>(
LongCodec.get(), ContainerID::valueOf, c -> c.id, true);

public static Codec<ContainerID> getCodec() {
return CODEC;
}

private final long id;

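A hedged round-trip sketch of the new codec (ignoring IOException handling; ContainerID.valueOf and the byte-array Codec methods appear elsewhere in this diff):

  // Sketch: ContainerID now serializes through the delegate LongCodec.
  final Codec<ContainerID> codec = ContainerID.getCodec();
  final byte[] bytes = codec.toPersistedFormat(ContainerID.valueOf(42L));
  final ContainerID decoded = codec.fromPersistedFormat(bytes);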
CodecBuffer.java
@@ -28,6 +28,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;

/**
* A buffer used by {@link Codec}
@@ -159,4 +160,19 @@ public CodecBuffer put(ByteBuffer buffer) {
buf.writeBytes(buffer);
return this;
}

/**
* Put bytes from the given source into this buffer.
*
* @param source a function that writes bytes to the given
*               {@link ByteBuffer} and returns the number of bytes written.
* @return this object.
*/
public CodecBuffer put(ToIntFunction<ByteBuffer> source) {
assertRefCnt(1);
final int w = buf.writerIndex();
final ByteBuffer buffer = buf.nioBuffer(w, buf.writableBytes());
final int size = source.applyAsInt(buffer);
buf.setIndex(buf.readerIndex(), w + size);
return this;
}
}
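
A minimal usage sketch of the new overload (CodecBuffer.allocateHeap is an assumption for illustration; any way of obtaining a CodecBuffer with enough writable bytes works):

  // Hedged sketch: serialize a long directly into the writable region.
  final CodecBuffer codecBuffer = CodecBuffer.allocateHeap(Long.BYTES);
  codecBuffer.put(byteBuffer -> {
    byteBuffer.putLong(42L);  // write into the wrapped ByteBuffer
    return Long.BYTES;        // report the number of bytes written
  });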
DelegatedCodec.java (new file)
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.utils.db;

import org.apache.ratis.util.function.CheckedFunction;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.function.IntFunction;

/**
* A {@link Codec} to serialize/deserialize objects by delegation.
*
* @param <T> The object type of this {@link Codec}.
* @param <DELEGATE> The object type of the {@link #delegate}.
*/
public class DelegatedCodec<T, DELEGATE> implements Codec<T> {
private final Codec<DELEGATE> delegate;
private final CheckedFunction<DELEGATE, T, IOException> forward;
private final CheckedFunction<T, DELEGATE, IOException> backward;
private final boolean immutable;

/**
* Construct a {@link Codec} using the given delegate.
*
* @param delegate the delegate {@link Codec}
* @param forward a function to convert {@link DELEGATE} to {@link T}.
* @param backward a function to convert {@link T} back to {@link DELEGATE}.
* @param immutable whether objects of type {@link T} are immutable.
*/
public DelegatedCodec(Codec<DELEGATE> delegate,
CheckedFunction<DELEGATE, T, IOException> forward,
CheckedFunction<T, DELEGATE, IOException> backward,
boolean immutable) {
this.delegate = delegate;
this.forward = forward;
this.backward = backward;
this.immutable = immutable;
}

/** The same as new DelegatedCodec(delegate, forward, backward, false). */
public DelegatedCodec(Codec<DELEGATE> delegate,
CheckedFunction<DELEGATE, T, IOException> forward,
CheckedFunction<T, DELEGATE, IOException> backward) {
this(delegate, forward, backward, false);
}

@Override
public final boolean supportCodecBuffer() {
return delegate.supportCodecBuffer();
}

@Override
public final CodecBuffer toCodecBuffer(@Nonnull T message,
IntFunction<CodecBuffer> allocator) throws IOException {
return delegate.toCodecBuffer(backward.apply(message), allocator);
}

@Override
public final T fromCodecBuffer(@Nonnull CodecBuffer buffer)
throws IOException {
return forward.apply(delegate.fromCodecBuffer(buffer));
}

@Override
public final byte[] toPersistedFormat(T message) throws IOException {
return delegate.toPersistedFormat(backward.apply(message));
}

@Override
public final T fromPersistedFormat(byte[] bytes) throws IOException {
return forward.apply(delegate.fromPersistedFormat(bytes));
}

@Override
public T copyObject(T message) {
if (immutable) {
return message;
}
try {
return forward.apply(delegate.copyObject(backward.apply(message)));
} catch (IOException e) {
throw new IllegalStateException("Failed to copyObject", e);
}
}
}
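
To make the immutable flag concrete: when it is true, copyObject may return the argument itself; otherwise it deep-copies by round-tripping through the delegate. A hedged sketch using the ContainerID codec above:

  // immutable = true: copyObject is a no-op returning the same instance.
  final ContainerID id = ContainerID.valueOf(1L);
  assert ContainerID.getCodec().copyObject(id) == id;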
LongCodec.java
@@ -27,6 +27,12 @@
* Codec to convert Long to/from byte array.
*/
public final class LongCodec implements Codec<Long> {
private static final LongCodec CODEC = new LongCodec();

public static LongCodec get() {
return CODEC;
}

@Override
public boolean supportCodecBuffer() {
return true;
Proto3Codec.java (new file)
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.utils.db;

import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
import org.apache.ratis.thirdparty.com.google.protobuf.Parser;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.IntFunction;

/**
* Codecs to serialize/deserialize Protobuf v3 messages.
*/
public final class Proto3Codec<M extends MessageLite>
implements Codec<M> {
private static final ConcurrentMap<Class<? extends MessageLite>,
Codec<? extends MessageLite>> CODECS
= new ConcurrentHashMap<>();

/**
* @return the {@link Codec} for the given class.
*/
public static <T extends MessageLite> Codec<T> get(Class<T> clazz) {
final Codec<?> codec = CODECS.computeIfAbsent(clazz, Proto3Codec::new);
return (Codec<T>) codec;
}
A reviewer commented on lines +36 to +46:

What is the benefit of saving codec instances to the map? Are we likely to have multiple uses of the same proto message in different domain objects?

The author replied:

The map is to enforce a singleton per message class. We may consider Proto3Codec::new expensive since it uses reflection to create an object. Suppose we returned a new object, as below:

  public static <T extends MessageLite> Codec<T> getNew(Class<T> clazz) {
return new Proto3Codec<>(clazz);
  }

Then it could be misused as

  getNew(clazz).toCodecBuffer(message, allocator);

i.e. it creates a new codec for each serialization.
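
For contrast, the intended pattern (which the BlockData and ChunkInfoList changes below follow) is to resolve the codec once and reuse it:

  // The reflective PARSER lookup happens once per message class.
  private static final Codec<ContainerProtos.BlockData> CODEC =
      Proto3Codec.get(ContainerProtos.BlockData.class);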


private static <M extends MessageLite> Parser<M> getParser(Class<M> clazz) {
final String name = "PARSER";
try {
return (Parser<M>) clazz.getField(name).get(null);
} catch (Exception e) {
throw new IllegalStateException(
"Failed to get " + name + " field from " + clazz, e);
}
}

private final Parser<M> parser;

private Proto3Codec(Class<M> clazz) {
this.parser = getParser(clazz);
}

@Override
public boolean supportCodecBuffer() {
return true;
}

@Override
public CodecBuffer toCodecBuffer(@Nonnull M message,
IntFunction<CodecBuffer> allocator) {
final int size = message.getSerializedSize();
return allocator.apply(size).put(buffer -> {
try {
message.writeTo(CodedOutputStream.newInstance(buffer));
} catch (IOException e) {
throw new IllegalStateException(
"Failed to writeTo: message=" + message, e);
}
return size;
});
}

@Override
public M fromCodecBuffer(@Nonnull CodecBuffer buffer)
throws InvalidProtocolBufferException {
return parser.parseFrom(buffer.asReadOnlyByteBuffer());
}

@Override
public byte[] toPersistedFormat(M message) {
return message.toByteArray();
}

@Override
public M fromPersistedFormat(byte[] bytes)
throws InvalidProtocolBufferException {
return parser.parseFrom(bytes);
}

@Override
public M copyObject(M message) {
// proto messages are immutable
return message;
}
}
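
A hedged round-trip sketch of the byte-array path (the helper name is arbitrary):

  // Sketch: persist any generated message and parse it back.
  static <M extends MessageLite> M roundTrip(Class<M> clazz, M message)
      throws IOException {
    final Codec<M> codec = Proto3Codec.get(clazz);
    final byte[] bytes = codec.toPersistedFormat(message); // toByteArray()
    return codec.fromPersistedFormat(bytes);  // PARSER.parseFrom(bytes)
  }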
BlockData.java
@@ -20,6 +20,9 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.client.BlockID;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.Proto3Codec;

import java.io.IOException;
import java.util.Collections;
@@ -32,6 +35,15 @@
* Helper class to convert Protobuf to Java classes.
*/
public class BlockData {
private static final Codec<BlockData> CODEC = new DelegatedCodec<>(
Proto3Codec.get(ContainerProtos.BlockData.class),
BlockData::getFromProtoBuf,
BlockData::getProtoBufMessage);

public static Codec<BlockData> getCodec() {
return CODEC;
}

private final BlockID blockID;
private final Map<String, String> metadata;

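Since BlockData uses the three-argument DelegatedCodec constructor (immutable defaults to false), copyObject rebuilds the object through its proto form; a hedged sketch:

  // Round-trips through ContainerProtos.BlockData, yielding a
  // distinct BlockData instance rather than the original reference.
  final BlockData copy = BlockData.getCodec().copyObject(blockData);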
ChunkInfoList.java
@@ -18,23 +18,38 @@
package org.apache.hadoop.ozone.container.common.helpers;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.Proto3Codec;

import java.util.Collections;
import java.util.List;

/**
* Helper class to convert between protobuf lists and Java lists of
* {@link ContainerProtos.ChunkInfo} objects.
* <p>
* This class is immutable.
*/
public class ChunkInfoList {
private List<ContainerProtos.ChunkInfo> chunks;
private static final Codec<ChunkInfoList> CODEC = new DelegatedCodec<>(
Proto3Codec.get(ContainerProtos.ChunkInfoList.class),
ChunkInfoList::getFromProtoBuf,
ChunkInfoList::getProtoBufMessage,
true);

public static Codec<ChunkInfoList> getCodec() {
return CODEC;
}

private final List<ContainerProtos.ChunkInfo> chunks;

public ChunkInfoList(List<ContainerProtos.ChunkInfo> chunks) {
this.chunks = chunks;
this.chunks = Collections.unmodifiableList(chunks);
}

public List<ContainerProtos.ChunkInfo> asList() {
return Collections.unmodifiableList(chunks);
return chunks;
}

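With the constructor now wrapping its argument, the unmodifiable view is created once and shared; a hedged sketch of the observable behavior (chunks and chunk are hypothetical locals):

  final ChunkInfoList list = new ChunkInfoList(chunks);
  list.asList().add(chunk);  // throws UnsupportedOperationException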

This file was deleted.
