Psyllid  v1.12.4
Project 8 Data Acquisition Software
streaming_writer.cc
Go to the documentation of this file.
1 /*
2  * streaming_writer.cc
3  *
4  * Created on: May 31, 2016
5  * Author: nsoblath
6  */
7 
8 #include "streaming_writer.hh"
9 
10 #include "butterfly_house.hh"
11 #include "psyllid_error.hh"
12 
13 #include "midge_error.hh"
14 
15 #include "digital.hh"
16 #include "logger.hh"
17 #include "time.hh"
18 
19 #include <cmath>
20 
21 using midge::stream;
22 
23 using std::string;
24 using std::vector;
25 
26 namespace psyllid
27 {
    // Register this node with the node factory under the config-file name
    // "streaming-writer", paired with its binding class for configuration.
    REGISTER_NODE_AND_BUILDER( streaming_writer, "streaming-writer", streaming_writer_binding );

    // Translation-unit-local logger
    LOGGER( plog, "streaming_writer" );
31 
    // Constructor member-initializer list.
    // NOTE(review): the declaration line ("streaming_writer::streaming_writer() :")
    // was not captured in this extraction -- confirm against streaming_writer.hh.
    // Defaults describe the ROACH2 time stream; units are presumed (acq-rate
    // presumably MHz, frequencies presumably Hz) -- verify against the header docs.
    egg_writer(),
    f_file_num( 0 ),           // index of the butterfly_house file this writer feeds
    f_bit_depth( 8 ),          // bits per sample
    f_data_type_size( 1 ),     // bytes per datum
    f_sample_size( 2 ),
    f_record_size( 4096 ),
    f_acq_rate( 100 ),
    f_v_offset( 0. ),
    f_v_range( 0.5 ),
    f_center_freq( 50.e6 ),
    f_freq_range( 100.e6 ),
    f_last_pkt_in_batch( 0 ),  // used in execute() to detect batch-counter gaps
    f_monarch_ptr(),
    f_stream_no( 0 )
    {
    }
49 
    // Destructor -- no explicit cleanup; stream/file teardown happens in
    // execute()/finalize(). (Declaration line not captured in this extraction.)
    {
    }
53 
    // prepare_to_write( monarch_wrap_ptr a_mw_ptr, header_wrap_ptr a_hw_ptr )
    // (declaration line not captured in this extraction; signature taken from the
    // class interface). Stores the Monarch file handle and registers/configures
    // this writer's stream in the egg-file header before writing begins.
    {
        f_monarch_ptr = a_mw_ptr;

        // Derive digitizer calibration (voltage offset/range, DAC gain) from the
        // configured bit depth and datum size; the meaning of the 'true' flag is
        // defined by scarab::get_calib_params -- TODO confirm against scarab docs
        scarab::dig_calib_params t_dig_params;
        scarab::get_calib_params( f_bit_depth, f_data_type_size, f_v_offset, f_v_range, true, &t_dig_params );

        // Add a stream to the Monarch header; t_chan_vec receives the monarch
        // channel numbers assigned to this stream
        vector< unsigned > t_chan_vec;
        f_stream_no = a_hw_ptr->header().AddStream( "Psyllid - ROACH2",
                f_acq_rate, f_record_size, f_sample_size, f_data_type_size,
                monarch3::sDigitizedS, f_bit_depth, monarch3::sBitsAlignedLeft, &t_chan_vec );

        //unsigned i_chan_psyllid = 0; // this is the channel number in psyllid, as opposed to the channel number in the monarch file
        // Apply the calibration and the frequency window (centered on
        // f_center_freq, full width f_freq_range) to every channel in the stream
        for( std::vector< unsigned >::const_iterator it = t_chan_vec.begin(); it != t_chan_vec.end(); ++it )
        {
            a_hw_ptr->header().GetChannelHeaders()[ *it ].SetVoltageOffset( t_dig_params.v_offset );
            a_hw_ptr->header().GetChannelHeaders()[ *it ].SetVoltageRange( t_dig_params.v_range );
            a_hw_ptr->header().GetChannelHeaders()[ *it ].SetDACGain( t_dig_params.dac_gain );
            a_hw_ptr->header().GetChannelHeaders()[ *it ].SetFrequencyMin( f_center_freq - 0.5 * f_freq_range );
            a_hw_ptr->header().GetChannelHeaders()[ *it ].SetFrequencyRange( f_freq_range );

            //++i_chan_psyllid;
        }

        return;
    }
80 
    // Registers this writer with the singleton butterfly_house, associating it
    // with file number f_file_num; paired with the unregister in finalize().
    // (Declaration line not captured in this extraction -- presumably
    // streaming_writer::initialize(); confirm against the header.)
    {
        butterfly_house::get_instance()->register_writer( this, f_file_num );
        return;
    }
86 
    /// Main consumer loop: reads time_data packets from input stream 0 and writes
    /// each one as a record to the currently-open Monarch stream. The midge stream
    /// commands drive the file lifecycle: s_start opens the stream, s_run writes
    /// records, s_stop/s_exit close it. Exceptions are forwarded to midge via
    /// a_midge->throw_ex() when a midge pointer is available.
    void streaming_writer::execute( midge::diptera* a_midge )
    {
        LDEBUG( plog, "execute streaming writer" );
        try
        {
            midge::enum_t t_time_command = stream::s_none;

            time_data* t_time_data = nullptr;

            // Handle on the open Monarch stream; non-null only between an s_start
            // and the matching s_stop/s_exit
            stream_wrap_ptr t_swrap_ptr;

            uint64_t t_bytes_per_record = f_record_size * f_sample_size * f_data_type_size;
            // Record duration in ns: (PAYLOAD_SIZE/2) samples per packet divided by
            // the acquisition rate; presumably f_acq_rate is in MHz (samples/us),
            // making the *1.e3 a us -> ns conversion -- TODO confirm units
            uint64_t t_record_length_nsec = llrint( (double)(PAYLOAD_SIZE / 2) / (double)f_acq_rate * 1.e3 );

            // Packet id (in session) of the first record of the current run; used
            // to compute each record's time offset relative to the run start
            uint64_t t_first_pkt_in_run = 0;

            bool t_is_new_acquisition = true;
            bool t_start_file_with_next_data = false;

            while( ! is_canceled() )
            {
                t_time_command = in_stream< 0 >().get();
                if( t_time_command == stream::s_none ) continue;
                if( t_time_command == stream::s_error ) break;

                LTRACE( plog, "Egg writer reading stream 0 (time) at index " << in_stream< 0 >().get_current_index() );

                if( t_time_command == stream::s_exit )
                {
                    LDEBUG( plog, "Streaming writer is exiting" );

                    // Close out the Monarch stream before leaving the loop
                    if( t_swrap_ptr )
                    {
                        f_monarch_ptr->finish_stream( f_stream_no );
                        t_swrap_ptr.reset();
                    }

                    break;
                }

                if( t_time_command == stream::s_stop )
                {
                    LDEBUG( plog, "Streaming writer is stopping" );

                    // Stop closes the stream but keeps the loop alive for a later s_start
                    if( t_swrap_ptr )
                    {
                        f_monarch_ptr->finish_stream( f_stream_no );
                        t_swrap_ptr.reset();
                    }

                    continue;
                }

                if( t_time_command == stream::s_start )
                {
                    LDEBUG( plog, "Will start file with next data" );

                    // Drop any stale handle (e.g. two s_starts without an s_stop)
                    if( t_swrap_ptr ) t_swrap_ptr.reset();

                    LDEBUG( plog, "Getting stream <" << f_stream_no << ">" );
                    t_swrap_ptr = f_monarch_ptr->get_stream( f_stream_no );

                    // Defer run-start bookkeeping until the first data packet arrives
                    t_start_file_with_next_data = true;
                    continue;
                }

                if( t_time_command == stream::s_run )
                {
                    t_time_data = in_stream< 0 >().data();

                    if( t_start_file_with_next_data )
                    {
                        LDEBUG( plog, "Handling first packet in run" );

                        t_first_pkt_in_run = t_time_data->get_pkt_in_session();

                        t_is_new_acquisition = true;

                        t_start_file_with_next_data = false;
                    }

                    uint64_t t_time_id = t_time_data->get_pkt_in_session();
                    LTRACE( plog, "Writing packet (in session) " << t_time_id );

                    // A gap in the batch counter (any non-consecutive packet, with
                    // wrap-around at BATCH_COUNTER_SIZE) marks a new acquisition
                    uint32_t t_expected_pkt_in_batch = f_last_pkt_in_batch + 1;
                    if( t_expected_pkt_in_batch >= BATCH_COUNTER_SIZE ) t_expected_pkt_in_batch = 0;
                    if( ! t_is_new_acquisition && t_time_data->get_pkt_in_batch() != t_expected_pkt_in_batch ) t_is_new_acquisition = true;
                    f_last_pkt_in_batch = t_time_data->get_pkt_in_batch();

                    // NOTE(review): t_swrap_ptr is dereferenced unconditionally here;
                    // an s_run arriving without a preceding s_start would crash --
                    // presumably the upstream node guarantees the ordering; verify
                    if( ! t_swrap_ptr->write_record( t_time_id, t_record_length_nsec * ( t_time_id - t_first_pkt_in_run ), t_time_data->get_raw_array(), t_bytes_per_record, t_is_new_acquisition ) )
                    {
                        throw midge::node_nonfatal_error() << "Unable to write record to file; record ID: " << t_time_id;
                    }

                    LTRACE( plog, "Packet written (" << t_time_id << ")" );

                    t_is_new_acquisition = false;

                    continue;
                }

            } // end while( ! is_cancelled() )

            // final attempt to finish the stream if the outer while loop is broken without the stream having been stopped or exited
            // e.g. if cancelled first, before anything else happens
            if( t_swrap_ptr )
            {
                f_monarch_ptr->finish_stream( f_stream_no );
                t_swrap_ptr.reset();
            }

            return;
        }
        catch(...)
        {
            // Forward the exception to midge for coordinated teardown when possible;
            // otherwise let it propagate to the caller
            LWARN( plog, "an error occurred executing streaming writer" );
            if( a_midge ) a_midge->throw_ex( std::current_exception() );
            else throw;
        }
    }
207 
    // Counterpart to initialize(): unregisters this writer from the
    // butterfly_house at teardown. (Declaration line not captured in this
    // extraction -- presumably streaming_writer::finalize().)
    {
        LDEBUG( plog, "finalize streaming writer" );
        butterfly_house::get_instance()->unregister_writer( this );
        return;
    }
214 
215 
    // streaming_writer_binding constructor -- empty body; base-class defaults
    // suffice. (Declaration line not captured in this extraction.)
    {
    }
220 
    // streaming_writer_binding destructor -- empty body, nothing to release.
    // (Declaration line not captured in this extraction.)
    {
    }
224 
225  void streaming_writer_binding::do_apply_config( streaming_writer* a_node, const scarab::param_node& a_config ) const
226  {
227  LDEBUG( plog, "Configuring streaming_writer with:\n" << a_config );
228  a_node->set_file_num( a_config.get_value( "file-num", a_node->get_file_num() ) );
229  if( a_config.has( "device" ) )
230  {
231  const scarab::param_node& t_dev_config = a_config["device"].as_node();
232  a_node->set_bit_depth( t_dev_config.get_value( "bit-depth", a_node->get_bit_depth() ) );
233  a_node->set_data_type_size( t_dev_config.get_value( "data-type-size", a_node->get_data_type_size() ) );
234  a_node->set_sample_size( t_dev_config.get_value( "sample-size", a_node->get_sample_size() ) );
235  a_node->set_record_size( t_dev_config.get_value( "record-size", a_node->get_record_size() ) );
236  a_node->set_acq_rate( t_dev_config.get_value( "acq-rate", a_node->get_acq_rate() ) );
237  a_node->set_v_offset( t_dev_config.get_value( "v-offset", a_node->get_v_offset() ) );
238  a_node->set_v_range( t_dev_config.get_value( "v-range", a_node->get_v_range() ) );
239  }
240  a_node->set_center_freq( a_config.get_value( "center-freq", a_node->get_center_freq() ) );
241  a_node->set_freq_range( a_config.get_value( "freq-range", a_node->get_freq_range() ) );
242  return;
243  }
244 
245  void streaming_writer_binding::do_dump_config( const streaming_writer* a_node, scarab::param_node& a_config ) const
246  {
247  LDEBUG( plog, "Dumping configuration for streaming_writer" );
248  a_config.add( "file-num", a_node->get_file_num() );
249  scarab::param_node t_dev_node = scarab::param_node();
250  t_dev_node.add( "bit-depth", a_node->get_bit_depth() );
251  t_dev_node.add( "data-type-size", a_node->get_data_type_size() );
252  t_dev_node.add( "sample-size", a_node->get_sample_size() );
253  t_dev_node.add( "record-size", a_node->get_record_size() );
254  t_dev_node.add( "acq-rate", a_node->get_acq_rate() );
255  t_dev_node.add( "v-offset", a_node->get_v_offset() );
256  t_dev_node.add( "v-range", a_node->get_v_range() );
257  a_config.add( "device", t_dev_node );
258  a_config.add( "center-freq", a_node->get_center_freq() );
259  a_config.add( "freq-range", a_node->get_freq_range() );
260  return;
261  }
262 
263 
264 } /* namespace psyllid */
std::shared_ptr< header_wrapper > header_wrap_ptr
Base class for all writers.
Definition: egg_writer.hh:24
std::shared_ptr< stream_wrapper > stream_wrap_ptr
static scarab::logger plog("batch_executor")
virtual void do_apply_config(streaming_writer *a_node, const scarab::param_node &a_config) const
#define PAYLOAD_SIZE
Definition: roach_packet.hh:15
const int8_t * get_raw_array() const
virtual void execute(midge::diptera *a_midge=nullptr)
std::shared_ptr< monarch_wrapper > monarch_wrap_ptr
A consumer to that writes all time ROACH packets to an egg file.
uint32_t get_pkt_in_batch() const
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
LOGGER(plog, "egg_writer")
monarch_wrap_ptr f_monarch_ptr
virtual void prepare_to_write(monarch_wrap_ptr a_mw_ptr, header_wrap_ptr a_hw_ptr)
virtual void do_dump_config(const streaming_writer *a_node, scarab::param_node &a_config) const
#define BATCH_COUNTER_SIZE
Definition: roach_packet.hh:18