4 #include "dripline_constants.hh" 10 #include "signal_handler.hh" 18 using scarab::param_node;
19 using scarab::param_ptr_t;
21 using dripline::request_ptr_t;
// Member-initializer fragment of the request_receiver constructor; the
// constructor signature and body are not part of this excerpt.
// `hub` (base class or member -- not visible here) is initialized from the
// "amqp" node of the master configuration.
// NOTE(review): the key is accessed without a visible presence check; what
// happens when it is absent depends on scarab::param_node -- confirm.
30 hub( a_master_config[
"amqp"].as_node() ),
// The "set-conditions" node is stored in f_set_conditions; the set-condition
// request handler looks requested condition names up in it.
32 f_set_conditions( a_master_config[
"set-conditions"].as_node() ),
// The status member starts out as k_initialized.
33 f_status( k_initialized )
// Fragments of execute(); the function signature, braces, and several
// statements are not part of this excerpt.
// Error path: logs that DAQ control could not be accessed and calls
// scarab::signal_handler::cancel_all( RETURN_ERROR ). The condition guarding
// this path is not visible here.
47 LERROR(
plog,
"Unable to get access to the DAQ control" );
48 scarab::signal_handler::cancel_all( RETURN_ERROR );
// start() is the left operand of &&, so it is called regardless of
// f_make_connection; a failed start is only treated as an error when
// f_make_connection is true.
54 if( ! start() && f_make_connection )
56 LERROR(
plog,
"Unable to start the dripline service" );
57 scarab::signal_handler::cancel_all( RETURN_ERROR );
// Startup wait: loops until DAQ control reports ready-at-startup or this object
// is canceled. Each pass locks a_daq_control_ready_mutex and waits on the
// condition variable for at most one second. wait_for is called without a
// predicate, so the loop condition is what re-checks readiness after each wake.
61 while ( ! t_daq_control_ptr->is_ready_at_startup() && ! cancelable::is_canceled() )
63 std::unique_lock< std::mutex > t_daq_control_lock( a_daq_control_ready_mutex );
64 a_daq_control_ready_cv.wait_for( t_daq_control_lock, std::chrono::seconds(1) );
// The message-waiting phase is entered only when f_make_connection is true and
// no cancellation has been flagged by this point.
67 if ( f_make_connection && ! cancelable::is_canceled() ) {
68 LINFO(
plog,
"Waiting for incoming messages" );
// Repeats until cancellation; the loop body is not visible in this excerpt.
72 while( ! cancelable::is_canceled() )
// Info message reporting that the receiver has stopped waiting for messages.
79 LINFO(
plog,
"No longer waiting for messages" );
// Debug message reporting that the request receiver is done.
84 LDEBUG(
plog,
"Request receiver is done" );
// Debug message announcing that the request receiver is being canceled.
// NOTE(review): presumably part of do_cancellation(int a_code); the enclosing
// function and the rest of its body are not visible in this excerpt -- confirm.
91 LDEBUG(
plog,
"Canceling request receiver" );
// Return statements of a status-to-string mapping (presumably
// interpret_status(status a_status) -- confirm). The labels or conditions that
// select each return are not visible in this excerpt, so the pairing of status
// values with names cannot be stated from here. Each branch returns a newly
// constructed std::string holding a display name.
101 return std::string(
"Initialized" );
104 return std::string(
"Starting" );
107 return std::string(
"Listening" );
110 return std::string(
"Canceled" );
113 return std::string(
"Done" );
116 return std::string(
"Error" );
// NOTE(review): "Unknown" is presumably the fallback for an unrecognized
// status value -- confirm against the full function.
119 return std::string(
"Unknown" );
// Fragments of the set-condition request handler; the function signature,
// braces, and some statements are not part of this excerpt.
// The requested condition name is read from the first element of the "values"
// entry of the request payload.
// NOTE(review): no presence or size check on "values" is visible here; behavior
// for a missing or empty entry depends on scarab's param accessors -- confirm.
125 std::string t_condition = a_request->payload()[
"values"][0]().as_string();
// Only conditions that have an entry in f_set_conditions are acted on.
126 if ( f_set_conditions.has( t_condition ) )
// The configured value for this condition is read as a string.
128 std::string t_rks = f_set_conditions[t_condition]().as_string();
// A new cmd-type request is built with an empty param_node payload and the
// incoming request's routing key. t_rks is passed as the fourth argument; its
// role in msg_request::create is not visible here.
129 dripline::request_ptr_t t_request = dripline::msg_request::create( param_ptr_t(
new param_node()), dripline::op_t::cmd, a_request->routing_key(), t_rks );
// The new request is submitted and its reply is captured.
// NOTE(review): t_reply_ptr is dereferenced below without a visible null check;
// confirm submit_request_message always returns a valid reply.
132 dripline::reply_ptr_t t_reply_ptr = submit_request_message( t_request );
// The reply to the original request reuses the return code and message of the
// submitted request's reply, with a new param_node constructed from that
// reply's payload node.
133 return a_request->reply( t_reply_ptr->get_return_code(), t_reply_ptr->return_msg(), param_ptr_t(
new param_node(t_reply_ptr->payload().as_node())) );
// DAQ-error reply stating the condition is not configured. It is presumably
// reached when f_set_conditions has no entry for the condition; the branch
// structure is not visible in this excerpt -- confirm.
135 return a_request->reply( dripline::dl_daq_error(),
"set condition not configured" );
// Declaration signatures only: no bodies or statement terminators are shown
// for the entries below.
void set_status(status a_status)
void execute(std::condition_variable &a_daq_control_ready_cv, std::mutex &a_daq_control_ready_mutex)
request_receiver(const scarab::param_node &a_master_config)
// NOTE(review): `plog` appears twice in this list with different logger names
// ("batch_executor" here, "egg_writer" further down); presumably these entries
// come from different source files -- confirm which one applies to this code.
static scarab::logger plog("batch_executor")
dc_ptr_t use_daq_control()
static std::string interpret_status(status a_status)
Gives other classes access to daq_control.
virtual void do_cancellation(int a_code)
bool daq_control_expired()
LOGGER(plog, "egg_writer")
virtual ~request_receiver()
// dc_ptr_t is a std::shared_ptr to daq_control.
std::shared_ptr< daq_control > dc_ptr_t
virtual dripline::reply_ptr_t __do_handle_set_condition_request(const dripline::request_ptr_t a_request)