
Learning the Storm Framework. Part I

In 2011, Twitter open-sourced Storm, a distributed computing project, under the Eclipse Public License. Storm was created at BackType and moved to Twitter after the acquisition.

Storm is a system for distributed processing of large data streams, similar to Apache Hadoop, but in real time.

Key features of Storm: horizontal scalability, guaranteed message processing, and fault tolerance.

This first part covers the basic concepts and the essentials of building an application with Storm version 0.8.2.

Storm elements


Tuple
The unit of data. By default a Tuple can contain Long, Integer, Short, Byte, String, Double, Float, Boolean, and byte[] fields. User-defined types carried in a Tuple must be serializable.
Stream
A sequence of Tuples. Carries a scheme that names the fields of its Tuples.

Spout
The data source for a Stream. A Spout receives data from external sources, forms Tuples from it, and emits them into a Stream. It can emit Tuples into several different Streams. Ready-made Spouts exist for popular messaging systems: RabbitMQ/AMQP, Kestrel, JMS, Kafka.

Bolt
The data processor. Receives Tuples as input and emits zero or more Tuples as output.

Topology
The set of elements together with a description of how they are connected; the analogue of a MapReduce job in Hadoop. Unlike a MapReduce job, a Topology does not stop when the input data stream is exhausted. It transports Tuples between Spout and Bolt elements, and can be run locally or deployed to a Storm cluster.

Usage example


Task


There is a stream of phone call records (Cdr). The client id is determined from the source number; the rate is determined from the destination number and the client id, and the cost of the call is calculated. Each stage should run in several threads.
The example will run on the local machine.
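The Cdr class itself is not listed in this part; a minimal sketch of what it might look like, with field names and types inferred from the sample output below (the setters and defaults are assumptions):

```java
import java.io.Serializable;

// Hypothetical call detail record; fields inferred from the sample output.
// User-defined types carried in a Tuple must be serializable.
public class Cdr implements Serializable {
    private final String callSource;       // caller's number
    private final String callDestination;  // destination number
    private final int callTime;            // call duration
    private long clientId = 0;             // filled in later by ClientIdBolt
    private long price = 0;                // filled in later by RaterBolt

    public Cdr(String callSource, String callDestination, int callTime) {
        this.callSource = callSource;
        this.callDestination = callDestination;
        this.callTime = callTime;
    }

    public String getCallSource()      { return callSource; }
    public String getCallDestination() { return callDestination; }
    public int getCallTime()           { return callTime; }
    public long getClientId()          { return clientId; }
    public void setClientId(long id)   { this.clientId = id; }
    public long getPrice()             { return price; }
    public void setPrice(long p)       { this.price = p; }
}
```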

Implementation


To start, BasicApp will simply print the input data.

Create a new Topology:
TopologyBuilder builder = new TopologyBuilder(); 

Add Spout CdrSpout generating input data:
 builder.setSpout("CdrReader", new CdrSpout()); 

Add a Bolt with two threads of execution and specify that the output of CdrReader is fed to its input. shuffleGrouping means that Tuples from CdrReader go to a randomly chosen PrintOutBolt instance.
 builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).shuffleGrouping("CdrReader"); 

Configure and launch the local Storm cluster:
 Config config = new Config();
 config.setDebug(false);
 LocalCluster cluster = new LocalCluster();                       // start a local Storm cluster
 cluster.submitTopology("T1", config, builder.createTopology());  // submit the Topology
 Thread.sleep(1000 * 10);
 cluster.shutdown();                                              // stop the cluster

The output looks roughly like this:
 OUT >> [80] Cdr {callSource = '78119990005', callDestination = '8313610698077174239', 
 callTime = 7631, clientId = 0, price = 0}
 OUT >> [78] Cdr {callSource = '78119990006', callDestination = '2238707710336895468', 
 callTime = 20738, clientId = 0, price = 0}
 OUT >> [78] Cdr {callSource = '78119990007', callDestination = '579372726495390920', 
 callTime = 31544, clientId = 0, price = 0}
 OUT >> [80] Cdr {callSource = '78119990006', callDestination = '2010724447342634423', 
 callTime = 10268, clientId = 0, price = 0}

The number in square brackets is the thread id; it shows that the records are processed in parallel.

For further experiments we need to deal with how the input data is distributed among several processors.
In the example above the distribution was random. But in real use Bolts will most likely call external reference systems and databases, and in that case it is desirable for each Bolt to process its own subset of the input data. That makes it possible to cache data from the external systems effectively.
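The caching benefit can be sketched without Storm: if each handler instance only ever sees its own subset of source numbers, a simple in-process cache in front of the external lookup stays small and is hit often. A hypothetical sketch (the cache class and the lookup function are illustrative, not part of the article's code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical per-Bolt cache: each Bolt task keeps its own map from
// source number to client id, so the external lookup runs once per key.
class ClientIdCache {
    private final Map<String, Long> cache = new HashMap<>();
    private final Function<String, Long> externalLookup; // stand-in for a database call
    private int lookups = 0;

    ClientIdCache(Function<String, Long> externalLookup) {
        this.externalLookup = externalLookup;
    }

    long clientIdFor(String callSource) {
        return cache.computeIfAbsent(callSource, s -> {
            lookups++;                         // count trips to the external system
            return externalLookup.apply(s);
        });
    }

    int externalLookups() { return lookups; }
}
```

With key-based grouping, repeated calls from the same number land on the same task, so the expensive lookup is paid only once per source number.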

For this, Storm provides the CustomStreamGrouping interface.
Let's add CdrGrouper to the project. Its task is to send Tuples with the same source number to the same Bolt. CustomStreamGrouping has two methods:
prepare, called before first use:
 @Override
 public void prepare(WorkerTopologyContext workerTopologyContext, GlobalStreamId globalStreamId, List<Integer> integers) {
     tasks = new ArrayList<>(integers); // task ids of the target Bolts
 }

and chooseTasks, which takes a list of Tuple values and returns the Bolt task number for each position in the list:
 @Override
 public List<Integer> chooseTasks(int i, List<Object> objects) {
     List<Integer> rvalue = new ArrayList<>(objects.size());
     for (Object o : objects) {
         Cdr cdr = (Cdr) o;
         rvalue.add(tasks.get(Math.abs(cdr.getCallSource().hashCode()) % tasks.size()));
     }
     return rvalue;
 }
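The routing rule in chooseTasks is deterministic, so it can be checked in isolation: a given source number always maps to the same task, whenever the Tuple arrives. A standalone sketch of the same computation, with no Storm dependency (class and method names are illustrative):

```java
import java.util.List;

// Same assignment rule as CdrGrouper.chooseTasks: hash the source
// number into the list of available task ids.
class SourcePartitioner {
    private final List<Integer> tasks;

    SourcePartitioner(List<Integer> tasks) {
        this.tasks = tasks;
    }

    int taskFor(String callSource) {
        // Note: like the original, this assumes hashCode() != Integer.MIN_VALUE.
        return tasks.get(Math.abs(callSource.hashCode()) % tasks.size());
    }
}
```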

Replace shuffleGrouping with CdrGrouper in BasicGroupApp:
 builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2)
        .customGrouping("CdrReader", new CdrGrouper());

Run it and see that it works as intended:
 OUT >> [80] Cdr {callSource = '78119990007', callDestination = '3314931472251135073', 
 callTime = 17632, clientId = 0, price = 0}
 OUT >> [80] Cdr {callSource = '78119990007', callDestination = '4182885669941386786', 
 callTime = 31533, clientId = 0, price = 0}


Next, we add to the project:
ClientIdBolt - determines the client id from the source number.
ClientIdGrouper - groups by client id.
RaterBolt - performs the charging.
CalcApp - the final version of the program.
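The charging step in RaterBolt is not listed in this part; one plausible shape of its core logic, with the tariff lookup and the price formula purely illustrative:

```java
// Hypothetical rating logic: price = per-unit rate (depending on the
// destination number and client id) multiplied by call duration.
class Rater {
    long rateFor(String callDestination, long clientId) {
        // Illustrative stand-in for a real tariff lookup:
        // unknown clients (id 0) pay a higher rate.
        return clientId == 0 ? 2 : 1;
    }

    long priceFor(String callDestination, long clientId, int callTime) {
        return rateFor(callDestination, clientId) * callTime;
    }
}
```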

If the topic is of interest, in the next part I hope to cover the mechanisms for protecting against data loss and running on a real cluster. The code is available on GitHub.

P.S. Of course, you can't throw a word out of a song, but the name "Bolt" for a data processor is somewhat confusing :)

UPD. Published the second part of the article.

Source: https://habr.com/ru/post/186208/

