本文原始碼:GitHub·點這里 || GitEE·點這里
一、概念簡介
1、執行緒通信
在作業系統中,執行緒是個獨立的個體,但是在執行緒執行程序中,如果處理同一個業務邏輯,可能會產生資源爭搶,導致并發問題,通常使用互斥鎖來控制該邏輯,但是在還有這樣一類場景,任務執行是有順序控制的,例如常見的報表資料生成:

- 啟動資料分析任務,生成報表資料;
- 報表資料存入指定位置資料容器;
- 通知資料搬運任務,把資料寫入報表庫;
該場景在相對復雜的系統中非常常見,如果基于多執行緒來描述該程序,則需要執行緒之間通信協作,才能有條不紊的處理該場景業務,
2、等待通知機制
如上的業務場景,如果執行緒A生成資料程序中,執行緒B一直在訪問資料容器,判斷該程序的資料是否已經生成,則會造成資源浪費,正常的流程應該如圖,執行緒A和執行緒B同時啟動,執行緒A開始處理資料生成任務,執行緒B嘗試獲取容器資料,資料還沒過來,執行緒B則進入等待狀態,當執行緒A的任務處理完成,則通知執行緒B去容器中獲取資料,這樣基于執行緒等待和通知的機制來協作完成任務,
3、基礎方法
等待/通知機制的相關方法是Java中Object層級的基礎方法,任何物件都有該方法:
- notify:隨機通知一個在該物件上等待的執行緒,使其結束wait狀態回傳;
- notifyAll:喚醒在該物件上所有等待的執行緒,進入物件鎖爭搶佇列中;
- wait:執行緒進入waiting等待狀態,不會爭搶鎖物件,也可以設定等待時間;
執行緒的等待通知機制,就是基于這幾個基礎方法,
二、等待通知原理
1、基本原理
等待/通知機制,該模式下指執行緒A在不滿足任務執行的情況下呼叫物件wait()方法進入等待狀態,執行緒B修改了執行緒A的執行條件,并呼叫物件notify()或者notifyAll()方法,執行緒A收到通知后從wait狀態回傳,進而執行后續操作,兩個執行緒通過基于物件提供的wait()/notify()/notifyAll()等方法完成等待和通知間互動,提高程式的可伸縮性,
2、實作案例
通過執行緒通信解決上述資料生成和存盤任務的解耦流程,
public class NotifyThread01 {
static Object lock = new Object() ;
static volatile List<String> dataList = new ArrayList<>();
public static void main(String[] args) throws Exception {
Thread saveThread = new Thread(new SaveData(),"SaveData");
saveThread.start();
TimeUnit.SECONDS.sleep(3);
Thread dataThread = new Thread(new AnalyData(),"AnalyData");
dataThread.start();
}
// 等待資料生成,保存
static class SaveData implements Runnable {
@Override
public void run() {
synchronized (lock){
while (dataList.size()==0){
try {
System.out.println(Thread.currentThread().getName()+"等待...");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("SaveData .."+ dataList.get(0)+dataList.get(1));
}
}
}
// 生成資料,通知保存
static class AnalyData implements Runnable {
@Override
public void run() {
synchronized (lock){
dataList.add("hello,");
dataList.add("java");
lock.notify();
System.out.println("AnalyData End...");
}
}
}
}
注意:除了dataList滿足寫條件,還要在AnalyData執行緒執行通知操作,
三、管道流通信
1、管道流簡介
基本概念
管道流主要用于在不同執行緒間直接傳送資料,一個執行緒發送資料到輸出管道,另一個執行緒從輸入管道中讀取資料,進而實作不同執行緒間的通信,
實作分類
管道位元組流:PipedInputStream和PipedOutputStream;
管道字符流:PipedWriter和PipedReader;
新IO管道流:Pipe.SinkChannel和Pipe.SourceChannel;
2、使用案例
public class NotifyThread02 {
public static void main(String[] args) throws Exception {
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream();
// 鏈接輸入流和輸出流
pos.connect(pis);
// 寫資料執行緒
new Thread(new Runnable() {
public void run() {
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
// 將從鍵盤讀取的資料寫入管道流
PrintStream ps = new PrintStream(pos);
while (true) {
try {
System.out.print(Thread.currentThread().getName());
ps.println(br.readLine());
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, "輸入資料執行緒:").start();
// 讀資料執行緒
new Thread(new Runnable() {
public void run() {
BufferedReader br = new BufferedReader(new InputStreamReader(pis));
while (true) {
try {
System.out.println(Thread.currentThread().getName() + br.readLine());
} catch (IOException e) {
e.printStackTrace();
}
}
}
}, "輸出資料執行緒:").start();
}
}
寫執行緒向管道流寫入資料,讀執行緒讀取資料,完成基本通信流程,
四、生產消費模式
1、業務場景
基于執行緒等待通知機制:實作工廠生產一件商品,通知商店賣出一件商品的業務流程,
2、代碼實作
public class NotifyThread03 {
public static void main(String[] args) {
Product product = new Product();
ProductFactory productFactory = new ProductFactory(product);
ProductShop productShop = new ProductShop(product);
productFactory.start();
productShop.start();
}
}
// 產品
class Product {
public String name ;
public double price ;
// 產品是否生產完畢,默認沒有
boolean flag ;
}
// 產品工廠:生產
class ProductFactory extends Thread {
Product product ;
public ProductFactory (Product product){
this.product = product;
}
@Override
public void run() {
int i = 0 ;
while (i < 20) {
synchronized (product) {
if (!product.flag){
if (i%2 == 0){
product.name = "滑鼠";
product.price = 79.99;
} else {
product.name = "鍵盤";
product.price = 89.99;
}
System.out.println("產品:"+product.name+"【價格:"+product.price+"】出廠...");
product.flag = true ;
i++;
// 通知消費者
product.notifyAll();
} else {
try {
// 進入等待狀態
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
// 產品商店:銷售
class ProductShop extends Thread {
Product product ;
public ProductShop (Product product){
this.product = product ;
}
@Override
public void run() {
while (true) {
synchronized (product) {
if (product.flag == true ){
System.out.println("產品:"+product.name+"【價格"+(product.price*2)+"】賣出...");
product.flag = false ;
product.notifyAll(); //喚醒生產者
} else {
try {
product.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
流程描述:ProductFactory生成一件商品,通知商店售賣,通過flag標識判斷控制是否進入等待狀態,商店賣出商品后,再次通知工廠生產商品,
五、源代碼地址
GitHub·地址
https://github.com/cicadasmile/java-base-parent
GitEE·地址
https://gitee.com/cicadasmile/java-base-parent

| 序號 | 文章標題 |
|---|---|
| 01 | Java并發:執行緒的創建方式,狀態周期管理 |
| 02 | Java并發:執行緒核心機制,基礎概念擴展 |
| 03 | Java并發:多執行緒并發訪問,同步控制 |
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/193100.html
標籤:Java
