[rkward-cvs] SF.net SVN: rkward:[3153] branches/2010_10_18_backend_restructuring_branch/ rkward/rbackend

tfry at users.sourceforge.net tfry at users.sourceforge.net
Tue Oct 26 18:33:48 UTC 2010


Revision: 3153
          http://rkward.svn.sourceforge.net/rkward/?rev=3153&view=rev
Author:   tfry
Date:     2010-10-26 18:33:48 +0000 (Tue, 26 Oct 2010)

Log Message:
-----------
Move all RCommandStack manipulations and most output handling to the frontend thread.
This is in preparation for using an abridged RCommand struct in the backend. It is also one step further towards serializable communication between frontend and backend.
This certainly needs more testing, but it is a work in progress anyway.

Modified Paths:
--------------
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rembedinternal.cpp
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rembedinternal.h
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.h
    branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rthread.cpp

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rembedinternal.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rembedinternal.cpp	2010-10-26 14:52:48 UTC (rev 3152)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rembedinternal.cpp	2010-10-26 18:33:48 UTC (rev 3153)
@@ -189,7 +189,7 @@
 			if (RThread::this_pointer->killed) return 0;
 
 			if (RThread::repl_status.user_command_status == RThread::RKReplStatus::NoUserCommand) {
-				RCommand *command = RThread::this_pointer->fetchNextCommand (RCommandStack::regular_stack);
+				RCommand *command = RThread::this_pointer->fetchNextCommand ();
 				if (!command) {
 					return 0;	// jumps out of the event loop!
 				}
@@ -323,7 +323,7 @@
 		}
 	}
 
-	RThread::this_pointer->handleOutput (RThread::this_pointer->current_locale_codec->toUnicode (buf, buflen), buflen, type == 0);
+	RThread::this_pointer->handleOutput (RThread::this_pointer->current_locale_codec->toUnicode (buf, buflen), buflen, type == 0 ? ROutput::Output : ROutput::Warning);
 }
 
 /** For R callbacks that we want to disable, entirely */
@@ -549,10 +549,7 @@
 
 	RK_ASSERT (this_pointer == 0);
 	this_pointer = this;
-	current_output = 0;
 	out_buf_len = 0;
-	output_paused = false;
-	previously_idle = false;
 
 #ifdef Q_WS_WIN
 	// we hope that on other platforms the default is reasonable
@@ -679,10 +676,9 @@
 	if (RThread::this_pointer->repl_status.eval_depth == 0) {
 		RThread::this_pointer->repl_status.user_command_status = RThread::RKReplStatus::UserCommandFailed;
 	}
-	unsigned int count;
-	QString *strings = RKRSupport::SEXPToStringList (call, &count);
-	RThread::this_pointer->handleError (strings, count);
-	delete [] strings;
+	QString string = RKRSupport::SEXPToString (call);
+	RThread::this_pointer->handleOutput (string, string.length (), ROutput::Error);
+	RK_DO (qDebug ("error '%s'", qPrintable (string)), RBACKEND, DL_DEBUG);
 	return R_NilValue;
 }
 
@@ -1000,7 +996,7 @@
 
 	if (ctype & RCommand::DirectToOutput) runDirectCommand (".rk.print.captured.messages()");
 	if (!(ctype & RCommand::Internal)) {
-		if (!locked || killed) processX11Events ();
+		if (!RInterface::backendIsLocked () || killed) processX11Events ();
 		MUTEX_LOCK;
 	}
 
@@ -1028,11 +1024,9 @@
 			RK_DO (qDebug ("Command failed (other)"), RBACKEND, dl);
 		}
 		RK_DO (qDebug ("failed command was: '%s'", command->command ().toLatin1 ().data ()), RBACKEND, dl);
-		flushOutput ();
-		RK_DO (qDebug ("- error message was: '%s'", command->error ().toLatin1 ().data ()), RBACKEND, dl);
+/*		flushOutput ();
+		RK_DO (qDebug ("- error message was: '%s'", command->error ().toLatin1 ().data ()), RBACKEND, dl); */
 	} else {
 		command->status |= RCommand::WasTried;
 	}
-
-	flushOutput ();
 }

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rembedinternal.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rembedinternal.h	2010-10-26 14:52:48 UTC (rev 3152)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rembedinternal.h	2010-10-26 18:33:48 UTC (rev 3153)
@@ -23,6 +23,7 @@
 #include <QMap>
 #include <QVariant>
 #include <QThread>
+#include <QMutex>
 #include <QStringList>
 #include <QEvent>
 
@@ -68,18 +69,24 @@
 friend class RInterface;
 friend class RThread;
 	QStringList call;
-	RCommandChain *in_chain;
 };
 
