[rkward-cvs] SF.net SVN: rkward:[3185] branches/2010_10_18_backend_restructuring_branch/ rkward/rbackend
tfry at users.sourceforge.net
tfry at users.sourceforge.net
Fri Nov 12 13:35:49 UTC 2010
Revision: 3185
http://rkward.svn.sourceforge.net/rkward/?rev=3185&view=rev
Author: tfry
Date: 2010-11-12 13:35:49 +0000 (Fri, 12 Nov 2010)
Log Message:
-----------
Split output buffer handling out of RKRBackend.
Start implementing the frontend side of the split-process backend (still entirely untested).
Modified Paths:
--------------
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.h
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp 2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp 2010-11-12 13:35:49 UTC (rev 3185)
@@ -298,7 +298,7 @@
// NOTE: the order of processing is: first try to submit the next command, then handle the old command.
// The reason for doing it this way, instead of the reverse, is that this allows the backend thread / process to continue working, concurrently
- // NOTE: cproxy should only even be 0 in the very first cycle
+ // NOTE: cproxy should only ever be 0 in the very first cycle
if (cproxy) popPreviousCommand ();
command_requests.append (request);
tryNextCommand ();
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp 2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp 2010-11-12 13:35:49 UTC (rev 3185)
@@ -577,7 +577,6 @@
RK_ASSERT (this_pointer == 0);
this_pointer = this;
- out_buf_len = 0;
}
#ifdef Q_WS_WIN
@@ -1049,9 +1048,6 @@
}
-#define MAX_BUF_LENGTH 16000
-#define OUTPUT_STRING_RESERVE 1000
-
void RKRBackend::run () {
RK_TRACE (RBACKEND);
killed = NotKilled;
@@ -1118,70 +1114,13 @@
RCommandProxy* RKRBackend::fetchNextCommand () {
RK_TRACE (RBACKEND);
- RBackendRequest req (true, RBackendRequest::CommandOut);
+ RBackendRequest req (killed == NotKilled, RBackendRequest::CommandOut); // when killed, we don't wait for the reply. Thus, request is async, then.
req.command = previous_command;
previous_command = 0;
return (handleRequest (&req, false));
}
-void RKRBackend::waitIfOutputBufferExceeded () {
- // don't trace
- while (out_buf_len > MAX_BUF_LENGTH) {
- if (isKilled ()) return; // don't block. Frontend could be crashed
- RKRBackendProtocolBackend::msleep (10);
- }
-}
-
-void RKRBackend::handleOutput (const QString &output, int buf_length, ROutput::ROutputType output_type) {
- if (!buf_length) return;
- RK_TRACE (RBACKEND);
-
- RK_DO (qDebug ("Output type %d: %s", output_type, qPrintable (output)), RBACKEND, DL_DEBUG);
- waitIfOutputBufferExceeded ();
-
- output_buffer_mutex.lock ();
-
- ROutput *current_output = 0;
- if (!output_buffer.isEmpty ()) {
- // Merge with previous output fragment, if of the same type
- current_output = output_buffer.last ();
- if (current_output->type != output_type) current_output = 0;
- }
- if (!current_output) {
- current_output = new ROutput;
- current_output->type = output_type;
- current_output->output.reserve (OUTPUT_STRING_RESERVE);
- output_buffer.append (current_output);
- }
- current_output->output.append (output);
- out_buf_len += buf_length;
-
- output_buffer_mutex.unlock ();
-}
-
-ROutputList RKRBackend::flushOutput (bool forcibly) {
- ROutputList ret;
-
- if (out_buf_len == 0) return ret; // if there is absolutely no output, just skip.
- RK_TRACE (RBACKEND);
-
- if (!forcibly) {
- if (!output_buffer_mutex.tryLock ()) return ret;
- } else {
- output_buffer_mutex.lock ();
- }
-
- RK_ASSERT (!output_buffer.isEmpty ()); // see check for out_buf_len, above
-
- ret = output_buffer;
- output_buffer.clear ();
- out_buf_len = 0;
-
- output_buffer_mutex.unlock ();
- return ret;
-}
-
void RKRBackend::handleHistoricalSubstackRequest (const QStringList &list) {
RK_TRACE (RBACKEND);
@@ -1288,3 +1227,11 @@
changed_symbol_names.clear ();
}
}
+
+bool RKRBackend::doMSleep (int msecs) {
+ // do not trace!
+ if (isKilled ()) return false;
+ RKRBackendProtocolBackend::msleep (msecs);
+ return true;
+}
+
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.h 2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.h 2010-11-12 13:35:49 UTC (rev 3185)
@@ -67,7 +67,7 @@
@author Thomas Friedrichsmeier
*/
-class RKRBackend {
+class RKRBackend : public RKROutputBuffer {
public:
/** constructor. Only one RKRBackend should ever be created, and that happens in RInterface::RInterface (). */
RKRBackend ();
@@ -93,6 +93,8 @@
protected:
/** low-level initialization of R */
bool startR ();
+/** reimplemented from RKROutputBuffer */
+ bool doMSleep (int msecs);
public:
/** convenience low-level function for running a command, directly
@param command command to be runCommand
@@ -106,13 +108,6 @@
/** call this periodically to make R's x11 windows process their events */
static void processX11Events ();
-/** This gets called on normal R output (R_WriteConsole). Used to get at output. */
- void handleOutput (const QString &output, int len, ROutput::ROutputType type);
-
-/** Flushes current output buffer. Meant to be called from RInterface::flushOutput, only.
-@param forcibly: if true, will always flush the output. If false, will flush the output only if the mutex can be locked without waiting. */
- ROutputList flushOutput (bool forcibly=false);
-
void handleRequest (RBackendRequest *request) { handleRequest (request, true); };
/** A relic of history. Eventually most of these will be replaced by dedicated RBackendRequests. */
void handleHistoricalSubstackRequest (const QStringList &list);
@@ -175,8 +170,6 @@
void run ();
static void scheduleInterrupt ();
protected:
-/** If the length of the current output buffer is too long, this will pause any further output until the main thread has had a chance to catch up. */
- void waitIfOutputBufferExceeded ();
RCommandProxy* handleRequest (RBackendRequest *request, bool mayHandleSubstack);
private:
/** set up R standard callbacks */
@@ -193,13 +186,6 @@
/** check wether the object list / global environment / individual symbols have changed, and updates them, if needed */
void checkObjectUpdatesNeeded (bool check_list);
- /** current output */
- ROutputList output_buffer;
-/** Provides thread-safety for the output_buffer */
- QMutex output_buffer_mutex;
-/** current length of output. If the backlog of output which has not yet been processed by the frontend becomes too long, output will be paused, automatically */
- int out_buf_len;
-
/** The previously executed command. Only non-zero until a new command has been requested. */
RCommandProxy *previous_command;
};
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp 2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp 2010-11-12 13:35:49 UTC (rev 3185)
@@ -18,14 +18,98 @@
#include "rkrbackendprotocol_frontend.h"
#include "rinterface.h"
+
+#include <QThread>
+
#ifdef RKWARD_THREADED
-# include <QThread>
# include "rkrbackend.h"
# include "rkrbackendprotocol_backend.h"
+#else
+# include <QLocalServer>
+# include <QLocalSocket>
+# include <QProcess>
#endif
#include "../debug.h"
+#ifndef RKWARD_THREADED
+ class RKFrontendTransmitter : public QThread, public QObject, public RKROutputBuffer {
+ Q_OBJECT
+ public:
+ RKFrontendTransmitter () {
+ RK_TRACE (RBACKEND);
+
+ // start server
+ connection = 0;
+ if (!server.listen ("rkward")) handleTransmitError ("failure to start frontend server: " + server.errorString ());
+ connect (&server, SIGNAL (newConnection ()), this, SLOT (connectAndEnterLoop ()));
+
+ // start backend
+ QStringList args;
+ args.append ("--debug-level " + QString::number (RK_Debug_Level));
+ args.append ("--server-name " + server.fullName ());
+ backend.setProcessChannelMode (QProcess::MergedChannels); // at least for now. Seems difficult to get interleaving right, without this.
+ connect (&backend, SIGNAL (readyReadStandardOutput ()), this, SLOT (newProcessOutput ()));
+ connect (&backend, SIGNAL (finished (int, QProcess::ExitStatus)), this, SLOT (backendExit (int, QProcess::ExitStatus)));
+ backend.start ("rkward_backend", args, QIODevice::ReadOnly);
+ };
+
+ ~RKFrontendTransmitter () {
+ RK_TRACE (RBACKEND);
+
+ RK_ASSERT (!server.isListening ());
+ delete connection;
+ };
+
+ void run () {
+ RK_ASSERT (connection);
+
+ connection->moveToThread (this);
+ connect (connection, SIGNAL (stateChanged (QLocalSocket::LocalSocketState)), this, SLOT (connectionStateChanged (QLocalSocket::LocalSocketState)));
+ connect (connection, SIGNAL (readyRead ()), this, SLOT (newConnectionData ()));
+ backend.moveToThread (this);
+
+ exec ();
+ }
+ private slots:
+ void connectAndEnterLoop () {
+ RK_TRACE (RBACKEND);
+ RK_ASSERT (server.hasPendingConnections ());
+
+ connection = server.nextPendingConnection ();
+ server.close ();
+
+ start ();
+ };
+
+ void newProcessOutput () {
+ #error
+ };
+ void newConnectionData () {
+ #error
+ };
+ void backendExit (int exitcode, QProcess::ExitStatus exitstatus) {
+ #error
+ };
+ void connectionStateChanged (QLocalSocket::LocalSocketState state) {
+ if (state != QLocalSocket::UnconnectedState) return; // only interested in connection failure
+ #error
+
+ };
+ private:
+ void handleTransmitError (const QString &message) {
+ RK_TRACE (RBACKEND);
+ #warning: Show those errors to the user!
+
+ qDebug ("%s", qPrintable (message));
+ }
+
+ QProcess backend;
+ QLocalServer server;
+ QLocalSocket* connection;
+ };
+#endif
+
RKRBackendProtocolFrontend* RKRBackendProtocolFrontend::_instance = 0;
RKRBackendProtocolFrontend::RKRBackendProtocolFrontend (RInterface* parent) : QObject (parent) {
RK_TRACE (RBACKEND);
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp 2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp 2010-11-12 13:35:49 UTC (rev 3185)
@@ -34,6 +34,7 @@
RK_ASSERT ((type & RCommand::Internal) || (getDataType () == RData::NoData));
}
+
RBackendRequest::RBackendRequest (bool synchronous, RCallbackType type) {
RK_TRACE (RBACKEND);
@@ -177,10 +178,123 @@
RK_TRACE (RBACKEND);
RData* ret = new RData;
- #error
+ RDataType type;
+ stream >> (qint8) (RDataType) type;
+ if (type == RData::IntVector) {
+ RData::IntStorage data << stream;
+ ret->setData (data);
+ } else if (type == RData::StringVector) {
+ RData::StringStorage data << steam;
+ ret->setData (data);
+ } else if (type == RData::RealVector) {
+ RData::RealStorage data << stream;
+ ret->setData (data);
+ } else if (type == RData::StructureVector) {
+ RData::RDataStorage data;
+ qint32 len << stream;
+#if QT_VERSION >= 0x040700
+ data.reserve (len);
+#endif
+ for (qint32 i = 0; i < lne; ++i) {
+ data.append (unserializeData (stream));
+ }
+ ret->setData (data);
+ } else {
+ RK_ASSERT (type == RData::NoData);
+ }
}
- void RKRBackendSerializer::serializeProxy (const RCommandProxy &proxy, QDataStream &stream);
-#error
- *RCommandProxy RKRBackendSerializer::unserializeProxy (QDataStream &stream);
+ void RKRBackendSerializer::serializeProxy (const RCommandProxy &proxy, QDataStream &stream) {
+ RK_TRACE (RBACKEND);
+
+ stream << proxy.command;
+ stream << (qint32) proxy.type;
+ stream << (qint32) proxy.id;
+ stream << (qint32) proxy.status;
+
+ serializeData (proxy, stream);
+ }
+
+ *RCommandProxy RKRBackendSerializer::unserializeProxy (QDataStream &stream) {
+ RK_TRACE (RBACKEND);
+
+ QString command << stream;
+ qint32 type << stream;
+ RCommandProxy* ret = new RCommandProxy (command, type);
+ proxy->id << (qint32) stream;
+ proxy->status << (qint32) stream;
+
+ RData *data = unserializeData (stream);
+ proxy->swallowData (*data);
+ delete (data);
+ return proxy;
+ }
#endif
+
+
+#define MAX_BUF_LENGTH 16000
+#define OUTPUT_STRING_RESERVE 1000
+
+RKROutputBuffer::RKROutputBuffer () {
+ RK_TRACE (RBACKEND);
+
+ out_buf_len = 0;
+}
+
+RKROutputBuffer::~RKROutputBuffer () {
+ RK_TRACE (RBACKEND);
+}
+
+void RKROutputBuffer::handleOutput (const QString &output, int buf_length, ROutput::ROutputType output_type) {
+ if (!buf_length) return;
+ RK_TRACE (RBACKEND);
+
+ RK_DO (qDebug ("Output type %d: %s", output_type, qPrintable (output)), RBACKEND, DL_DEBUG);
+
+ // wait while the output buffer is exceeded to give downstream threads a chance to catch up
+ while (out_buf_len > MAX_BUF_LENGTH) {
+ if (!doMSleep (10)) break;
+ }
+
+ output_buffer_mutex.lock ();
+
+ ROutput *current_output = 0;
+ if (!output_buffer.isEmpty ()) {
+ // Merge with previous output fragment, if of the same type
+ current_output = output_buffer.last ();
+ if (current_output->type != output_type) current_output = 0;
+ }
+ if (!current_output) {
+ current_output = new ROutput;
+ current_output->type = output_type;
+ current_output->output.reserve (OUTPUT_STRING_RESERVE);
+ output_buffer.append (current_output);
+ }
+ current_output->output.append (output);
+ out_buf_len += buf_length;
+
+ output_buffer_mutex.unlock ();
+}
+
+ROutputList RKROutputBuffer::flushOutput (bool forcibly) {
+ ROutputList ret;
+
+ if (out_buf_len == 0) return ret; // if there is absolutely no output, just skip.
+ RK_TRACE (RBACKEND);
+
+ if (!forcibly) {
+ if (!output_buffer_mutex.tryLock ()) return ret;
+ } else {
+ output_buffer_mutex.lock ();
+ }
+
+ RK_ASSERT (!output_buffer.isEmpty ()); // see check for out_buf_len, above
+
+ ret = output_buffer;
+ output_buffer.clear ();
+ out_buf_len = 0;
+
+ output_buffer_mutex.unlock ();
+ return ret;
+}
+
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h 2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h 2010-11-12 13:35:49 UTC (rev 3185)
@@ -23,6 +23,7 @@
#endif
#include <QVariantMap>
+#include <QMutex>
#include "rcommand.h"
class RCommandProxy;
@@ -142,4 +143,27 @@
};
#endif
+class RKROutputBuffer {
+public:
+ RKROutputBuffer ();
+ virtual ~RKROutputBuffer ();
+
+/** This gets called on normal R output (R_WriteConsole). Used to get at output. */
+ void handleOutput (const QString &output, int len, ROutput::ROutputType type);
+
+/** Flushes current output buffer. Meant to be called from RInterface::flushOutput, only.
+@param forcibly: if true, will always flush the output. If false, will flush the output only if the mutex can be locked without waiting. */
+ ROutputList flushOutput (bool forcibly=false);
+protected:
+/** Function to be called while waiting for downstream threads to catch up. Return false to make the buffer continue, immediately (e.g. to prevent lockups after a crash) */
+ virtual bool doMSleep (int msecs) = 0;
+private:
+/** current output */
+ ROutputList output_buffer;
+/** Provides thread-safety for the output_buffer */
+ QMutex output_buffer_mutex;
+/** current length of output. If the backlog of output which has not yet been processed by the frontend becomes too long, output will be paused, automatically */
+ int out_buf_len;
+};
+
#endif
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
More information about the rkward-tracker
mailing list