Psyllid  v1.12.4
Project 8 Data Acquisition Software
monarch3_wrap.hh
Go to the documentation of this file.
1 /*
2  * monarch3_wrap.hh
3  *
4  * Created on: Feb 11, 2016
5  * Author: nsoblath
6  *
7  * Monarch stages:
8  * - initialized
9  * - preparing
10  * - writing
11  * - finished
12  *
13  * Thread safety
14  * Thread-safe operations:
15  * - Initializing an egg file
16  * - Accessing the Monarch header (can only be done by one thread at a time)
17  * - Writing data to a file (handled by HDF5's internal thread safety)
18  * - Finishing an egg file
19  *
20  * Non-thread-safe operations:
21  * - Stream function calls are not thread-safe other than HDF5's internal thread safety.
22  * It is highly (highly highly) recommended that you only access a given stream from one thread.
23  */
24 
25 #ifndef PSYLLID_MONARCH3_WRAP_HH_
26 #define PSYLLID_MONARCH3_WRAP_HH_
27 
28 #include "M3Monarch.hh"
29 
30 #include "cancelable.hh"
31 
32 #include <future>
33 #include <map>
34 #include <memory>
35 #include <mutex>
36 
37 namespace psyllid
38 {
39 
/// Stages a monarch file passes through, in order: initialized, preparing, writing, finished.
enum class monarch_stage
{
    initialized = 0, ///< value 0; the remaining enumerators count up from here
    preparing,       ///< value 1
    writing,         ///< value 2
    finished         ///< value 3
};
/// Converts a monarch_stage to a uint32_t (declaration only; the definition is not visible here).
47  uint32_t to_uint( monarch_stage a_stage );
/// Converts a uint32_t to a monarch_stage (declaration only).
/// NOTE(review): presumably the inverse of to_uint; handling of out-of-range values is not visible here -- confirm in the implementation.
48  monarch_stage to_stage( uint32_t a_stage_uint );
/// Stream-insertion operator for monarch_stage (declaration only; the output format is not visible here).
49  std::ostream& operator<<( std::ostream& a_os, monarch_stage a_stage );
50 
// Forward declarations: the pointer aliases below name the wrapper classes
// before their full definitions, which appear later in this header.
// Repeating a forward declaration is harmless if one already exists.
class monarch_wrapper;
class header_wrapper;
class stream_wrapper;

/// Shared-ownership handle to a monarch_wrapper.
using monarch_wrap_ptr = std::shared_ptr< monarch_wrapper >;

/// Shared-ownership handle to a header_wrapper.
using header_wrap_ptr = std::shared_ptr< header_wrapper >;

/// Shared-ownership handle to a stream_wrapper.
using stream_wrap_ptr = std::shared_ptr< stream_wrapper >;

/// Time point on std::chrono::steady_clock with nanosecond resolution.
using monarch_time_point_t = std::chrono::time_point< std::chrono::steady_clock, std::chrono::nanoseconds >;

/// Shorthand for a std::unique_lock over a std::mutex.
using unique_lock = std::unique_lock< std::mutex >;
63 
64 
65  //***************************
66  // monarch_on_deck_manager
67  //***************************
68 
/// Handles asynchronous creation of on-deck monarch files and finishing of completed files.
/// NOTE(review): the listing numbers skip a few lines inside this class, so some members may not be shown here.
87  class monarch_on_deck_manager : public scarab::cancelable
88  {
89  public:
90  monarch_on_deck_manager( monarch_wrapper* a_monarch_wrap );
92 
/// Raw, non-owning pointer to the on-deck monarch object (null when f_monarch_on_deck is empty).
93  const monarch3::Monarch3* od_ptr() const {return f_monarch_on_deck.get();}
/// Raw, non-owning pointer to the to-finish monarch object (null when f_monarch_to_finish is empty).
94  const monarch3::Monarch3* tf_ptr() const {return f_monarch_to_finish.get();}
95 
/// Return true if both f_monarch_on_deck and f_monarch_to_finish are empty.
97  bool pointers_empty() const;
/// Return true if f_monarch_on_deck exists.
99  bool mod_exists() const;
/// Return true if f_monarch_to_finish exists.
101  bool mtf_exists() const;
102 
104  void execute();
105 
/// Create the on-deck monarch object if it doesn't exist already (synchronous).
107  void create_on_deck();
109  void clear_on_deck();
111  void finish_to_finish();
112 
/// Notify the manager to process its monarch objects if needed (asynchronous).
114  void notify();
115 
/// Give a monarch object to the on-deck manager with the intent that it be finished asynchronously.
/// The argument is swapped with f_monarch_to_finish while f_od_mutex is held.
117  void set_as_to_finish( std::shared_ptr< monarch3::Monarch3 >& a_monarch );
/// Get the on-deck monarch object that has been created asynchronously.
/// The argument is swapped with f_monarch_on_deck while f_od_mutex is held.
119  void get_on_deck( std::shared_ptr< monarch3::Monarch3 >& a_monarch );
120 
121  private:
// The *_nolock variants do not take f_od_mutex themselves; create_on_deck() calls
// create_on_deck_nolock() with the mutex already held.
122  void create_on_deck_nolock();
123  void finish_to_finish_nolock();
124 
126 
127  std::shared_ptr< monarch3::Monarch3 > f_monarch_on_deck;
128  std::shared_ptr< monarch3::Monarch3 > f_monarch_to_finish;
// Signalled (notify_one) by notify().
129  std::condition_variable f_od_condition;
130  std::condition_variable f_od_continue_condition;
// Held while the two monarch pointers above are swapped or created.
131  std::mutex f_od_mutex;
132 
133  };
134 
135 
136  //*******************
137  // monarch_wrapper
138  //*******************
139 
/// Wrapper class for a monarch3::Monarch3 object (held in f_monarch).
/// NOTE(review): the listing numbers skip a few lines inside this class, so some members
/// (e.g. those referenced by the inline definitions further down) may not be shown here.
152  class monarch_wrapper : public scarab::cancelable
153  {
154  public:
155  monarch_wrapper( const std::string& a_filename );
156  ~monarch_wrapper();
157 
/// Returns the current value of the file count, then increments it.
/// Declared const; this works because f_file_count is mutable.
159  unsigned get_and_increment_file_count() const;
160 
/// Returns the header wrapped in a header_wrap_ptr to be filled at the beginning of file writing.
162  header_wrap_ptr get_header();
/// Const overload: returns a copy of the stored f_header_wrap pointer.
164  const header_wrap_ptr get_header() const;
165 
168  stream_wrap_ptr get_stream( unsigned a_stream_no );
169 
171  void start_using();
172 
173  void execute_switch_loop();
174 
175  void trigger_switch();
176 
178  bool okay_to_write();
179 
183  void switch_to_new_file();
184 
186  void stop_using();
187 
189  void finish_stream( unsigned a_stream_no );
190 
192  void finish_file();
193 
/// Returns the stored f_run_start_time.
194  monarch_time_point_t get_run_start_time() const;
195 
197  void set_stage( monarch_stage a_stage );
198 
/// Set the maximum file size used to determine when a new file is automatically started.
200  void set_max_file_size( double a_size );
201 
204  void record_file_contribution( double a_size );
205 
206  private:
208 
/// Passes the cancellation code on to the on-deck manager (f_monarch_od_manager.cancel).
209  void do_cancellation( int a_code );
210 
// Copying is disabled.
211  monarch_wrapper( const monarch_wrapper& ) = delete;
212  monarch_wrapper& operator=( const monarch_wrapper& ) = delete;
213 
214  std::string f_orig_filename;
215  std::string f_filename_base;
216  std::string f_filename_ext;
// mutable so that get_and_increment_file_count() can modify it from a const method.
217  mutable unsigned f_file_count;
218 
220  std::atomic< double > f_file_size_est_mb;
221  std::condition_variable f_wait_to_write;
// NOTE(review): raw std::thread pointer; creation, joining and deletion are not visible in this header.
222  std::thread* f_switch_thread;
223  std::atomic< bool > f_ok_to_write;
224  std::atomic< bool > f_do_switch_flag;
225  std::condition_variable f_do_switch_trig;
226 
227  std::shared_ptr< monarch3::Monarch3 > f_monarch;
228  mutable std::mutex f_monarch_mutex;
229 
230  header_wrap_ptr f_header_wrap;
231 
// Stream wrappers keyed by an unsigned value (presumably the stream number -- confirm against get_stream()).
232  std::map< unsigned, stream_wrap_ptr > f_stream_wraps;
233 
234  monarch_time_point_t f_run_start_time;
235 
237 
// NOTE(review): raw std::thread pointer; creation, joining and deletion are not visible in this header.
238  std::thread* f_od_thread;
240 
241  };
242 
243 
244  //******************
245  // header_wrapper
246  //******************
247 
264  {
// Wrapper class for a monarch3::M3Header object.
// Movable but not copyable (copy operations are deleted below).
265  public:
266  header_wrapper( monarch3::Monarch3& a_monarch );
267  header_wrapper( header_wrapper&& a_orig );
268  ~header_wrapper();
269 
270  header_wrapper& operator=( header_wrapper&& a_orig );
271 
273  monarch3::M3Header& header();
274 
/// Get M3Header pointer.
276  monarch3::M3Header* ptr();
/// Get M3Header pointer (const access).
278  const monarch3::M3Header* ptr() const;
279 
/// Lock the header mutex and return a unique_lock object.
281  unique_lock get_lock() const;
282 
283  private:
284  header_wrapper( const header_wrapper& ) = delete;
285  header_wrapper& operator=( const header_wrapper& ) = delete;
286 
287  friend class monarch_wrapper;
288 
// Raw pointer returned by ptr(); NOTE(review): ownership is not visible in this header.
289  monarch3::M3Header* f_header;
// mutable so that the const get_lock() can lock it.
290  mutable std::mutex f_mutex;
291  };
292 
293 
294  //******************
295  // stream_wrapper
296  //******************
297 
315  {
// Wrapper class for a monarch3::M3Stream object.
// Movable but not copyable (copy operations are deleted below).
// NOTE(review): the listing numbers skip a few lines in the private section, so some members may not be shown here.
316  public:
317  stream_wrapper( monarch3::Monarch3&, unsigned a_stream_no, monarch_wrapper* a_monarch_wrapper );
318  stream_wrapper( stream_wrapper&& a_orig );
319  ~stream_wrapper();
320 
321  stream_wrapper& operator=( stream_wrapper&& a_orig );
322 
323  bool is_valid() const;
324 
/// Get the pointer to the stream record.
326  monarch3::M3Record* get_stream_record();
/// Get the pointer to a particular channel record.
328  monarch3::M3Record* get_channel_record( unsigned a_chan_no );
329 
331  bool write_record( monarch3::RecordIdType a_rec_id, monarch3::TimeType a_rec_time, const void* a_rec_block, uint64_t a_bytes, bool a_is_new_acq );
332 
333  private:
334  stream_wrapper( const stream_wrapper& ) = delete;
335  stream_wrapper& operator=( const stream_wrapper& ) = delete;
336 
337  friend class monarch_wrapper;
338 
340 
// Raw pointer dereferenced by get_stream_record() and get_channel_record(); NOTE(review): ownership is not visible in this header.
341  monarch3::M3Stream* f_stream;
343 
345  };
346 
347 
348  //***************************
349  // monarch_on_deck_manager
350  //***************************
351 
353  {
// True only when neither the on-deck pointer nor the to-finish pointer holds an object.
354  return ! f_monarch_on_deck && ! f_monarch_to_finish;
355  }
356 
358  {
// Explicit bool conversion of the shared_ptr: true when f_monarch_on_deck is non-empty.
359  return f_monarch_on_deck.operator bool();
360  }
361 
363  {
// Explicit bool conversion of the shared_ptr: true when f_monarch_to_finish is non-empty.
364  return f_monarch_to_finish.operator bool();
365  }
366 
368  {
// Wake one thread waiting on f_od_condition.
369  f_od_condition.notify_one();
370  return;
371  }
372 
373  inline void monarch_on_deck_manager::set_as_to_finish( std::shared_ptr< monarch3::Monarch3 >& a_monarch )
374  {
375  f_od_mutex.lock();
376  f_monarch_to_finish.swap( a_monarch );
377  f_od_mutex.unlock();
378  return;
379  }
380  inline void monarch_on_deck_manager::get_on_deck( std::shared_ptr< monarch3::Monarch3 >& a_monarch )
381  {
382  f_od_mutex.lock();
383  a_monarch.swap( f_monarch_on_deck );
384  f_od_mutex.unlock();
385  return;
386  }
387 
389  {
390  f_od_mutex.lock();
391  if( ! f_monarch_on_deck ) create_on_deck_nolock();
392  f_od_mutex.unlock();
393  return;
394  }
395 
397  {
// Finish writing the to-finish monarch object, then release this manager's reference to it.
// NOTE(review): f_monarch_to_finish is dereferenced without a null check -- confirm callers guarantee it is set.
398  f_monarch_to_finish->FinishWriting();
399  f_monarch_to_finish.reset();
400  }
401 
402 
403  //*******************
404  // monarch_wrapper
405  //*******************
406 
407  inline const header_wrap_ptr monarch_wrapper::get_header() const
408  {
409  return f_header_wrap;
410  }
411 
412  inline monarch_time_point_t monarch_wrapper::get_run_start_time() const
413  {
414  return f_run_start_time;
415  }
416 
418  {
// Post-increment: the value returned is the count before it is incremented.
419  return f_file_count++;
420  }
421 
422  inline void monarch_wrapper::set_max_file_size( double a_size )
423  {
424  f_max_file_size_mb = a_size;
425  return;
426  }
427 
428  inline void monarch_wrapper::do_cancellation( int a_code )
429  {
430  f_monarch_od_manager.cancel( a_code );
431  return;
432  }
433 
434 
435  //******************
436  // header_wrapper
437  //******************
438 
439  inline monarch3::M3Header* header_wrapper::ptr()
440  {
441  return f_header;
442  }
443 
444  inline const monarch3::M3Header* header_wrapper::ptr() const
445  {
446  return f_header;
447  }
448 
449  inline unique_lock header_wrapper::get_lock() const
450  {
451  return unique_lock( f_mutex );
452  }
453 
454 
455  //******************
456  // stream_wrapper
457  //******************
458 
459  inline bool stream_wrapper::is_valid() const
460  {
461  return f_is_valid;
462  }
463 
464  inline monarch3::M3Record* stream_wrapper::get_stream_record()
465  {
466  return f_stream->GetStreamRecord();
467  }
468 
469  inline monarch3::M3Record* stream_wrapper::get_channel_record( unsigned a_chan_no )
470  {
471  return f_stream->GetChannelRecord( a_chan_no );
472  }
473 
474 } /* namespace psyllid */
475 
476 #endif /* PSYLLID_MONARCH3_WRAP_HH_ */
std::shared_ptr< header_wrapper > header_wrap_ptr
monarch_time_point_t f_run_start_time
std::thread * f_switch_thread
std::condition_variable f_od_condition
std::shared_ptr< stream_wrapper > stream_wrap_ptr
monarch3::M3Stream * f_stream
std::map< unsigned, stream_wrap_ptr > f_stream_wraps
void set_max_file_size(double a_size)
Set the maximum file size used to determine when a new file is automatically started.
monarch3::M3Record * get_stream_record()
Get the pointer to the stream record.
std::unique_lock< std::mutex > unique_lock
std::atomic< bool > f_do_switch_flag
monarch3::M3Header * ptr()
Get M3Header pointer.
void notify()
Notify the manager to process its monarch objects if needed (asynchronous)
monarch3::M3Record * get_channel_record(unsigned a_chan_no)
Get the pointer to a particular channel record.
std::shared_ptr< monarch3::Monarch3 > f_monarch
Wrapper class for a monarch3::M3Header object.
header_wrap_ptr get_header()
Returns the header wrapped in a header_wrap_ptr to be filled at the beginning of file writing...
monarch_wrapper * f_monarch_wrapper
Handles asynchronous creation of on-deck monarch files and finishing of completed files...
bool pointers_empty() const
Return true if both f_monarch_on_deck and f_monarch_to_finish are empty.
std::condition_variable f_wait_to_write
Wrapper class for a monarch3::M3Stream object.
std::ostream & operator<<(std::ostream &a_os, monarch_stage a_stage)
unique_lock get_lock() const
Lock the header mutex and return a unique_lock object.
const monarch_wrapper * f_monarch_wrap
std::shared_ptr< monarch_wrapper > monarch_wrap_ptr
monarch_on_deck_manager f_monarch_od_manager
monarch_time_point_t get_run_start_time() const
const monarch3::Monarch3 * od_ptr() const
const monarch3::Monarch3 * tf_ptr() const
std::condition_variable f_do_switch_trig
std::condition_variable f_od_continue_condition
bool mod_exists() const
Return true if f_monarch_on_deck exists.
void do_cancellation(int a_code)
monarch_stage to_stage(uint32_t a_stage_uint)
std::atomic< bool > f_ok_to_write
unsigned get_and_increment_file_count() const
Returns the current value of the file count, then increments it.
void set_as_to_finish(std::shared_ptr< monarch3::Monarch3 > &a_monarch)
Give a monarch object to the on-deck manager with the intent that it be finished asynchronously.
std::chrono::time_point< std::chrono::steady_clock, std::chrono::nanoseconds > monarch_time_point_t
Wrapper class for a monarch3::M3Monarch object.
void create_on_deck()
Create the on-deck monarch object if it doesn't exist already (synchronous)
std::shared_ptr< monarch3::Monarch3 > f_monarch_on_deck
bool mtf_exists() const
Return true if f_monarch_to_finish exists.
uint32_t to_uint(monarch_stage a_stage)
void get_on_deck(std::shared_ptr< monarch3::Monarch3 > &a_monarch)
Get the on-deck monarch object that has been created asynchronously.
std::atomic< double > f_file_size_est_mb
std::shared_ptr< monarch3::Monarch3 > f_monarch_to_finish
monarch3::M3Header * f_header
header_wrap_ptr f_header_wrap