📜 ⬆️ ⬇️

R and Big Data: Use Replyr

replyr is short for RE mote PLY ing of big data for R (remote processing of big data in R).

Why try the replyr ? Because it allows you to use standard working approaches to remote data (databases or Spark ).

You can work the same way as with local data.frame . replyr provides these features:
')

Most likely, you do all this with the data locally, so these features will make working with Spark and sparklyr much easier.

replyr is a product of the collective experience of using R in application solutions for many clients, collecting feedback and correcting flaws.

Examples below.

Everything is now changing rapidly, so we will use for examples the versions of packages in development.

 base::date() ## [1] "Thu Jul 6 15:56:28 2017" # devtools::install_github('rstudio/sparklyr') # devtools::install_github('tidyverse/dplyr') # devtools::install_github('tidyverse/dbplyr') # install.packages("replyr") suppressPackageStartupMessages(library("dplyr")) packageVersion("dplyr") ## [1] '0.7.1.9000' packageVersion("dbplyr") ## [1] '1.1.0.9000' library("tidyr") packageVersion("tidyr") ## [1] '0.6.3' library("replyr") packageVersion("replyr") ## [1] '0.4.2' suppressPackageStartupMessages(library("sparklyr")) packageVersion("sparklyr") ## [1] '0.5.6.9012' #  ,    https://github.com/rstudio/sparklyr/issues/783 config <- spark_config() config[["sparklyr.shell.driver-memory"]] <- "8G" sc <- sparklyr::spark_connect(version='2.1.0', hadoop_version = '2.7', master = "local", config = config) 

Summary


Standard summary() and glance() , which cannot be performed on Spark .

 mtcars_spark <- copy_to(sc, mtcars) #  ,    summary(mtcars_spark) ## Length Class Mode ## src 1 src_spark list ## ops 2 op_base_remote list packageVersion("broom") ## [1] '0.4.2' broom::glance(mtcars_spark) ## Error: glance doesn't know how to deal with data of class tbl_sparktbl_sqltbl_lazytbl 

replyr_summary works.

 replyr_summary(mtcars_spark) %>% select(-lexmin, -lexmax, -nunique, -index) ## column class nrows nna min max mean sd ## 1 mpg numeric 32 0 10.400 33.900 20.090625 6.0269481 ## 2 cyl numeric 32 0 4.000 8.000 6.187500 1.7859216 ## 3 disp numeric 32 0 71.100 472.000 230.721875 123.9386938 ## 4 hp numeric 32 0 52.000 335.000 146.687500 68.5628685 ## 5 drat numeric 32 0 2.760 4.930 3.596563 0.5346787 ## 6 wt numeric 32 0 1.513 5.424 3.217250 0.9784574 ## 7 qsec numeric 32 0 14.500 22.900 17.848750 1.7869432 ## 8 vs numeric 32 0 0.000 1.000 0.437500 0.5040161 ## 9 am numeric 32 0 0.000 1.000 0.406250 0.4989909 ## 10 gear numeric 32 0 3.000 5.000 3.687500 0.7378041 ## 11 carb numeric 32 0 1.000 8.000 2.812500 1.6152000 

Aggregation / distribution


tidyr works mainly with local data.

 mtcars2 <- mtcars %>% mutate(car = row.names(mtcars)) %>% copy_to(sc, ., 'mtcars2') #  mtcars2 %>% tidyr::gather('fact', 'value') ## Error in UseMethod("gather_"): no applicable method for 'gather_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')" mtcars2 %>% replyr_moveValuesToRows(nameForNewKeyColumn= 'fact', nameForNewValueColumn= 'value', columnsToTakeFrom= colnames(mtcars), nameForNewClassColumn= 'class') %>% arrange(car, fact) ## # Source: lazy query [?? x 4] ## # Database: spark_connection ## # Ordered by: car, fact ## car fact value class ## ## 1 AMC Javelin am 0.00 numeric ## 2 AMC Javelin carb 2.00 numeric ## 3 AMC Javelin cyl 8.00 numeric ## 4 AMC Javelin disp 304.00 numeric ## 5 AMC Javelin drat 3.15 numeric ## 6 AMC Javelin gear 3.00 numeric ## 7 AMC Javelin hp 150.00 numeric ## 8 AMC Javelin mpg 15.20 numeric ## 9 AMC Javelin qsec 17.30 numeric ## 10 AMC Javelin vs 0.00 numeric ## # ... with 342 more rows 

