
Implementing a transactional topology

In the document Storm Real-time (pages 169-175)

In the previous chapters, we have concerned ourselves with data that was not "transactional" in nature, at least not in the way we typically think of things such as financial transactions. As a result, one can potentially tolerate system failure because there is no direct monetary implication to each transaction that may be affected by a failure, especially given that transactional semantics come at a cost in both performance and storage. The recipes in this chapter deal with scoring transactions that require transactional semantics, and it is therefore relevant at this stage to understand how to achieve exactly-once semantics with Storm.

The transactional semantics of Storm, like most aspects of Storm, are excellently documented on the project's wiki. The transactional logic is presented here as a matter of convenience and completeness, but the reader is encouraged to read the source of this information at https://github.com/nathanmarz/storm/wiki/Trident-state.

Trident's support for exactly-once semantics requires specific implementations of spouts and state. When failures occur, tuples will be replayed. This brings up a problem when doing state updates (or anything with side effects): you have no idea whether you have already successfully updated the state based on this tuple. The state updates, therefore, need to be fundamentally idempotent. In order to achieve this, you use the following properties of Storm:

• Tuples are processed as small batches.

• Each batch of tuples is given a unique ID called the transaction ID (txid). If the batch is replayed, it is given the exact same txid.

• State updates are ordered among batches; that is, the state updates for batch 2 won't be applied until the state updates for batch 1 have succeeded.

Given these properties, and storing a little extra state, it is possible to ensure that state updates are truly idempotent in the face of failures and retries.

The key to implementing these properties is the spout. The spout must follow a specific set of rules:

• Batches for a given txid are always the same. Replays of batches for a txid will extract the same set of tuples as the first time that batch was emitted for that txid.

• There's no overlap between the batches of tuples (tuples are in one batch or another, never multiple).

• Every tuple is in a batch.
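The three spout rules can be illustrated with a small, self-contained Java sketch. It is not the Kafka spout's implementation: `ReplayableBatchSource`, `append`, and `emitBatch` are hypothetical names, and an in-memory list and map stand in for the broker log and for whatever durable metadata a real spout would keep. The idea shown is that the offset range of a batch is fixed the first time its txid is emitted, so a replay returns exactly the same tuples.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a source that obeys the three batch rules.
class ReplayableBatchSource {
    private final List<String> log = new ArrayList<>();      // append-only message log
    private final Map<Long, int[]> ranges = new HashMap<>(); // txid -> [start, end) offsets
    private final int maxBatchSize;
    private int nextOffset = 0;                              // first offset not yet in a batch

    ReplayableBatchSource(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    void append(String message) {
        log.add(message);
    }

    // Returns the tuples for the given txid. The first call for a txid fixes
    // its offset range; later calls (replays) return the same range.
    List<String> emitBatch(long txid) {
        int[] range = ranges.get(txid);
        if (range == null) {
            int end = Math.min(log.size(), nextOffset + maxBatchSize);
            range = new int[] { nextOffset, end };
            ranges.put(txid, range);
            nextOffset = end; // contiguous ranges: no overlap, no skipped tuples
        }
        return new ArrayList<>(log.subList(range[0], range[1]));
    }
}
```

Because the ranges are contiguous and recorded per txid, a message appended after a batch was first emitted can only ever appear in a later batch.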

Finally, the transaction ID is stored atomically with the value in the underlying Trident state.

Using this transaction ID, Trident can detect whether a given update is a duplicate of a previous update involving this batch and can decide whether to skip or apply the update. The logic of including the transaction ID is handled entirely by Trident and its state implementations and works regardless of the underlying persistence.
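The duplicate check can be sketched in plain Java. This is a minimal illustration under stated assumptions rather than Trident's own code: `TransactionalCounter` and `applyBatch` are hypothetical names, and an in-memory map stands in for the underlying store. It relies on the properties listed earlier, namely that batches are applied in order and that a replayed batch carries the same txid and the same tuples.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: each key stores its count together with the txid
// of the last batch that updated it.
class TransactionalCounter {
    static final class Entry {
        final long txid;
        final long count;

        Entry(long txid, long count) {
            this.txid = txid;
            this.count = count;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Applies a batch's partial count for a key. Returns true if the update
    // was applied, false if it was skipped as a replay of the stored txid.
    boolean applyBatch(long txid, String key, long delta) {
        Entry current = store.get(key);
        if (current != null && current.txid == txid) {
            return false; // this batch was already applied: skip
        }
        long base = (current == null) ? 0 : current.count;
        store.put(key, new Entry(txid, base + delta)); // txid and value written together
        return true;
    }

    long count(String key) {
        Entry e = store.get(key);
        return (e == null) ? 0 : e.count;
    }
}
```

Applying batch 1 twice leaves the count unchanged after the first call, while batch 2 is applied on top of it as usual.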

Getting ready

Before we get started, we need to install a message broker that more readily supports the transactional logic inherent in Storm's Trident, specifically Apache Kafka. Kafka is a distributed publish-subscribe messaging system. According to the Kafka website (http://kafka.apache.org/), it is designed to support the following:

• Persistent messaging with O(1) disk structures that provide constant time performance even with many TB of stored messages.

• High-throughput is supported; even with very modest hardware, Kafka can support hundreds of thousands of messages per second.

• Explicit support for partitioning messages over Kafka servers and distributing consumption over a cluster of consumer machines while maintaining per-partition ordering semantics.

In order to install Kafka, download the source packages from the website for Version 0.7.2.

Once the download is complete, install Scala Build Tool (sbt), then unpack and build the Kafka server by executing the following commands:

sudo apt-get install sbt
tar xzf kafka-0.7.2-incubating-src.tgz
cd kafka-0.7.2-incubating-src
./sbt update
./sbt package

The default installation of Kafka sets the number of partitions to 1, which isn't a practical value. So, start by editing the server.properties file under config and setting the value of num.partitions to 2.

With Kafka installed, open up three separate terminal instances within the kafka directory.

Within the first terminal, execute the following command:

bin/zookeeper-server-start.sh config/zookeeper.properties

Then, from within the second terminal, execute the following command:

bin/kafka-server-start.sh config/server.properties

You now have a functioning Kafka server. Note that your Zookeeper and Kafka instance are executing in separate terminals. Your third terminal will be used to interact with Kafka topics.

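To test your installation, let's publish some messages. In the third terminal instance, start the console producer (the same command that is used again in step 6 of this recipe):

bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic test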

Once the initialization sequence is complete, you will be able to type text; each line you type will publish that text to the "test" topic. Enter a few lines and then escape the application by using Ctrl + C. Finally, verify that the messages can be read; to do this, type the following command into the terminal:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This application should display all of your previously created messages. You can use Ctrl + C to quit this application, but note that we will shortly be using this subscriber utility for testing.

How to do it…

We will now implement an extremely basic transactional topology. This will illustrate how transactional semantics are achieved in Storm. In order to fully illustrate this, we will create the topology and test and understand the resulting state, but we will also create some forced errors in order to check whether the failure and recovery cases are working. This will give you a clear understanding of how the transactional elements hang together and where to start debugging should you encounter errors.

1. Start by creating a Storm project that includes the following dependencies in the POM file:

<artifactId>storm</artifactId>

<groupId>storm</groupId>

<artifactId>storm</artifactId>

<groupId>storm</groupId>

</exclusion>

</exclusions>

</dependency>

2. Import your project into Eclipse, create a new main class named TransactionalTopology, and then create the basic main method implementation:

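public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setDebug(true);
    conf.setMaxTaskParallelism(3);

    // Run the topology in an in-process local cluster for 100 seconds, then stop.
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("transactional-topology", conf, makeTopology().build());
    Thread.sleep(100000);
    cluster.shutdown();
}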

3. Then, define a really simple topology in the makeTopology() method:

private static TridentTopology makeTopology() {
    TridentTopology topology = new TridentTopology();

    // A single Kafka host on localhost, 2 partitions, topic "test".
    TridentKafkaConfig spoutConfig = new TridentKafkaConfig(
            StaticHosts.fromHostString(
                    Arrays.asList(new String[] { "localhost" }), 2), "test");

    // Decode each message, group by its text, and keep a persistent count per text.
    topology.newStream("kafka", new TransactionalTridentKafkaSpout(spoutConfig))
            .each(new Fields("bytes"), new DebugBytes(), new Fields("text"))
            .groupBy(new Fields("text"))
            .persistentAggregate(getBatchStateFactory("test"), new Count(), new Fields("count"));

    return topology;
}

4. In order to complete the topology definition, you must implement the DebugBytes Trident function. Trident ships with a built-in Debug function, but here we need a specialized equivalent that prints out the message contents and generates

}

String text = new String(tuple.getBinary(0));

System.out.println(name + text);

collector.emit(new Values(text));

5. This is the complete implementation. So, we can simply expect the topology to print our messages, group them by content, and store a count against content in a Cassandra column family. Before we test the topology, you need to create the Cassandra column family; execute the following commands in the Cassandra-cli to do so:

create keyspace test
    with strategy_options = [{replication_factor:1}]
    and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';

use test;

create column family transactional
    with comparator = AsciiType
    and default_validation_class = 'UTF8Type'
    and key_validation_class = 'UTF8Type';

6. You can now test the topology by starting it directly from Eclipse by navigating to Run As | Java Application. Once the topology is initialized, switch back to your third Kafka terminal and run the following command to start the message producer:

bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic test

7. Using this interface, publish a single message. Then, take note of the Storm logs, which should be clear. Also take note of the value in the Cassandra column family, which you can inspect using the following Cassandra-cli instructions:

use test;

list transactional;

Note that the value stored in Cassandra contains more than just your intended count of 1.

8. Now, switch back to the terminal and enter a second message. At this point, your topology should throw an exception and terminate. Validate that this has occurred, and also validate that the stored count is still 1.

You have effectively simulated a failure during transaction processing. The expected behavior now is that you can start up the topology again, the message should now be processed, and the count in Cassandra should be updated to 2. To test this, simply run the topology again in local mode from Eclipse.

9. When you execute this, you will notice that the message does not get replayed into the topology, and the count remains at 1 until you publish a third message. This means that the message was effectively lost as a result of the failure case. This is obviously what we are trying to prevent; in order to fix the issue, add the following lines of code to your main method:

conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"127.0.0.1"}));

conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);

conf.put(Config.STORM_ZOOKEEPER_ROOT, "/storm");

10. You can now re-run the same test. You will find that, after the failure, the topology does replay the message and the count is updated as expected.
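As a reference point for the tests above, the result the topology is expected to produce (messages decoded, grouped by content, and counted) can be sketched in plain Java without Storm. This is only an illustration under stated assumptions: `MessageCounts` and `countByText` are hypothetical names, UTF-8 decoding is assumed, and an in-memory map replaces the Cassandra-backed state, so no transactional handling is involved.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free sketch of the decode / group / count pipeline.
class MessageCounts {
    // Decodes each raw message (UTF-8 assumed) and counts occurrences per text.
    static Map<String, Long> countByText(List<byte[]> messages) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (byte[] raw : messages) {
            String text = new String(raw, StandardCharsets.UTF_8);
            counts.merge(text, 1L, Long::sum); // group by text, add one per message
        }
        return counts;
    }
}
```

Publishing the same text twice should therefore yield a count of 2 for that text, which is the value to look for in the column family once the replayed message has been processed.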

How it works...

We saw many elements coming together in order to provide the exactly-once semantics that we finally witnessed during our tests. Let's unpack each element at play.

Firstly, the Kafka spout, which is a prebuilt module from the storm-contrib project, already implements the required set of logic to be a transactional compliant spout. We configured the spout using the following lines of code:

TridentKafkaConfig spoutConfig = new TridentKafkaConfig(
        StaticHosts.fromHostString(
                Arrays.asList(new String[] { "localhost" }), 2), "test");

This configuration tells the spout where to find the Kafka nodes. In this case, we only have a single node on localhost; however, in a larger deployment, we would supply a list of actual Kafka nodes. We also specify the number of partitions and the name of the topic to subscribe to. We then create an instance of a transactional spout as the spout for the only stream in our topology. This is all that is required to implement a transactional Kafka spout. Many other transactional spouts exist within the storm-contrib project; however, if your broker isn't currently supported, using the existing storm-contrib code as a starting point would be the best approach to implementing a new spout.

Secondly, we have seen the transactional state in the Cassandra-backed Trident state. Trident adds some transaction ID state to the state element in the underlying data store on our behalf. It then uses this state to safely replay the message and update the state. In order to use the transactional state, ensure that you create your Cassandra state using the transactional utility method.

Finally, we managed to create failure cases, which weren't dealt with correctly without adding some additional configurations for the topology. The properties in question effectively tell Storm where to find a running Zookeeper instance:

conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"127.0.0.1"}));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
conf.put(Config.STORM_ZOOKEEPER_ROOT, "/storm");

This fixes the problem because Storm stores an additional bit of state in Zookeeper, which allows Storm to enforce strong ordering between batches. This is key: Trident allows batches to be processed in parallel, but they always commit in order. This capability is enabled by maintaining batch-tracking state inside Zookeeper. Within a standard deployment, there is no need to tell Storm where to find Zookeeper; however, in local mode there is no external Zookeeper instance whose state survives a restart of the topology, hence the requirement for this configuration.
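The "process in parallel, commit in order" behavior can be illustrated with a short Java sketch. It is not Storm's coordinator, which keeps its tracking state in Zookeeper: `OrderedCommitter`, `processed`, and `committedOrder` are hypothetical names, and the pending batches are held in memory only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: batches may finish processing in any order,
// but their commit actions run strictly in txid order.
class OrderedCommitter {
    private final TreeMap<Long, Runnable> pending = new TreeMap<>(); // processed, awaiting commit
    private final List<Long> committed = new ArrayList<>();
    private long nextToCommit; // lowest txid that has not been committed yet

    OrderedCommitter(long firstTxid) {
        this.nextToCommit = firstTxid;
    }

    // Called when a batch has finished processing, possibly out of order.
    synchronized void processed(long txid, Runnable commit) {
        pending.put(txid, commit);
        // Commit every batch that is now next in line.
        while (!pending.isEmpty() && pending.firstKey() == nextToCommit) {
            pending.pollFirstEntry().getValue().run();
            committed.add(nextToCommit);
            nextToCommit++;
        }
    }

    synchronized List<Long> committedOrder() {
        return new ArrayList<>(committed);
    }
}
```

If batch 2 finishes before batch 1, its commit is held back until batch 1 has committed. Because the tracking state here lives only in memory, it is lost on restart, which is the gap that pointing Storm at a running Zookeeper closes in the recipe.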
