Psyllid  v1.12.4
Project 8 Data Acquisisition Software
packet_receiver_socket.cc
Go to the documentation of this file.
1 /*
2  * packet_receiver_socket.cc
3  *
4  * Created on: Nov 2, 2016
5  * Author: nsoblath
6  */
7 
9 
10 #include "psyllid_error.hh"
11 
12 #include "logger.hh"
13 #include "param.hh"
14 
15 #include <arpa/inet.h>
16 #include <errno.h>
17 #include <memory>
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <unistd.h>
22 #include <netdb.h>
23 #include <sys/socket.h>
24 #include <sys/types.h>
25 
26 using midge::stream;
27 
28 namespace psyllid
29 {
30  REGISTER_NODE_AND_BUILDER( packet_receiver_socket, "packet-receiver-socket", packet_receiver_socket_binding );
31 
32  LOGGER( plog, "packet_receiver_socket" );
33 
35  f_length( 10 ),
36  f_max_packet_size( 1048576 ),
37  f_port( 23530 ),
38  f_ip( "127.0.0.1" ),
39  f_timeout_sec( 1 ),
40  f_socket( 0 ),
41  f_address( nullptr ),
42  f_last_errno( 0 )
43  {
44  }
45 
47  {
49  }
50 
52  {
53  out_buffer< 0 >().initialize( f_length );
54 
55  LDEBUG( plog, "Opening UDP socket receiving at " << f_ip << ":" << f_port );
56 
57  //initialize address
58  socklen_t t_socket_length = sizeof(sockaddr_in);
59  f_address = new sockaddr_in();
60  ::memset( f_address, 0, t_socket_length );
61 
62  //prepare address
63  f_address->sin_family = AF_INET;
64  f_address->sin_addr.s_addr = inet_addr( f_ip.c_str() );
65  if( f_address->sin_addr.s_addr == INADDR_NONE )
66  {
67  throw error() << "[packet_receiver_socket] invalid IP address\n";
68  return;
69  }
70  f_address->sin_port = htons( f_port );
71 
72  //MTLINFO( pmsg, "address prepared..." );
73 
74  //open socket
75  f_socket = ::socket( AF_INET, SOCK_DGRAM, 0 );
76  if( f_socket < 0 )
77  {
78  throw error() << "[packet_receiver_socket] could not create socket:\n\t" << strerror( errno );
79  return;
80  }
81 
82  /* setsockopt: Handy debugging trick that lets
83  * us rerun the udp_server immediately after we kill it;
84  * otherwise we have to wait about 20 secs.
85  * Eliminates "ERROR on binding: Address already in use" error.
86  */
87  int optval = 1;
88  ::setsockopt( f_socket, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(int));
89 
90  // Receive timeout
91  if( f_timeout_sec > 0 )
92  {
93  struct timeval t_timeout;
94  t_timeout.tv_sec = f_timeout_sec;
95  t_timeout.tv_usec = 0; // Not init'ing this can cause strange errors
96  ::setsockopt( f_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&t_timeout, sizeof(struct timeval) );
97  }
98 
99  //msg_normal( pmsg, "socket open..." );
100 
101  //bind socket
102  if( ::bind( f_socket, (const sockaddr*) (f_address), t_socket_length ) < 0 )
103  {
104  throw error() << "[packet_receiver_socket] could not bind socket:\n\t" << strerror( errno );
105  return;
106  }
107 
108  LINFO( plog, "Ready to receive messages at port " << f_ip << ":" << f_port );
109 
110  return;
111  }
112 
113  void packet_receiver_socket::execute( midge::diptera* a_midge )
114  {
115  try
116  {
117  LDEBUG( plog, "Executing the packet_receiver_socket" );
118 
119  memory_block* t_block = nullptr;
120 
121  //LDEBUG( plog, "Server is listening" );
122 
123  if( ! out_stream< 0 >().set( stream::s_start ) ) return;
124 
125  ssize_t t_size_received = 0;
126 
127  LINFO( plog, "Starting main loop; waiting for packets" );
128  while( ! is_canceled() )
129  {
130  t_block = out_stream< 0 >().data();
131  t_block->resize( f_max_packet_size );
132 
133  t_size_received = 0;
134 
135  if( (out_stream< 0 >().get() == stream::s_stop) )
136  {
137  LWARN( plog, "Output stream(s) have stop condition" );
138  break;
139  }
140 
141  LTRACE( plog, "Waiting for packets" );
142 
143  // inner loop over packet-receive timeouts
144  while( t_size_received <= 0 && ! is_canceled() )
145  {
146  t_size_received = ::recv( f_socket, (void*)t_block->block(), f_max_packet_size, 0 );
147 
148  if( t_size_received > 0 )
149  {
150  LTRACE( plog, "Packet received (" << t_size_received << " bytes)" );
151  LTRACE( plog, "Packet written to stream index <" << out_stream< 0 >().get_current_index() << ">" );
152 
153  t_block->set_n_bytes_used( t_size_received );
154 
155  if( ! out_stream< 0 >().set( stream::s_run ) )
156  {
157  LERROR( plog, "Exiting due to stream error" );
158  break;
159  }
160  break;
161  }
162 
163  f_last_errno = errno;
164  if( f_last_errno == EWOULDBLOCK || f_last_errno == EAGAIN )
165  {
166  // recv timed out without anything being available to receive
167  // nothing seems to be wrong with the socket
168  break;
169  }
170  else if( t_size_received == 0 )
171  {
172  LWARN( plog, "No message present" );
173  }
174  else // t_size_received < 0 && f_last_errno != EWOULDBLOCK && f_last_errno != EAGAIN
175  {
176  LWARN( "Unable to receive; error message: " << strerror( f_last_errno ) );
177  }
178  }
179  }
180 
181  LINFO( plog, "Packet receiver is exiting" );
182 
183  // normal exit condition
184  LDEBUG( plog, "Stopping output streams" );
185  if( ! out_stream< 0 >().set( stream::s_stop ) ) return;
186 
187  LDEBUG( plog, "Exiting output streams" );
188  out_stream< 0 >().set( stream::s_exit );
189 
190  return;
191  }
192  catch(...)
193  {
194  a_midge->throw_ex( std::current_exception() );
195  }
196  }
197 
199  {
200  out_buffer< 0 >().finalize();
201 
202  cleanup_socket();
203 
204  return;
205  }
206 
208  {
209  //clean up udp_server address
210  if( f_address != nullptr )
211  {
212  delete f_address;
213  f_address = nullptr;
214  }
215 
216  //close udp_server socket
217  if( f_socket != 0 )
218  {
219  ::close( f_socket );
220  f_socket = 0;
221  }
222 
223  return;
224  }
225 
226 
229  {
230  }
231 
233  {
234  }
235 
236  void packet_receiver_socket_binding::do_apply_config( packet_receiver_socket* a_node, const scarab::param_node& a_config ) const
237  {
238  LDEBUG( plog, "Configuring packet_receiver_socket with:\n" << a_config );
239  a_node->set_length( a_config.get_value( "length", a_node->get_length() ) );
240  a_node->set_port( a_config.get_value( "port", a_node->get_port() ) );
241  a_node->ip() = a_config.get_value( "ip", a_node->ip() );
242  a_node->set_timeout_sec( a_config.get_value( "timeout-sec", a_node->get_timeout_sec() ) );
243  return;
244  }
245 
246  void packet_receiver_socket_binding::do_dump_config( const packet_receiver_socket* a_node, scarab::param_node& a_config ) const
247  {
248  LDEBUG( plog, "Dumping configuration for packet_receiver_socket" );
249  a_config.add( "length", scarab::param_value( a_node->get_length() ) );
250  a_config.add( "port", scarab::param_value( a_node->get_port() ) );
251  a_config.add( "ip", scarab::param_value( a_node->ip() ) );
252  a_config.add( "timeout-sec", scarab::param_value( a_node->get_timeout_sec() ) );
253  return;
254  }
255 
256 
257 } /* namespace psyllid */
static scarab::logger plog("batch_executor")
virtual void do_dump_config(const packet_receiver_socket *a_node, scarab::param_node &a_config) const
virtual void initialize()
Timeout in seconds for waiting on socket recv function.
virtual void do_apply_config(packet_receiver_socket *a_node, const scarab::param_node &a_config) const
A producer to receive UDP packets via the standard socket interface and write them as raw blocks of m...
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
virtual void execute(midge::diptera *a_midge=nullptr)
LOGGER(plog, "egg_writer")
void resize(size_t a_n_bytes)
Definition: memory_block.cc:27