[education/rkward] rkward: Handle transmission of RK() data in auxiliary thread

Thomas Friedrichsmeier null at kde.org
Sun Sep 14 19:26:16 BST 2025


Git commit 1f1de445498d034716c58d7e46fe8ad264faa7c9 by Thomas Friedrichsmeier.
Committed on 13/09/2025 at 13:10.
Pushed by tfry into branch 'master'.

Handle transmission of RK() data in auxiliary thread

M  +15   -0    rkward/autotests/core_test.cpp
M  +1    -0    rkward/rbackend/rkasyncdatastreamhelper.h
M  +1    -0    rkward/rbackend/rkrbackend.cpp
M  +1    -1    rkward/rbackend/rkrsupport.h
M  +77   -29   rkward/rbackend/rkwarddevice/rkgraphicsdevice_backendtransmitter.cpp
M  +12   -5    rkward/rbackend/rkwarddevice/rkgraphicsdevice_backendtransmitter.h
M  +1    -7    rkward/rbackend/rkwarddevice/rkgraphicsdevice_frontendtransmitter.cpp
M  +26   -60   rkward/rbackend/rkwarddevice/rkgraphicsdevice_stubs.cpp

https://invent.kde.org/education/rkward/-/commit/1f1de445498d034716c58d7e46fe8ad264faa7c9

diff --git a/rkward/autotests/core_test.cpp b/rkward/autotests/core_test.cpp
index de8a9a69f..5ebe3b455 100644
--- a/rkward/autotests/core_test.cpp
+++ b/rkward/autotests/core_test.cpp
@@ -502,6 +502,21 @@ class RKWardCoreTest : public QObject {
 		dotest(RCommand::PriorityCommand);
 	}
 
