[rkward-cvs] SF.net SVN: rkward:[3184] branches/2010_10_18_backend_restructuring_branch/ rkward/rbackend

tfry at users.sourceforge.net tfry at users.sourceforge.net
Wed Nov 10 18:55:04 UTC 2010


Revision: 3184
          http://rkward.svn.sourceforge.net/rkward/?rev=3184&view=rev
Author:   tfry
Date:     2010-11-10 18:55:03 +0000 (Wed, 10 Nov 2010)

Log Message:
-----------
Work in progress: Start adding serialization/unserialization code, and transmitter backend. Entirely untested.

Modified Paths:
--------------
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.cpp
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.h
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.h
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp	2010-11-09 18:56:03 UTC (rev 3183)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp	2010-11-10 18:55:03 UTC (rev 3184)
@@ -294,7 +294,7 @@
 
 	flushOutput (true);
 	if (request->type == RBackendRequest::CommandOut) {
-		RCommandProxy *cproxy = request->command;
+		RCommandProxy *cproxy = request->takeCommand ();
 
 		// NOTE: the order of processing is: first try to submit the next command, then handle the old command.
 		// The reason for doing it this way, instead of the reverse, is that this allows the backend thread / process to continue working, concurrently

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp	2010-11-09 18:56:03 UTC (rev 3183)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp	2010-11-10 18:55:03 UTC (rev 3184)
@@ -1097,7 +1097,7 @@
 		if (!request->done) RKRBackendProtocolBackend::msleep (1);
 	}
 
-	RCommandProxy* command = request->command;
+	RCommandProxy* command = request->takeCommand ();
 	if (!command) return 0;
 
 	all_current_commands.append (command);

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.cpp	2010-11-09 18:56:03 UTC (rev 3183)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.cpp	2010-11-10 18:55:03 UTC (rev 3184)
@@ -21,16 +21,18 @@
 
 #include "../debug.h"
 
+#include <QCoreApplication>
+#include <QThread>
+#ifndef Q_WS_WIN
+#	include <signal.h>		// needed for pthread_kill
+#	include <pthread.h>		// seems to be needed at least on FreeBSD
+#endif
+
 #ifdef RKWARD_THREADED
-#	ifndef Q_WS_WIN
-#		include <signal.h>		// needed for pthread_kill
-#		include <pthread.h>		// seems to be needed at least on FreeBSD
-#	endif
-#	include <QThread>
-#	include <QApplication>
 #	include "rkrbackendprotocol_frontend.h"
 #else
-#	include <unistd.h>
+#	include <QLocalSocket>
+#	include <QMutex>
 #endif
 
 #ifdef RKWARD_THREADED
@@ -47,6 +49,7 @@
 
 		// called form the *other* thread, only
 		void exitThread () {
+			RK_TRACE (RBACKEND);
 			if (isRunning ()) {
 				RK_DO (qDebug ("Waiting for R thread to finish up..."), RBACKEND, DL_INFO);
 				RKRBackendProtocolBackend::interruptProcessing ();
@@ -60,11 +63,12 @@
 				}
 			}
 		};
-		
+
 		void publicmsleep (int delay) { msleep (delay); };
 
 		void run () {
-			thread_id = currentThreadId ();
+			RK_TRACE (RBACKEND);
+			RKRBackendProtocolBackend::instance ()->r_thread_id = currentThreadId ();
 			RKRBackend::this_pointer->run ();
 		}
 
@@ -74,6 +78,201 @@
 	};
 	RKRBackendThread* RKRBackendThread::instance = 0;
 #else
