2828namespace
2929{
3030
31- using namespace o2 ::calibration;
32- using DPID = o2::dcs::DataPointIdentifier;
31+ using DPID = o2::dcs::DataPointIdentifier; // aka alias name
3332using DPVAL = o2::dcs::DataPointValue;
3433using DPMAP = std::unordered_map<DPID, std::vector<DPVAL>>;
3534
35+ using namespace o2 ::calibration;
3636std::vector<o2::framework::OutputSpec> calibrationOutputs{
3737 o2::framework::ConcreteDataTypeMatcher{Utils::gDataOriginCLB , Utils::gDataDescriptionCLBPayload },
3838 o2::framework::ConcreteDataTypeMatcher{Utils::gDataOriginCLB , Utils::gDataDescriptionCLBInfo }};
3939
4040std::array<DPMAP, 2 > dataPoints;
4141int t0{-1 };
4242
43+ /*
44+ * Create a default CCDB Object Info that will be used as a template.
45+ *
46+ * @param path describes the CCDB data path used (e.g. MCH/HV)
47+ *
48+ * The start and end validity times are supposed to be updated from this template,
49+ * as well as the metadata (if needed). The rest of the information should
50+ * be taken as is.
51+ */
4352o2::ccdb::CcdbObjectInfo createDefaultInfo (const char * path)
4453{
4554 DPMAP obj;
@@ -58,30 +67,47 @@ o2::ccdb::CcdbObjectInfo createDefaultInfo(const char* path)
5867
5968std::array<o2::ccdb::CcdbObjectInfo, 2 > info{createDefaultInfo (" MCH/HV" ), createDefaultInfo (" MCH/LV" )};
6069
70+ /*
71+ * Send a DPMAP to the output.
72+ *
73+ * @param dpmap a map of string to vector of DataPointValue
74+ * @param output a DPL data allocator
75+ * @param info a CCDB object info describing the dpmap
76+ * @param reason (optional, can be empty) a string description why the dpmap
77+ * was ready to be shipped (e.g. big enough, long enough, end of process, etc...)
78+ */
6179void sendOutput (const DPMAP& dpmap, o2::framework::DataAllocator& output, o2::ccdb::CcdbObjectInfo info, const std::string& reason)
6280{
6381 if (dpmap.empty ()) {
64- // do not write empty objects
82+ // we do _not_ write empty objects
6583 return ;
6684 }
6785 auto md = info.getMetaData ();
6886 md[" upload reason" ] = reason;
6987 info.setMetaData (md);
7088 auto image = o2::ccdb::CcdbApi::createObjectImage (&dpmap, &info);
71- LOG (INFO ) << " Sending object " << info.getPath () << " /" << info.getFileName () << " of size " << image->size ()
72- << " bytes, valid for " << info.getStartValidityTimestamp () << " : " << info.getEndValidityTimestamp ();
89+ LOG (DEBUG ) << " Sending object " << info.getPath () << " /" << info.getFileName () << " of size " << image->size ()
90+ << " bytes, valid for " << info.getStartValidityTimestamp () << " : " << info.getEndValidityTimestamp ();
7391 output.snapshot (o2::framework::Output{Utils::gDataOriginCLB , Utils::gDataDescriptionCLBPayload , 0 }, *image.get ());
7492 output.snapshot (o2::framework::Output{Utils::gDataOriginCLB , Utils::gDataDescriptionCLBInfo , 0 }, info);
7593}
7694
95+ /*
96+ * Implementation of DPL end of stream callback.
97+ *
98+ * We send the remaining datapoints at the end of the processing.
99+ */
77100void endOfStream (o2::framework::EndOfStreamContext& eosc)
78101{
79- std::cout << " This is the end. Must write what we have left ?\n " ;
102+ LOG (DEBUG) << " This is the end. Must write what we have left ?\n " ;
80103 for (auto i = 0 ; i < 2 ; i++) {
81104 sendOutput (dataPoints[i], eosc.outputs (), info[i], " end of stream" );
82105 }
83106}
84107
108+ /*
109+ * Compute the (approximate) size (in KB) of a dpmap.
110+ */
85111size_t computeSize (const DPMAP& dpmap)
86112{
87113 constexpr int itemSize = 64 ; // DataPointIdentifier or DataPointValue have the same size = 64 bytes
@@ -95,6 +121,9 @@ size_t computeSize(const DPMAP& dpmap)
95121 return static_cast <size_t >(std::floor (nofItems * itemSize * byte2KB));
96122}
97123
124+ /*
125+ * Compute the duration (in seconds) span by the datapoints in the dpmap.
126+ */
98127int computeDuration (const DPMAP& dpmap)
99128{
100129 uint64_t minTime{std::numeric_limits<uint64_t >::max ()};
@@ -109,6 +138,17 @@ int computeDuration(const DPMAP& dpmap)
109138 return static_cast <int >((maxTime - minTime) / 1000 );
110139}
111140
141+ /*
142+ * Decides whether or not the dpmap should be sent to the output.
143+ *
144+ * @param maxSize if the dpmap size is above this size,
145+ * then it should go to output
146+ * @param maxDuration if the dpmap spans more than this duration,
147+ * then it should go to output
148+ *
149+ * @returns a boolean stating if the dpmap should be output and a string
150+ * describing why it should be output.
151+ */
112152std::tuple<bool , std::string> needOutput (const DPMAP& dpmap, int maxSize, int maxDuration)
113153{
114154 std::string reason;
@@ -146,6 +186,21 @@ o2::ccdb::CcdbObjectInfo addTFInfo(o2::ccdb::CcdbObjectInfo inf,
146186 return inf;
147187}
148188
189+ /*
190+ * Process the datapoints received.
191+ *
192+ * The datapoints are accumulated into two DPMAPs (map from alias names to
193+ * vector of DataPointValue) : one for HV values and one for LV values.
194+ * If the DPMAPs satisfy certain conditions (@see needOutput) they
195+ * are sent to the output.
196+ *
197+ * @param aliases an array of 2 vectors of aliases (one for HV values, one
198+ * for LV values)
199+ * @param maxSize an array of two values for the maxsizes of the HV and LV
200+ * values respectively
201+ * @param maxDuration an array of two values for the max durations of the HV and LV
202+ * values respectively
203+ */
149204void processDataPoints (o2::framework::ProcessingContext& pc,
150205 std::array<std::vector<std::string>, 2 > aliases,
151206 std::array<int , 2 > maxSize,
@@ -176,11 +231,19 @@ void processDataPoints(o2::framework::ProcessingContext& pc,
176231 }
177232}
178233
234+ /*
235+ * Creates the main processing function.
236+ *
237+ * @param ic InitContext which is used to get the options and set the end of
238+ * stream callback
239+ */
179240o2::framework::AlgorithmSpec::ProcessCallback createProcessFunction (o2::framework::InitContext& ic)
180241{
181242 auto & callbacks = ic.services ().get <o2::framework::CallbackService>();
182243 callbacks.set (o2::framework::CallbackService::Id::EndOfStream, endOfStream);
183244
245+ // the aliases arrays contain all the names of the MCH data points
246+ // we are interested to transit to the CCDB
184247 std::array<std::vector<std::string>, 2 > aliases = {
185248 o2::mch::dcs::aliases ({o2::mch::dcs::MeasurementType::HV_V,
186249 o2::mch::dcs::MeasurementType::HV_I}),
@@ -205,6 +268,14 @@ o2::framework::AlgorithmSpec::ProcessCallback createProcessFunction(o2::framewor
205268 };
206269}
207270
271+ /* Helper function to create a ConfigParamSpec option object.
272+ *
273+ * @param name is either 'size' or 'duration'
274+ * @param value is the default value to be used (i.e. when the option is not
275+ * specified on the command line)
276+ * @param what is either 'hv' or 'lv'
277+ * @param unit is the unit in which the values are given
278+ */
208279o2::framework::ConfigParamSpec whenToSendOption (const char * name, int value,
209280 const char * what, const char * unit)
210281{
@@ -228,6 +299,21 @@ using o2::framework::ConfigContext;
228299using o2::framework::DataProcessorSpec;
229300using o2::framework::WorkflowSpec;
230301
302+ /* *
303+ * DPL Workflow to process MCH DCS data points.
304+ *
305+ * The expected input is a vector of DataPointCompositeObject containing
306+ * only MCH data points.
307+ *
308+ * Those datapoints are accumulated into two DPMAPs (map from alias names to
309+ * vector of DataPointValue) : one for HV values and one for LV values.
310+ *
311+ * The accumulated DPMAP are sent to the output whenever :
312+ * - they reach a given size (--hv-max-size and --lv-max-size options)
313+ * - they span a given duration (--hv-max-duration and --lv-max-duration options)
314+ * - the workflow is ended
315+ *
316+ */
231317WorkflowSpec defineDataProcessing (ConfigContext const & configcontext)
232318{
233319 DataProcessorSpec dcsProcessor;
0 commit comments