diff --git a/R/frollapply.R b/R/frollapply.R index 3863e99c5..910525042 100644 --- a/R/frollapply.R +++ b/R/frollapply.R @@ -278,12 +278,10 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right"," warn.simplify = gettext("frollapply completed successfully but raised a warning when attempting to simplify results using our internal 'simplifylist' function. Be sure to provide 'fill' argument matching the type and shape of results returned by the your function. Use simplify=FALSE to obtain a list instead. If you believe your results could be automatically simplified please submit your use case as new issue in our issue tracker.\n%s") } - DTths = getDTthreads(FALSE) - use.fork = .Platform$OS.type!="windows" && DTths > 1L - if (verbose) { - if (use.fork) cat("frollapply running on multiple CPU threads using parallel::mcparallel\n") - else cat("frollapply running on single CPU thread\n") - } + DTths0 = getDTthreads(FALSE) + use.fork0 = .Platform$OS.type!="windows" && DTths0 > 1L + if (verbose && !use.fork0) + cat("frollapply running on single CPU thread\n") ans = vector("list", nx*nn) ## vectorized x for (i in seq_len(nx)) { @@ -291,6 +289,19 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right"," thislen = len[i] if (!thislen) next + if (!use.fork0) { + use.fork = use.fork0 + } else { + # throttle + DTths = getDTthreadsC(thislen, TRUE) + use.fork = DTths > 1L + if (verbose) { + if (DTths < DTths0) + catf("frollapply run on %d CPU threads throttled to %d threads, input length %d\n", DTths0, DTths, thislen) + else + catf("frollapply running on %d CPU threads\n", DTths) + } + } ## vectorized n for (j in seq_len(nn)) { thisn = N[[j]] @@ -302,7 +313,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right"," } else { tight0 } - if (use.fork) { ## !windows && getDTthreads()>1L + if (use.fork) { ## !windows && getDTthreads()>1L, and then throttle using getDTthreadsC(thislen, TRUE) ths = min(DTths, length(ansi)) ii = split(ansi, sort(rep_len(seq_len(ths), length(ansi)))) ## assign row indexes to threads jobs = vector("integer", ths) @@ -343,7 +354,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right"," if (any(fork.err)) { stopf( "frollapply received an error(s) when evaluating FUN:\n%s", - paste(unique(vapply_1c(fork.res[fork.err], function(err) attr(err, "condition", TRUE)[["message"]], use.names = FALSE)), collapse = "\n") + attr(fork.res[fork.err][[1L]], "condition", TRUE)[["message"]] ## print only first error for consistency to single threaded code ) } thisans = unlist(fork.res, recursive = FALSE, use.names = FALSE) diff --git a/R/openmp-utils.R b/R/openmp-utils.R index d0b34b49b..d9f879e1e 100644 --- a/R/openmp-utils.R +++ b/R/openmp-utils.R @@ -13,3 +13,8 @@ setDTthreads = function(threads=NULL, restore_after_fork=NULL, percent=NULL, thr getDTthreads = function(verbose=getOption("datatable.verbose")) { .Call(CgetDTthreads, verbose) } + +# internal, same as C's getDTthreads so can be used for parallel package from R +getDTthreadsC = function(n, throttle) { + .Call(CgetDTthreadsC, n, throttle) +} diff --git a/inst/tests/froll.Rraw b/inst/tests/froll.Rraw index 77770d1cd..88711e0c7 100644 --- a/inst/tests/froll.Rraw +++ b/inst/tests/froll.Rraw @@ -1522,13 +1522,32 @@ test(6010.016, frollapply(c(1, 9), 1L, FUN=function(x) copy(list(x)), simplify=F setDTthreads(old) #### test disabling parallelism -use.fork = .Platform$OS.type!="windows" && getDTthreads()>1L +ths = getDTthreads() +use.fork = .Platform$OS.type!="windows" && ths > 1L if (use.fork) { - options(datatable.verbose=TRUE) - test(6010.021, frollapply(1:2, 1, identity), 1:2, output="running on multiple CPU threads using parallel::mcparallel") - options(datatable.verbose=FALSE) - test(6010.022, frollapply(1:2, 1, function(x) {warning("warn"); x}), 1:2) ## warning ignored - test(6010.023, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); x}), error="err:1\nerr:2") + ## throttle test start + test(6010.0201, frollapply(1:2, 1, copy), 1:2, output=sprintf("frollapply run on %d CPU threads throttled to 1 threads, input length 2", ths), options=c(datatable.verbose=TRUE)) + test(6010.0202, frollapply(1:1024, 1, copy), 1:1024, output=sprintf("frollapply run on %d CPU threads throttled to 1 threads, input length 1024", ths), options=c(datatable.verbose=TRUE)) + if (ths > 2L) { ## setDTthreads(8); ths = getDTthreads() + test(6010.0203, frollapply(1:1025, 1, copy), 1:1025, output=sprintf("frollapply run on %d CPU threads throttled to 2 threads, input length 1025", ths), options=c(datatable.verbose=TRUE)) + test(6010.0204, frollapply(1:2048, 1, copy), 1:2048, output=sprintf("frollapply run on %d CPU threads throttled to 2 threads, input length 2048", ths), options=c(datatable.verbose=TRUE)) + } else { ## CRAN: setDTthreads(2); ths = 2 + test(6010.0205, frollapply(1:1025, 1, copy), 1:1025, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE)) + test(6010.0206, frollapply(1:2048, 1, copy), 1:2048, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE)) + } + if (ths > 3L) { ## setDTthreads(8); ths = getDTthreads() + test(6010.0207, frollapply(1:2049, 1, copy), 1:2049, output=sprintf("frollapply run on %d CPU threads throttled to 3 threads, input length 2049", ths), options=c(datatable.verbose=TRUE)) + test(6010.0208, frollapply(1:3072, 1, copy), 1:3072, output=sprintf("frollapply run on %d CPU threads throttled to 3 threads, input length 3072", ths), options=c(datatable.verbose=TRUE)) + } else if (ths > 2L) { ## setDTthreads(3); ths = 3 + test(6010.0209, frollapply(1:2049, 1, copy), 1:2049, output="frollapply running on 3 CPU threads", options=c(datatable.verbose=TRUE)) + test(6010.0210, frollapply(1:3072, 1, copy), 1:3072, output="frollapply running on 3 CPU threads", options=c(datatable.verbose=TRUE)) + } else { ## CRAN: setDTthreads(2); ths = 2 + test(6010.0211, frollapply(1:2049, 1, copy), 1:2049, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE)) + test(6010.0212, frollapply(1:3072, 1, copy), 1:3072, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE)) + } + ## throttle test end + test(6010.022, frollapply(1:2, 1, function(x) {warning("warn"); copy(x)}), 1:2) ## warning ignored + test(6010.023, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## second error not printed for consistency to single threaded test(6010.024, frollapply(1:2, 1, function(x) stop("err")), error="err") ## unique error } old = setDTthreads(1L) @@ -1540,9 +1559,9 @@ test(6010.027, frollapply(1:2, 1, function(x) {warning("warn:", tail(x,1)); copy test(6010.028, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first setDTthreads(old) if (getDTthreads()>1L) { ## check for consistency - test(6010.036, frollapply(1:2, 1, function(x) {warning("warn"); copy(x)}), c(1L,2L)) - test(6010.037, frollapply(1:2, 1, function(x) {warning("warn:", tail(x,1)); copy(x)}), c(1L,2L)) - test(6010.038, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first + test(6010.036, frollapply(1:1025, 1, function(x) {warning("warn"); copy(x)}), 1:1025) + test(6010.037, frollapply(1:1025, 1, function(x) {warning("warn:", tail(x,1)); copy(x)}), 1:1025) + test(6010.038, frollapply(1:1025, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first } #### corner cases from examples - handled properly after frollapply rewrite to R @@ -1755,9 +1774,16 @@ test(6010.711, frollapply(1:5, 2, function(x) as.list(range(x)), fill=list(NA_in test(6010.712, as.null(frollapply(1:3, 1, function(x) if (x==1L) sum else if (x==2L) mean else `[`, simplify=TRUE)), NULL) ## as.null as we are only interested in codecov here test(6010.713, as.null(frollapply(1:3, 1, function(x) `[`, simplify = TRUE)), NULL) ## as.null as we are only interested in codecov here -#### fixing .internal.selfref +#### mutlithreading throttle caveats from manual: copy, fixing .internal.selfref use.fork = .Platform$OS.type!="windows" && getDTthreads()>1L if (use.fork) { + setDTthreads(throttle=1) ## disable throttle + old = setDTthreads(1) + test(6010.761, frollapply(c(1, 9), N=1L, FUN=identity), c(9,9)) ## unexpected + test(6010.762, frollapply(c(1, 9), N=1L, FUN=list), data.table(V1=c(9,9))) ## unexpected + setDTthreads(2) + test(6010.763, frollapply(c(1, 9), N=1L, FUN=identity), c(1,9)) ## good only because threads >= input + test(6010.764, frollapply(c(1, 5, 9), N=1L, FUN=identity), c(5,5,9)) ## unexpected again is.ok = function(x) {stopifnot(is.data.table(x)); capture.output(print(attr(x, ".internal.selfref", TRUE)))!=""} ans = frollapply(1:2, 2, data.table) ## default: fill=NA test(6010.770, is.ok(ans[[2L]])) ## mismatch of 'fill' type so simplify=TRUE did not run rbindlist but frollapply detected DT and fixed @@ -1777,6 +1803,7 @@ if (use.fork) { test(6010.776, !is.ok(ans[[3L]])) ans = frollapply(1:3, 2, f, fill=data.table(NA), simplify=function(x) lapply(x, function(y) if (is.data.table(y)) setDT(y) else y)) test(6010.777, is.ok(ans[[3L]])) ## fix inside frollapply via simplify + setDTthreads(throttle=1024) ## re-enable throttle } ## partial adaptive diff --git a/man/frollapply.Rd b/man/frollapply.Rd index f313d50d5..b1930b4da 100644 --- a/man/frollapply.Rd +++ b/man/frollapply.Rd @@ -55,7 +55,7 @@ frollapply(c(1, 9), N=1L, FUN=list) ## unexpected # #1: 9 #2: 9 -setDTthreads(2) +setDTthreads(2, throttle=1) ## disable throttle frollapply(c(1, 9), N=1L, FUN=identity) ## good only because threads >= input #[1] 1 9 frollapply(c(1, 5, 9), N=1L, FUN=identity) ## unexpected again @@ -86,6 +86,7 @@ setDTthreads(old) \preformatted{ is.ok = function(x) {stopifnot(is.data.table(x)); format(attr(x, ".internal.selfref", TRUE))!=""} +setDTthreads(2, throttle=1) ## disable throttle ## frollapply will fix DT in most cases ans = frollapply(1:2, 2, data.table, fill=data.table(NA)) is.ok(ans) @@ -134,27 +135,27 @@ is.ok(ans[[3L]]) \itemize{ \item When using \code{by.column=FALSE} one can subset dataset before passing it to \code{X} to keep only columns relevant for the computation: \preformatted{ -x = setDT(lapply(1:100, function(x) as.double(rep.int(x,1e4L)))) -f = function(x) sum(x$V1*x$V2) +x = setDT(lapply(1:1000, function(x) as.double(rep.int(x,1e4L)))) +f = function(x) sum(x$V1 * x$V2) system.time(frollapply(x, 100, f, by.column=FALSE)) # user system elapsed -# 0.157 0.067 0.081 +# 0.689 0.180 0.164 system.time(frollapply(x[, c("V1","V2"), with=FALSE], 100, f, by.column=FALSE)) # user system elapsed -# 0.096 0.054 0.054 +# 0.057 0.142 0.070 } - \item Avoid partial, see \emph{\code{partial} argument} section of \code{\link{froll}} manual. + \item Avoid \code{partial} argument, see \emph{\code{partial} argument} section of \code{\link{froll}} manual. \item Avoid \code{simplify=TRUE} and provide a function instead: \preformatted{ x = rnorm(1e5) system.time(frollapply(x, 2, function(x) 1L, simplify=TRUE)) # user system elapsed -# 0.308 0.076 0.196 +# 0.212 0.188 0.156 system.time(frollapply(x, 2, function(x) 1L, simplify=unlist)) # user system elapsed -# 0.214 0.080 0.088 +# 0.105 0.167 0.056 } - \item CPU threads utilization in \code{frollapply} can be controlled by \code{\link{setDTthreads}}, which by default uses half of available CPU threads. + \item CPU threads utilization in \code{frollapply} can be controlled by \code{\link{setDTthreads}}, which by default uses half of available CPU threads. Usage of multiple CPU threads will be throttled for small input, as described in \code{\link{setDTthreads}} manual. \item Optimization that avoids repeated allocation of a window subset (see \emph{Implementation} section for details), in case of adaptive rolling function, depends on R's \emph{growable bit}. This feature has been added in R 3.4.0. Adaptive \code{frollapply} will still work on older versions of R but, due to repeated allocation of window subset, it will be much slower. \item Parallel computation of \code{FUN} is handled by \code{parallel} package (part of R core since 2.14.0) and its \emph{fork} mechanism. \emph{Fork} is not available on Windows OS therefore it will be always single threaded on that platform. } @@ -170,61 +171,61 @@ fill1 = data.table(min=NA_integer_, max=NA_integer_) fill2 = list(min=NA_integer_, max=NA_integer_) system.time(a<-frollapply(1:1e4, 100, fun1, fill=fill1)) # user system elapsed -# 2.047 0.337 0.788 +# 1.064 0.765 0.421 system.time(b<-frollapply(1:1e4, 100, fun2, fill=fill2)) # user system elapsed -# 0.205 0.125 0.138 -all.equal(a, b) +# 0.082 0.221 0.112 +all.equal(a, b) #[1] TRUE } - \item Code that is not dependent on rolling window should be taken out as pre or post computation: + \item Code that is not dependent on a rolling window should be taken out as pre or post computation: \preformatted{ x = c(1L,3L) system.time(for (i in 1:1e6) sum(x+1L)) # user system elapsed -# 0.308 0.004 0.312 +# 0.218 0.002 0.221 system.time({y = x+1L; for (i in 1:1e6) sum(y)}) # user system elapsed -# 0.203 0.000 0.202 +# 0.160 0.001 0.161 } \item Being strict about data types removes the need for R to handle them automatically: \preformatted{ x = vector("integer", 1e6) system.time(for (i in 1:1e6) x[i] = NA) # user system elapsed -# 0.160 0.000 0.161 +# 0.114 0.000 0.114 system.time(for (i in 1:1e6) x[i] = NA_integer_) # user system elapsed -# 0.05 0.00 0.05 +# 0.029 0.000 0.030 } \item If a function calls another function under the hood, it is usually better to call the latter one directly: \preformatted{ x = matrix(c(1L,2L,3L,4L), c(2L,2L)) system.time(for (i in 1:1e4) colSums(x)) # user system elapsed -# 0.051 0.000 0.051 +# 0.033 0.000 0.033 system.time(for (i in 1:1e4) .colSums(x, 2L, 2L)) # user system elapsed -# 0.015 0.000 0.015 +# 0.010 0.002 0.012 } - \item There are many functions that may be optimized for scaling up for bigger input, yet for a small input they may carry bigger overhead comparing to their simpler counterparts. One may need to experiment on own data, but low overhead functions are likely be faster when evaluating in many iterations: + \item There are many functions that may be optimized for scaling up for bigger input, yet for a small input they may carry bigger overhead comparing to their simpler counterparts. One may need to experiment on own data, but low overhead functions are likely to be faster when evaluating in many iterations: \preformatted{ ## uniqueN x = c(1L,3L,5L) system.time(for (i in 1:1e4) uniqueN(x)) # user system elapsed -# 0.156 0.004 0.160 +# 0.078 0.001 0.080 system.time(for (i in 1:1e4) length(unique(x))) # user system elapsed -# 0.040 0.004 0.043 +# 0.018 0.000 0.018 ## column subset x = data.table(v1 = c(1L,3L,5L)) system.time(for (i in 1:1e4) x[, v1]) # user system elapsed -# 3.197 0.004 3.201 +# 1.952 0.011 1.964 system.time(for (i in 1:1e4) x[["v1"]]) # user system elapsed -# 0.063 0.000 0.063 +# 0.036 0.000 0.035 } } } diff --git a/src/data.table.h b/src/data.table.h index 80e1feb86..49c37bc53 100644 --- a/src/data.table.h +++ b/src/data.table.h @@ -386,6 +386,7 @@ SEXP gshift(SEXP, SEXP, SEXP, SEXP); SEXP nestedid(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP setDTthreads(SEXP, SEXP, SEXP, SEXP); SEXP getDTthreads_R(SEXP); +SEXP getDTthreads_C(SEXP, SEXP); SEXP nqRecreateIndices(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP fsort(SEXP, SEXP); SEXP inrange(SEXP, SEXP, SEXP, SEXP); diff --git a/src/init.c b/src/init.c index 67394415f..ef81a7a0e 100644 --- a/src/init.c +++ b/src/init.c @@ -120,6 +120,7 @@ R_CallMethodDef callMethods[] = { {"Cnestedid", (DL_FUNC) &nestedid, -1}, {"CsetDTthreads", (DL_FUNC) &setDTthreads, -1}, {"CgetDTthreads", (DL_FUNC) &getDTthreads_R, -1}, +{"CgetDTthreadsC", (DL_FUNC) &getDTthreads_C, -1}, {"CnqRecreateIndices", (DL_FUNC) &nqRecreateIndices, -1}, {"Cfsort", (DL_FUNC) &fsort, -1}, {"Cinrange", (DL_FUNC) &inrange, -1}, diff --git a/src/openmp-utils.c b/src/openmp-utils.c index a3ccf6699..a0cdec536 100644 --- a/src/openmp-utils.c +++ b/src/openmp-utils.c @@ -77,6 +77,15 @@ static const char *mygetenv(const char *name, const char *unset) return (ans == NULL || ans[0] == '\0') ? unset : ans; } +SEXP getDTthreads_C(SEXP n, SEXP throttle) +{ + if(!isInteger(n) || INTEGER(n)[0] < 0) + internal_error(__func__, "n must be non-negative integer"); // # nocov + if(!IS_TRUE_OR_FALSE(throttle)) + internal_error(__func__, "throttle must be TRUE or FALSE"); // # nocov + return ScalarInteger(getDTthreads(INTEGER(n)[0], LOGICAL(throttle)[0])); +} + SEXP getDTthreads_R(SEXP verbose) { if(!IS_TRUE_OR_FALSE(verbose))