主頁 > 後端開發 > RocketMQ 4.7.1 環境搭建、集群、MQ整合SpringBoot

RocketMQ 4.7.1 環境搭建、集群、MQ整合SpringBoot

2020-10-26 01:35:29 後端開發

導讀

  之前學過ActiveMQ但是并發量不是很大點我直達,所以又學阿里開源的RocketMQ,據說佇列可以堆積億級別,下面是網上找的訊息佇列對比圖,僅供參考

部署

官網

點我直達

前置條件

  1. 推薦使用64位作業系統,建議使用Linux / Unix / Mac;
  2. 64位JDK 1.8+;
  3. Maven 3.2.x;
  4. Git;
  5. 適用于Broker服務器的記憶體4G +可用磁盤

下載

地址:https://downloads.apache.org/rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip 

百度云盤:

鏈接: https://pan.baidu.com/s/1luq_MwxSn8k_bugrnQSJWg  密碼: varj

安裝依賴項

  1. jdk:點我直達
  2. maven:點我直達
  3. git安裝:yum install -y git
export JAVA_HOME=/opt/soft/jdk1.8.0_202
export PATH=$JAVA_HOME/bin:$PATH
export CLASPATH=.:$JAVA_home/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
export MAVEN_HOME=/opt/soft/apache-maven-3.6.3
export PATH=$PATH:$MAVEN_HOME/bin

mq上傳至linux

解壓

 

maven編譯

 

啟動NameServer

后臺啟動方式

nohup sh bin/mqnamesrv &

NameServer啟動時記憶體不足(問題解決)

找到runserver.sh 修改JAVA_OPT

vim /bin/runserver.sh配置

啟動Broker

nohup sh bin/mqbroker -n localhost:9876 &

語法:nohup sh bin/mqbroker -n NameServer服務ip地址

 

Broker記憶體不足(問題解決)

找到runbroker.sh 修改JAVA_OPT

vim /bin/runbroker.sh配置

服務都啟動成功

 

模擬消費

export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

開2個控制臺,連接通一臺linux

注意

  NameServer默認埠號:9876;broker默認埠號:10911

可視化控制臺

官網地址

點我直達

百度云盤

鏈接: https://pan.baidu.com/s/1mdEGkq-JBTy1wtNmFPkmDg  密碼: v6bq

解壓

安裝編譯

進入:/opt/soft/rocketmq-externals-master/rocketmq-console
編譯: mvn clean package -Dmaven.test.skip=true

修改appliccation.properties的rocketmq.config.namesrvAddr

編譯打包

啟動

  進入target目錄,啟動java -jar

守護行程啟動: nohup java -jar rocketmq-console-ng-2.0.0.jar &

SpringBoot整合RocketMQ(生產者)

創建SpringBoot專案

點我直達

專案結構

加入依賴

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>ybchen-mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ybchen-mq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--注意: 這里的版本,要和部署在服務器上的版本號一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

PayProducer.java

package com.ybchen.ybchenmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

/**
 * 訊息生產者
 */
@Component
public class PayProducer {
    /**
     * 生產者所屬的組
     */
    private String producerGroup = "pay_group";
    /**
     * MQ的地址,注意需開放埠號或者關閉防火墻
     */
    private String nameServerAddr = "192.168.199.100:9876";
    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多個地址以;隔開
        //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876")
        producer.setNamesrvAddr(nameServerAddr);
        start();
    }

    /**
     * 獲取生產者
     * @return
     */
    public DefaultMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 開啟,物件在使用之前必須要呼叫一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 關閉,一般在應用背景關系,使用背景關系監聽器,進行關閉
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}

PayController.java

package com.ybchen.ybchenmq.controller;

import com.ybchen.ybchenmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName:PayController
 * @Description:支付
 * @Author:chenyb
 * @Date:2020/10/18 2:47 下午
 * @Versiion:1.0
 */
@RestController
@RequestMapping("/api/v1")
public class PayController {
    @Autowired
    private PayProducer payProducer;

    private static final String TOPIC = "ybchen_pay_topic";

