[rkward-cvs] SF.net SVN: rkward:[3661] trunk/rkward

tfry at users.sourceforge.net tfry at users.sourceforge.net
Thu Jun 2 18:19:30 UTC 2011


Revision: 3661
          http://rkward.svn.sourceforge.net/rkward/?rev=3661&view=rev
Author:   tfry
Date:     2011-06-02 18:19:29 +0000 (Thu, 02 Jun 2011)

Log Message:
-----------
Simplify system() output handling.
Thanks to Simon Urbanek for reminding me that it is possible to capture stdout inside a single process using a pipe.
This makes synchronization a lot easier.

Modified Paths:
--------------
    trunk/rkward/ChangeLog
    trunk/rkward/rkward/rbackend/rkbackendtransmitter.cpp
    trunk/rkward/rkward/rbackend/rkfrontendtransmitter.cpp
    trunk/rkward/rkward/rbackend/rkrbackend.cpp
    trunk/rkward/rkward/rbackend/rkrbackend.h
    trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.cpp
    trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.h
    trunk/rkward/rkward/rbackend/rpackages/rkward/R/internal.R

Modified: trunk/rkward/ChangeLog
===================================================================
--- trunk/rkward/ChangeLog	2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/ChangeLog	2011-06-02 18:19:29 UTC (rev 3661)
@@ -1,4 +1,5 @@
-- Simplify code produced by several plugins 											# TODO: convert remaining uses of substitute()
+- Simplify internal handling of system() and system2() output					TODO: test on Windows!
+- Simplify code produced by several plugins
 - Fixed: Occasional duplication of first letter of keyword, when using "Copy lines to output" from the console window
 
 --- Version 0.5.6 - May-30-2011

Modified: trunk/rkward/rkward/rbackend/rkbackendtransmitter.cpp
===================================================================
--- trunk/rkward/rkward/rbackend/rkbackendtransmitter.cpp	2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkbackendtransmitter.cpp	2011-06-02 18:19:29 UTC (rev 3661)
@@ -114,7 +114,8 @@
 
 void RKRBackendTransmitter::flushOutput (bool force) {
 	if (!current_sync_requests.isEmpty ()) return;
-	
+
+	if (!RKRBackend::this_pointer->fetchStdoutStderr (force, false)) return;
 	ROutputList out = RKRBackend::this_pointer->flushOutput (force);
 	if (out.isEmpty ()) return;
 

Modified: trunk/rkward/rkward/rbackend/rkfrontendtransmitter.cpp
===================================================================
--- trunk/rkward/rkward/rbackend/rkfrontendtransmitter.cpp	2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkfrontendtransmitter.cpp	2011-06-02 18:19:29 UTC (rev 3661)
@@ -131,29 +131,6 @@
 		RK_ASSERT (request->synchronous);
 		writeRequest (request);	// to tell the backend, that we are keeping up. Also deletes the request.
 		return;
-	} else if (request->type == RBackendRequest::SyncOutput) {
-		RK_ASSERT (request->synchronous);
-
-		QString token = request->params["endtoken"].toString ();
-		writeRequest (request);
-
-		if (!token.isEmpty ()) {
-			QString buffer;
-
-			disconnect (backend, SIGNAL (readyReadStandardOutput ()), this, SLOT (newProcessOutput ()));
-			for (int i=5; i > 0; --i) {		// don't wait forever for the end-token.
-				buffer.append (QString::fromLocal8Bit (backend->readAll ()));
-				if (buffer.endsWith (token)) {
-					buffer = buffer.left (buffer.size () - token.size ());
-					break;
-				}
-				backend->waitForReadyRead (500);
-			}
-			connect (backend, SIGNAL (readyReadStandardOutput ()), this, SLOT (newProcessOutput ()));
-
-			if (!buffer.isEmpty ()) handleOutput (buffer, buffer.size (), ROutput::Warning);
-		}
-		return;
 	}
 
 	RKRBackendEvent* event = new RKRBackendEvent (request);

Modified: trunk/rkward/rkward/rbackend/rkrbackend.cpp
===================================================================
--- trunk/rkward/rkward/rbackend/rkrbackend.cpp	2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkrbackend.cpp	2011-06-02 18:19:29 UTC (rev 3661)
@@ -96,6 +96,8 @@
 #ifndef Q_WS_WIN
 #	include <signal.h>		// needed for pthread_kill
 #	include <pthread.h>		// seems to be needed at least on FreeBSD
