
Friday, October 4

Blocking Queue and Consumer pattern for fixed resources


What is a BlockingQueue?

BlockingQueue is a collection interface in the java.util.concurrent package, introduced in Java 5 (1.5). It provides several useful methods for writing efficient multithreaded, concurrent programs.

We will discuss two very important BlockingQueue methods here:

1. put(): This method adds an object to the queue, waiting if necessary until there is capacity for it. For a bounded queue, we define the maximum size when we construct the queue instance.

So if we say:

    BlockingQueue<Token> tokensQueue = new LinkedBlockingQueue<>(10);

the maximum number of Token objects allowed in the queue is 10.

BlockingQueue is an interface, and LinkedBlockingQueue is one of its implementations.

So if the current size of the queue is 10 and we call queue.put(Object), the call blocks until one of the consumers takes an object from the queue. After that, the size drops, the queue has capacity again, and the queue.put(Object) call succeeds.

2. take(): This method retrieves and removes the object at the head of the queue. It is also a blocking call: if the queue is empty, it waits until some other thread inserts an object. Once at least one object is available, queue.take() returns it and the waiting ends.
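The blocking behaviour of put() and take() can be observed with a small experiment. The sketch below is only an illustration: the class name BlockingCallsDemo, the capacity of 1 and the 200 ms wait are my own assumptions, chosen to make the waiting visible.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the token example.
class BlockingCallsDemo {

    // Returns true if put() was still waiting while the queue was full
    // and completed once take() had freed a slot.
    static boolean putWaitsWhileFull() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("first");                       // the queue is now full
        Thread producer = new Thread(() -> {
            try {
                queue.put("second");                // waits for free capacity
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            producer.join(200);                     // wait up to 200 ms
            boolean stillWaiting = producer.isAlive();
            queue.take();                           // removes "first", frees a slot
            producer.join();                        // put("second") can now finish
            return stillWaiting && "second".equals(queue.peek());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns the element that a waiting take() eventually received.
    static String takeWaitsWhileEmpty() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take());         // waits: the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            consumer.join(200);                     // take() cannot return yet
            queue.put("token");                     // releases the waiting take()
            consumer.join();
            return received.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("put() waited while full: " + putWaitsWhileFull());
        System.out.println("take() received: " + takeWaitsWhileEmpty());
    }
}
```

In both methods the second thread can only finish after the main thread changes the queue, which is exactly the waiting described above.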

One more method I would like to discuss is offer(). offer() also puts an object into the queue, but it is not a blocking call. It simply returns false if the object cannot be added because the queue is full; it does not wait for space to become available.
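A quick way to see the difference is to offer one element more than the queue can hold. This is a minimal sketch; the class name OfferDemo and the capacity of 2 are assumptions made for the illustration.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the token example.
class OfferDemo {

    // Offers three elements to a queue that can hold only two.
    static boolean[] offerThreeIntoCapacityTwo() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        boolean first = queue.offer(1);   // true
        boolean second = queue.offer(2);  // true: the queue is now full
        boolean third = queue.offer(3);   // false: returns at once, no waiting
        return new boolean[] {first, second, third};
    }

    public static void main(String[] args) {
        // prints [true, true, false]
        System.out.println(Arrays.toString(offerThreeIntoCapacityTwo()));
    }
}
```

BlockingQueue also has a timed variant, offer(e, timeout, unit), which waits up to the given time before giving up.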








Before starting my code example, I would like to touch upon one more thing:

AtomicInteger: this class, from the java.util.concurrent.atomic package, holds an int value and provides atomic operations for reading and modifying it. Its methods, such as incrementAndGet(), update the value atomically, so no explicit synchronization is needed. We will see how to use it in the example.

So now let's move to our code example:
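To make the atomicity concrete, here is a small sketch in which several threads increment one shared counter. The class name AtomicCounterDemo and the thread and iteration counts are assumptions made for the illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the token example.
class AtomicCounterDemo {

    // Starts `threads` workers that each call incrementAndGet()
    // `incrementsPerThread` times, then returns the final value.
    static int countWithThreads(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet();   // atomic read-modify-write
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();                   // wait for every worker to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 1000)); // prints 4000
    }
}
```

With a plain int field and counter++ the result could come out lower than 4000, because increments from different threads can overwrite each other.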

Consider a class of around 1000 students. It is summer, so students keep leaving the classroom to drink water. The teacher cannot let too many students out together, so there is a rule that at most 10 students may be outside at any one time.

To implement this in practice, 10 tokens are created. Every student who goes outside has to carry a token and, after returning, puts the token back in the pool for someone else to use.





So how can we implement this using a BlockingQueue?

As usual, the code below has a lot of System.out.println statements. Executing the program prints them on the console, and they take you through all the steps followed.

------------------------------------


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;


// This is the class where the tokens are created. It interacts with the
// other classes to drive the overall flow.

public class Class {

    // Bounded queue: it can hold at most 10 tokens at a time.
    BlockingQueue<Token> tokensQueue = new LinkedBlockingQueue<>(10);

    public static void main(String[] args) {

        Class classroom = new Class();
        classroom.generateTokens();

    }

    // This method adds 10 instances of the Token class to the queue.
    public void generateTokens() {

        for (int i = 0; i < 10; i++) {
            System.out.println("generating token no" + i);

            try {
                tokensQueue.put(new Token());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

        System.out.println("Now all 10 tokens are created and are stored in blocking queue ");


        // Start the consumer thread, which begins taking and using the tokens.
        Thread BCC = new Thread(new Student(tokensQueue));
        BCC.start();

    }

}
// This is the Token class. These tokens are allotted to students.

class Token {

    // Shared counter; incrementAndGet() gives every token a unique number.
    public static AtomicInteger tokenNumber = new AtomicInteger(0);

    public int number_Of_Token_Assigned;

    void useToken(Token token) {
        System.out.println("Student is using token " + token.number_Of_Token_Assigned);
    }

    Token() {
        this.number_Of_Token_Assigned = tokenNumber.incrementAndGet();
    }

}

// Students use the tokens continuously, so run() executes an infinite while loop.

class Student implements Runnable {
    BlockingQueue<Token> tokenQueue;

    Student(BlockingQueue<Token> tokenQueue) {
        this.tokenQueue = tokenQueue;
    }

    public void run() {

        try {
            while (true) {
                System.out.println("student request for a token..");
                Token token = tokenQueue.take();
                System.out.println(token.number_Of_Token_Assigned + " is the token no alloted ...");
                token.useToken(token);
                System.out.println("token" + token.number_Of_Token_Assigned
                        + "is used .Now putting that back into tokenQueue");


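                // offer() does not block. It always succeeds here, because only
                // 10 tokens exist and the queue has room for 10.
                tokenQueue.offer(token);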
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status before run() ends.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }

    }

}
 


This will print something like the following on the console:





-----------------------------
2 is the token no alloted ...
Student is using token 2
token2is used .Now putting that back into tokenQueue
student request for a token..
3 is the token no alloted ...
Student is using token 3
token3is used .Now putting that back into tokenQueue
student request for a token..
4 is the token no alloted ...
Student is using token 4
token4is used .Now putting that back into tokenQueue
student request for a token..
5 is the token no alloted ...
Student is using token 5
token5is used .Now putting that back into tokenQueue
student request for a token..
6 is the token no alloted ...
Student is using token 6
token6is used .Now putting that back into tokenQueue
student request for a token..
7 is the token no alloted ...
Student is using token 7
token7is used .Now putting that back into tokenQueue
student request for a token..
8 is the token no alloted ...
Student is using token 8
token8is used .Now putting that back into tokenQueue
student request for a token..
9 is the token no alloted ...
Student is using token 9
token9is used .Now putting that back into tokenQueue
student request for a token..
10 is the token no alloted ...
Student is using token 10
token10is used .Now putting that back into tokenQueue
student request for a token..
1 is the token no alloted ...
Student is using token 1
token1is used .Now putting that back into tokenQueue
student request for a token..
2 is the token no alloted ...
Student is using token 2
token2is used .Now putting that back into tokenQueue
student request for a token..
3 is the token no alloted ...
Student is using token 3
token3is used .Now putting that back into tokenQueue
student request for a token..
4 is the token no alloted ...
Student is using token 4
token4is used .Now putting that back into tokenQueue
student request for a token..
5 is the token no alloted ...
Student is using token 5
token5is used .Now putting that back into tokenQueue
-----------------------------

You can repeat this example with more consumers by starting more Student threads, as below, and see how the output changes:

public void generateTokens() {

        for (int i = 0; i < 10; i++) {
            System.out.println("generating token no" + i);

            try {
                tokensQueue.put(new Token());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

        System.out.println("Now all 10 tokens are created and are stored in blocking queue ");

        // Start three consumer threads that share the same token queue.
        Thread BCC = new Thread(new Student(tokensQueue));
        BCC.start();

        Thread BCC1 = new Thread(new Student(tokensQueue));
        BCC1.start();

        Thread BCC2 = new Thread(new Student(tokensQueue));
        BCC2.start();


    }
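
With several consumers it is worth checking that the rule from the story still holds, that is, that no more students are outside than there are tokens. The following is a minimal sketch under my own assumptions: the class name TokenPoolDemo, the use of plain Integer tokens, the 5 ms sleep and the single trip per student are all hypothetical and are not part of the program above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: every student makes exactly one trip outside.
class TokenPoolDemo {

    // Runs `students` threads that each borrow one token, and returns the
    // largest number of students that were outside at the same time.
    static int maxStudentsOutside(int students, int tokens) {
        BlockingQueue<Integer> pool = new LinkedBlockingQueue<>(tokens);
        for (int i = 1; i <= tokens; i++) {
            pool.offer(i);                            // fill the pool
        }
        AtomicInteger outside = new AtomicInteger(0);
        AtomicInteger maxOutside = new AtomicInteger(0);

        Thread[] threads = new Thread[students];
        for (int i = 0; i < students; i++) {
            threads[i] = new Thread(() -> {
                try {
                    Integer token = pool.take();      // wait for a free token
                    int now = outside.incrementAndGet();
                    maxOutside.accumulateAndGet(now, Math::max);
                    Thread.sleep(5);                  // the student is outside
                    outside.decrementAndGet();
                    pool.put(token);                  // hand the token back
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();                        // wait for every student
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return maxOutside.get();
    }

    public static void main(String[] args) {
        System.out.println("max students outside at once: " + maxStudentsOutside(50, 10));
    }
}
```

The returned value can never exceed the number of tokens, because a student is counted as outside only between take() and put(), and take() waits whenever the pool is empty.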