Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,25 @@ public final class Proto3Codec<M extends MessageLite> implements Codec<M> {

private final Class<M> clazz;
private final Parser<M> parser;
private final boolean allowInvalidProtocolBufferException;

/**
* @return the {@link Codec} for the given class.
*/
public static <T extends MessageLite> Codec<T> get(T t) {
return get(t, false);
}

public static <T extends MessageLite> Codec<T> get(T t, boolean allowInvalidProtocolBufferException) {
final Codec<?> codec = CODECS.computeIfAbsent(t.getClass(),
key -> new Proto3Codec<>(t));
key -> new Proto3Codec<>(t, allowInvalidProtocolBufferException));
return (Codec<T>) codec;
}

private Proto3Codec(M m) {
private Proto3Codec(M m, boolean allowInvalidProtocolBufferException) {
this.clazz = (Class<M>) m.getClass();
this.parser = (Parser<M>) m.getParserForType();
this.allowInvalidProtocolBufferException = allowInvalidProtocolBufferException;
}

@Override
Expand Down Expand Up @@ -85,6 +91,9 @@ public M fromCodecBuffer(@Nonnull CodecBuffer buffer) throws CodecException {
try {
return parser.parseFrom(buffer.asReadOnlyByteBuffer());
} catch (InvalidProtocolBufferException e) {
if (allowInvalidProtocolBufferException) {
return null;
}
throw new CodecException("Failed to parse " + buffer + " for " + getTypeClass(), e);
}
}
Expand All @@ -97,7 +106,14 @@ public byte[] toPersistedFormat(M message) {
@Override
public M fromPersistedFormatImpl(byte[] bytes)
throws InvalidProtocolBufferException {
return parser.parseFrom(bytes);
try {
return parser.parseFrom(bytes);
} catch (InvalidProtocolBufferException e) {
if (allowInvalidProtocolBufferException) {
return null;
}
throw e;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
*/
public class BlockData {
private static final Codec<BlockData> CODEC = new DelegatedCodec<>(
Proto3Codec.get(ContainerProtos.BlockData.getDefaultInstance()),
// allow InvalidProtocolBufferException for backward compatibility with Schema One
Proto3Codec.get(ContainerProtos.BlockData.getDefaultInstance(), true),
BlockData::getFromProtoBuf,
BlockData::getProtoBufMessage,
BlockData.class);
Expand Down Expand Up @@ -96,6 +97,9 @@ public void setBlockCommitSequenceId(long blockCommitSequenceId) {
* @return - BlockData
*/
public static BlockData getFromProtoBuf(ContainerProtos.BlockData data) throws CodecException {
if (data == null) {
return null;
}
BlockData blockData = new BlockData(
BlockID.getFromProtobuf(data.getBlockID()));
for (int x = 0; x < data.getMetadataCount(); x++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

package org.apache.hadoop.ozone.container.metadata;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.CodecException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfoList;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Codec for parsing {@link org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfoList}
Expand All @@ -44,6 +47,8 @@
* always be present.
*/
public final class SchemaOneChunkInfoListCodec implements Codec<ChunkInfoList> {
public static final Logger LOG = LoggerFactory.getLogger(SchemaOneChunkInfoListCodec.class);
private static final AtomicBoolean LOGGED = new AtomicBoolean(false);

private static final Codec<ChunkInfoList> INSTANCE =
new SchemaOneChunkInfoListCodec();
Expand Down Expand Up @@ -72,9 +77,12 @@ public ChunkInfoList fromPersistedFormat(byte[] rawData) throws CodecException {
return ChunkInfoList.getFromProtoBuf(
ContainerProtos.ChunkInfoList.parseFrom(rawData));
} catch (InvalidProtocolBufferException ex) {
throw new CodecException("Invalid chunk information. " +
if (LOGGED.compareAndSet(false, true)) {
LOG.warn("Invalid chunk information. " +
"This data may have been written using datanode " +
"schema version one, which did not save chunk information.", ex);
}
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,7 @@ public void testReadDeletedBlockChunkInfo(String schemaVersion)

for (Table.KeyValue<String, ChunkInfoList> chunkListKV: deletedBlocks) {
preUpgradeBlocks.add(chunkListKV.getKey());
assertThrows(IOException.class, () -> chunkListKV.getValue(),
"No exception thrown when trying to retrieve old deleted blocks values as chunk lists.");
assertNull(chunkListKV.getValue());
}

assertEquals(TestDB.NUM_DELETED_BLOCKS, preUpgradeBlocks.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ public KeyValue<KEY, VALUE> next() {
try {
return convert(rawIterator.next());
} catch (CodecException e) {
throw new IllegalStateException("Failed next()", e);
throw new IllegalStateException("Failed next() in " + TypedTable.this, e);
}
}

Expand Down
Loading