qtconcurrentiteratekernel.h

Go to the documentation of this file.
00001 /****************************************************************************
00002 **
00003 ** Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies).
00004 ** All rights reserved.
00005 ** Contact: Nokia Corporation (qt-info@nokia.com)
00006 **
00007 ** This file is part of the QtCore module of the Qt Toolkit.
00008 **
00009 ** $QT_BEGIN_LICENSE:LGPL$
00010 ** Commercial Usage
00011 ** Licensees holding valid Qt Commercial licenses may use this file in
00012 ** accordance with the Qt Commercial License Agreement provided with the
00013 ** Software or, alternatively, in accordance with the terms contained in
00014 ** a written agreement between you and Nokia.
00015 **
00016 ** GNU Lesser General Public License Usage
00017 ** Alternatively, this file may be used under the terms of the GNU Lesser
00018 ** General Public License version 2.1 as published by the Free Software
00019 ** Foundation and appearing in the file LICENSE.LGPL included in the
00020 ** packaging of this file.  Please review the following information to
00021 ** ensure the GNU Lesser General Public License version 2.1 requirements
00022 ** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
00023 **
00024 ** In addition, as a special exception, Nokia gives you certain additional
00025 ** rights.  These rights are described in the Nokia Qt LGPL Exception
00026 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this module.
00027 **
00028 ** GNU General Public License Usage
00029 ** Alternatively, this file may be used under the terms of the GNU
00030 ** General Public License version 3.0 as published by the Free Software
00031 ** Foundation and appearing in the file LICENSE.GPL included in the
00032 ** packaging of this file.  Please review the following information to
00033 ** ensure the GNU General Public License version 3.0 requirements will be
00034 ** met: http://www.gnu.org/copyleft/gpl.html.
00035 **
00036 ** If you have questions regarding the use of this file, please contact
00037 ** Nokia at qt-info@nokia.com.
00038 ** $QT_END_LICENSE$
00039 **
00040 ****************************************************************************/
00041 
00042 #ifndef QTCONCURRENT_ITERATEKERNEL_H
00043 #define QTCONCURRENT_ITERATEKERNEL_H
00044 
00045 #include <QtCore/qglobal.h>
00046 
00047 #ifndef QT_NO_CONCURRENT
00048 
00049 #include <QtCore/qatomic.h>
00050 #include <QtCore/qtconcurrentmedian.h>
00051 #include <QtCore/qtconcurrentthreadengine.h>
00052 
00053 #ifndef QT_NO_STL
00054 #  include <iterator>
00055 #endif
00056 
00057 QT_BEGIN_HEADER
00058 QT_BEGIN_NAMESPACE
00059 
00060 QT_MODULE(Core)
00061 
00062 #ifndef qdoc
00063 
00064 namespace QtConcurrent {
00065 
#ifndef QT_NO_STL
    // The STL is available: make the standard advance() visible here.
    using std::advance;
#else
    // No STL: provide a minimal substitute that steps the iterator forward
    // through its operator+=.
    template <typename It, typename T>
    void advance(It &it, T value)
    {
        it += value;
    }
#endif
00075 
00076 /*
00077     The BlockSizeManager class manages how many iterations a thread should
00078     reserve and process at a time. This is done by measuring the time spent
00079     in the user code versus the control part code, and then increasing
00080     the block size if the ratio between them is to small. The block size
00081     management is done on the basis of the median of several timing measuremens,
00082     and it is done induvidualy for each thread.
00083 */
00084 class Q_CORE_EXPORT BlockSizeManager
00085 {
00086 public:
00087     BlockSizeManager(int iterationCount);
00088     void timeBeforeUser();
00089     void timeAfterUser();
00090     int blockSize();
00091 private:
00092     inline bool blockSizeMaxed()
00093     {
00094         return (m_blockSize >= maxBlockSize);
00095     }
00096 
00097     const int maxBlockSize;
00098     qint64 beforeUser;
00099     qint64 afterUser;
00100     Median<double> controlPartElapsed;
00101     Median<double> userPartElapsed;
00102     int m_blockSize;
00103 };
00104 
00105 template <typename T>
00106 class ResultReporter
00107 {
00108 public:
00109     ResultReporter(ThreadEngine<T> *_threadEngine)
00110     :threadEngine(_threadEngine)
00111     {
00112 
00113     }
00114 
00115     void reserveSpace(int resultCount)
00116     {
00117         currentResultCount = resultCount;
00118         vector.resize(qMax(resultCount, vector.count()));
00119     }
00120 
00121     void reportResults(int begin)
00122     {
00123         const int useVectorThreshold = 4; // Tunable parameter.
00124         if (currentResultCount > useVectorThreshold) {
00125             vector.resize(currentResultCount);
00126             threadEngine->reportResults(vector, begin);
00127         } else {
00128             for (int i = 0; i < currentResultCount; ++i)
00129                 threadEngine->reportResult(&vector.at(i), begin + i);
00130         }
00131     }
00132 
00133     inline T * getPointer()
00134     {
00135         return vector.data();
00136     }
00137 
00138     int currentResultCount;
00139     ThreadEngine<T> *threadEngine;
00140     QVector<T> vector;
00141 };
00142 
00143 template <>
00144 class ResultReporter<void>
00145 {
00146 public:
00147     inline ResultReporter(ThreadEngine<void> *) { }
00148     inline void reserveSpace(int) { };
00149     inline void reportResults(int) { };
00150     inline void * getPointer() { return 0; }
00151 };
00152 
/*
    selectIteration() maps an iterator category tag to the iteration
    strategy: true selects "for" iteration, false selects "while" iteration.
*/
#ifndef QT_NO_STL
// Forward iterators: while iteration.
inline bool selectIteration(std::forward_iterator_tag)
{
    const bool useForIteration = false;
    return useForIteration;
}

// Bidirectional iterators: while iteration.
inline bool selectIteration(std::bidirectional_iterator_tag)
{
    const bool useForIteration = false;
    return useForIteration;
}

// Random access iterators: for iteration.
inline bool selectIteration(std::random_access_iterator_tag)
{
    const bool useForIteration = true;
    return useForIteration;
}
#else
// Without STL support there are no category tags to inspect, so every
// argument type selects while iteration.
template <typename T>
inline bool selectIteration(T)
{
    const bool useForIteration = false;
    return useForIteration;
}
#endif
00176 
00177 template <typename Iterator, typename T>
00178 class IterateKernel : public ThreadEngine<T>
00179 {
00180 public:
00181     typedef T ResultType;
00182 
00183     IterateKernel(Iterator _begin, Iterator _end)
00184 #if defined (QT_NO_STL)
00185         : begin(_begin), end(_end), current(_begin), currentIndex(0),
00186            forIteration(false), progressReportingEnabled(true)
00187 #elif !defined(QT_NO_PARTIAL_TEMPLATE_SPECIALIZATION)
00188         : begin(_begin), end(_end), current(_begin), currentIndex(0),
00189            forIteration(selectIteration(typename std::iterator_traits<Iterator>::iterator_category())), progressReportingEnabled(true)
00190 #else
00191         : begin(_begin), end(_end), currentIndex(0),
00192           forIteration(selectIteration(std::iterator_category(_begin))), progressReportingEnabled(true)
00193 #endif
00194     {
00195 #if defined (QT_NO_STL)
00196        iterationCount = 0;
00197 #else
00198         iterationCount =  forIteration ? std::distance(_begin, _end) : 0;
00199 
00200 #endif
00201     }
00202 
00203     virtual ~IterateKernel() { }
00204 
00205     virtual bool runIteration(Iterator it, int index , T *result)
00206         { Q_UNUSED(it); Q_UNUSED(index); Q_UNUSED(result); return false; }
00207     virtual bool runIterations(Iterator _begin, int beginIndex, int endIndex, T *results)
00208         { Q_UNUSED(_begin); Q_UNUSED(beginIndex); Q_UNUSED(endIndex); Q_UNUSED(results); return false; }
00209 
00210     void start()
00211     {
00212         progressReportingEnabled = this->isProgressReportingEnabled();
00213         if (progressReportingEnabled && iterationCount > 0)
00214             this->setProgressRange(0, iterationCount);
00215     }
00216 
00217     bool shouldStartThread()
00218     {
00219         if (forIteration)
00220             return (currentIndex < iterationCount) && !this->shouldThrottleThread();
00221         else // whileIteration
00222             return (iteratorThreads == 0);
00223     }
00224 
00225     ThreadFunctionResult threadFunction()
00226     {
00227         if (forIteration)
00228             return this->forThreadFunction();
00229         else // whileIteration
00230             return this->whileThreadFunction();
00231     }
00232 
00233     ThreadFunctionResult forThreadFunction()
00234     {
00235         BlockSizeManager blockSizeManager(iterationCount);
00236         ResultReporter<T> resultReporter(this);
00237 
00238         for(;;) {
00239             if (this->isCanceled())
00240                 break;
00241 
00242             const int currentBlockSize = blockSizeManager.blockSize();
00243 
00244             if (currentIndex >= iterationCount)
00245                 break;
00246 
00247             // Atomically reserve a block of iterationCount for this thread.
00248             const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
00249             const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
00250 
00251             if (beginIndex >= endIndex) {
00252                 // No more work
00253                 break;
00254             }
00255 
00256             this->waitForResume(); // (only waits if the qfuture is paused.)
00257 
00258             if (shouldStartThread())
00259                 this->startThread();
00260 
00261             const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
00262             resultReporter.reserveSpace(finalBlockSize);
00263 
00264             // Call user code with the current iteration range.
00265             blockSizeManager.timeBeforeUser();
00266             const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
00267             blockSizeManager.timeAfterUser();
00268 
00269             if (resultsAvailable)
00270                 resultReporter.reportResults(beginIndex);
00271 
00272             // Report progress if progress reporting enabled.
00273             if (progressReportingEnabled) {
00274                 completed.fetchAndAddAcquire(finalBlockSize);
00275                 this->setProgressValue(this->completed);
00276             }
00277 
00278             if (this->shouldThrottleThread())
00279                 return ThrottleThread;
00280         }
00281         return ThreadFinished;
00282     }
00283 
00284     ThreadFunctionResult whileThreadFunction()
00285     {
00286         if (iteratorThreads.testAndSetAcquire(0, 1) == false)
00287             return ThreadFinished;
00288 
00289         ResultReporter<T> resultReporter(this);
00290         resultReporter.reserveSpace(1);
00291 
00292         while (current != end) {
00293             // The following two lines breaks support for input iterators according to
00294             // the sgi docs: dereferencing prev after calling ++current is not allowed
00295             // on input iterators. (prev is dereferenced inside user.runIteration())
00296             Iterator prev = current;
00297             ++current;
00298             int index = currentIndex.fetchAndAddRelaxed(1);
00299             iteratorThreads.testAndSetRelease(1, 0);
00300 
00301             this->waitForResume(); // (only waits if the qfuture is paused.)
00302 
00303             if (shouldStartThread())
00304                 this->startThread();
00305 
00306             const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
00307             if (resultAavailable)
00308                 resultReporter.reportResults(index);
00309 
00310             if (this->shouldThrottleThread())
00311                 return ThrottleThread;
00312 
00313             if (iteratorThreads.testAndSetAcquire(0, 1) == false)
00314                 return ThreadFinished;
00315         }
00316 
00317         return ThreadFinished;
00318     }
00319 
00320 
00321 public:
00322     const Iterator begin;
00323     const Iterator end;
00324     Iterator current;
00325     QAtomicInt currentIndex;
00326     bool forIteration;
00327     QAtomicInt iteratorThreads;
00328     int iterationCount;
00329 
00330     bool progressReportingEnabled;
00331     QAtomicInt completed;
00332 };
00333 
00334 } // namespace QtConcurrent
00335 
00336 #endif //qdoc
00337 
00338 QT_END_NAMESPACE
00339 QT_END_HEADER
00340 
00341 #endif // QT_NO_CONCURRENT
00342 
00343 #endif