cascading_ext is a collection of tools built on top of the Cascading platform which make it easy to build, debug, and run simple and high-performance data workflows.
Some of the most interesting public classes in the project (so far).
BloomJoin is designed to be a drop-in replacement for CoGroup which achieves significant performance improvements on certain datasets by filtering the LHS pipe against a bloom filter built from the keys on the RHS. Using a BloomJoin can improve the performance of a job when:
- joining a large LHS store against a relatively small RHS store
- most reduce input data does not make it into the output store
- the job is a good candidate for HashJoin, but the RHS tuples don't fit in memory
Internally, we have cut the reduce time of jobs by up to 90% when a BloomJoin lets the job only reduce over the small subset of the data that makes it past the bloom filter.
The constructor signature mirrors CoGroup:
Pipe source1 = new Pipe("source1");
Pipe source2 = new Pipe("source2");
Pipe joined = new BloomJoin(source1, new Fields("field1"), source2, new Fields("field3"));
CascadingUtil.get().getFlowConnector().connect("Example flow", sources, sink, joined).complete();
see example usages: BloomJoinExample, BloomJoinExampleWithoutCascadingUtil
For more details on how BloomJoin works, check out our blog post.
BloomFilter is similar to BloomJoin, but can be used when no fields from the RHS are needed in the output. This allows for simpler field algebra (duplicate field names are not a problem):
Pipe source1 = new Pipe("source1");
Pipe source2 = new Pipe("source2");
Pipe joined = new BloomFilter(source1, new Fields("field1"), source2, new Fields("field1"), true);
CascadingUtil.get().getFlowConnector().connect("Example flow", sources, sink, joined).complete();
Another feature of BloomFilter is the ability to perform an inexact filter, and entirely avoid reducing over the LHS. When performing an inexact join, the LHS is filtered by the bloom filter from the RHS, but the final exact CoGroup is skipped, leaving both true and false positives in the output. See TestBloomFilter for more examples.
MultiGroupBy allows the user to easily GroupBy two or more pipes on a common field without performing a full Inner/OuterJoin first (which can lead to an explosion in the number of tuples, if keys are not distinct.) The MultiBuffer interface gives a user-defined function access to all tuples sharing a common key, across all input pipes:
Pipe s1 = new Pipe("s1");
Pipe s2 = new Pipe("s2");
Pipe results = new MultiGroupBy(s1, new Fields("key"), s2, new Fields("key"),
new Fields("key-rename"), new CustomBuffer());
CascadingUtil.get().getFlowConnector().connect(sources, sink, results).complete();
see TestMultiGroupBy for example usage.
CascadingUtil is a utility class which makes it easy to add default properties and strategies to all jobs which are run in a codebase, and which adds some useful logging and debugging information. For a simple example of how to use this class, see SimpleFlowExample:
CascadingUtil.get().getFlowConnector()
.connect("Example flow", sources, sink, pipe).complete();
By default CascadingUtil will
- print and format counters for each step
- retrieve and log map or reduce task errors if the job fails
- extend Cascading's job naming scheme with improved naming for some taps which use UUID identifiers.
See FlowWithCustomCascadingUtil to see examples of how CascadingUtil can be extended to include custom default properties. Subclasses can easily:
- set default properties in the job Configuration
- add serialization classes
- add serialization tokens
- add flow step strategies
FunctionStats, FilterStats, BufferStats, and AggregatorStats are wrapper classes which make it easier to debug a complex flow with many Filters / Functions / etc. These wrapper classes add counters logging the number of tuples a given operation inputs or outputs. See TestAggregatorStats, TestFilterStats, TestFunctionStats, and TestBufferStats to see usage examples.
MultiCombiner is a tool for running arbitrarily many aggregation operations, each with their own distinct grouping fields, over a single stream of tuples using only a single reduce step. MultiCombiner uses combiners internally to ensure that these aggregation operations are done efficiently and without unnecessary I/O operations.
Combiners are a useful tool for optimizing Hadoop jobs by reducing the number of records which need to be shuffled, decreasing sorting time, disk I/O, and network traffic on your cluster. Combiners work by partially reducing on records during the map phase which have the same key, and emitting the partial result rather than the original records, in essence doing some of the work of the reducer ahead of time. Combiners decrease the number of records generated by each map task, reducing the amount of data that needs to be shuffled and often providing a significant decrease in time spent on I/O. Because each combiner only has access to a subset of the records for each key in a random order, combiners are only useful for operations which are both commutative and associative, such as summing or counting. For a quick combiner tutorial check out this combiner tutorial.
Combiners are an integral part of Hadoop, and are supported by Cascading through AggregateBy and the Functor and Aggregator interfaces, which implement the combiner and reducer logic respectively. What Cascading's and Hadoop's implementations both lack is a way to run many combiner-backed aggregation operations over a single stream using different grouping fields in an efficient way. In both Hadoop and Cascading, such a computation would require a full read and shuffle of the data set for each of the different grouping fields. This can make it prohibitively expensive to add new stats or aggregates to a workflow. The MultiCombiner tool allows developers to define many entirely independent aggregation operations over the same data set and run them simultaneously, even if they use different grouping keys.
An Example Usage
Suppose that it is your job to compute stats for a large retailer. The retailer stores records of purchase events in an HDFS file with the following format: user_id , item_id, item_amount, timestamp You are required to compute
-
The total number of items purchased by each user
-
The total number of items of each kind sold
Using the features built into Cascading, we might build the flow this way:
Pipe purchaseEvents = new Pipe("purchase_events");
Pipe countByUser = new SumBy( purchaseEvents , new Fields("user_id"),
new Fields("item_amount"), new Fields("total_items_by_user"));
Pipe countByItem = new SumBy( purchaseEvents , new Fields("item_id"),
new Fields("item_amount"), new Fields("total_items_by_item"));
Because each SumBy contains a GroupBy, this assembly will spawn 2 entirely separate Hadoop jobs, each running over the same data set. While the combiners used will mitigate the damage somewhat, your task is still reading, shuffling, and writing the same data twice. Using MultiCombiner, we can combine these two aggregation operations into a single Hadoop job, significantly reducing the total amount of I/O we have to do.
Pipe purchaseEvents = new Pipe("purchase_events");
CombinerDefinition countByUserDef = new CombinerDefinitionBuilder()
.setGroupFields(new Fields("user_id"))
.setInputFields(new Fields("item_amount"))
.setOutputFields(new Fields("total_items_by_user"))
.setExactAggregator(new SumExactAggregator(1))
.get();
CombinerDefinition countByItemDef = new CombinerDefinitionBuilder()
.setGroupFields(new Fields("item_id"))
.setInputFields(new Fields("item_amount"))
.setOutputFields(new Fields("total_items_by_item"))
.setExactAggregator(new SumExactAggregator(1))
.get();
MultiCombiner combiner = MultiCombiner.assembly(purchaseEvents,
countByItemDef, countByUserDef);
Pipe countsByUser = combiner.getTailsByName().get(countByUserDef.getName())
The tails of the MultiCombiner assembly will give access to the results for each definition separately. The tail pipes have the same name as the definitions, so you can use the definition names as keys to the map provided by the getTailsByName method. Alternatively, there’s MultiCombiner.singleTailedAssembly which will emit the results in a single stream.
Things to Watch Out For
The ability to run arbitrary aggregators over a single stream is pretty useful, but there’s nothing magical going on in the background. Each aggregator used in a MultiCombiner emits roughly the same number of tuples as if it were being used on its own. Because the sorting involved in the shuffle is an n*log(n) operation, shuffling the output of many aggregators all at once is less efficient than shuffling their outputs separately. This is usually not an issue because of the time saved reading the data set only once, but may matter if the number of tuples being shuffled is much larger than the data being read from disk. Additionally, each aggregator must keep its own map in memory for its combiner. Because of the additional memory pressure, combining for many aggregators can potentially be less efficient. All of that being said, we've seen significant performance increases for every set of aggregation operations we've merged using this tool.
You can either build cascading_ext from source as described below, or pull the latest snapshot from Sonatype. The current snapshot version (1.6) is built against CDH4.1.2.
<dependency>
<groupId>com.liveramp</groupId>
<artifactId>cascading_ext</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>com.liveramp</groupId>
<artifactId>cascading_ext</artifactId>
<version>1.6-SNAPSHOT</version>
</dependency>
To use the snapshot version, make sure to import Sonatype Snapshots:
<repositories>
<repository>
<id>maven-snapshots</id>
<url>http://oss.sonatype.org/content/repositories/snapshots</url>
<layout>default</layout>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
</repositories>
To build cascading_ext.jar from source,
> mvn package
will generate build/cascading_ext.jar. To run the test suite locally,
> mvn test
See usage instructions here for running Cascading with Apache Hadoop. Everything should work fine if cascading_ext.jar and all third-party jars in lib/ are in your jobjar.
To try out any of the code in the com.liveramp.cascading_ext.example package in production, a jobjar task for cascading_ext itself is available:
> mvn assembly:single
Bug reports or feature requests are welcome: https://github.com/liveramp/cascading_ext/issues
Changes you'd like us to merge in? We love pull requests.
Most of the code here has been moved from our internal repositories so much of the original authorship has been lost in the git history. Contributors include:
- Andre Rodriguez
- Ben Pastel
- Ben Podgursky
- Bryan Duxbury
- Chris Mullins
- Diane Yap
- Eddie Siegel
- Emily Leathers
- Nathan Marz
- Piotr Kozikowski
- Porter Westling
- Sean Carr
- Takashi Yonebayashi
- Thomas Kielbus
Copyright 2013 LiveRamp
Licensed under the Apache License, Version 2.0