replyr

replyr is short for REmote PLYing of big data for R (remote processing of big data in R). Why replyr? Because it allows you to use standard data.frame working approaches with remote data (databases or Spark).

replyr provides these features:

- replyr_summary(): summaries of remote data.
- replyr_union_all() and replyr_bind_rows(): row-binding of remote tables.
- Replacements for dplyr::do(): replyr_split() and replyr::gapply().
- replyr_moveValuesToRows() / replyr_moveValuesToColumns(): moving values to rows and back to columns (gather/spread analogues for remote data).

These features make working with Spark and sparklyr much easier.
replyr is a product of the collective experience of using R in application solutions for many clients, collecting feedback and correcting flaws.

base::date()
## [1] "Thu Jul 6 15:56:28 2017"

# devtools::install_github('rstudio/sparklyr')
# devtools::install_github('tidyverse/dplyr')
# devtools::install_github('tidyverse/dbplyr')
# install.packages("replyr")
suppressPackageStartupMessages(library("dplyr"))
packageVersion("dplyr")
## [1] '0.7.1.9000'
packageVersion("dbplyr")
## [1] '1.1.0.9000'
library("tidyr")
packageVersion("tidyr")
## [1] '0.6.3'
library("replyr")
packageVersion("replyr")
## [1] '0.4.2'
suppressPackageStartupMessages(library("sparklyr"))
packageVersion("sparklyr")
## [1] '0.5.6.9012'

# give the Spark driver more memory, see https://github.com/rstudio/sparklyr/issues/783
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "8G"
sc <- sparklyr::spark_connect(version='2.1.0',
                              hadoop_version = '2.7',
                              master = "local",
                              config = config)
Consider the standard methods summary() and glance(), which cannot be performed on Spark data:

mtcars_spark <- copy_to(sc, mtcars)

# summary() reports handle metadata, not statistics of the data
summary(mtcars_spark)
##     Length Class          Mode
## src 1      src_spark      list
## ops 2      op_base_remote list

packageVersion("broom")
## [1] '0.4.2'

broom::glance(mtcars_spark)
## Error: glance doesn't know how to deal with data of class tbl_sparktbl_sqltbl_lazytbl
replyr_summary() works:

replyr_summary(mtcars_spark) %>%
  select(-lexmin, -lexmax, -nunique, -index)
##    column   class nrows nna    min     max       mean          sd
## 1     mpg numeric    32   0 10.400  33.900  20.090625   6.0269481
## 2     cyl numeric    32   0  4.000   8.000   6.187500   1.7859216
## 3    disp numeric    32   0 71.100 472.000 230.721875 123.9386938
## 4      hp numeric    32   0 52.000 335.000 146.687500  68.5628685
## 5    drat numeric    32   0  2.760   4.930   3.596563   0.5346787
## 6      wt numeric    32   0  1.513   5.424   3.217250   0.9784574
## 7    qsec numeric    32   0 14.500  22.900  17.848750   1.7869432
## 8      vs numeric    32   0  0.000   1.000   0.437500   0.5040161
## 9      am numeric    32   0  0.000   1.000   0.406250   0.4989909
## 10   gear numeric    32   0  3.000   5.000   3.687500   0.7378041
## 11   carb numeric    32   0  1.000   8.000   2.812500   1.6152000
tidyr works mainly with local data:

mtcars2 <- mtcars %>%
  mutate(car = row.names(mtcars)) %>%
  copy_to(sc, ., 'mtcars2')

# tidyr::gather() fails on remote data
mtcars2 %>% tidyr::gather('fact', 'value')
## Error in UseMethod("gather_"): no applicable method for 'gather_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"

# replyr_moveValuesToRows() performs the same reshaping on Spark
mtcars2 %>%
  replyr_moveValuesToRows(nameForNewKeyColumn= 'fact',
                          nameForNewValueColumn= 'value',
                          columnsToTakeFrom= colnames(mtcars),
                          nameForNewClassColumn= 'class') %>%
  arrange(car, fact)
## # Source:     lazy query [?? x 4]
## # Database:   spark_connection
## # Ordered by: car, fact
##            car  fact  value   class
##          <chr> <chr>  <dbl>   <chr>
##  1 AMC Javelin    am   0.00 numeric
##  2 AMC Javelin  carb   2.00 numeric
##  3 AMC Javelin   cyl   8.00 numeric
##  4 AMC Javelin  disp 304.00 numeric
##  5 AMC Javelin  drat   3.15 numeric
##  6 AMC Javelin  gear   3.00 numeric
##  7 AMC Javelin    hp 150.00 numeric
##  8 AMC Javelin   mpg  15.20 numeric
##  9 AMC Javelin  qsec  17.30 numeric
## 10 AMC Javelin    vs   0.00 numeric
## # ... with 342 more rows
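The reverse transformation, replyr_moveValuesToColumns(), moves values back into columns (a spread-style un-pivot). A minimal sketch, assuming the argument names (columnToTakeKeysFrom, columnToTakeValuesFrom, rowKeyColumns) match the replyr 0.4.2 help pages; check help("replyr_moveValuesToColumns") before relying on them:

# Hedged sketch: build the long form without the class column,
# then un-pivot it back to one row per car.
mtcars_long <- mtcars2 %>%
  replyr_moveValuesToRows(nameForNewKeyColumn= 'fact',
                          nameForNewValueColumn= 'value',
                          columnsToTakeFrom= colnames(mtcars))
mtcars_wide <- mtcars_long %>%
  replyr_moveValuesToColumns(columnToTakeKeysFrom= 'fact',
                             columnToTakeValuesFrom= 'value',
                             rowKeyColumns= 'car')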
dplyr bind_rows(), union(), and union_all() are currently not suitable for use on Spark. replyr::replyr_union_all() and replyr::replyr_bind_rows() are workable alternatives.

db1 <- copy_to(sc,
               data.frame(x=1:2, y=c('a','b'), stringsAsFactors=FALSE),
               name='db1')
db2 <- copy_to(sc,
               data.frame(y=c('c','d'), x=3:4, stringsAsFactors=FALSE),
               name='db2')

# bind_rows() errors out on remote data
bind_rows(list(db1, db2))
## Error in bind_rows_(x, .id): Argument 1 must be a data frame or a named atomic vector, not a tbl_spark/tbl_sql/tbl_lazy/tbl
# union_all() works for a pair of tables
union_all(db1, db2)
## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
##   <int> <chr>
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d
# union() also works, but note that it does not preserve row order
union(db1, db2)
## # Source:   lazy query [?? x 2]
## # Database: spark_connection
##       x     y
##   <int> <chr>
## 1     4     d
## 2     1     a
## 3     3     c
## 4     2     b
replyr::replyr_bind_rows() can bind together a whole list of data.frame handles:

replyr_bind_rows(list(db1, db2))
## # Source:   table [?? x 2]
## # Database: spark_connection
##       x     y
##   <int> <chr>
## 1     1     a
## 2     2     b
## 3     3     c
## 4     4     d
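replyr::replyr_union_all(), mentioned above, covers the pairwise case. A minimal sketch, assuming the two-argument form suggested by the replyr 0.4.2 documentation; verify against help('replyr_union_all'):

# Hedged sketch: pairwise union-all of two remote handles;
# the two-argument call is an assumption, check the package help.
replyr_union_all(db1, db2)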
Our next example simply takes the first two rows from each group of a grouped dataset (note that the result is not fully determined on Spark, where rows have no guaranteed order).

dplyr::do on local data (example taken from help('do', package='dplyr')):

by_cyl <- group_by(mtcars, cyl)
do(by_cyl, head(., 2))
## # A tibble: 6 x 11
## # Groups: cyl [3]
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 2  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 3  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 4  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4
dplyr::do on Spark runs, but notice we do not get back usable results:

by_cyl <- group_by(mtcars_spark, cyl)
do(by_cyl, head(., 2))
## # A tibble: 3 x 2
##     cyl    V2
## 1     6
## 2     4
## 3     8
The replyr split/merge approach:

mtcars_spark %>%
  replyr_split('cyl', partitionMethod = 'extract') %>%
  lapply(function(di) head(di, 2)) %>%
  replyr_bind_rows()
## # Source:   table [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4
The replyr gapply approach:

mtcars_spark %>%
  gapply('cyl',
         partitionMethod = 'extract',
         function(di) head(di, 2))
## # Source:   table [?? x 11]
## # Database: spark_connection
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1  21.0     6 160.0   110  3.90 2.620 16.46     0     1     4     4
## 2  21.0     6 160.0   110  3.90 2.875 17.02     0     1     4     4
## 3  22.8     4 108.0    93  3.85 2.320 18.61     1     1     4     1
## 4  24.4     4 146.7    62  3.69 3.190 20.00     1     0     4     2
## 5  18.7     8 360.0   175  3.15 3.440 17.02     0     0     3     2
## 6  14.3     8 360.0   245  3.21 3.570 15.84     0     0     3     4
replyr::replyr_apply_f_mapped():

wrapr::let() adapts code to parameterized column names by rewriting the code; replyr::replyr_apply_f_mapped() achieves the same goal by temporarily renaming columns, and it works as follows. Suppose we have a function written in terms of a fixed column name:

# a function that expects the rank in a column named RankColumn
DecreaseRankColumnByOne <- function(d) {
  d$RankColumn <- d$RankColumn - 1
  d
}
To apply it to data d (in which the column names are not as expected!), we use replyr::replyr_apply_f_mapped() to create a new parameterized adapter:

# data where the rank is stored in a column named 'rank'
d <- data.frame(Sepal_Length = c(5.8,5.7),
                Sepal_Width = c(4.0,4.4),
                Species = 'setosa',
                rank = c(1,2))

# an adapter parameterized over the column name
DecreaseRankColumnByOneNamed <- function(d, ColName) {
  replyr::replyr_apply_f_mapped(d,
                                f = DecreaseRankColumnByOne,
                                nmap = c(RankColumn = ColName),
                                restrictMapIn = FALSE,
                                restrictMapOut = FALSE)
}

dF <- DecreaseRankColumnByOneNamed(d, 'rank')
print(dF)
##   Sepal_Length Sepal_Width Species rank
## 1          5.8         4.0  setosa    0
## 2          5.7         4.4  setosa    1
replyr::replyr_apply_f_mapped() renames the columns to the names expected by DecreaseRankColumnByOne (the mapping is specified in nmap), applies DecreaseRankColumnByOne, and restores the original column names before returning the result.
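For comparison, here is a minimal sketch of the wrapr::let() approach mentioned above: let() works by rewriting the supplied code block rather than renaming columns. The local data frame d2 and the alias c(RankColumn = 'rank') are illustrative choices, not from the original article:

library("wrapr")

d2 <- data.frame(rank = c(1, 2))
# let() substitutes RankColumn -> rank in the code block, then evaluates it
let(c(RankColumn = 'rank'),
    d2$RankColumn <- d2$RankColumn - 1
)
print(d2)
##   rank
## 1    0
## 2    1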
Many Sparklyr tasks involve the creation of intermediate or temporary tables. This can be done with dplyr::copy_to() and dplyr::compute(). These methods can be resource intensive. replyr provides functions to keep the process under control: temporary name generators that do not alter the actual data (they are also used within the package itself).

print(replyr::makeTempNameGenerator)
## function (prefix, suffix = NULL)
## {
##     force(prefix)
##     if ((length(prefix) != 1) || (!is.character(prefix))) {
##         stop("repyr::makeTempNameGenerator prefix must be a string")
##     }
##     if (is.null(suffix)) {
##         alphabet <- c(letters, toupper(letters), as.character(0:9))
##         suffix <- paste(base::sample(alphabet, size = 20, replace = TRUE),
##             collapse = "")
##     }
##     count <- 0
##     nameList <- list()
##     function(..., peek = FALSE, dumpList = FALSE, remove = NULL) {
##         if (length(list(...)) > 0) {
##             stop("replyr::makeTempNameGenerator tempname generate unexpected argument")
##         }
##         if (peek) {
##             return(names(nameList))
##         }
##         if (dumpList) {
##             v <- names(nameList)
##             nameList <<- list()
##             return(v)
##         }
##         if (!is.null(remove)) {
##             victims <- intersect(remove, names(nameList))
##             nameList[victims] <<- NULL
##             return(victims)
##         }
##         nm <- paste(prefix, suffix, sprintf("%010d", count),
##             sep = "_")
##         nameList[[nm]] <<- 1
##         count <<- count + 1
##         nm
##     }
## }
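The generator's behavior follows directly from the source printed above; a quick illustration (a sketch: the 20-character random suffix differs on every run, so the names shown in comments are only examples):

# the generator is a closure; each call returns a fresh, distinct name
nameGen <- replyr::makeTempNameGenerator('EXAMPLE')
nameGen()                 # e.g. "EXAMPLE_<random suffix>_0000000000"
nameGen()                 # the counter advances: "..._0000000001"
nameGen(peek = TRUE)      # list the names handed out so far
nameGen(dumpList = TRUE)  # return all names and reset the tracking list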
For example, suppose we need to join several tables and want to call compute after each join (otherwise, the resulting SQL can become long and difficult to understand and maintain). The code looks like this:

# create several example tables
names <- paste('table', 1:5, sep='_')
tables <- lapply(names,
                 function(ni) {
                   di <- data.frame(key= 1:3)
                   di[[paste('val',ni,sep='_')]] <- runif(nrow(di))
                   copy_to(sc, di, ni)
                 })

# a generator for temporary table names
tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP')

# join the tables pairwise, materializing after each join
joined <- tables[[1]]
for(i in seq(2,length(tables))) {
  ti <- tables[[i]]
  if(i<length(tables)) {
    joined <- compute(left_join(joined, ti, by='key'),
                      name= tmpNamGen())
  } else {
    # use a permanent name for the final result
    joined <- compute(left_join(joined, ti, by='key'),
                      name= 'joinres')
  }
}

# collect and drop the temporary tables
temps <- tmpNamGen(dumpList = TRUE)
print(temps)
## [1] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000000"
## [2] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000001"
## [3] "JOINTMP_1dr7xHI9CkSZJwXfKA1B_0000000002"

for(ti in temps) {
  db_drop_table(sc, ti)
}

# the final result no longer depends on the temporary tables
print(joined)
## # Source:   table [?? x 6]
## # Database: spark_connection
##     key val_table_1 val_table_2 val_table_3 val_table_4 val_table_5
##   <int>       <dbl>       <dbl>       <dbl>       <dbl>       <dbl>
## 1     1   0.8045418   0.5006293   0.8656174   0.5248073   0.8611796
## 2     2   0.1593121   0.5802938   0.9722113   0.4532369   0.7429018
## 3     3   0.4853835   0.5313043   0.6224256   0.1843134   0.1125551
This is important when working with big data: it allows you to perform a long sequence of Sparklyr transformations and then clear the temporary values all together, once the results no longer depend on them.

If you are seriously working with big data in Spark or a database using R, consider replyr in addition to dplyr and sparklyr.

sparklyr::spark_disconnect(sc)
rm(list=ls())
gc()
##           used (Mb) gc trigger (Mb) max used (Mb)
## Ncells  821292 43.9    1442291 77.1  1168576 62.5
## Vcells 1364897 10.5    2552219 19.5  1694265 13.0
Source: https://habr.com/ru/post/334398/