
The big data stack through the eyes of a militant Oracle guy

On Habr and other sites, vacuous articles about big data appear almost daily, leaving specialists with a persistent feeling that there is nothing behind big data except marketing. In fact, there are plenty of interesting technologies under the hood of Hadoop, and here I want to dilute the marketing a little with technical detail, drawing on my Oracle experience.



First of all, it should be understood that big data on Hadoop is not just batch processing and MapReduce, as many try to portray it. The stack handles the opposite end of the task spectrum just as easily: reading a stream of small messages, for example from IoT devices (Spark on Hadoop consuming a Kafka stream), while aggregating and detecting anomalies on the fly. Or clouds of small queries from reporting systems, with JOINs over parquet files via Impala. All of this is the same Hadoop ecosystem. There is a great deal of it, at varying levels of maturity and demanding different approaches, and believe me, nobody really knows the single winning option. Some workloads run beautifully on classic MapReduce over plain parquet files in HDFS, others on Spark with ordinary SQL JOINs almost over text files. Cloudera, the largest distribution vendor, is now actively pushing Kudu, a database-like storage engine that can be used both Java-style from MapReduce and through SQL in Impala.
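To make the streaming end of that spectrum concrete, here is a minimal sketch of Spark Structured Streaming reading a Kafka topic and aggregating on the fly. It is my own illustration, not code from any particular project: the broker address, topic name, window size and class name are made-up placeholders, and it assumes Spark 2.x with the spark-sql-kafka connector on the classpath.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

public class IotStreamSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("iot-stream-sketch")
                .getOrCreate();

        // Read the stream of small messages from Kafka (placeholder broker and topic)
        Dataset<Row> events = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092")
                .option("subscribe", "iot-events")
                .load();

        // Aggregate on the go: message counts per device in one-minute windows
        Dataset<Row> counts = events
                .selectExpr("CAST(key AS STRING) AS device", "timestamp")
                .groupBy(window(col("timestamp"), "1 minute"), col("device"))
                .count();

        // Print the running aggregates; in a real job this would feed an anomaly check or a sink
        StreamingQuery query = counts.writeStream()
                .outputMode("update")
                .format("console")
                .start();
        query.awaitTermination();
    }
}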



There is a lot of everything, almost all of it can be combined with everything else, and depending on the combination the processing approaches can differ enormously. In a way it vaguely resembles Oracle's thrashing about in the late 90s and early 2000s, when they started cramming XML tables, Java stored procedures and other oddities into the DBMS.


When I first landed on a Hadoop/MapReduce project, it was completely unclear how this approach could compete with a full-fledged DBMS, since all MapReduce communication goes through writing to HDFS: first the mappers read everything, then they write their output to HDFS for the reducers, then the reducers read it and write again. With Oracle-bred notions of elegance, it seemed this simply could not fly. The understanding came later.



Oracle itself does a lot of seemingly extra work, trading "extra" writes to disk for more parallel execution. It writes "extra" UNDO, for example, which lets it keep competing readers out of each other's way; it writes locks into the data blocks themselves, which spares it from keeping huge lock lists in memory and lets RAC cluster nodes exchange lock information transparently. The "extra" writing in MapReduce deserves the same kind of look: all this "superfluous" work ultimately lets it run far more tasks in parallel than Oracle can.



At the same time I was immediately struck by the size of parquet files: Oracle tables of 80-100 GB without indexes easily shrink to 20-30 GB of parquet. As a result, MapReduce reads several times less from disk and immediately spreads the load across a cloud of cluster cores, whereas Oracle has to read more and pin the whole computation to a single user process. That is a dig at the PL/SQL engine in particular, although the SQL engine also has plenty of nuances around parallel reads.
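For reference, landing a relational table as compressed columnar parquet takes only a few lines of Spark; the sketch below is hypothetical (connection string, credentials, table name and target path are placeholders, not anything from the article), but it is roughly how such 80-100 GB extracts end up as 20-30 GB of parquet on HDFS.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class OracleToParquetSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("oracle-to-parquet-sketch")
                .getOrCreate();

        // Pull the table over JDBC (placeholder connection details; needs the Oracle JDBC driver on the classpath)
        Dataset<Row> t = spark.read()
                .format("jdbc")
                .option("url", "jdbc:oracle:thin:@//db-host:1521/ORCL")
                .option("dbtable", "SCOTT.T")
                .option("user", "scott")
                .option("password", "tiger")
                .load();

        // Land it as snappy-compressed parquet; columnar encoding plus compression
        // is what shrinks the footprint compared to the row-store table
        t.write()
                .option("compression", "snappy")
                .parquet("hdfs:///data/t_parquet");
    }
}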



To show what implementing logic on Hadoop looks like, I will use one of the "Friday tasks" from the Oracle forum on sql.ru. It all started, in fact, with a map-reduce vs Spark argument.



The problem is this:



Write a query that, for each adjusted value, finds:

1) the previous original value (ordered by id)

2) the original value with a shift given by a variable (the previous one corresponds to a shift of 0)

3) the number of preceding original values such that their sum does not exceed the specified limit



var shift number

var limit number

exec :shift := 2;

PL/SQL procedure successfully completed.

exec :limit := 2000;

PL/SQL procedure successfully completed.


In Oracle, the solution was proposed as follows:



select t.id,
       t.type,
       t.value,
       decode(type, 'adjusted', max(decode(type, 'original', value)) over(partition by grp)) prev_o_value,
       decode(type, 'adjusted', max(decode(type, 'original', shift_n)) over(partition by grp)) prev_shift_n_o_value,
       decode(type, 'adjusted', count(decode(type, 'original', id))
              over(order by orig_val_running_sum range between 2000 preceding and 1 preceding)) count_o
  from (select t.*,
               sum(decode(type, 'original', value)) over(order by id) - decode(type, 'original', value, 0) orig_val_running_sum,
               decode(type, 'original', lag(value, 2) over(order by decode(type, 'original', 0, 1), id)) shift_n,
               sum(decode(type, 'original', 1)) over(order by id) grp
          from t) t
 order by id;

        ID TYPE                                VALUE PREV_O_VALUE PREV_SHIFT_N_O_VALUE    COUNT_O
---------- ------------------------------ ---------- ------------ -------------------- ----------
        10 original                              100
        20 original                              200
        30 adjusted                              300          200                               2
        40 original                              400
        50 adjusted                              500          400                  100          3
        60 original                              600
        70 original                              700
        80 adjusted                              800          700                  400          5
        90 adjusted                              900          700                  400          5
       100 original                             1000
       110 adjusted                             1100         1000                  600          2
       120 original                             1200
       130 adjusted                             1300         1200                  700          1
       140 original                             1400
       150 adjusted                             1500         1400                 1000          1

15 rows selected.


In Spark SQL, the problem was solved without item 3.



select t.id,
       t.type,
       t.value,
       case when type = 'adjusted'
            then max(case when type = 'original' then value end) over (partition by position, grp)
       end prev_o_value,
       case when type = 'adjusted'
            then max(case when type = 'original' then shift_n end) over (partition by position, grp)
       end prev_shift_n_o_value
  from (select t.*,
               case when type = 'original'
                    then lag(value, 2) over (partition by position
                                             order by case when type = 'original' then 0 else 1 end, id)
               end shift_n,
               sum(case when type = 'original' then 1 end) over (partition by position order by id) grp
          from t) t
 order by id

+---+--------+-----+------------+--------------------+
|id |type    |value|prev_o_value|prev_shift_n_o_value|
+---+--------+-----+------------+--------------------+
|10 |original|100  |null        |null                |
|20 |original|200  |null        |null                |
|30 |adjusted|300  |200         |null                |
|40 |original|400  |null        |null                |
|50 |adjusted|500  |400         |100                 |
|60 |original|600  |null        |null                |
|70 |original|700  |null        |null                |
|80 |adjusted|800  |700         |400                 |
|90 |adjusted|900  |700         |400                 |
|100|original|1000 |null        |null                |
|110|adjusted|1100 |1000        |600                 |
|120|original|1200 |null        |null                |
|130|adjusted|1300 |1200        |700                 |
|140|original|1400 |null        |null                |
|150|adjusted|1500 |1400        |1000                |
+---+--------+-----+------------+--------------------+


As we can see, the Spark SQL solution hardly differs from the Oracle one. But here is the map-reduce solution:



Mapper



import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ParquetMapper extends Mapper<LongWritable, GenericRecord, Text, AvroValue<GenericRecord>> {

    private final Text outputKey = new Text();
    private final AvroValue<GenericRecord> outputValue = new AvroValue<GenericRecord>();

    @Override
    protected void map(LongWritable key, GenericRecord value, Context context)
            throws IOException, InterruptedException {
        // Key each record by its "position" field so that all rows of a partition
        // arrive at the same reducer; the whole Avro record travels as the value.
        outputKey.set(String.valueOf(value.get("position")));
        outputValue.datum(value);
        context.write(outputKey, outputValue);
    }
}


Reducer



import java.io.IOException;
import java.util.AbstractMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ParquetReducer extends Reducer<Text, AvroValue<GenericRecord>, Void, Text> {

    private static final byte shift = 2;

    // Rows of the group keyed by id, so iteration comes out ordered by id
    private TreeMap<Integer, AbstractMap.SimpleEntry<String, Integer>> rows =
            new TreeMap<Integer, AbstractMap.SimpleEntry<String, Integer>>();
    // History of "original" values seen so far, used for the shifted lookup
    List<Integer> queue = new LinkedList<Integer>();
    private String adj = "";
    private int lastValue = -1;

    @Override
    protected void reduce(Text key, Iterable<AvroValue<GenericRecord>> values, Context context)
            throws IOException, InterruptedException {
        // First pass: collect the whole group into the TreeMap to restore ordering by id
        for (AvroValue<GenericRecord> value : values) {
            rows.put((Integer) value.datum().get("id"),
                     new AbstractMap.SimpleEntry(value.datum().get("type"), value.datum().get("value")));
        }
        // Second pass: walk the rows in id order and build the output line for each
        for (Map.Entry<Integer, AbstractMap.SimpleEntry<String, Integer>> entry : rows.entrySet()) {
            AbstractMap.SimpleEntry<String, Integer> rowData = entry.getValue();
            if (rowData.getKey().equals("original")) {
                // Remember the latest original and keep it in the history
                lastValue = rowData.getValue();
                queue.add(lastValue);
                adj = "";
            } else {
                // Adjusted row: append the previous original and, if available, the original shifted by "shift"
                adj = " " + String.valueOf(lastValue);
                if (queue.size() - shift > 0) {
                    adj = adj + " " + queue.get(queue.size() - shift).toString();
                }
            }
            Text output = new Text(entry.getKey() + " " + rowData.getKey() + " " + rowData.getValue() + adj);
            context.write(null, output);
        }
    }
}


Run



[yarn@sandbox map-reduce]$ hadoop fs -cat /out/part-r-00000

10 original 100

20 original 200

30 adjusted 300 200

40 original 400

50 adjusted 500 400 200

60 original 600

70 original 700

80 adjusted 800 700 600

90 adjusted 900 700 600

100 original 1000

110 adjusted 1100 1000 700

120 original 1200

130 adjusted 1300 1200 1000

140 original 1400

150 adjusted 1500 1400 1200




The takeaway: the big data stack is huge, with plenty of subsystems built around its paradigm, and MapReduce is just one bolt in the machine, not necessarily the one everybody needs now that Spark is in fashion. Batch processing is not the only paradigm implemented in big data, and Spark already lets you express logic in declarative SQL, among other things. MapReduce, for its part, is still quite elegant for certain tasks, and it comfortably handles workloads that until recently only serious Oracle servers could touch. Looking closely at the code, the Spark SQL version still does not implement item 3 of the task, while the MapReduce code, though not declarative, came out reasonably clean and easy to extend. A developer of average skill can add item 3 to the MapReduce code in a few minutes, whereas piling up the analytic functions in the Oracle solution demands serious preparation.
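As a hint of what that extension might look like, here is a hypothetical helper (my sketch, not code from the original thread) that could be called from the reducer above where adj is built for an adjusted row: it walks the already accumulated queue of original values backwards and counts how many fit under the limit, mirroring the :limit bind variable from the SQL version.

    // Hypothetical extension for item 3: count the most recent originals whose sum stays within the limit.
    // The constant mirrors the :limit bind variable (2000) from the SQL solution.
    private static final int limit = 2000;

    private int countOriginalsWithinLimit(List<Integer> originals) {
        int sum = 0;
        int count = 0;
        // Walk from the most recent original backwards, stopping as soon as adding
        // the next value would push the running sum over the limit
        for (int i = originals.size() - 1; i >= 0; i--) {
            if (sum + originals.get(i) > limit) {
                break;
            }
            sum += originals.get(i);
            count++;
        }
        return count;
    }

For the test data above, calling countOriginalsWithinLimit(queue) for each adjusted row and appending the result to adj reproduces the COUNT_O column of the Oracle output.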

Source: https://habr.com/ru/post/337128/


