14 #include "M3Monarch.hh" 17 #include "M3Monarch.hh" 31 f_egg_path(
"/dev/null" ),
32 f_read_n_records( 0 ),
33 f_repeat_egg( false ),
35 f_start_paused( true ),
// Open the egg file named by f_egg_path for reading and keep the handle in f_egg.
53 LDEBUG(
plog,
"opening egg file [" << f_egg_path <<
"]" );
54 f_egg = monarch3::Monarch3::OpenForReading( f_egg_path );
// Fetch the file header and write it to the debug log.
57 const monarch3::M3Header *t_egg_header = f_egg->GetHeader();
58 LDEBUG(
plog,
"egg header content:\n" );
59 LDEBUG(
plog, *t_egg_header );
// Main body of the reader's execute step (braces and some statements are
// elided in this listing, so nesting below is not fully visible).
70 LDEBUG(
plog,
"Executing the egg3_reader" );
// Stream 0 of the egg file and its channel-0 record are handed to read_slice() below.
73 const monarch3::M3Stream* t_stream = f_egg->GetStream( 0 );
74 const monarch3::M3Record* t_record = t_stream->GetChannelRecord( 0 );
// Put output stream 0 into the start state; give up immediately if set() returns false.
81 if( ! out_stream< 0 >().
set( stream::s_start ) )
return;
// Record counter compared against f_read_n_records below
// (NOTE(review): its increment is not visible in this listing).
84 uint64_t t_records_read = 0;
// Loop until the node is canceled.
87 while (! is_canceled() )
// Warn when output stream 0 reports the stop state.
89 if( (out_stream< 0 >().
get() == stream::s_stop) )
91 LWARN(
plog,
"Output stream(s) have stop condition" );
// Handle a pending pause/resume instruction, if any.
94 if( have_instruction() )
// Resume: only acted on while paused; restarts output stream 0 and throws a
// non-fatal node error if the stream cannot be started.
96 if(
f_paused && use_instruction() == midge::instruction::resume )
98 LDEBUG(
plog,
"egg reader resuming" );
99 if( ! out_stream< 0 >().
set( stream::s_start ) )
throw midge::node_nonfatal_error() <<
"Stream 0 error while starting";
// Pause: only acted on while not paused; stops output stream 0 and throws a
// non-fatal node error if the stream cannot be stopped.
103 else if ( !
f_paused && use_instruction() == midge::instruction::pause )
105 LDEBUG(
plog,
"egg reader pausing" );
106 if( ! out_stream< 0 >().
set( stream::s_stop ) )
throw midge::node_nonfatal_error() <<
"Stream 0 error while stopping";
// Read one slice; a false result is treated below the same as hitting the record limit.
114 bool read_slice_ok =
read_slice(t_data, t_stream, t_record);
// Stop condition: the read failed, or a positive f_read_n_records limit has been
// reached (a limit of 0 never triggers this test).
118 if ( !read_slice_ok || (f_read_n_records > 0 && t_records_read >= f_read_n_records) )
120 LINFO(
plog,
"breaking out of loop because record limit or end of file reached" );
// Ask DAQ control to stop the run.
122 t_daq_control->stop_run();
// Two fixed sleeps (100 microseconds and 100 milliseconds).
// NOTE(review): the branches they belong to are elided here — presumably the
// running and the paused path respectively; confirm against the full source.
125 std::this_thread::sleep_for(std::chrono::microseconds(100));
129 std::this_thread::sleep_for(std::chrono::milliseconds(100));
// Hand any std::exception over to midge via throw_ex(). The handler catches by
// const reference rather than by value so the exception object is neither
// copied nor sliced; std::current_exception() still refers to the exception
// that is currently being handled.
133 catch( const std::exception& )
135 LWARN(
plog,
"got an exception, throwing" );
136 a_midge->throw_ex( std::current_exception() );
// Trace message logged at the end of execute.
138 LDEBUG(
plog,
"at the end of egg3 execute" );
// Debug trace around the finalize step; the statement(s) between these two
// log calls are elided in this listing.
143 LDEBUG(
plog,
"finalize the egg3_reader" );
145 LDEBUG(
plog,
"buffer finalized" );
// Body of the slice-reading step (braces and branch conditions are elided in
// this listing).
152 LDEBUG(
plog,
"reading a slice" );
// Use the current data slot of output stream 0 as the time-data object to fill in.
154 t_data = out_stream< 0 >().data();
// A false return from ReadRecord() is handled as end of file (see the log
// messages in the two outcomes below).
156 if ( !t_stream->ReadRecord() )
// Outcome 1: stop at end of file.
// NOTE(review): the condition choosing between the two outcomes is not
// visible — presumably f_repeat_egg; confirm against the full source.
160 LDEBUG(
plog,
"reached end of file, stopping" );
// Outcome 2: restart. ReadRecord() is called with the negated record count of
// the file — presumably a relative offset that rewinds to the first record.
165 LDEBUG(
plog,
"reached end of file, restarting" );
166 t_stream->ReadRecord( -1 *
int(t_stream->GetRecordCountInFile()) );
// The packet-in-session id is the record id shifted by f_pkt_id_offset.
176 t_data->set_pkt_in_session( t_record->GetRecordId() +
f_pkt_id_offset );
// Put output stream 0 into the run state; log an error if set() returns false.
177 if ( !out_stream< 0 >().set( stream::s_run ) )
179 LERROR(
plog,
"egg reader exiting due to stream error" );
// Close the egg file if one is open.
187 LDEBUG(
plog,
"cleaning up file" );
// Nothing to do when no file handle is held (nullptr instead of the NULL macro).
188 if ( f_egg == nullptr )
return;
189 LDEBUG(
plog,
"clean egg" );
// Only finish reading when the file is not already in the closed state.
190 if ( f_egg->GetState() != monarch3::Monarch3::eClosed )
192 LDEBUG(
plog,
"actually close egg" );
193 f_egg->FinishReading();
// Copy the reader settings from a_config onto the node. For each key the
// node's current value is passed as the second argument to get_value()
// (presumably the fallback used when the key is absent — confirm in scarab).
209 LDEBUG(
plog,
"Configuring egg3_reader with:\n" << a_config );
210 a_node->set_egg_path( a_config.get_value(
"egg-path", a_node->get_egg_path() ) );
211 a_node->set_read_n_records( a_config.get_value(
"read-n-records", a_node->get_read_n_records() ) );
212 a_node->set_repeat_egg( a_config.get_value(
"repeat-egg", a_node->get_repeat_egg() ) );
213 a_node->set_length( a_config.get_value(
"length", a_node->get_length() ) );
214 a_node->set_start_paused( a_config.get_value(
"start-paused", a_node->get_start_paused() ) );
// Write the node's current settings into a_config, using the same keys that
// the apply-config step reads ("egg-path", "read-n-records", "repeat-egg",
// "length", "start-paused").
220 LDEBUG(
plog,
"Dumping configuration for egg3_reader" );
221 a_config.add(
"egg-path", scarab::param_value( a_node->get_egg_path() ) );
222 a_config.add(
"read-n-records", scarab::param_value( a_node->get_read_n_records() ) );
223 a_config.add(
"repeat-egg", scarab::param_value( a_node->get_repeat_egg() ) );
224 a_config.add(
"length", scarab::param_value( a_node->get_length() ) );
// "start-paused" must be dumped from get_start_paused(); the previous code
// wrote get_length() here, so a dumped config carried the buffer length under
// the "start-paused" key.
225 a_config.add(
"start-paused", scarab::param_value( a_node->get_start_paused() ) );
virtual void execute(midge::diptera *a_midge=nullptr)
static scarab::logger plog("batch_executor")
dc_ptr_t use_daq_control()
virtual void initialize()
virtual ~egg3_reader_binding()
const iq_t * get_array() const
virtual void do_apply_config(egg3_reader *a_node, const scarab::param_node &a_config) const
void set_pkt_in_batch(uint32_t a_pkt)
bool read_slice(time_data *t_data, const monarch3::M3Stream *t_stream, const monarch3::M3Record *t_record)
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
A producer to read time-domain slices from an egg file and place them in time data buffers...
LOGGER(plog, "egg_writer")
virtual void do_dump_config(const egg3_reader *a_node, scarab::param_node &a_config) const