Conversation

@alexeykudinkin
Contributor

What is the purpose of the pull request

Avoid including the whole MultipleSparkJobExecutionStrategy object in the closure that Spark has to serialize
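
For illustration, a minimal sketch of the general pattern behind this fix (hypothetical class and field names, not the actual Hudi change): referencing an instance field inside a Spark lambda implicitly captures this, so Spark tries to serialize the whole enclosing object; copying what the closure needs into a local variable first keeps the closure small.

import org.apache.spark.api.java.JavaRDD;

class ExampleStrategy {
  private final String schemaJson = "{...}";

  JavaRDD<String> captureWholeObject(JavaRDD<String> rdd) {
    // Referencing the field inside the lambda captures 'this', so Spark
    // must serialize the entire ExampleStrategy instance; if it is not
    // Serializable, this fails with "Task not serializable".
    return rdd.map(record -> record + schemaJson);
  }

  JavaRDD<String> captureOnlyWhatIsNeeded(JavaRDD<String> rdd) {
    // Copy the field into an effectively-final local first; only the
    // String is captured and serialized, not the enclosing object.
    String localSchemaJson = schemaJson;
    return rdd.map(record -> record + localSchemaJson);
  }
}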

Brief change log

See above

Verify this pull request

This pull request is a trivial rework / code cleanup without any test coverage.

Committer checklist

  • Has a corresponding JIRA in PR title & commit
  • Commit message is descriptive of the change
  • CI is green
  • Necessary doc changes done or have another open PR
  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

Alexey Kudinkin added 2 commits March 4, 2022 14:15
Contributor

@yihua yihua left a comment

LGTM. Good catch and a nice fix!

@yihua yihua self-assigned this Mar 4, 2022
Member

@xushiyan xushiyan left a comment

Good catch!

@alexeykudinkin
Contributor Author

@hudi-bot run azure

@hudi-bot
Collaborator

hudi-bot commented Mar 5, 2022

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@nsivabalan nsivabalan merged commit f0bcee3 into apache:master Mar 7, 2022
@nsivabalan nsivabalan added the priority:critical Production degraded; pipelines stalled label Mar 7, 2022
@boneanxs
Contributor

boneanxs commented Mar 8, 2022

Hi guys, I also hit this exception when enabling async clustering in a HoodieSparkStreaming job. It is not the same stack trace as the one this issue fixed; here is the stack trace I got:

 ERROR AsyncClusteringService: Clustering executor failed java.util.concurrent.CompletionException: org.apache.spark.SparkException: Task not serializable 
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) 
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) 
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) 
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596) 
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2467) 
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:912) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) 
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:911) 
at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:103) 
at org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex$(JavaRDDLike.scala:99) 
at org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:45) 
at org.apache.hudi.table.action.commit.SparkBulkInsertHelper.bulkInsert(SparkBulkInsertHelper.java:115) 
at org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy.performClusteringWithRecordsRDD(SparkSortAndSizeExecutionStrategy.java:68) 
at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsync$4(MultipleSparkJobExecutionStrategy.java:175) 
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ... 5 more

Caused by: java.util.ConcurrentModificationException 
at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) 
at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) 
at java.util.HashSet.writeObject(HashSet.java:287) 
at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) 
at org.apache.spark.serializer.JavaSerializerInstance

From my perspective, it might be that TypedProperties#keys is not thread-safe: another thread modifies this HashSet (e.g., via put or putAll on TypedProperties) while Spark is iterating over it to serialize it. TypedProperties is reachable through HoodieTable's config (HoodieWriteConfig), so the approach of this PR, avoiding serializing HoodieTable, could fix it too.
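
For reference, a minimal standalone sketch of that class of race (plain JDK code, not Hudi code; names are made up for illustration): serializing a HashSet iterates over it, so a concurrent writer usually triggers the same ConcurrentModificationException as in HashSet.writeObject above.

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.HashSet;
import java.util.Set;

public class CmeOnSerialize {
  public static void main(String[] args) throws Exception {
    Set<String> keys = new HashSet<>();
    for (int i = 0; i < 100_000; i++) {
      keys.add("key" + i);
    }

    // Keep mutating the set from another thread...
    Thread writer = new Thread(() -> {
      for (int i = 0; i < 100_000; i++) {
        keys.add("extra" + i);
      }
    });
    writer.start();

    // ...while serializing it: HashSet.writeObject iterates the set and
    // usually fails with ConcurrentModificationException.
    try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
      oos.writeObject(keys);
    }
    writer.join();
  }
}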

But when I tried to solve it the same way this PR did, I unfortunately found that avoiding serializing HoodieTable would require a lot of changes (changing the constructors of BulkInsertMapFunction, SparkLazyInsertIterable, HoodieLazyInsertIterable, and many kinds of WriteHandler); I'm afraid that would be a huge change.

Another solution is to make TypedProperties itself thread-safe; there are two ways to do that:

  1. Only change keys to Collections.newSetFromMap(new ConcurrentHashMap<>()). This avoids the ConcurrentModificationException, but TypedProperties is still not truly thread-safe, because modifying the keys attribute and saving the key-value pair happen as two separate steps, for example:

  // synchronized here does not actually help, because the get methods are not synchronized
  public synchronized Object put(Object key, Object value) {
    keys.remove(key);
    keys.add(key);
    // A reader can observe the key in keys before its value has been saved by TypedProperties
    return super.put(key, value);
  }
  2. Don't let TypedProperties extend Properties; instead use an internal ConcurrentHashMap to store keys and values, which makes TypedProperties truly thread-safe:

import java.io.Serializable;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class TypedProperties implements Serializable {

  // ConcurrentHashMap makes individual reads and writes thread-safe, and its
  // iterators are weakly consistent, so serialization cannot hit a
  // ConcurrentModificationException.
  private final ConcurrentHashMap<Object, Object> props = new ConcurrentHashMap<>();

  public TypedProperties() {
  }

  public TypedProperties(Properties defaults) {
    if (Objects.nonNull(defaults)) {
      for (String key : defaults.stringPropertyNames()) {
        put(key, defaults.getProperty(key));
      }
    }
  }

  public Object put(Object key, Object value) {
    return props.put(key, value);
  }

  public Enumeration<Object> keys() {
    return Collections.enumeration(props.keySet());
  }
...
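
And a quick standalone check (again not Hudi code) of why the ConcurrentHashMap-backed design in option 2 avoids the exception: iteration over its key set is weakly consistent, so enumerating the keys while another thread is writing never throws ConcurrentModificationException.

import java.util.Collections;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;

public class WeaklyConsistentEnumeration {
  public static void main(String[] args) throws InterruptedException {
    ConcurrentHashMap<Object, Object> props = new ConcurrentHashMap<>();
    for (int i = 0; i < 100_000; i++) {
      props.put("key" + i, "value");
    }

    Thread writer = new Thread(() -> {
      for (int i = 0; i < 100_000; i++) {
        props.put("extra" + i, "value");
      }
    });
    writer.start();

    // Weakly consistent iteration: never throws ConcurrentModificationException;
    // concurrent puts may or may not be reflected in this enumeration.
    Enumeration<Object> keys = Collections.enumeration(props.keySet());
    int count = 0;
    while (keys.hasMoreElements()) {
      keys.nextElement();
      count++;
    }
    writer.join();
    System.out.println("Enumerated " + count + " keys without a CME");
  }
}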

Do you guys have any other suggestions? Thanks~

@alexeykudinkin
Contributor Author

alexeykudinkin commented Mar 8, 2022

@boneanxs can you create a JIRA for your issue so that we can keep track of it and concentrate all of the conversation there?

@boneanxs
Contributor

boneanxs commented Mar 9, 2022

@alexeykudinkin, @xushiyan, @yihua Sure, created a JIRA ticket: https://issues.apache.org/jira/browse/HUDI-3593; looking forward to getting your feedback :-)

vingov pushed a commit to vingov/hudi that referenced this pull request Apr 3, 2022
… object into the closure for Spark to serialize (apache#4954)

- Avoid including whole MultipleSparkJobExecutionStrategy object into the closure for Spark to serialize
stayrascal pushed a commit to stayrascal/hudi that referenced this pull request Apr 12, 2022
… object into the closure for Spark to serialize (apache#4954)

- Avoid including whole MultipleSparkJobExecutionStrategy object into the closure for Spark to serialize