Psyllid  v1.12.4
Project 8 Data Acquisition Software
daq_control.cc
Go to the documentation of this file.
1 /*
2  * daq_control.cc
3  *
4  * Created on: Jan 22, 2016
5  * Author: nsoblath
6  */
7 
8 #include "daq_control.hh"
9 
10 #include "butterfly_house.hh"
11 #include "message_relayer.hh"
12 #include "node_builder.hh"
13 
14 #include "diptera.hh"
15 #include "midge_error.hh"
16 
17 #include "logger.hh"
18 #include "signal_handler.hh"
19 
20 #include <chrono>
21 #include <condition_variable>
22 #include <future>
23 #include <signal.h>
24 #include <thread>
25 
26 using scarab::param_array;
27 using scarab::param_node;
28 using scarab::param_value;
29 using scarab::param_ptr_t;
30 
31 using dripline::request_ptr_t;
32 
33 using std::string;
34 
35 namespace psyllid
36 {
37  LOGGER( plog, "daq_control" );
38 
// Constructs the DAQ controller.
//
// a_master_config: master configuration; when it contains a "daq" entry, that node is merged into f_daq_config.
// a_mgr:           stream manager, stored in f_node_manager.
//
// The controller starts in the "deactivated" status, with a run duration of 1000 (do_run() treats the
// duration as milliseconds) and Monarch use enabled.  The run duration is then overridden by the
// "duration" value of the DAQ config, if one is present.
// NOTE(review): the embedded listing numbers jump from 40 to 42, so one member initializer appears to be
// absent from this extract -- confirm against the original file.
39  daq_control::daq_control( const param_node& a_master_config, std::shared_ptr< stream_manager > a_mgr ) :
40  scarab::cancelable(),
42  f_activation_condition(),
43  f_daq_mutex(),
44  f_node_manager( a_mgr ),
45  f_daq_config(),
46  f_midge_pkg(),
47  f_node_bindings( nullptr ),
48  f_run_stopper(),
49  f_run_stop_mutex(),
50  f_do_break_run( false ),
51  f_run_return(),
52  f_msg_relay( message_relayer::get_instance() ),
53  f_run_duration( 1000 ),
54  f_use_monarch( true ),
55  f_status( status::deactivated )
56  {
57  // DAQ config is optional; defaults will work just fine
58  if( a_master_config.has( "daq" ) )
59  {
60  f_daq_config.merge( a_master_config["daq"].as_node() );
61  }
62 
// fall back to the current run duration when the config does not specify one
63  set_run_duration( f_daq_config.get_value( "duration", get_run_duration() ) );
64  }
65 
67  {
68  }
69 
71  {
72  butterfly_house::get_instance()->prepare_files( f_daq_config );
73  return;
74  }
75 
// Main loop of the DAQ controller.
//
// Repeatedly reads the current status and acts on it until the controller is canceled or the
// status becomes "done" or "error":
//   - deactivated:  waits on f_activation_condition (1 s timeout) until the status changes
//   - activating:   resets midge if required, acquires the midge package, runs midge (blocking),
//                   reports any exception, returns the midge package, and then decides from the
//                   resulting status whether to cancel everything or to restart
//   - activated:    reported as an error (this status is not expected in the outer loop)
//   - deactivating: logged only (see the note below)
//   - done / error: node bindings are cleared and the loop is left
//
// a_ready_condition_variable, a_ready_mutex: used by the midge "running" callback, which locks the
//   mutex and notifies all waiters on the condition variable.  Both are captured by reference.
//
// If the DAQ config value "activate-at-startup" is true, activate() is called asynchronously
// after a 250 ms delay.
//
// NOTE(review): the embedded listing numbers have gaps inside this function (e.g. 125, 136, 143,
// 165, 180, 186, 194, 200, 225, 244, 250).  Several branches therefore only log in this extract;
// the comment at listing line 92 and the log messages suggest that status changes belong at those
// positions -- confirm against the original file.
76  void daq_control::execute( std::condition_variable& a_ready_condition_variable, std::mutex& a_ready_mutex )
77  {
78  // if we're supposed to activate on startup, we'll call activate asynchronously
79  std::future< void > t_activation_return;
80  if( f_daq_config.get_value( "activate-at-startup", false ) )
81  {
82  LDEBUG( plog, "Will activate DAQ control asynchronously" );
83  t_activation_return = std::async( std::launch::async,
84  [this]()
85  {
86  std::this_thread::sleep_for( std::chrono::milliseconds(250) );
87  LDEBUG( plog, "Activating DAQ control at startup" );
88  activate();
89  } );
90  }
91 
92  // Errors caught during this loop are handled by setting the status to error, and continuing the loop,
93  // which then goes to the error clause of the outer if/elseif logic
94  while( ! is_canceled() )
95  {
96  status t_status = get_status();
97  LDEBUG( plog, "daq_control execute loop; status is <" << interpret_status( t_status ) << ">" );
98  if( ( t_status == status::deactivated ) && ! is_canceled() )
99  {
// Idle state: re-read the status each time the condition variable is notified or the 1 s timeout expires.
// NOTE(review): this inner loop only tests the status, not is_canceled(); it ends on cancellation
// only if cancellation also changes the status -- confirm.
100  while( t_status == status::deactivated )
101  {
102  std::unique_lock< std::mutex > t_lock( f_daq_mutex );
103  LDEBUG( plog, "DAQ control waiting for activation signal; status is " << interpret_status( t_status ) );
104  f_activation_condition.wait_for( t_lock, std::chrono::seconds(1) );
105  t_status = get_status();
106  }
107  }
108  else if( t_status == status::activating )
109  {
110  LPROG( plog, "DAQ control activating" );
111 
// Step 1: reset midge if the node manager says it is needed; a failure is reported and the loop restarts
112  try
113  {
114  if( f_node_manager->must_reset_midge() )
115  {
116  LDEBUG( plog, "Reseting midge" );
117  f_node_manager->reset_midge();
118  }
119  }
120  catch( error& e )
121  {
122  LWARN( plog, "Exception caught while resetting midge: " << e.what() );
123  LWARN( plog, "Returning to the \"deactivated\" state and awaiting further instructions" );
124  f_msg_relay->slack_error( std::string("Psyllid could not be activated.\nDetails: ") + e.what() );
126  continue;
127  }
128 
// Step 2: take the midge package from the node manager; activation is abandoned if the lock was not obtained
129  LDEBUG( plog, "Acquiring midge package" );
130  f_midge_pkg = f_node_manager->get_midge();
131 
132  if( ! f_midge_pkg.have_lock() )
133  {
134  LERROR( plog, "Could not get midge resource" );
135  f_msg_relay->slack_error( "Midge resource is locked; unable to activate the DAQ" );
137  continue;
138  }
139 
140  // set midge's running callback
// the callback notifies everything waiting on the caller-supplied condition variable
141  f_midge_pkg->set_running_callback(
142  [this, &a_ready_condition_variable, &a_ready_mutex]() {
144  std::lock_guard<std::mutex> ready_lock(a_ready_mutex);
145  a_ready_condition_variable.notify_all();
146  return;
147  }
148  );
149 
150  f_node_bindings = f_node_manager->get_node_bindings();
151 
// exceptions raised inside midge are handed back through this pointer by run()
152  std::exception_ptr t_e_ptr;
153 
// Step 3: run midge; this call returns only once midge has exited
154  try
155  {
156  std::string t_run_string( f_node_manager->get_node_run_str() );
157  LDEBUG( plog, "Starting midge with run string <" << t_run_string << ">" );
158  t_e_ptr = f_midge_pkg->run( t_run_string );
159  LINFO( plog, "DAQ control is shutting down after midge exited" );
160  }
161  catch( std::exception& e )
162  {
163  LERROR( plog, "An exception was thrown while running midge: " << e.what() );
164  f_msg_relay->slack_error( std::string("An exception was thrown while running midge: ") + e.what() );
166  }
167 
168  LDEBUG( plog, "Midge has finished running" );
169  if( t_e_ptr )
170  {
// rethrow the stored exception so that it can be classified by type and reported
// NOTE(review): if the node error types derive from midge::error, the first handler would shadow
// the two that follow -- confirm the exception hierarchy.
171  LDEBUG( plog, "An exception from midge is present; rethrowing" );
172  try
173  {
174  std::rethrow_exception( t_e_ptr );
175  }
176  catch( midge::error& e )
177  {
178  LERROR( plog, "A Midge error has been caught: " << e.what() );
179  f_msg_relay->slack_error( std::string("A Midge error has been caught: ") + e.what() );
181  }
182  catch( midge::node_fatal_error& e )
183  {
184  LERROR( plog, "A fatal node error was thrown from midge: " << e.what() );
185  f_msg_relay->slack_error( std::string("A fatal node error was thrown from midge: ") + e.what() );
187  }
188  catch( midge::node_nonfatal_error& e )
189  {
190  LWARN( plog, "A non-fatal node error was thrown from midge: " << e.what() );
191  f_msg_relay->slack_error( std::string("A non-fatal node error was thrown from midge. ") +
192  "Psyllid is still running (hopefully) but its state has been reset.\n" +
193  "Error details: " + e.what() );
195  }
196  catch( std::exception& e )
197  {
198  LERROR( plog, "An unknown exception was thrown from midge: " << e.what() );
199  f_msg_relay->slack_error( std::string("An unknown exception was thrown from midge: ") + e.what() );
201  }
202  LDEBUG( plog, "Calling stop_run" );
203  stop_run();
204  }
205 
// Step 4: hand the midge package back to the node manager and drop the node bindings
206  f_node_manager->return_midge( std::move( f_midge_pkg ) );
207  f_node_bindings = nullptr;
208 
// Step 5: the status after midge has exited determines what happens next
209  if( get_status() == status::running )
210  {
211  LERROR( plog, "Midge exited abnormally; error condition is unknown; canceling" );
212  scarab::signal_handler::cancel_all( RETURN_ERROR );
213  continue;
214  }
215  else if( get_status() == status::error )
216  {
217  LERROR( plog, "Canceling due to midge error" );
218  f_msg_relay->slack_error( "Psyllid has crashed due to an error while running. Hopefully the details have already been reported." );
219  scarab::signal_handler::cancel_all( RETURN_ERROR );
220  continue;
221  }
222  else if( get_status() == status::do_restart )
223  {
// restart: pause 250 ms here, then call activate() asynchronously after a further 250 ms
224  LDEBUG( plog, "Setting status to deactivated" );
226  LINFO( plog, "Commencing restart of the DAQ" );
227  f_msg_relay->slack_warn( "Commencing restart of the Psyllid DAQ nodes" );
228  std::this_thread::sleep_for( std::chrono::milliseconds(250) );
229  LDEBUG( plog, "Will activate DAQ control asynchronously" );
230  t_activation_return = std::async( std::launch::async,
231  [this]()
232  {
233  std::this_thread::sleep_for( std::chrono::milliseconds(250) );
234  LDEBUG( plog, "Restarting DAQ control" );
235  activate();
236  } );
237  continue;
238  }
239  }
240  else if( t_status == status::activated )
241  {
// "activated" is not expected to be seen by the outer loop, so it is reported as an error
242  LERROR( plog, "DAQ control status is activated in the outer execution loop!" );
243  f_msg_relay->slack_error( "DAQ control status is activated in the outer execution loop!" );
245  continue;
246  }
247  else if( t_status == status::deactivating )
248  {
249  LDEBUG( plog, "DAQ control deactivating; status now set to \"deactivated\"" );
251  }
252  else if( t_status == status::done )
253  {
254  LINFO( plog, "Exiting DAQ control" );
255  f_node_bindings = nullptr;
256  break;
257  }
258  else if( t_status == status::error )
259  {
// unrecoverable: report, drop the bindings, cancel everything and leave the loop
260  LERROR( plog, "DAQ control is in an error state" );
261  f_msg_relay->slack_error( "DAQ control is in an error state and will now exit" );
262  f_node_bindings = nullptr;
263  scarab::signal_handler::cancel_all( RETURN_ERROR );
264  break;
265  }
266  }
267 
268  return;
269  }
270 
272  {
273  LDEBUG( plog, "Activating DAQ control" );
274 
275  if( is_canceled() )
276  {
277  throw error() << "DAQ control has been canceled";
278  }
279 
281  {
282  throw status_error() << "DAQ control is not in the deactivated state";
283  }
284 
285  LDEBUG( plog, "Setting status to activating" );
287  f_activation_condition.notify_one();
288 
289  return;
290  }
291 
293  {
294  deactivate();
295  std::this_thread::sleep_for( std::chrono::seconds(1) );
296  activate();
297  return;
298  }
299 
301  {
302  LDEBUG( plog, "Deactivating DAQ" );
303 
304  if( is_canceled() )
305  {
306  throw error() << "DAQ control has been canceled";
307  }
308 
309  if( get_status() != status::activated )
310  {
311  throw status_error() << "Invalid state for deactivating: <" + daq_control::interpret_status( get_status() ) + ">; DAQ control must be in activated state";
312  }
313 
315  if( f_midge_pkg.have_lock() )
316  {
317  LDEBUG( plog, "Canceling DAQ worker from DAQ control" );
318  f_midge_pkg->cancel();
319  }
320 
321  return;
322  }
323 
325  {
326  return f_daq_config["activate-at-startup"]().as_bool() ? (f_status == status::activated) : (f_status == status::activated || f_status == status::deactivated);
327  }
328 
330  {
331  LDEBUG( plog, "Preparing for run" );
332 
333  if( is_canceled() )
334  {
335  throw error() << "daq_control has been canceled";
336  }
337 
338  if( get_status() != status::activated )
339  {
340  throw status_error() << "DAQ control must be in the activated state to start a run; activate the DAQ and try again";
341  }
342 
343  if( ! f_midge_pkg.have_lock() )
344  {
345  throw error() << "Do not have midge resource";
346  }
347 
348  LDEBUG( plog, "Launching asynchronous do_run" );
349  f_run_return = std::async( std::launch::async, &daq_control::do_run, this, f_run_duration );
350  //TODO: use run return?
351 
352  return;
353  }
354 
// Performs one run; launched asynchronously (via std::async) with f_run_duration as the argument.
//
// a_duration: run length in ms; 0 selects an untimed run that lasts until the run is stopped
//             (f_do_break_run is set) or the controller is canceled.
//
// Sequence: optionally start the egg files, resume midge, wait on f_run_stopper in sub-durations
// of 500 ms, pause midge, and optionally finish the egg files.  A failure to start or finish the
// files is reported, midge is canceled, and the function returns early.
//
// NOTE(review): the embedded listing numbers have gaps inside this function (373, 392, 437, 456),
// so some statements appear to be absent from this extract -- confirm against the original file.
355  void daq_control::do_run( unsigned a_duration )
356  {
357  // a_duration is in ms
358 
359  LINFO( plog, "Run is commencing" );
360  f_msg_relay->slack_notice( "Run is commencing" );
361 
362  if( f_use_monarch )
363  {
364  LDEBUG( plog, "Starting egg files" );
365  try
366  {
367  butterfly_house::get_instance()->start_files();
368  }
369  catch( std::exception& e )
370  {
// without files the run cannot proceed: report, cancel midge, and give up on this run
371  LERROR( plog, "Unable to start files: " << e.what() );
372  f_msg_relay->slack_error( std::string("Unable to start files: ") + e.what() );
374  LDEBUG( plog, "Canceling midge" );
375  if( f_midge_pkg.have_lock() ) f_midge_pkg->cancel();
376  return;
377  }
378  }
379 
380  typedef std::chrono::steady_clock::duration duration_t;
381  typedef std::chrono::steady_clock::time_point time_point_t;
382 
383  duration_t t_run_duration = std::chrono::duration_cast< duration_t >( std::chrono::milliseconds(a_duration) ); // a_duration converted from ms to duration_t
384  duration_t t_sub_duration = std::chrono::duration_cast< duration_t >( std::chrono::milliseconds(500) ); // 500 ms sub-duration converted to duration_t
385  LINFO( plog, "Run duration will be " << a_duration << " ms; sub-durations of 500 ms will be used" );
386  LDEBUG( plog, "In units of steady_clock::duration: run duration is " << t_run_duration.count() << " and sub_duration is " << t_sub_duration.count() );
387 
// this lock is the one handed to the f_run_stopper waits below; the stop flag is cleared for the new run
388  std::unique_lock< std::mutex > t_run_stop_lock( f_run_stop_mutex );
389  f_do_break_run = false;
390 
391  LDEBUG( plog, "Unpausing midge" );
393  if( f_midge_pkg.have_lock() ) f_midge_pkg->instruct( midge::instruction::resume );
394  else
395  {
396  LERROR( plog, "Midge resource is not available" );
397  return;
398  }
399 
400  if( a_duration == 0 )
401  {
402  LDEBUG( plog, "Untimed run stopper in use" );
403  // conditions that will break the loop:
404  // - last sub-duration was not stopped for a timeout (the other possibility is that f_run_stopper was notified by e.g. stop_run())
405  // - daq_control has been canceled
406  while( ! f_do_break_run && ! is_canceled() )
407  {
408  f_run_stopper.wait_for( t_run_stop_lock, t_sub_duration );
409  }
410  }
411  else
412  {
413  LDEBUG( plog, "Timed run stopper in use; limit is " << a_duration << " ms" );
414  // all but the last sub-duration last for t_sub_duration ms
415  // conditions that will break the loop:
416  // - all sub-durations have been completed
417  // - last sub-duration was not stopped for a timeout (the other possibility is that f_run_stopper was notified by e.g. stop_run())
418  // - daq_control has been canceled
419 
420  time_point_t t_run_start = std::chrono::steady_clock::now();
421  time_point_t t_run_end = t_run_start + t_run_duration;
422 
423  while( std::chrono::steady_clock::now() < t_run_end && ! f_do_break_run && ! is_canceled() )
424  {
425  // we use wait_until so that we can break the run up with subdurations and not worry about whether a subduration was interrupted by a spurious wakeup
// each wait ends at the earlier of "now + sub-duration" and the end of the run
426  f_run_stopper.wait_until( t_run_stop_lock, std::min( std::chrono::steady_clock::now() + t_sub_duration, t_run_end ) );
427  }
428 
429  }
430 
431  // if we've reached here, we need to pause midge.
432  // reasons for this include the timer has run out in a timed run, or the run has been manually stopped
433 
434  LDEBUG( plog, "Run stopper has been released" );
435 
436  if( f_midge_pkg.have_lock() ) f_midge_pkg->instruct( midge::instruction::pause );
438 
439  LINFO( plog, "Run has stopped" );
440  f_msg_relay->slack_notice( "Run has stopped" );
441 
// record why the run ended (both messages can be logged)
442  if( f_do_break_run ) LINFO( plog, "Run was stopped manually" );
443  if( is_canceled() ) LINFO( plog, "Run was cancelled" );
444 
445  if( f_use_monarch )
446  {
447  LDEBUG( plog, "Finishing egg files" );
448  try
449  {
450  butterfly_house::get_instance()->finish_files();
451  }
452  catch( std::exception& e )
453  {
454  LERROR( plog, "Unable to finish files: " << e.what() );
455  f_msg_relay->slack_error( std::string("Unable to finish files: ") + e.what() );
457  LDEBUG( plog, "Canceling midge" );
458  if( f_midge_pkg.have_lock() ) f_midge_pkg->cancel();
459  return;
460  }
461  }
462 
463  return;
464  }
465 
467  {
468  LINFO( plog, "Run stop requested" );
469 
470  if( get_status() != status::running ) return;
471 
472  if( ! f_midge_pkg.have_lock() ) LWARN( plog, "Do not have midge resource" );
473 
474  std::unique_lock< std::mutex > t_run_stop_lock( f_run_stop_mutex );
475  f_do_break_run = true;
476  f_run_stopper.notify_all();
477 
478  return;
479  }
480 
// Cancellation hook of the controller.
//
// a_code: cancellation code, forwarded to the midge package's cancel().
//
// If a run is in progress it is stopped first (a psyllid error from stop_run() is logged, not
// propagated); afterwards midge is canceled, provided the midge package is held.
// NOTE(review): listing line 501 is absent from this extract, so a statement may be missing
// after the midge cancellation -- confirm against the original file.
481  void daq_control::do_cancellation( int a_code )
482  {
483  LDEBUG( plog, "Canceling DAQ control" );
484 
485  if( get_status() == status::running )
486  {
487  LDEBUG( plog, "Canceling run" );
488  try
489  {
490  stop_run();
491  }
492  catch( error& e )
493  {
// cancellation must carry on even if the run could not be stopped cleanly
494  LERROR( plog, "Unable to complete stop-run: " << e.what() );
495  }
496  }
497 
498  LDEBUG( plog, "Canceling midge" );
499  if( f_midge_pkg.have_lock() ) f_midge_pkg->cancel( a_code );
500 
502 
503  return;
504  }
505 
506  void daq_control::apply_config( const std::string& a_node_name, const scarab::param_node& a_config )
507  {
508  if( f_node_bindings == nullptr )
509  {
510  throw error() << "Can't apply config to node <" << a_node_name << ">: node bindings aren't available";
511  }
512 
513  active_node_bindings::iterator t_binding_it = f_node_bindings->find( a_node_name );
514  if( t_binding_it == f_node_bindings->end() )
515  {
516  throw error() << "Can't apply config to node <" << a_node_name << ">: did not find node";
517  }
518 
519  try
520  {
521  LDEBUG( plog, "Applying config to active node <" << a_node_name << ">: " << a_config );
522  t_binding_it->second.first->apply_config( t_binding_it->second.second, a_config );
523  }
524  catch( std::exception& e )
525  {
526  throw error() << "Can't apply config to node <" << a_node_name << ">: " << e.what();
527  }
528  return;
529  }
530 
531  void daq_control::dump_config( const std::string& a_node_name, scarab::param_node& a_config )
532  {
533  if( f_node_bindings == nullptr )
534  {
535  throw error() << "Can't dump config from node <" << a_node_name << ">: node bindings aren't available";
536  }
537 
538  if( f_node_bindings == nullptr )
539  {
540  throw error() << "Can't dump config from node <" << a_node_name << ">: node bindings aren't available";
541  }
542 
543  active_node_bindings::iterator t_binding_it = f_node_bindings->find( a_node_name );
544  if( t_binding_it == f_node_bindings->end() )
545  {
546  throw error() << "Can't dump config from node <" << a_node_name << ">: did not find node";
547  }
548 
549  try
550  {
551  LDEBUG( plog, "Dumping config from active node <" << a_node_name << ">" );
552  t_binding_it->second.first->dump_config( t_binding_it->second.second, a_config );
553  }
554  catch( std::exception& e )
555  {
556  throw error() << "Can't dump config from node <" << a_node_name << ">: " << e.what();
557  }
558  return;
559  }
560 
561  bool daq_control::run_command( const std::string& a_node_name, const std::string& a_cmd, const scarab::param_node& a_args )
562  {
563  if( f_node_bindings == nullptr )
564  {
565  throw error() << "Can't run command <" << a_cmd << "> on node <" << a_node_name << ">: node bindings aren't available";
566  }
567 
568  if( f_node_bindings == nullptr )
569  {
570  throw error() << "Can't run command <" << a_cmd << "> on node <" << a_node_name << ">: node bindings aren't available";
571  }
572 
573  active_node_bindings::iterator t_binding_it = f_node_bindings->find( a_node_name );
574  if( t_binding_it == f_node_bindings->end() )
575  {
576  throw error() << "Can't run command <" << a_cmd << "> on node <" << a_node_name << ">: did not find node";
577  }
578 
579  try
580  {
581  LDEBUG( plog, "Running command <" << a_cmd << "> on active node <" << a_node_name << ">" );
582  return t_binding_it->second.first->run_command( t_binding_it->second.second, a_cmd, a_args );
583  }
584  catch( std::exception& e )
585  {
586  throw error() << "Can't run command <" << a_cmd << "> on node <" << a_node_name << ">: " << e.what();
587  }
588  }
589 
590 
591  dripline::reply_ptr_t daq_control::handle_activate_daq_control( const dripline::request_ptr_t a_request )
592  {
593  try
594  {
595  activate();
596  return a_request->reply( dripline::dl_success(), "DAQ control activated" );
597  }
598  catch( error& e )
599  {
600  return a_request->reply( dripline::dl_device_error(), string( "Unable to activate DAQ control: " ) + e.what() );
601  }
602  }
603 
604  dripline::reply_ptr_t daq_control::handle_reactivate_daq_control( const dripline::request_ptr_t a_request )
605  {
606  try
607  {
608  reactivate();
609  return a_request->reply( dripline::dl_success(), "DAQ control reactivated" );
610  }
611  catch( error& e )
612  {
613  return a_request->reply( dripline::dl_device_error(), string( "Unable to reactivate DAQ control: " ) + e.what() );
614  }
615  }
616 
617  dripline::reply_ptr_t daq_control::handle_deactivate_daq_control( const dripline::request_ptr_t a_request )
618  {
619  try
620  {
621  deactivate();
622  return a_request->reply( dripline::dl_success(), "DAQ control deactivated" );
623  }
624  catch( error& e )
625  {
626  return a_request->reply( dripline::dl_device_error(), string( "Unable to deactivate DAQ control: " ) + e.what() );
627  }
628  }
629 
630  dripline::reply_ptr_t daq_control::handle_start_run_request( const dripline::request_ptr_t a_request )
631  {
632  try
633  {
634  if( a_request->payload().is_node() )
635  {
636  param_node& t_payload = a_request->payload().as_node();
637  if( t_payload.has( "filename" ) ) set_filename( t_payload["filename"]().as_string(), 0 );
638  //TODO BUG here, if filenames exists but is not an array (only case i tried), this causes a seg fault which is not handled below
639  if( t_payload.as_node().has( "filenames" ) )
640  {
641  const scarab::param_array t_filenames = t_payload["filenames"].as_array();
642  for( unsigned i_fn = 0; i_fn < t_filenames.size(); ++i_fn )
643  {
644  set_filename( t_filenames[i_fn]().as_string(), i_fn );
645  }
646  }
647 
648  if( t_payload.has( "description" ) ) set_description( t_payload["description"]().as_string(), 0 );
649  if( t_payload.has( "descriptions" ) )
650  {
651  const scarab::param_array t_descriptions = t_payload["descriptions"].as_array();
652  for( unsigned i_fn = 0; i_fn < t_descriptions.size(); ++i_fn )
653  {
654  set_description( t_descriptions[i_fn]().as_string(), i_fn );
655  }
656  }
657 
658  f_run_duration = a_request->payload().get_value( "duration", f_run_duration );
659  }
660 
661  start_run();
662  return a_request->reply( dripline::dl_success(), "Run started" );
663  }
664  catch( std::exception& e )
665  {
666  LWARN( plog, "there was an error starting a run" );
667  return a_request->reply( dripline::dl_device_error(), string( "Unable to start run: " ) + e.what() );
668  }
669  }
670 
671  dripline::reply_ptr_t daq_control::handle_stop_run_request( const dripline::request_ptr_t a_request )
672  {
673  try
674  {
675  stop_run();
676  return a_request->reply( dripline::dl_success(), "Run stopped" );
677  }
678  catch( error& e )
679  {
680  return a_request->reply( dripline::dl_device_error(), string( "Unable to stop run: " ) + e.what() );
681  }
682  }
683 
684  dripline::reply_ptr_t daq_control::handle_apply_config_request( const dripline::request_ptr_t a_request )
685  {
686  if( a_request->parsed_specifier().size() < 2 )
687  {
688  return a_request->reply( dripline::dl_message_error_invalid_key(), "Specifier is improperly formatted: active-config.[stream].[node] or node-config.[stream].[node].[parameter]" );
689  }
690 
691  //size_t t_rks_size = a_request->parsed_rks().size();
692 
693  std::string t_target_stream = a_request->parsed_specifier().front();
694  a_request->parsed_specifier().pop_front();
695 
696  std::string t_target_node = t_target_stream + "_" + a_request->parsed_specifier().front();
697  a_request->parsed_specifier().pop_front();
698 
699  param_ptr_t t_payload_ptr( new param_node() );
700  param_node& t_payload = t_payload_ptr->as_node();
701 
702  if( a_request->parsed_specifier().empty() )
703  {
704  // payload should be a map of all parameters to be set
705  LDEBUG( plog, "Performing config for multiple values in active node <" << t_target_node << ">" );
706 
707  if( ! a_request->payload().is_node() || a_request->payload().as_node().empty() )
708  {
709  return a_request->reply( dripline::dl_message_error_bad_payload(), "Unable to perform active-config request: payload is empty" );
710  }
711 
712  try
713  {
714  apply_config( t_target_node, a_request->payload().as_node() );
715  t_payload.merge( a_request->payload().as_node() );
716  }
717  catch( std::exception& e )
718  {
719  return a_request->reply( dripline::dl_device_error(), std::string("Unable to perform node-config request: ") + e.what() );
720  }
721  }
722  else
723  {
724  // payload should be values array with a single entry for the particular parameter to be set
725  LDEBUG( plog, "Performing node config for a single value in active node <" << t_target_node << ">" );
726 
727  if( ! a_request->payload().is_node() || ! a_request->payload().as_node().has( "values" ) )
728  {
729  return a_request->reply( dripline::dl_message_error_bad_payload(), "Unable to perform active-config (single value): values array is missing" );
730  }
731  scarab::param_array t_values_array;
732  if ( a_request->payload().as_node().has("values") ) {
733  t_values_array.append( a_request->payload()["values"].as_array() );
734  }
735  if( t_values_array.empty() || ! t_values_array[0].is_value() )
736  {
737  return a_request->reply( dripline::dl_message_error_bad_payload(), "Unable to perform active-config (single value): \"values\" is not an array, or the array is empty, or the first element in the array is not a value", std::move(t_payload_ptr) );
738  }
739 
740  scarab::param_node t_param_to_set;
741  t_param_to_set.add( a_request->parsed_specifier().front(), scarab::param_value( t_values_array[0].as_value() ) );
742 
743  try
744  {
745  apply_config( t_target_node, t_param_to_set );
746  t_payload.merge( t_param_to_set );
747  }
748  catch( std::exception& e )
749  {
750  return a_request->reply( dripline::dl_device_error(), std::string("Unable to perform active-config request (single value): ") + e.what(), std::move(t_payload_ptr) );
751  }
752  }
753 
754  LDEBUG( plog, "Node-config was successful" );
755  return a_request->reply( dripline::dl_success(), "Performed node-config", std::move(t_payload_ptr) );
756  }
757 
758  dripline::reply_ptr_t daq_control::handle_dump_config_request( const dripline::request_ptr_t a_request )
759  {
760  if( a_request->parsed_specifier().size() < 2 )
761  {
762  return a_request->reply( dripline::dl_message_error_invalid_key(), "Specifier is improperly formatted: active-config.[stream].[node] or active-config.[stream].[node].[parameter]" );
763  }
764 
765  //size_t t_rks_size = a_request->parsed_rks().size();
766 
767  std::string t_target_stream = a_request->parsed_specifier().front();
768  a_request->parsed_specifier().pop_front();
769 
770  std::string t_target_node = t_target_stream + "_" + a_request->parsed_specifier().front();
771  a_request->parsed_specifier().pop_front();
772 
773  param_ptr_t t_payload_ptr( new param_node() );
774  param_node& t_payload = t_payload_ptr->as_node();
775 
776  if( a_request->parsed_specifier().empty() )
777  {
778  // getting full node configuration
779  LDEBUG( plog, "Getting node config for active node <" << t_target_node << ">" );
780 
781  try
782  {
783  dump_config( t_target_node, t_payload );
784  }
785  catch( std::exception& e )
786  {
787  return a_request->reply( dripline::dl_device_error(), std::string("Unable to perform get-active-config request: ") + e.what(), std::move(t_payload_ptr) );
788  }
789  }
790  else
791  {
792  // getting value for a single parameter
793  LDEBUG( plog, "Getting value for a single parameter in active node <" << t_target_node << ">" );
794 
795  std::string t_param_to_get = a_request->parsed_specifier().front();
796 
797  try
798  {
799  scarab::param_node t_param_dump;
800  dump_config( t_target_node, t_param_dump );
801  if( ! t_param_dump.has( t_param_to_get ) )
802  {
803  return a_request->reply( dripline::dl_message_error_invalid_key(), "Unable to get active-node parameter: cannot find parameter <" + t_param_to_get + ">" );
804  }
805  t_payload.add( t_param_to_get, t_param_dump[t_param_to_get]() );
806  }
807  catch( std::exception& e )
808  {
809  return a_request->reply( dripline::dl_device_error(), std::string("Unable to get active-node parameter (single value): ") + e.what(), std::move(t_payload_ptr) );
810  }
811  }
812 
813  LDEBUG( plog, "Get-active-node-config was successful" );
814  return a_request->reply( dripline::dl_success(), "Performed get-active-node-config", std::move(t_payload_ptr) );
815  }
816 
817  dripline::reply_ptr_t daq_control::handle_run_command_request( const dripline::request_ptr_t a_request )
818  {
819  if( a_request->parsed_specifier().size() < 2 )
820  {
821  return a_request->reply( dripline::dl_message_error_invalid_key(), "RKS is improperly formatted: run-command.[stream].[node].[command]" );
822  }
823 
824  //size_t t_rks_size = a_request->parsed_rks().size();
825 
826  std::string t_target_stream = a_request->parsed_specifier().front();
827  a_request->parsed_specifier().pop_front();
828 
829  std::string t_target_node = t_target_stream + "_" + a_request->parsed_specifier().front();
830  a_request->parsed_specifier().pop_front();
831 
832  scarab::param_node t_args_node;
833  if( a_request->payload().is_node() ) t_args_node = a_request->payload().as_node();
834 
835  std::string t_command( a_request->parsed_specifier().front() );
836  a_request->parsed_specifier().pop_front();
837 
838  LDEBUG( plog, "Performing run-command <" << t_command << "> for active node <" << t_target_node << ">; args:\n" << t_args_node );
839 
840  param_ptr_t t_payload_ptr( new param_node() );
841  param_node& t_payload = t_payload_ptr->as_node();
842 
843  bool t_return = false;
844  try
845  {
846  t_return = run_command( t_target_node, t_command, t_args_node );
847  t_payload.merge( t_args_node );
848  t_payload.add( "command", t_command );
849  }
850  catch( std::exception& e )
851  {
852  return a_request->reply( dripline::dl_device_error(), std::string("Unable to perform run-command request: ") + e.what(), std::move(t_payload_ptr) );
853  }
854 
855  if( t_return )
856  {
857  LDEBUG( plog, "Active run-command execution was successful" );
858  return a_request->reply( dripline::dl_success(), "Performed active run-command execution", std::move(t_payload_ptr) );
859  }
860  else
861  {
862  LWARN( plog, "Active run-command execution failed" );
863  return a_request->reply( dripline::dl_message_error_invalid_method(), "Command was not recognized", std::move(t_payload_ptr) );
864  }
865  }
866 
867  dripline::reply_ptr_t daq_control::handle_set_filename_request( const dripline::request_ptr_t a_request )
868  {
869  try
870  {
871  unsigned t_file_num = 0;
872  if( a_request->parsed_specifier().size() > 0)
873  {
874  t_file_num = std::stoi( a_request->parsed_specifier().front() );
875  }
876 
877  std::string t_filename = a_request->payload()["values"][0]().as_string();
878  LDEBUG( plog, "Setting filename for file <" << t_file_num << "> to <" << t_filename << ">" );
879  set_filename( t_filename, t_file_num );
880  return a_request->reply( dripline::dl_success(), "Filename set" );
881  }
882  catch( std::exception& e )
883  {
884  return a_request->reply( dripline::dl_device_error(), string( "Unable to set filename: " ) + e.what() );
885  }
886  }
887 
888  dripline::reply_ptr_t daq_control::handle_set_description_request( const dripline::request_ptr_t a_request )
889  {
890  try
891  {
892  unsigned t_file_num = 0;
893  if( a_request->parsed_specifier().size() > 0)
894  {
895  t_file_num = std::stoi( a_request->parsed_specifier().front() );
896  }
897 
898  std::string t_description = a_request->payload()["values"][0]().as_string();
899  LDEBUG( plog, "Setting description for file <" << t_file_num << "> to <" << t_description << ">" );
900  set_description( t_description, t_file_num );
901 
902  return a_request->reply( dripline::dl_success(), "Description set" );
903  }
904  catch( std::exception& e )
905  {
906  return a_request->reply( dripline::dl_device_error(), string( "Unable to set description: " ) + e.what() );
907  }
908  }
909 
910  dripline::reply_ptr_t daq_control::handle_set_duration_request( const dripline::request_ptr_t a_request )
911  {
912  try
913  {
914  unsigned t_new_duration = a_request->payload()["values"][0]().as_uint();
915  if( t_new_duration == 0 )
916  {
917  throw error() << "Invalid duration: " << t_new_duration;
918  }
919  f_run_duration = t_new_duration;
920 
921  LDEBUG( plog, "Duration set to <" << f_run_duration << "> ms" );
922  return a_request->reply( dripline::dl_success(), "Duration set" );
923  }
924  catch( std::exception& e )
925  {
926  return a_request->reply( dripline::dl_device_error(), string( "Unable to set duration: " ) + e.what() );
927  }
928  }
929 
930  dripline::reply_ptr_t daq_control::handle_set_use_monarch_request( const dripline::request_ptr_t a_request )
931  {
932  try
933  {
934  f_use_monarch = a_request->payload()["values"][0]().as_bool();
935  LDEBUG( plog, "Use-monarch set to <" << f_use_monarch << ">" );
936  return a_request->reply( dripline::dl_success(), "Use Monarch set" );
937  }
938  catch( std::exception& e )
939  {
940  return a_request->reply( dripline::dl_device_error(), string( "Unable to set use-monarch: " ) + e.what() );
941  }
942  }
943 
944  dripline::reply_ptr_t daq_control::handle_get_status_request( const dripline::request_ptr_t a_request )
945  {
946  param_node t_server_node;
947  t_server_node.add( "status", param_value( interpret_status( get_status() ) ) );
948  t_server_node.add( "status-value", param_value( status_to_uint( get_status() ) ) );
949 
950  // TODO: add status of nodes
951 
952  param_ptr_t t_payload_ptr( new param_node() );
953  t_payload_ptr->as_node().add( "server", t_server_node );
954 
955  return a_request->reply( dripline::dl_success(), "DAQ status request succeeded", std::move(t_payload_ptr) );
956 
957  }
958 
959  dripline::reply_ptr_t daq_control::handle_get_filename_request( const dripline::request_ptr_t a_request )
960  {
961  try
962  {
963  unsigned t_file_num = 0;
964  if( a_request->parsed_specifier().size() > 0)
965  {
966  t_file_num = std::stoi( a_request->parsed_specifier().front() );
967  }
968 
969  param_array t_values_array;
970  t_values_array.push_back( param_value( get_filename( t_file_num ) ) );
971  param_ptr_t t_payload_ptr( new param_node() );
972  t_payload_ptr->as_node().add( "values", t_values_array );
973  return a_request->reply( dripline::dl_success(), "Filename request completed", std::move(t_payload_ptr) );
974  }
975  catch( scarab::error& e )
976  {
977  return a_request->reply( dripline::dl_device_error(), string( "Unable to get description: " ) + e.what() );
978  }
979  }
980 
981  dripline::reply_ptr_t daq_control::handle_get_description_request( const dripline::request_ptr_t a_request )
982  {
983  try
984  {
985  unsigned t_file_num = 0;
986  if( a_request->parsed_specifier().size() > 0)
987  {
988  t_file_num = std::stoi( a_request->parsed_specifier().front() );
989  }
990 
991  param_array t_values_array;
992  t_values_array.push_back( param_value( get_description( t_file_num ) ) );
993  param_ptr_t t_payload_ptr( new param_node() );
994  t_payload_ptr->as_node().add( "values", t_values_array );
995  return a_request->reply( dripline::dl_success(), "Description request completed", std::move(t_payload_ptr) );
996  }
997  catch( scarab::error& e )
998  {
999  return a_request->reply( dripline::dl_device_error(), string( "Unable to get description: " ) + e.what() );
1000  }
1001  }
1002 
1003  dripline::reply_ptr_t daq_control::handle_get_duration_request( const dripline::request_ptr_t a_request )
1004  {
1005  param_array t_values_array;
1006  t_values_array.push_back( param_value( f_run_duration ) );
1007 
1008  param_ptr_t t_payload_ptr( new param_node() );
1009  t_payload_ptr->as_node().add( "values", t_values_array );
1010 
1011  return a_request->reply( dripline::dl_success(), "Duration request completed", std::move(t_payload_ptr) );
1012  }
1013 
1014  dripline::reply_ptr_t daq_control::handle_get_use_monarch_request( const dripline::request_ptr_t a_request )
1015  {
1016  param_array t_values_array;
1017  t_values_array.push_back( param_value( f_use_monarch ) );
1018 
1019  param_ptr_t t_payload_ptr( new param_node() );
1020  t_payload_ptr->as_node().add( "values", t_values_array );
1021 
1022  return a_request->reply( dripline::dl_success(), "Use Monarch request completed", std::move(t_payload_ptr) );
1023  }
1024 
1025 
1026  void daq_control::set_filename( const std::string& a_filename, unsigned a_file_num )
1027  {
1028  try
1029  {
1030  butterfly_house::get_instance()->set_filename( a_filename, a_file_num );
1031  return;
1032  }
1033  catch( error& )
1034  {
1035  throw;
1036  }
1037  }
1038 
1039  const std::string& daq_control::get_filename( unsigned a_file_num )
1040  {
1041  try
1042  {
1043  return butterfly_house::get_instance()->get_filename( a_file_num );
1044  }
1045  catch( error& )
1046  {
1047  throw;
1048  }
1049  }
1050 
1051  void daq_control::set_description( const std::string& a_desc, unsigned a_file_num )
1052  {
1053  try
1054  {
1055  butterfly_house::get_instance()->set_description( a_desc, a_file_num );
1056  return;
1057  }
1058  catch( error& )
1059  {
1060  throw;
1061  }
1062  }
1063 
1064  const std::string& daq_control::get_description( unsigned a_file_num )
1065  {
1066  try
1067  {
1068  return butterfly_house::get_instance()->get_description( a_file_num );
1069  }
1070  catch( error& )
1071  {
1072  throw;
1073  }
1074  }
1075 
1077  {
1078  return static_cast< uint32_t >( a_status );
1079  }
1081  {
1082  return static_cast< status >( a_value );
1083  }
1084  std::string daq_control::interpret_status( status a_status )
1085  {
1086  switch( a_status )
1087  {
1088  case status::deactivated:
1089  return std::string( "Deactivated" );
1090  break;
1091  case status::activating:
1092  return std::string( "Activating" );
1093  break;
1094  case status::activated:
1095  return std::string( "Activated" );
1096  break;
1097  case status::running:
1098  return std::string( "Running" );
1099  break;
1100  case status::deactivating:
1101  return std::string( "Deactivating" );
1102  break;
1103  case status::canceled:
1104  return std::string( "Canceled" );
1105  break;
1106  case status::done:
1107  return std::string( "Done" );
1108  break;
1109  case status::do_restart:
1110  return std::string( "Do Restart" );
1111  break;
1112  case status::error:
1113  return std::string( "Error" );
1114  break;
1115  default:
1116  return std::string( "Unknown" );
1117  }
1118  }
1119 
1120 
1121 } /* namespace psyllid */
dripline::reply_ptr_t handle_reactivate_daq_control(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:604
virtual ~daq_control()
Definition: daq_control.cc:66
std::future< void > f_run_return
Definition: daq_control.hh:163
const char * what() const
dripline::reply_ptr_t handle_apply_config_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:684
void slack_error(const std::string &a_msg_text) const
dripline::reply_ptr_t handle_get_use_monarch_request(const dripline::request_ptr_t a_request)
const std::string & get_filename(unsigned a_file_num=0)
static status uint_to_status(uint32_t a_value)
static std::string interpret_status(status a_status)
static scarab::logger plog("batch_executor")
const std::string & get_description(unsigned a_file_num=0)
void do_run(unsigned a_duration)
Definition: daq_control.cc:355
dripline::reply_ptr_t handle_stop_run_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:671
void dump_config(const std::string &a_node_name, scarab::param_node &a_config)
Definition: daq_control.cc:531
std::condition_variable f_run_stopper
Definition: daq_control.hh:159
void initialize()
Pre-execution initialization (call after setting the control_access pointer)
Definition: daq_control.cc:70
std::atomic< status > f_status
Definition: daq_control.hh:200
bool run_command(const std::string &a_node_name, const std::string &a_cmd, const scarab::param_node &a_args)
Definition: daq_control.cc:561
dripline::reply_ptr_t handle_start_run_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:630
dripline::reply_ptr_t handle_set_use_monarch_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:930
void apply_config(const std::string &a_node_name, const scarab::param_node &a_config)
Definition: daq_control.cc:506
status get_status() const
Definition: daq_control.hh:205
dripline::reply_ptr_t handle_get_duration_request(const dripline::request_ptr_t a_request)
daq_control(const scarab::param_node &a_master_config, std::shared_ptr< stream_manager > a_mgr)
Definition: daq_control.cc:39
dripline::reply_ptr_t handle_activate_daq_control(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:591
Gives other classes access to daq_control.
std::shared_ptr< stream_manager > f_node_manager
Definition: daq_control.hh:152
static uint32_t status_to_uint(status a_status)
void slack_notice(const std::string &a_msg_text) const
midge_package f_midge_pkg
Definition: daq_control.hh:156
dripline::reply_ptr_t handle_get_status_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:944
dripline::reply_ptr_t handle_run_command_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:817
void set_status(status a_status)
Definition: daq_control.hh:210
void set_description(const std::string &a_desc, unsigned a_file_num=0)
void execute(std::condition_variable &a_ready_condition_variable, std::mutex &a_ready_mutex)
Run the DAQ control thread.
Definition: daq_control.cc:76
std::mutex f_run_stop_mutex
Definition: daq_control.hh:160
dripline::reply_ptr_t handle_get_filename_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:959
void do_cancellation(int a_code)
Definition: daq_control.cc:481
dripline::reply_ptr_t handle_set_description_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:888
scarab::param_node f_daq_config
Definition: daq_control.hh:154
LOGGER(plog, "egg_writer")
void slack_warn(const std::string &a_msg_text) const
std::condition_variable f_activation_condition
Definition: daq_control.hh:149
active_node_bindings * f_node_bindings
Definition: daq_control.hh:157
std::mutex f_daq_mutex
Definition: daq_control.hh:150
dripline::reply_ptr_t handle_dump_config_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:758
bool is_ready_at_startup() const
Definition: daq_control.cc:324
void set_filename(const std::string &a_filename, unsigned a_file_num=0)
message_relayer * f_msg_relay
Definition: daq_control.hh:165
dripline::reply_ptr_t handle_set_duration_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:910
dripline::reply_ptr_t handle_get_description_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:981
dripline::reply_ptr_t handle_deactivate_daq_control(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:617
dripline::reply_ptr_t handle_set_filename_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:867