8. APACHE STORM – TRIDENT
FeederBatchSpout creation and data feeding can be done as shown below:
TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration"));
topology.newStream("fixed-batch-spout", testSpout);
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
Trident Operations
Trident relies on Trident operations to process the input stream of trident tuples. The Trident API has a number of built-in operations to handle simple-to-complex stream processing. These operations range from simple validation to complex grouping and aggregation of trident tuples.
Let us go through the most important and frequently used operations.
Filter
A filter is an object used to perform input validation. A Trident filter receives a subset of the trident tuple fields as input and returns either true or false, depending on whether certain conditions are satisfied. If it returns true, the tuple is kept in the output stream; otherwise, the tuple is removed from the stream. A filter typically extends the BaseFilter class and implements the isKeep method. Here is a sample implementation of a filter operation:
public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2]
[1, 4]
A filter can be applied in the topology using the each method. The Fields class specifies the input (a subset of the trident tuple fields). The sample code is as follows:
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFilter());
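To make the filter semantics concrete, here is a small plain-Java model (it does not use the Storm API) of what each(new Fields("a", "b"), new MyFilter()) does to a batch. Tuples are modeled as List<Integer>, and the names FilterModel, IS_KEEP and applyFilter are hypothetical stand-ins for MyFilter and the each call. Run on the input above, it reproduces the output shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of a Trident filter. FilterModel, IS_KEEP and applyFilter are
// hypothetical names for illustration; they are not part of the Storm API.
class FilterModel {
    // Plays the role of MyFilter.isKeep: keep the tuple if its second field is even.
    static final Predicate<List<Integer>> IS_KEEP = tuple -> tuple.get(1) % 2 == 0;

    // Plays the role of stream.each(fields, filter): tuples for which the predicate
    // returns false are dropped, and kept tuples pass through unchanged.
    static List<List<Integer>> applyFilter(List<List<Integer>> batch,
                                           Predicate<List<Integer>> isKeep) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> tuple : batch) {
            if (isKeep.test(tuple)) {
                out.add(tuple);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> input = List.of(List.of(1, 2), List.of(1, 3), List.of(1, 4));
        System.out.println(applyFilter(input, IS_KEEP)); // [[1, 2], [1, 4]]
    }
}
```

Note that a filter never changes the fields of a tuple; it only decides whether the tuple survives.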
Function
A function is an object used to perform a simple operation on a single trident tuple. It takes a subset of the trident tuple fields and emits zero or more new trident tuple fields, which are appended to the original tuple (as the output below shows).
A function typically extends the BaseFunction class and implements the execute method.
A sample implementation is given below:
public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2, 3]
[1, 3, 4]
[1, 4, 5]
Just like a filter, a function can be applied in a topology using the each method.
The sample code is as follows:
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"));
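The following plain-Java model (not the Storm API; FunctionModel, MY_FUNCTION and applyFunction are hypothetical names) sketches how each(inputFields, function, outputFields) combines a function's output with its input: every value list the function emits is appended to a copy of the original tuple. As in Trident, a function that emits nothing causes its input tuple to disappear from the stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of a Trident function; the names here are hypothetical.
// The function maps one input tuple to zero or more emitted value lists.
class FunctionModel {
    // Plays the role of MyFunction.execute: emit a single value, a + b.
    static final Function<List<Integer>, List<List<Integer>>> MY_FUNCTION =
            tuple -> List.of(List.of(tuple.get(0) + tuple.get(1)));

    // Plays the role of stream.each(inputFields, function, outputFields).
    static List<List<Integer>> applyFunction(List<List<Integer>> batch,
            Function<List<Integer>, List<List<Integer>>> fn) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> tuple : batch) {
            for (List<Integer> emitted : fn.apply(tuple)) {
                List<Integer> combined = new ArrayList<>(tuple); // original fields first
                combined.addAll(emitted);                        // then the new field(s)
                out.add(combined);
            }
            // If fn emitted nothing, no output tuple is produced for this input.
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> input = List.of(List.of(1, 2), List.of(1, 3), List.of(1, 4));
        System.out.println(applyFunction(input, MY_FUNCTION)); // [[1, 2, 3], [1, 3, 4], [1, 4, 5]]
    }
}
```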
Aggregation
Aggregation is an object used to perform aggregation operations on an input batch, partition, or stream. Trident has three types of aggregation. They are as follows:
aggregate: Aggregates each batch of trident tuples in isolation. During the aggregate process, the tuples are first repartitioned using global grouping to combine all partitions of the same batch into a single partition.
partitionAggregate: Aggregates each partition instead of the entire batch of trident tuples. The output of partitionAggregate completely replaces the input tuples. With an aggregator such as Count, the output is a single-field tuple per partition.
persistentAggregate: Aggregates all trident tuples across all batches and stores the result in either memory or a database.
TridentTopology topology = new TridentTopology();
// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .aggregate(new Count(), new Fields("count"));
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .partitionAggregate(new Count(), new Fields("count"));
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
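The difference between the three aggregation styles can be sketched in plain Java. This is a simplified model, not the Storm API: a batch is assumed to be a list of partitions, each partition a list of tuples, and AggregationModel with its methods is a hypothetical name. Counting is done with size() to mirror what Count produces.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (hypothetical names) contrasting partitionAggregate, aggregate and
// persistentAggregate for a Count aggregator. batch = list of partitions of tuples.
class AggregationModel {
    // partitionAggregate: one output tuple per partition, replacing its input tuples.
    static List<Long> partitionAggregateCount(List<List<List<Integer>>> batch) {
        List<Long> out = new ArrayList<>();
        for (List<List<Integer>> partition : batch) {
            out.add((long) partition.size());
        }
        return out;
    }

    // aggregate: all partitions of the batch are brought together, one count per batch.
    static long aggregateCount(List<List<List<Integer>>> batch) {
        long total = 0;
        for (List<List<Integer>> partition : batch) {
            total += partition.size();
        }
        return total;
    }

    // persistentAggregate: each batch's result is folded into state that survives
    // across batches (modeled here as a running total held in memory).
    private long persistedCount = 0;

    long persistentAggregateCount(List<List<List<Integer>>> batch) {
        persistedCount += aggregateCount(batch);
        return persistedCount;
    }

    public static void main(String[] args) {
        List<List<List<Integer>>> batch = List.of(
                List.of(List.of(1, 2), List.of(1, 3)),   // partition 0
                List.of(List.of(1, 4)));                 // partition 1
        System.out.println(partitionAggregateCount(batch)); // [2, 1]
        System.out.println(aggregateCount(batch));          // 3
        AggregationModel state = new AggregationModel();
        state.persistentAggregateCount(batch);
        System.out.println(state.persistentAggregateCount(batch)); // 6 after two batches
    }
}
```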
An aggregation operation can be created using the CombinerAggregator, ReducerAggregator, or generic Aggregator interface. The Count aggregator used in the topology code above is one of the built-in aggregators, and it is implemented using CombinerAggregator. The implementation is as follows:
public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
   @Override
   public Long zero() {
      return 0L;
   }
}
Grouping
Grouping is a built-in operation invoked through the groupBy method. The groupBy method repartitions the stream by doing a partitionBy on the specified fields, and then, within each partition, groups together tuples whose group fields are equal. Normally, we use groupBy together with persistentAggregate to get a grouped aggregation. The sample code is as follows:
TridentTopology topology = new TridentTopology();
// groupBy + persistentAggregate - saving the count per group to memory
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .groupBy(new Fields("d"))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
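The effect of groupBy followed by persistentAggregate can be modeled in plain Java as a map from group key to count that persists across batches, roughly what MemoryMapState keeps in memory. GroupedCountModel is a hypothetical name and this is not the Storm API; tuples are assumed to be [a, b, d] as produced by MyFunction. Trident combines each batch's grouped counts before updating the state, while this model increments the map directly, which yields the same final counts.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (hypothetical names) of groupBy("d") + persistentAggregate(Count).
class GroupedCountModel {
    private final Map<Integer, Long> state = new TreeMap<>(); // key: value of field "d"

    // Processes one batch; dIndex is the position of field "d" in each tuple.
    void processBatch(List<List<Integer>> batch, int dIndex) {
        for (List<Integer> tuple : batch) {
            state.merge(tuple.get(dIndex), 1L, Long::sum); // add 1 to this group's count
        }
    }

    Map<Integer, Long> snapshot() {
        return new TreeMap<>(state);
    }

    public static void main(String[] args) {
        GroupedCountModel model = new GroupedCountModel();
        model.processBatch(List.of(List.of(1, 2, 3), List.of(2, 1, 3), List.of(1, 4, 5)), 2);
        model.processBatch(List.of(List.of(0, 3, 3)), 2); // a second batch updates the same state
        System.out.println(model.snapshot()); // {3=3, 5=1}
    }
}
```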
Merging and Joining
Merging and joining can be done by using the merge and join methods, respectively. Merging combines one or more streams into a single stream. Joining is similar to merging, except that joining uses a trident tuple field from each side to match and join two streams. Moreover, joining works only within a batch. The sample code is as follows:
TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
   new Fields("key", "a", "b", "c"));
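A plain-Java sketch of the join call above, restricted to a single batch, may help. It assumes stream1 tuples have the shape [key, a, b] and stream2 tuples the shape [x, c]; BatchJoinModel and tuple are hypothetical helpers, not the Storm API. Tuples whose key equals x are combined into [key, a, b, c], with the join field first and the remaining fields of each side after it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (hypothetical names) of an inner join of two streams within one batch.
class BatchJoinModel {
    // Helper to build a tuple as a list of values.
    static List<Object> tuple(Object... values) {
        return Arrays.asList(values);
    }

    // stream1 tuples: [key, a, b]; stream2 tuples: [x, c]; output: [key, a, b, c].
    static List<List<Object>> join(List<List<Object>> stream1, List<List<Object>> stream2) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> left : stream1) {
            for (List<Object> right : stream2) {
                if (left.get(0).equals(right.get(0))) {            // key == x
                    List<Object> joined = new ArrayList<>(left);    // key, a, b
                    joined.addAll(right.subList(1, right.size()));  // c
                    out.add(joined);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> s1 = List.of(tuple("k1", 1, 2), tuple("k2", 3, 4));
        List<List<Object>> s2 = List.of(tuple("k1", "x"), tuple("k3", "y"));
        System.out.println(join(s1, s2)); // [[k1, 1, 2, x]]
    }
}
```

Because the model only sees one batch at a time, a key that appears in different batches of the two streams is never matched, which is the batch-level restriction mentioned above.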
State Maintenance
Trident provides a mechanism for state maintenance. State information can be stored in the topology itself (for example, in memory), or in a separate database. Maintaining state is difficult because, if any tuple fails during processing, the failed tuple is retried. This creates a problem while updating the state, because you are not sure whether the state has already been updated for this tuple. If the tuple failed before updating the state, then retrying the tuple keeps the state consistent. However, if the tuple failed after updating the state, then retrying the same tuple will increase the count in the database again and make the state inconsistent. One needs to perform the following steps to ensure a message is processed only once:
Process the tuples in small batches.
Assign a unique ID to each batch. If the batch is retried, it is given the same unique ID.
The state updates are ordered among batches. For example, the state update of the second batch will not be possible until the state update for the first batch has completed.
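One common way to apply these three steps is to store the ID of the last applied batch together with the value, so that a retried batch (which keeps its ID) can be recognized and skipped. The sketch below is a simplified, single-counter model of that idea in plain Java; TransactionalCountModel and applyBatch are hypothetical names, not Storm classes, and batch IDs are assumed to start at 1.

```java
// Plain-Java model (hypothetical names) of an exactly-once counter: the state keeps
// the count together with the ID of the last batch applied to it.
class TransactionalCountModel {
    private long count = 0;
    private long lastTxId = 0; // 0 means no batch applied yet; batch IDs start at 1

    // Applies the count computed for batch txId. Returns false if the batch is a replay.
    boolean applyBatch(long txId, long batchCount) {
        if (txId == lastTxId) {
            return false; // already applied: a retried batch must not be counted twice
        }
        if (txId != lastTxId + 1) {
            throw new IllegalStateException("batches must be applied in order");
        }
        count += batchCount;
        lastTxId = txId;
        return true;
    }

    long count() { return count; }

    public static void main(String[] args) {
        TransactionalCountModel state = new TransactionalCountModel();
        state.applyBatch(1, 3);
        state.applyBatch(1, 3); // batch 1 retried after a failure: ignored
        state.applyBatch(2, 2);
        System.out.println(state.count()); // 5
    }
}
```

The ordering check matters: the skip-on-equal-ID test is only safe because batch 2 can never be applied before batch 1 has finished updating the state.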
Distributed RPC
Distributed RPC is used to query and retrieve results from the Trident topology. Storm has a built-in distributed RPC server. The distributed RPC server receives the RPC request from the client and passes it to the topology. The topology processes the request and sends the result back to the distributed RPC server, which then returns it to the client.
Trident's distributed RPC query executes like a normal RPC query, except that these queries run in parallel.
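The request/response flow described above can be sketched in plain Java. This is only a model of the flow, not Storm's DRPC implementation: DrpcFlowModel, execute and returnResult are hypothetical names, a thread pool stands in for the topology's parallel workers, and a CompletableFuture keyed by request ID stands in for the server holding the client's request until the result comes back.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Plain-Java model of the DRPC request flow (hypothetical names, not Storm classes).
class DrpcFlowModel {
    // Requests waiting for a result, keyed by request ID.
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();
    // Stands in for the topology's parallel workers.
    private final ExecutorService topology = Executors.newFixedThreadPool(4);
    // DRPC function name -> computation performed by the "topology".
    private final Map<String, Function<String, String>> functions;

    DrpcFlowModel(Map<String, Function<String, String>> functions) {
        this.functions = functions;
    }

    // Client call: the server registers the request, hands it to the topology,
    // and blocks until the topology returns a result for this request ID.
    String execute(String functionName, String args) throws Exception {
        Function<String, String> fn = functions.get(functionName);
        if (fn == null) {
            throw new IllegalArgumentException("unknown DRPC function: " + functionName);
        }
        long id = nextId.incrementAndGet();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(id, reply);
        topology.submit(() -> {
            try {
                returnResult(id, fn.apply(args));
            } catch (RuntimeException e) {
                pending.remove(id).completeExceptionally(e); // report failure to the client
            }
        });
        return reply.get();
    }

    // Topology side: the result goes back to the server, which completes the request.
    private void returnResult(long id, String result) {
        pending.remove(id).complete(result);
    }

    void shutdown() {
        topology.shutdown();
    }

    public static void main(String[] args) throws Exception {
        Function<String, String> upper = s -> s.toUpperCase();
        DrpcFlowModel server = new DrpcFlowModel(Map.of("upper", upper));
        System.out.println(server.execute("upper", "hello")); // HELLO
        server.shutdown();
    }
}
```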
When to Use Trident?
In many use cases, the requirement is to process each message exactly once, and we can achieve this by writing the topology in Trident. On the other hand, exactly-once processing is difficult to achieve with core Storm alone. Hence, Trident is useful for those use cases that require exactly-once processing. However, Trident is not suited to all use cases, especially high-performance ones, because it adds complexity on top of Storm and manages state.
Working Example of Trident
We are going to convert the call log analyzer application worked out in the previous section to the Trident framework. Thanks to its high-level API, the Trident application will be relatively simple compared to plain Storm. We basically only need to combine the Trident operations described above, such as Function, Filter, Aggregate, GroupBy, Join, and Merge. Finally, we will start a DRPC server using the LocalDRPC class and search for some keywords using the execute method of the LocalDRPC class.
Formatting the call information
The purpose of the FormatCall class is to format the call information, comprising the caller number and the receiver number, into a single "caller - receiver" string. The complete program code is as follows:
Coding: FormatCall.java
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class FormatCall extends BaseFunction {
@Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}
CSVSplit
The purpose of the CSVSplit class is to split the input string on commas (,) and emit every non-empty word in the string. This function is used to parse the input argument of distributed queries. The complete code is as follows:
Coding: CSVSplit.java
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class CSVSplit extends BaseFunction {
@Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}
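The two classes only work together if the words produced by CSVSplit are exactly the keys produced by FormatCall. The plain-Java sketch below (CallKeyModel, formatCall and csvSplit are hypothetical helpers that copy the string logic of the two classes, without Storm) makes that relationship visible, including how empty words between consecutive commas are skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java copies (hypothetical helper names) of the string logic in FormatCall and CSVSplit.
class CallKeyModel {
    // Same string that FormatCall emits for the "call" field.
    static String formatCall(String fromMobileNumber, String toMobileNumber) {
        return fromMobileNumber + " - " + toMobileNumber;
    }

    // Same rule as CSVSplit: split on commas and skip empty words.
    static List<String> csvSplit(String args) {
        List<String> words = new ArrayList<>();
        for (String word : args.split(",")) {
            if (word.length() > 0) {
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        String key = formatCall("1234123401", "1234123402");
        List<String> words = csvSplit("1234123401 - 1234123402,,1234123401 - 1234123403");
        System.out.println(words);                    // [1234123401 - 1234123402, 1234123401 - 1234123403]
        System.out.println(words.get(0).equals(key)); // true
    }
}
```

Note that neither class trims whitespace, so a query such as "a - b, c - d" would look up " c - d" (with a leading space) and find no match.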
Log Analyzer
This is the main application. Initially, the application initializes the TridentTopology and feeds caller information using FeederBatchSpout. A Trident topology stream can be created using the newStream method of the TridentTopology class. Similarly, a Trident topology DRPC stream can be created using the newDRPCStream method of the TridentTopology class. A simple DRPC server can be created using the LocalDRPC class, whose execute method is used to search for a keyword.
The complete code is given below.
Coding: LogAnalyserTrident.java
import java.util.*;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;
import com.google.common.collect.ImmutableList;
public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
      FeederBatchSpout testSpout = new FeederBatchSpout(
         ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration"));

      // Count calls per "caller - receiver" pair and keep the counts in memory
      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      // Look up the count for a single call
      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      // Sum the counts for a comma-separated list of calls
      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
      // Feed the same four calls ten times; only the duration (a random value) changes
      while(idx < 10) {
         testSpout.feed(
            ImmutableList.of(new Values("1234123401", "1234123402", randomGenerator.nextInt(60))));
         testSpout.feed(
            ImmutableList.of(new Values("1234123401", "1234123403", randomGenerator.nextInt(60))));
         testSpout.feed(
            ImmutableList.of(new Values("1234123401", "1234123404", randomGenerator.nextInt(60))));
         testSpout.feed(
            ImmutableList.of(new Values("1234123402", "1234123403", randomGenerator.nextInt(60))));
         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 - 1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // To query a remote DRPC server instead of LocalDRPC, a DRPCClient could be used:
      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}
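As a sanity check on the numbers in the Output section, the plain-Java model below recomputes what the topology stores and what the two DRPC streams return. It is not the Storm API: CallCountModel and its methods are hypothetical, a HashMap stands in for MemoryMapState, the random duration is ignored (the topology never uses it), and it assumes every fed batch has been processed before the queries run.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (hypothetical names) of LogAnalyserTrident's counting and queries.
class CallCountModel {
    private final Map<String, Long> callCounts = new HashMap<>(); // stands in for MemoryMapState

    // Mirrors each(FormatCall) + groupBy("call") + persistentAggregate(Count).
    void feed(String from, String to) {
        callCounts.merge(from + " - " + to, 1L, Long::sum);
    }

    // Mirrors the "call_count" stream: MapGet on the raw argument (null if absent).
    Long callCount(String call) {
        return callCounts.get(call);
    }

    // Mirrors "multiple_call_count": CSVSplit, MapGet, FilterNull, then Sum.
    long multipleCallCount(String args) {
        long sum = 0;
        for (String call : args.split(",")) {
            if (call.isEmpty()) {
                continue;                          // CSVSplit skips empty words
            }
            Long count = callCounts.get(call);     // MapGet
            if (count != null) {
                sum += count;                      // FilterNull, then Sum
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        CallCountModel model = new CallCountModel();
        for (int idx = 0; idx < 10; idx++) {       // same feeding loop as main()
            model.feed("1234123401", "1234123402");
            model.feed("1234123401", "1234123403");
            model.feed("1234123401", "1234123404");
            model.feed("1234123402", "1234123403");
        }
        System.out.println(model.callCount("1234123401 - 1234123402")); // 10
        System.out.println(model.multipleCallCount(
                "1234123401 - 1234123402,1234123401 - 1234123403"));     // 20
    }
}
```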
Building and Running the Application
The complete application has three Java source files. They are as follows:
FormatCall.java
CSVSplit.java
LogAnalyserTrident.java
The application can be built by using the following command:
javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java
The application can be run by using the following command:
java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserTrident
Output
Once started, the application outputs complete details about the cluster startup process, operations processing, DRPC server and client information, and finally, the cluster shutdown process. The query-related part of this console output is shown below.
DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends
Apache Storm
In this chapter, we will discuss a real-time application of Apache Storm. We will see how Storm is used in Twitter.
Twitter is an online social networking service that provides a platform to send and receive user tweets. Registered users can read and post tweets, but unregistered users can only read tweets.
A hashtag is used to categorize tweets by keyword; it is created by prefixing the relevant keyword with #.
Now let us take a real-time scenario of finding the most used hashtag per topic.
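Before building the topology, the core computation can be sketched in plain Java: pull the hashtags out of each tweet and track which one is used most. This is an illustrative simplification, not the chapter's Storm code: HashtagModel is a hypothetical name, any whitespace-separated word starting with # is treated as a hashtag, hashtags are compared case-insensitively, and the per-topic split is ignored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch (hypothetical names) of hashtag extraction and counting.
class HashtagModel {
    private final Map<String, Integer> counts = new HashMap<>();

    // Treats any whitespace-separated word starting with '#' as a hashtag (a simplification).
    void addTweet(String text) {
        for (String word : text.split("\\s+")) {
            if (word.length() > 1 && word.startsWith("#")) {
                counts.merge(word.substring(1).toLowerCase(), 1, Integer::sum);
            }
        }
    }

    // Returns the hashtag with the highest count, if any tweet contained one.
    Optional<String> mostUsed() {
        return counts.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        HashtagModel model = new HashtagModel();
        model.addTweet("Learning #Storm and #Trident");
        model.addTweet("#storm makes real-time processing easy");
        System.out.println(model.mostUsed().orElse("none")); // storm
    }
}
```

In a Storm topology, the extraction and the counting would naturally become separate steps, so that tweets can be parsed and counted in parallel.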
Spout Creation
The purpose of the spout is to get the tweets submitted by people as soon as possible. Twitter provides the Twitter Streaming API, a web-service-based tool to retrieve the tweets submitted by people in real time. The Twitter Streaming API can be accessed from any programming language.
twitter4j is an open source, unofficial Java library that provides a Java-based module to easily access the Twitter Streaming API. twitter4j provides a listener-based framework to access the tweets. To access the Twitter Streaming API, we need to sign up for a Twitter developer account and obtain the following OAuth authentication details:
Consumer key
Consumer secret
Access token
Access token secret
Storm provides a Twitter spout, TwitterSampleSpout, in its starter kit. We will be using it to retrieve the tweets. The spout needs the OAuth authentication details and at least one keyword. The spout will emit real-time tweets based on the keywords. The complete program code is given below.