+	/** Private class used by the RKRBackendProtocol, in case of the backend running in a split process.
+	This will be used as the secondary thread, and takes care of serializing, sending, receiving, and unserializing requests.
+	Wire format, as implemented here: One text line holding the length of the payload in bytes (decimal), followed by the payload itself.
+	NOTE: run() does not start an event loop. The socket is driven by the blocking waitFor...() calls, exclusively. */
+	class RKRBackendTransmitter : public QThread {
+	public:
+		/** Registers the instance, and starts connecting to the local server @p servername. */
+		RKRBackendTransmitter (const QString &servername) {
+			RK_TRACE (RBACKEND);
+
+			instance = this;	// accessed as RKRBackendTransmitter::instance from RKRBackendProtocolBackend
+			connection.connectToServer (servername);	// run() waits for the connection to be established
+		};
+
+		~RKRBackendTransmitter () {
+			RK_TRACE (RBACKEND);
+			if (!current_requests.isEmpty ()) {
+				RK_DO (qDebug ("%d pending requests when exiting RKRBackendTransmitter", current_requests.size ()), RBACKEND, DL_WARNING);
+			}
+		};
+
+		/** QThread::msleep() is protected. This wrapper makes it available to RKRBackendProtocolBackend::msleep(). */
+		void publicmsleep (int delay) { msleep (delay); };
+
+		/** Thread main loop: Forwards pending output, then transmits all queued requests, and sleeps for 1 ms. Never returns. */
+		void run () {
+			RK_TRACE (RBACKEND);
+
+			if (!connection.waitForConnected ()) handleTransmitError ("Could not connect: %s");
+#warning do handshake
+			while (1) {
+				flushOutput (false);
+				request_mutex.lock ();
+				while (!current_requests.isEmpty ()) {
+					RBackendRequest* request = current_requests.takeFirst ();
+
+					request_mutex.unlock ();	// the lock is not held while transmitting, so postRequest() is not blocked meanwhile
+					flushOutput (true);
+					handleRequestInternal (request);
+					request_mutex.lock ();
+				}
+				request_mutex.unlock ();
+				msleep (1);
+			}
+		};
+
+		/** Appends @p request to the queue of requests to be transmitted by run(). Access to the queue is guarded by request_mutex. */
+		void postRequest (RBackendRequest *request) {
+			QMutexLocker locker (&request_mutex);
+
+			RK_TRACE (RBACKEND);
+			RK_ASSERT (request);
+			current_requests.append (request);
+		}
+
+		/** Serializes and sends @p request. Asynchronous requests are deleted after sending. For synchronous requests, this
+		blocks until the reply has been received, merges the reply into @p request, and - last - flags the request as done. */
+		void handleRequestInternal (RBackendRequest *request) {
+			RK_TRACE (RBACKEND);
+			RK_ASSERT (request);
+
+			// send request: length header line, then payload
+			// NOTE: the header states exactly the number of payload bytes; fetchTransmission() reads precisely that many
+			QByteArray buffer = RKRBackendSerializer::serialize (*request);
+			connection.write (QByteArray::number (buffer.length ()) + "\n");
+			connection.write (buffer);
+			while (connection.bytesToWrite ()) {
+				if (!connection.waitForBytesWritten ()) handleTransmitError ("Could not write: %s");
+#warning, at this point we could check for an early reply to CommandOut requests
+// currently, there is not as much concurrency between the processes as there could be. This is due to the fact, that the transmitter will always block until the result of the
+// last command has been serialized and transmitted to the frontend. Instead, it could check for and fetch the next command, already (if available), to keep the backend going.
+			}
+			if (!request->synchronous) {
+				delete request;			// async requests are posted as copy
+				return;
+			}
+
+			// wait for reply
+			// NOTE: currently, in the backend, we *never* expect a read without a synchronous request
+			QByteArray reply_buffer = fetchTransmission (true);
+			if (reply_buffer.isEmpty ()) return;	// transmission failed (already reported by fetchTransmission()): nothing to unserialize
+
+			RBackendRequest* reply = RKRBackendSerializer::unserialize (reply_buffer);
+			request->mergeReply (reply);
+			RK_ASSERT (reply->done);
+			delete reply;
+#warning Read up on whether volatile provides a good enough memory barrier at this point!
+			request->done = true;	// must be the very last thing we do with the request!
+		}
+
+		/** fetch the next transmission.
+		@param block Block until a transmission was received.
+		@returns the payload of the transmission. Empty, if nothing was available (non-blocking mode, only), or if an error occurred.
+		@note @em If a transmission is available, this will always block until the transmission has been received in full. */
+		QByteArray fetchTransmission (bool block) {
+			QByteArray receive_buffer;
+			int expected_length = 0;
+			bool got_header = false;
+			bool have_data = false;
+			while (1) {
+				// poll on every iteration (not only until the first byte was seen), so a partially received header or payload can be completed
+				bool new_data = connection.waitForReadyRead (1);
+				if (!connection.isOpen ()) {
+					handleTransmitError ("Connection closed unexepctedly. Last error: %s");
+					return QByteArray ();
+				}
+				if (new_data || connection.bytesAvailable ()) have_data = true;		// data may have been buffered before we were called
+				if (!have_data) {
+					if (!block) return receive_buffer;
+					continue;
+				}
+				RK_TRACE (RBACKEND);
+				if (!got_header) {
+					if (!connection.canReadLine ()) continue;	// at this point we have received *something*, but not even a full header, yet.
+
+					QString line = QString::fromLocal8Bit (connection.readLine ());
+					bool ok;
+					expected_length = line.toInt (&ok);
+					if (!ok || (expected_length < 0)) {
+						handleTransmitError ("Protocol header error. Last connection error was: %s");
+						return QByteArray ();
+					}
+					got_header = true;
+				}
+
+				// read() with a size limit - not readAll() - so bytes belonging to a subsequent transmission are left alone
+				receive_buffer.append (connection.read (expected_length - receive_buffer.length ()));
+				if (receive_buffer.length () >= expected_length) return receive_buffer;
+			}
+		}
+
+		/** Fetches pending output from the backend, and - if there is any - transmits it as a synchronous Output request. */
+		void flushOutput (bool force) {
+			ROutputList out = RKRBackend::flushOutput (force);
+			if (out.isEmpty ()) return;
+
+			// output request would not strictly need to be synchronous. However, making them synchronous ensures that the frontend is keeping up with the output sent by the backend.
+			RBackendRequest request (true, RBackendRequest::Output);
+			request.output = new ROutputList (out);
+			handleRequestInternal (&request);
+		}
+
+		/** Prints @p message_template to stdout, with the "%s" in it replaced by the current error string of the connection.
+		NOTE: This only reports the problem. Bailing out is left to the caller. */
+		void handleTransmitError (const char* message_template) {
+			printf (message_template, qPrintable (connection.errorString ()));
+		}
+
+		static RKRBackendTransmitter* instance;		/**< the most recently created instance (set in the constructor) */
+		QLocalSocket connection;
+		QList<RBackendRequest *> current_requests;		// there *can* be multiple active requests (if the first ones are asynchronous)
+		QMutex request_mutex;		// guards current_requests
+	};
+	RKRBackendTransmitter* RKRBackendTransmitter::instance = 0;
+
+	int RK_Debug_Level = 2;
+	int RK_Debug_Flags = ALL;
+	QMutex RK_Debug_Mutex;
+
+	/** Writes @p msg, followed by a newline, to RKSettingsModuleDebug::debug_file, and flushes the file.
+	Messages of type QtFatalMsg are printed to stderr, first. The whole function runs with RK_Debug_Mutex locked. */
+	void RKDebugMessageOutput (QtMsgType type, const char *msg) {
+		QMutexLocker guard (&RK_Debug_Mutex);	// unlocks again when the function is left
+		if (type == QtFatalMsg) fprintf (stderr, "%s\n", msg);
+
+		RKSettingsModuleDebug::debug_file->write (msg);
+		RKSettingsModuleDebug::debug_file->write ("\n");
+		RKSettingsModuleDebug::debug_file->flush ();
+	}
+
+	/** Entry point of the separate backend process. Understands the arguments "--debug-level <n>" and "--server-name <name>". */
+	int main(int argc, char *argv[]) {
+		QCoreApplication app (argc, argv);
+		KComponentData data ("rkward");
+		KGlobal::locale ();		// to initialize it in the primary thread
+		QString servername;
+		QStringList args = app.arguments ();	// app is an object, not a pointer
+		for (int i = 1; i < args.count (); ++i) {
+			if (args[i].startsWith ("--debug-level")) {
+				RK_Debug_Level = args.value (++i, QString ()).toInt ();
+			} else if (args[i].startsWith ("--server-name")) {
+				servername = args.value (++i, QString ());
+			} else {
+				printf ("unkown argument %s", qPrintable (args[i]));
+			}
+		}
+		if (servername.isEmpty ()) {
+			printf ("no server to connect to");
+			return 1;	// without a server name, the transmitter has nothing to connect to
+		}
+		RKRBackendTransmitter transmitter (servername);
+		RKRBackendProtocolBackend backend;		// NOTE: "backend ()" would declare a function, instead of creating the object
+		transmitter.start ();
+		RKRBackend::this_pointer->run ();
+	}
 #error
 #endif
 
