Added Hadoop support (#1)
* Added Hadoop implementation

* Added fat jar build task

* Fixed null pointer ref
Apparently, I have to check for null myself. Seems like auto-boxing doesn't
work the way I thought it did. (See the sketch after this list.)

* Improved blacklist implementation

* Set 1000 reducer tasks
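For context, the null-pointer fix refers to a standard Java pitfall: auto-unboxing a null wrapper (for example, a Long returned by Map.get for a missing key) throws a NullPointerException. Below is a minimal, self-contained sketch of the pitfall and the explicit check; the map and key are made up, and this is not the code the commit actually touched:

import java.util.HashMap;
import java.util.Map;

public class UnboxingExample {
    public static void main(String[] args) {
        final Map<String, Long> frequencies = new HashMap<>();

        // get() returns null for a missing key.
        final Long boxed = frequencies.get("missing");

        // final long value = boxed; // would throw NullPointerException on unboxing
        final long value = boxed == null ? 0L : boxed; // explicit null check
        System.out.println(value); // prints 0
    }
}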
RunDevelopment authored Jul 21, 2021
1 parent f53bdd2 commit a6524ac
Showing 8 changed files with 253 additions and 87 deletions.
16 changes: 14 additions & 2 deletions build.gradle
@@ -1,6 +1,16 @@
// Apply plugins
apply plugin: 'java'
apply plugin: 'application'

plugins {
// adds './gradlew shadowjar', which creates a fat Jar (includes all dependencies) in build/libs/
id 'com.github.johnrengelman.shadow' version '4.0.2'
// Needed for all Java projects
id 'java'
id 'application'
}

shadowJar {
transform(com.github.jengelman.gradle.plugins.shadow.transformers.Log4j2PluginsCacheFileTransformer)
}
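For context: the Log4j2PluginsCacheFileTransformer merges the Log4j2Plugins.dat plugin-cache files of all shaded dependencies so that Log4j 2 plugins still resolve from inside the fat jar. Per the comment above, running ./gradlew shadowJar then produces the fat jar in build/libs/.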

// Basic configuration and settings for all (sub-)projects
allprojects {
@@ -18,6 +28,8 @@ allprojects {

// Declare global dependencies
dependencies {
implementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.1'
implementation group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '3.3.1'
implementation group: 'org.apache.commons', name: 'commons-compress', version: '1.19'
implementation 'info.picocli:picocli:4.5.2'

129 changes: 129 additions & 0 deletions src/main/java/org/netspeak/hadoop/Merge.java
@@ -0,0 +1,129 @@
package org.netspeak.hadoop;

import java.io.IOException;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.netspeak.lang.Agnostic;
import org.netspeak.lang.En;
import org.netspeak.lang.MapperConfig;
import org.netspeak.lang.SingleMapProcessor;
import org.netspeak.preprocessing.PhraseMapper;

public class Merge {
private static final String CONFIG_LOWERCASE = "preprocessing.lowercase";
private static final String CONFIG_MAX_N_GRAM = "preprocessing.max-n-gram";
private static final String CONFIG_LANG = "preprocessing.lang";

private static final String LANG_NONE = "none";
private static final String LANG_EN = "en";
private static final String LANG_DE = "de";

public static class TokenizerMapper extends Mapper<Object, Text, Text, LongWritable> {

private final Text phrase = new Text();
private final LongWritable frequency = new LongWritable();

private PhraseMapper[] mappers = new PhraseMapper[0];

@Override
public void setup(Context context) throws IOException, InterruptedException {
final Configuration conf = context.getConfiguration();

final MapperConfig config = new MapperConfig();
config.lowercase = conf.getBoolean(CONFIG_LOWERCASE, false);
config.maxNGram = conf.getInt(CONFIG_MAX_N_GRAM, Integer.MAX_VALUE);

SingleMapProcessor processor;
final String lang = conf.get(CONFIG_LANG, LANG_NONE).toLowerCase();
switch (lang) {
case LANG_NONE:
processor = Agnostic.INSTANCE;
break;
case LANG_DE:
throw new IllegalArgumentException("DE is not supported for Hadoop.");
case LANG_EN:
processor = En.INSTANCE;
break;
default:
throw new IllegalArgumentException("Unknown language: " + lang);
}

try {
mappers = processor.getMappers(config).toArray(new PhraseMapper[0]);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

private String map(String phrase, long frequency) {
for (final PhraseMapper mapper : mappers) {
phrase = mapper.map(phrase, frequency);
if (phrase == null) {
break;
}
}
return phrase;
}

@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// format: <word> *( <spaces> <word> ) <tab> <frequency>
final String line = value.toString().trim();
if (line.isEmpty()) {
// ignore line
return;
}

final int tabIndex = line.indexOf('\t');
if (tabIndex == -1) {
throw new IOException("Invalid format: Unable to find tab character.");
}

final long freq = Long.parseLong(line.substring(tabIndex + 1));
final String p = map(line.substring(0, tabIndex), freq);

if (p == null) {
return;
}

phrase.set(p);
frequency.set(freq);

context.write(phrase, frequency);
}
}

public static void run(Collection<String> input, String outputDir, String lang, MapperConfig config)
throws Exception {
final Configuration conf = new Configuration();
conf.set(CONFIG_LANG, lang);
conf.setBoolean(CONFIG_LOWERCASE, config.lowercase);
conf.setInt(CONFIG_MAX_N_GRAM, config.maxNGram);

final Job job = Job.getInstance(conf, "Netspeak index preprocessing (" + lang + ")");
job.setJarByClass(Merge.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(LongSumReducer.class);
job.setReducerClass(LongSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1000);

FileInputFormat.setInputPaths(job, input.stream().map(Path::new).toArray(Path[]::new));
FileOutputFormat.setOutputPath(job, new Path(outputDir));

if (!job.waitForCompletion(true)) {
throw new RuntimeException("Job failed.");
}
}

}
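For reference, the mapper above consumes one record per line in the tab-separated format noted in its comment. Below is a minimal sketch of that parsing outside Hadoop, with a made-up sample record:

public class FormatExample {
    public static void main(String[] args) {
        // Hypothetical record: words separated by spaces, a tab, then a frequency.
        final String line = "hello world\t42";

        final int tabIndex = line.indexOf('\t');
        final String phrase = line.substring(0, tabIndex); // "hello world"
        final long frequency = Long.parseLong(line.substring(tabIndex + 1)); // 42

        System.out.println(phrase + " -> " + frequency);
    }
}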
1 change: 0 additions & 1 deletion src/main/java/org/netspeak/lang/De.java
@@ -39,7 +39,6 @@ public void process(Config config) throws Exception {
final StandardMappers stdMappers = new StandardMappers();
stdMappers.setSuperBlacklist(Util.readResourceWordList("/super-blacklist.txt"));
stdMappers.setBlacklist(Util.readResourceWordList("/blacklist.txt"));
stdMappers.setBlacklistCombinations(4);
stdMappers.setMaxNGram(config.maxNGram);
stdMappers.setToLowerCase(config.lowercase);

1 change: 0 additions & 1 deletion src/main/java/org/netspeak/lang/En.java
@@ -23,7 +23,6 @@ public Collection<PhraseMapper> getMappers(MapperConfig config) throws IOException {
final StandardMappers stdMappers = new StandardMappers();
stdMappers.setSuperBlacklist(Util.readResourceWordList("/super-blacklist.txt"));
stdMappers.setBlacklist(Util.readResourceWordList("/blacklist.txt"));
stdMappers.setBlacklistCombinations(4);
stdMappers.setMaxNGram(config.maxNGram);
stdMappers.setToLowerCase(config.lowercase);

76 changes: 40 additions & 36 deletions src/main/java/org/netspeak/preprocessing/mappers/PhraseMappers.java
@@ -4,7 +4,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@@ -93,7 +92,11 @@ public static PhraseMapper normalizeApostrophe() {
* @return
*/
public static PhraseMapper blacklist(final Collection<String> words) {
return PhraseMapper.rename(blacklist(words, 1));
final HashSet<String> set = new HashSet<>(words);
set.remove(null);
set.remove("");

return PhraseMapper.rename(filterByWords(w -> !set.contains(w)));
}

/**
@@ -135,50 +138,51 @@ public static PhraseMapper removeControlCharacters() {
* Returns a new {@link PhraseMapper} that removes phrases which contain at
* least one word that is contained in a given blacklist vocabulary.
* <p>
* Phrases which contain a word which can be constructed by concatenating
* {@code <= repeating} many words from the blacklist will also be removed. E.g.
* if {@code "} and {@code ?} are in the blacklist and {@code repeating} is 3,
* then {@code """}, {@code "?"}, {@code "?}, and {@code ??} will all be
* removed.
* <p>
* Please note that the blacklist will consume <b>{@code O(n ** repeat)}</b>
* many bytes of memory where {@code n} is the number of blacklist entries.
* Phrases which contain a word which can be constructed by concatenating
* blacklist words will also be removed. E.g. if {@code "} and {@code ?} are in
* the blacklist, then {@code """}, {@code "?"}, {@code "?}, and {@code ??} will
* all be removed.
*
* @param words
* @return
*/
public static PhraseMapper blacklist(final Collection<String> words, int repeat) {
    HashSet<String> tempBlacklist = new HashSet<>(words);
    // just to be safe
    tempBlacklist.remove(null);
    tempBlacklist.remove("");

    if (repeat > 1) {
        tempBlacklist = new HashSet<>(getAllCombinations(tempBlacklist, repeat));
    }

    // thanks Java
    final Set<String> blacklist = tempBlacklist;

    return PhraseMapper.rename(filterByWords(w -> !blacklist.contains(w)));
}

private static List<String> getAllCombinations(Collection<String> words, int repeat) {
    final ArrayList<String> combinations = new ArrayList<>((int) Math.pow(words.size(), repeat));
    combinations.addAll(words);

    int start = 0;
    for (; repeat > 1; repeat--) {
        final int size = combinations.size();
        for (int i = start; i < size; i++) {
            for (final String word : words) {
                combinations.add(combinations.get(i) + word);
            }
        }
        start = size;
    }

    return combinations;
}

public static PhraseMapper blacklistRepeated(final Collection<String> words) {
    // create a regex for the words

    // split by length
    final ArrayList<String> singleChar = new ArrayList<>();
    final ArrayList<String> multipleChar = new ArrayList<>();
    for (final String word : words) {
        if (word == null || word.isEmpty()) {
            // skip
        } else if (word.length() == 1) {
            singleChar.add(word);
        } else {
            multipleChar.add(word);
        }
    }

    final StringBuilder sb = new StringBuilder();
    sb.append("[");
    for (final String singleCharWord : singleChar) {
        appendLiteral(sb, singleCharWord);
    }
    sb.append("]");

    for (final String word : multipleChar) {
        sb.append("|");
        appendLiteral(sb, word);
    }

    final Pattern regex = Pattern.compile("(?:" + sb.toString() + ")+");

    return PhraseMapper.rename(filterByWords(w -> !regex.matcher(w).matches()));
}

private static void appendLiteral(StringBuilder sb, String value) {
    for (final char c : value.toCharArray()) {
        sb.append("\\u").append(String.format("%04x", (int) c));
    }
}

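For context, blacklistRepeated compiles the blacklist into a single regex: single-character entries form one character class, longer entries become alternations, every character is escaped as \uXXXX, and the whole group is repeated with +, so any word formed by concatenating blacklist entries matches. Below is a minimal, self-contained sketch with a made-up blacklist and a hand-built pattern of the same shape as the generated one:

import java.util.regex.Pattern;

public class BlacklistRegexExample {
    public static void main(String[] args) {
        // Hypothetical blacklist: " and ? (single-char) plus "ab" (multi-char).
        // Shape of the generated pattern: (?:[<single-chars>]|<word>)+ with \uXXXX escapes.
        final Pattern regex = Pattern.compile("(?:[\\u0022\\u003f]|\\u0061\\u0062)+");

        System.out.println(regex.matcher("\"?\"").matches()); // true  -> phrase removed
        System.out.println(regex.matcher("abab").matches());  // true  -> phrase removed
        System.out.println(regex.matcher("word").matches());  // false -> phrase kept
    }
}

Compared to the removed getAllCombinations approach, which precomputed up to O(n ** repeat) concatenations, the regex handles unbounded repetition with memory linear in the blacklist size.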
src/main/java/org/netspeak/preprocessing/mappers/StandardMappers.java
@@ -19,11 +19,10 @@ public class StandardMappers {
*/
boolean toLowerCase = false;
/**
* All phrases with at least one word which can be constructed from at most
* {@link #blacklistCombinations} many blacklisted words will be removed.
* All phrases with at least one word which can be constructed from blacklisted
* words will be removed.
*/
Collection<String> blacklist = null;
int blacklistCombinations = 4;
/**
* @see PhraseMappers#superBlacklist(Iterable)
*/
@@ -37,10 +36,6 @@ public void setBlacklist(Collection<String> blacklist) {
this.blacklist = blacklist;
}

public void setBlacklistCombinations(int blacklistCombinations) {
this.blacklistCombinations = blacklistCombinations;
}

public void setSuperBlacklist(Path superBlacklist) throws IOException {
this.superBlacklist = Util.readWordList(superBlacklist);
}
@@ -79,7 +74,7 @@ public Collection<PhraseMapper> getMappers() {
mappers.add(PhraseMappers.joinWordsWithLeadingApostrophe());

if (blacklist != null) {
mappers.add(PhraseMappers.blacklist(blacklist, blacklistCombinations));
mappers.add(PhraseMappers.blacklistRepeated(blacklist));
}
if (maxNGram < Integer.MAX_VALUE) {
mappers.add(PhraseMappers.maxNGram(maxNGram));