The spout in a transactional topology is completely different from a standard spout.
public class TweetsTransactionalSpout extends BaseTransactionalSpout<TransactionMetadata> {
As you can see in the class definition, TweetsTransactionalSpout extends BaseTransac tionalSpout with a generic type. The type you set there is something known as the transaction metadata. It will be used later to emit batches of tuples from the source.
In this example, TransactionMetadata is defined as:
public class TransactionMetadata implements Serializable { private static final long serialVersionUID = 1L;
Transactions in Action | 73
long from;
int quantity;
public TransactionMetadata(long from, int quantity) { this.from = from;
this.quantity = quantity;
} }
Here you’ll store from and quantity, which will tell you exactly how to generate the batch of tuples.
To finish the implementation of the spout, you need to implement the following three methods:
@Override
public ITransactionalSpout.Coordinator<TransactionMetadata> getCoordinator(
Map conf, TopologyContext context) {
return new TweetsTransactionalSpoutCoordinator();
}
@Override
public backtype.storm.transactional.ITransactionalSpout.Emitter<TransactionMetadata>
getEmitter(
Map conf, TopologyContext context) {
return new TweetsTransactionalSpoutEmitter();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("txid", "tweet_id", "tweet"));
}
In the getCoordinator method, you tell Storm which class will coordinate the generation of batches of tuples. With getEmitter, you tell Storm which class will be responsible for reading batches of tuples from the source and emitting them to a stream in the topology. And finally, as you did before, you need to declare which fields are emitted.
The RQ class
To make the example easier, we’ve decided to encapsulate all operations with Redis in one single class.
public class RQ {
public static final String NEXT_READ = "NEXT_READ";
public static final String NEXT_WRITE = "NEXT_WRITE";
Jedis jedis;
public RQ() {
jedis = new Jedis("localhost");
}
public long getAvailableToRead(long current) {
}
public long getNextRead() {
String sNextRead = jedis.get(NEXT_READ);
if(sNextRead == null) return 1;
return Long.valueOf(sNextRead);
}
public long getNextWrite() {
return Long.valueOf(jedis.get(NEXT_WRITE));
}
public void close() { jedis.disconnect();
}
public void setNextRead(long nextRead) { jedis.set(NEXT_READ, ""+nextRead);
}
public List<String> getMessages(long from, int quantity) { String[] keys = new String[quantity];
for (int i = 0; i < quantity; i++) keys[i] = ""+(i+from);
return jedis.mget(keys);
} }
Read carefully the implementation of each method, and make sure you understand what they do.
The Coordinator
Let’s see the implementation of the coordinator of this example.
public static class TweetsTransactionalSpoutCoordinator implements ITransactionalSpout.Coordinator<TransactionMetadata> {
TransactionMetadata lastTransactionMetadata;
RQ rq = new RQ();
long nextRead = 0;
public TweetsTransactionalSpoutCoordinator() { nextRead = rq.getNextRead();
} @Override
public TransactionMetadata initializeTransaction(BigInteger txid, TransactionMetadata prevMetadata) {
long quantity = rq.getAvailableToRead(nextRead);
quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity;
TransactionMetadata ret = new TransactionMetadata(nextRead, (int)quantity);
Transactions in Action | 75
nextRead += quantity;
return ret;
} @Override
public boolean isReady() {
return rq.getAvailableToRead(nextRead) > 0;
} @Override
public void close() { rq.close();
} }
It is important to mention that among the entire topology there will be only one coordi-nator instance. When the coordicoordi-nator is instantiated, it retrieves from Redis a sequence that tells the coordinator which is the next tweet to read. The first time, this value will be 1, which means that the next tweet to read is the first one.
The first method that will be called is isReady. It will always be called before initiali zeTransaction, to make sure the source is ready to be read from. You should return true or false accordingly. In this example, retrieve the amount of tweets and compare them with how many tweets you read. The difference between them is the amount to available tweets to read. If it is greater than 0, it means you have tweets to read.
Finally, the initializeTransaction is executed. As you can see, you get txid and pre vMetadata as parameters. The first one is a unique transaction ID generated by Storm, which identifies the batch of tuples to be generated. prevMetadata is the metadata gen-erated by the coordinator of the previous transaction.
In this example, first make sure how many tweets are available to read. And once you have sorted that out, create a new TransactionMetadata, indicating which is the first tweet to read from, and which is the quantity of tweets to read.
As soon as you return the metadata, Storm stores it with the txid in zookeeper. This guarantees that if something goes wrong, Storm will be able to replay this with the emitter to resend the batch.
The Emitter
The final step when creating a transactional spout is implementing the emitter.
Let’s start with the following implementation:
public static class TweetsTransactionalSpoutEmitter implements ITransactionalSpout.Emitter<TransactionMetadata> {
RQ rq = new RQ();
public TweetsTransactionalSpoutEmitter() { }
@Override
public void emitBatch(TransactionAttempt tx,
TransactionMetadata coordinatorMeta, BatchOutputCollector collector) { rq.setNextRead(coordinatorMeta.from+coordinatorMeta.quantity);
List<String> messages = rq.getMessages(coordinatorMeta.from, coordinatorMeta.quantity);
long tweetId = coordinatorMeta.from;
for (String message : messages) {
collector.emit(new Values(tx, ""+tweetId, message));
tweetId++;
} } @Override
public void cleanupBefore(BigInteger txid) { }
Emitters are the one who will read the source and send tuples to a stream. It is very important for the emitters to always be able to send the same batch of tuples for the same transaction id and transaction metadata. This way, if something goes wrong during the processing of a batch, Storm will be able to repeat the same transaction id and transaction metadata with the emitter and make sure the batch of tuples are repeated.
Storm will increase the attempt id in the TransactionAttempt. This way you know that the batch is repeated.
The important method here is emitBatch. In this method, use the metadata, given as a parameter, to get tweets from Redis. Also increase the sequence in Redis that keeps track of how many tweets you’ve read so far. And of course, emit the tweets to the topology.