Psyllid  v1.12.4
Project 8 Data Acquisition Software
triggered_writer.cc
Go to the documentation of this file.
1 /*
2  * triggered_writer.cc
3  *
4  * Created on: Dec 30, 2015
5  * Author: nsoblath
6  */
7 
8 #include "triggered_writer.hh"
9 
10 #include "butterfly_house.hh"
11 #include "psyllid_error.hh"
12 
13 #include "digital.hh"
14 #include "logger.hh"
15 #include "time.hh"
16 
17 #include <cmath>
18 
19 using midge::stream;
20 
21 using std::string;
22 using std::vector;
23 
24 namespace psyllid
25 {
26  REGISTER_NODE_AND_BUILDER( triggered_writer, "triggered-writer", triggered_writer_binding );
27 
28  LOGGER( plog, "triggered_writer" );
29 
// NOTE(review): the line carrying the function signature (listing line 30) is missing from this
// listing. What follows is a member-initializer list, presumably that of the triggered_writer
// constructor -- confirm against the header.
// It default-constructs the egg_writer base and f_monarch_ptr, and gives every other member listed
// here the literal default shown. The body itself is empty.
31  egg_writer(),
32  f_file_num( 0 ),
33  f_bit_depth( 8 ),
34  f_data_type_size( 1 ),
35  f_sample_size( 2 ),
36  f_record_size( 4096 ),
37  f_acq_rate( 100 ),
38  f_v_offset( 0. ),
39  f_v_range( 0.5 ),
40  f_center_freq( 50.e6 ),
41  f_freq_range( 100.e6 ),
42  f_monarch_ptr(),
43  f_stream_no( 0 )
44  {
45  }
46 
// NOTE(review): the signature line (listing line 47) is missing from this listing. Empty body;
// presumably the triggered_writer destructor -- confirm.
48  {
49  }
50 
// NOTE(review): the signature line (listing line 51) is missing from this listing. Judging by the
// parameter names used below and the symbol index at the end of this page, this is presumably
//   void triggered_writer::prepare_to_write( monarch_wrap_ptr a_mw_ptr, header_wrap_ptr a_hw_ptr )
// -- confirm.
//
// Stores the monarch wrapper for later use, adds a stream named "Psyllid - ROACH2" to the file header
// using this node's device settings, and fills in the voltage and frequency fields of every channel
// header listed in t_chan_vec. The stream number returned by AddStream is kept in f_stream_no.
52  {
53  f_monarch_ptr = a_mw_ptr;
54 
     // digitizer calibration parameters derived from bit depth, data-type size, voltage offset and range
55  scarab::dig_calib_params t_dig_params;
56  scarab::get_calib_params( f_bit_depth, f_data_type_size, f_v_offset, f_v_range, true, &t_dig_params );
57 
     // t_chan_vec is passed to AddStream by address; presumably it is filled with the channel numbers
     // that belong to the new stream -- confirm against the monarch3 API
58  vector< unsigned > t_chan_vec;
59  f_stream_no = a_hw_ptr->header().AddStream( "Psyllid - ROACH2",
60  f_acq_rate, f_record_size, f_sample_size, f_data_type_size,
61  monarch3::sDigitizedS, f_bit_depth, monarch3::sBitsAlignedLeft, &t_chan_vec );
62 
63  //unsigned i_chan_psyllid = 0; // this is the channel number in psyllid, as opposed to the channel number in the monarch file
64  for( std::vector< unsigned >::const_iterator it = t_chan_vec.begin(); it != t_chan_vec.end(); ++it )
65  {
     // *it is used as the index into the file's channel headers
66  a_hw_ptr->header().GetChannelHeaders()[ *it ].SetVoltageOffset( t_dig_params.v_offset );
67  a_hw_ptr->header().GetChannelHeaders()[ *it ].SetVoltageRange( t_dig_params.v_range );
68  a_hw_ptr->header().GetChannelHeaders()[ *it ].SetDACGain( t_dig_params.dac_gain );
     // the minimum frequency is the lower edge of the band: center minus half the range
69  a_hw_ptr->header().GetChannelHeaders()[ *it ].SetFrequencyMin( f_center_freq - 0.5 * f_freq_range );
70  a_hw_ptr->header().GetChannelHeaders()[ *it ].SetFrequencyRange( f_freq_range );
71 
72  //++i_chan_psyllid;
73  }
74 
75  return;
76  }
77 
// NOTE(review): the signature line (listing line 78) is missing from this listing; presumably this
// is the node's initialize() hook -- confirm.
// Registers this writer, together with its file number, with the butterfly_house instance.
79  {
80  butterfly_house::get_instance()->register_writer( this, f_file_num );
81  return;
82  }
83 
84  void triggered_writer::execute( midge::diptera* a_midge )
85  {
86  try
87  {
88  exe_loop_context t_ctx;
89  t_ctx.f_is_running = false;
90  t_ctx.f_should_exit = false;
91  t_ctx.f_record_ptr = nullptr;
92  t_ctx.f_stream_no = 0;
93  t_ctx.f_start_file_with_next_data = false;
94  t_ctx.f_first_pkt_in_run = 0;
95  t_ctx.f_is_new_event = true;
96 
97  // outer while loop to switch between the two exe loops until canceled
98  while( ! is_canceled() && ! t_ctx.f_should_exit )
99  {
100  if( t_ctx.f_is_running )
101  {
102  exe_loop_is_running( t_ctx );
103  }
104  else
105  {
106  exe_loop_not_running( t_ctx );
107  }
108  } // end while ! is_canceled()
109 
110  return;
111  }
112  catch(...)
113  {
114  if( a_midge ) a_midge->throw_ex( std::current_exception() );
115  else throw;
116  }
117  }
118 
// NOTE(review): the signature line (listing line 119) is missing from this listing. Judging by the
// call in execute() and the symbol index at the end of this page, this is presumably
//   void triggered_writer::exe_loop_not_running( exe_loop_context& a_ctx )
// -- confirm.
//
// Loop used while no run is in progress. It polls the time stream (in_stream< 0 >) for commands:
//   s_none          -> poll again
//   s_error, s_exit -> set a_ctx.f_should_exit and leave the loop
//   s_stop, s_run   -> log and keep polling
//   s_start         -> look for a start command on the trigger stream (in_stream< 1 >), obtain the
//                      stream wrapper and record from f_monarch_ptr, set f_start_file_with_next_data
//                      and f_is_running in a_ctx, and leave the loop
// Throws midge::node_nonfatal_error when the trigger stream does not deliver s_start within 10 reads.
120  {
121  midge::enum_t t_trig_command = stream::s_none;
122  midge::enum_t t_time_command = stream::s_none;
123 
124  //time_data* t_time_data = nullptr;
125 
126  while( ! is_canceled() )
127  {
128  t_time_command = in_stream< 0 >().get();
129  if( t_time_command == stream::s_none ) continue;
130  if( t_time_command == stream::s_error )
131  {
132  a_ctx.f_should_exit = true;
133  break;
134  }
135 
136  LTRACE( plog, "Triggered writer reading stream 0 (time) at index " << in_stream< 0 >().get_current_index() );
137 
138  if( t_time_command == stream::s_exit )
139  {
140  LDEBUG( plog, "Triggered writer is exiting due to time-stream command; no run in progress" );
141  a_ctx.f_should_exit = true;
142  break;
143  }
144 
145  if( t_time_command == stream::s_stop )
146  {
147  LDEBUG( plog, "Triggered writer received stop command on the time stream while already stopped; no action taken" );
148  continue;
149  }
150 
151  if( t_time_command == stream::s_run )
152  {
153  LWARN( plog, "Triggered writer received run command on the time stream while stopped; no action taken" );
154  continue;
155  }
156 
157  if( t_time_command == stream::s_start )
158  {
159  LDEBUG( plog, "Triggered writer received start command on the time stream; looking for start command on the trigger stream" );
160 
     // NOTE(review): every path through the body below either breaks or throws, so this loop runs at
     // most once. If is_canceled() is already true here the body is skipped entirely and
     // a_ctx.f_is_running is still set to true further down without a stream having been obtained.
161  // do this in a while loop so we don't re-do the time stream get()
162  while( ! is_canceled() )
163  {
     // read the trigger stream up to 10 times, stopping early once a start command is seen
164  t_trig_command = stream::s_none;
165  for( unsigned i_attempt = 0; i_attempt < 10 && t_trig_command != stream::s_start; ++i_attempt )
166  {
167  t_trig_command = in_stream< 1 >().get();
168  LTRACE( plog, "(attempt " << i_attempt << ") Triggered writer reading stream 1 (trig) at index " << in_stream< 1 >().get_current_index() );
169  }
170 
171  if( t_trig_command == stream::s_start )
172  {
173  LDEBUG( plog, "Time and trig commands match: start" );
174  LINFO( plog, "Starting a run" );
175 
176  LDEBUG( plog, "Will start file with next data" );
177 
     // release any stream wrapper still held in the context before requesting a new one
178  if( a_ctx.f_swrap_ptr ) a_ctx.f_swrap_ptr.reset();
179 
180  LDEBUG( plog, "Getting stream <" << a_ctx.f_stream_no << ">" );
181  a_ctx.f_swrap_ptr = f_monarch_ptr->get_stream( a_ctx.f_stream_no );
182  a_ctx.f_record_ptr = a_ctx.f_swrap_ptr->get_stream_record();
183 
     // tells the is-running loop to treat the next data packet as the first packet of the run
184  a_ctx.f_start_file_with_next_data = true;
185  break; // break out of while loop looking for start trig command
186  }
187  else
188  {
189  throw midge::node_nonfatal_error() << "Egg writer received unexpected trig-stream command while waiting for start: " << t_trig_command;
190  } // end if-else block for start trig command
191 
192  } // end while loop looking for start trig command
193 
194  // should end up here after finding the start trig command and setting up the monarch stream
195  LDEBUG( plog, "Breaking out of not-running exe loop" );
196  a_ctx.f_is_running = true;
197  break; // break out of not-running exe loop
198  } // end if time command is start
199 
200  } // end while ! is_canceled()
201 
202  return;
203  }
204 
// NOTE(review): the signature line (listing line 205) is missing from this listing. Judging by the
// call in execute() and the symbol index at the end of this page, this is presumably
//   void triggered_writer::exe_loop_is_running( exe_loop_context& a_ctx )
// -- confirm.
//
// Loop used while a run is in progress. Each iteration reads a command from the trigger stream
// (in_stream< 1 >) and, depending on it, also advances the time stream (in_stream< 0 >):
//   s_none  -> advance the time stream; if it is not s_none as well, finish the stream and throw error()
//   s_error -> set a_ctx.f_should_exit and leave the loop
//   s_exit  -> finish the stream, set a_ctx.f_should_exit and leave the loop
//   s_stop  -> finish the stream, advance the time stream, clear a_ctx.f_is_running and leave the loop
//   s_start -> unexpected while running: finish the stream, advance the time stream and throw
//              midge::node_nonfatal_error
//   s_run   -> advance the time stream; when it also reports s_run, compare the packet ids of both
//              streams and write the time-domain record if the trigger flag is set
// When the loop is left by break or cancellation, a stream still held in a_ctx is finished at the end.
206  {
207  midge::enum_t t_trig_command = stream::s_none;
208  midge::enum_t t_time_command = stream::s_none;
209 
210  trigger_flag* t_trig_data = nullptr;
211  time_data* t_time_data = nullptr;
212 
     // number of bytes handed to write_record() for each record
213  uint64_t t_bytes_per_record = f_record_size * f_sample_size * f_data_type_size;
     // NOTE(review): the record length is derived from PAYLOAD_SIZE / 2 rather than from f_record_size;
     // the factor 1.e3 presumably assumes f_acq_rate is given in MHz so that the result is in ns -- confirm
214  uint64_t t_record_length_nsec = llrint( (double)(PAYLOAD_SIZE / 2) / (double)f_acq_rate * 1.e3 );
215 
216  while( ! is_canceled() )
217  {
218  t_trig_command = in_stream< 1 >().get();
219  LTRACE( plog, "Egg writer reading stream 1 (trig) at index " << in_stream< 1 >().get_current_index() );
220 
221  if( t_trig_command == stream::s_none )
222  {
223  LDEBUG( plog, "Egg writer received none command on the trig stream while run is in progress");
224  t_time_command = in_stream< 0 >().get();
225  LTRACE( plog, "Egg writer reading stream 0 (time) at index " << in_stream< 0 >().get_current_index() );
226  LDEBUG( plog, "Advancing time stream; time command matched trig command? trig command = " << t_trig_command << "; time command = " << t_time_command );
227  if( t_time_command != stream::s_none )
228  {
229  if( a_ctx.f_swrap_ptr )
230  {
231  LDEBUG( plog, "Finishing stream <" << a_ctx.f_stream_no << ">" );
232  f_monarch_ptr->finish_stream( a_ctx.f_stream_no );
233  a_ctx.f_swrap_ptr.reset();
234  }
     // NOTE(review): this path throws error(), whereas the other command-mismatch paths in this
     // function throw midge::node_nonfatal_error -- confirm that the difference is intended
235  throw error() << "Trig command doesn't match time command: time command = " << t_time_command << "; trig command = " << t_trig_command;
236  }
237  continue;
238  }
     // unlike the s_exit branch, the stream is not finished here; the cleanup after the loop does that
239  if( t_trig_command == stream::s_error )
240  {
241  a_ctx.f_should_exit = true;
242  break;
243  }
244 
245  if( t_trig_command == stream::s_exit )
246  {
247  LDEBUG( plog, "Egg writer is exiting due to trig-stream command; run is in progress" );
248  if( a_ctx.f_swrap_ptr )
249  {
250  f_monarch_ptr->finish_stream( a_ctx.f_stream_no );
251  a_ctx.f_swrap_ptr.reset();
252  }
253  a_ctx.f_should_exit = true;
254  break;
255  }
256 
257  if( t_trig_command == stream::s_stop )
258  {
259  LDEBUG( plog, "Egg writer received stop command on the trig stream while run is in progress" );
260  if( a_ctx.f_swrap_ptr )
261  {
262  f_monarch_ptr->finish_stream( a_ctx.f_stream_no );
263  a_ctx.f_swrap_ptr.reset();
264  }
     // the time stream is advanced as well; its command is only logged, not checked
265  t_time_command = in_stream< 0 >().get();
266  LTRACE( plog, "Egg writer reading stream 0 (time) at index " << in_stream< 0 >().get_current_index() );
267  LDEBUG( plog, "Advancing time stream; time command matched trig command? trig command = " << t_trig_command << "; time command = " << t_time_command );
268  LDEBUG( plog, "Breaking out of is-running exe loop" );
269  a_ctx.f_is_running = false;
270  break; // out of is-running exe loop
271  }
272 
273  if( t_trig_command == stream::s_start )
274  {
275  LERROR( plog, "Egg writer received unexpected start command on the trig stream while running")
276  if( a_ctx.f_swrap_ptr )
277  {
278  f_monarch_ptr->finish_stream( a_ctx.f_stream_no );
279  a_ctx.f_swrap_ptr.reset();
280  }
281  t_time_command = in_stream< 0 >().get();
282  LTRACE( plog, "Egg writer reading stream 0 (time) at index " << in_stream< 0 >().get_current_index() );
283  LDEBUG( plog, "Advancing time stream; time command matched trig command? trig command = " << t_trig_command << "; time command = " << t_time_command );
284  throw midge::node_nonfatal_error() << "Egg writer received unexpected start command on the trig stream while running";
285  }
286 
287  if( t_trig_command == stream::s_run )
288  {
289  t_time_command = in_stream< 0 >().get();
290  LTRACE( plog, "Egg writer reading stream 0 (time) at index " << in_stream< 0 >().get_current_index() );
291  LTRACE( plog, "Advancing time stream; time command matched trig command? trig command = " << t_trig_command << "; time command = " << t_time_command );
292  if( t_time_command != stream::s_run )
293  {
     // the two streams disagree: the stream is finished in either case before deciding how to proceed
294  if( a_ctx.f_swrap_ptr )
295  {
296  f_monarch_ptr->finish_stream( a_ctx.f_stream_no );
297  a_ctx.f_swrap_ptr.reset();
298  }
299  if (t_time_command == stream::s_stop)
300  {
     // the time stream reported stop: advance the trigger stream once as well, then go back
     // to the not-running state
301  t_trig_command = in_stream< 1 >().get();
302  LDEBUG( plog, "Egg writer received stop command on the time stream while run in progress.");
303  LDEBUG( plog, "Advancing trig stream; time command matched trig command? trig command = " << t_trig_command << "; time command = " << t_time_command );
304  LDEBUG( plog, "Breaking out of is-running exe loop" );
305  a_ctx.f_is_running = false;
306  break; // out of is-running exe loop
307  }
308  else
309  {
310  throw midge::node_nonfatal_error() << "Trig command doesn't match time command: time command = " << t_time_command << "; trig command = " << t_trig_command;
311  }
312  }
313 
314  // everything agrees that we're running
315 
316  t_time_data = in_stream< 0 >().data();
317  t_trig_data = in_stream< 1 >().data();
318 
319  if( a_ctx.f_start_file_with_next_data )
320  {
321  LDEBUG( plog, "Handling first packet in run" );
322 
     // remember the id of the first packet; record times below are computed relative to it
323  a_ctx.f_first_pkt_in_run = t_time_data->get_pkt_in_session();
324 
325  a_ctx.f_is_new_event = true;
326 
327  a_ctx.f_start_file_with_next_data = false;
328  }
329 
330 
     // obtain the stream wrapper and record again if the context does not currently hold one
331  if( ! a_ctx.f_swrap_ptr )
332  {
333  LDEBUG( plog, "Getting stream <" << a_ctx.f_stream_no << ">" );
334  a_ctx.f_swrap_ptr = f_monarch_ptr->get_stream( a_ctx.f_stream_no );
335  a_ctx.f_record_ptr = a_ctx.f_swrap_ptr->get_stream_record();
336  }
337 
338  uint64_t t_time_id = t_time_data->get_pkt_in_session();
339  uint64_t t_trig_id = t_trig_data->get_id();
340  LTRACE( plog, "Time id: <" << t_time_id << ">; Trig id: <" << t_trig_id << ">");
341 
     // both streams must refer to the same packet; the resynchronisation code inside this block is
     // commented out, so any mismatch is logged and thrown
342  if( t_time_id != t_trig_id )
343  {
344  /*
345  LTRACE( plog, "Mismatch between time id <" << t_time_id << "> and trigger id <" << t_trig_id << ">" );
346  while( t_time_id < t_trig_id )
347  {
348  LDEBUG( plog, "Moving time stream forward" );
349  t_time_command = in_stream< 0 >().get();
350  t_time_data = in_stream< 0 >().data();
351  t_time_id = t_time_data->get_pkt_in_session();
352  }
353  while( t_time_id > t_trig_id )
354  {
355  LTRACE( plog, "Moving trig stream forward" );
356  t_trig_command = in_stream< 1 >().get();
357  t_trig_data = in_stream< 1 >().data();
358  t_trig_id = t_trig_data->get_id();
359  }
360  if( t_time_id != t_trig_id )
361  {*/
362  LERROR( plog, "Mismatch between time id <" << t_time_id << "> and trigger id <" << t_trig_id << ">" );
363  throw midge::node_nonfatal_error() << "Unable to match time and trigger streams";
364  /*}
365  LTRACE( plog, "Mismatch resolved: time id <" << t_time_id << "> and trigger id <" << t_trig_id << ">" );
366  */
367  }
368 
369  if( t_trig_data->get_flag() )
370  {
371  LTRACE( plog, "Triggered packet, id <" << t_trig_data->get_id() << ">" );
372 
373  if( a_ctx.f_is_new_event )
374  {
375  LDEBUG( plog, "New event" );
376  }
     // the record time passed here is the record length multiplied by the number of packets since
     // the first packet of the run; f_is_new_event is passed as the last argument
377  if( ! a_ctx.f_swrap_ptr->write_record( t_time_id, t_record_length_nsec * ( t_time_id - a_ctx.f_first_pkt_in_run ), t_time_data->get_raw_array(), t_bytes_per_record, a_ctx.f_is_new_event ) )
378  {
379  throw midge::node_nonfatal_error() << "Unable to write record to file; record ID: " << t_time_id;
380  }
     // packets written after this one are not flagged as a new event until an untriggered packet is seen
381  a_ctx.f_is_new_event = false;
382  LTRACE( plog, "Packet written (" << t_time_id << ")" );
383  }
384  else
385  {
386  LTRACE( plog, "Untriggered packet, id <" << t_trig_id << ">" );
     // an untriggered packet is not written; the next triggered packet will be flagged as a new event
387  a_ctx.f_is_new_event = true;
388  }
389 
390  continue;
391  }
392 
393  } // end while ! is_canceled()
394 
395  // final attempt to finish the stream if the outer while loop is broken without the stream having been stopped or exited
396  // e.g. if cancelled first, before anything else happens
397  if( a_ctx.f_swrap_ptr )
398  {
399  LDEBUG( plog, "Finishing stream <" << a_ctx.f_stream_no << ">" );
400  f_monarch_ptr->finish_stream( a_ctx.f_stream_no );
401  a_ctx.f_swrap_ptr.reset();
402  }
403 
404  return;
405  }
// NOTE(review): the signature line (listing lines 406-407) is missing from this listing; presumably
// this is the node's finalize() hook -- confirm.
// Unregisters this writer from the butterfly_house instance.
408  {
409  butterfly_house::get_instance()->unregister_writer( this );
410  return;
411  }
412 
// NOTE(review): the signature lines (listing lines 413-415) are missing from this listing. Empty body;
// presumably the triggered_writer_binding constructor -- confirm.
416  {
417  }
// NOTE(review): the signature lines (listing lines 418-419) are missing from this listing. Empty body;
// presumably the triggered_writer_binding destructor -- confirm.
420  {
421  }
423  void triggered_writer_binding::do_apply_config( triggered_writer* a_node, const scarab::param_node& a_config ) const
424  {
425  LDEBUG( plog, "Configuring triggered_writer with:\n" << a_config );
426  a_node->set_file_num( a_config.get_value( "file-num", a_node->get_file_num() ) );
427  if( a_config.has( "device" ) )
428  {
429  const scarab::param_node& t_dev_config = a_config["device"].as_node();
430  a_node->set_bit_depth( t_dev_config.get_value( "bit-depth", a_node->get_bit_depth() ) );
431  a_node->set_data_type_size( t_dev_config.get_value( "data-type-size", a_node->get_data_type_size() ) );
432  a_node->set_sample_size( t_dev_config.get_value( "sample-size", a_node->get_sample_size() ) );
433  a_node->set_record_size( t_dev_config.get_value( "record-size", a_node->get_record_size() ) );
434  a_node->set_acq_rate( t_dev_config.get_value( "acq-rate", a_node->get_acq_rate() ) );
435  a_node->set_v_offset( t_dev_config.get_value( "v-offset", a_node->get_v_offset() ) );
436  a_node->set_v_range( t_dev_config.get_value( "v-range", a_node->get_v_range() ) );
437  }
438  a_node->set_center_freq( a_config.get_value( "center-freq", a_node->get_center_freq() ) );
439  a_node->set_freq_range( a_config.get_value( "freq-range", a_node->get_freq_range() ) );
440  return;
441  }
443  void triggered_writer_binding::do_dump_config( const triggered_writer* a_node, scarab::param_node& a_config ) const
444  {
445  LDEBUG( plog, "Dumping configuration for triggered_writer" );
446  a_config.add( "file-num", a_node->get_file_num() );
447  scarab::param_node t_dev_node;
448  t_dev_node.add( "bit-depth", a_node->get_bit_depth() );
449  t_dev_node.add( "data-type-size", a_node->get_data_type_size() );
450  t_dev_node.add( "sample-size", a_node->get_sample_size() );
451  t_dev_node.add( "record-size", a_node->get_record_size() );
452  t_dev_node.add( "acq-rate", a_node->get_acq_rate() );
453  t_dev_node.add( "v-offset", a_node->get_v_offset() );
454  t_dev_node.add( "v-range", a_node->get_v_range() );
455  a_config.add( "device", t_dev_node );
456  a_config.add( "center-freq", a_node->get_center_freq() );
457  a_config.add( "freq-range", a_node->get_freq_range() );
458  return;
459  }
460 
461 } /* namespace psyllid */
std::shared_ptr< header_wrapper > header_wrap_ptr
Base class for all writers.
Definition: egg_writer.hh:24
virtual void prepare_to_write(monarch_wrap_ptr a_mw_ptr, header_wrap_ptr a_hw_ptr)
static scarab::logger plog("batch_executor")
void exe_loop_is_running(exe_loop_context &a_ctx)
monarch_wrap_ptr f_monarch_ptr
#define PAYLOAD_SIZE
Definition: roach_packet.hh:15
virtual void do_dump_config(const triggered_writer *a_node, scarab::param_node &a_config) const
const int8_t * get_raw_array() const
void exe_loop_not_running(exe_loop_context &a_ctx)
std::shared_ptr< monarch_wrapper > monarch_wrap_ptr
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
A consumer that writes triggered time ROACH packets to an egg file.
LOGGER(plog, "egg_writer")
virtual void execute(midge::diptera *a_midge=nullptr)
virtual void do_apply_config(triggered_writer *a_node, const scarab::param_node &a_config) const