Psyllid  v1.12.4
Project 8 Data Acquisisition Software
monarch3_wrap.cc
Go to the documentation of this file.
1 /*
2  * monarch3_wrap.cc
3  *
4  * Created on: Feb 11, 2016
5  * Author: nsoblath
6  */
7 
8 #include "butterfly_house.hh"
9 #include "monarch3_wrap.hh"
10 
11 #include "psyllid_constants.hh"
12 #include "psyllid_error.hh"
13 
14 #include "M3Exception.hh"
15 
16 #include "logger.hh"
17 #include "signal_handler.hh"
18 
19 #include <boost/filesystem.hpp>
20 
21 #include <future>
22 #include <signal.h>
23 
24 
25 namespace psyllid
26 {
27  LOGGER( plog, "monarch3_wrap" );
28 
29 
30  uint32_t to_uint( monarch_stage a_stage )
31  {
32  return static_cast< uint32_t >( a_stage );
33  }
34  monarch_stage to_op_t( uint32_t a_stage_uint )
35  {
36  return static_cast< monarch_stage >( a_stage_uint );
37  }
38  std::ostream& operator<<( std::ostream& a_os, monarch_stage a_stage )
39  {
40  return a_os << to_uint( a_stage );
41  }
42 
43 
44  //***************************
45  // monarch_on_deck_manager
46  //***************************
47 
49  scarab::cancelable(),
50  f_monarch_wrap( a_monarch_wrap ),
51  f_monarch_on_deck(),
52  f_od_condition(),
53  f_od_continue_condition(),
54  f_od_mutex()
55  {}
56 
58  {}
59 
61  {
62  LINFO( plog, "Monarch-on-deck manager for file <" << f_monarch_wrap->get_header()->header().Filename() << "> is starting up" );
63 
64  while( ! is_canceled() && f_monarch_wrap->f_stage != monarch_stage::finished )
65  {
66  unique_lock t_od_lock( f_od_mutex );
67 
68  // wait on the condition variable
69  f_od_condition.wait_for( t_od_lock, std::chrono::milliseconds( 500 ) );
70 
71  // this particular setup of the while on t_do_wait and the if-elseif-else structure
72  // is used so that any of the actions can be taken in any order without going through waiting on the condition.
73  bool t_do_wait = false;
74  try
75  {
76  while( ! t_do_wait )
77  {
79  {
80  // then we need to finish a file
81  // finish the old file
82  LDEBUG( plog, "Finishing pre-existing to-finish file" );
84  }
85  else if( ! f_monarch_on_deck )
86  {
87  // then we need to make a new on-deck monarch
88  unique_lock t_header_lock( f_monarch_wrap->get_header()->get_lock() );
90  }
91  else
92  {
93  t_do_wait = true;
94  }
95  } // end while( ! to_do_wait )
96  }
97  catch( std::exception& e )
98  {
99  LERROR( plog, "Exception caught in monarch-on-deck manager: " << e.what() );
100  scarab::signal_handler::cancel_all( RETURN_ERROR );
101  }
102  } // end while( ! is_canceled() && f_monarch_wrap->f_stage != monarch_stage::finished )
103 
104  LINFO( plog, "Monarch-on-deck manager is stopping" );
105 
106  return;
107  }
108 
110  {
111  try
112  {
113  LDEBUG( plog, "Creating a new on-deck monarch" );
114 
115  // create the new filename
116  std::stringstream t_count_stream;
117  t_count_stream << f_monarch_wrap->get_and_increment_file_count();
118  std::string t_new_filename = f_monarch_wrap->f_filename_base + '_' + t_count_stream.str() + f_monarch_wrap->f_filename_ext;
119  LDEBUG( plog, "On-deck filename: <" << t_new_filename << ">" );
120 
121  // open the new file
122  std::shared_ptr< monarch3::Monarch3 > t_new_monarch;
123  try
124  {
125  t_new_monarch.reset( monarch3::Monarch3::OpenForWriting( t_new_filename ) );
126  LTRACE( plog, "New file is open" );
127  }
128  catch( monarch3::M3Exception& e )
129  {
130  throw error() << "Unable to open the file <" << t_new_filename << "\n" <<
131  "Reason: " << e.what();
132  }
133 
134  // copy info into the new header
135  const header_wrap_ptr t_old_header_ptr = f_monarch_wrap->get_header();
136  monarch3::M3Header* t_new_header = t_new_monarch->GetHeader();
137  t_new_header->CopyBasicInfo( t_old_header_ptr->header() );
138  t_new_header->Filename() = t_new_filename;
139  t_new_header->Description() = t_old_header_ptr->ptr()->Description() + "\nContinuation of file " + t_old_header_ptr->ptr()->Filename();
140 
141  // for each stream, create new stream in new file
142  std::vector< monarch3::M3StreamHeader >* t_old_stream_headers = &t_old_header_ptr->ptr()->GetStreamHeaders();
143  std::vector< unsigned > t_chan_vec;
144  for( unsigned i_stream = 0; i_stream != t_old_stream_headers->size(); ++i_stream )
145  {
146  //t_stream_it->second->lock();
147  t_chan_vec.clear();
148  monarch3::M3StreamHeader* t_old_stream_header = &t_old_stream_headers->operator[]( i_stream );
149  unsigned n_channels = t_old_stream_header->GetNChannels();
150  if( n_channels > 1 )
151  {
152  t_new_header->AddStream( t_old_stream_header->Source(), n_channels, t_old_stream_header->GetChannelFormat(),
153  t_old_stream_header->GetAcquisitionRate(), t_old_stream_header->GetRecordSize(), t_old_stream_header->GetSampleSize(),
154  t_old_stream_header->GetDataTypeSize(), t_old_stream_header->GetDataFormat(),
155  t_old_stream_header->GetBitDepth(), t_old_stream_header->GetBitAlignment(),
156  &t_chan_vec );
157  }
158  else
159  {
160  t_new_header->AddStream( t_old_stream_header->Source(),
161  t_old_stream_header->GetAcquisitionRate(), t_old_stream_header->GetRecordSize(), t_old_stream_header->GetSampleSize(),
162  t_old_stream_header->GetDataTypeSize(), t_old_stream_header->GetDataFormat(),
163  t_old_stream_header->GetBitDepth(), t_old_stream_header->GetBitAlignment(),
164  &t_chan_vec );
165  }
166  std::vector< monarch3::M3ChannelHeader >& t_old_chan_headers = t_old_header_ptr->ptr()->GetChannelHeaders();
167  std::vector< monarch3::M3ChannelHeader >& t_new_chan_headers = t_new_header->GetChannelHeaders();
168  for( unsigned i_chan = 0; i_chan < n_channels; ++i_chan )
169  {
170  t_new_chan_headers[ i_chan ].SetVoltageOffset( t_old_chan_headers[ i_chan ].GetVoltageOffset() );
171  t_new_chan_headers[ i_chan ].SetVoltageRange( t_old_chan_headers[ i_chan ].GetVoltageRange() );
172  t_new_chan_headers[ i_chan ].SetDACGain( t_old_chan_headers[ i_chan ].GetDACGain() );
173  t_new_chan_headers[ i_chan ].SetFrequencyMin( t_old_chan_headers[ i_chan ].GetFrequencyMin() );
174  t_new_chan_headers[ i_chan ].SetFrequencyRange( t_old_chan_headers[ i_chan ].GetFrequencyRange() );
175  }
176  }
177 
178  // write the new header
179  LTRACE( plog, "Writing new header" );
180  t_new_monarch->WriteHeader();
181 
182  // switch out the monarch pointers, f_monarch_on_deck <--> t_new_monarch
183  f_monarch_on_deck.swap( t_new_monarch );
184  }
185  catch(...)
186  {
187  throw;
188  }
189  return;
190  }
191 
193  {
194  f_od_mutex.lock();
195  if( f_monarch_on_deck )
196  {
197  std::string t_filename( f_monarch_on_deck->GetHeader()->Filename() );
198  try
199  {
200  LDEBUG( plog, "Closing on-deck file <" << t_filename << ">" );
201  f_monarch_on_deck.reset();
202  }
203  catch( monarch3::M3Exception& e )
204  {
205  LWARN( plog, "File could not be closed properly: " << e.what() );
206  }
207  try
208  {
209  LDEBUG( plog, "On-deck file was written out to <" << t_filename << ">; now removing the file" );
210  boost::filesystem::remove( t_filename );
211  }
212  catch( boost::filesystem::filesystem_error& e )
213  {
214  LWARN( plog, "File could not be removed: <" << t_filename << ">\n" << e.what() );
215  }
216  }
217  f_od_mutex.unlock();
218  return;
219  }
220 
222  {
223  f_od_mutex.lock();
224 
225  if( f_monarch_to_finish )
226  {
227  LDEBUG( plog, "Finishing to-finish file" );
229  }
230  f_od_mutex.unlock();
231  return;
232  }
233 
234 
235  //*******************
236  // monarch_wrapper
237  //*******************
238 
239  monarch_wrapper::monarch_wrapper( const std::string& a_filename ) :
240  f_orig_filename( a_filename ),
241  f_filename_base(),
242  f_filename_ext(),
243  f_file_count( 1 ),
244  f_max_file_size_mb( 0. ),
245  f_file_size_est_mb( 0. ),
246  f_wait_to_write(),
247  f_switch_thread( nullptr ),
248  f_ok_to_write( true ),
249  f_do_switch_flag( false ),
250  f_do_switch_trig(),
251  f_monarch(),
252  f_monarch_mutex(),
253  f_header_wrap(),
254  f_stream_wraps(),
255  f_run_start_time( std::chrono::steady_clock::now() ),
256  f_stage( monarch_stage::initialized ),
257  f_od_thread( nullptr ),
258  f_monarch_od_manager( this )
259  {
260  std::string::size_type t_ext_pos = a_filename.find_last_of( '.' );
261  if( t_ext_pos == std::string::npos )
262  {
264  }
265  else
266  {
267  f_filename_base = f_orig_filename.substr( 0, t_ext_pos );
268  f_filename_ext = f_orig_filename.substr( t_ext_pos );
269  }
270  LDEBUG( plog, "Monarch wrapper created with filename base <" << f_filename_base << "> and extension <" << f_filename_ext << ">" );
271 
272  unique_lock t_monarch_lock( f_monarch_mutex );
273  try
274  {
275  f_monarch.reset( monarch3::Monarch3::OpenForWriting( f_orig_filename ) );
276  }
277  catch( monarch3::M3Exception& e )
278  {
279  throw error() << "Unable to open the file <" << f_orig_filename << "\n" <<
280  "Reason: " << e.what();
281  }
282  }
283 
285  {
286  f_monarch_mutex.lock();
287 
289 
290  if( f_od_thread != nullptr )
291  {
292  if( f_od_thread->joinable() )
293  {
294  LWARN( plog, "Trying to destroy monarch_wrapper; waiting for on-deck thread to join" );
295  f_od_thread->join();
296  }
297  delete f_od_thread;
298  }
299 
300  if( f_switch_thread != nullptr )
301  {
302  if( f_switch_thread->joinable() )
303  {
304  LWARN( plog, "Trying to destroy monarch_wrapper; waiting for switch thread to join" );
305  f_switch_thread->join();
306  }
307  delete f_switch_thread;
308  }
309 
310  try
311  {
312  if( f_monarch )
313  {
314  f_monarch->FinishWriting();
315  f_monarch.reset();
316  }
317 
318  }
319  catch( monarch3::M3Exception& e )
320  {
321  LERROR( plog, "Unable to write file on monarch_wrapper deletion: " << e.what() );
322  }
323 
324  f_wait_to_write.notify_all();
325 
326  f_monarch_mutex.unlock();
327 
328  }
329 
331  {
332  unique_lock t_monarch_lock( f_monarch_mutex );
334  {
335  try
336  {
338  header_wrap_ptr t_header_ptr( new header_wrapper( *f_monarch.get() ) );
339  f_header_wrap = t_header_ptr;
340  }
341  catch( error& e )
342  {
343  throw;
344  }
345  }
346  if( f_stage != monarch_stage::preparing ) throw error() << "Invalid monarch stage for getting the (writeable) header: " << f_stage << "; use the const get_header() instead";
347 
348  return f_header_wrap;
349  }
350 
352  {
353  LDEBUG( plog, "Stream <" << a_stream_no << "> is being retrieved from the Monarch object" );
354 
355  unique_lock t_monarch_lock( f_monarch_mutex );
356 
357  if( f_stage != monarch_stage::writing ) throw error() << "Invalid monarch stage for getting a stream: " << f_stage;
358 
359  if( f_stream_wraps.find( a_stream_no ) == f_stream_wraps.end() )
360  {
361  try
362  {
363  stream_wrap_ptr t_stream_ptr( new stream_wrapper( *f_monarch.get(), a_stream_no, this ) );
364  f_stream_wraps[ a_stream_no ] = t_stream_ptr;
365  }
366  catch( error& e )
367  {
368  throw;
369  }
370  }
371  return f_stream_wraps[ a_stream_no ];
372  }
373 
375  {
376  if( f_stage != monarch_stage::preparing ) throw error() << "Invalid monarch stage for start-using: " << f_stage;
377 
379  {
380  throw error() << "One or more of the Monarch pointers is not empty";
381  }
382 
383  if( f_od_thread != nullptr )
384  {
385  throw error() << "On-deck thread already exists";
386  }
387 
388  unique_lock t_monarch_lock( f_monarch_mutex );
389 
390  unique_lock t_header_lock( f_header_wrap->get_lock() );
391 
392  LDEBUG( plog, "Writing the header for file <" << f_header_wrap->header().Filename() );
393  try
394  {
395  f_monarch->WriteHeader();
396  LDEBUG( plog, "Header written for file <" << f_header_wrap->header().Filename() << ">" );
397  }
398  catch( monarch3::M3Exception& e )
399  {
400  throw error() << e.what();
401  }
403 
404  t_header_lock.unlock();
405 
406 
407  // prepare file-switching components
408 
409  f_do_switch_flag = false;
410 
411  LDEBUG( plog, "Starting the switch thread for file <" << f_header_wrap->header().Filename() << ">" );
412  f_switch_thread = new std::thread( &monarch_wrapper::execute_switch_loop, this );
413 
414  // start the on-deck thread and assign it to the member variable for safe keeping
415  LDEBUG( plog, "Starting the on-deck thread for file <" << f_header_wrap->header().Filename() << ">" );
417 
418  // let the thread start up
419  std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
420 
421  // release it to create the on-deck file
423 
424  return;
425  }
426 
428  {
429  LINFO( plog, "Monarch's execute-switch-loop for file <" << f_header_wrap->header().Filename() << "> is starting up" );
430 
431  while( ! is_canceled() && f_stage != monarch_stage::finished )
432  {
433  unique_lock t_lock( f_monarch_mutex );
434 
435  // wait on the condition variable
436  while( ! f_do_switch_flag.load() && ! is_canceled() )
437  {
438  f_do_switch_trig.wait_for( t_lock, std::chrono::milliseconds( 500 ) );
439  }
440  if( is_canceled() ) break;
441  //if( f_stage == monarch_stage::finished) break;
442 
443  // f_monarch_mutex is locked at this point
444 
445  f_do_switch_flag = false;
446 
447  LDEBUG( plog, "Switching egg files" );
448  try
449  {
451  }
452  catch( std::exception& e )
453  {
454  LERROR( plog, "Caught exception while switching to new file: " << e.what() );
455  scarab::signal_handler::cancel_all( RETURN_ERROR );
456  }
457 
458  f_ok_to_write = true;
459  f_wait_to_write.notify_all();
460 
461  } // end while( ! f_monarch_od_manager.is_canceled() && f_monarch_wrap->f_stage != monarch_stage::finished )
462 
463  LINFO( plog, "Monarch's execute-switch-loop for file <" << f_header_wrap->header().Filename() << "> is stopping" );
464 
465  return;
466  }
467 
469  {
470  if( f_do_switch_flag.load() ) return;
471  f_do_switch_flag = true;
472  f_ok_to_write = false;
473  f_do_switch_trig.notify_one();
474  return;
475  }
476 
478  {
479  if( f_od_thread == nullptr )
480  {
481  LWARN( plog, "Monarch wrapper was not in use" );
482  return;
483  }
484 
486 
487  f_od_thread->join();
488  delete f_od_thread;
489  f_od_thread = nullptr;
490 
491  f_switch_thread->join();
492  delete f_switch_thread;
493  f_switch_thread = nullptr;
494 
496 
497  return;
498  }
499 
500  void monarch_wrapper::finish_stream( unsigned a_stream_no )
501  {
502  unique_lock t_monarch_lock( f_monarch_mutex );
504  {
505  throw error() << "Monarch must be in the writing stage to finish a stream";
506  }
507 
508  auto t_stream_it = f_stream_wraps.find( a_stream_no );
509  if( t_stream_it == f_stream_wraps.end() )
510  {
511  throw error() << "Stream number <" << a_stream_no << "> was not found";
512  }
513  LDEBUG( plog, "Finishing stream <" << a_stream_no << ">" );
514  f_stream_wraps.erase( t_stream_it );
515 
516  return;
517  }
518 
520  {
521  unique_lock t_monarch_lock( f_monarch_mutex );
522 
523  std::string t_filename( f_monarch->GetHeader()->Filename() );
524 
526 
528  {
529  f_header_wrap.reset();
530  try
531  {
532  f_monarch->WriteHeader();
533  LDEBUG( plog, "Header written for file <" << t_filename << ">" );
534  }
535  catch( monarch3::M3Exception& e )
536  {
537  throw error() << e.what();
538  }
539  }
540  else if( f_stage == monarch_stage::writing )
541  {
542  t_monarch_lock.unlock(); // finish_stream() locks f_monarch_mutex, so it needs to be unlocked before calls on that can be processed
543  // we expect that streams are being cancelled from their respective writers
544  // here we wait for them to all be cancelled
545  for( unsigned i_attempt = 0; i_attempt < 10 && ! f_stream_wraps.empty(); ++i_attempt)
546  {
547  // give midge time to finish the streams before finishing the files
548  std::this_thread::sleep_for( std::chrono::milliseconds(500));
549  }
550  if( ! f_stream_wraps.empty() )
551  {
552  // perhaps we got here without midge being cancelled.
553  // let's call a global cancel and then wait for the streams to cancel again
554  LERROR( plog, "Streams were not cancelled as expected (via their writers). Attempting a global cancellation." );
555  scarab::signal_handler::cancel_all( RETURN_ERROR );
556  for( unsigned i_attempt = 0; i_attempt < 10 && ! f_stream_wraps.empty(); ++i_attempt)
557  {
558  // give midge time to finish the streams before finishing the files
559  std::this_thread::sleep_for( std::chrono::milliseconds(500));
560  }
561  // check again that the streams are now empty
562  if( ! f_stream_wraps.empty() )
563  {
564  // get out if the streams did not cancel because we can't finish the file correctly in that case
565  throw error() << "Streams did not all finish after wait period and global cancellation";
566  }
567  }
568  // re-lock so that the lock condition is the same once we exit this block
569  t_monarch_lock.lock();
570  }
571  LINFO( plog, "Finished writing file <" << t_filename << ">" );
573  f_monarch->FinishWriting();
574  f_monarch.reset();
575  f_file_size_est_mb = 0.;
576  return;
577  }
578 
580  {
581  if( f_stage != monarch_stage::writing ) throw error() << "Invalid monarch stage for starting a new file: " << f_stage;
582 
583  try
584  {
585  LDEBUG( plog, "Switching to new file; locking header mutex, and monarch mutex" );
586  unique_lock t_header_lock( f_header_wrap->get_lock() );
587  //unique_lock t_monarch_lock( f_monarch_mutex ); // monarch mutex is already locked in the loop in execute_switch_loop
588 
589  // if the to-finish monarch is full for some reason, empty it
590  LTRACE( plog, "Synchronous call to finish to-finish" );
592 
593  // if the on-deck monarch doesn't exist, create it
594  LTRACE( plog, "Synchronous call to create on-deck" );
596 
597  LTRACE( plog, "Switching file pointers" );
598 
599  // move the old file to the to_finish pointer
601 
602  // move the on_deck pointer to the current pointer
604 
605  f_file_size_est_mb = 0.;
606 
607  LTRACE( plog, "Switching header pointer" );
608 
609  // swap out the header pointer
610  f_header_wrap->f_header = f_monarch->GetHeader();
611  t_header_lock.unlock();
612 
613  LTRACE( plog, "Switching stream pointers" );
614 
615  // swap out the stream pointers
616  for( std::map< unsigned, stream_wrap_ptr >::iterator t_stream_it = f_stream_wraps.begin(); t_stream_it != f_stream_wraps.end(); ++t_stream_it )
617  {
618  t_stream_it->second->f_stream = f_monarch->GetStream( t_stream_it->first );
619  if( t_stream_it->second->f_stream == nullptr )
620  {
621  throw error() << "Stream <" << t_stream_it->first << "> was invalid";
622  }
623  }
624 
625  LDEBUG( plog, "Switch to new file is complete: <" << f_header_wrap->ptr()->Filename() << ">" );
626 
627  //f_file_switch_started = false;
628 
629  // notify the on-deck thread to process the to-finish and on-deck monarchs
631 
632  }
633  catch( std::exception& e )
634  {
635  throw;
636  }
637  return;
638  }
639 
641  {
642  LDEBUG( plog, "Setting monarch stage to <" << a_stage << ">" );
643  f_stage = a_stage;
644  return;
645  }
646 
648  {
649  double t_file_size_est_mb = f_file_size_est_mb.load() + a_size;
650  f_file_size_est_mb = t_file_size_est_mb;
651  LTRACE( plog, "File contribution: " << a_size << " MB; Estimated file size is now " << t_file_size_est_mb << " MB; limit is " << f_max_file_size_mb << " MB" );
652  if( t_file_size_est_mb >= f_max_file_size_mb )
653  {
654  LDEBUG( plog, "Max file size exceeded (" << t_file_size_est_mb << " MB >= " << f_max_file_size_mb << " MB)" );
655  trigger_switch();
656  }
657  return;
658  }
659 
661  {
662  LTRACE( plog, "Checking ok to write" );
663  if( f_ok_to_write.load() ) return f_monarch.operator bool();
664  std::mutex t_wait_mutex;
665  unique_lock t_wait_lock( t_wait_mutex );
666  while( ! f_ok_to_write.load() )
667  {
668  f_wait_to_write.wait_for( t_wait_lock, std::chrono::milliseconds( 100 ) );
669  }
670  return f_monarch.operator bool();
671  }
672 
673 
674  //******************
675  // header_wrapper
676  //******************
677 
678  header_wrapper::header_wrapper( monarch3::Monarch3& a_monarch ) :
679  f_header( a_monarch.GetHeader() ),
680  f_mutex()
681  {
682  if( ! f_header )
683  {
684  throw error() << "Unable to get monarch header";
685  }
686  }
687 
689  f_header( a_orig.f_header ),
690  f_mutex()//,
691  //f_lock( f_mutex )
692  {
693  a_orig.f_header = nullptr;
694  }
695 
697  {
698  }
699 
701  {
702  f_header = a_orig.f_header;
703  a_orig.f_header = nullptr;
704  return *this;
705  }
706 
707  monarch3::M3Header& header_wrapper::header()
708  {
709  if( f_header == nullptr ) throw error() << "Unable to write to header; the owning Monarch object must have moved beyond the preparation stage";
710  return *f_header;
711  }
712 
713 
714  //******************
715  // stream_wrapper
716  //******************
717 
718  stream_wrapper::stream_wrapper( monarch3::Monarch3& a_monarch, unsigned a_stream_no, monarch_wrapper* a_monarch_wrapper ) :
719  f_monarch_wrapper( a_monarch_wrapper ),
720  f_stream( a_monarch.GetStream( a_stream_no ) ),
721  f_is_valid( true ),
722  f_record_size_mb( 1.e-6 * (double)f_stream->GetStreamRecordNBytes() )
723  {
724  if( f_stream == nullptr )
725  {
726  throw error() << "Invalid stream number requested: " << a_stream_no;
727  }
728  }
729 
732  f_stream( a_orig.f_stream ),
733  f_is_valid( a_orig.f_is_valid ),
735  {
736  a_orig.f_stream = nullptr;
737  a_orig.f_is_valid = false;
738  }
739 
741  {}
742 
744  {
745  f_monarch_wrapper = a_orig.f_monarch_wrapper;
746  f_stream = a_orig.f_stream;
747  a_orig.f_stream = nullptr;
748  a_orig.f_is_valid = false;
749  f_record_size_mb = a_orig.f_record_size_mb;
750  return *this;
751  }
752 
754  bool stream_wrapper::write_record( monarch3::RecordIdType a_rec_id, monarch3::TimeType a_rec_time, const void* a_rec_block, uint64_t a_bytes, bool a_is_new_acq )
755  {
756  LTRACE( plog, "Writing record <" << a_rec_id << ">" );
758  {
759  LERROR( plog, "Unable to write to monarch file" );
760  //f_mutex.unlock();
761  return false;
762  }
763  get_stream_record()->SetRecordId( a_rec_id );
764  get_stream_record()->SetTime( a_rec_time );
765  ::memcpy( get_stream_record()->GetData(), a_rec_block, a_bytes );
766  bool t_return = f_stream->WriteRecord( a_is_new_acq );
768  return t_return;
769  }
770 
771 } /* namespace psyllid */
std::shared_ptr< header_wrapper > header_wrap_ptr
stream_wrap_ptr get_stream(unsigned a_stream_no)
std::thread * f_switch_thread
const char * what() const
monarch3::M3Header & header()
Get a reference to the M3Header; Will throw psyllid::error if the header object is not valid...
std::condition_variable f_od_condition
std::shared_ptr< stream_wrapper > stream_wrap_ptr
monarch3::M3Stream * f_stream
std::map< unsigned, stream_wrap_ptr > f_stream_wraps
static scarab::logger plog("batch_executor")
monarch3::M3Record * get_stream_record()
Get the pointer to the stream record.
std::unique_lock< std::mutex > unique_lock
void start_using()
Make the wrapper available for use; starts parallel on-deck thread.
std::atomic< bool > f_do_switch_flag
STL namespace.
void finish_stream(unsigned a_stream_no)
Finish the given stream. The stream object will be deleted.
void notify()
Notify the manager to process its monarch objects if needed (asynchronous)
std::shared_ptr< monarch3::Monarch3 > f_monarch
Wrapper class for a monarch3::M3Header object.
header_wrap_ptr get_header()
Returns the header wrapped in a header_wrap_ptr to be filled at the beginning of file writing...
void execute()
Execute the thread loop: handle the asynchronous processing of the on-deck and to-finish monarch obje...
monarch_wrapper * f_monarch_wrapper
stream_wrapper & operator=(stream_wrapper &&a_orig)
bool pointers_empty() const
Return true if both f_monarch_on_deck and f_monarch_to_finish are empty.
std::condition_variable f_wait_to_write
Wrapper class for a monarch3::M3Stream object.
void finish_to_finish()
Finish the to-finish monarch object (synchronous)
std::ostream & operator<<(std::ostream &a_os, monarch_stage a_stage)
header_wrapper(monarch3::Monarch3 &a_monarch)
void finish_file()
Finish the file.
monarch_stage to_op_t(uint32_t a_stage_uint)
void record_file_contribution(double a_size)
const monarch_wrapper * f_monarch_wrap
monarch_on_deck_manager f_monarch_od_manager
header_wrapper & operator=(header_wrapper &&a_orig)
std::condition_variable f_do_switch_trig
void clear_on_deck()
Clear the on-deck monarch object (synchronous)
monarch_on_deck_manager(monarch_wrapper *a_monarch_wrap)
LOGGER(plog, "egg_writer")
std::atomic< bool > f_ok_to_write
unsigned get_and_increment_file_count() const
As it says, return the current value to, and then increment, the file count.
void set_as_to_finish(std::shared_ptr< monarch3::Monarch3 > &a_monarch)
Give a monarch object to the on-deck manager with the intent that it be finished asynchronously.
stream_wrapper(monarch3::Monarch3 &, unsigned a_stream_no, monarch_wrapper *a_monarch_wrapper)
Wrapper class for a monarch3::M3Monarch object.
void create_on_deck()
Create the on-deck monarch object if it doesn&#39;t exist already (synchronous)
void stop_using()
Make the wrapper unavailable for use; stops the parallel on-deck thread.
bool okay_to_write()
Ensure that the file is available to write to (via a stream)
void set_stage(monarch_stage a_stage)
Override the stage value.
std::shared_ptr< monarch3::Monarch3 > f_monarch_on_deck
uint32_t to_uint(monarch_stage a_stage)
monarch_wrapper(const std::string &a_filename)
void get_on_deck(std::shared_ptr< monarch3::Monarch3 > &a_monarch)
Get the on-deck monarch object that has been created asynchronously.
std::atomic< double > f_file_size_est_mb
std::shared_ptr< monarch3::Monarch3 > f_monarch_to_finish
bool write_record(monarch3::RecordIdType a_rec_id, monarch3::TimeType a_rec_time, const void *a_rec_block, uint64_t a_bytes, bool a_is_new_acq)
Write the record contents to the file.
monarch3::M3Header * f_header
header_wrap_ptr f_header_wrap