+#	include <unistd.h>		// for non-blocking pipes
+#	include <fcntl.h>
 #endif
 
 void RK_scheduleIntr () {
@@ -381,6 +383,29 @@
 }
 #endif
 
+bool RKRBackend::fetchStdoutStderr (bool forcibly, bool allow_blocking) {
+#ifndef Q_OS_WIN
+	if (!forcibly) {
+		if (!stdout_stderr_mutex.tryLock ()) return false;
+	} else {
+		stdout_stderr_mutex.lock ();
+	}
+
+	char buffer[1024];
+	while (true) {
+		int bytes = read (stdout_stderr_fd, buffer, 1023);
+		if (bytes < 0) break;
+		if (bytes != 0) {
+			buffer[bytes] = '\0';
+			handleOutput (current_locale_codec->toUnicode (buffer, bytes), bytes, ROutput::Warning, allow_blocking);
+		}
+	}
+
+	stdout_stderr_mutex.unlock ();
+#endif
+	return true;
+}
+
 void RWriteConsoleEx (const char *buf, int buflen, int type) {
 	RK_TRACE (RBACKEND);
 
@@ -400,6 +425,7 @@
 		}
 	}
 
+	RKRBackend::this_pointer->fetchStdoutStderr (true, true);
 	RKRBackend::this_pointer->handleOutput (RKRBackend::this_pointer->current_locale_codec->toUnicode (buf, buflen), buflen, type == 0 ? ROutput::Output : ROutput::Warning);
 }
 
@@ -864,25 +890,6 @@
 	return R_NilValue;
 }
 
-SEXP doSyncOutput (SEXP flushstdout) {
-	RK_TRACE (RBACKEND);
-
-#if (!defined RKWARD_THREADED) && (!defined Q_OS_WIN)
-	const char* token = "##RKOutputEndTag3210723##";	// should be unique enough for practical purposes
-	bool doflushstdout = (RKRSupport::SEXPToInt (flushstdout) != 0);
-
-	RBackendRequest req (true, RBackendRequest::SyncOutput);
-	if (doflushstdout) req.params["endtoken"] = QString (token);
-	RKRBackend::this_pointer->handleRequest (&req);
-	if (doflushstdout) {
-		printf ("%s", token);
-		fflush (stdout);
-	}
-#endif
-
-	return R_NilValue;
-}
-
 // returns the MIME-name of the current locale encoding (from Qt)
 SEXP doLocaleName () {
 	RK_TRACE (RBACKEND);
@@ -950,6 +957,19 @@
 	FlushConsoleInputBuffer(GetStdHandle(STD_INPUT_HANDLE));
 #endif
 
+#ifndef Q_OS_WIN
+	// re-direct stdout / stderr to a pipe, so we can read output from system() calls
+	int pfd[2];
+	pipe(pfd);
+	for (int n=0; n<2; n++) {
+		fcntl (pfd[n], F_SETFL, fcntl (pfd[n], F_GETFL, 0) | O_NONBLOCK);
+	}
+	dup2(STDOUT_FILENO, STDERR_FILENO);		// single channel to avoid interleaving hell, for now.
+	dup2(pfd[1], STDOUT_FILENO);
+	close(pfd[1]);
+	stdout_stderr_fd = pfd[0];
+#endif
+
 	setup_Rmainloop ();
 
 #ifndef Q_WS_WIN
@@ -995,7 +1015,6 @@
 		{ "rk.dialog", (DL_FUNC) &doDialog, 6 },
 		{ "rk.update.locale", (DL_FUNC) &doUpdateLocale, 0 },
 		{ "rk.locale.name", (DL_FUNC) &doLocaleName, 0 },
-		{ "rk.sync.output", (DL_FUNC) &doSyncOutput, 1 },
 		{ 0, 0, 0 }
 	};
 	R_registerRoutines (R_getEmbeddingDllInfo(), NULL, callMethods, NULL, NULL);

Modified: trunk/rkward/rkward/rbackend/rkrbackend.h
===================================================================
--- trunk/rkward/rkward/rbackend/rkrbackend.h	2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkrbackend.h	2011-06-02 18:19:29 UTC (rev 3661)
@@ -183,11 +183,18 @@
 	QList<RCommandProxy*> current_commands_to_cancel;
 	bool too_late_to_interrupt;
 	void interruptCommand (int command_id);
+
+/** check stdout and stderr for new output (from sub-processes). Since this function is called from both threads, it is protected by a mutex.
+ *  @param forcibly: if false, and the other thread currently has a lock on the mutex, do nothing, and return false.
+ *  @returns: true, if output was actually fetched (or no output was available), false, if the function gave up on a locked mutex. */
+	bool fetchStdoutStderr (bool forcibly, bool allow_blocking);
 private:
 	void clearPendingInterrupt ();
 protected:
 	RCommandProxy* handleRequest (RBackendRequest *request, bool mayHandleSubstack);
 private:
