Implementing the Active Object pattern with concurrent Java – 2

In the previous post, I discussed some steps towards implementing the active object pattern with concurrent Java.

Now, let’s turn to each “active object” (Server or Client). We expect each object to hold a single thread of execution. To achieve this, it maintains an instance of the ExecutorService described earlier. It also maintains a queue of messages (ProcessStore). Over time, it scans its ProcessStore to fetch the MethodInvocations that are ready to be processed, and submits each one to its ExecutorService for execution. To make this work, each active object implements the Runnable interface. We then introduce an init() method that submits the active object (itself) to its own ExecutorService. From then on, its run() method processes the messages out of the ProcessStore:

[java]
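import java.util.concurrent.ExecutorService;

class Server implements Runnable, ProcessStoreAware {

    // Queue of pending MethodInvocations
    private ProcessStore ps = new ProcessStore();
    private ExecutorService es = new InterruptibleThreadPoolExecutor(
            new InterruptibleThreadFactory());
    // The MethodInvocation most recently taken from the queue
    private MethodInvocation currentMI;

    // Submits this active object to its own executor, which then calls run()
    public void init() {
        this.es.submit(this);
    }

    @Override
    public void run() {
        for (;;) {
            try {
                // Fetch the next MethodInvocation and hand it to the executor
                MethodInvocation mi = ps.take();
                currentMI = mi;
                this.es.submit(mi);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public ProcessStore getProcessStore() {
        return ps;
    }

}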
[/java]
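Since ProcessStore, MethodInvocation and the interruptible executor come from the previous post, here is a minimal self-contained sketch of the same queue-plus-single-thread idea using only java.util.concurrent. The names MiniActiveObject, enqueue() and shutdown() are made up for this illustration. Note one deliberate simplification: unlike the Server above, the loop runs each task directly instead of submitting it back to the executor.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for Server: a message queue plus one worker thread.
class MiniActiveObject implements Runnable {

    // Stand-in for ProcessStore: pending invocations wait here in FIFO order.
    private final BlockingQueue<FutureTask<?>> queue = new LinkedBlockingQueue<>();
    // A single thread, so queued invocations never run concurrently.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Start the dispatch loop on the worker thread.
    public void init() {
        worker.submit(this);
    }

    // Callable from any thread: queue the work and return a handle to its result.
    public <T> Future<T> enqueue(Callable<T> command) {
        FutureTask<T> task = new FutureTask<>(command);
        queue.add(task);
        return task;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Wait for the next invocation, then run it on this thread.
                queue.take().run();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and leave the loop so the thread can stop.
            Thread.currentThread().interrupt();
        }
    }

    // Interrupt the dispatch loop and release the worker thread.
    public void shutdown() {
        worker.shutdownNow();
    }
}

class MiniActiveObjectDemo {

    // Queues 100 increments from the calling thread and returns the last result.
    static int runDemo() {
        MiniActiveObject server = new MiniActiveObject();
        server.init();
        int[] counter = {0}; // only ever touched by the worker thread
        Future<Integer> last = null;
        for (int i = 0; i < 100; i++) {
            last = server.enqueue(() -> ++counter[0]);
        }
        try {
            return last.get(); // blocks until the final task has run
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            server.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because every increment runs on the one worker thread, the counter needs no locking even though the requests come from another thread.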

Having a Server, a Client may now queue a message in Server to be processed:

[java]
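Callable command = new Callable() {
    public Object call() {
        myServer.request();
        return null; // call() must return a value
    }
};
// Wrap the command as a message and queue it in the Server
MethodInvocation mi = new MethodInvocation(command);
((ProcessStoreAware) myServer).getProcessStore().add(mi);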
[/java]

Now, there are circumstances in which a MethodInvocation, while being executed, must suspend its thread of execution until a specific condition holds; i.e. the MethodInvocation awaits a condition. For instance, Server maintains a boolean property taken that denotes whether it is already occupied by a Client. If another Client requests the Server while it is taken, that request should wait until the Server is available again. Conversely, when the Server becomes available again, all the waiting MethodInvocations should be notified. We need an event notification mechanism to release (signal) all the waiting ones. We introduce SignalAction to encapsulate such a notification event:

[java]
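public class SignalAction implements Callable {

    // The MethodInvocation that is waiting on the condition
    private MethodInvocation originalMI;
    // Evaluates the release condition each time it is asked
    private ExpressionHolder expressionHolder;
    // The active object whose queue receives the re-checks
    private ProcessStoreAware executor;

    public SignalAction(final MethodInvocation originalMI, final ExpressionHolder expressionHolder,
            final ProcessStoreAware executor) {
        this.originalMI = originalMI;
        this.expressionHolder = expressionHolder;
        this.executor = executor;
    }

    public Object call() throws Exception {
        if (expressionHolder.getValue()) {
            // Condition holds: release the waiting MethodInvocation
            originalMI.signal();
        } else {
            // Condition does not hold yet: queue another check for later
            executor.getProcessStore().offer(copy());
        }
        return null;
    }

    // Wraps this SignalAction in a new MethodInvocation so it can be queued again
    private MethodInvocation copy() {
        MethodInvocation mi = new MethodInvocation(this, originalMI);
        return mi;
    }

}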
[/java]

SignalAction uses an ExpressionHolder interface that presents a value denoting whether the release condition is satisfied. If it is, the original MethodInvocation is signalled; otherwise, SignalAction queues another MethodInvocation based on itself to check the condition again later. This continues until the release condition is satisfied and the original MethodInvocation is signalled. ExpressionHolder is a simple interface that presents a boolean value. We cannot pass the original fields or variables directly, since only their value at that moment would be copied and later changes would not be seen:

[java]
public interface ExpressionHolder {
    Boolean getValue();
}
[/java]
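To make the difference concrete, here is a small sketch contrasting a value copied once with an ExpressionHolder that is re-evaluated on every call. The Resource class and its method names are invented for this illustration, and ExpressionHolder is redeclared only so that the snippet compiles on its own.

```java
// Same shape as the ExpressionHolder above; redeclared so the sketch is self-contained.
interface ExpressionHolder {
    Boolean getValue();
}

// Hypothetical class used only for this illustration.
class Resource {

    private volatile boolean taken = true;

    void free() {
        taken = false;
    }

    // Copies the value once; later changes to 'taken' are not reflected.
    Boolean snapshotOfAvailable() {
        return !taken;
    }

    // Wraps the expression; every getValue() call reads 'taken' again.
    ExpressionHolder availableCondition() {
        return new ExpressionHolder() {
            @Override
            public Boolean getValue() {
                return !taken;
            }
        };
    }
}

class ExpressionHolderDemo {
    public static void main(String[] args) {
        Resource r = new Resource();
        Boolean snapshot = r.snapshotOfAvailable();
        ExpressionHolder live = r.availableCondition();

        r.free();

        System.out.println(snapshot);        // false: stale copy taken before free()
        System.out.println(live.getValue()); // true: evaluated after free()
    }
}
```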

Having SignalAction, we add a method to Server to simplify the job of queuing an instance of SignalAction:

[java]
private void addSignalActionNotifier(MethodInvocation mi, ExpressionHolder expressionHolder) {
    SignalAction sa = new SignalAction(mi, expressionHolder, this);
    MethodInvocation saMI = new MethodInvocation(sa, mi);
    this.ps.offer(saMI);
}
[/java]

Now, we modify the Server.request() method to handle this situation:

[java]
private Boolean taken = false;

public void request() {
    // Wait for as long as the Server is occupied by another Client
    while (taken) {
        addSignalActionNotifier(currentMI, new ExpressionHolder() {
            public Boolean getValue() {
                return !taken;
            }
        });
        ((Interruptible) currentMI).await();
    }
    taken = true;
}

public void free() {
    taken = false;
}
[/java]
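The re-queue-until-true behaviour of SignalAction can be tried out in isolation with standard library classes. In the sketch below, which is an approximation and not the implementation from these posts, a CountDownLatch stands in for MethodInvocation.await()/signal(), a BooleanSupplier stands in for ExpressionHolder, and a single-thread executor stands in for the Server's queue. PollingSignal and PollingSignalDemo are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical analogue of SignalAction: check the condition, signal or re-queue.
class PollingSignal implements Runnable {

    private final CountDownLatch waiter;      // stands in for the waiting MethodInvocation
    private final BooleanSupplier condition;  // stands in for ExpressionHolder
    private final ExecutorService queue;      // stands in for the Server's ProcessStore

    PollingSignal(CountDownLatch waiter, BooleanSupplier condition, ExecutorService queue) {
        this.waiter = waiter;
        this.condition = condition;
        this.queue = queue;
    }

    @Override
    public void run() {
        if (condition.getAsBoolean()) {
            waiter.countDown();   // condition holds: release the waiter
        } else {
            queue.execute(this);  // not yet: go to the back of the queue and retry
        }
    }
}

class PollingSignalDemo {

    // Returns true if the waiter was released after the resource was freed.
    static boolean runDemo() {
        ExecutorService queue = Executors.newSingleThreadExecutor();
        AtomicBoolean taken = new AtomicBoolean(true);
        CountDownLatch released = new CountDownLatch(1);

        // The condition is re-evaluated on every check, like ExpressionHolder.getValue().
        queue.execute(new PollingSignal(released, () -> !taken.get(), queue));
        // The equivalent of free(), queued on the same single thread.
        queue.execute(() -> taken.set(false));

        try {
            return released.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            queue.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because the check goes to the back of the queue each time it fails, the task that frees the resource always gets its turn. The check is retried as fast as the worker can run it, so a real system might want to add a delay between retries.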

Sample code to run the whole scenario would be:

[java]
Server s = new Server();
Client c1 = new Client(s);
Client c2 = new Client(s);
s.init();
c1.init();
c2.init();
for (int i = 0; i < 20; ++i) {
    c1.process();
    Thread.sleep((long) (Math.random() * 1000));
    c2.process();
    Thread.sleep((long) (Math.random() * 1000));
    c1.finish();
    Thread.sleep((long) (Math.random() * 1000));
    c2.finish();
}
[/java]

As a side note, I am aware that there are already languages and frameworks for such implementations, such as Scala, Groovy GPars or Clojure. The discussion here is meant for cases in which a modelling language is transformed to Java code by a model transformer. Still, the target code does not have to be plain Java, since there are other languages that ultimately run on the JRE.

 
