23 f_skip_tolerance( 0 ),
25 f_state(
state_t::untriggered ),
26 f_pretrigger_buffer(),
51 midge::enum_t t_in_command = stream::s_none;
54 unsigned t_trigger_count = 0;
56 bool t_current_trig_flag =
false;
57 bool t_current_trig_high_thr =
false;
59 while( ! is_canceled() )
61 t_in_command = in_stream< 0 >().
get();
62 if( t_in_command == stream::s_none )
continue;
63 if( t_in_command == stream::s_error )
break;
65 LTRACE(
plog,
"Event builder reading stream at index " << in_stream< 0 >().get_current_index() );
67 t_trigger_flag = in_stream< 0 >().data();
68 t_write_flag = out_stream< 0 >().data();
70 if( t_in_command == stream::s_start )
72 LDEBUG(
plog,
"Starting the event builder" );
73 if( ! out_stream< 0 >().
set( stream::s_start ) )
break;
77 if( t_in_command == stream::s_run )
79 t_current_trig_flag = t_trigger_flag->get_flag();
80 t_current_trig_high_thr = t_trigger_flag->get_high_threshold();
82 LTRACE(
plog,
"Event builder received id <" << t_trigger_flag->get_id() <<
"> with flag value <" << t_trigger_flag->get_flag() <<
">" );
105 LTRACE(
plog,
"Currently in untriggered state" );
106 if( t_current_trig_flag and t_current_trig_high_thr ==
true)
108 LINFO(
plog,
"New trigger" );
110 if (t_trigger_count == f_n_triggers)
119 goto exit_outer_loop;
122 t_write_flag = out_stream< 0 >().data();
125 LDEBUG(
plog,
"Next state is triggered" );
131 LDEBUG(
plog,
"Next state is collecting" );
137 LTRACE(
plog,
"No new trigger; Writing to from pretrig buffer only if buffer is full: " <<
f_pretrigger_buffer.full() );
153 LTRACE(
plog,
"Currently in collecting state" );
154 if (t_current_trig_flag)
157 LDEBUG(
plog,
"Got another trigger: "<<t_trigger_count<<
", need N triggers: "<<f_n_triggers);
158 if (t_trigger_count == f_n_triggers)
167 goto exit_outer_loop;
170 t_write_flag = out_stream< 0 >().data();
174 LTRACE(
plog,
"Current state waiting. Writing id "<<
f_skip_buffer.front()<<
" as true");
177 goto exit_outer_loop;
180 t_write_flag = out_stream< 0 >().data();
185 LDEBUG(
plog,
"Next state is triggered");
200 goto exit_outer_loop;
203 t_write_flag = out_stream< 0 >().data();
208 LTRACE(
plog,
"Current state waiting. Writing id "<<
f_skip_buffer.front()<<
" as false");
211 goto exit_outer_loop;
214 t_write_flag = out_stream< 0 >().data();
233 goto exit_outer_loop;
236 t_write_flag = out_stream< 0 >().data();
247 LDEBUG(
plog,
"Next state is untriggered" );
252 LTRACE(
plog,
"Currently in triggered state" );
253 if( t_current_trig_flag )
255 LTRACE(
plog,
"Continuing as triggered" );
258 LDEBUG(
plog,
"Current state triggered. Writing id "<<
f_skip_buffer.front()<<
" as true");
269 LDEBUG(
plog,
"No new trigger; Switching state" );
276 LDEBUG(
plog,
"Next state is untriggered");
292 LDEBUG(
plog,
"Next state is skipping" );
306 LTRACE(
plog,
"Currently in skipping state" );
308 if( t_current_trig_flag )
310 LINFO(
plog,
"New trigger; flushing skip buffer" );
313 LTRACE(
plog,
"Current state skipping. Writing id " <<
f_skip_buffer.front() <<
" as true" );
316 goto exit_outer_loop;
319 t_write_flag = out_stream< 0 >().data();
324 LDEBUG(
plog,
"Next state is triggered" );
332 LINFO(
plog,
"Skip_tolerance reached. Continuing as untriggered");
338 LTRACE(
plog,
"Current state skipping. Writing id " <<
f_skip_buffer.front() <<
" as true" );
341 goto exit_outer_loop;
344 t_write_flag = out_stream< 0 >().data();
353 LTRACE(
plog,
"Current state skipping. Writing id " <<
f_skip_buffer.front() <<
" as true" );
357 goto exit_outer_loop;
360 t_write_flag = out_stream< 0 >().data();
387 LTRACE(
plog,
"No new trigger. Continue to fill skip and pretrigger buffer buffer." )
394 if( t_in_command == stream::s_stop )
396 LDEBUG(
plog,
"Event builder is stopping at stream index " << out_stream< 0 >().get_current_index() );
397 LDEBUG(
plog,
"Flushing buffers as untriggered" );
406 goto exit_outer_loop;
408 t_write_flag = out_stream< 0 >().data();
416 goto exit_outer_loop;
418 t_write_flag = out_stream< 0 >().data();
425 goto exit_outer_loop;
427 t_write_flag = out_stream< 0 >().data();
433 LDEBUG(
plog,
"Skip buffer is empty" );
439 goto exit_outer_loop;
442 t_write_flag = out_stream< 0 >().data();
447 LDEBUG(
plog,
"Pretrigger buffer is empty");
453 goto exit_outer_loop;
456 t_write_flag = out_stream< 0 >().data();
462 if( ! out_stream< 0 >().
set( stream::s_stop ) )
464 LERROR(
plog,
"Exiting due to stream error" );
470 if( t_in_command == stream::s_exit )
472 LDEBUG(
plog,
"Event builder is exiting at stream index " << out_stream< 0 >().get_current_index() );
473 LDEBUG(
plog,
"Flushing buffers as untriggered" );
482 goto exit_outer_loop;
484 t_write_flag = out_stream< 0 >().data();
492 goto exit_outer_loop;
494 t_write_flag = out_stream< 0 >().data();
501 goto exit_outer_loop;
503 t_write_flag = out_stream< 0 >().data();
508 LTRACE(
plog,
"Skip buffer is empty" );
514 goto exit_outer_loop;
517 t_write_flag = out_stream< 0 >().data();
522 LTRACE(
plog,
"Pretrigger buffer is empty" );
528 goto exit_outer_loop;
531 t_write_flag = out_stream< 0 >().data();
537 out_stream< 0 >().
set( stream::s_exit );
544 LDEBUG(
plog,
"Stopping output stream" );
545 if( ! out_stream< 0 >().
set( stream::s_stop ) )
return;
547 LDEBUG(
plog,
"Exiting output stream" );
548 out_stream< 0 >().
set( stream::s_exit );
553 if( a_midge ) a_midge->throw_ex( std::current_exception() );
560 a_write_flag->set_id( a_id );
561 a_write_flag->set_flag( a_trig_flag );
562 LDEBUG(
plog,
"Event builder writing data to the output stream at index " << out_stream< 0 >().get_current_index() );
563 out_stream< 0 >().
set( midge::stream::s_run );
585 LDEBUG(
plog,
"Configuring event_builder with:\n" << a_config );
586 a_node->set_length( a_config.get_value(
"length", a_node->get_length() ) );
587 a_node->set_pretrigger( a_config.get_value(
"pretrigger", a_node->get_pretrigger() ) );
588 a_node->set_skip_tolerance( a_config.get_value(
"skip-tolerance", a_node->get_skip_tolerance() ) );
589 a_node->set_n_triggers( a_config.get_value(
"n-triggers", a_node->get_n_triggers() ) );
595 LDEBUG(
plog,
"Dumping configuration for event_builder" );
596 a_config.add(
"length", scarab::param_value( a_node->get_length() ) );
597 a_config.add(
"pretrigger", scarab::param_value( a_node->get_pretrigger() ) );
598 a_config.add(
"skip-tolerance", scarab::param_value( a_node->get_skip_tolerance() ) );
599 a_config.add(
"n-triggers", scarab::param_value( a_node->get_n_triggers() ) );
pretrigger_buffer_t f_skip_buffer
static scarab::logger plog("batch_executor")
virtual void execute(midge::diptera *a_midge=nullptr)
bool write_output_from_ptbuff_front(bool a_flag, trigger_flag *a_data)
bool write_output_from_skipbuff_front(bool a_flag, trigger_flag *a_data)
A transformer that considers a sequence of triggered packets and decides what constitutes a contiguou...
virtual ~event_builder_binding()
virtual void initialize()
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
LOGGER(plog, "egg_writer")
pretrigger_buffer_t f_pretrigger_buffer
virtual void do_dump_config(const event_builder *a_node, scarab::param_node &a_config) const
virtual void do_apply_config(event_builder *a_node, const scarab::param_node &a_config) const
void advance_output_stream(trigger_flag *a_write_flag, uint64_t a_id, bool a_trig_flag)