String binding


dplyr bind_rows , union and union_all not applicable in Spark . replyr::replyr_union_all() and replyr::replyr_bind_rows() is a workable alternative.

bind_rows ()


 db1 <- copy_to(sc, data.frame(x=1:2, y=c('a','b'), stringsAsFactors=FALSE), name='db1') db2 <- copy_to(sc, data.frame(y=c('c','d'), x=3:4, stringsAsFactors=FALSE), name='db2') #  -     ,    bind_rows(list(db1, db2)) ## Error in bind_rows_(x, .id): Argument 1 must be a data frame or a named atomic vector, not a tbl_spark/tbl_sql/tbl_lazy/tbl 

union_all


 #          union_all(db1, db2) ## # Source: lazy query [?? x 2] ## # Database: spark_connection ## xy ## ## 1 1 a ## 2 2 b ## 3 3 c ## 4 4 d 

union


 #          #  ,     union(db1, db2) ## # Source: lazy query [?? x 2] ## # Database: spark_connection ## xy ## ## 1 4 d ## 2 1 a ## 3 3 c ## 4 2 b 

replyr_bind_rows


replyr::replyr_bind_rows can link together several data.frame s.

 replyr_bind_rows(list(db1, db2)) ## # Source: table [?? x 2] ## # Database: spark_connection ## xy ## ## 1 1 a ## 2 2 b ## 3 3 c ## 4 4 d 

dplyr :: do


In our example, we simply take several rows from each group of an aggregated data set. Please note: since we do not explicitly set the order with the help of arrange, one cannot always expect the results to coincide in different data sources (DB or Spark ).

dplyr::do on local data


From help('do', package='dplyr') :

 by_cyl <- group_by(mtcars, cyl) do(by_cyl, head(., 2)) ## # A tibble: 6 x 11 ## # Groups: cyl [3] ## mpg cyl disp hp drat wt qsec vs am gear carb ## ## 1 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1 ## 2 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2 ## 3 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4 ## 4 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4 ## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2 ## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4 

dplyr::do on Spark


 by_cyl <- group_by(mtcars_spark, cyl) do(by_cyl, head(., 2)) ## # A tibble: 3 x 2 ## cyl V2 ## ## 1 6 ## 2 4 ## 3 8 

We get not quite what can be used.

replyr split / merge


 mtcars_spark %>% replyr_split('cyl', partitionMethod = 'extract') %>% lapply(function(di) head(di, 2)) %>% replyr_bind_rows() ## # Source: table [?? x 11] ## # Database: spark_connection ## mpg cyl disp hp drat wt qsec vs am gear carb ## ## 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4 ## 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4 ## 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1 ## 4 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2 ## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2 ## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4 

replyr gapply


 mtcars_spark %>% gapply('cyl', partitionMethod = 'extract', function(di) head(di, 2)) ## # Source: table [?? x 11] ## # Database: spark_connection ## mpg cyl disp hp drat wt qsec vs am gear carb ## ## 1 21.0 6 160.0 110 3.90 2.620 16.46 0 1 4 4 ## 2 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4 ## 3 22.8 4 108.0 93 3.85 2.320 18.61 1 1 4 1 ## 4 24.4 4 146.7 62 3.69 3.190 20.00 1 0 4 2 ## 5 18.7 8 360.0 175 3.15 3.440 17.02 0 0 3 2 ## 6 14.3 8 360.0 245 3.21 3.570 15.84 0 0 3 4 

replyr::replyr_apply_f_mapped


What I would like to receive: the data with the names corresponding to the code (ie, change the data, not the code).

By some thought, this can be achieved by associating the change in data with the environment of the function, and not with the data. That is, the data is changed as long as the corresponding monitoring function is executed. In our case, this function is replyr::replyr_apply_f_mapped() , and it works as follows.

Suppose that the operation we want to use is a function of decreasing order, obtained from some source that is not under our control (for example, a packet). This may be a simple function (as below), but suppose we want to use it without modification (excluding even very small ones, like the introduction of wrapr::let() ).

 #          DecreaseRankColumnByOne <- function(d) { d$RankColumn <- d$RankColumn - 1 d } 

