00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #ifndef QTCONCURRENT_REDUCEKERNEL_H
00043 #define QTCONCURRENT_REDUCEKERNEL_H
00044
00045 #include <QtCore/qglobal.h>
00046
00047 #ifndef QT_NO_CONCURRENT
00048
00049 #include <QtCore/qatomic.h>
00050 #include <QtCore/qlist.h>
00051 #include <QtCore/qmap.h>
00052 #include <QtCore/qmutex.h>
00053 #include <QtCore/qthread.h>
00054 #include <QtCore/qthreadpool.h>
00055 #include <QtCore/qvector.h>
00056
00057 QT_BEGIN_HEADER
00058 QT_BEGIN_NAMESPACE
00059
00060 QT_MODULE(Core)
00061
00062 namespace QtConcurrent {
00063
00064 #ifndef qdoc
00065
00066
00067
00068
00069
00070
00071
00072
// Per-thread limits on the number of intermediate results that may be queued
// for reduction. ReduceKernel multiplies each limit by its thread count before
// comparing it with the current queue size.
enum {
    // ReduceKernel::shouldStartThread() returns true while the queue size is
    // at or below this limit times the thread count.
    ReduceQueueStartLimit = 20,

    // ReduceKernel::shouldThrottle() returns true once the queue size exceeds
    // this limit times the thread count.
    ReduceQueueThrottleLimit = 30
};
00077
00078
00079
00080
00081 template <typename T>
00082 class IntermediateResults
00083 {
00084 public:
00085 int begin, end;
00086 QVector<T> vector;
00087 };
00088
00089 #endif // qdoc
00090
00091 enum ReduceOption {
00092 UnorderedReduce = 0x1,
00093 OrderedReduce = 0x2,
00094 SequentialReduce = 0x4
00095
00096 };
00097 Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
00098 Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
00099
00100 #ifndef qdoc
00101
00102
00103 template <typename ReduceFunctor, typename ReduceResultType, typename T>
00104 class ReduceKernel
00105 {
00106 typedef QMap<int, IntermediateResults<T> > ResultsMap;
00107
00108 const ReduceOptions reduceOptions;
00109
00110 QMutex mutex;
00111 int progress, resultsMapSize, threadCount;
00112 ResultsMap resultsMap;
00113
00114 bool canReduce(int begin) const
00115 {
00116 return (((reduceOptions & UnorderedReduce)
00117 && progress == 0)
00118 || ((reduceOptions & OrderedReduce)
00119 && progress == begin));
00120 }
00121
00122 void reduceResult(ReduceFunctor &reduce,
00123 ReduceResultType &r,
00124 const IntermediateResults<T> &result)
00125 {
00126 for (int i = 0; i < result.vector.size(); ++i) {
00127 reduce(r, result.vector.at(i));
00128 }
00129 }
00130
00131 void reduceResults(ReduceFunctor &reduce,
00132 ReduceResultType &r,
00133 ResultsMap &map)
00134 {
00135 typename ResultsMap::iterator it = map.begin();
00136 while (it != map.end()) {
00137 reduceResult(reduce, r, it.value());
00138 ++it;
00139 }
00140 }
00141
00142 public:
00143 ReduceKernel(ReduceOptions _reduceOptions)
00144 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
00145 threadCount(QThreadPool::globalInstance()->maxThreadCount())
00146 { }
00147
00148 void runReduce(ReduceFunctor &reduce,
00149 ReduceResultType &r,
00150 const IntermediateResults<T> &result)
00151 {
00152 QMutexLocker locker(&mutex);
00153 if (!canReduce(result.begin)) {
00154 ++resultsMapSize;
00155 resultsMap.insert(result.begin, result);
00156 return;
00157 }
00158
00159 if (reduceOptions & UnorderedReduce) {
00160
00161 progress = -1;
00162
00163
00164 locker.unlock();
00165 reduceResult(reduce, r, result);
00166 locker.relock();
00167
00168
00169 while (!resultsMap.isEmpty()) {
00170 ResultsMap resultsMapCopy = resultsMap;
00171 resultsMap.clear();
00172
00173 locker.unlock();
00174 reduceResults(reduce, r, resultsMapCopy);
00175 locker.relock();
00176
00177 resultsMapSize -= resultsMapCopy.size();
00178 }
00179
00180 progress = 0;
00181 } else {
00182
00183 locker.unlock();
00184 reduceResult(reduce, r, result);
00185 locker.relock();
00186
00187
00188 progress += result.end - result.begin;
00189
00190
00191 typename ResultsMap::iterator it = resultsMap.begin();
00192 while (it != resultsMap.end()) {
00193 if (it.value().begin != progress)
00194 break;
00195
00196 locker.unlock();
00197 reduceResult(reduce, r, it.value());
00198 locker.relock();
00199
00200 --resultsMapSize;
00201 progress += it.value().end - it.value().begin;
00202 it = resultsMap.erase(it);
00203 }
00204 }
00205 }
00206
00207
00208 void finish(ReduceFunctor &reduce, ReduceResultType &r)
00209 {
00210 reduceResults(reduce, r, resultsMap);
00211 }
00212
00213 inline bool shouldThrottle()
00214 {
00215 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
00216 }
00217
00218 inline bool shouldStartThread()
00219 {
00220 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
00221 }
00222 };
00223
00224 template <typename Sequence, typename Base, typename Functor1, typename Functor2>
00225 struct SequenceHolder2 : public Base
00226 {
00227 SequenceHolder2(const Sequence &_sequence,
00228 Functor1 functor1,
00229 Functor2 functor2,
00230 ReduceOptions reduceOptions)
00231 : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
00232 sequence(_sequence)
00233 { }
00234
00235 Sequence sequence;
00236
00237 void finish()
00238 {
00239 Base::finish();
00240
00241
00242 sequence = Sequence();
00243 }
00244 };
00245
00246 #endif //qdoc
00247
00248 }
00249
00250 QT_END_NAMESPACE
00251 QT_END_HEADER
00252
00253 #endif // QT_NO_CONCURRENT
00254
00255 #endif