@@ -84,10 +283,12 @@
 	_instance = this;
 	new RKRBackend ();
 #ifdef RKWARD_THREADED
-	new RKRBackendThread ();
+	r_thread = new RKRBackendThread ();
+	// NOTE: r_thread_id is obtained from within the thread
 	RKRBackendThread::instance->start ();
 #else
-#error
+	r_thread = QThread::currentThread ();	// R thread == main thread
+	r_thread_id = QThread::currentThreadId ();
 #endif
 }
 
@@ -97,41 +298,34 @@
 #ifdef RKWARD_THREADED
 	RKRBackendThread::instance->exitThread ();
 	delete RKRBackendThread::instance;
-#else
-#error
 #endif
 }
 
 void RKRBackendProtocolBackend::sendRequest (RBackendRequest *_request) {
 	RK_TRACE (RBACKEND);
 
-#ifdef RKWARD_THREADED
 	RBackendRequest* request = _request;
 	if (!request->synchronous) {
 		request = _request->duplicate ();	// the instance we send to the frontend will remain in there, and be deleted, there
 		_request->done = true;				// for aesthetics
 	}
-
+#ifdef RKWARD_THREADED
 	RKRBackendEvent* event = new RKRBackendEvent (request);
 	qApp->postEvent (RKRBackendProtocolFrontend::instance (), event);
 #else
-#error
+	RKRBackendTransmitter::instance->postRequest (request);	// queued for the transmitter thread; same accessor as used in msleep()
 #endif
 }
 
 bool RKRBackendProtocolBackend::inRThread () {
-#ifdef RKWARD_THREADED
-	return (QThread::currentThread () == RKRBackendThread::instance);
-#else
-#error
-#endif
+	return (QThread::currentThread () == instance ()->r_thread);
 }
 
 void RKRBackendProtocolBackend::msleep (int delay) {
-#	ifdef RKWARD_THREADED
+#ifdef RKWARD_THREADED
 	RKRBackendThread::instance->publicmsleep (delay);
 #else
-	usleep (delay * 1000);
+	RKRBackendTransmitter::instance->publicmsleep (delay);
 #endif
 }
 
