This page contains examples that can be executed on Apache Wayang. The "Hello World!" of data processing systems is WordCount: read a text file, split it into words, and count how often each word occurs. The following Java program implements WordCount; Wayang decides at runtime whether to execute it on local Java streams or on Apache Spark.
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import java.util.Arrays;
import java.util.Collection;

public class WordcountJava {

    public static void main(String[] args) {
        // Settings
        String inputUrl = "file:/tmp.txt";

        // Get a plan builder.
        WayangContext wayangContext = new WayangContext(new Configuration())
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin());
        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName(String.format("WordCount (%s)", inputUrl))
                .withUdfJarOf(WordcountJava.class);

        // Start building the WayangPlan.
        Collection<Tuple2<String, Integer>> wordcounts = planBuilder
                // Read the text file.
                .readTextFile(inputUrl).withName("Load file")
                // Split each line by non-word characters.
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                // Optimizer hint: expect 10 to 100 words per line, with 90% confidence.
                .withSelectivity(10, 100, 0.9)
                .withName("Split words")
                // Filter empty tokens.
                .filter(token -> !token.isEmpty())
                .withSelectivity(0.99, 0.99, 0.99)
                .withName("Filter empty words")
                // Attach counter to each word.
                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")
                // Sum up counters for every word.
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                // Optimizer hint: roughly 1% as many distinct words as input tokens.
                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
                .withName("Add counters")
                // Execute the plan and collect the results.
                .collect();

        System.out.println(wordcounts);
    }
}
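For illustration: if file:/tmp.txt contained the single line "To be or not to be", the collected result would be a collection like [(to, 2), (be, 2), (or, 1), (not, 1)] (the order of the tuples is not guaranteed).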
The same WordCount reads more concisely in Wayang's Scala API:

import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

object WordcountScala {
  def main(args: Array[String]) {
    // Settings
    val inputUrl = "file:/tmp.txt"

    // Get a plan builder.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"WordCount ($inputUrl)")
      .withUdfJarsOf(this.getClass)

    val wordcounts = planBuilder
      // Read the text file.
      .readTextFile(inputUrl).withName("Load file")
      // Split each line by non-word characters.
      .flatMap(_.split("\\W+"), selectivity = 10).withName("Split words")
      // Filter empty tokens.
      .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")
      // Attach counter to each word.
      .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")
      // Sum up counters for every word.
      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))
      // Execute the plan and collect the results.
      .collect()

    println(wordcounts)
  }
}
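In both variants, the selectivity and cardinality-estimator arguments are optional hints: they do not change the result, but give Wayang's optimizer better cost estimates when it chooses between the Java and Spark backends.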
Wayang is also capable of iterative processing, which is important, e.g., for machine learning algorithms such as k-means. The following Scala program clusters two-dimensional points read from a CSV file.
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

import scala.util.Random
import scala.collection.JavaConversions._

object kmeans {
  def main(args: Array[String]) {
    // Settings
    val inputUrl = "file:/kmeans.txt"
    val k = 5
    val iterations = 100
    val configuration = new Configuration

    // Get a plan builder.
    val wayangContext = new WayangContext(configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")
      .withUdfJarsOf(this.getClass)

    case class Point(x: Double, y: Double)
    case class TaggedPoint(x: Double, y: Double, cluster: Int)
    case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
      def add_points(that: TaggedPointCounter) =
        TaggedPointCounter(this.x + that.x, this.y + that.y, this.cluster, this.count + that.count)
      def average = TaggedPointCounter(x / count, y / count, cluster, 0)
    }

    // Read and parse the input file(s).
    val points = planBuilder
      .readTextFile(inputUrl).withName("Read file")
      .map { line =>
        val fields = line.split(",")
        Point(fields(0).toDouble, fields(1).toDouble)
      }.withName("Create points")

    // Create initial centroids.
    val random = new Random
    val initialCentroids = planBuilder
      .loadCollection(for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0))
      .withName("Load random centroids")

    // Declare UDF to select the nearest centroid for each data point.
    class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {

      /** Keeps the broadcasted centroids. */
      var centroids: Iterable[TaggedPointCounter] = _

      override def open(executionCtx: ExecutionContext) = {
        centroids = executionCtx.getBroadcast[TaggedPointCounter]("centroids")
      }

      override def apply(point: Point): TaggedPointCounter = {
        var minDistance = Double.PositiveInfinity
        var nearestCentroidId = -1
        for (centroid <- centroids) {
          // Euclidean distance between the point and the centroid.
          val distance = Math.sqrt(Math.pow(point.x - centroid.x, 2) + Math.pow(point.y - centroid.y, 2))
          if (distance < minDistance) {
            minDistance = distance
            nearestCentroidId = centroid.cluster
          }
        }
        new TaggedPointCounter(point.x, point.y, nearestCentroidId, 1)
      }
    }

    // Do the k-means loop: assign each point to its nearest centroid, then recompute the centroids.
    val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
      points
        .mapJava(new SelectNearestCentroid,
          udfLoad = LoadProfileEstimators.createFromSpecification(
            "my.udf.costfunction.key", configuration
          ))
        .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
        .reduceByKey(_.cluster, _.add_points(_)).withName("Add up points")
        .withCardinalityEstimator(k)
        .map(_.average).withName("Average points")
    }).withName("Loop")
      // Collect the results.
      .collect()

    println(finalCentroids)
  }
}
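The program expects file:/kmeans.txt to contain one comma-separated x,y pair per line. A minimal sketch for generating such a file (hypothetical generator, for illustration only; the output path matches the inputUrl above):

import java.io.PrintWriter
import scala.util.Random

object GenerateKmeansInput {
  def main(args: Array[String]): Unit = {
    val random = new Random(42)
    val writer = new PrintWriter("/kmeans.txt") // matches inputUrl = "file:/kmeans.txt" above
    // Scatter 200 points around each of five hypothetical cluster centers.
    for (cluster <- 0 until 5; _ <- 1 to 200) {
      val x = 3.0 * cluster + 0.5 * random.nextGaussian()
      val y = 3.0 * (cluster % 2) + 0.5 * random.nextGaussian()
      writer.println(f"$x%.3f,$y%.3f")
    }
    writer.close()
  }
}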
The final example sketches user-based collaborative filtering, as used in recommendation systems, on top of Wayang. The rating normalization runs as a Wayang plan, while the similarity and prediction steps run as plain Java on the driver, since they need the complete user-item matrix; a production version would express those steps as joins and aggregations in the plan as well.
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class CollaborativeFiltering {

    /** A single (user, item, rating) observation. */
    public static class Rating implements Serializable {
        public final String user;
        public final String item;
        public final double rating;

        public Rating(String user, String item, double rating) {
            this.user = user;
            this.item = item;
            this.rating = rating;
        }
    }

    public static void main(String[] args) {
        // Create a Wayang context; the optimizer chooses between Java streams and Spark.
        WayangContext wayangContext = new WayangContext(new Configuration())
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin());
        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName("Collaborative Filtering")
                .withUdfJarOf(CollaborativeFiltering.class);

        // Load the data: explicit ratings on a 1-to-5 scale.
        List<Rating> data = Arrays.asList(
                new Rating("user1", "item1", 5),
                new Rating("user1", "item2", 3),
                new Rating("user2", "item1", 4),
                new Rating("user2", "item3", 2),
                new Rating("user3", "item2", 1),
                new Rating("user3", "item3", 5)
        );

        // Normalize the ratings to [0, 1] with a Wayang plan.
        Collection<Rating> normalized = planBuilder
                .loadCollection(data).withName("Load ratings")
                .map(r -> new Rating(r.user, r.item, r.rating / 5.0)).withName("Normalize ratings")
                .collect();

        // Build the per-user rating vectors on the driver; the toy data set is small.
        Map<String, Map<String, Double>> userRatings = new HashMap<>();
        Set<String> items = new TreeSet<>();
        for (Rating r : normalized) {
            userRatings.computeIfAbsent(r.user, u -> new HashMap<>()).put(r.item, r.rating);
            items.add(r.item);
        }

        // Predict each user's rating for every unrated item as the
        // similarity-weighted average of the other users' ratings.
        for (Map.Entry<String, Map<String, Double>> user : userRatings.entrySet()) {
            for (String item : items) {
                if (user.getValue().containsKey(item)) continue;  // Already rated.
                double weightedSum = 0, similaritySum = 0;
                for (Map.Entry<String, Map<String, Double>> other : userRatings.entrySet()) {
                    Double otherRating = other.getValue().get(item);
                    if (other.getKey().equals(user.getKey()) || otherRating == null) continue;
                    double similarity = cosineSimilarity(user.getValue(), other.getValue());
                    weightedSum += similarity * otherRating;
                    similaritySum += Math.abs(similarity);
                }
                // Cold start: without any overlapping raters, fall back to a neutral score.
                double prediction = similaritySum == 0 ? 0.5 : weightedSum / similaritySum;
                System.out.println("User: " + user.getKey() + ", Item: " + item + ", Predicted: " + prediction);
            }
        }
    }

    /** Cosine similarity of two sparse rating vectors, taking the dot product over common items. */
    static double cosineSimilarity(Map<String, Double> a, Map<String, Double> b) {
        double dot = 0, normA = 0, normB = 0;
        for (Map.Entry<String, Double> e : a.entrySet()) {
            Double bv = b.get(e.getKey());
            if (bv != null) dot += e.getValue() * bv;
            normA += e.getValue() * e.getValue();
        }
        for (double v : b.values()) normB += v * v;
        return (normA == 0 || normB == 0) ? 0 : dot / Math.sqrt(normA * normB);
    }
}
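For the toy data above, the program prints one predicted score per user and unrated item (lines of the form User: user1, Item: item3, Predicted: ...); the exact values depend on the cosine-similarity weights.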