+/** Used to request the next command from the front-end */
+struct RNextCommandRequest {
+private:
+friend class RInterface;
+friend class RThread;
+	bool *done;
+	RCommand *command;
+};
+
 /** Simple event class to relay information from the RThread to the main thread. This is basically like QCustomEvent in Qt3*/
 class RKRBackendEvent : public QEvent {
 public:
 	enum EventType {
 		Base = QEvent::User + 1,
-		RCommandIn,
+		RNextCommandRequest,
 		RCommandOut,
-		RBusy,
-		RIdle,
 		RCommandOutput,
 		RStarted,
 		REvalRequest,
@@ -131,11 +138,6 @@
 /** destructor */
 	virtual ~RThread ();
 
-/** Pause output by placing it in a delay loop, until unpaused again */
-	void pauseOutput (bool paused) { output_paused = paused; };
-/** the internal counterpart to pauseOutput () */
-	void waitIfOutputPaused ();
-
 /** interrupt processing of the current command. This is much like the user pressing Ctrl+C in a terminal with R. This is probably the only non-portable function in RThread, but I can't see a good way around placing it here, or to make it portable. */
 	void interruptProcessing (bool interrupt);
 
@@ -180,29 +182,13 @@
 /** call this periodically to make R's x11 windows process their events */
 	static void processX11Events ();
 
-/** convenience struct for event passing */
-	struct ROutputContainer {
-		/** the actual output fragment */
-		ROutput *output;
-		/** the corresponding command */
-		RCommand *command;
-	};
-/** current output */
-	ROutput *current_output;
-/** current length of output. Used so we can flush every once in a while, if output becomes too long */
-	int out_buf_len;
-
 /** This gets called on normal R output (R_WriteConsole). Used to get at output. */
-	void handleOutput (const QString &output, int len, bool regular);
+	void handleOutput (const QString &output, int len, ROutput::ROutputType type);
 
-/** Flushes current output buffer. Lock the mutex before calling this function! It is called from both threads and is not re-entrant */
-	void flushOutput ();
+/** Flushes current output buffer. Meant to be called from RInterface::flushOutput, only.
+ at param forcibly: if true, will always flush the output. If false, will flush the output only if the mutex can be locked without waiting. */
+	ROutputList flushOutput (bool forcibly=false);
 
-/** This gets called, when R reports an error (override of options ("error") in R). Used to get at error-output.
-This function is public for technical reasons, only. Don't use except from R-backend code!
-reports an error. */
-	void handleError (QString *call, int call_length);
-
 /** This is a sub-eventloop, being run when the backend request information from the frontend. See \ref RThread for a more detailed description */
 	void handleSubstackCall (QStringList &call);
 
@@ -227,20 +213,6 @@
 		return (r_version >= (1000 * major + 10 * minor + revision));
 	}
 
-/** @see lock (), @see unlock ()*/
-	enum LockType {
-		User=1,		/**< locked on user request */
-		Cancel=2,	/**< locked to safely cancel a running command */
-		Startup=4	/**< locked on startup */
-	};
-
-/** Locks the thread. This is called by RInterface, when the currently running command is to be cancelled. It is used to make sure that the
-backend thread does not proceed with further commands, before the main thread takes notice. Also it is called, if the RThread is paused on User request. Further, the thread is initially locked so the main thread can check for some conditions before the backend thread may produce
-more errors/crashes. @see unlock @see RInterface::cancelCommand @see RInterface::pauseProcessing
- at param reason As there are several reasons to lock the thread, and more than one reason may be in place at a given time, a reason needs to be specified for both lock () and unlock (). Only if all "reasons are unlocked ()", processing continues. */
-	void lock (LockType reason) { locked |= reason; };
-/** Unlocks the thread.  Also the thread may get locked when canceling the currently running command. @see lock */
-	void unlock (LockType reason) { locked -= (locked & reason); };
 /** "Kills" the thread. Actually this just tells the thread that is is about to be terminated. Allows the thread to terminate gracefully */
 	void kill () { killed = true; };
 	bool isKilled () { return killed; };
@@ -266,16 +238,15 @@
 	static RKReplStatus repl_status;
 
 	// fetch next command (and do event processing while waiting)
-	RCommand *fetchNextCommand (RCommandStack *stack);
+	RCommand *fetchNextCommand ();
 	void commandFinished (bool check_object_updates_needed=true);
 /** thread is killed. Should exit as soon as possible. @see kill */
 	bool killed;
 protected:
-/** thread is locked. No new commands will be executed. @see LockType @see lock @see unlock */
-	int locked;
-	bool previously_idle;
 /** On pthread systems this is the pthread_id of the backend thread. It is needed to send SIGINT to the R backend */
 	Qt::HANDLE thread_id;
+/** If the length of the current output buffer is too long, this will pause any further output until the main thread has had a chance to catch up. */
+	void waitIfOutputBufferExceeded ();
 private:
 /** set up R standard callbacks */
 	void setupCallbacks ();
@@ -290,8 +261,6 @@
 /** This is the function in which an RCommand actually gets processed. Basically it passes the command to runCommand () and sends RInterface some events about what is currently happening. */
 	void doCommand (RCommand *command);
 	void notifyCommandDone (RCommand *command);
-/** The internal storage for pauseOutput () */
-	bool output_paused;
 
 /** A copy of the names of the toplevel environments (as returned by "search ()"). */
 	QStringList toplevel_env_names;
@@ -302,6 +271,13 @@
 /** check wether the object list / global environment / individual symbols have changed, and updates them, if needed */
 	void checkObjectUpdatesNeeded (bool check_list);
 	QList<RCommand*> all_current_commands;
+
+	/** current output */
+	ROutputList output_buffer;
+/** Provides thread-safety for the output_buffer */
+	QMutex output_buffer_mutex;
+/** current length of output. If the backlog of output which has not yet been processed by the frontend becomes too long, output will be paused, automatically */
+	int out_buf_len;
 };
  
 #endif

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp	2010-10-26 14:52:48 UTC (rev 3152)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp	2010-10-26 18:33:48 UTC (rev 3153)
@@ -82,11 +82,16 @@
 	RCommandStack::regular_stack = new RCommandStack (0);
 	running_command_canceled = 0;
 	command_logfile_mode = NotRecordingCommands;
+	command_request = 0;
+	previously_idle = false;
+	locked = Startup;
 
 	r_thread = new RThread ();
 
 	// create a fake init command
-	issueCommand (new RCommand (i18n ("R Startup"), RCommand::App | RCommand::Sync | RCommand::ObjectListUpdate, i18n ("R Startup")));
+	RCommand *fake = new RCommand (i18n ("R Startup"), RCommand::App | RCommand::Sync | RCommand::ObjectListUpdate, i18n ("R Startup"));
+	issueCommand (fake);
+	all_current_commands.append (fake);
 
 	flush_timer = new QTimer (this);
 	connect (flush_timer, SIGNAL (timeout ()), this, SLOT (flushOutput ()));
