diff --git a/distributed/src/main/scala/org/dbpedia/extraction/dump/extract/DistConfigLoader.scala b/distributed/src/main/scala/org/dbpedia/extraction/dump/extract/DistConfigLoader.scala
index e51ee4f..4c690c2 100644
--- a/distributed/src/main/scala/org/dbpedia/extraction/dump/extract/DistConfigLoader.scala
+++ b/distributed/src/main/scala/org/dbpedia/extraction/dump/extract/DistConfigLoader.scala
@@ -80,17 +80,17 @@ class DistConfigLoader(config: DistConfig, sparkContext: SparkContext)
     val articlesRDD: RDD[WikiPage] =
     try
     {
       if (!cache.exists)
-        throw new IOException("Cache file " + cache + " does not exist.")
-      logger.info("Loading articles from cache file " + cache)
+        throw new IOException("Cache file " + cache.getSchemeWithFileName + " does not exist.")
+      logger.info("Loading articles from cache file " + cache.getSchemeWithFileName)
       val loaded = DistIOUtils.loadRDD(sparkContext, classOf[WikiPage], cache)
-      logger.info("WikiPages loaded from cache file " + cache)
+      logger.info("WikiPages loaded from cache file " + cache.getSchemeWithFileName)
       loaded
     }
     catch
     {
       case ex: Exception =>
       {
-        logger.log(Level.INFO, "Will read from wiki dump file for " + lang.wikiCode + " wiki, could not load cache file '" + cache + "': " + ex)
+        logger.log(Level.INFO, "Will read from wiki dump file for " + lang.wikiCode + " wiki, could not load cache file '" + cache.getSchemeWithFileName + "': " + ex)
 
         // Create RDD with WikiPageWritable elements.
         val rawArticlesRDD: RDD[(LongWritable, WikiPageWritable)] =
@@ -109,7 +109,7 @@ class DistConfigLoader(config: DistConfig, sparkContext: SparkContext)
         if (config.cacheWikiPageRDD)
         {
           DistIOUtils.saveRDD(newRdd, cache)
-          logger.info("Parsed WikiPages written to cache file " + cache)
+          logger.info("Parsed WikiPages written to cache file " + cache.getSchemeWithFileName)
         }
 
         newRdd
diff --git a/distributed/src/main/scala/org/dbpedia/extraction/mappings/DistDisambiguations.scala b/distributed/src/main/scala/org/dbpedia/extraction/mappings/DistDisambiguations.scala
index b429cde..800b41c 100644
--- a/distributed/src/main/scala/org/dbpedia/extraction/mappings/DistDisambiguations.scala
+++ b/distributed/src/main/scala/org/dbpedia/extraction/mappings/DistDisambiguations.scala
@@ -36,19 +36,19 @@ object DistDisambiguations
     }
     catch
     {
-      case ex : Exception => logger.log(Level.INFO, "Will extract disambiguations from source for "+lang.wikiCode+" wiki, could not load cache file '"+cache+"': "+ex)
+      case ex : Exception => logger.log(Level.INFO, "Will extract disambiguations from source for "+lang.wikiCode+" wiki, could not load cache file '"+cache.getSchemeWithFileName+"': "+ex)
     }
 
     val disambiguations = Disambiguations.loadFromFile(reader, lang)
 
     val dir = cache.getParent
-    if (!dir.exists && !dir.mkdirs()) throw new IOException("cache dir [" + dir + "] does not exist and cannot be created")
+    if (!dir.exists && !dir.mkdirs()) throw new IOException("cache dir [" + dir.getSchemeWithFileName + "] does not exist and cannot be created")
     val output = new Output(new BufferedOutputStream(cache.outputStream()))
     try
     {
       DistIOUtils.getKryoInstance.writeClassAndObject(output, disambiguations.set)
-      logger.info(disambiguations.set.size + " disambiguations written to cache file " + cache)
+      logger.info(disambiguations.set.size + " disambiguations written to cache file " + cache.getSchemeWithFileName)
       disambiguations
     }
     finally
@@ -62,12 +62,12 @@ object DistDisambiguations
    */
   private def loadFromCache(cache : Path)(implicit hadoopConf: Configuration) : Disambiguations =
   {
-    logger.info("Loading disambiguations from cache file " + cache)
+    logger.info("Loading disambiguations from cache file " + cache.getSchemeWithFileName)
     val input = new Input(new BufferedInputStream(cache.inputStream()))
     try
     {
       val disambiguations = new Disambiguations(DistIOUtils.getKryoInstance.readClassAndObject(input).asInstanceOf[Set[Long]])
-      logger.info(disambiguations.set.size + " disambiguations loaded from cache file " + cache)
+      logger.info(disambiguations.set.size + " disambiguations loaded from cache file " + cache.getSchemeWithFileName)
       disambiguations
     }
     finally
diff --git a/distributed/src/main/scala/org/dbpedia/extraction/mappings/DistRedirects.scala b/distributed/src/main/scala/org/dbpedia/extraction/mappings/DistRedirects.scala
index 5221ed3..c91eacc 100644
--- a/distributed/src/main/scala/org/dbpedia/extraction/mappings/DistRedirects.scala
+++ b/distributed/src/main/scala/org/dbpedia/extraction/mappings/DistRedirects.scala
@@ -53,19 +53,19 @@ object DistRedirects
     }
     catch
     {
-      case ex: Exception => logger.log(Level.INFO, "Will extract redirects from source for " + lang.wikiCode + " wiki, could not load cache file '" + cache + "': " + ex)
+      case ex: Exception => logger.log(Level.INFO, "Will extract redirects from source for " + lang.wikiCode + " wiki, could not load cache file '" + cache.getSchemeWithFileName + "': " + ex)
     }
 
     //Load redirects from RDD
     val redirects = loadFromRDD(rdd, lang)
 
     val dir = cache.getParent
-    if (!dir.exists && !dir.mkdirs()) throw new IOException("cache dir [" + dir + "] does not exist and cannot be created")
+    if (!dir.exists && !dir.mkdirs()) throw new IOException("cache dir [" + dir.getSchemeWithFileName + "] does not exist and cannot be created")
     val output = new Output(new BufferedOutputStream(cache.outputStream()))
     try
     {
       DistIOUtils.getKryoInstance.writeClassAndObject(output, redirects.map)
-      logger.info(redirects.map.size + " redirects written to cache file " + cache)
+      logger.info(redirects.map.size + " redirects written to cache file " + cache.getSchemeWithFileName)
       redirects
     }
     finally
@@ -79,12 +79,12 @@ object DistRedirects
    */
   private def loadFromCache(cache: Path)(implicit hadoopConf: Configuration): Redirects =
   {
-    logger.info("Loading redirects from cache file " + cache)
+    logger.info("Loading redirects from cache file " + cache.getSchemeWithFileName)
     val input = new Input(new BufferedInputStream(cache.inputStream()))
     try
     {
       val redirects = new Redirects(DistIOUtils.getKryoInstance.readClassAndObject(input).asInstanceOf[Map[String, String]])
-      logger.info(redirects.map.size + " redirects loaded from cache file " + cache)
+      logger.info(redirects.map.size + " redirects loaded from cache file " + cache.getSchemeWithFileName)
       redirects
     }
     finally
diff --git a/distributed/src/main/scala/org/dbpedia/extraction/util/RichHadoopPath.scala b/distributed/src/main/scala/org/dbpedia/extraction/util/RichHadoopPath.scala
index 944165f..5d3291e 100644
--- a/distributed/src/main/scala/org/dbpedia/extraction/util/RichHadoopPath.scala
+++ b/distributed/src/main/scala/org/dbpedia/extraction/util/RichHadoopPath.scala
@@ -75,4 +75,6 @@ class RichHadoopPath(path: Path, conf: Configuration) extends FileLike[Path] {
   override def outputStream(append: Boolean = false): OutputStream = if(append) fs.append(path) else fs.create(path)
 
   def mkdirs(): Boolean = fs.mkdirs(path)
+
+  def getSchemeWithFileName: String = fs.getScheme + "://" + path.toUri.getPath
 }
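
For reference, a minimal sketch of how the new getSchemeWithFileName helper behaves. The cache path below is a hypothetical example, and RichHadoopPath is constructed directly here rather than through the framework's implicit Path conversion:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.dbpedia.extraction.util.RichHadoopPath

    object SchemeWithFileNameExample {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // Hypothetical cache location; any FileSystem registered with Hadoop behaves the same way.
        val cache = new RichHadoopPath(new Path("file:///tmp/dbpedia/enwiki.cache"), conf)
        // getSchemeWithFileName concatenates fs.getScheme + "://" + path.toUri.getPath
        println("Cache file: " + cache.getSchemeWithFileName) // prints "file:///tmp/dbpedia/enwiki.cache"
      }
    }

Note that path.toUri.getPath strips the URI authority, so an HDFS path such as hdfs://namenode:9000/user/dbpedia/cache is rendered as hdfs:///user/dbpedia/cache: the log messages gain the scheme but drop host and port.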