• 沒有找到結果。

Using DRPC to complete the required processing

在文檔中 Storm Real-time (頁 117-128)

A classic design consideration within data systems is choosing an appropriate balance between precomputation and on-the-fly computation. Precomputation is often preferable;

however, it isn't always possible. Either because the amount of potential data is far too large in practical terms, or because the final result is dependent on a point-in-time perspective of the data that is not possible to precompute.

In the previous chapter, we emitted a constant stream of TF-IDF values based on the documents received from Twitter and the Internet. The TF-IDF value is perfectly correct at the time when it is emitted; however, as time passes the value that was emitted is potentially invalidated because it is coupled to a global state that is affected by new tuples that arrive after the value was computed. In some applications this is the desired result; however, in other applications we need to know what the current value is at this point in time, not at some previous point in time. In this case, we need to compute as much state as is possible as a part of normal stream processing, and defer the remaining computation until the time of the query. This is a use case for which DRPC is ideally suited.

Trident provides a rich set of abstractions for querying sources of state and processing the resulting tuples using the same power that is inherent in any stream processing. Our ability to defer portions of the processing to a later time enables us to deal with use cases where state is only valid in the context of "now".

How to do it…

1. Create a new branch of your source using the following command:

git branch chap4 git checkout chap4

2. Create a new class named SplitAndProjectToFields, which extends from BaseFunction:

public void execute(TridentTuple tuple, TridentCollector collector) {

Values vals = new Values();

for(String word: tuple.getString(0).split(" ")) { if(word.length() > 0) {

vals.add(word);

} }

collector.emit(vals);

}

3. Once this is complete, edit the TermTopology class, and add the following method:

private static void addTFIDFQueryStream(TridentState tfState, TridentState dfState,

TridentState dState, TridentTopology topology, LocalDRPC drpc) {

topology.newDRPCStream("tfidfQuery",drpc) .each(new Fields("args"),

new SplitAndProjectToFields(), new Fields("documentId", "term"))

.each(new Fields(), new StaticSourceFunction(), new Fields("source")).stateQuery(tfState, new Fields("documentId", "term"),

new MapGet(), new Fields("tf"))

.stateQuery(dfState,new Fields("term"), new MapGet(), new Fields("df"))

.stateQuery(dState,new Fields("source"), new MapGet(), new Fields("d"))

.each(new Fields("term","documentId","tf","d","df"), new TfidfExpression(), new Fields("tfidf"))

.each(new Fields("tfidf"), new FilterNull()) .project(new Fields("documentId","term","tfidf"));

}

4. Then update your buildTopology method by removing the final stream definition and adding the DRPC creation:

public static TridentTopology buildTopology(ITridentSpout spout, LocalDRPC drpc) {

TridentTopology topology = new TridentTopology();

Stream documentStream = getUrlStream(topology, spout) .each(new Fields("url"),

new DocumentFetchFunction(mimeTypes),

new Fields("document", "documentId", "source"));

Stream termStream = documentStream

.parallelismHint(20)each(new Fields("document"), new DocumentTokenizer(), new Fields("dirtyTerm")) .each(new Fields("dirtyTerm"), new TermFilter(), new Fields("term"))

.project(new Fields("term","documentId","source"));

TridentState dfState = termStream.groupBy(

new Fields("term")).persistentAggregate (getStateFactory("df"), new Count(), new Fields("df"));

TridentState dState = documentStream.groupBy(

new Fields("source")).persistentAggregate(

getStateFactory("d"), new Count(), new Fields("d"));

TridentState tfState = termStream.groupBy(

new Fields("documentId", "term"))

.persistentAggregate(getStateFactory("tf"), new Count(), new Fields("tf"));

addTFIDFQueryStream(tfState, dfState, dState, topology, drpc);

return topology;

}

How it works…

At a high level, all we are doing as part of the stream processing is persisting computed values for d, df(term), and tf(document,term), but we don't calculate the final TF-IDF value.

We defer this calculation until the time the value is requested.

The states are computed by the following Trident calls:

TridentState dfState = termStream.groupBy(new Fields("term")) .persistentAggregate(getStateFactory("df"), new Count(), new Fields("df"));

TridentState dState = documentStream.groupBy(

new Fields("source")) .persistentAggregate(

getStateFactory("d"), new Count(), new Fields("d"));

