Skip to content

Commit 20353e3

Browse files
committed
use arena + task group to control num threads
1 parent 6c9139e commit 20353e3

File tree

4 files changed

+167
-23
lines changed

4 files changed

+167
-23
lines changed

inst/NEWS

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
RcppParallel 5.0.3 (UNRELEASED)
22
------------------------------------------------------------------------
3-
3+
* The parallelFor() and parallelReduce() functions gain the 'numThreads'
4+
argument, allowing one to limit the number of threads used for a
5+
particular computation.
46

57
RcppParallel 5.0.2
68
------------------------------------------------------------------------

inst/include/RcppParallel.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ namespace RcppParallel {
3131
inline void parallelFor(std::size_t begin,
3232
std::size_t end,
3333
Worker& worker,
34-
std::size_t grainSize = 1)
34+
std::size_t grainSize = 1,
35+
std::size_t numThreads = -1)
3536
{
3637
#if RCPP_PARALLEL_USE_TBB
3738
if (internal::backend() == internal::BACKEND_TBB)
38-
tbbParallelFor(begin, end, worker, grainSize);
39+
tbbParallelFor(begin, end, worker, grainSize, numThreads);
3940
else
4041
ttParallelFor(begin, end, worker, grainSize);
4142
#else
@@ -47,11 +48,12 @@ template <typename Reducer>
4748
inline void parallelReduce(std::size_t begin,
4849
std::size_t end,
4950
Reducer& reducer,
50-
std::size_t grainSize = 1)
51+
std::size_t grainSize = 1,
52+
std::size_t numThreads = -1)
5153
{
5254
#if RCPP_PARALLEL_USE_TBB
5355
if (internal::backend() == internal::BACKEND_TBB)
54-
tbbParallelReduce(begin, end, reducer, grainSize);
56+
tbbParallelReduce(begin, end, reducer, grainSize, numThreads);
5557
else
5658
ttParallelReduce(begin, end, reducer, grainSize);
5759
#else

inst/include/RcppParallel/TBB.h

Lines changed: 148 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,27 +50,163 @@ struct TBBReducer
5050
Reducer* pSplitReducer_;
5151
Reducer& reducer_;
5252
};
53+
54+
class TBBParallelForExecutor
55+
{
56+
public:
5357

54-
} // anonymous namespace
58+
TBBParallelForExecutor(Worker& worker,
59+
std::size_t begin,
60+
std::size_t end,
61+
std::size_t grainSize)
62+
: worker_(worker),
63+
begin_(begin),
64+
end_(end),
65+
grainSize_(grainSize)
66+
{
67+
}
68+
69+
void operator()() const
70+
{
71+
TBBWorker tbbWorker(worker_);
72+
tbb::parallel_for(
73+
tbb::blocked_range<std::size_t>(begin_, end_, grainSize_),
74+
tbbWorker
75+
);
76+
}
77+
78+
private:
79+
Worker& worker_;
80+
std::size_t begin_;
81+
std::size_t end_;
82+
std::size_t grainSize_;
83+
};
5584

85+
template <typename Reducer>
86+
class TBBParallelReduceExecutor
87+
{
88+
public:
89+
90+
TBBParallelReduceExecutor(Reducer& reducer,
91+
std::size_t begin,
92+
std::size_t end,
93+
std::size_t grainSize)
94+
: reducer_(reducer),
95+
begin_(begin),
96+
end_(end),
97+
grainSize_(grainSize)
98+
{
99+
}
100+
101+
void operator()() const
102+
{
103+
TBBReducer<Reducer> tbbReducer(reducer_);
104+
tbb::parallel_reduce(
105+
tbb::blocked_range<std::size_t>(begin_, end_, grainSize_),
106+
tbbReducer
107+
);
108+
}
109+
110+
private:
111+
Reducer& reducer_;
112+
std::size_t begin_;
113+
std::size_t end_;
114+
std::size_t grainSize_;
115+
};
56116

