package producerconsumer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Bounded buffer shared by the producer and consumer threads (first version posted in the
 * question).
 *
 * <p>The string-concatenation and increment operators that were missing from the listing are
 * restored so the class compiles. The synchronization logic is intentionally left as posted,
 * because it is the subject of the discussion that follows: two independent sets of semaphores
 * are used, one inside {@link #add(int)}/{@link #take()} and one around them in
 * {@link #put()}/{@link #get()}.
 */
public class Buffer {
    /** Most recent value removed by {@link #take()}. */
    int t;

    // Inner semaphores, used only by add() and take().
    private final Semaphore notEmptyy = new Semaphore(0); // prevent underflow
    private final Semaphore notFulll = new Semaphore(10); // prevent overflow

    private int itemn;
    private int itemb;

    /** Number of items add() has stored minus the number take() has removed. */
    int count = 0;
    int buffersize = 10;

    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(buffersize);

    // Outer semaphores, used only by put() and get().
    private final Semaphore mutex = new Semaphore(1); // control buffer access
    private final Semaphore notEmpty = new Semaphore(0); // prevent underflow
    private final Semaphore notFull = new Semaphore(10); // prevent overflow

    public Buffer() {
    }

    /**
     * Logs the request, stores {@code x} in the queue and increments {@code count}.
     *
     * @param x value to store
     * @throws InterruptedException if interrupted while waiting on {@code notFulll}
     */
    public void add(int x) throws InterruptedException {
        while (count == buffersize) {
            notFulll.acquire();
        }
        System.out.println(
                "user printer-request,: " + Thread.currentThread().getName() + " " + x);
        queue.offer(x);
        count++;
        notEmptyy.release();
    }

    /**
     * Removes the head of the queue, decrements {@code count} and returns the removed value.
     *
     * @return the value taken from the queue
     * @throws InterruptedException if interrupted while waiting for an item
     */
    public int take() throws InterruptedException {
        while (queue.isEmpty()) {
            notEmptyy.acquire();
        }
        t = queue.take();
        count--;
        notFulll.release();
        return t;
    }

    /**
     * Produces random values in {@code [0, 100)} forever, adding each one while holding
     * {@code mutex}. This method never returns.
     */
    public void put() {
        while (true) {
            try {
                Random random = new Random();
                int data = random.nextInt(100);
                notFull.acquire();
                mutex.acquire();
                add(data);
                mutex.release();
                notEmpty.release();
            } catch (InterruptedException e) {
                System.out.println("InterruptedException caught ");
            }
        }
    }

    /**
     * Consumes values forever, taking each one while holding {@code mutex} and then printing it.
     * This method never returns.
     */
    public void get() {
        while (true) {
            try {
                notEmpty.acquire();
                mutex.acquire();
                itemn = take();
                mutex.release();
                notFull.release();
                // NOTE: take() has already removed the item. This second removal runs outside
                // the mutex and is the call the answer in this thread identifies as wrong; it is
                // kept here because it is part of the code being asked about.
                queue.remove(itemn);
                System.out.println(
                        "print request, : " + Thread.currentThread().getName() + " " + itemn);
            } catch (InterruptedException e) {
                System.out.println("InterruptedException caught ");
            }
        }
    }
}
package producerconsumer;
/**
 * Producer task: calls {@code put()} on the shared buffer once and then sleeps for
 * {@code DELAY} milliseconds.
 */
public class producers implements Runnable {
    /** Pause after producing, in milliseconds. */
    private static final int DELAY = 500;
    private Buffer osLabGroup;

    /**
     * @param buffer the buffer this producer writes to
     */
    public producers(Buffer buffer) {
        osLabGroup = buffer;
    }

    @Override
    public void run() {
        osLabGroup.put();
        try {
            Thread.sleep(DELAY);
        } catch (InterruptedException exception) {
            // Keep the interrupt visible to whoever owns this thread instead of discarding it.
            Thread.currentThread().interrupt();
        }
    }
}
package producerconsumer;
/**
 * Consumer task: calls {@code get()} on the shared buffer once and then sleeps for
 * {@code DELAY} milliseconds.
 */
public class consumers implements Runnable {
    /** Pause after consuming, in milliseconds. */
    private static final int DELAY = 1000;
    private Buffer osLabGroup;

    /**
     * @param buffer the buffer this consumer reads from
     */
    public consumers(Buffer buffer) {
        osLabGroup = buffer;
    }

    @Override
    public void run() {
        osLabGroup.get();
        try {
            Thread.sleep(DELAY);
        } catch (InterruptedException exception) {
            // Keep the interrupt visible to whoever owns this thread instead of discarding it.
            Thread.currentThread().interrupt();
        }
    }
}
//}
package producerconsumer;
/**
 * Entry point: wires three producers and three consumers to one shared {@code Buffer} and
 * starts them.
 */
public class ProducerConsumer {
    public static void main(String[] args) {
        Buffer shared = new Buffer();

        // Tasks and names are listed in the order the threads are created and started:
        // producer/consumer pairs 1, 2, 3.
        Runnable[] tasks = {
            new producers(shared), new consumers(shared),
            new producers(shared), new consumers(shared),
            new producers(shared), new consumers(shared),
        };
        String[] labels = {"p1", "c1", "p2", "c2", "p3", "c3"};

        Thread[] workers = new Thread[tasks.length];
        for (int i = 0; i < tasks.length; i++) {
            workers[i] = new Thread(tasks[i], labels[i]);
        }
        for (Thread worker : workers) {
            worker.start();
        }
    }
}
我有一個程式用來模擬列印機的列印作業,一次處理一個作業:作業來自 Producer 或 Consumer,兩者共享一個名為 Buffer 的物件。
有一組生產者執行緒,以 0.5 秒的延遲(DELAY = 500 毫秒)將資料放入緩沖區。
另有一組消費者執行緒,以 1 秒的延遲(DELAY = 1000 毫秒)從緩沖區讀取資料。
目前我正在參照這部影片的做法:https://www.youtube.com/watch?v=nxw2y27z0V4&t=1207s
並使用信號量來實作。這是我的代碼。我的問題是我的輸出不正確。(在此處輸入影像描述)
以下是我編輯過的代碼:
package producerconsumer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Bounded buffer shared by the producer and consumer threads (edited version posted in the
 * question, backed by a {@link LinkedList} instead of a blocking queue).
 *
 * <p>The string-concatenation and increment operators that were missing from the listing are
 * restored so the class compiles. The synchronization logic is intentionally left as posted:
 * one set of semaphores inside {@link #add(int)}/{@link #take()} and a second set around them
 * in {@link #put()}/{@link #get()}.
 */
public class Buffer {
    /** Most recent value removed by {@link #take()}. */
    int t;

    // Inner semaphores, used only by add() and take().
    private final Semaphore notEmptyy = new Semaphore(0); // prevent underflow
    private final Semaphore notFulll = new Semaphore(10); // prevent overflow

    private int itemn;
    private int itemb;

    /** Number of items add() has stored minus the number take() has removed. */
    int count = 0;
    int buffersize = 10;

    private final ArrayList<Integer> list = new ArrayList<Integer>(buffersize);
    // Copy-constructed from the (empty) list above, so the queue starts empty.
    private final LinkedList<Integer> queue = new LinkedList<Integer>(list);

    // Outer semaphores, used only by put() and get().
    private final Semaphore mutex = new Semaphore(1); // control buffer access
    private final Semaphore notEmpty = new Semaphore(0); // prevent underflow
    private final Semaphore notFull = new Semaphore(10); // prevent overflow

    public Buffer() {
    }

    /**
     * Logs the request, appends {@code x} to the queue and increments {@code count}.
     *
     * @param x value to store
     * @throws InterruptedException if interrupted while waiting on {@code notFulll}
     */
    public void add(int x) throws InterruptedException {
        while (count == buffersize) {
            notFulll.acquire();
        }
        System.out.println(
                "user printer-request,: " + Thread.currentThread().getName() + " " + x);
        queue.offer(x);
        count++;
        notEmptyy.release();
    }

    /**
     * Removes the first element of the queue, decrements {@code count} and returns the value.
     *
     * @return the value taken from the queue
     * @throws InterruptedException if interrupted while waiting on {@code notEmptyy}
     */
    public int take() throws InterruptedException {
        while (count == 0) {
            notEmptyy.acquire();
        }
        t = queue.pollFirst();
        count--;
        notFulll.release();
        return t;
    }

    /**
     * Produces random values in {@code [0, 100)} forever, adding each one while holding
     * {@code mutex}. This method never returns.
     */
    public void put() {
        while (true) {
            try {
                Random random = new Random();
                int data = random.nextInt(100);
                notFull.acquire();
                mutex.acquire();
                add(data);
                mutex.release();
                notEmpty.release();
            } catch (InterruptedException e) {
                System.out.println("InterruptedException caught ");
            }
        }
    }

    /**
     * Consumes values forever, taking each one while holding {@code mutex} and then printing it.
     * This method never returns.
     */
    public void get() {
        while (true) {
            try {
                notEmpty.acquire();
                mutex.acquire();
                itemn = take();
                mutex.release();
                notFull.release();
                // The extra queue.remove(itemn) from the first version is disabled here.
                System.out.println(
                        "print request, : " + Thread.currentThread().getName() + " " + itemn);
            } catch (InterruptedException e) {
                System.out.println("InterruptedException caught ");
            }
        }
    }
}
我得到這個輸出(在這里輸入影像描述),以及這個輸出(在這里輸入影像描述)。
uj5u.com熱心網友回復:
在 Buffer#get 中,當您已經從佇列中取出一個專案之後,又再次呼叫了 queue#remove。這是錯誤的行為。您應該直接消費這個專案,而不是在并發控制代碼之外再操作佇列。這可能會導致異常行為,例如死鎖。
try {
    notEmpty.acquire();
    mutex.acquire();
    itemn = take();
    mutex.release();
    notFull.release();
    // take() has already removed the item from the queue, so it must not be removed again;
    // the following call stays commented out.
    // queue.remove(itemn);
    System.out.println("print request, : " + Thread.currentThread().getName() + " " + itemn);
} catch (InterruptedException e) {
    System.out.println("InterruptedException caught ");
}
另外,在你的程式中,你應該把 ArrayBlockingQueue 替換為 java.util 套件下的一個集合類,比如 LinkedList。
你使用的 ArrayBlockingQueue 本身就是一個執行緒安全的阻塞佇列(blocking queue)。您可以直接使用它的 take 和 put 來完成這個程式,無需其他并發控制。
編輯:
正如視頻介紹中提到的,有兩種方法可以解決生產者-消費者問題:信號量或監視器。您的代碼似乎將這兩者混合在一起。在您的 add 和 take 方法中無需進行任何并發控制。
代碼應該是:
/**
 * Consumer task: repeatedly calls {@code get()} on the shared buffer, pausing {@code DELAY}
 * milliseconds between calls, until the thread is interrupted during the pause.
 */
public class consumers implements Runnable {
    /** Pause between two consecutive reads, in milliseconds. */
    private static final int DELAY = 1000;
    private Buffer osLabGroup;

    /**
     * @param buffer the buffer this consumer reads from
     */
    public consumers(Buffer buffer) {
        osLabGroup = buffer;
    }

    @Override
    public void run() {
        while (true) {
            osLabGroup.get();
            try {
                Thread.sleep(DELAY);
            } catch (InterruptedException exception) {
                // An interrupt is a request to stop: restore the flag for the thread's owner
                // and leave the loop instead of silently carrying on.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/**
 * Producer task: repeatedly calls {@code put()} on the shared buffer, pausing {@code DELAY}
 * milliseconds between calls, until the thread is interrupted during the pause.
 */
public class producers implements Runnable {
    /** Pause between two consecutive writes, in milliseconds. */
    private static final int DELAY = 500;
    private Buffer osLabGroup;

    /**
     * @param buffer the buffer this producer writes to
     */
    public producers(Buffer buffer) {
        osLabGroup = buffer;
    }

    @Override
    public void run() {
        while (true) {
            osLabGroup.put();
            try {
                Thread.sleep(DELAY);
            } catch (InterruptedException exception) {
                // An interrupt is a request to stop: restore the flag for the thread's owner
                // and leave the loop instead of silently carrying on.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Semaphore;
/**
 * Bounded FIFO buffer for the producer/consumer exercise, synchronized purely with semaphores.
 *
 * <p>{@code notFull} counts free slots, {@code notEmpty} counts stored items, and {@code mutex}
 * serializes access to the underlying list. {@link #put()} and {@link #get()} are the entry
 * points that use the semaphores; {@link #add(int)} and {@link #take()} touch the list directly
 * and perform no synchronization of their own.
 */
public class Buffer {
    int buffersize = 10;
    private final LinkedList<Integer> queue = new LinkedList<Integer>();
    private final Semaphore mutex = new Semaphore(1); // control buffer access
    private final Semaphore notEmpty = new Semaphore(0); // prevent underflow
    private final Semaphore notFull = new Semaphore(buffersize); // prevent overflow

    public Buffer() {
    }

    /**
     * Logs the request with a timestamp and appends {@code x} to the list. Performs no
     * synchronization.
     *
     * @param x value to store
     * @throws InterruptedException never thrown by this implementation; declared so existing
     *     callers keep compiling
     */
    public void add(int x) throws InterruptedException {
        System.out.println("user printer-request,: " + Thread.currentThread().getName()
                + " " + x + "," + System.currentTimeMillis());
        queue.add(x);
    }

    /**
     * Removes the oldest value from the list, logs it with a timestamp and returns it. Performs
     * no synchronization.
     *
     * @return the removed value
     * @throws InterruptedException never thrown by this implementation; declared so existing
     *     callers keep compiling
     * @throws java.util.NoSuchElementException if the list is empty
     */
    public int take() throws InterruptedException {
        Integer first = queue.removeFirst();
        System.out.println("print request, : " + Thread.currentThread().getName()
                + " " + first + "," + System.currentTimeMillis());
        return first;
    }

    /**
     * Stores one random value in {@code [0, 100)}, waiting while the buffer is full. If the
     * thread is interrupted while waiting, nothing is stored and the interrupt status is
     * restored.
     */
    public void put() {
        Random random = new Random();
        int data = random.nextInt(100);
        try {
            notFull.acquire();
            lockOrGiveBack(notFull);
            try {
                add(data);
            } finally {
                // Always unlock, even if add() fails, so other threads are not blocked forever.
                mutex.release();
            }
            notEmpty.release();
        } catch (InterruptedException e) {
            System.out.println("InterruptedException caught ");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes and prints one value, waiting while the buffer is empty. If the thread is
     * interrupted while waiting, nothing is removed and the interrupt status is restored.
     */
    public void get() {
        try {
            notEmpty.acquire();
            lockOrGiveBack(notEmpty);
            try {
                take();
            } finally {
                // Always unlock, even if take() fails, so other threads are not blocked forever.
                mutex.release();
            }
            notFull.release();
        } catch (InterruptedException e) {
            System.out.println("InterruptedException caught ");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Acquires {@code mutex}; if interrupted while waiting, returns the permit already taken
     * from {@code reserved} so the slot/item count is not lost, then rethrows.
     */
    private void lockOrGiveBack(Semaphore reserved) throws InterruptedException {
        try {
            mutex.acquire();
        } catch (InterruptedException e) {
            reserved.release();
            throw e;
        }
    }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/382682.html
