Skip to content

Commit 9900553

Browse files
authored
Merge pull request #133 from RcppCore/feature/rcpp-parallel-num-threads
use RCPP_PARALLEL_NUM_THREADS for default num threads
2 parents b4a7edc + da83a9a commit 9900553

File tree

7 files changed

+105
-57
lines changed

7 files changed

+105
-57
lines changed

R/options.R

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,20 @@ setThreadOptions <- function(numThreads = "auto", stackSize = "auto") {
2222
else
2323
stackSize <- as.integer(stackSize)
2424

25-
# Call setThreadOptions if using tbb
26-
if (!is.null(dllInfo) && isUsingTbb())
27-
setTbbThreadOptions(numThreads, stackSize)
28-
25+
# set RCPP_PARALLEL_NUM_THREADS
2926
if (numThreads == -1L)
3027
Sys.unsetenv("RCPP_PARALLEL_NUM_THREADS")
3128
else
3229
Sys.setenv(RCPP_PARALLEL_NUM_THREADS = numThreads)
33-
}
34-
35-
setTbbThreadOptions <- function(numThreads, stackSize) {
36-
.Call(
37-
"setThreadOptions",
38-
as.integer(numThreads),
39-
as.integer(stackSize),
40-
PACKAGE = "RcppParallel"
41-
)
30+
31+
# set RCPP_PARALLEL_STACK_SIZE
32+
if (stackSize == 0L)
33+
Sys.unsetenv("RCPP_PARALLEL_STACK_SIZE")
34+
else
35+
Sys.setenv(RCPP_PARALLEL_STACK_SIZE = stackSize)
36+
4237
}
4338

4439
defaultNumThreads <- function() {
45-
.Call(
46-
"defaultNumThreads",
47-
PACKAGE = "RcppParallel"
48-
)
40+
.Call("defaultNumThreads", PACKAGE = "RcppParallel")
4941
}

inst/NEWS

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,38 @@
11
RcppParallel 5.0.3 (UNRELEASED)
22
------------------------------------------------------------------------
3+
4+
* setThreadOptions(...) can again be called multiple times per session.
5+
The requested number of threads will be used for invocations to parallelFor()
6+
and parallelReduce() that don't explicitly request a specific number of threads.
37
* The parallelFor() and parallelReduce() functions gain the 'numThreads'
48
argument, allowing one to limit the number of threads used for a
59
particular computation.
610

711
RcppParallel 5.0.2
812
------------------------------------------------------------------------
13+
914
* setThreadOptions(...) can now only be called once per session, to avoid
1015
segfaults when compiling RcppParallel / TBB with gcc 10.1. Subsequent
1116
calls to setThreadOptions(...) are ignored.
1217

1318
RcppParallel 5.0.1
1419
------------------------------------------------------------------------
20+
1521
* Fixed compilation issue on OpenSUSE Tumbleweed with -flto=auto
1622
* Fixed compilation when CPPFLAGS = -I/usr/local/include and a version
1723
of libtbb is installed there
1824

1925
RcppParallel 5.0.0
2026
------------------------------------------------------------------------
27+
2128
* RcppParallel backend can now be customized with RCPP_PARALLEL_BACKEND
2229
environment variable (supported values are 'tbb' and 'tinythread')
2330
* Fixed issue when compiling RcppParallel on macOS Catalina
2431
* Fixed issue when compiling RcppParallel with Rtools40
2532

2633
RcppParallel 4.4.4
2734
------------------------------------------------------------------------
35+
2836
* Fixed an issue when compiling RcppParallel with clang-9 on Fedora
2937

3038
RcppParallel 4.4.3