    /**
     * 支付回呼
     *
     * @param text
     * @return
     */
    @RequestMapping("pay_cb")
    public Object callback(String text) {
        /**
         * String topic:話題
         * String tags:二級分類
         * byte[] body:body訊息位元組陣列
         */
        Message message = new Message(TOPIC,"tag_a",("hello ybchen ==>"+text).getBytes());
        try {
            SendResult send = payProducer.getProducer().send(message);
            System.out.println("send------>"+send);
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "ok";
    }
}

測驗

常見錯誤

錯誤一

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
原因:阿里云存在多網卡,rocketmq會根據當前網卡選擇一個IP使用,當你的機器有多塊網卡時,很可能會有問題,比如,機器上有兩個ip,一個公網ip,一個私網ip,因此需要配置broker.conf指定當前公網的ip,然后重啟broker


修改配置:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/broker.conf
新增這個配置:brokerIP1=xxx.xxx.xxx.xxx

啟動命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

錯誤2

MQClientException: No route info of this topic, TopicTest1

原因:Broker 緊追自動創建Topic,且用戶沒有通過手工方式創建此Topic,或者broker和Nameserver網路不通

解決:
    通過sh bin/mqbroker -m 查看配置
    autoCreateTopicEnable=true 則自動創建Topic

Centos 7 關閉防火墻:systemctl stop firewalld

錯誤3

控制臺查看不了資料,提示連接10909錯誤

原因:Rocket默認開啟了VIP通道,VPI通道埠號為10911-2=10909

解決:阿里云安全組添加一個埠:10909

錯誤4

  無法自動創建topic:客戶端版本要和服務端版本保持一致

服務器上裝的是4.7.1

引入依賴項時
        <!--注意: 這里的版本,要和部署在服務器上的版本號一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.7.1</version>
        </dependency>

檢索訊息發送

SpringBoot整合RocketMQ(消費者)

創建SpringBoot專案

 

專案結構

加入依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>ybchen-mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ybchen-mq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--注意: 這里的版本,要和部署在服務器上的版本號一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

PayConsumer.java

package com.ybchen.ybchenmqconsumer.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @ClassName:PayConsumer
 * @Description:消費者
 * @Author:chenyb
 * @Date:2020/10/18 4:13 下午
 * @Versiion:1.0
 */
@Component
public class PayConsumer {
    /**
     * 生產者所屬的組
     */
    private String producerGroup = "pay_consumer_group";
    /**
     * MQ的地址,注意需開放埠號或者關閉防火墻
     */
    private String nameServerAddr = "192.168.199.100:9876";
    /**
     * 訂閱主題
     */
    private String topic = "ybchen_pay_topic";
    private DefaultMQPushConsumer consumer;

    public PayConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(producerGroup);
        //指定NameServer地址,多個地址以;隔開
        //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876")
        consumer.setNamesrvAddr(nameServerAddr);
        //設定消費地點,從最后一個開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //訂閱主題,監聽主題下的那些標簽
        consumer.subscribe(topic, "*");
        //注解一個監聽器
        //lambda方式
//        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
//            try {
//                Message message = msg.get(0);
//                System.out.printf("%s Receive New Messages: %s %n",
//                        Thread.currentThread().getName(), new String(msg.get(0).getBody()));
//                //主題
//                String topic = message.getTopic();
//                //訊息內容
//                String body = null;
//                body = new String(message.getBody(), "utf-8");
//                //二級分類
//                String tags = message.getTags();
//                ////                String keys = message.getKeys();
//                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            } catch (UnsupportedEncodingException e) {
//                e.printStackTrace();
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//            }
//        });

        //一般方式
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    Message message = list.get(0);
                    System.out.printf("%s Receive New Messages: %s %n",
                            Thread.currentThread().getName(), new String(list.get(0).getBody(),"utf-8"));
                    //主題
                    String topic = message.getTopic();
                    //訊息內容
                    String body = null;
                    body = new String(message.getBody(), "utf-8");
                    //二級分類
                    String tags = message.getTags();
                    //
                    String keys = message.getKeys();
                    System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("consumer start ..........");
    }
}

application.properties

server.port=8081

測驗生產者消費者

MQ集群架構模式分析

單節點

優點

  本地開發測驗,配置簡單,同步刷盤訊息一條都不會丟

缺點

  不可靠,如果宕機,會導致服務不可用

主從(異步、同步雙寫)

