Qos(Quality of Service,服務質量)概念:
當網路發生擁塞的時候,所有的資料流都有可能被丟棄;為滿足用戶對不同應用不同服務質量的要求,就需要網路能根據用戶的要求分配和調度資源,對不同的資料流提供不同的服務質量:對實時性強且重要的資料報文優先處理;對于實時性不強的普通資料報文,提供較低的處理優先級,網路擁塞時甚至丟棄,QoS應運而生,支持QoS功能的設備,能夠提供傳輸品質服務;針對某種類別的資料流,可以為它賦予某個級別的傳輸優先級,來標識它的相對重要性,并使用設備所提供的各種優先級轉發策略、擁塞避免等機制為這些資料流提供特殊的傳輸服務,配置了QoS的網路環境,增加了網路性能的可預知性,并能夠有效地分配網路帶寬,更加合理地利用網路資源,
為什么要設定Qos:
在RabbitMQ中,佇列向消費者發送訊息,如果沒有設定Qos的值,那么佇列中有多少訊息就發送多少訊息給消費者,完全不管消費者是否能夠消費完,這樣可能就會形成大量未ack的訊息在快取區堆積,因為這些訊息未收到消費者發送的ack,所以只能暫時存盤在快取區中,等待ack,然后洗掉對應訊息,這樣的話,因此希望開發人員能限制此緩沖區的大小,以避免緩沖區里面無限制的未確認訊息問題,這個時候就可以通過使用 basic.qos 方法設定“預取計數”值來完成的,該值定義通道上允許的未確認訊息的最大數量,一旦數量達到配置的數量,RabbitMQ 將停止在通道上傳遞更多訊息,除非至少有一個未處理的訊息被確認,例如,假設在通道上有未確認的訊息 5、6、7,8,并且通道的預取計數設定為 4,此時 RabbitMQ 將不會在該通道上再傳遞任何訊息,除非至少有一個未應答的訊息被 ack,比方說 tag=6 這個訊息剛剛被確認 ACK,RabbitMQ 將會感知這個情況到并再發送一條訊息,訊息應答和 QoS 預取值對用戶吞吐量有重大影響,通常,增加預取將提高向消費者傳遞訊息的速度,雖然自動應答傳輸訊息速率是最佳的,但是,在這種情況下已傳遞但尚未處理的訊息的數量也會增加,從而增加了消費者的RAM 消耗(隨機存取存盤器)應該小心使用具有無限預處理的自動確認模式或手動確認模式,消費者消費了大量的訊息如果沒有確認的話,會導致消費者連接節點的記憶體消耗變大,所以找到合適的預取值是一個反復試驗的程序,不同的負載該值取值也不同 100 到 300 范圍內的值通常可提供最佳的吞吐量,并且不會給消費者帶來太大的風險,預取值為 1 是最保守的,當然這將使吞吐量變得很低,特別是消費者連接延遲很嚴重的情況下,特別是在消費者連接等待時間較長的環境中,對于大多數應用來說,稍微高一點的值將是最佳的,
Qos的取值問題:
在傳輸效率和消費者消費速度之間做一個平衡,這個值是需要不斷嘗試的,因為太低,信道傳輸訊息效率太低,如果太高,消費者來不及確認訊息導致訊息積累問題,記憶體消耗不斷增大,
不公平分發
概念:
如果采用默認訊息分發策略,訊息是輪詢發送的,但是消費者之前存在處理快慢問題,如果A處理慢,B處理快,他們接受同樣數量的訊息顯然是不合理的,
引出不公平分發:
就是在這樣情況下,不公平分發出現了,簡而言之就是能者多勞,處理快的多處理,處理慢的少處理,
如何實作不公平分發:
那么如何實作呢?上面介紹了basicQos,如果我們將qos的值設為1,那么你想一想會出現什么情況呢?信道中只允許傳輸一條訊息,那么當這條訊息處理完后,佇列會立馬發送下一條訊息,所以這個時候快的不斷處理,慢的等待當前處理完再處理下一條,這樣就實作了能者多勞,


代碼實作:
生產者:
public class Producer {
public static final String QUEUE_NAME = "test_basic_qos";
public static final String EXCHANGE_NAME = "test_basic_qos";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"basic.qos");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
String message = scanner.next();
System.out.println("發送訊息為:" + message);
channel.basicPublish(EXCHANGE_NAME,"basic.qos",null,message.getBytes(StandardCharsets.UTF_8));
}
}
}
C1(快的消費者):
public class Consumer02 {
public static final String QUEUE_NAME = "test_basic_qos";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
try {
SleepUtils.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("高性能服務器接受:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = consumerTag -> {};
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
C2(慢的消費者):
public class Consumer01 {
public static final String QUEUE_NAME = "test_basic_qos";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag,message) -> {
try {
SleepUtils.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("低性能服務器接受:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(),true);
};
CancelCallback cancelCallback = consumerTag -> {};
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
結果:



預取值
概念:
設定消費者信道最大傳輸資訊數,
測驗:
我們將慢的消費者preCount取值為5,快的消費者預取值為2,然后發送7條訊息(為了保證快的消費者只處理2條,我們要在2s內能發送7條資料,這樣保證后面的訊息全部發送給慢的消費者,避免快的消費者處理完了訊息,又將發送后續訊息,)
代碼:
參考上面代碼,只是修改了qos值,
結果:



分析:
因為快的消費者信道滿了,不能再發送訊息,所以訊息只能發送給慢的服務器,這就是basicQos用法,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423764.html
標籤:其他
上一篇:09、Hadoop框架Zookeeper Java API
下一篇:情人節撩妹裝逼小方法,一學就會