inst/include/RcppParallel.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ inline void parallelFor(std::size_t begin,
3434
std::size_t grainSize = 1,
3535
int numThreads = -1)
3636
{
37+
grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, 1);
38+
numThreads = resolveValue("RCPP_PARALLEL_NUM_THREADS", numThreads, -1);
39+
3740
#if RCPP_PARALLEL_USE_TBB
3841
if (internal::backend() == internal::BACKEND_TBB)
3942
tbbParallelFor(begin, end, worker, grainSize, numThreads);
@@ -51,6 +54,9 @@ inline void parallelReduce(std::size_t begin,
5154
std::size_t grainSize = 1,
5255
int numThreads = -1)
5356
{
57+
grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, 1);
58+
numThreads = resolveValue("RCPP_PARALLEL_NUM_THREADS", numThreads, -1);
59+
5460
#if RCPP_PARALLEL_USE_TBB
5561
if (internal::backend() == internal::BACKEND_TBB)
5662
tbbParallelReduce(begin, end, reducer, grainSize, numThreads);
@@ -61,6 +67,6 @@ inline void parallelReduce(std::size_t begin,
6167
#endif
6268
}
6369

64-
} // namespace RcppParallel
70+
} // end namespace RcppParallel
6571

6672
#endif // __RCPP_PARALLEL__

inst/include/RcppParallel/Common.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,41 @@
11
#ifndef __RCPP_PARALLEL_COMMON__
22
#define __RCPP_PARALLEL_COMMON__
33

4+
#include <cerrno>
45
#include <cstddef>
6+
#include <cstdlib>
57

68
namespace RcppParallel {
79

10+
template <typename T, typename U>
11+
inline int resolveValue(const char* envvar,
12+
T requestedValue,
13+
U defaultValue)
14+
{
15+
// if the requested value is non-zero and not the default, we can use it
16+
if (requestedValue != defaultValue && requestedValue > 0)
17+
return requestedValue;
18+
19+
// otherwise, try reading the default from associated envvar
20+
// if the environment variable is unset, use the default
21+
const char* var = getenv(envvar);
22+
if (var == NULL)
23+
return defaultValue;
24+
25+
// try to convert the string to a number
26+
// if an error occurs during conversion, just use default
27+
errno = 0;
28+
char* end;
29+
long value = strtol(var, &end, 10);
30+
31+
// check for conversion failure
32+
if (end == var || *end != '\0' || errno == ERANGE)
33+
return defaultValue;
34+
35+
// okay, return the parsed environment variable value
36+
return value;
37+
}
38+
839
// Work executed within a background thread. We implement dynamic
940
// dispatch using vtables so we can have a stable type to cast
1041
// to from the void* passed to the worker thread (required because

inst/include/RcppParallel/TBB.h

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@
33

44
#include "Common.h"
55

6+
#ifndef TBB_PREVIEW_GLOBAL_CONTROL
7+
# define TBB_PREVIEW_GLOBAL_CONTROL 1
8+
#endif
9+
610
#include <tbb/tbb.h>
11+
#include <tbb/global_control.h>
712
#include <tbb/scalable_allocator.h>
813

914
namespace RcppParallel {
@@ -178,6 +183,43 @@ class TBBArenaParallelReduceExecutor
178183
std::size_t end_;
179184
std::size_t grainSize_;
180185
};
186+
187+
class ThreadStackSizeControl
188+
{
189+
public:
190+
191+
ThreadStackSizeControl()
192+
: control_(nullptr)
193+
{
194+
int stackSize = resolveValue("RCPP_PARALLEL_STACK_SIZE", 0, 0);
195+
if (stackSize > 0)
196+
{
197+
control_ = new tbb::global_control(
198+
tbb::global_control::thread_stack_size,
199+
stackSize
200+
);
201+
}
202+
}
203+
204+
~ThreadStackSizeControl()
205+
{
206+
if (control_ != nullptr)
207+
{
208+
delete control_;
209+
control_ = nullptr;
210+
}
211+
}
212+
213+
private:
214+
215+
// COPYING: not copyable
216+
ThreadStackSizeControl(const ThreadStackSizeControl&);
217+
ThreadStackSizeControl& operator=(const ThreadStackSizeControl&);
218+
219+
// private members
220+
tbb::global_control* control_;
221+
222+
};
181223

182224
} // anonymous namespace
183225

