[Kst] branches/work/kst/portto4/kst/src/datasources/ascii
Peter Kümmel
syntheticpp at gmx.net
Wed Oct 17 07:54:55 UTC 2012
SVN commit 1320841 by kuemmel:
Multi-threaded read with sliding window.
Sliding window with 8 threads:
15s down to 7s.
M +8 -1 asciidatareader.cpp
M +39 -7 asciifilebuffer.cpp
M +6 -2 asciifilebuffer.h
M +7 -0 asciifiledata.cpp
M +4 -2 asciifiledata.h
M +44 -10 asciisource.cpp
M +6 -1 asciisource.h
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciidatareader.cpp #1320840:1320841
@@ -281,11 +281,13 @@
const ColumnDelimiter& column_del, const CommentDelimiter& comment_del,
const ColumnWidthsAreConst& are_column_widths_const) const
{
- LexicalCast lexc;
+ LexicalCast& lexc(*new LexicalCast);
{
+ // TODO move
QMutexLocker lock(&_localeMutex);
lexc.setDecimalSeparator(_config._useDot);
}
+
const QString delimiters = _config._delimiters.value();
bool is_custom = (_config._columnType.value() == AsciiSourceConfig::Custom);
@@ -333,6 +335,11 @@
}
}
}
+
+ {
+ QMutexLocker lock(&_localeMutex);
+ delete &lexc;
+ }
return n;
}
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciifilebuffer.cpp #1320840:1320841
@@ -64,13 +64,6 @@
return _fileData;
}
-//-------------------------------------------------------------------------------------------
-void AsciiFileBuffer::logData(const QVector<AsciiFileData>& chunks) const
-{
- foreach (const AsciiFileData& chunk, chunks) {
- chunk.logData();
- }
-}
//-------------------------------------------------------------------------------------------
int AsciiFileBuffer::findRowOfPosition(const AsciiFileBuffer::RowIndex& rowIndex, int searchStart, int pos) const
@@ -169,6 +162,45 @@
}
//-------------------------------------------------------------------------------------------
+void AsciiFileBuffer::readFileSlidingWindow(const RowIndex& rowIndex, int start, int bytesToRead, int chunkSize, int numSubChunks)
+{
+ _slidingWindow.clear();
+ _bytesRead = 0;
+
+ int subChunkSize = chunkSize / numSubChunks;
+
+ QVector<AsciiFileData> sharedArrays;
+ for (int i = 0; i < numSubChunks; i++) {
+ AsciiFileData sharedArray;
+ sharedArray.setFile(_file);
+ if (!sharedArray.resize(subChunkSize)) {
+ Kst::Debug::self()->log(QString("AsciiFileBuffer: not enough memory available for creating sub chunks for sliding window"));
+ return;
+ }
+ sharedArrays.push_back(sharedArray);
+ }
+
+ QVector<AsciiFileData> chunks = splitFile(subChunkSize, rowIndex, start, bytesToRead);
+ int i = 0;
+ while (i < chunks.size()) {
+ QVector<AsciiFileData> subChunks;
+ for (int s = 0; s < sharedArrays.size(); s++) {
+ chunks[i].setSharedArray(sharedArrays[s]);
+ chunks[i].setFile(_file);
+ chunks[i].setLazyRead(false); //!
+ subChunks.push_back(chunks[i]);
+ i++;
+ if (i >= chunks.size())
+ break;
+ }
+ //qDebug() << "Sub chunks:"; AsciiFileData::logData(subChunks);
+ _slidingWindow.push_back(subChunks);
+ }
+ _begin = start;
+ _bytesRead = bytesToRead;
+}
+
+//-------------------------------------------------------------------------------------------
void AsciiFileBuffer::readFileSlidingWindow(const RowIndex& rowIndex, int start, int bytesToRead, int maximalBytes)
{
clear();
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciifilebuffer.h #1320840:1320841
@@ -33,20 +33,24 @@
void setFile(QFile* file);
void readWholeFile(const RowIndex& rowIndex, int start, int bytesToRead, int numChunks, int maximalBytes = -1);
+ const QVector<AsciiFileData>& data() const; // -> wholeFile();
+
void readFileSlidingWindow(const RowIndex& rowIndex, int start, int bytesToRead, int maximalBytes = -1);
+ void readFileSlidingWindow(const RowIndex& rowIndex, int start, int bytesToRead, int chunkSize, int numSubChunks);
- const QVector<AsciiFileData>& data() const;
+ const QVector<QVector<AsciiFileData> >& slidingWindow() const { return _slidingWindow; }
static bool openFile(QFile &file);
private:
QFile* _file;
QVector<AsciiFileData> _fileData;
+ QVector<QVector<AsciiFileData> > _slidingWindow;
+
int _begin;
int _bytesRead;
const int _defaultChunkSize;
- void logData(const QVector<AsciiFileData>& chunks) const;
const QVector<AsciiFileData> splitFile(int chunkSize, const RowIndex& rowIndex, int start, int bytesToRead) const;
int findRowOfPosition(const AsciiFileBuffer::RowIndex& rowIndex, int searchStart, int pos) const;
};
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciifiledata.cpp #1320840:1320841
@@ -217,6 +217,13 @@
.arg(_lazyRead).arg(bytesRead(), 8).arg(rowsRead(), 8);
}
+//-------------------------------------------------------------------------------------------
+void AsciiFileData::logData(const QVector<AsciiFileData>& chunks)
+{
+ foreach (const AsciiFileData& chunk, chunks) {
+ chunk.logData();
+ }
+}
//-------------------------------------------------------------------------------------------
void AsciiFileData::setSharedArray(AsciiFileData& arrayData)
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciifiledata.h #1320840:1320841
@@ -64,10 +64,12 @@
inline void setRowBegin(int begin) { _rowBegin = begin; }
inline void setRowsRead(int read) { _rowsRead = read; }
+ void setSharedArray(AsciiFileData&);
+
+
void logData() const;
+ static void logData(const QVector<AsciiFileData>& chunks);
- void setSharedArray(AsciiFileData&);
-
private:
QSharedPointer<Array> _array;
QFile* _file;
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciisource.cpp #1320840:1320841
@@ -278,7 +278,11 @@
_fileBuffer.setFile(file);
if (useSlidingWindow(bytesToRead)) {
+ if (_config._useThreads) {
+ _fileBuffer.readFileSlidingWindow(_reader.rowIndex(), begin, bytesToRead, _config._limitFileBufferSize, numThreads);
+ } else {
_fileBuffer.readFileSlidingWindow(_reader.rowIndex(), begin, bytesToRead);
+ }
} else {
_fileBuffer.readWholeFile(_reader.rowIndex(), begin, bytesToRead, numThreads);
}
@@ -289,26 +293,56 @@
_reader.detectLineEndingType(*file);
}
+ if (_config._useThreads) {
+ if (!useSlidingWindow(bytesToRead)) {
+ //Q_ASSERT(n == readFieldSingleThreaded(_fileBuffer.data(), col, v, field));
+ return readFieldMultiThreaded(_fileBuffer.data(), col, v, field);
+ } else {
+ const QVector<QVector<AsciiFileData> >& slidingWindow = _fileBuffer.slidingWindow();
+ int sRead = 0;
+ //int sRead_check = 0;
+ foreach (const QVector<AsciiFileData>& window, slidingWindow) {
+ foreach(AsciiFileData fileData, window) {
+ if (!fileData.read()) {
+ return 0;
+ }
+ }
+ //Q_ASSERT(sRead_check = readFieldSingleThreaded(window, col, v, field, sRead_check));
+ sRead += readFieldMultiThreaded(window, col, v, field);
+ }
+ return sRead;
+ }
+ } else {
+ return readFieldSingleThreaded(_fileBuffer.data(), col, v, field);
+ }
+}
- int sRead = 0;
- if (_config._useThreads && !useSlidingWindow(bytesToRead)) {
+//-------------------------------------------------------------------------------------------
+int AsciiSource::readFieldSingleThreaded(const QVector<AsciiFileData>& fileData, int col, double* v, const QString& field, int sRead)
+{
+ AsciiFileData::logData(fileData);
+ foreach (const AsciiFileData& chunk, fileData) {
+ Q_ASSERT(sRead == chunk.rowBegin());
+ sRead += _reader.readField(chunk, col, v + chunk.rowBegin(), field, chunk.rowBegin(), chunk.rowsRead());
+ }
+ return sRead;
+}
+
+
+//-------------------------------------------------------------------------------------------
+int AsciiSource::readFieldMultiThreaded(const QVector<AsciiFileData>& fileData, int col, double* v, const QString& field)
+{
QFutureSynchronizer<int> readFutures;
const QVector<AsciiFileData>& data = _fileBuffer.data();
- foreach (const AsciiFileData& chunk, data) {
+ foreach (const AsciiFileData& chunk, fileData) {
QFuture<int> future = QtConcurrent::run(&_reader, &AsciiDataReader::readFieldChunk, chunk, col, v, field);
readFutures.addFuture(future);
}
readFutures.waitForFinished();
+ int sRead = 0;
foreach (const QFuture<int> future, readFutures.futures()) {
sRead += future.result();
}
- } else {
- const QVector<AsciiFileData>& data = _fileBuffer.data();
- foreach (const AsciiFileData& chunk, data) {
- Q_ASSERT(sRead == chunk.rowBegin());
- sRead += _reader.readField(chunk, col, v + chunk.rowBegin(), field, chunk.rowBegin(), chunk.rowsRead());
- }
- }
return sRead;
}
--- branches/work/kst/portto4/kst/src/datasources/ascii/asciisource.h #1320840:1320841
@@ -82,9 +82,14 @@
QStringList _fieldList;
QMap<QString, QString> _fieldUnits;
- int readField(double *v, const QString &field, int s, int n, bool& success);
bool useSlidingWindow(int bytesToRead) const;
+ int readField(double *v, const QString &field, int s, int n, bool& success);
+ int readFieldSingleThreaded(const QVector<AsciiFileData>& fileData, int col, double* v, const QString& field, int sRead = 0);
+ int readFieldMultiThreaded(const QVector<AsciiFileData>& fileData, int col, double* v, const QString& field);
+
+
+
int columnOfField(const QString& field) const;
static QStringList splitHeaderLine(const QByteArray& line, AsciiSourceConfig* cfg);
More information about the Kst mailing list