From c5f7c4978065b3fc16dbe4f9470af216c0751845 Mon Sep 17 00:00:00 2001 From: Nilesh Chakraborty Date: Thu, 10 Jul 2014 20:36:03 +0530 Subject: [PATCH 1/4] Remove all synchronization from OutputFormats and docs for that. --- .../output/DBpediaCompositeOutputFormat.scala | 7 +++++++ .../output/DBpediaDatasetOutputFormat.scala | 21 +++++++++---------- .../io/output/MultipleTextOutputFormat.scala | 5 +++++ 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaCompositeOutputFormat.scala b/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaCompositeOutputFormat.scala index b60227b..21c7787 100644 --- a/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaCompositeOutputFormat.scala +++ b/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaCompositeOutputFormat.scala @@ -27,6 +27,8 @@ import org.dbpedia.extraction.util.ConfigUtils * 3. Also, the output needs to be grouped by dataset such that each key is a Text representing the dataset * to which the Quads in the value belong to. Example key: article_categories * +* NOTE: When using this with Spark set only one core per worker. +* * Output will look like Hadoop leaf files (eg. part-r-00000) inside directories like enwiki-20140614-article-categories.tql. * The files will be compressed using the specified compression codec. * @@ -52,6 +54,11 @@ class DBpediaCompositeOutputFormat extends TextOutputFormat[Text, QuadSeqWritabl UriPolicy.parseFormats(config, "uri-policy", "format") } + /** + * Note: This method is not synchronized, keeping with the rest of the Hadoop code in this framework. + * When using this with Spark set only one core per worker to ensure that only one thread accesses + * this method per JVM. + */ override def write(key: Text, value: QuadSeqWritable) { for ((suffix, format) <- formatters) diff --git a/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaDatasetOutputFormat.scala b/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaDatasetOutputFormat.scala index b29fa4c..ec4b062 100644 --- a/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaDatasetOutputFormat.scala +++ b/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaDatasetOutputFormat.scala @@ -94,26 +94,25 @@ class DBpediaDatasetOutputFormat(langWikiCode: String, lineWriter.write(nullKey, text) /** - * Note: using synchronization here is *probably* not strictly necessary, but without it, different sequences of quads - * may be interleaved, which is harder to read and makes certain parsing optimizations impossible. + * Note: This method is not synchronized, keeping with the rest of the Hadoop code in this framework. + * When using this with Spark, set only one core per worker to ensure that only one thread accesses + * this method per JVM. 
*/ override def write(key: Text, value: QuadSeqWritable) = - synchronized + { + for (quad <- value.get) { - for (quad <- value.get) - { - text.set(formatter.render(quad).dropRight(1)) // remove newline from rendered output - lineWriter.write(nullKey, text) - } + text.set(formatter.render(quad).dropRight(1)) // remove newline from rendered output + lineWriter.write(nullKey, text) } + } override def close(context: TaskAttemptContext) = - synchronized - { + { text.set(formatter.footer.dropRight(1)) // remove newline from footer lineWriter.write(nullKey, text) lineWriter.close(context) - } + } } } diff --git a/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/MultipleTextOutputFormat.scala b/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/MultipleTextOutputFormat.scala index 27c23ef..c47bb6c 100644 --- a/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/MultipleTextOutputFormat.scala +++ b/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/MultipleTextOutputFormat.scala @@ -26,6 +26,11 @@ class MultipleTextOutputFormat[K, V] extends TextOutputFormat[K, V] { private val recordWriters = mutable.Map[String, RecordWriter[K, V]]() + /** + * Note: This method is not synchronized, keeping with the rest of the Hadoop code in this framework. + * When using this with Spark, set only one core per worker to ensure that only one thread accesses + * this method per JVM. + */ override def write(key: K, value: V) { // Generate the path depending upon key-value pair From 24ed4a2e3516f231242aee0f1cf470f19c70c1e2 Mon Sep 17 00:00:00 2001 From: Nilesh Chakraborty Date: Thu, 10 Jul 2014 20:38:55 +0530 Subject: [PATCH 2/4] Small formatting fixes. --- .../output/DBpediaCompositeOutputFormat.scala | 50 +++++++++---------- .../output/DBpediaDatasetOutputFormat.scala | 32 ++++++------ 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaCompositeOutputFormat.scala b/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaCompositeOutputFormat.scala index 21c7787..0738c65 100644 --- a/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaCompositeOutputFormat.scala +++ b/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaCompositeOutputFormat.scala @@ -9,31 +9,31 @@ import org.dbpedia.extraction.destinations.formatters.UriPolicy import org.dbpedia.extraction.util.ConfigUtils /** -* OutputFormat implementation that uses the configured Formatters to write Quads to respective datasets -* through the DBpediaDatasetOutputFormat class. This class uses as many DBpediaDatasetOutputFormat objects -* as there are configured formats. Formats are read in from the provided extraction config properties file. -* This class handles configuration and Formatters, while DBpediaDatasetOutputFormat handles dividing the Quads -* into datasets. -* -* 1. To use this OutputFormat three Strings need to be set in Hadoop's Configuration: -* dbpedia.wiki.name - Config.wikiName, the wiki suffix (eg. wiki) -* dbpedia.wiki.language.wikicode - Language wiki code of the input wiki dump -* dbpedia.wiki.date - Wiki dump date in YYYYMMDD format -* dbpedia.output.overwrite - Boolean, if set to true, output files will be overwritten if they already exist, -* or else an IOException will be thrown (which is also the default behaviour) - this is actually for MultipleTextOutputFormat -* -* 2. 
The extraction config properties file needs to be added to the distributed cache. -* -* 3. Also, the output needs to be grouped by dataset such that each key is a Text representing the dataset -* to which the Quads in the value belong to. Example key: article_categories -* -* NOTE: When using this with Spark set only one core per worker. -* -* Output will look like Hadoop leaf files (eg. part-r-00000) inside directories like enwiki-20140614-article-categories.tql. -* The files will be compressed using the specified compression codec. -* -* @see DBpediaDatasetOutputFormat -*/ + * OutputFormat implementation that uses the configured Formatters to write Quads to respective datasets + * through the DBpediaDatasetOutputFormat class. This class uses as many DBpediaDatasetOutputFormat objects + * as there are configured formats. Formats are read in from the provided extraction config properties file. + * This class handles configuration and Formatters, while DBpediaDatasetOutputFormat handles dividing the Quads + * into datasets. + * + * 1. To use this OutputFormat three Strings need to be set in Hadoop's Configuration: + * dbpedia.wiki.name - Config.wikiName, the wiki suffix (eg. wiki) + * dbpedia.wiki.language.wikicode - Language wiki code of the input wiki dump + * dbpedia.wiki.date - Wiki dump date in YYYYMMDD format + * dbpedia.output.overwrite - Boolean, if set to true, output files will be overwritten if they already exist, + * or else an IOException will be thrown (which is also the default behaviour) - this is actually for MultipleTextOutputFormat + * + * 2. The extraction config properties file needs to be added to the distributed cache. + * + * 3. Also, the output needs to be grouped by dataset such that each key is a Text representing the dataset + * to which the Quads in the value belong to. Example key: article_categories + * + * NOTE: When using this with Spark set only one core per worker. + * + * Output will look like Hadoop leaf files (eg. part-r-00000) inside directories like enwiki-20140614-article-categories.tql. + * The files will be compressed using the specified compression codec. + * + * @see DBpediaDatasetOutputFormat + */ class DBpediaCompositeOutputFormat extends TextOutputFormat[Text, QuadSeqWritable] { private val WIKI = "dbpedia.wiki.name" diff --git a/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaDatasetOutputFormat.scala b/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaDatasetOutputFormat.scala index ec4b062..f68e505 100644 --- a/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaDatasetOutputFormat.scala +++ b/distributed/src/main/scala/org/dbpedia/extraction/spark/io/output/DBpediaDatasetOutputFormat.scala @@ -9,19 +9,19 @@ import java.io.DataOutputStream import org.apache.hadoop.io.compress.CompressionCodec /** -* OutputFormat implementation that writes Quads to respective datasets depending upon the key, after applying -* a given Formatter. This class extends MultipleTextOutputFormat which allows it to write to multiple locations -* (for multiple datasets) depending upon custom criteria. -* -* The output needs to be grouped by dataset such that each key is a Text representing the dataset to which -* the Quads in the value belong to. Example key: article_categories -* -* @param langWikiCode Language wiki code of the input wiki dump -* @param wikiNameSuffix Config.wikiName (eg. 
wiki) -* @param date Wiki dump date in YYYYMMDD format -* @param outputSuffix Output suffix corresponding to formatter (eg. tql) -* @param formatter Formatter object used to render the Quad objects according to a specific format -*/ + * OutputFormat implementation that writes Quads to respective datasets depending upon the key, after applying + * a given Formatter. This class extends MultipleTextOutputFormat which allows it to write to multiple locations + * (for multiple datasets) depending upon custom criteria. + * + * The output needs to be grouped by dataset such that each key is a Text representing the dataset to which + * the Quads in the value belong to. Example key: article_categories + * + * @param langWikiCode Language wiki code of the input wiki dump + * @param wikiNameSuffix Config.wikiName (eg. wiki) + * @param date Wiki dump date in YYYYMMDD format + * @param outputSuffix Output suffix corresponding to formatter (eg. tql) + * @param formatter Formatter object used to render the Quad objects according to a specific format + */ class DBpediaDatasetOutputFormat(langWikiCode: String, wikiNameSuffix: String, date: String, @@ -109,9 +109,9 @@ class DBpediaDatasetOutputFormat(langWikiCode: String, override def close(context: TaskAttemptContext) = { - text.set(formatter.footer.dropRight(1)) // remove newline from footer - lineWriter.write(nullKey, text) - lineWriter.close(context) + text.set(formatter.footer.dropRight(1)) // remove newline from footer + lineWriter.write(nullKey, text) + lineWriter.close(context) } } From 87f1743ebe330700d68f07e6196fc8c449178a74 Mon Sep 17 00:00:00 2001 From: Nilesh Chakraborty Date: Fri, 11 Jul 2014 20:08:42 +0530 Subject: [PATCH 3/4] Set number of worker cores to 1 to prevent bz2 decompression problems --- .../src/main/scala/org/dbpedia/extraction/util/SparkUtils.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/src/main/scala/org/dbpedia/extraction/util/SparkUtils.scala b/distributed/src/main/scala/org/dbpedia/extraction/util/SparkUtils.scala index e06f9f1..73b0aa3 100644 --- a/distributed/src/main/scala/org/dbpedia/extraction/util/SparkUtils.scala +++ b/distributed/src/main/scala/org/dbpedia/extraction/util/SparkUtils.scala @@ -76,6 +76,7 @@ object SparkUtils conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrator", "org.dbpedia.extraction.spark.serialize.KryoExtractionRegistrator") conf.set("spark.kryoserializer.buffer.mb", "100") + conf.set("spark.cores.max", "1") sc = new SparkContext(conf) // No logging is done upon omitting 'with Logging' - some package problem? setLogLevels(Level.INFO, Seq("org.apache.spark.ui.jobs.DBpediaJobProgressListener")) From ea40d081bd35f788d37c0bd01ebd1fac174e9541 Mon Sep 17 00:00:00 2001 From: Nilesh Chakraborty Date: Fri, 11 Jul 2014 21:56:45 +0530 Subject: [PATCH 4/4] Revert "Set number of worker cores to 1 to prevent bz2 decompression problems" I mistakenly thought that "spark.cores.max" means max. number of cores per worker. It's actually max. cores to use in the whole cluster. This reverts commit 87f1743ebe330700d68f07e6196fc8c449178a74. 
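
The per-worker limit that the OutputFormat comments ask for ("set only one core
per worker") would have to come from the worker side instead. A minimal sketch,
assuming a Spark standalone deployment (illustrative only, not verified against
this cluster's setup):

    # conf/spark-env.sh on each worker machine
    SPARK_WORKER_CORES=1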
--- .../src/main/scala/org/dbpedia/extraction/util/SparkUtils.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/src/main/scala/org/dbpedia/extraction/util/SparkUtils.scala b/distributed/src/main/scala/org/dbpedia/extraction/util/SparkUtils.scala index 73b0aa3..e06f9f1 100644 --- a/distributed/src/main/scala/org/dbpedia/extraction/util/SparkUtils.scala +++ b/distributed/src/main/scala/org/dbpedia/extraction/util/SparkUtils.scala @@ -76,7 +76,6 @@ object SparkUtils conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrator", "org.dbpedia.extraction.spark.serialize.KryoExtractionRegistrator") conf.set("spark.kryoserializer.buffer.mb", "100") - conf.set("spark.cores.max", "1") sc = new SparkContext(conf) // No logging is done upon omitting 'with Logging' - some package problem? setLogLevels(Level.INFO, Seq("org.apache.spark.ui.jobs.DBpediaJobProgressListener"))
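
For illustration, the Configuration keys and dataset-keyed grouping described in the
DBpediaCompositeOutputFormat comment above would be wired up from a Spark job roughly
as follows. This is a minimal sketch, not code from these patches: the SparkContext
`sc`, the RDD `quadsByDataset`, the output path, the language/date values, and the
package of QuadSeqWritable are assumptions made for the example.

    import org.apache.hadoop.io.Text
    import org.apache.spark.SparkContext._          // PairRDDFunctions implicits (needed on pre-1.3 Spark)
    import org.dbpedia.extraction.spark.io.QuadSeqWritable             // package assumed
    import org.dbpedia.extraction.spark.io.output.DBpediaCompositeOutputFormat

    // 1. Required Hadoop Configuration strings (values here are illustrative).
    val conf = sc.hadoopConfiguration
    conf.set("dbpedia.wiki.name", "wiki")               // Config.wikiName suffix
    conf.set("dbpedia.wiki.language.wikicode", "en")    // language wiki code of the input dump
    conf.set("dbpedia.wiki.date", "20140614")           // dump date in YYYYMMDD format
    conf.set("dbpedia.output.overwrite", "false")       // default behaviour: IOException if output exists

    // 2. The extraction config properties file must also be added to the distributed
    //    cache (handled elsewhere in the framework; not shown here).

    // 3. Output grouped by dataset: quadsByDataset is an RDD[(Text, QuadSeqWritable)]
    //    whose keys name the dataset, e.g. "article_categories".
    quadsByDataset.saveAsNewAPIHadoopFile(
      "hdfs:///dbpedia/output/enwiki-20140614",         // illustrative output path
      classOf[Text],
      classOf[QuadSeqWritable],
      classOf[DBpediaCompositeOutputFormat],
      conf)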