public class FailAwareSpout extends BaseRichSpout { private Message[] messages; // Skipped ... private static class Message implements Serializable { private String message; private int failCount; private Message(String message) { this.message = message; } } // Skipped ... @Override public void nextTuple() { // Skipped ... // Tuple c msgId outputCollector.emit(new Values(messages[messageId].message), messageId); } // Tuple @Override public void ack(Object msgId) { Message m = messages[(Integer) msgId]; System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " + m.message + " processed successfully"); } // Tuple @Override public void fail(Object msgId) { Message m = messages[(Integer) msgId]; if(++m.failCount > MAX_RETRY_COUNT) { throw new IllegalStateException("Too many message processing errors"); } System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " + m.message + " processing failed " + "[" + m.failCount + "]"); // sendQueue.addLast((Integer) msgId); } }
/**
 * Bolt excerpt whose execute(Tuple) shows two outcomes for the incoming tuple:
 * outputCollector.ack(tuple) in one branch and outputCollector.fail(tuple) in
 * the `else` branch.
 *
 * NOTE(review): this excerpt is collapsed onto a single line, so the first `//`
 * turns the remainder of the line into a comment. The `if (...) {` header that
 * pairs with the visible `} else {` lies inside a "Skipped ..." section, so the
 * condition that chooses between ack and fail cannot be determined from here —
 * confirm against the full source before reformatting this class.
 */
public class FailingBolt extends BaseRichBolt { OutputCollector outputCollector; // Skipped ... @Override public void execute(Tuple tuple) { // Skipped ... outputCollector.ack(tuple); // } else { // Skipped ... outputCollector.fail(tuple); // } } // Skipped ... }
public class MultiplierBolt extends BaseRichBolt { // Skipped ... @Override public void execute(Tuple tuple) { // Tuple for(int i = 0; i < MULTI_COUNT; ++i) { // Anchoring, Tuple outputCollector.emit(tuple, new Values(tuple.getString(0) + " - " + i)); } outputCollector.ack(tuple); } // Skipped ... }
Source: https://habr.com/ru/post/186436/
All Articles