Threading design pattern?

When designing the new libssh architecture, we decided to make it completely callback-based, even internally. This provides cool features, like the possibility of extending libssh without recompiling it, or of running asynchronous or nonblocking functions more easily. Libssh 0.5 will run as a state machine that listens for packets and then calls callbacks according to the packet type. The handlers will evaluate the current state of the session and decide what to do with each packet. Sometimes, the state of the session itself changes as the result of a packet (e.g. when receiving an SSH_AUTH_SUCCESS packet).
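
To make the idea concrete, here is a minimal sketch of that kind of dispatch in C. Every name in it (the packet types, the session fields, dispatch_packet) is made up for illustration and is not libssh's actual API; it only shows a handler table indexed by packet type, with handlers that look at the session state and sometimes change it.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical names throughout; libssh's real types and constants differ. */
enum packet_type { PKT_AUTH_SUCCESS = 0, PKT_CHANNEL_DATA = 1, PKT_TYPE_COUNT };
enum session_state { STATE_AUTHENTICATING, STATE_AUTHENTICATED };

struct session {
    enum session_state state;
    size_t bytes_received;
};

struct packet {
    enum packet_type type;
    size_t length;
};

typedef void (*packet_handler)(struct session *s, const struct packet *p);

/* A packet that changes the state of the session itself. */
static void on_auth_success(struct session *s, const struct packet *p)
{
    (void)p;
    if (s->state == STATE_AUTHENTICATING)
        s->state = STATE_AUTHENTICATED;
}

/* A packet whose handling depends on the current state. */
static void on_channel_data(struct session *s, const struct packet *p)
{
    if (s->state == STATE_AUTHENTICATED)
        s->bytes_received += p->length;
}

/* One callback per packet type. */
static const packet_handler handlers[PKT_TYPE_COUNT] = {
    [PKT_AUTH_SUCCESS] = on_auth_success,
    [PKT_CHANNEL_DATA] = on_channel_data,
};

/* Calls the callback registered for the packet type.
 * Returns 0 if a handler ran, -1 if the type is unknown. */
int dispatch_packet(struct session *s, const struct packet *p)
{
    assert(s != NULL && p != NULL);
    unsigned type = (unsigned)p->type;
    if (type >= PKT_TYPE_COUNT || handlers[type] == NULL)
        return -1;
    handlers[type](s, p);
    return 0;
}
```

In this sketch, channel data received before authentication is simply ignored, which is one way a handler can "evaluate the current state of the session".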

A sequence diagram of a synchronous function such as authentication or a simple channel read can be schematized as follows:

What’s happening is pretty straightforward. Thread X is waiting for a specific packet x (or more precisely, for the internal state change caused by packet x). It calls a function called ProcessInput (this is a simplification), which takes a lock and then tries to read packets from the socket. After a packet has been read, the callback for that packet (in this case, x) is called, which updates the internal state of the session.

ProcessInput returns after reading one packet. X checks whether the state has changed to x; if not, it simply calls ProcessInput again (not shown on the drawing) until it receives a state change it can process.
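
The loop run by thread X can be sketched as follows. This is a simplified model under stated assumptions, not libssh code: the socket is simulated by a string in which each character stands for one packet, the lock is left out, and the names sim_session, process_input and wait_for_x are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Simulated session: each character of `incoming` is one packet
 * ('x' or 'y'); the terminating '\0' plays the role of a closed socket. */
struct sim_session {
    const char *incoming;
    size_t pos;
    int x_seen;   /* state change caused by packet x */
    int y_seen;   /* state change caused by packet y */
};

/* Reads exactly one packet and runs its "callback".
 * Returns 0 after one packet, -1 when there is nothing left to read. */
int process_input(struct sim_session *s)
{
    assert(s != NULL);
    char type = s->incoming[s->pos];
    if (type == '\0')
        return -1;
    s->pos++;
    if (type == 'x')
        s->x_seen = 1;
    else if (type == 'y')
        s->y_seen = 1;
    return 0;
}

/* What thread X does: call process_input until the state it
 * is interested in has been reached. */
int wait_for_x(struct sim_session *s)
{
    assert(s != NULL);
    while (!s->x_seen) {
        if (process_input(s) != 0)
            return -1;   /* input ended before x arrived */
    }
    return 0;
}
```

Note that packets other than x are still processed along the way; the caller just keeps looping until its own condition holds.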

So, what’s the problem?

I thought that this design could provide an interesting feature to libssh users. By adding a lock in the ProcessInput function (already present on the previous drawing), we could let applications call different libssh functions on the same session, simultaneously, from different threads. Thread X could be doing a blocking ssh_channel_read() on one channel while thread Y does the same on another. A naive implementation of locking would give this result:

This sequence diagram is a simple extension of the previous one. Thread X waits for a packet x, and thread Y waits its turn by using a lock (or semaphore) in ProcessInput(Y). Looks great, except there’s a catch. This example does not work if the first packet to show up is not an x packet:

In this example, the x packet never arrives. The ProcessInput called by thread X receives the y packet and processes it (after all, any thread can manipulate the internal state of the session). The problem is that once ProcessInput has processed the y packet (leaving X unhappy and looping in the hope of receiving the x packet), ProcessInput(Y) acquires the lock it was waiting on, and Y goes on to read a packet, which can block and make the Y thread wait for a while. This is unfortunate because the session was already in the correct state y before Y called ReadPacket. ProcessInput, however, is meant to be generic and doesn’t know anything about the x or y states.

I’m looking for some kind of design pattern, or an elegant solution to this problem, from those who have already solved it before me.

Potential solutions?

I have thought of three solutions:

  • A lock would “remember” that it was blocked by another thread, wait until the lock is free and then return directly to the caller. This way, our Y thread would not run the potentially blocking ReadPacket function in the case where thread X did the hard work for it. In the opposite case (our second example), Y would call ProcessInput a second time and catch the y packet sooner or later.
    Unfortunately, I do not see an elegant way of doing this with the common-denominator building blocks of pthread and win32 threads. It doesn’t look like an elegant solution to me, but it complies with the specification “ProcessInput returns when at least one packet has been read since the call to ProcessInput”.
  • A received-packet counter would be read at the start of the ProcessInput function and stored on the local stack. The counter would be incremented each time a packet is received in the session, and after entering the critical section of ProcessInput, the two values would be compared. If the counter has changed, ProcessInput would return.
    I suspect this scenario is vulnerable to races between the moment the function is called and the moment the counter is read. Nothing tells us that another thread did not just finish reading the y packet before we initialized the local copy of the counter.
  • The ProcessInput function would take an additional parameter that helps it tell whether the ReadPacket function is still worth calling. This could be a callback to be called just after acquiring the lock. For instance, Y could call ProcessInput with a specific callback function check_y() which checks whether the state has changed as a result of the y packet. This function could also be called by the Y thread itself in its ProcessInput loop, since it is essentially the termination condition for the loop.
    As a drawback, I think this solution adds coupling between the ProcessInput function and its potential callers (there are hundreds of them in libssh master) and may add too much complexity.
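
The third idea could look roughly like the sketch below. It is only an illustration under assumptions: the socket is again simulated by a string of 'x' and 'y' packets, the lock is a plain pthread mutex held during the read (as in the drawings), and all names except check_y() are hypothetical. The predicate is evaluated right after the lock is acquired, so a thread whose packet was already consumed by another thread never reaches the potentially blocking read.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

struct locked_session {
    pthread_mutex_t lock;
    const char *incoming;   /* simulated socket, one char per packet */
    size_t pos;
    int x_seen;
    int y_seen;
    unsigned reads;         /* how many times the "blocking" read ran */
};

/* Termination condition supplied by the caller. */
typedef int (*done_fn)(const struct locked_session *s);

int check_x(const struct locked_session *s) { return s->x_seen; }
int check_y(const struct locked_session *s) { return s->y_seen; }

void locked_session_init(struct locked_session *s, const char *incoming)
{
    assert(s != NULL && incoming != NULL);
    pthread_mutex_init(&s->lock, NULL);
    s->incoming = incoming;
    s->pos = 0;
    s->x_seen = 0;
    s->y_seen = 0;
    s->reads = 0;
}

/* Returns 1 if the caller's condition holds (possibly thanks to another
 * thread), 0 if one packet was processed but the condition does not hold
 * yet, and -1 if there was nothing left to read. */
int process_input_until(struct locked_session *s, done_fn done)
{
    int rc;
    assert(s != NULL && done != NULL);
    pthread_mutex_lock(&s->lock);
    if (done(s)) {
        rc = 1;                      /* skip the read entirely */
    } else {
        char type = s->incoming[s->pos];
        s->reads++;                  /* stands for the blocking ReadPacket */
        if (type == '\0') {
            rc = -1;
        } else {
            s->pos++;
            if (type == 'x')
                s->x_seen = 1;
            else if (type == 'y')
                s->y_seen = 1;
            rc = done(s) ? 1 : 0;
        }
    }
    pthread_mutex_unlock(&s->lock);
    return rc;
}
```

Each caller would loop while process_input_until() returns 0. Because the condition is checked under the lock, the caller never has to read the session state from outside the critical section, but the coupling drawback mentioned above remains: every caller has to supply its own predicate.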

What’s your opinion? Feel free to comment!

Aris

The Conversation {1 comments}

  1. Preston A. Elder {Friday July 30, 2010 @ 1:06 pm}

    The question I have is why anyone sane would design their app in the way you’re describing. I am planning to implement the socket back end of libssh 0.5 using boost::asio. This allows for multiple threads to read the socket like you describe, but it also allows for what it calls ‘strands’. Strands guarantee that only one thread can be handling events with the same strand.

    Asio works by having an i/o service that can handle almost any kind of event (including a simple callback), and allowing the client programmer to ‘give’ or ‘lend’ threads to processing said events. The number of threads you provide to handle this processing is up to you, but any thread that you give to asio can handle any event that asio is managing (can handle the timer firing, socket read or write, etc).

    So for me this is how I would use libssh with asio:

    Register async socket read event with io_service.
    io_service calls back when some data has been read.
    Register another async socket read event with io_service.
    Invoke the ProcessPacket in the strand specific to the ssh session involved.
    (done)

    If a second thread receives data from the async read, then the act of invoking ProcessPacket in the strand specific to the ssh session means that it will not process the packet directly (it will return immediately), and either the other thread currently within that ‘strand’ will process the next packet when it is done, or when that strand is no longer ‘in use’, some other thread will be invoked with the process packet callback.

    The net effect of the above is that there is ALWAYS one async read event in asio (so that we read as fast as possible), and that packets are automatically serialized without needing to explicitly lock (blocking the threads waiting on the lock, which I don’t want to do under any circumstances).

    For socket writes, I would probably create a second strand (specific to the socket) to handle that, and do all my async writes in that strand, guaranteeing that only one thread can write to that socket at a time (i.e. I cannot get messages corrupted on the wire because of two simultaneous writes).

    To extend the above, if you really want multiple threads to be able to handle multiple channels within the same ssh session at once, you could assign a strand to each channel, and then once ProcessPacket has figured out which channel it is for, it invokes ProcessChannelMessage inside a strand specific to that channel. Though I think this is overly complicated and in the real world will gain little.

    Just a note about strands – strands are a logical concept in asio that simply means ‘serialized events within the same strand.’ Executing something within a strand does not mean that it will jump threads or context switch – it just guarantees that only one thread can work within that strand at once, and if someone else is working within it, the current thread will just return immediately (NOT block, which is important) and the thread currently working in that strand will likely handle that next event when done. You can also ‘exit’ a strand at any time (again without thread jumping or context switching).

    This sounds more like what you want – your ‘X’ and ‘Y’ threads above should only be reading packets, without really CARING whether they are x or y type packets. Regardless of which one it receives, a thread just calls ProcessInput, which will figure out whether it’s an x or y type packet and process it appropriately, preferably with non-blocking serialization, but if that is not possible, by blocking the ‘next’ packet. There is no reason for X and Y to care specifically about x and y type packets unless thread-specific data is used. These should be generic workers, not threads dedicated to the processing of x or y type data. That distinction is only important to the ProcessInput function, which should have all the contextual data to know what to do with a packet once it determines the type.
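
The strand behaviour described in this comment can be modelled with a toy serializer in C. This is not how asio implements strands and none of these names come from asio or libssh; it is a sketch, assuming a small fixed-size queue, of "only one thread runs tasks at a time, and anyone else who posts returns immediately instead of blocking".

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define STRAND_CAPACITY 16

typedef void (*strand_task)(void *arg);

struct strand {
    pthread_mutex_t lock;   /* protects the queue, never held while a task runs */
    struct { strand_task fn; void *arg; } queue[STRAND_CAPACITY];
    size_t head;
    size_t count;
    int running;            /* non-zero while some thread is draining the queue */
};

void strand_init(struct strand *st)
{
    assert(st != NULL);
    pthread_mutex_init(&st->lock, NULL);
    st->head = 0;
    st->count = 0;
    st->running = 0;
}

/* Queues a task. Returns 1 if the caller became the runner and executed
 * the queued tasks, 0 if another runner is active and will pick the task
 * up later, -1 if the queue is full. */
int strand_post(struct strand *st, strand_task fn, void *arg)
{
    assert(st != NULL && fn != NULL);
    pthread_mutex_lock(&st->lock);
    if (st->count == STRAND_CAPACITY) {
        pthread_mutex_unlock(&st->lock);
        return -1;
    }
    size_t tail = (st->head + st->count) % STRAND_CAPACITY;
    st->queue[tail].fn = fn;
    st->queue[tail].arg = arg;
    st->count++;
    if (st->running) {
        pthread_mutex_unlock(&st->lock);
        return 0;                       /* return immediately, do not block */
    }
    st->running = 1;
    while (st->count > 0) {
        strand_task next = st->queue[st->head].fn;
        void *next_arg = st->queue[st->head].arg;
        st->head = (st->head + 1) % STRAND_CAPACITY;
        st->count--;
        pthread_mutex_unlock(&st->lock);
        next(next_arg);                 /* tasks run one at a time, in order */
        pthread_mutex_lock(&st->lock);
    }
    st->running = 0;
    pthread_mutex_unlock(&st->lock);
    return 1;
}

/* Small demonstration tasks that record the order in which they run. */
struct order_trace {
    struct strand *st;
    char order[8];
    size_t n;
    int nested_rc;
};

void append_b(void *arg)
{
    struct order_trace *t = arg;
    t->order[t->n++] = 'b';
}

void append_a_then_post_b(void *arg)
{
    struct order_trace *t = arg;
    /* Posting while the strand is busy only queues the task. */
    t->nested_rc = strand_post(t->st, append_b, t);
    t->order[t->n++] = 'a';
}
```

In this model, a ProcessPacket call posted while another thread is already inside the strand is simply queued, and the thread that is already running picks it up when it finishes its current task.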
