15 #include <arpa/inet.h> 17 #include <linux/if_ether.h> 19 #include <linux/udp.h> 28 #include <sys/socket.h> 29 #include <sys/types.h> 33 #include <netinet/ether.h> 46 f_max_packet_size( 1048576 ),
48 f_interface(
"eth1" ),
51 f_block_size( 1 << 22 ),
52 f_frame_size( 1 << 14 ),
53 f_net_interface_index( 0 ),
73 throw error() <<
"[packet_receiver_fpa] Unable to find index for interface <" << f_interface <<
">";
77 LDEBUG(
plog,
"Preparing fast-packet-acquisition on interface " << f_interface <<
" at UDP port " << f_port );
81 f_socket = ::socket( AF_PACKET, SOCK_RAW, htons(ETH_P_IP) );
84 throw error() <<
"Could not create socket:\n\t" << strerror( errno );
87 int t_packet_ver = TPACKET_V3;
88 if( ::setsockopt(
f_socket, SOL_PACKET, PACKET_VERSION, &t_packet_ver,
sizeof(
int) ) < 0 )
90 throw error() <<
"Could not set packet version:\n\t" << strerror( errno );
97 LDEBUG(
plog,
"Ring buffer parameters:\n" <<
98 "block size: " << f_block_size <<
'\n' <<
99 "frame size: " << f_frame_size <<
'\n' <<
100 "number of blocks: " << f_n_blocks );
104 f_ring.
f_req.tp_frame_nr = (f_block_size * f_n_blocks) / f_frame_size;
106 f_ring.
f_req.tp_feature_req_word = TP_FT_REQ_FILL_RXHASH;
111 LTRACE(
plog,
"f_ring.f_rd == nullptr: " << test );
113 LTRACE(
plog,
"f_ring.f_map == nullptr: " << test );
115 LTRACE(
plog,
"f_ring.f_req.tp_block_size = " <<
f_ring.
f_req.tp_block_size );
117 LDEBUG(
plog,
"Opening packet_eater for network interface <" << f_interface <<
">" );
119 LTRACE(
plog,
"f_socket = " <<
f_socket <<
"; SOL_PACKET = " << SOL_PACKET <<
"; PACKET_RX_RING = " << PACKET_RX_RING <<
"; &f_ring.f_req = " << &
f_ring.
f_req <<
"; sizeof(f_ring.f_req) = " <<
sizeof(
f_ring.
f_req) );
122 throw error() <<
"Could not set receive ring:\n\t" << strerror( errno );
134 if( f_timeout_sec > 0 )
137 t_timeout.tv_sec = f_timeout_sec;
138 t_timeout.tv_usec = 0;
139 ::setsockopt(
f_socket, SOL_SOCKET, SO_RCVTIMEO, (
char *)&t_timeout,
sizeof(
struct timeval) );
144 PROT_READ | PROT_WRITE, MAP_SHARED | MAP_LOCKED,
f_socket, 0);
147 throw error() <<
"Unable to setup ring map";
153 throw error() <<
"Unable to allocate memory for the ring";
155 for(
unsigned i_block = 0; i_block <
f_ring.
f_req.tp_block_nr; ++i_block )
162 sockaddr_ll t_address;
163 ::memset( &t_address, 0,
sizeof(sockaddr_ll) );
164 t_address.sll_family = PF_PACKET;
165 t_address.sll_protocol = htons(ETH_P_IP);
167 t_address.sll_hatype = 0;
168 t_address.sll_pkttype = 0;
169 t_address.sll_halen = 0;
171 if( ::bind(
f_socket, (sockaddr*)&t_address,
sizeof(t_address) ) < 0 )
173 throw error() <<
"Could not bind socket:\n\t" << strerror( errno );
176 LINFO(
plog,
"Ready to consume packets on interface <" << f_interface <<
">" );
185 LDEBUG(
plog,
"Executing the packet_receiver_fpa" );
191 ::memset( &t_pollfd, 0,
sizeof(pollfd) );
193 t_pollfd.events = POLLIN | POLLERR;
194 t_pollfd.revents = 0;
196 unsigned t_block_num = 0;
199 unsigned t_timeout_msec = 1000 * f_timeout_sec;
203 std::unique_ptr< char[] > t_buffer_ptr(
new char[ f_max_packet_size ] );
205 if( ! out_stream< 0 >().
set( stream::s_start ) )
return;
207 LINFO(
plog,
"Starting main loop; waiting for packets" );
208 while( ! is_canceled() )
211 if( (out_stream< 0 >().
get() == stream::s_stop) )
213 LWARN(
plog,
"Output stream(s) have stop condition" );
221 if( ( t_block->
f_packet_hdr.block_status & TP_STATUS_USER ) == 0 )
225 poll( &t_pollfd, 1, t_timeout_msec );
231 unsigned long t_bytes = 0;
233 tpacket3_hdr* t_packet =
reinterpret_cast< tpacket3_hdr*
>( (uint8_t*)t_block + t_block->
f_packet_hdr.offset_to_first_pkt );
235 LTRACE(
plog,
"Walking a block with " << t_num_pkts <<
" packets" );
236 for(
unsigned i = 0; i < t_num_pkts; ++i )
238 t_bytes += t_packet->tp_snaplen;
240 LTRACE(
plog,
"Processing IP packet; current iterator index is <" << out_stream< 0 >().get_current_index() <<
">" );
243 LTRACE(
plog,
"UDP packet processed; outputing to stream index <" << out_stream< 0 >().get_current_index() <<
">" );
244 if( ! out_stream< 0 >().
set( stream::s_run ) )
246 LERROR(
plog,
"Exiting due to stream error" );
252 LTRACE(
plog,
"Packet was not processed properly; skipping" );
256 t_packet =
reinterpret_cast< tpacket3_hdr*
>( (uint8_t*)t_packet + t_packet->tp_next_offset );
258 LTRACE(
plog,
"Done walking block" );
265 t_block_num = ( t_block_num + 1 ) % f_n_blocks;
269 LINFO(
plog,
"Packet receiver is exiting" );
272 LDEBUG(
plog,
"Stopping output streams" );
273 if( ! out_stream< 0 >().
set( stream::s_stop ) )
return;
275 LDEBUG(
plog,
"Exiting output streams" );
276 out_stream< 0 >().
set( stream::s_exit );
282 if( a_midge ) a_midge->throw_ex( std::current_exception() );
296 ethhdr* t_eth_hdr =
reinterpret_cast< ethhdr*
>( (uint8_t*)a_packet + a_packet->tp_mac );
299 char t_macstr_dest[3*ETH_ALEN], t_macstr_source[3*ETH_ALEN];
300 ether_ntoa_r((
struct ether_addr*)&(t_eth_hdr->h_dest), t_macstr_dest);
301 ether_ntoa_r((
struct ether_addr*)&(t_eth_hdr->h_source), t_macstr_source);
302 LTRACE(
plog,
"ethhdr: h_dest: " << t_macstr_dest <<
"; h_source: " << t_macstr_source <<
"; h_proto: " << ntohs(t_eth_hdr->h_proto) );
311 LTRACE(
plog,
"Ethernet sizes (total, header, data): ???, " << ETH_HLEN <<
", ???" );
312 LTRACE(
plog,
"Ethernet mem addresses (packet/header, data): " << t_eth_hdr <<
", " << (
void*)( (
char*)t_eth_hdr + ETH_HLEN ) );
315 static const unsigned short t_eth_p_ip = htons(ETH_P_IP);
316 if( t_eth_hdr->h_proto != t_eth_p_ip )
318 LDEBUG(
plog,
"Non-IP packet skipped" );
329 iphdr* t_ip_hdr =
reinterpret_cast< iphdr*
>( (
char*)t_eth_hdr + ETH_HLEN );
332 char t_source_ip[16], t_dest_ip[16];
333 inet_ntop(AF_INET, &t_ip_hdr->saddr, t_source_ip, 16);
334 inet_ntop(AF_INET, &t_ip_hdr->daddr, t_dest_ip, 16);
335 LTRACE(
plog,
"IP header: version: " <<
unsigned(t_ip_hdr->version) <<
"; ihl: " <<
unsigned(t_ip_hdr->ihl) <<
"; tos: " << ntohs(t_ip_hdr->tos) <<
"; tot_len: " << ntohs(t_ip_hdr->tot_len) <<
"; protocol: " <<
unsigned(t_ip_hdr->protocol) <<
"; saddr: " << t_source_ip <<
"; daddr: " << t_dest_ip );
345 LTRACE(
plog,
"IP sizes (total, header, data): " << ntohs(t_ip_hdr->tot_len) <<
", " <<
unsigned(t_ip_hdr->ihl * 4) <<
", " << ntohs(t_ip_hdr->tot_len) - t_ip_hdr->ihl * 4 );
346 LTRACE(
plog,
"IP mem addresses(packet/header, data): " << t_ip_hdr <<
", " << (
void*)( (
char*)t_ip_hdr + t_ip_hdr->ihl * 4 ) );
348 if( t_ip_hdr->protocol != 17 )
350 LDEBUG(
plog,
"Non-UDP packet skipped" );
359 static const unsigned t_udp_hdr_len =
sizeof( udphdr );
361 udphdr* t_udp_hdr =
reinterpret_cast< udphdr*
>( (
char*)t_ip_hdr + t_ip_hdr->ihl * 4 );
363 LTRACE(
plog,
"UDP header: source port: " << ntohs(t_udp_hdr->source) <<
"; dest port: " << ntohs(t_udp_hdr->dest) <<
"; len: " << ntohs(t_udp_hdr->len) <<
"; check: " << ntohs(t_udp_hdr->check) );
369 if( ntohs(t_udp_hdr->dest) != f_port )
371 LDEBUG(
plog,
"Destination port is incorrect: expected " << f_port <<
" but got " << ntohs(t_udp_hdr->dest) );
375 size_t t_udp_data_len = ntohs(t_udp_hdr->len) - t_udp_hdr_len;
377 LTRACE(
plog,
"UDP sizes (total, header, data): " << ntohs(t_udp_hdr->len) <<
", " << t_udp_hdr_len <<
", " << t_udp_data_len );
378 LTRACE(
plog,
"UDP mem addresses (packet/header, data): " << t_udp_hdr <<
", " << (
void*)((
char*)t_udp_hdr + t_udp_hdr_len) );
381 t_mem_block->
resize( f_max_packet_size );
383 LTRACE(
plog,
"Packet received (" << t_udp_data_len <<
" bytes); block address is " << (
void*)t_mem_block->
block() );
385 LTRACE(
plog,
"Packet words: " << std::hex << strtoull((
char*)t_mem_block->
block(), NULL, 0) );
386 LTRACE(
plog,
"Packet bytes: " <<
unsigned(((
char*)t_mem_block->
block())[0]) <<
" " << unsigned(((
char*)t_mem_block->
block())[1]) <<
" " << unsigned(((
char*)t_mem_block->
block())[2]) );
390 ::memcpy( reinterpret_cast< void* >( t_mem_block->
block() ),
391 reinterpret_cast< void* >( (
char*)t_udp_hdr + t_udp_hdr_len ),
393 t_mem_block->set_n_bytes_used( t_udp_data_len );
402 LINFO(
plog,
"Turning off the FPA" );
412 LDEBUG(
plog,
"Unmapping mmap ring" );
418 LDEBUG(
plog,
"freeing f_rd" );
446 LDEBUG(
plog,
"Configuring packet_receiver_fpa with:\n" << a_config );
447 a_node->set_length( a_config.get_value(
"length", a_node->get_length() ) );
448 a_node->set_port( a_config.get_value(
"port", a_node->get_port() ) );
449 a_node->interface() = a_config.get_value(
"interface", a_node->interface() );
450 a_node->set_timeout_sec( a_config.get_value(
"timeout-sec", a_node->get_timeout_sec() ) );
451 a_node->set_n_blocks( a_config.get_value(
"n-blocks", a_node->get_n_blocks() ) );
452 a_node->set_block_size( a_config.get_value(
"block-size", a_node->get_block_size() ) );
453 a_node->set_frame_size( a_config.get_value(
"frame-size", a_node->get_frame_size() ) );
459 LDEBUG(
plog,
"Dumping configuration for packet_receiver_fpa" );
460 a_config.add(
"length", scarab::param_value( a_node->get_length() ) );
461 a_config.add(
"port", scarab::param_value( a_node->get_port() ) );
462 a_config.add(
"interface", scarab::param_value( a_node->interface() ) );
463 a_config.add(
"timeout-sec", scarab::param_value( a_node->get_timeout_sec() ) );
464 a_config.add(
"n-blocks", scarab::param_value( a_node->get_n_blocks() ) );
465 a_config.add(
"block-size", scarab::param_value( a_node->get_block_size() ) );
466 a_config.add(
"frame-size", scarab::param_value( a_node->get_frame_size() ) );
virtual void do_apply_config(packet_receiver_fpa *a_node, const scarab::param_node &a_config) const
packet_receiver_fpa_binding()
virtual void initialize()
Number of blocks per frame in the mmap ring buffer.
bool process_packet(tpacket3_hdr *a_packet)
virtual void do_dump_config(const packet_receiver_fpa *a_node, scarab::param_node &a_config) const
static scarab::logger plog("batch_executor")
virtual ~packet_receiver_fpa()
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
A producer to receive UDP packets via the fast-packet-acquisition interface and write them as raw blo...
LOGGER(plog, "egg_writer")
int f_net_interface_index
virtual ~packet_receiver_fpa_binding()
virtual void execute(midge::diptera *a_midge=nullptr)
void resize(size_t a_n_bytes)
tpacket_hdr_v1 f_packet_hdr