主頁 > 資料庫 > 漲姿勢了解一下Kafka消費位移可好?

漲姿勢了解一下Kafka消費位移可好?

2020-09-10 18:58:46 資料庫

摘要:Kafka中的位移是個極其重要的概念,因為資料一致性、準確性是一個很重要的語意,我們都不希望訊息重復消費或者丟失,而位移就是控制消費進度的大佬,本文就詳細聊聊kafka消費位移的那些事,包括:

概念剖析

kafka的兩種位移

關于位移(Offset),其實在kafka的世界里有兩種位移:

  • 磁區位移:生產者向磁區寫入訊息,每條訊息在磁區中的位置資訊由一個叫offset的資料來表征,假設一個生產者向一個空磁區寫入了 10 條訊息,那么這 10 條訊息的位移依次是 0、1、…、9;

  • 消費位移:消費者需要記錄消費進度,即消費到了哪個磁區的哪個位置上,這是消費者位移(Consumer Offset),

注意,這和上面所說的訊息在磁區上的位移完全不是一個概念,上面的“位移”表征的是磁區內的訊息位置,它是不變的,即一旦訊息被成功寫入到一個磁區上,它的位移值就是固定的了,而消費者位移則不同,它可能是隨時變化的,畢竟它是消費者消費進度的指示器,

消費位移

消費位移,記錄的是 Consumer 要消費的下一條訊息的位移,切記,是下一條訊息的位移! 而不是目前最新消費訊息的位移

假設一個磁區中有 10 條訊息,位移分別是 0 到 9,某個 Consumer 應用已消費了 5 條訊息,這就說明該 Consumer 消費了位移為 0 到 4 的 5 條訊息,此時 Consumer 的位移是 5,指向了下一條訊息的位移,

至于為什么要有消費位移,很好理解,當 Consumer 發生故障重啟之后,就能夠從 Kafka 中讀取之前提交的位移值,然后從相應的位移處繼續消費,從而避免整個消費程序重來一遍,就好像書簽一樣,需要書簽你才可以快速找到你上次讀書的位置,

那么了解了位移是什么以及它的重要性,我們自然而然會有一個疑問,kafka是怎么記錄、怎么保存、怎么管理位移的呢?

位移的提交

Consumer 需要上報自己的位移資料,這個匯報程序被稱為位移提交,因為 Consumer 能夠同時消費多個磁區的資料,所以位移的提交實際上是在磁區粒度上進行的,即Consumer 需要為分配給它的每個磁區提交各自的位移資料,

鑒于位移提交甚至是位移管理對 Consumer 端的巨大影響,KafkaConsumer API提供了多種提交位移的方法,每一種都有各自的用途,這些都是本文將要談到的方案,

void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

先粗略的總結一下,位移提交分為自動提交和手動提交;而手動提交又分為同步提交和異步提交,

自動提交

當消費配置enable.auto.commit=true的時候代表自動提交位移,

自動提交位移是發生在什么時候呢?auto.commit.interval.ms默認值是50000ms,即kafka每隔5s會幫你自動提交一次位移,自動位移提交的動作是在 poll()方法的邏輯里完成的,在每次真正向服務端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那么就會提交上一次輪詢的位移,假如消費資料量特別大,可以設定的短一點,

越簡單的東西功能越不足,自動提交位移省事的同時肯定會帶來一些問題,自動提交帶來重復消費和訊息丟失的問題:

  • 重復消費: 在默認情況下,Consumer 每 5 秒自動提交一次位移,現在,我們假設提交位移之后的 3 秒發生了 Rebalance 操作,在 Rebalance 之后,所有 Consumer 從上一次提交的位移處繼續消費,但該位移已經是 3 秒前的位移資料了,故在 Rebalance 發生前 3 秒消費的所有資料都要重新再消費一次,雖然你能夠通過減少 auto.commit.interval.ms 的值來提高提交頻率,但這么做只能縮小重復消費的時間視窗,不可能完全消除它,這是自動提交機制的一個缺陷,

  • 訊息丟失: 假設拉取了100條訊息,正在處理第50條訊息的時候,到達了自動提交視窗期,自動提交執行緒將拉取到的每個磁區的最大訊息位移進行提交,如果此時消費服務掛掉,訊息并未處理結束,但卻提交了最大位移,下次重啟就從100條那消費,即發生了50-100條的訊息丟失,

手動提交

