Psyllid  v1.12.4
Project 8 Data Acquisisition Software
frequency_mask_trigger.cc
Go to the documentation of this file.
1 /*
2  * frequency_mask_trigger.cc
3  *
4  * Created on: Feb 8, 2016
5  * Author: nsoblath
6  */
7 
9 
10 #include "psyllid_error.hh"
11 
12 #include "logger.hh"
13 #include "param_codec.hh"
14 #include "time.hh"
15 
16 #include "tk_spline.hh"
17 
18 #include <cmath>
19 
20 using midge::stream;
21 
22 namespace psyllid
23 {
24 
25  REGISTER_NODE_AND_BUILDER( frequency_mask_trigger, "frequency-mask-trigger", frequency_mask_trigger_binding );
26 
27  LOGGER( plog, "frequency_mask_trigger" );
28 
29  // trigger_mode_t utility functions
31  {
32  // note that the string representations use hyphens, not underscores
33  switch (a_trigger_mode) {
34  case frequency_mask_trigger::trigger_mode_t::single_level: return "single-level";
36  default: throw psyllid::error() << "trigger_mode value <" << trigger_mode_to_uint(a_trigger_mode) << "> not recognized";
37  }
38  }
40  {
43  throw psyllid::error() << "string <" << a_trigger_mode_string << "> not recognized as valid trigger_mode type";
44  }
45 
46  // threshold_t utility functions
48  {
49  switch (a_threshold) {
51  case frequency_mask_trigger::threshold_t::sigma: return "sigma";
52  default: throw psyllid::error() << "threshold value <" << threshold_to_uint(a_threshold) << "> not recognized";
53  }
54  }
56  {
59  throw psyllid::error() << "string <" << a_threshold_string << "> not recognized as valid threshold type";
60  }
61 
63  f_length( 10 ),
64  f_n_packets_for_mask( 10 ),
65  f_threshold_snr( 30. ),
66  f_threshold_snr_high( 30. ),
67  f_threshold_sigma( 30 ),
68  f_threshold_sigma_high( 30 ),
69  f_threshold_type( threshold_t::sigma ),
70  f_n_spline_points( 20 ),
71  f_status( status_t::mask_update ),
72  f_trigger_mode(trigger_mode_t::single_level ),
73  f_n_excluded_bins( 0 ),
75  f_mask(),
76  f_mask2(),
79  f_n_summed( 0 ),
80  f_mask_mutex()
81  {
82  }
83 
85  {
86  }
87 
89  {
90  if( a_n_pkts == 0 )
91  {
92  throw error() << "Number of packets for the trigger mask must be non-zero";
93  }
94  f_n_packets_for_mask = a_n_pkts;
95  return;
96  }
97 
99  {
100  f_threshold_snr = a_ampl_snr * a_ampl_snr;
101  LDEBUG( plog, "Setting threshold (power via ampl) to " << f_threshold_snr );
102  return;
103  }
104 
106  {
107  f_threshold_snr = pow( 10, a_dB / 10. );
108  LDEBUG( plog, "Setting threshold (power via dB) to " << f_threshold_snr );
109  return;
110  }
111 
112  void frequency_mask_trigger::calculate_sigma_mask_spline_points( std::vector< double >& t_x_vals, std::vector< double >& t_y_vals, double threshold )
113  {
114  unsigned t_n_bins_per_point = f_average_data.size() / f_n_spline_points;
115  for( unsigned i_spline_point = 0; i_spline_point < f_n_spline_points; ++i_spline_point )
116  {
117  unsigned t_bin_begin = i_spline_point * t_n_bins_per_point;
118  unsigned t_bin_end = i_spline_point == f_n_spline_points - 1 ? f_average_data.size() : t_bin_begin + t_n_bins_per_point;
119  double t_mean = 0.;
120  for( unsigned i_bin = t_bin_begin; i_bin < t_bin_end; ++i_bin )
121  {
122  t_mean += f_average_data[ i_bin ] + threshold * sqrt( f_variance_data[ i_bin ] );
123  }
124  t_mean *= 1. / (double)(t_bin_end - t_bin_begin);
125  t_y_vals[ i_spline_point ] = t_mean;
126  t_x_vals[ i_spline_point ] = (double)t_bin_begin + 0.5 * (double)(t_bin_end - 1 - t_bin_begin);
127  }
128  }
129 
130  void frequency_mask_trigger::calculate_snr_mask_spline_points( std::vector< double >& t_x_vals, std::vector< double >& t_y_vals, double threshold )
131  {
132  unsigned t_n_bins_per_point = f_average_data.size() / f_n_spline_points;
133  for( unsigned i_spline_point = 0; i_spline_point < f_n_spline_points; ++i_spline_point )
134  {
135  unsigned t_bin_begin = i_spline_point * t_n_bins_per_point;
136  unsigned t_bin_end = i_spline_point == f_n_spline_points - 1 ? f_average_data.size() : t_bin_begin + t_n_bins_per_point;
137  double t_mean = 0.;
138  for( unsigned i_bin = t_bin_begin; i_bin < t_bin_end; ++i_bin )
139  {
140  t_mean += f_average_data[ i_bin ] * threshold;
141  }
142  t_mean *= 1. / (double)(t_bin_end - t_bin_begin);
143  t_y_vals[ i_spline_point ] = t_mean;
144  t_x_vals[ i_spline_point ] = (double)t_bin_begin + 0.5 * (double)(t_bin_end - 1 - t_bin_begin);
145  }
146  }
147 
148  void frequency_mask_trigger::set_mask_parameters_from_node( const scarab::param_node& a_mask_and_data_values )
149  {
150  // set n-points
151  f_n_packets_for_mask = a_mask_and_data_values["n-packets"]().as_uint();
152  // grab the new arrays
153  const scarab::param_array t_new_mask = a_mask_and_data_values["mask"].as_array();
154  const scarab::param_array t_new_mask2 = a_mask_and_data_values["mask2"].as_array();
155  const scarab::param_array t_new_data_mean = a_mask_and_data_values["data-mean"].as_array();
156  const scarab::param_array t_new_data_variance = a_mask_and_data_values["data-variance"].as_array();
157  LDEBUG( plog, "Finished reading mask" );
158  // prep the data members
159  f_mask.clear();
160  f_mask2.clear();
161  f_average_data.clear();
162  f_variance_data.clear();
163  f_mask.resize( t_new_mask.size() );
164  f_mask2.resize( 0 );
165  f_average_data.resize( t_new_data_mean.size() );
166  f_variance_data.resize( t_new_data_variance.size() );
167 
168  // assign new values
169  for( unsigned i_bin = 0; i_bin < t_new_mask.size(); ++i_bin )
170  {
171  f_mask[ i_bin ] = t_new_mask[i_bin]().as_double();
172  f_average_data[ i_bin ] = t_new_data_mean[i_bin]().as_double();
173  f_variance_data[ i_bin ] = t_new_data_variance[i_bin]().as_double();
174  }
175  //TODO what case are we covering here, and what should we do?
176  //if ( t_new_mask2 != nullptr )
177  //{
178  if ( t_new_mask2.size() != t_new_mask.size() ) throw psyllid::error() << "new mask and new mask2 must have same size";
179 
180  f_mask2.resize( t_new_mask2.size() );
181  for( unsigned i_bin = 0; i_bin < t_new_mask2.size(); ++i_bin )
182  {
183  f_mask2[ i_bin ] = t_new_mask2[i_bin]().as_double();
184  }
185  //}
186  }
187 
189  {
190  LDEBUG( plog, "Requesting switch to update-mask mode" );
191  f_exe_func_mutex.lock();
193  {
194  f_break_exe_func.store( true );
195  f_status = status_t::mask_update;
197  }
198  f_exe_func_mutex.unlock();
199  return;
200  }
201 
203  {
204  LDEBUG( plog, "Requesting switch to apply-trigger mode" );
205  f_exe_func_mutex.lock();
206  if ( f_trigger_mode == trigger_mode_t::single_level)
207  {
209  {
210  f_break_exe_func.store( true );
211  f_status = status_t::triggering;
213  }
214  }
215  else
216  {
217  LDEBUG( plog, "Switching to exe_apply_two_thresholds" );
219  {
220  LDEBUG( plog, "Break exe_func" );
221  f_break_exe_func.store( true );
222  f_status = status_t::triggering;
224  }
225  }
226  f_exe_func_mutex.unlock();
227  return;
228  }
229 
230  void frequency_mask_trigger::write_mask( const std::string& a_filename )
231  {
232  std::unique_lock< std::mutex > t_lock( f_mask_mutex );
233 
234  if( f_mask.empty() )
235  {
236  throw error() << "Mask is empty";
237  }
238 
239  scarab::param_node t_output_node;
240  t_output_node.add( "timestamp", scarab::param_value( scarab::get_formatted_now() ) );
241  t_output_node.add( "n-packets", scarab::param_value( f_n_packets_for_mask ) );
242 
243  scarab::param_array t_mask_array = scarab::param_array();
244  t_mask_array.resize( f_mask.size() );
245  for( unsigned i_bin = 0; i_bin < f_mask.size(); ++i_bin )
246  {
247  t_mask_array.assign( i_bin, scarab::param_value( f_mask[ i_bin ] ) );
248  }
249  t_output_node.add( "mask", t_mask_array );
250 
251  if ( !f_mask2.empty() )
252  {
253  scarab::param_array t_mask_array2 = scarab::param_array();
254  t_mask_array2.resize( f_mask2.size() );
255  for( unsigned i_bin = 0; i_bin < f_mask2.size(); ++i_bin )
256  {
257  t_mask_array2.assign( i_bin, scarab::param_value( f_mask2[ i_bin ] ) );
258  }
259  t_output_node.add( "mask2", t_mask_array2 );
260  }
261 
262 
263  scarab::param_array t_mean_data_array = scarab::param_array();
264  scarab::param_array t_variance_data_array = scarab::param_array();
265  t_mean_data_array.resize( f_average_data.size() );
266  t_variance_data_array.resize( f_variance_data.size() );
267  for( unsigned i_bin = 0; i_bin < f_average_data.size(); ++i_bin )
268  {
269  t_mean_data_array.assign( i_bin, scarab::param_value( f_average_data[ i_bin ] ) );
270  t_variance_data_array.assign( i_bin, scarab::param_value( f_variance_data[ i_bin ] ) );
271  }
272  t_output_node.add( "data-mean", t_mean_data_array );
273  t_output_node.add( "data-variance", t_variance_data_array );
274 
275  scarab::param_translator t_param_translator = scarab::param_translator();
276 
277  LTRACE( plog, "Mask file:\n" << t_output_node );
278  if( ! t_param_translator.write_file( t_output_node, a_filename ) )
279  {
280  throw error() << "Unable to write mask to file <" << a_filename << ">";
281  }
282 
283  return;
284  }
285 
287  {
288  out_buffer< 0 >().initialize( f_length );
289  return;
290  }
291 
292  void frequency_mask_trigger::execute( midge::diptera* a_midge )
293  {
294  exe_func_context t_ctx;
295  t_ctx.f_midge = a_midge;
296  t_ctx.f_first_packet_after_start = false;
297  t_ctx.f_in_command = stream::s_none;
298 
299  try
300  {
301  LINFO( plog, "Starting main loop" );
302  f_break_exe_func.store( true );
303  while( f_break_exe_func.load() )
304  {
305  f_break_exe_func.store( false );
306  f_exe_func_mutex.lock();
307  (this->*f_exe_func)( t_ctx );
308  }
309  }
310  catch( error& e )
311  {
312  throw;
313  }
314  LINFO( plog, "FMT has exited" );
315  return;
316  }
317 
319  {
320  f_exe_func_mutex.unlock();
321 
322  try
323  {
324  freq_data* t_freq_data = nullptr;
325  double t_real = 0., t_imag = 0., t_abs_square = 0.;
326  unsigned t_array_size = 0;
327 
328  LDEBUG( plog, "Entering add-to-mask loop" );
329  while( ! is_canceled() && ! f_break_exe_func.load() )
330  {
331  // the stream::get function is called at the end of the loop so
332  // that we can enter the exe func after switching the function pointer
333  // and still handle the input command appropriately
334 
335  if( a_ctx.f_in_command == stream::s_none )
336  {
337  LTRACE( plog, "FMT read s_none" );
338  }
339  else if( a_ctx.f_in_command == stream::s_error )
340  {
341  LTRACE( plog, "FMT read s_error" );
342  break;
343  }
344  else if( a_ctx.f_in_command == stream::s_start )
345  {
346  LDEBUG( plog, "Starting mask update" );
347  a_ctx.f_first_packet_after_start = true;
348  f_n_summed = 0;
349  f_average_data.clear();
350  f_variance_data.clear();
351  }
352  else if( a_ctx.f_in_command == stream::s_run )
353  {
354  t_freq_data = in_stream< 0 >().data();
355 
356  try
357  {
358  if( f_n_summed >= f_n_packets_for_mask )
359  {
360  LTRACE( plog, "Already have enough packets for the mask; skipping this packet" );
361  }
362  else
363  {
364  LTRACE( plog, "Considering frequency data: chan = " << t_freq_data->get_digital_id() <<
365  " time = " << t_freq_data->get_unix_time() <<
366  " id = " << t_freq_data->get_pkt_in_session() <<
367  " freqNotTime = " << t_freq_data->get_freq_not_time() <<
368  " bin 0 [0] = " << (unsigned)t_freq_data->get_array()[ 0 ][ 0 ] );
369 
370  if( a_ctx.f_first_packet_after_start )
371  {
372  t_array_size = t_freq_data->get_array_size();
373  f_average_data.resize( t_array_size );
374  f_variance_data.resize( t_array_size );
375  for( unsigned i_bin = 0; i_bin < t_array_size; ++i_bin )
376  {
377  f_average_data[ i_bin ] = 0.;
378  f_variance_data[ i_bin ] = 0.;
379  }
380  a_ctx.f_first_packet_after_start = false;
381  }
382  for( unsigned i_bin = 0; i_bin < t_array_size; ++i_bin )
383  {
384  t_real = t_freq_data->get_array()[ i_bin ][ 0 ];
385  t_imag = t_freq_data->get_array()[ i_bin ][ 1 ];
386  t_abs_square = t_real*t_real + t_imag*t_imag;
387  f_variance_data[ i_bin ] = f_variance_data[ i_bin ] + t_abs_square * t_abs_square;
388  f_average_data[ i_bin ] = f_average_data[ i_bin ] + t_abs_square;
389  }
390 
391  ++f_n_summed;
392  LTRACE( plog, "Added data to frequency mask; mask now has " << f_n_summed << " packets" );
393 
394  if( f_n_summed == f_n_packets_for_mask )
395  {
396  // calculate average and variance
397  for( unsigned i_bin = 0; i_bin < f_average_data.size(); ++i_bin )
398  {
399  f_variance_data [ i_bin ] = ( f_variance_data [ i_bin ] - f_average_data [ i_bin ] * f_average_data [ i_bin ] / (double) f_n_summed ) /( (double) f_n_summed -1 );
400  f_average_data[ i_bin ] = f_average_data[ i_bin ]/ (double) f_n_summed;
401  }
402 
403  LDEBUG( plog, "Calculating spline for frequency mask" );
404  std::vector< double > t_x_vals( f_n_spline_points );
405  std::vector< double > t_y_vals( f_n_spline_points );
406 
407  if ( f_threshold_type == threshold_t::sigma )
408  {
409  calculate_sigma_mask_spline_points(t_x_vals, t_y_vals, f_threshold_sigma);
410 
411  // create the spline
412  tk::spline t_spline;
413  t_spline.set_points( t_x_vals, t_y_vals );
414 
415  f_mask_mutex.lock();
416  LDEBUG( plog, "Calculating frequency sigma mask" );
417  f_mask.resize( f_average_data.size() );
418  for( unsigned i_bin = 0; i_bin < f_mask.size(); ++i_bin )
419  {
420  f_mask[ i_bin ] = t_spline( i_bin );
421  }
422 
423  if ( f_trigger_mode == trigger_mode_t::two_level )
424  {
425  calculate_sigma_mask_spline_points(t_x_vals, t_y_vals, f_threshold_sigma_high);
426  // create the spline
427  tk::spline t_spline;
428  t_spline.set_points( t_x_vals, t_y_vals );
429 
430  LDEBUG( plog, "Calculating frequency sigma mask2" );
431 
432  f_mask2.resize( f_average_data.size() );
433  for( unsigned i_bin = 0; i_bin < f_mask2.size(); ++i_bin )
434  {
435  f_mask2[ i_bin ] = t_spline( i_bin );
436  }
437  }
438  }
439  else
440  {
441  calculate_snr_mask_spline_points(t_x_vals, t_y_vals, f_threshold_snr);
442 
443  // create the spline
444  tk::spline t_spline;
445  t_spline.set_points( t_x_vals, t_y_vals );
446 
447  f_mask_mutex.lock();
448  LDEBUG( plog, "Calculating frequency snr mask" );
449 
450  f_mask.resize( f_average_data.size() );
451  for( unsigned i_bin = 0; i_bin < f_mask.size(); ++i_bin )
452  {
453  f_mask[ i_bin ] = t_spline( i_bin );
454  }
455 
456  if ( f_trigger_mode == trigger_mode_t::two_level )
457  {
458  calculate_snr_mask_spline_points(t_x_vals, t_y_vals, f_threshold_snr_high);
459  // create the spline
460  tk::spline t_spline;
461  t_spline.set_points( t_x_vals, t_y_vals );
462 
463  LDEBUG( plog, "Calculating frequency snr mask2" );
464 
465  f_mask2.resize( f_average_data.size() );
466  for( unsigned i_bin = 0; i_bin < f_mask2.size(); ++i_bin )
467  {
468  f_mask2[ i_bin ] = t_spline( i_bin );
469  }
470  }
471  }
472 
473  f_mask_mutex.unlock();
474  }
475  }
476  }
477  catch( error& e )
478  {
479  LERROR( plog, "Exiting due to error while processing frequency data: " << e.what() );
480  break;
481  }
482 
483  }
484  else if( a_ctx.f_in_command == stream::s_stop )
485  {
486  LDEBUG( plog, "FMT is stopping" );
487  if( f_n_summed < f_n_packets_for_mask )
488  {
489  LWARN( plog, "FMT is stopping: it did not process enough packets to update the mask" );
490  }
491  }
492  else if( a_ctx.f_in_command == stream::s_exit )
493  {
494  LDEBUG( plog, "FMT is exiting" );
495  if( f_n_summed < f_n_packets_for_mask )
496  {
497  LWARN( plog, "FMT is exiting: it did not process enough packets to update the mask" );
498  }
499  break;
500 
501  }
502 
503  a_ctx.f_in_command = in_stream< 0 >().get();
504  LTRACE( plog, "FMT (update-mask) reading stream at index " << in_stream< 0 >().get_current_index() );
505 
506  } // end while( ! is_canceled() && ! a_ctx.f_break_exe_loop() )
507 
508  LDEBUG( plog, "FMT has exited the add-to-mask while loop; possible reasons: is_canceled() = " << is_canceled() << "; f_break_exe_func.load() = " << f_break_exe_func.load() );
509  if( f_break_exe_func.load() )
510  {
511  LINFO( plog, "FMT is switching exe while loops" );
512  return;
513  }
514  else
515  {
516  LINFO( plog, "FMT is exiting" );
517  }
518 
519  LDEBUG( plog, "Stopping output stream" );
520  if( ! out_stream< 0 >().set( stream::s_stop ) ) return;
521 
522  LDEBUG( plog, "Exiting output stream" );
523  out_stream< 0 >().set( stream::s_exit );
524 
525  return;
526  }
527  catch(...)
528  {
529  if( a_ctx.f_midge ) a_ctx.f_midge->throw_ex( std::current_exception() );
530  else throw;
531  }
532  }
533 
535  {
536  f_exe_func_mutex.unlock();
537 
538  try
539  {
540  freq_data* t_freq_data = nullptr;
541  trigger_flag* t_trigger_flag = nullptr;
542  double t_real = 0., t_imag = 0., t_power_amp = 0.;
543  unsigned t_array_size = 0;
544  unsigned t_loop_lower_limit = 0;
545  unsigned t_loop_upper_limit = 0;
546 
547  f_mask_mutex.lock();
548  std::vector< double > t_mask_buffer( f_mask );
549  f_mask_mutex.unlock();
550 
551  LDEBUG( plog, "Entering apply-threshold loop" );
552  while( ! is_canceled() && ! f_break_exe_func.load() )
553  {
554  // the stream::get function is called at the end of the loop so
555  // that we can enter the exe func after switching the function pointer
556  // and still handle the input command appropriately
557 
558  if( a_ctx.f_in_command == stream::s_none )
559  {
560  LTRACE( plog, "FMT read s_none" );
561  }
562  else if( a_ctx.f_in_command == stream::s_error )
563  {
564  LTRACE( plog, "FMT read s_error" );
565  break;
566  }
567  else if( a_ctx.f_in_command == stream::s_start )
568  {
569  LDEBUG( plog, "Starting the FMT; output at stream index " << out_stream< 0 >().get_current_index() );
570  if( ! out_stream< 0 >().set( stream::s_start ) ) break;
571  a_ctx.f_first_packet_after_start = true;
572  }
573  if( a_ctx.f_in_command == stream::s_run )
574  {
575  t_freq_data = in_stream< 0 >().data();
576  t_trigger_flag = out_stream< 0 >().data();
577 
578  LTRACE( plog, "Considering frequency data: chan = " << t_freq_data->get_digital_id() <<
579  " time = " << t_freq_data->get_unix_time() <<
580  " id = " << t_freq_data->get_pkt_in_session() <<
581  " freqNotTime = " << t_freq_data->get_freq_not_time() <<
582  " bin 0 [0] = " << (unsigned)t_freq_data->get_array()[ 0 ][ 0 ] );
583  try
584  {
585  t_array_size = t_freq_data->get_array_size();
586  t_loop_lower_limit = f_n_excluded_bins;
587  t_loop_upper_limit = t_array_size - f_n_excluded_bins;
588  LDEBUG( plog, "Array size: "<<t_array_size );
589  LDEBUG( plog, "Looping from "<<t_loop_lower_limit<<" to "<<t_loop_upper_limit-1 );
590 
591  if( a_ctx.f_first_packet_after_start )
592  {
593  if( t_mask_buffer.size() != t_array_size )
594  {
595  throw psyllid::error() << "Frequency mask is not the same size as frequency data array";
596  }
597  a_ctx.f_first_packet_after_start = false;
598  }
599 
600  t_trigger_flag->set_flag( false );
601  t_trigger_flag->set_high_threshold( false );
602  t_trigger_flag->set_id( t_freq_data->get_pkt_in_session() );
603 
604  for( unsigned i_bin = t_loop_lower_limit; i_bin < t_loop_upper_limit; ++i_bin )
605  {
606  t_real = t_freq_data->get_array()[ i_bin ][ 0 ];
607  t_imag = t_freq_data->get_array()[ i_bin ][ 1 ];
608  t_power_amp = t_real*t_real + t_imag*t_imag;
609 
610  if( t_power_amp >= t_mask_buffer[ i_bin ] )
611  {
612  t_trigger_flag->set_flag( true );
613  t_trigger_flag->set_high_threshold( true );
614  LDEBUG( plog, "Data id <" << t_trigger_flag->get_id() << "> [bin " << i_bin <<
615  "] resulted in flag <" << t_trigger_flag->get_flag() << ">" << '\n' <<
616  "\tdata: " << t_power_amp << "; mask1: " << t_mask_buffer[ i_bin ] );
617  break;
618  }
619  }
620 #ifndef NDEBUG
621  if( ! t_trigger_flag->get_flag() )
622  {
623  LTRACE( plog, "Data id <" << t_trigger_flag->get_id() << "> resulted in flag <" << t_trigger_flag->get_flag() << ">");
624  }
625 #endif
626 
627  LTRACE( plog, "FMT writing data to output stream at index " << out_stream< 0 >().get_current_index() );
628  if( ! out_stream< 0 >().set( stream::s_run ) )
629  {
630  LERROR( plog, "Exiting due to stream error" );
631  throw midge::node_nonfatal_error() << "Stream error while applying threshold";
632  }
633  }
634  catch( error& e )
635  {
636  LERROR( plog, "Exiting due to error while processing frequency data: " << e.what() );
637  throw;
638  }
639 
640  }
641  else if( a_ctx.f_in_command == stream::s_stop )
642  {
643  LDEBUG( plog, "FMT is stopping at stream index " << out_stream< 0 >().get_current_index() );
644  if( ! out_stream< 0 >().set( stream::s_stop ) ) break;
645  }
646  else if( a_ctx.f_in_command == stream::s_exit )
647  {
648  LDEBUG( plog, "FMT is exiting at stream index " << out_stream< 0 >().get_current_index() );
649  out_stream< 0 >().set( stream::s_exit );
650  break;
651  }
652 
653  a_ctx.f_in_command = in_stream< 0 >().get();
654  LTRACE( plog, "FMT (apply-threshold) reading stream at index " << in_stream< 0 >().get_current_index() );
655 
656  } // while( ! is_canceled() && ! f_break_exe_func.load() )
657 
658  LDEBUG( plog, "FMT has exited the apply-threshold while loop; possible reasons: is_canceled() = " <<
659  is_canceled() << "; f_break_exe_func.load() = " << f_break_exe_func.load() );
660  if( f_break_exe_func.load() )
661  {
662  LINFO( plog, "FMT is switching exe while loops" );
663  return;
664  }
665  else
666  {
667  LINFO( plog, "FMT is exiting" );
668  }
669 
670  LDEBUG( plog, "Stopping output stream" );
671  if( ! out_stream< 0 >().set( stream::s_stop ) ) return;
672 
673  LDEBUG( plog, "Exiting output stream" );
674  out_stream< 0 >().set( stream::s_exit );
675 
676  return;
677  }
678  catch(...)
679  {
680  if( a_ctx.f_midge ) a_ctx.f_midge->throw_ex( std::current_exception() );
681  else throw;
682  }
683  }
684 
685 
687  {
688  f_exe_func_mutex.unlock();
689 
690  try
691  {
692  freq_data* t_freq_data = nullptr;
693  trigger_flag* t_trigger_flag = nullptr;
694  double t_real = 0., t_imag = 0., t_power_amp = 0.;
695  unsigned t_array_size = 0;
696  unsigned t_loop_lower_limit = 0;
697  unsigned t_loop_upper_limit = 0;
698 
699 
700  f_mask_mutex.lock();
701  std::vector< double > t_mask_buffer( f_mask );
702  std::vector< double > t_mask2_buffer( f_mask2 );
703 
704  LDEBUG( plog, "mask sizes: " << t_mask_buffer.size() << " " << t_mask2_buffer.size() );
705 
706  f_mask_mutex.unlock();
707 
708  LDEBUG( plog, "Entering apply-two-thresholds loop" );
709 
710  while( ! is_canceled() && ! f_break_exe_func.load() )
711  {
712  // the stream::get function is called at the end of the loop so
713  // that we can enter the exe func after switching the function pointer
714  // and still handle the input command appropriately
715 
716  if( a_ctx.f_in_command == stream::s_none )
717  {
718  LTRACE( plog, "FMT read s_none" );
719  }
720  else if( a_ctx.f_in_command == stream::s_error )
721  {
722  LTRACE( plog, "FMT read s_error" );
723  break;
724  }
725  else if( a_ctx.f_in_command == stream::s_start )
726  {
727  LDEBUG( plog, "Starting the FMT; output at stream index " << out_stream< 0 >().get_current_index() );
728  if( ! out_stream< 0 >().set( stream::s_start ) ) break;
729  a_ctx.f_first_packet_after_start = true;
730  }
731  if( a_ctx.f_in_command == stream::s_run )
732  {
733  t_freq_data = in_stream< 0 >().data();
734  t_trigger_flag = out_stream< 0 >().data();
735 
736  LTRACE( plog, "Considering frequency data: chan = " << t_freq_data->get_digital_id() <<
737  " time = " << t_freq_data->get_unix_time() <<
738  " id = " << t_freq_data->get_pkt_in_session() <<
739  " freqNotTime = " << t_freq_data->get_freq_not_time() <<
740  " bin 0 [0] = " << (unsigned)t_freq_data->get_array()[ 0 ][ 0 ] );
741  try
742  {
743  t_array_size = t_freq_data->get_array_size();
744  t_loop_lower_limit = f_n_excluded_bins;
745  t_loop_upper_limit = t_array_size - f_n_excluded_bins;
746  LDEBUG( plog, "Array size: "<<t_array_size );
747  LDEBUG( plog, "Looping from "<<t_loop_lower_limit<<" to "<<t_loop_upper_limit-1 );
748 
749  if( a_ctx.f_first_packet_after_start )
750  {
751  if ( t_mask_buffer.size() != t_freq_data->get_array_size() )
752  {
753  throw psyllid::error() << "Frequency mask is not the same size as frequency data array";
754  }
755  if ( t_mask2_buffer.size() != t_freq_data->get_array_size() )
756  {
757  throw psyllid::error() << "Frequency mask2 is not the same size as frequency data array";
758  }
759  a_ctx.f_first_packet_after_start = false;
760  }
761 
762  t_trigger_flag->set_flag( false );
763  t_trigger_flag->set_high_threshold( false );
764  t_trigger_flag->set_id( t_freq_data->get_pkt_in_session() );
765 
766  for( unsigned i_bin = t_loop_lower_limit; i_bin < t_loop_upper_limit; ++i_bin )
767  {
768  t_real = t_freq_data->get_array()[ i_bin ][ 0 ];
769  t_imag = t_freq_data->get_array()[ i_bin ][ 1 ];
770  t_power_amp = t_real*t_real + t_imag*t_imag;
771 
772  if( t_power_amp >= t_mask2_buffer[ i_bin ] )
773  {
774  t_trigger_flag->set_flag( true );
775  t_trigger_flag->set_high_threshold( true );
776  LDEBUG( plog, "Data " << t_trigger_flag->get_id() << " [bin " << i_bin <<
777  "] resulted in flag <" << t_trigger_flag->get_flag() << ">" << '\n' <<
778  "\tdata: " << t_power_amp << "; mask2: " << t_mask2_buffer[ i_bin ] );
779  break;
780  }
781  else if( t_power_amp >= t_mask_buffer[ i_bin ] )
782  {
783  t_trigger_flag->set_flag( true );
784  t_trigger_flag->set_high_threshold( false );
785  LTRACE( plog, "Data id <" << t_trigger_flag->get_id() << "> [bin " << i_bin <<
786  "] resulted in flag <" << t_trigger_flag->get_flag() << ">" << '\n' <<
787  "\tdata: " << t_power_amp << "; mask1: " << t_mask_buffer[ i_bin ] );
788  }
789  }
790 
791 #ifndef NDEBUG
792  if( ! t_trigger_flag->get_flag() )
793  {
794  LTRACE( plog, "Data id <" << t_trigger_flag->get_id() << "> resulted in flag <" <<
795  t_trigger_flag->get_flag() << ">");
796  }
797 #endif
798 
799  LTRACE( plog, "FMT writing data to output stream at index " << out_stream< 0 >().get_current_index() );
800  if( ! out_stream< 0 >().set( stream::s_run ) )
801  {
802  LERROR( plog, "Exiting due to stream error" );
803  throw midge::node_nonfatal_error() << "Stream error while applying threshold";
804  }
805  }
806  catch( error& e )
807  {
808  LERROR( plog, "Exiting due to error while processing frequency data: " << e.what() );
809  throw;
810  }
811 
812  }
813  else if( a_ctx.f_in_command == stream::s_stop )
814  {
815  LDEBUG( plog, "FMT is stopping at stream index " << out_stream< 0 >().get_current_index() );
816  if( ! out_stream< 0 >().set( stream::s_stop ) ) break;
817  }
818  else if( a_ctx.f_in_command == stream::s_exit )
819  {
820  LDEBUG( plog, "FMT is exiting at stream index " << out_stream< 0 >().get_current_index() );
821  out_stream< 0 >().set( stream::s_exit );
822  break;
823  }
824 
825  a_ctx.f_in_command = in_stream< 0 >().get();
826  LTRACE( plog, "FMT (apply-threshold) reading stream at index " << in_stream< 0 >().get_current_index() );
827 
828  } // while( ! is_canceled() && ! f_break_exe_func.load() )
829 
830  LDEBUG( plog, "FMT has exited the apply-two-threshold while loop; possible reasons: is_canceled() = " <<
831  is_canceled() << "; f_break_exe_func.load() = " << f_break_exe_func.load() );
832  if( f_break_exe_func.load() )
833  {
834  LINFO( plog, "FMT is switching exe while loops" );
835  return;
836  }
837  else
838  {
839  LINFO( plog, "FMT is exiting" );
840  }
841 
842  LDEBUG( plog, "Stopping output stream" );
843  if( ! out_stream< 0 >().set( stream::s_stop ) ) return;
844 
845  LDEBUG( plog, "Exiting output stream" );
846  out_stream< 0 >().set( stream::s_exit );
847 
848  return;
849  }
850  catch(...)
851  {
852  if( a_ctx.f_midge ) a_ctx.f_midge->throw_ex( std::current_exception() );
853  else throw;
854  }
855  }
856 
858  {
859  out_buffer< 0 >().finalize();
860  return;
861  }
862 
863 
866  {
867  }
868 
870  {
871  }
872 
873  void frequency_mask_trigger_binding::do_apply_config( frequency_mask_trigger* a_node, const scarab::param_node& a_config ) const
874  {
875  LDEBUG( plog, "Configuring frequency_mask_trigger with:\n" << a_config );
876  a_node->set_n_packets_for_mask( a_config.get_value( "n-packets-for-mask", a_node->get_n_packets_for_mask() ) );
877  a_node->set_n_spline_points( a_config.get_value( "n-spline-points", a_node->get_n_spline_points() ) );
878 
879  if( a_config.has( "threshold-ampl-snr" ) )
880  {
881  a_node->set_threshold_ampl_snr( a_config["threshold-ampl-snr"]().as_double() );
882  }
883  if( a_config.has( "threshold-power-snr" ) )
884  {
885  a_node->set_threshold_snr( a_config["threshold-power-snr"]().as_double() );
886  }
887  if( a_config.has( "threshold-power-snr-high" ) )
888  {
889  a_node->set_threshold_snr_high( a_config["threshold-power-snr-high"]().as_double() );
890  }
891  if( a_config.has( "threshold-power-sigma" ) )
892  {
893  a_node->set_threshold_sigma( a_config["threshold-power-sigma"]().as_double() );
894  }
895  if( a_config.has( "threshold-power-sigma-high" ) )
896  {
897  a_node->set_threshold_sigma_high( a_config["threshold-power-sigma-high"]().as_double() );
898  }
899  if( a_config.has( "threshold-db" ) )
900  {
901  a_node->set_threshold_dB( a_config["threshold-db"]().as_double() );
902  }
903  if( a_config.has( "trigger-mode" ) )
904  {
905  a_node->set_trigger_mode( a_config["trigger-mode"]().as_string() );
906  }
907  if( a_config.has( "threshold-type" ) )
908  {
909  a_node->set_threshold_type( a_config["threshold-type"]().as_string() );
910  }
911  if( a_config.has( "n-excluded-bins" ))
912  {
913  a_node->set_n_excluded_bins( a_config["n-excluded-bins"]().as_uint() );
914  }
915  if( a_config.has( "mask-configuration" ) )
916  {
917  const scarab::param& t_mask_config = a_config["mask-configuration"];
918  if ( t_mask_config.is_value() )
919  {
920  scarab::param_translator t_param_translator = scarab::param_translator();
921  scarab::param_ptr_t t_file_param = t_param_translator.read_file( t_mask_config.as_value().as_string() );
922  if ( t_file_param->is_node() )
923  {
924  a_node->set_mask_parameters_from_node( t_file_param->as_node() );
925  }
926  else
927  {
928  throw psyllid::error() << "mask file must be a node";
929  }
930  }
931  else if ( t_mask_config.is_node() )
932  {
933  a_node->set_mask_parameters_from_node( t_mask_config.as_node() );
934  }
935  else
936  {
937  throw psyllid::error() << "invalid config: mask-configuration must be a file path or a node with mask and mask-data";
938  }
939  }
940 
941  a_node->set_length( a_config.get_value( "length", a_node->get_length() ) );
942  return;
943  }
944 
945  void frequency_mask_trigger_binding::do_dump_config( const frequency_mask_trigger* a_node, scarab::param_node& a_config ) const
946  {
947  LDEBUG( plog, "Dumping configuration for frequency_mask_trigger" );
948  a_config.add( "n-packets-for-mask", a_node->get_n_packets_for_mask() );
949  a_config.add( "n-spline-points", a_node->get_n_spline_points() );
950  a_config.add( "length", a_node->get_length() );
951  a_config.add( "trigger-mode", a_node->get_trigger_mode_str() );
952  a_config.add( "threshold-type", a_node->get_threshold_type_str() );
953  a_config.add( "n-excluded-bins", a_node->get_n_excluded_bins() );
954 
955  // get threshold values corresponding only to the configured threshold type
956  switch ( a_node->get_threshold_type() )
957  {
959  a_config.add( "threshold-power-snr", a_node->get_threshold_snr() );
960  a_config.add( "threshold-power-snr-high", a_node->get_threshold_snr_high() );
961  break;
963  a_config.add( "threshold-power-sigma", a_node->get_threshold_sigma() );
964  a_config.add( "threshold-power-sigma-high", a_node->get_threshold_sigma_high() );
965  break;
966  }
967  //TODO the vectors of average and variance data are large, should add logic to export them to a file
968  return;
969  }
970 
971  bool frequency_mask_trigger_binding::do_run_command( frequency_mask_trigger* a_node, const std::string& a_cmd, const scarab::param_node& a_args ) const
972  {
973  if( a_cmd == "update-mask" )
974  {
975  a_node->switch_to_update_mask();
976  return true;
977  }
978  else if( a_cmd == "apply-trigger" )
979  {
980  a_node->switch_to_apply_trigger();
981  return true;
982  }
983  else if( a_cmd == "write-mask" )
984  {
985  try
986  {
987  a_node->write_mask( a_args.get_value( "filename", "fmt_mask.yaml" ) );
988  }
989  catch( error& e )
990  {
991  throw e;
992  }
993  return true;
994  }
995  else
996  {
997  LWARN( plog, "Unrecognized command: <" << a_cmd << ">" );
998  return false;
999  }
1000  }
1001 
1002 
1003 } /* namespace psyllid */
void calculate_sigma_mask_spline_points(std::vector< double > &t_x_vals, std::vector< double > &t_y_vals, double threshold)
static uint32_t threshold_to_uint(threshold_t a_threshold)
const char * what() const
void exe_add_to_mask(exe_func_context &a_ctx)
static scarab::logger plog("batch_executor")
static std::string trigger_mode_to_string(trigger_mode_t a_trigger_mode)
static uint32_t trigger_mode_to_uint(trigger_mode_t a_trigger_mode)
void set_threshold_ampl_snr(double a_ampl_snr)
virtual void do_apply_config(frequency_mask_trigger *a_node, const scarab::param_node &a_config) const
void set_mask_parameters_from_node(const scarab::param_node &a_mask_and_data_values)
void write_mask(const std::string &a_filename)
void(frequency_mask_trigger::* f_exe_func)(exe_func_context &a_ctx)
virtual bool do_run_command(frequency_mask_trigger *a_node, const std::string &a_cmd, const scarab::param_node &a_args) const
in derived classes, should throw a std::exception if the command fails, and return false if the comma...
static std::string threshold_to_string(threshold_t a_threshold)
void set_n_packets_for_mask(unsigned a_n_pkts)
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
static trigger_mode_t string_to_trigger_mode(const std::string &a_trigger_mode)
void exe_apply_threshold(exe_func_context &a_ctx)
LOGGER(plog, "egg_writer")
void calculate_snr_mask_spline_points(std::vector< double > &t_x_vals, std::vector< double > &t_y_vals, double threshold)
uint32_t get_digital_id() const
void set_threshold_type(const std::string &a_threshold_type)
static threshold_t string_to_threshold(const std::string &a_threshold_string)
void set_trigger_mode(const std::string &a_trigger_mode)
virtual void do_dump_config(const frequency_mask_trigger *a_node, scarab::param_node &a_config) const
const iq_t * get_array() const
Definition: freq_data.hh:39
void exe_apply_two_thresholds(exe_func_context &a_ctx)
size_t get_array_size() const
Definition: freq_data.hh:49
void execute(midge::diptera *a_midge=nullptr)
uint32_t get_unix_time() const