目錄
一、認識Phaser類
1、Phaser類特征
2、任務注冊與注銷
3、同步階段變更
二、遺傳演算法
1、DataLoader類:加載資料
2、Individual類:問題所有可能解
3、GeneticOperators類:GA演算法核心
三、GA應用:Phaser類解決旅行商問題(TSP)
2、GeneticPhaser類:Java并發Phaser類實作
3、ConcurrentGeneticTask類:GA演算法階段任務執行
4、ConcurrentGeneticAlgorithm類:GA演算法并發實作
5、主類main
四、實驗結果
五、分析總結
一、認識Phaser類
學習Java并發有一段時間了,從基礎開始,認識了許多同步機制API,這次學習了Phaser類這個同步機制,重繪了以前的認識,再添一項新知識,
下面先認識一下Phaser是什么,然后結合實際應用,通過遺傳演算法經典應用的旅行商問題(TSP)的案例,更好地學習一下Phaser類同步機制的使用方法,
1、Phaser類特征
Java7的并發API引入了Phaser類,提供了強大的同步機制——分段器,它將任務劃分為多個階段執行,處理程序定義了明確的步驟,完成每個階段任務需要遵循先后順序,
主要特點:
- 分段器必須知道要控制的任務數量,Java中成為參與者(任務)的注冊機制,
- 任務完成一個階段后告知分段器,完成此階段之前,分段器使該任務處于休眠狀態,
- 分段器保存了一個整數值,存盤已經進行的階段變更數目,
- 當分段器做出階段變更時候,可以執行定制的代碼,
- 控制phaser的終止,
- 可以通過方法獲取參與者的數目和狀態,
2、任務注冊與注銷
這里根據上面說到的第1個特點,介紹注冊參與者的方法,注冊可以在執行任務開始時,也可以隨時注冊,
Phaser物件創建,常用的2個建構式:
- Phaser():創建一個有0個參與者的分段器,
- Phaser(int parties):創建一個給定數目參與者的分段器,
顯式創建方法:
- bulkRegisterl(int parties) :注冊給定數目的參與者,
- register():注冊一個參與者,
分段器控制的任務完成之后必須注銷,否則分段器會處于一直等待狀態,
注銷方法:
arriveAndDeRegister():告知分段器該任務已完成當前階段,而且不參與下一階段,
3、同步階段變更
分段器的主要目的是使那些可以分割為多階段演算法,以并發方式執行,任務只有完成當前階段,才能進入下一個階段,
Phaser類中有arrive,arriveAndDeRegister,arriveAndwaitAdvance三個方法來告知當前階段已經完成,但是當一個任務完成階段后未呼叫其中之一的方法,分段器可能對其余任務或階段進行阻塞,
因此,進入下一階段呼叫以下方法:
- arriveAndwaitAdvance():告知分段器當前階段已經完成,進入下一階段,phaser阻塞該任務,直到所有參與的任務呼叫了其中一種同步方法,
- awaitAdvance(int phaser):告知分段器,若引數等于實際階段數,則等待當前階段結束;不等則回傳,
二、遺傳演算法
上面其實都是一些理論知識,對Phaser有一定的了解之后,開始在實際案例中學習,更進一步理解,遺傳演算法就是一個很好的例子,
遺傳演算法耳熟能詳,是基于自然選擇原理的一種自適應啟發式搜索演算法,用于最優化問題和在搜索問題中尋找更好地方案,換言之,“優勝劣汰,適者生存”,這也是從生物學引申而來的有類似原理的演算法,所以也使用了生物學的一些術語,
一些在遺傳演算法中使用的特定術語:
- 適應度函式:是主要目標,能夠使函式最大化或者最小化的解決方案,來確定方案的優劣,
- 種群:問題的所有可能方案的集合,初始集合可以隨機生成或使用啟發函式獲取初始解決方案,
初始種群之后,啟動包含3個階段的迭代程序:
- 選擇:在種群中選擇更好地個體,個體在適應度函式中有著較好的值,
- 交叉:上一步選擇優質個體后進行交叉,生成新一代個體,具體操作:parent1 X parent2 ——>> child1 + child2 ,主要依賴于待解決問題,
- 突變:可以應用突變運算子更改某個體的值,只對極少個體執行此操作,
結束標準:
- 固定代數
- 適應度函式設定的閾值
- 找到滿足預定標準的方案
- 時間限制
- 手動停止
將遺傳演算法應用于TSP問題,目標找到一條最優路線,經過每個城市有且僅有一次,同時旅行總距離最短,
資料有兩份,包括15個城市和57個城市,二維矩陣中每個值表示城市之間的距離,