優點

  同步雙寫訊息不丟失,異步復制存在少量丟失你,主節點宕機,從節點可以對外提供訊息的消費,但是不支持寫入

缺點

  主備有短暫訊息延遲,毫秒級,目前不支持自動切換,需要腳本或者其他程式進行檢測然后停止broker,重啟讓從節點成為主節點

雙主

優點

  配置簡單,可以靠配置RAID磁盤陣列保證訊息可靠,異步刷盤丟失少量訊息

缺點

  master宕機期間,未被消費的訊息在機器恢復之前不可訊息,實時性會受到影響

雙主雙從,多主多從模式(異步復制)

優點

  磁盤損壞,訊息丟失的非常小,訊息實時性不會受影響,Master宕機后,消費者仍然可以從Slave消費

缺點

  主備有短暫訊息延遲,毫秒級,如果Master宕機,磁盤損壞情況,會丟失你少量訊息

雙主雙從,多主多從模式(同步雙寫)

優點

  同步雙寫方式,主備都寫成功,才向應用回傳成功,服務可用性與資料可用性非常高

缺點

  性能比異步復制模式略低,主宕機后,備機不能自動切換為主機

推薦

  1. 主從(異步、同步雙寫)
  2. 雙主雙從,多主多從模式(異步復制)
  3. 雙主雙從,多主多從模式(同步雙寫)

主從集群搭建

準備作業

  準備2臺機器,ip地址分別為:192.168.199.100;192.168.199.101;

  環境:RocketMQ4.7.1+jdk8+Maven+Centos 7

啟動兩臺nameserver

  啟動兩個機器的nameserver

路徑:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1

啟動:nohup sh bin/mqnamesrc &

編輯并啟動roccketmq

主節點

進入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async


編輯并修改如下:vim broker-a.properties 
namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
brokerClusterName=YbChenCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH


啟動:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a.properties &

從節點

進入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async


編輯并修改如下:vim broker-a-s.properties 
namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
brokerClusterName=YbChenCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH


啟動:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a-s.properties &

注意事項

  1. namesrvAddr:相同
  2. brokerClusterName:相同
  3. brokerName:相同
  4. brokerId:不同,0是主節點
  5. deleteWhen:相同
  6. fileReservedTime:相同
  7. brokerRole:不同,分ASYNC_MASTER、SLAVE
  8. flushDiskType:相同

啟動broker

使用管控臺

  使用192.168.199.100這臺服務器,修改配置

192.168.199.100這臺服務器

進入:/opt/soft/rocketmq-externals-master/rocketmq-console/src/main/resources


修改組態檔:vim application.properties

rocketmq.config.namesrvAddr=192.168.199.100:9876;192.168.199.101:9876


編譯

切換到:/opt/soft/rocketmq-externals-master/rocketmq-console
打包:
mvn clean
mvn install -Dmaven.test.skip=true


啟動

進入:/opt/soft/rocketmq-externals-master/rocketmq-console/target
守護行程方式啟動:nohup java -jar rocketmq-console-ng-2.0.0.jar &

集群測驗

故障演練

  模擬主掛了,但是從還可以被消費,此時不能寫入,等主重啟后,可以繼續寫入(資料不會被重復消費),以下內容是連續的

總結

  好了,到目前為止,主從已經搭建完成了,

  Broker分Master和Slave一個Master可以對應多個Slave,但一個Slave只能對應一個Master,Master與Slave通過相同的Broker Name來匹配,不同的Broker id來定義時Master還是Slave

    Broker向所有的NameServer節點建立長連接,定時注冊Topic和發送元資料資訊

    NameServer定時掃描(默認2分鐘)所有存活Broker的連接,如果超過時間沒回應,則斷開連接(心跳檢測),但是Consumer客戶端不能感知,Consumer定時(30秒)從NameServer獲取topic的最新資訊,所以broker不可用時,Consumer最多需要30秒才能發現

  只有Master才能進行寫入操作Slave不允許寫入只能同步,同步策略取決于Master配置

  客戶端消費可以從Master和Slave消費,默認消費者都從Master消費,如果在Master掛了之后,客戶端從NameServer中感知Broker宕機,就會從Slave消費,感知非實時,存在一定的滯后性,Slave不能保證Master的100%都同步過來,會有少量的訊息丟失,一旦Master恢復,未同步過去的訊息會被最終消費掉,

  如果Consumer實體的數量比Message Queue的總數量還多的話,多出來的Consumer實體將無法分到Queue,也就無法消費到訊息,也就無法起到分攤負載的作用,所以需要控制讓Queue的總數量大于Consumer的數量,

