Psyllid  v1.12.4
Project 8 Data Acquisisition Software
egg3_reader.cc
Go to the documentation of this file.
1 /*
2  * egg3_reader.cc
3  *
4  * Created on: Dec. 14, 2017
5  * Author: laroque
6  */
7 
8 #include <chrono>
9 
10 #include "daq_control.hh"
11 #include "egg3_reader.hh"
12 #include "psyllid_error.hh"
13 #include "time_data.hh"
14 #include "M3Monarch.hh"
15 
16 #include "logger.hh"
17 #include "M3Monarch.hh"
18 #include "param.hh"
19 
20 using midge::stream;
21 
22 namespace psyllid
23 {
24  REGISTER_NODE_AND_BUILDER( egg3_reader, "egg3-reader", egg3_reader_binding );
25 
26  LOGGER( plog, "egg3_reader" );
27 
28  // egg3_reader methods
30  f_egg( nullptr ),
31  f_egg_path( "/dev/null" ),
32  f_read_n_records( 0 ),
33  f_repeat_egg( false ),
34  f_length( 10 ),
35  f_start_paused( true ),
36  f_paused( true ),
37  f_record_length( 0 ),
38  f_pkt_id_offset( 0 )
39  {
40  }
41 
43  {
44  cleanup_file();
45  }
46 
48  {
49  f_paused = f_start_paused;
50 
51  out_buffer< 0 >().initialize( f_length );
52 
53  LDEBUG( plog, "opening egg file [" << f_egg_path << "]" );
54  f_egg = monarch3::Monarch3::OpenForReading( f_egg_path );
55  f_egg->ReadHeader();
56  // do we want/need to do anything with the header?
57  const monarch3::M3Header *t_egg_header = f_egg->GetHeader();
58  LDEBUG( plog, "egg header content:\n" );
59  LDEBUG( plog, *t_egg_header );
60  //TODO this should probably not assume single-channel mode...
61  f_record_length = t_egg_header->ChannelHeaders()[0].GetRecordSize();
62  return;
63 
64  }
65 
66  void egg3_reader::execute( midge::diptera* a_midge )
67  {
68  try
69  {
70  LDEBUG( plog, "Executing the egg3_reader" );
71  //TODO use header to loop streams so we can send more than one?
72  //const monarch3::M3Header *t_egg_header = f_egg->GetHeader();
73  const monarch3::M3Stream* t_stream = f_egg->GetStream( 0 );
74  const monarch3::M3Record* t_record = t_stream->GetChannelRecord( 0 );
75 
76  time_data* t_data = nullptr;
77 
78  // starting not in a paused state is not currently known to work
79  if ( !f_paused )
80  {
81  if( ! out_stream< 0 >().set( stream::s_start ) ) return;
82  }
83 
84  uint64_t t_records_read = 0;
85 
86  // starting execution loop
87  while (! is_canceled() )
88  {
89  if( (out_stream< 0 >().get() == stream::s_stop) )
90  {
91  LWARN( plog, "Output stream(s) have stop condition" );
92  break;
93  }
94  if( have_instruction() )
95  {
96  if( f_paused && use_instruction() == midge::instruction::resume )
97  {
98  LDEBUG( plog, "egg reader resuming" );
99  if( ! out_stream< 0 >().set( stream::s_start ) ) throw midge::node_nonfatal_error() << "Stream 0 error while starting";
100  f_paused = false;
101  t_records_read = 0;
102  }
103  else if ( !f_paused && use_instruction() == midge::instruction::pause )
104  {
105  LDEBUG( plog, "egg reader pausing" );
106  if( ! out_stream< 0 >().set( stream::s_stop ) ) throw midge::node_nonfatal_error() << "Stream 0 error while stopping";
107  f_paused = true;
108  }
109  }
110  // only read if not paused:
111  if ( ! f_paused )
112  {
113  //if ( !read_slice(t_data, t_stream, t_record) ) break;
114  bool read_slice_ok = read_slice(t_data, t_stream, t_record);
115  if (read_slice_ok) {
116  t_records_read++;
117  }
118  if ( !read_slice_ok || (f_read_n_records > 0 && t_records_read >= f_read_n_records) )
119  {
120  LINFO( plog, "breaking out of loop because record limit or end of file reached" );
121  std::shared_ptr< daq_control > t_daq_control = use_daq_control();
122  t_daq_control->stop_run();
123  }
124  // add some sleep to try and not lap downstream nodes
125  std::this_thread::sleep_for(std::chrono::microseconds(100));
126  }
127  else
128  {
129  std::this_thread::sleep_for(std::chrono::milliseconds(100));
130  }
131  }
132  }
133  catch( std::exception )
134  {
135  LWARN( plog, "got an exception, throwing" );
136  a_midge->throw_ex( std::current_exception() );
137  }
138  LDEBUG( plog, "at the end of egg3 execute" );
139  }
140 
142  {
143  LDEBUG( plog, "finalize the egg3_reader" );
144  out_buffer< 0 >().finalize();
145  LDEBUG( plog, "buffer finalized" );
146  cleanup_file();
147  return;
148  }
149 
150  bool egg3_reader::read_slice(time_data* t_data, const monarch3::M3Stream* t_stream, const monarch3::M3Record* t_record)
151  {
152  LDEBUG( plog, "reading a slice" );
153  // update t_data to point to the next slot in the output stream
154  t_data = out_stream< 0 >().data();
155  // read next record in egg file, writing into the output_stream
156  if ( !t_stream->ReadRecord() )
157  {
158  if ( !f_repeat_egg )
159  {
160  LDEBUG( plog, "reached end of file, stopping" );
161  return false;
162  }
163  else
164  {
165  LDEBUG( plog, "reached end of file, restarting" );
166  t_stream->ReadRecord( -1 * int(t_stream->GetRecordCountInFile()) );
167  // when we loop back, we want the record ID to increment and have a gap relative to the end of the file
168  f_pkt_id_offset += 1 + t_stream->GetNRecordsInFile();
169  }
170  }
171  std::copy(&t_record->GetData()[0], &t_record->GetData()[f_record_length*2], &t_data->get_array()[0][0]);
172 
173  // packet ID logic
174  //TODO do this pkt ID logic reasonable?
175  t_data->set_pkt_in_batch( t_record->GetRecordId() + f_pkt_id_offset );
176  t_data->set_pkt_in_session( t_record->GetRecordId() + f_pkt_id_offset );
177  if ( !out_stream< 0 >().set( stream::s_run ) )
178  {
179  LERROR( plog, "egg reader exiting due to stream error" );
180  return false;
181  }
182  return true;
183  }
184 
186  {
187  LDEBUG( plog, "cleaning up file" );
188  if ( f_egg == NULL ) return;
189  LDEBUG( plog, "clean egg" );
190  if ( f_egg->GetState() != monarch3::Monarch3::eClosed )
191  {
192  LDEBUG( plog, "actually close egg" );
193  f_egg->FinishReading();
194  }
195  }
196 
197  // egg3_reader_binding methods
200  {
201  }
202 
204  {
205  }
206 
207  void egg3_reader_binding::do_apply_config( egg3_reader* a_node, const scarab::param_node& a_config ) const
208  {
209  LDEBUG( plog, "Configuring egg3_reader with:\n" << a_config );
210  a_node->set_egg_path( a_config.get_value( "egg-path", a_node->get_egg_path() ) );
211  a_node->set_read_n_records( a_config.get_value( "read-n-records", a_node->get_read_n_records() ) );
212  a_node->set_repeat_egg( a_config.get_value( "repeat-egg", a_node->get_repeat_egg() ) );
213  a_node->set_length( a_config.get_value( "length", a_node->get_length() ) );
214  a_node->set_start_paused( a_config.get_value( "start-paused", a_node->get_start_paused() ) );
215  return;
216  }
217 
218  void egg3_reader_binding::do_dump_config( const egg3_reader* a_node, scarab::param_node& a_config ) const
219  {
220  LDEBUG( plog, "Dumping configuration for egg3_reader" );
221  a_config.add( "egg-path", scarab::param_value( a_node->get_egg_path() ) );
222  a_config.add( "read-n-records", scarab::param_value( a_node->get_read_n_records() ) );
223  a_config.add( "repeat-egg", scarab::param_value( a_node->get_repeat_egg() ) );
224  a_config.add( "length", scarab::param_value( a_node->get_length() ) );
225  a_config.add( "start-paused", scarab::param_value( a_node->get_length() ) );
226  return;
227  }
228 
229 } /* namespace psyllid */
virtual void execute(midge::diptera *a_midge=nullptr)
Definition: egg3_reader.cc:66
static scarab::logger plog("batch_executor")
uint32_t f_record_length
Definition: egg3_reader.hh:67
virtual void initialize()
Definition: egg3_reader.cc:47
const iq_t * get_array() const
Definition: time_data.hh:39
virtual void do_apply_config(egg3_reader *a_node, const scarab::param_node &a_config) const
Definition: egg3_reader.cc:207
void set_pkt_in_batch(uint32_t a_pkt)
bool read_slice(time_data *t_data, const monarch3::M3Stream *t_stream, const monarch3::M3Record *t_record)
Definition: egg3_reader.cc:150
virtual void finalize()
Definition: egg3_reader.cc:141
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
A producer to read time-domain slices from an egg file and place them in time data buffers...
Definition: egg3_reader.hh:51
uint64_t f_pkt_id_offset
Definition: egg3_reader.hh:68
LOGGER(plog, "egg_writer")
virtual ~egg3_reader()
Definition: egg3_reader.cc:42
virtual void do_dump_config(const egg3_reader *a_node, scarab::param_node &a_config) const
Definition: egg3_reader.cc:218