Psyllid  v1.12.4
Project 8 Data Acquisisition Software
stream_manager.cc
Go to the documentation of this file.
1 /*
2  * stream_manager.cc
3  *
4  * Created on: Jan 5, 2017
5  * Author: obla999
6  */
7 
8 #include "stream_manager.hh"
9 
10 #include "node_builder.hh"
11 #include "psyllid_error.hh"
12 #include "stream_preset.hh"
13 
14 #include "node.hh"
15 
16 #include "logger.hh"
17 
18 #include <boost/algorithm/string/replace.hpp>
19 
20 #include <utility>
21 
22 using scarab::param_ptr_t;
23 using scarab::param;
24 using scarab::param_array;
25 using scarab::param_node;
26 using scarab::param_value;
27 
28 namespace psyllid
29 {
30  LOGGER( plog, "stream_manager" );
31 
33  f_streams(),
34  f_manager_mutex(),
35  f_midge(),
36  f_node_bindings(),
37  f_must_reset_midge( true ),
38  f_midge_mutex()
39  {
40  }
41 
43  {
44  for( streams_t::iterator t_stream_it = f_streams.begin(); t_stream_it != f_streams.end(); ++t_stream_it )
45  {
46  for( stream_template::nodes_t::iterator t_node_it = t_stream_it->second.f_nodes.begin(); t_node_it != t_stream_it->second.f_nodes.end(); ++t_node_it )
47  {
48  delete t_node_it->second;
49  t_node_it->second = nullptr;
50  }
51  }
52 
54  }
55 
56  bool stream_manager::initialize( const param_node& a_config )
57  {
58  for( param_node::const_iterator t_str_conf_it = a_config.begin(); t_str_conf_it != a_config.end(); ++t_str_conf_it )
59  {
60  if( ! t_str_conf_it->is_node() )
61  {
62  LERROR( plog, "Invalid stream configuration" );
63  return false;
64  }
65  if( ! add_stream( t_str_conf_it.name(), t_str_conf_it->as_node() ) )
66  {
67  LERROR( plog, "Something went wrong while adding a stream" );
68  return false;
69  }
70  }
71  return true;
72  }
73 
74  bool stream_manager::add_stream( const std::string& a_name, const param_node& a_node )
75  {
76  // do not need to lock the mutex here because we're not doing anything to the stream_manager until inside _add_stream()
77  try
78  {
79  _add_stream( a_name, a_node );
80  return true;
81  }
82  catch( std::exception& e )
83  {
84  LERROR( plog, e.what() );
85  return false;
86  }
87  }
88 
89  void stream_manager::remove_stream( const std::string& a_name )
90  {
91  // do not need to lock the mutex here because we're not doing anything to the stream_manager until inside _remove_stream()
92  try
93  {
94  return _remove_stream( a_name );
95  }
96  catch( std::exception& e )
97  {
98  LWARN( plog, e.what() );
99  return;
100  }
101  }
102 
103  bool stream_manager::configure_node( const std::string& a_stream_name, const std::string& a_node_name, const param_node& a_config )
104  {
105  try
106  {
107  _configure_node( a_stream_name, a_node_name, a_config );
108  return true;
109  }
110  catch( std::exception& e )
111  {
112  LWARN( plog, "Unable to configure node <" << a_stream_name << "." << a_node_name << ">: " << e.what() );
113  return false;
114  }
115  }
116 
117  bool stream_manager::dump_node_config( const std::string& a_stream_name, const std::string& a_node_name, param_node& a_config ) const
118  {
119  try
120  {
121  _dump_node_config( a_stream_name, a_node_name, a_config );
122  return true;
123  }
124  catch( std::exception& e )
125  {
126  LWARN( plog, "Unable to dump node config <" << a_stream_name << "." << a_node_name << ">: " << e.what() );
127  return false;
128  }
129  }
130 
131  void stream_manager::_configure_node( const std::string& a_stream_name, const std::string& a_node_name, const param_node& a_config )
132  {
133  std::unique_lock< std::mutex > t_lock( f_manager_mutex );
134 
135  streams_t::iterator t_stream_it = f_streams.find( a_stream_name );
136  if( t_stream_it == f_streams.end() )
137  {
138  throw error() << "Did not find stream <" << a_stream_name << ">";
139  }
140 
141  stream_template::nodes_t::iterator t_node_it = t_stream_it->second.f_nodes.find( a_node_name );
142  if( t_node_it == t_stream_it->second.f_nodes.end() )
143  {
144  throw error() << "Did not find node <" << a_node_name << "> in stream <" << a_stream_name << ">";
145  }
146 
147  t_node_it->second->configure_builder( a_config );
148 
149  return;
150  }
151 
152  void stream_manager::_dump_node_config( const std::string& a_stream_name, const std::string& a_node_name, param_node& a_config ) const
153  {
154  std::unique_lock< std::mutex > t_lock( f_manager_mutex );
155 
156  streams_t::const_iterator t_stream_it = f_streams.find( a_stream_name );
157  if( t_stream_it == f_streams.end() )
158  {
159  throw error() << "Did not find stream <" << a_stream_name << ">";
160  }
161 
162  stream_template::nodes_t::const_iterator t_node_it = t_stream_it->second.f_nodes.find( a_node_name );
163  if( t_node_it == t_stream_it->second.f_nodes.end() )
164  {
165  throw error() << "Did not find node <" << a_node_name << "> in stream <" << a_stream_name << ">";
166  }
167 
168  t_node_it->second->dump_builder_config( a_config );
169 
170  return;
171 
172  }
173 
174 
175  void stream_manager::_add_stream( const std::string& a_name, const param_node& a_node )
176  {
177  // do not need to lock the mutex here because we're not doing anything to the stream_manager until inside _add_stream( string, param_node )
178 
179  try
180  {
181  if( ! a_node.has( "preset" ) )
182  {
183  throw error() << "No preset specified";
184  }
185  const param& t_preset_param = a_node["preset"];
186 
187  if( t_preset_param.is_node() )
188  {
189  const param_node& t_preset_param_node = t_preset_param.as_node();
190  if( ! runtime_stream_preset::add_preset( t_preset_param_node ) )
191  {
192  throw error() << "Runtime preset could not be added";
193  }
194 
195  // "name" is guaranteed to be there by the successful completion of runtime_stream_preset::add_preset
196  return _add_stream( a_name, t_preset_param_node["type"]().as_string(), a_node );
197  }
198  else if( t_preset_param.is_value() )
199  {
200  return _add_stream( a_name, t_preset_param().as_string(), a_node );
201  }
202  else
203  {
204  throw error() << "Invalid preset specification";
205  }
206  }
207  catch( std::exception& e )
208  {
209  throw;
210  }
211  }
212 
213  void stream_manager::_add_stream( const std::string& a_name, const std::string& a_type, const param_node& a_node )
214  {
215  // do not need to lock the mutex here because we're not doing anything to the stream_manager until later
216 
217  stream_preset* t_preset = scarab::factory< stream_preset, const std::string& >::get_instance()->create( a_type, a_type );
218 
219  if( t_preset == nullptr )
220  {
221  throw error() << "Unable to create preset called <" << a_name << "> of type <" << a_type << ">. The type may not be registered or there may be a typo.";
222  }
223 
224  // lock the mutex here so that we know which stream name this will be if it succeeds
225  std::unique_lock< std::mutex > t_lock( f_manager_mutex );
226 
227  if( f_streams.find( a_name ) != f_streams.end() )
228  {
229  throw error() << "Already have a stream called <" << a_name << ">";
230  }
231 
232  LINFO( plog, "Preparing stream <" << a_name << ">");
233 
234  stream_template t_stream;
235 
236  typedef stream_preset::nodes_t preset_nodes_t;
237  const preset_nodes_t& t_new_nodes = t_preset->get_nodes();
238  std::map< std::string, std::string > t_name_replacements;
239  for( preset_nodes_t::const_iterator t_node_it = t_new_nodes.begin(); t_node_it != t_new_nodes.end(); ++t_node_it )
240  {
241  std::stringstream t_nn_str;
242  t_nn_str << a_name << "_" << t_node_it->first;
243  std::string t_node_name = t_nn_str.str();
244  t_name_replacements[ t_node_it->first ] = t_node_name;
245 
246  LDEBUG( plog, "Creating node of type <" << t_node_it->second << "> called <" << t_node_name << ">" );
247  node_builder* t_builder = scarab::factory< node_builder >::get_instance()->create( t_node_it->second );
248  if( t_builder == nullptr )
249  {
250  throw error() << "Cannot find binding for node type <" << t_node_it->second << ">";
251  }
252 
253  t_builder->name() = t_node_name;
254 
255  // setup the node config
256  param_node t_node_config;
257  // first get the node-specific config, if present (refer to it by its original name, not t_node_name)
258  if( a_node.has( t_node_it->first ) ) t_node_config.merge( a_node[t_node_it->first].as_node() );
259  // add stream-wide config data to the node config
260  if( a_node.has( "device" ) ) t_node_config.add( "device", a_node["device"].as_node() );
261  // pass the configuration to the builder
262  t_builder->configure_builder( t_node_config );
263 
264  t_stream.f_nodes.insert( stream_template::nodes_t::value_type( t_node_it->first, t_builder ) );
265  }
266 
267  typedef stream_preset::connections_t preset_conn_t;
268  const preset_conn_t& t_new_connections = t_preset->get_connections();
269  for( preset_conn_t::const_iterator t_conn_it = t_new_connections.begin(); t_conn_it != t_new_connections.end(); ++t_conn_it )
270  {
271  std::string t_connection( *t_conn_it );
272  for( std::map< std::string, std::string >::const_iterator t_it = t_name_replacements.begin(); t_it != t_name_replacements.end(); ++t_it )
273  {
274  boost::replace_all( t_connection, t_it->first, t_it->second );
275  }
276  LDEBUG( plog, "Adding connection: " << t_connection );
277  t_stream.f_connections.insert( t_connection );
278  }
279 
280  // add the new stream to the vector of streams
281  f_must_reset_midge = true;
282  f_streams.insert( streams_t::value_type( a_name, t_stream ) );
283  LDEBUG( plog, "Added stream <" << a_name << ">" );
284  return;
285  }
286 
287  void stream_manager::_remove_stream( const std::string& a_name )
288  {
289  std::unique_lock< std::mutex > t_lock( f_manager_mutex );
290 
291  // delete node_builder objects
292  streams_t::iterator t_to_erase = f_streams.find( a_name );
293  if( t_to_erase == f_streams.end() )
294  {
295  throw error() << "Stream <" << a_name << "> does not exist";
296  }
297 
298  f_must_reset_midge = true;
299 
300  for( stream_template::nodes_t::iterator t_node_it = t_to_erase->second.f_nodes.begin(); t_node_it != t_to_erase->second.f_nodes.end(); ++t_node_it )
301  {
302  delete t_node_it->second;
303  t_node_it->second = nullptr;
304  }
305 
306  f_streams.erase( t_to_erase );
307 
308  return;
309  }
310 
311 
312 
314  {
315  std::unique_lock< std::mutex > t_mgr_lock( f_manager_mutex );
316 
317  if( f_streams.empty() )
318  {
319  throw error() << "No streams have been setup";
320  }
321 
322  f_must_reset_midge = true;
323 
324  std::unique_lock< std::mutex > t_midge_lock( f_midge_mutex );
325  f_midge.reset( new midge::diptera() );
327 
328  for( streams_t::const_iterator t_stream_it = f_streams.begin(); t_stream_it != f_streams.end(); ++t_stream_it )
329  {
330  for( stream_template::nodes_t::const_iterator t_node_it = t_stream_it->second.f_nodes.begin(); t_node_it != t_stream_it->second.f_nodes.end(); ++t_node_it )
331  {
332  midge::node* t_new_node = t_node_it->second->build();
333 
334  try
335  {
336  LINFO( plog, "Adding node <" << t_node_it->first << ">" );
337  f_midge->add( t_new_node );
338 
339  node_binding* t_new_binding = t_node_it->second->binding().clone();
340  LDEBUG( plog, "Adding new node binding for node <" << t_node_it->second->name() << ">");
341  f_node_bindings[ t_node_it->second->name() ] = std::make_pair( t_new_binding, t_new_node );
342  }
343  catch( std::exception& e )
344  {
346  delete t_new_node;
347  throw error() << "Unable to add processor <" << t_node_it->first << ">: " << e.what();
348  }
349  }
350 
351  // Then deal with connections
352  for( stream_template::connections_t::const_iterator t_conn_it = t_stream_it->second.f_connections.begin(); t_conn_it != t_stream_it->second.f_connections.end(); ++t_conn_it )
353  {
354  try
355  {
356  LINFO( plog, "Adding connection <" << *t_conn_it << ">" );
357  f_midge->join( *t_conn_it );
358  }
359  catch( std::exception& e )
360  {
361  throw error() << "Unable to join nodes: " << e.what();
362  }
363 
364  LINFO( plog, "Node connection made: <" << *t_conn_it << ">" );
365  }
366  }
367 
368  f_must_reset_midge = false;
369  return;
370  }
371 
373  {
374  if( f_must_reset_midge )
375  {
376  reset_midge();
377  }
379  }
380 
382  {
383  midge_package t_returned( std::move( a_midge ) );
384  t_returned.unlock();
385  f_must_reset_midge = true;
386  return;
387  }
388 
390  {
391  std::unique_lock< std::mutex > t_lock( f_manager_mutex );
392 
393  std::string t_run_str;
394  for( streams_t::const_iterator t_stream_it = f_streams.begin(); t_stream_it != f_streams.end(); ++t_stream_it )
395  {
396  stream_template::nodes_t::const_iterator t_node_it = t_stream_it->second.f_nodes.begin();
397  t_run_str = t_stream_it->first + "_" + t_node_it->first;
398  for( ++t_node_it; t_node_it != t_stream_it->second.f_nodes.end(); ++t_node_it )
399  {
400  t_run_str += midge::diptera::separator() + t_stream_it->first + "_" + t_node_it->first;
401  }
402  }
403  return t_run_str;
404  }
405 
407  {
408  LERROR( plog, "Checking if manager mutex is in use" );
409  if( f_manager_mutex.try_lock() )
410  {
411  f_manager_mutex.unlock();
412  return false;
413  }
414  else return true;
415  }
416 
418  {
419  LDEBUG( plog, "Clearing node bindings" );
420  for( active_node_bindings::iterator t_it = f_node_bindings.begin(); t_it != f_node_bindings.end(); ++t_it )
421  {
422  delete t_it->second.first;
423  t_it->second.first = nullptr;
424  }
425  f_node_bindings.clear();
426  return;
427  }
428 
429  dripline::reply_ptr_t stream_manager::handle_add_stream_request( const dripline::request_ptr_t a_request )
430  {
431  if( ! a_request->payload().is_node() || ! a_request->payload().as_node().has( "name" ) || ! a_request->payload().as_node().has( "config" ) )
432  {
433  return a_request->reply( dripline::dl_message_error_bad_payload(), "Add-stream request is missing either \"name\" or \"config\"" );
434  }
435 
436  try
437  {
438  add_stream( a_request->payload()["name"]().as_string(), a_request->payload()["config"].as_node() );
439  }
440  catch( std::exception& e )
441  {
442  return a_request->reply( dripline::dl_warning_no_action_taken(), e.what() );
443  }
444 
445  return a_request->reply( dripline::dl_success(), "Stream " + a_request->payload()["name"]().as_string() + " has been added" );
446  }
447 
448  dripline::reply_ptr_t stream_manager::handle_remove_stream_request( const dripline::request_ptr_t a_request )
449  {
450  if( ! a_request->payload().is_node() || ! a_request->payload().as_node().has( "values" ) )
451  {
452  return a_request->reply( dripline::dl_message_error_bad_payload(), "Unable to perform remove-stream: values array is missing" );
453  }
454  if( ! a_request->payload()["values"].is_array() )
455  {
456  return a_request->reply( dripline::dl_message_error_bad_payload(), "Unable to perform remove-stream: values must be an array" );
457  }
458  const param_array t_values_array = a_request->payload()["values"].as_array();
459  if( t_values_array.empty() || ! t_values_array[0].is_value() )
460  {
461  return a_request->reply( dripline::dl_message_error_bad_payload(), "Unable to perform remove-stream: \"values\" is not an array, or the array is empty, or the first element in the array is not a value" );
462  }
463 
464  try
465  {
466  _remove_stream( t_values_array[0]().as_string() );
467  }
468  catch( error& e )
469  {
470  return a_request->reply( dripline::dl_warning_no_action_taken(), e.what() );
471  }
472 
473  return a_request->reply( dripline::dl_success(), "Stream " + t_values_array[0]().as_string() + " has been removed" );
474  }
475 
476  dripline::reply_ptr_t stream_manager::handle_configure_node_request( const dripline::request_ptr_t a_request )
477  {
478  if( a_request->parsed_specifier().size() < 2 )
479  {
480  return a_request->reply( dripline::dl_message_error_invalid_key(), "Specifier is improperly formatted: node-config.[stream].[node] or node-config.[stream].[node].[parameter]" );
481  }
482 
483  //size_t t_rks_size = a_request->parsed_rks().size();
484 
485  std::string t_target_stream = a_request->parsed_specifier().front();
486  a_request->parsed_specifier().pop_front();
487 
488  std::string t_target_node = a_request->parsed_specifier().front();
489  a_request->parsed_specifier().pop_front();
490 
491  param_ptr_t t_payload_ptr( new param_node() );
492  param_node& t_payload = t_payload_ptr->as_node();
493 
494  if( a_request->parsed_specifier().empty() )
495  {
496  // payload should be a map of all parameters to be set
497  LDEBUG( plog, "Performing node config for multiple values in stream <" << t_target_stream << "> and node <" << t_target_node << ">" );
498 
499  if( ! a_request->payload().is_node() || a_request->payload().as_node().empty() )
500  {
501  return a_request->reply( dripline::dl_message_error_bad_payload(), "Unable to perform node-config request: payload is empty" );
502  }
503 
504  param_node& t_req_payload = a_request->payload().as_node();
505 
506  try
507  {
508  _configure_node( t_target_stream, t_target_node, t_req_payload );
509  t_payload.merge( t_req_payload );
510  }
511  catch( std::exception& e )
512  {
513  return a_request->reply( dripline::dl_device_error(), std::string("Unable to perform node-config request: ") + e.what() );
514  }
515  }
516  else
517  {
518  // payload should be values array with a single entry for the particular parameter to be set
519  LDEBUG( plog, "Performing node config for a single value in stream <" << t_target_stream << "> and node <" << t_target_node << ">" );
520 
521  if( ! a_request->payload().is_node() || ! a_request->payload().as_node().has( "values" ) )
522  {
523  return a_request->reply( dripline::dl_message_error_bad_payload(), "Unable to perform node-config (single value): values array is missing" );
524  }
525 
526  param_node& t_req_payload = a_request->payload().as_node();
527 
528  if( ! t_req_payload["values"].is_array() )
529  {
530  return a_request->reply( dripline::dl_message_error_bad_payload(), "Unable to perform node-config (single value): values entry is not an array" );
531  }
532  const param_array t_values_array = t_req_payload["values"].as_array();
533  if( t_values_array.empty() || ! t_values_array[0]().is_value() )
534  {
535  return a_request->reply( dripline::dl_message_error_bad_payload(), "Unable to perform node-config (single value): \"values\" is not an array, or the array is empty, or the first element in the array is not a value" );
536  }
537 
538  param_node t_param_to_set;
539  t_param_to_set.add( a_request->parsed_specifier().front(), param_value( t_values_array[0]() ) );
540 
541  try
542  {
543  _configure_node( t_target_stream, t_target_node, t_param_to_set );
544  t_payload.merge( t_param_to_set );
545  }
546  catch( std::exception& e )
547  {
548  return a_request->reply( dripline::dl_device_error(), std::string("Unable to perform node-config request (single value): ") + e.what() );
549  }
550  }
551 
552  LDEBUG( plog, "Node-config was successful" );
553  return a_request->reply( dripline::dl_success(), "Performed node-config", std::move( t_payload_ptr ) );
554  }
555 
556  dripline::reply_ptr_t stream_manager::handle_dump_config_node_request( const dripline::request_ptr_t a_request )
557  {
558  if( a_request->parsed_specifier().size() < 2 )
559  {
560  return a_request->reply( dripline::dl_message_error_invalid_key(), "RKS is improperly formatted: [queue].node-config.[stream].[node] or [queue].node-config.[stream].[node].[parameter]" );
561  }
562 
563  //size_t t_rks_size = a_request->parsed_rks().size();
564 
565  std::string t_target_stream = a_request->parsed_specifier().front();
566  a_request->parsed_specifier().pop_front();
567 
568  std::string t_target_node = a_request->parsed_specifier().front();
569  a_request->parsed_specifier().pop_front();
570 
571  param_ptr_t t_payload_ptr( new param_node() );
572  param_node& t_payload = t_payload_ptr->as_node();
573  if( a_request->parsed_specifier().empty() )
574  {
575  // getting full node configuration
576  LDEBUG( plog, "Getting full node config for stream <" << t_target_stream << "> and node <" << t_target_node << ">" );
577 
578  try
579  {
580  _dump_node_config( t_target_stream, t_target_node, t_payload );
581  }
582  catch( std::exception& e )
583  {
584  return a_request->reply( dripline::dl_device_error(), std::string("Unable to perform get-node-config request: ") + e.what() );
585  }
586  }
587  else
588  {
589  // getting value for a single parameter
590  LDEBUG( plog, "Getting value for a single parameter in stream <" << t_target_stream << "> and node <" << t_target_node << ">" );
591 
592  std::string t_param_to_get = a_request->parsed_specifier().front();
593 
594  try
595  {
596  param_node t_param_dump;
597  _dump_node_config( t_target_stream, t_target_node, t_param_dump );
598  if( ! t_param_dump.has( t_param_to_get ) )
599  {
600  return a_request->reply( dripline::dl_message_error_invalid_key(), "Unable to get node parameter: cannot find parameter <" + t_param_to_get + ">" );
601  }
602  t_payload.add( t_param_to_get, param_value( t_param_dump[t_param_to_get]() ) );
603  }
604  catch( std::exception& e )
605  {
606  return a_request->reply( dripline::dl_device_error(), std::string("Unable to get node parameter (single value): ") + e.what() );
607  }
608  }
609 
610  LDEBUG( plog, "Get-node-config was successful" );
611  return a_request->reply( dripline::dl_success(), "Performed get-node-config", std::move( t_payload_ptr ) );
612  }
613 
614  dripline::reply_ptr_t stream_manager::handle_get_stream_list_request( const dripline::request_ptr_t a_request )
615  {
616  if( a_request->parsed_specifier().size() > 1 )
617  {
618  return a_request->reply( dripline::dl_message_error_invalid_key(), "Specifier is improperly formatted: node-config.[stream]" );
619  }
620 
621  param_array t_streams_list;
622  LDEBUG( plog, "Getting list of streams from the stream handler" );
623  param_ptr_t t_payload_ptr( new param_node() );
624  param_node& t_payload = t_payload_ptr->as_node();
625  try
626  {
627  for ( streams_t::iterator t_stream_it = f_streams.begin(); t_stream_it != f_streams.end(); ++t_stream_it )
628  {
629  t_streams_list.push_back( param_value( t_stream_it->first ) );
630  }
631  t_payload.add( "streams", t_streams_list );
632  }
633  catch( std::exception& e )
634  {
635  return a_request->reply( dripline::dl_device_error(), std::string("Unable to perform get-stream-list request: ") + e.what() );
636  }
637  LDEBUG( plog, "Get-stream-list was successful" );
638  return a_request->reply( dripline::dl_success(), "Performed get-stream-list", std::move( t_payload_ptr ) );
639  }
640 
641  dripline::reply_ptr_t stream_manager::handle_get_stream_node_list_request( const dripline::request_ptr_t a_request )
642  {
643  if( a_request->parsed_specifier().size() < 1 )
644  {
645  return a_request->reply( dripline::dl_message_error_invalid_key(), "Specifier is improperly formatted: node-list.[stream]" );
646  }
647 
648  std::string t_target_stream = a_request->parsed_specifier().front();
649  a_request->parsed_specifier().pop_front();
650 
651  if( !f_streams.count( t_target_stream ) )
652  {
653  return a_request->reply( dripline::dl_message_error_invalid_key(), "Specifier is improperly formatted: node-list.[stream]" );
654  }
655  stream_manager::stream_template::nodes_t* t_these_nodes = &(f_streams[ t_target_stream ].f_nodes);
656 
657  LDEBUG( plog, "Getting list of nodes from the stream handler" );
658  param_ptr_t t_payload_ptr( new param_node() );
659  param_node& t_payload = t_payload_ptr->as_node();
660  try
661  {
662  param_array t_node_list;
663  for ( stream_manager::stream_template::nodes_t::iterator t_nodes_it = t_these_nodes->begin(); t_nodes_it != t_these_nodes->end(); ++t_nodes_it )
664  {
665  t_node_list.push_back( param_value( t_nodes_it->first ) );
666  }
667  t_payload.add( "nodes", t_node_list );
668  }
669  catch( std::exception& e )
670  {
671  return a_request->reply( dripline::dl_device_error(), std::string("Unable to perform get-stream-node-list request: ") + e.what() );
672  }
673  LDEBUG( plog, "Get-stream-node-list was successful" );
674  return a_request->reply( dripline::dl_success(), "Performed get-stream-node-list", std::move(t_payload_ptr) );
675  }
676 
677 } /* namespace psyllid */
active_node_bindings f_node_bindings
void remove_stream(const std::string &a_name)
static bool add_preset(const scarab::param_node &a_preset_node)
const char * what() const
std::string get_node_run_str() const
std::map< std::string, std::string > nodes_t
static scarab::logger plog("batch_executor")
bool dump_node_config(const std::string &a_stream_name, const std::string &a_node_name, scarab::param_node &a_config) const
dripline::reply_ptr_t handle_dump_config_node_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_get_stream_list_request(const dripline::request_ptr_t a_request)
bool configure_node(const std::string &a_stream_name, const std::string &a_node_name, const scarab::param_node &a_config)
void _add_stream(const std::string &a_name, const scarab::param_node &a_node)
midge_package get_midge()
const nodes_t & get_nodes() const
dripline::reply_ptr_t handle_add_stream_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_configure_node_request(const dripline::request_ptr_t a_request)
Holds node configuration and can be used to create node classes and node binding classes.
void configure_builder(const scarab::param_node &a_config)
void _remove_stream(const std::string &a_name)
const connections_t & get_connections() const
void _dump_node_config(const std::string &a_stream_name, const std::string &a_node_name, scarab::param_node &a_config) const
void _configure_node(const std::string &a_stream_name, const std::string &a_node_name, const scarab::param_node &a_config)
void return_midge(midge_package &&a_midge)
locked_resource< midge::diptera, stream_manager > midge_package
dripline::reply_ptr_t handle_get_stream_node_list_request(const dripline::request_ptr_t a_request)
std::map< std::string, node_builder *> nodes_t
LOGGER(plog, "egg_writer")
bool initialize(const scarab::param_node &a_config)
dripline::reply_ptr_t handle_remove_stream_request(const dripline::request_ptr_t a_request)
bool add_stream(const std::string &a_name, const scarab::param_node &a_node)
virtual node_binding * clone() const =0
std::set< std::string > connections_t
Allows access to midge nodes.
Definition: node_builder.hh:38