Method One:

Use a LinkedHashMap as the buffer and synchronized methods (with wait/notifyAll) as the lock.

/**
 * Bounded FIFO buffer shared between producer and consumer threads.
 *
 * <p>Elements are kept in a {@link LinkedHashMap} whose insertion order
 * provides first-in/first-out retrieval. Both operations are
 * {@code synchronized} on this instance and use {@code wait()/notifyAll()}
 * to block while the buffer is full (producers) or empty (consumers).
 *
 * @param <T> type of the buffered elements
 */
public class PublicQueue<T> {
    // key under which the next element is stored; cycles through 0..maxCount-1
    private int putIndex = 0;
    // maximum capacity
    private final int maxCount = 50;

    // buffer; iteration order is insertion order, so the first entry is the oldest
    private final LinkedHashMap<Integer, T> linkedHashMap = new LinkedHashMap<>();


    /**
     * Adds an element, blocking while the buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting for free space,
     * the interrupt flag is restored and the method returns without
     * inserting {@code msg}.
     *
     * @param msg element to append to the buffer
     */
    public synchronized void put(T msg) {
        // Re-check the condition after every wakeup: wait() may return
        // spuriously, or another producer may have filled the slot first.
        while (linkedHashMap.size() >= maxCount) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Inserting now would reuse a key that is still in the map
                // and overwrite the oldest element, so give up instead.
                Thread.currentThread().interrupt();
                return;
            }
        }

        // put data into the buffer
        linkedHashMap.put(putIndex, msg);
        System.out.println("Produce a product, current index is:"+putIndex+"===content:"+msg+"===buffer length:"+linkedHashMap.size());
        // advance the key, wrapping around at maxCount
        putIndex = (putIndex + 1) % maxCount;

        // the buffer is non-empty now: wake any waiting consumers
        notifyAll();
    }


    /**
     * Removes and returns the oldest element, blocking while the buffer is empty.
     *
     * <p>If the calling thread is interrupted while waiting for an element,
     * the interrupt flag is restored and {@code null} is returned.
     *
     * @return the oldest buffered element, or {@code null} if interrupted
     */
    public synchronized T get() {
        // Loop for the same reason as in put(): the condition must hold
        // when we proceed, not merely when we were notified.
        while (linkedHashMap.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }

        // the first entry in iteration order is the oldest one
        Iterator<Map.Entry<Integer, T>> iterator = linkedHashMap.entrySet().iterator();
        Map.Entry<Integer, T> entry = iterator.next();
        T t = entry.getValue();
        int index = entry.getKey();
        iterator.remove();
        System.out.println("Consume a product,current index is:"+index+"===content:"+ t +"===buffer length:"+linkedHashMap.size());

        // a slot was freed: wake any waiting producers
        notifyAll();
        return t;
    }
}
/**
 * Producer thread: puts the strings "0" through "59" into the shared queue.
 */
public class Producer extends Thread {
    // queue shared with the consumer; parameterized so put(String) is type-checked
    private final PublicQueue<String> publicQueue;

    /**
     * @param publicQueue queue that receives the produced values
     */
    public Producer(PublicQueue<String> publicQueue) {
        this.publicQueue = publicQueue;
    }

    /** Produces 60 values, one per loop iteration. */
    @Override
    public void run() {
        for (int i = 0; i < 60; i++) {
            publicQueue.put(String.valueOf(i));
        }
    }
}
/**
 * Consumer thread: repeatedly takes elements from the shared queue until
 * the thread is interrupted.
 */
public class Consumer extends Thread {
    // queue shared with the producer; only get() is used, so any element type works
    private final PublicQueue<?> publicQueue;

    /**
     * @param publicQueue queue to take elements from
     */
    public Consumer(PublicQueue<?> publicQueue) {
        this.publicQueue = publicQueue;
    }

    /** Drains the queue in a loop; interrupting the thread ends the loop. */
    @Override
    public void run() {
        // Checking the interrupt flag gives callers a way to stop the
        // consumer instead of looping forever.
        while (!Thread.currentThread().isInterrupted()) {
            publicQueue.get();
        }
    }
}
/**
 * Demo entry point: starts one producer and one consumer on a shared queue.
 */
public class ProviderConsumerTest {
    public static void main(String[] args) {
        PublicQueue<String> publicQueue = new PublicQueue<>();
        Producer producer = new Producer(publicQueue);
        Consumer consumer = new Consumer(publicQueue);
        // start the producer declared above (the name "provider" was never declared)
        producer.start();
        consumer.start();
    }
}

Method Two:

Use a BlockingDeque from Java's java.util.concurrent package.

public class PublicQueue<T> {
    // buffer
    private BlockingDeque<T> blockingDeque = new LinkedBlockingDeque<>(50);

    public void add(T msg){

        try {
            blockingDeque.put(msg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Produce a product, current index is:"+"=== content:"+msg);
    }

    public T get(){

        T t = null;
        try {
            t = blockingDeque.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Consume a product, current index is:"+"=== content:"+t);
        return t;
    }
}

0 Comments

Leave a Reply

Avatar placeholder

Your email address will not be published. Required fields are marked *

Table of Contents

Catalogue