To close off, we need to define the topology using Clojure. Remember that the point of this topology is to drive home the polyglot nature of Storm. You can deliver real-time topologies that combine multiple technologies, but you must select the appropriate integration method for each one. The bolts described earlier in this chapter used the minimal multilang protocol. There are various other options, including Thrift and Clojure's Java interop. In the case of Qt, you could just as easily have used the Qt Jambi project. The appropriate method depends on many factors within your environment. Use the right tool for the right job.
How to do it…
1. Create the Lein project file within the polyglot-count-topology project folder, and name the file project.clj.
(defproject polyglot-count-topology "0.0.1-SNAPSHOT"
  :source-paths ["src/clj"]
  :java-source-paths ["src/jvm" "test/jvm"]
  :test-paths ["test/clj"]
  :javac-options ["-target" "1.6" "-source" "1.6"]
  :resource-paths ["multilang"]
  :main storm.cookbook.count-topology
  :aot :all
  :min-lein-version "2.0.0"
  :dependencies [[org.slf4j/slf4j-log4j12 "1.6.1"]
                 [org.clojure/clojure "1.4.0"]
                 [commons-collections/commons-collections "3.2.1"]
                 [storm-starter "0.0.1-SNAPSHOT"]]
  :profiles {:dev {:dependencies [[storm "0.8.2"]
                                  [junit/junit "4.11"]
                                  [org.testng/testng "6.1.1"]]}})
2. Within the src/clj/storm/cookbook folder, create the Clojure topology named count_topology.clj.
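(ns storm.cookbook.count-topology
  (:import (backtype.storm StormSubmitter LocalCluster)
           (storm.cookbook QtSplitSentence RubyCount))
  (:use [backtype.storm clojure config])
  ;; Generates a class so that -main can be used as the :main entry point
  (:gen-class))

(defspout sentence-spout ["sentence"]
  [conf context collector]
  (let [sentences ["a little brown dog"
                   "the man petted the dog"
                   "four score and seven years ago"
                   "an apple a day keeps the doctor away"]]
    (spout
     (nextTuple []
       ;; Emit a random test sentence roughly every 100 ms
       (Thread/sleep 100)
       (emit-spout! collector [(rand-nth sentences)]))
     (ack [id]
       ;; Unreliable test spout: nothing to do when a tuple is acked
       ))))

(defn mk-topology []
  (topology
   {"1" (spout-spec sentence-spout)}
   {"3" (bolt-spec {"1" :shuffle}
                   (QtSplitSentence.) :p 1)
    "4" (bolt-spec {"3" ["word"]}
                   (RubyCount.) :p 1)}))

(defn run-local! []
  (let [cluster (LocalCluster.)]
    ;; The topology name is arbitrary for a local run
    (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
    ;; Let the topology run for 10 seconds, then stop the local cluster
    (Thread/sleep 10000)
    (.shutdown cluster)))

(defn submit-topology! [name]
  (StormSubmitter/submitTopology name
                                 {TOPOLOGY-DEBUG true TOPOLOGY-WORKERS 3}
                                 (mk-topology)))

(defn -main
  ([]
   (run-local!))
  ([name]
   (submit-topology! name)))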
How it works…
We define a spout for testing purposes that emits the sentences:
(defspout sentence-spout ["sentence"]
  [conf context collector]
  (let [sentences ["a little brown dog"
                   "the man petted the dog"
                   "four score and seven years ago"
                   "an apple a day keeps the doctor away"]]
    (spout
     (nextTuple []
       (Thread/sleep 100)
       (emit-spout! collector [(rand-nth sentences)]))
     (ack [id]
       ))))
We then define the topology:
(topology
 {"1" (spout-spec sentence-spout)}
 {"3" (bolt-spec {"1" :shuffle}
                 (QtSplitSentence.) :p 1)
  "4" (bolt-spec {"3" ["word"]}
                 (RubyCount.) :p 1)})
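The body of the function adds elements to the topology, starting with the spout, and then adds the two bolts that we defined earlier in the chapter. The split bolt ("3") subscribes to the spout using a shuffle grouping, while the count bolt ("4") subscribes to the split bolt using a fields grouping on "word", so every occurrence of a given word is sent to the same count task. Note that we simply create instances of the Java classes that wrap the underlying multilang bolt implementations, using Clojure's Java interop notation; for example, (RubyCount.) creates a new instance of the class RubyCount.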
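The groupings declared in mk-topology are what keep the word counts correct: a fields grouping on "word" guarantees that each word always lands on the same count task, whereas a shuffle grouping may pick any task. The following is a simplified in-memory model in plain Clojure, not Storm's implementation. It assumes a fields grouping chooses a task by hashing the grouped field values modulo the number of tasks (Storm's actual hashing details may differ), and the helpers route-fields, route-shuffle, and tasks-per-word are hypothetical.

```clojure
;; Hypothetical model of a fields grouping: the target task is derived
;; from the hash of the grouped field values, so equal values always
;; map to the same task index.
(defn route-fields [fields tuple num-tasks]
  (mod (hash (mapv tuple fields)) num-tasks))

;; Hypothetical model of a shuffle grouping: any task may be chosen.
(defn route-shuffle [_tuple num-tasks]
  (rand-int num-tasks))

(defn tasks-per-word
  "Routes each word tuple with the modelled fields grouping and returns
  a map of word -> set of task indexes that received that word."
  [words num-tasks]
  (reduce (fn [acc w]
            (update-in acc [w] (fnil conj #{})
                       (route-fields ["word"] {"word" w} num-tasks)))
          {}
          words))
```

Every word in the result maps to a single task index, which is why each count task can safely keep a purely local count for the words it receives.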
There's more…
You can now execute the topology. At the command line, execute the following commands:
lein deps
lein javac
lein compile
lein repl
Once the REPL has launched, call the run-local! function; the topology will launch on a local cluster, run for about 10 seconds, and then shut down. To package a deployable JAR file, use the following command:
lein uberjar
Chapter 6: Integrating Storm and Hadoop
In this chapter, we will cover:
- Implementing TF-IDF in Hadoop
- Persisting documents from Storm
- Integrating the batch and real-time views
Introduction
In Chapter 4, Distributed Remote Procedure Calls, we implemented the Speed layer for a Lambda architecture instance using Storm. In this chapter, we will implement the Batch and Service layers to complete the architecture.
There are some key concepts underlying this big data architecture:
- Immutable state
- Abstraction and composition
- Constrained complexity
Immutable state is key because it provides true fault tolerance for the architecture. If a failure is experienced at any level, we can always rebuild the data from the original immutable data. This is in contrast to many existing data systems, where the paradigm is to act on mutable state, updating data in place.
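To make the fault-tolerance argument concrete, here is a minimal sketch in plain Clojure, assuming the master dataset is an append-only collection of immutable facts and that a view is nothing more than a pure function of that dataset. The names append-fact and compute-view are hypothetical. Because the facts are never modified, a corrupted view can be thrown away and recomputed rather than repaired.

```clojure
;; Hypothetical master dataset: an append-only vector of immutable facts.
(defn append-fact [master fact]
  (conj master fact))

;; A view is derived purely from the master dataset, so it can always be
;; rebuilt from scratch instead of being patched in place.
(defn compute-view [master]
  (frequencies (map :word master)))

(def master
  (-> []
      (append-fact {:word "dog" :ts 1})
      (append-fact {:word "the" :ts 2})
      (append-fact {:word "dog" :ts 3})))

;; Simulate a view corrupted by, say, a buggy deployment...
(def corrupted-view {"dog" 99})

;; ...and recover by recomputing it from the untouched facts.
(def rebuilt-view (compute-view master))
```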
Abstractions allow us to remove complexity in some cases, and in others they can introduce complexity. It is important to choose a set of abstractions that increases our productivity and removes complexity at an acceptable cost. It must be noted that all abstractions leak, meaning that when failures occur at a lower abstraction, they will affect the higher-level abstractions. It is therefore often important to be able to make changes within the various layers and to understand more than one layer of abstraction. The designs we choose to implement our abstractions must not prevent us from reasoning about, or working at, the lower levels of abstraction when required. Open source projects are often good at this because the code of the lower-level abstractions is available, but even with the source code available, it is easy to make an abstraction so convoluted that it becomes a risk. In a big data solution, we have to work at higher levels of abstraction in order to be productive and deal with the massive complexity, so we need to choose our abstractions carefully. In the case of Storm, Trident represents an appropriate abstraction for dealing with the data-processing complexity, but the lower-level Storm API on which Trident is based isn't hidden from us. We are therefore able to reason about Trident easily, based on an understanding of the lower-level abstractions within Storm.
Another key issue to consider when dealing with complexity and productivity is composition.
Composition within a given layer of abstraction allows us to quickly build out a solution that is well tested and easy to reason about. Composition is fundamentally decoupled, while abstraction contains some inherent coupling to the lower-level abstractions—something that we need to be aware of.
Finally, a big data solution needs to constrain complexity. In the long run, complexity always equates to risk and cost, from both a development and an operational perspective. Real-time solutions will always be more complex than batch-based systems, yet batch-based systems lack some of the qualities we require in terms of performance, most notably low latency. Nathan Marz's Lambda architecture attempts to address this by combining the qualities of each type of system to constrain complexity and deliver a truly fault-tolerant architecture.
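The way the Lambda architecture combines the two kinds of system can be sketched at query time. The following is a minimal illustration in plain Clojure, assuming both views are maps of additive counts keyed by term, with the batch view covering all history up to the last batch run and the real-time view covering only the data that has arrived since. The view contents and the functions query and merged-view are hypothetical.

```clojure
;; Hypothetical views: counts keyed by term.
;; Batch view: all history up to the last completed batch run.
(def batch-view    {"dog" 120 "the" 340})
;; Real-time view: only data that arrived after that batch run.
(def realtime-view {"dog" 3 "apple" 1})

(defn query
  "Answers a count query by combining both views. This works because
  counts over disjoint time ranges can simply be added."
  [batch realtime term]
  (+ (get batch term 0) (get realtime term 0)))

(defn merged-view
  "Combines the full batch and real-time views into one map of counts."
  [batch realtime]
  (merge-with + batch realtime))
```

When the next batch run absorbs the recent data, the corresponding part of the real-time view can be discarded, which keeps the complex real-time component small and replaceable.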
In Chapter 3, Calculating Term Importance with Trident, and Chapter 4, Distributed Remote Procedure Calls, we implemented a real-time TF-IDF data flow using Trident. We divided this flow into preprocessing and "at time" phases, using streams and DRPC streams respectively.
We also introduced time windows that allowed us to segment the preprocessed data. In this chapter, we complete the entire architecture by implementing the Batch and Service layers.
The Service layer is simply a store of a view of the data. In this case, we will store this view in Cassandra, as it is a convenient place to access the state alongside Trident's state. The preprocessed view has the same structure as the one created by Trident, namely the counted elements of the TF-IDF formula (D, DF, and TF), but in the batch case the dataset is much larger, as it includes the entire history.
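As a reminder of what these counted elements are, the following plain Clojure sketch computes D (the number of documents), DF (the number of documents containing each term), and TF (the occurrences of each term in each document) over a small in-memory map of documents. The tf-idf function uses one common weighting, tf * log(D / DF); this is an assumption for illustration, and the exact formula used in the Trident recipe may differ. All function names here are hypothetical.

```clojure
(require '[clojure.string :as str])

(defn tokenize [text]
  (str/split (str/lower-case text) #"\s+"))

(defn preprocessed-view
  "Counts the TF-IDF elements over a map of doc-id -> text:
  :d  total number of documents
  :df term -> number of documents containing the term
  :tf [doc-id term] -> occurrences of the term in that document"
  [docs]
  {:d  (count docs)
   :df (frequencies (mapcat (comp distinct tokenize) (vals docs)))
   :tf (into {}
             (for [[id text] docs
                   [term n] (frequencies (tokenize text))]
               [[id term] n]))})

(defn tf-idf
  "Assumed weighting: tf * log(D / df); returns 0.0 for unseen terms."
  [{:keys [d df tf]} doc-id term]
  (let [t (get tf [doc-id term] 0)
        f (get df term 0)]
    (if (zero? f)
      0.0
      (* t (Math/log (/ (double d) f))))))
```

In the batch case, the same three counts are computed over the entire document history rather than a time window, which is exactly why they are a good fit for Hadoop.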
The Batch layer is implemented in Hadoop, using MapReduce to calculate the preprocessed view of the data. MapReduce is extremely powerful but, like the lower-level Storm API, it is potentially too low-level for the problem at hand, for the following reasons:
- We need to describe the problem as a data pipeline, and MapReduce isn't congruent with such a way of thinking
- Productivity suffers when every step of the pipeline must be expressed directly as map and reduce functions
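To see why MapReduce feels low-level for this problem, the following sketch simulates the three phases of a single job, counting DF, in plain Clojure. It is a hypothetical in-memory model rather than the Hadoop API: df-mapper emits a [term 1] pair per distinct term in a document, shuffle-phase groups the values by key as the framework would, and df-reducer sums them. Even this one count must be forced into key/value form, and D, DF, and TF would each need such a job, coordinated by hand.

```clojure
(require '[clojure.string :as str])

;; Map phase: emit one [key value] pair per distinct term in a document.
(defn df-mapper [[_doc-id text]]
  (for [term (distinct (str/split (str/lower-case text) #"\s+"))]
    [term 1]))

;; Shuffle phase: group all emitted values by key.
(defn shuffle-phase [pairs]
  (reduce (fn [acc [k v]] (update-in acc [k] (fnil conj []) v))
          {}
          pairs))

;; Reduce phase: sum the values collected for each key.
(defn df-reducer [[term values]]
  [term (reduce + values)])

(defn run-job
  "Runs the simulated job over a map of doc-id -> text."
  [docs]
  (->> docs
       (mapcat df-mapper)
       shuffle-phase
       (map df-reducer)
       (into {})))
```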
We would like to think of a data pipeline in terms of streams of data, tuples within the stream, and predicates acting on those tuples. This allows us to easily describe a solution to a data-processing problem, and it also promotes composability: predicates are fundamentally composable, and pipelines themselves can also be composed to form larger, more complex pipelines. Cascading provides such an abstraction for MapReduce in the same way as Trident does for Storm.
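The composability argument can be illustrated without Hadoop at all. The following sketch uses Clojure transducers purely as an analogy for Cascading-style pipelines; it is not the Cascading or Cascalog API. Small stages are defined independently, composed into a pipeline, and that pipeline is composed again into a larger one. The stage names and the stop-word list are hypothetical, and transducers require Clojure 1.7 or later, which is newer than the 1.4.0 used in the project file earlier.

```clojure
(require '[clojure.string :as str])

;; Small, independently testable pipeline stages.
(def tokenize   (mapcat #(str/split (str/lower-case %) #"\s+")))
(def non-blank  (remove str/blank?))
(def drop-stops (remove #{"a" "an" "the"}))

;; Stages compose into a pipeline (applied left to right)...
(def terms (comp tokenize non-blank drop-stops))

;; ...and that pipeline composes again with further stages.
(def long-terms (comp terms (filter #(> (count %) 3))))

(defn run-pipeline [xform sentences]
  (into [] xform sentences))
```

Because each stage is a value, the same terms pipeline can be reused unchanged inside any larger pipeline, which is the property that makes a tuple-and-predicate model more productive than hand-written map and reduce functions.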
With these tools, approaches, and considerations in place, we can now complete our real-time big data architecture. There are a number of elements from Chapter 3, Calculating Term Importance with Trident, and Chapter 4, Distributed Remote Procedure Calls, that we will update, and a number of elements that we will add. The following figure illustrates the final architecture, where the elements in light grey will be updated from the existing recipe, and the elements in dark grey will be added in this chapter:
[Figure: final architecture, with components labelled Stream, Trident, Storm, Cassandra, Cascalog, and Hadoop MapReduce]