24 #include "kmessageio.h"
33 : TQObject (parent, name), m_id (0)
55 mSocket =
new TQSocket ();
56 mSocket->connectToHost (host, port);
61 *parent,
const char *name)
64 mSocket =
new TQSocket ();
65 mSocket->connectToHost (host.toString(), port);
81 mSocket =
new TQSocket ();
82 mSocket->setSocket (socketFD);
93 return mSocket->state() == TQSocket::Connection;
98 TQDataStream str (mSocket);
99 str << TQ_UINT8 (
'M');
100 str.writeBytes (msg.data(), msg.size());
103 void KMessageSocket::processNewData ()
109 TQDataStream str (mSocket);
110 while (mSocket->bytesAvailable() > 0)
115 if (mSocket->bytesAvailable() < 5)
128 kdWarning(11001) << k_funcinfo <<
": Received unexpected data, magic number wrong!" << endl;
132 str >> mNextBlockLength;
133 mAwaitingHeader =
false;
138 if (mSocket->bytesAvailable() < (TQ_ULONG) mNextBlockLength)
144 TQByteArray msg (mNextBlockLength);
145 str.readRawBytes (msg.data(), mNextBlockLength);
151 mAwaitingHeader =
true;
158 void KMessageSocket::initSocket ()
161 connect (mSocket, TQ_SIGNAL (connectionClosed()), TQ_SIGNAL (
connectionBroken()));
162 connect (mSocket, TQ_SIGNAL (readyRead()), TQ_SLOT (processNewData()));
163 mAwaitingHeader =
true;
164 mNextBlockLength = 0;
170 return mSocket->peerPort();
175 return mSocket->peerName();
189 if (partner && partner->mPartner)
191 kdWarning(11001) << k_funcinfo <<
": Object is already connected!" << endl;
199 partner->mPartner =
this;
206 mPartner->mPartner = 0;
213 return mPartner != 0;
221 kdError(11001) << k_funcinfo <<
": Not yet connected!" << endl;
227 KMessageProcess::~KMessageProcess()
229 kdDebug(11001) <<
"@@@KMessageProcess::Delete process" << endl;
236 mQueue.setAutoDelete(
true);
241 KMessageProcess::KMessageProcess(TQObject *parent, TQString file) :
KMessageIO(parent,0)
244 kdDebug(11001) <<
"@@@KMessageProcess::Start process" << endl;
246 mProcess=
new TDEProcess;
248 *mProcess << mProcessName << TQString(
"%1").arg(
id);
249 kdDebug(11001) <<
"@@@KMessageProcess::Init:Id= " <<
id << endl;
250 kdDebug(11001) <<
"@@@KMessgeProcess::Init:Processname: " << mProcessName << endl;
251 connect(mProcess, TQ_SIGNAL(receivedStdout(TDEProcess *,
char *,
int )),
252 this, TQ_SLOT(slotReceivedStdout(TDEProcess *,
char * ,
int )));
253 connect(mProcess, TQ_SIGNAL(receivedStderr(TDEProcess *,
char *,
int )),
254 this, TQ_SLOT(slotReceivedStderr(TDEProcess *,
char * ,
int )));
255 connect(mProcess, TQ_SIGNAL(processExited(TDEProcess *)),
256 this, TQ_SLOT(slotProcessExited(TDEProcess *)));
257 connect(mProcess, TQ_SIGNAL(wroteStdin(TDEProcess *)),
258 this, TQ_SLOT(slotWroteStdin(TDEProcess *)));
259 mProcess->start(TDEProcess::NotifyOnExit,TDEProcess::All);
262 mReceiveBuffer.resize(1024);
264 bool KMessageProcess::isConnected()
const
266 kdDebug(11001) <<
"@@@KMessageProcess::Is conencted" << endl;
267 if (!mProcess)
return false;
268 return mProcess->isRunning();
270 void KMessageProcess::send(
const TQByteArray &msg)
272 kdDebug(11001) <<
"@@@KMessageProcess:: SEND("<<msg.size()<<
") to process" << endl;
273 unsigned int size=msg.size()+2*
sizeof(long);
275 char *tmpbuffer=
new char[size];
276 long *p1=(
long *)tmpbuffer;
278 kdDebug(11001) <<
"p1="<<p1 <<
"p2="<< p2 << endl;
279 memcpy(tmpbuffer+2*
sizeof(
long),msg.data(),msg.size());
283 TQByteArray *buffer=
new TQByteArray();
284 buffer->assign(tmpbuffer,size);
286 mQueue.enqueue(buffer);
289 void KMessageProcess::writeToProcess()
292 if (mSendBuffer || mQueue.isEmpty()) return ;
293 mSendBuffer=mQueue.dequeue();
294 if (!mSendBuffer) return ;
300 mProcess->writeStdin(mSendBuffer->data(),mSendBuffer->size());
303 void KMessageProcess::slotWroteStdin(TDEProcess * )
305 kdDebug(11001) << k_funcinfo << endl;
314 void KMessageProcess::slotReceivedStderr(TDEProcess * proc,
char *buffer,
int buflen)
322 if (!buffer || buflen==0) return ;
323 if (proc) pid=proc->pid();
329 p=(
char *)memchr(pos,
'\n',buflen);
334 a.setRawData(pos,len);
336 kdDebug(11001) <<
"PID" <<pid<<
":" << s << endl;
337 a.resetRawData(pos,len);
344 void KMessageProcess::slotReceivedStdout(TDEProcess * ,
char *buffer,
int buflen)
346 kdDebug(11001) <<
"$$$$$$ " << k_funcinfo <<
": Received " << buflen <<
" bytes over inter process communication" << endl;
349 while (mReceiveCount+buflen>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
350 memcpy(mReceiveBuffer.data()+mReceiveCount,buffer,buflen);
351 mReceiveCount+=buflen;
354 while (mReceiveCount>2*
sizeof(
long))
356 long *p1=(
long *)mReceiveBuffer.data();
361 kdDebug(11001) << k_funcinfo <<
": Cookie error...transmission failure...serious problem..." << endl;
365 if (len<2*
sizeof(
long))
367 kdDebug(11001) << k_funcinfo <<
": Message size error" << endl;
370 if (len<=mReceiveCount)
372 kdDebug(11001) << k_funcinfo <<
": Got message with len " << len << endl;
376 msg.duplicate(mReceiveBuffer.data()+2*
sizeof(
long),len-2*
sizeof(
long));
380 if (len<mReceiveCount)
382 memmove(mReceiveBuffer.data(),mReceiveBuffer.data()+len,mReceiveCount-len);
390 void KMessageProcess::slotProcessExited(TDEProcess * )
392 kdDebug(11001) <<
"Process exited (slot)" << endl;
393 emit connectionBroken();
400 KMessageFilePipe::KMessageFilePipe(TQObject *parent,TQFile *readfile,TQFile *writefile) :
KMessageIO(parent,0)
403 mWriteFile=writefile;
405 mReceiveBuffer.resize(1024);
408 KMessageFilePipe::~KMessageFilePipe()
412 bool KMessageFilePipe::isConnected ()
const
414 return (mReadFile!=0)&&(mWriteFile!=0);
417 void KMessageFilePipe::send(
const TQByteArray &msg)
419 unsigned int size=msg.size()+2*
sizeof(long);
421 char *tmpbuffer=
new char[size];
422 long *p1=(
long *)tmpbuffer;
424 memcpy(tmpbuffer+2*
sizeof(
long),msg.data(),msg.size());
429 buffer.assign(tmpbuffer,size);
430 mWriteFile->writeBlock(buffer);
439 void KMessageFilePipe::exec()
445 int ch=mReadFile->getch();
447 while (mReceiveCount>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
448 mReceiveBuffer[mReceiveCount]=(char)ch;
452 if (mReceiveCount>=2*
sizeof(
long))
454 long *p1=(
long *)mReceiveBuffer.data();
459 fprintf(stderr,
"KMessageFilePipe::exec:: Cookie error...transmission failure...serious problem...\n");
463 if (len==mReceiveCount)
469 msg.duplicate(mReceiveBuffer.data()+2*
sizeof(long),len-2*
sizeof(
long));
482 #include "kmessageio.moc"
This class implements the message communication using function calls directly.
~KMessageDirect()
Destructor, closes the connection.
void send(const TQByteArray &msg)
Overwritten slot method from KMessageIO.
bool isConnected() const
Returns true, if the object is connected to another instance.
KMessageDirect(KMessageDirect *partner=0, TQObject *parent=0, const char *name=0)
Creates an object and connects it to the object given in the first parameter.
This abstract base class represents one end of a message connections between two clients.
KMessageIO(TQObject *parent=0, const char *name=0)
The usual TQObject constructor, does nothing else.
void connectionBroken()
This signal is emitted when the connection is closed.
void received(const TQByteArray &msg)
This signal is emitted when /e send() on the connected KMessageIO object is called.
TQ_UINT32 id()
Queries the ID of this object.
void setId(TQ_UINT32 id)
Sets the ID number of this object.
~KMessageIO()
The usual destructor, does nothing special.
bool isConnected() const
Returns true if the socket is in state /e connected.
KMessageSocket(TQString host, TQ_UINT16 port, TQObject *parent=0, const char *name=0)
Connects to a server socket on /e host with /e port.
virtual TQString peerName() const
void send(const TQByteArray &msg)
Overwritten slot method from KMessageIO.
virtual TQ_UINT16 peerPort() const
~KMessageSocket()
Destructor, closes the socket.