To apply this function to d (in which the column names are not as expected!), We use replyr::replyr_apply_f_mapped() to create a new parameterized adapter:

 #   d <- data.frame(Sepal_Length = c(5.8,5.7), Sepal_Width = c(4.0,4.4), Species = 'setosa', rank = c(1,2)) #     DecreaseRankColumnByOneNamed <- function(d, ColName) { replyr::replyr_apply_f_mapped(d, f = DecreaseRankColumnByOne, nmap = c(RankColumn = ColName), restrictMapIn = FALSE, restrictMapOut = FALSE) } #  dF <- DecreaseRankColumnByOneNamed(d, 'rank') print(dF) ## Sepal_Length Sepal_Width Species rank ## 1 5.8 4.0 setosa 0 ## 2 5.7 4.4 setosa 1 

replyr::replyr_apply_f_mapped() renames columns as expected in DecreaseRankColumnByOne (the match is specified in nmap ), applies DecreaseRankColumnByOne and returns the names to the original before returning the result.

Track Intermediate Results


Many tasks in Sparklyr involve the creation of intermediate or temporary tables. This can be done with dplyr::copy_to() and dplyr::compute() . These methods can be resource intensive.

In replyr there are functions to keep the process under control: temporary name generators that do not alter the actual data (they are also used within the packet itself).

The function itself is fairly simple:

 print(replyr::makeTempNameGenerator) ## function (prefix, suffix = NULL) ## { ## force(prefix) ## if ((length(prefix) != 1) || (!is.character(prefix))) { ## stop("repyr::makeTempNameGenerator prefix must be a string") ## } ## if (is.null(suffix)) { ## alphabet <- c(letters, toupper(letters), as.character(0:9)) ## suffix <- paste(base::sample(alphabet, size = 20, replace = TRUE), ## collapse = "") ## } ## count <- 0 ## nameList <- list() ## function(..., peek = FALSE, dumpList = FALSE, remove = NULL) { ## if (length(list(...)) > 0) { ## stop("replyr::makeTempNameGenerator tempname generate unexpected argument") ## } ## if (peek) { ## return(names(nameList)) ## } ## if (dumpList) { ## v <- names(nameList) ## nameList <<- list() ## return(v) ## } ## if (!is.null(remove)) { ## victims <- intersect(remove, names(nameList)) ## nameList[victims] <<- NULL ## return(victims) ## } ## nm <- paste(prefix, suffix, sprintf("%010d", count), ## sep = "_") ## nameList[[nm]] <<- 1 ## count <<- count + 1 ## nm ## } ## } ## ## 

For example, to combine multiple tables, a good solution for some data sources is to call compute after each join (otherwise, the resulting SQL can become long and difficult to understand and maintain). The code looks like this:

 #     names <- paste('table', 1:5, sep='_') tables <- lapply(names, function(ni) { di <- data.frame(key= 1:3) di[[paste('val',ni,sep='_')]] <- runif(nrow(di)) copy_to(sc, di, ni) }) #     tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP') #      joined <- tables[[1]] for(i in seq(2,length(tables))) { ti <- tables[[i]] if(i<length(tables)) { joined <- compute(left_join(joined, ti, by='key'), name= tmpNamGen()) } else { #    joined <- compute(left_join(joined, ti, by='key'), name= 'joinres') } } #    temps <- tmpNamGen(dumpList = TRUE) print(temps) ## [1] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000000" ## [2] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000001" ## [3] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000002" for(ti in temps) { db_drop_table(sc, ti) } #  print(joined) ## # Source: table [?? x 6] ## # Database: spark_connection ## key val_table_1 val_table_2 val_table_3 val_table_4 val_table_5 ## ## 1 1 0.8045418 0.5006293 0.8656174 0.5248073 0.8611796 ## 2 2 0.1593121 0.5802938 0.9722113 0.4532369 0.7429018 ## 3 3 0.4853835 0.5313043 0.6224256 0.1843134 0.1125551 

Accurate introduction and management of time data can save resources (both time and place) and significantly improve results. It seems to us good practice to explicitly set a temporary name generator, pass it to all Sparklyr transformations, and then clear the temporary values ​​all together when the results no longer depend on them.

Conclusion


If you want to carefully control the processing of data in Spark or DB using R , consider replyr in addition to dplyr and sparklyr .

 sparklyr::spark_disconnect(sc) rm(list=ls()) gc() ## used (Mb) gc trigger (Mb) max used (Mb) ## Ncells 821292 43.9 1442291 77.1 1168576 62.5 ## Vcells 1364897 10.5 2552219 19.5 1694265 13.0 

Source: https://habr.com/ru/post/334398/


All Articles