15 #include <arpa/inet.h> 23 #include <sys/socket.h> 24 #include <sys/types.h> 36 f_max_packet_size( 1048576 ),
55 LDEBUG(
plog,
"Opening UDP socket receiving at " << f_ip <<
":" << f_port );
58 socklen_t t_socket_length =
sizeof(sockaddr_in);
60 ::memset(
f_address, 0, t_socket_length );
64 f_address->sin_addr.s_addr = inet_addr( f_ip.c_str() );
65 if(
f_address->sin_addr.s_addr == INADDR_NONE )
67 throw error() <<
"[packet_receiver_socket] invalid IP address\n";
75 f_socket = ::socket( AF_INET, SOCK_DGRAM, 0 );
78 throw error() <<
"[packet_receiver_socket] could not create socket:\n\t" << strerror( errno );
88 ::setsockopt(
f_socket, SOL_SOCKET, SO_REUSEADDR, (
const void *)&optval ,
sizeof(
int));
91 if( f_timeout_sec > 0 )
93 struct timeval t_timeout;
94 t_timeout.tv_sec = f_timeout_sec;
95 t_timeout.tv_usec = 0;
96 ::setsockopt(
f_socket, SOL_SOCKET, SO_RCVTIMEO, (
char *)&t_timeout,
sizeof(
struct timeval) );
104 throw error() <<
"[packet_receiver_socket] could not bind socket:\n\t" << strerror( errno );
108 LINFO(
plog,
"Ready to receive messages at port " << f_ip <<
":" << f_port );
117 LDEBUG(
plog,
"Executing the packet_receiver_socket" );
123 if( ! out_stream< 0 >().
set( stream::s_start ) )
return;
125 ssize_t t_size_received = 0;
127 LINFO(
plog,
"Starting main loop; waiting for packets" );
128 while( ! is_canceled() )
130 t_block = out_stream< 0 >().data();
131 t_block->
resize( f_max_packet_size );
135 if( (out_stream< 0 >().
get() == stream::s_stop) )
137 LWARN(
plog,
"Output stream(s) have stop condition" );
141 LTRACE(
plog,
"Waiting for packets" );
144 while( t_size_received <= 0 && ! is_canceled() )
146 t_size_received = ::recv(
f_socket, (
void*)t_block->
block(), f_max_packet_size, 0 );
148 if( t_size_received > 0 )
150 LTRACE(
plog,
"Packet received (" << t_size_received <<
" bytes)" );
151 LTRACE(
plog,
"Packet written to stream index <" << out_stream< 0 >().get_current_index() <<
">" );
153 t_block->set_n_bytes_used( t_size_received );
155 if( ! out_stream< 0 >().
set( stream::s_run ) )
157 LERROR(
plog,
"Exiting due to stream error" );
170 else if( t_size_received == 0 )
172 LWARN(
plog,
"No message present" );
176 LWARN(
"Unable to receive; error message: " << strerror(
f_last_errno ) );
181 LINFO(
plog,
"Packet receiver is exiting" );
184 LDEBUG(
plog,
"Stopping output streams" );
185 if( ! out_stream< 0 >().
set( stream::s_stop ) )
return;
187 LDEBUG(
plog,
"Exiting output streams" );
188 out_stream< 0 >().
set( stream::s_exit );
194 a_midge->throw_ex( std::current_exception() );
238 LDEBUG(
plog,
"Configuring packet_receiver_socket with:\n" << a_config );
239 a_node->set_length( a_config.get_value(
"length", a_node->get_length() ) );
240 a_node->set_port( a_config.get_value(
"port", a_node->get_port() ) );
241 a_node->ip() = a_config.get_value(
"ip", a_node->ip() );
242 a_node->set_timeout_sec( a_config.get_value(
"timeout-sec", a_node->get_timeout_sec() ) );
248 LDEBUG(
plog,
"Dumping configuration for packet_receiver_socket" );
249 a_config.add(
"length", scarab::param_value( a_node->get_length() ) );
250 a_config.add(
"port", scarab::param_value( a_node->get_port() ) );
251 a_config.add(
"ip", scarab::param_value( a_node->ip() ) );
252 a_config.add(
"timeout-sec", scarab::param_value( a_node->get_timeout_sec() ) );
static scarab::logger plog("batch_executor")
virtual void do_dump_config(const packet_receiver_socket *a_node, scarab::param_node &a_config) const
virtual void initialize()
Timeout in seconds for waiting on socket recv function.
virtual ~packet_receiver_socket_binding()
packet_receiver_socket_binding()
virtual void do_apply_config(packet_receiver_socket *a_node, const scarab::param_node &a_config) const
A producer to receive UDP packets via the standard socket interface and write them as raw blocks of m...
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
virtual void execute(midge::diptera *a_midge=nullptr)
LOGGER(plog, "egg_writer")
virtual ~packet_receiver_socket()
void resize(size_t a_n_bytes)