public class TxMeta { private int start; private int count; public TxMeta(int start, int count) { this.start = start; this.count = count; } // Skipped getters }
static class BCoordinator implements BatchCoordinator<TxMeta> { private static final int TRANSACTION_COUNT = 5; private static final int TRANSACTION_ELEMENT_COUNT = 5; //TxMeta - @Override public TxMeta initializeTransaction(long l, TxMeta txMeta) { if(txMeta != null) { System.out.println(String.format("Initializing transaction id: %08d, " + "start: %04d, count: %04d", l, txMeta.getStart() + txMeta.getCount(), txMeta.getCount())); return new TxMeta(txMeta.getStart() + txMeta.getCount(), TRANSACTION_ELEMENT_COUNT); } else { return new TxMeta(0, TRANSACTION_ELEMENT_COUNT); } } // @Override public boolean isReady(long l) { if(l <= TRANSACTION_COUNT) { System.out.println("ISREADY " + l); return true; } return false; } }
static class BEmitter implements Emitter { // Batch TransactionMetadata @Override public void emitBatch(TransactionAttempt transactionAttempt, Object coordinatorMeta, TridentCollector tridentCollector) { TxMeta txMeta = (TxMeta) coordinatorMeta; System.out.println("Emitting transaction id: " + transactionAttempt.getTransactionId() + " attempt:" + transactionAttempt.getAttemptId() ); for(int i = 0; i < txMeta.getCount(); ++i) { tridentCollector.emit(new Values("TRANS [" + transactionAttempt.getAttemptId() + "] [" + (txMeta.getStart() + i) + "]") ); } } // State @Override public void success(TransactionAttempt transactionAttempt) { System.out.println("BEmitter:Transaction success id:" + transactionAttempt.getTransactionId()); } // State @Override public void commit(TransactionAttempt transactionAttempt) { System.out.println("BEmitter:Transaction commit id:" + transactionAttempt.getTransactionId()); } }
public class TxDatabase implements State { // @Override public void beginCommit(Long txId) { System.out.println("beginCommit [" + Thread.currentThread().getId() + "] " + txId); } // @Override public void commit(Long txId) { System.out.println("commit [" + Thread.currentThread().getId() + "] " + txId); } }
public class TxDatabaseUpdater extends BaseStateUpdater<TxDatabase> { int count; // @Override public void updateState(TxDatabase txDatabase, List<TridentTuple> tridentTuples, TridentCollector tridentCollector) { // if(++count == 2) throw new FailedException("YYYY"); for(TridentTuple t: tridentTuples) { System.out.println("Updating: " + t.getString(0)); } } }
public class TridentTransactionApp { public static void main( String[] args ) throws Throwable { Logger.getRootLogger().setLevel(Level.ERROR); // TridentTopology tridentTopology = new TridentTopology(); // Spout tridentTopology.newStream("TridentTxSpout", new TridentTxSpout()). // Tuple - OpPrintout shuffle().each(new Fields("msg"), new OpPrintout()). parallelismHint(2). // global(). // State () partitionPersist(new TxDatabaseFactory(), new Fields("msg"), new TxDatabaseUpdater()); // Skipped LocalCluster cluster = new LocalCluster(); cluster.submitTopology("T2", config, tridentTopology.build()); Thread.sleep(1000*100); cluster.shutdown(); } }
Source: https://habr.com/ru/post/186634/
All Articles