Skip to content

Commit

Permalink
[flink] support CoderTypeSerializer snapshot migration (apache#29939)
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Feb 19, 2024
1 parent 74b0684 commit 1a728a0
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.types;

import java.io.EOFException;
import java.io.IOException;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.construction.SerializablePipelineOptions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for Beam {@link
* org.apache.beam.sdk.coders.Coder Coders}.
*/
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class CoderTypeSerializer<T> extends TypeSerializer<T> {

private final Coder<T> coder;

/**
* {@link SerializablePipelineOptions} deserialization will cause {@link
* org.apache.beam.sdk.io.FileSystems} registration needed for {@link
* org.apache.beam.sdk.transforms.Reshuffle} translation.
*/
private final SerializablePipelineOptions pipelineOptions;

private final boolean fasterCopy;

public CoderTypeSerializer(Coder<T> coder, SerializablePipelineOptions pipelineOptions) {
Preconditions.checkNotNull(coder);
Preconditions.checkNotNull(pipelineOptions);
this.coder = coder;
this.pipelineOptions = pipelineOptions;

FlinkPipelineOptions options = pipelineOptions.get().as(FlinkPipelineOptions.class);
this.fasterCopy = options.getFasterCopy();
}

@Override
public boolean isImmutableType() {
return false;
}

@Override
public CoderTypeSerializer<T> duplicate() {
return new CoderTypeSerializer<>(coder, pipelineOptions);
}

@Override
public T createInstance() {
return null;
}

@Override
public T copy(T t) {
if (fasterCopy) {
return t;
}
try {
return CoderUtils.clone(coder, t);
} catch (CoderException e) {
throw new RuntimeException("Could not clone.", e);
}
}

@Override
public T copy(T t, T reuse) {
return copy(t);
}

@Override
public int getLength() {
return -1;
}

@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
coder.encode(t, outputWrapper);
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
try {
DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
return coder.decode(inputWrapper);
} catch (CoderException e) {
Throwable cause = e.getCause();
if (cause instanceof EOFException) {
throw (EOFException) cause;
} else {
throw e;
}
}
}

@Override
public T deserialize(T t, DataInputView dataInputView) throws IOException {
return deserialize(dataInputView);
}

@Override
public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
serialize(deserialize(dataInputView), dataOutputView);
}

@Override
public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

CoderTypeSerializer that = (CoderTypeSerializer) o;
return coder.equals(that.coder);
}

@Override
public int hashCode() {
return coder.hashCode();
}

@Override
public TypeSerializerSnapshot<T> snapshotConfiguration() {
return new UnversionedTypeSerializerSnapshot<>(this);
}

/**
* A legacy snapshot which does not care about schema compatibility. This is used only for state
* restore of state created by Beam 2.54.0 and below for Flink 1.16 and below.
*/
public static class LegacySnapshot<T> extends TypeSerializerConfigSnapshot<T> {

/** Needs to be public to work with {@link VersionedIOReadableWritable}. */
public LegacySnapshot() {}

public LegacySnapshot(CoderTypeSerializer<T> serializer) {
setPriorSerializer(serializer);
}

@Override
public int getVersion() {
// We always return the same version
return 1;
}

@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializer<T> newSerializer) {
// We assume compatibility because we don't have a way of checking schema compatibility
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}

@Override
public String toString() {
return "CoderTypeSerializer{" + "coder=" + coder + '}';
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.translation.types.compat.UnversionedTypeSerializerSnapshot;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.EOFException;
import java.io.IOException;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.types.compat.UnversionedTypeSerializerSnapshot;
import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
import org.apache.beam.sdk.coders.Coder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.types.compat;
package org.apache.beam.runners.flink.translation.types;

import java.io.IOException;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import javax.annotation.Nullable;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.TemporaryClassLoaderContext;

/** A legacy snapshot which does not care about schema compatibility. */
@SuppressWarnings("allcheckers")
public class UnversionedTypeSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {

private CoderTypeSerializer<T> serializer = null;
private @Nullable CoderTypeSerializer<T> serializer;

/** Needs to be public to work with {@link VersionedIOReadableWritable}. */
public UnversionedTypeSerializerSnapshot() {}
public UnversionedTypeSerializerSnapshot() {
this(null);
}

@SuppressWarnings("initialization")
public UnversionedTypeSerializerSnapshot(CoderTypeSerializer<T> serializer) {
Expand All @@ -42,29 +46,30 @@ public UnversionedTypeSerializerSnapshot(CoderTypeSerializer<T> serializer) {

@Override
public int getCurrentVersion() {
// We always return the same version
return 1;
}

@Override
public void writeSnapshot(DataOutputView dataOutputView) throws IOException {
byte[] bytes = SerializableUtils.serializeToByteArray(serializer);
dataOutputView.writeInt(getCurrentVersion());
dataOutputView.writeInt(bytes.length);
dataOutputView.write(bytes);
}

@SuppressWarnings("unchecked")
@Override
public void readSnapshot(int i, DataInputView dataInputView, ClassLoader classLoader)
public void readSnapshot(int version, DataInputView dataInputView, ClassLoader classLoader)
throws IOException {
// discard version
dataInputView.readInt();
int length = dataInputView.readInt();
byte[] bytes = new byte[length];
dataInputView.readFully(bytes);
this.serializer =
(CoderTypeSerializer<T>)
SerializableUtils.deserializeFromByteArray(bytes, "CoderTypeSerializer");

try (TemporaryClassLoaderContext context = TemporaryClassLoaderContext.of(classLoader)) {
int length = dataInputView.readInt();
byte[] bytes = new byte[length];
dataInputView.readFully(bytes);
this.serializer =
(CoderTypeSerializer<T>)
SerializableUtils.deserializeFromByteArray(
bytes, CoderTypeSerializer.class.getName());
}
}

@Override
Expand All @@ -75,7 +80,7 @@ public TypeSerializer<T> restoreSerializer() {
@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializer<T> newSerializer) {
// We assume compatibility because we don't have a way of checking schema compatibility

return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.types;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.construction.SerializablePipelineOptions;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.junit.Test;

public class UnversionedTypeSerializerSnapshotTest {

@Test
public void testSerialization() throws IOException {
PipelineOptions opts = PipelineOptionsFactory.create();
CoderTypeSerializer<Integer> serializer =
new CoderTypeSerializer<>(VarIntCoder.of(), new SerializablePipelineOptions(opts));
TypeSerializerSnapshot<Integer> snapshot = serializer.snapshotConfiguration();
assertTrue(snapshot instanceof UnversionedTypeSerializerSnapshot);
assertEquals(1, snapshot.getCurrentVersion());
DataOutputSerializer output = new DataOutputSerializer(1);
snapshot.writeSnapshot(output);
DataInputDeserializer input = new DataInputDeserializer();
input.setBuffer(output.wrapAsByteBuffer());
TypeSerializerSnapshot<Integer> emptySnapshot = new UnversionedTypeSerializerSnapshot<>();
emptySnapshot.readSnapshot(
snapshot.getCurrentVersion(), input, Thread.currentThread().getContextClassLoader());
assertEquals(emptySnapshot.restoreSerializer(), serializer);
}
}
Loading

0 comments on commit 1a728a0

Please sign in to comment.