[rkward-cvs] SF.net SVN: rkward:[3185] branches/2010_10_18_backend_restructuring_branch/ rkward/rbackend

tfry at users.sourceforge.net tfry at users.sourceforge.net
Fri Nov 12 13:35:49 UTC 2010


Revision: 3185
          http://rkward.svn.sourceforge.net/rkward/?rev=3185&view=rev
Author:   tfry
Date:     2010-11-12 13:35:49 +0000 (Fri, 12 Nov 2010)

Log Message:
-----------
Split output buffer handling out of RKRBackend.
Start implementing frontend side to split process backend (still entirely untested).

Modified Paths:
--------------
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.h
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp	2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp	2010-11-12 13:35:49 UTC (rev 3185)
@@ -298,7 +298,7 @@
 
 		// NOTE: the order of processing is: first try to submit the next command, then handle the old command.
 		// The reason for doing it this way, instead of the reverse, is that this allows the backend thread / process to continue working, concurrently
-		// NOTE: cproxy should only even be 0 in the very first cycle
+		// NOTE: cproxy should only ever be 0 in the very first cycle
 		if (cproxy) popPreviousCommand ();
 		command_requests.append (request);
 		tryNextCommand ();

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp	2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp	2010-11-12 13:35:49 UTC (rev 3185)
@@ -577,7 +577,6 @@
 
 	RK_ASSERT (this_pointer == 0);
 	this_pointer = this;
-	out_buf_len = 0;
 }
 
 #ifdef Q_WS_WIN
@@ -1049,9 +1048,6 @@
 }
 
 
-#define MAX_BUF_LENGTH 16000
-#define OUTPUT_STRING_RESERVE 1000
-
 void RKRBackend::run () {
 	RK_TRACE (RBACKEND);
 	killed = NotKilled;
@@ -1118,70 +1114,13 @@
 RCommandProxy* RKRBackend::fetchNextCommand () {
 	RK_TRACE (RBACKEND);
 
-	RBackendRequest req (true, RBackendRequest::CommandOut);
+	RBackendRequest req (killed == NotKilled, RBackendRequest::CommandOut);		// when killed, we don't wait for the reply. Thus, request is async, then.
 	req.command = previous_command;
 	previous_command = 0;
 
 	return (handleRequest (&req, false));
 }
 
-void RKRBackend::waitIfOutputBufferExceeded () {
-	// don't trace
-	while (out_buf_len > MAX_BUF_LENGTH) {
-		if (isKilled ()) return;	// don't block. Frontend could be crashed
-		RKRBackendProtocolBackend::msleep (10);
-	}
-}
-
-void RKRBackend::handleOutput (const QString &output, int buf_length, ROutput::ROutputType output_type) {
-	if (!buf_length) return;
-	RK_TRACE (RBACKEND);
-
-	RK_DO (qDebug ("Output type %d: %s", output_type, qPrintable (output)), RBACKEND, DL_DEBUG);
-	waitIfOutputBufferExceeded ();
-
-	output_buffer_mutex.lock ();
-
-	ROutput *current_output = 0;
-	if (!output_buffer.isEmpty ()) {
-		// Merge with previous output fragment, if of the same type
-		current_output = output_buffer.last ();
-		if (current_output->type != output_type) current_output = 0;
-	}
-	if (!current_output) {
-		current_output = new ROutput;
-		current_output->type = output_type;
-		current_output->output.reserve (OUTPUT_STRING_RESERVE);
-		output_buffer.append (current_output);
-	}
-	current_output->output.append (output);
-	out_buf_len += buf_length;
-
-	output_buffer_mutex.unlock ();
-}
-
-ROutputList RKRBackend::flushOutput (bool forcibly) {
-	ROutputList ret;
-
-	if (out_buf_len == 0) return ret;		// if there is absolutely no output, just skip.
-	RK_TRACE (RBACKEND);
-
-	if (!forcibly) {
-		if (!output_buffer_mutex.tryLock ()) return ret;
-	} else {
-		output_buffer_mutex.lock ();
-	}
-
-	RK_ASSERT (!output_buffer.isEmpty ());	// see check for out_buf_len, above
-
-	ret = output_buffer;
-	output_buffer.clear ();
-	out_buf_len = 0;
-
-	output_buffer_mutex.unlock ();
-	return ret;
-}
-
 void RKRBackend::handleHistoricalSubstackRequest (const QStringList &list) {
 	RK_TRACE (RBACKEND);
 
@@ -1288,3 +1227,11 @@
 		changed_symbol_names.clear ();
 	}
 }