TridentState tfState = termStream.groupBy(

new Fields("documentId", "term")).persistentAggregate(

getStateFactory("tf"), new Count(), new Fields("tf"));

It is important to note the GroupBy definitions for each case. The d value is grouped by a static value for the source of the stream, which gives us a global count across batch boundaries.

The df value is grouped by the term, which will effectively give us a count of the number of documents that contain the term, again across batch boundaries.

Finally, the tf value is stored by the document and term that gives us a count of the term on a per document basis.

With these elements calculated, we can defer the calculation to a later point in time. This is enabled through DRPC:

private static void addTFIDFQueryStream(TridentState tfState, TridentState dfState,

TridentState dState,

TridentTopology topology, LocalDRPC drpc) { topology.newDRPCStream("tfidfQuery",drpc)

.each(new Fields("args"), new SplitAndProjectToFields(), new Fields("documentId", "term"))

.each(new Fields(), new StaticSourceFunction(), new Fields("source"))

.stateQuery(tfState, new Fields("documentId", "term"), new MapGet(), new Fields("tf"))

.stateQuery(dfState,new Fields("term"), new MapGet(), new Fields("df")) .stateQuery(dState,new Fields("source"), new MapGet(), new Fields("d"))

Let's just unpack that a bit. The first function splits out the arguments that will be passed to the DRPC call from a client.

.each(new Fields("args"), new SplitAndProjectToFields(), new Fields("documentId", "term"))

The arguments will be passed to the DRPC call in the form drpc.execute("tfidfQuery",

"doc01 area"). The arguments, doc01 and area, should then be placed into a single tuple in the fields documentId and term. In order to achieve this, we can't simply apply a split function to the arguments, as this would generate many tuples. Instead we use the SplitAndProjectToFields function that we defined earlier:

public void execute(TridentTuple tuple, TridentCollector collector) { Values vals = new Values();

for(String word: tuple.getString(0).split(" ")) { if(word.length() > 0) {

vals.add(word);

} }

collector.emit(vals);

}

This function splits the input text based on " " (space) and then projects the values out to consecutive fields within the same tuple. Next we use a state query to add the value for tf to the tuple, based on values that were passed as arguments:

.stateQuery(tfState, new Fields("documentId", "term"), new MapGet(), new Fields("tf"))

After this query is complete, the tuple would contain three fields: documentId, term, and tf, where tf is the value that we computed earlier.

Next we look up the value for df:

.stateQuery(dfState,new Fields("term"), new MapGet(), new Fields("df"))

After this query is complete, the tuple will also contain a field for df. Finally we look up the value for d and add it to the tuple:

.stateQuery(dState,new Fields("source"), new MapGet(), new Fields("d"))

Then we pass all the fields to the expression function that we created in Chapter 3, Calculating Term Importance with Trident, and project just the final fields:

Fields("term","documentId","tf","d","df"), new TfidfExpression(), new Fields("tfidf"))

.each(new Fields("tfidf"), new FilterNull()) .project(new Fields("documentId","term","tfidf"));

Because we defined the stream using newDRPCStream, the output of the stream will be returned to the calling DRPC client.

There's more...

If you would like to test this quickly, update your main method to periodically call the DRPC query you have just created. Take note of how the values evolve over time:

LocalDRPC drpc = new LocalDRPC();

LocalCluster cluster = new LocalCluster();

conf.setDebug(true);

TridentTopology topology = buildTopology(null, drpc);

cluster.submitTopology("tfidf", conf, topology.build());

for(int i=0; i<100; i++) {

System.out.println("DRPC RESULT: " +

drpc.execute("tfidfQuery", "doc01 area"));

Thread.sleep(1000);

}

Integration testing of a Trident topology

In previous chapters, we have implemented integration tests by hooking into the defined topology and providing testing bolts that allow us to exercise the topology as a black box. This was achieved using the Java API. While this is possible with Trident, it becomes increasingly less elegant, especially in light of the fact that there are rich testing APIs based on Clojure.

In this recipe, we will convert our pure Java project into a Polyglot project in which the Java and Clojure code coexist comfortably. We will then implement a full integration test of the TF-IDF topology using the Clojure testing API.

It is assumed that the reader is familiar with Clojure and functional programming techniques. If this is not the case, please refer to any

How to do it…

1. Start by deleting your project from Eclipse using the Eclipse GUI and then clear the Eclipse project files:

mvn eclipse:clean