當消費配置enable.auto.commit=false的時候代表手動提交位移,用戶必須在適當的時機(一般是處理完業務邏輯后),手動的呼叫相關api方法提交位移,比如在下面的案例中,我需要確認我的業務邏輯回傳true之后再手動提交位移

 while (true) {
     try {
         ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
         if (!consumerRecords.isEmpty()) {
             for (ConsumerRecord<String, String> record : consumerRecords) {
                 KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                 // 處理業務
                 boolean handleResult = handle(kafkaMessage);
                 if (handleResult) {
                     log.info(" handle success, kafkaMessage={}" ,kafkaMessage);
                 } else {
                     log.info(" handle failed, kafkaMessage={}" ,kafkaMessage);
                 }
             }
             // 手動提交offset
             consumer.commitSync(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
        
         } 
     } catch (Exception e) {
         log.info("kafka consume error." ,e);
     }
 }

手動提交明顯能解決訊息丟失的問題,因為你是處理完業務邏輯后再提交的,假如此時消費服務掛掉,訊息并未處理結束,那么重啟的時候還會重新消費,

但是對于業務層面的失敗導致訊息未消費成功,是無法處理的,因為業務層的邏輯千變萬化、比如格式不正確,你叫Kafka消費端程式怎么去處理?應該要業務層面自己處理,記錄失敗日志做好監控等,

但是手動提交不能解決訊息重復的問題,也很好理解,假如消費0-100條訊息,50條時掛了,重啟后由于沒有提交這一批訊息的offset,是會從0開始重新消費,至于如何避免重復消費的問題,在這篇文章有說,

手動提交又分為異步提交和同步提交,

同步提交

上面案例代碼使用的是commitSync() ,顧名思義,是同步提交位移的方法,同步提交位移Consumer 程式會處于阻塞狀態,等待 Broker 回傳提交結果,同步模式下提交失敗的時候一直嘗試提交,直到遇到無法重試的情況下才會結束,在任何系統中,因為程式而非資源限制而導致的阻塞都可能是系統的瓶頸,會影響整個應用程式的 TPS,當然,你可以選擇拉長提交間隔,但這樣做的后果是 Consumer 的提交頻率下降,在下次 Consumer 重啟回來后,會有更多的訊息被重新消費,因此,為了解決這些不足,kafka還提供了異步提交方法,

異步提交

異步提交會立即回傳,不會阻塞,因此不會影響 Consumer 應用的 TPS,由于它是異步的,Kafka 提供了回呼函式,供你實作提交之后的邏輯,比如記錄日志或處理例外等,下面這段代碼展示了呼叫 commitAsync() 的方法

 consumer.commitAsync((offsets, exception) -> {
 if (exception != null)
     handleException(exception);
 });

但是異步提交會有一個問題,那就是它沒有重試機制,不過一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那么后續的提交總會有成功的,所以訊息也是不會丟失和重復消費的,
但如果這是發生在關閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功,因此,組合使用commitAsync()commitSync()是最佳的方式,

try {
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
        if (!consumerRecords.isEmpty()) {
             for (ConsumerRecord<String, String> record : consumerRecords) {
                KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                boolean handleResult = handle(kafkaMessage);             
             }
             //異步提交位移               
             consumer.commitAsync((offsets, exception) -> {
             if (exception != null)
                 handleException(exception);
             });
           
        }
    }
} catch (Exception e) {
    System.out.println("kafka consumer error:" + e.toString());
} finally {
    try {
        //最后同步提交位移
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

讓位移提交更加靈活和可控

如果細心的閱讀了上面所有demo的代碼,那么你會發現這樣幾個問題:

1、所有的提交,都是提交 poll 方法回傳的所有訊息的位移,poll 方法一次回傳1000 條訊息,則一次性地將這 1000 條訊息的位移一并提交,可這樣一旦中間出現問題,位移沒有提交,下次會重新消費已經處理成功的資料,所以我想做到細粒度控制,比如每次提交100條,該怎么辦?

答:可以通過commitSync(Map<TopicPartition, OffsetAndMetadata>)commitAsync(Map<TopicPartition, OffsetAndMetadata>)對位移進行精確控制,

2、poll和commit方法對于普通的開發人員而言是一個黑盒,無法精確地掌控其消費的具體位置,我都不知道這次的提交,是針對哪個partition,提交上去的offset是多少,

答:可以通過record.topic()獲取topic資訊, record.partition()獲取磁區資訊,record.offset() + 1獲取消費位移,記住消費位移是指示下一條消費的位移,所以要加一,

3、我想自己管理offset怎么辦?一方面更加保險,一方面下次重啟之后可以精準的從資料庫讀取最后的offset就不存在丟失和重復消費了,
答:可以將消費位移保存在資料庫中,消費端程式使用comsumer.seek方法指定從某個位移開始消費,

綜合以上幾個可優化點,并結合全文,可以給出一個比較完美且完整的demo:聯合異步提交和同步提交,對處理程序中所有的例外都進行了處理,細粒度的控制了消費位移的提交,并且保守的將消費位移記錄到了資料庫中,重新啟動消費端程式的時候會從資料庫讀取位移,這也是我們消費端程式位移提交的最佳實踐方案,你只要繼承這個抽象類,實作你具體的業務邏輯就可以了,

public abstract class PrefectCosumer {
    private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    int count = 0;
    public final void consume() {
        Properties properties = PropertiesConfig.getConsumerProperties();
        properties.put("group.id", getGroupId());
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(getTopics());
        consumer.poll(0);
        // 把offset記錄到資料庫中 從指定的offset處消費 
        consumer.partitionsFor(getTopics()).stream().map(info ->
        new TopicPartition(getTopics(), info.partition()))
        .forEach(tp -> {
               consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));   
         });
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
                if (!consumerRecords.isEmpty()) {
                    for (ConsumerRecord<String, String> record : consumerRecords) {

                        KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                        boolean handleResult = handle(kafkaMessage);
                        if (handleResult) {
                            //注意:提交的是下一條訊息的位移,所以OffsetAndMetadata 物件時,必須使用當前訊息位移加 1,
                            offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));

                            // 細粒度控制提交 每10條提交一次offset
                            if (count % 10 == 0) {
                                // 異步提交offset
                                consumer.commitAsync(offsets, (offsets, exception) -> {
                                    if (exception != null) {
                                        handleException(exception);
                                    }
                                    // 將消費位移再記錄一份到資料庫中
                                    offsets.forEach((k, v) -> {
                                        String s = "insert into kafka_offset(`topic`,`group_id`,`partition_id`,`offset`) values" +
                                                " ('" + k.topic() + "','" + getGroupId() + "'," + k.partition() + "," + v.offset() + ")" +
                                                " on duplicate key update offset=values(offset);";
                                        JdbcUtils.insertTable(s);
                                    });


                                });
                            }
                            count++;
                        } else {         
                            System.out.println("消費訊息失敗 kafkaMessage={}" + getTopics() + getGroupId() + kafkaMessage.toString());                         
                        }
                    }


                }
            }
        } catch (Exception e) {
            System.out.println("kafka consumer error:" + e.toString());
        } finally {
            try {
                // 最后一次提交 使用同步提交offset
                consumer.commitSync();
            } finally {
                consumer.close();
            }


        }
    }


    /**
     * 具體的業務邏輯
     *
     * @param kafkaMessage
     * @return
     */
    public abstract boolean handle(KafkaMessage kafkaMessage);

    public abstract List<String> getTopics();

    public abstract String getGroupId();

    void handleException(Exception e) {
        //例外處理
    }
}

