Skip to content

Commit c570733

Browse files
knopers8ktf
authored andcommitted
Improve Data Sampling infrastructure generation
1 parent 548ed10 commit c570733

13 files changed

+310
-477
lines changed

Framework/Core/include/Framework/DataSampling.h

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,73 +12,76 @@
1212
#define FRAMEWORK_DATASAMPLING_H
1313

1414
/// \file DataSampling.h
15-
/// \brief Definition of O2 Data Sampling, v0.1
15+
/// \brief Definition of O2 Data Sampling, v1.0
1616
///
1717
/// \author Piotr Konopka, piotr.jan.konopka@cern.ch
1818

19-
#include <functional>
20-
#include <random>
2119
#include <string>
22-
#include <vector>
2320

24-
#include "Framework/AlgorithmSpec.h"
25-
#include "Framework/DataChunk.h"
26-
#include "Framework/DataProcessorSpec.h"
2721
#include "Framework/WorkflowSpec.h"
28-
29-
#include "Framework/Dispatcher.h"
30-
#include "Framework/DispatcherDPL.h"
31-
#include "Framework/DispatcherFairMQ.h"
32-
#include "Framework/DispatcherFlpProto.h"
33-
#include "Framework/DataSamplingConfig.h"
22+
#include "Framework/CompletionPolicy.h"
23+
#include "Framework/DataSamplingPolicy.h"
24+
#include "Framework/ChannelConfigurationPolicy.h"
3425

3526
namespace o2
3627
{
3728
namespace framework
3829
{
3930

40-
using namespace o2::framework::DataSamplingConfig;
41-
4231
/// A class responsible for providing data from main processing flow to QC tasks.
4332
///
44-
/// This class generates message-passing infrastructure to provide desired amount of data to Quality Control tasks.
45-
/// QC tasks input data should be declared in config file (e.g. O2/Framework/Core/test/exampleDataSamplerConfig.ini ).
46-
/// Data Sampling is based on Data Processing Layer, but supports also standard FairMQ devices by declaring external
47-
/// inputs/outputs in configuration file.
33+
/// This class generates message-passing infrastructure to provide desired amount of data to Quality Control tasks or
34+
/// any other clients. Data to be sampled is declared in DataSamplingPolicy'ies configuration file - an example can be
35+
/// found in O2/Framework/TestWorkflows/exampleDataSamplingConfig.json).
4836
///
4937
/// In-code usage:
5038
/// \code{.cxx}
51-
/// std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext &ctx)
39+
/// void customize(std::vector<CompletionPolicy>& policies)
40+
/// {
41+
/// DataSampling::CustomizeInfrastructure(policies);
42+
/// }
43+
///
44+
/// void customize(std::vector<ChannelConfigurationPolicy>& policies)
5245
/// {
46+
/// DataSampling::CustomizeInfrastructure(policies);
47+
/// }
48+
///
49+
/// #include "Framework/runDataProcessing.h"
5350
///
51+
/// std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext &ctx)
52+
/// {
53+
/// WorkflowSpec workflow;
5454
/// // <declaration of other DPL processors>
5555
///
56-
/// const std::string configurationFilePath = <absolute file path>;
57-
/// DataSampling::GenerateInfrastructure(workflow, configurationFilePath);
56+
/// const std::string configurationFilePath = <absolute file path>;
57+
/// DataSampling::GenerateInfrastructure(workflow, configurationFilePath);
5858
///
59+
/// return workflow;
5960
/// }
6061
/// \endcode
6162

63+
//todo: update docu
64+
//todo: clean header mess
65+
6266
class DataSampling
6367
{
6468
public:
65-
/// Deleted default constructor. This class is stateless.
69+
/// \brief Deleted default constructor. This class is stateless.
6670
DataSampling() = delete;
67-
/// Generates data sampling infrastructure.
71+
/// \brief Generates data sampling infrastructure.
6872
/// \param workflow DPL workflow with already declared data processors which provide data desired by
6973
/// QC tasks.
70-
/// \param configurationSource Path to configuration file.
71-
static void GenerateInfrastructure(WorkflowSpec& workflow, const std::string& configurationSource);
74+
/// \param policiesSource Path to configuration file.
75+
/// \param threads Number of dispatcher threads, that will handle the data
76+
static void GenerateInfrastructure(WorkflowSpec& workflow, const std::string& policiesSource, size_t threads = 1);
77+
/// \brief Configures dispatcher to consume any data immediately.
78+
static void CustomizeInfrastructure(std::vector<CompletionPolicy>&);
79+
/// \brief Applies blocking/nonblocking data sampling configuration to the workflow.
80+
static void CustomizeInfrastructure(std::vector<ChannelConfigurationPolicy>&);
7281

7382
private:
74-
using SubSpecificationType = o2::header::DataHeader::SubSpecificationType;
75-
7683
// Internal functions, used by GenerateInfrastructure()
77-
static auto getEdgeMatcher(const QcTaskConfiguration& taskCfg);
78-
static std::unique_ptr<Dispatcher> createDispatcher(SubSpecificationType subSpec, const QcTaskConfiguration& taskCfg,
79-
InfrastructureConfig infCfg);
80-
static QcTaskConfigurations readQcTasksConfiguration(const std::string& configurationSource);
81-
static InfrastructureConfig readInfrastructureConfiguration(const std::string& configurationSource);
84+
static std::string dispatcherName();
8285
};
8386

8487
} // namespace framework

Framework/Core/include/Framework/DataSamplingPolicy.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class DataSamplingPolicy
4848
struct inputSpecEqual {
4949
bool operator()(const InputSpec& a, const InputSpec& b) const
5050
{
51-
// -1 means match all subSpec
51+
// -1 means 'match all subSpec'
5252
if (a.subSpec == -1 || b.subSpec == -1) {
5353
return a.description == b.description && a.origin == b.origin;
5454
} else {

0 commit comments

Comments
 (0)