@@ -140,13 +334,9 @@
 		RKRBackend::scheduleInterrupt ();
 	} else {
 #ifdef Q_WS_WIN
-		RKRBackend::scheduleInterrupt ();
+		RKRBackend::scheduleInterrupt ();		// Thread-safe on windows?!
 #else
-#		ifdef RKWARD_THREADED
-		pthread_kill ((pthread_t) RKRBackendThread::instance->thread_id, SIGUSR1);	// relays to SIGINT
-#		else
-#		error
-#		endif
+		pthread_kill ((pthread_t) instance ()->r_thread_id, SIGUSR1);	// NOTE: SIGUSR1 relays to SIGINT
+#endif
 	}
-#endif
 }

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.h	2010-11-09 18:56:03 UTC (rev 3183)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.h	2010-11-10 18:55:03 UTC (rev 3184)
@@ -20,6 +20,8 @@
 
 #include "rkrbackendprotocol_shared.h"
 
+class QThread;
+
 class RKRBackendProtocolBackend {
 public:
 	static bool inRThread ();
@@ -36,6 +38,10 @@
 	static RKRBackendProtocolBackend* instance () { return _instance; };
 private:
 	static RKRBackendProtocolBackend* _instance;
+	QThread *r_thread;
+#ifndef Q_WS_WIN
+	Qt::HANDLE r_thread_id;
+#endif
 };
 
 #endif

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.h	2010-11-09 18:56:03 UTC (rev 3183)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.h	2010-11-10 18:55:03 UTC (rev 3184)
@@ -23,6 +23,7 @@
 #include <QObject>
 
 class RInterface;
