qtconcurrentreducekernel.h

Go to the documentation of this file.
00001 /****************************************************************************
00002 **
00003 ** Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies).
00004 ** All rights reserved.
00005 ** Contact: Nokia Corporation (qt-info@nokia.com)
00006 **
00007 ** This file is part of the QtCore module of the Qt Toolkit.
00008 **
00009 ** $QT_BEGIN_LICENSE:LGPL$
00010 ** Commercial Usage
00011 ** Licensees holding valid Qt Commercial licenses may use this file in
00012 ** accordance with the Qt Commercial License Agreement provided with the
00013 ** Software or, alternatively, in accordance with the terms contained in
00014 ** a written agreement between you and Nokia.
00015 **
00016 ** GNU Lesser General Public License Usage
00017 ** Alternatively, this file may be used under the terms of the GNU Lesser
00018 ** General Public License version 2.1 as published by the Free Software
00019 ** Foundation and appearing in the file LICENSE.LGPL included in the
00020 ** packaging of this file.  Please review the following information to
00021 ** ensure the GNU Lesser General Public License version 2.1 requirements
00022 ** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
00023 **
00024 ** In addition, as a special exception, Nokia gives you certain additional
00025 ** rights.  These rights are described in the Nokia Qt LGPL Exception
00026 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this module.
00027 **
00028 ** GNU General Public License Usage
00029 ** Alternatively, this file may be used under the terms of the GNU
00030 ** General Public License version 3.0 as published by the Free Software
00031 ** Foundation and appearing in the file LICENSE.GPL included in the
00032 ** packaging of this file.  Please review the following information to
00033 ** ensure the GNU General Public License version 3.0 requirements will be
00034 ** met: http://www.gnu.org/copyleft/gpl.html.
00035 **
00036 ** If you have questions regarding the use of this file, please contact
00037 ** Nokia at qt-info@nokia.com.
00038 ** $QT_END_LICENSE$
00039 **
00040 ****************************************************************************/
00041 
00042 #ifndef QTCONCURRENT_REDUCEKERNEL_H
00043 #define QTCONCURRENT_REDUCEKERNEL_H
00044 
00045 #include <QtCore/qglobal.h>
00046 
00047 #ifndef QT_NO_CONCURRENT
00048 
00049 #include <QtCore/qatomic.h>
00050 #include <QtCore/qlist.h>
00051 #include <QtCore/qmap.h>
00052 #include <QtCore/qmutex.h>
00053 #include <QtCore/qthread.h>
00054 #include <QtCore/qthreadpool.h>
00055 #include <QtCore/qvector.h>
00056 
00057 QT_BEGIN_HEADER
00058 QT_BEGIN_NAMESPACE
00059 
00060 QT_MODULE(Core)
00061 
00062 namespace QtConcurrent {
00063 
00064 #ifndef qdoc
00065 
00066 /*
00067     The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
00068     limit the reduce queue size for MapReduce. When the number of
00069     reduce blocks in the queue exceeds ReduceQueueStartLimit,
00070     MapReduce won't start any new threads, and when it exceeds
00071     ReduceQueueThrottleLimit running threads will be stopped.
00072 */
00073 enum {
00074     ReduceQueueStartLimit = 20,
00075     ReduceQueueThrottleLimit = 30
00076 };
00077 
00078 // IntermediateResults holds a block of intermediate results from a
00079 // map or filter functor. The begin/end offsets indicates the origin
00080 // and range of the block.
00081 template <typename T>
00082 class IntermediateResults
00083 {
00084 public:
00085     int begin, end;
00086     QVector<T> vector;
00087 };
00088 
00089 #endif // qdoc
00090 
00091 enum ReduceOption {
00092     UnorderedReduce = 0x1,
00093     OrderedReduce = 0x2,
00094     SequentialReduce = 0x4
00095     // ParallelReduce = 0x8
00096 };
00097 Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
00098 Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
00099 
00100 #ifndef qdoc
00101 
00102 // supports both ordered and out-of-order reduction
00103 template <typename ReduceFunctor, typename ReduceResultType, typename T>
00104 class ReduceKernel
00105 {
00106     typedef QMap<int, IntermediateResults<T> > ResultsMap;
00107 
00108     const ReduceOptions reduceOptions;
00109 
00110     QMutex mutex;
00111     int progress, resultsMapSize, threadCount;
00112     ResultsMap resultsMap;
00113 
00114     bool canReduce(int begin) const
00115     {
00116         return (((reduceOptions & UnorderedReduce)
00117                  && progress == 0)
00118                 || ((reduceOptions & OrderedReduce)
00119                     && progress == begin));
00120     }
00121 
00122     void reduceResult(ReduceFunctor &reduce,
00123                       ReduceResultType &r,
00124                       const IntermediateResults<T> &result)
00125     {
00126         for (int i = 0; i < result.vector.size(); ++i) {
00127             reduce(r, result.vector.at(i));
00128         }
00129     }
00130 
00131     void reduceResults(ReduceFunctor &reduce,
00132                        ReduceResultType &r,
00133                        ResultsMap &map)
00134     {
00135         typename ResultsMap::iterator it = map.begin();
00136         while (it != map.end()) {
00137             reduceResult(reduce, r, it.value());
00138             ++it;
00139         }
00140     }
00141 
00142 public:
00143     ReduceKernel(ReduceOptions _reduceOptions)
00144         : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0), 
00145           threadCount(QThreadPool::globalInstance()->maxThreadCount())
00146     { }
00147 
00148     void runReduce(ReduceFunctor &reduce,
00149                    ReduceResultType &r,
00150                    const IntermediateResults<T> &result)
00151     {
00152         QMutexLocker locker(&mutex);
00153         if (!canReduce(result.begin)) {
00154             ++resultsMapSize;
00155             resultsMap.insert(result.begin, result);
00156             return;
00157         }
00158 
00159         if (reduceOptions & UnorderedReduce) {
00160             // UnorderedReduce
00161             progress = -1;
00162 
00163             // reduce this result
00164             locker.unlock();
00165             reduceResult(reduce, r, result);
00166             locker.relock();
00167 
00168             // reduce all stored results as well
00169             while (!resultsMap.isEmpty()) {
00170                 ResultsMap resultsMapCopy = resultsMap;
00171                 resultsMap.clear();
00172 
00173                 locker.unlock();
00174                 reduceResults(reduce, r, resultsMapCopy);
00175                 locker.relock();
00176 
00177                 resultsMapSize -= resultsMapCopy.size();
00178             }
00179 
00180             progress = 0;
00181         } else {
00182             // reduce this result
00183             locker.unlock();
00184             reduceResult(reduce, r, result);
00185             locker.relock();
00186 
00187             // OrderedReduce
00188             progress += result.end - result.begin;
00189 
00190             // reduce as many other results as possible
00191             typename ResultsMap::iterator it = resultsMap.begin();
00192             while (it != resultsMap.end()) {
00193                 if (it.value().begin != progress)
00194                     break;
00195 
00196                 locker.unlock();
00197                 reduceResult(reduce, r, it.value());
00198                 locker.relock();
00199 
00200                 --resultsMapSize;
00201                 progress += it.value().end - it.value().begin;
00202                 it = resultsMap.erase(it);
00203             }
00204         }
00205     }
00206 
00207     // final reduction
00208     void finish(ReduceFunctor &reduce, ReduceResultType &r)
00209     {
00210         reduceResults(reduce, r, resultsMap);
00211     }
00212 
00213     inline bool shouldThrottle()
00214     {
00215         return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
00216     }
00217 
00218     inline bool shouldStartThread()
00219     {
00220         return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
00221     }
00222 };
00223 
00224 template <typename Sequence, typename Base, typename Functor1, typename Functor2>
00225 struct SequenceHolder2 : public Base
00226 {
00227     SequenceHolder2(const Sequence &_sequence,
00228                     Functor1 functor1,
00229                     Functor2 functor2,
00230                     ReduceOptions reduceOptions)
00231         : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
00232           sequence(_sequence)
00233     { }
00234 
00235     Sequence sequence;
00236 
00237     void finish()
00238     {
00239         Base::finish();
00240         // Clear the sequence to make sure all temporaries are destroyed
00241         // before finished is signaled.
00242         sequence = Sequence();
00243     }
00244 };
00245 
00246 #endif //qdoc
00247 
00248 } // namespace QtConcurrent
00249 
00250 QT_END_NAMESPACE
00251 QT_END_HEADER
00252 
00253 #endif // QT_NO_CONCURRENT
00254 
00255 #endif