Psyllid  v1.12.4
Project 8 Data Acquisisition Software
packet_receiver_fpa.cc
Go to the documentation of this file.
1 /*
2  * packet_receiver_fpa.cc
3  *
4  * Created on: Nov 2, 2016
5  * Author: nsoblath
6  */
7 
8 #include "packet_receiver_fpa.hh"
9 
10 #include "psyllid_error.hh"
11 
12 #include "logger.hh"
13 #include "param.hh"
14 
15 #include <arpa/inet.h>
16 #include <errno.h>
17 #include <linux/if_ether.h>
18 #include <linux/ip.h>
19 #include <linux/udp.h>
20 #include <net/if.h>
21 #include <netdb.h>
22 #include <poll.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/mman.h>
27 #include <unistd.h>
28 #include <sys/socket.h>
29 #include <sys/types.h>
30 
31 // debug
32 #ifndef NDEBUG
33 #include <netinet/ether.h>
34 #endif
35 
36 using midge::stream;
37 
38 namespace psyllid
39 {
40  REGISTER_NODE_AND_BUILDER( packet_receiver_fpa, "packet-receiver-fpa", packet_receiver_fpa_binding );
41 
42  LOGGER( plog, "packet_receiver_fpa" );
43 
45  f_length( 10 ),
46  f_max_packet_size( 1048576 ),
47  f_port( 23530 ),
48  f_interface( "eth1" ),
49  f_timeout_sec( 1 ),
50  f_n_blocks( 64 ),
51  f_block_size( 1 << 22 ),
52  f_frame_size( 1 << 14 ),
53  f_net_interface_index( 0 ),
54  f_socket( 0 ),
55  f_ring(),
56  f_packets_total( 0 ),
57  f_bytes_total( 0 )
58  {
59  }
60 
62  {
63  cleanup_fpa();
64  }
65 
67  {
68  out_buffer< 0 >().initialize( f_length );
69 
70  f_net_interface_index = if_nametoindex( f_interface.c_str() );
71  if( f_net_interface_index == 0 )
72  {
73  throw error() << "[packet_receiver_fpa] Unable to find index for interface <" << f_interface << ">";
74  }
75  LDEBUG( plog, "Identified net interface <" << f_interface << "> with index <" << f_net_interface_index << ">" );
76 
77  LDEBUG( plog, "Preparing fast-packet-acquisition on interface " << f_interface << " at UDP port " << f_port );
78 
79  // open socket
80  //int fd;
81  f_socket = ::socket( AF_PACKET, SOCK_RAW, htons(ETH_P_IP) );
82  if( f_socket < 0 )
83  {
84  throw error() << "Could not create socket:\n\t" << strerror( errno );
85  }
86 
87  int t_packet_ver = TPACKET_V3;
88  if( ::setsockopt( f_socket, SOL_PACKET, PACKET_VERSION, &t_packet_ver, sizeof(int) ) < 0 )
89  {
90  throw error() << "Could not set packet version:\n\t" << strerror( errno );
91  }
92 
93  // create the ring buffer
94  //f_ring = new receive_ring();
95  //::memset( f_ring, 0, sizeof(receive_ring) );
96  //::memset( &f_ring->f_req, 0, sizeof(tpacket_req3) );
97  LDEBUG( plog, "Ring buffer parameters:\n" <<
98  "block size: " << f_block_size << '\n' <<
99  "frame size: " << f_frame_size << '\n' <<
100  "number of blocks: " << f_n_blocks );
101  f_ring.f_req.tp_block_size = f_block_size;
102  f_ring.f_req.tp_frame_size = f_frame_size;
103  f_ring.f_req.tp_block_nr = f_n_blocks;
104  f_ring.f_req.tp_frame_nr = (f_block_size * f_n_blocks) / f_frame_size;
105  f_ring.f_req.tp_retire_blk_tov = 60;
106  f_ring.f_req.tp_feature_req_word = TP_FT_REQ_FILL_RXHASH;
107 
108  //LWARN( plog, "f_ring = " << f_ring );
109 #ifndef NDEBUG
110  bool test = f_ring.f_rd == nullptr;
111  LTRACE( plog, "f_ring.f_rd == nullptr: " << test );
112  test = f_ring.f_map == nullptr;
113  LTRACE( plog, "f_ring.f_map == nullptr: " << test );
114 #endif
115  LTRACE( plog, "f_ring.f_req.tp_block_size = " << f_ring.f_req.tp_block_size );
116 
117  LDEBUG( plog, "Opening packet_eater for network interface <" << f_interface << ">" );
118 
119  LTRACE( plog, "f_socket = " << f_socket << "; SOL_PACKET = " << SOL_PACKET << "; PACKET_RX_RING = " << PACKET_RX_RING << "; &f_ring.f_req = " << &f_ring.f_req << "; sizeof(f_ring.f_req) = " << sizeof(f_ring.f_req) );
120  if( ::setsockopt( f_socket, SOL_PACKET, PACKET_RX_RING, &f_ring.f_req, sizeof(f_ring.f_req) ) < 0 )
121  {
122  throw error() << "Could not set receive ring:\n\t" << strerror( errno );
123  }
124 
125  /* setsockopt: Handy debugging trick that lets
126  * us rerun the udp_server immediately after we kill it;
127  * otherwise we have to wait about 20 secs.
128  * Eliminates "ERROR on binding: Address already in use" error.
129  */
130  //int optval = 1;
131  //::setsockopt( f_socket, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(int));
132 
133  // Receive timeout
134  if( f_timeout_sec > 0 )
135  {
136  timeval t_timeout;
137  t_timeout.tv_sec = f_timeout_sec;
138  t_timeout.tv_usec = 0; // Not init'ing this can cause strange errors
139  ::setsockopt( f_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&t_timeout, sizeof(struct timeval) );
140  }
141 
142  // finish preparing the ring
143  f_ring.f_map = (uint8_t*)::mmap( nullptr, f_ring.f_req.tp_block_size * f_ring.f_req.tp_block_nr,
144  PROT_READ | PROT_WRITE, MAP_SHARED | MAP_LOCKED, f_socket, 0);
145  if( f_ring.f_map == MAP_FAILED )
146  {
147  throw error() << "Unable to setup ring map";
148  }
149 
150  f_ring.f_rd = (iovec*)::malloc(f_ring.f_req.tp_block_nr * sizeof(*f_ring.f_rd) );
151  if( f_ring.f_rd == nullptr )
152  {
153  throw error() << "Unable to allocate memory for the ring";
154  }
155  for( unsigned i_block = 0; i_block < f_ring.f_req.tp_block_nr; ++i_block )
156  {
157  f_ring.f_rd[ i_block ].iov_base = f_ring.f_map + ( i_block * f_ring.f_req.tp_block_size );
158  f_ring.f_rd[ i_block ].iov_len = f_ring.f_req.tp_block_size;
159  }
160 
161  // initialize the address
162  sockaddr_ll t_address;// = new sockaddr_ll();
163  ::memset( &t_address, 0, sizeof(sockaddr_ll) );
164  t_address.sll_family = PF_PACKET;
165  t_address.sll_protocol = htons(ETH_P_IP);
166  t_address.sll_ifindex = f_net_interface_index;
167  t_address.sll_hatype = 0;
168  t_address.sll_pkttype = 0;
169  t_address.sll_halen = 0;
170 
171  if( ::bind( f_socket, (sockaddr*)&t_address, sizeof(t_address) ) < 0 )
172  {
173  throw error() << "Could not bind socket:\n\t" << strerror( errno );
174  }
175 
176  LINFO( plog, "Ready to consume packets on interface <" << f_interface << ">" );
177 
178  return;
179  }
180 
181  void packet_receiver_fpa::execute( midge::diptera* a_midge )
182  {
183  try
184  {
185  LDEBUG( plog, "Executing the packet_receiver_fpa" );
186 
187  //memory_block* t_mem_block = nullptr;
188 
189  // Setup the polling file descriptor struct
190  pollfd t_pollfd;
191  ::memset( &t_pollfd, 0, sizeof(pollfd) );
192  t_pollfd.fd = f_socket;
193  t_pollfd.events = POLLIN | POLLERR;
194  t_pollfd.revents = 0;
195 
196  unsigned t_block_num = 0;
197  block_desc* t_block = nullptr;
198 
199  unsigned t_timeout_msec = 1000 * f_timeout_sec;
200 
201  //LDEBUG( plog, "Server is listening" );
202 
203  std::unique_ptr< char[] > t_buffer_ptr( new char[ f_max_packet_size ] );
204 
205  if( ! out_stream< 0 >().set( stream::s_start ) ) return;
206 
207  LINFO( plog, "Starting main loop; waiting for packets" );
208  while( ! is_canceled() )
209  {
210 
211  if( (out_stream< 0 >().get() == stream::s_stop) )
212  {
213  LWARN( plog, "Output stream(s) have stop condition" );
214  break;
215  }
216 
217  // get the next block
218  t_block = (block_desc *) f_ring.f_rd[ t_block_num ].iov_base;
219 
220  // make sure the next block has been made available to the user
221  if( ( t_block->f_packet_hdr.block_status & TP_STATUS_USER ) == 0 )
222  {
223  // next block isn't available yet, so poll until it is, with the specified timeout
224  // timeout or successful poll will go back to the top of the loop
225  poll( &t_pollfd, 1, t_timeout_msec );
226  continue;
227  }
228 
229  // we have a block available, so process it
230  unsigned t_num_pkts = t_block->f_packet_hdr.num_pkts;
231  unsigned long t_bytes = 0;
232 
233  tpacket3_hdr* t_packet = reinterpret_cast< tpacket3_hdr* >( (uint8_t*)t_block + t_block->f_packet_hdr.offset_to_first_pkt );
234 
235  LTRACE( plog, "Walking a block with " << t_num_pkts << " packets" );
236  for( unsigned i = 0; i < t_num_pkts; ++i )
237  {
238  t_bytes += t_packet->tp_snaplen;
239 
240  LTRACE( plog, "Processing IP packet; current iterator index is <" << out_stream< 0 >().get_current_index() << ">" );
241  if( process_packet( t_packet ) )
242  {
243  LTRACE( plog, "UDP packet processed; outputing to stream index <" << out_stream< 0 >().get_current_index() << ">" );
244  if( ! out_stream< 0 >().set( stream::s_run ) )
245  {
246  LERROR( plog, "Exiting due to stream error" );
247  break;
248  }
249  }
250  else
251  {
252  LTRACE( plog, "Packet was not processed properly; skipping" );
253  }
254 
255  // update the address of the packet to the next packet in the block
256  t_packet = reinterpret_cast< tpacket3_hdr* >( (uint8_t*)t_packet + t_packet->tp_next_offset );
257  }
258  LTRACE( plog, "Done walking block" );
259 
260  f_packets_total += t_num_pkts;
261  f_bytes_total += t_bytes;
262 
263  // return the block to the kernel; we're done with it
264  t_block->f_packet_hdr.block_status = TP_STATUS_KERNEL;
265  t_block_num = ( t_block_num + 1 ) % f_n_blocks;
266 
267  }
268 
269  LINFO( plog, "Packet receiver is exiting" );
270 
271  // normal exit condition
272  LDEBUG( plog, "Stopping output streams" );
273  if( ! out_stream< 0 >().set( stream::s_stop ) ) return;
274 
275  LDEBUG( plog, "Exiting output streams" );
276  out_stream< 0 >().set( stream::s_exit );
277 
278  return;
279  }
280  catch(...)
281  {
282  if( a_midge ) a_midge->throw_ex( std::current_exception() );
283  else throw;
284  }
285  }
286 
287  bool packet_receiver_fpa::process_packet( tpacket3_hdr* a_packet )
288  {
289  //printf("rxhash: 0x%x\n", a_packet->hv1.tp_rxhash);
290 
291  //****************
292  // Ethernet Packet
293  //****************
294 
295  // grab the ethernet interface header (defined in if_ether.h)
296  ethhdr* t_eth_hdr = reinterpret_cast< ethhdr* >( (uint8_t*)a_packet + a_packet->tp_mac );
297 
298 #ifndef NDEBUG
299  char t_macstr_dest[3*ETH_ALEN], t_macstr_source[3*ETH_ALEN];
300  ether_ntoa_r((struct ether_addr*)&(t_eth_hdr->h_dest), t_macstr_dest);
301  ether_ntoa_r((struct ether_addr*)&(t_eth_hdr->h_source), t_macstr_source);
302  LTRACE( plog, "ethhdr: h_dest: " << t_macstr_dest << "; h_source: " << t_macstr_source << "; h_proto: " << ntohs(t_eth_hdr->h_proto) );
303 #endif
304  //LWARN( plog, "Ethernet header: " << t_eth_p_ip << "; htons(ETH_P_IP): " << htons(ETH_P_IP) << "; t_eth_hdr->h_proto: " << t_eth_hdr->h_proto );
305  //for (int i=0; i<50; ++i)
306  //{
307  // printf("%02X", ((char*)a_packet)[i]);
308  //}
309  //printf("\n");
310 
311  LTRACE( plog, "Ethernet sizes (total, header, data): ???, " << ETH_HLEN << ", ???" );
312  LTRACE( plog, "Ethernet mem addresses (packet/header, data): " << t_eth_hdr << ", " << (void*)( (char*)t_eth_hdr + ETH_HLEN ) );
313 
314  // filter only IP packets
315  static const unsigned short t_eth_p_ip = htons(ETH_P_IP);
316  if( t_eth_hdr->h_proto != t_eth_p_ip )
317  {
318  LDEBUG( plog, "Non-IP packet skipped" );
319  return false;
320  }
321 
322 
323  //**********
324  // IP Packet
325  //**********
326 
327  //LWARN( plog, "Handling IP packet" );
328  // grab the ip interface header (defined in ip.h)
329  iphdr* t_ip_hdr = reinterpret_cast< iphdr* >( (char*)t_eth_hdr + ETH_HLEN );
330 
331 #ifndef NDEBUG
332  char t_source_ip[16], t_dest_ip[16];
333  inet_ntop(AF_INET, &t_ip_hdr->saddr, t_source_ip, 16);
334  inet_ntop(AF_INET, &t_ip_hdr->daddr, t_dest_ip, 16);
335  LTRACE( plog, "IP header: version: " << unsigned(t_ip_hdr->version) << "; ihl: " << unsigned(t_ip_hdr->ihl) << "; tos: " << ntohs(t_ip_hdr->tos) << "; tot_len: " << ntohs(t_ip_hdr->tot_len) << "; protocol: " << unsigned(t_ip_hdr->protocol) << "; saddr: " << t_source_ip << "; daddr: " << t_dest_ip );
336 #endif
337 
338  //TODO: filter on source address?
339  //uint32_t t_source_address = t_ip_hdr->saddr;
340 
341  // get ip packet data
342  //char* t_ip_data = (char*)t_ip_hdr + t_ip_hdr->ihl * 4;
343  //size_t t_ip_data_len = (uint8_t)htons(t_ip_hdr->tot_len) - t_ip_header_bytes;
344 
345  LTRACE( plog, "IP sizes (total, header, data): " << ntohs(t_ip_hdr->tot_len) << ", " << unsigned(t_ip_hdr->ihl * 4) << ", " << ntohs(t_ip_hdr->tot_len) - t_ip_hdr->ihl * 4 );
346  LTRACE( plog, "IP mem addresses(packet/header, data): " << t_ip_hdr << ", " << (void*)( (char*)t_ip_hdr + t_ip_hdr->ihl * 4 ) );
347 
348  if( t_ip_hdr->protocol != 17 )
349  {
350  LDEBUG( plog, "Non-UDP packet skipped" );
351  return false;
352  }
353 
354  //***********
355  // UDP Packet
356  //***********
357 
358  // this doesn't appear to be defined anywhere in standard linux headers, at least as far as I could find
359  static const unsigned t_udp_hdr_len = sizeof( udphdr );
360 
361  udphdr* t_udp_hdr = reinterpret_cast< udphdr* >( (char*)t_ip_hdr + t_ip_hdr->ihl * 4 );
362 
363  LTRACE( plog, "UDP header: source port: " << ntohs(t_udp_hdr->source) << "; dest port: " << ntohs(t_udp_hdr->dest) << "; len: " << ntohs(t_udp_hdr->len) << "; check: " << ntohs(t_udp_hdr->check) );
364 
365  // get port number
366  //unsigned t_port = ntohs(t_udp_hdr->dest);
367 
368  // check port number against configured port
369  if( ntohs(t_udp_hdr->dest) != f_port )
370  {
371  LDEBUG( plog, "Destination port is incorrect: expected " << f_port << " but got " << ntohs(t_udp_hdr->dest) );
372  return false;
373  }
374 
375  size_t t_udp_data_len = ntohs(t_udp_hdr->len) - t_udp_hdr_len;
376 
377  LTRACE( plog, "UDP sizes (total, header, data): " << ntohs(t_udp_hdr->len) << ", " << t_udp_hdr_len << ", " << t_udp_data_len );
378  LTRACE( plog, "UDP mem addresses (packet/header, data): " << t_udp_hdr << ", " << (void*)((char*)t_udp_hdr + t_udp_hdr_len) );
379 
380  memory_block* t_mem_block = out_stream< 0 >().data();
381  t_mem_block->resize( f_max_packet_size );
382 
383  LTRACE( plog, "Packet received (" << t_udp_data_len << " bytes); block address is " << (void*)t_mem_block->block() );
384 
385  LTRACE( plog, "Packet words: " << std::hex << strtoull((char*)t_mem_block->block(), NULL, 0) );
386  LTRACE( plog, "Packet bytes: " << unsigned(((char*)t_mem_block->block())[0]) << " " << unsigned(((char*)t_mem_block->block())[1]) << " " << unsigned(((char*)t_mem_block->block())[2]) );
387 
388  // copy the UPD packet from the IP packet into the appropriate buffer
389  //uint8_t* t_udp_data = (uint8_t*)t_udp_hdr + t_udp_hdr_len;
390  ::memcpy( reinterpret_cast< void* >( t_mem_block->block() ),
391  reinterpret_cast< void* >( (char*)t_udp_hdr + t_udp_hdr_len ),
392  t_udp_data_len );
393  t_mem_block->set_n_bytes_used( t_udp_data_len );
394 
395  return true;
396  }
397 
399  {
400  out_buffer< 0 >().finalize();
401 
402  LINFO( plog, "Turning off the FPA" );
403  cleanup_fpa();
404 
405  return;
406  }
407 
409  {
410  if( f_ring.f_map != nullptr )
411  {
412  LDEBUG( plog, "Unmapping mmap ring" );
413  ::munmap(f_ring.f_map, f_ring.f_req.tp_block_size * f_ring.f_req.tp_block_nr);
414  f_ring.f_map = nullptr;
415  }
416  if( f_ring.f_rd != nullptr )
417  {
418  LDEBUG( plog, "freeing f_rd" );
419  ::free( f_ring.f_rd );
420  f_ring.f_rd = nullptr;
421  }
422 
423  // close socket
424  if( f_socket != 0 )
425  {
426  ::close( f_socket );
427  f_socket = 0;
428  }
429 
430  return;
431  }
432 
433 
434 
437  {
438  }
439 
441  {
442  }
443 
444  void packet_receiver_fpa_binding::do_apply_config( packet_receiver_fpa* a_node, const scarab::param_node& a_config ) const
445  {
446  LDEBUG( plog, "Configuring packet_receiver_fpa with:\n" << a_config );
447  a_node->set_length( a_config.get_value( "length", a_node->get_length() ) );
448  a_node->set_port( a_config.get_value( "port", a_node->get_port() ) );
449  a_node->interface() = a_config.get_value( "interface", a_node->interface() );
450  a_node->set_timeout_sec( a_config.get_value( "timeout-sec", a_node->get_timeout_sec() ) );
451  a_node->set_n_blocks( a_config.get_value( "n-blocks", a_node->get_n_blocks() ) );
452  a_node->set_block_size( a_config.get_value( "block-size", a_node->get_block_size() ) );
453  a_node->set_frame_size( a_config.get_value( "frame-size", a_node->get_frame_size() ) );
454  return;
455  }
456 
457  void packet_receiver_fpa_binding::do_dump_config( const packet_receiver_fpa* a_node, scarab::param_node& a_config ) const
458  {
459  LDEBUG( plog, "Dumping configuration for packet_receiver_fpa" );
460  a_config.add( "length", scarab::param_value( a_node->get_length() ) );
461  a_config.add( "port", scarab::param_value( a_node->get_port() ) );
462  a_config.add( "interface", scarab::param_value( a_node->interface() ) );
463  a_config.add( "timeout-sec", scarab::param_value( a_node->get_timeout_sec() ) );
464  a_config.add( "n-blocks", scarab::param_value( a_node->get_n_blocks() ) );
465  a_config.add( "block-size", scarab::param_value( a_node->get_block_size() ) );
466  a_config.add( "frame-size", scarab::param_value( a_node->get_frame_size() ) );
467  return;
468  }
469 
470 } /* namespace psyllid */
virtual void do_apply_config(packet_receiver_fpa *a_node, const scarab::param_node &a_config) const
virtual void initialize()
Number of blocks per frame in the mmap ring buffer.
bool process_packet(tpacket3_hdr *a_packet)
virtual void do_dump_config(const packet_receiver_fpa *a_node, scarab::param_node &a_config) const
static scarab::logger plog("batch_executor")
REGISTER_NODE_AND_BUILDER(data_producer, "data-producer", data_producer_binding)
A producer to receive UDP packets via the fast-packet-acquisition interface and write them as raw blo...
LOGGER(plog, "egg_writer")
virtual void execute(midge::diptera *a_midge=nullptr)
void resize(size_t a_n_bytes)
Definition: memory_block.cc:27
tpacket_hdr_v1 f_packet_hdr