@@ -0,0 +1,167 @@
package org.dbpedia.extraction.spark.io;

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import java.io.IOException;

import com.google.common.base.Charsets;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Taken from Apache Mahout.
* Reads records that are delimited by a specific begin/end tag.
*
* TODO: This does not work with compressed input. Add support for non-splittable and splittable compressed input.
*/
public class XmlInputFormat extends TextInputFormat {

    private static final Logger log = LoggerFactory.getLogger(XmlInputFormat.class);

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        try {
            return new XmlRecordReader((FileSplit) split, context.getConfiguration());
        } catch (IOException ioe) {
            log.warn("Error while creating XmlRecordReader", ioe);
            return null;
        }
    }

    /**
     * XmlRecordReader reads through a given XML document and emits the XML blocks
     * delimited by the configured start and end tags as records.
     */
    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {

        private final byte[] startTag;
        private final byte[] endTag;
        private final long start;
        private final long end;
        private final FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable currentKey;
        private Text currentValue;

        public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
            startTag = conf.get(START_TAG_KEY).getBytes(Charsets.UTF_8);
            endTag = conf.get(END_TAG_KEY).getBytes(Charsets.UTF_8);

            // open the file and seek to the start of the split
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(split.getPath());
            fsin.seek(start);
        }

        private boolean next(LongWritable key, Text value) throws IOException {
            if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
                try {
                    buffer.write(startTag);
                    if (readUntilMatch(endTag, true)) {
                        key.set(fsin.getPos());
                        value.set(buffer.getData(), 0, buffer.getLength());
                        return true;
                    }
                } finally {
                    buffer.reset();
                }
            }
            return false;
        }

        @Override
        public void close() throws IOException {
            Closeables.close(fsin, true);
        }

        @Override
        public float getProgress() throws IOException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                if (b == -1) {
                    return false;
                }
                // save to buffer:
                if (withinBlock) {
                    buffer.write(b);
                }

                // check if we're matching:
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) {
                        return true;
                    }
                } else {
                    i = 0;
                }
                // see if we've passed the stop point:
                if (!withinBlock && i == 0 && fsin.getPos() >= end) {
                    return false;
                }
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return currentKey;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return currentValue;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            // nothing to do: the constructor already opened the file and seeked to the start of the split
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            currentKey = new LongWritable();
            currentValue = new Text();
            return next(currentKey, currentValue);
        }
    }
}
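
For orientation, a minimal sketch of how this input format might be consumed from Spark. The SparkContext sc, the HDFS path and the page tags are illustrative assumptions, not part of this patch; only the two configuration keys come from the class above.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.dbpedia.extraction.spark.io.XmlInputFormat

// Tell the input format which element delimits a record (illustrative values).
val conf = new Configuration()
conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
conf.set(XmlInputFormat.END_TAG_KEY, "</page>")

// Each value is one complete <page>...</page> block as raw XML text; the key is
// the byte offset just past the closing tag. 'sc' is an existing SparkContext.
val pages = sc.newAPIHadoopFile(
  "hdfs:///dumps/enwiki-pages-articles.xml",   // hypothetical input path
  classOf[XmlInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf
).map { case (_, page) => page.toString }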
@@ -0,0 +1,50 @@
package org.dbpedia.extraction.destinations

I think you could modify MarkerDestination to use FileLike instead of File. Then you wouldn't need DistMarkerDestination at all. You would have to modify a few pieces of code that already use MarkerDestination, but adding an import of RichFile.wrapFile would probably be enough.

Member Author

Yes! I can do that. Looks like MarkerDestination is being used only in ConfigLoader.scala, which already imports wrapFile. I guess we'd only need to change very little then, only in MarkerDestination? @jimkont shall I send a PR to let MarkerDestination use FileLike instead of File?

Member

@nilesh-c as you probably figured out, @jcsahnwaldt is one of the masterminds behind the DBpedia API ;) You can adjust MarkerDestination with a PR, but we'll have to test whether it affects dump-based extraction.

Member Author

Oh yes I figured so. :-)

I just made the PR: dbpedia/extraction-framework#235
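
(For illustration only, and not the actual change made in dbpedia/extraction-framework#235: a MarkerDestination written against FileLike might look roughly like the sketch below. The FileLike members used here, exists, outputStream and delete, are assumptions inferred from how RichFile.wrapFile and RichHadoopPath.wrapPath are used in this PR, not a confirmed API.)

import java.io.IOException

// Rough sketch only: mirrors DistMarkerDestination below, but accepts any FileLike
// (e.g. a wrapped java.io.File or Hadoop Path) instead of a concrete Path.
// The exists/outputStream/delete members of FileLike are assumed here.
class FileLikeMarkerDestination(destination: Destination, file: FileLike[_], start: Boolean)
extends WrapperDestination(destination)
{
  override def open(): Unit = {
    if (start) create() else delete()
    super.open()
  }

  override def close(): Unit = {
    super.close()
    if (!start) create() else delete()
  }

  private def create(): Unit = {
    if (file.exists) throw new IOException("file '"+file+"' already exists")
    file.outputStream().close()   // create an empty marker file
  }

  private def delete(): Unit = {
    if (file.exists) file.delete()
  }
}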


import org.apache.hadoop.fs.{FileSystem, Path}
import org.dbpedia.extraction.util.RichHadoopPath.wrapPath
import org.apache.hadoop.conf.Configuration
import java.io.IOException

/**
* A version of MarkerDestination that works with org.apache.hadoop.fs.Path.
*
* Handles a marker file that signals that the extraction is either running ('start mode')
* or finished ('end mode').
*
* In 'start mode', the file is created before the extraction starts (it must not already exist)
* and deleted after the extraction ends.
*
* In 'end mode', the file is deleted before the extraction starts (if it already exists)
* and re-created after the extraction ends.
*
* @param path Hadoop Path to marker file
* @param start 'start mode' if true, 'end mode' if false.
*
* @see MarkerDestination
*/
class DistMarkerDestination(destination: Destination, path: Path, start: Boolean, implicit val hadoopConf: Configuration)
extends WrapperDestination(destination)
{
  private val fs: FileSystem = path.getFileSystem(hadoopConf)

  override def open(): Unit = {
    if (start) create() else delete()
    super.open()
  }

  override def close(): Unit = {
    super.close()
    if (!start) create() else delete()
  }

  private def create(): Unit = {
    if (fs.exists(path)) throw new IOException("file '"+path+"' already exists")
    path.outputStream().close()
  }

  private def delete(): Unit = {
    if (!fs.exists(path)) return
    if (!path.deleteConfirm()) throw new IOException("failed to delete file '"+path+"'")
  }

}
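
A brief usage sketch: wrapping an existing destination so that a marker file on HDFS signals completion ('end mode'). The inner destination, the Configuration and the marker path are placeholders, not values taken from this patch.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// 'end mode' (start = false): a stale marker is deleted in open() and the marker
// is re-created in close(), signalling that the extraction finished.
// innerDestination stands for some existing Destination to be wrapped.
val hadoopConf = new Configuration()   // typically the hadoopConf built by DistConfig
val marker = new Path("hdfs:///dumps/enwiki/20140401/extraction-complete")   // placeholder path
val destination = new DistMarkerDestination(innerDestination, marker, false, hadoopConf)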
@@ -1,27 +1,68 @@
package org.dbpedia.extraction.dump.extract

import java.net.InetAddress
import java.util.Properties
import scala.collection.JavaConversions.asScalaSet
import org.dbpedia.extraction.util.ConfigUtils.getValue
import org.apache.hadoop.fs.Path
import org.dbpedia.extraction.util.RichHadoopPath.wrapPath
import org.apache.hadoop.conf.Configuration

/**
* Class for distributed configuration
*/
class DistConfig(config: Properties)
{
  /** It is recommended that spark-home and spark-master are explicitly provided. */
  val sparkHome = config.getProperty("spark-home", sys.env.get("SPARK_HOME").getOrElse(""))

  /** By default assume the master is running locally; use 4 cores. */
  val sparkMaster = config.getProperty("spark-master", "local[4]")

  /** Number of splits the initial RDD will be broken into - configure according to your cluster. Maybe total number of cores? */
  val sparkNumSlices = config.getProperty("spark-num-slices", "4").toInt

  /** Shows up on the Spark Web UI */
  val sparkAppName = config.getProperty("spark-appname", "dbpedia-distributed-extraction-framework")

  /** Map of optional Spark configuration properties. See http://spark.apache.org/docs/latest/configuration.html */
  val sparkProperties = config.stringPropertyNames().filter(_.startsWith("spark")).map(x => (x, config.getProperty(x))).toMap

  /** Path to hadoop core-site.xml */
  private val hadoopCoreConf = config.getProperty("hadoop-coresite-xml-path")

  /** Path to hadoop hdfs-site.xml */
  private val hadoopHdfsConf = config.getProperty("hadoop-hdfssite-xml-path")

  /** Path to hadoop mapred-site.xml */
  private val hadoopMapredConf = config.getProperty("hadoop-mapredsite-xml-path")

  /** Hadoop Configuration. This is implicit because RichHadoopPath operations need it. */
  implicit val hadoopConf =
  {
    val hadoopConf = new Configuration()

    if (hadoopCoreConf != null)
      hadoopConf.addResource(new Path(hadoopCoreConf))
    if (hadoopHdfsConf != null)
      hadoopConf.addResource(new Path(hadoopHdfsConf))
    if (hadoopMapredConf != null)
      hadoopConf.addResource(new Path(hadoopMapredConf))

    hadoopConf.set("xmlinput.start", "<page>")
    hadoopConf.set("xmlinput.end", "</page>")

    // Set max input split size to ~10MB if not already set
    if (null == hadoopConf.get("mapred.max.split.size"))
      hadoopConf.set("mapred.max.split.size", "10000000")

    hadoopConf
  }

  /** Dump directory. Note that Config.dumpDir is ignored in the distributed framework. */
  val dumpDir = getValue(config, "base-dir", required = true)(new Path(_))
  if (!dumpDir.exists)
  {
    val hadoopHint = if (hadoopCoreConf == null || hadoopHdfsConf == null || hadoopMapredConf == null) " Make sure Hadoop is configured correctly and the directory exists on the configured file system." else ""
    throw sys.error("Dir " + dumpDir + " does not exist." + hadoopHint)
  }
}
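
To make the recognised keys concrete, here is a hypothetical configuration as DistConfig would consume it; every value below is a placeholder, only the property names come from the class above.

import java.util.Properties

// Placeholder values only; the keys are the ones read by DistConfig above.
val props = new Properties()
props.setProperty("spark-home", "/opt/spark")                            // falls back to $SPARK_HOME
props.setProperty("spark-master", "spark://master:7077")                 // default: local[4]
props.setProperty("spark-num-slices", "64")                              // default: 4
props.setProperty("spark-appname", "dbpedia-distributed-extraction")
props.setProperty("spark.executor.memory", "4g")                         // every key starting with "spark" lands in sparkProperties
props.setProperty("hadoop-coresite-xml-path", "/etc/hadoop/conf/core-site.xml")
props.setProperty("hadoop-hdfssite-xml-path", "/etc/hadoop/conf/hdfs-site.xml")
props.setProperty("hadoop-mapredsite-xml-path", "/etc/hadoop/conf/mapred-site.xml")
props.setProperty("base-dir", "hdfs:///dumps")                           // must already exist on the configured file system

val distConfig = new DistConfig(props)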