@@ -133,6 +138,10 @@
 	return (idle);
 }
 
+bool RInterface::backendIsLocked () {
+	return (RKGlobals::rInterface ()->locked != 0);
+}
+
 bool RInterface::inRThread () {
 	return (QThread::currentThread () == RKGlobals::rInterface ()->r_thread);
 }
@@ -156,31 +165,62 @@
 	 return r_thread->current_command;
 }
 
-void RInterface::customEvent (QEvent *e) {
+void RInterface::tryNextCommand () {
 	RK_TRACE (RBACKEND);
+	if (!command_request) return;
 
-	RKRBackendEvent *ev;
-	if (((int) e->type ()) >= ((int) RKRBackendEvent::Base)) {
-		ev = static_cast<RKRBackendEvent*> (e);
-	} else {
-		RK_ASSERT (false);
-		return;
+	RCommandStack *stack = RCommandStack::currentStack ();
+	bool main_stack = (stack == RCommandStack::regular_stack);
+
+	if ((!(main_stack && locked)) && stack->isActive ()) {		// do not respect locks in substacks
+		RCommand *command = stack->currentCommand ();
+
+		if (command) {
+			all_current_commands.append (command);
+
+			if ((command->type () & RCommand::EmptyCommand) || (command->status & RCommand::Canceled) || (command->type () & RCommand::QuitCommand)) {
+				// some commands are not actually run by R, but handled inline, here
+				if (command->status & RCommand::Canceled) {
+					command->status |= RCommand::Failed;
+				}
+				if (command->type () & RCommand::QuitCommand) {
+					r_thread->kill ();
+				}
+				stack->pop ();
+				// notify ourselves...
+				RKRBackendEvent* event = new RKRBackendEvent (RKRBackendEvent::RCommandOut, command);
+				qApp->postEvent (this, event);
+				return;
+			}
+
+			if (previously_idle) RKWardMainWindow::getMain ()->setRStatus (RKWardMainWindow::Busy);
+			previously_idle = false;
+
+			doNextCommand (command);
+			return;
+		}
 	}
 
-	if (ev->etype () == RKRBackendEvent::RCommandOutput) {
-		RThread::ROutputContainer *container = (static_cast <RThread::ROutputContainer *> (ev->data ()));
-		container->command->newOutput (container->output);
-		delete container;
+	if ((!stack->isActive ()) && stack->isEmpty () && !main_stack) {
+		// a substack was depleted
+		delete stack;
+		doNextCommand (0);
+	} else if (main_stack) {
+		if (!previously_idle) RKWardMainWindow::getMain ()->setRStatus (RKWardMainWindow::Idle);
+		previously_idle = true;
+	}
+}
 
-// TODO: not quite good, yet, leads to staggering output (but overall throughput is the same):
-	// output events can easily stack up in the hundreds, not allowing GUI events to get through. Let's block further output events for a minute and then catch up with the event queue
-		if (qApp->hasPendingEvents ()) {
-			r_thread->pauseOutput (true);
-			qApp->processEvents ();
-			r_thread->pauseOutput (false);
-		}
-	} else if (ev->etype () == RKRBackendEvent::RCommandIn) {
-		RCommand *command = static_cast <RCommand *> (ev->data ());
+void RInterface::doNextCommand (RCommand *command) {
+	RK_TRACE (RBACKEND);
+	RK_ASSERT (command_request);
+
+	flushOutput (true);
+	if (command) {
+		RK_DO (qDebug ("running command: %s", command->command ().toLatin1().data ()), RBACKEND, DL_DEBUG);
+		command->status |= RCommand::Running;
+		RCommandStackModel::getModel ()->itemChange (command);
+
 		RKCommandLog::getLog ()->addInput (command);
 
 		if (command_logfile_mode != NotRecordingCommands) {
@@ -189,8 +229,38 @@
 				command_logfile.write ("\n");
 			}
 		}
+	}
+
+	command_request->command = command;
+	*(command_request->done) = true;
+	command_request = 0;
+	QThread::yieldCurrentThread ();
+}
+
+void RInterface::customEvent (QEvent *e) {
+	RK_TRACE (RBACKEND);
+
+	RKRBackendEvent *ev;
+	if (((int) e->type ()) >= ((int) RKRBackendEvent::Base)) {
+		ev = static_cast<RKRBackendEvent*> (e);
+	} else {
+		RK_ASSERT (false);
+		return;
+	}
+
+	if (ev->etype () == RKRBackendEvent::RNextCommandRequest) {
+		RK_ASSERT (!command_request);
+		command_request = static_cast<RNextCommandRequest*> (ev->data ());
+		tryNextCommand ();
 	} else if (ev->etype () == RKRBackendEvent::RCommandOut) {
+		flushOutput (true);
+
 		RCommand *command = static_cast <RCommand *> (ev->data ());
+		RK_ASSERT (!all_current_commands.isEmpty ());
+		RK_ASSERT (all_current_commands.last () == command);
+		all_current_commands.pop_back ();
+		RCommandStack::currentStack ()->pop ();
+
 		if (command->status & RCommand::Canceled) {
 			command->status |= RCommand::HasError;
 			ROutput *out = new ROutput;
@@ -202,25 +272,20 @@
 				RK_ASSERT (command == running_command_canceled);
 				running_command_canceled = 0;
 				r_thread->interruptProcessing (false);
-				r_thread->unlock (RThread::Cancel);
+				locked -= locked & Cancel;
 			}
 		}
 		command->finished ();
 		delete command;
-	} else if ((ev->etype () == RKRBackendEvent::RIdle)) {
-		RKWardMainWindow::getMain ()->setRStatus (RKWardMainWindow::Idle);	
-	} else if ((ev->etype () == RKRBackendEvent::RBusy)) {
-		RKWardMainWindow::getMain ()->setRStatus (RKWardMainWindow::Busy);
 	} else if ((ev->etype () == RKRBackendEvent::REvalRequest)) {
-		r_thread->pauseOutput (false); // we may be recursing downwards into event loops here. Hence we need to make sure, we don't create a deadlock
 		processREvalRequest (static_cast<REvalRequest *> (ev->data ()));
 	} else if ((ev->etype () == RKRBackendEvent::RCallbackRequest)) {
-		r_thread->pauseOutput (false); // see above
 		processRCallbackRequest (static_cast<RCallbackArgs *> (ev->data ()));
 	} else if ((ev->etype () == RKRBackendEvent::RStarted)) {
-		r_thread->unlock (RThread::Startup);
+		locked -= locked & Startup;
 		RKWardMainWindow::discardStartupOptions ();
 	} else if ((ev->etype () == RKRBackendEvent::RStartupError)) {
+		flushOutput (true);
 		int* err_p = static_cast<int*> (ev->data ());
 		int err = *err_p;
 		delete err_p;
@@ -251,16 +316,62 @@
 void RInterface::flushOutput () {
 // do not trace. called periodically
 //	RK_TRACE (RBACKEND);
-	MUTEX_LOCK;
-	r_thread->flushOutput ();
-	MUTEX_UNLOCK;
+
+	flushOutput (false);
 }
 
+void RInterface::flushOutput (bool forced) {
+// do not trace. called periodically
+//	RK_TRACE (RBACKEND);
+
+	ROutputList list = r_thread->flushOutput (forced);
+qDebug ("fo %d", list.size ());
+	foreach (ROutput *output, list) {
+		if (all_current_commands.isEmpty ()) {
+			RK_DO (qDebug ("output without receiver'%s'", qPrintable (output->output)), RBACKEND, DL_WARNING);
+			delete output;
+			continue;	// to delete the other output pointers, too
+		} else {
+			RK_DO (qDebug ("output '%s'", qPrintable (output->output)), RBACKEND, DL_DEBUG);
+		}
+
+		bool first = true;
+		foreach (RCommand* command, all_current_commands) {
+			ROutput *coutput = output;
+			if (!first) {		// this output belongs to several commands at once. So we need to copy it.
+				coutput = new ROutput;
+				coutput->type = output->type;
+				coutput->output = output->output;
+			}
+			first = false;
+
+			if (coutput->type == ROutput::Output) {
+				command->status |= RCommand::HasOutput;
+				command->output_list.append (coutput);
+			} else if (coutput->type == ROutput::Warning) {
+				command->status |= RCommand::HasWarnings;
+				command->output_list.append (coutput);
+			} else if (coutput->type == ROutput::Error) {
+				command->status |= RCommand::HasError;
+				// An error output is typically just the copy of the previous output, so merge if possible
+				if (command->output_list.isEmpty ()) {
+					command->output_list.append (coutput);
+				}
+				if (command->output_list.last ()->output == coutput->output) {
+					command->output_list.last ()->type = ROutput::Error;
+				}
+			}
+			command->newOutput (coutput);
+		}
+	}
+}
+
 void RInterface::issueCommand (RCommand *command, RCommandChain *chain) { 
 	RK_TRACE (RBACKEND);
 	MUTEX_LOCK;
 	if (command->command ().isEmpty ()) command->_type |= RCommand::EmptyCommand;
 	RCommandStack::issueCommand (command, chain);
+	tryNextCommand ();
 	MUTEX_UNLOCK;
 }
 
@@ -278,6 +389,7 @@
 
 	MUTEX_LOCK;
 	RCommandStack::closeChain (chain);
+	tryNextCommand ();
 	MUTEX_UNLOCK;
 };
 
@@ -290,7 +402,7 @@
 		if (command->type () && RCommand::Running) {
 			if (running_command_canceled != command) {
 				RK_ASSERT (!running_command_canceled);
-				r_thread->lock (RThread::Cancel);
+				locked |= Cancel;
 				running_command_canceled = command;
 				r_thread->interruptProcessing (true);
 			}
@@ -306,18 +418,22 @@
 void RInterface::pauseProcessing (bool pause) {
 	RK_TRACE (RBACKEND);
 
-	if (pause) r_thread->lock (RThread::User);
-	else r_thread->unlock (RThread::User);
+	if (pause) locked |= User;
+	else locked -= locked & User;
 }
 
 void RInterface::processREvalRequest (REvalRequest *request) {
 	RK_TRACE (RBACKEND);
 
+	RK_ASSERT (!all_current_commands.isEmpty ());
+	RCommandStack *reply_stack = new RCommandStack (all_current_commands.last ());
+	RCommandChain *in_chain = reply_stack->startChain (reply_stack);
+
 	// clear reply object
-	issueCommand (".rk.set.reply (NULL)", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+	issueCommand (".rk.set.reply (NULL)", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 	if (request->call.isEmpty ()) {
 		RK_ASSERT (false);
-		closeChain (request->in_chain);
+		closeChain (in_chain);
 		return;
 	}
 	
@@ -327,9 +443,9 @@
 			QString file_prefix = request->call[1];
 			QString file_extension = request->call[2];
 
-			issueCommand (".rk.set.reply (\"" + RKCommonFunctions::getUseableRKWardSavefileName (file_prefix, file_extension) + "\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+			issueCommand (".rk.set.reply (\"" + RKCommonFunctions::getUseableRKWardSavefileName (file_prefix, file_extension) + "\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 		} else {
-			issueCommand (".rk.set.reply (\"Too few arguments in call to get.tempfile.name.\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+			issueCommand (".rk.set.reply (\"Too few arguments in call to get.tempfile.name.\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 		}
 	} else if (call == "set.output.file") {
 		RK_ASSERT (request->call.count () == 2);
@@ -344,35 +460,35 @@
 			if (obj) {
 				RK_DO (qDebug ("triggering update for symbol %s", object_name.toLatin1 ().data()), RBACKEND, DL_DEBUG);
 				obj->markDataDirty ();
-				obj->updateFromR (request->in_chain);
+				obj->updateFromR (in_chain);
 			} else {
 				RK_DO (qDebug ("lookup failed for changed symbol %s", object_name.toLatin1 ().data()), RBACKEND, DL_WARNING);
 			}
 		}
 	} else if (call == "syncenvs") {
 		RK_DO (qDebug ("triggering update of object list"), RBACKEND, DL_DEBUG);
-		RObjectList::getObjectList ()->updateFromR (request->in_chain, request->call.mid (1));
+		RObjectList::getObjectList ()->updateFromR (in_chain, request->call.mid (1));
 	} else if (call == "syncglobal") {
 		RK_DO (qDebug ("triggering update of globalenv"), RBACKEND, DL_DEBUG);
-		RObjectList::getGlobalEnv ()->updateFromR (request->in_chain, request->call.mid (1));
+		RObjectList::getGlobalEnv ()->updateFromR (in_chain, request->call.mid (1));
 	} else if (call == "edit") {
 		RK_ASSERT (request->call.count () >= 2);
 
 		QStringList object_list = request->call.mid (1);
-		new RKEditObjectAgent (object_list, request->in_chain);
+		new RKEditObjectAgent (object_list, in_chain);
 	} else if (call == "require") {
 		if (request->call.count () >= 2) {
 			QString lib_name = request->call[1];
 			KMessageBox::information (0, i18n ("The R-backend has indicated that in order to carry out the current task it needs the package '%1', which is not currently installed. We will open the package-management tool, and there you can try to locate and install the needed package.", lib_name), i18n ("Require package '%1'", lib_name));
-			RKLoadLibsDialog::showInstallPackagesModal (0, request->in_chain, lib_name);
-			issueCommand (".rk.set.reply (\"\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+			RKLoadLibsDialog::showInstallPackagesModal (0, in_chain, lib_name);
+			issueCommand (".rk.set.reply (\"\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 		} else {
-			issueCommand (".rk.set.reply (\"Too few arguments in call to require.\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+			issueCommand (".rk.set.reply (\"Too few arguments in call to require.\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 		}
 	} else if (call == "quit") {
 		RKWardMainWindow::getMain ()->close ();
 		// if we're still alive, quitting was cancelled
-		issueCommand (".rk.set.reply (\"Quitting was cancelled\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+		issueCommand (".rk.set.reply (\"Quitting was cancelled\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 #ifndef DISABLE_RKWINDOWCATCHER
  	} else if (call == "startOpenX11") {
 		// TODO: error checking/handling (wrong parameter count/type)
@@ -402,7 +518,7 @@
 	} else if (call == "preLocaleChange") {
 		int res = KMessageBox::warningContinueCancel (0, i18n ("A command in the R backend is trying to change the character encoding. While RKWard offers support for this, and will try to adjust to the new locale, this operation may cause subtle bugs, if data windows are currently open. Also the feature is not well tested, yet, and it may be advisable to save your workspace before proceeding.\nIf you have any data editor opened, or in any doubt, it is recommended to close those first (this will probably be auto-detected in later versions of RKWard). In this case, please chose 'Cancel' now, then close the data windows, save, and retry."), i18n ("Locale change"));
 		if (res != KMessageBox::Continue) {
-			issueCommand (".rk.set.reply (FALSE)", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+			issueCommand (".rk.set.reply (FALSE)", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 		}
 	} else if (call == "doPlugin") {
 		if (request->call.count () >= 3) {
@@ -411,14 +527,14 @@
 			RKComponentMap::ComponentInvocationMode mode = RKComponentMap::ManualSubmit;
 			if (request->call[2] == "auto") mode = RKComponentMap::AutoSubmit;
 			else if (request->call[2] == "submit") mode = RKComponentMap::AutoSubmitOrFail;
-			ok = RKComponentMap::invokeComponent (request->call[1], request->call.mid (3), mode, &message, request->in_chain);
+			ok = RKComponentMap::invokeComponent (request->call[1], request->call.mid (3), mode, &message, in_chain);
 
 			if (message.isEmpty ()) {
-				issueCommand (".rk.set.reply (NULL)", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+				issueCommand (".rk.set.reply (NULL)", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 			} else {
 				QString type = "warning";
 				if (!ok) type = "error";
-				issueCommand (".rk.set.reply (list (type=\"" + type + "\", message=\"" + RKCommonFunctions::escape (message) + "\"))", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+				issueCommand (".rk.set.reply (list (type=\"" + type + "\", message=\"" + RKCommonFunctions::escape (message) + "\"))", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 			}
 		} else {
 			RK_ASSERT (false);
@@ -426,7 +542,7 @@
 	} else if (call == "listPlugins") {
 		if (request->call.count () == 1) {
 			QStringList list = RKComponentMap::getMap ()->allComponentIds ();
-			issueCommand (".rk.set.reply (c (\"" + list.join ("\", \"") + "\"))\n", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+			issueCommand (".rk.set.reply (c (\"" + list.join ("\", \"") + "\"))\n", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 		} else {
 			RK_ASSERT (false);
 		}
@@ -453,7 +569,7 @@
 		}
 		command.append ("))");
 
-		issueCommand (command, RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+		issueCommand (command, RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 	} else if (call == "recordCommands") {
 		if (request->call.count () == 3) {
 			QString filename = request->call[1];
@@ -464,7 +580,7 @@
 				command_logfile.close ();
 			} else {
 				if (command_logfile_mode != NotRecordingCommands) {
-					issueCommand (".rk.set.reply (\"Attempt to start recording, while already recording commands. Ignoring.\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+					issueCommand (".rk.set.reply (\"Attempt to start recording, while already recording commands. Ignoring.\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 				} else {
 					command_logfile.setFileName (filename);
 					bool ok = command_logfile.open (QIODevice::WriteOnly | QIODevice::Truncate);
@@ -472,7 +588,7 @@
 						command_logfile_mode = RecordingCommands;
 						if (with_sync) command_logfile_mode = RecordingCommandsWithSync;
 					} else {
-						issueCommand (".rk.set.reply (\"Could not open file for writing. Not recording commands.\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+						issueCommand (".rk.set.reply (\"Could not open file for writing. Not recording commands.\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 					}
 				}
 			}
@@ -480,10 +596,10 @@
 			RK_ASSERT (false);
 		}
 	} else {
-		issueCommand (".rk.set.reply (\"Unrecognized call '" + call + "'. Ignoring\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, request->in_chain);
+		issueCommand (".rk.set.reply (\"Unrecognized call '" + call + "'. Ignoring\")", RCommand::App | RCommand::Sync, QString::null, 0, 0, in_chain);
 	}
 	
-	closeChain (request->in_chain);
+	closeChain (in_chain);
 }
 
 void RInterface::processRCallbackRequest (RCallbackArgs *args) {

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.h	2010-10-26 14:52:48 UTC (rev 3152)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.h	2010-10-26 18:33:48 UTC (rev 3153)
@@ -2,7 +2,7 @@
                           rinterface.h  -  description
                              -------------------
     begin                : Fri Nov 1 2002
-    copyright            : (C) 2002, 2004, 2005, 2006, 2007, 2009 by Thomas Friedrichsmeier
+    copyright            : (C) 2002, 2004, 2005, 2006, 2007, 2009, 2010 by Thomas Friedrichsmeier
     email                : tfry at users.sourceforge.net
  ***************************************************************************/
 
@@ -41,6 +41,7 @@
 class RCommandReceiver;
 struct REvalRequest;
 struct RKWardStartupOptions;
+struct RNextCommandRequest;
 
 /** This class provides the main interface to the R-processor.
 
@@ -91,12 +92,16 @@
 	bool backendIsDead ();
 	bool backendIsIdle ();
 
+	static bool backendIsLocked ();
+
 	static bool inRThread ();
 	static void tryToDoEmergencySave ();
-public slots:
+private slots:
 /** called periodically to flush output buffer in RThread */
 	void flushOutput ();
 private:
+/** Calls RThread::flushOutput(), and takes care of adding the output to all applicable commands */
+	void flushOutput (bool forced);
 /** pointer to the RThread */
 	RThread *r_thread;
 /** Timer to trigger flushing output */
@@ -116,6 +121,25 @@
 //	void processRGetValueRequest (RGetValueRequest);
 /** See \ref RThread::doStandardCallback (). Does the actual job. */
 	void processRCallbackRequest (RCallbackArgs *args);
+
+/** A list of all commands that have entered, and not yet left, the backend thread */
+	QList<RCommand*> all_current_commands;
+	RNextCommandRequest *command_request;
+	void tryNextCommand ();
+	void doNextCommand (RCommand *command);
+	bool previously_idle;
+
+/** @see locked */
+	enum LockType {
+		User=1,		/**< locked on user request */
+		Cancel=2,	/**< locked to safely cancel a running command */
+		Startup=4	/**< locked on startup */
+	};
+
+/** Used for locking the backend, meaning not further commands will be given to the backend. This is used, when the currently running command is to be cancelled. It is used to make sure that the backend thread does not proceed with further commands, before the main thread takes notice. Also it is called, if the RThread is paused on User request. Further, the thread is initially locked so the main thread can check for some conditions before the backend thread may produce
+more errors/crashes. @see RInterface::cancelCommand @see RInterface::pauseProcessing
+May be an OR'ed combination of several LockType s */
+	int locked;
 friend class RKWardMainWindow;
 friend class RCommand;
 /** Used (once!) to start the RThread. Need to make this separate to avoid race conditions */

Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rthread.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rthread.cpp	2010-10-26 14:52:48 UTC (rev 3152)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rthread.cpp	2010-10-26 18:33:48 UTC (rev 3153)
@@ -40,7 +40,8 @@
 #	include <pthread.h>		// seems to be needed at least on FreeBSD
 #endif
 
-#define MAX_BUF_LENGTH 4000
+#define MAX_BUF_LENGTH 16000
+#define OUTPUT_STRING_RESERVE 1000
 
 void RThread::interruptProcessing (bool interrupt) {
 	if (!interrupt) return;
@@ -54,7 +55,6 @@
 void RThread::run () {
 	RK_TRACE (RBACKEND);
 	thread_id = currentThreadId ();
-	locked = Startup;
 	killed = false;
 	int err;
 
@@ -68,13 +68,12 @@
 	if ((err = initialize ())) {
 		int* err_c = new int;
 		*err_c = err;
-		flushOutput ();		// to make errors/warnings available to the main thread
 		qApp->postEvent (RKGlobals::rInterface (), new RKRBackendEvent (RKRBackendEvent::RStartupError, err_c));
 	}
 	qApp->postEvent (RKGlobals::rInterface (), new RKRBackendEvent (RKRBackendEvent::RStarted));
 
 	// wait until RKWard is set to go (esp, it has handled any errors during startup, etc.)
-	while (locked) {
+	while (RInterface::backendIsLocked ()) {
 		msleep (10);
 	}
 
@@ -95,7 +94,6 @@
 	if (check_object_updates_needed || (current_command->type () & RCommand::ObjectListUpdate)) {
 		checkObjectUpdatesNeeded (current_command->type () & (RCommand::User | RCommand::ObjectListUpdate));
 	}
-	RCommandStack::currentStack ()->pop ();
 	notifyCommandDone (current_command);	// command may be deleted after this
 
 	all_current_commands.pop_back();
@@ -103,77 +101,30 @@
 	MUTEX_UNLOCK;
 }
 
-RCommand* RThread::fetchNextCommand (RCommandStack* stack) {
+RCommand* RThread::fetchNextCommand () {
 	RK_TRACE (RBACKEND);
 
-	bool main_stack = (stack == RCommandStack::regular_stack);
-#warning We would need so much less mutex locking everywhere, if the command would simply be copied between threads!
-	while (1) {
-		if (killed) {
-			return 0;
-		}
+	RNextCommandRequest req;
+	bool done = false;
+	req.done = &done;
+	req.command = 0;
 
+	RKRBackendEvent* event = new RKRBackendEvent (RKRBackendEvent::RNextCommandRequest, &req);
+	qApp->postEvent (RKGlobals::rInterface (), event);
+
+	while (!done) {
+		if (killed) return 0;
 		processX11Events ();
+		if (!done) msleep (10);
+	}
 
-		MUTEX_LOCK;
-		if (previously_idle) {
-			if (!RCommandStack::regular_stack->isEmpty ()) {
-				qApp->postEvent (RKGlobals::rInterface (), new RKRBackendEvent (RKRBackendEvent::RBusy));
-				previously_idle = false;
-			}
-		}
-
-		if ((!(main_stack && locked)) && stack->isActive ()) {		// do not respect locks in substacks
-			RCommand *command = stack->currentCommand ();
-
-			if (command) {
-				// notify GUI-thread that a new command is being tried and initialize
-				RKRBackendEvent* event = new RKRBackendEvent (RKRBackendEvent::RCommandIn, command);
-				qApp->postEvent (RKGlobals::rInterface (), event);
-				all_current_commands.append (command);
-				current_command = command;
-
-				if ((command->type () & RCommand::EmptyCommand) || (command->status & RCommand::Canceled) || (command->type () & RCommand::QuitCommand)) {
-					// some commands are not actually run by R, but handled inline, here
-					if (command->status & RCommand::Canceled) {
-						command->status |= RCommand::Failed;
-					} else if (command->type () & RCommand::QuitCommand) {
-						killed = true;
-					}
-					commandFinished ();
-				} else {
-					RK_DO (qDebug ("running command: %s", command->command ().toLatin1().data ()), RBACKEND, DL_DEBUG);
-				
-					command->status |= RCommand::Running;	// it is important that this happens before the Mutex is unlocked!
-					RCommandStackModel::getModel ()->itemChange (command);
-
-					MUTEX_UNLOCK;
-					return command;
-				}
-			}
-		}
-
-		if ((!stack->isActive ()) && stack->isEmpty () && !main_stack) {
-			MUTEX_UNLOCK;
-			return 0;		// substack depleted
-		}
-
-		if (!previously_idle) {
-			if (RCommandStack::regular_stack->isEmpty ()) {
-				qApp->postEvent (RKGlobals::rInterface (), new RKRBackendEvent (RKRBackendEvent::RIdle));
-				previously_idle = true;
-			}
-		}
-		
-		// if no commands are in queue, sleep for a while
-		MUTEX_UNLOCK;
-		if (killed) {
-			return 0;
-		}
-		msleep (10);
+	RCommand *command = req.command;
+	if (command) {
+		all_current_commands.append (command);
+		current_command = command;
 	}
 
-	return 0;
+	return command;
 }
 
 void RThread::notifyCommandDone (RCommand *command) {
@@ -187,110 +138,59 @@
 	qApp->postEvent (RKGlobals::rInterface (), event);
 }
 
-void RThread::waitIfOutputPaused () {
+void RThread::waitIfOutputBufferExceeded () {
 	// don't trace
-	while (output_paused) {
+	while (out_buf_len > MAX_BUF_LENGTH) {
 		msleep (10);
 	}
 }
 
-void RThread::handleOutput (const QString &output, int buf_length, bool regular) {
+void RThread::handleOutput (const QString &output, int buf_length, ROutput::ROutputType output_type) {
 	RK_TRACE (RBACKEND);
 
-// TODO: output sometimes arrives in small chunks. Maybe it would be better to keep an internal buffer, and only append it to the output, when R_FlushConsole gets called?
 	if (!buf_length) return;
-	waitIfOutputPaused ();
+	waitIfOutputBufferExceeded ();
 
-	MUTEX_LOCK;
-	ROutput::ROutputType output_type;
-	if (regular) {
-		output_type = ROutput::Output;
-	} else {
-		output_type = ROutput::Warning;
-	}
+	output_buffer_mutex.lock ();
 
-	if (current_output) {
-		if (current_output->type != output_type) {
-			flushOutput ();
-		}
+	ROutput *current_output = 0;
+	if (!output_buffer.isEmpty ()) {
+		// Merge with previous output fragment, if of the same type
+		current_output = output_buffer.last ();
+		if (current_output->type != output_type) current_output = 0;
 	}
-	if (!current_output) {	// not an else, might have been set to 0 in the above if
+	if (!current_output) {
 		current_output = new ROutput;
 		current_output->type = output_type;
-		current_output->output.reserve (MAX_BUF_LENGTH + 50);
+		current_output->output.reserve (OUTPUT_STRING_RESERVE);
+		output_buffer.append (current_output);
 	}
 	current_output->output.append (output);
+	out_buf_len += buf_length;
 
-	
-	if ((out_buf_len += buf_length) > MAX_BUF_LENGTH) {
-		RK_DO (qDebug ("Output buffer has %d characters. Forcing flush", out_buf_len), RBACKEND, DL_DEBUG);
-		flushOutput ();
-	}
-	MUTEX_UNLOCK;
+	output_buffer_mutex.unlock ();
 }
 
-void RThread::flushOutput () {
-	if (!current_output) return;		// avoid creating loads of traces
+ROutputList RThread::flushOutput (bool forcibly) {
+	ROutputList ret;
+
+	if (out_buf_len == 0) return ret;		// if there is absolutely no output, just skip.
 	RK_TRACE (RBACKEND);
 
-	if (current_command) {
-		for (QList<RCommand*>::const_iterator it = all_current_commands.constBegin (); it != all_current_commands.constEnd(); ++it) {
-			ROutput *output = current_output;
-			if ((*it) != current_command) {		// this output belongs to several commands at once. So we need to copy it.
-				output = new ROutput;
-				output->type = current_output->type;
-				output->output = current_output->output;
-			}
-
-			(*it)->output_list.append (output);
-			if (output->type == ROutput::Output) {
-				(*it)->status |= RCommand::HasOutput;
-			} else if (output->type == ROutput::Warning) {
-				(*it)->status |= RCommand::HasWarnings;
-			} else if (output->type == ROutput::Error) {
-				(*it)->status |= RCommand::HasError;
-			}
-
-			// pass a signal to the main thread for real-time update of output
-			ROutputContainer *outc = new ROutputContainer;
-			outc->output = output;
-			outc->command = *it;
-			RKRBackendEvent* event = new RKRBackendEvent (RKRBackendEvent::RCommandOutput, outc);
-			qApp->postEvent (RKGlobals::rInterface (), event);
-		}
-
-		RK_DO (qDebug ("output '%s'", current_output->output.toLatin1 ().data ()), RBACKEND, DL_DEBUG);
+	if (!forcibly) {
+		if (!output_buffer_mutex.tryLock ()) return ret;
 	} else {
-		// running Rcmdr, eh?
-		RK_DO (qDebug ("output without receiver'%s'", current_output->output.toLatin1 ().data ()), RBACKEND, DL_WARNING);
-		delete current_output;
+		output_buffer_mutex.lock ();
 	}
 
-// forget output
-	current_output = 0;
+	RK_ASSERT (!output_buffer.isEmpty ());	// see check for out_buf_len, above
+
+	ret = output_buffer;
+	output_buffer.clear ();
 	out_buf_len = 0;
-}
 
-void RThread::handleError (QString *call, int call_length) {
-	RK_TRACE (RBACKEND);
-
-	if (!call_length) return;
-	waitIfOutputPaused ();
-
-	MUTEX_LOCK;
-	// Unfortunately, errors still get printed to the output, UNLESS a sink() is in place. We try this crude method for the time being:
-	flushOutput ();
-	if (current_command) {
-		if (!current_command->output_list.isEmpty ()) {
-			if (current_command->output_list.last ()->output == call[0]) {
-				current_command->output_list.last ()->type = ROutput::Error;
-			}
-		}
-		current_command->status |= RCommand::HasError;
-	}
-
-	RK_DO (qDebug ("error '%s'", call[0].toLatin1 ().data ()), RBACKEND, DL_DEBUG);
-	MUTEX_UNLOCK;
+	output_buffer_mutex.unlock ();
+	return ret;
 }
 
 void RThread::handleSubstackCall (QStringList &call) {
@@ -308,34 +208,21 @@
 
 	REvalRequest request;
 	request.call = call;
-	MUTEX_LOCK;
-	flushOutput ();
-	RCommandStack *reply_stack = new RCommandStack (current_command);
-	request.in_chain = reply_stack->startChain (reply_stack);
-	MUTEX_UNLOCK;
-
 	RKRBackendEvent* event = new RKRBackendEvent (RKRBackendEvent::REvalRequest, &request);
 	qApp->postEvent (RKGlobals::rInterface (), event);
 
 	RCommand *c;
-	while ((c = fetchNextCommand (reply_stack))) {
+	while ((c = fetchNextCommand ())) {
 		MUTEX_LOCK;
 		runCommand (c);
 		MUTEX_UNLOCK;
 		commandFinished (false);
 	}
-
-	MUTEX_LOCK;
-	delete reply_stack;
-	MUTEX_UNLOCK;
 }
 
 void RThread::handleStandardCallback (RCallbackArgs *args) {
 	RK_TRACE (RBACKEND);
 
-	MUTEX_LOCK;
-	flushOutput ();
-	MUTEX_UNLOCK;
 	args->done = false;
 
 	RKRBackendEvent* event = new RKRBackendEvent (RKRBackendEvent::RCallbackRequest, args);
@@ -345,7 +232,7 @@
 	while (!(*done)) {
 		msleep (10); // callback not done yet? Sleep for a while
 
-		if (!(locked || killed)) processX11Events ();
+		if (!(RInterface::backendIsLocked () || killed)) processX11Events ();
 	}
 
 	RK_DO (qDebug ("standard callback done"), RBACKEND, DL_DEBUG);
@@ -399,10 +286,6 @@
 	if (!runDirectCommand ("options (error=quote (.rk.do.error ()))\n")) status |= SinkFail;
 	if (!runDirectCommand ("rk.set.output.html.file (\"" + RKSettingsModuleGeneral::filesPath () + "/rk_out.html\")\n")) status |= SinkFail;
 
-	MUTEX_LOCK;
-	flushOutput ();
-	MUTEX_UNLOCK;
-
 	return status;
 }
 


This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.




More information about the rkward-tracker mailing list