public class TxMeta { private int start; private int count; public TxMeta(int start, int count) { this.start = start; this.count = count; } // Skipped getters }   static class BCoordinator implements BatchCoordinator<TxMeta> { private static final int TRANSACTION_COUNT = 5; private static final int TRANSACTION_ELEMENT_COUNT = 5; //TxMeta -    @Override public TxMeta initializeTransaction(long l, TxMeta txMeta) { if(txMeta != null) { System.out.println(String.format("Initializing transaction id: %08d, " + "start: %04d, count: %04d", l, txMeta.getStart() + txMeta.getCount(), txMeta.getCount())); return new TxMeta(txMeta.getStart() + txMeta.getCount(), TRANSACTION_ELEMENT_COUNT); } else { return new TxMeta(0, TRANSACTION_ELEMENT_COUNT); } } //       @Override public boolean isReady(long l) { if(l <= TRANSACTION_COUNT) { System.out.println("ISREADY " + l); return true; } return false; } }  static class BEmitter implements Emitter { //  Batch    TransactionMetadata @Override public void emitBatch(TransactionAttempt transactionAttempt, Object coordinatorMeta, TridentCollector tridentCollector) { TxMeta txMeta = (TxMeta) coordinatorMeta; System.out.println("Emitting transaction id: " + transactionAttempt.getTransactionId() + " attempt:" + transactionAttempt.getAttemptId() ); for(int i = 0; i < txMeta.getCount(); ++i) { tridentCollector.emit(new Values("TRANS [" + transactionAttempt.getAttemptId() + "] [" + (txMeta.getStart() + i) + "]") ); } } //     State @Override public void success(TransactionAttempt transactionAttempt) { System.out.println("BEmitter:Transaction success id:" + transactionAttempt.getTransactionId()); } //     State @Override public void commit(TransactionAttempt transactionAttempt) { System.out.println("BEmitter:Transaction commit id:" + transactionAttempt.getTransactionId()); } }  public class TxDatabase implements State { //       @Override public void beginCommit(Long txId) { System.out.println("beginCommit [" + Thread.currentThread().getId() + "] " + txId); } //       @Override public void commit(Long txId) { System.out.println("commit [" + Thread.currentThread().getId() + "] " + txId); } }  public class TxDatabaseUpdater extends BaseStateUpdater<TxDatabase> { int count; //     @Override public void updateState(TxDatabase txDatabase, List<TridentTuple> tridentTuples, TridentCollector tridentCollector) { //    if(++count == 2) throw new FailedException("YYYY"); for(TridentTuple t: tridentTuples) { System.out.println("Updating: " + t.getString(0)); } } }  public class TridentTransactionApp { public static void main( String[] args ) throws Throwable { Logger.getRootLogger().setLevel(Level.ERROR); //   TridentTopology tridentTopology = new TridentTopology(); //   Spout tridentTopology.newStream("TridentTxSpout", new TridentTxSpout()). //  Tuple   - OpPrintout    shuffle().each(new Fields("msg"), new OpPrintout()). parallelismHint(2). //        global(). //    State () partitionPersist(new TxDatabaseFactory(), new Fields("msg"), new TxDatabaseUpdater()); // Skipped LocalCluster cluster = new LocalCluster(); cluster.submitTopology("T2", config, tridentTopology.build()); Thread.sleep(1000*100); cluster.shutdown(); } } Source: https://habr.com/ru/post/186634/
All Articles