-- . ( ) CREATE EXTERNAL TABLE win_bids_log ( date_field string, request_id string, user_ssp_id string, dsp_id string, win_price int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION 'hdfs://inpit/bid-logs'; CREATE EXTERNAL TABLE win_bids_by_dsp ( dsp_id string, win_bids_cout int, win_price int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION ''hdfs://output/win-bids-by-dsp''; INSERT OVERWRITE TABLE win_bids_by_dsp SELECT dsp_id, COUNT(dsp_id), SUM(win_price) FROM win_bids_log GROUP BY dsp_id;
INSERT OVERWRITE TABLE movieapp_log_stage SELECT * FROM ( SELECT custid, movieid, CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid, time, CAST((CASE recommended WHEN 'Y' THEN 1 ELSE 0 END) AS INT) recommended, activity, CAST(null AS INT) rating, price FROM movieapp_log_avro WHERE activity IN (2,4,5,11) UNION ALL SELECT m1.custid, m1.movieid, CASE WHEN m1.genreid > 0 THEN m1.genreid ELSE -1 END genreid, m1.time, CAST((CASE m1.recommended WHEN 'Y' THEN 1 ELSE 0 END) AS INT) recommended, m1.activity, m1.rating, CAST(null as float) price FROM movieapp_log_avro m1 JOIN ( SELECT custid,movieid, CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,MAX(time) max_time, activity FROM movieapp_log_avro GROUP BY custid, movieid, genreid, activity ) m2 ON ( m1.custid = m2.custid AND m1.movieid = m2.movieid AND m1.genreid = m2.genreid AND m1.time = m2.max_time AND m1.activity = 1 AND m2.activity = 1 ) ) union_result;
-- HDFS (Pig Hadoop) fs -rm -f -r -skipTrash /data/pig/out -- 'raw_data' raw_data = LOAD '/data/pig/example/' USING PigStorage('\t') AS (time, bid_id, user_id, dsp_id, bid:int);
raw_data -> tuple . ------------------------------------------------------------------------------------------- time, bid_id, user_id, dsp_id, bid ------------------------------------------------------------------------------------------- (2014.02.14 14:08:27.711, 56949, 45234534553459, DSP-2, 12) (2014.02.14 14:08:28.712, 61336, 45221696259999, DSP-1, 56) (2014.02.14 14:08:29.713, 74685, 45221699381039, DSP-2, 89) (2014.02.14 14:08:30.714, 56949, 45221695781716, DSP-1, 21) (2014.02.14 14:08:25.715, 27617, 45221682863705, DSP-3, 22)
-- . timestamp SUBSTRING norm_data = FOREACH raw_data GENERATE SUBSTRING(time, 0,10) AS date, dsp_id, bid;
norm_data -> tuple --------------------------------------- date, dsp_id, bid --------------------------------------- (2014.02.14, DSP-2, 12) (2014.02.14, DSP-1, 56) (2014.02.14, DSP-2, 89) (2014.02.14, DSP-1, 21)
-- dsp_id date group_norm_data = GROUP norm_data BY (dsp_id, date);
group_norm_data -> ( ) : [ (norm_data), (norm_data) ] ---------------------------------------------------------------------------------- ( group), array of norm_data ---------------------------------------------------------------------------------- ( (DSP-1, 2014.02.14), {(2014.02.14, DSP-1, 56), (2014.02.14, DSP-1, 21)} ) ( (DSP-1, 2014.02.17), {(2014.02.17, DSP-1, 34), (2014.02.17, DSP-1, 24)} ) ( (DSP-2, 2014.02.14), {(2014.02.14, DSP-2, 89), (2014.02.14, DSP-2, 12)} )
-- ft_group_norm_data = FOREACH group_norm_data GENERATE FLATTEN(group), FLATTEN(norm_data);
ft_group_norm_data -> tuple ---------------------------------------------------------------------- dsp_id, date date dsp_id bid ----------------------------------------------------------------------- (DSP-1, 2014.02.14, 2014.02.14, DSP-1, 56) (DSP-1, 2014.02.14, 2014.02.14, DSP-1, 21) (DSP-1, 2014.02.15, 2014.02.15, DSP-1, 15) (DSP-1, 2014.02.15, 2014.02.15, DSP-1, 31)
-- , sum_bids_dsp = FOREACH group_norm_data GENERATE group, SUM(norm_data.bid) AS bids_sum;
sum_bids_dsp -> : bids_sum ------------------------------------------------------ group, bids_sum ------------------------------------------------------ ( (DSP-1, 2014.02.16), 82) ( (DSP-1, 2014.02.17), 58) ( (DSP-2, 2014.02.14), 101) ( (DSP-2, 2014.02.16), 58)
-- , . -- . group_all = GROUP sum_bids_dsp ALL;
( all, { ((DSP-1,2014.02.14),77), ((DSP-1,2014.02.15),67), ((DSP-1,2014.02.16),82),((DSP-1,2014.02.17),58),((DSP-2,2014.02.14),101),((DSP-2,2014.02.16),58),((DSP-2,2014.02.17),123),((DSP-3,2014.02.14),22),((DSP-3,2014.02.15),109),((DSP-3,2014.02.16),136),((DSP-3,2014.02.17),81) } )
summary = FOREACH group_all GENERATE COUNT(sum_bids_dsp), SUM(sum_bids_dsp.bids_sum);
------------------------------------------------------ count, sum ------------------------------------------------------ (11, 914)
Source: https://habr.com/ru/post/223217/
All Articles