`replyr` is short for **RE**mote **PLY**ing of big data for **R** (remote processing of big data in R).

Why `replyr`? Because it allows you to use standard working approaches with remote data (databases or Spark) much as you would with a local `data.frame`. `replyr` provides these features:

- Summarizing data: `replyr_summary()`.
- Combining tables: `replyr_union_all()`, `replyr_bind_rows()`.
- Generalized grouped operations (instead of `dplyr::do()`): `replyr_split()`, `replyr::gapply()`.
- Moving values between rows and columns: `replyr_moveValuesToRows()` / `replyr_moveValuesToColumns()`.
- Tools that make working with Spark and `sparklyr` much easier.

`replyr` is a product of the collective experience of using R in application solutions for many clients, collecting feedback and correcting flaws.

First, set up the environment:

```r
base::date()
## [1] "Thu Jul 6 15:56:28 2017"

# devtools::install_github('rstudio/sparklyr')
# devtools::install_github('tidyverse/dplyr')
# devtools::install_github('tidyverse/dbplyr')
# install.packages("replyr")
suppressPackageStartupMessages(library("dplyr"))
packageVersion("dplyr")
## [1] '0.7.1.9000'
packageVersion("dbplyr")
## [1] '1.1.0.9000'
library("tidyr")
packageVersion("tidyr")
## [1] '0.6.3'
library("replyr")
packageVersion("replyr")
## [1] '0.4.2'
suppressPackageStartupMessages(library("sparklyr"))
packageVersion("sparklyr")
## [1] '0.5.6.9012'

# give the driver more memory, see https://github.com/rstudio/sparklyr/issues/783
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"
sc <- sparklyr::spark_connect(version = '2.1.0',
                              hadoop_version = '2.7',
                              master = "local",
                              config = config)
```

### Summarizing data

Standard summaries such as `summary()` and `broom::glance()` cannot be performed on Spark data.

```r
mtcars_spark <- copy_to(sc, mtcars)

# summary() describes only the structure of the remote handle, not the data
summary(mtcars_spark)
##     Length Class          Mode
## src 1      src_spark      list
## ops 2      op_base_remote list

packageVersion("broom")
## [1] '0.4.2'
broom::glance(mtcars_spark)
## Error: glance doesn't know how to deal with data of class tbl_sparktbl_sqltbl_lazytbl
```

`replyr_summary()` works:

```r
replyr_summary(mtcars_spark) %>%
  select(-lexmin, -lexmax, -nunique, -index)
##    column   class nrows nna    min     max       mean          sd
## 1     mpg numeric    32   0 10.400  33.900  20.090625   6.0269481
## 2     cyl numeric    32   0  4.000   8.000   6.187500   1.7859216
## 3    disp numeric    32   0 71.100 472.000 230.721875 123.9386938
## 4      hp numeric    32   0 52.000 335.000 146.687500  68.5628685
## 5    drat numeric    32   0  2.760   4.930   3.596563   0.5346787
## 6      wt numeric    32   0  1.513   5.424   3.217250   0.9784574
## 7    qsec numeric    32   0 14.500  22.900  17.848750   1.7869432
## 8      vs numeric    32   0  0.000   1.000   0.437500   0.5040161
## 9      am numeric    32   0  0.000   1.000   0.406250   0.4989909
## 10   gear numeric    32   0  3.000   5.000   3.687500   0.7378041
## 11   carb numeric    32   0  1.000   8.000   2.812500   1.6152000
```

### Moving values to rows

`tidyr` works mainly with local data.

```r
mtcars2 <- mtcars %>%
  mutate(car = row.names(mtcars)) %>%
  copy_to(sc, ., 'mtcars2')

# tidyr::gather() fails on a remote table
mtcars2 %>% tidyr::gather('fact', 'value')
## Error in UseMethod("gather_"): no applicable method for 'gather_' applied to
## an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"

mtcars2 %>%
  replyr_moveValuesToRows(nameForNewKeyColumn = 'fact',
                          nameForNewValueColumn = 'value',
                          columnsToTakeFrom = colnames(mtcars),
                          nameForNewClassColumn = 'class') %>%
  arrange(car, fact)
## # Source:     lazy query [?? x 4]
## # Database:   spark_connection
## # Ordered by: car, fact
##            car  fact  value   class
##  1 AMC Javelin    am   0.00 numeric
##  2 AMC Javelin  carb   2.00 numeric
##  3 AMC Javelin   cyl   8.00 numeric
##  4 AMC Javelin  disp 304.00 numeric
##  5 AMC Javelin  drat   3.15 numeric
##  6 AMC Javelin  gear   3.00 numeric
##  7 AMC Javelin    hp 150.00 numeric
##  8 AMC Javelin   mpg  15.20 numeric
##  9 AMC Javelin  qsec  17.30 numeric
## 10 AMC Javelin    vs   0.00 numeric
## # ... with 342 more rows
```
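The package also offers the inverse operation, `replyr_moveValuesToColumns()`, to spread a long table back out. The original article does not show it, so the following is only a minimal sketch: the `long` variable and the per-car pivot are our own illustration, and the argument names (`columnToTakeKeysFrom`, `columnToTakeValuesFrom`, `rowKeyColumns`) are taken from the replyr 0.4.x documentation.

```r
# Sketch: pivot the long form produced above back to one row per car.
# 'long' is the result of the replyr_moveValuesToRows() call shown above.
long <- mtcars2 %>%
  replyr_moveValuesToRows(nameForNewKeyColumn = 'fact',
                          nameForNewValueColumn = 'value',
                          columnsToTakeFrom = colnames(mtcars),
                          nameForNewClassColumn = 'class')

long %>%
  select(-class) %>%   # drop the bookkeeping column before pivoting
  replyr_moveValuesToColumns(columnToTakeKeysFrom = 'fact',
                             columnToTakeValuesFrom = 'value',
                             rowKeyColumns = 'car') %>%
  head()
```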
### Binding rows

`dplyr`'s `bind_rows`, `union`, and `union_all` are unreliable on Spark; `replyr::replyr_union_all()` and `replyr::replyr_bind_rows()` are a workable alternative.

```r
db1 <- copy_to(sc,
               data.frame(x = 1:2, y = c('a', 'b'), stringsAsFactors = FALSE),
               name = 'db1')
db2 <- copy_to(sc,
               data.frame(y = c('c', 'd'), x = 3:4, stringsAsFactors = FALSE),
               name = 'db2')

# bind_rows() fails outright on remote tables
bind_rows(list(db1, db2))
## Error in bind_rows_(x, .id): Argument 1 must be a data frame or a named atomic
## vector, not a tbl_spark/tbl_sql/tbl_lazy/tbl

union_all(db1, db2)
## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d

# union() computes, but does not preserve row order
union(db1, db2)
## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
## 1     4     d
## 2     1     a
## 3     3     c
## 4     2     b
```

`replyr::replyr_bind_rows()` can bind together several `data.frame`s:

```r
replyr_bind_rows(list(db1, db2))
## # Source:   table [?? x 2]
## # Database: spark_connection
##       x     y
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d
```

### Generalized grouped operations (instead of dplyr::do() on Spark)

#### dplyr::do on local data

The example below is from `help('do', package = 'dplyr')`:

```r
by_cyl <- group_by(mtcars, cyl)
do(by_cyl, head(., 2))
## # A tibble: 6 x 11
## # Groups:   cyl [3]
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
## 1  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 2  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 3  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 4  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4
```

#### dplyr::do on Spark

```r
by_cyl <- group_by(mtcars_spark, cyl)
do(by_cyl, head(., 2))
## # A tibble: 3 x 2
##     cyl     V2
## 1     6
## 2     4
## 3     8
```

On Spark, `do()` returns only the group keys plus a list column `V2`, not the per-group rows we asked for.

#### replyr split/merge

```r
mtcars_spark %>%
  replyr_split('cyl', partitionMethod = 'extract') %>%
  lapply(function(di) head(di, 2)) %>%
  replyr_bind_rows()
## # Source:   table [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4
```

#### replyr gapply

```r
mtcars_spark %>%
  gapply('cyl',
         partitionMethod = 'extract',
         function(di) head(di, 2))
## # Source:   table [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4
```
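The per-group function receives an ordinary `dplyr`-compatible table, so `gapply()` can do more than take heads. The following is only a minimal sketch of a per-group aggregation, not from the original article: the `mean_mpg` name and the aggregation are our own illustration, assuming that with `partitionMethod = 'extract'` each group stays a remote table on which `summarize()` translates to SQL.

```r
# Sketch: per-group aggregation with gapply(); each di is the subset of
# mtcars_spark for one value of cyl.
mtcars_spark %>%
  gapply('cyl',
         partitionMethod = 'extract',
         function(di) summarize(di, mean_mpg = mean(mpg)))
```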
### replyr::replyr_apply_f_mapped

Temporarily renaming columns is the kind of name substitution `wrapr::let()` was designed for; `replyr` offers a function-based variant, `replyr::replyr_apply_f_mapped()`, and it works as follows. Suppose we have a function written in terms of fixed column names:

```r
# function written for a fixed column name
DecreaseRankColumnByOne <- function(d) {
  d$RankColumn <- d$RankColumn - 1
  d
}
```

And data `d` (in which the column names are not as expected!). We use `replyr::replyr_apply_f_mapped()` to create a new parameterized adapter:

```r
# data where the column name differs from what the function expects
d <- data.frame(Sepal_Length = c(5.8, 5.7),
                Sepal_Width = c(4.0, 4.4),
                Species = 'setosa',
                rank = c(1, 2))

# adapter: map the actual column name onto the one the function expects
DecreaseRankColumnByOneNamed <- function(d, ColName) {
  replyr::replyr_apply_f_mapped(d,
                                f = DecreaseRankColumnByOne,
                                nmap = c(RankColumn = ColName),
                                restrictMapIn = FALSE,
                                restrictMapOut = FALSE)
}

dF <- DecreaseRankColumnByOneNamed(d, 'rank')
print(dF)
##   Sepal_Length Sepal_Width Species rank
## 1          5.8         4.0  setosa    0
## 2          5.7         4.4  setosa    1
```

`replyr::replyr_apply_f_mapped()` renames the columns to those expected by `DecreaseRankColumnByOne` (the mapping is specified in `nmap`), applies `DecreaseRankColumnByOne`, and maps the names back to the originals before returning the result.

### Managing intermediate values

Many tasks in `Sparklyr` involve the creation of intermediate or temporary tables. This can be done with `dplyr::copy_to()` and `dplyr::compute()`. These methods can be resource intensive. `replyr` provides functions to keep the process under control: temporary name generators that do not themselves hold the actual data (they are also used within the package itself).

```r
print(replyr::makeTempNameGenerator)
## function (prefix, suffix = NULL)
## {
##     force(prefix)
##     if ((length(prefix) != 1) || (!is.character(prefix))) {
##         stop("repyr::makeTempNameGenerator prefix must be a string")
##     }
##     if (is.null(suffix)) {
##         alphabet <- c(letters, toupper(letters), as.character(0:9))
##         suffix <- paste(base::sample(alphabet, size = 20, replace = TRUE),
##             collapse = "")
##     }
##     count <- 0
##     nameList <- list()
##     function(..., peek = FALSE, dumpList = FALSE, remove = NULL) {
##         if (length(list(...)) > 0) {
##             stop("replyr::makeTempNameGenerator tempname generate unexpected argument")
##         }
##         if (peek) {
##             return(names(nameList))
##         }
##         if (dumpList) {
##             v <- names(nameList)
##             nameList <<- list()
##             return(v)
##         }
##         if (!is.null(remove)) {
##             victims <- intersect(remove, names(nameList))
##             nameList[victims] <<- NULL
##             return(victims)
##         }
##         nm <- paste(prefix, suffix, sprintf("%010d", count),
##             sep = "_")
##         nameList[[nm]] <<- 1
##         count <<- count + 1
##         nm
##     }
## }
```

For example, consider joining several tables, where it is good practice to call `compute` after each join (otherwise, the resulting SQL can become long and difficult to understand and maintain). The code looks like this:

```r
# example data: five small tables to join by key
names <- paste('table', 1:5, sep = '_')
tables <- lapply(names, function(ni) {
  di <- data.frame(key = 1:3)
  di[[paste('val', ni, sep = '_')]] <- runif(nrow(di))
  copy_to(sc, di, ni)
})

# a name generator for the intermediate results
tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP')

# join the tables pairwise, materializing after each join
joined <- tables[[1]]
for (i in seq(2, length(tables))) {
  ti <- tables[[i]]
  if (i < length(tables)) {
    joined <- compute(left_join(joined, ti, by = 'key'),
                      name = tmpNamGen())
  } else {
    # use a non-temporary name for the final result
    joined <- compute(left_join(joined, ti, by = 'key'),
                      name = 'joinres')
  }
}

# drop the intermediate tables
temps <- tmpNamGen(dumpList = TRUE)
print(temps)
## [1] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000000"
## [2] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000001"
## [3] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000002"
for (ti in temps) {
  db_drop_table(sc, ti)
}

print(joined)
## # Source:   table [?? x 6]
## # Database: spark_connection
##     key val_table_1 val_table_2 val_table_3 val_table_4 val_table_5
## 1     1   0.8045418   0.5006293   0.8656174   0.5248073   0.8611796
## 2     2   0.1593121   0.5802938   0.9722113   0.4532369   0.7429018
## 3     3   0.4853835   0.5313043   0.6224256   0.1843134   0.1125551
```
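The generator's bookkeeping options (`peek`, `dumpList`, `remove`) are all visible in the source printed above. A minimal local sketch of how they behave (no Spark connection needed; the random suffix differs from run to run, and `EXAMPLE` is our own prefix):

```r
# Sketch: the generator hands out unique names and remembers them.
tmpGen <- replyr::makeTempNameGenerator('EXAMPLE')
n1 <- tmpGen()            # e.g. "EXAMPLE_<randomsuffix>_0000000000"
n2 <- tmpGen()            # e.g. "EXAMPLE_<randomsuffix>_0000000001"
tmpGen(peek = TRUE)       # list the names issued so far, without clearing
tmpGen(remove = n1)       # forget one name (e.g. after dropping its table)
tmpGen(dumpList = TRUE)   # return the remaining names and reset the list
```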
Such a generator makes it easy to name the intermediates created during long chains of `Sparklyr` transformations, and then to clear all the temporary values together once the results no longer depend on them.

If you are serious about working with big data in Spark or a database from R, consider `replyr` in addition to `dplyr` and `sparklyr`.

```r
sparklyr::spark_disconnect(sc)
rm(list = ls())
gc()
##           used (Mb) gc trigger (Mb) max used (Mb)
## Ncells  821292 43.9    1442291 77.1  1168576 62.5
## Vcells 1364897 10.5    2552219 19.5  1694265 13.0
```

Source: https://habr.com/ru/post/334398/