


} catch (IOException e) {
    e.printStackTrace();
}

dataFileWriter.close();

Persisting documents from Storm

In the previous recipe, we derived precomputed views of our data from an immutable data source, but that source was created statically. In an operational system, Storm must store the immutable data in Hadoop so that it is available to any preprocessing that is required.

How to do it…

As each tuple is processed in Storm, we must generate an Avro record based on the document record definition and append it to the data file within the Hadoop filesystem.

We must create a Trident function that takes each document tuple and stores the associated Avro record.

1. Within the tfidf-topology project created in Chapter 3, Calculating Term Importance with Trident, inside the storm.cookbook.tfidf.function package, create a new class named PersistDocumentFunction that extends BaseFunction. Within the prepare function, initialize the Avro schema and document writer:

public void prepare(Map conf, TridentOperationContext context) {
    try {
        String path = (String) conf.get("DOCUMENT_PATH");
        schema = Schema.parse(PersistDocumentFunction.class
                .getResourceAsStream("/document.avsc"));
        File file = new File(path);
        DatumWriter<GenericRecord> datumWriter =
                new GenericDatumWriter<GenericRecord>(schema);
        dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        if (file.exists())
            dataFileWriter.appendTo(file);
        else
            dataFileWriter.create(schema, file);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

2. As each tuple is received, coerce it into an Avro record and add it to the file:

public void execute(TridentTuple tuple, TridentCollector collector) {
    GenericRecord docEntry = new GenericData.Record(schema);
    docEntry.put("docid", tuple.getStringByField("documentId"));
    docEntry.put("time", Time.currentTimeMillis());
    docEntry.put("line", tuple.getStringByField("document"));
    try {
        // Append the record to the Avro data file opened in prepare
        dataFileWriter.append(docEntry);
    } catch (IOException e) {
        LOG.error("Error writing to document record: " + e);
        throw new RuntimeException(e);
    }
}

3. Next, edit the TermTopology.build topology and add the function to the document stream:

documentStream.each(new Fields("documentId","document"), new PersistDocumentFunction(), new Fields());

4. Finally, add the document path to the topology configuration:

conf.put("DOCUMENT_PATH", "document.avro");
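
The choice between appendTo and create in step 1 exists because an Avro data file carries its schema in a header that must be written exactly once. The following is a minimal plain-Java analogy of that "header once, then append" pattern, not Avro itself; the class name, the text header, and the comma-separated records are all hypothetical and used only for illustration.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical illustration class; it does not use Avro or Storm.
public class HeaderOnceSketch {

    /**
     * Appends one record to the file. The header is written only when the
     * file does not exist yet, mirroring create(...) versus appendTo(...).
     */
    static void append(Path file, String header, String record) throws IOException {
        boolean isNew = !Files.exists(file);
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            if (isNew) {
                writer.write(header);
                writer.newLine();
            }
            writer.write(record);
            writer.newLine();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempDirectory("docs").resolve("document.txt");
        // Made-up records with the same three fields the recipe stores
        append(file, "# fields: docid,time,line", "doc-1,1000,first document");
        append(file, "# fields: docid,time,line", "doc-2,2000,second document");
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        System.out.println(lines);
    }
}
```

Unlike this sketch, the recipe opens the writer once in prepare and keeps it open across tuples, which avoids reopening the file for every record.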

How it works…

The topology contains several logical streams, and its raw input, which contains only URLs, is not in a suitable form for the recipes in this chapter. We therefore need to select the correct stream from which to consume tuples, coerce these into Avro records, and serialize them into a file.

The previous recipe will then periodically consume this file. Within the context of the topology definition, include the following code:

Stream documentStream = getUrlStream(topology, spout)
        .each(new Fields("url"),
              new DocumentFetchFunction(mimeTypes),
              new Fields("document", "documentId", "source"));

documentStream.each(new Fields("documentId", "document"),
        new PersistDocumentFunction(), new Fields());

The function should consume tuples from the document stream, because the tuples on that stream are already populated with fetched documents.

Integrating the batch and real-time views

The final step in completing the big data architecture is already largely done and is surprisingly simple, as is the case with all good functional-style designs.

How to do it…

This recipe simply extends the existing TF-IDF DRPC query that we defined in Chapter 4, Distributed Remote Procedure Calls. We need three new state sources that represent the D, DF, and TF values computed in the batch layer. We will combine the values from these states with the existing state before performing the final TF-IDF calculation.
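
Because the current hour and all previous hours are separate time windows, combining the two views amounts to adding their counts. The following plain-Java sketch shows that arithmetic with made-up numbers. The class and method names are hypothetical, and the tfidf formula is an assumption (a common smoothed form), since this section does not show what TfidfExpression computes.

```java
// Hypothetical illustration class; it does not depend on Storm.
public class CombineViewsSketch {

    /** Sums real-time and batch counts and returns them in the order tf, d, df. */
    static double[] combine(long tfRt, long dRt, long dfRt,
                            long tfBatch, long dBatch, long dfBatch) {
        return new double[] { tfRt + tfBatch, dRt + dBatch, dfRt + dfBatch };
    }

    /** ASSUMPTION: tf * ln(d / (1 + df)); the recipe's actual formula is not shown here. */
    static double tfidf(double tf, double d, double df) {
        return tf * Math.log(d / (1.0 + df));
    }

    public static void main(String[] args) {
        // Made-up counts: the current hour first, then all previous hours
        double[] c = combine(2, 10, 1, 5, 90, 3);
        System.out.println("tf=" + c[0] + " d=" + c[1] + " df=" + c[2]);
        System.out.println("tfidf=" + tfidf(c[0], c[1], c[2]));
    }
}
```

With these inputs the combined values are tf = 7, d = 100, and df = 4, and only the combined values are passed on to the final calculation.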

1. Start from the inside out by creating the combination function called BatchCombiner within the storm.cookbook.tfidf.function package and implement the logic to combine two versions of the same state. One version should be from the current hour, and the other from all the data prior to the current hour:

public void execute(TridentTuple tuple, TridentCollector collector) {
    try {
        double d_rt = (double) tuple.getLongByField("d_rt");
        double df_rt = (double) tuple.getLongByField("df_rt");
        double tf_rt = (double) tuple.getLongByField("tf_rt");
        double d_batch = (double) tuple.getLongByField("d_batch");
        double df_batch = (double) tuple.getLongByField("df_batch");
        double tf_batch = (double) tuple.getLongByField("tf_batch");
        // Emit the combined values in the order tf, d, df
        collector.emit(new Values(tf_rt + tf_batch, d_rt + d_batch,
                df_rt + df_batch));
    } catch (Exception e) {
        // Nothing is emitted if any field is missing or cannot be read
    }
}

2. Add the state to the topology by adding these calls to the addTFIDFQueryStream function:

TridentState batchDfState = topology.newStaticState(
        getBatchStateFactory("df"));
TridentState batchDState = topology.newStaticState(
        getBatchStateFactory("d"));
TridentState batchTfState = topology.newStaticState(
        getBatchStateFactory("tf"));

3. This is supported by the static utility function:

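private static StateFactory getBatchStateFactory(String rowKey) {
    // The declaration and remaining configuration of options are not shown
    // in this excerpt; it is presumably the Cassandra state options object.
    options.rowKey = rowKey;
    return CassandraState.nonTransactional("localhost", options);
}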

Within a cluster deployment of Cassandra, simply replace the word localhost with a list of seed node IP addresses. Seed nodes are simply Cassandra nodes, which, when appropriately configured, will know about their peers in the cluster. For more information on Cassandra, please see the online documentation at http://wiki.apache.org/cassandra/GettingStarted.

4. Finally, edit the existing DRPC query to reflect the added state and combiner function:

topology.newDRPCStream("tfidfQuery", drpc)
        .each(new Fields("term", "documentId", "tf", "d", "df"),
              new TfidfExpression(), new Fields("tfidf"))
        .each(new Fields("tfidf"), new FilterNull())
        .project(new Fields("documentId", "term", "tfidf"));

How it works…

We have covered a huge amount of ground to get to this point. We have implemented an entire real-time, big data architecture that is fault-tolerant, scalable, and reliable using purely open source technologies. It is therefore useful to recap the journey that brought us here:

• We learned how to implement a Trident topology and define a stream data pipeline. This data pipeline defines predicates that act not only on tuples but also on persistent, mutable states.

• Using this pipeline, we implemented the TF-IDF algorithm.

• We separated the preprocessing stage of the data pipeline from the "at time" stage of the pipeline. We achieved this by implementing a portion of the pipeline in a DRPC stream that is invoked only "at time", that is, when a query is made.

• We then added the concept of time windows to the topology. This allowed us to segment the state into time-window buckets. We chose hours as a convenient segmentation.

• We learned how to test a time-dependent topology using the Clojure testing API.

• Then, in this chapter, we implemented the immutable state and the batch computation.

• Finally, we combined the batch-computed view with the mutable state to provide a complete solution.
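
The hourly segmentation mentioned in the recap can be pictured with a small plain-Java sketch. It assumes that a bucket is simply the number of whole hours since the epoch; the topology's actual bucketing logic is not shown in this section, and the class and method names are hypothetical.

```java
// Hypothetical illustration class; the topology's own bucketing code may differ.
public class HourBucketSketch {

    static final long MILLIS_PER_HOUR = 60L * 60L * 1000L;

    /** ASSUMPTION: the bucket is the number of whole hours since the epoch. */
    static long hourBucket(long epochMillis) {
        return epochMillis / MILLIS_PER_HOUR;
    }

    /** True when a record falls in the same hour bucket as the current time. */
    static boolean inCurrentHour(long recordMillis, long nowMillis) {
        return hourBucket(recordMillis) == hourBucket(nowMillis);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long twoHoursAgo = now - 2 * MILLIS_PER_HOUR;
        System.out.println("now is in the current hour: " + inCurrentHour(now, now));
        System.out.println("two hours ago is in the current hour: "
                + inCurrentHour(twoHoursAgo, now));
    }
}
```

Under this assumption, records in the current bucket are answered from the mutable real-time state, and everything in earlier buckets is answered from the batch view.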

The following flow diagram illustrates the entire process:

[Flow diagram. Recoverable labels: Twitter Spout, Fetch Documents, Serialize Records, Process Streams, DRPC Query, Calculate TF-IDF, Compute, Immutable State, Cassandra Batch View, Mutable State]

With the high-level picture in place, the final DRPC query stream becomes easier to understand. The stream effectively implements the following steps:

• .each(SplitAndProjectToFields): This splits the input arguments from the query and projects them out into separate fields in the tuple

• .each(StaticSourceFunction): This adds a static value to the stream, which will be required later

• .stateQuery(tfState): This queries the state of the tf value for the current hour based on the document ID and term and outputs tf_rt

• .stateQuery(dState): This queries the state of the d value for the current hour based on the static source value and outputs d_rt

• .stateQuery(dfState): This queries the state of the df value for the current hour based on the term and outputs df_rt

• .stateQuery(tfBatchState): This queries the state of the tf value for all previous hours based on the document ID and term and outputs tf_batch

• .stateQuery(dBatchState): This queries the state of the d value for all previous hours based on the static source value and outputs d_batch

• .stateQuery(dfBatchState): This queries the state of the df value for all previous hours based on the term and outputs df_batch

• .each(BatchCombiner): This combines the separate _rt and _batch fields into a single set of values

• .each(TfidfExpression): This calculates the TF-IDF final value

• .project: This projects just the fields we require in the output

A key to understanding this is that, at each stage in the process, the tuple simply receives new values: each function adds new named values to the tuple, and the state queries do the same based on existing fields within the tuple. Finally, we end up with a very "wide" tuple that we trim down before returning the final result.
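
The "wide tuple" idea can be mimicked in plain Java with an ordered map standing in for the tuple. This is only a sketch of the data flow, not Trident code: the class, the helper methods, and all field values are hypothetical, and the tfidf formula is an assumption because TfidfExpression is not shown in this section.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; an ordered map stands in for a Trident tuple.
public class WideTupleSketch {

    /** Returns a copy of the tuple with one more named field, like a function or state query. */
    static Map<String, Object> with(Map<String, Object> tuple, String field, Object value) {
        Map<String, Object> out = new LinkedHashMap<>(tuple);
        out.put(field, value);
        return out;
    }

    /** Keeps only the named fields, in the order given, like .project. */
    static Map<String, Object> project(Map<String, Object> tuple, String... fields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String field : fields) {
            out.put(field, tuple.get(field));
        }
        return out;
    }

    /** Walks through the stages of the query with made-up values. */
    static Map<String, Object> buildWideTuple() {
        Map<String, Object> t = new LinkedHashMap<>();
        t = with(t, "documentId", "doc-1");   // split query arguments
        t = with(t, "term", "storm");
        t = with(t, "source", "twitter");     // static source value
        t = with(t, "tf_rt", 2L);             // current-hour state queries
        t = with(t, "d_rt", 10L);
        t = with(t, "df_rt", 1L);
        t = with(t, "tf_batch", 5L);          // batch state queries
        t = with(t, "d_batch", 90L);
        t = with(t, "df_batch", 3L);
        double tf = 2 + 5, d = 10 + 90, df = 1 + 3;
        t = with(t, "tf", tf);                // combined values
        t = with(t, "d", d);
        t = with(t, "df", df);
        // ASSUMPTION: a common smoothed TF-IDF form, not confirmed by the source
        t = with(t, "tfidf", tf * Math.log(d / (1.0 + df)));
        return t;
    }

    public static void main(String[] args) {
        Map<String, Object> wide = buildWideTuple();
        System.out.println("wide tuple fields: " + wide.keySet());
        System.out.println("result fields: "
                + project(wide, "documentId", "term", "tfidf").keySet());
    }
}
```

No stage removes a field until the final projection, which is why the intermediate tuple grows so wide.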