場景模擬

生產和消費重試及處理

生產者重試

  • 訊息重試(保證資料的高可靠性),本身內部支持重試,默認次數是2
  • 如果網路情況較差,或者跨集群則建議多改幾次

生產者設定重試次數,并設定唯一的key(一般唯一識別符號)

消費者重試

  • 原因:訊息處理例外,broker端到consumer端各種問題,如網路原因閃斷,消費處理失敗,ACK回傳失敗等
  • 注意
    • 重試間隔時間配置,默認每條訊息最多重試16次
    • 超過重試次數人工補償
    • 消費端去重
    • 一條訊息無論重試多少次,這些重試訊息的Message ID,key不會改變
    • 消費重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗訊息不再重試,繼續消費新的訊息

設定廣播方式

模擬訊息重發

異步發送訊息和回呼實戰

應用場景

  比如12306付完錢??后,異步出票,對性能要求高,可以支持更高的并發,回呼成功后觸發相應的業務(onSuccess)

官方例子

點我直達

改造生產者

演示

onSuccess:因為是異步方式,這里可以記錄日志啥的
onException:補償機制,根據實際情況使用,看是否進行重試

OneWay(無需等待)

應用場景

  主要做日志收集,適用于對性能要求高,但可靠性并不高的場景,

延遲訊息實戰

什么是延遲訊息

  • Producer將訊息發送到訊息佇列RocketMQ服務端,但并不期望這條訊息立馬投遞,而是推遲在當前時間點之后的某一個時間投遞到Consumer進行消費,該訊息即定時訊息,目前支持固定精度的訊息
  • 延遲訊息級別,1....18
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

應用場景

  • 通過訊息觸發一些定時任務,比如在某一固定時間點向用戶發送提醒訊息
  • 訊息生產和消費有時間視窗要求:比如在天貓電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送一條延遲訊息,這條訊息將會在30分鐘以后投遞給消費者,消費者收到此訊息后需要判斷對應的訂單是否已完成支付,如支付未完成,則關閉訂單,如已完成支付則忽略,

改生產者

生產者MessageQueueSelector實戰

簡介

  生產訊息使用MessageQueueSelector投遞到Topic下指定的Queue

應用場景

  • 順序訊息
  • 分攤負載

默認topic下的queue數量是4,可以配置

支持同步,異步發送指定的MessageQueue

選擇的queue數量必須小于配置的,否則會出錯

好處

  如果佇列中某個產品,流量暴增,隨機分配的話,會導致整個Topic都不能使用,指定到佇列的話,如果這個佇列壞了,其他佇列不影響使用,

改造生產者

同步發送

發送結果=SEND_OK,msg=SendResult [sendStatus=SEND_OK, msgId=AC1068013E3F18B4AAC276723EAC0000, offsetMsgId=C0A8C76400002A9F000000000009B536, messageQueue=MessageQueue [topic=ybchen_pay_topic, brokerName=broker-a, queueId=0], queueOffset=1]
發送結果=SEND_OK,msg=SendResult [sendStatus=SEND_OK, msgId=AC1068013E3F18B4AAC27672BCD50001, offsetMsgId=C0A8C76400002A9F000000000009B602, messageQueue=MessageQueue [topic=ybchen_pay_topic, brokerName=broker-a, queueId=0], queueOffset=2]
發送結果=SEND_OK,msg=SendResult [sendStatus=SEND_OK, msgId=AC1068013E3F18B4AAC27672CAA20002, offsetMsgId=C0A8C76400002A9F000000000009B6CF, messageQueue=MessageQueue [topic=ybchen_pay_topic, brokerName=broker-a, queueId=0], queueOffset=3]

  可以看到列印出來的,queueId=0

異步發送

