Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions R/frollapply.R
Original file line number Diff line number Diff line change
Expand Up @@ -278,19 +278,30 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
warn.simplify = gettext("frollapply completed successfully but raised a warning when attempting to simplify results using our internal 'simplifylist' function. Be sure to provide 'fill' argument matching the type and shape of results returned by the your function. Use simplify=FALSE to obtain a list instead. If you believe your results could be automatically simplified please submit your use case as new issue in our issue tracker.\n%s")
}

DTths = getDTthreads(FALSE)
use.fork = .Platform$OS.type!="windows" && DTths > 1L
if (verbose) {
if (use.fork) cat("frollapply running on multiple CPU threads using parallel::mcparallel\n")
else cat("frollapply running on single CPU thread\n")
}
DTths0 = getDTthreads(FALSE)
use.fork0 = .Platform$OS.type!="windows" && DTths0 > 1L
if (verbose && !use.fork0)
cat("frollapply running on single CPU thread\n")
ans = vector("list", nx*nn)
## vectorized x
for (i in seq_len(nx)) {
thisx = X[[i]]
thislen = len[i]
if (!thislen)
next
if (!use.fork0) {
use.fork = use.fork0
} else {
# throttle
DTths = getDTthreadsC(thislen, TRUE)
use.fork = DTths > 1L
if (verbose) {
if (DTths < DTths0)
catf("frollapply run on %d CPU threads throttled to %d threads, input length %d\n", DTths0, DTths, thislen)
else
catf("frollapply running on %d CPU threads\n", DTths)
}
}
## vectorized n
for (j in seq_len(nn)) {
thisn = N[[j]]
Expand All @@ -302,7 +313,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
} else {
tight0
}
if (use.fork) { ## !windows && getDTthreads()>1L
if (use.fork) { ## !windows && getDTthreads()>1L, and then throttle using getDTthreadsC(thislen, TRUE)
ths = min(DTths, length(ansi))
ii = split(ansi, sort(rep_len(seq_len(ths), length(ansi)))) ## assign row indexes to threads
jobs = vector("integer", ths)
Expand Down Expand Up @@ -343,7 +354,7 @@ frollapply = function(X, N, FUN, ..., by.column=TRUE, fill=NA, align=c("right","
if (any(fork.err)) {
stopf(
"frollapply received an error(s) when evaluating FUN:\n%s",
paste(unique(vapply_1c(fork.res[fork.err], function(err) attr(err, "condition", TRUE)[["message"]], use.names = FALSE)), collapse = "\n")
attr(fork.res[fork.err][[1L]], "condition", TRUE)[["message"]] ## print only first error for consistency to single threaded code
)
}
thisans = unlist(fork.res, recursive = FALSE, use.names = FALSE)
Expand Down
5 changes: 5 additions & 0 deletions R/openmp-utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ setDTthreads = function(threads=NULL, restore_after_fork=NULL, percent=NULL, thr
# Return the number of threads reported by the C routine registered as CgetDTthreads
# (getDTthreads_R in src/init.c).
# verbose: passed unchanged to the C routine; defaults to option 'datatable.verbose'.
getDTthreads = function(verbose=getOption("datatable.verbose")) {
.Call(CgetDTthreads, verbose)
}

# Internal helper: exposes the C-level getDTthreads(n, throttle) to R, so that R code which
# parallelizes via the 'parallel' package (frollapply) can use the same thread count as C code.
# n: non-negative integer; throttle: TRUE or FALSE. Both are validated on the C side (getDTthreads_C).
# Returns a single integer.
getDTthreadsC = function(n, throttle) {
.Call(CgetDTthreadsC, n, throttle)
}
47 changes: 37 additions & 10 deletions inst/tests/froll.Rraw
Original file line number Diff line number Diff line change
Expand Up @@ -1522,13 +1522,32 @@ test(6010.016, frollapply(c(1, 9), 1L, FUN=function(x) copy(list(x)), simplify=F
setDTthreads(old)

#### test disabling parallelism
use.fork = .Platform$OS.type!="windows" && getDTthreads()>1L
ths = getDTthreads()
use.fork = .Platform$OS.type!="windows" && ths > 1L
if (use.fork) {
options(datatable.verbose=TRUE)
test(6010.021, frollapply(1:2, 1, identity), 1:2, output="running on multiple CPU threads using parallel::mcparallel")
options(datatable.verbose=FALSE)
test(6010.022, frollapply(1:2, 1, function(x) {warning("warn"); x}), 1:2) ## warning ignored
test(6010.023, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); x}), error="err:1\nerr:2")
## throttle test start
test(6010.0201, frollapply(1:2, 1, copy), 1:2, output=sprintf("frollapply run on %d CPU threads throttled to 1 threads, input length 2", ths), options=c(datatable.verbose=TRUE))
test(6010.0202, frollapply(1:1024, 1, copy), 1:1024, output=sprintf("frollapply run on %d CPU threads throttled to 1 threads, input length 1024", ths), options=c(datatable.verbose=TRUE))
if (ths > 2L) { ## setDTthreads(8); ths = getDTthreads()
test(6010.0203, frollapply(1:1025, 1, copy), 1:1025, output=sprintf("frollapply run on %d CPU threads throttled to 2 threads, input length 1025", ths), options=c(datatable.verbose=TRUE))
test(6010.0204, frollapply(1:2048, 1, copy), 1:2048, output=sprintf("frollapply run on %d CPU threads throttled to 2 threads, input length 2048", ths), options=c(datatable.verbose=TRUE))
} else { ## CRAN: setDTthreads(2); ths = 2
test(6010.0205, frollapply(1:1025, 1, copy), 1:1025, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
test(6010.0206, frollapply(1:2048, 1, copy), 1:2048, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
}
if (ths > 3L) { ## setDTthreads(8); ths = getDTthreads()
test(6010.0207, frollapply(1:2049, 1, copy), 1:2049, output=sprintf("frollapply run on %d CPU threads throttled to 3 threads, input length 2049", ths), options=c(datatable.verbose=TRUE))
test(6010.0208, frollapply(1:3072, 1, copy), 1:3072, output=sprintf("frollapply run on %d CPU threads throttled to 3 threads, input length 3072", ths), options=c(datatable.verbose=TRUE))
} else if (ths > 2L) { ## setDTthreads(3); ths = 3
test(6010.0209, frollapply(1:2049, 1, copy), 1:2049, output="frollapply running on 3 CPU threads", options=c(datatable.verbose=TRUE))
test(6010.0210, frollapply(1:3072, 1, copy), 1:3072, output="frollapply running on 3 CPU threads", options=c(datatable.verbose=TRUE))
} else { ## CRAN: setDTthreads(2); ths = 2
test(6010.0211, frollapply(1:2049, 1, copy), 1:2049, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
test(6010.0212, frollapply(1:3072, 1, copy), 1:3072, output="frollapply running on 2 CPU threads", options=c(datatable.verbose=TRUE))
}
## throttle test end
test(6010.022, frollapply(1:2, 1, function(x) {warning("warn"); copy(x)}), 1:2) ## warning ignored
test(6010.023, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## second error not printed for consistency to single threaded
test(6010.024, frollapply(1:2, 1, function(x) stop("err")), error="err") ## unique error
}
old = setDTthreads(1L)
Expand All @@ -1540,9 +1559,9 @@ test(6010.027, frollapply(1:2, 1, function(x) {warning("warn:", tail(x,1)); copy
test(6010.028, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first
setDTthreads(old)
if (getDTthreads()>1L) { ## check for consistency
test(6010.036, frollapply(1:2, 1, function(x) {warning("warn"); copy(x)}), c(1L,2L))
test(6010.037, frollapply(1:2, 1, function(x) {warning("warn:", tail(x,1)); copy(x)}), c(1L,2L))
test(6010.038, frollapply(1:2, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first
test(6010.036, frollapply(1:1025, 1, function(x) {warning("warn"); copy(x)}), 1:1025)
test(6010.037, frollapply(1:1025, 1, function(x) {warning("warn:", tail(x,1)); copy(x)}), 1:1025)
test(6010.038, frollapply(1:1025, 1, function(x) {stop("err:", tail(x,1)); copy(x)}), error="err:1") ## only first
}

#### corner cases from examples - handled properly after frollapply rewrite to R
Expand Down Expand Up @@ -1755,9 +1774,16 @@ test(6010.711, frollapply(1:5, 2, function(x) as.list(range(x)), fill=list(NA_in
test(6010.712, as.null(frollapply(1:3, 1, function(x) if (x==1L) sum else if (x==2L) mean else `[`, simplify=TRUE)), NULL) ## as.null as we are only interested in codecov here
test(6010.713, as.null(frollapply(1:3, 1, function(x) `[`, simplify = TRUE)), NULL) ## as.null as we are only interested in codecov here

#### fixing .internal.selfref
#### multithreading throttle caveats from manual: copy, fixing .internal.selfref
use.fork = .Platform$OS.type!="windows" && getDTthreads()>1L
if (use.fork) {
setDTthreads(throttle=1) ## disable throttle
old = setDTthreads(1)
test(6010.761, frollapply(c(1, 9), N=1L, FUN=identity), c(9,9)) ## unexpected
test(6010.762, frollapply(c(1, 9), N=1L, FUN=list), data.table(V1=c(9,9))) ## unexpected
setDTthreads(2)
test(6010.763, frollapply(c(1, 9), N=1L, FUN=identity), c(1,9)) ## good only because threads >= input
test(6010.764, frollapply(c(1, 5, 9), N=1L, FUN=identity), c(5,5,9)) ## unexpected again
is.ok = function(x) {stopifnot(is.data.table(x)); capture.output(print(attr(x, ".internal.selfref", TRUE)))!="<pointer: (nil)>"}
ans = frollapply(1:2, 2, data.table) ## default: fill=NA
test(6010.770, is.ok(ans[[2L]])) ## mismatch of 'fill' type so simplify=TRUE did not run rbindlist but frollapply detected DT and fixed
Expand All @@ -1777,6 +1803,7 @@ if (use.fork) {
test(6010.776, !is.ok(ans[[3L]]))
ans = frollapply(1:3, 2, f, fill=data.table(NA), simplify=function(x) lapply(x, function(y) if (is.data.table(y)) setDT(y) else y))
test(6010.777, is.ok(ans[[3L]])) ## fix inside frollapply via simplify
setDTthreads(throttle=1024) ## re-enable throttle
}

## partial adaptive
Expand Down
49 changes: 25 additions & 24 deletions man/frollapply.Rd
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ frollapply(c(1, 9), N=1L, FUN=list) ## unexpected
# <num>
#1: 9
#2: 9
setDTthreads(2)
setDTthreads(2, throttle=1) ## disable throttle
frollapply(c(1, 9), N=1L, FUN=identity) ## good only because threads >= input
#[1] 1 9
frollapply(c(1, 5, 9), N=1L, FUN=identity) ## unexpected again
Expand Down Expand Up @@ -86,6 +86,7 @@ setDTthreads(old)
\preformatted{
is.ok = function(x) {stopifnot(is.data.table(x)); format(attr(x, ".internal.selfref", TRUE))!="<pointer: (nil)>"}

setDTthreads(2, throttle=1) ## disable throttle
## frollapply will fix DT in most cases
ans = frollapply(1:2, 2, data.table, fill=data.table(NA))
is.ok(ans)
Expand Down Expand Up @@ -134,27 +135,27 @@ is.ok(ans[[3L]])
\itemize{
\item When using \code{by.column=FALSE} one can subset dataset before passing it to \code{X} to keep only columns relevant for the computation:
\preformatted{
x = setDT(lapply(1:100, function(x) as.double(rep.int(x,1e4L))))
f = function(x) sum(x$V1*x$V2)
x = setDT(lapply(1:1000, function(x) as.double(rep.int(x,1e4L))))
f = function(x) sum(x$V1 * x$V2)
system.time(frollapply(x, 100, f, by.column=FALSE))
# user system elapsed
# 0.157 0.067 0.081
# 0.689 0.180 0.164
system.time(frollapply(x[, c("V1","V2"), with=FALSE], 100, f, by.column=FALSE))
# user system elapsed
# 0.096 0.054 0.054
# 0.057 0.142 0.070
}
\item Avoid partial, see \emph{\code{partial} argument} section of \code{\link{froll}} manual.
\item Avoid \code{partial} argument, see \emph{\code{partial} argument} section of \code{\link{froll}} manual.
\item Avoid \code{simplify=TRUE} and provide a function instead:
\preformatted{
x = rnorm(1e5)
system.time(frollapply(x, 2, function(x) 1L, simplify=TRUE))
# user system elapsed
# 0.308 0.076 0.196
# 0.212 0.188 0.156
system.time(frollapply(x, 2, function(x) 1L, simplify=unlist))
# user system elapsed
# 0.214 0.080 0.088
# 0.105 0.167 0.056
}
\item CPU threads utilization in \code{frollapply} can be controlled by \code{\link{setDTthreads}}, which by default uses half of available CPU threads.
\item CPU threads utilization in \code{frollapply} can be controlled by \code{\link{setDTthreads}}, which by default uses half of available CPU threads. Usage of multiple CPU threads will be throttled for small input, as described in \code{\link{setDTthreads}} manual.
\item Optimization that avoids repeated allocation of a window subset (see \emph{Implementation} section for details), in case of adaptive rolling function, depends on R's \emph{growable bit}. This feature has been added in R 3.4.0. Adaptive \code{frollapply} will still work on older versions of R but, due to repeated allocation of window subset, it will be much slower.
\item Parallel computation of \code{FUN} is handled by \code{parallel} package (part of R core since 2.14.0) and its \emph{fork} mechanism. \emph{Fork} is not available on Windows OS, therefore \code{frollapply} will always be single threaded on that platform.
}
Expand All @@ -170,61 +171,61 @@ fill1 = data.table(min=NA_integer_, max=NA_integer_)
fill2 = list(min=NA_integer_, max=NA_integer_)
system.time(a<-frollapply(1:1e4, 100, fun1, fill=fill1))
# user system elapsed
# 2.047 0.337 0.788
# 1.064 0.765 0.421
system.time(b<-frollapply(1:1e4, 100, fun2, fill=fill2))
# user system elapsed
# 0.205 0.125 0.138
all.equal(a, b)
# 0.082 0.221 0.112
all.equal(a, b)
#[1] TRUE
}
\item Code that is not dependent on rolling window should be taken out as pre or post computation:
\item Code that is not dependent on a rolling window should be taken out as pre or post computation:
\preformatted{
x = c(1L,3L)
system.time(for (i in 1:1e6) sum(x+1L))
# user system elapsed
# 0.308 0.004 0.312
# 0.218 0.002 0.221
system.time({y = x+1L; for (i in 1:1e6) sum(y)})
# user system elapsed
# 0.203 0.000 0.202
# 0.160 0.001 0.161
}
\item Being strict about data types removes the need for R to handle them automatically:
\preformatted{
x = vector("integer", 1e6)
system.time(for (i in 1:1e6) x[i] = NA)
# user system elapsed
# 0.160 0.000 0.161
# 0.114 0.000 0.114
system.time(for (i in 1:1e6) x[i] = NA_integer_)
# user system elapsed
# 0.05 0.00 0.05
# 0.029 0.000 0.030
}
\item If a function calls another function under the hood, it is usually better to call the latter one directly:
\preformatted{
x = matrix(c(1L,2L,3L,4L), c(2L,2L))
system.time(for (i in 1:1e4) colSums(x))
# user system elapsed
# 0.051 0.000 0.051
# 0.033 0.000 0.033
system.time(for (i in 1:1e4) .colSums(x, 2L, 2L))
# user system elapsed
# 0.015 0.000 0.015
# 0.010 0.002 0.012
}
\item There are many functions that may be optimized for scaling up for bigger input, yet for a small input they may carry bigger overhead comparing to their simpler counterparts. One may need to experiment on own data, but low overhead functions are likely be faster when evaluating in many iterations:
\item There are many functions that may be optimized for scaling up for bigger input, yet for a small input they may carry bigger overhead compared to their simpler counterparts. One may need to experiment on one's own data, but low overhead functions are likely to be faster when evaluated in many iterations:
\preformatted{
## uniqueN
x = c(1L,3L,5L)
system.time(for (i in 1:1e4) uniqueN(x))
# user system elapsed
# 0.156 0.004 0.160
# 0.078 0.001 0.080
system.time(for (i in 1:1e4) length(unique(x)))
# user system elapsed
# 0.040 0.004 0.043
# 0.018 0.000 0.018
## column subset
x = data.table(v1 = c(1L,3L,5L))
system.time(for (i in 1:1e4) x[, v1])
# user system elapsed
# 3.197 0.004 3.201
# 1.952 0.011 1.964
system.time(for (i in 1:1e4) x[["v1"]])
# user system elapsed
# 0.063 0.000 0.063
# 0.036 0.000 0.035
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ SEXP gshift(SEXP, SEXP, SEXP, SEXP);
SEXP nestedid(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP setDTthreads(SEXP, SEXP, SEXP, SEXP);
SEXP getDTthreads_R(SEXP);
SEXP getDTthreads_C(SEXP, SEXP);
SEXP nqRecreateIndices(SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP fsort(SEXP, SEXP);
SEXP inrange(SEXP, SEXP, SEXP, SEXP);
Expand Down
1 change: 1 addition & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ R_CallMethodDef callMethods[] = {
{"Cnestedid", (DL_FUNC) &nestedid, -1},
{"CsetDTthreads", (DL_FUNC) &setDTthreads, -1},
{"CgetDTthreads", (DL_FUNC) &getDTthreads_R, -1},
{"CgetDTthreadsC", (DL_FUNC) &getDTthreads_C, -1},
{"CnqRecreateIndices", (DL_FUNC) &nqRecreateIndices, -1},
{"Cfsort", (DL_FUNC) &fsort, -1},
{"Cinrange", (DL_FUNC) &inrange, -1},
Expand Down
9 changes: 9 additions & 0 deletions src/openmp-utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ static const char *mygetenv(const char *name, const char *unset)
return (ans == NULL || ans[0] == '\0') ? unset : ans;
}

// R-callable wrapper around the internal getDTthreads(n, throttle); registered as CgetDTthreadsC.
//   n        - integer vector of length 1, non-negative (NA_INTEGER is negative, so it is rejected too)
//   throttle - TRUE or FALSE
// Returns a length-1 integer vector holding the value getDTthreads() gives for these arguments.
// Invalid arguments raise an internal error, since only package code is expected to call this.
SEXP getDTthreads_C(SEXP n, SEXP throttle)
{
  // the length must be checked before INTEGER(n)[0] is read, otherwise integer(0) is read out of bounds
  if (!isInteger(n) || LENGTH(n) != 1 || INTEGER(n)[0] < 0)
    internal_error(__func__, "n must be non-negative integer"); // # nocov
  if (!IS_TRUE_OR_FALSE(throttle))
    internal_error(__func__, "throttle must be TRUE or FALSE"); // # nocov
  return ScalarInteger(getDTthreads(INTEGER(n)[0], LOGICAL(throttle)[0]));
}

SEXP getDTthreads_R(SEXP verbose)
{
if(!IS_TRUE_OR_FALSE(verbose))
Expand Down
Loading