Psyllid  v1.12.4
Project 8 Data Acquisisition Software
request_receiver.cc
Go to the documentation of this file.
1 
2 #include "daq_control.hh"
3 #include "request_receiver.hh"
4 #include "dripline_constants.hh"
5 
6 #include "psyllid_constants.hh"
7 #include "psyllid_error.hh"
8 
9 #include "logger.hh"
10 #include "signal_handler.hh"
11 
12 #include <cstddef>
13 #include <signal.h>
14 #include <sstream>
15 
16 using std::string;
17 
18 using scarab::param_node;
19 using scarab::param_ptr_t;
20 
21 using dripline::request_ptr_t;
22 
23 
24 namespace psyllid
25 {
26 
27  LOGGER( plog, "request_receiver" );
28 
29  request_receiver::request_receiver( const param_node& a_master_config ) :
30  hub( a_master_config["amqp"].as_node() ),
32  f_set_conditions( a_master_config["set-conditions"].as_node() ),
33  f_status( k_initialized )
34  {
35  }
36 
38  {
39  }
40 
41  void request_receiver::execute( std::condition_variable& a_daq_control_ready_cv, std::mutex& a_daq_control_ready_mutex )
42  {
44 
45  if( daq_control_expired() )
46  {
47  LERROR( plog, "Unable to get access to the DAQ control" );
48  scarab::signal_handler::cancel_all( RETURN_ERROR );
49  return;
50  }
51  dc_ptr_t t_daq_control_ptr = use_daq_control();
52 
53  // start the service
54  if( ! start() && f_make_connection )
55  {
56  LERROR( plog, "Unable to start the dripline service" );
57  scarab::signal_handler::cancel_all( RETURN_ERROR );
58  return;
59  }
60 
61  while ( ! t_daq_control_ptr->is_ready_at_startup() && ! cancelable::is_canceled() )
62  {
63  std::unique_lock< std::mutex > t_daq_control_lock( a_daq_control_ready_mutex );
64  a_daq_control_ready_cv.wait_for( t_daq_control_lock, std::chrono::seconds(1) );
65  }
66 
67  if ( f_make_connection && ! cancelable::is_canceled() ) {
68  LINFO( plog, "Waiting for incoming messages" );
69 
71 
72  while( ! cancelable::is_canceled() )
73  {
74  // blocking call to wait for incoming message
75  listen();
76  }
77  }
78 
79  LINFO( plog, "No longer waiting for messages" );
80 
81  stop();
82 
83  set_status( k_done );
84  LDEBUG( plog, "Request receiver is done" );
85 
86  return;
87  }
88 
90  {
91  LDEBUG( plog, "Canceling request receiver" );
93  return;
94  }
95 
97  {
98  switch( a_status )
99  {
100  case k_initialized:
101  return std::string( "Initialized" );
102  break;
103  case k_starting:
104  return std::string( "Starting" );
105  break;
106  case k_listening:
107  return std::string( "Listening" );
108  break;
109  case k_canceled:
110  return std::string( "Canceled" );
111  break;
112  case k_done:
113  return std::string( "Done" );
114  break;
115  case k_error:
116  return std::string( "Error" );
117  break;
118  default:
119  return std::string( "Unknown" );
120  }
121  }
122 
123  dripline::reply_ptr_t request_receiver::__do_handle_set_condition_request( const dripline::request_ptr_t a_request )
124  {
125  std::string t_condition = a_request->payload()["values"][0]().as_string();
126  if ( f_set_conditions.has( t_condition ) )
127  {
128  std::string t_rks = f_set_conditions[t_condition]().as_string();
129  dripline::request_ptr_t t_request = dripline::msg_request::create( param_ptr_t(new param_node()), dripline::op_t::cmd, a_request->routing_key(), t_rks );
130  //t_request->specifier = t_rks; //, dripline::routing_key_specifier( t_rks ) );
131 
132  dripline::reply_ptr_t t_reply_ptr = submit_request_message( t_request );
133  return a_request->reply( t_reply_ptr->get_return_code(), t_reply_ptr->return_msg(), param_ptr_t(new param_node(t_reply_ptr->payload().as_node())) );
134  }
135  return a_request->reply( dripline::dl_daq_error(), "set condition not configured" );
136  }
137 
138 }
void set_status(status a_status)
void execute(std::condition_variable &a_daq_control_ready_cv, std::mutex &a_daq_control_ready_mutex)
request_receiver(const scarab::param_node &a_master_config)
static scarab::logger plog("batch_executor")
static std::string interpret_status(status a_status)
Gives other classes access to daq_control.
virtual void do_cancellation(int a_code)
LOGGER(plog, "egg_writer")
std::shared_ptr< daq_control > dc_ptr_t
virtual dripline::reply_ptr_t __do_handle_set_condition_request(const dripline::request_ptr_t a_request)