生產者端代碼修改

    @Autowired
    private PayProducer payProducer;

    private static final String TOPIC = "ybchen_pay_topic";

    /**
     * 支付回呼
     *
     * @param text
     * @return
     */
    @RequestMapping("pay_cb")
    public Object callback(String text) {
        /**
         * String topic:話題
         * String tags:二級分類
         * byte[] body:body訊息位元組陣列
         */
        Message message = new Message(TOPIC, "tag_a", text.getBytes());
        //生產者使用MessageQueueSelector投遞到Topic下指定的Queue,arg只能小于等于4
//        try {
//            SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
//                @Override
//                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//                    int queueNum=Integer.parseInt(o.toString());
//                    return list.get(queueNum);
//                }
//            }, 0);
//            System.out.printf("發送結果=%s,msg=%s",sendResult.getSendStatus(),sendResult);
//        } catch (MQClientException e) {
//            e.printStackTrace();
//        } catch (RemotingException e) {
//            e.printStackTrace();
//        } catch (MQBrokerException e) {
//            e.printStackTrace();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        //異步發送到指定的queue
        try {
            payProducer.getProducer().send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    int queueNum = Integer.parseInt(o.toString());
                    return list.get(queueNum);
                }
            }, 3, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("發送結果=%s,msg=%s", sendResult.getSendStatus(), sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        //message.setDelayTimeLevel(2);
//        try {
//            SendResult send = payProducer.getProducer().send(message);
//            System.out.println("send------>"+send);
//        } catch (MQClientException e) {
//            e.printStackTrace();
//        } catch (RemotingException e) {
//            e.printStackTrace();
//        } catch (MQBrokerException e) {
//            e.printStackTrace();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        //異步發送
//        try {
//            payProducer.getProducer().send(message, new SendCallback() {
//                @Override
//                public void onSuccess(SendResult sendResult) {
//                    System.out.printf("發送結果=%s,msg=%s",sendResult.getSendStatus(),sendResult);
//                }
//
//                @Override
//                public void onException(Throwable e) {
//                    e.printStackTrace();
//                    //補償機制,根據實際情況使用,看是否進行重試
//                }
//            });
//        } catch (MQClientException e) {
//            e.printStackTrace();
//        } catch (RemotingException e) {
//            e.printStackTrace();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        return "ok";
    }

順序訊息的應用場景

簡介

  順序訊息可以應用到電商和證券系統,訂單系統,

什么是順序系統?

  訊息的生產和消費順序一致

全域順序

  topic下面全部訊息都要有序(很少用)

  1. 性能要求不高,所有的訊息嚴格按照FIFO(先進先出)原則進行訊息發布和消費的場景,并行度成為訊息系統的瓶頸,吞吐量不夠
  2. 在證券處理中,以人民幣兌換美元為例,在價格相同的情況下,先出價者優先處理,則可以通過全域順序的方式進行發布和消費

區域順序

  只要保證一組訊息被順序消費即可(RocketMQ中使用)

  1. 性能要求高
  2. 電商的訂單創建,同一訂單相關的創建訂單訊息、訂單支付訊息、訂單退款訊息、訂單物流訊息、訂單交易成功訊息都會按照先后順序來發布和消費

順序發布

  對于指定的一個Topic,客戶端按照一定的先后順序發送訊息

順序消費

  對于指定的一個Topic,按照一定的先后順序接收訊息,即先發送的訊息一定先會被客戶端接收到

注意事項

  1. 順序訊息不支持異步發送,否則將無法保證順序消費
  2. 順序訊息暫不支持廣播模式

官方例子

點我直達

改造生產者代碼

創建ProductOrder.java

 

package com.ybchen.ybchenmq.entity;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * @ClassName:ProductOrder
 * @Description:訂單
 * @Author:chenyb
 * @Date:2020/10/25 12:56 下午
 * @Versiion:1.0
 */
public class ProductOrder implements Serializable {
    /**
     * 訂單id
     */
    private long orderIdl;
    /**
     * 訂單操作型別
     */
    private String type;

    public long getOrderIdl() {
        return orderIdl;
    }

