[Kst] branches/work/kst/portto4/kst/src

Peter Kümmel syntheticpp at gmx.net
Tue Oct 16 20:37:42 UTC 2012


SVN commit 1320819 by kuemmel:

Use threads for parsing the data.

Example speedup on a machine with 4 physical cores + 4 hyper-threads (HT):
13s  down to 5s   for 1GB file,
66ms down to 23ms for 6MB file (gyrodata)

 M  +18 -8     datasources/ascii/asciidatareader.cpp  
 M  +8 -4      datasources/ascii/asciidatareader.h  
 M  +10 -3     datasources/ascii/asciifilebuffer.cpp  
 M  +1 -1      datasources/ascii/asciifilebuffer.h  
 M  +36 -4     datasources/ascii/asciisource.cpp  
 M  +2 -1      datasources/ascii/asciisource.h  
 M  +2 -2      libkst/measuretime.cpp  
 M  +3 -3      libkst/updatemanager.cpp  


--- branches/work/kst/portto4/kst/src/datasources/ascii/asciidatareader.cpp #1320818:1320819
@@ -21,6 +21,7 @@
 
 #include <QFile>
 #include <QDebug>
+#include <QMutexLocker>
 #include <ctype.h>
 #include <stdlib.h>
 
@@ -83,7 +84,7 @@
 }
 
 //-------------------------------------------------------------------------------------------
