Conversation

@fengjiajie (Contributor)
In my use case with Iceberg 1.3, I have a Flink function-1 that outputs a DataStream<DataFile>, which is then processed by the next function. The simplified code for function-1 is as follows:

// Inside function-1:

    Map<Integer, Long> columnSizes = new HashMap<>();
    columnSizes.put(1, 234L);
    DataFile dataFile = DataFiles.builder(icebergTable.spec())
        .withMetrics(new Metrics(123L, columnSizes, ...))
        ...
        .build();

    // Move the file to a new path, then rebuild the DataFile
    DataFile newDataFile = DataFiles.builder(icebergTable.spec())
        .copy(dataFile)
        .withPath("file:///new_path")
        .build();

If I return dataFile, Flink's Kryo framework can deserialize it correctly in the next function. However, if I return newDataFile (reconstructed with copy), Kryo fails with the following exception:

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
  ...
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
	... 4 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
columnSizes (org.apache.iceberg.GenericDataFile)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	...
Caused by: java.lang.UnsupportedOperationException
	at java.util.Collections$UnmodifiableMap.put(Collections.java:1459)
	...

This issue arises in Iceberg 1.15 but not in 1.13. The root cause lies in the toReadableMap method of org.apache.iceberg.BaseFile:

// Iceberg 1.13:
  private static <K, V> Map<K, V> toReadableMap(Map<K, V> map) {
    return map instanceof SerializableMap ? ((SerializableMap)map).immutableMap() : map;
  }

// Iceberg 1.15:
  private static <K, V> Map<K, V> toReadableMap(Map<K, V> map) {
    if (map == null) {
      return null;
    } else if (map instanceof SerializableMap) {
      return ((SerializableMap<K, V>) map).immutableMap();
    } else {
      return Collections.unmodifiableMap(map);
    }
  }

In Iceberg 1.15, toReadableMap wraps the map in Collections.unmodifiableMap. As the stack trace shows, Kryo rebuilds the map during deserialization by calling put() on it, which the unmodifiable wrapper rejects with UnsupportedOperationException. While using unmodifiableMap is the right call for immutability, the copy operation might need to reconstruct these maps as regular mutable maps to avoid this issue.
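The failure mode in the stack trace can be reproduced with plain JDK collections, without Kryo at all: deserializers that rebuild a map by calling put() on it will trip over the unmodifiable wrapper. A minimal stdlib sketch (the class name and values are illustrative):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableMapDemo {
    public static void main(String[] args) {
        Map<Integer, Long> columnSizes = new HashMap<>();
        columnSizes.put(1, 234L);

        // What toReadableMap returns in Iceberg 1.15 for a plain HashMap:
        Map<Integer, Long> readable = Collections.unmodifiableMap(columnSizes);

        try {
            // A deserializer that repopulates the map via put() fails here,
            // just like Collections$UnmodifiableMap.put in the trace above.
            readable.put(2, 456L);
        } catch (UnsupportedOperationException e) {
            System.out.println("UnsupportedOperationException on put()");
        }
    }
}
```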

The github-actions bot added the core label on Jul 14, 2024.
@fengjiajie force-pushed the copy_map branch 2 times, most recently from 4206b48 to fd9f1ed on July 14, 2024 at 03:53.
@fengjiajie (Contributor, Author)

Hi @pvary, I noticed you've worked on changes related to toReadableMap before. Could you take a look at this pull request and provide some feedback?

This pull request aims to ensure that copied DataFile objects behave the same way as normally constructed ones. Currently, the columnSizes field in a copied DataFile is an UnmodifiableMap, while it's a HashMap in a regularly constructed one. This change makes the copied DataFile consistent and easier for Flink's Kryo serialization to handle.

@pvary (Contributor) commented Jul 15, 2024

@fengjiajie: This change was introduced, because in older Iceberg versions the different statistics behaved differently. See: #7643

The goal is that the DataFile should be immutable.

I have 2 main issues with the proposed change:

  • It makes the statistics mutable.
  • It would introduce an extra copy of the statistics maps, which is problematic for performance reasons.

I think the main issue is that in your code the generated statistics maps are not stored in SerializableMap and SerializableByteBufferMap objects. This is why the copy method falls back to wrapping them in Collections.unmodifiableMap(map) instead of calling map.immutableMap(). The result of the former is not serializable with Kryo, while the result of the latter should work.

@fengjiajie (Contributor, Author)

> @fengjiajie: This change was introduced, because in older Iceberg versions the different statistics behaved differently. See: #7643
>
> The goal is that the DataFile should be immutable.
>
> I have 2 main issues with the proposed change:
>
>   • It makes the statistics mutable.
>   • It would introduce an extra copy of the statistics maps, which is problematic for performance reasons.
>
> I think the main issue is that in your code the generated statistics maps are not stored in SerializableMap and SerializableByteBufferMap objects. This is why the copy method falls back to wrapping them in Collections.unmodifiableMap(map) instead of calling map.immutableMap(). The result of the former is not serializable with Kryo, while the result of the latter should work.

@pvary Thank you for providing the context behind this issue; I agree with keeping DataFile immutable. However, I have a few points to add:

  1. In the example above, I used new HashMap for simplicity. In reality, the Metrics object is obtained through org.apache.iceberg.parquet.ParquetUtil's fileMetrics method, which also uses HashMap internally.
  2. Even if I modify ParquetUtil.fileMetrics to return a SerializableMap, when calling DataFiles.builder's copy() method, this SerializableMap is still processed by toReadableMap (returning ((SerializableMap<K, V>) map).immutableMap()), and the result is still an UnmodifiableMap, which fails during Kryo deserialization.

After considering this, it seems the core issue lies in Flink's Kryo being unable to deserialize UnmodifiableMap. I can resolve this by registering a custom serializer with Flink:

// UnmodifiableCollectionsSerializer is from the kryo-serializers library
// (de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer).
Class<?> unmodifiableMapClass = Class.forName("java.util.Collections$UnmodifiableMap");
env.getConfig().addDefaultKryoSerializer(unmodifiableMapClass, UnmodifiableCollectionsSerializer.class);

I've tested this and it works. Does ParquetUtil.fileMetrics still need to be modified to return SerializableMap? If not, I will close this pull request.
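As a sanity check of this workaround, a small stdlib-only sketch (class name and values illustrative) confirms that the class name passed to the registration resolves to the wrapper's actual runtime class, and that the wrapper's contents stay readable, which is what allows a custom serializer to write out the backing map and re-wrap it on read:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class WorkaroundSketch {
    public static void main(String[] args) throws Exception {
        // The private JDK class the Flink registration targets:
        Class<?> unmodifiableMapClass =
            Class.forName("java.util.Collections$UnmodifiableMap");

        Map<Integer, Long> wrapped =
            Collections.unmodifiableMap(new HashMap<>(Map.of(1, 234L)));

        // The wrapper really is an instance of that private class:
        System.out.println(unmodifiableMapClass.isInstance(wrapped));

        // Reads still work, so a custom serializer (e.g.
        // UnmodifiableCollectionsSerializer) can copy the backing
        // entries out during serialization:
        Map<Integer, Long> backing = new HashMap<>(wrapped);
        System.out.println(backing.get(1));
    }
}
```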

@fengjiajie closed this Jul 23, 2024