Method One:
Use a LinkedHashMap as the buffer and synchronized methods (with wait/notifyAll) as the lock.
/**
 * Bounded FIFO buffer for the producer/consumer example, built on a
 * {@link LinkedHashMap} and guarded by this object's monitor.
 *
 * <p>Both {@link #put(Object)} and {@link #get()} are {@code synchronized} and
 * block with {@code wait()} while the buffer is full or empty respectively.
 *
 * @param <T> type of the buffered messages
 */
public class PublicQueue<T> {
    // key under which the next message is stored; wraps around at maxCount
    private int putIndex = 0;
    // maximum number of messages held at once
    private final int maxCount = 50;
    // buffer; LinkedHashMap iterates in insertion order, so its first entry is the oldest
    private final LinkedHashMap<Integer, T> linkedHashMap = new LinkedHashMap<>();

    /**
     * Adds a message, blocking while the buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored and the message is NOT enqueued.
     *
     * @param msg the message to enqueue
     */
    public synchronized void put(T msg) {
        // Re-check the condition in a loop: wait() may return spuriously, and
        // another producer may have refilled the buffer before we got the lock.
        while (linkedHashMap.size() >= maxCount) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Inserting now would exceed the capacity and overwrite a live key.
                Thread.currentThread().interrupt();
                return;
            }
        }
        linkedHashMap.put(putIndex, msg);
        System.out.println("Produce a product, current index is:"+putIndex+"===content:"+msg+"===buffer length:"+linkedHashMap.size());
        putIndex = (putIndex + 1) % maxCount;
        // Wake waiters only after the state change they are waiting for.
        notifyAll();
    }

    /**
     * Removes and returns the oldest message, blocking while the buffer is empty.
     *
     * @return the oldest message, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt status is restored)
     */
    public synchronized T get() {
        while (linkedHashMap.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        // The first entry in iteration order is the oldest insertion.
        Iterator<Map.Entry<Integer, T>> iterator = linkedHashMap.entrySet().iterator();
        Map.Entry<Integer, T> entry = iterator.next();
        // Read key and value before removing the entry through the iterator.
        T t = entry.getValue();
        int index = entry.getKey();
        iterator.remove();
        System.out.println("Consume a product,current index is:"+index+"===content:"+ t +"===buffer length:"+linkedHashMap.size());
        // A slot was freed; wake any producer blocked in put().
        notifyAll();
        return t;
    }
}
/**
 * Producer thread: puts the strings "0" through "59" into the shared queue.
 */
public class Producer extends Thread {
    // Typed as a sink of String so put(String) is a checked call rather than a raw-type call.
    private final PublicQueue<? super String> publicQueue;

    /**
     * @param publicQueue the shared queue that receives the produced messages
     */
    public Producer(PublicQueue<? super String> publicQueue) {
        this.publicQueue = publicQueue;
    }

    /** Produces 60 messages, one per loop iteration, then terminates. */
    @Override
    public void run() {
        for (int i = 0; i < 60; i++) {
            publicQueue.put(String.valueOf(i));
        }
    }
}
/**
 * Consumer thread: repeatedly takes messages from the shared queue and
 * discards them.
 */
public class Consumer extends Thread {
    // The element type is irrelevant here because the taken values are discarded.
    private final PublicQueue<?> publicQueue;

    /**
     * @param publicQueue the shared queue to consume from
     */
    public Consumer(PublicQueue<?> publicQueue) {
        this.publicQueue = publicQueue;
    }

    /**
     * Consumes messages until this thread's interrupt status is set; without
     * an interrupt the loop never ends.
     */
    @Override
    public void run() {
        // Checking the interrupt status gives callers a way to stop the consumer.
        while (!Thread.currentThread().isInterrupted()) {
            publicQueue.get();
        }
    }
}
/**
 * Entry point for the example: starts one producer and one consumer that
 * share a single queue.
 */
public class ProviderConsumerTest {
    /**
     * Creates the shared queue and starts both threads.
     *
     * <p>The threads are not joined or stopped here, so the program keeps
     * running for as long as the consumer thread does.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PublicQueue<String> publicQueue = new PublicQueue<>();
        Producer producer = new Producer(publicQueue);
        Consumer consumer = new Consumer(publicQueue);
        // The variable is named "producer"; the original "provider.start()" did not compile.
        producer.start();
        consumer.start();
    }
}
Method Two:
Use a BlockingDeque (java.util.concurrent.LinkedBlockingDeque) from the Java standard library.
/**
 * Bounded buffer for the producer/consumer example that delegates all
 * blocking to a {@link LinkedBlockingDeque} with a capacity of 50.
 *
 * @param <T> type of the buffered messages
 */
public class PublicQueue<T> {
    // buffer; put() blocks while it is full, take() blocks while it is empty
    private final BlockingDeque<T> blockingDeque = new LinkedBlockingDeque<>(50);

    /**
     * Adds a message, blocking while the buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt
     * status is restored, the message is not enqueued, and nothing is printed.
     *
     * @param msg the message to enqueue; must not be null, since the
     *            underlying deque rejects null elements
     */
    public void add(T msg) {
        try {
            blockingDeque.put(msg);
        } catch (InterruptedException e) {
            // Nothing was enqueued, so do not report a produced product.
            Thread.currentThread().interrupt();
            return;
        }
        System.out.println("Produce a product, current index is:"+"=== content:"+msg);
    }

    /**
     * Removes and returns the message at the head of the deque, blocking
     * while the buffer is empty.
     *
     * @return the head message, or {@code null} if the calling thread was
     *         interrupted while waiting (its interrupt status is restored)
     */
    public T get() {
        final T t;
        try {
            t = blockingDeque.take();
        } catch (InterruptedException e) {
            // Nothing was taken, so do not report a consumed product.
            Thread.currentThread().interrupt();
            return null;
        }
        System.out.println("Consume a product, current index is:"+"=== content:"+t);
        return t;
    }
}
0 Comments