2. Once this is complete you can delete your Maven POM file. Next create a new file called project.clj in the root of the project:

(defproject tfidf-topology "0.0.1-SNAPSHOT"

:source-paths ["src/clj"]

:java-source-paths ["src/jvm" "test/jvm"]

:test-paths ["test/clj"]

:javac-options ["-target" "1.6" "-source" "1.6"]

:resources-path "multilang"

[org.twitter4j/twitter4j-core "3.0.3"]

[org.twitter4j/twitter4j-stream "3.0.3"]

[trident-cassandra "0.0.1-wip2"]

[org.slf4j/slf4j-log4j12 "1.6.1"]

[com.googlecode.json-simple/json-simple "1.1"]

[redis.clients/jedis "2.1.0"]

[org.apache.tika/tika-parsers "1.2"]

[org.apache.lucene/lucene-analyzers "3.6.2"]

[org.apache.lucene/lucene-spellchecker "3.6.2"]

[edu.washington.cs.knowitall/morpha-stemmer "1.0.4"]

[trident-cassandra/trident-cassandra "0.0.1-wip1"]

[commons-collections/commons-collections "3.2.1"]]

:profiles {:dev {:dependencies [[storm "0.8.2"]

[org.clojure/clojure "1.4.0"]

[junit/junit "4.11"]

[org.jmock/jmock-legacy "2.5.1"]

[org.mockito/mockito-all "1.8.4"]

[org.easytesting/fest-assert-core "2.0M8"]

[net.sf.opencsv/opencsv "2.3"]

[org.testng/testng "6.1.1"]]}}

)

3. Next, you will need to refactor your folder structure to make room for the Clojure source files:

1. Create two new folders: src/jvm and src/clj. 2. Move the contents from src/main/java to src/jvm.

3. Create three more folders: test/java/, test/clj, and multilang. 4. Move src/main/resource to multilang.

5. Delete src/main.

4. In order to enable your normal development workflow, you need to install lein and the Eclipse plugin. First download the lein script from https://github.com/

technomancy/leiningen and add it to your path (http://askubuntu.com/

questions/60218/how-to-add-a-directory-to-my-path).

5. Next, navigate to Help | Install New Software | Add http://ccw.cgrand.net/

updatesite/ in the Eclipse menu, complete the installation, and allow Eclipse to restart.

6. In order to import the lein project into Eclipse, you need to follow this procedure:

1. In the project explorer go to New | Project….

2. Navigate to General | Project.

3. For the project name enter tfidf-topology. 4. Uncheck the Use default location checkbox.

5. Browse to the location of your project and select Finish.

6. Right-click on the project in the project explorer and then select Configure | Convert to leiningen.

Your project should now be fully available and working in Eclipse.

If you want to change your project dependencies, simply right-click on your project in Eclipse and go to leiningen

| Reset project configuration.

7. Next you need to add some more DRPC queries to the topology to enable our testing.

Add the following methods to TermTopology:

public static void addDQueryStream(TridentState state, TridentTopology topology, LocalDRPC drpc){

topology.newDRPCStream("dQuery",drpc)

.each(new Fields("args"), new Split(), new

}

private static void addDFQueryStream(

TridentState dfState, TridentTopology topology, LocalDRPC drpc) {

topology.newDRPCStream("dfQuery",drpc) .each(new Fields("args"), new Split(), new Fields("term"))

.stateQuery(dfState, new Fields("term"), new MapGet(), new Fields("df")) .each(new Fields("df"), new FilterNull()) .project(new Fields("term","df"));

}

8. And update buildTopology to include these methods at the appropriate time:

TridentState dfState = termStream.groupBy(new Fields("term")) .persistentAggregate(getStateFactory("df"), new Count(), new Fields("df"));

addDFQueryStream(dfState, topology, drpc);

TridentState dState = documentStream.groupBy (new Fields("source"))

.persistentAggregate(getStateFactory("d"), new Count(), new Fields("d"));

addDQueryStream(dState, topology, drpc);

9. Finally you need to implement the integration test in Clojure. Add a file called TermTopology.clj to the test/clj folder:

(defn with-topology-debug* [cluster topo body-fn]

(t/submit-local-topology (:nimbus cluster) "tester"

{TOPOLOGY-DEBUG true} (.build topo)) (body-fn)

(.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0)))

)

