
Studying the Storm Framework. Part II

The first part dealt with the basic concepts of Storm.

Different classes of tasks have different reliability requirements. It is one thing to miss a couple of records when calculating visit statistics, where the counts run into the hundreds of thousands and exact figures are not needed. It is quite another to lose, for example, information about a customer's payment.

Next, we consider the mechanisms for protecting against data loss that are implemented in Storm.

Basic example


Spout

If we do not care whether errors occurred while a Tuple was being processed, the Spout sends the Tuple to SpoutOutputCollector by calling the emit(new Values(...)) method.

If we want to know whether the Tuple was processed successfully, the call looks like emit(new Values(...), msgId), where msgId is an object of an arbitrary class. In this case, the ISpout interface provides the methods:
void ack(Object msgId), called when the Tuple has been processed successfully;
void fail(Object msgId), called when processing of the Tuple has failed;
where msgId is the msgId with which SpoutOutputCollector.emit was called.
FailAwareSpout example:

public class FailAwareSpout extends BaseRichSpout {
    private Message[] messages;
    // Skipped ...

    private static class Message implements Serializable {
        private String message;
        private int failCount;

        private Message(String message) {
            this.message = message;
        }
    }
    // Skipped ...

    @Override
    public void nextTuple() {
        // Skipped ...
        // Emit the Tuple together with its msgId
        outputCollector.emit(new Values(messages[messageId].message), messageId);
    }

    // The Tuple was processed successfully
    @Override
    public void ack(Object msgId) {
        Message m = messages[(Integer) msgId];
        System.out.println("IN>> [" + Thread.currentThread().getId() + "] message "
                + m.message + " processed successfully");
    }

    // Processing of the Tuple failed
    @Override
    public void fail(Object msgId) {
        Message m = messages[(Integer) msgId];
        if (++m.failCount > MAX_RETRY_COUNT) {
            throw new IllegalStateException("Too many message processing errors");
        }
        System.out.println("IN>> [" + Thread.currentThread().getId() + "] message "
                + m.message + " processing failed " + "[" + m.failCount + "]");
        // Put the message back into the queue to be sent again
        sendQueue.addLast((Integer) msgId);
    }
}

The nextTuple, ack, and fail methods are called in the same thread and do not require additional synchronization when accessing the Spout fields.
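The replay bookkeeping that FailAwareSpout relies on can be sketched in plain Java, without the Storm API. This is only a model under stated assumptions: the class name RetryQueueSketch, the nextMessageId method, and the value of MAX_RETRY_COUNT are hypothetical, and the declaration of sendQueue is skipped in the original listing, so an ArrayDeque is assumed here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of a spout's replay bookkeeping; it does not use the Storm API.
class RetryQueueSketch {
    static final int MAX_RETRY_COUNT = 3; // hypothetical limit

    private final String[] messages;
    private final int[] failCounts;
    // Ids of messages waiting to be sent (assumed type; skipped in the original).
    private final Deque<Integer> sendQueue = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    RetryQueueSketch(String... messages) {
        this.messages = messages;
        this.failCounts = new int[messages.length];
        for (int i = 0; i < messages.length; i++) {
            sendQueue.addLast(i);
        }
    }

    // Mirrors nextTuple(): returns the next id to send, or null if nothing is waiting.
    Integer nextMessageId() {
        return sendQueue.pollFirst();
    }

    // Mirrors ack(msgId): the message is done and is never queued again.
    void ack(int msgId) {
        log.add("ack " + messages[msgId]);
    }

    // Mirrors fail(msgId): count the failure and queue the message for resending.
    void fail(int msgId) {
        if (++failCounts[msgId] > MAX_RETRY_COUNT) {
            throw new IllegalStateException("Too many message processing errors");
        }
        log.add("fail " + messages[msgId] + " [" + failCounts[msgId] + "]");
        sendQueue.addLast(msgId);
    }

    public static void main(String[] args) {
        RetryQueueSketch spout = new RetryQueueSketch("a", "b");
        int first = spout.nextMessageId();    // id 0
        int second = spout.nextMessageId();   // id 1
        spout.fail(first);                    // "a" goes back into the queue
        spout.ack(second);
        int replayed = spout.nextMessageId(); // id 0 again
        spout.ack(replayed);
        System.out.println(spout.log);        // prints [fail a [1], ack b, ack a]
    }
}
```

Because the three callbacks run in one thread, an unsynchronized queue is enough in this model.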

Bolt

For a Bolt to be able to inform Storm of its processing results, it must implement the IRichBolt interface. The easiest way to do this is to inherit from the BaseRichBolt class.
A Bolt informs Storm about the results of its work by calling the following methods of the OutputCollector class in the execute(Tuple) method:
ack(Tuple), if the Tuple was processed successfully;
fail(Tuple), if processing of the Tuple failed.
FailingBolt example:

public class FailingBolt extends BaseRichBolt {
    OutputCollector outputCollector;
    // Skipped ...

    @Override
    public void execute(Tuple tuple) {
        // Skipped ...
            outputCollector.ack(tuple); // processed successfully
        } else {
            // Skipped ...
            outputCollector.fail(tuple); // processing failed
        }
    }
    // Skipped ...
}

Usage example: BasicFailApp, which combines the FailAwareSpout Spout with the FailingBolt Bolt, which generates processing errors at random.

In Bolts that inherit from the BaseBasicBolt class, ack(Tuple) is called automatically after the execute method returns.
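The idea of acknowledging automatically once execute returns can be illustrated with a small plain-Java wrapper. This is a sketch, not Storm's code: the Collector and BasicLogic interfaces and the executeWithAutoAck method are hypothetical stand-ins, tuples are modelled as strings, and reporting a failure when the logic throws is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of auto-acking; all types here are hypothetical stand-ins.
class AutoAckSketch {
    interface Collector {
        void ack(String tuple);
        void fail(String tuple);
    }

    interface BasicLogic {
        void execute(String tuple);
    }

    // Wraps user logic so that the logic itself never acks or fails by hand.
    static void executeWithAutoAck(BasicLogic logic, String tuple, Collector collector) {
        try {
            logic.execute(tuple);
            collector.ack(tuple);  // reached only if execute returned normally
        } catch (RuntimeException e) {
            collector.fail(tuple); // assumption of this sketch: an exception means failure
        }
    }

    // Records the calls it receives so the outcome can be inspected.
    static class RecordingCollector implements Collector {
        final List<String> events = new ArrayList<>();

        @Override
        public void ack(String tuple) {
            events.add("ack " + tuple);
        }

        @Override
        public void fail(String tuple) {
            events.add("fail " + tuple);
        }
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        BasicLogic logic = tuple -> {
            if (tuple.equals("bad")) {
                throw new IllegalArgumentException("cannot process " + tuple);
            }
        };
        executeWithAutoAck(logic, "hello", collector);
        executeWithAutoAck(logic, "bad", collector);
        System.out.println(collector.events); // prints [ack hello, fail bad]
    }
}
```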

Anchoring


When processing an input Tuple, a Bolt can emit more than one output Tuple. If the Bolt calls emit(sourceTuple, resultTuple), a DAG is formed whose root is the source Tuple and whose descendants are the Tuples generated from it. Storm tracks processing errors in all nodes of the graph. If an error occurs at any level of the hierarchy, the Spout that emitted the original Tuple is notified through a call to fail. MultiplierBolt example:

public class MultiplierBolt extends BaseRichBolt {
    // Skipped ...

    @Override
    public void execute(Tuple tuple) {
        // Emit several Tuples for one input Tuple
        for (int i = 0; i < MULTI_COUNT; ++i) {
            // Anchoring: link each new Tuple to the input Tuple
            outputCollector.emit(tuple, new Values(tuple.getString(0) + " - " + i));
        }
        outputCollector.ack(tuple);
    }
    // Skipped ...
}

Anchoring Example: TreeFailApp
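How a single failure anywhere in the graph reaches the Spout can be modelled with a simple counter per root Tuple. This is a deliberately simplified illustration and not how Storm implements tracking internally; the class TupleTreeSketch and all of its methods are hypothetical, and each call is assumed to know the msgId of the root it belongs to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of tuple-tree tracking; not Storm's actual mechanism.
class TupleTreeSketch {
    // Calls that would be made on the spout, e.g. "ack 1" or "fail 2".
    final List<String> spoutCalls = new ArrayList<>();
    // Root msgId -> number of tuples in its tree that are not yet acked.
    private final Map<Integer, Integer> pending = new HashMap<>();

    // The spout emitted a root tuple with this msgId.
    void rootEmitted(int rootId) {
        pending.put(rootId, 1);
    }

    // A bolt emitted one new tuple anchored to a tuple of this tree.
    void anchoredEmit(int rootId) {
        pending.computeIfPresent(rootId, (id, n) -> n + 1);
    }

    // One tuple of the tree was acked; the tree is complete when none remain.
    void ack(int rootId) {
        Integer left = pending.computeIfPresent(rootId, (id, n) -> n - 1);
        if (left != null && left == 0) {
            pending.remove(rootId);
            spoutCalls.add("ack " + rootId);
        }
    }

    // One tuple of the tree failed; the whole tree is reported as failed once.
    void fail(int rootId) {
        if (pending.remove(rootId) != null) {
            spoutCalls.add("fail " + rootId);
        }
    }

    public static void main(String[] args) {
        TupleTreeSketch tracker = new TupleTreeSketch();
        for (int root = 1; root <= 2; root++) {
            tracker.rootEmitted(root);
            for (int i = 0; i < 3; i++) {
                tracker.anchoredEmit(root); // the bolt emits three anchored tuples
            }
            tracker.ack(root);              // and then acks its input tuple
        }
        tracker.ack(1); tracker.ack(1); tracker.ack(1); // all children of root 1 succeed
        tracker.ack(2); tracker.fail(2); tracker.ack(2); // one child of root 2 fails
        System.out.println(tracker.spoutCalls); // prints [ack 1, fail 2]
    }
}
```

The ack of the input Tuple alone does not complete the tree: the Spout hears about success only after every anchored descendant has been acked as well.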

In Bolts that inherit from the BaseBasicBolt class, the execute(Tuple, BasicOutputCollector) method receives a BasicOutputCollector. Its distinguishing feature is that it automatically anchors every emitted Tuple to the input Tuple.

Since Storm is a distributed system, a Tuple can be transferred from one cluster node to another. For this reason, Storm tracks processing timeouts. By default, the entire graph must be processed within 30 seconds; otherwise Storm calls the fail method of the Spout that emitted the root Tuple. The timeout can be changed.
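In Storm itself the limit is the topology.message.timeout.secs setting, which can be set through Config.setMessageTimeoutSecs. The effect of such a timeout can be modelled in plain Java as a table of emit times that is swept periodically. The sketch below is an assumption-laden illustration, not Storm's implementation: TimeoutSketch and its methods are hypothetical names, and the clock is passed in explicitly so that the example is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a processing timeout; not Storm's implementation.
class TimeoutSketch {
    private final long timeoutMillis;
    // msgId -> time at which the root tuple was emitted, in emit order.
    private final Map<Integer, Long> emittedAt = new LinkedHashMap<>();
    // msgIds for which fail would have been called on the spout.
    final List<Integer> failed = new ArrayList<>();

    TimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Remember when a root tuple was emitted.
    void emitted(int msgId, long nowMillis) {
        emittedAt.put(msgId, nowMillis);
    }

    // A fully processed graph no longer needs to be watched.
    void acked(int msgId) {
        emittedAt.remove(msgId);
    }

    // Fail every message that has been pending for at least the timeout.
    void sweep(long nowMillis) {
        Iterator<Map.Entry<Integer, Long>> it = emittedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                failed.add(entry.getKey());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        TimeoutSketch tracker = new TimeoutSketch(30_000); // 30 seconds, as in the default
        tracker.emitted(1, 0);
        tracker.emitted(2, 0);
        tracker.emitted(3, 20_000);
        tracker.acked(1);                   // message 1 finished in time
        tracker.sweep(30_000);              // message 2 has now waited 30 seconds
        System.out.println(tracker.failed); // prints [2]
        tracker.sweep(50_000);              // message 3 has now waited 30 seconds
        System.out.println(tracker.failed); // prints [2, 3]
    }
}
```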

The code is available on GitHub.

The next part will be devoted to Transactional Topologies, used in conjunction with transactional data sources.

UPD: The final part of the article has been published.

Source: https://habr.com/ru/post/186436/

