[Kst] branches/work/kst/portto4/kst/src/datasources/ascii

Peter Kümmel syntheticpp at gmx.net
Wed Oct 17 07:54:55 UTC 2012


SVN commit 1320841 by kuemmel:

multi-threaded read with sliding window

sliding window with 8 threads:
15s down to 7s

 M  +8 -1      asciidatareader.cpp  
 M  +39 -7     asciifilebuffer.cpp  
 M  +6 -2      asciifilebuffer.h  
 M  +7 -0      asciifiledata.cpp  
 M  +4 -2      asciifiledata.h  
 M  +44 -10    asciisource.cpp  
 M  +6 -1      asciisource.h  


--- branches/work/kst/portto4/kst/src/datasources/ascii/asciidatareader.cpp #1320840:1320841
@@ -281,11 +281,13 @@
                                  const ColumnDelimiter& column_del, const CommentDelimiter& comment_del,
                                  const ColumnWidthsAreConst& are_column_widths_const) const
 {
-  LexicalCast lexc;
+  LexicalCast& lexc(*new LexicalCast);
   {
+    // TODO move
     QMutexLocker lock(&_localeMutex);
     lexc.setDecimalSeparator(_config._useDot);
   }
+
   const QString delimiters = _config._delimiters.value();
 
   bool is_custom = (_config._columnType.value() == AsciiSourceConfig::Custom);
@@ -333,6 +335,11 @@
       }
     }
   }
+
+  {
+    QMutexLocker lock(&_localeMutex);
+    delete &lexc;
+  }
   return n;
 }
 
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciifilebuffer.cpp #1320840:1320841
@@ -64,13 +64,6 @@
   return _fileData;
 }
 
-//-------------------------------------------------------------------------------------------
-void AsciiFileBuffer::logData(const QVector<AsciiFileData>& chunks) const
-{
-  foreach (const AsciiFileData& chunk, chunks) {
-    chunk.logData();
-  }
-}
 
 //-------------------------------------------------------------------------------------------
 int AsciiFileBuffer::findRowOfPosition(const AsciiFileBuffer::RowIndex& rowIndex, int searchStart, int pos) const
@@ -169,6 +162,45 @@
 }
 
 //-------------------------------------------------------------------------------------------
+void AsciiFileBuffer::readFileSlidingWindow(const RowIndex& rowIndex, int start, int bytesToRead, int chunkSize, int numSubChunks)
+{
+  _slidingWindow.clear();
+  _bytesRead = 0;
+
+  int subChunkSize = chunkSize / numSubChunks;
+
+  QVector<AsciiFileData> sharedArrays;
+  for (int i = 0; i < numSubChunks; i++) {
+    AsciiFileData sharedArray;
+    sharedArray.setFile(_file);
+    if (!sharedArray.resize(subChunkSize)) {
+      Kst::Debug::self()->log(QString("AsciiFileBuffer: not enough memory available for creating sub chunks for sliding window"));
+      return;
+    }
+    sharedArrays.push_back(sharedArray);
+  }
+
+  QVector<AsciiFileData> chunks = splitFile(subChunkSize, rowIndex, start, bytesToRead);
+  int i = 0;
+  while (i < chunks.size()) {
+    QVector<AsciiFileData> subChunks;
+    for (int s = 0; s < sharedArrays.size(); s++) {
+      chunks[i].setSharedArray(sharedArrays[s]);
+      chunks[i].setFile(_file);
+      chunks[i].setLazyRead(false); //!
+      subChunks.push_back(chunks[i]);
+      i++;
+      if (i >= chunks.size())
+        break;
+    }
+    //qDebug() << "Sub chunks:"; AsciiFileData::logData(subChunks);
+    _slidingWindow.push_back(subChunks);
+  }
+  _begin = start;
+  _bytesRead = bytesToRead;
+}
+
+//-------------------------------------------------------------------------------------------
 void AsciiFileBuffer::readFileSlidingWindow(const RowIndex& rowIndex, int start, int bytesToRead, int maximalBytes)
 {
   clear();
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciifilebuffer.h #1320840:1320841
@@ -33,20 +33,24 @@
 
   void setFile(QFile* file);
   void readWholeFile(const RowIndex& rowIndex, int start, int bytesToRead, int numChunks, int maximalBytes = -1);
+  const QVector<AsciiFileData>& data() const; // -> wholeFile();
+  
   void readFileSlidingWindow(const RowIndex& rowIndex, int start, int bytesToRead, int maximalBytes = -1);
+  void readFileSlidingWindow(const RowIndex& rowIndex, int start, int bytesToRead, int chunkSize, int numSubChunks);
 
-  const QVector<AsciiFileData>& data() const;
+  const QVector<QVector<AsciiFileData> >& slidingWindow() const { return _slidingWindow; }
 
   static bool openFile(QFile &file);
   
 private:
   QFile* _file;
   QVector<AsciiFileData> _fileData;
+  QVector<QVector<AsciiFileData> > _slidingWindow;
+
   int _begin;
   int _bytesRead;
   const int _defaultChunkSize;
 
-  void logData(const QVector<AsciiFileData>& chunks) const;
   const QVector<AsciiFileData> splitFile(int chunkSize, const RowIndex& rowIndex, int start, int bytesToRead) const;
   int findRowOfPosition(const AsciiFileBuffer::RowIndex& rowIndex, int searchStart, int pos) const;
 };
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciifiledata.cpp #1320840:1320841
@@ -217,6 +217,13 @@
     .arg(_lazyRead).arg(bytesRead(), 8).arg(rowsRead(), 8);
 }
 
