Psyllid  v1.12.4
Project 8 Data Acquisition Software
run_server.cc
Go to the documentation of this file.
1 /*
2  * run_server.cc
3  *
4  * Created on: May 6, 2015
5  * Author: nsoblath
6  */
7 
8 #include "run_server.hh"
9 
10 #include "psyllid_constants.hh"
11 #include "daq_control.hh"
12 #include "message_relayer.hh"
13 #include "request_receiver.hh"
14 #include "signal_handler.hh"
15 #include "stream_manager.hh"
16 #include "batch_executor.hh"
17 
18 #include "logger.hh"
19 
20 #include <condition_variable>
21 #include <thread>
22 
23 using dripline::request_ptr_t;
24 
25 using scarab::param_node;
26 using scarab::version_semantic;
27 
28 namespace psyllid
29 {
30  LOGGER( plog, "run_server" );
31 
33  scarab::cancelable(),
34  f_return( RETURN_SUCCESS ),
35  f_request_receiver(),
36  f_batch_executor(),
37  f_daq_control(),
38  f_stream_manager(),
39  f_component_mutex(),
40  f_status( k_initialized )
41  {
42  }
43 
45  {
46  }
47 
48  void run_server::execute( const param_node& a_config )
49  {
50  LPROG( plog, "Creating server objects" );
51 
53 
54  scarab::signal_handler t_sig_hand;
55  t_sig_hand.add_cancelable( this );
56 
57  // configuration manager
58  //config_manager t_config_mgr( a_config, &t_dev_mgr );
59 
60  std::unique_lock< std::mutex > t_lock( f_component_mutex );
61 
62  std::thread t_msg_relay_thread;
63  try
64  {
65  // dripline relayer
66  try
67  {
68  message_relayer* t_msg_relay = message_relayer::create_instance( a_config["amqp"].as_node() );
69  if( a_config["post-to-slack"]().as_bool() )
70  {
71  LDEBUG( plog, "Starting message relayer thread" );
72  t_msg_relay_thread = std::thread( &message_relayer::execute_relayer, t_msg_relay );
73  t_msg_relay->slack_notice( "Psyllid is starting up" );
74  }
75  }
76  catch(...)
77  {
78  LWARN( plog, "Message relayer already exists, and you're trying to create it again" );
79  }
80 
81  // node manager
82  LDEBUG( plog, "Creating stream manager" );
83  f_stream_manager.reset( new stream_manager() );
84 
85  // daq control
86  LDEBUG( plog, "Creating DAQ control" );
87  f_daq_control.reset( new daq_control( a_config, f_stream_manager ) );
88  // provide the pointer to the daq_control to control_access
90  f_daq_control->initialize();
91 
92  if( a_config.has( "streams" ) && a_config["streams"].is_node() )
93  {
94  if( ! f_stream_manager->initialize( a_config["streams"].as_node() ) )
95  {
96  throw error() << "Unable to initialize the stream manager";
97  }
98  }
99 
100  // request receiver
101  LDEBUG( plog, "Creating request receiver" );
102  f_request_receiver.reset( new request_receiver( a_config ) );
103  // batch executor
104  LDEBUG( plog, "Creating batch executor" );
105  f_batch_executor.reset( new batch_executor( a_config, f_request_receiver ) );
106 
107  }
108  catch( std::exception& e )
109  {
110  LERROR( plog, "Exception caught while creating server objects: " << e.what() );
111  f_return = RETURN_ERROR;
112  return;
113  }
114 
115  // tie the various request handlers of psyllid to the request receiver
116 
117  using namespace std::placeholders;
118 
119  // set the run request handler
120  f_request_receiver->set_run_handler( std::bind( &daq_control::handle_start_run_request, f_daq_control, _1 ) );
121 
122  // add get request handlers
123  //f_request_receiver->register_get_handler( "server-status", std::bind( &run_server::handle_get_server_status_request, this, _1 ) );
124  f_request_receiver->register_get_handler( "node-config", std::bind( &stream_manager::handle_dump_config_node_request, f_stream_manager, _1 ) );
125  f_request_receiver->register_get_handler( "active-config", std::bind( &daq_control::handle_dump_config_request, f_daq_control, _1 ) );
126  f_request_receiver->register_get_handler( "daq-status", std::bind( &daq_control::handle_get_status_request, f_daq_control, _1 ) );
127  f_request_receiver->register_get_handler( "filename", std::bind( &daq_control::handle_get_filename_request, f_daq_control, _1 ) );
128  f_request_receiver->register_get_handler( "description", std::bind( &daq_control::handle_get_description_request, f_daq_control, _1 ) );
129  f_request_receiver->register_get_handler( "duration", std::bind( &daq_control::handle_get_duration_request, f_daq_control, _1 ) );
130  f_request_receiver->register_get_handler( "use-monarch", std::bind( &daq_control::handle_get_use_monarch_request, f_daq_control, _1 ) );
131  f_request_receiver->register_get_handler( "stream-list", std::bind( &stream_manager::handle_get_stream_list_request, f_stream_manager, _1 ) );
132  f_request_receiver->register_get_handler( "node-list", std::bind( &stream_manager::handle_get_stream_node_list_request, f_stream_manager, _1 ) );
133 
134  // add set request handlers
135  f_request_receiver->register_set_handler( "node-config", std::bind( &stream_manager::handle_configure_node_request, f_stream_manager, _1 ) );
136  f_request_receiver->register_set_handler( "active-config", std::bind( &daq_control::handle_apply_config_request, f_daq_control, _1 ) );
137  f_request_receiver->register_set_handler( "filename", std::bind( &daq_control::handle_set_filename_request, f_daq_control, _1 ) );
138  f_request_receiver->register_set_handler( "description", std::bind( &daq_control::handle_set_description_request, f_daq_control, _1 ) );
139  f_request_receiver->register_set_handler( "duration", std::bind( &daq_control::handle_set_duration_request, f_daq_control, _1 ) );
140  f_request_receiver->register_set_handler( "use-monarch", std::bind( &daq_control::handle_set_use_monarch_request, f_daq_control, _1 ) );
141 
142  // add cmd request handlers
143  f_request_receiver->register_cmd_handler( "add-stream", std::bind( &stream_manager::handle_add_stream_request, f_stream_manager, _1 ) );
144  f_request_receiver->register_cmd_handler( "remove-stream", std::bind( &stream_manager::handle_remove_stream_request, f_stream_manager, _1 ) );
145  f_request_receiver->register_cmd_handler( "run-daq-cmd", std::bind( &daq_control::handle_run_command_request, f_daq_control, _1 ) );
146  f_request_receiver->register_cmd_handler( "stop-run", std::bind( &daq_control::handle_stop_run_request, f_daq_control, _1 ) );
147  f_request_receiver->register_cmd_handler( "start-run", std::bind( &daq_control::handle_start_run_request, f_daq_control, _1 ) );
148  f_request_receiver->register_cmd_handler( "activate-daq", std::bind( &daq_control::handle_activate_daq_control, f_daq_control, _1 ) );
149  f_request_receiver->register_cmd_handler( "reactivate-daq", std::bind( &daq_control::handle_reactivate_daq_control, f_daq_control, _1 ) );
150  f_request_receiver->register_cmd_handler( "deactivate-daq", std::bind( &daq_control::handle_deactivate_daq_control, f_daq_control, _1 ) );
151  f_request_receiver->register_cmd_handler( "quit-psyllid", std::bind( &run_server::handle_quit_server_request, this, _1 ) );
152 
153  std::condition_variable t_daq_control_ready_cv;
154  std::mutex t_daq_control_ready_mutex;
155 
156  // start threads
157  LPROG( plog, "Starting threads" );
158  std::exception_ptr t_dc_ex_ptr;
159  std::thread t_daq_control_thread( &daq_control::execute, f_daq_control.get(), std::ref(t_daq_control_ready_cv), std::ref(t_daq_control_ready_mutex) );
160  // batch execution to do initial calls (AMQP consume hasn't started yet)
161  std::thread t_executor_thread_initial( &batch_executor::execute, f_batch_executor.get(), std::ref(t_daq_control_ready_cv), std::ref(t_daq_control_ready_mutex), false );
162  LDEBUG( plog, "Waiting for the batch executor to finish" );
163  t_executor_thread_initial.join();
164  LDEBUG( plog, "Initial batch executions complete" );
165 
166  if( ! is_canceled() )
167  {
168  // now execute the request receiver to start consuming
169  // and start the batch executor in infinite mode so that more command sets may be staged later
170  std::thread t_executor_thread( &batch_executor::execute, f_batch_executor.get(), std::ref(t_daq_control_ready_cv), std::ref(t_daq_control_ready_mutex), true );
171  std::thread t_receiver_thread( &request_receiver::execute, f_request_receiver.get(), std::ref(t_daq_control_ready_cv), std::ref(t_daq_control_ready_mutex) );
172 
173  t_lock.unlock();
174 
176  LPROG( plog, "Running..." );
177 
178  t_receiver_thread.join();
179  LPROG( plog, "Receiver thread has ended" );
180  // if make_connection is false, we need to actually call cancel:
181  if ( ! f_request_receiver.get()->get_make_connection() )
182  {
183  LINFO( plog, "Request receiver not making connections, canceling run server" );
184  scarab::signal_handler::cancel_all( RETURN_ERROR );
185  }
186  // and then wait for the controllers to finish up...
187  t_executor_thread.join();
188  }
189  t_daq_control_thread.join();
190  LPROG( plog, "DAQ control thread has ended" );
191 
192  t_sig_hand.remove_cancelable( this );
193 
194  if( t_msg_relay_thread.joinable() ) t_msg_relay_thread.join();
195  LDEBUG( plog, "Message relay thread has ended" );
196 
197  set_status( k_done );
198 
199  LPROG( plog, "Threads stopped" );
200 
201  return;
202  }
203 
204  void run_server::do_cancellation( int a_code )
205  {
206  LDEBUG( plog, "Canceling run server with code <" << a_code << ">" );
207  f_return = a_code;
208  message_relayer::get_instance()->slack_notice( "Psyllid is shutting down" );
209  f_batch_executor->cancel( a_code );
210  f_request_receiver->cancel( a_code );
211  f_daq_control->cancel( a_code );
212  message_relayer::get_instance()->cancel( a_code );
213  //f_node_manager->cancel();
214  return;
215  }
216 
218  {
219  LINFO( plog, "Shutting down the server" );
220  cancel( f_status == k_error ? RETURN_ERROR : RETURN_SUCCESS );
221  return;
222  }
223 
224 
225  dripline::reply_ptr_t run_server::handle_get_server_status_request( const dripline::request_ptr_t a_request )
226  {
227  /*
228  param_node* t_server_node = new param_node();
229  t_server_node->add( "status", new param_value( run_server::interpret_status( get_status() ) ) );
230 
231  f_component_mutex.lock();
232  if( f_request_receiver != NULL )
233  {
234  param_node* t_rr_node = new param_node();
235  t_rr_node->add( "status", new param_value( request_receiver::interpret_status( f_request_receiver->get_status() ) ) );
236  t_server_node->add( "request-receiver", t_rr_node );
237  }
238  if( f_acq_request_db != NULL )
239  {
240  param_node* t_queue_node = new param_node();
241  t_queue_node->add( "size", new param_value( (uint32_t)f_acq_request_db->queue_size() ) );
242  t_queue_node->add( "is-active", new param_value( f_acq_request_db->queue_is_active() ) );
243  t_server_node->add( "queue", t_queue_node );
244  }
245  if( f_server_worker != NULL )
246  {
247  param_node* t_sw_node = new param_node();
248  t_sw_node->add( "status", new param_value( server_worker::interpret_status( f_server_worker->get_status() ) ) );
249  t_sw_node->add( "digitizer-state", new param_value( server_worker::interpret_thread_state( f_server_worker->get_digitizer_state() ) ) );
250  t_sw_node->add( "writer-state", new param_value( server_worker::interpret_thread_state( f_server_worker->get_writer_state() ) ) );
251  t_server_node->add( "server-worker", t_sw_node );
252  }
253  f_component_mutex.unlock();
254 
255  a_reply_pkg.f_payload.add( "server", t_server_node );
256 
257  return a_reply_pkg.send_reply( dripline::dl_success(), "Server status request succeeded" );
258  */
259  return a_request->reply( dripline::dl_message_error_invalid_method(), "Server status request not yet supported" );
260  }
261 
262  dripline::reply_ptr_t run_server::handle_quit_server_request( const dripline::request_ptr_t a_request )
263  {
264  dripline::reply_ptr_t t_return = a_request->reply( dripline::dl_success(), "Server-quit command processed" );
265  quit_server();
266  return t_return;
267  }
268 
269 
270  std::string run_server::interpret_status( status a_status )
271  {
272  switch( a_status )
273  {
274  case k_initialized:
275  return std::string( "Initialized" );
276  break;
277  case k_starting:
278  return std::string( "Starting" );
279  break;
280  case k_running:
281  return std::string( "Running" );
282  break;
283  case k_done:
284  return std::string( "Done" );
285  break;
286  case k_error:
287  return std::string( "Error" );
288  break;
289  default:
290  return std::string( "Unknown" );
291  }
292  }
293 
294 
295 } /* namespace psyllid */
void execute(std::condition_variable &a_daq_control_ready_cv, std::mutex &a_daq_control_ready_mutex, bool a_run_forever=false)
dripline::reply_ptr_t handle_reactivate_daq_control(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:604
std::shared_ptr< stream_manager > f_stream_manager
Definition: run_server.hh:74
std::atomic< status > f_status
Definition: run_server.hh:94
void execute(std::condition_variable &a_daq_control_ready_cv, std::mutex &a_daq_control_ready_mutex)
dripline::reply_ptr_t handle_apply_config_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:684
dripline::reply_ptr_t handle_get_use_monarch_request(const dripline::request_ptr_t a_request)
static scarab::logger plog("batch_executor")
dripline::reply_ptr_t handle_stop_run_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:671
void cancel(int)
dripline::reply_ptr_t handle_dump_config_node_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_get_stream_list_request(const dripline::request_ptr_t a_request)
Controls psyllid's status and forwards requests to the DAQ nodes.
Definition: daq_control.hh:60
std::shared_ptr< batch_executor > f_batch_executor
Definition: run_server.hh:71
virtual void do_cancellation(int a_code)
Definition: run_server.cc:204
dripline::reply_ptr_t handle_start_run_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:630
dripline::reply_ptr_t handle_add_stream_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_set_use_monarch_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:930
dripline::reply_ptr_t handle_configure_node_request(const dripline::request_ptr_t a_request)
std::shared_ptr< request_receiver > f_request_receiver
Definition: run_server.hh:70
dripline::reply_ptr_t handle_get_duration_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_activate_daq_control(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:591
dripline::reply_ptr_t handle_get_server_status_request(const dripline::request_ptr_t a_request)
Definition: run_server.cc:225
void slack_notice(const std::string &a_msg_text) const
dripline::reply_ptr_t handle_get_status_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:944
dripline::reply_ptr_t handle_run_command_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:817
A class that sequentially executes a list of actions, equivalent to a sequence of dripline requests...
static std::string interpret_status(status a_status)
Definition: run_server.cc:270
dripline::reply_ptr_t handle_quit_server_request(const dripline::request_ptr_t a_request)
Definition: run_server.cc:262
void execute(std::condition_variable &a_ready_condition_variable, std::mutex &a_ready_mutex)
Run the DAQ control thread.
Definition: daq_control.cc:76
Manages one or multiple sets of midge-nodes.
virtual ~run_server()
Definition: run_server.cc:44
dripline::reply_ptr_t handle_get_filename_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:959
Receives requests from an AMQP broker.
dripline::reply_ptr_t handle_get_stream_node_list_request(const dripline::request_ptr_t a_request)
void set_status(status a_status)
Definition: run_server.hh:108
dripline::reply_ptr_t handle_set_description_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:888
LOGGER(plog, "egg_writer")
std::shared_ptr< daq_control > f_daq_control
Definition: run_server.hh:73
dripline::reply_ptr_t handle_remove_stream_request(const dripline::request_ptr_t a_request)
std::mutex f_component_mutex
Definition: run_server.hh:76
dripline::reply_ptr_t handle_dump_config_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:758
dripline::reply_ptr_t handle_set_duration_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:910
dripline::reply_ptr_t handle_get_description_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:981
void execute(const scarab::param_node &a_config)
Definition: run_server.cc:48
static void set_daq_control(std::weak_ptr< daq_control > a_daq_control)
dripline::reply_ptr_t handle_deactivate_daq_control(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:617
dripline::reply_ptr_t handle_set_filename_request(const dripline::request_ptr_t a_request)
Definition: daq_control.cc:867