-void AsciiDataReader::toDouble(const LexicalCast& lexc, const char* buffer, int bufread, int ch, double* v, int)
+void AsciiDataReader::toDouble(const LexicalCast& lexc, const char* buffer, int bufread, int ch, double* v, int) const
 {
   if (   isDigit(buffer[ch])
          || buffer[ch] == '-'
@@ -192,10 +193,16 @@
 }
 
 //-------------------------------------------------------------------------------------------
+int AsciiDataReader::readFieldChunk(const AsciiFileData& chunk, int col, double *v, QString& field)
+{
+  return readField(chunk, col, v + chunk.rowBegin(), field, chunk.rowBegin(), chunk.rowsRead());
+}
+
+//-------------------------------------------------------------------------------------------
 int AsciiDataReader::readField(const AsciiFileData& buf, int col, double *v, const QString& field, int s, int n)
 {
   if (_config._columnType == AsciiSourceConfig::Fixed) {
-    MeasureTime t("AsciiSource::readField: same width for all columns");
+    //MeasureTime t("AsciiSource::readField: same width for all columns");
     LexicalCast lexc;
     lexc.setDecimalSeparator(_config._useDot);
     // &buffer[0] points to first row at _rowIndex[0] , so if we wanna find
@@ -207,16 +214,16 @@
     return n;
   } else if (_config._columnType == AsciiSourceConfig::Custom) {
     if (_config._columnDelimiter.value().size() == 1) {
-      MeasureTime t("AsciiSource::readField: 1 custom column delimiter");
+      //MeasureTime t("AsciiSource::readField: 1 custom column delimiter");
       const IsCharacter column_del(_config._columnDelimiter.value()[0].toLatin1());
       return readColumns(v, buf.constData(), buf.begin(), buf.bytesRead(), col, s, n, _lineending, column_del);
     } if (_config._columnDelimiter.value().size() > 1) {
-      MeasureTime t(QString("AsciiSource::readField: %1 custom column delimiters").arg(_config._columnDelimiter.value().size()));
+      //MeasureTime t(QString("AsciiSource::readField: %1 custom column delimiters").arg(_config._columnDelimiter.value().size()));
       const IsInString column_del(_config._columnDelimiter.value());
       return readColumns(v, buf.constData(), buf.begin(), buf.bytesRead(), col, s, n, _lineending, column_del);
     }
   } else if (_config._columnType == AsciiSourceConfig::Whitespace) {
-    MeasureTime t("AsciiSource::readField: whitespace separated columns");
+    //MeasureTime t("AsciiSource::readField: whitespace separated columns");
     const IsWhiteSpace column_del;
     return readColumns(v, buf.constData(), buf.begin(), buf.bytesRead(), col, s, n, _lineending, column_del);
   }
@@ -230,7 +237,7 @@
 //-------------------------------------------------------------------------------------------
 template<class Buffer, typename ColumnDelimiter>
 int AsciiDataReader::readColumns(double* v, const Buffer& buffer, int bufstart, int bufread, int col, int s, int n,
-                                 const LineEndingType& lineending, const ColumnDelimiter& column_del)
+                                 const LineEndingType& lineending, const ColumnDelimiter& column_del) const
 {
   if (_config._delimiters.value().size() == 0) {
     const NoDelimiter comment_del;
@@ -248,7 +255,7 @@
 //-------------------------------------------------------------------------------------------
 template<class Buffer, typename ColumnDelimiter, typename CommentDelimiter>
 int AsciiDataReader::readColumns(double* v, const Buffer& buffer, int bufstart, int bufread, int col, int s, int n,
-                                 const LineEndingType& lineending, const ColumnDelimiter& column_del, const CommentDelimiter& comment_del)
+                                 const LineEndingType& lineending, const ColumnDelimiter& column_del, const CommentDelimiter& comment_del) const
 {
   if (_config._columnWidthIsConst) {
     const AlwaysTrue column_withs_const;
@@ -272,10 +279,13 @@
 int AsciiDataReader::readColumns(double* v, const Buffer& buffer, int bufstart, int bufread, int col, int s, int n,
                                  const IsLineBreak& isLineBreak,
                                  const ColumnDelimiter& column_del, const CommentDelimiter& comment_del,
-                                 const ColumnWidthsAreConst& are_column_widths_const)
+                                 const ColumnWidthsAreConst& are_column_widths_const) const
 {
   LexicalCast lexc;
+  {
+    QMutexLocker lock(&_localeMutex);
   lexc.setDecimalSeparator(_config._useDot);
+  }
   const QString delimiters = _config._delimiters.value();
 
   bool is_custom = (_config._columnType.value() == AsciiSourceConfig::Custom);
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciidatareader.h #1320818:1320819
@@ -17,6 +17,7 @@
 #include "asciicharactertraits.h"
 
 #include <QVarLengthArray>
+#include <QMutex>
 
 class QFile;
 struct LexicalCast;
@@ -41,6 +42,7 @@
     
     bool findDataRows(bool read_completely, QFile& file, int _byteLength);
     int readField(const AsciiFileData &buf, int col, double *v, const QString& field, int s, int n);
+    int readFieldChunk(const AsciiFileData& chunk, int col, double *v, QString& field);
 
   private:
     int _numFrames;
@@ -56,20 +58,22 @@
 
     template<class Buffer, typename ColumnDelimiter>
     int readColumns(double* v, const Buffer& buffer, int bufstart, int bufread, int col, int s, int n,
-                    const AsciiCharacterTraits::LineEndingType&, const ColumnDelimiter&);
+                    const AsciiCharacterTraits::LineEndingType&, const ColumnDelimiter&) const;
 
     template<class Buffer, typename ColumnDelimiter, typename CommentDelimiter>
     int readColumns(double* v, const Buffer& buffer, int bufstart, int bufread, int col, int s, int n,
-                    const AsciiCharacterTraits::LineEndingType&, const ColumnDelimiter&, const CommentDelimiter&);
+                    const AsciiCharacterTraits::LineEndingType&, const ColumnDelimiter&, const CommentDelimiter&) const;
 
     template<class Buffer, typename IsLineBreak, typename ColumnDelimiter, typename CommentDelimiter, typename ColumnWidthsAreConst>
     int readColumns(double* v, const Buffer& buffer, int bufstart, int bufread, int col, int s, int n,
-                    const IsLineBreak&, const ColumnDelimiter&, const CommentDelimiter&, const ColumnWidthsAreConst&);    
+                    const IsLineBreak&, const ColumnDelimiter&, const CommentDelimiter&, const ColumnWidthsAreConst&) const;
 
     template<class Buffer, typename IsLineBreak, typename CommentDelimiter>
     bool findDataRows(const Buffer& buffer, int bufstart, int bufread, const IsLineBreak&, const CommentDelimiter&);
 
-    void toDouble(const LexicalCast& lexc, const char* buffer, int bufread, int ch, double* v, int row);
+    void toDouble(const LexicalCast& lexc, const char* buffer, int bufread, int ch, double* v, int row) const;
+
+    mutable QMutex _localeMutex;
 };
 
 #endif
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciifilebuffer.cpp #1320818:1320819
@@ -123,12 +123,13 @@
 
 
 //-------------------------------------------------------------------------------------------