+//-------------------------------------------------------------------------------------------
+void AsciiFileData::logData(const QVector<AsciiFileData>& chunks)
+{
+  foreach (const AsciiFileData& chunk, chunks) {
+    chunk.logData();
+  }
+}
 
 //-------------------------------------------------------------------------------------------
 void AsciiFileData::setSharedArray(AsciiFileData& arrayData)
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciifiledata.h #1320840:1320841
@@ -64,10 +64,12 @@
   inline void setRowBegin(int begin) { _rowBegin = begin; }
   inline void setRowsRead(int read) { _rowsRead = read; }
 
+  void setSharedArray(AsciiFileData&);
+
+
   void logData() const;
+  static void logData(const QVector<AsciiFileData>& chunks);
 
-  void setSharedArray(AsciiFileData&);
-
 private:
   QSharedPointer<Array> _array;
   QFile* _file;
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciisource.cpp #1320840:1320841
@@ -278,7 +278,11 @@
 
     _fileBuffer.setFile(file);
     if (useSlidingWindow(bytesToRead)) {
+      if (_config._useThreads) {
+        _fileBuffer.readFileSlidingWindow(_reader.rowIndex(), begin, bytesToRead, _config._limitFileBufferSize, numThreads);
+      } else {
       _fileBuffer.readFileSlidingWindow(_reader.rowIndex(), begin, bytesToRead);
+      }
     } else {
       _fileBuffer.readWholeFile(_reader.rowIndex(), begin, bytesToRead, numThreads);
     }
@@ -289,26 +293,56 @@
     _reader.detectLineEndingType(*file);
   }
   
+  if (_config._useThreads) {
+    if (!useSlidingWindow(bytesToRead)) {
+      //Q_ASSERT(n == readFieldSingleThreaded(_fileBuffer.data(), col, v, field));
+      return readFieldMultiThreaded(_fileBuffer.data(), col, v, field);
+    } else {
+      const QVector<QVector<AsciiFileData> >& slidingWindow = _fileBuffer.slidingWindow();
+      int sRead = 0;
+      //int sRead_check = 0;
+      foreach (const QVector<AsciiFileData>& window, slidingWindow) {
+        foreach(AsciiFileData fileData, window) {
+          if (!fileData.read()) {
+            return 0;
+          }
+        }
+        //Q_ASSERT(sRead_check = readFieldSingleThreaded(window, col, v, field, sRead_check));
+        sRead += readFieldMultiThreaded(window, col, v, field);
+      }
+      return sRead;
+    }
+  } else  {
+    return readFieldSingleThreaded(_fileBuffer.data(), col, v, field);
+  }
+}
 
-  int sRead = 0;
-  if (_config._useThreads && !useSlidingWindow(bytesToRead)) {
+//-------------------------------------------------------------------------------------------
+int AsciiSource::readFieldSingleThreaded(const QVector<AsciiFileData>& fileData, int col, double* v, const QString& field, int sRead)
+{
+  AsciiFileData::logData(fileData);
+  foreach (const AsciiFileData& chunk, fileData) {
+    Q_ASSERT(sRead ==  chunk.rowBegin());
+    sRead += _reader.readField(chunk, col, v + chunk.rowBegin(), field, chunk.rowBegin(), chunk.rowsRead());
+  }
+  return sRead;
+}
+
+
+//-------------------------------------------------------------------------------------------
+int AsciiSource::readFieldMultiThreaded(const QVector<AsciiFileData>& fileData, int col, double* v, const QString& field)
+{
     QFutureSynchronizer<int> readFutures;
     const QVector<AsciiFileData>& data = _fileBuffer.data();
-    foreach (const AsciiFileData& chunk, data) {
+  foreach (const AsciiFileData& chunk, fileData) {
       QFuture<int> future = QtConcurrent::run(&_reader, &AsciiDataReader::readFieldChunk, chunk, col, v, field);
       readFutures.addFuture(future);
     }
     readFutures.waitForFinished();
+  int sRead = 0;
     foreach (const QFuture<int> future, readFutures.futures()) {
       sRead += future.result();
     }
-  } else  {
-    const QVector<AsciiFileData>& data = _fileBuffer.data();
-    foreach (const AsciiFileData& chunk, data) {
-      Q_ASSERT(sRead ==  chunk.rowBegin());
-      sRead += _reader.readField(chunk, col, v + chunk.rowBegin(), field, chunk.rowBegin(), chunk.rowsRead());
-    }
-  }
   return sRead;
 }
 
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciisource.h #1320840:1320841
@@ -82,9 +82,14 @@
     QStringList _fieldList;
     QMap<QString, QString> _fieldUnits;
 
-    int readField(double *v, const QString &field, int s, int n, bool& success);
     bool useSlidingWindow(int bytesToRead)  const;
 
+    int readField(double *v, const QString &field, int s, int n, bool& success);
+    int readFieldSingleThreaded(const QVector<AsciiFileData>& fileData, int col, double* v, const QString& field, int sRead = 0);
+    int readFieldMultiThreaded(const QVector<AsciiFileData>& fileData, int col, double* v, const QString& field);
+    
+    
+
     int columnOfField(const QString& field) const;
     static QStringList splitHeaderLine(const QByteArray& line, AsciiSourceConfig* cfg);
 


More information about the Kst mailing list