From 0eae42c59e765a0eb47005abce00eaf855d177e8 Mon Sep 17 00:00:00 2001 From: Jan Gorecki Date: Wed, 10 Sep 2025 17:58:54 +0200 Subject: [PATCH 1/3] frollapply throttle --- R/frollapply.R | 27 +++++++++++++++++------- R/openmp-utils.R | 5 +++++ inst/tests/froll.Rraw | 47 ++++++++++++++++++++++++++++++++--------- man/frollapply.Rd | 49 ++++++++++++++++++++++--------------------- src/data.table.h | 1 + src/init.c | 1 + src/openmp-utils.c | 9 ++++++++ 7 files changed, 97 insertions(+), 42 deletions(-) diff --git a/R/frollapply.R b/R/frollapply.R index 3863e99c5c..7a87e91485 100644 --- a/R/frollapply.R +++ b/R/frollapply.R @@ -278,12 +278,8 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right"," warn.simplify = gettext("frollapply completed successfully but raised a warning when attempting to simplify results using our internal 'simplifylist' function. Be sure to provide 'fill' argument matching the type and shape of results returned by the your function. Use simplify=FALSE to obtain a list instead. If you believe your results could be automatically simplified please submit your use case as new issue in our issue tracker.\n%s") } - DTths = getDTthreads(FALSE) - use.fork = .Platform$OS.type!="windows" && DTths > 1L - if (verbose) { - if (use.fork) cat("frollapply running on multiple CPU threads using parallel::mcparallel\n") - else cat("frollapply running on single CPU thread\n") - } + DTths0 = getDTthreads(FALSE) + use.fork0 = .Platform$OS.type!="windows" && DTths0 > 1L ans = vector("list", nx*nn) ## vectorized x for (i in seq_len(nx)) { @@ -291,6 +287,21 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right"," thislen = len[i] if (!thislen) next + if (!use.fork0) { + use.fork = use.fork0 + if (verbose) + cat("frollapply running on single CPU thread\n") + } else { + # throttle + DTths = getDTthreadsC(thislen, TRUE) + use.fork = DTths > 1L + if (verbose) { + if (DTths < DTths0) + catf("frollapply run on %d CPU threads throttled to %d threads, input length %d\n", DTths0, DTths, thislen) + else + catf("frollapply running on %d CPU threads\n", DTths) + } + } ## vectorized n for (j in seq_len(nn)) { thisn = N[[j]] @@ -302,7 +313,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right"," } else { tight0 } - if (use.fork) { ## !windows && getDTthreads()>1L + if (use.fork) { ## !windows && getDTthreads()>1L, and then throttle using getDTthreadsC(thislen, TRUE) ths = min(DTths, length(ansi)) ii = split(ansi, sort(rep_len(seq_len(ths), length(ansi)))) ## assign row indexes to threads jobs = vector("integer", ths) @@ -343,7 +354,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right"," if (any(fork.err)) { stopf( "frollapply received an error(s) when evaluating FUN:\n%s", - paste(unique(vapply_1c(fork.res[fork.err], function(err) attr(err, "condition", TRUE)[["message"]], use.names = FALSE)), collapse = "\n") + attr(fork.res[fork.err][[1L]], "condition", TRUE)[["message"]] ## print only first error for consistency to single threaded code ) } thisans = unlist(fork.res, recursive = FALSE, use.names = FALSE) diff --git a/R/openmp-utils.R b/R/openmp-utils.R index d0b34b49bd..d9f879e1e9 100644 --- a/R/openmp-utils.R +++ b/R/openmp-utils.R @@ -13,3 +13,8 @@ setDTthreads = function(threads=NULL, restore_after_fork=NULL, percent=NULL, thr getDTthreads = function(verbose=getOption("datatable.verbose")) { .Call(CgetDTthreads, verbose) } + +# internal, same as C's getDTthreads so can be used for parallel package from R +getDTthreadsC = function(n, throttle) { + .Call(CgetDTthreadsC, n, throttle) +} diff --git a/inst/tests/froll.Rraw b/inst/tests/froll.Rraw index f9beb1d634..530570d4f5 100644 --- a/inst/tests/froll.Rraw +++ b/inst/tests/froll.Rraw @@ -1261,13 +1261,32 @@ test(6010.016, frollapply(c(1, 9), 1L, FUN=function(x) copy(list(x)), simplify=F setDTthreads(old) #### test disabling parallelism -use.fork = .Platform$OS.type!="windows" && getDTthreads()>1L +ths = getDTthreads() +use.fork = .Platform$OS.type!="windows" && ths > 1L if (use.fork) { - options(datatable.verbose=TRUE) - test(6010.021, frollapply(1:2, 1, identity), 1:2, output="running on multiple CPU threads using parallel::mcparallel") - options(datatable.verbose=FALSE) - test(6010.022, frollapply(1:2, 1, function(x) {warning("warn"); x}), 1:2) ## warning ignored - test(6010.023, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); x}), error="err:1\nerr:2") + ## throttle test start + test(6010.0201, frollapply(1:2, 1, copy), 1:2, output=sprintf("frollapply run on %d CPU threads throttled to 1 threads, input length 2", ths), options=c(datatable.verbose=TRUE)) + test(6010.0202, frollapply(1:1024, 1, copy), 1:1024, output=sprintf("frollapply run on %d CPU threads throttled to 1 threads, input length 1024", ths), options=c(datatable.verbose=TRUE)) + if (ths > 2L) { ## setDTthreads(8); ths = getDTthreads() + test(6010.0203, frollapply(1:1025, 1, copy), 1:1025, output=sprintf("frollapply run on %d CPU threads throttled to 2 threads, input length 1025", ths), options=c(datatable.verbose=TRUE)) + test(6010.0204, frollapply(1:2048, 1, copy), 1:2048, output=sprintf("frollapply run on %d CPU threads throttled to 2 threads, input length 2048", ths), options=c(datatable.verbose=TRUE)) + } else { ## CRAN: setDTthreads(2); ths = 2 + test(6010.0205, frollapply(1:1025, 1, copy), 1:1025, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE)) + test(6010.0206, frollapply(1:2048, 1, copy), 1:2048, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE)) + } + if (ths > 3L) { ## setDTthreads(8); ths = getDTthreads() + test(6010.0207, frollapply(1:2049, 1, copy), 1:2049, output=sprintf("frollapply run on %d CPU threads throttled to 3 threads, input length 2049", ths), options=c(datatable.verbose=TRUE)) + test(6010.0208, frollapply(1:3072, 1, copy), 1:3072, output=sprintf("frollapply run on %d CPU threads throttled to 3 threads, input length 3072", ths), options=c(datatable.verbose=TRUE)) + } else if (ths > 2L) { ## setDTthreads(3); ths = 3 + test(6010.0209, frollapply(1:2049, 1, copy), 1:2049, output="frollapply running on 3 CPU threads", options=c(datatable.verbose=TRUE)) + test(6010.0210, frollapply(1:3072, 1, copy), 1:3072, output="frollapply running on 3 CPU threads", options=c(datatable.verbose=TRUE)) + } else { ## CRAN: setDTthreads(2); ths = 2 + test(6010.0211, frollapply(1:2049, 1, copy), 1:2049, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE)) + test(6010.0212, frollapply(1:3072, 1, copy), 1:3072, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE)) + } + ## throttle test end + test(6010.022, frollapply(1:2, 1, function(x) {warning("warn"); copy(x)}), 1:2) ## warning ignored + test(6010.023, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## second error not printed for consistency to single threaded test(6010.024, frollapply(1:2, 1, function(x) stop("err")), error="err") ## unique error } old = setDTthreads(1L) @@ -1279,9 +1298,9 @@ test(6010.027, frollapply(1:2, 1, function(x) {warning("warn:", tail(x,1)); copy test(6010.028, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first setDTthreads(old) if (getDTthreads()>1L) { ## check for consistency - test(6010.036, frollapply(1:2, 1, function(x) {warning("warn"); copy(x)}), c(1L,2L)) - test(6010.037, frollapply(1:2, 1, function(x) {warning("warn:", tail(x,1)); copy(x)}), c(1L,2L)) - test(6010.038, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first + test(6010.036, frollapply(1:1025, 1, function(x) {warning("warn"); copy(x)}), 1:1025) + test(6010.037, frollapply(1:1025, 1, function(x) {warning("warn:", tail(x,1)); copy(x)}), 1:1025) + test(6010.038, frollapply(1:1025, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first } #### corner cases from examples - handled properly after frollapply rewrite to R @@ -1494,9 +1513,16 @@ test(6010.711, frollapply(1:5, 2, function(x) as.list(range(x)), fill=list(NA_in test(6010.712, as.null(frollapply(1:3, 1, function(x) if (x==1L) sum else if (x==2L) mean else `[`, simplify=TRUE)), NULL) ## as.null as we are only interested in codecov here test(6010.713, as.null(frollapply(1:3, 1, function(x) `[`, simplify = TRUE)), NULL) ## as.null as we are only interested in codecov here -#### fixing .internal.selfref +#### mutlithreading throttle caveats from manual: copy, fixing .internal.selfref use.fork = .Platform$OS.type!="windows" && getDTthreads()>1L if (use.fork) { + setDTthreads(throttle=1) ## disable throttle + old = setDTthreads(1) + test(6010.761, frollapply(c(1, 9), N=1L, FUN=identity), c(9,9)) ## unexpected + test(6010.762, frollapply(c(1, 9), N=1L, FUN=list), data.table(V1=c(9,9))) ## unexpected + setDTthreads(2) + test(6010.763, frollapply(c(1, 9), N=1L, FUN=identity), c(1,9)) ## good only because threads >= input + test(6010.764, frollapply(c(1, 5, 9), N=1L, FUN=identity), c(5,5,9)) ## unexpected again is.ok = function(x) {stopifnot(is.data.table(x)); capture.output(print(attr(x, ".internal.selfref", TRUE)))!=""} ans = frollapply(1:2, 2, data.table) ## default: fill=NA test(6010.770, is.ok(ans[[2L]])) ## mismatch of 'fill' type so simplify=TRUE did not run rbindlist but frollapply detected DT and fixed @@ -1516,6 +1542,7 @@ if (use.fork) { test(6010.776, !is.ok(ans[[3L]])) ans = frollapply(1:3, 2, f, fill=data.table(NA), simplify=function(x) lapply(x, function(y) if (is.data.table(y)) setDT(y) else y)) test(6010.777, is.ok(ans[[3L]])) ## fix inside frollapply via simplify + setDTthreads(throttle=1024) ## re-enable throttle } ## partial adaptive diff --git a/man/frollapply.Rd b/man/frollapply.Rd index f313d50d5b..b1930b4da0 100644 --- a/man/frollapply.Rd +++ b/man/frollapply.Rd @@ -55,7 +55,7 @@ frollapply(c(1, 9), N=1L, FUN=list) ## unexpected # #1: 9 #2: 9 -setDTthreads(2) +setDTthreads(2, throttle=1) ## disable throttle frollapply(c(1, 9), N=1L, FUN=identity) ## good only because threads >= input #[1] 1 9 frollapply(c(1, 5, 9), N=1L, FUN=identity) ## unexpected again @@ -86,6 +86,7 @@ setDTthreads(old) \preformatted{ is.ok = function(x) {stopifnot(is.data.table(x)); format(attr(x, ".internal.selfref", TRUE))!=""} +setDTthreads(2, throttle=1) ## disable throttle ## frollapply will fix DT in most cases ans = frollapply(1:2, 2, data.table, fill=data.table(NA)) is.ok(ans) @@ -134,27 +135,27 @@ is.ok(ans[[3L]]) \itemize{ \item When using \code{by.column=FALSE} one can subset dataset before passing it to \code{X} to keep only columns relevant for the computation: \preformatted{ -x = setDT(lapply(1:100, function(x) as.double(rep.int(x,1e4L)))) -f = function(x) sum(x$V1*x$V2) +x = setDT(lapply(1:1000, function(x) as.double(rep.int(x,1e4L)))) +f = function(x) sum(x$V1 * x$V2) system.time(frollapply(x, 100, f, by.column=FALSE)) # user system elapsed -# 0.157 0.067 0.081 +# 0.689 0.180 0.164 system.time(frollapply(x[, c("V1","V2"), with=FALSE], 100, f, by.column=FALSE)) # user system elapsed -# 0.096 0.054 0.054 +# 0.057 0.142 0.070 } - \item Avoid partial, see \emph{\code{partial} argument} section of \code{\link{froll}} manual. + \item Avoid \code{partial} argument, see \emph{\code{partial} argument} section of \code{\link{froll}} manual. \item Avoid \code{simplify=TRUE} and provide a function instead: \preformatted{ x = rnorm(1e5) system.time(frollapply(x, 2, function(x) 1L, simplify=TRUE)) # user system elapsed -# 0.308 0.076 0.196 +# 0.212 0.188 0.156 system.time(frollapply(x, 2, function(x) 1L, simplify=unlist)) # user system elapsed -# 0.214 0.080 0.088 +# 0.105 0.167 0.056 } - \item CPU threads utilization in \code{frollapply} can be controlled by \code{\link{setDTthreads}}, which by default uses half of available CPU threads. + \item CPU threads utilization in \code{frollapply} can be controlled by \code{\link{setDTthreads}}, which by default uses half of available CPU threads. Usage of multiple CPU threads will be throttled for small input, as described in \code{\link{setDTthreads}} manual. \item Optimization that avoids repeated allocation of a window subset (see \emph{Implementation} section for details), in case of adaptive rolling function, depends on R's \emph{growable bit}. This feature has been added in R 3.4.0. Adaptive \code{frollapply} will still work on older versions of R but, due to repeated allocation of window subset, it will be much slower. \item Parallel computation of \code{FUN} is handled by \code{parallel} package (part of R core since 2.14.0) and its \emph{fork} mechanism. \emph{Fork} is not available on Windows OS therefore it will be always single threaded on that platform. } @@ -170,61 +171,61 @@ fill1 = data.table(min=NA_integer_, max=NA_integer_) fill2 = list(min=NA_integer_, max=NA_integer_) system.time(a<-frollapply(1:1e4, 100, fun1, fill=fill1)) # user system elapsed -# 2.047 0.337 0.788 +# 1.064 0.765 0.421 system.time(b<-frollapply(1:1e4, 100, fun2, fill=fill2)) # user system elapsed -# 0.205 0.125 0.138 -all.equal(a, b) +# 0.082 0.221 0.112 +all.equal(a, b) #[1] TRUE } - \item Code that is not dependent on rolling window should be taken out as pre or post computation: + \item Code that is not dependent on a rolling window should be taken out as pre or post computation: \preformatted{ x = c(1L,3L) system.time(for (i in 1:1e6) sum(x+1L)) # user system elapsed -# 0.308 0.004 0.312 +# 0.218 0.002 0.221 system.time({y = x+1L; for (i in 1:1e6) sum(y)}) # user system elapsed -# 0.203 0.000 0.202 +# 0.160 0.001 0.161 } \item Being strict about data types removes the need for R to handle them automatically: \preformatted{ x = vector("integer", 1e6) system.time(for (i in 1:1e6) x[i] = NA) # user system elapsed -# 0.160 0.000 0.161 +# 0.114 0.000 0.114 system.time(for (i in 1:1e6) x[i] = NA_integer_) # user system elapsed -# 0.05 0.00 0.05 +# 0.029 0.000 0.030 } \item If a function calls another function under the hood, it is usually better to call the latter one directly: \preformatted{ x = matrix(c(1L,2L,3L,4L), c(2L,2L)) system.time(for (i in 1:1e4) colSums(x)) # user system elapsed -# 0.051 0.000 0.051 +# 0.033 0.000 0.033 system.time(for (i in 1:1e4) .colSums(x, 2L, 2L)) # user system elapsed -# 0.015 0.000 0.015 +# 0.010 0.002 0.012 } - \item There are many functions that may be optimized for scaling up for bigger input, yet for a small input they may carry bigger overhead comparing to their simpler counterparts. One may need to experiment on own data, but low overhead functions are likely be faster when evaluating in many iterations: + \item There are many functions that may be optimized for scaling up for bigger input, yet for a small input they may carry bigger overhead comparing to their simpler counterparts. One may need to experiment on own data, but low overhead functions are likely to be faster when evaluating in many iterations: \preformatted{ ## uniqueN x = c(1L,3L,5L) system.time(for (i in 1:1e4) uniqueN(x)) # user system elapsed -# 0.156 0.004 0.160 +# 0.078 0.001 0.080 system.time(for (i in 1:1e4) length(unique(x))) # user system elapsed -# 0.040 0.004 0.043 +# 0.018 0.000 0.018 ## column subset x = data.table(v1 = c(1L,3L,5L)) system.time(for (i in 1:1e4) x[, v1]) # user system elapsed -# 3.197 0.004 3.201 +# 1.952 0.011 1.964 system.time(for (i in 1:1e4) x[["v1"]]) # user system elapsed -# 0.063 0.000 0.063 +# 0.036 0.000 0.035 } } } diff --git a/src/data.table.h b/src/data.table.h index ca72bf510c..95f43d3d55 100644 --- a/src/data.table.h +++ b/src/data.table.h @@ -376,6 +376,7 @@ SEXP gshift(SEXP, SEXP, SEXP, SEXP); SEXP nestedid(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP setDTthreads(SEXP, SEXP, SEXP, SEXP); SEXP getDTthreads_R(SEXP); +SEXP getDTthreads_C(SEXP, SEXP); SEXP nqRecreateIndices(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP fsort(SEXP, SEXP); SEXP inrange(SEXP, SEXP, SEXP, SEXP); diff --git a/src/init.c b/src/init.c index 67394415f2..ef81a7a0e0 100644 --- a/src/init.c +++ b/src/init.c @@ -120,6 +120,7 @@ R_CallMethodDef callMethods[] = { {"Cnestedid", (DL_FUNC) &nestedid, -1}, {"CsetDTthreads", (DL_FUNC) &setDTthreads, -1}, {"CgetDTthreads", (DL_FUNC) &getDTthreads_R, -1}, +{"CgetDTthreadsC", (DL_FUNC) &getDTthreads_C, -1}, {"CnqRecreateIndices", (DL_FUNC) &nqRecreateIndices, -1}, {"Cfsort", (DL_FUNC) &fsort, -1}, {"Cinrange", (DL_FUNC) &inrange, -1}, diff --git a/src/openmp-utils.c b/src/openmp-utils.c index a3ccf6699c..8d6adb173f 100644 --- a/src/openmp-utils.c +++ b/src/openmp-utils.c @@ -77,6 +77,15 @@ static const char *mygetenv(const char *name, const char *unset) return (ans == NULL || ans[0] == '\0') ? unset : ans; } +SEXP getDTthreads_C(SEXP n, SEXP throttle) +{ + if(!isInteger(n) || INTEGER(n)[0] < 0) + error(_("%s must be non-negative integer"), "n"); + if(!IS_TRUE_OR_FALSE(throttle)) + error(_("%s must be TRUE or FALSE"), "throttle"); + return ScalarInteger(getDTthreads(INTEGER(n)[0], LOGICAL(throttle)[0])); +} + SEXP getDTthreads_R(SEXP verbose) { if(!IS_TRUE_OR_FALSE(verbose)) From 14deecd154381b043ce6ed31a73e2d72ff1552d3 Mon Sep 17 00:00:00 2001 From: Jan Gorecki Date: Wed, 10 Sep 2025 20:39:38 +0200 Subject: [PATCH 2/3] fix codecov --- src/openmp-utils.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/openmp-utils.c b/src/openmp-utils.c index 8d6adb173f..a0cdec5360 100644 --- a/src/openmp-utils.c +++ b/src/openmp-utils.c @@ -80,9 +80,9 @@ static const char *mygetenv(const char *name, const char *unset) SEXP getDTthreads_C(SEXP n, SEXP throttle) { if(!isInteger(n) || INTEGER(n)[0] < 0) - error(_("%s must be non-negative integer"), "n"); + internal_error(__func__, "n must be non-negative integer"); // # nocov if(!IS_TRUE_OR_FALSE(throttle)) - error(_("%s must be TRUE or FALSE"), "throttle"); + internal_error(__func__, "throttle must be TRUE or FALSE"); // # nocov return ScalarInteger(getDTthreads(INTEGER(n)[0], LOGICAL(throttle)[0])); } From 4ea30ea68ec0edcddd120aeb54cb61e32e40e701 Mon Sep 17 00:00:00 2001 From: Jan Gorecki Date: Thu, 11 Sep 2025 12:31:38 +0200 Subject: [PATCH 3/3] print once whatever will not change --- R/frollapply.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/frollapply.R b/R/frollapply.R index 7a87e91485..9105250427 100644 --- a/R/frollapply.R +++ b/R/frollapply.R @@ -280,6 +280,8 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right"," DTths0 = getDTthreads(FALSE) use.fork0 = .Platform$OS.type!="windows" && DTths0 > 1L + if (verbose && !use.fork0) + cat("frollapply running on single CPU thread\n") ans = vector("list", nx*nn) ## vectorized x for (i in seq_len(nx)) { @@ -289,8 +291,6 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right"," next if (!use.fork0) { use.fork = use.fork0 - if (verbose) - cat("frollapply running on single CPU thread\n") } else { # throttle DTths = getDTthreadsC(thislen, TRUE)