Thursday 30 December 2010

Java concurrency - a brief overview

Java is typically compiled to bytecode that is later interpreted by the JVM (Java Virtual Machine). This approach has advantages and disadvantages. On the one hand, application developers can "write once, run anywhere", which is one of its most remarkable benefits. On the other hand, this approach also costs Java some performance.

Java has been criticised for its performance compared to other languages and as a result an entire set of performance practices has been utilised.

Java bytecode can either be interpreted at run time by a virtual machine, or it can be compiled at load time or run time into machine code that runs directly on the computer's hardware. Interpretation is slower than native execution, and compilation at load time or run time carries an initial performance penalty for the compilation itself. Although these concepts explain many of the performance issues, there is still something we can do to improve our programs' performance.

Concurrent programming has always been difficult, and it remains one of the most common gaps in programmers' knowledge. However, the Java team has been building packages (such as java.util.concurrent) to make it as painless as possible. Nevertheless, you need to know the basics to use them in an application and get the results you want.

A computer system normally has many active processes and threads. This is true even in systems that only have a single execution core, and thus only have one thread actually executing at any given moment. Processing time for a single core is shared among processes and threads through an OS feature called time slicing.

A process has a self-contained execution environment. A process generally has a complete, private set of basic run-time resources; in particular, each process has its own memory space. Threads are sometimes called lightweight processes. Both processes and threads provide an execution environment, but creating a new thread requires fewer resources than creating a new process. Threads live within a process, and every process has at least one thread.
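
To make the difference concrete, here is a minimal sketch of several threads living inside one process and working on the same object in that process's memory. The class and method names (SharedMemoryDemo, countWithThreads) are made up for this illustration and are not part of the application presented later.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedMemoryDemo {

    // Starts threadCount threads that all increment the same counter and
    // returns the final value once every thread has finished.
    public static int countWithThreads(int threadCount, int incrementsPerThread) {
        // One object on the heap, visible to every thread of this process.
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threadCount];

        for (int i = 0; i < threadCount; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet(); // atomic, so no update is lost
                }
            }, "worker-" + i);
            workers[i].start();
        }

        try {
            // join() blocks until the given thread has terminated.
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Total: " + countWithThreads(4, 1000)); // prints "Total: 4000"
    }
}
```

All four workers see the same counter because threads share the memory of their process. Two separate processes would each get their own copy.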

To answer some questions I will do my best (:)) and present a scenario that gives all of this some meaning. Suppose you want to simulate a situation in which multiple threads ask for some information to be processed, and you want to make the most of your resources by having multiple threads process those requests. The first thing that comes to my mind is a producer-consumer scenario.

The next big question for me was what the topic of this short application should be so that I could make my point. I thought of the most common situation: threads trying to deposit to or withdraw from a single account. That way you have multi-threading, control over the number of threads consuming resources, and shared data that can be corrupted.

Let's see some code:

First of all I will implement the Account class:

import java.util.concurrent.locks.ReentrantLock;

public class Account {
    private static ReentrantLock lock = new ReentrantLock(true);
    
    private long balance = 200;
    
    public Account() {
    }
    
    public long transfer(long amount) throws CorruptedAccountException {
//        lock.lock();
//        try {
            if (balance < 0)
                throw new CorruptedAccountException();
        
            if ((balance + amount) < 0)
                return balance;
            
            // This emulates a complex task that runs after the check and before
            // the operation that modifies the shared variable.
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            balance += amount;
            
            return balance;
        
//        } finally {
//            lock.unlock();
//        }
    }
}
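
The listing above uses a CorruptedAccountException class that is not shown in this post. A minimal sketch of what it could look like, assuming it is a plain checked exception with no extra state (the message text is also an assumption):

```java
// Hypothetical definition: the original class is not shown in the post.
public class CorruptedAccountException extends Exception {

    public CorruptedAccountException() {
        super("Account balance is negative: the shared data was corrupted");
    }

    public static void main(String[] args) {
        try {
            throw new CorruptedAccountException();
        } catch (CorruptedAccountException e) {
            System.out.println("Caught: " + e.getMessage());
        }
    }
}
```
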

The Account class is instantiated only once, and a reference to that single instance is then passed to all the other classes, as you will see further on. This gives us the perfect scenario for data sharing, and it is also the critical piece of code. Pay attention to the transfer method, which is where everything happens. It contains a check that throws an exception if the balance is negative. That way we will know whether two threads have accessed the same variable and left inconsistent data behind.

Now we have the producer, which in this case is split into two classes: DepositRequestsProducer, which is responsible for adding money to the account, and WithdrawRequestsProducer, which is responsible for withdrawing from it. Both are almost identical except for the sign of the amount (it was done this way to make the idea clearer). Furthermore, it is important to note that both producers add transactions to a queue. This is not a minor detail: because the queue is bounded, it is the key to preventing the number of pending transactions from growing indefinitely and demanding more and more resources until an "out of memory" error stops the program. That would be the case if the producers added transactions more quickly than the consumers could process them.

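import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class DepositRequestsProducer implements Runnable {

    private Thread thread;
    private BlockingQueue<Long> queue;

    // The producer starts its own thread as soon as it is constructed.
    public DepositRequestsProducer(BlockingQueue<Long> q) {
        queue = q;
        thread = new Thread(this);
        thread.start();
    }

    public void run() {
        try {
            Random randomGenerator = new Random();
            while (true) {
                // Deposit a random amount between 0 and 99.
                int amount = randomGenerator.nextInt(100);
                // put() blocks while the queue is full.
                queue.put(Long.valueOf(amount));
                Thread.yield();
            }

        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}
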
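import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class WithdrawRequestsProducer implements Runnable {

    private Thread thread;
    private BlockingQueue<Long> queue;

    // The producer starts its own thread as soon as it is constructed.
    public WithdrawRequestsProducer(BlockingQueue<Long> q) {
        queue = q;
        thread = new Thread(this);
        thread.start();
    }

    public void run() {
        try {
            Random randomGenerator = new Random();
            while (true) {
                // Withdraw a random amount between 0 and 299 (sent as a negative value).
                int amount = randomGenerator.nextInt(300);
                // put() blocks while the queue is full.
                queue.put(Long.valueOf(-amount));
                Thread.yield();
            }

        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}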

Note that both classes receive a queue. This queue is where the transactions they produce are put. Later, the consumers will take the transactions from there to process them.

On the other side of the desk we need the consumer, which is responsible for processing the transactions:

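import java.math.RoundingMode;
import java.text.NumberFormat;
import java.util.concurrent.BlockingQueue;

public class TransactionConsumer implements Runnable {

    private Thread thread;
    private BlockingQueue<Long> queue;
    private Account account;

    // The consumer starts its own named thread as soon as it is constructed.
    public TransactionConsumer(BlockingQueue<Long> q, Account account, int id) {
        queue = q;
        this.account = account;
        thread = new Thread(this, "Consumer_" + id);
        thread.start();
    }

    public void run() {
        long request, result = 0;
        Runtime s_runtime = Runtime.getRuntime();

        NumberFormat numberFormat = NumberFormat.getNumberInstance();
        numberFormat.setRoundingMode(RoundingMode.DOWN);

        try {
            while (true) {
                // take() blocks while the queue is empty.
                request = queue.take().longValue();
                result = account.transfer(request);

                // Memory figures are reported in megabytes.
                double freeMemory = (s_runtime.freeMemory() / 1048576);

                System.out.println("Calculated result after adding " + request + " is " + result + " -- Free Memory: " +  numberFormat.format(freeMemory) + " / " + numberFormat.format(s_runtime.totalMemory() / 1048576));
            }

        } catch (InterruptedException ex) {
            ex.printStackTrace();
        } catch (CorruptedAccountException ex) {
            // Two threads interleaved inside transfer and left a negative balance.
            ex.printStackTrace();
        }

        // Reached only when the loop above was aborted: stop the whole program.
        System.exit(0);
    }
}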

This class also receives the queue in its constructor and starts a thread that processes transactions as soon as they appear in the queue.

Lastly, we have the class that makes it all work:

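import java.util.concurrent.ArrayBlockingQueue;

public class TestConcurrency {

    public static void main(String[] args) {
        Account account = new Account();
        // Bounded queue: at most 40 pending transactions at any time.
        final ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(40);

        // 15 consumers; each one starts its own thread in its constructor.
        for (int i = 0; i < 15; i++)
            new TransactionConsumer(queue, account, i);

        // 30 deposit and 30 withdraw producers. Each producer already starts
        // its own thread in its constructor, so it must not be wrapped in
        // another Thread here, or every producer would run twice.
        for (int i = 0; i < 30; i++) {
            new DepositRequestsProducer(queue);
            new WithdrawRequestsProducer(queue);
        }
    }
}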

As you can see, when you run the program the first thread executes the main method. Here an ArrayBlockingQueue is created, and this is the first thing to analyse. This queue, as mentioned above, protects the program from an unbounded increase in resource demand. Once the queue reaches its capacity, every thread that tries to put a transaction in is put to sleep until a consumer takes a transaction out. At that point, waiting producers are woken up as space becomes available. The ArrayBlockingQueue implementation is very useful because it saves us a lot of work monitoring threads.
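
The blocking behaviour can be observed in isolation. The following is a minimal sketch, not part of the application: the class BoundedQueueDemo and its two methods are invented for this illustration. It uses the timed offer() instead of put(), so that the example returns instead of waiting forever when nobody consumes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    // Fills a queue of the given capacity, then tries one more insert while
    // nobody is consuming. Returns true if the extra element was accepted.
    public static boolean extraInsertAccepted(int capacity) {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < capacity; i++) {
                queue.put((long) i); // does not block: there is still room
            }
            // The queue is full. The producer waits 100 ms and then gives up.
            return queue.offer(99L, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Same situation, but a consumer thread removes one element, which
    // frees a slot for the waiting producer.
    public static boolean extraInsertAcceptedAfterTake(int capacity) {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < capacity; i++) {
                queue.put((long) i);
            }
            Thread consumer = new Thread(() -> {
                try {
                    queue.take(); // frees exactly one slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "consumer");
            consumer.start();

            boolean accepted = queue.offer(99L, 5, TimeUnit.SECONDS);
            consumer.join();
            return accepted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("No consumer:   " + extraInsertAccepted(3));          // false
        System.out.println("With consumer: " + extraInsertAcceptedAfterTake(3)); // true
    }
}
```

In the application the producers call put(), which behaves like offer() with an unlimited timeout: the producer simply sleeps until a consumer makes room.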

Then a specified number of threads is created to run the producers and consumers: 15 consumers, 30 deposit producers and 30 withdraw producers. We are therefore going to have at least 75 threads running inside the same process, alongside the main one.

Back in the Account class, all the threads execute the transfer method without any synchronisation. This will corrupt the shared data whenever two threads are inside the critical section of the code at the same time. Let's look at a simple example:

Suppose that two threads (which we'll call A and B) call the transfer method. Let's say that the balance is 125, thread A is invoked with -100 and thread B with -50. Thread A passes the "if" that prevents a negative balance (125 - 100 = 25) and reaches the sleep call. Meanwhile, thread B passes the same check (125 - 50 = 75) before thread A updates the balance. Thread A then updates the balance, leaving it at 25. When thread B reaches the update, it subtracts 50 from 25 and leaves the balance at -25: the shared data is corrupted.
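
This interleaving depends on timing, so it does not show up on every run of the real program. The sketch below forces it to happen every time. It is an illustration only: the class CheckThenActRace is invented here, and a CountDownLatch stands in for the sleep window of Account.transfer so that both threads are guaranteed to pass the check before either one updates the balance.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class CheckThenActRace {

    // Replays the interleaving described above and returns the final balance.
    public static long replay(long initialBalance, long amountA, long amountB) {
        AtomicLong balance = new AtomicLong(initialBalance);
        // Opens only after both threads have finished their balance check.
        CountDownLatch bothChecked = new CountDownLatch(2);

        Thread a = new Thread(() -> transfer(balance, amountA, bothChecked), "A");
        Thread b = new Thread(() -> transfer(balance, amountB, bothChecked), "B");
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return balance.get();
    }

    private static void transfer(AtomicLong balance, long amount, CountDownLatch bothChecked) {
        // Step 1: the check, made against a balance the other thread has not touched yet.
        boolean allowed = balance.get() + amount >= 0;
        bothChecked.countDown();
        if (!allowed) {
            return;
        }
        try {
            // Step 2: wait until the other thread has also done its check.
            bothChecked.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        // Step 3: the update. The addition itself is atomic; the bug is that
        // the check in step 1 is no longer valid by the time it runs.
        balance.addAndGet(amount);
    }

    public static void main(String[] args) {
        System.out.println("Final balance: " + replay(125, -100, -50)); // prints "Final balance: -25"
    }
}
```

Both threads saw a balance of 125 when they checked, so both withdrawals were allowed, and the account ends up at -25.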

This is the moment when we realise that this is "critical code", and that we have to add some protection to the transfer method. In the Account class you will see some commented-out lines of code, which you should uncomment in order to see the difference.
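
For reference, this is roughly what the protected method looks like once the lock is active. It is a simplified sketch, not the Account class itself: the class name LockedAccountSketch is invented, the sleep and the corruption check are left out, and the lock is an instance field here, whereas Account declares it static.

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockedAccountSketch {

    // Fair lock, as in Account (but per instance, which is an assumption).
    private final ReentrantLock lock = new ReentrantLock(true);
    private long balance;

    public LockedAccountSketch(long initialBalance) {
        balance = initialBalance;
    }

    public long transfer(long amount) {
        lock.lock(); // only one thread at a time gets past this point
        try {
            // Check and update now happen as one indivisible step.
            if (balance + amount < 0) {
                return balance; // rejected: it would leave a negative balance
            }
            balance += amount;
            return balance;
        } finally {
            lock.unlock(); // always released, even if an exception is thrown
        }
    }

    // Runs the two withdrawals from the example above concurrently.
    public static long runExample() {
        LockedAccountSketch account = new LockedAccountSketch(125);
        Thread a = new Thread(() -> account.transfer(-100), "A");
        Thread b = new Thread(() -> account.transfer(-50), "B");
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return account.transfer(0); // reads the balance under the lock
    }

    public static void main(String[] args) {
        // Prints 25 if A got the lock first, 75 if B did. Never a negative value.
        System.out.println("Final balance: " + runExample());
    }
}
```

Whichever thread acquires the lock first completes its withdrawal, and the second one is rejected because its check now sees the updated balance.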

I have added a few extra lines to print some information about memory use. Try running the program with and without the lock lines commented out to see the difference.

Good luck!!!