18 #include <boost/algorithm/string/replace.hpp> 22 using scarab::param_ptr_t;
24 using scarab::param_array;
25 using scarab::param_node;
26 using scarab::param_value;
37 f_must_reset_midge( true ),
44 for( streams_t::iterator t_stream_it =
f_streams.begin(); t_stream_it !=
f_streams.end(); ++t_stream_it )
46 for( stream_template::nodes_t::iterator t_node_it = t_stream_it->second.f_nodes.begin(); t_node_it != t_stream_it->second.f_nodes.end(); ++t_node_it )
48 delete t_node_it->second;
49 t_node_it->second =
nullptr;
58 for( param_node::const_iterator t_str_conf_it = a_config.begin(); t_str_conf_it != a_config.end(); ++t_str_conf_it )
60 if( ! t_str_conf_it->is_node() )
62 LERROR(
plog,
"Invalid stream configuration" );
65 if( !
add_stream( t_str_conf_it.name(), t_str_conf_it->as_node() ) )
67 LERROR(
plog,
"Something went wrong while adding a stream" );
82 catch( std::exception& e )
84 LERROR(
plog, e.what() );
96 catch( std::exception& e )
98 LWARN(
plog, e.what() );
110 catch( std::exception& e )
112 LWARN(
plog,
"Unable to configure node <" << a_stream_name <<
"." << a_node_name <<
">: " << e.what() );
124 catch( std::exception& e )
126 LWARN(
plog,
"Unable to dump node config <" << a_stream_name <<
"." << a_node_name <<
">: " << e.what() );
135 streams_t::iterator t_stream_it =
f_streams.find( a_stream_name );
138 throw error() <<
"Did not find stream <" << a_stream_name <<
">";
141 stream_template::nodes_t::iterator t_node_it = t_stream_it->second.f_nodes.find( a_node_name );
142 if( t_node_it == t_stream_it->second.f_nodes.end() )
144 throw error() <<
"Did not find node <" << a_node_name <<
"> in stream <" << a_stream_name <<
">";
147 t_node_it->second->configure_builder( a_config );
156 streams_t::const_iterator t_stream_it =
f_streams.find( a_stream_name );
159 throw error() <<
"Did not find stream <" << a_stream_name <<
">";
162 stream_template::nodes_t::const_iterator t_node_it = t_stream_it->second.f_nodes.find( a_node_name );
163 if( t_node_it == t_stream_it->second.f_nodes.end() )
165 throw error() <<
"Did not find node <" << a_node_name <<
"> in stream <" << a_stream_name <<
">";
168 t_node_it->second->dump_builder_config( a_config );
181 if( ! a_node.has(
"preset" ) )
183 throw error() <<
"No preset specified";
185 const param& t_preset_param = a_node[
"preset"];
187 if( t_preset_param.is_node() )
189 const param_node& t_preset_param_node = t_preset_param.as_node();
192 throw error() <<
"Runtime preset could not be added";
196 return _add_stream( a_name, t_preset_param_node[
"type"]().as_string(), a_node );
198 else if( t_preset_param.is_value() )
200 return _add_stream( a_name, t_preset_param().as_string(), a_node );
204 throw error() <<
"Invalid preset specification";
207 catch( std::exception& e )
217 stream_preset* t_preset = scarab::factory< stream_preset, const std::string& >::get_instance()->create( a_type, a_type );
219 if( t_preset ==
nullptr )
221 throw error() <<
"Unable to create preset called <" << a_name <<
"> of type <" << a_type <<
">. The type may not be registered or there may be a typo.";
229 throw error() <<
"Already have a stream called <" << a_name <<
">";
232 LINFO(
plog,
"Preparing stream <" << a_name <<
">");
237 const preset_nodes_t& t_new_nodes = t_preset->
get_nodes();
238 std::map< std::string, std::string > t_name_replacements;
239 for( preset_nodes_t::const_iterator t_node_it = t_new_nodes.begin(); t_node_it != t_new_nodes.end(); ++t_node_it )
241 std::stringstream t_nn_str;
242 t_nn_str << a_name <<
"_" << t_node_it->first;
243 std::string t_node_name = t_nn_str.str();
244 t_name_replacements[ t_node_it->first ] = t_node_name;
246 LDEBUG(
plog,
"Creating node of type <" << t_node_it->second <<
"> called <" << t_node_name <<
">" );
247 node_builder* t_builder = scarab::factory< node_builder >::get_instance()->create( t_node_it->second );
248 if( t_builder ==
nullptr )
250 throw error() <<
"Cannot find binding for node type <" << t_node_it->second <<
">";
253 t_builder->name() = t_node_name;
256 param_node t_node_config;
258 if( a_node.has( t_node_it->first ) ) t_node_config.merge( a_node[t_node_it->first].as_node() );
260 if( a_node.has(
"device" ) ) t_node_config.add(
"device", a_node[
"device"].as_node() );
264 t_stream.
f_nodes.insert( stream_template::nodes_t::value_type( t_node_it->first, t_builder ) );
269 for( preset_conn_t::const_iterator t_conn_it = t_new_connections.begin(); t_conn_it != t_new_connections.end(); ++t_conn_it )
271 std::string t_connection( *t_conn_it );
272 for( std::map< std::string, std::string >::const_iterator t_it = t_name_replacements.begin(); t_it != t_name_replacements.end(); ++t_it )
274 boost::replace_all( t_connection, t_it->first, t_it->second );
276 LDEBUG(
plog,
"Adding connection: " << t_connection );
282 f_streams.insert( streams_t::value_type( a_name, t_stream ) );
283 LDEBUG(
plog,
"Added stream <" << a_name <<
">" );
292 streams_t::iterator t_to_erase =
f_streams.find( a_name );
295 throw error() <<
"Stream <" << a_name <<
"> does not exist";
300 for( stream_template::nodes_t::iterator t_node_it = t_to_erase->second.f_nodes.begin(); t_node_it != t_to_erase->second.f_nodes.end(); ++t_node_it )
302 delete t_node_it->second;
303 t_node_it->second =
nullptr;
319 throw error() <<
"No streams have been setup";
324 std::unique_lock< std::mutex > t_midge_lock(
f_midge_mutex );
325 f_midge.reset(
new midge::diptera() );
328 for( streams_t::const_iterator t_stream_it =
f_streams.begin(); t_stream_it !=
f_streams.end(); ++t_stream_it )
330 for( stream_template::nodes_t::const_iterator t_node_it = t_stream_it->second.f_nodes.begin(); t_node_it != t_stream_it->second.f_nodes.end(); ++t_node_it )
332 midge::node* t_new_node = t_node_it->second->build();
336 LINFO(
plog,
"Adding node <" << t_node_it->first <<
">" );
340 LDEBUG(
plog,
"Adding new node binding for node <" << t_node_it->second->name() <<
">");
341 f_node_bindings[ t_node_it->second->name() ] = std::make_pair( t_new_binding, t_new_node );
343 catch( std::exception& e )
347 throw error() <<
"Unable to add processor <" << t_node_it->first <<
">: " << e.
what();
352 for( stream_template::connections_t::const_iterator t_conn_it = t_stream_it->second.f_connections.begin(); t_conn_it != t_stream_it->second.f_connections.end(); ++t_conn_it )
356 LINFO(
plog,
"Adding connection <" << *t_conn_it <<
">" );
359 catch( std::exception& e )
361 throw error() <<
"Unable to join nodes: " << e.
what();
364 LINFO(
plog,
"Node connection made: <" << *t_conn_it <<
">" );
393 std::string t_run_str;
394 for( streams_t::const_iterator t_stream_it =
f_streams.begin(); t_stream_it !=
f_streams.end(); ++t_stream_it )
396 stream_template::nodes_t::const_iterator t_node_it = t_stream_it->second.f_nodes.begin();
397 t_run_str = t_stream_it->first +
"_" + t_node_it->first;
398 for( ++t_node_it; t_node_it != t_stream_it->second.f_nodes.end(); ++t_node_it )
400 t_run_str += midge::diptera::separator() + t_stream_it->first +
"_" + t_node_it->first;
408 LERROR(
plog,
"Checking if manager mutex is in use" );
419 LDEBUG(
plog,
"Clearing node bindings" );
422 delete t_it->second.first;
423 t_it->second.first =
nullptr;
431 if( ! a_request->payload().is_node() || ! a_request->payload().as_node().has(
"name" ) || ! a_request->payload().as_node().has(
"config" ) )
433 return a_request->reply( dripline::dl_message_error_bad_payload(),
"Add-stream request is missing either \"name\" or \"config\"" );
438 add_stream( a_request->payload()[
"name"]().as_string(), a_request->payload()[
"config"].as_node() );
440 catch( std::exception& e )
442 return a_request->reply( dripline::dl_warning_no_action_taken(), e.what() );
445 return a_request->reply( dripline::dl_success(),
"Stream " + a_request->payload()[
"name"]().as_string() +
" has been added" );
450 if( ! a_request->payload().is_node() || ! a_request->payload().as_node().has(
"values" ) )
452 return a_request->reply( dripline::dl_message_error_bad_payload(),
"Unable to perform remove-stream: values array is missing" );
454 if( ! a_request->payload()[
"values"].is_array() )
456 return a_request->reply( dripline::dl_message_error_bad_payload(),
"Unable to perform remove-stream: values must be an array" );
458 const param_array t_values_array = a_request->payload()[
"values"].as_array();
459 if( t_values_array.empty() || ! t_values_array[0].is_value() )
461 return a_request->reply( dripline::dl_message_error_bad_payload(),
"Unable to perform remove-stream: \"values\" is not an array, or the array is empty, or the first element in the array is not a value" );
470 return a_request->reply( dripline::dl_warning_no_action_taken(), e.
what() );
473 return a_request->reply( dripline::dl_success(),
"Stream " + t_values_array[0]().as_string() +
" has been removed" );
478 if( a_request->parsed_specifier().size() < 2 )
480 return a_request->reply( dripline::dl_message_error_invalid_key(),
"Specifier is improperly formatted: node-config.[stream].[node] or node-config.[stream].[node].[parameter]" );
485 std::string t_target_stream = a_request->parsed_specifier().front();
486 a_request->parsed_specifier().pop_front();
488 std::string t_target_node = a_request->parsed_specifier().front();
489 a_request->parsed_specifier().pop_front();
491 param_ptr_t t_payload_ptr(
new param_node() );
492 param_node& t_payload = t_payload_ptr->as_node();
494 if( a_request->parsed_specifier().empty() )
497 LDEBUG(
plog,
"Performing node config for multiple values in stream <" << t_target_stream <<
"> and node <" << t_target_node <<
">" );
499 if( ! a_request->payload().is_node() || a_request->payload().as_node().empty() )
501 return a_request->reply( dripline::dl_message_error_bad_payload(),
"Unable to perform node-config request: payload is empty" );
504 param_node& t_req_payload = a_request->payload().as_node();
509 t_payload.merge( t_req_payload );
511 catch( std::exception& e )
513 return a_request->reply( dripline::dl_device_error(), std::string(
"Unable to perform node-config request: ") + e.what() );
519 LDEBUG(
plog,
"Performing node config for a single value in stream <" << t_target_stream <<
"> and node <" << t_target_node <<
">" );
521 if( ! a_request->payload().is_node() || ! a_request->payload().as_node().has(
"values" ) )
523 return a_request->reply( dripline::dl_message_error_bad_payload(),
"Unable to perform node-config (single value): values array is missing" );
526 param_node& t_req_payload = a_request->payload().as_node();
528 if( ! t_req_payload[
"values"].is_array() )
530 return a_request->reply( dripline::dl_message_error_bad_payload(),
"Unable to perform node-config (single value): values entry is not an array" );
532 const param_array t_values_array = t_req_payload[
"values"].as_array();
533 if( t_values_array.empty() || ! t_values_array[0]().is_value() )
535 return a_request->reply( dripline::dl_message_error_bad_payload(),
"Unable to perform node-config (single value): \"values\" is not an array, or the array is empty, or the first element in the array is not a value" );
538 param_node t_param_to_set;
539 t_param_to_set.add( a_request->parsed_specifier().front(), param_value( t_values_array[0]() ) );
544 t_payload.merge( t_param_to_set );
546 catch( std::exception& e )
548 return a_request->reply( dripline::dl_device_error(), std::string(
"Unable to perform node-config request (single value): ") + e.what() );
552 LDEBUG(
plog,
"Node-config was successful" );
553 return a_request->reply( dripline::dl_success(),
"Performed node-config", std::move( t_payload_ptr ) );
558 if( a_request->parsed_specifier().size() < 2 )
560 return a_request->reply( dripline::dl_message_error_invalid_key(),
"RKS is improperly formatted: [queue].node-config.[stream].[node] or [queue].node-config.[stream].[node].[parameter]" );
565 std::string t_target_stream = a_request->parsed_specifier().front();
566 a_request->parsed_specifier().pop_front();
568 std::string t_target_node = a_request->parsed_specifier().front();
569 a_request->parsed_specifier().pop_front();
571 param_ptr_t t_payload_ptr(
new param_node() );
572 param_node& t_payload = t_payload_ptr->as_node();
573 if( a_request->parsed_specifier().empty() )
576 LDEBUG(
plog,
"Getting full node config for stream <" << t_target_stream <<
"> and node <" << t_target_node <<
">" );
582 catch( std::exception& e )
584 return a_request->reply( dripline::dl_device_error(), std::string(
"Unable to perform get-node-config request: ") + e.what() );
590 LDEBUG(
plog,
"Getting value for a single parameter in stream <" << t_target_stream <<
"> and node <" << t_target_node <<
">" );
592 std::string t_param_to_get = a_request->parsed_specifier().front();
596 param_node t_param_dump;
598 if( ! t_param_dump.has( t_param_to_get ) )
600 return a_request->reply( dripline::dl_message_error_invalid_key(),
"Unable to get node parameter: cannot find parameter <" + t_param_to_get +
">" );
602 t_payload.add( t_param_to_get, param_value( t_param_dump[t_param_to_get]() ) );
604 catch( std::exception& e )
606 return a_request->reply( dripline::dl_device_error(), std::string(
"Unable to get node parameter (single value): ") + e.what() );
610 LDEBUG(
plog,
"Get-node-config was successful" );
611 return a_request->reply( dripline::dl_success(),
"Performed get-node-config", std::move( t_payload_ptr ) );
616 if( a_request->parsed_specifier().size() > 1 )
618 return a_request->reply( dripline::dl_message_error_invalid_key(),
"Specifier is improperly formatted: node-config.[stream]" );
621 param_array t_streams_list;
622 LDEBUG(
plog,
"Getting list of streams from the stream handler" );
623 param_ptr_t t_payload_ptr(
new param_node() );
624 param_node& t_payload = t_payload_ptr->as_node();
627 for ( streams_t::iterator t_stream_it =
f_streams.begin(); t_stream_it !=
f_streams.end(); ++t_stream_it )
629 t_streams_list.push_back( param_value( t_stream_it->first ) );
631 t_payload.add(
"streams", t_streams_list );
633 catch( std::exception& e )
635 return a_request->reply( dripline::dl_device_error(), std::string(
"Unable to perform get-stream-list request: ") + e.what() );
637 LDEBUG(
plog,
"Get-stream-list was successful" );
638 return a_request->reply( dripline::dl_success(),
"Performed get-stream-list", std::move( t_payload_ptr ) );
643 if( a_request->parsed_specifier().size() < 1 )
645 return a_request->reply( dripline::dl_message_error_invalid_key(),
"Specifier is improperly formatted: node-list.[stream]" );
648 std::string t_target_stream = a_request->parsed_specifier().front();
649 a_request->parsed_specifier().pop_front();
651 if( !
f_streams.count( t_target_stream ) )
653 return a_request->reply( dripline::dl_message_error_invalid_key(),
"Specifier is improperly formatted: node-list.[stream]" );
657 LDEBUG(
plog,
"Getting list of nodes from the stream handler" );
658 param_ptr_t t_payload_ptr(
new param_node() );
659 param_node& t_payload = t_payload_ptr->as_node();
662 param_array t_node_list;
663 for ( stream_manager::stream_template::nodes_t::iterator t_nodes_it = t_these_nodes->begin(); t_nodes_it != t_these_nodes->end(); ++t_nodes_it )
665 t_node_list.push_back( param_value( t_nodes_it->first ) );
667 t_payload.add(
"nodes", t_node_list );
669 catch( std::exception& e )
671 return a_request->reply( dripline::dl_device_error(), std::string(
"Unable to perform get-stream-node-list request: ") + e.what() );
673 LDEBUG(
plog,
"Get-stream-node-list was successful" );
674 return a_request->reply( dripline::dl_success(),
"Performed get-stream-node-list", std::move(t_payload_ptr) );
active_node_bindings f_node_bindings
void remove_stream(const std::string &a_name)
static bool add_preset(const scarab::param_node &a_preset_node)
void clear_node_bindings()
const char * what() const
connections_t f_connections
std::string get_node_run_str() const
std::map< std::string, std::string > nodes_t
static scarab::logger plog("batch_executor")
bool dump_node_config(const std::string &a_stream_name, const std::string &a_node_name, scarab::param_node &a_config) const
dripline::reply_ptr_t handle_dump_config_node_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_get_stream_list_request(const dripline::request_ptr_t a_request)
bool configure_node(const std::string &a_stream_name, const std::string &a_node_name, const scarab::param_node &a_config)
void _add_stream(const std::string &a_name, const scarab::param_node &a_node)
midge_package get_midge()
const nodes_t & get_nodes() const
virtual ~stream_manager()
dripline::reply_ptr_t handle_add_stream_request(const dripline::request_ptr_t a_request)
dripline::reply_ptr_t handle_configure_node_request(const dripline::request_ptr_t a_request)
Holds node configuration and can be used to create node classes and node binding classes.
void configure_builder(const scarab::param_node &a_config)
void _remove_stream(const std::string &a_name)
const connections_t & get_connections() const
void _dump_node_config(const std::string &a_stream_name, const std::string &a_node_name, scarab::param_node &a_config) const
std::mutex f_manager_mutex
void _configure_node(const std::string &a_stream_name, const std::string &a_node_name, const scarab::param_node &a_config)
void return_midge(midge_package &&a_midge)
locked_resource< midge::diptera, stream_manager > midge_package
dripline::reply_ptr_t handle_get_stream_node_list_request(const dripline::request_ptr_t a_request)
std::map< std::string, node_builder *> nodes_t
LOGGER(plog, "egg_writer")
bool initialize(const scarab::param_node &a_config)
dripline::reply_ptr_t handle_remove_stream_request(const dripline::request_ptr_t a_request)
bool add_stream(const std::string &a_name, const scarab::param_node &a_node)
virtual node_binding * clone() const =0
std::set< std::string > connections_t
Allows access to midge nodes.