In the previous part of this series we tried, for the umpteenth time, to explain the main techniques and stages of the Google MapReduce approach. I must admit that the first part was deliberately "Captain Obvious" material, meant to set the stage for the MapReduce articles that follow. We did not get to show a single line of how we are going to implement all this in Caché ObjectScript. That is the subject of today's story (and of the following parts).
Recall the premise of our mini-project: we plan to implement the MapReduce algorithm using the tools available in Caché ObjectScript. When designing interfaces, we will try to stick to the API of the original Google MapReduce implementation described in the previous article; any deviations will be pointed out.
Let's start with the abstract interfaces Mapper and Reducer.
Class MR.Base.Mapper
{

Method Map(MapInput As MR.Base.Iterator, MapOutput As MR.Base.Emitter) [ Abstract ]
{
}

}

Class MR.Base.Reducer
{

Method Reduce(ReduceInput As MR.Base.Iterator, ReduceOutput As MR.Base.Emitter) [ Abstract ]
{
}

}
Initially, as in the canonical implementation, we made two separate interfaces, MapInput and ReduceInput. But it immediately became obvious that they serve the same purpose and provide the same methods: both walk through a data stream to its end, that is, both are iterators. So in the end we merged them into a common interface, MR.Base.Iterator:
Class MR.Base.Iterator
{

Method GetNext() As %String [ Abstract ]
{
}

Method IsAtEnd() As %Boolean [ Abstract ]
{
}

}
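To make the contract of this interface concrete, here is a minimal sketch of how a consumer might walk any MR.Base.Iterator to its end. The class MR.Sample.IteratorDemo and its Drain method are hypothetical names introduced only for illustration; they are not part of the project described here.

```objectscript
/// Hypothetical helper class, not part of the original project
Class MR.Sample.IteratorDemo
{

/// Walks any MR.Base.Iterator to its end and returns the number of items read
ClassMethod Drain(input As MR.Base.Iterator) As %Integer
{
    #dim count As %Integer = 0
    while 'input.IsAtEnd() {
        // each item is expected to be a $listbuild(key, value(s))
        #dim item As %String = input.GetNext()
        write $listtostring(item, ","), !
        set count = count + 1
    }
    quit count
}

}
```

The loop relies only on IsAtEnd and GetNext, so the same code could serve both a Mapper and a Reducer.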
The original Google MapReduce implementation used the Google GFS file system as the transport between nodes and between stages of the algorithm. Caché has its own mechanism for distributing (coherent) data between nodes, if you don't want to use bare TCP/UDP: ECP (Enterprise Cache Protocol). It is usually used by application servers to retrieve data from remote database servers. But nothing stops us from building a kind of virtual control bus on top of such peer-to-peer ECP connections, into which we put data in the form of <key, value> pairs or similar. This data is passed between the actors involved in the pipelines of the algorithm (that is, whatever the Mapper object emits is written to the ECP bus and read by the Reducer object). If the actors work within a single node, they can use, for example, fast globals mapped to CACHETEMP, or ordinary globals if the algorithm is multistage and needs reliability and journaling.
In any case, whether they are local globals (for a single-node configuration) or globals of a remote node connected via ECP, globals are a convenient and well-proven transport for moving data between the nodes of a Caché cluster, in this case between the functions and classes involved in MapReduce.
Therefore, a natural way to simplify our system is to use the Caché environment itself, instead of the GFS or HDFS file systems, to transfer data between the nodes of an ECP cluster. The functional characteristics of ECP will allow other simplifications as well (but more on that later).
As we already explained in the previous part, in the classical implementation a heavy shuffle-and-sort operation takes place on the master between the moment the data leaves the Mapper object and the moment it arrives at the Reducer input.
In a MUMPS / Caché ObjectScript environment, where globals are used as the transport, we can completely avoid the extra cost of such sorting, since aggregation and sorting are done by the underlying B*-tree storage.
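The "free" sorting can be seen in a few lines. The sketch below is only an illustration: the class MR.Sample.SortDemo is a hypothetical name, and it writes to a process-private global ^||demo so that it does not touch any real data. Keys are inserted in arbitrary order, yet $order returns them in collation order.

```objectscript
/// Hypothetical demo class; ^||demo is a process-private global used only here
Class MR.Sample.SortDemo
{

ClassMethod Run()
{
    kill ^||demo
    // insert keys in arbitrary order
    set ^||demo("pear") = 1
    set ^||demo("apple") = 1
    set ^||demo("mango") = 1

    // $order walks subscripts in collation order, no explicit sort is needed
    #dim key As %String = ""
    for {
        set key = $order(^||demo(key))
        quit:key=""
        write key, !
    }
    kill ^||demo
}

}
```

Running `do ##class(MR.Sample.SortDemo).Run()` should list apple, mango and pear in that order, even though "pear" was stored first.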
With these design requirements in mind, we create a basic emitter interface:
Class MR.Base.Emitter Extends MR.Base.Iterator
{

/// emit $listbuild(key,value(s))
Method Emit(EmitList... As %String) [ Abstract ]
{
}

}
The emitter looks like the input iterator shown above (since it inherits from MR.Base.Iterator), but in addition to the interface for walking through the data, it must also be able to send data to its intermediate storage (that is, it adds the Emit method).
Initially, our Emit method was very similar to the classical implementation and took only two arguments, a <key, value> pair. But then we ran into a (rare) need to pass something more multidimensional, longer than a pair of values (for example, a tuple of any arity), so Emit now accepts a variable number of arguments.
Note that in practice, in most cases, only a <key, value> pair of arguments will arrive here, as we saw in the classical implementation.
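For readers less familiar with variable-length argument lists in ObjectScript, here is a minimal sketch of how such arguments arrive in a method. The class MR.Sample.VarArgsDemo and its Show method are hypothetical and exist only for this illustration.

```objectscript
/// Hypothetical demo class, not part of the original project
Class MR.Sample.VarArgsDemo
{

/// args holds the argument count; args(1)..args(n) hold the values
ClassMethod Show(args... As %String)
{
    write "count=", $get(args, 0), !
    #dim i As %Integer
    for i = 1:1:$get(args, 0) {
        write i, ": ", $get(args(i)), !
    }
}

}
```

Calling `do ##class(MR.Sample.VarArgsDemo).Show("word", 1)` reports a count of 2 followed by the two values, which is the shape an Emit implementation receives for a <key, value> pair.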
MR.Base.Emitter is still an abstract interface; more meat will be added very soon.
If we had to preserve the order of the incoming elements during processing, we would use the implementation below:
/// Emitter which maintains the order of (key,value(s))
Class MR.Emitter.Ordered Extends (%RegisteredObject, MR.Base.Emitter)
{

/// global name serving as data channel
Property GlobalName As %String;

Method %OnNew(initval As %String) As %Status
{
    // a non-empty global name is required
    $$$ThrowOnError($length(initval)>0)
    set ..GlobalName = initval
    quit $$$OK
}

Parameter AUTOCLEANUP = 1;

Method %OnClose() As %Status
{
    if ..#AUTOCLEANUP {
        if $data(@i%GlobalName) {
            kill @i%GlobalName
        }
    }
    Quit $$$OK
}

...

}
A side note: in Caché, globals are, in general, global :), and they are not cleared automatically when the processes that created them finish, unlike, for example, PPGs (process-private globals). But sometimes it is still desirable that the intermediate channels created for communication between the stages of the MapReduce pipeline are deleted when the routine that created them finishes. Therefore we added an "auto-cleanup" mode (the class parameter #AUTOCLEANUP), in which the global whose name is stored in the GlobalName property is deleted when the object is closed (at the time %OnClose is called).
Note that we enforce one required parameter in the %New method (in %OnNew, $$$ThrowOnError raises an error if the name in initval is empty). The class constructor expects to receive the name of the global that it will use as the data transport.
Class MR.Emitter.Ordered Extends MR.Base.Emitter
{

/// ...

Method IsAtEnd() As %Boolean
{
    // no sub-nodes left means the stream is exhausted
    quit ($data(@i%GlobalName)\10)=0
}

/// emit $listbuild(key,value)
Method Emit(EmitList... As %String)
{
    #dim list As %String = ""
    for i=1:1:$get(EmitList) {
        set $li(list,i) = $get(EmitList(i))
    }
    #dim name As %String = ..GlobalName
    // append to the tail of the queue
    set @name@($seq(@name)) = list
}

/// returns emitted $lb(key,value)
Method GetNext() As %String
{
    #dim value As %String
    // take the element at the head of the queue
    #dim index As %String = $order(@i%GlobalName@(""), 1, value)
    if index '= "" {
        kill @i%GlobalName@(index)
        quit value
    } else {
        kill @i%GlobalName
        quit ""
    }
}

Method Dump()
{
    zwrite @i%GlobalName
}

}
Hopefully, you still remember that our Emitter is a descendant of Iterator? Therefore it needs to implement the two iterator methods, IsAtEnd and GetNext.
IsAtEnd is simple: if our service global contains no data (that is, $data(@i%GlobalName) does not return 10 or 11, the values that indicate the presence of data nodes in the subtree), then we have reached the end of the data stream.
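As a reminder of what $data reports, the following sketch walks a node through its four possible states and applies the same integer-division test that IsAtEnd uses. The class name MR.Sample.DataDemo is hypothetical, and the process-private global ^||demo is used only to keep the experiment isolated.

```objectscript
/// Hypothetical demo class; ^||demo is a process-private global used only here
Class MR.Sample.DataDemo
{

ClassMethod Run()
{
    kill ^||demo
    write $data(^||demo), !             // 0: no value, no descendants
    set ^||demo = "root"
    write $data(^||demo), !             // 1: value, no descendants
    set ^||demo(1) = "child"
    write $data(^||demo), !             // 11: value and descendants
    zkill ^||demo
    write $data(^||demo), !             // 10: descendants only
    // integer division by 10 keeps only the "has descendants" digit
    write ($data(^||demo) \ 10) = 0, !  // 0: there is still data to read
    kill ^||demo
}

}
```

Only the tens digit matters for the end-of-stream test, which is why a value stored in the root node itself does not affect the result.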
As is well known, and as Sasha Koblov has explained well, $SEQUENCE can be used in almost every place where $INCREMENT was used, while providing better speed in multiprocessor or multi-server mode (via ECP), thanks to fewer collisions when accessing a single global node. Therefore, in the code above, we use $sequence to allocate the index of the next element of the ordered list.
Please note that this way of removing an item from the list/global is not really compatible with parallel mode: we would need to add locks or change the data structure. But since in the next parts we will have only one Reducer for the whole set of Mappers, we postpone solving this problem until we begin the multi-server implementation.
Note that the data structure implemented by MR.Emitter.Ordered is essentially the classic FIFO collection ("first in, first out"): we put a new item at the tail of the list and pull items from its head.
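A minimal usage sketch of this FIFO behavior follows. It assumes the MR.Emitter.Ordered class is compiled as shown in this article; the class MR.Sample.FifoDemo and the global name ^mrDemoFifo are hypothetical and chosen only for the example.

```objectscript
/// Hypothetical demo class; assumes MR.Emitter.Ordered from this article is compiled
Class MR.Sample.FifoDemo
{

ClassMethod Run()
{
    // ^mrDemoFifo is an arbitrary scratch global name chosen for this sketch
    #dim emitter As MR.Emitter.Ordered = ##class(MR.Emitter.Ordered).%New("^mrDemoFifo")

    // items go in at the tail...
    do emitter.Emit("pear", 1)
    do emitter.Emit("apple", 1)

    // ...and come out from the head, in emission order
    while 'emitter.IsAtEnd() {
        write $listtostring(emitter.GetNext(), ","), !
    }
}

}
```

In a single process the pairs should come back in the order they were emitted, "pear" before "apple", regardless of how the keys would sort.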
If you look at the data that we send between the stages of the pipeline in the word-count example (OK, not now, but once we show you that implementation), you quickly realize that:
In fact, we are not interested in the order in which the <key, value> pairs are emitted. Moreover, the underlying B*-tree storage always keeps the list of keys sorted for quick retrieval, which removes the need to sort on the master, as a classic implementation would have to do;
So why send so much unnecessary data over the wire, if we can aggregate it right at the moment of sending?
This is exactly how MR.Emitter.Sorted, a subclass of MR.Emitter.Ordered (shown above), works:
/// Emitter which sorts by keys all emitted pairs or tuples (key, value(s))
Class MR.Emitter.Sorted Extends MR.Emitter.Ordered
{

Property AutoIncrement As %Boolean [ InitialExpression = 1 ];

/// emit $listbuild(key,value)
Method Emit(EmitList... As %String)
{
    #dim name As %String = ..GlobalName
    #dim key As %String
    #dim value As %String

    if $get(EmitList)=1 {
        // special case - only key name given, no value
        set key = $get(EmitList(1))
        quit:key=""
        if ..AutoIncrement {
            // $seq is non-deterministic
            #dim dummyX As %Integer = $increment(@name@(key))
        } else {
            set @name@(key) = 1
        }
    } else {
        // the last argument is the value, all preceding ones are subscripts
        set value = $get(EmitList(EmitList))
        set EmitList = EmitList - 1
        for i=1:1:$get(EmitList) {
            #dim index As %String = $get(EmitList(i))
            quit:index=""
            set name = $name(@name@(index))
        }
        if ..AutoIncrement {
            #dim dummyY As %Integer = $increment(@name,value)
        } else {
            set @name = value
        }
    }
}

/// ...

}
For the simplest case, emitting a <key, 1> pair or, when the value is omitted, just a single <key>, we implemented a local optimization: in autoincrement mode (AutoIncrement = 1) we immediately increment the counter for that key at the moment of the call. If autoincrement is not enabled, we simply (re)set the key node to 1, recording the fact that the key was passed.
For the more general case, a <key, value> pair of two elements or even a longer <key, key2, key3, ... keyn, value> sequence (a tuple of any arity), we again implemented two modes of operation:
with autoincrement, we immediately add the passed value to the node addressed by the key(s);
without autoincrement, we simply assign the passed value to the node addressed by the key(s).
Note that we pass the tuple by means of the array that accumulates the variable number of arguments. All elements of this array except the last one become subscripts of the target node. The last element of the tuple is treated as the value.
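The following sketch illustrates these modes on a tiny input. It assumes the MR.Emitter.Sorted class is compiled as shown in this article, with AutoIncrement left at its default of 1; the class MR.Sample.SortedDemo and the global name ^mrDemoSorted are hypothetical.

```objectscript
/// Hypothetical demo class; assumes MR.Emitter.Sorted from this article is compiled
Class MR.Sample.SortedDemo
{

ClassMethod Run()
{
    // ^mrDemoSorted is an arbitrary scratch global name chosen for this sketch
    #dim emitter As MR.Emitter.Sorted = ##class(MR.Emitter.Sorted).%New("^mrDemoSorted")

    do emitter.Emit("pear")               // key only: counter for "pear" becomes 1
    do emitter.Emit("apple", 2)           // <key, value>: adds 2 to "apple"
    do emitter.Emit("apple", 3)           // same key again: "apple" becomes 5
    do emitter.Emit("fruit", "apple", 4)  // tuple: stored at ("fruit","apple")

    // show the aggregated contents of the intermediate global
    do emitter.Dump()
}

}
```

The dump should show three data nodes: "apple" with 5, "pear" with 1, and the two-level node ("fruit","apple") with 4. Four emits have already been aggregated into three nodes before any Reducer sees them.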
Extending a key-value pair to tuples of any arity in this way is, to our knowledge, unusual and may even be unique. We are not tied to a strict key-value store or a Bigtable-style store, so we can easily work with multidimensional keys in the transmitted elements ("because we can"). This can greatly simplify the implementation of algorithms that require additional data dimensions, and should make the code easier to read and understand. In theory...
Note that we did not override IsAtEnd; it is inherited from MR.Emitter.Ordered, so it still returns a true value once no data remains in the sub-nodes of the intermediate storage.
But we do need to override GetNext, since we no longer try to remember the order in which the data was sent, and the format of the internal storage has changed:
Class MR.Emitter.Sorted Extends MR.Emitter.Ordered
{

/// ...

/// returns emitted $lb(key,value)
Method GetNext() As %String
{
    #dim name As %String = ..GlobalName
    #dim value As %String
    // find the next node that holds data
    #dim ref As %String = $query(@name,1,value)
    if ref'="" {
        // remove the node value, keeping any descendants
        zkill @ref
        #dim i As %Integer
        #dim refLen As %Integer = $qlength(ref)
        #dim baseLen As %Integer = $qlength(name)
        #dim listbuild = ""
        // subscripts below the base global become list elements
        for i=baseLen+1:1:refLen {
            set $li(listbuild,i-baseLen)=$qs(ref,i)
        }
        // the node value goes last
        set $li(listbuild,*+1)=value
        quit listbuild
    }
    quit ""
}

}
GetNext() is expected to return a $LISTBUILD list, but inside the storage the data of the pairs/tuples is scattered across the nodes of a hierarchical global. The $QUERY function lets us visit the nodes that hold data (the values of the pairs/tuples) one by one and repack each of them into $LISTBUILD format: the subscripts of the node are appended to the list one after another by assigning elements through the $LIST function, and the value of the node itself (the value of the key-value pair, or the last element of the tuple) is added to the end of the list with the same function, as $LIST(listbuild,*+1). Here *+1 denotes the position just past the current end of the list.
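The repacking step can be tried in isolation. The sketch below is an illustration under stated assumptions: the class MR.Sample.QueryDemo is a hypothetical name, and a process-private global ^||demo stands in for the intermediate storage. Unlike GetNext, it does not delete the nodes it visits; it simply walks them with $query.

```objectscript
/// Hypothetical demo class; ^||demo is a process-private global used only here
Class MR.Sample.QueryDemo
{

ClassMethod Run()
{
    kill ^||demo
    set ^||demo("apple") = 5
    set ^||demo("fruit", "pear") = 4

    #dim ref As %String = "^||demo"
    #dim value As %String
    for {
        // advance to the next node that holds data, fetching its value
        set ref = $query(@ref, 1, value)
        quit:ref=""

        // repack the subscripts and then the value into one list
        #dim list As %String = ""
        #dim i As %Integer
        for i = 1:1:$qlength(ref) {
            set $list(list, i) = $qsubscript(ref, i)
        }
        set $list(list, *+1) = value
        write $listtostring(list, ","), !
    }
    kill ^||demo
}

}
```

Each data node should come out as one line: the subscripts first, then the value, so a two-level node yields a three-element list.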
At this unexpected point we interrupt our story about MapReduce in Caché. In this second part we showed the basic interfaces of the infrastructure, which will be used later to implement concrete examples. In the next part we will put it all together and implement the classic WordCount example, this time in ObjectScript. Stay tuned!
Source: https://habr.com/ru/post/310196/