[rkward-cvs] SF.net SVN: rkward:[3193] branches/2010_10_18_backend_restructuring_branch/ rkward/rbackend
tfry at users.sourceforge.net
tfry at users.sourceforge.net
Thu Nov 18 15:32:05 UTC 2010
Revision: 3193
http://rkward.svn.sourceforge.net/rkward/?rev=3193&view=rev
Author: tfry
Date: 2010-11-18 15:32:05 +0000 (Thu, 18 Nov 2010)
Log Message:
-----------
Unify the code for backend and frontend sides of the transmitter, and fix exit handling.
Modified Paths:
--------------
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/CMakeLists.txt
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkfrontendtransmitter.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkfrontendtransmitter.h
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h
Added Paths:
-----------
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkbackendtransmitter.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkbackendtransmitter.h
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rktransmitter.cpp
branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rktransmitter.h
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/CMakeLists.txt
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/CMakeLists.txt 2010-11-17 19:14:00 UTC (rev 3192)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/CMakeLists.txt 2010-11-18 15:32:05 UTC (rev 3193)
@@ -37,30 +37,38 @@
SET(RBACKEND_SPLIT 1)
IF(RBACKEND_SPLIT)
SET (
- rbackend_STAT_SRCS
+ rbackend_FRONTEND_SRCS
${rbackend_frontend_SRCS}
${rbackend_shared_SRCS}
rkfrontendtransmitter.cpp
+ rktransmitter.cpp
)
- QT4_AUTOMOC(${rbackend_STAT_SRCS})
- ADD_LIBRARY(rbackend STATIC ${rbackend_STAT_SRCS})
+ QT4_AUTOMOC(${rbackend_FRONTEND_SRCS})
+ ADD_LIBRARY(rbackend STATIC ${rbackend_FRONTEND_SRCS})
TARGET_LINK_LIBRARIES(rbackend ${CMAKE_THREAD_LIBS_INIT})
- QT4_AUTOMOC(${rbackend_backend_SRCS})
+ SET (
+ rbackend_BACKEND_SRCS
+ ${rbackend_backend_SRCS}
+ ${rbackend_shared_SRCS}
+ rkbackendtransmitter.cpp
+ rktransmitter.cpp
+ )
+ QT4_AUTOMOC(${rbackend_BACKEND_SRCS})
ADD_DEFINITIONS (-DRKWARD_SPLIT_PROCESS)
- KDE4_ADD_EXECUTABLE(rkward.rbackend ${rbackend_backend_SRCS} ${rbackend_shared_SRCS})
+ KDE4_ADD_EXECUTABLE(rkward.rbackend ${rbackend_BACKEND_SRCS})
TARGET_LINK_LIBRARIES(rkward.rbackend ${R_USED_LIBS} ${CMAKE_THREAD_LIBS_INIT} ${KDE4_KDECORE_LIBS})
LINK_DIRECTORIES(${R_SHAREDLIBDIR})
ELSE(RBACKEND_SPLIT)
SET (
- rbackend_STAT_SRCS
+ rbackend_ALL_SRCS
${rbackend_frontend_SRCS}
${rbackend_shared_SRCS}
${rbackend_backend_SRCS}
)
- QT4_AUTOMOC(${rbackend_STAT_SRCS})
- ADD_LIBRARY(rbackend STATIC ${rbackend_STAT_SRCS})
+ QT4_AUTOMOC(${rbackend_ALL_SRCS})
+ ADD_LIBRARY(rbackend STATIC ${rbackend_ALL_SRCS})
TARGET_LINK_LIBRARIES(rbackend ${R_USED_LIBS} ${CMAKE_THREAD_LIBS_INIT})
LINK_DIRECTORIES(${R_SHAREDLIBDIR})
ENDIF(RBACKEND_SPLIT)
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp 2010-11-17 19:14:00 UTC (rev 3192)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rinterface.cpp 2010-11-18 15:32:05 UTC (rev 3193)
@@ -131,7 +131,6 @@
RK_ASSERT (previous_command == 0);
RK_ASSERT (!all_current_commands.isEmpty ());
previous_command = all_current_commands.takeLast ();
-qDebug ("previous command: %d", previous_command->id ());
RCommandStack::currentStack ()->pop ();
}
@@ -690,10 +689,12 @@
na_real = request->params["na_real"].toDouble ();
na_int = request->params["na_int"].toInt ();
} else if (type == RBackendRequest::BackendExit) {
- QString message = request->params["message"].toString ();
- message += i18n ("\nIt will be shut down immediately. This means, you can not use any more functions that rely on the R backend. I.e. you can do hardly anything at all, not even save the workspace (but if you're lucky, R already did that). What you can do, however, is save any open command-files, the output, or copy data out of open data editors. Quit RKWard after that.\nSince this should never happen, please write a mail to rkward-devel at lists.sourceforge.net, and tell us, what you were trying to do, when this happened. Sorry!");
- KMessageBox::error (0, message, i18n ("R engine has died"));
- backend_dead = true;
+ if (!backend_dead) {
+ backend_dead = true;
+ QString message = request->params["message"].toString ();
+ message += i18n ("\nThe R backend will be shut down immediately. This means, you can not use any more functions that rely on it. I.e. you can do hardly anything at all, not even save the workspace (but if you're lucky, R already did that). What you can do, however, is save any open command-files, the output, or copy data out of open data editors. Quit RKWard after that.\nSince this should never happen, please write a mail to rkward-devel at lists.sourceforge.net, and tell us, what you were trying to do, when this happened. Sorry!");
+ KMessageBox::error (0, message, i18n ("R engine has died"));
+ }
} else {
RK_ASSERT (false);
}
Added: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkbackendtransmitter.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkbackendtransmitter.cpp (rev 0)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkbackendtransmitter.cpp 2010-11-18 15:32:05 UTC (rev 3193)
@@ -0,0 +1,124 @@
+/***************************************************************************
+ rkbackendtransmitter - description
+ -------------------
+ begin : Thu Nov 18 2010
+ copyright : (C) 2010 by Thomas Friedrichsmeier
+ email : tfry at users.sourceforge.net
+ ***************************************************************************/
+
+/***************************************************************************
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) any later version. *
+ * *
+ ***************************************************************************/
+
+#include "rkbackendtransmitter.h"
+
+#include "rkrbackend.h"
+
+#include <QTimer>
+#include <QLocalSocket>
+
+#include "../debug.h"
+
+RKRBackendTransmitter::RKRBackendTransmitter (const QString &servername) {
+ RK_TRACE (RBACKEND);
+
+ RKRBackendTransmitter::servername = servername;
+}
+
+RKRBackendTransmitter::~RKRBackendTransmitter () {
+ RK_TRACE (RBACKEND);
+ if (!current_sync_requests.isEmpty ()) {
+ RK_DO (qDebug ("%d pending requests while exiting RKRBackendTransmitter", current_sync_requests.size ()), RBACKEND, DL_WARNING);
+ }
+
+ if (!connection) return;
+
+ // To prevent closing the process before the frontend has had a chance to see the QuitCommand
+ if (connection->bytesToWrite ()) connection->waitForBytesWritten (1000);
+ msleep (1000);
+}
+
+void RKRBackendTransmitter::flushOutput () {
+ // do not trace.
+ flushOutput (false);
+}
+
+void RKRBackendTransmitter::run () {
+ RK_TRACE (RBACKEND);
+
+ QLocalSocket* con = new QLocalSocket (this);
+ con->connectToServer (servername);
+ setConnection (con);
+
+ if (!connection->waitForConnected ()) handleTransmissionError ("Could not connect: " + connection->errorString ());
+#warning do handshake
+
+ QTimer* flush_timer = new QTimer (this);
+ connect (flush_timer, SIGNAL (timeout()), this, SLOT (flushOutput()));
+ flush_timer->setInterval (50);
+ flush_timer->start ();
+
+ exec ();
+}
+
+void RKRBackendTransmitter::writeRequest (RBackendRequest *request) {
+ RK_TRACE (RBACKEND);
+
+ if (request->type != RBackendRequest::Output) flushOutput (true);
+ transmitRequest (request);
+ connection->flush ();
+
+ if (request->synchronous) {
+ current_sync_requests.append (request);
+ RK_DO (qDebug ("Expecting replies for %d requests (added %p)", current_sync_requests.size (), request), RBACKEND, DL_DEBUG);
+ } else {
+ delete request;
+ }
+}
+
+void RKRBackendTransmitter::requestReceived (RBackendRequest* request) {
+ RK_TRACE (RBACKEND);
+
+ if (current_sync_requests.isEmpty ()) {
+ RK_ASSERT (false);
+ return;
+ }
+
+ RBackendRequest* current_sync_request = current_sync_requests.takeFirst ();
+ if (current_sync_request->type == RBackendRequest::Output) {
+ delete current_sync_request; // this was just our internal request
+ delete request;
+ } else {
+ current_sync_request->mergeReply (request);
+ delete request;
+ current_sync_request->done = true;
+ }
+ RK_DO (qDebug ("Expecting replies for %d requests (popped %p)", current_sync_requests.size (), current_sync_request), RBACKEND, DL_DEBUG);
+}
+
+void RKRBackendTransmitter::flushOutput (bool force) {
+ if (!current_sync_requests.isEmpty ()) return;
+
+ ROutputList out = RKRBackend::this_pointer->flushOutput (force);
+ if (out.isEmpty ()) return;
+
+ RK_TRACE (RBACKEND);
+ // output request would not strictly need to be synchronous. However, making them synchronous ensures that the frontend is keeping up with the output sent by the backend.
+ RBackendRequest* request = new RBackendRequest (true, RBackendRequest::Output);
+ request->output = new ROutputList (out);
+ writeRequest (request);
+}
+
+void RKRBackendTransmitter::handleTransmissionError (const QString &message) {
+ RK_TRACE (RBACKEND);
+
+ RK_DO (qDebug (qPrintable ("Transmission error " + message)), RBACKEND, DL_ERROR);
+ RKRBackend::tryToDoEmergencySave ();
+}
+
+#include "rkbackendtransmitter.moc"
Added: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkbackendtransmitter.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkbackendtransmitter.h (rev 0)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkbackendtransmitter.h 2010-11-18 15:32:05 UTC (rev 3193)
@@ -0,0 +1,46 @@
+/***************************************************************************
+ rkbackendtransmitter - description
+ -------------------
+ begin : Thu Nov 18 2010
+ copyright : (C) 2010 by Thomas Friedrichsmeier
+ email : tfry at users.sourceforge.net
+ ***************************************************************************/
+
+/***************************************************************************
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) any later version. *
+ * *
+ ***************************************************************************/
+
+#ifndef RKBACKENDTRANSMITTER_H
+#define RKBACKENDTRANSMITTER_H
+
+#include "rktransmitter.h"
+
+/** Private class used by the RKRBackendProtocol, in case of the backend running in a split process.
+This will be used as the secondary thread, and takes care of serializing, sending, receiving, and unserializing requests. */
+class RKRBackendTransmitter : public RKAbstractTransmitter {
+Q_OBJECT
+public:
+ RKRBackendTransmitter (const QString &servername);
+ ~RKRBackendTransmitter ();
+
+ void publicmsleep (int delay) { msleep (delay); };
+
+ void run ();
+
+ void writeRequest (RBackendRequest *request);
+ void requestReceived (RBackendRequest *request);
+ void handleTransmissionError (const QString &message);
+private slots:
+ void flushOutput ();
+private:
+ void flushOutput (bool force);
+ QList<RBackendRequest*> current_sync_requests; // pointers to the request that we expect a reply for. Yes, internally, this can be several requests.
+ QString servername;
+};
+
+#endif
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkfrontendtransmitter.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkfrontendtransmitter.cpp 2010-11-17 19:14:00 UTC (rev 3192)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkfrontendtransmitter.cpp 2010-11-18 15:32:05 UTC (rev 3193)
@@ -23,19 +23,15 @@
#include "kstandarddirs.h"
#include <QCoreApplication>
+#include <QProcess>
+#include <QLocalServer>
+#include <QLocalSocket>
#include "../debug.h"
-RKFrontendTransmitter* RKFrontendTransmitter::_instance = 0;
-RKFrontendTransmitter::RKFrontendTransmitter () : QThread () {
+RKFrontendTransmitter::RKFrontendTransmitter () : RKAbstractTransmitter () {
RK_TRACE (RBACKEND);
- connection = 0;
-
- RK_ASSERT (_instance == 0);
- _instance = this;
-
- moveToThread (this);
start ();
}
@@ -43,7 +39,6 @@
RK_TRACE (RBACKEND);
RK_ASSERT (!server->isListening ());
- delete connection;
}
void RKFrontendTransmitter::run () {
@@ -51,7 +46,7 @@
// start server
server = new QLocalServer (this);
- if (!server->listen ("rkward")) handleTransmitError ("failure to start frontend server: " + server->errorString ());
+ if (!server->listen ("rkward")) handleTransmissionError ("failure to start frontend server: " + server->errorString ());
connect (server, SIGNAL (newConnection ()), this, SLOT (connectAndEnterLoop ()), Qt::QueuedConnection);
// start backend
@@ -62,24 +57,29 @@
args.append ("--data-dir " + RKSettingsModuleGeneral::filesPath ());
backend->setProcessChannelMode (QProcess::MergedChannels); // at least for now. Seems difficult to get interleaving right, without this.
connect (backend, SIGNAL (readyReadStandardOutput ()), this, SLOT (newProcessOutput ()));
- connect (backend, SIGNAL (finished (int, QProcess::ExitStatus)), this, SLOT (backendExit (int, QProcess::ExitStatus)));
+ connect (backend, SIGNAL (finished (int, QProcess::ExitStatus)), this, SLOT (backendExit (int)));
QString backend_executable = KStandardDirs::findExe ("rkward.rbackend", QCoreApplication::applicationDirPath () + "/rbackend");
if (backend_executable.isEmpty ()) backend_executable = KStandardDirs::findExe ("rkward.rbackend", QCoreApplication::applicationDirPath ());
RK_ASSERT (!backend_executable.isEmpty ());
backend->start (backend_executable, args, QIODevice::ReadOnly);
exec ();
+
+ if (!connection) {
+ RK_ASSERT (false);
+ return;
+ }
+
+ connection->close ();
+ backend->waitForFinished ();
}
void RKFrontendTransmitter::connectAndEnterLoop () {
RK_TRACE (RBACKEND);
RK_ASSERT (server->hasPendingConnections ());
- connection = server->nextPendingConnection ();
+ setConnection (server->nextPendingConnection ());
server->close ();
-
- connect (connection, SIGNAL (stateChanged (QLocalSocket::LocalSocketState)), this, SLOT (connectionStateChanged ()));
- connect (connection, SIGNAL (readyRead ()), this, SLOT (newConnectionData ()));
}
void RKFrontendTransmitter::newProcessOutput () {
@@ -89,88 +89,48 @@
handleOutput (output, output.size (), ROutput::Warning);
}
-void RKFrontendTransmitter::newConnectionData () {
+void RKFrontendTransmitter::requestReceived(RBackendRequest* request) {
RK_TRACE (RBACKEND);
- if (!connection->canReadLine ()) return;
-
- QString line = QString::fromLocal8Bit (connection->readLine ());
- bool ok;
- int expected_length = line.toInt (&ok);
- if (!ok) handleTransmitError ("Protocol header error. Last connection error was: " + connection->errorString ());
-
- QByteArray receive_buffer;
- while (receive_buffer.length () < expected_length) {
- if (connection->bytesAvailable ()) {
- receive_buffer.append (connection->read (expected_length - receive_buffer.length ()));
- } else {
- connection->waitForReadyRead (1000);
- if (!connection->isOpen ()) {
- handleTransmitError ("Connection closed unexepctedly. Last error: " + connection->errorString ());
- return;
- }
- }
- }
-
- RBackendRequest *req = RKRBackendSerializer::unserialize (receive_buffer);
- if (req->type == RBackendRequest::Output) {
- ROutputList* list = req->output;
+ if (request->type == RBackendRequest::Output) {
+ ROutputList* list = request->output;
for (int i = 0; i < list->size (); ++i) {
ROutput *out = (*list)[i];
handleOutput (out->output, out->output.length (), out->type);
delete (out);
}
- req->output = 0;
- RK_ASSERT (req->synchronous);
- writeRequest (req); // to tell the backend, that we are keeping up. This also deletes the request.
+ request->output = 0;
+ RK_ASSERT (request->synchronous);
+ writeRequest (request); // to tell the backend, that we are keeping up. Also deletes the request.
return;
}
- RKRBackendEvent* event = new RKRBackendEvent (req);
+ RKRBackendEvent* event = new RKRBackendEvent (request);
qApp->postEvent (RKRBackendProtocolFrontend::instance (), event);
}
-void RKFrontendTransmitter::backendExit (int exitcode, QProcess::ExitStatus exitstatus) {
+void RKFrontendTransmitter::backendExit (int exitcode) {
RK_TRACE (RBACKEND);
- RBackendRequest* req = new RBackendRequest (false, RBackendRequest::BackendExit);
- RKRBackendEvent* event = new RKRBackendEvent (req);
- qApp->postEvent (RKRBackendProtocolFrontend::instance (), event);
+ handleTransmissionError ("Backend process has exited with code " + QString::number (exitcode));
}
-void RKFrontendTransmitter::connectionStateChanged () {
- if (connection->state () != QLocalSocket::UnconnectedState) return; // only interested in connection failure
- RK_TRACE (RBACKEND);
-
- RBackendRequest* req = new RBackendRequest (false, RBackendRequest::BackendExit);
- RKRBackendEvent* event = new RKRBackendEvent (req);
- qApp->postEvent (RKRBackendProtocolFrontend::instance (), event);
-}
-
void RKFrontendTransmitter::writeRequest (RBackendRequest *request) {
RK_TRACE (RBACKEND);
- QByteArray buffer = RKRBackendSerializer::serialize (*request);
- connection->write (QString::number (buffer.length ()).toLocal8Bit () + "\n");
- connection->write (buffer);
+ transmitRequest (request);
connection->flush ();
delete request;
}
-void RKFrontendTransmitter::customEvent (QEvent *e) {
- if (((int) e->type ()) == ((int) RKRBackendEvent::RKWardEvent)) {
- RKRBackendEvent *ev = static_cast<RKRBackendEvent*> (e);
- writeRequest (ev->data ());
- } else {
- RK_ASSERT (false);
- return;
- }
-}
-
-void RKFrontendTransmitter::handleTransmitError (const QString &message) {
+void RKFrontendTransmitter::handleTransmissionError (const QString &message) {
RK_TRACE (RBACKEND);
- #warning: Show those errors to the user!
- qDebug ("%s", qPrintable (message));
+ RBackendRequest* req = new RBackendRequest (false, RBackendRequest::BackendExit);
+ req->params["message"] = message;
+ RKRBackendEvent* event = new RKRBackendEvent (req);
+ qApp->postEvent (RKRBackendProtocolFrontend::instance (), event);
+
+ exit ();
}
#include "rkfrontendtransmitter.moc"
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkfrontendtransmitter.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkfrontendtransmitter.h 2010-11-17 19:14:00 UTC (rev 3192)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkfrontendtransmitter.h 2010-11-18 15:32:05 UTC (rev 3193)
@@ -18,25 +18,17 @@
#ifndef RKFRONTENDTRANSMITTER_H
#define RKFRONTENDTRANSMITTER_H
-#include "rkrbackendprotocol_shared.h"
+#include "rktransmitter.h"
-#ifdef RKWARD_THREADED
-# error This file should only be compiled for split process backends!
-#endif
+class QProcess;
+class QLocalServer;
-#include <QLocalServer>
-#include <QLocalSocket>
-#include <QProcess>
-#include <QThread>
-
-class RKFrontendTransmitter : public QThread, public RKROutputBuffer {
+class RKFrontendTransmitter : public RKAbstractTransmitter, public RKROutputBuffer {
Q_OBJECT
public:
RKFrontendTransmitter ();
~RKFrontendTransmitter ();
- static RKFrontendTransmitter* instance () { return _instance; };
-
void run ();
bool doMSleep (int delay) {
@@ -44,21 +36,17 @@
return true;
};
void writeRequest (RBackendRequest *request);
- void customEvent (QEvent *e);
+ void requestReceived (RBackendRequest *request);
private slots:
void connectAndEnterLoop ();
void newProcessOutput ();
- void newConnectionData ();
- void backendExit (int exitcode, QProcess::ExitStatus exitstatus);
- void connectionStateChanged ();
+ void backendExit (int exitcode);
private:
- void handleTransmitError (const QString &message);
+ void handleTransmissionError (const QString &message);
int current_request_length;
QProcess* backend;
QLocalServer* server;
- QLocalSocket* connection;
- static RKFrontendTransmitter *_instance;
};
#endif
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp 2010-11-17 19:14:00 UTC (rev 3192)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackend.cpp 2010-11-18 15:32:05 UTC (rev 3193)
@@ -377,8 +377,10 @@
if (!dir.exists (filename)) break;
i++;
}
+ filename = dir.absoluteFilePath (filename);
if (R_DirtyImage) R_SaveGlobalEnvToFile (filename.toLocal8Bit ());
+ qDebug ("Created emergency save file in %s", qPrintable (filename));
}
RKRBackend::this_pointer->killed = RKRBackend::AlreadyDead; // just in case
@@ -405,6 +407,7 @@
if (RKRBackendProtocolBackend::inRThread ()) {
// If we are in the correct thread, things are easy:
+ RKRBackend::this_pointer->killed = RKRBackend::EmergencySaveThenExit;
RCleanUp (SA_SUICIDE, 1, 0);
RK_doIntr (); // to jump out of the loop, if needed
} else {
@@ -1095,7 +1098,7 @@
RKRBackendProtocolBackend::instance ()->sendRequest (request);
- if (!request->synchronous) {
+ if ((!request->synchronous) && (!isKilled ())) {
RK_ASSERT (mayHandleSubstack); // i.e. not called from fetchNextCommand
return 0;
}
@@ -1128,7 +1131,7 @@
RCommandProxy* RKRBackend::fetchNextCommand () {
RK_TRACE (RBACKEND);
- RBackendRequest req (killed == NotKilled, RBackendRequest::CommandOut); // when killed, we don't wait for the reply. Thus, request is async, then.
+ RBackendRequest req (!isKilled (), RBackendRequest::CommandOut); // when killed, we do *not* actually wait for the reply, before the request is deleted.
req.command = previous_command;
previous_command = 0;
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.cpp 2010-11-17 19:14:00 UTC (rev 3192)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_backend.cpp 2010-11-18 15:32:05 UTC (rev 3193)
@@ -35,6 +35,7 @@
# include <QMutex>
# include "kcomponentdata.h"
# include "kglobal.h"
+# include "rktransmitter.h"
#endif
#ifdef RKWARD_THREADED
@@ -80,169 +81,35 @@
};
RKRBackendThread* RKRBackendThread::instance = 0;
#else
- /** Private class used by the RKRBackendProtocol, in case of the backend running in a split process.
- This will be used as the secondary thread, and takes care of serializing, sending, receiving, and unserializing requests. */
- class RKRBackendTransmitter : public QThread {
- public:
- RKRBackendTransmitter (const QString &servername) {
- RK_TRACE (RBACKEND);
- RK_ASSERT (_instance == 0);
- _instance = this;
- connection = 0;
+# include "rkbackendtransmitter.h"
- moveToThread (this);
- RKRBackendTransmitter::servername = servername;
- };
-
- ~RKRBackendTransmitter () {
- RK_TRACE (RBACKEND);
- if (!current_requests.isEmpty ()) {
- RK_DO (qDebug ("%d pending requests when exiting RKRBackendTransmitter", current_requests.size ()), RBACKEND, DL_WARNING);
- }
- };
-
- void publicmsleep (int delay) { msleep (delay); };
-
- void run () {
- RK_TRACE (RBACKEND);
-
- connection = new QLocalSocket (this);
- connection->connectToServer (servername); // acutal connection will be done inside run()
-
- if (!connection->waitForConnected ()) handleTransmitError ("Could not connect: %s");
-#warning do handshake
- while (1) {
- flushOutput (false);
- request_mutex.lock ();
- while (!current_requests.isEmpty ()) {
- RBackendRequest* request = current_requests.takeFirst ();
-
- request_mutex.unlock ();
- flushOutput (true);
- handleRequestInternal (request);
- request_mutex.lock ();
- }
- request_mutex.unlock ();
- msleep (1);
- }
- };
-
- void postRequest (RBackendRequest *request) {
- RK_TRACE (RBACKEND);
- RK_ASSERT (request);
-
- request_mutex.lock ();
- current_requests.append (request);
- request_mutex.unlock ();
- }
-
- void handleRequestInternal (RBackendRequest *request) {
- RK_TRACE (RBACKEND);
- RK_ASSERT (request);
-
- // send request
- QByteArray buffer = RKRBackendSerializer::serialize (*request);
- connection->write (QString::number (buffer.length ()).toLocal8Bit () + "\n");
- connection->write (buffer);
- while (connection->bytesToWrite ()) {
- if (!connection->waitForBytesWritten ()) handleTransmitError ("Could not connect: %s");
-#warning, at this point we could check for an early reply to CommandOut requests
-// currently, there is not as much concurrency between the processes as there could be. This is due to the fact, that the transmitter will always block until the result of the
-// last command has been serialized and transmitted to the frontend. Instead, it could check for and fetch the next command, already (if available), to keep the backend going.
- }
- if (!request->synchronous) {
- delete request; // async requests are posted as copy
- return;
- }
-
- // wait for reply
- // NOTE: currently, in the backend, we *never* expect a read without a synchronous request
- RBackendRequest* reply = RKRBackendSerializer::unserialize (fetchTransmission (true));
- request->mergeReply (reply);
- delete reply;
-#warning Read up on whether volatile provides a good enough memory barrier at this point!
- request->done = true; // must be the very last thing we do with the request!
- }
-
- /** fetch the next transmission.
- @param block Block until a transmission was received.
- @note @em If a transmission is available, this will always block until the transmission has been received in full. */
- QByteArray fetchTransmission (bool block) {
- RK_ASSERT (connection);
-
- QByteArray receive_buffer;
- int expected_length = 0;
- bool got_header = false;
- while (1) {
- connection->waitForReadyRead (1);
- if (!connection->isOpen ()) {
- handleTransmitError ("Connection closed unexepctedly. Last error: %s");
- return receive_buffer;
- }
- if (!connection->bytesAvailable ()) {
- if (!block) return receive_buffer;
- continue;
- }
-
- RK_TRACE (RBACKEND);
- if (!got_header) {
- if (!connection->canReadLine ()) continue; // at this point we have received *something*, but not even a full header, yet.
-
- QString line = QString::fromLocal8Bit (connection->readLine ());
- bool ok;
- expected_length = line.toInt (&ok);
- if (!ok) handleTransmitError ("Protocol header error. Last connection error was: %s");
- got_header = true;
- }
-
- receive_buffer.append (connection->read (expected_length - receive_buffer.length ()));
- if (receive_buffer.length () >= expected_length) return receive_buffer;
- }
- }
-
- void flushOutput (bool force) {
- ROutputList out = RKRBackend::this_pointer->flushOutput (force);
- if (out.isEmpty ()) return;
-
- // output request would not strictly need to be synchronous. However, making them synchronous ensures that the frontend is keeping up with the output sent by the backend.
- RBackendRequest request (true, RBackendRequest::Output);
- request.output = new ROutputList (out);
- handleRequestInternal (&request);
- }
-
- void handleTransmitError (const char* message_template) {
- printf (message_template, qPrintable (connection->errorString ()));
- }
-
- QLocalSocket* connection;
- QList<RBackendRequest *> current_requests; // there *can* be multiple active requests (if the first ones are asynchronous)
- QMutex request_mutex;
- static RKRBackendTransmitter* _instance;
- static RKRBackendTransmitter* instance () { return _instance; };
- QString servername;
- };
- RKRBackendTransmitter* RKRBackendTransmitter::_instance = 0;
-
+# include "ktemporaryfile.h"
int RK_Debug_Level = 2;
int RK_Debug_Flags = ALL;
QMutex RK_Debug_Mutex;
+ KTemporaryFile* RK_Debug_File;
-/* void RKDebugMessageOutput (QtMsgType type, const char *msg) {
+ void RKDebugMessageOutput (QtMsgType type, const char *msg) {
RK_Debug_Mutex.lock ();
if (type == QtFatalMsg) {
fprintf (stderr, "%s\n", msg);
}
- RKSettingsModuleDebug::debug_file->write (msg);
- RKSettingsModuleDebug::debug_file->write ("\n");
- RKSettingsModuleDebug::debug_file->flush ();
+ RK_Debug_File->write (msg);
+ RK_Debug_File->write ("\n");
+ RK_Debug_File->flush ();
RK_Debug_Mutex.unlock ();
- } */
+ }
int main(int argc, char *argv[]) {
QCoreApplication app (argc, argv);
KComponentData data ("rkward");
KGlobal::locale (); // to initialize it in the primary thread
+ RK_Debug_File = new KTemporaryFile ();
+ RK_Debug_File->setPrefix ("rkward.rbackend");
+ RK_Debug_File->setAutoRemove (false);
+ if (RK_Debug_File->open ()) qInstallMsgHandler (RKDebugMessageOutput);
+
QString servername;
QString data_dir;
QStringList args = app.arguments ();
@@ -266,6 +133,10 @@
RKRBackendProtocolBackend backend (data_dir);
transmitter.start ();
RKRBackend::this_pointer->run ();
+ transmitter.quit ();
+ transmitter.wait (5000);
+
+ if (!RKRBackend::this_pointer->isKilled ()) RKRBackend::tryToDoEmergencySave ();
}
#endif
@@ -303,11 +174,12 @@
request = _request->duplicate (); // the instance we send to the frontend will remain in there, and be deleted, there
_request->done = true; // for aesthetics
}
-#ifdef RKWARD_THREADED
RKRBackendEvent* event = new RKRBackendEvent (request);
+#ifdef RKWARD_THREADED
qApp->postEvent (RKRBackendProtocolFrontend::instance (), event);
#else
- RKRBackendTransmitter::instance ()->postRequest (request);
+ RK_ASSERT (request->type != RBackendRequest::Output);
+ qApp->postEvent (RKRBackendTransmitter::instance (), event);
#endif
}
@@ -319,7 +191,7 @@
#ifdef RKWARD_THREADED
RKRBackendThread::instance->publicmsleep (delay);
#else
- RKRBackendTransmitter::instance ()->publicmsleep (delay);
+ static_cast<RKRBackendTransmitter*> (RKRBackendTransmitter::instance ())->publicmsleep (delay);
#endif
}
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp 2010-11-17 19:14:00 UTC (rev 3192)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_frontend.cpp 2010-11-18 15:32:05 UTC (rev 3193)
@@ -50,6 +50,8 @@
#ifdef RKWARD_THREADED
delete RKRBackendProtocolBackend::instance ();
#else
+ RKFrontendTransmitter::instance ()->quit ();
+ RKFrontendTransmitter::instance ()->wait (1000);
delete RKFrontendTransmitter::instance ();
#endif
}
@@ -82,7 +84,7 @@
#ifdef RKWARD_THREADED
return (RKRBackend::this_pointer->flushOutput (force));
#else
- return RKFrontendTransmitter::instance ()->flushOutput (force);
+ return static_cast<RKFrontendTransmitter*> (RKFrontendTransmitter::instance ())->flushOutput (force);
#endif
}
@@ -104,8 +106,7 @@
#ifdef RKWARD_THREADED
RKRBackend::this_pointer->kill ();
#else
-// kill (SIGUSR2, pid_of_it);
-#warning will not work on windows!
+ // Backend process will terminate automatically, when the transmitter dies
#endif
}
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp 2010-11-17 19:14:00 UTC (rev 3192)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.cpp 2010-11-18 15:32:05 UTC (rev 3193)
@@ -79,180 +79,7 @@
}
-#ifndef RKWARD_THREADED
- QByteArray RKRBackendSerializer::serialize (const RBackendRequest &request) {
- RK_TRACE (RBACKEND);
- QByteArray ret;
- QDataStream stream (&ret, QIODevice::WriteOnly);
-
- stream << (qint8) request.type;
- stream << request.synchronous;
- stream << request.done; // well, not really needed, but...
- if (request.command) {
- stream << true;
- serializeProxy (*(request.command), stream);
- } else {
- stream << false;
- }
- if (request.output) {
- RK_ASSERT (request.type == RBackendRequest::Output);
- stream << true;
- serializeOutput (*(request.output), stream);
- } else {
- stream << false;
- }
- stream << request.params;
-
- return ret;
- }
-
- RBackendRequest *RKRBackendSerializer::unserialize (const QByteArray &buffer) {
- RK_TRACE (RBACKEND);
-
- QDataStream stream (buffer);
- RBackendRequest *request = new RBackendRequest (false, RBackendRequest::OtherRequest); // will be overwritten
-
- bool dummyb;
- qint8 dummy8;
- stream >> dummy8;
- request->type = (RBackendRequest::RCallbackType) dummy8;
- stream >> request->synchronous;
- stream >> dummyb;
- request->done = dummyb;
- stream >> dummyb;
- if (dummyb) request->command = unserializeProxy (stream);
- stream >> dummyb;
- if (dummyb) request->output = unserializeOutput (stream);
- stream >> request->params;
-
- return request;
- }
-
- void RKRBackendSerializer::serializeOutput (const ROutputList &list, QDataStream &stream) {
- RK_TRACE (RBACKEND);
-
- stream << (qint32) list.size ();
- for (qint32 i = 0; i < list.size (); ++i) {
- stream << (qint8) list[i]->type;
- stream << list[i]->output;
- }
- }
-
- ROutputList* RKRBackendSerializer::unserializeOutput (QDataStream &stream) {
- RK_TRACE (RBACKEND);
-
- ROutputList *ret = new ROutputList ();
- qint32 len;
- stream >> len;
-#if QT_VERSION >= 0x040700
- ret->reserve (len);
-#endif
-
- for (qint32 i = 0; i < len; ++i) {
- ROutput* out = new ROutput;
- qint8 dummy8;
- stream >> dummy8;
- out->type = (ROutput::ROutputType) dummy8;
- stream >> out->output;
- ret->append (out);
- }
-
- return ret;
- }
-
- void RKRBackendSerializer::serializeData (const RData &data, QDataStream &stream) {
- RK_TRACE (RBACKEND);
-
- RData::RDataType type = data.getDataType ();
- stream << (qint8) type;
- if (type == RData::IntVector) stream << data.getIntVector ();
- else if (type == RData::StringVector) stream << data.getStringVector ();
- else if (type == RData::RealVector) stream << data.getRealVector ();
- else if (type == RData::StructureVector) {
- RData::RDataStorage list = data.getStructureVector ();
- qint32 len = list.size ();
- stream << len;
- for (qint32 i = 0; i < list.size (); ++i) {
- serializeData (*(list[i]), stream);
- }
- } else {
- RK_ASSERT (type == RData::NoData);
- }
- }
-
- RData* RKRBackendSerializer::unserializeData (QDataStream &stream) {
- RK_TRACE (RBACKEND);
-
- RData* ret = new RData;
- RData::RDataType type;
- qint8 dummy8;
- stream >> dummy8;
- type = (RData::RDataType) dummy8;
- if (type == RData::IntVector) {
- RData::IntStorage data;
- stream >> data;
- ret->setData (data);
- } else if (type == RData::StringVector) {
- RData::StringStorage data;
- stream >> data;
- ret->setData (data);
- } else if (type == RData::RealVector) {
- RData::RealStorage data;
- stream >> data;;
- ret->setData (data);
- } else if (type == RData::StructureVector) {
- RData::RDataStorage data;
- qint32 len;
- stream >> len;
-#if QT_VERSION >= 0x040700
- data.reserve (len);
-#endif
- for (qint32 i = 0; i < len; ++i) {
- data.append (unserializeData (stream));
- }
- ret->setData (data);
- } else {
- RK_ASSERT (type == RData::NoData);
- }
-
- return ret;
- }
-
- void RKRBackendSerializer::serializeProxy (const RCommandProxy &proxy, QDataStream &stream) {
- RK_TRACE (RBACKEND);
-
- stream << proxy.command;
- stream << (qint32) proxy.type;
- stream << (qint32) proxy.id;
- stream << (qint32) proxy.status;
-
- serializeData (proxy, stream);
- }
-
- RCommandProxy* RKRBackendSerializer::unserializeProxy (QDataStream &stream) {
- RK_TRACE (RBACKEND);
-
- QString command;
- stream >> command;
- qint32 type;
- stream >> type;
- RCommandProxy* ret = new RCommandProxy (command, type);
- qint32 dummy32;
- stream >> dummy32;
- ret->id = dummy32;
- stream >> dummy32;
- ret->status = dummy32;
-
- RData *data = unserializeData (stream);
- ret->swallowData (*data);
- delete (data);
-
- return ret;
- }
-#endif
-
-
#define MAX_BUF_LENGTH 16000
#define OUTPUT_STRING_RESERVE 1000
Modified: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h 2010-11-17 19:14:00 UTC (rev 3192)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rkrbackendprotocol_shared.h 2010-11-18 15:32:05 UTC (rev 3193)
@@ -124,24 +124,6 @@
int status;
};
-#ifndef RKWARD_THREADED
- /** functions for serialization / unserialization of communication between backend and frontend.
- NOTE: This could really be a namespace, instead of a class, but "friending" a class is simply easier... */
- class RKRBackendSerializer {
- public:
- static QByteArray serialize (const RBackendRequest &request);
- static RBackendRequest *unserialize (const QByteArray &buffer);
-
- private:
- static void serializeOutput (const ROutputList &list, QDataStream &stream);
- static void serializeData (const RData &data, QDataStream &stream);
- static void serializeProxy (const RCommandProxy &proxy, QDataStream &stream);
- static ROutputList* unserializeOutput (QDataStream &stream);
- static RData* unserializeData (QDataStream &stream);
- static RCommandProxy* unserializeProxy (QDataStream &stream);
- };
-#endif
-
class RKROutputBuffer {
public:
RKROutputBuffer ();
Added: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rktransmitter.cpp
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rktransmitter.cpp (rev 0)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rktransmitter.cpp 2010-11-18 15:32:05 UTC (rev 3193)
@@ -0,0 +1,287 @@
+/***************************************************************************
+ rktransmitter - description
+ -------------------
+ begin : Thu Nov 18 2010
+ copyright : (C) 2010 by Thomas Friedrichsmeier
+ email : tfry at users.sourceforge.net
+ ***************************************************************************/
+
+/***************************************************************************
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) any later version. *
+ * *
+ ***************************************************************************/
+
+#include "rktransmitter.h"
+
+#include "../debug.h"
+
+QByteArray RKRBackendSerializer::serialize (const RBackendRequest &request) {
+ RK_TRACE (RBACKEND);
+
+ QByteArray ret;
+ QDataStream stream (&ret, QIODevice::WriteOnly);
+
+ stream << (qint8) request.type;
+ stream << request.synchronous;
+ stream << request.done; // well, not really needed, but...
+ if (request.command) {
+ stream << true;
+ serializeProxy (*(request.command), stream);
+ } else {
+ stream << false;
+ }
+ if (request.output) {
+ RK_ASSERT (request.type == RBackendRequest::Output);
+ stream << true;
+ serializeOutput (*(request.output), stream);
+ } else {
+ stream << false;
+ }
+ stream << request.params;
+
+ return ret;
+}
+
+RBackendRequest *RKRBackendSerializer::unserialize (const QByteArray &buffer) {
+ RK_TRACE (RBACKEND);
+
+ QDataStream stream (buffer);
+ RBackendRequest *request = new RBackendRequest (false, RBackendRequest::OtherRequest); // will be overwritten
+
+ bool dummyb;
+ qint8 dummy8;
+ stream >> dummy8;
+ request->type = (RBackendRequest::RCallbackType) dummy8;
+ stream >> request->synchronous;
+ stream >> dummyb;
+ request->done = dummyb;
+ stream >> dummyb;
+ if (dummyb) request->command = unserializeProxy (stream);
+ stream >> dummyb;
+ if (dummyb) request->output = unserializeOutput (stream);
+ stream >> request->params;
+
+ return request;
+}
+
+void RKRBackendSerializer::serializeOutput (const ROutputList &list, QDataStream &stream) {
+ RK_TRACE (RBACKEND);
+
+ stream << (qint32) list.size ();
+ for (qint32 i = 0; i < list.size (); ++i) {
+ stream << (qint8) list[i]->type;
+ stream << list[i]->output;
+ }
+}
+
+ROutputList* RKRBackendSerializer::unserializeOutput (QDataStream &stream) {
+ RK_TRACE (RBACKEND);
+
+ ROutputList *ret = new ROutputList ();
+ qint32 len;
+ stream >> len;
+#if QT_VERSION >= 0x040700
+ ret->reserve (len);
+#endif
+
+ for (qint32 i = 0; i < len; ++i) {
+ ROutput* out = new ROutput;
+ qint8 dummy8;
+ stream >> dummy8;
+ out->type = (ROutput::ROutputType) dummy8;
+ stream >> out->output;
+ ret->append (out);
+ }
+
+ return ret;
+}
+
+void RKRBackendSerializer::serializeData (const RData &data, QDataStream &stream) {
+ RK_TRACE (RBACKEND);
+
+ RData::RDataType type = data.getDataType ();
+ stream << (qint8) type;
+ if (type == RData::IntVector) stream << data.getIntVector ();
+ else if (type == RData::StringVector) stream << data.getStringVector ();
+ else if (type == RData::RealVector) stream << data.getRealVector ();
+ else if (type == RData::StructureVector) {
+ RData::RDataStorage list = data.getStructureVector ();
+ qint32 len = list.size ();
+ stream << len;
+ for (qint32 i = 0; i < list.size (); ++i) {
+ serializeData (*(list[i]), stream);
+ }
+ } else {
+ RK_ASSERT (type == RData::NoData);
+ }
+}
+
+RData* RKRBackendSerializer::unserializeData (QDataStream &stream) {
+ RK_TRACE (RBACKEND);
+
+ RData* ret = new RData;
+ RData::RDataType type;
+ qint8 dummy8;
+ stream >> dummy8;
+ type = (RData::RDataType) dummy8;
+ if (type == RData::IntVector) {
+ RData::IntStorage data;
+ stream >> data;
+ ret->setData (data);
+ } else if (type == RData::StringVector) {
+ RData::StringStorage data;
+ stream >> data;
+ ret->setData (data);
+ } else if (type == RData::RealVector) {
+ RData::RealStorage data;
+ stream >> data;;
+ ret->setData (data);
+ } else if (type == RData::StructureVector) {
+ RData::RDataStorage data;
+ qint32 len;
+ stream >> len;
+#if QT_VERSION >= 0x040700
+ data.reserve (len);
+#endif
+ for (qint32 i = 0; i < len; ++i) {
+ data.append (unserializeData (stream));
+ }
+ ret->setData (data);
+ } else {
+ RK_ASSERT (type == RData::NoData);
+ }
+
+ return ret;
+}
+
+void RKRBackendSerializer::serializeProxy (const RCommandProxy &proxy, QDataStream &stream) {
+ RK_TRACE (RBACKEND);
+
+ stream << proxy.command;
+ stream << (qint32) proxy.type;
+ stream << (qint32) proxy.id;
+ stream << (qint32) proxy.status;
+
+ serializeData (proxy, stream);
+}
+
+RCommandProxy* RKRBackendSerializer::unserializeProxy (QDataStream &stream) {
+ RK_TRACE (RBACKEND);
+
+ QString command;
+ stream >> command;
+ qint32 type;
+ stream >> type;
+ RCommandProxy* ret = new RCommandProxy (command, type);
+ qint32 dummy32;
+ stream >> dummy32;
+ ret->id = dummy32;
+ stream >> dummy32;
+ ret->status = dummy32;
+
+ RData *data = unserializeData (stream);
+ ret->swallowData (*data);
+ delete (data);
+
+ return ret;
+}
+
+
+#include <QTimer>
+#include <QLocalSocket>
+RKAbstractTransmitter* RKAbstractTransmitter::_instance = 0;
+RKAbstractTransmitter::RKAbstractTransmitter () : QThread () {
+ RK_TRACE (RBACKEND);
+
+ RK_ASSERT (_instance == 0); // NOTE: Although there are two instances of an abstract transmitter in an RKWard session, these live in different processes.
+ _instance = this;
+ connection = 0;
+
+ moveToThread (this);
+}
+
+RKAbstractTransmitter::~RKAbstractTransmitter () {
+ RK_TRACE (RBACKEND);
+}
+
+void RKAbstractTransmitter::transmitRequest (RBackendRequest *request) {
+ RK_TRACE (RBACKEND);
+ RK_ASSERT (connection);
+
+ if (!connection->isOpen ()) {
+ handleTransmissionError ("Connection not open while trying to write request. Last error was: " + connection->errorString ());
+ return;
+ }
+
+ QByteArray buffer = RKRBackendSerializer::serialize (*request);
+ connection->write (QString::number (buffer.length ()).toLocal8Bit () + "\n");
+ connection->write (buffer);
+}
+
+void RKAbstractTransmitter::customEvent (QEvent *e) {
+ RK_TRACE (RBACKEND);
+
+ if (((int) e->type ()) == ((int) RKRBackendEvent::RKWardEvent)) {
+ RKRBackendEvent *ev = static_cast<RKRBackendEvent*> (e);
+ writeRequest (ev->data ());
+ } else {
+ RK_ASSERT (false);
+ return;
+ }
+}
+
+void RKAbstractTransmitter::fetchTransmission () {
+ RK_TRACE (RBACKEND);
+
+ if (!connection->isOpen ()) {
+ handleTransmissionError ("Connection not open while trying to read request. Last error was: " + connection->errorString ());
+ return;
+ }
+
+ if (!connection->canReadLine ()) return;
+
+ QString line = QString::fromLocal8Bit (connection->readLine ());
+ bool ok;
+ int expected_length = line.toInt (&ok);
+ if (!ok) handleTransmissionError ("Protocol header error. Last connection error was: " + connection->errorString ());
+
+ QByteArray receive_buffer;
+ while (receive_buffer.length () < expected_length) {
+ if (connection->bytesAvailable ()) {
+ receive_buffer.append (connection->read (expected_length - receive_buffer.length ()));
+ } else {
+ connection->waitForReadyRead (1000);
+ if (!connection->isOpen ()) {
+ handleTransmissionError ("Connection closed unexepctedly. Last error: " + connection->errorString ());
+ return;
+ }
+ }
+ }
+
+ requestReceived (RKRBackendSerializer::unserialize (receive_buffer));
+
+ if (connection->bytesAvailable ()) QTimer::singleShot (0, this, SLOT (fetchTransmission ()));
+}
+
+void RKAbstractTransmitter::setConnection (QLocalSocket *_connection) {
+ RK_TRACE (RBACKEND);
+ RK_ASSERT (!connection);
+
+ connection = _connection;
+ RK_ASSERT (connection->isOpen ());
+
+ connect (connection, SIGNAL (readyRead()), this, SLOT (fetchTransmission()));
+ connect (connection, SIGNAL (disconnected()), this, SLOT (disconnected()));
+}
+
+void RKAbstractTransmitter::disconnected () {
+ RK_TRACE (RBACKEND);
+
+ handleTransmissionError ("Connection closed unexpectedly. Last error was: " + connection->errorString ());
+}
+
+#include "rktransmitter.moc"
Added: branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rktransmitter.h
===================================================================
--- branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rktransmitter.h (rev 0)
+++ branches/2010_10_18_backend_restructuring_branch/rkward/rbackend/rktransmitter.h 2010-11-18 15:32:05 UTC (rev 3193)
@@ -0,0 +1,72 @@
+/***************************************************************************
+ rktransmitter - description
+ -------------------
+ begin : Thu Nov 18 2010
+ copyright : (C) 2010 by Thomas Friedrichsmeier
+ email : tfry at users.sourceforge.net
+ ***************************************************************************/
+
+/***************************************************************************
+ * *
+ * This program is free software; you can redistribute it and/or modify *
+ * it under the terms of the GNU General Public License as published by *
+ * the Free Software Foundation; either version 2 of the License, or *
+ * (at your option) any later version. *
+ * *
+ ***************************************************************************/
+
+#ifndef RKTRANSMITTER_H
+#define RKTRANSMITTER_H
+
+#include "rkrbackendprotocol_shared.h"
+
+#include <QThread>
+#include <QByteArray>
+
+#ifndef RKWARD_SPLIT_PROCESS
+# error This should only be compiled for split process backends.
+#endif
+
+/** functions for serialization / unserialization of communication between backend and frontend.
+NOTE: This could really be a namespace, instead of a class, but "friending" a class is simply easier... */
+class RKRBackendSerializer {
+public:
+ static QByteArray serialize (const RBackendRequest &request);
+ static RBackendRequest *unserialize (const QByteArray &buffer);
+
+private:
+ static void serializeOutput (const ROutputList &list, QDataStream &stream);
+ static void serializeData (const RData &data, QDataStream &stream);
+ static void serializeProxy (const RCommandProxy &proxy, QDataStream &stream);
+ static ROutputList* unserializeOutput (QDataStream &stream);
+ static RData* unserializeData (QDataStream &stream);
+ static RCommandProxy* unserializeProxy (QDataStream &stream);
+};
+
+class QLocalSocket;
+/** The base class for the frontend- and backend transmitters */
+class RKAbstractTransmitter : public QThread {
+Q_OBJECT
+public:
+ static RKAbstractTransmitter* instance () { return _instance; };
+ virtual ~RKAbstractTransmitter ();
+protected:
+ RKAbstractTransmitter ();
+
+ virtual void writeRequest (RBackendRequest *request) = 0;
+ virtual void requestReceived (RBackendRequest *request) = 0;
+ virtual void handleTransmissionError (const QString &message) = 0;
+
+ void transmitRequest (RBackendRequest *request);
+ void customEvent (QEvent *e);
+ void setConnection (QLocalSocket *connection);
+ QLocalSocket *connection;
+private slots:
+ /** Note: this blocks until a compelete request has been received. Connected to the "readyRead"-signal of the connection. Calls requestReceived() once the request has been read. */
+ void fetchTransmission ();
+ void disconnected ();
+private:
+static RKAbstractTransmitter* _instance;
+};
+
+#endif
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
More information about the rkward-tracker
mailing list