-void AsciiFileBuffer::readWholeFile(const RowIndex& rowIndex, int start, int bytesToRead, int maximalBytes)
+void AsciiFileBuffer::readWholeFile(const RowIndex& rowIndex, int start, int bytesToRead, int numChunks, int maximalBytes)
 {
   clear();
   if (!_file)
     return;
 
+  if (numChunks == 1) {
   // first try to read the whole file into one array
   AsciiFileData wholeFile;
   wholeFile.read(*_file, start, bytesToRead, maximalBytes);
@@ -140,9 +141,15 @@
     _fileData << wholeFile;
     return;
   }
+  }
 
-  // reading whole file into one array failed, try to read into smaller arrays
-  int chunkSize = _defaultChunkSize;
+  // reading whole file failed or numChunks > 1: read into smaller arrays
+  int chunkSize;
+  if (numChunks > 1) {
+    chunkSize = bytesToRead / numChunks;
+  } else {
+    chunkSize = _defaultChunkSize;
+  }
   _fileData = splitFile(chunkSize, rowIndex, start, bytesToRead);
   for (int i = 0; i < _fileData.size(); i++) {
     // use alread set
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciifilebuffer.h #1320818:1320819
@@ -32,7 +32,7 @@
   void clear();
 
   void setFile(QFile* file);
-  void readWholeFile(const RowIndex& rowIndex, int start, int bytesToRead, int maximalBytes = -1);
+  void readWholeFile(const RowIndex& rowIndex, int start, int bytesToRead, int numChunks, int maximalBytes = -1);
   void readFileSlidingWindow(const RowIndex& rowIndex, int start, int bytesToRead, int maximalBytes = -1);
 
   const QVector<AsciiFileData>& data() const;
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciisource.cpp #1320818:1320819
@@ -25,6 +25,9 @@
 
 #include <QFile>
 #include <QMessageBox>
+#include <QThread>
+#include <QtConcurrentRun>
+#include <QFutureSynchronizer>
 #include <ctype.h>
 #include <stdlib.h>
 
@@ -48,6 +51,7 @@
   Kst::DataSource(store, cfg, filename, type),
   _reader(_config),
   _fileBuffer(),
+  _useThreads(true),
   is(new DataInterfaceAsciiString(*this)),
   iv(new DataInterfaceAsciiVector(*this))
 {
@@ -145,7 +149,7 @@
 //-------------------------------------------------------------------------------------------
 Kst::Object::UpdateType AsciiSource::internalDataSourceUpdate(bool read_completely)
 {
-  MeasureTime t("AsciiSource::internalDataSourceUpdate: " + _filename);
+  //MeasureTime t("AsciiSource::internalDataSourceUpdate: " + _filename);
   
   // forget about cached data
   _fileBuffer.clear();
@@ -229,6 +233,13 @@
 
 
 //-------------------------------------------------------------------------------------------
+bool AsciiSource::useSlidingWindow(int bytesToRead)  const
+{
+  return _config._limitFileBuffer && _config._limitFileBufferSize < bytesToRead;
+}
+
+
+//-------------------------------------------------------------------------------------------
 int AsciiSource::readField(double *v, const QString& field, int s, int n, bool& success) 
 {
   success = true;
@@ -258,11 +269,19 @@
       return 0;
     }
 
+    int numThreads;
+    if (!_useThreads) {
+      numThreads = 1;
+    } else {
+      numThreads = QThread::idealThreadCount();
+      numThreads = (numThreads > 0) ? numThreads : 1;
+    }
+
     _fileBuffer.setFile(file);
-    if (_config._limitFileBuffer && _config._limitFileBufferSize < bytesToRead) {
+    if (useSlidingWindow(bytesToRead)) {
       _fileBuffer.readFileSlidingWindow(_reader.rowIndex(), begin, bytesToRead);
     } else {
-      _fileBuffer.readWholeFile(_reader.rowIndex(), begin, bytesToRead);
+      _fileBuffer.readWholeFile(_reader.rowIndex(), begin, bytesToRead, numThreads);
     }
     if (_fileBuffer.bytesRead() == 0) {
       success = false;
@@ -271,13 +290,26 @@
     _reader.detectLineEndingType(*file);
   }
   
+
   int sRead = 0;
+  if (_useThreads) {
+    QFutureSynchronizer<int> readFutures;
   const QVector<AsciiFileData>& data = _fileBuffer.data();
   foreach (const AsciiFileData& chunk, data) {
+      QFuture<int> future = QtConcurrent::run(&_reader, &AsciiDataReader::readFieldChunk, chunk, col, v, field);
+      readFutures.addFuture(future);
+    }
+    readFutures.waitForFinished();
+    foreach (const QFuture<int> future, readFutures.futures()) {
+      sRead += future.result();
+    }
+  } else  {
+    const QVector<AsciiFileData>& data = _fileBuffer.data();
+    foreach (const AsciiFileData& chunk, data) {
     Q_ASSERT(sRead ==  chunk.rowBegin());
     sRead += _reader.readField(chunk, col, v +  chunk.rowBegin(), field, chunk.rowBegin(), chunk.rowsRead());
   }
-
+  }
   return sRead;
 }
 
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciisource.h #1320818:1320819
@@ -69,8 +69,8 @@
   private:
     AsciiDataReader _reader;
     AsciiFileBuffer _fileBuffer;
+    const bool _useThreads;
     
-
     friend class ConfigWidgetAscii;
     mutable AsciiSourceConfig _config;
 
@@ -84,6 +84,7 @@
     QMap<QString, QString> _fieldUnits;
 
     int readField(double *v, const QString &field, int s, int n, bool& success);
+    bool useSlidingWindow(int bytesToRead)  const;
 
     int columnOfField(const QString& field) const;
     static QStringList splitHeaderLine(const QByteArray& line, AsciiSourceConfig* cfg);
--- branches/work/kst/portto4/kst/src/libkst/measuretime.cpp #1320818:1320819
@@ -112,8 +112,8 @@
 void MeasureTime::print()
 {
   measure();
-  //std::cout << qPrintable(name) << ": " << interval << " seconds\n";
-  //Kst::Debug::self()->log(QString("Timing: %2 sec, Scope: %1").arg(name).arg(interval), Kst::Debug::DebugLog);
+  //qDebug() << "MeasureTime in " << name << ": " << interval << " seconds\n";
+  Kst::Debug::self()->log(QString("Timing: %2 sec, Scope: %1").arg(name).arg(interval), Kst::Debug::DebugLog);
 }
 
 
--- branches/work/kst/portto4/kst/src/libkst/updatemanager.cpp #1320818:1320819
@@ -15,7 +15,7 @@
 #include "primitive.h"
 #include "datasource.h"
 #include "objectstore.h"
-
+#include "measuretime.h"
 #include <QCoreApplication>
 #include <QTimer>
 #include <QDebug>
@@ -105,9 +105,10 @@
 
   //qDebug() << "ds up: " << n_updated << "  ds def: " << n_deferred << " n_no: " << n_unchanged;
 
+  MeasureTime t(" UpdateManager::doUpdates loop");
+
   int i_loop = retval = 0;
   int maxloop = _store->objectList().size();
-  //qDebug() << "starting update loop.  Maxloop: " << maxloop;
   do {
     n_updated = n_unchanged = n_deferred = 0;
     // update data objects
@@ -125,7 +126,6 @@
     i_loop++;
   } while ((n_deferred + n_updated > 0) && (i_loop<=maxloop));
 
-  //qDebug() << " update elapsed:" << i_loop << double(_time.elapsed())/1000.0;
   emit objectsUpdated(_serial);
 }
 }


More information about the Kst mailing list