Write a query that, for each adjusted value, finds:
1) the previous original value (ordered by id);
2) the original value at an offset given by a variable (the immediately preceding original counts as offset 0);
3) the number of preceding original values whose sum does not exceed a given limit.
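For reference, the examples below run against a small test table t with an id, a row type ('original' or 'adjusted') and a value. Its DDL is not shown in the source, so the following is only a sketch reconstructed from the query output (the Parquet/MapReduce variant additionally carries a position column used as the grouping key):

create table t (
  id    number,
  type  varchar2(30),
  value number
);

insert into t values ( 10, 'original',  100);
insert into t values ( 20, 'original',  200);
insert into t values ( 30, 'adjusted',  300);
insert into t values ( 40, 'original',  400);
insert into t values ( 50, 'adjusted',  500);
insert into t values ( 60, 'original',  600);
insert into t values ( 70, 'original',  700);
insert into t values ( 80, 'adjusted',  800);
insert into t values ( 90, 'adjusted',  900);
insert into t values (100, 'original', 1000);
insert into t values (110, 'adjusted', 1100);
insert into t values (120, 'original', 1200);
insert into t values (130, 'adjusted', 1300);
insert into t values (140, 'original', 1400);
insert into t values (150, 'adjusted', 1500);
commit;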
var shift number
var limit number

exec :shift := 2;

PL/SQL procedure successfully completed.

exec :limit := 2000;

PL/SQL procedure successfully completed.
SQL> select t.id,
            t.type,
            t.value,
            decode(type, 'adjusted', max(decode(type, 'original', value)) over(partition by grp)) prev_o_value,
            decode(type, 'adjusted', max(decode(type, 'original', shift_n)) over(partition by grp)) prev_shift_n_o_value,
            decode(type, 'adjusted', count(decode(type, 'original', id))
              over(order by orig_val_running_sum range between 2000 preceding and 1 preceding)) count_o
       from (select t.*,
                    sum(decode(type, 'original', value)) over(order by id) - decode(type, 'original', value, 0) orig_val_running_sum,
                    decode(type, 'original', lag(value, 2) over(order by decode(type, 'original', 0, 1), id)) shift_n,
                    sum(decode(type, 'original', 1)) over(order by id) grp
               from t) t
      order by id;

        ID TYPE                                VALUE PREV_O_VALUE PREV_SHIFT_N_O_VALUE    COUNT_O
---------- ------------------------------ ---------- ------------ -------------------- ----------
        10 original                              100
        20 original                              200
        30 adjusted                              300          200                               2
        40 original                              400
        50 adjusted                              500          400                  100          3
        60 original                              600
        70 original                              700
        80 adjusted                              800          700                  400          5
        90 adjusted                              900          700                  400          5
       100 original                             1000
       110 adjusted                             1100         1000                  600          2
       120 original                             1200
       130 adjusted                             1300         1200                  700          1
       140 original                             1400
       150 adjusted                             1500         1400                 1000          1

15 rows selected.
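Note that the query above hard-codes the shift (lag(value, 2)) and the limit (range between 2000 preceding ...) even though the :shift and :limit bind variables were declared. A sketch of the same query with the bind variables wired in, assuming your Oracle version accepts bind variables in the LAG offset and in the window frame bound:

select t.id,
       t.type,
       t.value,
       decode(type, 'adjusted', max(decode(type, 'original', value)) over(partition by grp)) prev_o_value,
       decode(type, 'adjusted', max(decode(type, 'original', shift_n)) over(partition by grp)) prev_shift_n_o_value,
       decode(type, 'adjusted', count(decode(type, 'original', id))
         over(order by orig_val_running_sum range between :limit preceding and 1 preceding)) count_o
  from (select t.*,
               sum(decode(type, 'original', value)) over(order by id) - decode(type, 'original', value, 0) orig_val_running_sum,
               decode(type, 'original', lag(value, :shift) over(order by decode(type, 'original', 0, 1), id)) shift_n,
               sum(decode(type, 'original', 1)) over(order by id) grp
          from t) t
 order by id;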
select t.id,
       t.type,
       t.value,
       case when type = 'adjusted'
            then max(case when type = 'original' then value end) over (partition by position, grp)
       end prev_o_value,
       case when type = 'adjusted'
            then max(case when type = 'original' then shift_n end) over (partition by position, grp)
       end prev_shift_n_o_value
  from (select t.*,
               case when type = 'original'
                    then lag(value, 2) over (partition by position
                                             order by case when type = 'original' then 0 else 1 end, id)
               end shift_n,
               sum(case when type = 'original' then 1 end) over (partition by position order by id) grp
          from t) t
 order by id

+---+--------+-----+------------+--------------------+
|id |type    |value|prev_o_value|prev_shift_n_o_value|
+---+--------+-----+------------+--------------------+
|10 |original|100  |null        |null                |
|20 |original|200  |null        |null                |
|30 |adjusted|300  |200         |null                |
|40 |original|400  |null        |null                |
|50 |adjusted|500  |400         |100                 |
|60 |original|600  |null        |null                |
|70 |original|700  |null        |null                |
|80 |adjusted|800  |700         |400                 |
|90 |adjusted|900  |700         |400                 |
|100|original|1000 |null        |null                |
|110|adjusted|1100 |1000        |600                 |
|120|original|1200 |null        |null                |
|130|adjusted|1300 |1200        |700                 |
|140|original|1400 |null        |null                |
|150|adjusted|1500 |1400        |1000                |
+---+--------+-----+------------+--------------------+
import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ParquetMapper extends Mapper<LongWritable, GenericRecord, Text, AvroValue<GenericRecord>> {

    private final Text outputKey = new Text();
    private final AvroValue<GenericRecord> outputValue = new AvroValue<GenericRecord>();

    @Override
    protected void map(LongWritable key, GenericRecord value, Context context)
            throws IOException, InterruptedException {
        // Key each record by its "position" field so that all rows of one
        // position land in the same reduce call; the whole record is passed
        // on as an Avro value.
        outputKey.set(String.valueOf(value.get("position")));
        outputValue.datum(value);
        context.write(outputKey, outputValue);
    }
}
import java.io.IOException;
import java.util.AbstractMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ParquetReducer extends Reducer<Text, AvroValue<GenericRecord>, Void, Text> {

    private static final byte shift = 2;

    // id -> (type, value); the TreeMap keeps the group sorted by id
    private final TreeMap<Integer, AbstractMap.SimpleEntry<String, Integer>> rows =
            new TreeMap<Integer, AbstractMap.SimpleEntry<String, Integer>>();
    private final List<Integer> queue = new LinkedList<Integer>();
    private String adj = "";
    private int lastValue = -1;

    @Override
    protected void reduce(Text key, Iterable<AvroValue<GenericRecord>> values, Context context)
            throws IOException, InterruptedException {
        // reduce() is called once per position key; reset per-key state so
        // rows from the previous key are not replayed again
        rows.clear();
        queue.clear();
        adj = "";
        lastValue = -1;

        // buffer the whole group, sorted by id
        for (AvroValue<GenericRecord> value : values) {
            rows.put((Integer) value.datum().get("id"),
                     new AbstractMap.SimpleEntry<String, Integer>(
                             String.valueOf(value.datum().get("type")),
                             (Integer) value.datum().get("value")));
        }

        // walk the rows in id order, tracking the last original value and
        // the list of all original values seen so far
        for (Map.Entry<Integer, AbstractMap.SimpleEntry<String, Integer>> entry : rows.entrySet()) {
            AbstractMap.SimpleEntry<String, Integer> rowData = entry.getValue();
            if (rowData.getKey().equals("original")) {
                lastValue = rowData.getValue();
                queue.add(lastValue);
                adj = "";
            } else {
                adj = " " + String.valueOf(lastValue);
                if (queue.size() - shift > 0) {
                    adj = adj + " " + queue.get(queue.size() - shift).toString();
                }
            }
            Text output = new Text(entry.getKey() + " " + rowData.getKey() + " " + rowData.getValue() + adj);
            context.write(null, output);
        }
    }
}
[yarn@sandbox map-reduce]$ hadoop fs -cat /out/part-r-00000
10 original 100
20 original 200
30 adjusted 300 200
40 original 400
50 adjusted 500 400 200
60 original 600
70 original 700
80 adjusted 800 700 600
90 adjusted 900 700 600
100 original 1000
110 adjusted 1100 1000 700
120 original 1200
130 adjusted 1300 1200 1000
140 original 1400
150 adjusted 1500 1400 1200
Source: https://habr.com/ru/post/337128/