14 #include "signal_handler.hh" 20 #include <condition_variable> 23 using dripline::request_ptr_t;
25 using scarab::param_node;
26 using scarab::version_semantic;
34 f_return( RETURN_SUCCESS ),
40 f_status( k_initialized )
50 LPROG(
plog,
"Creating server objects" );
54 scarab::signal_handler t_sig_hand;
55 t_sig_hand.add_cancelable(
this );
62 std::thread t_msg_relay_thread;
68 message_relayer* t_msg_relay = message_relayer::create_instance( a_config[
"amqp"].as_node() );
69 if( a_config[
"post-to-slack"]().as_bool() )
71 LDEBUG(
plog,
"Starting message relayer thread" );
72 t_msg_relay_thread = std::thread( &message_relayer::execute_relayer, t_msg_relay );
78 LWARN(
plog,
"Message relayer already exists, and you're trying to create it again" );
82 LDEBUG(
plog,
"Creating stream manager" );
86 LDEBUG(
plog,
"Creating DAQ control" );
92 if( a_config.has(
"streams" ) && a_config[
"streams"].is_node() )
96 throw error() <<
"Unable to initialize the stream manager";
101 LDEBUG(
plog,
"Creating request receiver" );
104 LDEBUG(
plog,
"Creating batch executor" );
108 catch( std::exception& e )
110 LERROR(
plog,
"Exception caught while creating server objects: " << e.what() );
117 using namespace std::placeholders;
153 std::condition_variable t_daq_control_ready_cv;
154 std::mutex t_daq_control_ready_mutex;
157 LPROG(
plog,
"Starting threads" );
158 std::exception_ptr t_dc_ex_ptr;
162 LDEBUG(
plog,
"Waiting for the batch executor to finish" );
163 t_executor_thread_initial.join();
164 LDEBUG(
plog,
"Initial batch executions complete" );
166 if( ! is_canceled() )
176 LPROG(
plog,
"Running..." );
178 t_receiver_thread.join();
179 LPROG(
plog,
"Receiver thread has ended" );
183 LINFO(
plog,
"Request receiver not making connections, canceling run server" );
184 scarab::signal_handler::cancel_all( RETURN_ERROR );
187 t_executor_thread.join();
189 t_daq_control_thread.join();
190 LPROG(
plog,
"DAQ control thread has ended" );
192 t_sig_hand.remove_cancelable(
this );
194 if( t_msg_relay_thread.joinable() ) t_msg_relay_thread.join();
195 LDEBUG(
plog,
"Message relay thread has ended" );
199 LPROG(
plog,
"Threads stopped" );
206 LDEBUG(
plog,
"Canceling run server with code <" << a_code <<
">" );
208 message_relayer::get_instance()->slack_notice(
"Psyllid is shutting down" );
212 message_relayer::get_instance()->cancel( a_code );
219 LINFO(
plog,
"Shutting down the server" );
259 return a_request->reply( dripline::dl_message_error_invalid_method(),
"Server status request not yet supported" );
264 dripline::reply_ptr_t t_return = a_request->reply( dripline::dl_success(),
"Server-quit command processed" );
275 return std::string(
"Initialized" );
278 return std::string(
"Starting" );
281 return std::string(
"Running" );
284 return std::string(
"Done" );
287 return std::string(
"Error" );
290 return std::string(
"Unknown" );
void execute(std::condition_variable &a_daq_control_ready_cv, std::mutex &a_daq_control_ready_mutex, bool a_run_forever=false)
dripline::reply_ptr_t handle_reactivate_daq_control(const dripline::request_ptr_t a_request)
std::shared_ptr< stream_manager > f_stream_manager
std::atomic< status > f_status
void execute(std::condition_variable &a_daq_control_ready_cv, std::mutex &a_daq_control_ready_mutex)
dripline::reply_ptr_t handle_apply_config_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_get_use_monarch_request(const dripline::request_ptr_t a_request)
static scarab::logger plog("batch_executor")
dripline::reply_ptr_t handle_stop_run_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_dump_config_node_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_get_stream_list_request(const dripline::request_ptr_t a_request)
Controls psyllid's status and forwards requests to the DAQ nodes.
std::shared_ptr< batch_executor > f_batch_executor
virtual void do_cancellation(int a_code)
dripline::reply_ptr_t handle_start_run_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_add_stream_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_set_use_monarch_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_configure_node_request(const dripline::request_ptr_t a_request)
std::shared_ptr< request_receiver > f_request_receiver
dripline::reply_ptr_t handle_get_duration_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_activate_daq_control(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_get_server_status_request(const dripline::request_ptr_t a_request)
void slack_notice(const std::string &a_msg_text) const
dripline::reply_ptr_t handle_get_status_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_run_command_request(const dripline::request_ptr_t a_request)
A class sequentially execute a list of actions, equivalent to a sequence of dripline requests...
static std::string interpret_status(status a_status)
dripline::reply_ptr_t handle_quit_server_request(const dripline::request_ptr_t a_request)
void execute(std::condition_variable &a_ready_condition_variable, std::mutex &a_ready_mutex)
Run the DAQ control thread.
Manages one or multiple sets of midge-nodes.
dripline::reply_ptr_t handle_get_filename_request(const dripline::request_ptr_t a_request)
Receives request from a amqp broker.
dripline::reply_ptr_t handle_get_stream_node_list_request(const dripline::request_ptr_t a_request)
void set_status(status a_status)
dripline::reply_ptr_t handle_set_description_request(const dripline::request_ptr_t a_request)
LOGGER(plog, "egg_writer")
std::shared_ptr< daq_control > f_daq_control
dripline::reply_ptr_t handle_remove_stream_request(const dripline::request_ptr_t a_request)
std::mutex f_component_mutex
dripline::reply_ptr_t handle_dump_config_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_set_duration_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_get_description_request(const dripline::request_ptr_t a_request)
void execute(const scarab::param_node &a_config)
static void set_daq_control(std::weak_ptr< daq_control > a_daq_control)
dripline::reply_ptr_t handle_deactivate_daq_control(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_set_filename_request(const dripline::request_ptr_t a_request)