Psyllid  v1.12.4
Project 8 Data Acquisition Software
tf_roach_receiver.cc
Go to the documentation of this file.
1 
2 /*
3  * tf_roach_receiver.cc
4  *
5  * Created on: Dec 25, 2015
6  * Author: nsoblath
7  */
8 
9 #include "tf_roach_receiver.hh"
10 
11 #include "psyllid_error.hh"
12 
13 #include "logger.hh"
14 #include "param.hh"
15 
16 #include <thread>
17 #include <memory>
18 #include <sys/types.h> // for ssize_t
19 
20 using midge::stream;
21 
22 namespace psyllid
23 {
24  REGISTER_NODE_AND_BUILDER( tf_roach_receiver, "tf-roach-receiver", tf_roach_receiver_binding );
25 
26  LOGGER( plog, "tf_roach_receiver" );
27 
29  f_time_length( 10 ),
30  f_freq_length( 10 ),
31  f_udp_buffer_size( sizeof( roach_packet ) ),
32  f_time_sync_tol( 2 ),
33  f_start_paused( true ),
34  f_force_time_first( false ),
35  f_skip_after_stop( 0 ),
36  f_exe_func( &tf_roach_receiver::exe_time_and_freq ),
37  f_exe_func_mutex(),
38  f_break_exe_func( false ),
39  f_paused( true ),
40  f_time_session_pkt_counter( 0 ),
41  f_freq_session_pkt_counter( 0 )
42  {
43  }
44 
46  {
47  }
48 
50  {
51  LDEBUG( plog, "Requesting switch to frequency-only mode" );
52  f_exe_func_mutex.lock();
54  {
55  f_break_exe_func.store( true );
57  }
58  f_exe_func_mutex.unlock();
59  return;
60  }
61 
63  {
64  LDEBUG( plog, "Requesting switch to time-and-frequency mode" );
65  f_exe_func_mutex.lock();
67  {
68  f_break_exe_func.store( true );
70  }
71  f_exe_func_mutex.unlock();
72  return;
73  }
74 
76  {
77  out_buffer< 0 >().initialize( f_time_length );
78  out_buffer< 1 >().initialize( f_freq_length );
79  return;
80  }
81 
82  void tf_roach_receiver::execute( midge::diptera* a_midge )
83  {
84  try
85  {
86  LDEBUG( plog, "Executing the TF ROACH receiver" );
87 
88  exe_func_context t_ctx;
89 
90  t_ctx.f_midge = a_midge;
91 
92  t_ctx.f_in_command = stream::s_none;
93 
94  t_ctx.f_memory_block = nullptr;
95  t_ctx.f_time_data = nullptr;
96  t_ctx.f_freq_data = nullptr;
97 
98  //LDEBUG( plog, "Server is listening" );
99 
100  t_ctx.f_buffer_ptr.reset( new char[ f_udp_buffer_size ] );
101 
102  // start out as configured (default is paused)
103  //out_stream< 0 >().set( stream::s_start );
104  //out_stream< 1 >().set( stream::s_start );
105  f_paused = f_start_paused;
106 
107  //uint64_t t_time_batch_pkt = 0;
108  //uint64_t t_freq_batch_pkt = 0;
109 
110  t_ctx.f_pkt_size = 0;
111 
112  if( ! f_start_paused )
113  {
114  LDEBUG( plog, "TF ROACH receiver starting unpaused" );
115  out_stream< 0 >().data()->set_pkt_in_session( 0 );
116  out_stream< 1 >().data()->set_pkt_in_session( 0 );
117  if( ! out_stream< 0 >().set( stream::s_start ) ) return;
118  if( ! out_stream< 1 >().set( stream::s_start ) ) return;
121  }
122 
123  try
124  {
125  LPROG( plog, "Starting main loop; waiting for packets" );
126  f_break_exe_func.store( true );
127  while( f_break_exe_func.load() )
128  {
129  f_break_exe_func.store( false );
130  f_exe_func_mutex.lock();
131  if( ! (this->*f_exe_func)( t_ctx ) ) return;
132  }
133  }
134  catch( error& e )
135  {
136  throw;
137  }
138 
139  LINFO( plog, "TF ROACH receiver is exiting" );
140 
141  // normal exit condition
142  LDEBUG( plog, "Stopping output streams" );
143  bool t_stop_ok = out_stream< 0 >().set( stream::s_stop ) && out_stream< 1 >().set( stream::s_stop );
144  if( ! t_stop_ok ) return;
145 
146  LDEBUG( plog, "Exiting output streams" );
147  if( ! out_stream< 0 >().set( stream::s_exit ) ) return;
148  out_stream< 1 >().set( stream::s_exit );
149 
150  return;
151  }
152  catch( std::exception )
153  {
154  if( a_midge ) a_midge->throw_ex( std::current_exception() );
155  else throw;
156  }
157 
158  return;
159  }
160 
162  {
163  f_exe_func_mutex.unlock();
164  bool t_time_pkt_received = !f_force_time_first;
165 
166  LDEBUG( plog, "Entering time-and-frequency loop" );
167  while( ! is_canceled() && ! f_break_exe_func.load() )
168  {
169  // check if we've received a pause or unpause instruction
170  //try
171  //{
172  // check_instruction();
173  //}
174  //catch( error& e )
175  //{
176  // LERROR( plog, e.what() );
177  // break;
178  //}
179 
180  // the stream::get function is called at the end of the loop so that we can enter the exe func after switching the function pointer
181  // and still handle the input command appropriately
182 
183  if( a_ctx.f_in_command == stream::s_none )
184  {
185 
186  LTRACE( plog, "tfrr read s_none" );
187 
188  }
189  else if( a_ctx.f_in_command == stream::s_error )
190  {
191 
192  LTRACE( plog, "tfrr read s_error" );
193  break;
194 
195  }
196  else if( a_ctx.f_in_command == stream::s_exit )
197  {
198 
199  LDEBUG( plog, "TF ROACH receiver is exiting" );
200  // Output streams are stopped here because even though this is controlled by the pause/unpause commands,
201  // receiving a stop command from upstream overrides that.
202  out_stream< 0 >().set( stream::s_exit );
203  out_stream< 1 >().set( stream::s_exit );
204  return false;
205 
206  }
207  else if( a_ctx.f_in_command == stream::s_stop )
208  {
209 
210  LDEBUG( plog, "TF ROACH receiver is stopping" );
211  // Output streams are stopped here because even though this is controlled by the pause/unpause commands,
212  // receiving a stop command from upstream overrides that.
213  if( ! out_stream< 0 >().set( stream::s_stop ) ) break;
214  if( ! out_stream< 1 >().set( stream::s_stop ) ) break;
215 
216  }
217  else if( a_ctx.f_in_command == stream::s_start )
218  {
219 
220  LDEBUG( plog, "TF ROACH receiver is starting" );
221  // Output streams are not started here because this is controlled by the pause/unpause commands
222 
223  }
224  else
225  {
226  if( have_instruction() )
227  {
228  if( f_paused && use_instruction() == midge::instruction::resume )
229  {
230  LDEBUG( plog, "TF ROACH receiver resuming" );
231  out_stream< 0 >().data()->set_pkt_in_session( 0 );
232  out_stream< 1 >().data()->set_pkt_in_session( 0 );
233  if( ! out_stream< 0 >().set( stream::s_start ) ) throw midge::node_nonfatal_error() << "Stream 0 error while starting";
234  if( ! out_stream< 1 >().set( stream::s_start ) ) throw midge::node_nonfatal_error() << "Stream 1 error while starting";
237  f_paused = false;
238  }
239  else if( ! f_paused && use_instruction() == midge::instruction::pause )
240  {
241  LDEBUG( plog, "TF ROACH receiver pausing" );
242  if( ! out_stream< 0 >().set( stream::s_stop ) ) throw midge::node_nonfatal_error() << "Stream 0 error while stopping";
243  if( ! out_stream< 1 >().set( stream::s_stop ) ) throw midge::node_nonfatal_error() << "Stream 1 error while stopping";
244  f_paused = true;
245  t_time_pkt_received = !f_force_time_first;
246  }
247  }
248 
249  // do nothing if paused
250  if( ! f_paused && a_ctx.f_in_command == stream::s_run )
251  {
252  a_ctx.f_memory_block = in_stream< 0 >().data();
253 
254  a_ctx.f_pkt_size = a_ctx.f_memory_block->get_n_bytes_used();
255  LTRACE( plog, "Handling packet at stream index <" << in_stream< 0 >().get_current_index() << ">; size = " << a_ctx.f_pkt_size << " bytes; block address = " << (void*)a_ctx.f_memory_block->block() );
256  if( a_ctx.f_pkt_size != f_udp_buffer_size )
257  {
258  LWARN( plog, "Improper packet size; packet may be malformed: received " << a_ctx.f_memory_block->get_n_bytes_used() << " bytes; expected " << f_udp_buffer_size << " bytes" );
259  if( a_ctx.f_pkt_size == 0 ) continue;
260  }
261 
262  byteswap_inplace( reinterpret_cast< raw_roach_packet* >( a_ctx.f_memory_block->block() ) );
263  roach_packet* t_roach_packet = reinterpret_cast< roach_packet* >( a_ctx.f_memory_block->block() );
264 
265  // debug purposes only
266  #ifndef NDEBUG
267  raw_roach_packet* t_raw_packet = reinterpret_cast< raw_roach_packet* >( a_ctx.f_memory_block->block() );
268  LTRACE( plog, "Raw packet header: " << std::hex << t_raw_packet->f_word_0 << ", " << t_raw_packet->f_word_1 << ", " << t_raw_packet->f_word_2 << ", " << t_raw_packet->f_word_3 );
269  #endif
270 
271  if( t_roach_packet->f_freq_not_time )
272  {
273  // packet is frequency data
274  if( ! t_time_pkt_received ) continue;
275  //t_freq_batch_pkt = t_roach_packet->f_pkt_in_batch;
276 
277  a_ctx.f_freq_data = out_stream< 1 >().data();
278  a_ctx.f_freq_data->set_pkt_in_session( f_freq_session_pkt_counter++ );
279  ::memcpy( &a_ctx.f_freq_data->packet(), t_roach_packet, a_ctx.f_pkt_size );
280 
281  LTRACE( plog, "Frequency data received (" << a_ctx.f_pkt_size << " bytes): chan = " << a_ctx.f_freq_data->get_digital_id() <<
282  " time = " << a_ctx.f_freq_data->get_unix_time() <<
283  " pkt_session = " << a_ctx.f_freq_data->get_pkt_in_session() <<
284  " pkt_batch = " << t_roach_packet->f_pkt_in_batch <<
285  " freqNotTime = " << a_ctx.f_freq_data->get_freq_not_time() <<
286  " first 8 bins: " << (int)a_ctx.f_freq_data->get_array()[ 0 ][ 0 ] << ", " << (int)a_ctx.f_freq_data->get_array()[ 0 ][ 1 ] << " -- " << (int)a_ctx.f_freq_data->get_array()[ 1 ][ 0 ] << ", " << (int)a_ctx.f_freq_data->get_array()[ 1 ][ 1 ] << " -- " << (int)a_ctx.f_freq_data->get_array()[ 2 ][ 0 ] << ", " << (int)a_ctx.f_freq_data->get_array()[ 2 ][ 1 ] << " -- " << (int)a_ctx.f_freq_data->get_array()[ 3 ][ 0 ] << ", " << (int)a_ctx.f_freq_data->get_array()[ 3 ][ 1 ]);
287  LTRACE( plog, "Frequency data written to stream index <" << out_stream< 1 >().get_current_index() << ">" );
288  if( ! out_stream< 1 >().set( stream::s_run ) )
289  {
290  LERROR( plog, "Exiting due to stream error" );
291  break;
292  }
293  }
294  else
295  {
296  // packet is time data
297  t_time_pkt_received = true;
298  //t_time_batch_pkt = t_roach_packet->f_pkt_in_batch;
299 
300  a_ctx.f_time_data = out_stream< 0 >().data();
301  a_ctx.f_time_data->set_pkt_in_session( f_time_session_pkt_counter++ );
302  ::memcpy( &a_ctx.f_time_data->packet(), t_roach_packet, a_ctx.f_pkt_size );
303 
304  LTRACE( plog, "Time data received (" << a_ctx.f_pkt_size << " bytes): chan = " << a_ctx.f_time_data->get_digital_id() <<
305  " time = " << a_ctx.f_time_data->get_unix_time() <<
306  " pkt_session = " << a_ctx.f_time_data->get_pkt_in_session() <<
307  " pkt_batch = " << t_roach_packet->f_pkt_in_batch <<
308  " freqNotTime = " << a_ctx.f_time_data->get_freq_not_time() <<
309  " first 8 bins: " << (int)a_ctx.f_time_data->get_array()[ 0 ][ 0 ] << ", " << (int)a_ctx.f_time_data->get_array()[ 0 ][ 1 ] << " -- " << (int)a_ctx.f_time_data->get_array()[ 1 ][ 0 ] << ", " << (int)a_ctx.f_time_data->get_array()[ 1 ][ 1 ] << " -- " << (int)a_ctx.f_time_data->get_array()[ 2 ][ 0 ] << ", " << (int)a_ctx.f_time_data->get_array()[ 2 ][ 1 ] << " -- " << (int)a_ctx.f_time_data->get_array()[ 3 ][ 0 ] << ", " << (int)a_ctx.f_time_data->get_array()[ 3 ][ 1 ]);
310  LTRACE( plog, "Time data written to stream index <" << out_stream< 1 >().get_current_index() << ">" );
311  if( ! out_stream< 0 >().set( stream::s_run ) )
312  {
313  LERROR( plog, "Exiting due to stream error" );
314  break;
315  }
316  }
317 
318  //if( t_time_batch_pkt == t_freq_batch_pkt )
319  //{
320  // LTRACE( plog, "Time and frequency batch IDs match: " << t_time_batch_pkt );
321  // LTRACE( plog, "Frequency data written to stream index <" << out_stream< 1 >().get_current_index() << ">" );
322  // out_stream< 1 >().set( stream::s_run );
323  // LTRACE( plog, "Time data written to stream index <" << out_stream< 1 >().get_current_index() << ">" );
324  // out_stream< 0 >().set( stream::s_run );
325  // }
326  } // if block for input command == run
327 
328  } // if-else for input command
329 
330  a_ctx.f_in_command = in_stream< 0 >().get();
331  LTRACE( plog, "TF ROACH receiver reading stream 0 at index " << in_stream< 0 >().get_current_index() );
332 
333  } // main while loop
334 
335  return true;
336  }
337 
339  {
340  f_exe_func_mutex.unlock();
341 
342  LDEBUG( plog, "Entering frequency-only loop" );
343  while( ! is_canceled() && ! f_break_exe_func.load() )
344  {
345  // check if we've received a pause or unpause instruction
346  //try
347  //{
348  // check_instruction();
349  //}
350  //catch( error& e )
351  //{
352  // LERROR( plog, e.what() );
353  // break;
354  //}
355 
356  // the stream::get function is called at the end of the loop so that we can enter the exe func after switching the function pointer
357  // and still handle the input command appropriately
358 
359  if( a_ctx.f_in_command == stream::s_none )
360  {
361 
362  LTRACE( plog, "tfrr read s_none" );
363 
364  }
365  else if( a_ctx.f_in_command == stream::s_error )
366  {
367 
368  LTRACE( plog, "tfrr read s_error" );
369  break;
370 
371  }
372  else if( a_ctx.f_in_command == stream::s_exit )
373  {
374 
375  LDEBUG( plog, "TF ROACH receiver is exiting" );
376  // Output streams are stopped here because even though this is controlled by the pause/unpause commands,
377  // receiving a stop command from upstream overrides that.
378  out_stream< 0 >().set( stream::s_exit );
379  out_stream< 1 >().set( stream::s_exit );
380  return false;
381 
382  }
383  else if( a_ctx.f_in_command == stream::s_stop )
384  {
385 
386  LDEBUG( plog, "TF ROACH receiver is stopping" );
387  // Output streams are stopped here because even though this is controlled by the pause/unpause commands,
388  // receiving a stop command from upstream overrides that.
389  if( ! out_stream< 1 >().set( stream::s_stop ) ) break;
390 
391  }
392  else if( a_ctx.f_in_command == stream::s_start )
393  {
394  LDEBUG( plog, "TF ROACH receiver is starting" );
395  // Output streams are not started here because this is controlled by the pause/unpause commands
396  continue;
397  }
398 
399  else
400  {
401  // check whether an instruction has been received
402  if( have_instruction() )
403  {
404  if( f_paused && use_instruction() == midge::instruction::resume )
405  {
406  LDEBUG( plog, "TF ROACH receiver resuming" );
407  out_stream< 1 >().data()->set_pkt_in_session( 0 );
408  if( ! out_stream< 1 >().set( stream::s_start ) ) throw midge::node_nonfatal_error() << "Stream 2 error while starting";
410  f_paused = false;
411  }
412  else if( ! f_paused && use_instruction() == midge::instruction::pause )
413  {
414  LDEBUG( plog, "TF ROACH receiver pausing" );
415  if( ! out_stream< 1 >().set( stream::s_stop ) ) throw midge::node_nonfatal_error() << "Stream 2 error while stopping";
416  f_paused = true;
417  }
418  }
419 
420  // do nothing if paused
421  if( ! f_paused && a_ctx.f_in_command == stream::s_run )
422  {
423  a_ctx.f_memory_block = in_stream< 0 >().data();
424 
425  a_ctx.f_pkt_size = a_ctx.f_memory_block->get_n_bytes_used();
426  LTRACE( plog, "Handling packet at stream index <" << in_stream< 0 >().get_current_index() << ">; size = " << a_ctx.f_pkt_size << " bytes; block address = " << (void*)a_ctx.f_memory_block->block() );
427  if( a_ctx.f_pkt_size != f_udp_buffer_size )
428  {
429  LWARN( plog, "Improper packet size; packet may be malformed: received " << a_ctx.f_memory_block->get_n_bytes_used() << " bytes; expected " << f_udp_buffer_size << " bytes" );
430  if( a_ctx.f_pkt_size == 0 ) continue;
431  }
432 
433  byteswap_inplace( reinterpret_cast< raw_roach_packet* >( a_ctx.f_memory_block->block() ) );
434  roach_packet* t_roach_packet = reinterpret_cast< roach_packet* >( a_ctx.f_memory_block->block() );
435 
436  // debug purposes only
437  #ifndef NDEBUG
438  raw_roach_packet* t_raw_packet = reinterpret_cast< raw_roach_packet* >( a_ctx.f_memory_block->block() );
439  LTRACE( plog, "Raw packet header: " << std::hex << t_raw_packet->f_word_0 << ", " << t_raw_packet->f_word_1 << ", " << t_raw_packet->f_word_2 << ", " << t_raw_packet->f_word_3 );
440  #endif
441 
442  if( t_roach_packet->f_freq_not_time )
443  {
444  // packet is frequency data
445  //t_freq_batch_pkt = t_roach_packet->f_pkt_in_batch;
446 
447  a_ctx.f_freq_data = out_stream< 1 >().data();
448  a_ctx.f_freq_data->set_pkt_in_session( f_freq_session_pkt_counter++ );
449  ::memcpy( &a_ctx.f_freq_data->packet(), t_roach_packet, a_ctx.f_pkt_size );
450 
451  LTRACE( plog, "Frequency data received (" << a_ctx.f_pkt_size << " bytes): chan = " << a_ctx.f_freq_data->get_digital_id() <<
452  " time = " << a_ctx.f_freq_data->get_unix_time() <<
453  " pkt_session = " << a_ctx.f_freq_data->get_pkt_in_session() <<
454  " pkt_batch = " << t_roach_packet->f_pkt_in_batch <<
455  " freqNotTime = " << a_ctx.f_freq_data->get_freq_not_time() <<
456  " first 8 bins: " << (int)a_ctx.f_freq_data->get_array()[ 0 ][ 0 ] << ", " << (int)a_ctx.f_freq_data->get_array()[ 0 ][ 1 ] << " -- " << (int)a_ctx.f_freq_data->get_array()[ 1 ][ 0 ] << ", " << (int)a_ctx.f_freq_data->get_array()[ 1 ][ 1 ] << " -- " << (int)a_ctx.f_freq_data->get_array()[ 2 ][ 0 ] << ", " << (int)a_ctx.f_freq_data->get_array()[ 2 ][ 1 ] << " -- " << (int)a_ctx.f_freq_data->get_array()[ 3 ][ 0 ] << ", " << (int)a_ctx.f_freq_data->get_array()[ 3 ][ 1 ]);
457  LTRACE( plog, "Frequency data written to stream index <" << out_stream< 1 >().get_current_index() << ">" );
458  if( ! out_stream< 1 >().set( stream::s_run ) )
459  {
460  LERROR( plog, "Exiting due to stream error" );
461  break;
462  }
463  }
464  } // if block for input command == run
465 
466  } // if-else block for input command
467 
468  a_ctx.f_in_command = in_stream< 0 >().get();
469  LTRACE( plog, "TF ROACH receiver reading stream 0 at index " << in_stream< 0 >().get_current_index() );
470 
471  } // main while loop
472 
473  return true;
474  }
475 
477  {
478  out_buffer< 0 >().finalize();
479  out_buffer< 1 >().finalize();
480  return;
481  }
482 
485  {
486  }
487 
489  {
490  }
491 
492  void tf_roach_receiver_binding::do_apply_config( tf_roach_receiver* a_node, const scarab::param_node& a_config ) const
493  {
494  LDEBUG( plog, "Configuring tf_roach_receiver with:\n" << a_config );
495  a_node->set_time_length( a_config.get_value( "time-length", a_node->get_time_length() ) );
496  a_node->set_freq_length( a_config.get_value( "freq-length", a_node->get_freq_length() ) );
497  a_node->set_udp_buffer_size( a_config.get_value( "udp-buffer-size", a_node->get_udp_buffer_size() ) );
498  a_node->set_time_sync_tol( a_config.get_value( "time-sync-tol", a_node->get_time_sync_tol() ) );
499  a_node->set_start_paused( a_config.get_value( "start-paused", a_node->get_start_paused() ) );
500  a_node->set_force_time_first( a_config.get_value( "force-time-first", a_node->get_force_time_first() ) );
501  return;
502  }
503 
504  void tf_roach_receiver_binding::do_dump_config( const tf_roach_receiver* a_node, scarab::param_node& a_config ) const
505  {
506  LDEBUG( plog, "Dumping tf_roach_receiver configuration" );
507  a_config.add( "time-length", scarab::param_value( a_node->get_time_length() ) );
508  a_config.add( "freq-length", scarab::param_value( a_node->get_freq_length() ) );
509  a_config.add( "udp-buffer-size", scarab::param_value( a_node->get_udp_buffer_size() ) );
510  a_config.add( "time-sync-tol", scarab::param_value( a_node->get_time_sync_tol() ) );
511  a_config.add( "start-paused", scarab::param_value( a_node->get_start_paused() ) );
512  a_config.add( "force-time-first", scarab::param_value( a_node->get_force_time_first() ) );
513  return;
514 
515  }
516 
517  bool tf_roach_receiver_binding::do_run_command( tf_roach_receiver* a_node, const std::string& a_cmd, const scarab::param_node& ) const
518  {
519  if( a_cmd == "freq-only" )
520  {
521  a_node->switch_to_freq_only();
522  return true;
523  }
524  else if( a_cmd == "time-and-freq" )
525  {
526  a_node->switch_to_time_and_freq();
527  return true;
528  }
529  else
530  {
531  LWARN( plog, "Unrecognized command: <" << a_cmd << ">" );
532  return false;
533  }
534  }
535 
536 } /* namespace psyllid */
virtual void do_dump_config(const tf_roach_receiver *a_node, scarab::param_node &a_config) const
std::atomic< bool > f_break_exe_func
static scarab::logger plog("batch_executor")
bool exe_freq_only(exe_func_context &a_ctx)
const roach_packet & packet() const
void byteswap_inplace(raw_roach_packet *a_pkt)
Definition: roach_packet.cc:22
const iq_t * get_array() const
Definition: time_data.hh:39
bool(tf_roach_receiver::* f_exe_func)(exe_func_context &a_ctx)
bool exe_time_and_freq(exe_func_context &a_ctx)
virtual void do_apply_config(tf_roach_receiver *a_node, const scarab::param_node &a_config) const
virtual bool do_run_command(tf_roach_receiver *a_node, const std::string &a_cmd, const scarab::param_node &) const
in derived classes, should throw a std::exception if the command fails, and return false if the comma...
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
A transformer to receive raw blocks of memory, parse them, and distribute them as time and frequency ...
LOGGER(plog, "egg_writer")
uint32_t get_digital_id() const
virtual void execute(midge::diptera *a_midge=nullptr)
const iq_t * get_array() const
Definition: freq_data.hh:39
uint32_t get_unix_time() const