    public void setOrderIdl(long orderIdl) {
        this.orderIdl = orderIdl;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public ProductOrder() {

    }

    public ProductOrder(long orderIdl, String type) {
        this.orderIdl = orderIdl;
        this.type = type;
    }

    @Override
    public String toString() {
        return "ProductOrder{" +
                "orderIdl=" + orderIdl +
                ", type='" + type + '\'' +
                '}';
    }

    /**
     * 模擬批量創建物體類
     * @return
     */
    public static List<ProductOrder> getOrderList(){
        List<ProductOrder> list=new ArrayList<>();
        list.add(new ProductOrder(111L,"創建訂單"));
        list.add(new ProductOrder(222L,"創建訂單"));
        list.add(new ProductOrder(333L,"創建訂單"));
        list.add(new ProductOrder(111L,"支付訂單"));
        list.add(new ProductOrder(222L,"支付訂單"));
        list.add(new ProductOrder(111L,"完成訂單"));
        list.add(new ProductOrder(222L,"完成訂單"));
        list.add(new ProductOrder(333L,"支付訂單"));
        list.add(new ProductOrder(333L,"完成訂單"));
        return list;
    }
}

控制層:PayController.java

 

    @Autowired
    private PayProducer payProducer;

    private static final String TOPIC = "ybchen_pay_topic";
    private static final String TOPIC_ORDER = "ybchen_pay_order_topic";

    @RequestMapping("pay_order")
    public Object payOrder() throws Exception{
        //獲取訂單號
        List<ProductOrder> list=ProductOrder.getOrderList();
        for (int i = 0; i < list.size(); i++) {
            ProductOrder order=list.get(i);
            Message message=new Message(TOPIC_ORDER,
                    "",
                    order.getOrderIdl()+"",
                    order.toString().getBytes());
            //發送,同一個訂單id進入同一個佇列中
           SendResult sendResult =payProducer.getProducer().send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
                    Long id=(Long)arg;
                    long index=id%mqs.size();
                    return mqs.get((int) index);
                }
            },order.getOrderIdl());
           //列印輸出結果
            System.out.printf("發送結果=%s,sendResult=%s,orderId=%s,type=%s\n",
                    sendResult.getSendStatus(),
                    sendResult.toString(),
                    order.getOrderIdl(),
                    order.getType());

        }
        return "ok";
    }

改造消費者

 

package com.ybchen.ybchenmqconsumer.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @ClassName:PayOrderConsumer
 * @Description:消費者-訂單
 * @Author:chenyb
 * @Date:2020/10/18 4:13 下午
 * @Versiion:1.0
 */
@Component
public class PayOrderConsumer {
    /**
     * 生產者所屬的組
     */
    private String producerGroup = "pay_order_consumer_group";
    /**
     * MQ的地址,注意需開放埠號或者關閉防火墻
     */
    private String nameServerAddr = "192.168.199.100:9876;192.168.199.101:9876";
    /**
     * 訂閱主題,訂單
     */
    private static final String TOPIC_ORDER = "ybchen_pay_order_topic";
    private DefaultMQPushConsumer consumer;

    public PayOrderConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(producerGroup);
        //指定NameServer地址,多個地址以;隔開
        //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876")
        consumer.setNamesrvAddr(nameServerAddr);
        //設定消費地點,從最后一個開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //訂閱主題,監聽主題下的那些標簽
        consumer.subscribe(TOPIC_ORDER, "*");
        //默認是集群方式,廣播方式不支持重試
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //注解一個監聽器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,
                                                       ConsumeOrderlyContext consumeOrderlyContext) {
                MessageExt msg=list.get(0);
                System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(),
                        new String(msg.getBody()));
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer order start ..........");
    }
}

測驗順序訊息

一個生產者一個消費者

  可以看到消費的時候,有點慢,因為我本地安裝了2個虛擬機做一主一從,消費的順序是正確的,都是按照:創建訂單、支付訂單、完成訂單