+	void cancelLocator() {
+		auto c = new RCommand(QStringLiteral("plot(rnorm(10)); print('before'); locator(); print('after')"), RCommand::User);
+		connect(c->notifier(), &RCommandNotifier::commandOutput, this, [](RCommand *, const ROutput &out) {
+			if (out.output.contains(u"before"_s)) RInterface::instance()->cancelAll();
+			QVERIFY(!out.output.contains(u"after"_s));
+		});
+		runCommandWithTimeout(c, nullptr, [this](RCommand *command) {
+			// QVERIFY(command->wasCanceled()); we're currently mis-detecting that, which is not a real-world problem, however.
+			// The real test is that the command neither times out (below), nor prints "after" (above)
+			QVERIFY(command->failed());
+		});
+		RInterface::issueCommand(QStringLiteral("dev.off()"), RCommand::User);
+		waitForAllFinished();
+	}
+
 	void priorityCommandTest() {
 		// This test runs much faster when silencing the log window. Running faster also seems to help triggering the bug.
 		ScopeHandler sup([]() { RKSettingsModuleWatch::forTestingSuppressOutput(true); }, []() { RKSettingsModuleWatch::forTestingSuppressOutput(false); });
diff --git a/rkward/rbackend/rkasyncdatastreamhelper.h b/rkward/rbackend/rkasyncdatastreamhelper.h
index 4e6c245c9..f5262928a 100644
--- a/rkward/rbackend/rkasyncdatastreamhelper.h
+++ b/rkward/rbackend/rkasyncdatastreamhelper.h
@@ -44,6 +44,7 @@ class RKAsyncDataStreamHelper {
 	}
 
 	void writeOutBuffer() {
+		if (outbuffer.isEmpty()) return;
 		auxstream.device()->seek(0);
 		auxbuffer.resize(0);
 		auxstream << (LENGTH_TYPE)outbuffer.size();
diff --git a/rkward/rbackend/rkrbackend.cpp b/rkward/rbackend/rkrbackend.cpp
index 4f0a5c345..282abacbd 100644
--- a/rkward/rbackend/rkrbackend.cpp
+++ b/rkward/rbackend/rkrbackend.cpp
@@ -208,6 +208,7 @@ extern SEXP RKWard_RData_Tag;
 // ############## R Standard callback overrides BEGIN ####################
 static void RKTransmitNextUserCommandChunk(unsigned char *buf, int buflen) {
 	RK_TRACE(RBACKEND);
+	RFn::R_CheckUserInterrupt();
 
 	RK_ASSERT(RKRBackend::repl_status.user_command_transmitted_up_to <= RKRBackend::repl_status.user_command_buffer.length()); // NOTE: QByteArray::length () does not count the trailing '\0'
 	const char *current_buffer = RKRBackend::repl_status.user_command_buffer.data();
diff --git a/rkward/rbackend/rkrsupport.h b/rkward/rbackend/rkrsupport.h
index 1535bc6b8..6327b389f 100644
--- a/rkward/rbackend/rkrsupport.h
+++ b/rkward/rbackend/rkrsupport.h
@@ -45,7 +45,7 @@ class InterruptSuspension {
 	}
 	~InterruptSuspension() {
 		ROb(R_interrupts_suspended) = old_susp;
-		if (ROb(R_interrupts_pending)) RFn::Rf_onintr();
+		//if (ROb(R_interrupts_pending) && !old_susp) RFn::Rf_onintr();
 	}
 
   private:
diff --git a/rkward/rbackend/rkwarddevice/rkgraphicsdevice_backendtransmitter.cpp b/rkward/rbackend/rkwarddevice/rkgraphicsdevice_backendtransmitter.cpp
index 7276a0664..3886334a4 100644
--- a/rkward/rbackend/rkwarddevice/rkgraphicsdevice_backendtransmitter.cpp
+++ b/rkward/rbackend/rkwarddevice/rkgraphicsdevice_backendtransmitter.cpp
@@ -1,6 +1,6 @@
 /*
 rkgraphicsdevice_backendtransmitter - This file is part of RKWard (https://rkward.kde.org). Created: Mon Mar 18 2013
-SPDX-FileCopyrightText: 2013 by Thomas Friedrichsmeier <thomas.friedrichsmeier at kdemail.net>
+SPDX-FileCopyrightText: 2013-2025 by Thomas Friedrichsmeier <thomas.friedrichsmeier at kdemail.net>
 SPDX-FileContributor: The RKWard Team <rkward-devel at kde.org>
 SPDX-License-Identifier: GPL-2.0-or-later
 */
@@ -9,24 +9,24 @@ SPDX-License-Identifier: GPL-2.0-or-later
 
 #include "../rkbackendtransmitter.h"
 #include "../rkrbackendprotocol_backend.h"
+
 #include <QLocalSocket>
+#include <QWaitCondition>
 
 #include "../../debug.h"
 
 RKAsyncDataStreamHelper<RKGraphicsDeviceTransmittionLengthType> RKGraphicsDeviceBackendTransmitter::streamer;
-QIODevice *RKGraphicsDeviceBackendTransmitter::connection = nullptr;
+QLocalSocket *RKGraphicsDeviceBackendTransmitter::connection = nullptr;
 QMutex RKGraphicsDeviceBackendTransmitter::mutex;
 RKGraphicsDeviceBackendTransmitter *RKGraphicsDeviceBackendTransmitter::_instance = nullptr;
 
-RKGraphicsDeviceBackendTransmitter::RKGraphicsDeviceBackendTransmitter(QIODevice *_connection, bool is_q_local_socket) : QThread() {
+QWaitCondition write_available;
+QWaitCondition read_available;
+
+RKGraphicsDeviceBackendTransmitter::RKGraphicsDeviceBackendTransmitter() : QThread(), alive(true), expecting_reply(false), have_reply(false), commit_pending(false) {
 	RK_TRACE(GRAPHICS_DEVICE);
 
-	RK_ASSERT(!connection);
-	RK_ASSERT(_connection);
-	connection = _connection;
-	streamer.setIODevice(connection);
-	alive = true;
-	is_local_socket = is_q_local_socket;
+	RK_ASSERT(!_instance); // singleton!
 	start();
 }
 
@@ -39,38 +39,86 @@ RKGraphicsDeviceBackendTransmitter *RKGraphicsDeviceBackendTransmitter::instance
 	if (_instance) return _instance;
 	RK_TRACE(GRAPHICS_DEVICE);
 
-	QLocalSocket *con = new QLocalSocket();
-	con->connectToServer(RKRBackendProtocolBackend::rkdServerName());
-	con->waitForConnected(2000);
-	if (con->state() == QLocalSocket::ConnectedState) {
-		con->write(RKRBackendTransmitter::instance()->connectionToken().toLocal8Bit().data());
-		con->write("\n");
-		con->waitForBytesWritten(1000);
-		_instance = new RKGraphicsDeviceBackendTransmitter(con, true);
-		return _instance;
-	}
-	return nullptr;
+	QMutexLocker lock(&mutex);
+	_instance = new RKGraphicsDeviceBackendTransmitter();
+	read_available.wait(&mutex); // signifies startup state, here
+	return _instance;
 }
 
 bool RKGraphicsDeviceBackendTransmitter::connectionAlive() {
 	if (!_instance) return false;
-	if (!_instance->is_local_socket) return true;
-	return static_cast<QLocalSocket *>(_instance->connection)->state() == QLocalSocket::ConnectedState;
+	return _instance->alive;
+}
+
+// NOTE: called from R thread. It is already holding a lock
+void RKGraphicsDeviceBackendTransmitter::commitBuffer() {
+	write_available.wakeAll();
+	_instance->commit_pending = true;
+}
+
+// NOTE: called from R thread. It is already holding a lock
+bool RKGraphicsDeviceBackendTransmitter::waitForReply(int timeout) {
+	if (!streamer.instream.atEnd()) return true;
+
+	if (!_instance->have_reply) {
+		_instance->expecting_reply = true;
+		write_available.wakeAll();
+		read_available.wait(&mutex, QDeadlineTimer(timeout));
+	}
+	if (_instance->have_reply) {
+		_instance->have_reply = false;
+		return true;
+	}
+	return false;
+}
+
+void RKGraphicsDeviceBackendTransmitter::doWrite() {
+	if (commit_pending) {
+		QMutexLocker lock(&mutex);
+		streamer.writeOutBuffer();
+		commit_pending = false;
+	}
+	connection->waitForBytesWritten(1000);
+	if (connection->state() != QLocalSocket::ConnectedState) alive = false;
 }
 
 void RKGraphicsDeviceBackendTransmitter::run() {
 	RK_TRACE(GRAPHICS_DEVICE);
+	RK_ASSERT(!connection);
+
+	connection = new QLocalSocket();
+	connection->connectToServer(RKRBackendProtocolBackend::rkdServerName());
+	connection->waitForConnected(2000);
+	if (connection->state() == QLocalSocket::ConnectedState) {
+		connection->write(RKRBackendTransmitter::instance()->connectionToken().toLocal8Bit().data());
+		connection->write("\n");
+		connection->waitForBytesWritten(1000);
+	}
+	streamer.setIODevice(connection);
+	read_available.wakeAll();
 
-	bool more_left = false;
 	while (alive) {
-		msleep(more_left ? 10 : 50); // it's ok to be lazy. If a request expects a reply, RKGraphicsDataStreamReadGuard will take care of pushing everything, itself. Essentially, this thread's job is simply to make sure we don't lag *too* far behind.
-		// See note in RKRBackend::handleRequest(): sleeping short is CPU-intensive
-		mutex.lock();
-		connection->waitForBytesWritten(100);
-		more_left = connection->bytesToWrite();
-		mutex.unlock();
+		doWrite();
+		if (connection->bytesToWrite()) continue;
+		if (expecting_reply) {
+			{
+				QMutexLocker lock(&mutex);
+				if (!streamer.instream.atEnd() || streamer.readInBuffer()) {
+					have_reply = true;
+					expecting_reply = false;
+					read_available.wakeAll();
+					continue;
+				}
+			}
+			connection->waitForReadyRead(100); // don't wait too long: An RKDCancel may have been committed, meanwhile!
+		} else {
+			QMutexLocker lock(&mutex);
+			write_available.wait(&mutex);
+		}
 	}
 
+	connection->close();
+
 	RK_TRACE(GRAPHICS_DEVICE);
 }
 
diff --git a/rkward/rbackend/rkwarddevice/rkgraphicsdevice_backendtransmitter.h b/rkward/rbackend/rkwarddevice/rkgraphicsdevice_backendtransmitter.h
index d72c5582a..90ffeb808 100644
--- a/rkward/rbackend/rkwarddevice/rkgraphicsdevice_backendtransmitter.h
+++ b/rkward/rbackend/rkwarddevice/rkgraphicsdevice_backendtransmitter.h
@@ -1,6 +1,6 @@
 /*
 rkgraphicsdevice_backendtransmitter - This file is part of the RKWard project. Created: Mon Mar 18 2013
-SPDX-FileCopyrightText: 2013 by Thomas Friedrichsmeier <thomas.friedrichsmeier at kdemail.net>
+SPDX-FileCopyrightText: 2013-2025 by Thomas Friedrichsmeier <thomas.friedrichsmeier at kdemail.net>
 SPDX-FileContributor: The RKWard Team <rkward-devel at kde.org>
 SPDX-License-Identifier: GPL-2.0-or-later
 */
@@ -8,19 +8,18 @@ SPDX-License-Identifier: GPL-2.0-or-later
 #ifndef RKGRAPHICSDEVICE_BACKENDTRANSMITTER_H
 #define RKGRAPHICSDEVICE_BACKENDTRANSMITTER_H
 
-#include <QIODevice>
 #include <QMutex>
 #include <QThread>
 
 #include "../rkasyncdatastreamhelper.h"
 
 typedef quint32 RKGraphicsDeviceTransmittionLengthType;
+class QLocalSocket;
 
 /** This simple class is responsible for handling the backend side of transmitting data / requests for the RKGraphicsDevice
  Also it provides the namespace for some statics.
  As the protocol is really quite simple (only the backend send requests, only one request at a time), so is the transmitter. */
 class RKGraphicsDeviceBackendTransmitter : public QThread {
-	RKGraphicsDeviceBackendTransmitter(QIODevice *connection, bool is_q_local_socket);
 	~RKGraphicsDeviceBackendTransmitter();
 
   public:
@@ -28,14 +27,22 @@ class RKGraphicsDeviceBackendTransmitter : public QThread {
 	static bool connectionAlive();
 	static RKGraphicsDeviceBackendTransmitter *instance();
 	static RKAsyncDataStreamHelper<quint32> streamer;
-	static QIODevice *connection;
 	static QMutex mutex;
+	// commit the streamer buffer for writing
+	static void commitBuffer();
+	// wait until all data has been written, and a reply has been received
+	static bool waitForReply(int timeout);
 
   private:
+	RKGraphicsDeviceBackendTransmitter();
 	static RKGraphicsDeviceBackendTransmitter *_instance;
 	bool alive;
-	bool is_local_socket;
+	bool expecting_reply;
+	bool have_reply;
+	bool commit_pending;
 	void run() override;
+	static QLocalSocket *connection;
+	void doWrite();
 };
 
 #endif
diff --git a/rkward/rbackend/rkwarddevice/rkgraphicsdevice_frontendtransmitter.cpp b/rkward/rbackend/rkwarddevice/rkgraphicsdevice_frontendtransmitter.cpp
index 9f9b38f1f..75c087734 100644
--- a/rkward/rbackend/rkwarddevice/rkgraphicsdevice_frontendtransmitter.cpp
+++ b/rkward/rbackend/rkwarddevice/rkgraphicsdevice_frontendtransmitter.cpp
@@ -226,9 +226,7 @@ static QVector<QPointF> readPoints(QDataStream &instream) {
 void RKGraphicsDeviceFrontendTransmitter::newData() {
 	RK_TRACE(GRAPHICS_DEVICE);
 
-	while (connection->bytesAvailable()) {
-		if (!streamer.readInBuffer()) return; // wait for more data to come in
-
+	while (!streamer.instream.atEnd() || streamer.readInBuffer()) {
 		quint8 opcode, devnum;
 		streamer.instream >> opcode >> devnum;
 		RK_DEBUG(GRAPHICS_DEVICE, DL_TRACE, "Received transmission of type %d, devnum %d, size %d", opcode, devnum + 1, streamer.inSize());
@@ -529,10 +527,6 @@ void RKGraphicsDeviceFrontendTransmitter::newData() {
 		} else {
 			RK_DEBUG(GRAPHICS_DEVICE, DL_ERROR, "Unhandled operation of type %d for device number %d. Skipping.", opcode, devnum + 1);
 		}
-
-		if (!streamer.instream.atEnd()) {
-			RK_DEBUG(GRAPHICS_DEVICE, DL_ERROR, "Failed to read all data for operation of type %d on device number %d.", opcode, devnum + 1);
-		}
 	}
 }
 
diff --git a/rkward/rbackend/rkwarddevice/rkgraphicsdevice_stubs.cpp b/rkward/rbackend/rkwarddevice/rkgraphicsdevice_stubs.cpp
index 8b5af5f8a..a720a5229 100644
--- a/rkward/rbackend/rkwarddevice/rkgraphicsdevice_stubs.cpp
+++ b/rkward/rbackend/rkwarddevice/rkgraphicsdevice_stubs.cpp
@@ -1,6 +1,6 @@
 /*
 rkgraphicsdevice_stubs - This file is part of RKWard (https://rkward.kde.org). Created: Mon Mar 18 2013
-SPDX-FileCopyrightText: 2013-2024 by Thomas Friedrichsmeier <thomas.friedrichsmeier at kdemail.net>
+SPDX-FileCopyrightText: 2013-2025 by Thomas Friedrichsmeier <thomas.friedrichsmeier at kdemail.net>
 SPDX-FileContributor: The RKWard Team <rkward-devel at kde.org>
 SPDX-License-Identifier: GPL-2.0-or-later
 */
@@ -50,77 +50,43 @@ static int rkd_suppress_on_exit = 0;
  * At the same time, note that the RKGraphicsDataStreamReadGuard c'tor @em may cause R to long-jump (safely) in case of a user interrupt,
  * or if the connection was killed. Don't rely on the code following the creation of an RKGraphicsDataStreamReadGuard to be called.
  */
-class RKGraphicsDataStreamReadGuard {
+class RKGraphicsDataStreamReadGuard : public RKRSupport::InterruptSuspension {
   public:
-	RKGraphicsDataStreamReadGuard() {
+	RKGraphicsDataStreamReadGuard() : RKRSupport::InterruptSuspension() {
 		RKGraphicsDeviceBackendTransmitter::mutex.lock();
-		have_lock = true;
 		rkd_waiting_for_reply = true;
-		QIODevice *connection = RKGraphicsDeviceBackendTransmitter::connection;
-		{
-			RKRSupport::InterruptSuspension susp;
-			while (connection->bytesToWrite()) {
-				if (!connection->waitForBytesWritten(10)) {
-					checkHandleError();
-				}
-				if (connection->bytesToWrite()) RKREventLoop::processX11Events();
-			}
-			while (!RKGraphicsDeviceBackendTransmitter::streamer.readInBuffer()) {
-				RKREventLoop::processX11Events();
-				if (!connection->waitForReadyRead(10)) {
-					if (checkHandleInterrupt(connection)) break;
-					checkHandleError();
-				}
-			}
-			if (ROb(R_interrupts_pending)) {
-				if (have_lock) {
-					RKGraphicsDeviceBackendTransmitter::mutex.unlock();
-					have_lock = false; // Will d'tor still be called? We don't rely on it.
-				}
-				rkd_waiting_for_reply = false;
-			}
-		};
+		while (!RKGraphicsDeviceBackendTransmitter::waitForReply(10)) {
+			if (!checkHandleInterrupt()) RKREventLoop::processX11Events();
+			if (!RKGraphicsDeviceBackendTransmitter::connectionAlive()) break;
+		}
 	}
 
 	~RKGraphicsDataStreamReadGuard() {
-		if (have_lock) RKGraphicsDeviceBackendTransmitter::mutex.unlock();
 		rkd_waiting_for_reply = false;
+		RKGraphicsDeviceBackendTransmitter::mutex.unlock();
+		if (!RKGraphicsDeviceBackendTransmitter::connectionAlive()) {
+			RFn::Rf_error("RKWard Graphics connection has shut down");
+		}
 	}
 
   private:
-	bool checkHandleInterrupt(QIODevice *connection) {
+	bool checkHandleInterrupt() {
 		// NOTE: It would be possible, but not exactly easier to rely on GEonExit() rather than R_interrupts_pending
 		// Might be an option, if R_interrupts_pending gets hidden one day, though
 		if (!ROb(R_interrupts_pending)) return false;
 
 		// Tell the frontend to finish whatever it was doing ASAP. Don't process any other events until that has happened
 		RKGraphicsDeviceBackendTransmitter::streamer.outstream << (quint8)RKDCancel << (quint8)0;
-		RKGraphicsDeviceBackendTransmitter::streamer.writeOutBuffer();
-		while (connection->bytesToWrite()) {
-			if (!connection->waitForBytesWritten(10)) {
-				checkHandleError();
-			}
-		}
+		RKGraphicsDeviceBackendTransmitter::commitBuffer();
 		int loop = 0;
-		while (!RKGraphicsDeviceBackendTransmitter::streamer.readInBuffer()) {
-			if (!connection->waitForReadyRead(10)) {
-				if (++loop > 500) {
-					connection->close(); // If frontend is unresponsive, kill connection
-				}
-				checkHandleError();
+		while (!RKGraphicsDeviceBackendTransmitter::waitForReply(10)) {
+			if (++loop > 500) {
+				RKGraphicsDeviceBackendTransmitter::kill(); // If frontend is unresponsive, kill connection
 			}
+			if (!RKGraphicsDeviceBackendTransmitter::connectionAlive()) return true;
 		}
 		return true;
 	}
-
-	void checkHandleError() {
-		if (!RKGraphicsDeviceBackendTransmitter::connectionAlive()) { // Don't go into endless loop, if e.g. frontend has crashed
-			if (have_lock) RKGraphicsDeviceBackendTransmitter::mutex.unlock();
-			have_lock = false; // Will d'tor still be called? We don't rely on it.
-			RFn::Rf_error("RKWard Graphics connection has shut down");
-		}
-	}
-	bool have_lock;
 };
 
 /** This class is essentially like QMutexLocker. In addition, the destructor takes care of pushing anything that was written to the protocol buffer during it lifetime to the transmitter. (Does NOT wait for the transmission itself). */
@@ -145,7 +111,7 @@ class RKGraphicsDataStreamWriteGuard {
 		RKGraphicsDeviceBackendTransmitter::mutex.lock();
 	}
 	~RKGraphicsDataStreamWriteGuard() {
-		RKGraphicsDeviceBackendTransmitter::streamer.writeOutBuffer();
+		RKGraphicsDeviceBackendTransmitter::commitBuffer();
 		RKGraphicsDeviceBackendTransmitter::mutex.unlock();
 	}
 };
@@ -809,13 +775,13 @@ SEXP RKD_SetClipPath(SEXP path, SEXP ref, pDevDesc dev) {
 			WRITE_HEADER(RKDSetClipPath, dev);
 			RKD_OUT_STREAM << index;
 		}
+		qint8 ok;
 		{
 			RKGraphicsDataStreamReadGuard rguard;
-			qint8 ok;
 			RKD_IN_STREAM >> ok;
-			if (!ok) RFn::Rf_warning("Invalid reference to clipping path");
-			else return ROb(R_NilValue);
 		}
+		if (!ok) RFn::Rf_warning("Invalid reference to clipping path");
+		else return ROb(R_NilValue);
 	}
 
 	// No index, or not a valid index: create new path
@@ -858,13 +824,13 @@ SEXP RKD_SetMask(SEXP mask, SEXP ref, pDevDesc dev) {
 			WRITE_HEADER(RKDSetMask, dev);
 			RKD_OUT_STREAM << index;
 		}
+		qint8 ok;
 		{
 			RKGraphicsDataStreamReadGuard rguard;
-			qint8 ok;
 			RKD_IN_STREAM >> ok;
-			if (!ok) RFn::Rf_warning("Invalid reference to mask");
-			else return ROb(R_NilValue);
 		}
+		if (!ok) RFn::Rf_warning("Invalid reference to mask");
+		else return ROb(R_NilValue);
 	}
 
 	// No index, or not a valid index: create new mask
@@ -1026,12 +992,12 @@ void RKD_Glyph(int n, int *glyphs, double *x, double *y, SEXP font, double size,
 		}
 	}
 
+	QString warning;
 	{
 		RKGraphicsDataStreamReadGuard rguard;
-		QString warning;
 		RKD_IN_STREAM >> warning;
-		if (!warning.isEmpty()) RFn::Rf_warning("%s", qPrintable(warning));
 	}
+	if (!warning.isEmpty()) RFn::Rf_warning("%s", qPrintable(warning));
 	modified(dev);
 }
 #endif



More information about the rkward-tracker mailing list