控制位移提交的N種方式

剛剛我們說自己控制位移,使用seek方法可以指定offset消費,那到底怎么控制位移?怎么重設消費組位移?seek是什么?現在就來仔細說說,

并不是所有的訊息佇列都可以重設消費者組位移達到重新消費的目的,比如傳統的RabbitMq,它們處理訊息是一次性的,即一旦訊息被成功消費,就會被洗掉,而Kafka消費訊息是可以重演的,因為它是基于日志結構(log-based)的訊息引擎,消費者在消費訊息時,僅僅是從磁盤檔案上讀取資料而已,所以消費者不會洗掉訊息資料,同時,由于位移資料是由消費者控制的,因此它能夠很容易地修改位移的值,實作重復消費歷史資料的功能,

了解如何重設位移是很重要的,假設這么一個場景,我已經消費了1000條訊息后,我發現處理邏輯錯了,所以我需要重新消費一下,可是位移已經提交了,我到底該怎么重新消費這1000條呢??假設我想從某個時間點開始消費,我又該如何處理呢?

首先說個誤區:auto.offset.reset=earliest/latest這個引數大家都很熟悉,但是初學者很容易誤會它,大部分朋友都覺得在任何情況下把這兩個值設定為earliest或者latest ,消費者就可以從最早或者最新的offset開始消費,但實際上并不是那么回事,他們生效都有一個前提條件,那就是對于同一個groupid的消費者,如果這個topic某個磁區有已經提交的offset,那么無論是把auto.offset.reset=earliest還是latest,都將失效,消費者會從已經提交的offset開始消費,因此這個引數并不能解決用戶想重設消費位移的需求,