+	QMutex stdout_stderr_mutex;
+	int stdout_stderr_fd;
 /** set up R standard callbacks */
 	void setupCallbacks ();
 /** connect R standard callbacks */

Modified: trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.cpp
===================================================================
--- trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.cpp	2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.cpp	2011-06-02 18:19:29 UTC (rev 3661)
@@ -93,14 +93,14 @@
 	RK_TRACE (RBACKEND);
 }
 
-void RKROutputBuffer::handleOutput (const QString &output, int buf_length, ROutput::ROutputType output_type) {
+void RKROutputBuffer::handleOutput (const QString &output, int buf_length, ROutput::ROutputType output_type, bool allow_blocking) {
 	if (!buf_length) return;
 	RK_TRACE (RBACKEND);
 
 	RK_DO (qDebug ("Output type %d: %s", output_type, qPrintable (output)), RBACKEND, DL_DEBUG);
 
 	// wait while the output buffer is exceeded to give downstream threads a chance to catch up
-	while (out_buf_len > MAX_BUF_LENGTH) {
+	while ((out_buf_len > MAX_BUF_LENGTH) && allow_blocking) {
 		if (!doMSleep (10)) break;
 	}
 

Modified: trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.h
===================================================================
--- trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.h	2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.h	2011-06-02 18:19:29 UTC (rev 3661)
@@ -48,7 +48,6 @@
 #ifndef RKWARD_THREADED
 		Output,		/**< A piece of output. Note: If the backend runs in a single process, output is handled in a pull fashion, instead of using requests. */
 		Interrupt,	/**< Interrupt evaluation. This request type originates in the frontend, not the backend (the only one so far). */
-		SyncOutput,	/**< Synchronization of output between R output and stdout. Note: If the backend runs in a single process, the stdout/stderr channel is not supported anyway. */
 #endif
 		OtherRequest		/**< Any other type of request. Note: which requests are in the enum, and which are not has mostly historical reasons. @see params */
 	};
@@ -135,7 +134,7 @@
 	virtual ~RKROutputBuffer ();
 
 /** This gets called on normal R output (R_WriteConsole). Used to get at output. */
-	void handleOutput (const QString &output, int len, ROutput::ROutputType type);
+	void handleOutput (const QString &output, int len, ROutput::ROutputType type, bool allow_blocking=true);
 
 /** Flushes current output buffer. Meant to be called from RInterface::flushOutput, only.
 @param forcibly: if true, will always flush the output. If false, will flush the output only if the mutex can be locked without waiting. */

Modified: trunk/rkward/rkward/rbackend/rpackages/rkward/R/internal.R
===================================================================
--- trunk/rkward/rkward/rbackend/rpackages/rkward/R/internal.R	2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rpackages/rkward/R/internal.R	2011-06-02 18:19:29 UTC (rev 3661)
@@ -466,31 +466,6 @@
 			eval (body (.rk.menu.default))
 		})
 
-	## Add output synchronisation across system(), and system2() calls.
-	rk.replace.function ("system", base::.BaseNamespaceEnv,
-		function () {
-			if (!exists ("ignore.stdout", inherits=FALSE)) ignore.stdout <- FALSE	# ignore.stdout was introduced in R 2.12.0
-
-			if (!(intern || (ignore.stdout && ignore.stderr))) {
-				.Call ("rk.sync.output", 0)
-				on.exit (.Call ("rk.sync.output", 1), TRUE)
-			}
-
-			eval (body (.rk.system.default))
-		})
-
-	# NOTE: system2 was not introduced before R 2.12.0 (or was it 2.11.0?)
-	if (exists ("system2", base::.BaseNamespaceEnv)) {
-		rk.replace.function ("system2", base::.BaseNamespaceEnv,
-			function () {
-				if ((!is.null (stdout) && stdout == "") || (!is.null (stderr) && stderr == "")) {
-					.Call ("rk.sync.output", 0)
-					on.exit (.Call ("rk.sync.output", 1), TRUE)
-				}
-				eval (body (.rk.system2.default))
-			})
-	}
-
 	# call separate assignments functions:
 	if (exists (".rk.fix.assignments.graphics")) eval (body (.rk.fix.assignments.graphics)) # internal_graphics.R
 }


This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.




More information about the rkward-tracker mailing list