    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        // look up the running total for this word (null if unseen)
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        // emit the updated running total downstream
        this.collector.emit(new Values(word, count));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
Implementing the report bolt
The purpose of the ReportBolt class is to produce a report of the counts for each word. Like the WordCountBolt class, it uses a HashMap<String, Long> object to record the counts, but in this case, it just stores the count received from the counter bolt.
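The difference between the counter's update and the report bolt's update can be shown without Storm at all. In the hypothetical CountUpdateSketch below, increment() mimics WordCountBolt's running total (written with Map.merge() instead of the explicit null check) and storeLatest() mimics ReportBolt, which simply overwrites each word's entry with the newest count it receives. The class and method names are illustrative assumptions, not part of the example project.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Storm-free sketch contrasting the two update styles.
class CountUpdateSketch {

    // WordCountBolt style: add one to the running total and return the new total.
    static long increment(Map<String, Long> counts, String word) {
        return counts.merge(word, 1L, Long::sum);
    }

    // ReportBolt style: replace the entry with the latest count received.
    static void storeLatest(Map<String, Long> report, String word, long count) {
        report.put(word, count);
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        Map<String, Long> report = new HashMap<>();
        for (String word : new String[] {"dog", "cow", "dog"}) {
            long latest = increment(counts, word);   // counter emits (word, latest)
            storeLatest(report, word, latest);       // report keeps only the newest value
        }
        System.out.println(report.get("dog") + " " + report.get("cow"));   // prints "2 1"
    }
}
```

Because each count the report side receives is already a running total, overwriting is correct there; adding would double-count.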
One difference between the report bolt and the other bolts we've written so far is that it is a terminal bolt: it only receives tuples. Because it does not emit any streams, the declareOutputFields() method is left empty.
The report bolt also introduces the cleanup() method defined in the IBolt interface. Storm calls this method when a bolt is about to be shut down. We exploit the cleanup() method here as a convenient way to output our final counts when the topology shuts down, but typically, the cleanup() method is used to release resources held by a bolt, such as open files or database connections.
Distributed Word Count
One important thing to keep in mind about the IBolt.cleanup() method when writing bolts is that there is no guarantee that Storm will call it when a topology is running on a cluster. We'll discuss the reasons behind this when we talk about Storm's fault tolerance mechanisms in the next chapter. But for this example, we'll be running Storm in a development mode where the cleanup() method is guaranteed to be called.
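The lifecycle described above (prepare once, execute per tuple, cleanup at shutdown) can be simulated in a single process. The MiniBolt interface, MiniReportBolt class, and LifecycleSketch driver below are hypothetical stand-ins, not Storm's IBolt API. The try/finally in the driver is what makes cleanup() certain to run in this simulation, which is exactly the guarantee a real cluster does not offer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LifecycleSketch {

    // Hypothetical stand-in for the prepare/execute/cleanup contract (not Storm's IBolt).
    interface MiniBolt {
        void prepare();
        void execute(String word, long count);
        void cleanup();
    }

    // Mirrors ReportBolt: keep the latest count per word, report sorted on cleanup.
    static class MiniReportBolt implements MiniBolt {
        private Map<String, Long> counts;
        final List<String> lines = new ArrayList<>();

        public void prepare() { counts = new HashMap<>(); }

        public void execute(String word, long count) { counts.put(word, count); }

        public void cleanup() {
            List<String> keys = new ArrayList<>(counts.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                lines.add(key + " : " + counts.get(key));
            }
        }
    }

    // Single-process driver: the finally block ensures cleanup() runs here.
    static void run(MiniBolt bolt, String[] words, long[] counts) {
        bolt.prepare();
        try {
            for (int i = 0; i < words.length; i++) {
                bolt.execute(words[i], counts[i]);
            }
        } finally {
            bolt.cleanup();
        }
    }

    public static void main(String[] args) {
        MiniReportBolt bolt = new MiniReportBolt();
        run(bolt, new String[] {"dog", "cow", "dog"}, new long[] {1, 1, 2});
        bolt.lines.forEach(System.out::println);   // prints "cow : 1" then "dog : 2"
    }
}
```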
The full source for the ReportBolt class is listed in Example 1.4.
Example 1.4 – ReportBolt.java
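import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
public class ReportBolt extends BaseRichBolt {
    // latest count received for each word
    private HashMap<String, Long> counts = null;
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();
    }
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        // overwrite rather than add: the counter bolt already emits running totals
        this.counts.put(word, count);
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt does not emit anything
    }
    public void cleanup() {
        System.out.println("--- FINAL COUNTS ---");
        // print the words in alphabetical order
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("---");
    }
}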
Chapter 1
Implementing the word count topology
Now that we've defined the spout and bolts that will make up our computation, we're ready to wire them together into a runnable topology (refer to Example 1.5).
Example 1.5 – WordCountTopology.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";
    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        // SentenceSpout --> SplitSentenceBolt
        builder.setBolt(SPLIT_BOLT_ID, splitBolt)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        // SplitSentenceBolt --> WordCountBolt
        builder.setBolt(COUNT_BOLT_ID, countBolt)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // WordCountBolt --> ReportBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt)
                .globalGrouping(COUNT_BOLT_ID);
        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        waitForSeconds(10);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
    // Minimal definition of the waitForSeconds() helper the listing relies on:
    // pauses the calling thread for the given number of seconds.
    private static void waitForSeconds(int seconds) throws InterruptedException {
        Thread.sleep(seconds * 1000L);
    }
}
Storm topologies are typically defined and run (or submitted, if the topology is being deployed to a cluster) in a Java main() method. In this example, we first define string constants that serve as unique identifiers for our Storm components. The main() method then instantiates our spout and bolts and creates an instance of TopologyBuilder. The TopologyBuilder class provides a fluent-style API for defining the data flow between components in a topology.
We start by registering the sentence spout and assigning it a unique ID:
builder.setSpout(SENTENCE_SPOUT_ID, spout);
The next step is to register SplitSentenceBolt and establish a subscription to the stream emitted by the SentenceSpout class:
builder.setBolt(SPLIT_BOLT_ID, splitBolt)
.shuffleGrouping(SENTENCE_SPOUT_ID);
The setBolt() method registers a bolt with the TopologyBuilder class and returns an instance of BoltDeclarer that exposes methods for defining the input source(s) for a bolt. Here we pass the unique ID we defined for the SentenceSpout object to the shuffleGrouping() method, establishing the relationship. The shuffleGrouping() method tells Storm to shuffle tuples emitted by the SentenceSpout class and distribute them randomly and evenly among instances of the SplitSentenceBolt object. We will explain stream groupings in detail shortly in our discussion of parallelism in Storm.
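As a rough, Storm-free illustration of "random but even", the sketch below shuffles the list of target task IDs, walks through it, and reshuffles once every task has been used. ShuffleSketch and its methods are hypothetical names; this models the behavior described above and is not Storm's own implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical shuffle-style grouping: the order of tasks is random, but each
// pass over the shuffled list hands every task exactly one tuple.
class ShuffleSketch {
    private final List<Integer> tasks;
    private final Random random;
    private int next = 0;

    ShuffleSketch(List<Integer> taskIds, long seed) {
        this.tasks = new ArrayList<>(taskIds);
        this.random = new Random(seed);
        Collections.shuffle(this.tasks, this.random);
    }

    int chooseTask() {
        if (next == tasks.size()) {     // finished one pass over all tasks
            Collections.shuffle(tasks, random);
            next = 0;
        }
        return tasks.get(next++);
    }

    public static void main(String[] args) {
        ShuffleSketch grouping = new ShuffleSketch(Arrays.asList(1, 2, 3), 42L);
        int[] received = new int[4];    // index = task ID
        for (int i = 0; i < 9; i++) {
            received[grouping.chooseTask()]++;
        }
        System.out.println(received[1] + " " + received[2] + " " + received[3]);   // prints "3 3 3"
    }
}
```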
The next line establishes the connection between the SplitSentenceBolt class and the WordCountBolt class:
builder.setBolt(COUNT_BOLT_ID, countBolt)
.fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
As you'll learn, there are times when it's imperative that tuples containing certain data get routed to a specific instance of a bolt. Here, we use the fieldsGrouping() method of the BoltDeclarer class to ensure that all tuples containing the same "word" value get routed to the same WordCountBolt instance.
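The routing guarantee can be sketched in plain Java by hashing the grouping field values and reducing the hash modulo the number of target tasks. FieldsGroupingSketch is a hypothetical name, and Storm's exact hash function may differ; what matters is that the chosen task depends only on the field value and the task count.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the idea behind a fields grouping:
// task index = hash(grouping values) mod number of tasks.
class FieldsGroupingSketch {

    static int chooseTask(List<?> groupingValues, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        for (String word : new String[] {"dog", "cow", "dog", "fleas"}) {
            int task = chooseTask(Arrays.asList(word), numTasks);
            System.out.println(word + " -> task index " + task);
        }
    }
}
```

Both "dog" tuples print the same task index, which is what lets each WordCountBolt instance keep an accurate count for the words it owns.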
The last step in defining our data flow is to route the stream of tuples emitted by the WordCountBolt instance to the ReportBolt class. In this case, we want all tuples emitted by WordCountBolt routed to a single ReportBolt task. This behavior is provided by the globalGrouping() method, as follows:
builder.setBolt(REPORT_BOLT_ID, reportBolt)
.globalGrouping(COUNT_BOLT_ID);
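In the same Storm-free style, a global grouping ignores tuple content entirely. Storm's documentation describes the target as the bolt's task with the lowest ID, which the hypothetical GlobalGroupingSketch below models with Collections.min().

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a global grouping: the tuple's content is never
// consulted; every tuple goes to the task with the lowest ID.
class GlobalGroupingSketch {

    static int chooseTask(List<Integer> taskIds) {
        return Collections.min(taskIds);
    }

    public static void main(String[] args) {
        List<Integer> reportTasks = Arrays.asList(7, 5, 6);
        for (String word : new String[] {"dog", "cow", "fleas"}) {
            System.out.println(word + " -> task " + chooseTask(reportTasks));   // always task 5
        }
    }
}
```

Because every tuple goes to that one task, adding more ReportBolt instances would not spread this stream's load; the additional tasks would receive nothing from it.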
With our data flow defined, the final step in running our word count computation is to build the topology and submit it to a cluster:
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
waitForSeconds(10);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
Here, we're running Storm in local mode using Storm's LocalCluster class to simulate a full-blown Storm cluster within our local development environment.
Local mode is a convenient way to develop and test Storm applications without the overhead of deploying to a distributed cluster. Local mode also allows you to run Storm topologies within an IDE, setting breakpoints, halting execution, inspecting variables, and profiling the application in ways that are much more time-consuming, or nearly impossible, when deploying to a Storm cluster.
In this example, we create a LocalCluster instance and call its submitTopology() method with the topology name, an instance of backtype.storm.Config, and the StormTopology object returned by the TopologyBuilder class's createTopology() method. As you'll see in the next chapter, the submitTopology() method used to deploy a topology in local mode has the same signature as the method used to deploy a topology in remote (distributed) mode.
Storm's Config class is simply an extension of HashMap<String, Object> that defines a number of Storm-specific constants and convenience methods for configuring a topology's runtime behavior. When a topology is submitted, Storm merges its predefined default configuration values with the contents of the Config instance passed to the submitTopology() method, and the result is passed to the open() and prepare() methods of the topology's spouts and bolts, respectively.
In this sense, the Config object represents a set of configuration parameters that are global to all components in a topology.
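The merge described above amounts to layering two maps, with the topology's own entries taking precedence over the defaults. The sketch below is a plain-Java approximation using a hypothetical ConfigMergeSketch class; topology.workers and topology.debug are real Storm configuration keys, but the default values shown are illustrative assumptions rather than a statement of Storm's shipped defaults.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the merge: copy the defaults, then overlay the
// topology's entries so that user-supplied keys win.
class ConfigMergeSketch {

    static Map<String, Object> merge(Map<String, Object> defaults,
                                     Map<String, Object> topologyConf) {
        Map<String, Object> merged = new HashMap<>(defaults);   // defaults left untouched
        merged.putAll(topologyConf);                            // later puts override
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("topology.workers", 1);
        defaults.put("topology.debug", false);

        // Config extends HashMap<String, Object>, so a plain map stands in for it here.
        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put("topology.debug", true);

        Map<String, Object> merged = merge(defaults, topologyConf);
        System.out.println(merged.get("topology.workers") + " " + merged.get("topology.debug"));   // prints "1 true"
    }
}
```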
We're now ready to run the WordCountTopology class. The main() method will submit the topology, wait for ten seconds while it runs, kill (undeploy) the topology, and finally shut down the local cluster. When the program run is complete, you should see console output similar to the following:
--- FINAL COUNTS ---
a : 1426
ate : 1426
beverages : 1426
cold : 1426
cow : 1426
dog : 2852
don't : 2851
fleas : 2851
has : 1426
have : 1426
homework : 1426
i : 4276
like : 2851
man : 1426
my : 2852
the : 1426
think : 1425
---
Introducing parallelism in Storm
Recall from the introduction that Storm allows a computation to scale horizontally across multiple machines by dividing the computation into multiple, independent tasks that execute in parallel across a cluster. In Storm, a task is simply an instance of a spout or bolt running somewhere on the cluster.
To understand how parallelism works, we must first explain the four main components involved in executing a topology in a Storm cluster:
• Nodes (machines): These are simply machines configured to participate in a Storm cluster and execute portions of a topology. A Storm cluster contains one or more nodes that perform work.
• Workers (JVMs): These are independent JVM processes running on a node. Each node is configured to run one or more workers. A topology may request that one or more workers be assigned to it.
• Executors (threads): These are Java threads running within a worker JVM process. Multiple tasks can be assigned to a single executor. Unless explicitly overridden, Storm will assign one task to each executor.
• Tasks (bolt/spout instances): Tasks are instances of spouts and bolts whose nextTuple() and execute() methods are called by executor threads.
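The relationship between executors and tasks can be made concrete with a small calculation: given a component's task count and executor count, divide the tasks among the executors as evenly as possible. TaskAssignmentSketch is hypothetical, and the contiguous, even split is an assumption chosen for illustration; it reproduces the default of one task per executor when the two counts are equal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split a component's task indexes across its executors
// (threads) as evenly as possible, in contiguous runs.
class TaskAssignmentSketch {

    static List<List<Integer>> assign(int numTasks, int numExecutors) {
        List<List<Integer>> executors = new ArrayList<>();
        int base = numTasks / numExecutors;    // tasks every executor gets
        int extra = numTasks % numExecutors;   // the first `extra` executors get one more
        int nextTask = 0;
        for (int e = 0; e < numExecutors; e++) {
            int size = base + (e < extra ? 1 : 0);
            List<Integer> tasks = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                tasks.add(nextTask++);
            }
            executors.add(tasks);
        }
        return executors;
    }

    public static void main(String[] args) {
        System.out.println(assign(2, 2));   // prints [[0], [1]]: one task per executor
        System.out.println(assign(5, 2));   // prints [[0, 1, 2], [3, 4]]
    }
}
```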