kafka有七種控制消費組消費offset的策略,主要分為位移維度和時間維度,包括:

  • 位移維度,這是指根據位移值來重設,也就是說,直接把消費者的位移值重設成我們給定的位移值,包括Earliest/Latest/Current/Specified-Offset/Shift-By-N策略

  • 時間維度,我們可以給定一個時間,讓消費者把位移調整成大于該時間的最小位移;也可以給出一段時間間隔,比如 30 分鐘前,然后讓消費者直接將位移調回 30 分鐘之前的位移值,包括DateTime和Duration策略

說完了重設策略,我們就來看一下具體應該如何實作,可以從兩個角度,API方式和命令列方式,

重設位移的方法之API方式

API方式只要記住用seek方法就可以了,包括seek,seekToBeginning 和 seekToEnd,

void seek(TopicPartition partition, long offset);    
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);    
void seekToBeginning(Collection<TopicPartition> partitions);    
void seekToEnd(Collection<TopicPartition> partitions);    

從方法簽名我們可以看出seekToBeginningseekToEnd是可以一次性重設n個磁區的位移,而seek 只允許重設指定磁區的位移,即為每個磁區都單獨設定位移,因為不難得出,如果要自定義每個磁區的位移值則用seek,如果希望kafka幫你批量重設所有磁區位移,比如從最新資料消費或者從最早資料消費,那么用seekToEnd和seekToBeginning,

Earliest 策略:從最早的資料開始消費

從主題當前最早位移處開始消費,這個最早位移不一定就是 0 ,因為很久遠的訊息會被 Kafka 自動洗掉,主要取決于你的洗掉配置,

代碼如下:

Properties properties = PropertiesConfig.getConsumerProperties();
properties.put("group.id", getGroupId());
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(getTopics());
consumer.poll(0);
consumer.seekToBeginning(
consumer.partitionsFor(getTopics()).stream().map(partitionInfo ->
   new TopicPartition(getTopics(), partitionInfo.partition()))
   .collect(Collectors.toList()));

首先是構造consumer物件,這樣我們可以通過partitionsFor獲取到磁區的資訊,然后我們就可以構造出TopicPartition集合,傳給seekToBegining方法,需要注意的一個地方是:需要用consumer.poll(0),而不能用consumer.poll(Duration.ofMillis(0))

在poll(0)中consumer會一直阻塞直到它成功獲取了所需的元資料資訊,之后它才會發起fetch請求去獲取資料,而poll(Duration)會把元資料獲取也計入整個超時時間,由于本例中使用的是0,即瞬時超時,因此consumer根本無法在這么短的時間內連接上coordinator,所以只能趕在超時前回傳一個空集合,

Latest策略:從最新的資料開始消費

    consumer.seekToEnd(
        consumer.partitionsFor(getTopics().get(0)).stream().map(partitionInfo ->
            new TopicPartition(getTopics().get(0), partitionInfo.partition()))
              .collect(Collectors.toList()));

Current策略:從當前已經提交的offset處消費

consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
        new TopicPartition(getTopics().get(0), info.partition()))
        .forEach(tp -> {
            long committedOffset = consumer.committed(tp).offset();
            consumer.seek(tp, committedOffset);
        });

**Special-offset策略:從指定的offset處消費 **