+class QThread;
 
 class RKRBackendProtocolFrontend : public QObject {
 public:
@@ -39,6 +40,8 @@
 #ifdef RKWARD_THREADED
 /** needed to handle the QEvents, the R thread is sending (notifications on what's happening in the backend thread) */
 	void customEvent (QEvent *e);
+#else
+	QThread* main_thread;
 #endif
 private:
 	static RKRBackendProtocolFrontend* _instance;

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp	2010-11-09 18:56:03 UTC (rev 3183)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp	2010-11-10 18:55:03 UTC (rev 3184)
@@ -33,3 +33,154 @@
 
 	RK_ASSERT ((type & RCommand::Internal) || (getDataType () == RData::NoData));
 }
+
+/** Sets up a request of the given type. It starts out as not done, and without any command or output attached. */
+RBackendRequest::RBackendRequest (bool synchronous, RCallbackType type) {
+	RK_TRACE (RBACKEND);
+	this->type = type;
+	this->synchronous = synchronous;
+	output = 0;
+	command = 0;
+	done = false;
+}
+
+/** Deletes the command and the output still attached to the request, if any. */
+RBackendRequest::~RBackendRequest () {
+	RK_TRACE (RBACKEND);
+	delete command;		// deleting a 0 pointer is a no-op
+	delete output;
+}
+
+#ifndef RKWARD_THREADED
+/** Takes over command and output (both are moved out of @p reply), and params (copied) from @p reply. Does not touch "done". */
+void RBackendRequest::mergeReply (RBackendRequest *reply) {
+	RK_TRACE (RBACKEND);
+
+	// takeCommand() / takeOutput() return the pointer, and reset it to 0 in the reply
+	command = reply->takeCommand ();
+	output = reply->takeOutput ();
+	params = reply->params;
+}
+#endif
+
+/** Creates a copy of this request. NOTE: command and output are handed over to the copy, and reset to 0 in the original. */
+RBackendRequest* RBackendRequest::duplicate () {
+	RK_TRACE (RBACKEND);
+
+	RBackendRequest* copy = new RBackendRequest (synchronous, type);
+	copy->params = params;
+	copy->done = done;
+	// taking (instead of merely copying) the pointers makes sure they cannot be deleted twice
+	copy->command = takeCommand ();
+	copy->output = takeOutput ();
+
+	return copy;
+}
+
+
+#ifndef RKWARD_THREADED
+	/** Serializes @p request. Layout: type (as qint8), synchronous, done, command flag [+ command], output flag [+ output], params. */
+	QByteArray RKRBackendSerializer::serialize (const RBackendRequest &request) {
+		RK_TRACE (RBACKEND);
+
+		QByteArray ret;
+		QDataStream stream (&ret, QIODevice::WriteOnly);
+		stream << (qint8) request.type;
+		stream << request.synchronous;
+		stream << request.done;		// well, not really needed, but...
+		if (request.command) {		// optional members are preceded by a flag telling whether they are present
+			stream << true;
+			serializeProxy (*(request.command), stream);
+		} else {
+			stream << false;
+		}
+		if (request.output) {
+			RK_ASSERT (request.type == RBackendRequest::Output);
+			stream << true;
+			serializeOutput (*(request.output), stream);
+		} else {
+			stream << false;
+		}
+		stream << request.params;
+
+		return ret;
+	}
+
+	/** Inverse of serialize(): Creates a new request (allocated with new) from @p buffer, reading the fields in the order written by serialize(). */
+	RBackendRequest *RKRBackendSerializer::unserialize (const QByteArray &buffer) {
+		RK_TRACE (RBACKEND);
+		QDataStream stream (buffer);
+		RBackendRequest *request = new RBackendRequest (false, RBackendRequest::OtherRequest);		// will be overwritten
+		qint8 raw_type;		// the type was written as qint8
+		bool flag;			// scratch variable: the volatile "done" cannot be read into, directly
+		stream >> raw_type >> request->synchronous >> flag;
+		request->type = (RBackendRequest::RCallbackType) raw_type;
+		request->done = flag;
+		stream >> flag;		// command present?
+		if (flag) request->command = unserializeProxy (stream);
+		stream >> flag;		// output present?
+		if (flag) request->output = unserializeOutput (stream);
+		stream >> request->params;
+		return request;
+	}
+
+	/** Writes @p list to @p stream: The number of entries (as qint32), followed by type (as qint8) and output of each entry. */
+	void RKRBackendSerializer::serializeOutput (const ROutputList &list, QDataStream &stream) {
+		RK_TRACE (RBACKEND);
+		const qint32 count = (qint32) list.size ();
+		stream << count;
+		for (qint32 pos = 0; pos < count; ++pos) {
+			stream << (qint8) list[pos].type << list[pos].output;
+		}
+	}
+
+	/** Inverse of serializeOutput(). The returned list is allocated with new, and handed to the caller. */
+	ROutputList* RKRBackendSerializer::unserializeOutput (QDataStream &stream) {
+		RK_TRACE (RBACKEND);
+		ROutputList *ret = new ROutputList ();
+		qint32 len;
+		stream >> len;
+#if QT_VERSION >= 0x040700
+		ret->reserve (len);
+#endif
+		for (qint32 i = 0; i < len; ++i) {
+			ROutput out;
+			qint8 raw_type;		// the type was written as qint8
+			stream >> raw_type >> out.output;
+			out.type = (ROutput::ROutputType) raw_type;
+			ret->append (out);
+		}
+		return ret;
+	}
+
+	/** Writes @p data to @p stream: The data type (as qint8), then the payload. Structure vectors are written as their length (qint32), followed by each child, recursively. */
+	void RKRBackendSerializer::serializeData (const RData &data, QDataStream &stream) {
+		RK_TRACE (RBACKEND);
+		RData::RDataType type = data.getDataType ();
+		stream << (qint8) type;
+		if (type == RData::IntVector) stream << data.getIntVector ();
+		else if (type == RData::StringVector) stream << data.getStringVector ();
+		else if (type == RData::RealVector) stream << data.getRealVector ();
+		else if (type == RData::StructureVector) {
+			RData::RDataStorage list = data.getStructureVector ();
+			qint32 len = list.size ();
+			stream << len;
+			for (qint32 i = 0; i < len; ++i) {
+				serializeData (*(list[i]), stream);		// children go to the same stream
+			}
+		} else {
+			RK_ASSERT (type == RData::NoData);		// no payload to write
+		}
+	}
+
+	RData* RKRBackendSerializer::unserializeData (QDataStream &stream) {
+		RK_TRACE (RBACKEND);
+
+		RData* ret = new RData;	// NOTE: unfinished: ret is neither filled from the stream, nor returned, yet. The #error below stops compilation here.
+		#error
+	}
+
+	void RKRBackendSerializer::serializeProxy (const RCommandProxy &proxy, QDataStream &stream);
+#error
+	*RCommandProxy RKRBackendSerializer::unserializeProxy (QDataStream &stream);
+#endif

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h	2010-11-09 18:56:03 UTC (rev 3183)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h	2010-11-10 18:55:03 UTC (rev 3184)
@@ -23,6 +23,7 @@
 #endif
 
 #include <QVariantMap>
