Implemeting Active Object pattern with concurrent Java – 2

In the previous post, I discussed some steps towards implementing active object patter with concurrent Java.

Now, let’s turn to each “active object” (Server or Client). We expect from each object to hold a single thread of execution. To have this, it maintains an instance of an ExecutorService described above. Also, it maintains a queue of messages (ProcessStore). Through time, it scans through its ProcessStore to fetch the available MethodInvocations to be processed. It submits each MethodInvocation to its ExecutorService to be executed. To achieve this, we make each active object implement Runnable interface. Then, we introduce an init() method to be called to actually submit the instance of the active object (itself) to its own ExecutorService. Then through the run method it processes the messages out of the ProcessStore:

[java]
class Server implements Runnable, ProcessStoreAware {

private ProcessStore ps = new ProcessStore();
private ExecutorService es = new InterruptibleThreadPoolExecutor(
new InterruptibleThreadFactory());
private MethodInvocation currentMI;

public void init() {
this.es.submit(this);
}

@Override
public void run() {
for (;;) {
try {
MethodInvocation mi = ps.take();
currentMI = mi;
this.es.submit(mi);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

@Override
public ProcessStore getProcessStore() {
return ps;
}

}
[/java]

Having a Server, a Client may now queue a message in Server to be processed:

[java]
Callable command = new Callable() {
public Object call() {
myServer.request();
}
}
MethodInvocation mi = new MethodInvocation(command);
((ProcessStoreAware) myServer).getProcessStore().add(mi);
[/java]

Now, there are circumstances when a MethodInvocation is being executed, it required to await the current thread of execution for a specific condition to take effect; i.e. a MethodInvocation awaits on a condition. For instance, Server maintains a taken property of type boolean to denote whether it is already occupied with a Client or not. So, in such situation, if another Client also requests for the Server, then it should await on taken be available again. On the other hand, when taken is again available, all the waiting MethodInvocations should be notified. We need a event notification mechanism to release (signal) all the waiting ones. We introduce SingalAction to encapsulate such event of notification:

[java]
public class SignalAction implements Callable {

private MethodInvocation originalMI;
private ExpressionHolder expressionHolder;
private ProcessStoreAware executor;

public SignalAction(final MethodInvocation originalMI, final ExpressionHolder expressionHolder,
final ProcessStoreAware executor) {
this.originalMI = originalMI;
this.expressionHolder = expressionHolder;
this.executor = executor;
}

public Object call() throws Exception {
if (expressionHolder.getValue()) {
originalMI.signal();
} else {
executor.getProcessStore().offer(copy());
}
return null;
}

private MethodInvocation copy() {
MethodInvocation mi = new MethodInvocation(this, originalMI);
return mi;
}

}
[/java]

SignalAction uses an interface ExpressionHolder that presents a value to denote whether or not the releasing condition is satisfied. If satisfied, the original MethodInvocation is signalled; otherwise, it queues another MethodInvocation based on itself to check the condition some time later in future. This continues till the release conditions are satisfied and the original MethodInvocation is signalled. ExpressionHolder is a simple interface to present a boolean value. We cannot use the original fields or variables since they are changed through that their corresponding value will not be updated:

[java]
public interface ExpressionHolder {
Boolean getValue();
}
[/java]

Having SignalAction, we add a method to Server to simplify the job of queuing an instance of SignalAction:

[java]
private void addSignalActionNotifier(MethodInvocation mi, ExpressionHolder expressionHolder) {
SignalAction sa = new SignalAction(mi, expressionHolder, this);
MethodInvocation saMI = new MethodInvocation(sa, mi);
this.ps.offer(saMI);
}
[/java]

Now, we modify Server.request() method to observe such circumstances:

[java]
private Boolean taken = false;

public void request() {
while (!(!taken)) {
addSignalActionNotifier(currentMI, new ExpressionHolder() {
public Boolean getValue() {
return !taken;
}
});
((Interruptible) currentMI).await();
}
taken = true;
}

public void free() {
taken = false;
}
[/java]

A sample code to run for the whole scenario would be:

[java]
Server s = new Server();
Client c1 = new Client(s);
Client c2 = new Client(s);
s.init();
c1.init();
c2.init();
for (int i = 0; i < 20; ++i) {
c1.process();
Thread.sleep((long) (Math.random() * 1000));
c2.process();
Thread.sleep((long) (Math.random() * 1000));
c1.finish();
Thread.sleep((long) (Math.random() * 1000));
c2.finish();
}
[/java]

As a side note, I am aware that there is already languages or frameworks for such implementation such as Scala, Groovy GPars or Clojure. The discuss here is used to in cases that some modelling language will be transformed to Java code using a model transformer. Still, the target code does not have to be straight Java since there are other languages that finally run on JRE.

 

Implementing Active Object pattern with concurrent Java – 1

I used java.util.concurrent facility to implement Active Object pattern in Java. The Active Object pattern is a pattern in implementation of asynchronous message passing. The flow of how the discussion is formed is as follows that spans through two posts (the other one):

  1. We first look at the approaches that can be taken for this implementation. I discuss the reason I chose the method here.
  2. We discuss the message encapsulation and features that we expect.
  3. We discuss the co-relation between a message and a thread of execution so that messages can be interruptible.
  4. We discuss how we extend Java facilities to provide each active object deployed in one single core.
  5. In the next post, we show the active object is implemented.
  6. Finally, we discuss how the feature of interrupting a message can be achieved.

The current implementation status does not use a proxy method or a scheduler as the original pattern says so since our requirements differ in a way or two. However, these features may be seamlessly added.

Let’s use a sample. A Server is a token object that provides an exclusive access to a token to a set of Clients. A Client asks for the token of the Server through Server.request() to gain access to the exclusive token. To publish this request for the token, the Client has two options:

  1. Client calls Server.request() and the Server is responsible for queuing the message in the message store.
  2. Client creates a message and directly puts the message in the Server‘s message store to be processed later.

The first approach has one trivial problem. To implement this pattern, each object (Client or Server), has a single thread of execution that is used to process the receiving  messages from the other objects. So, if we use the first option, then we need to queue the message in a “synchronous” fashion; i.e. if the Server is running some code while an arbitrary Client needs to queue a message, the Client should wait till the current execution thread in Server to provide access to the Client. We need to have an “asynchronous” communication among the objects at all times so we use the second approach. Each active object exposes its message store (ProcessStore) through a ProcessStoreAware interface.

[java]
public interface ProcessStoreAware {
ProcessStore getProcessStore();
}[/java]

ProcessStore is an encapsulation of LinkedBlockingQueue that is an implementation of a concurrent List API. So, a Client will put a message directly into its associated Server’s message store:

[java]((ProcessStoreAware) myServer).getProcessStore().add(myMessage);[/java]

One of the features of asynchronous message passing is its “non-blocking” nature. When a message is published to a Server, the Client continues its job and its own process. However, the expected return value of the published message can encapsulated as a “future” value computation; i.e. some value that is determined some time in future but does not block the thread of execution in the caller. So, an asynchronous message encapsulation should provide two major functionalities: (1) a wrapper around a call to the required server method (2) a future value as the result. FutureTask in Java provides these.

We define MethodInvocation class to introduce this encapsulation that extends FutureTask and adds some other properties. In our setting, each message is assigned a thread of execution to have control for awaiting and releasing the processor based on the semantics of the requirements. So, every MethodInvocation is aware of its “owner thread” as a property to such circumstances. In this regard, a MethodInvocation implements Interruptible interface to expose the features of “await” and “signal”; however, it actually delegates them to its owner thread. When a MethodInvocation “awaits”, its state is suspended.

[java]
public interface Interruptible {
boolean await() throws RuntimeException;
boolean signal() throws RuntimeException;
}
[/java]

Using locks in mi. The source of MethodInvocation

On the other, as each MethodInvocation is in a one-to-one relationship with Thread, the owner thread should be able to “await” and “release” based on its MethodInvocation. We introduce InterrutptibleThread to expose these features. It uses an instance of a ReentrantLock and its Condition to “await” execution as far as the MethodInvocation is suspended.

[java]
public class InterruptibleThread extends Thread implements Interruptible {

private Lock awaitLock = new ReentrantLock();
private Condition blockedCondition = awaitLock.newCondition();
private MethodInvocation mi;

// *** ADD Constructors

@Override
public boolean await() throws RuntimeException {
try {
awaitLock.lock();
while (this.mi.isSuspended())
blockedCondition.await();
} catch (InterruptedException e) {
interrupt();
} finally {
awaitLock.unlock();
}
return false;
}

@Override
public boolean signal() throws RuntimeException {
try {
awaitLock.lock();
blockedCondition.signalAll();
} finally {
awaitLock.unlock();
}
return false;
}

public void setMethodInvocation(MethodInvocation mi) {
this.mi = mi;
}

}
[/java]

So far, we have the required encapsulation for the “messages” that are stored in each active object. Apart form a ProcessStore, each active object composes an instance of an ExecutorService; a scheduler that directs JVM how to distribute jobs among the processors. We desire to have each active object deployed on one processor. So, we use an extension of ThreadPoolExecutor. It uses a pool of threads to manage jobs on a processor. It tries to re-use the threads that are idle to minimize the resource consumption through JVM. We extend this class for two reasons. First, to override the behavior of creating a new FutureTask out of a MethodInvocation since MethodInvocation is actually an instance of FutureTask:

[java]
@Override
protected RunnableFuture newTaskFor(Callable callable) {
if (callable instanceof MethodInvocation) {
return (FutureTask) callable;
}
return super.newTaskFor(callable);
}

protected RunnableFuture newTaskFor(Runnable runnable, T value) {
if (runnable instanceof MethodInvocation) {
return (FutureTask) runnable;
}
return super.newTaskFor(runnable, value);
}
[/java]

Next, we override beforeExecute method to dually assign the current thread of execution and MethodInvocation to each other so that they can control the flow of execution if necessary. This thread may await based on the underlying calls so other threads will be created for other messages:

[java]
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
if (r instanceof MethodInvocation) {
InterruptibleThread it = (InterruptibleThread) t;
it.setMethodInvocation((MethodInvocation) r);
((MethodInvocation) r).setOwnerThread(it);
}
}
[/java]

Each instance of ExecutorService is dependent on an instance of ThreadFactory. We provide an implementation since the default implementation uses only one thread group (i.e. “main”) to pool the threads while we need to maintain a pool of threads for each active object:

[java]
public class InterruptibleThreadFactory implements ThreadFactory {

private ThreadGroup group;
private final AtomicInteger counter = new AtomicInteger(1);

public InterruptibleThreadFactory() {
SecurityManager sm = System.getSecurityManager();
ThreadGroup parent = sm != null ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
group = new ThreadGroup(parent, this.hashCode() + "");
}

@Override
public Thread newThread(Runnable r) {
InterruptibleThread it = new InterruptibleThread(group, r, "pool-" + group.getName()
+ "-thread-" + counter.getAndIncrement(), 0);
it.setDaemon(false);
it.setPriority(Thread.NORM_PRIORITY);
return it;
}
}
[/java]

I will continue the discussion in the next post.

Multicore Programming and Object-Oriented Languages

I had a literature study on how different object-oriented languages provide programming techniques for multicore programming. The outcome was presented as a research report and a presentation talk.

مطالعه‌ای بر روی برنامه‌نویسی چند‌هسته‌ای در زبان‌های شی‌گرا انجام دادم که نتیجه‌‌ش به صورت یک گزارش و یک ارائه در این زمینه در‌آمد.

 

برنامه نویسی چند‌هسته‌ای – Multicore Programming

در مسیر پیدا‌کردن موضوعی برای پایان‌نامه‌ی فوق لیسانس، به طور خیلی اتفاقی با موضوعی آشنا شدم به نام برنامه‌نویسی چند‌هسته‌ای که چند سالیه موضوع داغی برای تحقیقات در زمینه‌ی زبان‌های برنامه‌نویسی و تولید و توسعه‌ی نرم‌افزار محسوب می‌شه. این داستان از این‌جا جالبه که ایده‌های برنامه‌نویسی موازی از سال‌ها پیش مطرح بود اما در همون زمان به دلیل نداشتن امکانات سخت‌افزاری و قدرت پردازشی لازم روش‌های برنامه‌نویسی و زبان‌های برنامه‌نویسی با دیدگاه تک‌واحدی بودن پردازنده شروع به رشد کردند. از سال ۲۰۰۰ به بعد که سخت‌افزار‌ها از نظر قدرت پردازشی به شدت پیشرفت کردند و مفهوم چند‌هسته‌ای بودن یک پردازنده الان خیلی عادی می‌آد، چندسالیه که این موضوع قدیمی دوباره به روی میز اومده و خیلی‌ها به دنبال این هستند که چه طور نرم‌افزار باید برای این دوره آماده بشوند.

مقایسه‌ی مشابهی در این زمینه وجود داره که زمانی که ساختار و چارچوب شی‌ءگرا مطرح شده و دنیای نرم‌افزار هم به این سمت رفت که از این ساختار بیشتر استفاده کنه، تاریخ نشون می‌ده که بخشی از موضوع به پیاده‌سازی فرهنگ استفاده از این ساختار مربوط بوده؛ به این معنی زمانی طول کشیده تا برنامه‌نویسان در مرحله اول متقاعد بشوند که این ساختار بهتر از ساختار‌های ساخت‌یافته و قدیمی‌تر بودند و در مرحله این تحول ایجاد شد که تقریباً تمام برنامه‌نویسان از ابتدایی که شروع به فکر و نوشتن برای یک نرم‌افزار می‌کنند، راه‌حل‌ها و ایده‌های خود را در این چارچوب ارائه بدهند. در مورد برنامه‌نویسی چند‌هسته‌ای هم در حال حاضر به نظر می‌آید که این تردید در جامعه‌ی برنامه‌نویس وجود دارد و زمانی طول خواهد کشید تا در آینده با این دیدگاه به سراغ حل مسائل و نوشتن نرم‌افزار بروند.

از سوی دیگر، بررسی راه‌حل‌های موجود نشان می‌دهد که تلاش‌های فراوانی در این زمینه صورت گرفته‌است؛ این تلاش‌های اکثراً یا به شکل زبان‌های جدید مبتنی برزبان‌های شیءگرا هستند یا به شکل کتابخانه‌های تکمیلی برای زبان هدف. نیز، دیدگاه‌های جدیدی هم در این زمینه مطرح‌ شده مانند استفاده از مدل Actor یا Software Transactional Memory که هر دو دارای خوبی‌ها و بدی‌هایی هم هستند.از طرف دیگر، دیدگاه‌های توصیفی (Declarative) در مقابل دستوری (Imperative) هم خود مایه‌ای برای تفاوت در تلاش‌های تحقیقاتی در این زمینه می‌باشد. نکته‌ی جالب این است که تقریباً تمام تحقیقات این موضوع به اتفاق از برنامه‌نویسی وظیفه‌گرا (Functional Programming) حمایت می‌کنند.

یکی از چالش‌های مطرح در این شاخه، انتقال (Migration) سیستم‌های حال حاضر در شرکت‌های بزرگ به بستر‌های چند‌هسته‌ای است. اهمیت این چالش در این است که خیلی‌ها به دنبال روش‌هایی هستند که یا بدون تغییر نرم‌افزار یا با کم‌ترین تغییرات این انتقال بستر را انجام دهند. به همین دلیل، به طور مثال، تلاش‌های خیلی وسیعی در این زمینه بر روی بستر جاوا (Java) انجام شده که به طور مثال این موضوع در زبانی مانند اسکالا (Scala) و با استفاده از Actorها و یا در کتابخانه‌ای مانند Multiverse از طریق Software Transactional Memory مورد تحقیق قرار گرفته‌است. در همین راستا، این موضوع نیز جالب است که زبان‌های برنامه‌نویسی در چه سطحی از دستورالعمل‌ها گرفته تا اشیاء برنامه این فناوری را پشتیبانی می‌کنند که هر کدام کاربرد‌های خود را دارند.

در این زمینه کتابی به نام The Art of Multiprocessor Programming کتابی خوب با محتوای مناسب برای آشنایی عمیق با این موضوعات می‌باشد.