(defmacro with-topology-debug [[cluster topo] & body]

'(with-topology-debug* ~cluster ~topo (fn [] ~@body))) (deftest test-tfidf

(bootstrap-db)

(t/with-local-cluster [cluster]

(with-drpc [drpc]

(letlocals

(bind feeder (feeder-spout ["url"]))

(bind topo (TermTopology/buildTopology feeder drpc)) (with-topology-debug [cluster topo]

(feed feeder [["doc01"] ["doc02"] ["doc03"]

["doc04"] ["doc05"]])

(is (= [["twitter" 5]] (exec-drpc drpc "dQuery"

"twitter")))

(is (= [["area" 3]] (exec-drpc drpc "dfQuery"

"area")))

(is (= [["doc01" "area" 0.44628710262841953]]

(exec-drpc drpc "tfidfQuery" "doc01 area"))))))))

How it works…

Leiningen is for automating Clojure projects without setting your hair on fire. It is essentially a Clojure equivalent of maven. It has many similar concepts, including dependency

management. In fact, lein (short name) deploys to maven repositories and consumes dependencies from maven repositories. The important properties to take note of are:

f source-paths: This defines the folders containing Clojure source

f java-source-paths: This defines the Java source folders, both main and testing folders

f test-paths: This defines the Clojure testing folder

f javac-options: This specifies your Java compiler options

f repositories: This property lists external repositories to source dependencies from, over, and above the standard repos of clojars and Maven central

f dependencies: This property lists your dependencies; note the dependency syntax is [groupID/artefactID "version"]

f profiles: This allows you to specify any property only applicable to the development time

In designing the integration test, the properties that we would like to assert are the values for d, df, and tf-idf. We have provided DRPC queries for all of the values and therefore the integration test will treat the topology as a black box, injecting values into a spout and verifying the results using DRPC queries. Let's explore the integration test in detail to understand its functionality. Everything in Clojure is a list, typically following a form where the function name is the first entry in the list. Because functions are first-class citizens in Clojure, this is also true for defining a function (achieved by calling a function). This function is called before the tests are run in order to clear out the Cassandra database so that we can start the

Next, we need to declare our test, start a cluster, and initialize the DRPC server to test the topology:

(deftest test-tfidf (bootstrap-db)

(t/with-local-cluster [cluster]

(with-drpc [drpc]

with-[functionality] is a convention-based macro within the Storm testing API, which simply initializes some element and performs the functionality based on it. t/with-local-cluster, for example, is defined as follows:

(defmacro with-local-cluster [[cluster-sym & args] & body]

'(let [~cluster-sym (mk-local-storm-cluster ~@args)]

(try

(kill-local-storm-cluster ~cluster-sym))) ))

As you can see, the macro makes it simple to create a cluster and then execute your functionality using it; your functionality is the body passed as an argument. The naming convention simply makes tests syntactically pleasing. Next we need to create the topology and a way to inject tuples into the topology:

(letlocals

(bind feeder (feeder-spout ["url"]))

(bind topo (TermTopology/buildTopology feeder drpc))

feeder is a testing spout that we pass to the buildTopology method, which we can then use to inject tuples into the topology. Clojure's Java interop defines various syntactical idioms for calling Java constructs. TermTopology/buildTopology calls out the buildTopology method and passes the created spout and drpc server. Next we can feed tuples into the topology and then verify the results:

(with-topology-debug [cluster topo]

(feed feeder [["doc01"] ["doc02"] ["doc03"] ["doc04"] ["doc05"]]) (is (= [["twitter" 5]] (exec-drpc drpc "dQuery" "twitter"))) (is (= [["area" 3]] (exec-drpc drpc "dfQuery" "area")))

(is (= [["doc01" "area" 0.44628710262841953]] (exec-drpc drpc "tfidfQuery" "doc01 area")))

with-topology-debug is a macro that submits the topology to the local cluster, with {TOPOLOGY-DEBUG true}.

There's more…

To run the tests, you can either use the command-line REPL that can be launched using lein:

lein repl

Or you can launch the REPL in Eclipse, using the Ctrl + Alt + S shortcut key. Once the REPL has been launched and is in the correct namespace (this happens automatically when using Eclipse shortcut), simply call the function:

(run-tests)

When you change any Java code you will need to restart the REPL to force it to pick them up. The command-line REPL requires that you first compile the Java source using lein javac.

在文檔中 Storm Real-time (頁 117-128)