該策略使用的方法和current策略一樣,區別在于,current策略是直接從kafka元資訊中讀取中已經提交的offset值,而special策略需要用戶自己為每一個磁區指定offset值,我們一般是把offset記錄到資料庫中然后可以從資料庫去讀取這個值

    consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
                new TopicPartition(getTopics().get(0), info.partition()))
                .forEach(tp -> {
                    try {
                        consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                });

以上演示了用API方式重設位移,演示了四種常見策略的代碼,另外三種沒有演示,一方面是大同小異,另一方面在實際生產中,用API的方式不太可能去做時間維度的重設,而基本都是用命令列方式,

重設位移的方法之命令列方式

命令列方式重設位移是通過 kafka-consumer-groups 腳本,比起 API 的方式,用命令列重設位移要簡單得多,

Earliest 策略指定–to-earliest,

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute

Latest 策略指定–to-latest,

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute

Current 策略指定–to-current,

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute

Specified-Offset 策略指定–to-offset,

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute

Shift-By-N 策略指定–shift-by N,

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute

DateTime 策略指定–to-datetime,

DateTime 允許你指定一個時間,然后將位移重置到該時間之后的最早位移處,常見的使用場景是,你想重新消費昨天的資料,那么你可以使用該策略重設位移到昨天 0 點,

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute

Duration 策略指定–by-duration,
Duration 策略則是指給定相對的時間間隔,然后將位移調整到距離當前給定時間間隔的位移處,具體格式是 PnDTnHnMnS,如果你熟悉 Java 8 引入的 Duration 類的話,你應該不會對這個格式感到陌生,它就是一個符合 ISO-8601 規范的 Duration 格式,以字母 P 開頭,后面由 4 部分組成,即 D、H、M 和 S,分別表示天、小時、分鐘和秒,舉個例子,如果你想將位移調回到 15 分鐘前,那么你就可以指定 PT0H15M0S

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

提交的位移都去哪了?

通過上面那幾部分的內容,我們已經搞懂了位移提交的方方面面,那么提交的位移它保存在哪里呢?這就要去位移主題的的世界里一探究竟了,kafka把位移保存在一個叫做__consumer_offsets的內部主題中,叫做位移主題,

注意:老版本的kafka其實是把位移保存在zookeeper中的,但是zookeeper并不適合這種高頻寫的場景,所以新版本已經是改進了這個方案,直接保存到kafka,畢竟kafka本身就適合高頻寫的場景,并且kafka也可以保證高可用性和高持久性,

既然它也是主題,那么離不開磁區和副本這兩個機制,我們并沒有手動創建這個主題并且指定,所以是kafka自動創建的, 磁區的數量取決于Broker 端引數 offsets.topic.num.partitions,默認是50個磁區,而副本引數取決于offsets.topic.replication.factor,默認是3,

既然也是主題,肯定會有訊息,那么訊息格式是什么呢?參考前面我們手動設計將位移寫入資料庫的方案,我們保存了topic,group_id,partition,offset四個欄位,topic,group_id,partition無疑是資料表中的聯合主鍵,而offset是不斷更新的,無疑kafka的位移主題訊息也是類似這種設計,key也是那三個欄位,而訊息體其實很復雜,你可以先簡單理解為就是offset,

既然也是主題,肯定也會有洗掉策略,否則訊息會無限膨脹,但是位移主題的洗掉策略和其他主題洗掉策略又不太一樣,我們知道普通主題的洗掉是可以通過配置洗掉時間或者大小的,而位移主題的洗掉,叫做 Compaction,Kafka 使用Compact 策略來洗掉位移主題中的過期訊息,對于同一個 Key 的兩條訊息 M1 和 M2,如果 M1 的發送時間早于 M2,那么 M1 就是過期訊息,Compact 的程序就是掃描日志的所有訊息,剔除那些過期的訊息,然后把剩下的訊息整理在一起,

Kafka 提供了專門的后臺執行緒定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可洗掉資料,這個后臺執行緒叫 Log Cleaner,很多實際生產環境中都出現過位移主題無限膨脹占用過多磁盤空間的問題,如果你的環境中也有這個問題,我建議你去檢查一下 Log Cleaner 執行緒的狀態,通常都是這個執行緒掛掉了導致的,

總結

kafka的位移是個極其重要的概念,控制著消費進度,也即控制著消費的準確性,完整性,為了保證訊息不重復和不丟失,我們最好做到以下幾點:

  • 手動提交位移,

  • 手動提交有異步提交和同步提交兩種方式,既然兩者有利也有弊,那么我們可以結合起來使用,

  • 細粒度的控制消費位移的提交,這樣可以避免重復消費的問題,

  • 保守的將消費位移再記錄到了資料庫中,重新啟動消費端程式的時候從資料庫讀取位移,

獲取Kafka全套原創學習資料及思維導圖,關注【胖滾豬學編程】公眾號,回復"kafka",

本文來源于公眾號:【胖滾豬學編程】,一枚集顏值與才華于一身,不算聰明卻足夠努力的女程式媛,用漫畫形式讓編程so easy and interesting!求關注!

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/3136.html

標籤:大數據

上一篇:大資料技術堆疊,主要有哪些

下一篇:elasticsearch集群配置 (Tobe Continue)

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more