Studying the Storm Framework. Part III

The second part of this series described the mechanisms for detecting errors during processing.

Processing ended with an error; what next? Perhaps the connection to one of the cluster nodes was lost, or the database is temporarily unavailable. In that case it is impossible to say with certainty which operations completed successfully and which did not. If every operation in the chain can be safely re-applied (is idempotent), for example setting a flag, processing can simply be restarted. If not, Storm's transaction mechanisms come to the rescue.
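As a hypothetical illustration (the class below is not from the article's code), compare setting a flag with incrementing a counter:

public class IdempotencyExample {
    private boolean processed = false;
    private int counter = 0;

    // Idempotent: replaying this after a partial failure changes nothing
    public void markProcessed() { processed = true; }

    // Not idempotent: a replay after a partial failure double-counts the record
    public void increment() { counter++; }
}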

Whenever transaction guarantees are discussed, the term ACID immediately comes up: Atomicity, Consistency, Isolation, and Durability.

Consistency and Durability are more the database's concern; we will be interested in Atomicity and Isolation.

In version 0.8.0, Storm gained the Trident subsystem, an analogue of Apache Pig. The functionality of Transactional topology also migrated into it.

Transactions in Storm


Atomicity

In the topology, an object is created that implements the State interface and encapsulates work with the database. The spout's input is split into tuples, which are collected into batches. Each batch is associated with a unique transaction id; the tuples forming a batch can be processed in parallel.
At the end of the processing chain, the set of tuples belonging to a single transaction is passed to the updateState method of a class implementing the StateUpdater interface, which modifies the State. On success, the spout is notified that the batch was processed; on error, the spout must emit the entire batch again.
Thus Storm guarantees that a batch is written to the database completely and exactly once.

Isolation

Storm guarantees that batches are passed to the StateUpdater strictly sequentially, in ascending order of transaction id: batch #2 will be committed only after batch #1 has been committed successfully.
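This strict ordering is also what lets a State implementation make commits idempotent: it can persist the txid of the last committed batch and skip any replayed batch whose txid was already applied. A minimal sketch, with a hypothetical class name and helper method that are not part of the article's code:

import storm.trident.state.State;

// Sketch of an idempotent State: the last committed txid is remembered
// (a real driver would persist it atomically together with the data),
// so a replayed batch with an already-applied txid can be skipped.
public class LastTxIdState implements State {
    private long lastCommittedTxId = -1;

    @Override
    public void beginCommit(Long txId) {
        // A batch with this txid is about to be written
    }

    @Override
    public void commit(Long txId) {
        lastCommittedTxId = txId;
    }

    public boolean alreadyApplied(Long txId) {
        // Correct precisely because batches are committed in ascending txid order
        return txId <= lastCommittedTxId;
    }
}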

Implementation


A transaction-enabled spout must implement the ICommitterTridentSpout<TransactionMetadata> interface. TransactionMetadata is any class that carries the data needed to generate a batch and the next transaction: TxMeta.
public class TxMeta {
    private int start;
    private int count;

    public TxMeta(int start, int count) {
        this.start = start;
        this.count = count;
    }

    public int getStart() {
        return start;
    }

    public int getCount() {
        return count;
    }
}


The class that implements the ITridentSpout.BatchCoordinator<TransactionMetadata> interface initializes TransactionMetadata when a transaction is created and answers whether the data for the next transaction is ready: TridentTxSpout. It is created in a single instance per topology.
static class BCoordinator implements BatchCoordinator<TxMeta> {
    private static final int TRANSACTION_COUNT = 5;
    private static final int TRANSACTION_ELEMENT_COUNT = 5;

    // TxMeta describes the batch: the index of the first element and the element count
    @Override
    public TxMeta initializeTransaction(long txId, TxMeta prevTxMeta) {
        if (prevTxMeta != null) {
            System.out.println(String.format("Initializing transaction id: %08d, "
                    + "start: %04d, count: %04d",
                    txId, prevTxMeta.getStart() + prevTxMeta.getCount(), prevTxMeta.getCount()));
            return new TxMeta(prevTxMeta.getStart() + prevTxMeta.getCount(), TRANSACTION_ELEMENT_COUNT);
        } else {
            return new TxMeta(0, TRANSACTION_ELEMENT_COUNT);
        }
    }

    // Reports whether the data for the next transaction is ready
    @Override
    public boolean isReady(long txId) {
        if (txId <= TRANSACTION_COUNT) {
            System.out.println("ISREADY " + txId);
            return true;
        }
        return false;
    }

    // Interface stubs: nothing to record or release in this example
    @Override
    public void success(long txId) {
    }

    @Override
    public void close() {
    }
}


The class that implements the ICommitterTridentSpout.Emitter interface forms the batch. If an error occurs while a batch is being processed, it forms the batch again.
Important: the re-formed batch must contain exactly the same set of tuples as the original one.
static class BEmitter implements Emitter {
    // Forms a batch from the TransactionMetadata prepared by the coordinator
    @Override
    public void emitBatch(TransactionAttempt transactionAttempt, Object coordinatorMeta,
                          TridentCollector tridentCollector) {
        TxMeta txMeta = (TxMeta) coordinatorMeta;
        System.out.println("Emitting transaction id: " + transactionAttempt.getTransactionId()
                + " attempt:" + transactionAttempt.getAttemptId());
        for (int i = 0; i < txMeta.getCount(); ++i) {
            tridentCollector.emit(new Values("TRANS [" + transactionAttempt.getAttemptId()
                    + "] [" + (txMeta.getStart() + i) + "]"));
        }
    }

    // Called after the transaction has been successfully committed to the State
    @Override
    public void success(TransactionAttempt transactionAttempt) {
        System.out.println("BEmitter:Transaction success id:"
                + transactionAttempt.getTransactionId());
    }

    // Called when the transaction is being committed to the State
    @Override
    public void commit(TransactionAttempt transactionAttempt) {
        System.out.println("BEmitter:Transaction commit id:"
                + transactionAttempt.getTransactionId());
    }

    // Interface stub: no resources to release in this example
    @Override
    public void close() {
    }
}
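The TridentTxSpout class itself is not listed in the article. A minimal sketch of how it might wire BCoordinator and BEmitter together, assuming the Storm 0.8.x ICommitterTridentSpout signatures of that era:

import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import storm.trident.spout.ICommitterTridentSpout;
import storm.trident.spout.ITridentSpout.BatchCoordinator;

// Sketch: the spout only ties the coordinator and emitter together
// and declares the single output field "msg" used by the topology.
public class TridentTxSpout implements ICommitterTridentSpout<TxMeta> {
    @Override
    public BatchCoordinator<TxMeta> getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new BCoordinator();
    }

    @Override
    public Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new BEmitter();
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}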


The class that implements the State interface is, in our case, the database driver: TxDatabase.
public class TxDatabase implements State {
    // Called before the updates of a batch are applied
    @Override
    public void beginCommit(Long txId) {
        System.out.println("beginCommit [" + Thread.currentThread().getId() + "] " + txId);
    }

    // Called after the updates of a batch have been applied
    @Override
    public void commit(Long txId) {
        System.out.println("commit [" + Thread.currentThread().getId() + "] " + txId);
    }
}


The class that extends BaseStateUpdater<S extends State> makes the changes to the State (the database): TxDatabaseUpdater.
public class TxDatabaseUpdater extends BaseStateUpdater<TxDatabase> {
    int count;

    // Writes the tuples of the batch to the State
    @Override
    public void updateState(TxDatabase txDatabase, List<TridentTuple> tridentTuples,
                            TridentCollector tridentCollector) {
        // Simulate a processing error on the second batch
        if (++count == 2)
            throw new FailedException("YYYY");
        for (TridentTuple t : tridentTuples) {
            System.out.println("Updating: " + t.getString(0));
        }
    }
}


A class implementing the StateFactory interface creates instances of State: TxDatabaseFactory.
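Its source is not shown in the article; a minimal sketch, assuming the standard StateFactory contract of that Storm version (makeState is called once per partition), might look like this:

import java.util.Map;
import backtype.storm.task.IMetricsContext;
import storm.trident.state.State;
import storm.trident.state.StateFactory;

// Sketch of TxDatabaseFactory: returns a fresh TxDatabase for each partition.
public class TxDatabaseFactory implements StateFactory {
    @Override
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        return new TxDatabase();
    }
}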

Putting it all together in TridentTransactionApp:
public class TridentTransactionApp {
    public static void main(String[] args) throws Throwable {
        Logger.getRootLogger().setLevel(Level.ERROR);

        // Build the topology
        TridentTopology tridentTopology = new TridentTopology();

        // Attach the transactional spout
        tridentTopology.newStream("TridentTxSpout", new TridentTxSpout()).
                // Pass each tuple through the OpPrintout filter, in parallel
                shuffle().each(new Fields("msg"), new OpPrintout()).
                parallelismHint(2).
                // Gather all tuples into a single partition
                global().
                // Persist the changes to the State (the database)
                partitionPersist(new TxDatabaseFactory(), new Fields("msg"), new TxDatabaseUpdater());

        // Skipped: additional configuration
        Config config = new Config();

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("T2", config, tridentTopology.build());
        Thread.sleep(1000 * 100);
        cluster.shutdown();
    }
}

Storm's transactional capabilities are very useful for transferring data from one system to another when nontrivial processing is required. For example, one system generates files; Storm splits them into records, processes the records in parallel, and inserts them into a database. If a processing error occurs, there is a guarantee that the file will neither be deleted nor processed twice.

P.S. It is impossible to cover all of Storm's capabilities within these articles; there is enough material for an entire book. I hope I have managed to show the key features of the framework and how it can be used in real projects.
As for deploying the cluster, I recently came across a great article and see no reason to repeat it. Deploying Storm in production is really easy.

P.P.S. Hadoop has an analogue of Storm's online processing, Hadoop Streaming, but unlike Storm it does not support transactions.

Source: https://habr.com/ru/post/186634/