@@ -186,9 +228,11 @@ inline void tbbParallelFor(std::size_t begin,
186228
std::size_t end,
187229
Worker& worker,
188230
std::size_t grainSize = 1,
189-
int numThreads = -1)
231+
int numThreads = tbb::task_arena::automatic)
190232
{
191-
tbb::task_arena arena(numThreads == -1 ? tbb::task_arena::automatic : numThreads);
233+
ThreadStackSizeControl control;
234+
235+
tbb::task_arena arena(numThreads);
192236
tbb::task_group group;
193237

194238
TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize);
@@ -200,9 +244,11 @@ inline void tbbParallelReduce(std::size_t begin,
200244
std::size_t end,
201245
Reducer& reducer,
202246
std::size_t grainSize = 1,
203-
int numThreads = -1)
247+
int numThreads = tbb::task_arena::automatic)
204248
{
205-
tbb::task_arena arena(numThreads == -1 ? tbb::task_arena::automatic : numThreads);
249+
ThreadStackSizeControl control;
250+
251+
tbb::task_arena arena(numThreads);
206252
tbb::task_group group;
207253

208254
TBBArenaParallelReduceExecutor<Reducer> executor(group, reducer, begin, end, grainSize);

src/init.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,9 @@
66

77
/* .Call calls */
88
extern "C" SEXP defaultNumThreads();
9-
extern "C" SEXP setThreadOptions(SEXP, SEXP);
109

1110
static const R_CallMethodDef CallEntries[] = {
1211
{"defaultNumThreads", (DL_FUNC) &defaultNumThreads, 0},
13-
{"setThreadOptions", (DL_FUNC) &setThreadOptions, 2},
1412
{NULL, NULL, 0}
1513
};
1614

src/options.cpp

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,35 +8,6 @@
88
#include <string>
99
#include <exception>
1010

11-
#include <tbb/task_scheduler_init.h>
12-
13-
extern "C" SEXP setThreadOptions(SEXP numThreadsSEXP, SEXP stackSizeSEXP) {
14-
15-
static tbb::task_scheduler_init* s_pTaskScheduler = NULL;
16-
if (s_pTaskScheduler != NULL)
17-
return Rf_ScalarLogical(0);
18-
19-
int numThreads = Rf_asInteger(numThreadsSEXP);
20-
int stackSize = Rf_asInteger(stackSizeSEXP);
21-
22-
try
23-
{
24-
s_pTaskScheduler = new tbb::task_scheduler_init(numThreads, stackSize);
25-
}
26-
catch(const std::exception& e)
27-
{
28-
const char* fmt = "Error loading TBB: %s\n";
29-
Rf_error(fmt, e.what());
30-
}
31-
catch(...)
32-
{
33-
const char* fmt = "Error loading TBB: %s\n";
34-
Rf_error(fmt, "(Unknown error)");
35-
}
36-
37-
return Rf_ScalarLogical(1);
38-
}
39-
4011
extern "C" SEXP defaultNumThreads() {
4112
SEXP threadsSEXP = Rf_allocVector(INTSXP, 1);
4213
INTEGER(threadsSEXP)[0] = tbb::task_scheduler_init::default_num_threads();
@@ -47,10 +18,6 @@ extern "C" SEXP defaultNumThreads() {
4718

4819
#include <tthread/tinythread.h>
4920

50-
extern "C" SEXP setThreadOptions(SEXP numThreadsSEXP, SEXP stackSizeSEXP) {
51-
return R_NilValue;
52-
}
53-
5421
extern "C" SEXP defaultNumThreads() {
5522
SEXP threadsSEXP = Rf_allocVector(INTSXP, 1);
5623
INTEGER(threadsSEXP)[0] = tthread::thread::hardware_concurrency();

0 commit comments

Comments
 (0)