It's been (literally) years since my last contribution here on newlc.com, so I figured it's time to get off my behind and share something again. The following is not a new idea by any standard, but having come across a working implementation of it some time ago, I found it elegant and useful enough to improve further and present here. Now, the explicit use of threads is often discouraged in Symbian, and even when threads are used, the client-server framework wrapped around the server thread is in most cases more than enough for inter-thread communication. Here, however, I try to present a solution to a problem where two equal producer/consumer threads require two-way communication that is not easily available via the client-server framework. There seem to be quite a few threads on the forums dealing with issues related to threads and active objects, so hopefully this writeup will give people some new tools.
Say we have threads A (the consumer) and B (the producer). They communicate via a synchronized buffer. The specifics of the buffer implementation aren't central to this article. The buffer's data insertion method blocks the calling thread when there is no room in the buffer. That monitor is signaled every time the buffer's data retrieval method is called, or when a special Reset() method is called on the buffer. Now on to the beef of this presentation.
For sending messages between the two threads, we define the following class (I'll omit anything obvious for clarity):
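The buffer itself is not shown in this article, so the following is only a rough sketch of the behaviour described above, written in standard C++ rather than with Symbian APIs. The class name SyncBuffer, its method names and the int payload are all hypothetical. Insert() blocks while the buffer is full and is woken either by Retrieve() or by Reset().

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the article's synchronized buffer
// (standard C++, not Symbian APIs).
class SyncBuffer
{
public:
    explicit SyncBuffer(std::size_t aCapacity) : iCapacity(aCapacity) {}

    // Blocks while the buffer is full. Returns false if Reset() woke the
    // caller before the item could be stored.
    bool Insert(int aItem)
    {
        std::unique_lock<std::mutex> lock(iMutex);
        const unsigned generation = iGeneration;
        iNotFull.wait(lock, [&] {
            return iItems.size() < iCapacity || iGeneration != generation;
        });
        if (iGeneration != generation)
            return false; // interrupted by Reset()
        iItems.push_back(aItem);
        return true;
    }

    // Non-blocking retrieval; signals a waiting producer on success.
    bool Retrieve(int& aItem)
    {
        std::lock_guard<std::mutex> lock(iMutex);
        if (iItems.empty())
            return false;
        aItem = iItems.front();
        iItems.pop_front();
        iNotFull.notify_one();
        return true;
    }

    // Discards buffered data and releases any blocked producer.
    void Reset()
    {
        std::lock_guard<std::mutex> lock(iMutex);
        iItems.clear();
        ++iGeneration;
        iNotFull.notify_all();
    }

private:
    std::mutex iMutex;
    std::condition_variable iNotFull;
    std::deque<int> iItems;
    std::size_t iCapacity;
    unsigned iGeneration = 0;
};
```

The generation counter is one way to let a blocked producer tell "room became available" apart from "the buffer was reset"; the real buffer may well do this differently.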
typedef TInt (*ThreadCallback)( TInt aMessage, TAny* aParam );

class CThreadMessager : public CActive
    {
public: // New methods
    TInt SendMessage( TInt aMessage, TAny* aParam );

private: // Constructors
    CThreadMessager( ThreadCallback aCallback );

private: // Data
    ThreadCallback iCallback;
    TThreadId iNotifyThreadId;
    RSemaphore iSemaphore;
    RCriticalSection iCritSect;

    // call data
    TInt iMessage;
    TAny* iParam;
    TInt iReturnValue;
    };
The important bits of the implementation are:
CThreadMessager::CThreadMessager( ThreadCallback aCallback )
    : CActive( EPriorityStandard ),
      iCallback( aCallback )
    {
    CActiveScheduler::Add( this );
    // save the thread id of the current thread (RThread() creates a handle
    // to the currently executing thread)
    iNotifyThreadId = RThread().Id();
    }
TInt CThreadMessager::SendMessage( TInt aMessage,
                                   TAny* aParam )
    {
    // wait for mutex entry -- will be signaled in RunL()
    iCritSect.Wait();

    // save message values
    iMessage = aMessage;
    iParam = aParam;
    iReturnValue = KErrNone;

    if ( IsActive() )
        {
        Cancel();
        }

    RThread notifyThread;
    TInt ret = notifyThread.Open( iNotifyThreadId );
    if ( ret != KErrNone )
        {
        // could not open a handle to the thread to be notified
        iCritSect.Signal();
        return ret;
        }

    // prepare the active object to make a call to RunL(); mark the request
    // as pending before activating, so the scheduler in the notified thread
    // never sees an active object with a stale completion code
    iStatus = KRequestPending;
    SetActive();
    TRequestStatus* status = &iStatus;
    notifyThread.RequestComplete( status, KErrNone );
    notifyThread.Close();

    // wait till the return value is updated
    iSemaphore.Wait();
    return iReturnValue;
    }
void CThreadMessager::RunL()
    {
    // take a local copy of the call parameters
    TInt message = iMessage;
    TAny* param = iParam;

    // release mutex, allowing next thread into SendMessage()
    iCritSect.Signal();

    // call the callback
    iReturnValue = iCallback( message, param );

    // release the thread waiting in SendMessage()
    iSemaphore.Signal();
    }
So what does this thing do, then?
First, the construction. When the object is constructed, it stores the current thread's id and adds itself to the current thread's active scheduler (this is important). This is why these objects need to be constructed in the thread they are to signal. The usual place in code would be at the beginning of the thread function, after the trap harness and active scheduler have been installed. The thread(s) should be created with a shared heap, for example:
RThread thread;
// passing NULL as the heap makes the new thread share the creating thread's heap
User::LeaveIfError( thread.Create( KThreadAName,
                                   CEncapsulatingClass::ThreadFunctionA,
                                   KThreadStackSize, NULL, this ) );
Second, the message passing. Say you have constructed an object of this type in thread A like this:
iThreadAMessager = CThreadMessager::NewL( CEncapsulatingClass::ThreadACallback );
Then, to pass a message from any thread to thread A you would use:
iThreadAMessager->SendMessage( EMyCustomMessage, this );
What happens is that SendMessage() calls SetActive() and notifyThread.RequestComplete() to trigger RunL(). The thread switch occurs here: RunL() runs in thread A instead of the thread calling SendMessage(). SendMessage() then waits on a semaphore until RunL() has run and signaled it, allowing SendMessage() to return. This way you can pass return values from thread A to the caller. To make this process asynchronous, just remove the blocking/signaling, but then you'll lose the return value support. And as the last thing, you'll need to define a callback function that gets called from RunL():
// must be declared static in the class so that it matches ThreadCallback
TInt CEncapsulatingClass::ThreadACallback( TInt aMessage, TAny* aParam )
    {
    CEncapsulatingClass* self = static_cast<CEncapsulatingClass*>( aParam );
    switch ( aMessage )
        {
        case EMyCustomMessage:
            //TODO: handling code (use self to reach the object's members)
            break;
        default:
            break;
        }
    // this value is what SendMessage() returns to the caller
    return KErrNone;
    }
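For readers without a Symbian toolchain at hand, the blocking hand-off described above can be approximated in standard C++. This is only an analogy under stated assumptions, not the CThreadMessager implementation: the class ThreadMessenger, its Serve() and Stop() methods and ExampleCallback are hypothetical names, a mutex and condition variable stand in for the critical section, semaphore and active scheduler, and Serve() plays the role of the scheduler loop in the receiving thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

typedef int (*ThreadCallback)(int aMessage, void* aParam);

// Hypothetical portable analogue of the messager (not Symbian code).
class ThreadMessenger
{
public:
    explicit ThreadMessenger(ThreadCallback aCallback) : iCallback(aCallback) {}

    // Called from any sending thread; blocks until the receiving thread has
    // run the callback, then returns the callback's return value.
    // Must not be called after Stop(), or the caller blocks forever.
    int SendMessage(int aMessage, void* aParam)
    {
        std::unique_lock<std::mutex> lock(iMutex);
        // wait until the single message slot is free
        iCond.wait(lock, [this] { return iState == EIdle; });
        iMessage = aMessage;
        iParam = aParam;
        iState = EPosted;
        iCond.notify_all();
        // wait until the receiving thread has stored the result
        iCond.wait(lock, [this] { return iState == EDone; });
        const int result = iReturnValue;
        iState = EIdle; // free the slot for the next sender
        iCond.notify_all();
        return result;
    }

    // Runs in the receiving thread until Stop() is called.
    void Serve()
    {
        std::unique_lock<std::mutex> lock(iMutex);
        for (;;)
        {
            iCond.wait(lock, [this] { return iState == EPosted || iStopping; });
            if (iState != EPosted)
                return; // stop requested and nothing pending
            const int message = iMessage;
            void* param = iParam;
            lock.unlock();
            const int result = iCallback(message, param); // receiving thread
            lock.lock();
            iReturnValue = result;
            iState = EDone;
            iCond.notify_all();
        }
    }

    void Stop()
    {
        std::lock_guard<std::mutex> lock(iMutex);
        iStopping = true;
        iCond.notify_all();
    }

private:
    enum TState { EIdle, EPosted, EDone };
    ThreadCallback iCallback;
    std::mutex iMutex;
    std::condition_variable iCond;
    TState iState = EIdle;
    bool iStopping = false;
    int iMessage = 0;
    void* iParam = nullptr;
    int iReturnValue = 0;
};

// Example callback: records which thread it ran on and doubles the message.
int ExampleCallback(int aMessage, void* aParam)
{
    *static_cast<std::thread::id*>(aParam) = std::this_thread::get_id();
    return aMessage * 2;
}
```

One deliberate difference from the listing above: this sketch keeps the message slot reserved until the sender has read its return value, instead of releasing it before the callback runs. Whether that trade-off suits a given design is a judgment call.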
In conclusion, the above mechanism is a simple approach to CActive-based communication between threads. It offers a thread-safe (thanks to the RCriticalSection) way of delivering messages from one thread to another and is not limited to one-way thinking the way the client-server architecture is, although client-server remains the best fit for most cases. The pitfalls include lengthy calculations in the signaled thread: obviously RunL() cannot be triggered again before the previous invocation has returned. In my example this is handled through the synchronized buffer; if data producer thread B needs to be interrupted, the data buffer's Reset() is called, which signals thread B and lets it return from RunL(), allowing the next message to go through.
Looks nice.
One thought though, isn't this quite similar to how RMsgQueue already works?
You could limit the data sent through the RMsgQueue to just an integer and a pointer, and you would have quite similar functionality, right?
RMsgQueue can also be used to send data between threads in different processes, but in that case you of course also have to copy all argument data across the process boundary.
edit: well one difference, you still need to implement the active object handling the queue
I have to confess I have not looked much into RMsgQueue, but yeah, it looks like it's doing the same thing -- NotifyDataAvailable()+Receive() should do the job nicely.
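To make the comparison concrete, here is a rough portable sketch of the queue-based idea discussed in these comments: a fixed number of slots, each carrying just an integer and a pointer. It is written in standard C++ and is not the RMsgQueue API; the names Message, MessageQueue, Send() and ReceiveBlocking() are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical message layout: an integer code plus a pointer, which is only
// meaningful when sender and receiver share an address space.
struct Message
{
    int iMessage;
    void* iParam;
};

// Hypothetical fixed-capacity queue (standard C++, not RMsgQueue).
class MessageQueue
{
public:
    explicit MessageQueue(std::size_t aSlots) : iSlots(aSlots) {}

    // Non-blocking send; returns false when all slots are in use.
    bool Send(const Message& aMessage)
    {
        std::lock_guard<std::mutex> lock(iMutex);
        if (iQueue.size() >= iSlots)
            return false;
        iQueue.push_back(aMessage);
        iDataAvailable.notify_one();
        return true;
    }

    // Blocks until a message is available, then removes and returns it.
    Message ReceiveBlocking()
    {
        std::unique_lock<std::mutex> lock(iMutex);
        iDataAvailable.wait(lock, [this] { return !iQueue.empty(); });
        const Message message = iQueue.front();
        iQueue.pop_front();
        return message;
    }

private:
    std::mutex iMutex;
    std::condition_variable iDataAvailable;
    std::deque<Message> iQueue;
    std::size_t iSlots;
};
```

Unlike SendMessage() in the article, a queue like this is one-way: the sender does not wait for the receiver and gets no return value back, so a reply would need a second queue or some other channel.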