15 #include "dripline_constants.hh" 16 #include "dripline_error.hh" 19 #include "signal_handler.hh" 44 f_batch_commands( a_master_config[
"batch-commands" ].as_node() ),
49 if ( a_master_config.has(
"on-startup" ) )
51 LINFO(
plog,
"have an initial action array" );
52 add_to_queue( a_master_config[
"on-startup"].as_array() );
56 LINFO(
plog,
"no initial batch actions" );
60 using namespace std::placeholders;
61 for ( scarab::param_node::iterator command_it = f_batch_commands.begin(); command_it != f_batch_commands.end(); ++command_it )
86 for( scarab::param_array::const_iterator action_it = actions_array.begin();
87 action_it!=actions_array.end();
90 LDEBUG(
plog,
"adding an item: " << action_it->as_node() )
97 if ( f_batch_commands.has( a_batch_command_name ) )
99 add_to_queue( f_batch_commands[a_batch_command_name].as_array() );
103 LWARN(
plog,
"no configured batch action for: '" << a_batch_command_name <<
"' ignoring" );
132 catch( dripline::dripline_error& e )
134 return a_request->reply( dripline::dl_daq_error(), std::string(
"Error processing command: ") + e.what() );
136 return a_request->reply( dripline::dl_success(),
"" );
147 catch( dripline::dripline_error& e )
149 return a_request->reply( dripline::dl_daq_error(), std::string(
"Error processing command: ") + e.what() );
151 return a_request->reply( dripline::dl_success(),
"" );
163 void batch_executor::execute( std::condition_variable& a_daq_control_ready_cv, std::mutex& a_daq_control_ready_mutex,
bool a_run_forever )
167 LERROR(
plog,
"Unable to get access to the DAQ control" );
168 scarab::signal_handler::cancel_all( RETURN_ERROR );
173 while ( ! t_daq_control_ptr->is_ready_at_startup() && ! is_canceled() )
175 std::unique_lock< std::mutex > t_daq_control_lock( a_daq_control_ready_mutex );
176 a_daq_control_ready_cv.wait_for( t_daq_control_lock, std::chrono::seconds(1) );
179 while ( ( a_run_forever ||
f_action_queue.size() ) && ! is_canceled() )
187 LINFO(
plog,
"action loop complete" );
195 LDEBUG(
plog,
"there are no actions in the queue" );
199 if ( ! t_request_reply )
201 LWARN(
plog,
"failed submitting action request" );
219 if ( t_request_reply->get_return_code() >= 100 )
221 LWARN(
plog,
"batch action received an error-level return code; exiting" );
222 throw psyllid::error() <<
"error completing batch action, received code [" <<
223 t_request_reply->get_return_code() <<
"]: \"" <<
224 t_request_reply->return_msg() <<
"\"";
232 dripline::op_t t_msg_op;
233 if ( ! a_action[
"payload"].is_node() )
235 LERROR(
plog,
"payload must be a param_node" );
240 t_rks = a_action[
"rks" ]().as_string();
244 catch( scarab::error& e )
246 LERROR(
plog,
"error parsing action param_node, check keys and value types: " << e.what() );
253 catch( dripline::dripline_error& )
255 LDEBUG(
plog,
"got a dripline error parsing request type" );
256 if ( a_action[
"type"]().as_string() ==
"wait-for" && t_rks ==
"daq-status" )
258 LDEBUG(
plog,
"action is poll on run status" );
259 t_msg_op = dripline::op_t::get;
264 LDEBUG(
plog,
"build request object" );
266 scarab::param_ptr_t t_payload_ptr(
new scarab::param_node( a_action[
"payload"].as_node() ) );
270 std::move(t_payload_ptr),
273 t_action_info.
f_request_ptr->parsed_specifier().parse( t_rks );
275 return t_action_info;
void execute(std::condition_variable &a_daq_control_ready_cv, std::mutex &a_daq_control_ready_mutex, bool a_run_forever=false)
unsigned f_sleep_duration_ms
dripline::reply_ptr_t do_batch_cmd_request(const std::string &a_command, const dripline::request_ptr_t a_request)
static status uint_to_status(uint32_t a_value)
static scarab::logger plog("batch_executor")
dc_ptr_t use_daq_control()
dripline::reply_ptr_t do_replace_actions_request(const std::string &a_command, const dripline::request_ptr_t a_request)
monarch_stage to_op_t(uint32_t a_stage_uint)
Gives other classes access to daq_control.
void replace_queue(const scarab::param_node &an_action)
scarab::concurrent_queue< action_info > f_action_queue
bool daq_control_expired()
void add_to_queue(const scarab::param_node &an_action)
LOGGER(plog, "egg_writer")
scarab::param_node f_condition_actions
virtual ~batch_executor()
std::shared_ptr< daq_control > dc_ptr_t
static action_info parse_action(const scarab::param_node &a_action)
dripline::request_ptr_t f_request_ptr
std::shared_ptr< request_receiver > f_request_receiver