2020-10-25 13:52:31.822  INFO 1473 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-10-25 13:52:31.822  INFO 1473 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2020-10-25 13:52:31.825  INFO 1473 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 3 ms
發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D46F0000, offsetMsgId=C0A8C76400002A9F000000000009C8B2, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=3], queueOffset=6],orderId=111,type=創建訂單
發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4930001, offsetMsgId=C0A8C76400002A9F000000000009C9A5, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=2], queueOffset=6],orderId=222,type=創建訂單
發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4A90002, offsetMsgId=C0A8C76400002A9F000000000009CA98, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=1], queueOffset=6],orderId=333,type=創建訂單
發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4C00003, offsetMsgId=C0A8C76400002A9F000000000009CB8B, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=3], queueOffset=7],orderId=111,type=支付訂單
發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4CC0004, offsetMsgId=C0A8C76400002A9F000000000009CC7E, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=2], queueOffset=7],orderId=222,type=支付訂單
發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4D00005, offsetMsgId=C0A8C76400002A9F000000000009CD71, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=3], queueOffset=8],orderId=111,type=完成訂單
發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4D30006, offsetMsgId=C0A8C76400002A9F000000000009CE64, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=2], queueOffset=8],orderId=222,type=完成訂單
發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4DE0007, offsetMsgId=C0A8C76400002A9F000000000009CF57, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=1], queueOffset=7],orderId=333,type=支付訂單
發送結果=SEND_OK,sendResult=SendResult [sendStatus=SEND_OK, msgId=AC10680105C118B4AAC27E92D4F80008, offsetMsgId=C0A8C76400002A9F000000000009D04A, messageQueue=MessageQueue [topic=ybchen_pay_order_topic, brokerName=broker-a, queueId=1], queueOffset=8],orderId=333,type=完成訂單
ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderIdl=333, type='創建訂單'} 
ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderIdl=333, type='支付訂單'} 
ConsumeMessageThread_1 Receive New Messages: ProductOrder{orderIdl=333, type='完成訂單'} 
ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderIdl=222, type='創建訂單'} 
ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderIdl=222, type='支付訂單'} 
ConsumeMessageThread_2 Receive New Messages: ProductOrder{orderIdl=222, type='完成訂單'} 
ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderIdl=111, type='創建訂單'} 
ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderIdl=111, type='支付訂單'} 
ConsumeMessageThread_3 Receive New Messages: ProductOrder{orderIdl=111, type='完成訂單'} 

一個生產者3個消費者

  消費者會平均分配queue的數量,消費者數量小于等于4!!!

  本地在線模擬,一個生產者、3個消費者場景,看看消費的順序,內容較長,被分割3塊

消費者核心配置

setConsumeFromWhere

  1. CONSUME_FORM_FIRST_OFFSET:初次從訊息佇列頭部開始消費,即歷史訊息(還存盤在broker的)全部消費一遍,后續在啟動接著上次消費的進度開始消費
  2. CONSUME_FROM_LAST_OFFSET:默認策略,初次從該佇列尾開始消費,即跳過歷史小心,后續在啟動接著上次消費的進度開始消費
  3. CCONSUME_FROM_TIMESTAMP:從某個時間點開始消費,默認是半小時以前,后續在啟動接著上次消費的進度開始消費

setAllocateMessageQueueStrategy

  • 負載均衡策略演算法,即消費者分配到queue的演算法,默認值AllocateMessageQueueAveragely即取模平均分配

setOffsetStore

  • 訊息消費進度存盤器,2個策略
    • LocalFileOffsetStore(廣播模式默認使用)
    • RemoteBrokerOffsetStore(集群模式默認使用)

setConsumeThreadMin

  • 最小消費執行緒池數量

setConsumeThreadMax

  • 最大消費執行緒池數量

setPullBatchSize

  • 消費者去broker拉取訊息時,一次次拉取多少條

setConsumeMessageBatchMaxSize

  • 單次消費時一次性消費多少條訊息

setMessageModel

  • 消費者消費模式
    • CLUSTERING:默認是集群模式
    • BROADCASTING:廣播模式

Topic下佇列的奇偶數會影響Customer個數里面的消費數量

  • 如果是4個佇列(默認佇列為4),8個訊息,4個節點則各會消費2條,如果不對等,則負載均衡會分配不均勻
  • 如果consumer實體數量比message queue的總數量還多的話,多出來的consumer實體將無法分到queue,也就無法消費達到訊息,也就無法起到分攤負載的作用,所以需要控制讓queue的總數量大于consumer的數量

集群模式(默認)

  • Consumer實體平均分攤消費生產者發送的訊息
  • 例如:訂單訊息,只能被消費一次

廣播模式

  • 廣播模式下消費訊息,投遞到Broker的訊息會被每個Consumer進行消費,一條訊息被多個Consumer消費,廣播消費中ConsumerGroup暫時無用
  • 例如:QQ群,群主發一條訊息,所有人都可以看到

