flow_graph.h

00001 /*
00002     Copyright 2005-2012 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_flow_graph_H
00022 #define __TBB_flow_graph_H
00023 
00024 #include "tbb_stddef.h"
00025 #include "atomic.h"
00026 #include "spin_mutex.h"
00027 #include "null_mutex.h"
00028 #include "spin_rw_mutex.h"
00029 #include "null_rw_mutex.h"
00030 #include "task.h"
00031 #include "concurrent_vector.h"
00032 #include "internal/_aggregator_impl.h"
00033 
00034 // use the VC10 or gcc version of tuple if it is available.
00035 #if __TBB_CPP11_TUPLE_PRESENT
00036     #include <tuple>
00037 #else
00038     #include "compat/tuple"
00039 #endif
00040 
00041 #include <list>
00042 #include <queue>
00043 
00054 namespace tbb {
00055 namespace flow {
00056 
00058 enum concurrency { unlimited = 0, serial = 1 };
00059 
00060 namespace interface6 {
00061 
00062 namespace internal {
00063     template<typename T, typename M>
00064     class successor_cache;
00065 }
00066 
00068 class continue_msg {};
00069 
00070 template< typename T > class sender;
00071 template< typename T > class receiver;
00072 class continue_receiver;
00073 
00075 template< typename T >
00076 class sender {
00077 public:
00079     typedef T output_type;
00080 
00082     typedef receiver<T> successor_type;
00083 
00084     virtual ~sender() {}
00085 
00087     virtual bool register_successor( successor_type &r ) = 0;
00088 
00090     virtual bool remove_successor( successor_type &r ) = 0;
00091 
00093     virtual bool try_get( T & ) { return false; }
00094 
00096     virtual bool try_reserve( T & ) { return false; }
00097 
00099     virtual bool try_release( ) { return false; }
00100 
00102     virtual bool try_consume( ) { return false; }
00103 };
00104 
00105 template< typename T > class limiter_node;  // needed for resetting decrementer
00106 
00108 template< typename T >
00109 class receiver {
00110 public:
00112     typedef T input_type;
00113 
00115     typedef sender<T> predecessor_type;
00116 
00118     virtual ~receiver() {}
00119 
00121     virtual bool try_put( const T& t ) = 0;
00122 
00124     virtual bool register_predecessor( predecessor_type & ) { return false; }
00125 
00127     virtual bool remove_predecessor( predecessor_type & ) { return false; }
00128 
00129 protected:
00131     template<typename U> friend class limiter_node;
00132     virtual void reset_receiver() = 0;
00133     template<typename TT, typename M>
00134     friend class internal::successor_cache;
00135     virtual bool is_continue_receiver() { return false; }
00136 };
00137 
00139 
00140 class continue_receiver : public receiver< continue_msg > {
00141 public:
00142 
00144     typedef continue_msg input_type;
00145 
00147     typedef sender< continue_msg > predecessor_type;
00148 
00150     continue_receiver( int number_of_predecessors = 0 ) {
00151         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
00152         my_current_count = 0;
00153     }
00154 
00156     continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
00157         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
00158         my_current_count = 0;
00159     }
00160 
00162     virtual ~continue_receiver() { }
00163 
00165     /* override */ bool register_predecessor( predecessor_type & ) {
00166         spin_mutex::scoped_lock l(my_mutex);
00167         ++my_predecessor_count;
00168         return true;
00169     }
00170 
00172 
00175     /* override */ bool remove_predecessor( predecessor_type & ) {
00176         spin_mutex::scoped_lock l(my_mutex);
00177         --my_predecessor_count;
00178         return true;
00179     }
00180 
00182 
00184     /* override */ bool try_put( const input_type & ) {
00185         {
00186             spin_mutex::scoped_lock l(my_mutex);
00187             if ( ++my_current_count < my_predecessor_count )
00188                 return true;
00189             else
00190                 my_current_count = 0;
00191         }
00192         execute();
00193         return true;
00194     }
00195 
00196 protected:
00197     spin_mutex my_mutex;
00198     int my_predecessor_count;
00199     int my_current_count;
00200     int my_initial_predecessor_count;
00201     // the friend declaration in the base class did not eliminate the "protected class"
00202     // error in gcc 4.1.2
00203     template<typename U> friend class limiter_node;
00204     /*override*/void reset_receiver() {
00205         my_current_count = 0;
00206     }
00207 
00209 
00211     virtual void execute() = 0;
00212     template<typename TT, typename M>
00213     friend class internal::successor_cache;
00214     /*override*/ bool is_continue_receiver() { return true; }
00215 };
00216 
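A minimal usage sketch (not part of this header): a concrete continue_receiver only needs to supply execute(), which is called each time a continue_msg has arrived from every registered predecessor. The class name below is illustrative.

    class on_all_signalled : public tbb::flow::continue_receiver {
    public:
        on_all_signalled( int num_predecessors )
            : continue_receiver( num_predecessors ) {}
    private:
        /* override */ void execute() {
            // runs once per wave of continue_msg's from all predecessors
            do_work();
        }
        void do_work() { /* user code */ }
    };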
00217 #include "internal/_flow_graph_impl.h"
00218 using namespace internal::graph_policy_namespace;
00219 
00220 class graph;
00221 class graph_node;
00222 
00223 template <typename GraphContainerType, typename GraphNodeType>
00224 class graph_iterator {
00225     friend class graph;
00226     friend class graph_node;
00227 public:
00228     typedef size_t size_type;
00229     typedef GraphNodeType value_type;
00230     typedef GraphNodeType* pointer;
00231     typedef GraphNodeType& reference;
00232     typedef const GraphNodeType& const_reference;
00233     typedef std::forward_iterator_tag iterator_category;
00234 
00236     graph_iterator() : my_graph(NULL), current_node(NULL) {}
00237 
00239     graph_iterator(const graph_iterator& other) :
00240         my_graph(other.my_graph), current_node(other.current_node)
00241     {}
00242 
00244     graph_iterator& operator=(const graph_iterator& other) {
00245         if (this != &other) {
00246             my_graph = other.my_graph;
00247             current_node = other.current_node;
00248         }
00249         return *this;
00250     }
00251 
00253     reference operator*() const;
00254 
00256     pointer operator->() const;
00257 
00259     bool operator==(const graph_iterator& other) const {
00260         return ((my_graph == other.my_graph) && (current_node == other.current_node));
00261     }
00262 
00264     bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
00265 
00267     graph_iterator& operator++() {
00268         internal_forward();
00269         return *this;
00270     }
00271 
00273     graph_iterator operator++(int) {
00274         graph_iterator result = *this;
00275         operator++();
00276         return result;
00277     }
00278 
00279 private:
00280     // the graph over which we are iterating
00281     GraphContainerType *my_graph;
00282     // pointer into my_graph's my_nodes list
00283     pointer current_node;
00284 
00286     graph_iterator(GraphContainerType *g, bool begin);
00287     void internal_forward();
00288 };
00289 
00291 
00292 class graph : tbb::internal::no_copy {
00293     friend class graph_node;
00294 
00295     template< typename Body >
00296     class run_task : public task {
00297     public:
00298         run_task( Body& body ) : my_body(body) {}
00299         task *execute() {
00300             my_body();
00301             return NULL;
00302         }
00303     private:
00304         Body my_body;
00305     };
00306 
00307     template< typename Receiver, typename Body >
00308     class run_and_put_task : public task {
00309     public:
00310         run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00311         task *execute() {
00312             my_receiver.try_put( my_body() );
00313             return NULL;
00314         }
00315     private:
00316         Receiver &my_receiver;
00317         Body my_body;
00318     };
00319 
00320 public:
00322     explicit graph() : my_nodes(NULL), my_nodes_last(NULL)
00323     {
00324         own_context = true;
00325         cancelled = false;
00326         caught_exception = false;
00327         my_context = new task_group_context();
00328         my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00329         my_root_task->set_ref_count(1);
00330     }
00331 
00333     explicit graph(task_group_context& use_this_context) :
00334     my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL)
00335     {
00336         own_context = false;
00337         my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00338         my_root_task->set_ref_count(1);
00339     }
00340 
00342 
00343     ~graph() {
00344         wait_for_all();
00345         my_root_task->set_ref_count(0);
00346         task::destroy( *my_root_task );
00347         if (own_context) delete my_context;
00348     }
00349 
00351 
00353     void increment_wait_count() {
00354         if (my_root_task)
00355             my_root_task->increment_ref_count();
00356     }
00357 
00359 
00361     void decrement_wait_count() {
00362         if (my_root_task)
00363             my_root_task->decrement_ref_count();
00364     }
00365 
00367 
00369     template< typename Receiver, typename Body >
00370         void run( Receiver &r, Body body ) {
00371        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00372            run_and_put_task< Receiver, Body >( r, body ) );
00373     }
00374 
00376 
00378     template< typename Body >
00379     void run( Body body ) {
00380        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00381            run_task< Body >( body ) );
00382     }
00383 
00385 
00386     void wait_for_all() {
00387         cancelled = false;
00388         caught_exception = false;
00389         if (my_root_task) {
00390 #if TBB_USE_EXCEPTIONS
00391             try {
00392 #endif
00393                 my_root_task->wait_for_all();
00394                 cancelled = my_context->is_group_execution_cancelled();
00395 #if TBB_USE_EXCEPTIONS
00396             }
00397             catch(...) {
00398                 my_root_task->set_ref_count(1);
00399                 my_context->reset();
00400                 caught_exception = true;
00401                 cancelled = true;
00402                 throw;
00403             }
00404 #endif
00405             my_context->reset();  // consistent with behavior in catch()
00406             my_root_task->set_ref_count(1);
00407         }
00408     }
00409 
00411     task * root_task() {
00412         return my_root_task;
00413     }
00414 
00415     // ITERATORS
00416     template<typename C, typename N>
00417     friend class graph_iterator;
00418 
00419     // Graph iterator typedefs
00420     typedef graph_iterator<graph,graph_node> iterator;
00421     typedef graph_iterator<const graph,const graph_node> const_iterator;
00422 
00423     // Graph iterator constructors
00425     iterator begin() { return iterator(this, true); }
00427     iterator end() { return iterator(this, false); }
00429     const_iterator begin() const { return const_iterator(this, true); }
00431     const_iterator end() const { return const_iterator(this, false); }
00433     const_iterator cbegin() const { return const_iterator(this, true); }
00435     const_iterator cend() const { return const_iterator(this, false); }
00436 
00438     bool is_cancelled() { return cancelled; }
00439     bool exception_thrown() { return caught_exception; }
00440 
00441     // Resets the graph and all of its nodes; not thread-safe.
00442     void reset();
00443 
00444 private:
00445     task *my_root_task;
00446     task_group_context *my_context;
00447     bool own_context;
00448     bool cancelled;
00449     bool caught_exception;
00450 
00451     graph_node *my_nodes, *my_nodes_last;
00452 
00453     spin_mutex nodelist_mutex;
00454     void register_node(graph_node *n); 
00455     void remove_node(graph_node *n);
00456 
00457 };
00458 
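A minimal usage sketch (not part of this header), assuming a C++11 compiler for the lambda: run an arbitrary body as a child of the graph's root task, then wait for all work associated with the graph to drain.

    #include "tbb/flow_graph.h"

    int main() {
        tbb::flow::graph g;
        g.run( []{ /* any callable; executes in the graph's task group */ } );
        g.wait_for_all();   // blocks until every task spawned on behalf of g has finished
        return 0;
    }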
00459 template <typename C, typename N>
00460 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
00461 {
00462     if (begin) current_node = my_graph->my_nodes;
00463     // else it is an end iterator by default
00464 }
00465 
00466 template <typename C, typename N>
00467 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
00468     __TBB_ASSERT(current_node, "graph_iterator at end");
00469     return *operator->();
00470 }
00471 
00472 template <typename C, typename N>
00473 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const { 
00474     return current_node;
00475 }
00476 
00477 
00478 template <typename C, typename N>
00479 void graph_iterator<C,N>::internal_forward() {
00480     if (current_node) current_node = current_node->next;
00481 }
00482 
00484 class graph_node : tbb::internal::no_assign {
00485     friend class graph;
00486     template<typename C, typename N>
00487     friend class graph_iterator;
00488 protected:
00489     graph& my_graph;
00490     graph_node *next, *prev;
00491 public:
00492     graph_node(graph& g) : my_graph(g) {
00493         my_graph.register_node(this);
00494     }
00495     virtual ~graph_node() {
00496         my_graph.remove_node(this);
00497     }
00498 
00499 protected:
00500     virtual void reset() = 0;
00501 };
00502 
00503 inline void graph::register_node(graph_node *n) {
00504     n->next = NULL;
00505     {
00506         spin_mutex::scoped_lock lock(nodelist_mutex);
00507         n->prev = my_nodes_last;
00508         if (my_nodes_last) my_nodes_last->next = n;
00509         my_nodes_last = n;
00510         if (!my_nodes) my_nodes = n;
00511     }
00512 }
00513 
00514 inline void graph::remove_node(graph_node *n) {
00515     {
00516         spin_mutex::scoped_lock lock(nodelist_mutex);
00517         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
00518         if (n->prev) n->prev->next = n->next;
00519         if (n->next) n->next->prev = n->prev;
00520         if (my_nodes_last == n) my_nodes_last = n->prev;
00521         if (my_nodes == n) my_nodes = n->next;
00522     }
00523     n->prev = n->next = NULL;
00524 }
00525 
00526 inline void graph::reset() {
00527     // reset context
00528     if(my_context) my_context->reset();
00529     cancelled = false;
00530     caught_exception = false;
00531     // reset all the nodes comprising the graph
00532     for(iterator ii = begin(); ii != end(); ++ii) {
00533         graph_node *my_p = &(*ii);
00534         my_p->reset();
00535     }
00536 }
00537 
00538 
00539 #include "internal/_flow_graph_node_impl.h"
00540 
00542 template < typename Output >
00543 class source_node : public graph_node, public sender< Output > {
00544     using graph_node::my_graph;
00545 public:
00547     typedef Output output_type;
00548 
00550     typedef receiver< Output > successor_type;
00551 
00553     template< typename Body >
00554     source_node( graph &g, Body body, bool is_active = true )
00555         : graph_node(g), my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
00556         my_body( new internal::source_body_leaf< output_type, Body>(body) ),
00557         my_reserved(false), my_has_cached_item(false)
00558     {
00559         my_successors.set_owner(this);
00560     }
00561 
00563     source_node( const source_node& src ) :
00564         graph_node(src.my_graph), sender<Output>(),
00565         my_root_task( src.my_root_task), my_active(src.init_my_active),
00566         init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
00567         my_reserved(false), my_has_cached_item(false)
00568     {
00569         my_successors.set_owner(this);
00570     }
00571 
00573     ~source_node() { delete my_body; }
00574 
00576     /* override */ bool register_successor( receiver<output_type> &r ) {
00577         spin_mutex::scoped_lock lock(my_mutex);
00578         my_successors.register_successor(r);
00579         if ( my_active )
00580             spawn_put();
00581         return true;
00582     }
00583 
00585     /* override */ bool remove_successor( receiver<output_type> &r ) {
00586         spin_mutex::scoped_lock lock(my_mutex);
00587         my_successors.remove_successor(r);
00588         return true;
00589     }
00590 
00592     /*override */ bool try_get( output_type &v ) {
00593         spin_mutex::scoped_lock lock(my_mutex);
00594         if ( my_reserved )
00595             return false;
00596 
00597         if ( my_has_cached_item ) {
00598             v = my_cached_item;
00599             my_has_cached_item = false;
00600             return true;
00601         }
00602         return false;
00603     }
00604 
00606     /* override */ bool try_reserve( output_type &v ) {
00607         spin_mutex::scoped_lock lock(my_mutex);
00608         if ( my_reserved ) {
00609             return false;
00610         }
00611 
00612         if ( my_has_cached_item ) {
00613             v = my_cached_item;
00614             my_reserved = true;
00615             return true;
00616         } else {
00617             return false;
00618         }
00619     }
00620 
00622 
00623     /* override */ bool try_release( ) {
00624         spin_mutex::scoped_lock lock(my_mutex);
00625         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
00626         my_reserved = false;
00627         if(!my_successors.empty())
00628             spawn_put();
00629         return true;
00630     }
00631 
00633     /* override */ bool try_consume( ) {
00634         spin_mutex::scoped_lock lock(my_mutex);
00635         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
00636         my_reserved = false;
00637         my_has_cached_item = false;
00638         if ( !my_successors.empty() ) {
00639             spawn_put();
00640         }
00641         return true;
00642     }
00643 
00645     void activate() {
00646         spin_mutex::scoped_lock lock(my_mutex);
00647         my_active = true;
00648         if ( !my_successors.empty() )
00649             spawn_put();
00650     }
00651 
00652     template<class Body>
00653     Body copy_function_object() { 
00654         internal::source_body<output_type> &body_ref = *this->my_body;
00655         return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body(); 
00656     }
00657 
00658 protected:
00659 
00661     void reset() {
00662         my_active = init_my_active;
00663         my_reserved = false;
00664         if(my_has_cached_item) {
00665             my_has_cached_item = false;
00666         }
00667     }
00668 
00669 private:
00670     task *my_root_task;
00671     spin_mutex my_mutex;
00672     bool my_active;
00673     bool init_my_active;
00674     internal::source_body<output_type> *my_body;
00675     internal::broadcast_cache< output_type > my_successors;
00676     bool my_reserved;
00677     bool my_has_cached_item;
00678     output_type my_cached_item;
00679 
00680     friend class internal::source_task< source_node< output_type > >;
00681 
00682     // used by apply_body; may invoke the node's body.
00683 
00684     bool try_reserve_apply_body(output_type &v) {
00685         spin_mutex::scoped_lock lock(my_mutex);
00686         if ( my_reserved ) {
00687             return false;
00688         }
00689 
00690         if ( !my_has_cached_item && (*my_body)(my_cached_item) )
00691             my_has_cached_item = true;
00692 
00693         if ( my_has_cached_item ) {
00694             v = my_cached_item;
00695             my_reserved = true;
00696             return true;
00697         } else {
00698             return false;
00699         }
00700     }
00701 
00703     /* override */ void apply_body( ) {
00704         output_type v;
00705         if ( !try_reserve_apply_body(v) )
00706             return;
00707 
00708         if ( my_successors.try_put( v ) )
00709             try_consume();
00710         else
00711             try_release();
00712     }
00713 
00715     /* override */ void spawn_put( ) {
00716         task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00717            internal::source_task< source_node< output_type > >( *this ) );
00718     }
00719 };  // source_node
00720 
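A hedged usage sketch: a source_node that emits the integers 0 through 9. It is constructed inactive so edges can be made before any item is produced; make_edge is declared later in this header and the lambdas assume a C++11 compiler.

    tbb::flow::graph g;
    int i = 0;
    tbb::flow::source_node<int> src( g,
        [&]( int &v ) -> bool {                 // body: write the next item into v,
            if ( i < 10 ) { v = i++; return true; }
            return false;                       // or return false when exhausted
        },
        /*is_active=*/false );
    tbb::flow::function_node<int,int> sink( g, tbb::flow::serial,
        []( int v ) -> int { /* consume v */ return v; } );
    tbb::flow::make_edge( src, sink );
    src.activate();                             // start pulling items into the graph
    g.wait_for_all();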
00722 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00723 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00724     using graph_node::my_graph;
00725 public:
00726     typedef Input input_type;
00727     typedef Output output_type;
00728     typedef sender< input_type > predecessor_type;
00729     typedef receiver< output_type > successor_type;
00730     typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00731     typedef internal::function_output<output_type> fOutput_type;
00732 
00734     template< typename Body >
00735     function_node( graph &g, size_t concurrency, Body body ) :
00736         graph_node(g), internal::function_input<input_type,output_type,Allocator>(g, concurrency, body)
00737     {}
00738 
00740     function_node( const function_node& src ) :
00741         graph_node(src.my_graph), internal::function_input<input_type,output_type,Allocator>( src ),
00742         fOutput_type()
00743     {}
00744 
00745     bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00746 
00747 protected:
00748 
00749     // override of graph_node's reset.
00750     /*override*/void reset() {fInput_type::reset_function_input(); }
00751 
00752     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00753 };
00754 
00756 template < typename Input, typename Output, typename Allocator >
00757 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00758     using graph_node::my_graph;
00759 public:
00760     typedef Input input_type;
00761     typedef Output output_type;
00762     typedef sender< input_type > predecessor_type;
00763     typedef receiver< output_type > successor_type;
00764     typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00765     typedef internal::function_input_queue<input_type, Allocator> queue_type;
00766     typedef internal::function_output<output_type> fOutput_type;
00767 
00769     template< typename Body >
00770     function_node( graph &g, size_t concurrency, Body body ) :
00771         graph_node(g), fInput_type( g, concurrency, body, new queue_type() )
00772     {}
00773 
00775     function_node( const function_node& src ) :
00776         graph_node(src.my_graph), fInput_type( src, new queue_type() ), fOutput_type()
00777     {}
00778 
00779     bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00780 
00781 protected:
00782 
00783     /*override*/void reset() { fInput_type::reset_function_input(); }
00784 
00785     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00786 };
00787 
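A hedged usage sketch: with the default queueing policy, inputs that cannot be processed immediately are buffered, so try_put never rejects; a serial node then works through them one at a time. make_edge is declared later in this header and the lambdas assume a C++11 compiler.

    tbb::flow::graph g;
    tbb::flow::function_node<int,int> square( g, tbb::flow::serial,
        []( int v ) -> int { return v * v; } );
    tbb::flow::function_node<int,int> collect( g, tbb::flow::serial,
        []( int v ) -> int { /* receives 9, then 16 */ return v; } );
    tbb::flow::make_edge( square, collect );
    square.try_put( 3 );
    square.try_put( 4 );
    g.wait_for_all();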
00788 #include "tbb/internal/_flow_graph_types_impl.h"
00789 
00791 // Output is a tuple of output types.
00792 template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00793 class multifunction_node :
00794     public graph_node,
00795     public internal::multifunction_input
00796     <
00797         Input,
00798         typename internal::wrap_tuple_elements<
00799             std::tuple_size<Output>::value,  // #elements in tuple
00800             internal::function_output,  // wrap this around each element
00801             Output // the tuple providing the types
00802         >::type,
00803         Allocator
00804     > {
00805     using graph_node::my_graph;
00806 private:
00807     static const int N = std::tuple_size<Output>::value;
00808 public:
00809     typedef Input input_type;
00810     typedef typename internal::wrap_tuple_elements<N,internal::function_output, Output>::type output_ports_type;
00811 private:
00812     typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00813     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00814 public:
00815     template<typename Body>
00816     multifunction_node( graph &g, size_t concurrency, Body body ) :
00817         graph_node(g), base_type(g,concurrency, body)
00818     {}
00819     multifunction_node( const multifunction_node &other) :
00820         graph_node(other.my_graph), base_type(other)
00821     {}
00822     // all the guts are in multifunction_input...
00823 protected:
00824     /*override*/void reset() { base_type::reset(); }
00825 };  // multifunction_node
00826 
00827 template < typename Input, typename Output, typename Allocator >
00828 class multifunction_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multifunction_input<Input,
00829     typename internal::wrap_tuple_elements<std::tuple_size<Output>::value, internal::function_output, Output>::type, Allocator> {
00830     using graph_node::my_graph;
00831     static const int N = std::tuple_size<Output>::value;
00832 public:
00833     typedef Input input_type;
00834     typedef typename internal::wrap_tuple_elements<N, internal::function_output, Output>::type output_ports_type;
00835 private:
00836     typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00837     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00838 public:
00839     template<typename Body>
00840     multifunction_node( graph &g, size_t concurrency, Body body) :
00841         graph_node(g), base_type(g,concurrency, body, new queue_type())
00842     {}
00843     multifunction_node( const multifunction_node &other) :
00844         graph_node(other.my_graph), base_type(other, new queue_type())
00845     {}
00846     // all the guts are in multifunction_input...
00847 protected:
00848     /*override*/void reset() { base_type::reset(); }
00849 };  // multifunction_node
00850 
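A hedged sketch of a multifunction_node body: it receives one input and may put to any of its output ports. The port accessor is written as std::get, which assumes std::tuple is in use (see the tuple include at the top of this file); with compat/tuple the corresponding get would be used instead.

    typedef tbb::flow::multifunction_node< int, std::tuple<int,int> > splitter_type;

    struct even_odd_body {
        void operator()( const int &in, splitter_type::output_ports_type &ports ) {
            if ( in % 2 == 0 )
                std::get<0>( ports ).try_put( in );   // evens to port 0
            else
                std::get<1>( ports ).try_put( in );   // odds to port 1
        }
    };

    // construction: splitter_type s( g, tbb::flow::unlimited, even_odd_body() );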
00852 //  split_node: forwards each element of an incoming tuple to its corresponding
00853 //  successor.  The node has unlimited concurrency, so though marked "rejecting" it does not reject inputs.
00854 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
00855 class split_node : public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
00856     static const int N = std::tuple_size<TupleType>::value;
00857     typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
00858 public:
00859     typedef typename base_type::output_ports_type output_ports_type;
00860 private:
00861     struct splitting_body {
00862         void operator()(const TupleType& t, output_ports_type &p) {
00863             internal::emit_element<N>::emit_this(t, p);
00864         }
00865     };
00866 public:
00867     typedef TupleType input_type;
00868     typedef Allocator allocator_type;
00869     split_node(graph &g) : base_type(g, unlimited, splitting_body()) {}
00870     split_node( const split_node & other) : base_type(other) {}
00871 };
00872 
00874 template <typename Output>
00875 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
00876     using graph_node::my_graph;
00877 public:
00878     typedef continue_msg input_type;
00879     typedef Output output_type;
00880     typedef sender< input_type > predecessor_type;
00881     typedef receiver< output_type > successor_type;
00882     typedef internal::function_output<output_type> fOutput_type;
00883 
00885     template <typename Body >
00886     continue_node( graph &g, Body body ) :
00887         graph_node(g), internal::continue_input<output_type>( g, body )
00888     {}
00889 
00891     template <typename Body >
00892     continue_node( graph &g, int number_of_predecessors, Body body ) :
00893         graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body )
00894     {}
00895 
00897     continue_node( const continue_node& src ) :
00898         graph_node(src.my_graph), internal::continue_input<output_type>(src),
00899         internal::function_output<Output>()
00900     {}
00901 
00902     bool try_put(const input_type &i) { return internal::continue_input<Output>::try_put(i); }
00903 
00904 protected:
00905     /*override*/void reset() { internal::continue_input<Output>::reset_receiver(); }
00906 
00907     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00908 };
00909 
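A hedged usage sketch: two continue_nodes signal a third, which runs only after both predecessors have completed. make_edge is declared later in this header and the lambdas assume a C++11 compiler.

    typedef tbb::flow::continue_msg msg_t;
    tbb::flow::graph g;
    tbb::flow::continue_node<msg_t> a( g, []( const msg_t& ) { /* step A */ return msg_t(); } );
    tbb::flow::continue_node<msg_t> b( g, []( const msg_t& ) { /* step B */ return msg_t(); } );
    tbb::flow::continue_node<msg_t> c( g, []( const msg_t& ) { /* runs after A and B */ return msg_t(); } );
    tbb::flow::make_edge( a, c );
    tbb::flow::make_edge( b, c );
    a.try_put( msg_t() );
    b.try_put( msg_t() );
    g.wait_for_all();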
00910 template< typename T >
00911 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
00912     using graph_node::my_graph;
00913 public:
00914     typedef T input_type;
00915     typedef T output_type;
00916     typedef sender< input_type > predecessor_type;
00917     typedef receiver< output_type > successor_type;
00918 
00919     overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
00920         my_successors.set_owner( this );
00921     }
00922 
00923     // Copy constructor: copies nothing from src; the compiler-generated default would not work
00924     overwrite_node( const overwrite_node& src ) :
00925         graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
00926     {
00927         my_successors.set_owner( this );
00928     }
00929 
00930     ~overwrite_node() {}
00931 
00932     /* override */ bool register_successor( successor_type &s ) {
00933         spin_mutex::scoped_lock l( my_mutex );
00934         if ( my_buffer_is_valid ) {
00935             // We have a valid value that must be forwarded immediately.
00936             if ( s.try_put( my_buffer ) || !s.register_predecessor( *this  ) ) {
00937                 // We add the successor: it accepted our put, or it rejected it but won't let us become a predecessor
00938                 my_successors.register_successor( s );
00939                 return true;
00940             } else {
00941                 // We don't add the successor: it rejected our put and we became its predecessor instead
00942                 return false;
00943             }
00944         } else {
00945             // No valid value yet, just add as successor
00946             my_successors.register_successor( s );
00947             return true;
00948         }
00949     }
00950 
00951     /* override */ bool remove_successor( successor_type &s ) {
00952         spin_mutex::scoped_lock l( my_mutex );
00953         my_successors.remove_successor(s);
00954         return true;
00955     }
00956 
00957     /* override */ bool try_put( const T &v ) {
00958         spin_mutex::scoped_lock l( my_mutex );
00959         my_buffer = v;
00960         my_buffer_is_valid = true;
00961         my_successors.try_put(v);
00962         return true;
00963     }
00964 
00965     /* override */ bool try_get( T &v ) {
00966         spin_mutex::scoped_lock l( my_mutex );
00967         if ( my_buffer_is_valid ) {
00968             v = my_buffer;
00969             return true;
00970         } else {
00971             return false;
00972         }
00973     }
00974 
00975     bool is_valid() {
00976        spin_mutex::scoped_lock l( my_mutex );
00977        return my_buffer_is_valid;
00978     }
00979 
00980     void clear() {
00981        spin_mutex::scoped_lock l( my_mutex );
00982        my_buffer_is_valid = false;
00983     }
00984 
00985 protected:
00986 
00987     /*override*/void reset() { my_buffer_is_valid = false; }
00988 
00989     spin_mutex my_mutex;
00990     internal::broadcast_cache< T, null_rw_mutex > my_successors;
00991     T my_buffer;
00992     bool my_buffer_is_valid;
00993     /*override*/void reset_receiver() {}
00994 };
00995 
00996 template< typename T >
00997 class write_once_node : public overwrite_node<T> {
00998 public:
00999     typedef T input_type;
01000     typedef T output_type;
01001     typedef sender< input_type > predecessor_type;
01002     typedef receiver< output_type > successor_type;
01003 
01005     write_once_node(graph& g) : overwrite_node<T>(g) {}
01006 
01008     write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
01009 
01010     /* override */ bool try_put( const T &v ) {
01011         spin_mutex::scoped_lock l( this->my_mutex );
01012         if ( this->my_buffer_is_valid ) {
01013             return false;
01014         } else {
01015             this->my_buffer = v;
01016             this->my_buffer_is_valid = true;
01017             this->my_successors.try_put(v);
01018             return true;
01019         }
01020     }
01021 };
01022 
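A hedged usage sketch: write_once_node keeps only the first value it receives, while overwrite_node (above) always keeps the most recent one.

    tbb::flow::graph g;
    tbb::flow::write_once_node<int> once( g );
    once.try_put( 1 );     // accepted; the buffer becomes valid
    once.try_put( 2 );     // returns false; the stored value is untouched
    int v = 0;
    once.try_get( v );     // v == 1
    once.clear();          // invalidate the buffer so a new value can be stored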
01024 template <typename T>
01025 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
01026     using graph_node::my_graph;
01027     internal::broadcast_cache<T> my_successors;
01028 public:
01029     typedef T input_type;
01030     typedef T output_type;
01031     typedef sender< input_type > predecessor_type;
01032     typedef receiver< output_type > successor_type;
01033 
01034     broadcast_node(graph& g) : graph_node(g) {
01035         my_successors.set_owner( this );
01036     }
01037 
01038     // Copy constructor
01039     broadcast_node( const broadcast_node& src ) :
01040         graph_node(src.my_graph), receiver<T>(), sender<T>()
01041     {
01042         my_successors.set_owner( this );
01043     }
01044 
01046     virtual bool register_successor( receiver<T> &r ) {
01047         my_successors.register_successor( r );
01048         return true;
01049     }
01050 
01052     virtual bool remove_successor( receiver<T> &r ) {
01053         my_successors.remove_successor( r );
01054         return true;
01055     }
01056 
01057     /* override */ bool try_put( const T &t ) {
01058         my_successors.try_put(t);
01059         return true;
01060     }
01061 protected:
01062     /*override*/void reset() {}
01063     /*override*/void reset_receiver() {}
01064 };
01065 
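A hedged usage sketch: broadcast_node copies every message to all of its successors and always reports success, even with no successors (the message is then dropped). make_edge is declared later in this header.

    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> fanout( g );
    tbb::flow::queue_node<int> q1( g ), q2( g );
    tbb::flow::make_edge( fanout, q1 );
    tbb::flow::make_edge( fanout, q2 );
    fanout.try_put( 42 );   // both q1 and q2 receive a copy
    g.wait_for_all();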
01066 #include "internal/_flow_graph_item_buffer_impl.h"
01067 
01069 template <typename T, typename A=cache_aligned_allocator<T> >
01070 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
01071     using graph_node::my_graph;
01072 public:
01073     typedef T input_type;
01074     typedef T output_type;
01075     typedef sender< input_type > predecessor_type;
01076     typedef receiver< output_type > successor_type;
01077     typedef buffer_node<T, A> my_class;
01078 protected:
01079     typedef size_t size_type;
01080     internal::round_robin_cache< T, null_rw_mutex > my_successors;
01081 
01082     task *my_parent;
01083 
01084     friend class internal::forward_task< buffer_node< T, A > >;
01085 
01086     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
01087     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01088 
01089     // implements the aggregator_operation concept
01090     class buffer_operation : public internal::aggregated_operation< buffer_operation > {
01091     public:
01092         char type;
01093         T *elem;
01094         successor_type *r;
01095         buffer_operation(const T& e, op_type t) :
01096             type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
01097         buffer_operation(op_type t) : type(char(t)), r(NULL) {}
01098     };
01099 
01100     bool forwarder_busy;
01101     typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
01102     friend class internal::aggregating_functor<my_class, buffer_operation>;
01103     internal::aggregator< my_handler, buffer_operation> my_aggregator;
01104 
01105     virtual void handle_operations(buffer_operation *op_list) {
01106         buffer_operation *tmp;
01107         bool try_forwarding=false;
01108         while (op_list) {
01109             tmp = op_list;
01110             op_list = op_list->next;
01111             switch (tmp->type) {
01112             case reg_succ: internal_reg_succ(tmp);  try_forwarding = true; break;
01113             case rem_succ: internal_rem_succ(tmp); break;
01114             case req_item: internal_pop(tmp); break;
01115             case res_item: internal_reserve(tmp); break;
01116             case rel_res:  internal_release(tmp);  try_forwarding = true; break;
01117             case con_res:  internal_consume(tmp);  try_forwarding = true; break;
01118             case put_item: internal_push(tmp);  try_forwarding = true; break;
01119             case try_fwd:  internal_forward(tmp); break;
01120             }
01121         }
01122         if (try_forwarding && !forwarder_busy) {
01123             forwarder_busy = true;
01124             task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
01125         }
01126     }
01127 
01129     virtual void forward() {
01130         buffer_operation op_data(try_fwd);
01131         do {
01132             op_data.status = WAIT;
01133             my_aggregator.execute(&op_data);
01134         } while (op_data.status == SUCCEEDED);
01135     }
01136 
01138     virtual void internal_reg_succ(buffer_operation *op) {
01139         my_successors.register_successor(*(op->r));
01140         __TBB_store_with_release(op->status, SUCCEEDED);
01141     }
01142 
01144     virtual void internal_rem_succ(buffer_operation *op) {
01145         my_successors.remove_successor(*(op->r));
01146         __TBB_store_with_release(op->status, SUCCEEDED);
01147     }
01148 
01150     virtual void internal_forward(buffer_operation *op) {
01151         T i_copy;
01152         bool success = false; // flagged when a successor accepts
01153         size_type counter = my_successors.size();
01154         // Try forwarding, giving each successor a chance
01155         while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
01156             this->fetch_back(i_copy);
01157             if( my_successors.try_put(i_copy) ) {
01158                 this->invalidate_back();
01159                 --(this->my_tail);
01160                 success = true; // found an accepting successor
01161             }
01162             --counter;
01163         }
01164         if (success && !counter)
01165             __TBB_store_with_release(op->status, SUCCEEDED);
01166         else {
01167             __TBB_store_with_release(op->status, FAILED);
01168             forwarder_busy = false;
01169         }
01170     }
01171 
01172     virtual void internal_push(buffer_operation *op) {
01173         this->push_back(*(op->elem));
01174         __TBB_store_with_release(op->status, SUCCEEDED);
01175     }
01176 
01177     virtual void internal_pop(buffer_operation *op) {
01178         if(this->pop_back(*(op->elem))) {
01179             __TBB_store_with_release(op->status, SUCCEEDED);
01180         }
01181         else {
01182             __TBB_store_with_release(op->status, FAILED);
01183         }
01184     }
01185 
01186     virtual void internal_reserve(buffer_operation *op) {
01187         if(this->reserve_front(*(op->elem))) {
01188             __TBB_store_with_release(op->status, SUCCEEDED);
01189         }
01190         else {
01191             __TBB_store_with_release(op->status, FAILED);
01192         }
01193     }
01194 
01195     virtual void internal_consume(buffer_operation *op) {
01196         this->consume_front();
01197         __TBB_store_with_release(op->status, SUCCEEDED);
01198     }
01199 
01200     virtual void internal_release(buffer_operation *op) {
01201         this->release_front();
01202         __TBB_store_with_release(op->status, SUCCEEDED);
01203     }
01204 
01205 public:
01207     buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(),
01208         my_parent( g.root_task() ), forwarder_busy(false) {
01209         my_successors.set_owner(this);
01210         my_aggregator.initialize_handler(my_handler(this));
01211     }
01212 
01214     buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
01215         reservable_item_buffer<T>(), receiver<T>(), sender<T>(),
01216         my_parent( src.my_parent ) {
01217         forwarder_busy = false;
01218         my_successors.set_owner(this);
01219         my_aggregator.initialize_handler(my_handler(this));
01220     }
01221 
01222     virtual ~buffer_node() {}
01223 
01224     //
01225     // message sender implementation
01226     //
01227 
01229 
01230     /* override */ bool register_successor( receiver<output_type> &r ) {
01231         buffer_operation op_data(reg_succ);
01232         op_data.r = &r;
01233         my_aggregator.execute(&op_data);
01234         return true;
01235     }
01236 
01238 
01240     /* override */ bool remove_successor( receiver<output_type> &r ) {
01241         r.remove_predecessor(*this);
01242         buffer_operation op_data(rem_succ);
01243         op_data.r = &r;
01244         my_aggregator.execute(&op_data);
01245         return true;
01246     }
01247 
01249 
01251     /* override */ bool try_get( T &v ) {
01252         buffer_operation op_data(req_item);
01253         op_data.elem = &v;
01254         my_aggregator.execute(&op_data);
01255         return (op_data.status==SUCCEEDED);
01256     }
01257 
01259 
01261     /* override */ bool try_reserve( T &v ) {
01262         buffer_operation op_data(res_item);
01263         op_data.elem = &v;
01264         my_aggregator.execute(&op_data);
01265         return (op_data.status==SUCCEEDED);
01266     }
01267 
01269 
01270     /* override */ bool try_release() {
01271         buffer_operation op_data(rel_res);
01272         my_aggregator.execute(&op_data);
01273         return true;
01274     }
01275 
01277 
01278     /* override */ bool try_consume() {
01279         buffer_operation op_data(con_res);
01280         my_aggregator.execute(&op_data);
01281         return true;
01282     }
01283 
01285 
01286     /* override */ bool try_put(const T &t) {
01287         buffer_operation op_data(t, put_item);
01288         my_aggregator.execute(&op_data);
01289         return true;
01290     }
01291 
01292 protected:
01293 
01294     /*override*/void reset() {
01295         reservable_item_buffer<T, A>::reset();
01296         forwarder_busy = false;
01297     }
01298 
01299     /*override*/void reset_receiver() {
01300         // nothing to do; no predecessor_cache
01301     }
01302 
01303 };
01304 
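A hedged usage sketch: buffer_node stores items in no particular order; every sender/receiver operation is funneled through the aggregator above, so concurrent calls are safe.

    tbb::flow::graph g;
    tbb::flow::buffer_node<int> buf( g );
    buf.try_put( 1 );
    buf.try_put( 2 );
    g.wait_for_all();          // let any forwarding tasks finish
    int v = 0;
    while ( buf.try_get( v ) ) {
        // 1 and 2 are both retrieved, but in an unspecified order
    }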
01306 template <typename T, typename A=cache_aligned_allocator<T> >
01307 class queue_node : public buffer_node<T, A> {
01308 protected:
01309     typedef typename buffer_node<T, A>::size_type size_type;
01310     typedef typename buffer_node<T, A>::buffer_operation queue_operation;
01311 
01312     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01313 
01315     /* override */ void internal_forward(queue_operation *op) {
01316         T i_copy;
01317         bool success = false; // flagged when a successor accepts
01318         size_type counter = this->my_successors.size();
01319         if (this->my_reserved || !this->item_valid(this->my_head)) {
01320             __TBB_store_with_release(op->status, FAILED);
01321             this->forwarder_busy = false;
01322             return;
01323         }
01324         // Keep trying to send items while there is at least one accepting successor
01325         while (counter>0 && this->item_valid(this->my_head)) {
01326             this->fetch_front(i_copy);
01327             if(this->my_successors.try_put(i_copy)) {
01328                  this->invalidate_front();
01329                  ++(this->my_head);
01330                 success = true; // found an accepting successor
01331             }
01332             --counter;
01333         }
01334         if (success && !counter)
01335             __TBB_store_with_release(op->status, SUCCEEDED);
01336         else {
01337             __TBB_store_with_release(op->status, FAILED);
01338             this->forwarder_busy = false;
01339         }
01340     }
01341 
01342     /* override */ void internal_pop(queue_operation *op) {
01343         if ( this->my_reserved || !this->item_valid(this->my_head)){
01344             __TBB_store_with_release(op->status, FAILED);
01345         }
01346         else {
01347             this->pop_front(*(op->elem));
01348             __TBB_store_with_release(op->status, SUCCEEDED);
01349         }
01350     }
01351     /* override */ void internal_reserve(queue_operation *op) {
01352         if (this->my_reserved || !this->item_valid(this->my_head)) {
01353             __TBB_store_with_release(op->status, FAILED);
01354         }
01355         else {
01356             this->my_reserved = true;
01357             this->fetch_front(*(op->elem));
01358             this->invalidate_front();
01359             __TBB_store_with_release(op->status, SUCCEEDED);
01360         }
01361     }
01362     /* override */ void internal_consume(queue_operation *op) {
01363         this->consume_front();
01364         __TBB_store_with_release(op->status, SUCCEEDED);
01365     }
01366 
01367 public:
01368     typedef T input_type;
01369     typedef T output_type;
01370     typedef sender< input_type > predecessor_type;
01371     typedef receiver< output_type > successor_type;
01372 
01374     queue_node( graph &g ) : buffer_node<T, A>(g) {}
01375 
01377     queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
01378 };
01379 
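A hedged usage sketch: queue_node is the first-in-first-out refinement of buffer_node.

    tbb::flow::graph g;
    tbb::flow::queue_node<int> q( g );
    q.try_put( 1 );
    q.try_put( 2 );
    g.wait_for_all();
    int v = 0;
    q.try_get( v );   // v == 1
    q.try_get( v );   // v == 2 -- items leave in arrival order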
01381 template< typename T, typename A=cache_aligned_allocator<T> >
01382 class sequencer_node : public queue_node<T, A> {
01383     internal::function_body< T, size_t > *my_sequencer;
01384 public:
01385     typedef T input_type;
01386     typedef T output_type;
01387     typedef sender< input_type > predecessor_type;
01388     typedef receiver< output_type > successor_type;
01389 
01391     template< typename Sequencer >
01392     sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
01393         my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01394 
01396     sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
01397         my_sequencer( src.my_sequencer->clone() ) {}
01398 
01400     ~sequencer_node() { delete my_sequencer; }
01401 protected:
01402     typedef typename buffer_node<T, A>::size_type size_type;
01403     typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
01404 
01405     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01406 
01407 private:
01408     /* override */ void internal_push(sequencer_operation *op) {
01409         size_type tag = (*my_sequencer)(*(op->elem));
01410 
01411         this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01412 
01413         if(this->size() > this->capacity())
01414             this->grow_my_array(this->size());  // tail already has 1 added to it
01415         this->item(tag) = std::make_pair( *(op->elem), true );
01416         __TBB_store_with_release(op->status, SUCCEEDED);
01417     }
01418 };
01419 
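A hedged usage sketch: the user-supplied sequencer maps each item to its zero-based sequence number, and items are forwarded strictly in that order. Here each message carries its own index.

    struct take_index {
        size_t operator()( const std::pair<size_t,int> &msg ) const { return msg.first; }
    };

    tbb::flow::graph g;
    tbb::flow::sequencer_node< std::pair<size_t,int> > seq( g, take_index() );
    seq.try_put( std::make_pair( (size_t)1, 20 ) );  // held back: slot 0 is still empty
    seq.try_put( std::make_pair( (size_t)0, 10 ) );  // fills slot 0; both are now released in order
    g.wait_for_all();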
01421 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
01422 class priority_queue_node : public buffer_node<T, A> {
01423 public:
01424     typedef T input_type;
01425     typedef T output_type;
01426     typedef buffer_node<T,A> base_type;
01427     typedef sender< input_type > predecessor_type;
01428     typedef receiver< output_type > successor_type;
01429 
01431     priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
01432 
01434     priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
01435 
01436 protected:
01437 
01438     /*override*/void reset() {
01439         mark = 0;
01440         base_type::reset();
01441     }
01442 
01443     typedef typename buffer_node<T, A>::size_type size_type;
01444     typedef typename buffer_node<T, A>::item_type item_type;
01445     typedef typename buffer_node<T, A>::buffer_operation prio_operation;
01446 
01447     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01448 
01449     /* override */ void handle_operations(prio_operation *op_list) {
01450         prio_operation *tmp /*, *pop_list*/ ;
01451         bool try_forwarding=false;
01452         while (op_list) {
01453             tmp = op_list;
01454             op_list = op_list->next;
01455             switch (tmp->type) {
01456             case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01457             case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
01458             case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
01459             case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
01460             case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
01461             case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
01462             case buffer_node<T, A>::req_item: internal_pop(tmp); break;
01463             case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
01464             }
01465         }
01466         // process pops; for now there is no special pop processing
01467         if (mark<this->my_tail) heapify();
01468         if (try_forwarding && !this->forwarder_busy) {
01469             this->forwarder_busy = true;
01470             task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
01471         }
01472     }
01473 
01475     /* override */ void internal_forward(prio_operation *op) {
01476         T i_copy;
01477         bool success = false; // flagged when a successor accepts
01478         size_type counter = this->my_successors.size();
01479 
01480         if (this->my_reserved || this->my_tail == 0) {
01481             __TBB_store_with_release(op->status, FAILED);
01482             this->forwarder_busy = false;
01483             return;
01484         }
01485         // Keep trying to send while there exists an accepting successor
01486         while (counter>0 && this->my_tail > 0) {
01487             i_copy = this->my_array[0].first;
01488             bool msg = this->my_successors.try_put(i_copy);
01489             if ( msg == true ) {
01490                  if (mark == this->my_tail) --mark;
01491                 --(this->my_tail);
01492                 this->my_array[0].first=this->my_array[this->my_tail].first;
01493                 if (this->my_tail > 1) // don't reheap for heap of size 1
01494                     reheap();
01495                 success = true; // found an accepting successor
01496             }
01497             --counter;
01498         }
01499         if (success && !counter)
01500             __TBB_store_with_release(op->status, SUCCEEDED);
01501         else {
01502             __TBB_store_with_release(op->status, FAILED);
01503             this->forwarder_busy = false;
01504         }
01505     }
01506 
01507     /* override */ void internal_push(prio_operation *op) {
01508         if ( this->my_tail >= this->my_array_size )
01509             this->grow_my_array( this->my_tail + 1 );
01510         this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01511         ++(this->my_tail);
01512         __TBB_store_with_release(op->status, SUCCEEDED);
01513     }
01514     /* override */ void internal_pop(prio_operation *op) {
01515         if ( this->my_reserved == true || this->my_tail == 0 ) {
01516             __TBB_store_with_release(op->status, FAILED);
01517         }
01518         else {
01519             if (mark<this->my_tail &&
01520                 compare(this->my_array[0].first,
01521                         this->my_array[this->my_tail-1].first)) {
01522                 // there are newly pushed elems; last one higher than top
01523                 // copy the data
01524                 *(op->elem) = this->my_array[this->my_tail-1].first;
01525                 --(this->my_tail);
01526                 __TBB_store_with_release(op->status, SUCCEEDED);
01527             }
01528             else { // extract and push the last element down heap
01529                 *(op->elem) = this->my_array[0].first; // copy the data
01530                 if (mark == this->my_tail) --mark;
01531                 --(this->my_tail);
01532                 __TBB_store_with_release(op->status, SUCCEEDED);
01533                 this->my_array[0].first=this->my_array[this->my_tail].first;
01534                 if (this->my_tail > 1) // don't reheap for heap of size 1
01535                     reheap();
01536             }
01537         }
01538     }
01539     /* override */ void internal_reserve(prio_operation *op) {
01540         if (this->my_reserved == true || this->my_tail == 0) {
01541             __TBB_store_with_release(op->status, FAILED);
01542         }
01543         else {
01544             this->my_reserved = true;
01545             *(op->elem) = reserved_item = this->my_array[0].first;
01546             if (mark == this->my_tail) --mark;
01547             --(this->my_tail);
01548             __TBB_store_with_release(op->status, SUCCEEDED);
01549             this->my_array[0].first = this->my_array[this->my_tail].first;
01550             if (this->my_tail > 1) // don't reheap for heap of size 1
01551                 reheap();
01552         }
01553     }
01554     /* override */ void internal_consume(prio_operation *op) {
01555         this->my_reserved = false;
01556         __TBB_store_with_release(op->status, SUCCEEDED);
01557     }
01558     /* override */ void internal_release(prio_operation *op) {
01559         if (this->my_tail >= this->my_array_size)
01560             this->grow_my_array( this->my_tail + 1 );
01561         this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01562         ++(this->my_tail);
01563         this->my_reserved = false;
01564         __TBB_store_with_release(op->status, SUCCEEDED);
01565         heapify();
01566     }
01567 private:
01568     Compare compare;
01569     size_type mark;
01570     input_type reserved_item;
01571 
01572     void heapify() {
01573         if (!mark) mark = 1;
01574         for (; mark<this->my_tail; ++mark) { // for each unheaped element
01575             size_type cur_pos = mark;
01576             input_type to_place = this->my_array[mark].first;
01577             do { // push to_place up the heap
01578                 size_type parent = (cur_pos-1)>>1;
01579                 if (!compare(this->my_array[parent].first, to_place))
01580                     break;
01581                 this->my_array[cur_pos].first = this->my_array[parent].first;
01582                 cur_pos = parent;
01583             } while( cur_pos );
01584             this->my_array[cur_pos].first = to_place;
01585         }
01586     }
01587 
01588     void reheap() {
01589         size_type cur_pos=0, child=1;
01590         while (child < mark) {
01591             size_type target = child;
01592             if (child+1<mark &&
01593                 compare(this->my_array[child].first,
01594                         this->my_array[child+1].first))
01595                 ++target;
01596             // target now has the higher priority child
01597             if (compare(this->my_array[target].first,
01598                         this->my_array[this->my_tail].first))
01599                 break;
01600             this->my_array[cur_pos].first = this->my_array[target].first;
01601             cur_pos = target;
01602             child = (cur_pos<<1)+1;
01603         }
01604         this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01605     }
01606 };
01607 
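A hedged usage sketch: with the default Compare (std::less<T>) the largest item is released first; supply a different Compare to change the order.

    tbb::flow::graph g;
    tbb::flow::priority_queue_node<int> pq( g );
    pq.try_put( 3 );
    pq.try_put( 7 );
    pq.try_put( 5 );
    g.wait_for_all();
    int v = 0;
    pq.try_get( v );   // v == 7
    pq.try_get( v );   // v == 5
    pq.try_get( v );   // v == 3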
01609 
01612 template< typename T >
01613 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
01614     using graph_node::my_graph;
01615 public:
01616     typedef T input_type;
01617     typedef T output_type;
01618     typedef sender< input_type > predecessor_type;
01619     typedef receiver< output_type > successor_type;
01620 
01621 private:
01622     task *my_root_task;
01623     size_t my_threshold;
01624     size_t my_count;
01625     internal::predecessor_cache< T > my_predecessors;
01626     spin_mutex my_mutex;
01627     internal::broadcast_cache< T > my_successors;
01628     int init_decrement_predecessors;
01629 
01630     friend class internal::forward_task< limiter_node<T> >;
01631 
01632     // Let decrementer call decrement_counter()
01633     friend class internal::decrementer< limiter_node<T> >;
01634 
01635     void decrement_counter() {
01636         input_type v;
01637 
01638         // If we can't get / put an item immediately then drop the count
01639         if ( my_predecessors.get_item( v ) == false
01640              || my_successors.try_put(v) == false ) {
01641             spin_mutex::scoped_lock lock(my_mutex);
01642             --my_count;
01643             if ( !my_predecessors.empty() )
01644                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01645                             internal::forward_task< limiter_node<T> >( *this ) );
01646         }
01647     }
01648 
01649     void forward() {
01650         {
01651             spin_mutex::scoped_lock lock(my_mutex);
01652             if ( my_count < my_threshold )
01653                 ++my_count;
01654             else
01655                 return;
01656         }
01657         decrement_counter();
01658     }
01659 
01660 public:
01662     internal::decrementer< limiter_node<T> > decrement;
01663 
01665     limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
01666         graph_node(g), my_root_task(g.root_task()), my_threshold(threshold), my_count(0),
01667         init_decrement_predecessors(num_decrement_predecessors),
01668         decrement(num_decrement_predecessors)
01669     {
01670         my_predecessors.set_owner(this);
01671         my_successors.set_owner(this);
01672         decrement.set_owner(this);
01673     }
01674 
01676     limiter_node( const limiter_node& src ) :
01677         graph_node(src.my_graph), receiver<T>(), sender<T>(),
01678         my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0),
01679         init_decrement_predecessors(src.init_decrement_predecessors),
01680         decrement(src.init_decrement_predecessors)
01681     {
01682         my_predecessors.set_owner(this);
01683         my_successors.set_owner(this);
01684         decrement.set_owner(this);
01685     }
01686 
01688     /* override */ bool register_successor( receiver<output_type> &r ) {
01689         my_successors.register_successor(r);
01690         return true;
01691     }
01692 
01694 
01695     /* override */ bool remove_successor( receiver<output_type> &r ) {
01696         r.remove_predecessor(*this);
01697         my_successors.remove_successor(r);
01698         return true;
01699     }
01700 
01702     /* override */ bool try_put( const T &t ) {
01703         {
01704             spin_mutex::scoped_lock lock(my_mutex);
01705             if ( my_count >= my_threshold )
01706                 return false;
01707             else
01708                 ++my_count;
01709         }
01710 
01711         bool msg = my_successors.try_put(t);
01712 
01713         if ( msg != true ) {
01714             spin_mutex::scoped_lock lock(my_mutex);
01715             --my_count;
01716             if ( !my_predecessors.empty() )
01717                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01718                             internal::forward_task< limiter_node<T> >( *this ) );
01719         }
01720 
01721         return msg;
01722     }
01723 
01725     /* override */ bool register_predecessor( predecessor_type &src ) {
01726         spin_mutex::scoped_lock lock(my_mutex);
01727         my_predecessors.add( src );
01728         if ( my_count < my_threshold && !my_successors.empty() )
01729             task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01730                            internal::forward_task< limiter_node<T> >( *this ) );
01731         return true;
01732     }
01733 
01735     /* override */ bool remove_predecessor( predecessor_type &src ) {
01736         my_predecessors.remove( src );
01737         return true;
01738     }
01739 
01740 protected:
01741 
01742     /*override*/void reset() {
01743         my_count = 0;
01744         my_predecessors.reset();
01745         decrement.reset_receiver();
01746     }
01747 
01748     /*override*/void reset_receiver() { my_predecessors.reset(); }
01749 };
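A limiter_node forwards at most threshold messages that have not yet been returned through its decrement port; signalling decrement (a continue_msg receiver) frees one slot. A minimal usage sketch, assuming this header is included as tbb/flow_graph.h and a C++11 compiler for the lambda (node names are illustrative):

    // Illustrative sketch, not part of this header: cap in-flight work at 3.
    // Each completed item sends a continue_msg back to the limiter's
    // decrement port, freeing one slot.
    #include "tbb/flow_graph.h"
    #include <iostream>

    int main() {
        using namespace tbb::flow;
        graph g;

        limiter_node<int> limiter(g, /*threshold=*/3);

        function_node<int, continue_msg> worker(g, unlimited,
            [](int v) -> continue_msg {
                std::cout << "processing " << v << std::endl;
                return continue_msg();
            });

        make_edge(limiter, worker);            // limited stream of ints
        make_edge(worker, limiter.decrement);  // each completion frees one slot

        for (int i = 0; i < 10; ++i)
            limiter.try_put(i);                // puts past the threshold are rejected

        g.wait_for_all();
        return 0;
    }

In practice an upstream buffer or source_node is usually registered as a predecessor, so that messages rejected while the node is at its threshold are pulled in later by the forwarding task.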
01750 
01751 #include "internal/_flow_graph_join_impl.h"
01752 
01753 using internal::reserving_port;
01754 using internal::queueing_port;
01755 using internal::tag_matching_port;
01756 using internal::input_port;
01757 using internal::tag_value;
01758 using internal::NO_TAG;
01759 
01760 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
01761 
01762 template<typename OutputTuple>
01763 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
01764 private:
01765     static const int N = std::tuple_size<OutputTuple>::value;
01766     typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
01767 public:
01768     typedef OutputTuple output_type;
01769     typedef typename unfolded_type::input_ports_type input_ports_type;
01770     join_node(graph &g) : unfolded_type(g) { }
01771     join_node(const join_node &other) : unfolded_type(other) {}
01772 };
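A reserving join_node pulls from its predecessors only when every input port can reserve an item, so it is normally paired with reservable upstream buffers. A minimal sketch, assuming a C++11 <tuple> (__TBB_CPP11_TUPLE_PRESENT) and illustrative node names:

    // Illustrative sketch, not part of this header: combine two buffered
    // streams into std::tuple<int, float> with the reserving policy, which
    // only fires when both input ports can reserve an item.
    #include "tbb/flow_graph.h"
    #include <iostream>

    int main() {
        using namespace tbb::flow;
        graph g;

        buffer_node<int>   ints(g);
        buffer_node<float> floats(g);
        join_node< std::tuple<int, float>, reserving > join(g);

        function_node< std::tuple<int, float>, continue_msg > sink(g, serial,
            [](const std::tuple<int, float> &t) -> continue_msg {
                std::cout << std::get<0>(t) << ", " << std::get<1>(t) << std::endl;
                return continue_msg();
            });

        make_edge(ints,   input_port<0>(join));
        make_edge(floats, input_port<1>(join));
        make_edge(join, sink);

        ints.try_put(1);
        floats.try_put(2.5f);

        g.wait_for_all();
        return 0;
    }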
01773 
01774 template<typename OutputTuple>
01775 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
01776 private:
01777     static const int N = std::tuple_size<OutputTuple>::value;
01778     typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
01779 public:
01780     typedef OutputTuple output_type;
01781     typedef typename unfolded_type::input_ports_type input_ports_type;
01782     join_node(graph &g) : unfolded_type(g) { }
01783     join_node(const join_node &other) : unfolded_type(other) {}
01784 };
01785 
01786 // template for tag_matching join_node
01787 template<typename OutputTuple>
01788 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
01789       tag_matching_port, OutputTuple, tag_matching> {
01790 private:
01791     static const int N = std::tuple_size<OutputTuple>::value;
01792     typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
01793 public:
01794     typedef OutputTuple output_type;
01795     typedef typename unfolded_type::input_ports_type input_ports_type;
01796     template<typename B0, typename B1>
01797     join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
01798     template<typename B0, typename B1, typename B2>
01799     join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
01800     template<typename B0, typename B1, typename B2, typename B3>
01801     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
01802     template<typename B0, typename B1, typename B2, typename B3, typename B4>
01803     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
01804 #if __TBB_VARIADIC_MAX >= 6
01805     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
01806     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
01807 #endif
01808 #if __TBB_VARIADIC_MAX >= 7
01809     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
01810     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
01811 #endif
01812 #if __TBB_VARIADIC_MAX >= 8
01813     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
01814     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
01815 #endif
01816 #if __TBB_VARIADIC_MAX >= 9
01817     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
01818     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
01819 #endif
01820 #if __TBB_VARIADIC_MAX >= 10
01821     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
01822     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
01823 #endif
01824     join_node(const join_node &other) : unfolded_type(other) {}
01825 };
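The tag_matching specialization takes one tag-extraction body per input port (the B0, B1, ... constructor arguments above); items whose bodies return the same tag_value are combined into one output tuple. A minimal sketch, assuming a C++11 compiler for the lambdas (names are illustrative):

    // Illustrative sketch, not part of this header: pair an int with the
    // float carrying the same key.  Each constructor argument maps an input
    // item to its tag_value.
    #include "tbb/flow_graph.h"
    #include <iostream>

    int main() {
        using namespace tbb::flow;
        graph g;

        join_node< std::tuple<int, float>, tag_matching > join(g,
            [](int v)   -> tag_value { return tag_value(v); },       // tag for port 0
            [](float v) -> tag_value { return tag_value(int(v)); }); // tag for port 1

        function_node< std::tuple<int, float>, continue_msg > sink(g, serial,
            [](const std::tuple<int, float> &t) -> continue_msg {
                std::cout << std::get<0>(t) << " matched " << std::get<1>(t) << std::endl;
                return continue_msg();
            });

        make_edge(join, sink);

        input_port<0>(join).try_put(7);
        input_port<1>(join).try_put(7.0f);   // same tag as the int above

        g.wait_for_all();
        return 0;
    }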
01826 
01827 #if TBB_PREVIEW_GRAPH_NODES
01828 // or node
01829 #include "internal/_flow_graph_or_impl.h"
01830 
01831 template<typename InputTuple>
01832 class or_node : public internal::unfolded_or_node<InputTuple> {
01833 private:
01834     static const int N = std::tuple_size<InputTuple>::value;
01835 public:
01836     typedef typename internal::or_output_type<InputTuple>::type output_type;
01837     typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;
01838     or_node(graph& g) : unfolded_type(g) { }
01839     // Copy constructor
01840     or_node( const or_node& other ) : unfolded_type(other) { }
01841 };
01842 #endif  // TBB_PREVIEW_GRAPH_NODES
01843 
01845 template< typename T >
01846 inline void make_edge( sender<T> &p, receiver<T> &s ) {
01847     p.register_successor( s );
01848 }
01849 
01851 template< typename T >
01852 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
01853     p.remove_successor( s );
01854 }
01855 
01857 template< typename Body, typename Node >
01858 Body copy_body( Node &n ) {
01859     return n.template copy_function_object<Body>();
01860 }
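copy_body retrieves a copy of the body object held inside a node, which is useful for reading back state accumulated by a stateful functor. A minimal sketch; counter_body is a hypothetical functor, not part of this header:

    // Illustrative sketch: copy_body returns a copy of the body instance held
    // inside the node, including whatever state it has accumulated.
    #include "tbb/flow_graph.h"
    #include <iostream>

    struct counter_body {                      // hypothetical stateful body
        int count;
        counter_body() : count(0) {}
        int operator()(int v) { ++count; return v; }
    };

    int main() {
        using namespace tbb::flow;
        graph g;

        function_node<int, int> node(g, serial, counter_body());
        for (int i = 0; i < 5; ++i) node.try_put(i);
        g.wait_for_all();

        counter_body snapshot = copy_body<counter_body>(node);
        std::cout << "items seen: " << snapshot.count << std::endl;
        return 0;
    }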
01861 
01862 } // interface6
01863 
01864     using interface6::graph;
01865     using interface6::graph_node;
01866     using interface6::continue_msg;
01867     using interface6::sender;
01868     using interface6::receiver;
01869     using interface6::continue_receiver;
01870 
01871     using interface6::source_node;
01872     using interface6::function_node;
01873     using interface6::multifunction_node;
01874     using interface6::split_node;
01875     using interface6::internal::output_port;
01876 #if TBB_PREVIEW_GRAPH_NODES
01877     using interface6::or_node;
01878 #endif
01879     using interface6::continue_node;
01880     using interface6::overwrite_node;
01881     using interface6::write_once_node;
01882     using interface6::broadcast_node;
01883     using interface6::buffer_node;
01884     using interface6::queue_node;
01885     using interface6::sequencer_node;
01886     using interface6::priority_queue_node;
01887     using interface6::limiter_node;
01888     using namespace interface6::internal::graph_policy_namespace;
01889     using interface6::join_node;
01890     using interface6::input_port;
01891     using interface6::copy_body; 
01892     using interface6::make_edge; 
01893     using interface6::remove_edge; 
01894     using interface6::internal::NO_TAG;
01895     using interface6::internal::tag_value;
01896 
01897 } // flow
01898 } // tbb
01899 
01900 #endif // __TBB_flow_graph_H
