In this section, we will learn how to write a producer that will publish events into the Kafka messaging queue. In the next section, we will process the events published in this section with a Storm topology that reads data from Kafka using KafkaSpout. Perform the following steps to create the producer:
1. Create a new Maven project with the com.learningstorm group ID and the kafka-producer artifact ID.
2. Add the following dependencies for Kafka in the pom.xml file:
<dependency>
  <!-- Specify the Kafka groupId, artifactId, and version for your Kafka release here -->
  <exclusions>
    <exclusion>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jdmk</groupId>
      <artifactId>jmxtools</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jmx</groupId>
      <artifactId>jmxri</artifactId>
    </exclusion>
  </exclusions>
</dependency>
3. Add the following build plugin to the pom.xml file; it lets us run the producer using Maven:
<build>
  <plugins>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>
      <executions>
        <execution>
          <goals>
            <goal>exec</goal>
          </goals>
        </execution>
      </executions>
      <configuration>
        <executable>java</executable>
        <includeProjectDependencies>true</includeProjectDependencies>
        <includePluginDependencies>false</includePluginDependencies>
        <classpathScope>compile</classpathScope>
        <mainClass>com.learningstorm.kafka.WordsProducer</mainClass>
      </configuration>
    </plugin>
  </plugins>
</build>
4. Now, we will create the WordsProducer class in the com.learningstorm.kafka package. This class will produce each word from the first paragraph of Franz Kafka's Metamorphosis into the words_topic topic in Kafka as a single message. The following is the code of the WordsProducer class with explanation:
package com.learningstorm.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class WordsProducer {
    public static void main(String[] args) {
        // Build the configuration required for connecting to Kafka
        Properties props = new Properties();
        // List of Kafka brokers. The complete list is not required, as the
        // producer will auto-discover the rest of the brokers.
        // Change this to suit your deployment.
        props.put("metadata.broker.list", "localhost:9092");
        // Serializer used for sending data to Kafka. Since we are sending
        // strings, we use StringEncoder.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // We want acks from Kafka that messages are properly received.
        props.put("request.required.acks", "1");
        // Create the producer instance
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        // Now we break the paragraph into words
        for (String word : METAMORPHOSIS_OPENING_PARA.split("\\s")) {
            // Create the message to be sent to the "words_topic" topic with the word
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("words_topic", word);
            // Send the message
            producer.send(data);
        }
        System.out.println("Produced data");
        // Close the producer
        producer.close();
    }

    // First paragraph from Franz Kafka's Metamorphosis
    private static String METAMORPHOSIS_OPENING_PARA =
            "One morning, when Gregor Samsa woke from troubled dreams, "
            + "he found himself transformed in his bed into a horrible "
            + "vermin. He lay on his armour-like back, and if he lifted "
            + "his head a little he could see his brown belly, slightly "
            + "domed and divided by arches into stiff sections.";
}
5. Now, we can run the producer by executing the following command:
mvn compile exec:java
The following output is displayed:
Produced data
6. Now, let's verify that the messages have been produced by reading them back with Kafka's console consumer. Execute the following command:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic words_topic --from-beginning
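The following output is displayed. The console consumer prints each message (that is, each word) on its own line; the lines are joined here for brevity:
One morning, when Gregor Samsa woke from troubled dreams, he found himself transformed in his bed into a horrible vermin.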
So, we are able to produce messages into Kafka. In the next section, we will see how we can use KafkaSpout to read messages from Kafka and process them inside a Storm topology.
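The word splitting that WordsProducer performs can be checked without a running broker. The following is a minimal sketch, not part of the original project: the WordSplitCheck class and its toMessages helper are hypothetical names, and the code only mirrors the split("\\s") call to show which payloads would each become one Kafka message. It uses only the first sentence of the paragraph, and it additionally skips empty strings, which split("\\s") would produce if the text ever contained consecutive whitespace.

```java
import java.util.ArrayList;
import java.util.List;

public class WordSplitCheck {
    // First sentence of the paragraph published by WordsProducer (shortened for brevity)
    static final String TEXT =
            "One morning, when Gregor Samsa woke from troubled dreams, "
            + "he found himself transformed in his bed into a horrible vermin.";

    // Hypothetical helper: returns the payloads that would each be sent as one message
    static List<String> toMessages(String paragraph) {
        List<String> messages = new ArrayList<String>();
        for (String word : paragraph.split("\\s")) {
            // Consecutive whitespace would yield empty strings; skip them
            if (!word.isEmpty()) {
                messages.add(word);
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        List<String> messages = toMessages(TEXT);
        System.out.println(messages.size() + " messages, last: "
                + messages.get(messages.size() - 1));
    }
}
```

Because punctuation stays attached to the words, the last payload of the sentence ends with a period. The topology in the next section relies on this to detect sentence boundaries.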
Integrating Kafka with Storm
Now, we will create a Storm topology that will consume messages from a Kafka topic, words_topic, and aggregate words into sentences.
The complete message flow is shown in the following diagram:
[Figure: The message flow in the example Storm-Kafka integration. Words Producer -> Kafka Broker -> Storm Topology (KafkaSpout -> SentenceBolt -> PrinterBolt)]
We have already seen the WordsProducer class, which produces words into the Kafka broker. Now, we will create a Storm topology that reads these words from Kafka and aggregates them into sentences. The topology has one spout, KafkaSpout, which reads the messages from Kafka, and two bolts: SentenceBolt, which receives words from KafkaSpout and aggregates them into sentences, and PrinterBolt, which receives the sentences and simply prints them on the output stream. We will run this topology in local mode. Perform the following steps to create the Storm topology:
1. Create a new Maven project with the com.learningstorm group ID and the kafka-storm-topology artifact ID.
2. Add the following dependencies for KafkaSpout and Storm in the pom.xml file:
<!-- Dependency for Storm-Kafka spout -->
<dependency>
  <groupId>net.wurstmeister.storm</groupId>
  <artifactId>storm-kafka-0.8-plus</artifactId>
  <version>0.4.0</version>
</dependency>
<!-- Dependency for Storm -->
<dependency>
  <groupId>storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.0.1</version>
</dependency>
<!-- Utilities -->
<dependency>
  <groupId>commons-collections</groupId>
  <artifactId>commons-collections</artifactId>
  <version>3.2.1</version>
</dependency>
<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>15.0</version>
</dependency>
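3. Add the exec-maven-plugin plugin to the pom.xml file so that we can run the topology from the command line in local mode. The plugin declaration is the same as the one used for the producer, except that the main class is passed in through the main.class property:
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <version>1.2.1</version>
  <executions>
    <execution>
      <goals>
        <goal>exec</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <executable>java</executable>
    <includeProjectDependencies>true</includeProjectDependencies>
    <includePluginDependencies>false</includePluginDependencies>
    <classpathScope>compile</classpathScope>
    <mainClass>${main.class}</mainClass>
  </configuration>
</plugin>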
4. Add the maven-assembly-plugin plugin to the pom.xml file so that we can package the topology to deploy it on Storm using the following code:
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass></mainClass>
      </manifest>
    </archive>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
5. Now, add the repositories for the KafkaSpout dependencies in the pom.xml file:
<repositories>
  <repository>
    <id>github-releases</id>
    <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
  </repository>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>
6. Now, we will first create SentenceBolt, which will aggregate the words into sentences. For this, create a class called SentenceBolt in the com.learningstorm.kafka package. The following is the code for the SentenceBolt class with explanation:
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import com.google.common.collect.ImmutableList;

public class SentenceBolt extends BaseBasicBolt {
    // list used for aggregating the words
    private List<String> words = new ArrayList<String>();

    public void execute(Tuple input, BasicOutputCollector collector) {
        // Get the word from the tuple
        String word = input.getString(0);
        if (StringUtils.isBlank(word)) {
            // ignore blank lines
            return;
        }
        System.out.println("Received Word:" + word);
        // add word to current list of words
        words.add(word);
        if (word.endsWith(".")) {
            // word ends with '.', which means this is the end of a sentence,
            // so the SentenceBolt publishes a sentence tuple
            collector.emit(ImmutableList.of((Object) StringUtils.join(words, ' ')));
            // and reset the words list.
            words.clear();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // here we declare we will be emitting tuples with
        // a single field called "sentence"
        declarer.declare(new Fields("sentence"));
    }
}
7. Next is PrinterBolt, which just prints the sentences that it receives. Create the PrinterBolt class in the com.learningstorm.kafka package. The following is the code with explanation:
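import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {
    public void execute(Tuple input, BasicOutputCollector collector) {
        // get the sentence from the tuple and print it
        String sentence = input.getString(0);
        System.out.println("Received Sentence: " + sentence);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // we don't emit anything
    }
}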
8. Now, we will create KafkaTopology, which will define KafkaSpout and wire it with PrinterBolt and SentenceBolt. Create a new KafkaTopology class in the com.learningstorm.kafka package. The following is the code with explanation:
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaTopology {
    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException {
        // zookeeper hosts for the Kafka cluster
        ZkHosts zkHosts = new ZkHosts("localhost:2181");
        // Create the KafkaSpout configuration
        // Second argument is the topic name
        // Third argument is the ZooKeeper root for Kafka
        // Fourth argument is consumer group id
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "words_topic", "", "id7");
        // Specify that the kafka messages are String
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Start reading from the beginning of the topic on every run
        kafkaConfig.forceFromStart = true;
        // Now we create the topology
        TopologyBuilder builder = new TopologyBuilder();
        // set the kafka spout class
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
        // configure the bolts
        builder.setBolt("SentenceBolt", new SentenceBolt(), 1).globalGrouping("KafkaSpout");
        builder.setBolt("PrinterBolt", new PrinterBolt(), 1).globalGrouping("SentenceBolt");
        // create an instance of LocalCluster class
        // for executing topology in local mode.
        LocalCluster cluster = new LocalCluster();
        Config conf = new Config();
        // Submit topology for execution
        cluster.submitTopology("KafkaTopology", conf, builder.createTopology());
        try {
            // Wait for some time before exiting
            System.out.println("Waiting to consume from kafka");
            Thread.sleep(10000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
        // kill the KafkaTopology
        cluster.killTopology("KafkaTopology");
        // shut down the storm test cluster
        cluster.shutdown();
    }
}
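9. Now, we will run the topology. Make sure the Kafka cluster is running and that you have executed the producer from the previous section so that there are messages in Kafka for consumption.
Run the topology by executing the following command, which passes the topology class through the main.class property used in the exec-maven-plugin configuration:
mvn compile exec:java -Dmain.class=com.learningstorm.kafka.KafkaTopology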
This will execute the topology. You should see messages similar to the following output:
Received Word:One
Received Word:morning,
Received Word:when
Received Word:Gregor
Received Word:Samsa
Received Word:woke
Received Word:from
Received Word:troubled
Received Word:dreams,
Received Word:he
Received Word:found
Received Word:himself
Received Word:transformed
Received Word:in
Received Word:his
Received Word:bed
Received Word:into
Received Word:a
Received Word:horrible
Received Word:vermin.
Received Sentence: One morning, when Gregor Samsa woke from troubled dreams, he found himself transformed in his bed into a horrible vermin.
So, we were able to consume messages from Kafka and process them in a Storm topology.
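The aggregation rule used by SentenceBolt (buffer words until one ends with a period, then emit the joined sentence and reset the buffer) can be tried out without Storm or Kafka. The following is a minimal sketch under that assumption: the SentenceAggregator class and its accept method are hypothetical names introduced only for illustration, and the blank check approximates the StringUtils.isBlank call in the bolt. It uses String.join, so it needs Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;

public class SentenceAggregator {
    // Words buffered since the last completed sentence
    private final List<String> words = new ArrayList<>();

    // Returns the completed sentence when the word ends with '.', otherwise null
    public String accept(String word) {
        if (word == null || word.trim().isEmpty()) {
            // Ignore blank input, similar to the isBlank check in SentenceBolt
            return null;
        }
        words.add(word);
        if (!word.endsWith(".")) {
            return null;
        }
        // End of sentence: join the buffered words and reset the buffer
        String sentence = String.join(" ", words);
        words.clear();
        return sentence;
    }

    public static void main(String[] args) {
        SentenceAggregator aggregator = new SentenceAggregator();
        for (String word : "He lay on his back. He lifted his head.".split("\\s")) {
            String sentence = aggregator.accept(word);
            if (sentence != null) {
                System.out.println("Received Sentence: " + sentence);
            }
        }
    }
}
```

The buffer is plain instance state, so the rule only works if every word reaches the same instance in order. This is presumably why the topology above runs a single SentenceBolt task and connects it with globalGrouping.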
Summary
In this chapter, we learned about the basics of Apache Kafka and how to use it as part of a real-time stream processing pipeline built with Storm. We learned about the architecture of Apache Kafka and how it can be integrated into Storm processing by using KafkaSpout.
In the next chapter, we will have a look at Trident, which is a high-level abstraction for defining Storm topologies. We will also see transactional topologies in Storm that support exactly-once message processing semantics.