In 2011, Twitter open-sourced, under the Eclipse Public License, the distributed computing project Storm. Storm was created at BackType and moved to Twitter after the acquisition.
Storm is a system for distributed processing of large data streams, similar to Apache Hadoop, but in real time.
Key features of Storm:
- Scalability. Processing tasks are distributed across cluster nodes and across threads on each node.
- Guaranteed protection against data loss.
- Easy to deploy and operate.
- Recovery after failures. If a handler fails, its tasks are redirected to other handlers.
- Components can be written in languages other than Java, via a simple Multilang protocol based on JSON objects. Ready-made adapters exist for Python, Ruby, and Fancy.
This first part covers the basic concepts and the basics of building an application with Storm version 0.8.2.
Storm elements
Tuple — the unit of data. By default it can contain Long, Integer, Short, Byte, String, Double, Float, Boolean, and byte[] fields. User types placed in a Tuple must be serializable.
Stream — a sequence of Tuples. Contains a naming scheme for the fields in the Tuple.
Spout — a data provider for a Stream. It receives data from external sources, forms Tuples from it, and sends them into a Stream. A Spout can send Tuples to several different Streams. Ready-made Spouts exist for popular messaging systems: RabbitMQ / AMQP, Kestrel, JMS, Kafka.
Bolt — a data handler. Tuples arrive at the input; zero or more Tuples are sent to the output.
Topology — a set of elements with a description of the connections between them, analogous to a MapReduce job in Hadoop. Unlike a MapReduce job, it does not stop when the input data stream is exhausted. A Topology transports Tuples between Spout and Bolt elements. It can be launched locally or deployed to a Storm cluster.
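Since user types carried inside a Tuple must be serializable, a record for a call might look like the following sketch. The field names here are assumptions based on the sample output later in the article, not the article's actual class:

```java
import java.io.Serializable;

// Hypothetical call-detail record; the fields mirror the sample output below.
public class Cdr implements Serializable {
    private final String callSource;       // caller's number
    private final String callDestination;  // callee's number
    private final long callTime;           // call duration
    private long clientId;                 // to be filled in by a client-lookup Bolt
    private long price;                    // to be filled in by a rating Bolt

    public Cdr(String callSource, String callDestination, long callTime) {
        this.callSource = callSource;
        this.callDestination = callDestination;
        this.callTime = callTime;
    }

    public String getCallSource() { return callSource; }
    public String getCallDestination() { return callDestination; }
    public long getCallTime() { return callTime; }

    public static void main(String[] args) {
        Cdr cdr = new Cdr("78119990005", "8313610698077174239", 7631);
        System.out.println(cdr.getCallSource());
    }
}
```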
Usage example
Task
There is a stream of phone-call records (Cdr). The client id is determined from the source number. The rate is determined from the destination number and the client id, and the cost of the call is calculated. Each stage should run in several threads.
The sample will run on the local machine.
Implementation
To begin, let's simply print the input data (BasicApp).
Create a new Topology:
TopologyBuilder builder = new TopologyBuilder();
Add the Spout CdrSpout, which generates the input data:
builder.setSpout("CdrReader", new CdrSpout());
Add a Bolt with two threads of execution and indicate that the output of CdrReader is fed to its input. shuffleGrouping means that each Tuple from CdrReader goes to a randomly selected PrintOutBolt instance.
builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).shuffleGrouping("CdrReader");
Configure and launch the local Storm cluster (the topology name here is arbitrary):
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("cdr-topology", config, builder.createTopology());
The output looks roughly like this:
OUT >> [80] Cdr {callSource = '78119990005', callDestination = '8313610698077174239',
callTime = 7631, clientId = 0, price = 0}
OUT >> [78] Cdr {callSource = '78119990006', callDestination = '2238707710336895468',
callTime = 20738, clientId = 0, price = 0}
OUT >> [78] Cdr {callSource = '78119990007', callDestination = '579372726495390920',
callTime = 31544, clientId = 0, price = 0}
OUT >> [80] Cdr {callSource = '78119990006', callDestination = '2010724447342634423',
callTime = 10268, clientId = 0, price = 0}
The number in square brackets is the thread id; it shows that processing is indeed carried out in parallel.
For further experiments, we need to deal with distributing the input data among several handlers.
In the example above a random approach was used. But in real use, Bolts will most likely work with external reference systems and databases. In that case it is desirable for each Bolt to process its own subset of the input data; then it becomes possible to organize effective caching of data from external systems.
For this, Storm provides the CustomStreamGrouping interface.
Add CdrGrouper to the project. Its task is to send Tuples with the same source number to the same Bolt. For this, the CustomStreamGrouping interface has two methods:
prepare — called before first use:
@Override
public void prepare(WorkerTopologyContext workerTopologyContext,
                    GlobalStreamId globalStreamId, List<Integer> integers) {
    tasks = new ArrayList<>(integers);
}
and chooseTasks — which receives a list of Tuples as input and returns a list of Bolt task numbers, one for each position in the Tuple list:
@Override
public List<Integer> chooseTasks(int i, List<Object> objects) {
    List<Integer> rvalue = new ArrayList<>(objects.size());
    for (Object o : objects) {
        Cdr cdr = (Cdr) o;
        rvalue.add(tasks.get(Math.abs(cdr.getCallSource().hashCode()) % tasks.size()));
    }
    return rvalue;
}
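The routing rule above can be illustrated outside Storm: a given source number always hashes to the same task id, so repeated calls from one number land on one Bolt. A standalone sketch with hypothetical task ids:

```java
import java.util.Arrays;
import java.util.List;

// Standalone model of the chooseTasks() selection rule.
public class GroupingDemo {
    // Maps a source number to one of the available task ids, deterministically.
    static int taskFor(String callSource, List<Integer> tasks) {
        return tasks.get(Math.abs(callSource.hashCode()) % tasks.size());
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(3, 4); // hypothetical Bolt task ids
        // The same number always resolves to the same task:
        System.out.println(taskFor("78119990007", tasks) == taskFor("78119990007", tasks)); // true
    }
}
```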
Replace shuffleGrouping with CdrGrouper (BasicGroupApp):
builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).customGrouping("CdrReader", new CdrGrouper());
Run it and see that it works as intended:
OUT >> [80] Cdr {callSource = '78119990007', callDestination = '3314931472251135073',
callTime = 17632, clientId = 0, price = 0}
OUT >> [80] Cdr {callSource = '78119990007', callDestination = '4182885669941386786',
callTime = 31533, clientId = 0, price = 0}
Next, add to the project:
- ClientIdBolt — determines the client id from the source number.
- ClientIdGrouper — groups by client id.
- RaterBolt — performs the charging.
- CalcApp — the final version of the program.
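The core of the charging step can be modeled outside Storm as a pure function: look up a rate by client id and multiply by the call duration. The rate table and formula below are assumptions for illustration, not the article's actual tariffs:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical rating step: pick a per-second rate by client id
// and multiply by the call duration in seconds.
public class RaterDemo {
    private static final Map<Long, Long> RATE_BY_CLIENT = new HashMap<>();
    static {
        RATE_BY_CLIENT.put(1L, 2L); // client 1 pays 2 units/second (assumed)
        RATE_BY_CLIENT.put(2L, 3L); // client 2 pays 3 units/second (assumed)
    }

    static long price(long clientId, long callSeconds) {
        long rate = RATE_BY_CLIENT.getOrDefault(clientId, 1L); // default tariff
        return rate * callSeconds;
    }

    public static void main(String[] args) {
        System.out.println(price(1L, 60)); // 120
    }
}
```

In a real RaterBolt this lookup would more likely hit an external database, which is exactly why the grouping above matters: each Bolt sees only its own clients and can cache their rates.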
If the topic is interesting, in the next part I hope to cover the mechanisms for protecting against data loss and running on a real cluster. The code is available on github.
P.S. Of course, as the saying goes, you can't throw the words out of the song, but the name "Bolt" for a data handler is somewhat confusing :)
UPD. The second part of the article has been published.