Psyllid  v1.12.4
Project 8 Data Acquisition Software
grab_packet.cc
Go to the documentation of this file.
1 /*
2  * grab_packet.cc
3  *
4  * Created on: Dec 22, 2016
5  * Author: N. Oblath
6  *
7  * Grabs and parses a ROACH packet
8  *
9  * Usage: > grab_packet [options]
10  *
11  * Parameters:
12  * - port: (uint) port number to listen on for packets
13  * - interface: (string) network interface name to listen on for packets; this is only needed if using the FPA receiver; default is "eth1"
14  * - ip: (string) IP address to listen on for packets; this is only needed if using the socket receiver; default is "127.0.0.1"
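15  * - fpa: (null) Flag to request use of the FPA receiver; only valid on linux machines (NOTE: this option is not registered on the command line and its handling is commented out in main(), so it currently has no effect)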
16  */
17 
18 
19 #include "psyllid_error.hh"
20 
21 #include "freq_data.hh"
22 #include "time_data.hh"
23 #include "psyllid_version.hh"
24 
25 #include "application.hh"
26 #include "logger.hh"
27 #include "param.hh"
28 
29 #include "dripline_constants.hh" // for RETURN constants
30 
31 #include <arpa/inet.h>
32 #include <errno.h>
33 #include <memory>
34 #include <netinet/in.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <unistd.h>
39 #include <netdb.h>
40 #include <sys/socket.h>
41 #include <sys/types.h>
42 
43 
44 using namespace psyllid;
45 
46 LOGGER( plog, "grab_packet" );
47 
48 
49 bool ProcessUnknownPacket( uint8_t* a_buffer );
50 bool ProcessROACHPacket( uint8_t* a_buffer );
51 
52 using namespace psyllid;
53 
54 int main( int argc, char** argv )
55 {
56  try
57  {
58  // The application
59  scarab::main_app the_main;
60 
61  // Default configuration
62  scarab::param_node t_default_config;
63  t_default_config.add( "n", scarab::param_value( 1U ) );
64  t_default_config.add( "ip", scarab::param_value( "127.0.0.1" ) );
65  t_default_config.add( "port", scarab::param_value( 23530U ) );
66  t_default_config.add( "interface", scarab::param_value( "eth1" ) );
67  t_default_config.add( "timeout", scarab::param_value( 10U ) );
68  t_default_config.add( "max-packet-size", scarab::param_value( 16384U ) );
69  t_default_config.add( "packet-type", scarab::param_value( "roach" ) );
70  the_main.default_config() = t_default_config;
71 
72  // Command line options
73  the_main.add_config_option< unsigned >( "-n,--n-packets", "n", "Number of packets to grab" );
74  the_main.add_config_option< std::string >( "--ip", "ip", "IP address from which to receive packets" );
75  the_main.add_config_option< unsigned >( "-p,--port", "port", "Port on which to receive packets" );
76  the_main.add_config_option< std::string >( "-i,--interface", "interface", "Ethernet interface to grab packets off of" );
77  the_main.add_config_option< unsigned >( "-t,--timeout", "timeout", "Timeout" );
78  the_main.add_config_option< unsigned >( "-m,--max-packet-size", "max-packet-size", "Maximum packet size" );
79  the_main.add_config_option< std::string >( "---packet-type", "packet-type", "Packet type" );
80 
81  // Package version
82  the_main.set_version( new psyllid::version() );
83 
84  // The main execution callback
85  the_main.callback( [&]() {
86  unsigned t_n_packets( the_main.master_config()["n"]().as_uint() );
87  std::string t_ip( the_main.master_config()["ip"]().as_string() );
88  unsigned t_port = the_main.master_config()["port"]().as_uint();
89  std::string t_interface( the_main.master_config()["interface"]().as_string() );
90  unsigned t_timeout_sec = the_main.master_config()["timeout"]().as_uint();
91  //bool t_use_fpa( the_main.master_config()["fpa"]().as_bool() );
92  unsigned t_max_packet_size = the_main.master_config()["max-packet-size"]().as_uint();
93  std::string t_packet_type( the_main.master_config()["packet-type"]().as_string() );
94 
95  bool (*t_proc_pkt_func)( uint8_t* ) = nullptr;
96  if( t_packet_type == "roach" )
97  {
98  t_proc_pkt_func = &ProcessROACHPacket;
99  LDEBUG( plog, "Processing packets as ROACH packets" );
100  }
101  else if( t_packet_type == "unknown" )
102  {
103  t_proc_pkt_func= &ProcessUnknownPacket;
104  LDEBUG( plog, "Processing packets as unknown packets" );
105  }
106  else
107  {
108  throw error() << "Unknown packet type supplied: " << t_packet_type;
109  }
110 
111 
112  LDEBUG( plog, "Opening UDP socket receiving at " << t_ip << ":" << t_port );
113 
114  //initialize address
115  socklen_t t_socket_length = sizeof(sockaddr_in);
116  sockaddr_in* t_address = new sockaddr_in();
117  ::memset( t_address, 0, t_socket_length );
118 
119  //prepare address
120  t_address->sin_family = AF_INET;
121  t_address->sin_addr.s_addr = inet_addr( t_ip.c_str() );
122  if( t_address->sin_addr.s_addr == INADDR_NONE )
123  {
124  throw error() << "Invalid IP address";
125  }
126  t_address->sin_port = htons( t_port );
127 
128  //MTLINFO( pmsg, "address prepared..." );
129 
130  //open socket
131  int t_socket = ::socket( AF_INET, SOCK_DGRAM, 0 );
132  if( t_socket < 0 )
133  {
134  throw error() << "Could not create socket:\n\t" << strerror( errno );
135  }
136 
137  /* setsockopt: Handy debugging trick that lets
138  * us rerun the udp_server immediately after we kill it;
139  * otherwise we have to wait about 20 secs.
140  * Eliminates "ERROR on binding: Address already in use" error.
141  */
142  int optval = 1;
143  ::setsockopt( t_socket, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(int));
144 
145  // Receive timeout
146  if( t_timeout_sec > 0 )
147  {
148  struct timeval t_timeout;
149  t_timeout.tv_sec = t_timeout_sec;
150  t_timeout.tv_usec = 0; // Not init'ing this can cause strange errors
151  ::setsockopt( t_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&t_timeout, sizeof(struct timeval) );
152  }
153 
154  //msg_normal( pmsg, "socket open..." );
155 
156  //bind socket
157  if( ::bind( t_socket, (const sockaddr*) (t_address), t_socket_length ) < 0 )
158  {
159  throw error() << "Could not bind socket:\n\t" << strerror( errno );
160  }
161 
162 
163 
164  uint8_t* t_buffer = new uint8_t [t_max_packet_size];
165 
166  for( unsigned i_packet = 0; i_packet < t_n_packets; ++i_packet )
167  {
168  LINFO( plog, "Waiting for next packet" );
169  ssize_t t_size_received = 0;
170  while( t_size_received <= 0 )
171  {
172  t_size_received = ::recv( t_socket, (void*)t_buffer, t_max_packet_size, 0 );
173 
174  if( t_size_received > 0 )
175  {
176  LINFO( plog, "Packet received (" << t_size_received << " bytes)" );
177 
178  if( ! (*t_proc_pkt_func)( t_buffer ) )
179  {
180  LWARN( plog, "Packet not processed correctly" );
181  }
182  }
183  }
184  }
185 
186  //clean up udp_server address
187  if( t_address != nullptr )
188  {
189  delete t_address;
190  t_address = nullptr;
191  }
192 
193  //close udp_server socket
194  if( t_socket != 0 )
195  {
196  ::close( t_socket );
197  t_socket = 0;
198  }
199  } );
200 
201  // Parse CL options and run the application
202  CLI11_PARSE( the_main, argc, argv );
203 
204  return RETURN_SUCCESS;
205  }
206  catch( std::exception& e )
207  {
208  LERROR( plog, "Caught an exception: " << e.what() );
209  }
210  return RETURN_ERROR;
211 }
212 
213 
214 bool ProcessROACHPacket( uint8_t* a_buffer )
215 {
216  byteswap_inplace( reinterpret_cast< raw_roach_packet* >( a_buffer ) );
217  roach_packet* t_roach_packet = reinterpret_cast< roach_packet* >( a_buffer );
218 
219  // debug purposes only
220 #ifndef NDEBUG
221  raw_roach_packet* t_raw_packet = reinterpret_cast< raw_roach_packet* >( a_buffer );
222  LDEBUG( plog, "Raw packet header: " << std::hex << t_raw_packet->f_word_0 << ", " << t_raw_packet->f_word_1 << ", " << t_raw_packet->f_word_2 << ", " << t_raw_packet->f_word_3 );
223  LDEBUG( plog, "Raw packet data, first 8 bins, true order: " << (int)t_raw_packet->f_data[0] << ", " << (int)t_raw_packet->f_data[1] << "; " << (int)t_raw_packet->f_data[2] << ", " << (int)t_raw_packet->f_data[3] << "; " << (int)t_raw_packet->f_data[4] << ", " << (int)t_raw_packet->f_data[5] << "; " << (int)t_raw_packet->f_data[6] << ", " << (int)t_raw_packet->f_data[7] );
224 #endif
225 
226  LINFO( plog, "ROACH data received:\n"
227  " digital_id (channel) = " << t_roach_packet->f_digital_id << '\n' <<
228  " unix time = " << t_roach_packet->f_unix_time << '\n' <<
229  " pkt_batch = " << t_roach_packet->f_pkt_in_batch << '\n' <<
230  " freqNotTime = " << (int)t_roach_packet->f_freq_not_time << '\n' <<
231  " if id = " << t_roach_packet->f_if_id << '\n' <<
232  " first 8 bins (true order) = " << (int)t_roach_packet->f_data[ 0 ] << ", " << (int)t_roach_packet->f_data[ 1 ] << "; " << (int)t_roach_packet->f_data[ 2 ] << ", " << (int)t_roach_packet->f_data[ 3 ] << "; " << (int)t_roach_packet->f_data[ 4 ] << ", " << (int)t_roach_packet->f_data[ 5 ] << "; " << (int)t_roach_packet->f_data[ 6 ] << ", " << (int)t_roach_packet->f_data[ 7 ] );
233 /*
234  std::stringstream allbins;
235  allbins << "[ ";
236  for( unsigned i=0; i<8192; ++i )
237  {
238  allbins << (int)t_roach_packet->f_data[ i ];
239  if( i != 8191 ) allbins << ", ";
240  }
241  allbins << " ]";
242  LWARN( plog, "All bins:\n" << allbins.str() );
243 */
244  return true;
245 }
246 
/*
 * Handler for packets of unknown format: the packet contents are not examined.
 *
 * The buffer argument is accepted only so the function matches the
 * packet-processing signature; it is never read or written.
 *
 * Returns: always true.
 */
bool ProcessUnknownPacket( uint8_t* )
{
    return true;
}
251 
252 
static scarab::logger plog("batch_executor")
void byteswap_inplace(raw_roach_packet *a_pkt)
Definition: roach_packet.cc:22
int main(int argc, char **argv)
Definition: grab_packet.cc:54
bool ProcessROACHPacket(uint8_t *a_buffer)
Definition: grab_packet.cc:214
bool ProcessUnknownPacket(uint8_t *a_buffer)
Definition: grab_packet.cc:247
LOGGER(plog, "egg_writer")