訊息存盤

ConsumeQueue

  邏輯佇列,默認存盤位置:/root/store/consumequeue

CommitLog

  真正存盤訊息檔案的,默認存盤位置:/root/store/commitlog

常見面試題

為什么訊息佇列?

優點

  1. 異步:例如秒殺,可以使用,點我直達
  2. 解耦
  3. 削峰:秒殺情況下,一個個入隊,一個個出隊,有序進行

缺點

  1. 系統可用性越低:外部依賴越多,依賴越多,出問題風險越大
  2. 系統復雜性提高:需要考慮多種場景,比如訊息重復消費、訊息丟失
  3. 需要更多的機器和人力:訊息佇列一般集群部署,需要運維和監控

如何避免重復消費?

  RocketMQ不保證訊息不重復,如果業務保證嚴格的不能重復消費,需要自己去業務端去重

資料庫表去重

  指定某個欄位唯一值

setNX

  利用Redis的特性分布式鎖,下面是我之前的代碼,待改造

package com.cyb.redis.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class jedisUtils {
    private static String ip = "192.168.31.200";
    private static int port = 6379;
    private static JedisPool pool;
    static {
        pool = new JedisPool(ip, port);
    }
    public static Jedis getJedis() {
        return pool.getResource();
    }
    public static boolean getLock(String lockKey, String requestId, int timeout) {
        //獲取jedis物件,負責和遠程redis服務器進行連接
        Jedis je=getJedis();
        //引數3:NX和XX
        //引數4:EX和PX
        String result = je.set(lockKey, requestId, "NX", "EX", timeout);
        if (result=="ok") {
            return true;
        }
        return false;
    }

    public static synchronized boolean getLock2(String lockKey, String requestId, int timeout) {
        //獲取jedis物件,負責和遠程redis服務器進行連接
        Jedis je=getJedis();
        //引數3:NX和XX
        //引數4:EX和PX
        Long result = je.setnx(lockKey, requestId);
        if (result==1) {
            je.expire(lockKey, timeout); //設定有效期
            return true;
        }
        return false;
    }
}

Redis原子遞增

  利用Redis的incr特性,如果大于0說明消費過了(需要設定過期時間)

如何保證訊息的可靠性傳輸?

producer端

  1. 不采用oneway發送,使用同步或者一部方式發送,做好重試,但是重試的Message key必須唯一
  2. 投遞的日志需要保存,關鍵欄位、投遞時間、投遞狀態、重試次數、請求體、回應體等

broker端

  1. 雙主雙從架構,NameServer需要多節點
  2. 同步雙寫,異步刷盤

consumer端

  1. 訊息消費保存日志檔案中

大量堆積到broker里面,如何處理?

  1. 臨時topic佇列擴容,提高消費者能力
  2. 撰寫臨時處理分發程式,從舊topic快速讀取到臨時新topic中,新topic的queue數量擴容多倍,然后再啟動更多consumer進行臨時新的topic消費

RocketMQ高性能的原因?

MQ架構配置

  1. 順序寫
  2. 隨機讀
  3. 零拷貝

發送端高可用

  1. 雙主雙從架構:創建Topic的時候,MessageQueue創建在多個Broker上,即相同的Broker名稱,不同brokerid;當一個Master不可用時,組內其他的Master仍然可用

消費高可用

  1. 主從架構:Broker角色,Master提供讀寫,Slave只支持讀
  2. Consumer不用配置,當Master不可用或者繁忙的時候,Consumer會自動切換到Slave節點進行讀取

提升訊息的消費能力

  1. 增加多個消費者
  2. 修改消費者的執行緒池最小/大數量

專案原始碼

案例原始碼

鏈接: https://pan.baidu.com/s/1Q8iL0lH-bdFEycYGq61hQg  密碼: rww2

Linux下RocketMQ安裝包

鏈接: https://pan.baidu.com/s/1dkE7sAs9E4TjwDQ38Pv4_A  密碼: mkjm

尾聲

  過幾天搭建RocketMQ雙主雙從集群,今天先到這兒~

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/191151.html

標籤:其他

上一篇:Java高并發1-創建執行緒的三種方式、虛假喚醒、native關鍵字

下一篇:Docker常用命令[2]

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more