57-
inline void tbbParallelFor(std::size_t begin, std::size_t end,
58-
Worker& worker, std::size_t grainSize = 1) {
117+
class TBBArenaParallelForExecutor
118+
{
119+
public:
59120

60-
TBBWorker tbbWorker(worker);
121+
TBBArenaParallelForExecutor(tbb::task_group& group,
122+
Worker& worker,
123+
std::size_t begin,
124+
std::size_t end,
125+
std::size_t grainSize)
126+
: group_(group),
127+
worker_(worker),
128+
begin_(begin),
129+
end_(end),
130+
grainSize_(grainSize)
131+
{
132+
}
61133

62-
tbb::parallel_for(tbb::blocked_range<size_t>(begin, end, grainSize),
63-
tbbWorker);
64-
}
134+
void operator()() const
135+
{
136+
TBBParallelForExecutor executor(worker_, begin_, end_, grainSize_);
137+
group_.run_and_wait(executor);
138+
}
139+
140+
private:
141+
142+
tbb::task_group& group_;
143+
Worker& worker_;
144+
std::size_t begin_;
145+
std::size_t end_;
146+
std::size_t grainSize_;
147+
};
65148

66149
template <typename Reducer>
67-
inline void tbbParallelReduce(std::size_t begin, std::size_t end,
68-
Reducer& reducer, std::size_t grainSize = 1) {
150+
class TBBArenaParallelReduceExecutor
151+
{
152+
public:
153+
154+
TBBArenaParallelReduceExecutor(tbb::task_group& group,
155+
Reducer& reducer,
156+
std::size_t begin,
157+
std::size_t end,
158+
std::size_t grainSize)
159+
: group_(group),
160+
reducer_(reducer),
161+
begin_(begin),
162+
end_(end),
163+
grainSize_(grainSize)
164+
{
165+
}
166+
167+
void operator()() const
168+
{
169+
TBBParallelReduceExecutor<Reducer> executor(reducer_, begin_, end_, grainSize_);
170+
group_.run_and_wait(executor);
171+
}
69172

70-
TBBReducer<Reducer> tbbReducer(reducer);
173+
private:
174+
175+
tbb::task_group& group_;
176+
Reducer& reducer_;
177+
std::size_t begin_;
178+
std::size_t end_;
179+
std::size_t grainSize_;
180+
};
181+
182+
} // anonymous namespace
183+
184+
185+
inline void tbbParallelFor(std::size_t begin,
186+
std::size_t end,
187+
Worker& worker,
188+
std::size_t grainSize = 1,
189+
std::size_t numThreads = -1)
190+
{
191+
tbb::task_arena arena(numThreads == -1 ? tbb::task_arena::automatic : numThreads);
192+
tbb::task_group group;
193+
194+
TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize);
195+
arena.execute(executor);
196+
}
197+
198+
template <typename Reducer>
199+
inline void tbbParallelReduce(std::size_t begin,
200+
std::size_t end,
201+
Reducer& reducer,
202+
std::size_t grainSize = 1,
203+
std::size_t numThreads = -1)
204+
{
205+
tbb::task_arena arena(numThreads == -1 ? tbb::task_arena::automatic : numThreads);
206+
tbb::task_group group;
71207

72-
tbb::parallel_reduce(tbb::blocked_range<size_t>(begin, end, grainSize),
73-
tbbReducer);
208+
TBBArenaParallelReduceExecutor<Reducer> executor(group, reducer, begin, end, grainSize);
209+
arena.execute(executor);
74210
}
75211

76212
} // namespace RcppParallel

inst/include/RcppParallel/TinyThread.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,11 @@ std::vector<IndexRange> splitInputRange(const IndexRange& range,
102102
} // anonymous namespace
103103

104104
// Execute the Worker over the IndexRange in parallel
105-
inline void ttParallelFor(std::size_t begin, std::size_t end,
106-
Worker& worker, std::size_t grainSize = 1) {
107-
105+
inline void ttParallelFor(std::size_t begin,
106+
std::size_t end,
107+
Worker& worker,
108+
std::size_t grainSize = 1)
109+
{
108110
// split the work
109111
IndexRange inputRange(begin, end);
110112
std::vector<IndexRange> ranges = splitInputRange(inputRange, grainSize);
@@ -124,9 +126,11 @@ inline void ttParallelFor(std::size_t begin, std::size_t end,
124126

125127
// Execute the IWorker over the range in parallel then join results
126128
template <typename Reducer>
127-
inline void ttParallelReduce(std::size_t begin, std::size_t end,
128-
Reducer& reducer, std::size_t grainSize = 1) {
129-
129+
inline void ttParallelReduce(std::size_t begin,
130+
std::size_t end,
131+
Reducer& reducer,
132+
std::size_t grainSize = 1)
133+
{
130134
// split the work
131135
IndexRange inputRange(begin, end);
132136
std::vector<IndexRange> ranges = splitInputRange(inputRange, grainSize);

0 commit comments

Comments
 (0)