6 changes: 6 additions & 0 deletions data/mllib/sample_fpgrowth.txt
@@ -0,0 +1,6 @@
+r z h k p
+z y x w v u t s
+s x o n r
+x z y m t s q e
+z
+x z y r q t p
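Each line of sample_fpgrowth.txt is one transaction, with items separated by single spaces; these are the same six transactions that the two examples below previously hard-coded. As a quick sanity check on the default minSupport of 0.3 used by the examples, here is a sketch of how a relative support threshold translates into an absolute transaction count (assuming the usual ceiling rounding; the exact rounding inside MLlib is an implementation detail):

```scala
// Sketch: translating a relative minSupport into an absolute count for
// this sample file. Assumes ceiling rounding, which is the natural
// reading of "appears in at least a fraction minSupport of transactions".
val numTransactions = 6   // lines in sample_fpgrowth.txt
val minSupport = 0.3      // default used by both examples below
val minCount = math.ceil(minSupport * numTransactions).toLong
// minCount == 2: an itemset is frequent only if it occurs in at least
// 2 of the 6 transactions, e.g. {x, y, z} qualifies (3 occurrences).
```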
examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java
@@ -25,32 +25,49 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.fpm.FPGrowth;
 import org.apache.spark.mllib.fpm.FPGrowthModel;

 /**
  * Java example for mining frequent itemsets using FP-growth.
+ * Example usage: ./bin/run-example mllib.JavaFPGrowthExample ./data/mllib/sample_fpgrowth.txt
  */
 public class JavaFPGrowthExample {

   public static void main(String[] args) {
+    String inputFile;
+    double minSupport = 0.3;
+    int numPartition = -1;
+    if (args.length < 1) {
+      System.err.println(
+        "Usage: JavaFPGrowthExample <input_file> [minSupport] [numPartition]");
+      System.exit(1);
+    }
+    inputFile = args[0];
+    if (args.length >= 2) {
+      minSupport = Double.parseDouble(args[1]);
+    }
+    if (args.length >= 3) {
+      numPartition = Integer.parseInt(args[2]);
+    }
+
     SparkConf sparkConf = new SparkConf().setAppName("JavaFPGrowthExample");
     JavaSparkContext sc = new JavaSparkContext(sparkConf);

+    JavaRDD<ArrayList<String>> transactions = sc.textFile(inputFile).map(
+      new Function<String, ArrayList<String>>() {
+        @Override
+        public ArrayList<String> call(String s) {
+          return Lists.newArrayList(s.split(" "));
+        }
+      }
+    );

-    // TODO: Read a user-specified input file.
-    @SuppressWarnings("unchecked")
-    JavaRDD<ArrayList<String>> transactions = sc.parallelize(Lists.newArrayList(
-      Lists.newArrayList("r z h k p".split(" ")),
-      Lists.newArrayList("z y x w v u t s".split(" ")),
-      Lists.newArrayList("s x o n r".split(" ")),
-      Lists.newArrayList("x z y m t s q e".split(" ")),
-      Lists.newArrayList("z".split(" ")),
-      Lists.newArrayList("x z y r q t p".split(" "))), 2);

-    FPGrowth fpg = new FPGrowth()
-      .setMinSupport(0.3);
-    FPGrowthModel<String> model = fpg.run(transactions);
+    FPGrowthModel<String> model = new FPGrowth()
+      .setMinSupport(minSupport)
+      .setNumPartitions(numPartition)
+      .run(transactions);

     for (FPGrowth.FreqItemset<String> s: model.freqItemsets().toJavaRDD().collect()) {
       System.out.println("[" + Joiner.on(",").join(s.javaItems()) + "], " + s.freq());
examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala
@@ -17,30 +17,61 @@

 package org.apache.spark.examples.mllib

+import scopt.OptionParser
+
 import org.apache.spark.mllib.fpm.FPGrowth
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}

 /**
  * Example for mining frequent itemsets using FP-growth.
+ * Example usage: ./bin/run-example mllib.FPGrowthExample \
+ *   --minSupport 0.8 --numPartition 2 ./data/mllib/sample_fpgrowth.txt
  */
 object FPGrowthExample {

+  case class Params(
+    input: String = null,
+    minSupport: Double = 0.3,
+    numPartition: Int = -1) extends AbstractParams[Params]
+
   def main(args: Array[String]) {
-    val conf = new SparkConf().setAppName("FPGrowthExample")
+    val defaultParams = Params()
+
+    val parser = new OptionParser[Params]("FPGrowthExample") {
+      head("FPGrowth: an example FP-growth app.")
+      opt[Double]("minSupport")
+        .text(s"minimal support level, default: ${defaultParams.minSupport}")
+        .action((x, c) => c.copy(minSupport = x))
+      opt[Int]("numPartition")
+        .text(s"number of partitions, default: ${defaultParams.numPartition}")
+        .action((x, c) => c.copy(numPartition = x))
+      arg[String]("<input>")
+        .text("path to the input data set: each line is one transaction, " +
+          "with items separated by single spaces")
+        .required()
+        .action((x, c) => c.copy(input = x))
+    }
+
+    parser.parse(args, defaultParams).map { params =>
+      run(params)
+    }.getOrElse {
+      sys.exit(1)
+    }
+  }
+
+  def run(params: Params) {
+    val conf = new SparkConf().setAppName(s"FPGrowthExample with $params")
     val sc = new SparkContext(conf)
+    val transactions = sc.textFile(params.input).map(_.split(" ")).cache()
+
+    println(s"Number of transactions: ${transactions.count()}")
+
+    val model = new FPGrowth()
+      .setMinSupport(params.minSupport)
+      .setNumPartitions(params.numPartition)
+      .run(transactions)

-    // TODO: Read a user-specified input file.
-    val transactions = sc.parallelize(Seq(
-      "r z h k p",
-      "z y x w v u t s",
-      "s x o n r",
-      "x z y m t s q e",
-      "z",
-      "x z y r q t p").map(_.split(" ")), numSlices = 2)
-
-    val fpg = new FPGrowth()
-      .setMinSupport(0.3)
-    val model = fpg.run(transactions)
+    println(s"Number of frequent itemsets: ${model.freqItemsets.count()}")

     model.freqItemsets.collect().foreach { itemset =>
       println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
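Params extends AbstractParams[Params], a small helper shared by the MLlib examples that is not part of this diff. Its job is to give every parameter case class a readable toString, so that setAppName(s"FPGrowthExample with $params") prints the parsed options. A minimal sketch of the idea (AbstractParamsSketch and its Product-based rendering are illustrative stand-ins; the real helper uses runtime reflection over the case-class fields):

```scala
// Illustrative stand-in for the examples' AbstractParams helper: render
// any parameter case class through its Product interface. Because the
// superclass provides a concrete toString, the case class inherits it
// instead of synthesizing its own.
abstract class AbstractParamsSketch[T <: Product] { self: T =>
  override def toString: String =
    self.productIterator.mkString(self.productPrefix + "(", ", ", ")")
}

// Usage:
case class DemoParams(input: String = null, minSupport: Double = 0.3)
  extends AbstractParamsSketch[DemoParams]
// DemoParams("data.txt").toString == "DemoParams(data.txt, 0.3)"
```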