Skip to content

Commit 8a345d5

Browse files
knopers8ktf
authored andcommitted
Readme and cleanup of data sampling
1 parent fd342d2 commit 8a345d5

17 files changed

+229
-267
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ set(SRCS
4444
src/DeviceSpec.cxx
4545
src/DeviceSpecHelpers.cxx
4646
src/DDSConfigHelpers.cxx
47+
src/Dispatcher.cxx
4748
src/DriverControl.cxx
4849
src/DriverInfo.cxx
4950
src/FairOptionsRetriever.cxx
@@ -60,7 +61,6 @@ set(SRCS
6061
src/TextControlService.cxx
6162
src/TableBuilder.cxx
6263
src/TableConsumer.cxx
63-
src/WorkflowDispatcher.cxx
6464
src/WorkflowHelpers.cxx
6565
src/WorkflowSpec.cxx
6666
src/runDataProcessing.cxx
@@ -145,10 +145,10 @@ set(HEADERS
145145
include/Framework/DataSamplingConditionFactory.h
146146
include/Framework/DataSamplingReadoutAdapter.h
147147
include/Framework/DataSamplingPolicy.h
148+
include/Framework/Dispatcher.h
148149
include/Framework/DPLBoostSerializer.h
149150
include/Framework/TableBuilder.h
150151
include/Framework/FairMQResizableBuffer.h
151-
include/Framework/WorkflowDispatcher.h
152152
src/ComputingResource.h
153153
src/DDSConfigHelpers.h
154154
src/DeviceSpecHelpers.h
@@ -200,7 +200,7 @@ O2_GENERATE_EXECUTABLE(
200200
target_compile_options(Framework PUBLIC -O0 -g -fno-omit-frame-pointer)
201201
target_compile_options(test_SimpleDataProcessingDevice01 PUBLIC -O0 -g -fno-omit-frame-pointer)
202202

203-
Install(FILES test/test_DataSamplingDPL.json test/test_DataSamplingFairMQ.json DESTINATION share/tests/)
203+
Install(FILES test/test_DataSampling.json DESTINATION share/tests/)
204204

205205
set(TEST_SRCS
206206
test/test_AlgorithmSpec.cxx
@@ -247,7 +247,6 @@ set(TEST_SRCS
247247
test/test_TimeParallelPipelining.cxx
248248
test/test_TypeTraits.cxx
249249
test/test_Variants.cxx
250-
# test/test_WorkflowDispatcher.cxx
251250
test/test_WorkflowHelpers.cxx
252251
)
253252

Framework/Core/README.md

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,102 @@ There is also a few demonstrator available in particular:
437437
- [MillWheel: Fault-Tolerant Stream Processing at Internet Scale](https://research.google.com/pubs/pub41378.html) : paper about Google previous generation system for stream processing
438438
- [Concord](http://concord.io) : Similar (to the above) stream processing solution, OpenSource.
439439
440+
## Data Sampling
440441
442+
Data Sampling provides possibility to sample data in DPL workflows, basing on certain conditions ( 5% randomly, when payload is greater than 4234 bytes, etc.). The job of passing the right data is done by a data processor called `Dispatcher`. A desired data stream is specified in form of Data Sampling Policies, configured by JSON structures (example below).
443+
```
444+
{
445+
"id": "policy_example1", # name of the policy
446+
"active": "false", # activation flag
447+
"machines": [ # list of machines where the policy should be run (now ignored)
448+
"aido2flp1",
449+
"aido2flp2"
450+
],
451+
"dataHeaders": [ # list of data that should be sampled
452+
{
453+
"binding": "clusters", # binding of the data in InputRecord
454+
"dataOrigin": "TPC", # data origin in DataHeader
455+
"dataDescription": "CLUSTERS" # data description in DataHeader
456+
},
457+
{
458+
"binding": "tracks",
459+
"dataOrigin": "TPC",
460+
"dataDescription": "TRACKS"
461+
}
462+
],
463+
"subSpec": "0", # subspecification in DataHeader, use -1 for all
464+
"samplingConditions": [ # list of sampling conditions
465+
{
466+
"condition": "random", # condition type
467+
"fraction": "0.1", # condition-dependent parameter: fraction of data to sample
468+
"seed": "2112" # condition-dependent parameter: seed of PRNG
469+
}
470+
],
471+
"blocking": "false" # should the dispatcher block the main data flow? (now ignored)
472+
}
473+
```
474+
475+
### Usage
476+
477+
To use Data Sampling in a DPL workflow insert following lines to your code:
478+
```
479+
#include "Framework/DataSampling.h"
480+
using namespace o2::framework;
481+
void customize(std::vector<CompletionPolicy>& policies)
482+
{
483+
DataSampling::CustomizeInfrastructure(policies);
484+
}
485+
486+
void customize(std::vector<ChannelConfigurationPolicy>& policies)
487+
{
488+
DataSampling::CustomizeInfrastructure(policies);
489+
}
490+
491+
#include "Framework/runDataProcessing.h"
492+
493+
std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext &ctx)
494+
{
495+
496+
WorkflowSpec workflow;
497+
// <declaration of other DPL processors>
498+
499+
DataSampling::GenerateInfrastructure(workflow, "json:///absolute/path/to/config/file.json");
500+
501+
return workflow;
502+
}
503+
```
504+
505+
Sampled data can be subscribed to by adding `InputSpecs` provided by `std::vector<InputSpec> DataSampling::InputSpecsForPolicy(const std::string& policiesSource, const std::string& policyName)` to a chosen data processor. Then, they can be accessed by the bindings specified in the configuration file.
506+
507+
[dataSamplingPodAndRoot](https://github.com/AliceO2Group/AliceO2/blob/dev/Framework/TestWorkflows/src/dataSamplingPodAndRoot.cxx) workflow can serve as usage example.
508+
509+
## Data Sampling Conditions
510+
511+
The following sampling conditions are available. When more than one is used, a positive decision is taken when all the conditions are fulfilled.
512+
- **DataSamplingConditionRandom** - pseudo-randomly accepts specified fraction of incoming messages.
513+
```
514+
{
515+
"condition": "random",
516+
"fraction": "0.1",
517+
"seed": "22222"
518+
}
519+
```
520+
- **DataSamplingConditionNConsecutive** - approves n consecutive samples in defined cycle. It assumes that timesliceID always increments by one.
521+
```
522+
{
523+
"condition": "nConsecutive",
524+
"samplesNumber": "3",
525+
"cycleSize": "100"
526+
}
527+
```
528+
- **DataSamplingConditionPayloadSize** - approves messages having payload size within specified boundaries.
529+
```
530+
{
531+
"condition": "payloadSize",
532+
"lowerLimit": "300",
533+
"upperLimit": "500"
534+
}
535+
```
441536
## Document history
442537
443538
* v0.9: proposal for approval at the O2 TB - 19th June 2018

Framework/Core/include/Framework/DataSampling.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
///
1717
/// \author Piotr Konopka, piotr.jan.konopka@cern.ch
1818

19-
#include <string>
20-
2119
#include "Framework/WorkflowSpec.h"
2220
#include "Framework/CompletionPolicy.h"
23-
#include "Framework/DataSamplingPolicy.h"
2421
#include "Framework/ChannelConfigurationPolicy.h"
22+
#include "Framework/InputSpec.h"
23+
24+
#include <Configuration/ConfigurationInterface.h>
25+
26+
#include <string>
2527

2628
namespace o2
2729
{
@@ -60,9 +62,6 @@ namespace framework
6062
/// }
6163
/// \endcode
6264

63-
//todo: update docu
64-
//todo: clean header mess
65-
6665
class DataSampling
6766
{
6867
public:

Framework/Core/include/Framework/DataSamplingCondition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
/// \author Piotr Konopka, piotr.jan.konopka@cern.ch
1818

1919
#include "Framework/DataRef.h"
20+
2021
#include <boost/property_tree/ptree.hpp>
2122

2223
namespace o2

Framework/Core/include/Framework/DataSamplingConfig.h

Lines changed: 0 additions & 70 deletions
This file was deleted.

Framework/Core/include/Framework/DataSamplingPolicy.h

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,14 @@
1616
///
1717
/// \author Piotr Konopka, piotr.jan.konopka@cern.ch
1818

19-
#include <mutex>
20-
#include <boost/property_tree/ptree.hpp>
21-
2219
#include "Headers/DataHeader.h"
2320
#include "Framework/InputSpec.h"
2421
#include "Framework/Output.h"
2522
#include "Framework/OutputSpec.h"
2623
#include "Framework/DataSamplingCondition.h"
2724

25+
#include <boost/property_tree/ptree.hpp>
26+
2827
namespace o2
2928
{
3029
namespace framework
@@ -37,12 +36,13 @@ namespace framework
3736
class DataSamplingPolicy
3837
{
3938
private:
39+
// todo: see if dpl matchers can be used here instead of this strange construction
4040
struct inputSpecHasher {
4141
size_t operator()(const InputSpec& i) const
4242
{
4343
return (static_cast<size_t>(i.description.itg[0]) << 32 |
44-
static_cast<size_t>(i.description.itg[1])) ^
45-
static_cast<size_t>(i.origin.itg[0]);
44+
static_cast<size_t>(i.description.itg[1])) ^
45+
static_cast<size_t>(i.origin.itg[0]);
4646
}
4747
};
4848
struct inputSpecEqual {
@@ -76,7 +76,6 @@ class DataSamplingPolicy
7676
const Output prepareOutput(const InputSpec&) const;
7777

7878
const std::string& getName() const;
79-
// const std::vector<InputSpec>& getInputs() const;
8079
const PathMap& getPathMap() const;
8180
const header::DataHeader::SubSpecificationType getSubSpec() const;
8281
// optional fairmq channel to send stuff outside of DPL
@@ -92,8 +91,6 @@ class DataSamplingPolicy
9291
header::DataHeader::SubSpecificationType mSubSpec;
9392
std::vector<std::unique_ptr<DataSamplingCondition>> mConditions;
9493
std::string mFairMQOutputChannel;
95-
96-
std::mutex mDecisionMutex;
9794
};
9895

9996
} // namespace framework

Framework/Core/include/Framework/WorkflowDispatcher.h renamed to Framework/Core/include/Framework/Dispatcher.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
1010

11-
/// \file WorkflowDispatcher.h
12-
/// \brief Declaration of WorkflowDispatcher for O2 Data Sampling
11+
/// \file Dispatcher.h
12+
/// \brief Declaration of Dispatcher for O2 Data Sampling
1313
///
1414
/// \author Piotr Konopka, piotr.jan.konopka@cern.ch
1515

16-
#ifndef ALICEO2_WORKFLOWDISPATCHER_H
17-
#define ALICEO2_WORKFLOWDISPATCHER_H
16+
#ifndef ALICEO2_DISPATCHER_H
17+
#define ALICEO2_DISPATCHER_H
1818

1919
#include "Framework/DataProcessorSpec.h"
2020
#include "Framework/DataSamplingPolicy.h"
@@ -25,13 +25,13 @@ namespace o2
2525
namespace framework
2626
{
2727

28-
class WorkflowDispatcher : public Task
28+
class Dispatcher : public Task
2929
{
3030
public:
3131
/// \brief Constructor
32-
WorkflowDispatcher(const std::string name = std::string(), const std::string reconfSource = std::string());
32+
Dispatcher(const std::string name, const std::string reconfigurationSource);
3333
/// \brief Destructor
34-
~WorkflowDispatcher();
34+
~Dispatcher();
3535

3636
/// \brief Dispatcher init callback
3737
void init(InitContext& ctx) override;
@@ -60,4 +60,4 @@ class WorkflowDispatcher : public Task
6060
} // namespace framework
6161
} // namespace o2
6262

63-
#endif //ALICEO2_WORKFLOWDISPATCHER_H
63+
#endif //ALICEO2_DISPATCHER_H

0 commit comments

Comments
 (0)