Psyllid  v1.12.4
Project 8 Data Acquisition Software
event_builder.cc
Go to the documentation of this file.
1 /*
2  * event_builder.cc
3  *
4  * Created on: Dec 29, 2015
5  * Author: nsoblath
6  */
7 
8 #include "event_builder.hh"
9 
10 #include <limits>
11 
12 using midge::stream;
13 
14 namespace psyllid
15 {
// Node registration: ties the event_builder class and its event_builder_binding
// to the string "event-builder".
// NOTE(review): the macro is defined outside this file; presumably this is the
// name used to request the node from configuration -- confirm.
16  REGISTER_NODE_AND_BUILDER( event_builder, "event-builder", event_builder_binding );
17 
// File-level logger object `plog`, used by every L* logging macro below.
18  LOGGER( plog, "event_builder" );
19 
21  f_length( 10 ),
22  f_pretrigger( 0 ),
23  f_skip_tolerance( 0 ),
24  f_n_triggers( 1 ),
25  f_state( state_t::untriggered ),
26  f_pretrigger_buffer(),
27  f_skip_buffer()
28  {
29  }
30 
32  {
33  }
34 
36  {
37  f_pretrigger_buffer.resize( f_pretrigger + 1 );
38  f_skip_buffer.resize( f_skip_tolerance + 1);
39  out_buffer< 0 >().initialize( f_length );
40  return;
41  }
42 
// Main processing loop of the event builder.
    //
    // Repeatedly reads a command from input stream 0. For s_run it takes the
    // id / flag / high-threshold of the incoming trigger_flag, stores the id in
    // the pretrigger and/or skip buffer, and (depending on f_state, the trigger
    // count and how full the buffers are) writes buffered ids to output stream 0
    // flagged true or false. For s_stop and s_exit it flushes both buffers with
    // flag false and forwards the command downstream.
    //
    // The loop ends when is_canceled() is true, the input delivers s_error or
    // s_exit, or a write/set on the output stream returns false. On the way out
    // the function tries to send s_stop and then s_exit on the output stream.
    //
    // a_midge: if non-null, any exception caught here is handed to
    //          a_midge->throw_ex(); if null the exception is rethrown.
    //
    // NOTE(review): this listing carries embedded line numbers and the numbers
    // 49, 85, 91, 103, 126, 132, 151, 184, 246, 278, 294, 326, 377, 460 and 535
    // are absent. Judging by the surrounding comments and log messages these are
    // presumably the f_state tests and assignments -- confirm against the real
    // source before relying on the control flow as shown.
43  void event_builder::execute( midge::diptera* a_midge )
44  {
45  try
46  {
    // start every execution with empty id buffers
47  f_pretrigger_buffer.clear();
48  f_skip_buffer.clear();
50 
51  midge::enum_t t_in_command = stream::s_none;
    // t_trigger_flag: current input record; t_write_flag: current output slot
52  trigger_flag* t_trigger_flag = nullptr;
53  trigger_flag* t_write_flag = nullptr;
    // number of triggers counted so far towards f_n_triggers
54  unsigned t_trigger_count = 0;
55 
56  bool t_current_trig_flag = false;
57  bool t_current_trig_high_thr = false;
58 
59  while( ! is_canceled() )
60  {
61  t_in_command = in_stream< 0 >().get();
    // nothing received: poll again; stream error: leave the loop
62  if( t_in_command == stream::s_none ) continue;
63  if( t_in_command == stream::s_error ) break;
64 
65  LTRACE( plog, "Event builder reading stream at index " << in_stream< 0 >().get_current_index() );
66 
67  t_trigger_flag = in_stream< 0 >().data();
68  t_write_flag = out_stream< 0 >().data();
69 
70  if( t_in_command == stream::s_start )
71  {
72  LDEBUG( plog, "Starting the event builder" );
    // forward the start command; a false return ends the loop
73  if( ! out_stream< 0 >().set( stream::s_start ) ) break;
74  continue;
75  }
76 
77  if( t_in_command == stream::s_run )
78  {
79  t_current_trig_flag = t_trigger_flag->get_flag();
80  t_current_trig_high_thr = t_trigger_flag->get_high_threshold();
81 
82  LTRACE( plog, "Event builder received id <" << t_trigger_flag->get_id() << "> with flag value <" << t_trigger_flag->get_flag() << ">" );
83 
    // Step 1: store the incoming id in the buffer(s) selected by the current state.
84  // if currently untriggered, fill pretrigger buffer
    // NOTE(review): listing line 85 (presumably the state test) is missing here.
86  {
87  f_pretrigger_buffer.push_back(t_trigger_flag->get_id());
88  LTRACE( plog, "new id in pt buffer: " << f_pretrigger_buffer.back() );
89 
90  }
    // NOTE(review): listing line 91 (presumably an else-if state test) is missing here.
92  {
93  f_skip_buffer.push_back( t_trigger_flag->get_id());
94  LTRACE( plog, "new id in skip buffer: " << f_skip_buffer.back() );
95  }
96  // if state is skipping or triggered fill both buffers
97  else
98  {
99  f_skip_buffer.push_back( t_trigger_flag->get_id());
100  f_pretrigger_buffer.push_back( t_trigger_flag->get_id());
101  }
102 
    // Step 2: act on the trigger flag according to the current state.
    // NOTE(review): listing line 103 (presumably the untriggered-state test) is missing here.
104  {
105  LTRACE( plog, "Currently in untriggered state" );
    // a new event requires both the trigger flag and the high-threshold flag
106  if( t_current_trig_flag and t_current_trig_high_thr == true)
107  {
108  LINFO( plog, "New trigger" );
109  ++t_trigger_count;
110  if (t_trigger_count == f_n_triggers)
111  {
112  t_trigger_count = 0;
113  // flush the pretrigger buffer as true, which includes the current trig id
114  while( ! f_pretrigger_buffer.empty() )
115  {
116  LTRACE( plog, "Current state untriggered. Writing id "<<f_pretrigger_buffer.front()<<" as true" );
117  if( ! write_output_from_ptbuff_front( true, t_write_flag ) )
118  {
119  goto exit_outer_loop;
120  }
121  // advance our output data pointer to the next in the stream
122  t_write_flag = out_stream< 0 >().data();
123  }
124  // next state is triggered (NOTE(review): the assignment, listing line 126, is missing)
125  LDEBUG( plog, "Next state is triggered" );
127  }
128  else
129  {
130  // next state is collecting (NOTE(review): the assignment, listing line 132, is missing)
131  LDEBUG( plog, "Next state is collecting" );
133  }
134  }
135  else
136  {
137  LTRACE( plog, "No new trigger; Writing to from pretrig buffer only if buffer is full: " << f_pretrigger_buffer.full() );
138  // contents of the buffer are the existing pretrigger plus the current trig id
139  // only write out from the front of the buffer if the buffer is full; otherwise we're filling the buffer
140  if( f_pretrigger_buffer.full() )
141  {
142  LTRACE( plog, "Current state untriggered. Writing id "<<f_pretrigger_buffer.front()<<" as false");
143  if( ! write_output_from_ptbuff_front( false, t_write_flag ) )
144  {
145  break;
146  }
147  // pretrigger buffer is full - 1
148  }
149  }
150  }
    // NOTE(review): listing line 151 (presumably the collecting-state test) is missing here.
152  {
153  LTRACE( plog, "Currently in collecting state" );
    // in this state only the trigger flag is tested, not the high-threshold flag
154  if (t_current_trig_flag)
155  {
156  ++t_trigger_count;
157  LDEBUG(plog, "Got another trigger: "<<t_trigger_count<<", need N triggers: "<<f_n_triggers);
158  if (t_trigger_count == f_n_triggers)
159  {
160  t_trigger_count = 0;
161  // flush the pretrigger buffer as true
162  while( ! f_pretrigger_buffer.empty() )
163  {
164  LTRACE( plog, "Current state waiting. Writing id "<<f_pretrigger_buffer.front()<<" as true");
165  if( ! write_output_from_ptbuff_front( true, t_write_flag ) )
166  {
167  goto exit_outer_loop;
168  }
169  // advance our output data pointer to the next in the stream
170  t_write_flag = out_stream< 0 >().data();
171  }
    // then flush the skip buffer as true as well
172  while( ! f_skip_buffer.empty() )
173  {
174  LTRACE( plog, "Current state waiting. Writing id "<<f_skip_buffer.front()<<" as true");
175  if( ! write_output_from_skipbuff_front( true, t_write_flag ) )
176  {
177  goto exit_outer_loop;
178  }
179  // advance our output data pointer to the next in the stream
180  t_write_flag = out_stream< 0 >().data();
181  }
182  // set state to triggered (NOTE(review): the assignment, listing line 184, is missing)
183  LTRACE( plog, "pt buffer is empty: "<<f_pretrigger_buffer.empty()<<", skip buffer is emptry: "<<f_skip_buffer.empty()<<". Next state is triggered");
185  LDEBUG( plog, "Next state is triggered");
186  }
187  }
    // skip buffer filled up before f_n_triggers triggers were counted:
    // give up on this candidate, write the older ids as false and keep the
    // newest ids in the pretrigger buffer
188  if( f_skip_buffer.full() )
189  {
190  LDEBUG(plog, "Not enough triggers arrived. Capacities and sizes are (pre/skip): "<<f_pretrigger_buffer.capacity()<<"/"<<f_pretrigger_buffer.size()<<" "<<f_skip_buffer.capacity()<<"/"<<f_skip_buffer.size());
191  LTRACE( plog, "first and last ids are: "<<f_pretrigger_buffer.front()<<"/"<<f_pretrigger_buffer.back()<<", "<<f_skip_buffer.front()<<"/"<<f_skip_buffer.back());
192  t_trigger_count = 0;
193  if (f_skip_buffer.capacity() >= f_pretrigger_buffer.capacity())
194  {
    // write the whole pretrigger buffer as false
195  while( ! f_pretrigger_buffer.empty() )
196  {
197  LTRACE( plog, "Current state waiting. Writing id "<<f_pretrigger_buffer.front()<<" as false");
198  if( ! write_output_from_ptbuff_front( false, t_write_flag ) )
199  {
200  goto exit_outer_loop;
201  }
202  // advance our output data pointer to the next in the stream
203  t_write_flag = out_stream< 0 >().data();
204  }
205  // write skip-buffer ids as false until fewer than the pretrigger capacity remain
206  while( f_skip_buffer.size() >= f_pretrigger_buffer.capacity() )
207  {
208  LTRACE( plog, "Current state waiting. Writing id "<<f_skip_buffer.front()<<" as false");
209  if( ! write_output_from_skipbuff_front( false, t_write_flag ) )
210  {
211  goto exit_outer_loop;
212  }
213  // advance our output data pointer to the next in the stream
214  t_write_flag = out_stream< 0 >().data();
215  }
216  // move the remaining skip-buffer ids into the pretrigger buffer (nothing is written)
217  while( ! f_skip_buffer.empty())
218  {
219  LTRACE(plog, "Writing skip buffer front: "<<f_skip_buffer.front());
220  f_pretrigger_buffer.push_back(f_skip_buffer.front());
221  LTRACE(plog, "to pt buffer back: "<<f_pretrigger_buffer.back());
222  f_skip_buffer.pop_front();
223  }
224  LTRACE( plog, "Finished moving IDs. Capacities and sizes are (pre/skip): "<<f_pretrigger_buffer.capacity()<<"/"<<f_pretrigger_buffer.size()<<" "<<f_skip_buffer.capacity()<<"/"<<f_skip_buffer.size());
225  }
226  else
227  {
    // pretrigger buffer is the larger one: write from its front as false until
    // the combined size of both buffers is below the pretrigger capacity
228  while( f_pretrigger_buffer.capacity() <= f_skip_buffer.size() + f_pretrigger_buffer.size() )
229  {
230  LTRACE( plog, "Current state waiting. Writing id "<<f_pretrigger_buffer.front()<<" as false");
231  if( ! write_output_from_ptbuff_front( false, t_write_flag ) )
232  {
233  goto exit_outer_loop;
234  }
235  // advance our output data pointer to the next in the stream
236  t_write_flag = out_stream< 0 >().data();
237  }
    // then move every skip-buffer id into the pretrigger buffer
238  while( !f_skip_buffer.empty() )
239  {
240  f_pretrigger_buffer.push_back(f_skip_buffer.front());
241  f_skip_buffer.pop_front();
242  }
243  LTRACE( plog, "Finished moving IDs. Capacities and sizes are (pre/skip): " << f_pretrigger_buffer.capacity() << "/" << f_pretrigger_buffer.size() << " " << f_skip_buffer.capacity() << "/" << f_skip_buffer.size() );
244  }
245  // set state to untriggered (NOTE(review): the assignment, listing line 246, is missing)
247  LDEBUG( plog, "Next state is untriggered" );
248  }
249  }
250  else if( f_state == state_t::triggered )
251  {
252  LTRACE( plog, "Currently in triggered state" );
253  if( t_current_trig_flag )
254  {
255  LTRACE( plog, "Continuing as triggered" );
256  // the current trig id was pushed to both buffers above
257  // write the front of the skip buffer as true
258  LDEBUG( plog, "Current state triggered. Writing id "<<f_skip_buffer.front()<<" as true");
259 
260  if( ! write_output_from_skipbuff_front( true, t_write_flag ) )
261  {
262  break;
263  }
264  // drop the front of the pretrigger buffer without writing it
265  f_pretrigger_buffer.pop_front();
266  }
267  else
268  {
269  LDEBUG( plog, "No new trigger; Switching state" );
270  // contents of the skip buffer (the current trig id) are the first ids to be skipped
271  // only write out if the buffer is full (in this case, equivalent to f_skip_tolerance == 0)
272  if( f_skip_buffer.full() )
273  {
274  // no need to write, id is also stored in pretrigger buffer
275  f_skip_buffer.clear();
276  LDEBUG( plog, "Next state is untriggered");
277  // in this case, next state is untriggered (NOTE(review): the assignment, listing line 278, is missing)
279 
280  // if pretrigger is also full write its front id out as false
281  if( f_pretrigger_buffer.full())
282  {
283  LDEBUG( plog, "Current state triggered. Writing id " << f_pretrigger_buffer.front() << " as false" );
284  if( ! write_output_from_ptbuff_front( false, t_write_flag ) )
285  {
286  break;
287  }
288  }
289  }
290  else
291  {
292  LDEBUG( plog, "Next state is skipping" );
293  // next state is skipping (NOTE(review): the assignment, listing line 294, is missing)
295 
296  // if the pretrigger buffer is already full, empty it without writing
297  if (f_pretrigger_buffer.full())
298  {
299  f_pretrigger_buffer.clear();
300  }
301  }
302  }
303  }
304  else if( f_state == state_t::skipping)
305  {
306  LTRACE( plog, "Currently in skipping state" );
307 
308  if( t_current_trig_flag )
309  {
310  LINFO( plog, "New trigger; flushing skip buffer" );
311  while( ! f_skip_buffer.empty() )
312  {
313  LTRACE( plog, "Current state skipping. Writing id " << f_skip_buffer.front() << " as true" );
314  if( ! write_output_from_skipbuff_front( true, t_write_flag ) )
315  {
316  goto exit_outer_loop;
317  }
318  // advance our output data pointer to the next in the stream
319  t_write_flag = out_stream< 0 >().data();
320  }
321  // also remove all entries from pretrigger buffer
322  // ids were already written
323  f_pretrigger_buffer.clear();
324  LDEBUG( plog, "Next state is triggered" );
325  // set state to triggered (NOTE(review): the assignment, listing line 326, is missing)
327  }
328  else
329  {
330  if(f_skip_buffer.full() )
331  {
332  LINFO( plog, "Skip_tolerance reached. Continuing as untriggered");
333  // if skip buffer is not bigger than pretrigger buffer, write out ids as true
334  if ( f_skip_buffer.capacity() <= f_pretrigger_buffer.capacity() )
335  {
336  while( ! f_skip_buffer.empty() )
337  {
338  LTRACE( plog, "Current state skipping. Writing id " << f_skip_buffer.front() << " as true" );
339  if( ! write_output_from_skipbuff_front( true, t_write_flag ) )
340  {
341  goto exit_outer_loop;
342  }
343  // advance our output data pointer to the next in the stream
344  t_write_flag = out_stream< 0 >().data();
    // drop one pretrigger entry per id written from the skip buffer
345  f_pretrigger_buffer.pop_front();
346  }
347  }
348  else
349  {
350  // write skip-buffer ids as true while the skip buffer holds more ids than the pretrigger buffer
351  while( f_skip_buffer.size() > f_pretrigger_buffer.size() )
352  {
353  LTRACE( plog, "Current state skipping. Writing id " << f_skip_buffer.front() << " as true" );
354 
355  if( ! write_output_from_skipbuff_front( true, t_write_flag ) )
356  {
357  goto exit_outer_loop;
358  }
359  // advance our output data pointer to the next in the stream
360  t_write_flag = out_stream< 0 >().data();
361  }
362  // then delete the remaining content of the skip buffer
363  f_skip_buffer.clear();
364 
365  // contents of the buffer are the existing pretrigger plus the current trig id
366  // only write out from the front of the buffer if the buffer is full; otherwise we're filling the buffer
367  if( f_pretrigger_buffer.full() )
368  {
369  LTRACE( plog, "Current state skipping. Writing id "<<f_pretrigger_buffer.front()<<" as false");
370  if( ! write_output_from_ptbuff_front( false, t_write_flag ) )
371  {
372  break;
373  }
374  }
375  }
376  // set state to untriggered (NOTE(review): the assignment, listing line 377, is missing)
378  }
379 
380  else
381  {
382  // if pretrigger is full remove first item, no need to write, ids are also in skip buffer and will be written from there
383  if( f_pretrigger_buffer.full() )
384  {
385  f_pretrigger_buffer.pop_front();
386  }
387  LTRACE( plog, "No new trigger. Continue to fill skip and pretrigger buffer buffer." )
388  }
389  }
390  }
391  } // end if( t_in_command == stream::s_run )
392 
393 
394  if( t_in_command == stream::s_stop )
395  {
396  LDEBUG( plog, "Event builder is stopping at stream index " << out_stream< 0 >().get_current_index() );
397  LDEBUG( plog, "Flushing buffers as untriggered" );
398 
    // While both buffers hold ids, write the smaller front id first (as false).
    // An id at the front of both buffers is written once, from the skip buffer,
    // and dropped from the pretrigger buffer.
399  while( ! f_pretrigger_buffer.empty() and ! f_skip_buffer.empty() )
400  {
401  if( f_pretrigger_buffer.front() == f_skip_buffer.front() )
402  {
403  LTRACE( plog, "Skip id " << f_skip_buffer.front() );
404  if( ! write_output_from_skipbuff_front( false, t_write_flag ) )
405  {
406  goto exit_outer_loop;
407  }
408  t_write_flag = out_stream< 0 >().data();
409  f_pretrigger_buffer.pop_front();
410  }
411  else if( f_pretrigger_buffer.front() < f_skip_buffer.front() )
412  {
413  LTRACE( plog, "Pretrigger id "<<f_pretrigger_buffer.front());
414  if( ! write_output_from_ptbuff_front( false, t_write_flag ) )
415  {
416  goto exit_outer_loop;
417  }
418  t_write_flag = out_stream< 0 >().data();
419  }
420  else
421  {
422  LTRACE( plog, "Skip id "<<f_skip_buffer.front() );
423  if( ! write_output_from_skipbuff_front( false, t_write_flag ) )
424  {
425  goto exit_outer_loop;
426  }
427  t_write_flag = out_stream< 0 >().data();
428  }
429  }
430 
    // at most one buffer still holds ids; write whatever is left as false
431  if( f_skip_buffer.empty() )
432  {
433  LDEBUG( plog, "Skip buffer is empty" );
434  while( ! f_pretrigger_buffer.empty() )
435  {
436  LTRACE( plog, "Pretrigger id " << f_pretrigger_buffer.front() );
437  if( ! write_output_from_ptbuff_front( false, t_write_flag ) )
438  {
439  goto exit_outer_loop;
440  }
441  // advance our output data pointer to the next in the stream
442  t_write_flag = out_stream< 0 >().data();
443  }
444  }
445  else if (f_pretrigger_buffer.empty() )
446  {
447  LDEBUG( plog, "Pretrigger buffer is empty");
448  while( ! f_skip_buffer.empty() )
449  {
450  LTRACE( plog, "Skip id "<<f_skip_buffer.front() );
451  if( ! write_output_from_skipbuff_front( false, t_write_flag ) )
452  {
453  goto exit_outer_loop;
454  }
455  // advance our output data pointer to the next in the stream
456  t_write_flag = out_stream< 0 >().data();
457  }
458  }
459 
    // NOTE(review): listing line 460 is missing here; presumably a state reset -- confirm.
461 
    // forward the stop command downstream, then keep waiting for further commands
462  if( ! out_stream< 0 >().set( stream::s_stop ) )
463  {
464  LERROR( plog, "Exiting due to stream error" );
465  break;
466  }
467  continue;
468  }
469 
470  if( t_in_command == stream::s_exit )
471  {
472  LDEBUG( plog, "Event builder is exiting at stream index " << out_stream< 0 >().get_current_index() );
473  LDEBUG( plog, "Flushing buffers as untriggered" );
474 
    // same ordered flush-as-false as in the s_stop branch above
475  while( ! f_pretrigger_buffer.empty() and !f_skip_buffer.empty() )
476  {
477  if( f_pretrigger_buffer.front() == f_skip_buffer.front() )
478  {
479  LTRACE( plog, "Skip id "<<f_skip_buffer.front() );
480  if( ! write_output_from_skipbuff_front( false, t_write_flag ) )
481  {
482  goto exit_outer_loop;
483  }
484  t_write_flag = out_stream< 0 >().data();
485  f_pretrigger_buffer.pop_front();
486  }
487  else if( f_pretrigger_buffer.front() < f_skip_buffer.front() )
488  {
489  LTRACE( plog, "Pretrigger id "<<f_pretrigger_buffer.front() );
490  if( ! write_output_from_ptbuff_front( false, t_write_flag ) )
491  {
492  goto exit_outer_loop;
493  }
494  t_write_flag = out_stream< 0 >().data();
495  }
496  else
497  {
498  LTRACE( plog, "Skip id "<<f_skip_buffer.front() );
499  if( ! write_output_from_skipbuff_front( false, t_write_flag ) )
500  {
501  goto exit_outer_loop;
502  }
503  t_write_flag = out_stream< 0 >().data();
504  }
505  }
506  if( f_skip_buffer.empty())
507  {
508  LTRACE( plog, "Skip buffer is empty" );
509  while( !f_pretrigger_buffer.empty())
510  {
511  LTRACE( plog, "Pretrigger id "<<f_pretrigger_buffer.front() );
512  if( ! write_output_from_ptbuff_front( false, t_write_flag ) )
513  {
514  goto exit_outer_loop;
515  }
516  // advance our output data pointer to the next in the stream
517  t_write_flag = out_stream< 0 >().data();
518  }
519  }
520  else if( f_pretrigger_buffer.empty() )
521  {
522  LTRACE( plog, "Pretrigger buffer is empty" );
523  while( !f_skip_buffer.empty() )
524  {
525  LTRACE( plog, "Skip id "<<f_skip_buffer.front() );
526  if( ! write_output_from_skipbuff_front( false, t_write_flag ) )
527  {
528  goto exit_outer_loop;
529  }
530  // advance our output data pointer to the next in the stream
531  t_write_flag = out_stream< 0 >().data();
532  }
533  }
534 
    // NOTE(review): listing line 535 is missing here; presumably a state reset -- confirm.
536 
    // forward the exit command (return value not checked) and leave the loop
537  out_stream< 0 >().set( stream::s_exit );
538  break;
539  }
540 
541  } // end while( ! is_canceled() )
542 
    // Common exit path, reached by falling out of the loop or by goto from a
    // failed write inside a flush loop: send s_stop, then s_exit, downstream.
543 exit_outer_loop:
544  LDEBUG( plog, "Stopping output stream" );
545  if( ! out_stream< 0 >().set( stream::s_stop ) ) return;
546 
547  LDEBUG( plog, "Exiting output stream" );
548  out_stream< 0 >().set( stream::s_exit );
549 
550  }
551  catch(...)
552  {
    // hand the exception to the midge object if we have one, otherwise rethrow
553  if( a_midge ) a_midge->throw_ex( std::current_exception() );
554  else throw;
555  }
556  }
558  void event_builder::advance_output_stream( trigger_flag* a_write_flag, uint64_t a_id, bool a_trig_flag )
559  {
560  a_write_flag->set_id( a_id );
561  a_write_flag->set_flag( a_trig_flag );
562  LDEBUG( plog, "Event builder writing data to the output stream at index " << out_stream< 0 >().get_current_index() );
563  out_stream< 0 >().set( midge::stream::s_run );
564  return;
565  }
568  {
569  out_buffer< 0 >().finalize();
570  return;
571  }
572 
576  {
577  }
580  {
581  }
583  void event_builder_binding::do_apply_config( event_builder* a_node, const scarab::param_node& a_config ) const
584  {
585  LDEBUG( plog, "Configuring event_builder with:\n" << a_config );
586  a_node->set_length( a_config.get_value( "length", a_node->get_length() ) );
587  a_node->set_pretrigger( a_config.get_value( "pretrigger", a_node->get_pretrigger() ) );
588  a_node->set_skip_tolerance( a_config.get_value( "skip-tolerance", a_node->get_skip_tolerance() ) );
589  a_node->set_n_triggers( a_config.get_value( "n-triggers", a_node->get_n_triggers() ) );
590  return;
591  }
593  void event_builder_binding::do_dump_config( const event_builder* a_node, scarab::param_node& a_config ) const
594  {
595  LDEBUG( plog, "Dumping configuration for event_builder" );
596  a_config.add( "length", scarab::param_value( a_node->get_length() ) );
597  a_config.add( "pretrigger", scarab::param_value( a_node->get_pretrigger() ) );
598  a_config.add( "skip-tolerance", scarab::param_value( a_node->get_skip_tolerance() ) );
599  a_config.add( "n-triggers", scarab::param_value( a_node->get_n_triggers() ) );
600  return;
601  }
602 
603 
604 } /* namespace psyllid */
virtual void finalize()
pretrigger_buffer_t f_skip_buffer
static scarab::logger plog("batch_executor")
virtual void execute(midge::diptera *a_midge=nullptr)
bool write_output_from_ptbuff_front(bool a_flag, trigger_flag *a_data)
bool write_output_from_skipbuff_front(bool a_flag, trigger_flag *a_data)
A transformer that considers a sequence of triggered packets and decides what constitutes a contiguou...
virtual void initialize()
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
LOGGER(plog, "egg_writer")
pretrigger_buffer_t f_pretrigger_buffer
virtual void do_dump_config(const event_builder *a_node, scarab::param_node &a_config) const
virtual void do_apply_config(event_builder *a_node, const scarab::param_node &a_config) const
void advance_output_stream(trigger_flag *a_write_flag, uint64_t a_id, bool a_trig_flag)