Psyllid  v1.12.4
Project 8 Data Acquisisition Software
frequency_transform.cc
Go to the documentation of this file.
1 
2 /*
3  * frequency_transform.cc
4  *
5  * Created on: March 28, 2018
6  * Author: laroque
7  */
8 
9 #include "frequency_transform.hh"
10 
11 
12 #include "logger.hh"
13 #include "param.hh"
14 
15 #include <cmath>
16 #include <thread>
17 #include <memory>
18 #include <sys/types.h>
19 
20 
21 using midge::stream;
22 
23 namespace psyllid
24 {
25  REGISTER_NODE_AND_BUILDER( frequency_transform, "frequency-transform", frequency_transform_binding );
26 
27  LOGGER( plog, "frequency_transform" );
28 
30  f_time_length( 10 ),
31  f_freq_length( 10 ),
32  f_fft_size( 4096 ),
33  f_transform_flag( "ESTIMATE" ),
34  f_use_wisdom( true ),
35  f_wisdom_filename( "wisdom_complexfft.fftw3" ),
36  f_enable_time_output( true ),
37  f_transform_flag_map(),
38  f_fftw_input(),
39  f_fftw_output(),
40  f_fftw_plan(),
41  f_multithreaded_is_initialized( false )
42  {
44  }
45 
47  {
48  }
49 
51  {
52  LDEBUG( plog, "switching to frequency output only mode" );
53  f_enable_time_output = false;
54  }
55 
57  {
58  LDEBUG( plog, "switching to frequency and time output mode" );
59  f_enable_time_output = true;
60  }
61 
63  {
64  out_buffer< 0 >().initialize( f_time_length );
65  out_buffer< 1 >().initialize( f_freq_length );
66 
67  // fftw stuff
68  TransformFlagMap::const_iterator iter = f_transform_flag_map.find(f_transform_flag);
69  unsigned transform_flag = iter->second;
70  // initialize FFTW IO arrays
71  f_fftw_input = (fftw_complex*) fftw_malloc(sizeof(fftw_complex) * f_fft_size);
72  f_fftw_output = (fftw_complex*) fftw_malloc(sizeof(fftw_complex) * f_fft_size);
73  if (f_use_wisdom)
74  {
75  LDEBUG( plog, "Reading wisdom from file <" << f_wisdom_filename << ">");
76  if (fftw_import_wisdom_from_filename(f_wisdom_filename.c_str()) == 0)
77  {
78  LWARN( plog, "Unable to read FFTW wisdom from file <" << f_wisdom_filename << ">" );
79  }
80  }
81  //initialize multithreaded
82  #ifdef FFTW_NTHREADS
84  {
85  fftw_init_threads();
86  fftw_plan_with_nthreads(FFTW_NTHREADS);
87  LDEBUG( plog, "Configuring FFTW to use up to " << FFTW_NTHREADS << " threads.");
89  }
90  #endif
91  //create fftw plan
92  f_fftw_plan = fftw_plan_dft_1d(f_fft_size, f_fftw_input, f_fftw_output, FFTW_FORWARD, transform_flag | FFTW_PRESERVE_INPUT);
93  //save plan
94  if (f_fftw_plan != NULL)
95  {
96  if (f_use_wisdom)
97  {
98  if (fftw_export_wisdom_to_filename(f_wisdom_filename.c_str()) == 0)
99  {
100  LWARN( plog, "Unable to write FFTW wisdom to file<" << f_wisdom_filename << ">");
101  }
102  }
103  LDEBUG( plog, "FFTW plan created; initialization complete" );
104  }
105 
106  return;
107  }
108 
109  void frequency_transform::execute( midge::diptera* a_midge )
110  {
111  try
112  {
113  LDEBUG( plog, "Executing the frequency transformer" );
114 
115  time_data* time_data_in = nullptr;
116  time_data* time_data_out = nullptr;
117  freq_data* freq_data_out = nullptr;
118  double fft_norm = sqrt(1. / (double)f_fft_size);
119 
120  try
121  {
122  LINFO( plog, "Starting main loop (frequency transform)" );
123  while (! is_canceled() )
124  {
125  // stop if output stream buffers have s_stop
126  if (f_enable_time_output && out_stream< 0 >().get() == stream::s_stop)
127  {
128  LWARN( plog, "time output stream has stop condition" );
129  break;
130  }
131  if (out_stream< 1 >().get() == stream::s_stop)
132  {
133  LWARN( plog, "frequency output stream has stop condition" );
134  break;
135  }
136  // grab the next input data and check slot status
137  midge::enum_t in_cmd = stream::s_none;
138  in_cmd = in_stream< 0 >().get();
139  if ( in_cmd == stream::s_none)
140  {
141  continue;
142  }
143  if ( in_cmd == stream::s_error )
144  {
145  LDEBUG( plog, "got an s_error on slot <" << in_stream< 0 >().get_current_index() << ">" );
146  break;
147  }
148  if ( in_cmd == stream::s_exit )
149  {
150  LDEBUG( plog, "got an s_exit on slot <" << in_stream< 0 >().get_current_index() << ">" );
151  break;
152  }
153  if ( in_cmd == stream::s_stop )
154  {
155  LDEBUG( plog, "got an s_stop on slot <" << in_stream< 0 >().get_current_index() << ">" );
156  if ( f_enable_time_output && ! out_stream< 0 >().set( stream::s_stop ) ) throw midge::node_nonfatal_error() << "Stream 0 error while stopping";
157  if ( ! out_stream< 1 >().set( stream::s_stop ) ) throw midge::node_nonfatal_error() << "Stream 1 error while stopping";
158  continue;
159  }
160  if ( in_cmd == stream::s_start )
161  {
162  LDEBUG( plog, "got an s_start on slot <" << in_stream< 0 >().get_current_index() << ">" );
163  if ( f_enable_time_output && ! out_stream< 0 >().set( stream::s_start ) ) throw midge::node_nonfatal_error() << "Stream 0 error while starting";
164  if ( ! out_stream< 1 >().set( stream::s_start ) ) throw midge::node_nonfatal_error() << "Stream 1 error while starting";
165  continue;
166  }
167  if ( in_cmd == stream::s_run )
168  {
169  LDEBUG( plog, "got an s_run on slot <" << in_stream< 0 >().get_current_index() << ">" );
170  time_data_in = in_stream< 0 >().data();
171 
172  //time output
174  {
175  time_data_out = out_stream< 0 >().data();
176  *time_data_out = *time_data_in;
177  }
178  //frequency output
179  freq_data_out = out_stream< 1 >().data();
180  freq_data_out->set_freq_not_time( true );
181  //TODO there are many other members of the underlying roach_packet_data type; should more carefully think through all of them and if they should be on the frequency_data (is there a way to copy all the members *except* the data array?)
182  LDEBUG( plog, "next steams acquired, doing FFT" );
183  std::copy(&time_data_in->get_array()[0][0], &time_data_in->get_array()[0][0] + f_fft_size*2, &f_fftw_input[0][0]);
184  fftw_execute_dft(f_fftw_plan, f_fftw_input, f_fftw_output);
185  //is this the normalization we want? (is it what the ROACH does?)
186  for (size_t i_bin=0; i_bin<f_fft_size; ++i_bin)
187  {
188  f_fftw_output[i_bin][0] = round(f_fftw_output[i_bin][0]*fft_norm);
189  f_fftw_output[i_bin][1] = round(f_fftw_output[i_bin][1]*fft_norm);
190  }
191  //std::copy(&f_fftw_output[0][0], &f_fftw_output[0][0] + f_fft_size*2, &freq_data_out->get_array()[0][0]);
192  // FFT unfolding based on katydid:Source/Data/Transform/KTFrequencyTransformFFTW
193  unsigned t_center_bin = time_data_in->get_array_size();
194  std::copy(&f_fftw_output[0][0], &f_fftw_output[0][0] + (t_center_bin - 1), &freq_data_out->get_array()[0][0] + t_center_bin);
195  std::copy(&f_fftw_output[0][0] + t_center_bin, &f_fftw_output[0][0] + f_fft_size*2, &freq_data_out->get_array()[0][0]);
196  freq_data_out->set_pkt_in_batch(time_data_in->get_pkt_in_batch());
197  freq_data_out->set_pkt_in_session(time_data_in->get_pkt_in_session());
198 
199  if ( f_enable_time_output && !out_stream< 0 >().set( stream::s_run ) )
200  {
201  LERROR( plog, "frequency_transform error setting time output stream to s_run" );
202  break;
203  }
204  if ( !out_stream< 1 >().set( stream::s_run ) )
205  {
206  LERROR( plog, "frequency_transform error setting frequency output stream to s_run" );
207  break;
208  }
209  }
210  }
211  }
212  catch( error& e )
213  {
214  throw;
215  }
216 
217  LINFO( plog, "FREQUENCY TRANSFORM is exiting" );
218 
219  // normal exit condition
220  LDEBUG( plog, "Stopping output streams" );
221  bool t_t_stop_ok = f_enable_time_output && out_stream< 0 >().set( stream::s_stop );
222  bool t_f_stop_ok = out_stream< 1 >().set( stream::s_stop );
223  if( ! t_t_stop_ok && ! t_f_stop_ok) return;
224 
225  LDEBUG( plog, "Exiting output streams" );
226  out_stream< 0 >().set( stream::s_exit );
227  out_stream< 1 >().set( stream::s_exit );
228 
229  return;
230  }
231  catch(...)
232  {
233  if( a_midge ) a_midge->throw_ex( std::current_exception() );
234  else throw;
235  }
236 
237  return;
238  }
239 
241  {
242  out_buffer< 0 >().finalize();
243  out_buffer< 1 >().finalize();
244  if (f_fftw_input != NULL )
245  {
246  fftw_free(f_fftw_input);
247  f_fftw_input = NULL;
248  }
249  if (f_fftw_output != NULL)
250  {
251  fftw_free(f_fftw_output);
252  f_fftw_output = NULL;
253  }
254  return;
255  }
256 
258  {
259  f_transform_flag_map.clear();
260  f_transform_flag_map["ESTIMATE"] = FFTW_ESTIMATE;
261  f_transform_flag_map["MEASURE"] = FFTW_MEASURE;
262  f_transform_flag_map["PATIENT"] = FFTW_PATIENT;
263  f_transform_flag_map["EXHAUSTIVE"] = FFTW_EXHAUSTIVE;
264  }
265 
266 
267  // frequency_transform_binding methods
270  {
271  }
272 
274  {
275  }
276 
277  void frequency_transform_binding::do_apply_config( frequency_transform* a_node, const scarab::param_node& a_config ) const
278  {
279  LDEBUG( plog, "Configuring frequency_transform with:\n" << a_config );
280  a_node->set_time_length( a_config.get_value( "time-length", a_node->get_time_length() ) );
281  a_node->set_freq_length( a_config.get_value( "freq-length", a_node->get_freq_length() ) );
282  a_node->set_fft_size( a_config.get_value( "fft-size", a_node->get_fft_size() ) );
283  a_node->set_transform_flag( a_config.get_value( "transform-flag", a_node->get_transform_flag() ) );
284  a_node->set_use_wisdom( a_config.get_value( "use-wisdom", a_node->get_use_wisdom() ) );
285  a_node->set_wisdom_filename( a_config.get_value( "wisdom-filename", a_node->get_wisdom_filename() ) );
286  return;
287  }
288 
289  void frequency_transform_binding::do_dump_config( const frequency_transform* a_node, scarab::param_node& a_config ) const
290  {
291  LDEBUG( plog, "Dumping frequency_transform configuration" );
292  a_config.add( "time-length", scarab::param_value( a_node->get_time_length() ) );
293  a_config.add( "freq-length", scarab::param_value( a_node->get_freq_length() ) );
294  a_config.add( "fft-size", scarab::param_value( a_node->get_fft_size() ) );
295  a_config.add( "transform-flag", scarab::param_value( a_node->get_transform_flag() ) );
296  a_config.add( "use-wisdom", scarab::param_value( a_node->get_use_wisdom() ) );
297  a_config.add( "wisdom-filename", scarab::param_value( a_node->get_wisdom_filename() ) );
298  return;
299  }
300 
301  bool frequency_transform_binding::do_run_command( frequency_transform* a_node, const std::string& a_cmd, const scarab::param_node& ) const
302  {
303  if ( a_cmd == "freq-only" )
304  {
305  LDEBUG( plog, "should enable freq-only mode" );
306  a_node->switch_to_freq_only();
307  return true;
308  }
309  else if ( a_cmd == "time-and-freq" )
310  {
311  LDEBUG( plog, "should enable time-and-freq mode" );
312  a_node->switch_to_time_and_freq();
313  return true;
314  }
315  else
316  {
317  LWARN( plog, "unrecognized command: <" << a_cmd << ">" );
318  return false;
319  }
320  }
321 
322 } /* namespace psyllid */
virtual void execute(midge::diptera *a_midge=nullptr)
void set_freq_not_time(bool a_flag)
static scarab::logger plog("batch_executor")
const iq_t * get_array() const
Definition: time_data.hh:39
A transformer to receive time data, compute an FFT, and distribute as time and frequency ROACH packet...
virtual void do_dump_config(const frequency_transform *a_node, scarab::param_node &a_config) const
void set_pkt_in_batch(uint32_t a_pkt)
size_t get_array_size() const
Definition: time_data.hh:49
virtual void do_apply_config(frequency_transform *a_node, const scarab::param_node &a_config) const
uint32_t get_pkt_in_batch() const
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
virtual bool do_run_command(frequency_transform *a_node, const std::string &a_cmd, const scarab::param_node &) const
in derived classes, should throw a std::exception if the command fails, and return false if the comma...
LOGGER(plog, "egg_writer")
const iq_t * get_array() const
Definition: freq_data.hh:39