Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,17 @@ class DistConfigLoader(config: DistConfig, sparkContext: SparkContext)
val articlesRDD: RDD[WikiPage] = try
{
if (!cache.exists)
throw new IOException("Cache file " + cache + " does not exist.")
logger.info("Loading articles from cache file " + cache)
throw new IOException("Cache file " + cache.getSchemeWithFileName + " does not exist.")
logger.info("Loading articles from cache file " + cache.getSchemeWithFileName)
val loaded = DistIOUtils.loadRDD(sparkContext, classOf[WikiPage], cache)
logger.info("WikiPages loaded from cache file " + cache)
logger.info("WikiPages loaded from cache file " + cache.getSchemeWithFileName)
loaded
}
catch
{
case ex: Exception =>
{
logger.log(Level.INFO, "Will read from wiki dump file for " + lang.wikiCode + " wiki, could not load cache file '" + cache + "': " + ex)
logger.log(Level.INFO, "Will read from wiki dump file for " + lang.wikiCode + " wiki, could not load cache file '" + cache.getSchemeWithFileName + "': " + ex)

// Create RDD with WikiPageWritable elements.
val rawArticlesRDD: RDD[(LongWritable, WikiPageWritable)] =
Expand All @@ -109,7 +109,7 @@ class DistConfigLoader(config: DistConfig, sparkContext: SparkContext)
if (config.cacheWikiPageRDD)
{
DistIOUtils.saveRDD(newRdd, cache)
logger.info("Parsed WikiPages written to cache file " + cache)
logger.info("Parsed WikiPages written to cache file " + cache.getSchemeWithFileName)
}

newRdd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ object DistDisambiguations
}
catch
{
case ex : Exception => logger.log(Level.INFO, "Will extract disambiguations from source for "+lang.wikiCode+" wiki, could not load cache file '"+cache+"': "+ex)
case ex : Exception => logger.log(Level.INFO, "Will extract disambiguations from source for "+lang.wikiCode+" wiki, could not load cache file '"+cache.getSchemeWithFileName+"': "+ex)
}

val disambiguations = Disambiguations.loadFromFile(reader, lang)

val dir = cache.getParent
if (!dir.exists && !dir.mkdirs()) throw new IOException("cache dir [" + dir + "] does not exist and cannot be created")
if (!dir.exists && !dir.mkdirs()) throw new IOException("cache dir [" + dir.getSchemeWithFileName + "] does not exist and cannot be created")
val output = new Output(new BufferedOutputStream(cache.outputStream()))

try
{
DistIOUtils.getKryoInstance.writeClassAndObject(output, disambiguations.set)
logger.info(disambiguations.set.size + " disambiguations written to cache file " + cache)
logger.info(disambiguations.set.size + " disambiguations written to cache file " + cache.getSchemeWithFileName)
disambiguations
}
finally
Expand All @@ -62,12 +62,12 @@ object DistDisambiguations
*/
private def loadFromCache(cache : Path)(implicit hadoopConf: Configuration) : Disambiguations =
{
logger.info("Loading disambiguations from cache file " + cache)
logger.info("Loading disambiguations from cache file " + cache.getSchemeWithFileName)
val input = new Input(new BufferedInputStream(cache.inputStream()))
try
{
val disambiguations = new Disambiguations(DistIOUtils.getKryoInstance.readClassAndObject(input).asInstanceOf[Set[Long]])
logger.info(disambiguations.set.size + " disambiguations loaded from cache file " + cache)
logger.info(disambiguations.set.size + " disambiguations loaded from cache file " + cache.getSchemeWithFileName)
disambiguations
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ object DistRedirects
}
catch
{
case ex: Exception => logger.log(Level.INFO, "Will extract redirects from source for " + lang.wikiCode + " wiki, could not load cache file '" + cache + "': " + ex)
case ex: Exception => logger.log(Level.INFO, "Will extract redirects from source for " + lang.wikiCode + " wiki, could not load cache file '" + cache.getSchemeWithFileName + "': " + ex)
}

//Load redirects from RDD
val redirects = loadFromRDD(rdd, lang)

val dir = cache.getParent
if (!dir.exists && !dir.mkdirs()) throw new IOException("cache dir [" + dir + "] does not exist and cannot be created")
if (!dir.exists && !dir.mkdirs()) throw new IOException("cache dir [" + dir.getSchemeWithFileName + "] does not exist and cannot be created")
val output = new Output(new BufferedOutputStream(cache.outputStream()))
try
{
DistIOUtils.getKryoInstance.writeClassAndObject(output, redirects.map)
logger.info(redirects.map.size + " redirects written to cache file " + cache)
logger.info(redirects.map.size + " redirects written to cache file " + cache.getSchemeWithFileName)
redirects
}
finally
Expand All @@ -79,12 +79,12 @@ object DistRedirects
*/
private def loadFromCache(cache: Path)(implicit hadoopConf: Configuration): Redirects =
{
logger.info("Loading redirects from cache file " + cache)
logger.info("Loading redirects from cache file " + cache.getSchemeWithFileName)
val input = new Input(new BufferedInputStream(cache.inputStream()))
try
{
val redirects = new Redirects(DistIOUtils.getKryoInstance.readClassAndObject(input).asInstanceOf[Map[String, String]])
logger.info(redirects.map.size + " redirects loaded from cache file " + cache)
logger.info(redirects.map.size + " redirects loaded from cache file " + cache.getSchemeWithFileName)
redirects
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@ class RichHadoopPath(path: Path, conf: Configuration) extends FileLike[Path] {
/** Opens an output stream on this path via the underlying Hadoop FileSystem.
  *
  * @param append when true, appends to the existing file; otherwise the file
  *               is created (overwriting per FileSystem.create semantics)
  * @return a writable stream positioned per the chosen mode
  */
override def outputStream(append: Boolean = false): OutputStream = {
  if (append) fs.append(path)
  else fs.create(path)
}

// Creates this directory (delegating to the underlying FileSystem).
// NOTE(review): presumably also creates missing parents and returns true on
// success, per Hadoop FileSystem.mkdirs — confirm against the Hadoop version in use.
def mkdirs(): Boolean = fs.mkdirs(path)

/** Renders this path for display/log messages as "&lt;scheme&gt;://&lt;path&gt;".
  *
  * NOTE(review): the URI authority (host:port) is intentionally-or-not dropped,
  * so an HDFS path such as hdfs://namenode:8020/dir/file renders as
  * "hdfs:///dir/file" — confirm this is the desired log format.
  */
def getSchemeWithFileName: String = s"${fs.getScheme}://${path.toUri.getPath}"
}