18 #include <sys/types.h> 33 f_start_paused( true ),
34 f_force_time_first( false ),
35 f_skip_after_stop( 0 ),
38 f_break_exe_func( false ),
40 f_time_session_pkt_counter( 0 ),
41 f_freq_session_pkt_counter( 0 )
51 LDEBUG(
plog,
"Requesting switch to frequency-only mode" );
64 LDEBUG(
plog,
"Requesting switch to time-and-frequency mode" );
86 LDEBUG(
plog,
"Executing the TF ROACH receiver" );
100 t_ctx.
f_buffer_ptr.reset(
new char[ f_udp_buffer_size ] );
112 if( ! f_start_paused )
114 LDEBUG(
plog,
"TF ROACH receiver starting unpaused" );
115 out_stream< 0 >().data()->set_pkt_in_session( 0 );
116 out_stream< 1 >().data()->set_pkt_in_session( 0 );
117 if( ! out_stream< 0 >().
set( stream::s_start ) )
return;
118 if( ! out_stream< 1 >().
set( stream::s_start ) )
return;
125 LPROG(
plog,
"Starting main loop; waiting for packets" );
139 LINFO(
plog,
"TF ROACH receiver is exiting" );
142 LDEBUG(
plog,
"Stopping output streams" );
143 bool t_stop_ok = out_stream< 0 >().
set( stream::s_stop ) && out_stream< 1 >().
set( stream::s_stop );
144 if( ! t_stop_ok )
return;
146 LDEBUG(
plog,
"Exiting output streams" );
147 if( ! out_stream< 0 >().
set( stream::s_exit ) )
return;
148 out_stream< 1 >().
set( stream::s_exit );
152 catch( std::exception )
154 if( a_midge ) a_midge->throw_ex( std::current_exception() );
164 bool t_time_pkt_received = !f_force_time_first;
166 LDEBUG(
plog,
"Entering time-and-frequency loop" );
186 LTRACE(
plog,
"tfrr read s_none" );
192 LTRACE(
plog,
"tfrr read s_error" );
199 LDEBUG(
plog,
"TF ROACH receiver is exiting" );
202 out_stream< 0 >().
set( stream::s_exit );
203 out_stream< 1 >().
set( stream::s_exit );
210 LDEBUG(
plog,
"TF ROACH receiver is stopping" );
213 if( ! out_stream< 0 >().
set( stream::s_stop ) )
break;
214 if( ! out_stream< 1 >().
set( stream::s_stop ) )
break;
220 LDEBUG(
plog,
"TF ROACH receiver is starting" );
226 if( have_instruction() )
228 if(
f_paused && use_instruction() == midge::instruction::resume )
230 LDEBUG(
plog,
"TF ROACH receiver resuming" );
231 out_stream< 0 >().data()->set_pkt_in_session( 0 );
232 out_stream< 1 >().data()->set_pkt_in_session( 0 );
233 if( ! out_stream< 0 >().
set( stream::s_start ) )
throw midge::node_nonfatal_error() <<
"Stream 0 error while starting";
234 if( ! out_stream< 1 >().
set( stream::s_start ) )
throw midge::node_nonfatal_error() <<
"Stream 1 error while starting";
239 else if( !
f_paused && use_instruction() == midge::instruction::pause )
241 LDEBUG(
plog,
"TF ROACH receiver pausing" );
242 if( ! out_stream< 0 >().
set( stream::s_stop ) )
throw midge::node_nonfatal_error() <<
"Stream 0 error while stopping";
243 if( ! out_stream< 1 >().
set( stream::s_stop ) )
throw midge::node_nonfatal_error() <<
"Stream 1 error while stopping";
245 t_time_pkt_received = !f_force_time_first;
255 LTRACE(
plog,
"Handling packet at stream index <" << in_stream< 0 >().get_current_index() <<
">; size = " << a_ctx.
f_pkt_size <<
" bytes; block address = " << (
void*)a_ctx.
f_memory_block->
block() );
258 LWARN(
plog,
"Improper packet size; packet may be malformed: received " << a_ctx.
f_memory_block->get_n_bytes_used() <<
" bytes; expected " << f_udp_buffer_size <<
" bytes" );
268 LTRACE(
plog,
"Raw packet header: " << std::hex << t_raw_packet->
f_word_0 <<
", " << t_raw_packet->
f_word_1 <<
", " << t_raw_packet->
f_word_2 <<
", " << t_raw_packet->
f_word_3 );
274 if( ! t_time_pkt_received )
continue;
283 " pkt_session = " << a_ctx.
f_freq_data->get_pkt_in_session() <<
287 LTRACE(
plog,
"Frequency data written to stream index <" << out_stream< 1 >().get_current_index() <<
">" );
288 if( ! out_stream< 1 >().
set( stream::s_run ) )
290 LERROR(
plog,
"Exiting due to stream error" );
297 t_time_pkt_received =
true;
306 " pkt_session = " << a_ctx.
f_time_data->get_pkt_in_session() <<
310 LTRACE(
plog,
"Time data written to stream index <" << out_stream< 1 >().get_current_index() <<
">" );
311 if( ! out_stream< 0 >().
set( stream::s_run ) )
313 LERROR(
plog,
"Exiting due to stream error" );
331 LTRACE(
plog,
"TF ROACH receiver reading stream 0 at index " << in_stream< 0 >().get_current_index() );
342 LDEBUG(
plog,
"Entering frequency-only loop" );
362 LTRACE(
plog,
"tfrr read s_none" );
368 LTRACE(
plog,
"tfrr read s_error" );
375 LDEBUG(
plog,
"TF ROACH receiver is exiting" );
378 out_stream< 0 >().
set( stream::s_exit );
379 out_stream< 1 >().
set( stream::s_exit );
386 LDEBUG(
plog,
"TF ROACH receiver is stopping" );
389 if( ! out_stream< 1 >().
set( stream::s_stop ) )
break;
394 LDEBUG(
plog,
"TF ROACH receiver is starting" );
402 if( have_instruction() )
404 if(
f_paused && use_instruction() == midge::instruction::resume )
406 LDEBUG(
plog,
"TF ROACH receiver resuming" );
407 out_stream< 1 >().data()->set_pkt_in_session( 0 );
408 if( ! out_stream< 1 >().
set( stream::s_start ) )
throw midge::node_nonfatal_error() <<
"Stream 2 error while starting";
412 else if( !
f_paused && use_instruction() == midge::instruction::pause )
414 LDEBUG(
plog,
"TF ROACH receiver pausing" );
415 if( ! out_stream< 1 >().
set( stream::s_stop ) )
throw midge::node_nonfatal_error() <<
"Stream 2 error while stopping";
426 LTRACE(
plog,
"Handling packet at stream index <" << in_stream< 0 >().get_current_index() <<
">; size = " << a_ctx.
f_pkt_size <<
" bytes; block address = " << (
void*)a_ctx.
f_memory_block->
block() );
429 LWARN(
plog,
"Improper packet size; packet may be malformed: received " << a_ctx.
f_memory_block->get_n_bytes_used() <<
" bytes; expected " << f_udp_buffer_size <<
" bytes" );
439 LTRACE(
plog,
"Raw packet header: " << std::hex << t_raw_packet->
f_word_0 <<
", " << t_raw_packet->
f_word_1 <<
", " << t_raw_packet->
f_word_2 <<
", " << t_raw_packet->
f_word_3 );
453 " pkt_session = " << a_ctx.
f_freq_data->get_pkt_in_session() <<
457 LTRACE(
plog,
"Frequency data written to stream index <" << out_stream< 1 >().get_current_index() <<
">" );
458 if( ! out_stream< 1 >().
set( stream::s_run ) )
460 LERROR(
plog,
"Exiting due to stream error" );
469 LTRACE(
plog,
"TF ROACH receiver reading stream 0 at index " << in_stream< 0 >().get_current_index() );
494 LDEBUG(
plog,
"Configuring tf_roach_receiver with:\n" << a_config );
495 a_node->set_time_length( a_config.get_value(
"time-length", a_node->get_time_length() ) );
496 a_node->set_freq_length( a_config.get_value(
"freq-length", a_node->get_freq_length() ) );
497 a_node->set_udp_buffer_size( a_config.get_value(
"udp-buffer-size", a_node->get_udp_buffer_size() ) );
498 a_node->set_time_sync_tol( a_config.get_value(
"time-sync-tol", a_node->get_time_sync_tol() ) );
499 a_node->set_start_paused( a_config.get_value(
"start-paused", a_node->get_start_paused() ) );
500 a_node->set_force_time_first( a_config.get_value(
"force-time-first", a_node->get_force_time_first() ) );
506 LDEBUG(
plog,
"Dumping tf_roach_receiver configuration" );
507 a_config.add(
"time-length", scarab::param_value( a_node->get_time_length() ) );
508 a_config.add(
"freq-length", scarab::param_value( a_node->get_freq_length() ) );
509 a_config.add(
"udp-buffer-size", scarab::param_value( a_node->get_udp_buffer_size() ) );
510 a_config.add(
"time-sync-tol", scarab::param_value( a_node->get_time_sync_tol() ) );
511 a_config.add(
"start-paused", scarab::param_value( a_node->get_start_paused() ) );
512 a_config.add(
"force-time-first", scarab::param_value( a_node->get_force_time_first() ) );
519 if( a_cmd ==
"freq-only" )
524 else if( a_cmd ==
"time-and-freq" )
531 LWARN(
plog,
"Unrecognized command: <" << a_cmd <<
">" );
virtual void do_dump_config(const tf_roach_receiver *a_node, scarab::param_node &a_config) const
virtual void initialize()
virtual ~tf_roach_receiver_binding()
tf_roach_receiver_binding()
uint64_t f_time_session_pkt_counter
std::atomic< bool > f_break_exe_func
std::unique_ptr< char[] > f_buffer_ptr
static scarab::logger plog("batch_executor")
bool exe_freq_only(exe_func_context &a_ctx)
const roach_packet & packet() const
void byteswap_inplace(raw_roach_packet *a_pkt)
const iq_t * get_array() const
uint64_t f_freq_session_pkt_counter
bool(tf_roach_receiver::* f_exe_func)(exe_func_context &a_ctx)
bool exe_time_and_freq(exe_func_context &a_ctx)
virtual void do_apply_config(tf_roach_receiver *a_node, const scarab::param_node &a_config) const
virtual bool do_run_command(tf_roach_receiver *a_node, const std::string &a_cmd, const scarab::param_node &) const
in derived classes, should throw a std::exception if the command fails, and return false if the comma...
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
bool get_freq_not_time() const
A transformer to receive raw blocks of memory, parse them, and distribute them as time and frequency ...
LOGGER(plog, "egg_writer")
uint32_t get_digital_id() const
virtual void execute(midge::diptera *a_midge=nullptr)
void switch_to_freq_only()
virtual ~tf_roach_receiver()
std::mutex f_exe_func_mutex
void switch_to_time_and_freq()
const iq_t * get_array() const
midge::enum_t f_in_command
memory_block * f_memory_block
uint32_t get_unix_time() const