+#include "rcommand.h"
 
 class RCommandProxy;
 
@@ -41,26 +42,42 @@
 		CallbackRequest,
 		HistoricalSubstackRequest,
 		SetParamsFromBackend,
+#ifndef RKWARD_THREADED
+		Output,		/**< A piece of output. Note: If the backend runs in a single process, output is handled in a pull fashion, instead of using requests. */
+#endif
 		OtherRequest		/**< Any other type of request. Note: which requests are in the enum, and which are not has mostly historical reasons. @see params */
 	};
 
-	RBackendRequest (bool synchronous, RCallbackType type) {
-		RBackendRequest::synchronous = synchronous;
-		RBackendRequest::type = type;
-		done = false;
+	RBackendRequest (bool synchronous, RCallbackType type);
+	~RBackendRequest ();
+
+	RCommandProxy *takeCommand () {
+		RCommandProxy* ret = command;
 		command = 0;
+		return ret;
 	}
-	~RBackendRequest () {};
 
+	ROutputList *takeOutput () {
+		ROutputList* ret = output;
+		output = 0;
+		return ret;
+	}
+
 /** Should this request be handled synchronously? False by default. */
 	bool synchronous;
 /** For synchronous requests, only: The frontend-thread will set this to true (using completed()), once the request has been "completed". Important: The backend thread MUST NOT touch a request after it has been sent, and before "done" has been set to true. */
-	bool done;
+	bool volatile done;
 	RCallbackType type;
 /** For synchronous requests, only: If the the frontend wants any commands to be executed, it will place the next one in this slot. The backend thread should keep executing commands (in a sub-eventloop) while this is non-zero. Also, the backend-thread may place here any command that has just finished. */
 	RCommandProxy *command;
 /** Any other parameters, esp. for RCallbackType::OtherRequest. Can be used in both directions. */
 	QVariantMap params;
+/** NOTE: only used for separate process backend. See RCallbackType::Output */
+	ROutputList *output;
+#ifndef RKWARD_THREADED
+/** NOTE: this does @em not copy merge the "done" flag. Do that manually, @em after merging (and don't touch the request from the transmitter thread, after that). */
+	void mergeReply (RBackendRequest *reply);
+#endif
 protected:
 	friend class RKRBackendProtocolFrontend;
 	friend class RKRBackendProtocolBackend;
@@ -70,13 +87,8 @@
 		else done = true;
 	}
 