+
+bool RKRBackend::doMSleep (int msecs) {
+	// do not trace!
+	if (isKilled ()) return false;
+	RKRBackendProtocolBackend::msleep (msecs);
+	return true;
+}
+

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.h	2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.h	2010-11-12 13:35:49 UTC (rev 3185)
@@ -67,7 +67,7 @@
 
 @author Thomas Friedrichsmeier
 */
-class RKRBackend {
+class RKRBackend : public RKROutputBuffer {
 public: 
 /** constructor. Only one RKRBackend should ever be created, and that happens in RInterface::RInterface (). */
 	RKRBackend ();
@@ -93,6 +93,8 @@
 protected:
 /** low-level initialization of R */
 	bool startR ();
+/** reimplemented from RKROutputBuffer */
+	bool doMSleep (int msecs);
 public:
 /** convenience low-level function for running a command, directly
 @param command command to be runCommand
@@ -106,13 +108,6 @@
 /** call this periodically to make R's x11 windows process their events */
 	static void processX11Events ();
 
-/** This gets called on normal R output (R_WriteConsole). Used to get at output. */
-	void handleOutput (const QString &output, int len, ROutput::ROutputType type);
-
-/** Flushes current output buffer. Meant to be called from RInterface::flushOutput, only.
- at param forcibly: if true, will always flush the output. If false, will flush the output only if the mutex can be locked without waiting. */
-	ROutputList flushOutput (bool forcibly=false);
-
 	void handleRequest (RBackendRequest *request) { handleRequest (request, true); };
 /** A relic of history. Eventually most of these will be replaced by dedicated RBackendRequests. */
 	void handleHistoricalSubstackRequest (const QStringList &list);
@@ -175,8 +170,6 @@
 	void run ();
 	static void scheduleInterrupt ();
 protected:
-/** If the length of the current output buffer is too long, this will pause any further output until the main thread has had a chance to catch up. */
-	void waitIfOutputBufferExceeded ();
 	RCommandProxy* handleRequest (RBackendRequest *request, bool mayHandleSubstack);
 private:
 /** set up R standard callbacks */
@@ -193,13 +186,6 @@
 /** check wether the object list / global environment / individual symbols have changed, and updates them, if needed */
 	void checkObjectUpdatesNeeded (bool check_list);
 
-	/** current output */
-	ROutputList output_buffer;
-/** Provides thread-safety for the output_buffer */
-	QMutex output_buffer_mutex;
-/** current length of output. If the backlog of output which has not yet been processed by the frontend becomes too long, output will be paused, automatically */
-	int out_buf_len;
-
 	/** The previously executed command. Only non-zero until a new command has been requested. */
 	RCommandProxy *previous_command;
 };

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp	2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp	2010-11-12 13:35:49 UTC (rev 3185)
@@ -18,14 +18,98 @@
 #include "rkrbackendprotocol_frontend.h"
 
 #include "rinterface.h"
+
+#include <QThread>
+
 #ifdef RKWARD_THREADED
-#	include <QThread>
 #	include "rkrbackend.h"
 #	include "rkrbackendprotocol_backend.h"
+#else
+#	include <QLocalServer>
+#	include <QLocalSocket>
+#	include <QProcess>
 #endif
 
 #include "../debug.h"
 
+#ifndef RKWARD_THREADED
+	class RKFrontendTransmitter : public QThread, public QObject, public RKROutputBuffer {
+		Q_OBJECT
+		public:
+			RKFrontendTransmitter () {
+				RK_TRACE (RBACKEND);
+
+				// start server
+				connection = 0;
+				if (!server.listen ("rkward")) handleTransmitError ("failure to start frontend server: " + server.errorString ());
+				connect (&server, SIGNAL (newConnection ()), this, SLOT (connectAndEnterLoop ()));
+
+				// start backend
+				QStringList args;
+				args.append ("--debug-level " + QString::number (RK_Debug_Level));
+				args.append ("--server-name " + server.fullName ());
+				backend.setProcessChannelMode (QProcess::MergedChannels);	// at least for now. Seems difficult to get interleaving right, without this.
+				connect (&backend, SIGNAL (readyReadStandardOutput ()), this, SLOT (newProcessOutput ()));
+				connect (&backend, SIGNAL (finished (int, QProcess::ExitStatus)), this, SLOT (backendExit (int, QProcess::ExitStatus)));
+				backend.start ("rkward_backend", args, QIODevice::ReadOnly);
+			};
+
+			~RKFrontendTransmitter () {
+				RK_TRACE (RBACKEND);
+
+				RK_ASSERT (!server.isListening ());
+				delete connection;
+			};
+
+			void run () {
+				RK_ASSERT (connection);
+
+				connection->moveToThread (this);
+				connect (connection, SIGNAL (stateChanged (QLocalSocket::LocalSocketState)), this, SLOT (connectionStateChanged (QLocalSocket::LocalSocketState)));
+				connect (connection, SIGNAL (readyRead ()), this, SLOT (newConnectionData ()));
+				backend.moveToThread (this);
+
+				exec ();
+			}
+		private slots:
+			void connectAndEnterLoop () {
+				RK_TRACE (RBACKEND);
+				RK_ASSERT (server.hasPendingConnections ());
+
+				connection = server.nextPendingConnection ();
+				server.close ();
+
+				start ();
+			};
+
+			void newProcessOutput () {
+				#error
+			};
+			void newConnectionData () {
+				#error
+			};
+			void backendExit (int exitcode, QProcess::ExitStatus exitstatus) {
+				#error
+			};
+			void connectionStateChanged (QLocalSocket::LocalSocketState state) {
+				if (state != QLocalSocket::UnconnectedState) return;		// only interested in connection failure
+				#error
+
+			};
+		private:
+			void handleTransmitError (const QString &message) {
+				RK_TRACE (RBACKEND);
+				#warning: Show those errors to the user!
+
+				qDebug ("%s", qPrintable (message));
+			}
+
+			QProcess backend;
+			QLocalServer server;
+			QLocalSocket* connection;
+	};
+#endif
+
 RKRBackendProtocolFrontend* RKRBackendProtocolFrontend::_instance = 0;
 RKRBackendProtocolFrontend::RKRBackendProtocolFrontend (RInterface* parent) : QObject (parent) {
 	RK_TRACE (RBACKEND);

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp	2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp	2010-11-12 13:35:49 UTC (rev 3185)
@@ -34,6 +34,7 @@
 	RK_ASSERT ((type & RCommand::Internal) || (getDataType () == RData::NoData));
 }
 
+
 RBackendRequest::RBackendRequest (bool synchronous, RCallbackType type) {
 	RK_TRACE (RBACKEND);
 
@@ -177,10 +178,123 @@
 		RK_TRACE (RBACKEND);
 
 		RData* ret = new RData;
-		#error
+		RDataType type;
+		stream >> (qint8) (RDataType) type;
+		if (type == RData::IntVector) {
+			RData::IntStorage data << stream;
+			ret->setData (data);
+		} else if (type == RData::StringVector) {
+			RData::StringStorage data << steam;
+			ret->setData (data);
+		} else if (type == RData::RealVector) {
+			RData::RealStorage data << stream;
+			ret->setData (data);
+		} else if (type == RData::StructureVector) {
+			RData::RDataStorage data;
+			qint32 len << stream;
+#if QT_VERSION >= 0x040700
+			data.reserve (len);
+#endif
+			for (qint32 i = 0; i < lne; ++i) {
+				data.append (unserializeData (stream));
+			}
+			ret->setData (data);
+		} else {
+			RK_ASSERT (type == RData::NoData);
+		}
 	}
 
-	void RKRBackendSerializer::serializeProxy (const RCommandProxy &proxy, QDataStream &stream);
-#error
-	*RCommandProxy RKRBackendSerializer::unserializeProxy (QDataStream &stream);
+	void RKRBackendSerializer::serializeProxy (const RCommandProxy &proxy, QDataStream &stream) {
+		RK_TRACE (RBACKEND);
+
+		stream << proxy.command;
+		stream << (qint32) proxy.type;
+		stream << (qint32) proxy.id;
+		stream << (qint32) proxy.status;
+
+		serializeData (proxy, stream);
+	}
+
+	*RCommandProxy RKRBackendSerializer::unserializeProxy (QDataStream &stream) {
+		RK_TRACE (RBACKEND);
+
+		QString command << stream;
+		qint32 type << stream;
+		RCommandProxy* ret = new RCommandProxy (command, type);
+		proxy->id << (qint32) stream;
+		proxy->status << (qint32) stream;
+
+		RData *data = unserializeData (stream);
+		proxy->swallowData (*data);
+		delete (data);
+		return proxy;
+	}
 #endif
+
+
+#define MAX_BUF_LENGTH 16000
+#define OUTPUT_STRING_RESERVE 1000
+
+RKROutputBuffer::RKROutputBuffer () {
+	RK_TRACE (RBACKEND);
+
+	out_buf_len = 0;
+}
+
+RKROutputBuffer::~RKROutputBuffer () {
+	RK_TRACE (RBACKEND);
+}
+
+void RKROutputBuffer::handleOutput (const QString &output, int buf_length, ROutput::ROutputType output_type) {
+	if (!buf_length) return;
+	RK_TRACE (RBACKEND);
+
+	RK_DO (qDebug ("Output type %d: %s", output_type, qPrintable (output)), RBACKEND, DL_DEBUG);
+
+	// wait while the output buffer is exceeded to give downstream threads a chance to catch up
+	while (out_buf_len > MAX_BUF_LENGTH) {
+		if (!doMSleep (10)) break;
+	}
+
+	output_buffer_mutex.lock ();
+
+	ROutput *current_output = 0;
+	if (!output_buffer.isEmpty ()) {
+		// Merge with previous output fragment, if of the same type
+		current_output = output_buffer.last ();
+		if (current_output->type != output_type) current_output = 0;
+	}
+	if (!current_output) {
+		current_output = new ROutput;
+		current_output->type = output_type;
+		current_output->output.reserve (OUTPUT_STRING_RESERVE);
+		output_buffer.append (current_output);
+	}
+	current_output->output.append (output);
+	out_buf_len += buf_length;
+
+	output_buffer_mutex.unlock ();
+}
+
+ROutputList RKROutputBuffer::flushOutput (bool forcibly) {
+	ROutputList ret;
+
+	if (out_buf_len == 0) return ret;		// if there is absolutely no output, just skip.
+	RK_TRACE (RBACKEND);
+
+	if (!forcibly) {
+		if (!output_buffer_mutex.tryLock ()) return ret;
+	} else {
+		output_buffer_mutex.lock ();
+	}
+
+	RK_ASSERT (!output_buffer.isEmpty ());	// see check for out_buf_len, above
+
+	ret = output_buffer;
+	output_buffer.clear ();
+	out_buf_len = 0;
+
+	output_buffer_mutex.unlock ();
+	return ret;
+}
+

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h	2010-11-10 18:55:03 UTC (rev 3184)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h	2010-11-12 13:35:49 UTC (rev 3185)
@@ -23,6 +23,7 @@
 #endif
 
 #include <QVariantMap>
+#include <QMutex>
 #include "rcommand.h"
 
 class RCommandProxy;
@@ -142,4 +143,27 @@
 	};
 #endif
 
+class RKROutputBuffer {
+public:
+	RKROutputBuffer ();
+	virtual ~RKROutputBuffer ();
+
+/** This gets called on normal R output (R_WriteConsole). Used to get at output. */
+	void handleOutput (const QString &output, int len, ROutput::ROutputType type);
+
+/** Flushes current output buffer. Meant to be called from RInterface::flushOutput, only.
+ at param forcibly: if true, will always flush the output. If false, will flush the output only if the mutex can be locked without waiting. */
+	ROutputList flushOutput (bool forcibly=false);
+protected:
+/** Function to be called while waiting for downstream threads to catch up. Return false to make the buffer continue, immediately (e.g. to prevent lockups after a crash) */
+	virtual bool doMSleep (int msecs) = 0;
+private:
+/** current output */
+	ROutputList output_buffer;
+/** Provides thread-safety for the output_buffer */
+	QMutex output_buffer_mutex;
+/** current length of output. If the backlog of output which has not yet been processed by the frontend becomes too long, output will be paused, automatically */
+	int out_buf_len;
+};
+
 #endif


This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.




More information about the rkward-tracker mailing list