1、DataLoader類:加載資料
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
public class DataLoader {
/**
* @param path
* @return int[][]
* @author Charzous
* @date 2021/2/5 10:46
*
* 加載檔案,轉化為二維陣列
*/
public static int[][] load(Path path) throws IOException {
InputStream in = Files.newInputStream(path);
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line = null;
String[] row = null;
int[][] a;
ArrayList<String> arrayList = new ArrayList<>();
int rowNum = 0, colNum = 0, i = 0, j = 0;
while ((line = reader.readLine()) != null) {
arrayList.add(line);
colNum = line.split(",").length;
}
rowNum = arrayList.size();
a = new int[rowNum][colNum];
int count = 0;
for (String str : arrayList) {
row = str.split(",");
for (i = 0; i < colNum; i++) {
a[count][i] = Integer.parseInt(row[i]);
}
count++;
}
return a;
}
/**
* @param args
* @return void
* @author Charzous
* @date 2021/2/5 10:45
*
* 類內測驗,加載檔案
*/
// public static void main(String[] args) throws IOException {
// Path file = Paths.get("data/kn57_dist.txt");
// int[][] a;
// a = DataLoader.load(file);
// for (int i = 0; i < a.length; i++) {
// for (int j = 0; j < a[0].length; j++) {
// System.out.print(a[i][j] + " ");
// }
// System.out.println();
// }
// }
}
2、Individual類:問題所有可能解
/**
* 存放訪問不同城市的順序,TSP問題的所有可能解
*
* @author Charzous
* @date 2021/2/5 10:53
*/
public class Individual implements Comparable<Individual> {
private Integer[] chromosomes;
private int value;
public Integer[] getChromosomes() {
return chromosomes;
}
public void setChromosomes(Integer[] chromosomes) {
this.chromosomes = chromosomes;
}
public void setValue(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public Individual(int size) {
chromosomes = new Integer[size];
}
public Individual(Individual other) {
chromosomes = other.getChromosomes().clone();
}
@Override
public int compareTo(Individual o) {
return Integer.compare(this.getValue(), o.getValue());
}
@Override
public String toString() {
String ret = "";
for (Integer number : chromosomes)
ret += number + ",";
return ret;
}
}
3、GeneticOperators類:GA演算法核心
遺傳演算法核心內部邏輯實作:選擇、交叉、個體和種群評估,詳見代碼注釋
import java.util.*;
/**
* 實作遺傳演算法內部邏輯
*
* @author Charzous
* @date 2021/2/5 11:21
*
*/
public class GeneticOperators {
/**
* @param selected
* @param numberOfIndividual
* @param size
* @return GeneticAlgorithm.Individual[]
* @author Charzous
* @date 2021/2/5 11:29
*
* 接收被選中的個體,交叉操作生成下一代種群
*/
public static Individual[] crossover(Individual[] selected,int numberOfIndividual,int size){
Individual population[] =new Individual[numberOfIndividual];
Random random=new Random(System.nanoTime());
for (int i=0;i<numberOfIndividual/2;i++){
population[2*i]=new Individual(size);
population[2*i+1]=new Individual(size);
int p1Idx=random.nextInt(selected.length);
int p2Idx;
do {
p2Idx=random.nextInt(selected.length);
}while (p1Idx==p2Idx);
Individual parent1=selected[p1Idx];
Individual parent2=selected[p2Idx];
crossover(parent1,parent2,population[2*i],population[2*i+1]);
}
return population;
}
/**
* @param parent1
* @param parent2
* @param individual1
* @param individual2
* @return void
* @author Charzous
* @date 2021/2/5 11:31
*
* parent1 X parent2 --> individual1 , individual2
*/
public static void crossover(final Individual parent1,final Individual parent2,final Individual individual1,final Individual individual2){
List<Integer> p1= Arrays.asList(parent1.getChromosomes());
List<Integer> p2= Arrays.asList(parent2.getChromosomes());
List<Integer> ch1=new ArrayList<Integer>(p1.size());
List<Integer> ch2=new ArrayList<Integer>(p2.size());//p1 ? p2
int size=p1.size();
Random random=new Random();
int number1=random.nextInt(size-1);
int number2;
//1,14,6,2,3,5,7,0,12,4,11,13,9,8,10,
do {
number2=random.nextInt(size);
}while (number1==number2);
int start=Math.min(number1,number2);
int end=Math.max(number1,number2);
ch1.addAll(p1.subList(start,end));
ch2.addAll(p2.subList(start,end));
int currentCity=0;
int currentCityParent1=0;
int currentCityParent2=0;
for (int i = 0; i < size; i++) {
currentCity=(end+i)%size;
currentCityParent1=p1.get(currentCity);
currentCityParent2=p2.get(currentCity);
if (!ch1.contains(currentCityParent2))
ch1.add(currentCityParent2);
if (!ch2.contains(currentCityParent1))
ch2.add(currentCityParent1);
}
Collections.rotate(ch1,start);
Collections.rotate(ch2,start);
individual1.setChromosomes(ch1.toArray(individual1.getChromosomes()));
individual2.setChromosomes(ch2.toArray(individual2.getChromosomes()));
}
/**
* @param population
* @return GeneticAlgorithm.Individual[]
* @author Charzous
* @date 2021/2/5 11:23
*
* 獲取種群的最優個體,回傳種群一半的個體,可使用最適合函式選擇
*/
public static Individual[] selection(Individual[] population){
Individual selected[] =new Individual[population.length/2];
for (int i = 0; i < selected.length; i++) {
selected[i]=new Individual(population[i]);
}
return selected;
}
/**
* @param numberOfIndividual
* @param size
* @return GeneticAlgorithm.Individual[]
* @author Charzous
* @date 2021/2/5 11:24
*
* 創建一個群,城市數目size
*/
public static Individual[] initialize(int numberOfIndividual,int size){
Individual population[] =new Individual[numberOfIndividual];
for (int i = 0; i < numberOfIndividual; i++) {
population[i]=new Individual(size);
initialize(population[i].getChromosomes());
}
return population;
}
/**
* @param chromosomes
* @return void
* @author Charzous
* @date 2021/2/5 11:25
*
* 隨機初始化某一個體的染色體,生成合法的個體(即每個城市只訪問一次)
*/
public static void initialize(Integer[] chromosomes){
int size=chromosomes.length;
List<Integer> values=new ArrayList<Integer>(size);
for (int i = 0; i < size; i++) {
values.add(i);
}
Collections.shuffle(values,new Random(System.nanoTime()));
for (int i = 0; i < size; i++) {
chromosomes[i]=values.get(i);
}
}
/**
* @param population
* @param distanceMatrix
* @return void
* @author Charzous
* @date 2021/2/5 11:33
*
* 接收距離矩陣,將適應函式應用到全部個體,對種群排序
*/
public static void evaluate(Individual[] population,int[][] distanceMatrix){
for (Individual individual:population) {
evaluate(individual,distanceMatrix);
}
Arrays.sort(population);
}
/**
* @param individual
* @param distanceMatrix
* @return void
* @author Charzous
* @date 2021/2/5 11:36
*
* 接將適應函式應用到一個個體,結果加入到個體集合,以供對種群所有進行排序
*/
public static void evaluate(Individual individual,int[][] distanceMatrix){
Integer chromosomes[]=individual.getChromosomes();
int totalDistance=0;
int source,destination;
for (int i = 0; i < chromosomes.length-1; i++) {
source=chromosomes[i];
destination=chromosomes[i+1];
totalDistance+=distanceMatrix[source][destination];
}
source=chromosomes[chromosomes.length-1];
destination=chromosomes[0];
totalDistance+=distanceMatrix[source][destination];
individual.setValue(totalDistance);
}
}
三、GA應用:Phaser類解決旅行商問題(TSP)
1、SharedData類:共享物件
import java.util.concurrent.atomic.AtomicInteger;
/**
* 存放任務之間共享的物件
*
* @author Charzous
* @date 2021/2/5 15:16
*
*/
public class SharedData {
private Individual[] population;
private Individual[] selected;
private AtomicInteger index;
private Individual best;
private int[][] distanceMatrix;
public SharedData() {
index=new AtomicInteger();
}
public Individual[] getPopulation() {
return population;
}
public void setPopulation(Individual[] population) {
this.population = population;
}
public Individual[] getSelected() {
return selected;
}
public void setSelected(Individual[] selected) {
this.selected = selected;
}
public AtomicInteger getIndex() {
return index;
}
public void setIndex(AtomicInteger index) {
this.index = index;
}
public Individual getBest() {
return best;
}
public void setBest(Individual best) {
this.best = best;
}
public int[][] getDistanceMatrix() {
return distanceMatrix;
}
public void setDistanceMatrix(int[][] distanceMatrix) {
this.distanceMatrix = distanceMatrix;
}
}
2、GeneticPhaser類:Java并發Phaser類實作
import java.util.Arrays;
import java.util.concurrent.Phaser;
public class GeneticPhaser extends Phaser {
private SharedData data;
public GeneticPhaser(int parties, SharedData data) {
super(parties);
this.data=data;
}
/**
* @param phase
* @param registeredParties
* @return boolean
* @author Charzous
* @date 2021/2/5 15:20
*
* 多載onAdvance方法,使所有任務完成第一階段之后執行代碼
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
int realPhase=phase%3;
if (registeredParties>0) {
switch (realPhase) {
case 0:
case 1:
data.getIndex().set(0);
break;
case 2:
Arrays.sort(data.getPopulation());
if (data.getPopulation()[0].getValue() < data.getBest().getValue()) {
data.setBest(data.getPopulation()[0]);
}
break;
}
return false;
}
return true;
}
public SharedData getData() {
return data;
}
public void setData(SharedData data) {
this.data = data;
}
}
3、ConcurrentGeneticTask類:GA演算法階段任務執行
import JavaConcurrencyProgramming.chapter06.GeneticAlgorithm.GeneticOperators;
import JavaConcurrencyProgramming.chapter06.GeneticAlgorithm.Individual;
import java.util.Random;
/**
* 執行遺傳演算法各個階段的任務
*
* @author Charzous
* @date 2021/2/5 15:28
*
*/
public class ConcurrentGeneticTask implements Runnable {
private GeneticPhaser phaser;
private SharedData data;
private int numberOfGenerations;
private boolean main;
public ConcurrentGeneticTask(GeneticPhaser phaser, int numberOfGenerations,
boolean main) {
this.phaser = phaser;
this.numberOfGenerations = numberOfGenerations;
this.main = main;
this.data = phaser.getData();
}
@Override
public void run() {
Random random = new Random((System.nanoTime()));
for (int i = 0; i < numberOfGenerations; i++) {
if (main)
data.setSelected(GeneticOperators.selection(data.getPopulation()));
phaser.arriveAndAwaitAdvance();
//交叉操作
int individualIndex;
do {
individualIndex = data.getIndex().getAndAdd(2);
if (individualIndex < data.getPopulation().length) {
int secondIndividual = individualIndex++;
int p1Index = random.nextInt(data.getSelected().length);
int p2Index;
do {
p2Index = random.nextInt(data.getSelected().length);
} while (p1Index == p2Index);
Individual parent1 = data.getSelected()[p1Index];
Individual parent2 = data.getSelected()[p2Index];
Individual individual1 = data.getPopulation()[individualIndex];
Individual individual2 = data.getPopulation()[secondIndividual];
GeneticOperators.crossover(parent1, parent2, individual1, individual2);
}
} while (individualIndex < data.getPopulation().length);
phaser.arriveAndAwaitAdvance();
do {
individualIndex = data.getIndex().getAndIncrement();
if (individualIndex < data.getPopulation().length) {
GeneticOperators.evaluate(data.getPopulation()[individualIndex], data.getDistanceMatrix());
}
} while (individualIndex < data.getPopulation().length);
phaser.arriveAndAwaitAdvance();
}
phaser.arriveAndDeregister();
}
}
4、ConcurrentGeneticAlgorithm類:GA演算法并發實作
/**
* 遺傳演算法并發演算法
* @author Charzous
* @date 2021/2/5 15:30
*
*/
public class ConcurrentGeneticAlgorithm {
private int numberOfGenerations;
private int numberOfIndividuals;
private int[][] distanceMatrix;
private int size;
public ConcurrentGeneticAlgorithm(int[][] distanceMatrix, int numberOfGenerations, int numberOfIndividuals) {
this.distanceMatrix = distanceMatrix;
this.numberOfGenerations = numberOfGenerations;
this.numberOfIndividuals = numberOfIndividuals;
size = distanceMatrix.length;
}
public Individual calculate() {
Individual[] population = GeneticOperators.initialize(numberOfIndividuals, size);
GeneticOperators.evaluate(population, distanceMatrix);
SharedData data = new SharedData();
data.setPopulation(population);
data.setDistanceMatrix(distanceMatrix);
data.setBest(population[0]);
int numTasks = Runtime.getRuntime().availableProcessors();
GeneticPhaser phaser = new GeneticPhaser(numTasks, data);
ConcurrentGeneticTask[] tasks = new ConcurrentGeneticTask[numTasks];
Thread[] threads = new Thread[numTasks];
tasks[0] = new ConcurrentGeneticTask(phaser, numberOfGenerations, true);
for (int i = 1; i < numTasks; i++) {
tasks[i] = new ConcurrentGeneticTask(phaser, numberOfGenerations, false);
}
for (int i = 0; i < numTasks; i++) {
threads[i] = new Thread(tasks[i]);
threads[i].start();
}
for (int i = 0; i < numTasks; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return data.getBest();
}
}
5、主類main
import java.io.IOException;
import java.nio.file.Paths;
public class ConcurrentMain {
public static void main(String[] args) throws IOException {
long start, end;
int generations = 1000;
int individuals = 1000;
String[] names={"kn57_dist"};//"lau15_dist" ,
for (String name:names) {
int[][] distanceMatrix = DataLoader.load(Paths.get("data", name + ".txt"));
ConcurrentGeneticAlgorithm concurrentGeneticAlgorithm = new ConcurrentGeneticAlgorithm(distanceMatrix, generations,
individuals);
start = System.currentTimeMillis();
Individual result = concurrentGeneticAlgorithm.calculate();
end = System.currentTimeMillis();
System.out.println("=======================================");
System.out.println("Example:"+name);
System.out.println("Generations: " + generations);
System.out.println("Population: " + individuals);
System.out.println("Execution Time: " + (end - start)+"ms");
System.out.println("Best Individual: " + result);
System.out.println("Total Distance: " + result.getValue());
System.out.println("=======================================");
}
}
}
四、實驗結果
資料和實驗引數設定:
generations = 1000; individuals = 1000; 資料:lau15_dist.txt(15個城市)
測驗執行10次,平均執行時間為500ms左右,

當然,種群數和代數可以設定其他值,資料集使用可以更換,進行實驗,
為了比較并發版本執行表現的提升,簡單實作了單執行緒版本,只需要將ConcurrentGeneticAlgorithm類里面的并發部分修改為單執行緒方法,結果如下:

測驗執行10次,平均執行時間為690ms左右,對比之下可看出并發的性能提升,
五、總結
這篇記錄Phaser類同步機制的原理和實踐理解,學習Java并發有一段時間了,從基礎開始,認識了許多同步機制API,這次學習了Phaser類這個同步機制,重繪了以前的認識,再添一項新知識,先認識一下Phaser是什么,然后結合實際應用,通過遺傳演算法經典應用的旅行商問題(TSP)的案例,更好地學習一下Phaser類同步機制的使用方法,對Phaser一些理論知識有一定的了解之后,開始在實際案例中學習,更進一步理解,遺傳演算法就是一個很好的例子,掌握了分段器在并發編程中的基礎使用,也希望在之后的學習應用能夠有所幫助,
如果覺得不錯歡迎“一鍵三連”哦,點贊收藏關注,有問題直接評論,交流學習!
Java并發編程系列文章:
- 原創 Java并發API案例分析之并發設計原理[https://blog.csdn.net/Charzous/article/details/112603639]
- 原創 Java并發(Runnable+Thread)實作硬碟檔案搜索[https://blog.csdn.net/Charzous/article/details/112853937]
- 原創 并發編程之Callable/Future介面(以單詞最佳匹配演算法為例)[https://blog.csdn.net/Charzous/article/details/113338669]
- 并發編程之Phaser類多階段任務(以遺傳演算法TSP問題為例)[https://blog.csdn.net/Charzous/article/details/113698041]
我的CSDN博客:https://blog.csdn.net/Charzous/article/details/113698041
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/257498.html
標籤:其他