-	RBackendRequest *duplicate () {
-		RBackendRequest* ret = new RBackendRequest (synchronous, type);
-		ret->done = done;
-		ret->command = command;
-		ret->params = params;
-		return ret;
-	}
+/** duplicates the request. NOTE: The command, and output, if any are @em taken from the original, and transferred to the copy, not really duplicated. */
+	RBackendRequest *duplicate ();
 };
 
 #ifdef RKWARD_THREADED
@@ -96,13 +108,12 @@
 	};
 #endif
 
-#include "rcommand.h"
-
 /** This is a reduced version of an RCommand, intended for use in the R backend. */
 class RCommandProxy : public RData {
 protected:
 friend class RCommand;
 friend class RKRBackend;
+friend class RBackendRequest;
 	RCommandProxy ();
 	~RCommandProxy ();
 	RCommandProxy (const QString &command, int type);
@@ -113,4 +124,22 @@
 	int status;
 };
 
+#ifndef RKWARD_THREADED
+	/** Collection of static functions for serializing / unserializing the requests exchanged between backend and frontend.
+	NOTE: A namespace would do just as well as a class, here, but a class is simply easier to declare as a "friend". */
+	class RKRBackendSerializer {
+	public:
+		static QByteArray serialize (const RBackendRequest &request);
+		static RBackendRequest *unserialize (const QByteArray &buffer);
+	private:
+		// helpers for the individual parts of a request. Each serializeXYZ() is listed next to its unserializeXYZ() counterpart
+		static void serializeOutput (const ROutputList &list, QDataStream &stream);
+		static ROutputList* unserializeOutput (QDataStream &stream);
+		static void serializeData (const RData &data, QDataStream &stream);
+		static RData* unserializeData (QDataStream &stream);
+		static void serializeProxy (const RCommandProxy &proxy, QDataStream &stream);
+		static RCommandProxy* unserializeProxy (QDataStream &stream);
+	};
 #endif
+
+#endif


This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.




More information about the rkward-tracker mailing list