
Conversation

@nilesh-c (Member)

I am removing all synchronization from the OutputFormat code for the following reasons:

  • In any case we will need to use Apache Spark in a single-threaded manner, with 1 thread per worker (and 1 worker per core, so that we can still use all the cores), because Hadoop's bz2 InputStream, CBzip2InputStream, is not thread-safe. There is a JIRA issue for that which has been fixed and merged to trunk, but at least until Hadoop's next release this stays as it is.
  • We are already guaranteeing single-threaded execution, so needless synchronization will simply cause problems (see the sketch below).
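To make the point concrete, here is a minimal sketch, not the framework's actual class (CompositeWriter and its methods are hypothetical stand-ins): under a guaranteed one-thread-per-worker model, the synchronized guard is pure overhead and can be dropped.

import scala.collection.mutable

// Hypothetical stand-in for a composite writer; not the framework's code.
class CompositeWriter {
  private val writers = mutable.HashMap.empty[String, java.io.StringWriter]

  // Before: every call paid for acquiring the object's monitor.
  def writeLocked(dataset: String, line: String): Unit = synchronized {
    writers.getOrElseUpdate(dataset, new java.io.StringWriter).write(line)
  }

  // After: with exactly one thread per worker JVM touching this object,
  // plain unsynchronized access is safe and cheaper.
  def write(dataset: String, line: String): Unit =
    writers.getOrElseUpdate(dataset, new java.io.StringWriter).write(line)
}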

Member

Would creating the map with all the different formats when you declare recordWriters, and making it immutable, solve the problem? Or does the problem arise when another class uses DBpediaCompositeOutputFormat.write()?

@nilesh-c (Member, Author)

Currently we don't have multiple threads accessing these methods, so we should be safe in any case.

But let's say Hadoop releases its next version with a thread-safe bz2 decompression stream that gives users the choice of running multiple threads per worker (JVM): in that case we would need to synchronize access to the whole write() and close() methods, not just the Map.

And you might ask: why not create the whole map of RecordWriters right when the class is initialized? Doing it lazily prevents us from creating too many RecordWriter instances. The map of RecordWriters is initialized for each input split (there can be hundreds of them, maybe even a few thousand), so eager creation would also build RecordWriters for unneeded datasets (those with no Quads coming in): extra instances, additional GC.
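Here is a minimal sketch of the lazy scheme described above, assuming hypothetical Dataset and Quad stand-in types and a createWriterFor helper; the framework's real types and wiring will differ.

import scala.collection.mutable
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}

// Stand-in types for illustration; the framework has its own Dataset/Quad.
case class Dataset(name: String)
case class Quad(line: String)

class LazyCompositeRecordWriter(context: TaskAttemptContext)
    extends RecordWriter[Dataset, Quad] {

  // Writers are created on first use: a dataset that never receives a
  // Quad in this input split never allocates a RecordWriter at all.
  private val recordWriters =
    mutable.HashMap.empty[Dataset, RecordWriter[Dataset, Quad]]

  override def write(dataset: Dataset, quad: Quad): Unit =
    recordWriters
      .getOrElseUpdate(dataset, createWriterFor(dataset))
      .write(dataset, quad)

  override def close(ctx: TaskAttemptContext): Unit =
    recordWriters.values.foreach(_.close(ctx))

  // Left unimplemented in this sketch; the real code would open the
  // per-dataset output file, wrap it in the configured formatter and
  // emit formatter.header.
  private def createWriterFor(dataset: Dataset): RecordWriter[Dataset, Quad] = ???
}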

@nilesh-c (Member, Author)

But @jimkont, I have a question here regarding the last paragraph of my previous comment. Initializing ALL record writers does add GC overhead, but it lets us write empty splits containing only the formatter.header and formatter.footer, which look like:

# started 2014-06-12T04:44:23Z
# completed 2014-06-12T04:44:23Z

This is how those files look in the original framework. Currently the dataset directories remain empty in this case, instead of containing files (output splits) that look like the above.
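For contrast, here is a sketch of the eager alternative in question, reusing the hypothetical names from the sketch above: every configured dataset gets a writer up front, so even a split that receives no Quads still produces a file holding just the header and footer.

// Eager variant: one writer per configured dataset from the start. Each
// writer emits formatter.header on creation and formatter.footer on close,
// so an empty split still yields the two-line file shown above, at the
// cost of extra instances and GC work on every input split.
class EagerCompositeRecordWriter(datasets: Seq[Dataset], context: TaskAttemptContext)
    extends RecordWriter[Dataset, Quad] {

  private val recordWriters: Map[Dataset, RecordWriter[Dataset, Quad]] =
    datasets.map(d => d -> createWriterFor(d)).toMap

  override def write(dataset: Dataset, quad: Quad): Unit =
    recordWriters(dataset).write(dataset, quad)

  override def close(ctx: TaskAttemptContext): Unit =
    recordWriters.values.foreach(_.close(ctx))

  private def createWriterFor(dataset: Dataset): RecordWriter[Dataset, Quad] = ???
}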

@nilesh-c (Member, Author)

Do you think I should go for it? Or I could decide after benchmarking both approaches.

@nilesh-c (Member, Author)

Update: I just set the number of cores a worker should use to 1, directly in the code. This means no more bz2 decompression thread issues. @sangv, this should solve users' headaches regarding threading.

…problems"

This reverts commit 87f1743.
@nilesh-c (Member, Author)

I mistakenly thought that "spark.cores.max" means the max. number of cores per worker. It's actually the max. cores to use in the whole cluster. :-(
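A minimal sketch of the distinction, assuming a Spark standalone deployment (the app name is hypothetical): "spark.cores.max" is an application-wide, cluster-wide cap, while per-worker cores are fixed when the worker process is launched.

import org.apache.spark.SparkConf

object CoresConfigSketch {
  // "spark.cores.max" caps the total cores the application may use across
  // the entire cluster; it does NOT mean cores per worker.
  val conf = new SparkConf()
    .setAppName("dbpedia-extraction") // hypothetical app name
    .set("spark.cores.max", "1")      // 1 core in total, cluster-wide

  // To cap cores per worker in standalone mode, the worker itself must be
  // started with SPARK_WORKER_CORES set (e.g. in conf/spark-env.sh); it is
  // not an application-level SparkConf setting.
}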

sangv added a commit that referenced this pull request on Jul 16, 2014:
Remove all synchronization from OutputFormats

@sangv merged commit 24b45cd into nildev2 on Jul 16, 2014.
@nilesh-c deleted the output-nothreads branch on Jul 26, 2014.