00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_flow_graph_H
00022 #define __TBB_flow_graph_H
00023
00024 #include "tbb_stddef.h"
00025 #include "atomic.h"
00026 #include "spin_mutex.h"
00027 #include "null_mutex.h"
00028 #include "spin_rw_mutex.h"
00029 #include "null_rw_mutex.h"
00030 #include "task.h"
00031 #include "concurrent_vector.h"
00032 #include "internal/_aggregator_impl.h"
00033
00034
00035 #if __TBB_CPP11_TUPLE_PRESENT
00036 #include <tuple>
00037 #else
00038 #include "compat/tuple"
00039 #endif
00040
00041 #include<list>
00042 #include<queue>
00043
00054 namespace tbb {
00055 namespace flow {
00056
00058 enum concurrency { unlimited = 0, serial = 1 };
00059
00060 namespace interface6 {
00061
00062 namespace internal {
00063 template<typename T, typename M>
00064 class successor_cache;
00065 }
00066
00068 class continue_msg {};
00069
00070 template< typename T > class sender;
00071 template< typename T > class receiver;
00072 class continue_receiver;
00073
00075 template< typename T >
00076 class sender {
00077 public:
00079 typedef T output_type;
00080
00082 typedef receiver<T> successor_type;
00083
00084 virtual ~sender() {}
00085
00087 virtual bool register_successor( successor_type &r ) = 0;
00088
00090 virtual bool remove_successor( successor_type &r ) = 0;
00091
00093 virtual bool try_get( T & ) { return false; }
00094
00096 virtual bool try_reserve( T & ) { return false; }
00097
00099 virtual bool try_release( ) { return false; }
00100
00102 virtual bool try_consume( ) { return false; }
00103 };
00104
00105 template< typename T > class limiter_node;
00106
00108 template< typename T >
00109 class receiver {
00110 public:
00112 typedef T input_type;
00113
00115 typedef sender<T> predecessor_type;
00116
00118 virtual ~receiver() {}
00119
00121 virtual bool try_put( const T& t ) = 0;
00122
00124 virtual bool register_predecessor( predecessor_type & ) { return false; }
00125
00127 virtual bool remove_predecessor( predecessor_type & ) { return false; }
00128
00129 protected:
00131 template<typename U> friend class limiter_node;
00132 virtual void reset_receiver() = 0;
00133 template<typename TT, typename M>
00134 friend class internal::successor_cache;
00135 virtual bool is_continue_receiver() { return false; }
00136 };
00137
00139
00140 class continue_receiver : public receiver< continue_msg > {
00141 public:
00142
00144 typedef continue_msg input_type;
00145
00147 typedef sender< continue_msg > predecessor_type;
00148
00150 continue_receiver( int number_of_predecessors = 0 ) {
00151 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
00152 my_current_count = 0;
00153 }
00154
00156 continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
00157 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
00158 my_current_count = 0;
00159 }
00160
00162 virtual ~continue_receiver() { }
00163
00165 bool register_predecessor( predecessor_type & ) {
00166 spin_mutex::scoped_lock l(my_mutex);
00167 ++my_predecessor_count;
00168 return true;
00169 }
00170
00172
00175 bool remove_predecessor( predecessor_type & ) {
00176 spin_mutex::scoped_lock l(my_mutex);
00177 --my_predecessor_count;
00178 return true;
00179 }
00180
00182
00184 bool try_put( const input_type & ) {
00185 {
00186 spin_mutex::scoped_lock l(my_mutex);
00187 if ( ++my_current_count < my_predecessor_count )
00188 return true;
00189 else
00190 my_current_count = 0;
00191 }
00192 execute();
00193 return true;
00194 }
00195
00196 protected:
00197 spin_mutex my_mutex;
00198 int my_predecessor_count;
00199 int my_current_count;
00200 int my_initial_predecessor_count;
00201
00202
00203 template<typename U> friend class limiter_node;
00204 void reset_receiver() {
00205 my_current_count = 0;
00206 }
00207
00209
00211 virtual void execute() = 0;
00212 template<typename TT, typename M>
00213 friend class internal::successor_cache;
00214 bool is_continue_receiver() { return true; }
00215 };
00216
00217 #include "internal/_flow_graph_impl.h"
00218 using namespace internal::graph_policy_namespace;
00219
00220 class graph;
00221 class graph_node;
00222
00223 template <typename GraphContainerType, typename GraphNodeType>
00224 class graph_iterator {
00225 friend class graph;
00226 friend class graph_node;
00227 public:
00228 typedef size_t size_type;
00229 typedef GraphNodeType value_type;
00230 typedef GraphNodeType* pointer;
00231 typedef GraphNodeType& reference;
00232 typedef const GraphNodeType& const_reference;
00233 typedef std::forward_iterator_tag iterator_category;
00234
00236 graph_iterator() : my_graph(NULL), current_node(NULL) {}
00237
00239 graph_iterator(const graph_iterator& other) :
00240 my_graph(other.my_graph), current_node(other.current_node)
00241 {}
00242
00244 graph_iterator& operator=(const graph_iterator& other) {
00245 if (this != &other) {
00246 my_graph = other.my_graph;
00247 current_node = other.current_node;
00248 }
00249 return *this;
00250 }
00251
00253 reference operator*() const;
00254
00256 pointer operator->() const;
00257
00259 bool operator==(const graph_iterator& other) const {
00260 return ((my_graph == other.my_graph) && (current_node == other.current_node));
00261 }
00262
00264 bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
00265
00267 graph_iterator& operator++() {
00268 internal_forward();
00269 return *this;
00270 }
00271
00273 graph_iterator operator++(int) {
00274 graph_iterator result = *this;
00275 operator++();
00276 return result;
00277 }
00278
00279 private:
00280
00281 GraphContainerType *my_graph;
00282
00283 pointer current_node;
00284
00286 graph_iterator(GraphContainerType *g, bool begin);
00287 void internal_forward();
00288 };
00289
00291
00292 class graph : tbb::internal::no_copy {
00293 friend class graph_node;
00294
00295 template< typename Body >
00296 class run_task : public task {
00297 public:
00298 run_task( Body& body ) : my_body(body) {}
00299 task *execute() {
00300 my_body();
00301 return NULL;
00302 }
00303 private:
00304 Body my_body;
00305 };
00306
00307 template< typename Receiver, typename Body >
00308 class run_and_put_task : public task {
00309 public:
00310 run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00311 task *execute() {
00312 my_receiver.try_put( my_body() );
00313 return NULL;
00314 }
00315 private:
00316 Receiver &my_receiver;
00317 Body my_body;
00318 };
00319
00320 public:
00322 explicit graph() : my_nodes(NULL), my_nodes_last(NULL)
00323 {
00324 own_context = true;
00325 cancelled = false;
00326 caught_exception = false;
00327 my_context = new task_group_context();
00328 my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00329 my_root_task->set_ref_count(1);
00330 }
00331
00333 explicit graph(task_group_context& use_this_context) :
00334 my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL)
00335 {
00336 own_context = false;
00337 my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00338 my_root_task->set_ref_count(1);
00339 }
00340
00342
00343 ~graph() {
00344 wait_for_all();
00345 my_root_task->set_ref_count(0);
00346 task::destroy( *my_root_task );
00347 if (own_context) delete my_context;
00348 }
00349
00351
00353 void increment_wait_count() {
00354 if (my_root_task)
00355 my_root_task->increment_ref_count();
00356 }
00357
00359
00361 void decrement_wait_count() {
00362 if (my_root_task)
00363 my_root_task->decrement_ref_count();
00364 }
00365
00367
00369 template< typename Receiver, typename Body >
00370 void run( Receiver &r, Body body ) {
00371 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00372 run_and_put_task< Receiver, Body >( r, body ) );
00373 }
00374
00376
00378 template< typename Body >
00379 void run( Body body ) {
00380 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00381 run_task< Body >( body ) );
00382 }
00383
00385
00386 void wait_for_all() {
00387 cancelled = false;
00388 caught_exception = false;
00389 if (my_root_task) {
00390 #if TBB_USE_EXCEPTIONS
00391 try {
00392 #endif
00393 my_root_task->wait_for_all();
00394 cancelled = my_context->is_group_execution_cancelled();
00395 #if TBB_USE_EXCEPTIONS
00396 }
00397 catch(...) {
00398 my_root_task->set_ref_count(1);
00399 my_context->reset();
00400 caught_exception = true;
00401 cancelled = true;
00402 throw;
00403 }
00404 #endif
00405 my_context->reset();
00406 my_root_task->set_ref_count(1);
00407 }
00408 }
00409
00411 task * root_task() {
00412 return my_root_task;
00413 }
00414
00415
00416 template<typename C, typename N>
00417 friend class graph_iterator;
00418
00419
00420 typedef graph_iterator<graph,graph_node> iterator;
00421 typedef graph_iterator<const graph,const graph_node> const_iterator;
00422
00423
00425 iterator begin() { return iterator(this, true); }
00427 iterator end() { return iterator(this, false); }
00429 const_iterator begin() const { return const_iterator(this, true); }
00431 const_iterator end() const { return const_iterator(this, false); }
00433 const_iterator cbegin() const { return const_iterator(this, true); }
00435 const_iterator cend() const { return const_iterator(this, false); }
00436
00438 bool is_cancelled() { return cancelled; }
00439 bool exception_thrown() { return caught_exception; }
00440
00441
00442 void reset();
00443
00444 private:
00445 task *my_root_task;
00446 task_group_context *my_context;
00447 bool own_context;
00448 bool cancelled;
00449 bool caught_exception;
00450
00451 graph_node *my_nodes, *my_nodes_last;
00452
00453 spin_mutex nodelist_mutex;
00454 void register_node(graph_node *n);
00455 void remove_node(graph_node *n);
00456
00457 };
00458
00459 template <typename C, typename N>
00460 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
00461 {
00462 if (begin) current_node = my_graph->my_nodes;
00463
00464 }
00465
00466 template <typename C, typename N>
00467 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
00468 __TBB_ASSERT(current_node, "graph_iterator at end");
00469 return *operator->();
00470 }
00471
00472 template <typename C, typename N>
00473 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
00474 return current_node;
00475 }
00476
00477
00478 template <typename C, typename N>
00479 void graph_iterator<C,N>::internal_forward() {
00480 if (current_node) current_node = current_node->next;
00481 }
00482
00484 class graph_node : tbb::internal::no_assign {
00485 friend class graph;
00486 template<typename C, typename N>
00487 friend class graph_iterator;
00488 protected:
00489 graph& my_graph;
00490 graph_node *next, *prev;
00491 public:
00492 graph_node(graph& g) : my_graph(g) {
00493 my_graph.register_node(this);
00494 }
00495 virtual ~graph_node() {
00496 my_graph.remove_node(this);
00497 }
00498
00499 protected:
00500 virtual void reset() = 0;
00501 };
00502
00503 inline void graph::register_node(graph_node *n) {
00504 n->next = NULL;
00505 {
00506 spin_mutex::scoped_lock lock(nodelist_mutex);
00507 n->prev = my_nodes_last;
00508 if (my_nodes_last) my_nodes_last->next = n;
00509 my_nodes_last = n;
00510 if (!my_nodes) my_nodes = n;
00511 }
00512 }
00513
00514 inline void graph::remove_node(graph_node *n) {
00515 {
00516 spin_mutex::scoped_lock lock(nodelist_mutex);
00517 __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
00518 if (n->prev) n->prev->next = n->next;
00519 if (n->next) n->next->prev = n->prev;
00520 if (my_nodes_last == n) my_nodes_last = n->prev;
00521 if (my_nodes == n) my_nodes = n->next;
00522 }
00523 n->prev = n->next = NULL;
00524 }
00525
00526 inline void graph::reset() {
00527
00528 if(my_context) my_context->reset();
00529 cancelled = false;
00530 caught_exception = false;
00531
00532 for(iterator ii = begin(); ii != end(); ++ii) {
00533 graph_node *my_p = &(*ii);
00534 my_p->reset();
00535 }
00536 }
00537
00538
00539 #include "internal/_flow_graph_node_impl.h"
00540
00542 template < typename Output >
00543 class source_node : public graph_node, public sender< Output > {
00544 using graph_node::my_graph;
00545 public:
00547 typedef Output output_type;
00548
00550 typedef receiver< Output > successor_type;
00551
00553 template< typename Body >
00554 source_node( graph &g, Body body, bool is_active = true )
00555 : graph_node(g), my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
00556 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
00557 my_reserved(false), my_has_cached_item(false)
00558 {
00559 my_successors.set_owner(this);
00560 }
00561
00563 source_node( const source_node& src ) :
00564 graph_node(src.my_graph), sender<Output>(),
00565 my_root_task( src.my_root_task), my_active(src.init_my_active),
00566 init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
00567 my_reserved(false), my_has_cached_item(false)
00568 {
00569 my_successors.set_owner(this);
00570 }
00571
00573 ~source_node() { delete my_body; }
00574
00576 bool register_successor( receiver<output_type> &r ) {
00577 spin_mutex::scoped_lock lock(my_mutex);
00578 my_successors.register_successor(r);
00579 if ( my_active )
00580 spawn_put();
00581 return true;
00582 }
00583
00585 bool remove_successor( receiver<output_type> &r ) {
00586 spin_mutex::scoped_lock lock(my_mutex);
00587 my_successors.remove_successor(r);
00588 return true;
00589 }
00590
00592 bool try_get( output_type &v ) {
00593 spin_mutex::scoped_lock lock(my_mutex);
00594 if ( my_reserved )
00595 return false;
00596
00597 if ( my_has_cached_item ) {
00598 v = my_cached_item;
00599 my_has_cached_item = false;
00600 return true;
00601 }
00602 return false;
00603 }
00604
00606 bool try_reserve( output_type &v ) {
00607 spin_mutex::scoped_lock lock(my_mutex);
00608 if ( my_reserved ) {
00609 return false;
00610 }
00611
00612 if ( my_has_cached_item ) {
00613 v = my_cached_item;
00614 my_reserved = true;
00615 return true;
00616 } else {
00617 return false;
00618 }
00619 }
00620
00622
00623 bool try_release( ) {
00624 spin_mutex::scoped_lock lock(my_mutex);
00625 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
00626 my_reserved = false;
00627 if(!my_successors.empty())
00628 spawn_put();
00629 return true;
00630 }
00631
00633 bool try_consume( ) {
00634 spin_mutex::scoped_lock lock(my_mutex);
00635 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
00636 my_reserved = false;
00637 my_has_cached_item = false;
00638 if ( !my_successors.empty() ) {
00639 spawn_put();
00640 }
00641 return true;
00642 }
00643
00645 void activate() {
00646 spin_mutex::scoped_lock lock(my_mutex);
00647 my_active = true;
00648 if ( !my_successors.empty() )
00649 spawn_put();
00650 }
00651
00652 template<class Body>
00653 Body copy_function_object() {
00654 internal::source_body<output_type> &body_ref = *this->my_body;
00655 return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
00656 }
00657
00658 protected:
00659
00661 void reset() {
00662 my_active = init_my_active;
00663 my_reserved =false;
00664 if(my_has_cached_item) {
00665 my_has_cached_item = false;
00666 }
00667 }
00668
00669 private:
00670 task *my_root_task;
00671 spin_mutex my_mutex;
00672 bool my_active;
00673 bool init_my_active;
00674 internal::source_body<output_type> *my_body;
00675 internal::broadcast_cache< output_type > my_successors;
00676 bool my_reserved;
00677 bool my_has_cached_item;
00678 output_type my_cached_item;
00679
00680 friend class internal::source_task< source_node< output_type > >;
00681
00682
00683
00684 bool try_reserve_apply_body(output_type &v) {
00685 spin_mutex::scoped_lock lock(my_mutex);
00686 if ( my_reserved ) {
00687 return false;
00688 }
00689
00690 if ( !my_has_cached_item && (*my_body)(my_cached_item) )
00691 my_has_cached_item = true;
00692
00693 if ( my_has_cached_item ) {
00694 v = my_cached_item;
00695 my_reserved = true;
00696 return true;
00697 } else {
00698 return false;
00699 }
00700 }
00701
00703 void apply_body( ) {
00704 output_type v;
00705 if ( !try_reserve_apply_body(v) )
00706 return;
00707
00708 if ( my_successors.try_put( v ) )
00709 try_consume();
00710 else
00711 try_release();
00712 }
00713
00715 void spawn_put( ) {
00716 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00717 internal::source_task< source_node< output_type > >( *this ) );
00718 }
00719 };
00720
00722 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00723 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00724 using graph_node::my_graph;
00725 public:
00726 typedef Input input_type;
00727 typedef Output output_type;
00728 typedef sender< input_type > predecessor_type;
00729 typedef receiver< output_type > successor_type;
00730 typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00731 typedef internal::function_output<output_type> fOutput_type;
00732
00734 template< typename Body >
00735 function_node( graph &g, size_t concurrency, Body body ) :
00736 graph_node(g), internal::function_input<input_type,output_type,Allocator>(g, concurrency, body)
00737 {}
00738
00740 function_node( const function_node& src ) :
00741 graph_node(src.my_graph), internal::function_input<input_type,output_type,Allocator>( src ),
00742 fOutput_type()
00743 {}
00744
00745 bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00746
00747 protected:
00748
00749
00750 void reset() {fInput_type::reset_function_input(); }
00751
00752 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00753 };
00754
00756 template < typename Input, typename Output, typename Allocator >
00757 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00758 using graph_node::my_graph;
00759 public:
00760 typedef Input input_type;
00761 typedef Output output_type;
00762 typedef sender< input_type > predecessor_type;
00763 typedef receiver< output_type > successor_type;
00764 typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00765 typedef internal::function_input_queue<input_type, Allocator> queue_type;
00766 typedef internal::function_output<output_type> fOutput_type;
00767
00769 template< typename Body >
00770 function_node( graph &g, size_t concurrency, Body body ) :
00771 graph_node(g), fInput_type( g, concurrency, body, new queue_type() )
00772 {}
00773
00775 function_node( const function_node& src ) :
00776 graph_node(src.my_graph), fInput_type( src, new queue_type() ), fOutput_type()
00777 {}
00778
00779 bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00780
00781 protected:
00782
00783 void reset() { fInput_type::reset_function_input(); }
00784
00785 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00786 };
00787
00788 #include "tbb/internal/_flow_graph_types_impl.h"
00789
00791
00792 template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00793 class multifunction_node :
00794 public graph_node,
00795 public internal::multifunction_input
00796 <
00797 Input,
00798 typename internal::wrap_tuple_elements<
00799 std::tuple_size<Output>::value,
00800 internal::function_output,
00801 Output
00802 >::type,
00803 Allocator
00804 > {
00805 using graph_node::my_graph;
00806 private:
00807 static const int N = std::tuple_size<Output>::value;
00808 public:
00809 typedef Input input_type;
00810 typedef typename internal::wrap_tuple_elements<N,internal::function_output, Output>::type output_ports_type;
00811 private:
00812 typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00813 typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00814 public:
00815 template<typename Body>
00816 multifunction_node( graph &g, size_t concurrency, Body body ) :
00817 graph_node(g), base_type(g,concurrency, body)
00818 {}
00819 multifunction_node( const multifunction_node &other) :
00820 graph_node(other.my_graph), base_type(other)
00821 {}
00822
00823 protected:
00824 void reset() { base_type::reset(); }
00825 };
00826
00827 template < typename Input, typename Output, typename Allocator >
00828 class multifunction_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multifunction_input<Input,
00829 typename internal::wrap_tuple_elements<std::tuple_size<Output>::value, internal::function_output, Output>::type, Allocator> {
00830 using graph_node::my_graph;
00831 static const int N = std::tuple_size<Output>::value;
00832 public:
00833 typedef Input input_type;
00834 typedef typename internal::wrap_tuple_elements<N, internal::function_output, Output>::type output_ports_type;
00835 private:
00836 typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00837 typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00838 public:
00839 template<typename Body>
00840 multifunction_node( graph &g, size_t concurrency, Body body) :
00841 graph_node(g), base_type(g,concurrency, body, new queue_type())
00842 {}
00843 multifunction_node( const multifunction_node &other) :
00844 graph_node(other.my_graph), base_type(other, new queue_type())
00845 {}
00846
00847 protected:
00848 void reset() { base_type::reset(); }
00849 };
00850
00852
00853
00854 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
00855 class split_node : public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
00856 static const int N = std::tuple_size<TupleType>::value;
00857 typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
00858 public:
00859 typedef typename base_type::output_ports_type output_ports_type;
00860 private:
00861 struct splitting_body {
00862 void operator()(const TupleType& t, output_ports_type &p) {
00863 internal::emit_element<N>::emit_this(t, p);
00864 }
00865 };
00866 public:
00867 typedef TupleType input_type;
00868 typedef Allocator allocator_type;
00869 split_node(graph &g) : base_type(g, unlimited, splitting_body()) {}
00870 split_node( const split_node & other) : base_type(other) {}
00871 };
00872
00874 template <typename Output>
00875 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
00876 using graph_node::my_graph;
00877 public:
00878 typedef continue_msg input_type;
00879 typedef Output output_type;
00880 typedef sender< input_type > predecessor_type;
00881 typedef receiver< output_type > successor_type;
00882 typedef internal::function_output<output_type> fOutput_type;
00883
00885 template <typename Body >
00886 continue_node( graph &g, Body body ) :
00887 graph_node(g), internal::continue_input<output_type>( g, body )
00888 {}
00889
00891 template <typename Body >
00892 continue_node( graph &g, int number_of_predecessors, Body body ) :
00893 graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body )
00894 {}
00895
00897 continue_node( const continue_node& src ) :
00898 graph_node(src.my_graph), internal::continue_input<output_type>(src),
00899 internal::function_output<Output>()
00900 {}
00901
00902 bool try_put(const input_type &i) { return internal::continue_input<Output>::try_put(i); }
00903
00904 protected:
00905 void reset() { internal::continue_input<Output>::reset_receiver(); }
00906
00907 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00908 };
00909
00910 template< typename T >
00911 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
00912 using graph_node::my_graph;
00913 public:
00914 typedef T input_type;
00915 typedef T output_type;
00916 typedef sender< input_type > predecessor_type;
00917 typedef receiver< output_type > successor_type;
00918
00919 overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
00920 my_successors.set_owner( this );
00921 }
00922
00923
00924 overwrite_node( const overwrite_node& src ) :
00925 graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
00926 {
00927 my_successors.set_owner( this );
00928 }
00929
00930 ~overwrite_node() {}
00931
00932 bool register_successor( successor_type &s ) {
00933 spin_mutex::scoped_lock l( my_mutex );
00934 if ( my_buffer_is_valid ) {
00935
00936 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this ) ) {
00937
00938 my_successors.register_successor( s );
00939 return true;
00940 } else {
00941
00942 return false;
00943 }
00944 } else {
00945
00946 my_successors.register_successor( s );
00947 return true;
00948 }
00949 }
00950
00951 bool remove_successor( successor_type &s ) {
00952 spin_mutex::scoped_lock l( my_mutex );
00953 my_successors.remove_successor(s);
00954 return true;
00955 }
00956
00957 bool try_put( const T &v ) {
00958 spin_mutex::scoped_lock l( my_mutex );
00959 my_buffer = v;
00960 my_buffer_is_valid = true;
00961 my_successors.try_put(v);
00962 return true;
00963 }
00964
00965 bool try_get( T &v ) {
00966 spin_mutex::scoped_lock l( my_mutex );
00967 if ( my_buffer_is_valid ) {
00968 v = my_buffer;
00969 return true;
00970 } else {
00971 return false;
00972 }
00973 }
00974
00975 bool is_valid() {
00976 spin_mutex::scoped_lock l( my_mutex );
00977 return my_buffer_is_valid;
00978 }
00979
00980 void clear() {
00981 spin_mutex::scoped_lock l( my_mutex );
00982 my_buffer_is_valid = false;
00983 }
00984
00985 protected:
00986
00987 void reset() { my_buffer_is_valid = false; }
00988
00989 spin_mutex my_mutex;
00990 internal::broadcast_cache< T, null_rw_mutex > my_successors;
00991 T my_buffer;
00992 bool my_buffer_is_valid;
00993 void reset_receiver() {}
00994 };
00995
00996 template< typename T >
00997 class write_once_node : public overwrite_node<T> {
00998 public:
00999 typedef T input_type;
01000 typedef T output_type;
01001 typedef sender< input_type > predecessor_type;
01002 typedef receiver< output_type > successor_type;
01003
01005 write_once_node(graph& g) : overwrite_node<T>(g) {}
01006
01008 write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
01009
01010 bool try_put( const T &v ) {
01011 spin_mutex::scoped_lock l( this->my_mutex );
01012 if ( this->my_buffer_is_valid ) {
01013 return false;
01014 } else {
01015 this->my_buffer = v;
01016 this->my_buffer_is_valid = true;
01017 this->my_successors.try_put(v);
01018 return true;
01019 }
01020 }
01021 };
01022
01024 template <typename T>
01025 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
01026 using graph_node::my_graph;
01027 internal::broadcast_cache<T> my_successors;
01028 public:
01029 typedef T input_type;
01030 typedef T output_type;
01031 typedef sender< input_type > predecessor_type;
01032 typedef receiver< output_type > successor_type;
01033
01034 broadcast_node(graph& g) : graph_node(g) {
01035 my_successors.set_owner( this );
01036 }
01037
01038
01039 broadcast_node( const broadcast_node& src ) :
01040 graph_node(src.my_graph), receiver<T>(), sender<T>()
01041 {
01042 my_successors.set_owner( this );
01043 }
01044
01046 virtual bool register_successor( receiver<T> &r ) {
01047 my_successors.register_successor( r );
01048 return true;
01049 }
01050
01052 virtual bool remove_successor( receiver<T> &r ) {
01053 my_successors.remove_successor( r );
01054 return true;
01055 }
01056
01057 bool try_put( const T &t ) {
01058 my_successors.try_put(t);
01059 return true;
01060 }
01061 protected:
01062 void reset() {}
01063 void reset_receiver() {}
01064 };
01065
01066 #include "internal/_flow_graph_item_buffer_impl.h"
01067
01069 template <typename T, typename A=cache_aligned_allocator<T> >
01070 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
01071 using graph_node::my_graph;
01072 public:
01073 typedef T input_type;
01074 typedef T output_type;
01075 typedef sender< input_type > predecessor_type;
01076 typedef receiver< output_type > successor_type;
01077 typedef buffer_node<T, A> my_class;
01078 protected:
01079 typedef size_t size_type;
01080 internal::round_robin_cache< T, null_rw_mutex > my_successors;
01081
01082 task *my_parent;
01083
01084 friend class internal::forward_task< buffer_node< T, A > >;
01085
01086 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
01087 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01088
01089
01090 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
01091 public:
01092 char type;
01093 T *elem;
01094 successor_type *r;
01095 buffer_operation(const T& e, op_type t) :
01096 type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
01097 buffer_operation(op_type t) : type(char(t)), r(NULL) {}
01098 };
01099
01100 bool forwarder_busy;
01101 typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
01102 friend class internal::aggregating_functor<my_class, buffer_operation>;
01103 internal::aggregator< my_handler, buffer_operation> my_aggregator;
01104
01105 virtual void handle_operations(buffer_operation *op_list) {
01106 buffer_operation *tmp;
01107 bool try_forwarding=false;
01108 while (op_list) {
01109 tmp = op_list;
01110 op_list = op_list->next;
01111 switch (tmp->type) {
01112 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
01113 case rem_succ: internal_rem_succ(tmp); break;
01114 case req_item: internal_pop(tmp); break;
01115 case res_item: internal_reserve(tmp); break;
01116 case rel_res: internal_release(tmp); try_forwarding = true; break;
01117 case con_res: internal_consume(tmp); try_forwarding = true; break;
01118 case put_item: internal_push(tmp); try_forwarding = true; break;
01119 case try_fwd: internal_forward(tmp); break;
01120 }
01121 }
01122 if (try_forwarding && !forwarder_busy) {
01123 forwarder_busy = true;
01124 task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
01125 }
01126 }
01127
01129 virtual void forward() {
01130 buffer_operation op_data(try_fwd);
01131 do {
01132 op_data.status = WAIT;
01133 my_aggregator.execute(&op_data);
01134 } while (op_data.status == SUCCEEDED);
01135 }
01136
01138 virtual void internal_reg_succ(buffer_operation *op) {
01139 my_successors.register_successor(*(op->r));
01140 __TBB_store_with_release(op->status, SUCCEEDED);
01141 }
01142
01144 virtual void internal_rem_succ(buffer_operation *op) {
01145 my_successors.remove_successor(*(op->r));
01146 __TBB_store_with_release(op->status, SUCCEEDED);
01147 }
01148
01150 virtual void internal_forward(buffer_operation *op) {
01151 T i_copy;
01152 bool success = false;
01153 size_type counter = my_successors.size();
01154
01155 while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
01156 this->fetch_back(i_copy);
01157 if( my_successors.try_put(i_copy) ) {
01158 this->invalidate_back();
01159 --(this->my_tail);
01160 success = true;
01161 }
01162 --counter;
01163 }
01164 if (success && !counter)
01165 __TBB_store_with_release(op->status, SUCCEEDED);
01166 else {
01167 __TBB_store_with_release(op->status, FAILED);
01168 forwarder_busy = false;
01169 }
01170 }
01171
01172 virtual void internal_push(buffer_operation *op) {
01173 this->push_back(*(op->elem));
01174 __TBB_store_with_release(op->status, SUCCEEDED);
01175 }
01176
01177 virtual void internal_pop(buffer_operation *op) {
01178 if(this->pop_back(*(op->elem))) {
01179 __TBB_store_with_release(op->status, SUCCEEDED);
01180 }
01181 else {
01182 __TBB_store_with_release(op->status, FAILED);
01183 }
01184 }
01185
01186 virtual void internal_reserve(buffer_operation *op) {
01187 if(this->reserve_front(*(op->elem))) {
01188 __TBB_store_with_release(op->status, SUCCEEDED);
01189 }
01190 else {
01191 __TBB_store_with_release(op->status, FAILED);
01192 }
01193 }
01194
01195 virtual void internal_consume(buffer_operation *op) {
01196 this->consume_front();
01197 __TBB_store_with_release(op->status, SUCCEEDED);
01198 }
01199
01200 virtual void internal_release(buffer_operation *op) {
01201 this->release_front();
01202 __TBB_store_with_release(op->status, SUCCEEDED);
01203 }
01204
01205 public:
01207 buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(),
01208 my_parent( g.root_task() ), forwarder_busy(false) {
01209 my_successors.set_owner(this);
01210 my_aggregator.initialize_handler(my_handler(this));
01211 }
01212
01214 buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
01215 reservable_item_buffer<T>(), receiver<T>(), sender<T>(),
01216 my_parent( src.my_parent ) {
01217 forwarder_busy = false;
01218 my_successors.set_owner(this);
01219 my_aggregator.initialize_handler(my_handler(this));
01220 }
01221
01222 virtual ~buffer_node() {}
01223
01224
01225
01226
01227
01229
01230 bool register_successor( receiver<output_type> &r ) {
01231 buffer_operation op_data(reg_succ);
01232 op_data.r = &r;
01233 my_aggregator.execute(&op_data);
01234 return true;
01235 }
01236
01238
01240 bool remove_successor( receiver<output_type> &r ) {
01241 r.remove_predecessor(*this);
01242 buffer_operation op_data(rem_succ);
01243 op_data.r = &r;
01244 my_aggregator.execute(&op_data);
01245 return true;
01246 }
01247
01249
01251 bool try_get( T &v ) {
01252 buffer_operation op_data(req_item);
01253 op_data.elem = &v;
01254 my_aggregator.execute(&op_data);
01255 return (op_data.status==SUCCEEDED);
01256 }
01257
01259
01261 bool try_reserve( T &v ) {
01262 buffer_operation op_data(res_item);
01263 op_data.elem = &v;
01264 my_aggregator.execute(&op_data);
01265 return (op_data.status==SUCCEEDED);
01266 }
01267
01269
01270 bool try_release() {
01271 buffer_operation op_data(rel_res);
01272 my_aggregator.execute(&op_data);
01273 return true;
01274 }
01275
01277
01278 bool try_consume() {
01279 buffer_operation op_data(con_res);
01280 my_aggregator.execute(&op_data);
01281 return true;
01282 }
01283
01285
01286 bool try_put(const T &t) {
01287 buffer_operation op_data(t, put_item);
01288 my_aggregator.execute(&op_data);
01289 return true;
01290 }
01291
01292 protected:
01293
01294 void reset() {
01295 reservable_item_buffer<T, A>::reset();
01296 forwarder_busy = false;
01297 }
01298
01299 void reset_receiver() {
01300
01301 }
01302
01303 };
01304
01306 template <typename T, typename A=cache_aligned_allocator<T> >
01307 class queue_node : public buffer_node<T, A> {
01308 protected:
01309 typedef typename buffer_node<T, A>::size_type size_type;
01310 typedef typename buffer_node<T, A>::buffer_operation queue_operation;
01311
01312 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01313
01315 void internal_forward(queue_operation *op) {
01316 T i_copy;
01317 bool success = false;
01318 size_type counter = this->my_successors.size();
01319 if (this->my_reserved || !this->item_valid(this->my_head)) {
01320 __TBB_store_with_release(op->status, FAILED);
01321 this->forwarder_busy = false;
01322 return;
01323 }
01324
01325 while (counter>0 && this->item_valid(this->my_head)) {
01326 this->fetch_front(i_copy);
01327 if(this->my_successors.try_put(i_copy)) {
01328 this->invalidate_front();
01329 ++(this->my_head);
01330 success = true;
01331 }
01332 --counter;
01333 }
01334 if (success && !counter)
01335 __TBB_store_with_release(op->status, SUCCEEDED);
01336 else {
01337 __TBB_store_with_release(op->status, FAILED);
01338 this->forwarder_busy = false;
01339 }
01340 }
01341
01342 void internal_pop(queue_operation *op) {
01343 if ( this->my_reserved || !this->item_valid(this->my_head)){
01344 __TBB_store_with_release(op->status, FAILED);
01345 }
01346 else {
01347 this->pop_front(*(op->elem));
01348 __TBB_store_with_release(op->status, SUCCEEDED);
01349 }
01350 }
01351 void internal_reserve(queue_operation *op) {
01352 if (this->my_reserved || !this->item_valid(this->my_head)) {
01353 __TBB_store_with_release(op->status, FAILED);
01354 }
01355 else {
01356 this->my_reserved = true;
01357 this->fetch_front(*(op->elem));
01358 this->invalidate_front();
01359 __TBB_store_with_release(op->status, SUCCEEDED);
01360 }
01361 }
01362 void internal_consume(queue_operation *op) {
01363 this->consume_front();
01364 __TBB_store_with_release(op->status, SUCCEEDED);
01365 }
01366
01367 public:
01368 typedef T input_type;
01369 typedef T output_type;
01370 typedef sender< input_type > predecessor_type;
01371 typedef receiver< output_type > successor_type;
01372
01374 queue_node( graph &g ) : buffer_node<T, A>(g) {}
01375
01377 queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
01378 };
01379
01381 template< typename T, typename A=cache_aligned_allocator<T> >
01382 class sequencer_node : public queue_node<T, A> {
01383 internal::function_body< T, size_t > *my_sequencer;
01384 public:
01385 typedef T input_type;
01386 typedef T output_type;
01387 typedef sender< input_type > predecessor_type;
01388 typedef receiver< output_type > successor_type;
01389
01391 template< typename Sequencer >
01392 sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
01393 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01394
01396 sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
01397 my_sequencer( src.my_sequencer->clone() ) {}
01398
01400 ~sequencer_node() { delete my_sequencer; }
01401 protected:
01402 typedef typename buffer_node<T, A>::size_type size_type;
01403 typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
01404
01405 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01406
01407 private:
01408 void internal_push(sequencer_operation *op) {
01409 size_type tag = (*my_sequencer)(*(op->elem));
01410
01411 this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01412
01413 if(this->size() > this->capacity())
01414 this->grow_my_array(this->size());
01415 this->item(tag) = std::make_pair( *(op->elem), true );
01416 __TBB_store_with_release(op->status, SUCCEEDED);
01417 }
01418 };
01419
01421 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
01422 class priority_queue_node : public buffer_node<T, A> {
01423 public:
01424 typedef T input_type;
01425 typedef T output_type;
01426 typedef buffer_node<T,A> base_type;
01427 typedef sender< input_type > predecessor_type;
01428 typedef receiver< output_type > successor_type;
01429
01431 priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
01432
01434 priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
01435
01436 protected:
01437
01438 void reset() {
01439 mark = 0;
01440 base_type::reset();
01441 }
01442
01443 typedef typename buffer_node<T, A>::size_type size_type;
01444 typedef typename buffer_node<T, A>::item_type item_type;
01445 typedef typename buffer_node<T, A>::buffer_operation prio_operation;
01446
01447 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01448
01449 void handle_operations(prio_operation *op_list) {
01450 prio_operation *tmp ;
01451 bool try_forwarding=false;
01452 while (op_list) {
01453 tmp = op_list;
01454 op_list = op_list->next;
01455 switch (tmp->type) {
01456 case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01457 case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
01458 case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
01459 case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
01460 case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
01461 case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
01462 case buffer_node<T, A>::req_item: internal_pop(tmp); break;
01463 case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
01464 }
01465 }
01466
01467 if (mark<this->my_tail) heapify();
01468 if (try_forwarding && !this->forwarder_busy) {
01469 this->forwarder_busy = true;
01470 task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
01471 }
01472 }
01473
01475 void internal_forward(prio_operation *op) {
01476 T i_copy;
01477 bool success = false;
01478 size_type counter = this->my_successors.size();
01479
01480 if (this->my_reserved || this->my_tail == 0) {
01481 __TBB_store_with_release(op->status, FAILED);
01482 this->forwarder_busy = false;
01483 return;
01484 }
01485
01486 while (counter>0 && this->my_tail > 0) {
01487 i_copy = this->my_array[0].first;
01488 bool msg = this->my_successors.try_put(i_copy);
01489 if ( msg == true ) {
01490 if (mark == this->my_tail) --mark;
01491 --(this->my_tail);
01492 this->my_array[0].first=this->my_array[this->my_tail].first;
01493 if (this->my_tail > 1)
01494 reheap();
01495 success = true;
01496 }
01497 --counter;
01498 }
01499 if (success && !counter)
01500 __TBB_store_with_release(op->status, SUCCEEDED);
01501 else {
01502 __TBB_store_with_release(op->status, FAILED);
01503 this->forwarder_busy = false;
01504 }
01505 }
01506
01507 void internal_push(prio_operation *op) {
01508 if ( this->my_tail >= this->my_array_size )
01509 this->grow_my_array( this->my_tail + 1 );
01510 this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01511 ++(this->my_tail);
01512 __TBB_store_with_release(op->status, SUCCEEDED);
01513 }
01514 void internal_pop(prio_operation *op) {
01515 if ( this->my_reserved == true || this->my_tail == 0 ) {
01516 __TBB_store_with_release(op->status, FAILED);
01517 }
01518 else {
01519 if (mark<this->my_tail &&
01520 compare(this->my_array[0].first,
01521 this->my_array[this->my_tail-1].first)) {
01522
01523
01524 *(op->elem) = this->my_array[this->my_tail-1].first;
01525 --(this->my_tail);
01526 __TBB_store_with_release(op->status, SUCCEEDED);
01527 }
01528 else {
01529 *(op->elem) = this->my_array[0].first;
01530 if (mark == this->my_tail) --mark;
01531 --(this->my_tail);
01532 __TBB_store_with_release(op->status, SUCCEEDED);
01533 this->my_array[0].first=this->my_array[this->my_tail].first;
01534 if (this->my_tail > 1)
01535 reheap();
01536 }
01537 }
01538 }
01539 void internal_reserve(prio_operation *op) {
01540 if (this->my_reserved == true || this->my_tail == 0) {
01541 __TBB_store_with_release(op->status, FAILED);
01542 }
01543 else {
01544 this->my_reserved = true;
01545 *(op->elem) = reserved_item = this->my_array[0].first;
01546 if (mark == this->my_tail) --mark;
01547 --(this->my_tail);
01548 __TBB_store_with_release(op->status, SUCCEEDED);
01549 this->my_array[0].first = this->my_array[this->my_tail].first;
01550 if (this->my_tail > 1)
01551 reheap();
01552 }
01553 }
01554 void internal_consume(prio_operation *op) {
01555 this->my_reserved = false;
01556 __TBB_store_with_release(op->status, SUCCEEDED);
01557 }
01558 void internal_release(prio_operation *op) {
01559 if (this->my_tail >= this->my_array_size)
01560 this->grow_my_array( this->my_tail + 1 );
01561 this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01562 ++(this->my_tail);
01563 this->my_reserved = false;
01564 __TBB_store_with_release(op->status, SUCCEEDED);
01565 heapify();
01566 }
01567 private:
01568 Compare compare;
01569 size_type mark;
01570 input_type reserved_item;
01571
01572 void heapify() {
01573 if (!mark) mark = 1;
01574 for (; mark<this->my_tail; ++mark) {
01575 size_type cur_pos = mark;
01576 input_type to_place = this->my_array[mark].first;
01577 do {
01578 size_type parent = (cur_pos-1)>>1;
01579 if (!compare(this->my_array[parent].first, to_place))
01580 break;
01581 this->my_array[cur_pos].first = this->my_array[parent].first;
01582 cur_pos = parent;
01583 } while( cur_pos );
01584 this->my_array[cur_pos].first = to_place;
01585 }
01586 }
01587
01588 void reheap() {
01589 size_type cur_pos=0, child=1;
01590 while (child < mark) {
01591 size_type target = child;
01592 if (child+1<mark &&
01593 compare(this->my_array[child].first,
01594 this->my_array[child+1].first))
01595 ++target;
01596
01597 if (compare(this->my_array[target].first,
01598 this->my_array[this->my_tail].first))
01599 break;
01600 this->my_array[cur_pos].first = this->my_array[target].first;
01601 cur_pos = target;
01602 child = (cur_pos<<1)+1;
01603 }
01604 this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01605 }
01606 };
01607
01609
01612 template< typename T >
01613 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
01614 using graph_node::my_graph;
01615 public:
01616 typedef T input_type;
01617 typedef T output_type;
01618 typedef sender< input_type > predecessor_type;
01619 typedef receiver< output_type > successor_type;
01620
01621 private:
01622 task *my_root_task;
01623 size_t my_threshold;
01624 size_t my_count;
01625 internal::predecessor_cache< T > my_predecessors;
01626 spin_mutex my_mutex;
01627 internal::broadcast_cache< T > my_successors;
01628 int init_decrement_predecessors;
01629
01630 friend class internal::forward_task< limiter_node<T> >;
01631
01632
01633 friend class internal::decrementer< limiter_node<T> >;
01634
01635 void decrement_counter() {
01636 input_type v;
01637
01638
01639 if ( my_predecessors.get_item( v ) == false
01640 || my_successors.try_put(v) == false ) {
01641 spin_mutex::scoped_lock lock(my_mutex);
01642 --my_count;
01643 if ( !my_predecessors.empty() )
01644 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01645 internal::forward_task< limiter_node<T> >( *this ) );
01646 }
01647 }
01648
01649 void forward() {
01650 {
01651 spin_mutex::scoped_lock lock(my_mutex);
01652 if ( my_count < my_threshold )
01653 ++my_count;
01654 else
01655 return;
01656 }
01657 decrement_counter();
01658 }
01659
01660 public:
01662 internal::decrementer< limiter_node<T> > decrement;
01663
01665 limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
01666 graph_node(g), my_root_task(g.root_task()), my_threshold(threshold), my_count(0),
01667 init_decrement_predecessors(num_decrement_predecessors),
01668 decrement(num_decrement_predecessors)
01669 {
01670 my_predecessors.set_owner(this);
01671 my_successors.set_owner(this);
01672 decrement.set_owner(this);
01673 }
01674
01676 limiter_node( const limiter_node& src ) :
01677 graph_node(src.my_graph), receiver<T>(), sender<T>(),
01678 my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0),
01679 init_decrement_predecessors(src.init_decrement_predecessors),
01680 decrement(src.init_decrement_predecessors)
01681 {
01682 my_predecessors.set_owner(this);
01683 my_successors.set_owner(this);
01684 decrement.set_owner(this);
01685 }
01686
01688 bool register_successor( receiver<output_type> &r ) {
01689 my_successors.register_successor(r);
01690 return true;
01691 }
01692
01694
01695 bool remove_successor( receiver<output_type> &r ) {
01696 r.remove_predecessor(*this);
01697 my_successors.remove_successor(r);
01698 return true;
01699 }
01700
01702 bool try_put( const T &t ) {
01703 {
01704 spin_mutex::scoped_lock lock(my_mutex);
01705 if ( my_count >= my_threshold )
01706 return false;
01707 else
01708 ++my_count;
01709 }
01710
01711 bool msg = my_successors.try_put(t);
01712
01713 if ( msg != true ) {
01714 spin_mutex::scoped_lock lock(my_mutex);
01715 --my_count;
01716 if ( !my_predecessors.empty() )
01717 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01718 internal::forward_task< limiter_node<T> >( *this ) );
01719 }
01720
01721 return msg;
01722 }
01723
01725 bool register_predecessor( predecessor_type &src ) {
01726 spin_mutex::scoped_lock lock(my_mutex);
01727 my_predecessors.add( src );
01728 if ( my_count < my_threshold && !my_successors.empty() )
01729 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01730 internal::forward_task< limiter_node<T> >( *this ) );
01731 return true;
01732 }
01733
01735 bool remove_predecessor( predecessor_type &src ) {
01736 my_predecessors.remove( src );
01737 return true;
01738 }
01739
01740 protected:
01741
01742 void reset() {
01743 my_count = 0;
01744 my_predecessors.reset();
01745 decrement.reset_receiver();
01746 }
01747
01748 void reset_receiver() { my_predecessors.reset(); }
01749 };
01750
01751 #include "internal/_flow_graph_join_impl.h"
01752
01753 using internal::reserving_port;
01754 using internal::queueing_port;
01755 using internal::tag_matching_port;
01756 using internal::input_port;
01757 using internal::tag_value;
01758 using internal::NO_TAG;
01759
01760 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
01761
01762 template<typename OutputTuple>
01763 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
01764 private:
01765 static const int N = std::tuple_size<OutputTuple>::value;
01766 typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
01767 public:
01768 typedef OutputTuple output_type;
01769 typedef typename unfolded_type::input_ports_type input_ports_type;
01770 join_node(graph &g) : unfolded_type(g) { }
01771 join_node(const join_node &other) : unfolded_type(other) {}
01772 };
01773
01774 template<typename OutputTuple>
01775 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
01776 private:
01777 static const int N = std::tuple_size<OutputTuple>::value;
01778 typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
01779 public:
01780 typedef OutputTuple output_type;
01781 typedef typename unfolded_type::input_ports_type input_ports_type;
01782 join_node(graph &g) : unfolded_type(g) { }
01783 join_node(const join_node &other) : unfolded_type(other) {}
01784 };
01785
01786
01787 template<typename OutputTuple>
01788 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
01789 tag_matching_port, OutputTuple, tag_matching> {
01790 private:
01791 static const int N = std::tuple_size<OutputTuple>::value;
01792 typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
01793 public:
01794 typedef OutputTuple output_type;
01795 typedef typename unfolded_type::input_ports_type input_ports_type;
01796 template<typename B0, typename B1>
01797 join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
01798 template<typename B0, typename B1, typename B2>
01799 join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
01800 template<typename B0, typename B1, typename B2, typename B3>
01801 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
01802 template<typename B0, typename B1, typename B2, typename B3, typename B4>
01803 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
01804 #if __TBB_VARIADIC_MAX >= 6
01805 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
01806 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
01807 #endif
01808 #if __TBB_VARIADIC_MAX >= 7
01809 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
01810 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
01811 #endif
01812 #if __TBB_VARIADIC_MAX >= 8
01813 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
01814 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
01815 #endif
01816 #if __TBB_VARIADIC_MAX >= 9
01817 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
01818 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
01819 #endif
01820 #if __TBB_VARIADIC_MAX >= 10
01821 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
01822 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
01823 #endif
01824 join_node(const join_node &other) : unfolded_type(other) {}
01825 };
01826
01827 #if TBB_PREVIEW_GRAPH_NODES
01828
01829 #include "internal/_flow_graph_or_impl.h"
01830
01831 template<typename InputTuple>
01832 class or_node : public internal::unfolded_or_node<InputTuple> {
01833 private:
01834 static const int N = std::tuple_size<InputTuple>::value;
01835 public:
01836 typedef typename internal::or_output_type<InputTuple>::type output_type;
01837 typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;
01838 or_node(graph& g) : unfolded_type(g) { }
01839
01840 or_node( const or_node& other ) : unfolded_type(other) { }
01841 };
01842 #endif // TBB_PREVIEW_GRAPH_NODES
01843
01845 template< typename T >
01846 inline void make_edge( sender<T> &p, receiver<T> &s ) {
01847 p.register_successor( s );
01848 }
01849
01851 template< typename T >
01852 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
01853 p.remove_successor( s );
01854 }
01855
01857 template< typename Body, typename Node >
01858 Body copy_body( Node &n ) {
01859 return n.template copy_function_object<Body>();
01860 }
01861
01862 }
01863
01864 using interface6::graph;
01865 using interface6::graph_node;
01866 using interface6::continue_msg;
01867 using interface6::sender;
01868 using interface6::receiver;
01869 using interface6::continue_receiver;
01870
01871 using interface6::source_node;
01872 using interface6::function_node;
01873 using interface6::multifunction_node;
01874 using interface6::split_node;
01875 using interface6::internal::output_port;
01876 #if TBB_PREVIEW_GRAPH_NODES
01877 using interface6::or_node;
01878 #endif
01879 using interface6::continue_node;
01880 using interface6::overwrite_node;
01881 using interface6::write_once_node;
01882 using interface6::broadcast_node;
01883 using interface6::buffer_node;
01884 using interface6::queue_node;
01885 using interface6::sequencer_node;
01886 using interface6::priority_queue_node;
01887 using interface6::limiter_node;
01888 using namespace interface6::internal::graph_policy_namespace;
01889 using interface6::join_node;
01890 using interface6::input_port;
01891 using interface6::copy_body;
01892 using interface6::make_edge;
01893 using interface6::remove_edge;
01894 using interface6::internal::NO_TAG;
01895 using interface6::internal::tag_value;
01896
01897 }
01898 }
01899
01900 #endif // __TBB_flow_graph_H