Psyllid  v1.12.4
Project 8 Data Acquisisition Software
batch_executor.cc
Go to the documentation of this file.
1 /*
2  * batch_executor.cc
3  *
4  * Created on: April 12, 2018
5  * Author: laroque
6  */
7 
8 //psyllid includes
9 #include "batch_executor.hh"
10 #include "daq_control.hh"
11 #include "psyllid_constants.hh"
12 #include "request_receiver.hh"
13 
14 //non-psyllid P8 includes
15 #include "dripline_constants.hh"
16 #include "dripline_error.hh"
17 
18 #include "logger.hh"
19 #include "signal_handler.hh"
20 
21 //external includes
22 #include <chrono>
23 #include <signal.h>
24 #include <thread>
25 
26 namespace psyllid
27 {
28 
29  LOGGER( plog, "batch_executor" );
30 
33  scarab::cancelable(),
34  f_batch_commands(),
35  f_request_receiver(),
36  f_action_queue(),
37  f_condition_actions()
38  {
39  }
40 
41  batch_executor::batch_executor( const scarab::param_node& a_master_config, std::shared_ptr< request_receiver > a_request_receiver ) :
43  scarab::cancelable(),
44  f_batch_commands( a_master_config[ "batch-commands" ].as_node() ),
45  f_request_receiver( a_request_receiver ),
48  {
49  if ( a_master_config.has( "on-startup" ) )
50  {
51  LINFO( plog, "have an initial action array" );
52  add_to_queue( a_master_config["on-startup"].as_array() );
53  }
54  else
55  {
56  LINFO( plog, "no initial batch actions" );
57  }
58 
59  // register batch commands
60  using namespace std::placeholders;
61  for ( scarab::param_node::iterator command_it = f_batch_commands.begin(); command_it != f_batch_commands.end(); ++command_it )
62  {
63  a_request_receiver->register_cmd_handler( command_it.name(), std::bind( &batch_executor::do_batch_cmd_request, this, command_it.name(), _1 ) );
64  }
65  }
66 
68  {
69  }
70 
72  {
73  action_info t_action;
74  while ( f_action_queue.try_pop( t_action ) )
75  {
76  }
77  }
78 
79  void batch_executor::add_to_queue( const scarab::param_node& an_action )
80  {
81  f_action_queue.push( parse_action( an_action ) );
82  }
83 
84  void batch_executor::add_to_queue( const scarab::param_array& actions_array )
85  {
86  for( scarab::param_array::const_iterator action_it = actions_array.begin();
87  action_it!=actions_array.end();
88  ++action_it )
89  {
90  LDEBUG( plog, "adding an item: " << action_it->as_node() )
91  add_to_queue( action_it->as_node() );
92  }
93  }
94 
95  void batch_executor::add_to_queue( const std::string& a_batch_command_name )
96  {
97  if ( f_batch_commands.has( a_batch_command_name ) )
98  {
99  add_to_queue( f_batch_commands[a_batch_command_name].as_array() );
100  }
101  else
102  {
103  LWARN( plog, "no configured batch action for: '" << a_batch_command_name << "' ignoring" );
104  }
105  }
107  void batch_executor::replace_queue( const scarab::param_node& an_action )
108  {
109  clear_queue();
110  add_to_queue( an_action );
111  }
113  void batch_executor::replace_queue( const scarab::param_array& actions_array )
114  {
115  clear_queue();
116  add_to_queue( actions_array );
117  }
119  void batch_executor::replace_queue( const std::string& a_batch_command_name )
120  {
121  clear_queue();
122  add_to_queue( a_batch_command_name );
123  }
124 
125  // this method should be bound in the request receiver to be called with a command name, the request_ptr_t is not used
126  dripline::reply_ptr_t batch_executor::do_batch_cmd_request( const std::string& a_command, const dripline::request_ptr_t a_request )
127  {
128  try
129  {
130  add_to_queue( a_command );
131  }
132  catch( dripline::dripline_error& e )
133  {
134  return a_request->reply( dripline::dl_daq_error(), std::string("Error processing command: ") + e.what() );
135  }
136  return a_request->reply( dripline::dl_success(), "" );
137  }
138 
139  // this method should be bound in the request receiver to be called with a command name, the request_ptr_t is not used
140  dripline::reply_ptr_t batch_executor::do_replace_actions_request( const std::string& a_command, const dripline::request_ptr_t a_request )
141  {
142  try
143  {
144  //TODO do we have a good way to interrupt an ongoing action here?
145  replace_queue( a_command );
146  }
147  catch( dripline::dripline_error& e )
148  {
149  return a_request->reply( dripline::dl_daq_error(), std::string("Error processing command: ") + e.what() );
150  }
151  return a_request->reply( dripline::dl_success(), "" );
152  }
153 
154  /* considering yaml that looks like:
155  batch-actions:
156  - type: cmd
157  sleep-for: 500 # [ms], optional element to specify the sleep after issuing the cmd, before proceeding to the next.
158  rks: start-run
159  payload:
160  duration: 200
161  filenames: '["/tmp/foo_t.yaml", "/tmp/foo_f.yaml"]'
162  */
163  void batch_executor::execute( std::condition_variable& a_daq_control_ready_cv, std::mutex& a_daq_control_ready_mutex, bool a_run_forever )
164  {
165  if( daq_control_expired() )
166  {
167  LERROR( plog, "Unable to get access to the DAQ control" );
168  scarab::signal_handler::cancel_all( RETURN_ERROR );
169  return;
170  }
171  dc_ptr_t t_daq_control_ptr = use_daq_control();
172 
173  while ( ! t_daq_control_ptr->is_ready_at_startup() && ! is_canceled() )
174  {
175  std::unique_lock< std::mutex > t_daq_control_lock( a_daq_control_ready_mutex );
176  a_daq_control_ready_cv.wait_for( t_daq_control_lock, std::chrono::seconds(1) );
177  }
178 
179  while ( ( a_run_forever || f_action_queue.size() ) && ! is_canceled() )
180  {
181  if ( f_action_queue.size() )
182  {
183  do_an_action();
184  }
185  }
186 
187  LINFO( plog, "action loop complete" );
188  }
191  {
192  action_info t_action;
193  if ( !f_action_queue.try_pop( t_action ) )
194  {
195  LDEBUG( plog, "there are no actions in the queue" );
196  return;
197  }
198  dripline::reply_ptr_t t_request_reply = f_request_receiver->submit_request_message( t_action.f_request_ptr );
199  if ( ! t_request_reply )
200  {
201  LWARN( plog, "failed submitting action request" );
202  throw psyllid::error() << "error while submitting command";
203  }
204  // wait until daq status is no longer "running"
205  if ( t_action.f_is_custom_action )
206  {
207  daq_control::status t_status = daq_control::uint_to_status( t_request_reply->payload()["server"]["status-value"]().as_uint() );
208  while ( t_status == daq_control::status::running )
209  {
210  t_request_reply = f_request_receiver->submit_request_message( t_action.f_request_ptr );
211  t_status = daq_control::uint_to_status( t_request_reply->payload()["server"]["status-value"]().as_uint() );
212  std::this_thread::sleep_for( std::chrono::milliseconds( t_action.f_sleep_duration_ms ) );
213  }
214  }
215  else
216  {
217  std::this_thread::sleep_for( std::chrono::milliseconds( t_action.f_sleep_duration_ms ) );
218  }
219  if ( t_request_reply->get_return_code() >= 100 )
220  {
221  LWARN( plog, "batch action received an error-level return code; exiting" );
222  throw psyllid::error() << "error completing batch action, received code [" <<
223  t_request_reply->get_return_code() << "]: \"" <<
224  t_request_reply->return_msg() << "\"";
225  }
226  }
228  action_info batch_executor::parse_action( const scarab::param_node& a_action )
229  {
230  action_info t_action_info;
231  std::string t_rks;
232  dripline::op_t t_msg_op;
233  if ( ! a_action["payload"].is_node() )
234  {
235  LERROR( plog, "payload must be a param_node" );
236  throw psyllid::error() << "batch action payload must be a node";
237  }
238  try
239  {
240  t_rks = a_action["rks" ]().as_string();
241  t_action_info.f_sleep_duration_ms = std::stoi( a_action.get_value( "sleep-for", "500" ) );
242  t_action_info.f_is_custom_action = false;
243  }
244  catch( scarab::error& e )
245  {
246  LERROR( plog, "error parsing action param_node, check keys and value types: " << e.what() );
247  throw;
248  }
249  try
250  {
251  t_msg_op = dripline::to_op_t( a_action["type"]().as_string() );
252  }
253  catch( dripline::dripline_error& )
254  {
255  LDEBUG( plog, "got a dripline error parsing request type" );
256  if ( a_action["type"]().as_string() == "wait-for" && t_rks == "daq-status" )
257  {
258  LDEBUG( plog, "action is poll on run status" );
259  t_msg_op = dripline::op_t::get;
260  t_action_info.f_is_custom_action = true;
261  }
262  else throw;
263  }
264  LDEBUG( plog, "build request object" );
265 
266  scarab::param_ptr_t t_payload_ptr( new scarab::param_node( a_action["payload"].as_node() ) );
267 
268  // put it together into a request
269  t_action_info.f_request_ptr = dripline::msg_request::create(
270  std::move(t_payload_ptr),
271  t_msg_op,
272  std::string() );// reply-to is empty because no reply for batch requests
273  t_action_info.f_request_ptr->parsed_specifier().parse( t_rks );
274  LINFO( plog, "next action will be " << t_action_info.f_request_ptr->payload() );
275  return t_action_info;
276  }
277 
278 } /* namespace psyllid */
void execute(std::condition_variable &a_daq_control_ready_cv, std::mutex &a_daq_control_ready_mutex, bool a_run_forever=false)
dripline::reply_ptr_t do_batch_cmd_request(const std::string &a_command, const dripline::request_ptr_t a_request)
static status uint_to_status(uint32_t a_value)
static scarab::logger plog("batch_executor")
dripline::reply_ptr_t do_replace_actions_request(const std::string &a_command, const dripline::request_ptr_t a_request)
monarch_stage to_op_t(uint32_t a_stage_uint)
Gives other classes access to daq_control.
void replace_queue(const scarab::param_node &an_action)
scarab::concurrent_queue< action_info > f_action_queue
void add_to_queue(const scarab::param_node &an_action)
LOGGER(plog, "egg_writer")
scarab::param_node f_condition_actions
std::shared_ptr< daq_control > dc_ptr_t
static action_info parse_action(const scarab::param_node &a_action)
dripline::request_ptr_t f_request_ptr
std::shared_ptr< request_receiver > f_request_receiver