14 #include "M3Exception.hh" 17 #include "signal_handler.hh" 19 #include <boost/filesystem.hpp> 32 return static_cast< uint32_t
>( a_stage );
40 return a_os <<
to_uint( a_stage );
50 f_monarch_wrap( a_monarch_wrap ),
53 f_od_continue_condition(),
69 f_od_condition.wait_for( t_od_lock, std::chrono::milliseconds( 500 ) );
73 bool t_do_wait =
false;
82 LDEBUG(
plog,
"Finishing pre-existing to-finish file" );
97 catch( std::exception& e )
99 LERROR(
plog,
"Exception caught in monarch-on-deck manager: " << e.what() );
100 scarab::signal_handler::cancel_all( RETURN_ERROR );
104 LINFO(
plog,
"Monarch-on-deck manager is stopping" );
113 LDEBUG(
plog,
"Creating a new on-deck monarch" );
116 std::stringstream t_count_stream;
119 LDEBUG(
plog,
"On-deck filename: <" << t_new_filename <<
">" );
122 std::shared_ptr< monarch3::Monarch3 > t_new_monarch;
125 t_new_monarch.reset( monarch3::Monarch3::OpenForWriting( t_new_filename ) );
126 LTRACE(
plog,
"New file is open" );
128 catch( monarch3::M3Exception& e )
130 throw error() <<
"Unable to open the file <" << t_new_filename <<
"\n" <<
131 "Reason: " << e.
what();
136 monarch3::M3Header* t_new_header = t_new_monarch->GetHeader();
137 t_new_header->CopyBasicInfo( t_old_header_ptr->header() );
138 t_new_header->Filename() = t_new_filename;
139 t_new_header->Description() = t_old_header_ptr->ptr()->Description() +
"\nContinuation of file " + t_old_header_ptr->ptr()->Filename();
142 std::vector< monarch3::M3StreamHeader >* t_old_stream_headers = &t_old_header_ptr->ptr()->GetStreamHeaders();
143 std::vector< unsigned > t_chan_vec;
144 for(
unsigned i_stream = 0; i_stream != t_old_stream_headers->size(); ++i_stream )
148 monarch3::M3StreamHeader* t_old_stream_header = &t_old_stream_headers->operator[]( i_stream );
149 unsigned n_channels = t_old_stream_header->GetNChannels();
152 t_new_header->AddStream( t_old_stream_header->Source(), n_channels, t_old_stream_header->GetChannelFormat(),
153 t_old_stream_header->GetAcquisitionRate(), t_old_stream_header->GetRecordSize(), t_old_stream_header->GetSampleSize(),
154 t_old_stream_header->GetDataTypeSize(), t_old_stream_header->GetDataFormat(),
155 t_old_stream_header->GetBitDepth(), t_old_stream_header->GetBitAlignment(),
160 t_new_header->AddStream( t_old_stream_header->Source(),
161 t_old_stream_header->GetAcquisitionRate(), t_old_stream_header->GetRecordSize(), t_old_stream_header->GetSampleSize(),
162 t_old_stream_header->GetDataTypeSize(), t_old_stream_header->GetDataFormat(),
163 t_old_stream_header->GetBitDepth(), t_old_stream_header->GetBitAlignment(),
166 std::vector< monarch3::M3ChannelHeader >& t_old_chan_headers = t_old_header_ptr->ptr()->GetChannelHeaders();
167 std::vector< monarch3::M3ChannelHeader >& t_new_chan_headers = t_new_header->GetChannelHeaders();
168 for(
unsigned i_chan = 0; i_chan < n_channels; ++i_chan )
170 t_new_chan_headers[ i_chan ].SetVoltageOffset( t_old_chan_headers[ i_chan ].GetVoltageOffset() );
171 t_new_chan_headers[ i_chan ].SetVoltageRange( t_old_chan_headers[ i_chan ].GetVoltageRange() );
172 t_new_chan_headers[ i_chan ].SetDACGain( t_old_chan_headers[ i_chan ].GetDACGain() );
173 t_new_chan_headers[ i_chan ].SetFrequencyMin( t_old_chan_headers[ i_chan ].GetFrequencyMin() );
174 t_new_chan_headers[ i_chan ].SetFrequencyRange( t_old_chan_headers[ i_chan ].GetFrequencyRange() );
179 LTRACE(
plog,
"Writing new header" );
180 t_new_monarch->WriteHeader();
200 LDEBUG(
plog,
"Closing on-deck file <" << t_filename <<
">" );
203 catch( monarch3::M3Exception& e )
205 LWARN(
plog,
"File could not be closed properly: " << e.what() );
209 LDEBUG(
plog,
"On-deck file was written out to <" << t_filename <<
">; now removing the file" );
210 boost::filesystem::remove( t_filename );
212 catch( boost::filesystem::filesystem_error& e )
214 LWARN(
plog,
"File could not be removed: <" << t_filename <<
">\n" << e.what() );
227 LDEBUG(
plog,
"Finishing to-finish file" );
240 f_orig_filename( a_filename ),
244 f_max_file_size_mb( 0. ),
245 f_file_size_est_mb( 0. ),
247 f_switch_thread( nullptr ),
248 f_ok_to_write( true ),
249 f_do_switch_flag( false ),
255 f_run_start_time(
std::chrono::steady_clock::now() ),
257 f_od_thread( nullptr ),
258 f_monarch_od_manager( this )
260 std::string::size_type t_ext_pos = a_filename.find_last_of(
'.' );
261 if( t_ext_pos == std::string::npos )
277 catch( monarch3::M3Exception& e )
280 "Reason: " << e.what();
294 LWARN(
plog,
"Trying to destroy monarch_wrapper; waiting for on-deck thread to join" );
304 LWARN(
plog,
"Trying to destroy monarch_wrapper; waiting for switch thread to join" );
319 catch( monarch3::M3Exception& e )
321 LERROR(
plog,
"Unable to write file on monarch_wrapper deletion: " << e.what() );
353 LDEBUG(
plog,
"Stream <" << a_stream_no <<
"> is being retrieved from the Monarch object" );
380 throw error() <<
"One or more of the Monarch pointers is not empty";
385 throw error() <<
"On-deck thread already exists";
392 LDEBUG(
plog,
"Writing the header for file <" <<
f_header_wrap->header().Filename() );
396 LDEBUG(
plog,
"Header written for file <" <<
f_header_wrap->header().Filename() <<
">" );
398 catch( monarch3::M3Exception& e )
404 t_header_lock.unlock();
411 LDEBUG(
plog,
"Starting the switch thread for file <" <<
f_header_wrap->header().Filename() <<
">" );
415 LDEBUG(
plog,
"Starting the on-deck thread for file <" <<
f_header_wrap->header().Filename() <<
">" );
419 std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
429 LINFO(
plog,
"Monarch's execute-switch-loop for file <" <<
f_header_wrap->header().Filename() <<
"> is starting up" );
440 if( is_canceled() )
break;
447 LDEBUG(
plog,
"Switching egg files" );
452 catch( std::exception& e )
454 LERROR(
plog,
"Caught exception while switching to new file: " << e.what() );
455 scarab::signal_handler::cancel_all( RETURN_ERROR );
463 LINFO(
plog,
"Monarch's execute-switch-loop for file <" <<
f_header_wrap->header().Filename() <<
"> is stopping" );
481 LWARN(
plog,
"Monarch wrapper was not in use" );
505 throw error() <<
"Monarch must be in the writing stage to finish a stream";
511 throw error() <<
"Stream number <" << a_stream_no <<
"> was not found";
513 LDEBUG(
plog,
"Finishing stream <" << a_stream_no <<
">" );
523 std::string t_filename(
f_monarch->GetHeader()->Filename() );
533 LDEBUG(
plog,
"Header written for file <" << t_filename <<
">" );
535 catch( monarch3::M3Exception& e )
542 t_monarch_lock.unlock();
545 for(
unsigned i_attempt = 0; i_attempt < 10 && !
f_stream_wraps.empty(); ++i_attempt)
548 std::this_thread::sleep_for( std::chrono::milliseconds(500));
554 LERROR(
plog,
"Streams were not cancelled as expected (via their writers). Attempting a global cancellation." );
555 scarab::signal_handler::cancel_all( RETURN_ERROR );
556 for(
unsigned i_attempt = 0; i_attempt < 10 && !
f_stream_wraps.empty(); ++i_attempt)
559 std::this_thread::sleep_for( std::chrono::milliseconds(500));
565 throw error() <<
"Streams did not all finish after wait period and global cancellation";
569 t_monarch_lock.lock();
571 LINFO(
plog,
"Finished writing file <" << t_filename <<
">" );
585 LDEBUG(
plog,
"Switching to new file; locking header mutex, and monarch mutex" );
590 LTRACE(
plog,
"Synchronous call to finish to-finish" );
594 LTRACE(
plog,
"Synchronous call to create on-deck" );
597 LTRACE(
plog,
"Switching file pointers" );
607 LTRACE(
plog,
"Switching header pointer" );
611 t_header_lock.unlock();
613 LTRACE(
plog,
"Switching stream pointers" );
616 for( std::map< unsigned, stream_wrap_ptr >::iterator t_stream_it =
f_stream_wraps.begin(); t_stream_it !=
f_stream_wraps.end(); ++t_stream_it )
618 t_stream_it->second->f_stream =
f_monarch->GetStream( t_stream_it->first );
619 if( t_stream_it->second->f_stream ==
nullptr )
621 throw error() <<
"Stream <" << t_stream_it->first <<
"> was invalid";
625 LDEBUG(
plog,
"Switch to new file is complete: <" <<
f_header_wrap->ptr()->Filename() <<
">" );
633 catch( std::exception& e )
642 LDEBUG(
plog,
"Setting monarch stage to <" << a_stage <<
">" );
651 LTRACE(
plog,
"File contribution: " << a_size <<
" MB; Estimated file size is now " << t_file_size_est_mb <<
" MB; limit is " <<
f_max_file_size_mb <<
" MB" );
654 LDEBUG(
plog,
"Max file size exceeded (" << t_file_size_est_mb <<
" MB >= " <<
f_max_file_size_mb <<
" MB)" );
662 LTRACE(
plog,
"Checking ok to write" );
664 std::mutex t_wait_mutex;
668 f_wait_to_write.wait_for( t_wait_lock, std::chrono::milliseconds( 100 ) );
679 f_header( a_monarch.GetHeader() ),
684 throw error() <<
"Unable to get monarch header";
693 a_orig.f_header =
nullptr;
703 a_orig.f_header =
nullptr;
709 if(
f_header ==
nullptr )
throw error() <<
"Unable to write to header; the owning Monarch object must have moved beyond the preparation stage";
719 f_monarch_wrapper( a_monarch_wrapper ),
720 f_stream( a_monarch.GetStream( a_stream_no ) ),
722 f_record_size_mb( 1.e-6 * (double)f_stream->GetStreamRecordNBytes() )
726 throw error() <<
"Invalid stream number requested: " << a_stream_no;
736 a_orig.f_stream =
nullptr;
737 a_orig.f_is_valid =
false;
747 a_orig.f_stream =
nullptr;
748 a_orig.f_is_valid =
false;
754 bool stream_wrapper::write_record( monarch3::RecordIdType a_rec_id, monarch3::TimeType a_rec_time,
const void* a_rec_block, uint64_t a_bytes,
bool a_is_new_acq )
756 LTRACE(
plog,
"Writing record <" << a_rec_id <<
">" );
759 LERROR(
plog,
"Unable to write to monarch file" );
766 bool t_return =
f_stream->WriteRecord( a_is_new_acq );
std::shared_ptr< header_wrapper > header_wrap_ptr
stream_wrap_ptr get_stream(unsigned a_stream_no)
void execute_switch_loop()
void switch_to_new_file()
std::thread * f_switch_thread
const char * what() const
std::condition_variable f_od_condition
std::shared_ptr< stream_wrapper > stream_wrap_ptr
double f_max_file_size_mb
monarch3::M3Stream * f_stream
std::map< unsigned, stream_wrap_ptr > f_stream_wraps
static scarab::logger plog("batch_executor")
~monarch_on_deck_manager()
monarch3::M3Record * get_stream_record()
Get the pointer to the stream record.
std::unique_lock< std::mutex > unique_lock
void start_using()
Make the wrapper available for use; starts parallel on-deck thread.
std::string f_filename_base
std::atomic< bool > f_do_switch_flag
void finish_stream(unsigned a_stream_no)
Finish the given stream. The stream object will be deleted.
void notify()
Notify the manager to process its monarch objects if needed (asynchronous)
std::thread * f_od_thread
std::shared_ptr< monarch3::Monarch3 > f_monarch
header_wrap_ptr get_header()
Returns the header wrapped in a header_wrap_ptr to be filled at the beginning of file writing...
void execute()
Execute the thread loop: handle the asynchronous processing of the on-deck and to-finish monarch obje...
monarch_wrapper * f_monarch_wrapper
stream_wrapper & operator=(stream_wrapper &&a_orig)
bool pointers_empty() const
Return true if both f_monarch_on_deck and f_monarch_to_finish are empty.
std::condition_variable f_wait_to_write
Wrapper class for a monarch3::M3Stream object.
void finish_to_finish()
Finish the to-finish monarch object (synchronous)
std::ostream & operator<<(std::ostream &a_os, monarch_stage a_stage)
void finish_file()
Finish the file.
monarch_stage to_op_t(uint32_t a_stage_uint)
void record_file_contribution(double a_size)
const monarch_wrapper * f_monarch_wrap
monarch_on_deck_manager f_monarch_od_manager
std::condition_variable f_do_switch_trig
std::mutex f_monarch_mutex
void clear_on_deck()
Clear the on-deck monarch object (synchronous)
monarch_on_deck_manager(monarch_wrapper *a_monarch_wrap)
LOGGER(plog, "egg_writer")
std::atomic< bool > f_ok_to_write
unsigned get_and_increment_file_count() const
As it says, return the current value to, and then increment, the file count.
std::string f_orig_filename
void set_as_to_finish(std::shared_ptr< monarch3::Monarch3 > &a_monarch)
Give a monarch object to the on-deck manager with the intent that it be finished asynchronously.
void create_on_deck_nolock()
stream_wrapper(monarch3::Monarch3 &, unsigned a_stream_no, monarch_wrapper *a_monarch_wrapper)
Wrapper class for a monarch3::M3Monarch object.
void create_on_deck()
Create the on-deck monarch object if it doesn't exist already (synchronous)
void stop_using()
Make the wrapper unavailable for use; stops the parallel on-deck thread.
bool okay_to_write()
Ensure that the file is available to write to (via a stream)
void set_stage(monarch_stage a_stage)
Override the stage value.
void finish_to_finish_nolock()
std::shared_ptr< monarch3::Monarch3 > f_monarch_on_deck
uint32_t to_uint(monarch_stage a_stage)
monarch_wrapper(const std::string &a_filename)
void get_on_deck(std::shared_ptr< monarch3::Monarch3 > &a_monarch)
Get the on-deck monarch object that has been created asynchronously.
std::atomic< double > f_file_size_est_mb
std::shared_ptr< monarch3::Monarch3 > f_monarch_to_finish
std::string f_filename_ext
bool write_record(monarch3::RecordIdType a_rec_id, monarch3::TimeType a_rec_time, const void *a_rec_block, uint64_t a_bytes, bool a_is_new_acq)
Write the record contents to the file.
header_wrap_ptr f_header_wrap