Implementing Active Object pattern with concurrent Java – 1

I used java.util.concurrent facility to implement Active Object pattern in Java. The Active Object pattern is a pattern in implementation of asynchronous message passing. The flow of how the discussion is formed is as follows that spans through two posts (the other one):

  1. We first look at the approaches that can be taken for this implementation. I discuss the reason I chose the method here.
  2. We discuss the message encapsulation and features that we expect.
  3. We discuss the co-relation between a message and a thread of execution so that messages can be interruptible.
  4. We discuss how we extend Java facilities to provide each active object deployed in one single core.
  5. In the next post, we show the active object is implemented.
  6. Finally, we discuss how the feature of interrupting a message can be achieved.

The current implementation status does not use a proxy method or a scheduler as the original pattern says so since our requirements differ in a way or two. However, these features may be seamlessly added.

Let’s use a sample. A Server is a token object that provides an exclusive access to a token to a set of Clients. A Client asks for the token of the Server through Server.request() to gain access to the exclusive token. To publish this request for the token, the Client has two options:

  1. Client calls Server.request() and the Server is responsible for queuing the message in the message store.
  2. Client creates a message and directly puts the message in the Server‘s message store to be processed later.

The first approach has one trivial problem. To implement this pattern, each object (Client or Server), has a single thread of execution that is used to process the receiving  messages from the other objects. So, if we use the first option, then we need to queue the message in a “synchronous” fashion; i.e. if the Server is running some code while an arbitrary Client needs to queue a message, the Client should wait till the current execution thread in Server to provide access to the Client. We need to have an “asynchronous” communication among the objects at all times so we use the second approach. Each active object exposes its message store (ProcessStore) through a ProcessStoreAware interface.

[java]
public interface ProcessStoreAware {
ProcessStore getProcessStore();
}[/java]

ProcessStore is an encapsulation of LinkedBlockingQueue that is an implementation of a concurrent List API. So, a Client will put a message directly into its associated Server’s message store:

[java]((ProcessStoreAware) myServer).getProcessStore().add(myMessage);[/java]

One of the features of asynchronous message passing is its “non-blocking” nature. When a message is published to a Server, the Client continues its job and its own process. However, the expected return value of the published message can encapsulated as a “future” value computation; i.e. some value that is determined some time in future but does not block the thread of execution in the caller. So, an asynchronous message encapsulation should provide two major functionalities: (1) a wrapper around a call to the required server method (2) a future value as the result. FutureTask in Java provides these.

We define MethodInvocation class to introduce this encapsulation that extends FutureTask and adds some other properties. In our setting, each message is assigned a thread of execution to have control for awaiting and releasing the processor based on the semantics of the requirements. So, every MethodInvocation is aware of its “owner thread” as a property to such circumstances. In this regard, a MethodInvocation implements Interruptible interface to expose the features of “await” and “signal”; however, it actually delegates them to its owner thread. When a MethodInvocation “awaits”, its state is suspended.

[java]
public interface Interruptible {
boolean await() throws RuntimeException;
boolean signal() throws RuntimeException;
}
[/java]

Using locks in mi. The source of MethodInvocation

On the other, as each MethodInvocation is in a one-to-one relationship with Thread, the owner thread should be able to “await” and “release” based on its MethodInvocation. We introduce InterrutptibleThread to expose these features. It uses an instance of a ReentrantLock and its Condition to “await” execution as far as the MethodInvocation is suspended.

[java]
public class InterruptibleThread extends Thread implements Interruptible {

private Lock awaitLock = new ReentrantLock();
private Condition blockedCondition = awaitLock.newCondition();
private MethodInvocation mi;

// *** ADD Constructors

@Override
public boolean await() throws RuntimeException {
try {
awaitLock.lock();
while (this.mi.isSuspended())
blockedCondition.await();
} catch (InterruptedException e) {
interrupt();
} finally {
awaitLock.unlock();
}
return false;
}

@Override
public boolean signal() throws RuntimeException {
try {
awaitLock.lock();
blockedCondition.signalAll();
} finally {
awaitLock.unlock();
}
return false;
}

public void setMethodInvocation(MethodInvocation mi) {
this.mi = mi;
}

}
[/java]

So far, we have the required encapsulation for the “messages” that are stored in each active object. Apart form a ProcessStore, each active object composes an instance of an ExecutorService; a scheduler that directs JVM how to distribute jobs among the processors. We desire to have each active object deployed on one processor. So, we use an extension of ThreadPoolExecutor. It uses a pool of threads to manage jobs on a processor. It tries to re-use the threads that are idle to minimize the resource consumption through JVM. We extend this class for two reasons. First, to override the behavior of creating a new FutureTask out of a MethodInvocation since MethodInvocation is actually an instance of FutureTask:

[java]
@Override
protected RunnableFuture newTaskFor(Callable callable) {
if (callable instanceof MethodInvocation) {
return (FutureTask) callable;
}
return super.newTaskFor(callable);
}

protected RunnableFuture newTaskFor(Runnable runnable, T value) {
if (runnable instanceof MethodInvocation) {
return (FutureTask) runnable;
}
return super.newTaskFor(runnable, value);
}
[/java]

Next, we override beforeExecute method to dually assign the current thread of execution and MethodInvocation to each other so that they can control the flow of execution if necessary. This thread may await based on the underlying calls so other threads will be created for other messages:

[java]
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
if (r instanceof MethodInvocation) {
InterruptibleThread it = (InterruptibleThread) t;
it.setMethodInvocation((MethodInvocation) r);
((MethodInvocation) r).setOwnerThread(it);
}
}
[/java]

Each instance of ExecutorService is dependent on an instance of ThreadFactory. We provide an implementation since the default implementation uses only one thread group (i.e. “main”) to pool the threads while we need to maintain a pool of threads for each active object:

[java]
public class InterruptibleThreadFactory implements ThreadFactory {

private ThreadGroup group;
private final AtomicInteger counter = new AtomicInteger(1);

public InterruptibleThreadFactory() {
SecurityManager sm = System.getSecurityManager();
ThreadGroup parent = sm != null ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
group = new ThreadGroup(parent, this.hashCode() + "");
}

@Override
public Thread newThread(Runnable r) {
InterruptibleThread it = new InterruptibleThread(group, r, "pool-" + group.getName()
+ "-thread-" + counter.getAndIncrement(), 0);
it.setDaemon(false);
it.setPriority(Thread.NORM_PRIORITY);
return it;
}
}
[/java]

I will continue the discussion in the next post.

Leave a Reply

Your email address will not be published. Required fields are marked *