13 #include "midge_error.hh" 36 f_data_type_size( 1 ),
38 f_record_size( 4096 ),
42 f_center_freq( 50.e6 ),
43 f_freq_range( 100.e6 ),
44 f_last_pkt_in_batch( 0 ),
58 scarab::dig_calib_params t_dig_params;
59 scarab::get_calib_params( f_bit_depth, f_data_type_size, f_v_offset, f_v_range,
true, &t_dig_params );
61 vector< unsigned > t_chan_vec;
62 f_stream_no = a_hw_ptr->header().AddStream(
"Psyllid - ROACH2",
63 f_acq_rate, f_record_size, f_sample_size, f_data_type_size,
64 monarch3::sDigitizedS, f_bit_depth, monarch3::sBitsAlignedLeft, &t_chan_vec );
67 for( std::vector< unsigned >::const_iterator it = t_chan_vec.begin(); it != t_chan_vec.end(); ++it )
69 a_hw_ptr->header().GetChannelHeaders()[ *it ].SetVoltageOffset( t_dig_params.v_offset );
70 a_hw_ptr->header().GetChannelHeaders()[ *it ].SetVoltageRange( t_dig_params.v_range );
71 a_hw_ptr->header().GetChannelHeaders()[ *it ].SetDACGain( t_dig_params.dac_gain );
72 a_hw_ptr->header().GetChannelHeaders()[ *it ].SetFrequencyMin( f_center_freq - 0.5 * f_freq_range );
73 a_hw_ptr->header().GetChannelHeaders()[ *it ].SetFrequencyRange( f_freq_range );
83 butterfly_house::get_instance()->register_writer(
this, f_file_num );
89 LDEBUG(
plog,
"execute streaming writer" );
92 midge::enum_t t_time_command = stream::s_none;
98 uint64_t t_bytes_per_record = f_record_size * f_sample_size * f_data_type_size;
99 uint64_t t_record_length_nsec = llrint( (
double)(
PAYLOAD_SIZE / 2) / (
double)f_acq_rate * 1.e3 );
101 uint64_t t_first_pkt_in_run = 0;
103 bool t_is_new_acquisition =
true;
104 bool t_start_file_with_next_data =
false;
106 while( ! is_canceled() )
108 t_time_command = in_stream< 0 >().
get();
109 if( t_time_command == stream::s_none )
continue;
110 if( t_time_command == stream::s_error )
break;
112 LTRACE(
plog,
"Egg writer reading stream 0 (time) at index " << in_stream< 0 >().get_current_index() );
114 if( t_time_command == stream::s_exit )
116 LDEBUG(
plog,
"Streaming writer is exiting" );
127 if( t_time_command == stream::s_stop )
129 LDEBUG(
plog,
"Streaming writer is stopping" );
140 if( t_time_command == stream::s_start )
142 LDEBUG(
plog,
"Will start file with next data" );
144 if( t_swrap_ptr ) t_swrap_ptr.reset();
149 t_start_file_with_next_data =
true;
153 if( t_time_command == stream::s_run )
155 t_time_data = in_stream< 0 >().data();
157 if( t_start_file_with_next_data )
159 LDEBUG(
plog,
"Handling first packet in run" );
161 t_first_pkt_in_run = t_time_data->get_pkt_in_session();
163 t_is_new_acquisition =
true;
165 t_start_file_with_next_data =
false;
168 uint64_t t_time_id = t_time_data->get_pkt_in_session();
169 LTRACE(
plog,
"Writing packet (in session) " << t_time_id );
173 if( ! t_is_new_acquisition && t_time_data->
get_pkt_in_batch() != t_expected_pkt_in_batch ) t_is_new_acquisition =
true;
176 if( ! t_swrap_ptr->write_record( t_time_id, t_record_length_nsec * ( t_time_id - t_first_pkt_in_run ), t_time_data->
get_raw_array(), t_bytes_per_record, t_is_new_acquisition ) )
178 throw midge::node_nonfatal_error() <<
"Unable to write record to file; record ID: " << t_time_id;
181 LTRACE(
plog,
"Packet written (" << t_time_id <<
")" );
183 t_is_new_acquisition =
false;
202 LWARN(
plog,
"an error occurred executing streaming writer" );
203 if( a_midge ) a_midge->throw_ex( std::current_exception() );
210 LDEBUG(
plog,
"finalize streaming writer" );
211 butterfly_house::get_instance()->unregister_writer(
this );
227 LDEBUG(
plog,
"Configuring streaming_writer with:\n" << a_config );
228 a_node->set_file_num( a_config.get_value(
"file-num", a_node->get_file_num() ) );
229 if( a_config.has(
"device" ) )
231 const scarab::param_node& t_dev_config = a_config[
"device"].as_node();
232 a_node->set_bit_depth( t_dev_config.get_value(
"bit-depth", a_node->get_bit_depth() ) );
233 a_node->set_data_type_size( t_dev_config.get_value(
"data-type-size", a_node->get_data_type_size() ) );
234 a_node->set_sample_size( t_dev_config.get_value(
"sample-size", a_node->get_sample_size() ) );
235 a_node->set_record_size( t_dev_config.get_value(
"record-size", a_node->get_record_size() ) );
236 a_node->set_acq_rate( t_dev_config.get_value(
"acq-rate", a_node->get_acq_rate() ) );
237 a_node->set_v_offset( t_dev_config.get_value(
"v-offset", a_node->get_v_offset() ) );
238 a_node->set_v_range( t_dev_config.get_value(
"v-range", a_node->get_v_range() ) );
240 a_node->set_center_freq( a_config.get_value(
"center-freq", a_node->get_center_freq() ) );
241 a_node->set_freq_range( a_config.get_value(
"freq-range", a_node->get_freq_range() ) );
247 LDEBUG(
plog,
"Dumping configuration for streaming_writer" );
248 a_config.add(
"file-num", a_node->get_file_num() );
249 scarab::param_node t_dev_node = scarab::param_node();
250 t_dev_node.add(
"bit-depth", a_node->get_bit_depth() );
251 t_dev_node.add(
"data-type-size", a_node->get_data_type_size() );
252 t_dev_node.add(
"sample-size", a_node->get_sample_size() );
253 t_dev_node.add(
"record-size", a_node->get_record_size() );
254 t_dev_node.add(
"acq-rate", a_node->get_acq_rate() );
255 t_dev_node.add(
"v-offset", a_node->get_v_offset() );
256 t_dev_node.add(
"v-range", a_node->get_v_range() );
257 a_config.add(
"device", t_dev_node );
258 a_config.add(
"center-freq", a_node->get_center_freq() );
259 a_config.add(
"freq-range", a_node->get_freq_range() );
streaming_writer_binding()
std::shared_ptr< header_wrapper > header_wrap_ptr
Base class for all writers.
std::shared_ptr< stream_wrapper > stream_wrap_ptr
static scarab::logger plog("batch_executor")
virtual void do_apply_config(streaming_writer *a_node, const scarab::param_node &a_config) const
virtual void initialize()
const int8_t * get_raw_array() const
virtual ~streaming_writer()
virtual void execute(midge::diptera *a_midge=nullptr)
std::shared_ptr< monarch_wrapper > monarch_wrap_ptr
A consumer to that writes all time ROACH packets to an egg file.
uint32_t get_pkt_in_batch() const
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
virtual ~streaming_writer_binding()
LOGGER(plog, "egg_writer")
monarch_wrap_ptr f_monarch_ptr
virtual void prepare_to_write(monarch_wrap_ptr a_mw_ptr, header_wrap_ptr a_hw_ptr)
unsigned f_last_pkt_in_batch
virtual void do_dump_config(const streaming_writer *a_node, scarab::param_node &a_config) const
#define BATCH_COUNTER_SIZE