@@ -42,8 +42,6 @@ to specify the state's name, as well as information about the type of the state.
42
42
It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
43
43
simply by directly instantiating the ` StateDescriptor ` with your own ` TypeSerializer ` implementation:
44
44
45
- {{< tabs "ee215ff6-2e21-4a40-a1b4-7f114560546f" >}}
46
- {{< tab "Java" >}}
47
45
``` java
48
46
public class CustomTypeSerializer extends TypeSerializer<Tuple2<String , Integer > > {... };
49
47
@@ -54,20 +52,6 @@ ListStateDescriptor<Tuple2<String, Integer>> descriptor =
54
52
55
53
checkpointedState = getRuntimeContext(). getListState(descriptor);
56
54
```
57
- {{< /tab >}}
58
- {{< tab "Scala" >}}
59
- ``` scala
60
- class CustomTypeSerializer extends TypeSerializer [(String , Integer )] {...}
61
-
62
- val descriptor = new ListStateDescriptor [(String , Integer )](
63
- " state-name" ,
64
- new CustomTypeSerializer )
65
- )
66
-
67
- checkpointedState = getRuntimeContext.getListState(descriptor)
68
- ```
69
- {{< /tab >}}
70
- {{< /tabs >}}
71
55
72
56
## State serializers and schema evolution
73
57
@@ -151,7 +135,7 @@ To wrap up, this section concludes how Flink, or more specifically the state bac
151
135
abstractions. The interaction is slightly different depending on the state backend, but this is orthogonal
152
136
to the implementation of state serializers and their serializer snapshots.
153
137
154
- #### Off-heap state backends (e.g. ` RocksDBStateBackend ` )
138
+ #### Off-heap state backends (e.g. ` EmbeddedRocksDBStateBackend ` )
155
139
156
140
1 . ** Register new state with a state serializer that has schema _ A_ **
157
141
- the registered ` TypeSerializer ` for the state is used to read / write state on every state access.
@@ -172,7 +156,7 @@ to the implementation of state serializers and their serializer snapshots.
172
156
of the accessed state is migrated all-together before processing continues.
173
157
- If the resolution signals incompatibility, then the state access fails with an exception.
174
158
175
- #### Heap state backends (e.g. ` MemoryStateBackend ` , ` FsStateBackend ` )
159
+ #### Heap state backends (e.g. ` HashMapStateBackend ` )
176
160
177
161
1 . ** Register new state with a state serializer that has schema _ A_ **
178
162
- the registered ` TypeSerializer ` is maintained by the state backend.
@@ -284,6 +268,7 @@ public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot
284
268
}
285
269
```
286
270
271
+
287
272
When implementing a new serializer snapshot as a subclass of ` CompositeTypeSerializerSnapshot ` ,
288
273
the following three methods must be implemented:
289
274
* ` #getCurrentOuterSnapshotVersion() ` : This method defines the version of
@@ -311,6 +296,7 @@ has outer snapshot information, then all three methods must be implemented.
311
296
Below is an example of how the ` CompositeTypeSerializerSnapshot ` is used for composite serializer snapshots
312
297
that do have outer snapshot information, using Flink's ` GenericArraySerializer ` as an example:
313
298
299
+
314
300
``` java
315
301
public final class GenericArraySerializerSnapshot <C> extends CompositeTypeSerializerSnapshot<C [ ], GenericArraySerializer > {
316
302
@@ -365,6 +351,7 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
365
351
}
366
352
```
367
353
354
+
368
355
There are two important things to notice in the above code snippet. First of all, since this
369
356
` CompositeTypeSerializerSnapshot ` implementation has outer snapshot information that is written as part of the snapshot,
370
357
the outer snapshot version, as defined by ` getCurrentOuterSnapshotVersion() ` , must be upticked whenever the
@@ -449,18 +436,18 @@ migrate from the old abstractions. The steps to do this is as follows:
449
436
450
437
This section is a guide for a method migration from the serializer snapshots that existed before Flink 1.19.
451
438
452
- Before Flink 1.19, when using a customized serializer to process data, the schema compatibility in the old serializer
453
- (maybe in Flink library) has to meet the future need.
454
- Or else TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T > newSerializer) of the old serializer has to be modified.
455
- There are no ways to specify the compatibility with the old serializer in the new serializer, which also makes scheme evolution
439
+ Before Flink 1.19, when using a customized serializer to process data, the schema compatibility in the old serializer
440
+ (maybe in Flink library) has to meet the future need.
441
+ Or else TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T > newSerializer) of the old serializer has to be modified.
442
+ There are no ways to specify the compatibility with the old serializer in the new serializer, which also makes scheme evolution
456
443
not supported in some scenarios.
457
444
458
- So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method
459
- ` TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer) ` is now removed and needs to be replaced with
445
+ So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method
446
+ ` TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer) ` is now removed and needs to be replaced with
460
447
` TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot) ` .
461
448
To make this transition, follow these steps:
462
449
463
- 1 . Implement the ` TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot) ` whose logic
450
+ 1 . Implement the ` TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot) ` whose logic
464
451
should be same as the original ` TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer) ` .
465
452
2 . Remove the old method ` TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer) ` .
466
453
0 commit comments