1111#include "CCDB/BasicCCDBManager.h"
1212#include <pthread.h>
1313#include <thread>
14- #include <mutex>
1514#endif
1615#include "populateCCDB.C"
1716
18- using CcdbManager = o2 ::ccdb ::BasicCCDBManager ;
17+ using CcdbManager = o2 ::ccdb ::CCDBManagerInstance ;
18+
19+ std ::vector < std ::unique_ptr < CcdbManager >> ccdbManPool ;
20+ std ::vector < bool > ccdbManPoolFlag ;
21+ std ::decay_t < decltype (std ::chrono ::high_resolution_clock ::now ())> startTime {};
1922
2023// macro to populate the CCDB emulating the rates that we expect for
2124// Run 3, as read (in terms of size and rate) from an external file
2225
23- void retrieve (const std ::vector < CCDBObj > & objs , float procTimeSec , std ::mutex & mtx , std ::atomic < int > & n , size_t tfID );
26+ int getFreeCCDBManagerID ();
27+ void retrieve (const std ::vector < CCDBObj > & objs , float procTimeSec , int ccdbID , std ::atomic < int > & n , size_t tfID );
2428
2529void retrieveFromCCDB (int maxTFs = 8 , float procTimeSec = 10. ,
2630 const std ::string & fname = "cdbSizeV0.txt" ,
2731 const std ::string & ccdbHost = "http://localhost:8080" /*"http://ccdb-test.cern.ch:8080"*/ ,
2832 bool allowCaching = true)
2933{
30- auto & mgr = CcdbManager ::instance ();
31- mgr .setURL (ccdbHost .c_str ()); // or http://localhost:8080 for a local installation
32- mgr .setCaching (allowCaching );
34+ ccdbManPool .resize (1 + maxTFs );
35+ ccdbManPoolFlag .resize (ccdbManPool .size (), false); // nothig is used at the moment
36+ LOG (INFO ) << "Caching is " << (allowCaching ? "ON" : "OFF" ) ;
37+ for (auto& mgr : ccdbManPool ) {
38+ mgr = std ::make_unique < CcdbManager > (ccdbHost .c_str ());
39+ mgr -> setCaching (allowCaching );
40+ }
3341 auto objs = readObjectsList (fname );
3442 if (objs .empty ()) {
3543 return ;
3644 }
37- std ::mutex ccdb_mtx ;
45+ startTime = std ::chrono ::high_resolution_clock ::now ();
46+
3847 std ::atomic < int > nTFs {0 };
3948 size_t tfID = 0 ;
4049 while (1 ) {
4150 if (nTFs < maxTFs ) {
42- std ::thread th (retrieve , std ::cref (objs ), procTimeSec , std ::ref (ccdb_mtx ), std ::ref (nTFs ), tfID ++ );
51+ int ccdbID = getFreeCCDBManagerID ();
52+ if (ccdbID < 0 ) { // all slots are busy, wait
53+ continue ;
54+ }
55+ std ::thread th (retrieve , std ::cref (objs ), procTimeSec , ccdbID , std ::ref (nTFs ), tfID ++ );
4356 th .detach ();
4457 LOG (INFO ) << nTFs << " TFs currently in processing" ;
4558 } else {
@@ -48,28 +61,43 @@ void retrieveFromCCDB(int maxTFs = 8, float procTimeSec = 10.,
4861 }
4962}
5063
51- void retrieve (const std ::vector < CCDBObj > & objs , float procTimeSec , std :: mutex & mtx , std ::atomic < int > & n , size_t tfID )
64+ void retrieve (const std ::vector < CCDBObj > & objs , float procTimeSec , int ccdbID , std ::atomic < int > & n , size_t tfID )
5265{
5366 // function to retrieve the CCDB objects and wait some (TF processing) time
5467 // to avoid all treads starting to read at the same time, we randomize the time of reading within the allocated processing time
55-
68+ auto mgr = ccdbManPool [ ccdbID ]. get ();
5669 n ++ ;
57- auto& mgr = CcdbManager ::instance ();
5870 float tfrac = gRandom -> Rndm ();
5971 usleep (long (procTimeSec * tfrac * 1e6 ));
72+ auto ret_start = std ::chrono ::high_resolution_clock ::now ();
6073 for (auto & o : objs ) {
61- auto now = std ::chrono ::system_clock ::now ();
74+ auto now = std ::chrono ::high_resolution_clock ::now ();
6275 auto now_ms = std ::chrono ::time_point_cast < std ::chrono ::milliseconds > (now );
6376 auto timeStamp = now_ms .time_since_epoch ();
64- mtx .lock ();
65- std ::vector < uint8_t > * ob = mgr .getForTimeStamp < std ::vector < uint8_t >>(o .path , timeStamp .count ());
66- mtx .unlock ();
67- LOG (INFO ) << "Retrieved object " << o .path << " of size " << ob -> size () << " Bytes"
68- << " for TF " << tfID ;
69- if (!mgr .isCachingEnabled ()) { // we can delete object only when caching is disabled
77+ std ::vector < uint8_t > * ob = mgr -> getForTimeStamp < std ::vector < uint8_t >>(o .path , timeStamp .count ());
78+ LOG (DEBUG ) << "Retrieved object " << o .path << " of size " << ob -> size () << " Bytes"
79+ << " for TF " << tfID ;
80+ if (!mgr -> isCachingEnabled ()) { // we can delete object only when caching is disabled
7081 delete ob ;
7182 }
7283 }
84+ auto ret_end = std ::chrono ::high_resolution_clock ::now ();
7385 usleep (long (procTimeSec * (1. - tfrac ) * 1e6 ));
86+ std ::chrono ::duration < double , std ::ratio < 1 , 1 >> elapsedSeconds = std ::chrono ::high_resolution_clock ::now () - startTime ;
87+ std ::chrono ::duration < double , std ::ratio < 1 , 1 >> retTime = ret_end - ret_start ;
88+ LOG (INFO ) << "Finished TF " << tfID << " elapsed time " << elapsedSeconds .count () << " s., CCDB query time: " << retTime .count () << " s." ;
7489 n -- ;
90+ ccdbManPoolFlag [ccdbID ] = false; // release the manager
91+ }
92+
93+ int getFreeCCDBManagerID ()
94+ {
95+ /// get ID of 1st CCDBManager not used by any thread
96+ for (unsigned i = 0 ; i < ccdbManPoolFlag .size (); i ++ ) {
97+ if (!ccdbManPoolFlag [i ]) {
98+ ccdbManPoolFlag [i ] = true;
99+ return i ;
100+ }
101+ }
102+ return -1 ;
75103}
0 commit comments