[rkward-cvs] SF.net SVN: rkward:[3661] trunk/rkward
tfry at users.sourceforge.net
tfry at users.sourceforge.net
Thu Jun 2 18:19:30 UTC 2011
Revision: 3661
http://rkward.svn.sourceforge.net/rkward/?rev=3661&view=rev
Author: tfry
Date: 2011-06-02 18:19:29 +0000 (Thu, 02 Jun 2011)
Log Message:
-----------
Simplify system() output handling.
Thanks to Simon Urbanek for reminding me that it is possible to capture stdout inside a single process using a pipe.
This makes synchronization a lot easier.
Modified Paths:
--------------
trunk/rkward/ChangeLog
trunk/rkward/rkward/rbackend/rkbackendtransmitter.cpp
trunk/rkward/rkward/rbackend/rkfrontendtransmitter.cpp
trunk/rkward/rkward/rbackend/rkrbackend.cpp
trunk/rkward/rkward/rbackend/rkrbackend.h
trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.cpp
trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.h
trunk/rkward/rkward/rbackend/rpackages/rkward/R/internal.R
Modified: trunk/rkward/ChangeLog
===================================================================
--- trunk/rkward/ChangeLog 2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/ChangeLog 2011-06-02 18:19:29 UTC (rev 3661)
@@ -1,4 +1,5 @@
-- Simplify code produced by several plugins # TODO: convert remaining uses of substitute()
+- Simplify internal handling of system() and system2() output TODO: test on Windows!
+- Simplify code produced by several plugins
- Fixed: Occasional duplication of first letter of keyword, when using "Copy lines to output" from the console window
--- Version 0.5.6 - May-30-2011
Modified: trunk/rkward/rkward/rbackend/rkbackendtransmitter.cpp
===================================================================
--- trunk/rkward/rkward/rbackend/rkbackendtransmitter.cpp 2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkbackendtransmitter.cpp 2011-06-02 18:19:29 UTC (rev 3661)
@@ -114,7 +114,8 @@
void RKRBackendTransmitter::flushOutput (bool force) {
if (!current_sync_requests.isEmpty ()) return;
-
+
+ if (!RKRBackend::this_pointer->fetchStdoutStderr (force, false)) return;
ROutputList out = RKRBackend::this_pointer->flushOutput (force);
if (out.isEmpty ()) return;
Modified: trunk/rkward/rkward/rbackend/rkfrontendtransmitter.cpp
===================================================================
--- trunk/rkward/rkward/rbackend/rkfrontendtransmitter.cpp 2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkfrontendtransmitter.cpp 2011-06-02 18:19:29 UTC (rev 3661)
@@ -131,29 +131,6 @@
RK_ASSERT (request->synchronous);
writeRequest (request); // to tell the backend, that we are keeping up. Also deletes the request.
return;
- } else if (request->type == RBackendRequest::SyncOutput) {
- RK_ASSERT (request->synchronous);
-
- QString token = request->params["endtoken"].toString ();
- writeRequest (request);
-
- if (!token.isEmpty ()) {
- QString buffer;
-
- disconnect (backend, SIGNAL (readyReadStandardOutput ()), this, SLOT (newProcessOutput ()));
- for (int i=5; i > 0; --i) { // don't wait forever for the end-token.
- buffer.append (QString::fromLocal8Bit (backend->readAll ()));
- if (buffer.endsWith (token)) {
- buffer = buffer.left (buffer.size () - token.size ());
- break;
- }
- backend->waitForReadyRead (500);
- }
- connect (backend, SIGNAL (readyReadStandardOutput ()), this, SLOT (newProcessOutput ()));
-
- if (!buffer.isEmpty ()) handleOutput (buffer, buffer.size (), ROutput::Warning);
- }
- return;
}
RKRBackendEvent* event = new RKRBackendEvent (request);
Modified: trunk/rkward/rkward/rbackend/rkrbackend.cpp
===================================================================
--- trunk/rkward/rkward/rbackend/rkrbackend.cpp 2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkrbackend.cpp 2011-06-02 18:19:29 UTC (rev 3661)
@@ -96,6 +96,8 @@
#ifndef Q_WS_WIN
# include <signal.h> // needed for pthread_kill
# include <pthread.h> // seems to be needed at least on FreeBSD
+# include <unistd.h> // for non-blocking pipes
+# include <fcntl.h>
#endif
void RK_scheduleIntr () {
@@ -381,6 +383,29 @@
}
#endif
+bool RKRBackend::fetchStdoutStderr (bool forcibly, bool allow_blocking) {
+#ifndef Q_OS_WIN
+	// Called from both threads: wait for the mutex only if forced, otherwise give up and report failure.
+	if (forcibly) stdout_stderr_mutex.lock ();
+	else if (!stdout_stderr_mutex.tryLock ()) return false;
+
+	// Drain whatever is currently available on the read end of the pipe (stdout_stderr_fd).
+	char buffer[1024];
+	while (true) {
+		int bytes = read (stdout_stderr_fd, buffer, 1023);
+		// < 0: nothing (more) to read right now, or an error. 0: EOF, i.e. no write end is left open.
+		// Stop in both cases. Continuing on EOF would spin forever while holding the mutex.
+		if (bytes <= 0) break;
+		buffer[bytes] = '\0';
+		// allow_blocking is passed on: if false, handleOutput () does not wait for an over-full output buffer to drain.
+		handleOutput (current_locale_codec->toUnicode (buffer, bytes), bytes, ROutput::Warning, allow_blocking);
+	}
+
+	stdout_stderr_mutex.unlock ();
+#endif
+	return true;
+}
+
void RWriteConsoleEx (const char *buf, int buflen, int type) {
RK_TRACE (RBACKEND);
@@ -400,6 +425,7 @@
}
}
+ RKRBackend::this_pointer->fetchStdoutStderr (true, true);
RKRBackend::this_pointer->handleOutput (RKRBackend::this_pointer->current_locale_codec->toUnicode (buf, buflen), buflen, type == 0 ? ROutput::Output : ROutput::Warning);
}
@@ -864,25 +890,6 @@
return R_NilValue;
}
-SEXP doSyncOutput (SEXP flushstdout) {
- RK_TRACE (RBACKEND);
-
-#if (!defined RKWARD_THREADED) && (!defined Q_OS_WIN)
- const char* token = "##RKOutputEndTag3210723##"; // should be unique enough for practical purposes
- bool doflushstdout = (RKRSupport::SEXPToInt (flushstdout) != 0);
-
- RBackendRequest req (true, RBackendRequest::SyncOutput);
- if (doflushstdout) req.params["endtoken"] = QString (token);
- RKRBackend::this_pointer->handleRequest (&req);
- if (doflushstdout) {
- printf ("%s", token);
- fflush (stdout);
- }
-#endif
-
- return R_NilValue;
-}
-
// returns the MIME-name of the current locale encoding (from Qt)
SEXP doLocaleName () {
RK_TRACE (RBACKEND);
@@ -950,6 +957,19 @@
FlushConsoleInputBuffer(GetStdHandle(STD_INPUT_HANDLE));
#endif
+#ifndef Q_OS_WIN
+ // re-direct stdout / stderr to a pipe, so we can read output from system() calls
+ int pfd[2];
+ pipe(pfd);
+ for (int n=0; n<2; n++) {
+ fcntl (pfd[n], F_SETFL, fcntl (pfd[n], F_GETFL, 0) | O_NONBLOCK);
+ }
+ dup2(STDOUT_FILENO, STDERR_FILENO); // single channel to avoid interleaving hell, for now.
+ dup2(pfd[1], STDOUT_FILENO);
+ close(pfd[1]);
+ stdout_stderr_fd = pfd[0];
+#endif
+
setup_Rmainloop ();
#ifndef Q_WS_WIN
@@ -995,7 +1015,6 @@
{ "rk.dialog", (DL_FUNC) &doDialog, 6 },
{ "rk.update.locale", (DL_FUNC) &doUpdateLocale, 0 },
{ "rk.locale.name", (DL_FUNC) &doLocaleName, 0 },
- { "rk.sync.output", (DL_FUNC) &doSyncOutput, 1 },
{ 0, 0, 0 }
};
R_registerRoutines (R_getEmbeddingDllInfo(), NULL, callMethods, NULL, NULL);
Modified: trunk/rkward/rkward/rbackend/rkrbackend.h
===================================================================
--- trunk/rkward/rkward/rbackend/rkrbackend.h 2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkrbackend.h 2011-06-02 18:19:29 UTC (rev 3661)
@@ -183,11 +183,18 @@
QList<RCommandProxy*> current_commands_to_cancel;
bool too_late_to_interrupt;
void interruptCommand (int command_id);
+
+/** check stdout and stderr for new output (from sub-processes). Since this function is called from both threads, it is protected by a mutex.
+ * @param forcibly: if false, and the other thread currently has a lock on the mutex, do nothing, and return false.
+ * @returns: true, if output was actually fetched (or no output was available), false, if the function gave up on a locked mutex. */
+ bool fetchStdoutStderr (bool forcibly, bool allow_blocking);
private:
void clearPendingInterrupt ();
protected:
RCommandProxy* handleRequest (RBackendRequest *request, bool mayHandleSubstack);
private:
+ QMutex stdout_stderr_mutex;
+ int stdout_stderr_fd;
/** set up R standard callbacks */
void setupCallbacks ();
/** connect R standard callbacks */
Modified: trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.cpp
===================================================================
--- trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.cpp 2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.cpp 2011-06-02 18:19:29 UTC (rev 3661)
@@ -93,14 +93,14 @@
RK_TRACE (RBACKEND);
}
-void RKROutputBuffer::handleOutput (const QString &output, int buf_length, ROutput::ROutputType output_type) {
+void RKROutputBuffer::handleOutput (const QString &output, int buf_length, ROutput::ROutputType output_type, bool allow_blocking) {
if (!buf_length) return;
RK_TRACE (RBACKEND);
RK_DO (qDebug ("Output type %d: %s", output_type, qPrintable (output)), RBACKEND, DL_DEBUG);
// wait while the output buffer is exceeded to give downstream threads a chance to catch up
- while (out_buf_len > MAX_BUF_LENGTH) {
+ while ((out_buf_len > MAX_BUF_LENGTH) && allow_blocking) {
if (!doMSleep (10)) break;
}
Modified: trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.h
===================================================================
--- trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.h 2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rkrbackendprotocol_shared.h 2011-06-02 18:19:29 UTC (rev 3661)
@@ -48,7 +48,6 @@
#ifndef RKWARD_THREADED
Output, /**< A piece of output. Note: If the backend runs in a single process, output is handled in a pull fashion, instead of using requests. */
Interrupt, /**< Interrupt evaluation. This request type originates in the frontend, not the backend (the only one so far). */
- SyncOutput, /**< Synchronization of output between R output and stdout. Note: If the backend runs in a single process, the stdout/stderr channel is not supported anyway. */
#endif
OtherRequest /**< Any other type of request. Note: which requests are in the enum, and which are not has mostly historical reasons. @see params */
};
@@ -135,7 +134,7 @@
virtual ~RKROutputBuffer ();
/** This gets called on normal R output (R_WriteConsole). Used to get at output. */
- void handleOutput (const QString &output, int len, ROutput::ROutputType type);
+ void handleOutput (const QString &output, int len, ROutput::ROutputType type, bool allow_blocking=true);
/** Flushes current output buffer. Meant to be called from RInterface::flushOutput, only.
@param forcibly: if true, will always flush the output. If false, will flush the output only if the mutex can be locked without waiting. */
Modified: trunk/rkward/rkward/rbackend/rpackages/rkward/R/internal.R
===================================================================
--- trunk/rkward/rkward/rbackend/rpackages/rkward/R/internal.R 2011-06-02 16:03:27 UTC (rev 3660)
+++ trunk/rkward/rkward/rbackend/rpackages/rkward/R/internal.R 2011-06-02 18:19:29 UTC (rev 3661)
@@ -466,31 +466,6 @@
eval (body (.rk.menu.default))
})
- ## Add output synchronisation across system(), and system2() calls.
- rk.replace.function ("system", base::.BaseNamespaceEnv,
- function () {
- if (!exists ("ignore.stdout", inherits=FALSE)) ignore.stdout <- FALSE # ignore.stdout was introduced in R 2.12.0
-
- if (!(intern || (ignore.stdout && ignore.stderr))) {
- .Call ("rk.sync.output", 0)
- on.exit (.Call ("rk.sync.output", 1), TRUE)
- }
-
- eval (body (.rk.system.default))
- })
-
- # NOTE: system2 was not introduced before R 2.12.0 (or was it 2.11.0?)
- if (exists ("system2", base::.BaseNamespaceEnv)) {
- rk.replace.function ("system2", base::.BaseNamespaceEnv,
- function () {
- if ((!is.null (stdout) && stdout == "") || (!is.null (stderr) && stderr == "")) {
- .Call ("rk.sync.output", 0)
- on.exit (.Call ("rk.sync.output", 1), TRUE)
- }
- eval (body (.rk.system2.default))
- })
- }
-
# call separate assignments functions:
if (exists (".rk.fix.assignments.graphics")) eval (body (.rk.fix.assignments.graphics)) # internal_graphics.R
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
More information about the rkward-tracker
mailing list