主頁 > 前端設計 > 大資料之Kafka(二)

大資料之Kafka(二)

2020-09-29 13:57:51 前端設計

大資料之Kafka(二)

  • 3. Kafka架構深入
    • 3.1 Kafka 作業流程及檔案存盤機制
    • 3.2 Kafak生產者
      • 3.2.1 磁區策略
      • 3.2.2 資料可靠性保證
      • 3.2.3 Exactly Once語意
    • 3.3 Kafka消費者
      • 3.3.1 消費方式
      • 3.3.2 磁區分配策略
      • 3.3.3 offset的維護
  • 不排除內部的topic
      • 3.3.4 消費者組案例
    • 3.4 Kafka 高效讀寫資料
    • 3.5 Zookeeper在Kafka中的作用
    • 3.6 Kafka事務
      • 3.6.1 Producer 事務
      • 3.6.2 Consumer事務(精準一次性消費)
  • 4 . Kafka API
    • 4.1 Producer API
      • 4.1.1 消費發送流程
      • 4.1.2 異步發送API
      • 4.1.3 磁區器
    • 4.2 Consumer API
      • 4.2.1 自動提交offset
      • 4.2.2 手動提交offset
    • 4.3 自定義Interceptor
      • 4.3.1 攔截器原理
      • 4.3.2 攔截器案例
  • 5. Kafka監控
  • 6. Flume對接Kafka
    • 6.1 配置flume
    • 6.2 資料分離

上一節主要介紹了Kafka的概述和基礎的命令列操作,今天來給大家深入了解一下!!

3. Kafka架構深入

3.1 Kafka 作業流程及檔案存盤機制

老規矩,我們先來看一下圖中的Kafka的作業流程,簡單的來了解一下…

在這里插入圖片描述

ok,我來給大家闡述一下上圖:
總的來說,大家記住一點,kafka中每個broker可以有多個partition,消費者組中的每個消費者可以消費多個partiton,而一個partition只能由一個消費者消費,
Kafka中訊息是以topic進行分類的,生產者生產訊息,消費者消費訊息,都是面向topic的,
topic是邏輯上的概念,而partiton是物理上的概念,每個partiton對應于一個log檔案,該log檔案中存盤的就是produce生產的資料,Produce生產的資料會被不斷追加到該log檔案末端,且每條資料都有自己的offset,消費者組中的每個消費者,都會實時記錄自己消費到了那個offset,以便出錯恢復時,從上次的位置繼續消費,
我們再來看一下Kafka的檔案存盤機制

在這里插入圖片描述

由于生產者生產的訊息會不斷追加到log檔案末尾,為防止log檔案過大導致資料定位效率低下,Kafka采取分片索引機制,將每個partition分為多個segment,每個segment對應兩個檔案——“.index”檔案和“.log”檔案,這些檔案位于一個檔案夾下,該檔案夾的命名規則為:topic名稱+磁區序號,例如,first這個topic有三個磁區,則其對應的檔案夾為first-0,first-1,first-2,
在這里插入圖片描述
index和log檔案以當前segment的第一條訊息的offset命名,下圖為index檔案和log檔案的結構示意圖,
在這里插入圖片描述
".index"檔案存盤大量的索引資訊,“.log”檔案存盤大量的資料,索引檔案中的元資料指向對應資料檔案中,message的物理偏移地址,

3.2 Kafak生產者

3.2.1 磁區策略

1)磁區的原因
(1)方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的資料了;
(2)可以提高并發,因為可以以Partition為單位讀寫了,
2)磁區的原則
我們需要將producer發送的資料封裝成一個ProducerRecord物件,
在這里插入圖片描述
磁區的原則分為三種情況:
(1) 指明 partition 的情況下,直接將指明的值直接作為 partiton 值;
(2) 沒有指明 partition 值但有** key** 的情況下,將 key 的 hash 值與 topic 的 partition 數進行取余得到 partition 值;
(3) 既沒有 partition 值又沒有 key 值的情況下, kafka采用Sticky Partition(黏性磁區器),會隨機選擇一個磁區,并盡可能一直使用該磁區,待該磁區的batch已滿或者已完成,kafka再隨機一個磁區進行使用.

3.2.2 資料可靠性保證

1)生產者發送資料到topic partition的可靠性保證
為保證produce發送的資料,能可靠的發送到指定的topic,topic的每個partiton收到producer發送的資料后,都需要向producer發送ack(acknowledgement 確認收到),如果producer收到ack,就會進行下一輪的發送,否則重新發送資料,
我們來看一下下圖,并思考其問題!!
在這里插入圖片描述
2)Topic partition存盤資料的可靠性保證
(1)副本資料同步策略

方案優點缺點
半數以上完成同步,就發送ack延遲低選舉新的leader時,容忍n臺節點的故障,需要2n+1個副本
全部 完成同步,才發送ack選舉新的leader時 ,容忍n臺節點的故障,需要n+1個副本延遲高

Kafka選擇的是第二種方案,原因如下:

  1. 同樣為了容忍n臺節點的故障,第一種方案需要2n+1個副本,而第二種方案只需要n+1個副本,而Kafka的每個磁區都有大量的資料,第一種方案會造成大量資料的冗余,
  2. 雖然第二種方案的網路延遲會比較高,但網路延遲對Kafka的影響較小,
    (2)ISR
    采用第二種方案之后,設想以下情景:leader收到資料,所有follower都開始同步資料,但有一個follower,因為某種故障,遲遲不能與leader進行同步,那leader就要一直等下去,直到它完成同步,才能發送ack,這個問題怎么解決呢?
    Leadre維護了一個動態的in-sync replica set(ISR),意為和leader保持同步的follower集合,當ISR中的follower完成資料的同步之后,leader就會給producer發送給ack,如果follower長時間未向leader同步資料,則該follower將別踢出ISR,該時間闕值由replica.lang.time.ms引數設定,Leader發生故障后,就會從ISR中選舉新的leader,
    (3)ack應答級別
    對于某些不太重要的資料,對資料的可靠性要求不是很高,能夠容忍資料的少量丟失,所以沒必要等ISR中的follower全部接收成功,
    所以Kafka為用戶提供了三種可靠性級別,用戶根據對可靠性和延遲的要求進行權衡,選擇以下的配置,
    acks引數配置:
    acks:
    0:這一操作提供了一個最低的延遲,partiton的leader接收到訊息后還沒有寫入磁盤就已經回傳ack,當
    leader故障時有可能丟失資料**;
    1:partition的leader落盤成功后回傳ack,如果在follower同步之前leader故障,那么將會丟失資料
    在這里插入圖片描述
    -1(all):partition的leader和follower全部落盤成功后才回傳ack,但是如果在follower同步完成后,broker發送ack之前,leader發生故障,那么將會造成資料重復
    在這里插入圖片描述
    3)leader和follower故障處理細節
    在這里插入圖片描述
    LEO:指的是每個副本最大的offset;
    HW:指的是消費者能見到的最大的offset,ISR佇列中最小的LEO,
    (1)follower故障
    follower發生故障后會被臨時踢出ISR,待該follower恢復后,followerhi讀取本地磁盤記錄的上次的HW,并將log檔案高于HW的部分截取掉,從HW開始向leader進行同步,等該follower的LEO大于等于該partition的HW,即follower追上leader之后,就可以重新加入ISR了,
    (2)leader故障
    leader發生故障之后,會從ISR中選出一個新的leader,之后,為保證多個副本之間的資料一致性,其余的follower會先將各自的log檔案高于HW的部分截取掉,然后從新的leader同步資料,
    注意:這只能保證副本之間的資料一致性 ,并不能保證資料不丟失或者不重復,

3.2.3 Exactly Once語意

將服務器的ACK級別設定為-1,可以保證Producer到Server之間不會丟失資料,即At Least Once語意,相對的,將服務器ACK級別設定為0,可以保證生產者每條訊息只會被發送一次,即At Most Once語意,
At Least Once可以保證資料不丟失,但是不能保證資料不重復;相對的,At Most Once可以保證資料不重復,但是不能保證資料不丟失,但是,對于一些非常重要的資訊,比如說交易資料,下游資料消費者要求資料既不重復也不丟失,即Exactly Once語意,在0.11版本以前的Kafka,對此是無能為力的,只能保證資料不丟失,再在下游消費者對資料做全域去重,對于多個下游應用的情況,每個都需要單獨做全域去重,這就對性能造成了很大影響,
0.11版本的Kafka,引入了一項重大特性:冪等性,**所謂的冪等性就是指Producer不論向Server發送多少次重復資料,Server端都只會持久化一條,**冪等性結合At Least Once語意,就構成了Kafka的Exactly Once語意,即:At Least Once + 冪等性 = Exactly Once
要啟用冪等性,只需要將Producer的引數中enable.idempotence設定為true即可,Kafka的冪等性實作其實就是將原來下游需要做的去重放在了資料上游,開啟冪等性的Producer在初始化的時候會被分配一個PID,發往同一Partition的訊息會附帶Sequence Number,而Broker端會對<PID, Partition, SeqNumber>做快取,當具有相同主鍵的訊息提交時,Broker只會持久化一條,
但是PID重啟就會變化,同時不同的Partition也具有不同主鍵,所以冪等性無法保證跨磁區跨會話的Exactly Once,

3.3 Kafka消費者

3.3.1 消費方式

consumer采用pull(拉)模式從broker中讀取資料,
**push(推)模式很難適應消費速率不同的消費者,因為訊息發送速率是由broker決定的,**它的目標是盡可能以最快速度傳遞訊息,但是這樣很容易造成consumer來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞,而pull模式則可以根據consumer的消費能力以適當的速率消費資訊,
pull模式不足之處是,如果kafka沒有資料,消費者可能會陷入回圈中,一直回傳空資料,針對這一點,Kafka的消費者在消費資料時會傳入一個時長引數timeout,如果當前沒有資料可供消費,consumer會等待一段時間之后再回傳,這段時長即為timeout,

3.3.2 磁區分配策略

一個consumer group中有多個consumer,一個topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由那個consumer來消費,
Kafka有三種分配策略,RoundRobin,Range,Sticky,
1)RoundRobin(輪循)
在這里插入圖片描述
其結果如下0、1、2按照順序直接分配到三個消費者,3、4、5同上,最后剩下一個6被消費者0消費,如下圖所示:
在這里插入圖片描述
2)Range
在這里插入圖片描述
0、1、2三個磁區直接被消費者0消費,3、4磁區被消費者1直接消費,5、6磁區被消費者2消費,如下圖所示:
在這里插入圖片描述

3.3.3 offset的維護

由于consumer在消費程序中可能會出現斷電宕機等故障,consumer恢復后,需要從故障前的位置繼續消費,所以consumer需要實時記錄自己消費到了那個offset,以便故障恢復后繼續消費,
Kafka 0.9 版本之前,consumer默認將offset保存在Zookeeper中,從0.9 版本之后,consumer默認將offset保存Kafka一個內置的topic中,該topic為_consumer_offsets,
1)消費offset案例
(0)思想: __consumer_offsets 為kafka中的topic, 那就可以通過消費者進行消費.
(1)修改組態檔consumer.properties

不排除內部的topic

exclude.internal.topics=false
(2)創建一個topic

bin/kafka-topics.sh --create --topic atguigu --zookeeper hadoop102:2181 --partitions 2
--replication-factor 2

(3)啟動生產者和消費者,分別往atguigu生產資料和消費資料

bin/kafka-console-producer.sh --topic atguigu --broker-list  hadoop102:9092
bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --topic atguigu --bootstrap-server hadoop102:9092

(4)消費offset

bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server  hadoop102:9092  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

(5)消費到的資料

[test-consumer-group,atguigu,1]::OffsetAndMetadata(offset=2, leaderEpoch=Optional[0],
metadata=, commitTimestamp=1591935656078, expireTimestamp=None)
[test-consumer-group,atguigu,0]::OffsetAndMetadata(offset=1, leaderEpoch=Optional[0], metadata=, commitTimestamp=1591935656078, expireTimestamp=None)

3.3.4 消費者組案例

1)需求:測驗同一個消費者組中的消費者,同一時刻只能有一個消費者消費,
2)案例實操
(1)在hadoop102、hadoop103上修改/opt/module/kafka/config/consumer.properties組態檔中的group.id屬性為任意組名,

[atguigu@hadoop103 config]$ vi consumer.properties
group.id=mygroup

(2)在hadoop104上啟動生產者

[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first

(3)在hadoop102、hadoop103上分別啟動消費者

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
bootstrap-server hadoop102:9092 --topic first --consumer.config config/consumer.properties
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --consumer.config config/consumer.properties

(4)查看hadoop102和hadoop103的消費者的消費情況,

3.4 Kafka 高效讀寫資料

1)順序寫磁盤
Kafka的producer生產資料,要寫入到log檔案中,寫的程序是一直追加到檔案末端,為順序寫,官網有資料表明,同樣的磁盤,順序寫能到600M/s,而隨機寫只有100K/s,這與磁盤的機械機構有關,順序寫之所以快,是因為其省去了大量磁頭尋址的時間,
2)應用
Kafka資料持久化是直接持久化到Pagecache中,這樣會產生以下幾個好處:
①I/O Scheduler 會將連續的小塊寫組裝成大塊的物理寫從而提高性能
②I/O Scheduler 會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間
③充分利用所有空閑記憶體(非 JVM 記憶體),如果使用應用層 Cache(即 JVM 堆記憶體),會增加 GC 負擔
④讀操作可直接在 Page Cache 內進行,如果消費和生產速度相當,甚至不需要通過物理磁盤(直接通過 Page Cache)交換資料
⑤如果行程重啟,JVM 內的 Cache 會失效,但 Page Cache 仍然可用
盡管持久化到Pagecache上可能會造成宕機丟失資料的情況,但這可以被Kafka的Replication機制解決,如果為了保證這種情況下資料不丟失而強制將 Page Cache 中的資料 Flush 到磁盤,反而會降低性能,
3)零復制技術
正常情況下在這里插入圖片描述
kafka中:(實作零拷貝)
在這里插入圖片描述

3.5 Zookeeper在Kafka中的作用

Kafka集群中有一個broker會被選舉為Controller,負責管理集群broker的上下線,所有topic的磁區副本分配leader選舉等作業,
Controller的管理作業都是依賴于Zookeeper的,
以下為partition的leader選舉程序:
第一步:
在這里插入圖片描述
第二步:
在這里插入圖片描述

3.6 Kafka事務

Kafka從0.11版本開始引入了事務支持,事務可以保證Kafka在Exactly Once語意的基礎上,生產和消費可以跨磁區和會話,要么`全部成功,要么全部失敗,

3.6.1 Producer 事務

為了實作跨磁區跨會話的事務,需要引入一個全域唯一的Transation ID,并將Producer獲得PID和Transaction ID系結,這樣當Producer重啟后就可以通過正在進行的Transation ID獲得原來的PID,
為了管理Transaction,Kafka引入了一個新的組件Transaction Coordinator,Producer就是通過和Transaction Coordinator互動獲得Transaction ID對應的任務狀態,Transaction Coordinator還負責將事務所有寫入Kafka的一個內部Topic,這樣即使整個服務重啟,由于事務狀態得到保存,進行中的事務狀態可以得到恢復,從而繼續進行,

3.6.2 Consumer事務(精準一次性消費)

上述事務機制主要是從Producer方面考慮,對于Consumer而言,事務的保證就會相對較弱,尤其時無法保證Commit的資訊被精確消費,這是由于Consumer可以通過offset訪問任意資訊,而且不同的Segment File生命周期不同,同一事務的訊息可能會出現重啟后被洗掉的情況,
如果想完成Consumer端的精準一次性消費,那么需要kafka消費端將消費程序和提交offset程序做原子系結,此時我們需要將kafka的offset保存到支持事務的自定義介質(比如mysql),這部分知識會在后續專案部分涉及,

4 . Kafka API

4.1 Producer API

4.1.1 消費發送流程

Kafka的Producer發送訊息采用的是異步發送的方式,在訊息發送的程序中,涉及到了兩個執行緒——main執行緒和Sender執行緒,以及一個執行緒共享變數——RecordAccumulator,main執行緒將訊息發送給RecordAccumulator,Sender執行緒不斷從RecordAccumulator中拉取訊息發送到Kafka broker,
在這里插入圖片描述
生產者生產的資料不會直接到topic的磁區中,它會到batch中快取一下,爭取大批量的傳輸資料在這里插入圖片描述
相關引數:
batch.size:只有資料積累到batch.size之后,sender才會發送資料,
linger.ms:如果資料遲遲未達到batch.size,sender等待linger.time之后就會發送資料,

4.1.2 異步發送API

1)匯入依賴

<dependencies>
       <dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka-clients</artifactId>
           <version>2.4.1</version>
       </dependency>
       <dependency>
           <groupId>org.apache.logging.log4j</groupId>
           <artifactId>log4j-slf4j-impl</artifactId>
           <version>2.12.0</version>
       </dependency>
</dependencies>

2)添加log4j2組態檔

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
   <Appenders>
       <!-- 型別名為Console,名稱為必須屬性 -->
       <Appender type="Console" name="STDOUT">
           <!-- 布局為PatternLayout的方式,
           輸出樣式為[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
           <Layout type="PatternLayout"
                   pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
       </Appender>

   </Appenders>

   <Loggers>
       <!-- 可加性為false -->
       <Logger name="test" level="info" additivity="false">
           <AppenderRef ref="STDOUT" />
       </Logger>

       <!-- root loggerConfig設定 -->
       <Root level="info">
           <AppenderRef ref="STDOUT" />
       </Root>
   </Loggers>

</Configuration>

撰寫代碼
(1)無回呼無指定磁區以及key

public class ProducerDemo {
   public static void main(String[] args) {
       //獲取配置引數
       Properties properties = new Properties();
       //kafka集群
       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
       properties.put(ProducerConfig.ACKS_CONFIG,"-1");
       //重試次數
       properties.put(ProducerConfig.RETRIES_CONFIG,3);
       //批次大小
       properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
       //等待時間
       properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
       //緩沖區大小
       properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       //創建生產者物件
       KafkaProducer<String, String> produce = new KafkaProducer<String, String>(properties);

       for (int i = 0; i < 10; i++) {
           //不帶回呼的方法
           //不指定key,磁區使用默認磁區:粘性磁區
           produce.send(new ProducerRecord<String, String>("atguigu","value-->" + i));
       }
       //關閉
       produce.close();
   }
}

(2)采用回呼方法

public class ProducerDemo1 {
   public static void main(String[] args) {
       //獲取配置引數
       Properties properties = new Properties();
       //kafka集群
       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
       properties.put(ProducerConfig.ACKS_CONFIG,"-1");
       //重試次數
       properties.put(ProducerConfig.RETRIES_CONFIG,3);
       //批次大小
       properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
       //等待時間
       properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
       //緩沖區大小
       properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       //創建生產者物件
       KafkaProducer<String, String> produce = new KafkaProducer<String, String>(properties);

       for (int i = 0; i < 10; i++) {
           //待回呼的方法
           //不指定key,默認使用粘性磁區
          produce.send(new ProducerRecord<String, String>("atguigu","value-->" + i),
                  new Callback() {
                      public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                          if(e != null){
                              //資訊發送失敗
                              System.out.println(e.getMessage());
                          }else {
                              //資訊發送成功
                              System.out.println(recordMetadata.topic() + " : " + recordMetadata.partition() + " : " + recordMetadata.offset());
                          }
                      }
                  });
       }
       //關閉
       produce.close();
   }
}

(3) 指定key或磁區

public class ProducerDemo2 {
   public static void main(String[] args) {
       //獲取配置引數
       Properties properties = new Properties();
       //kafka集群
       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
       properties.put(ProducerConfig.ACKS_CONFIG,"-1");
       //重試次數
       properties.put(ProducerConfig.RETRIES_CONFIG,3);
       //批次大小
       properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
       //等待時間
       properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
       //緩沖區大小
       properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       //創建生產者物件
       KafkaProducer<String, String> produce = new KafkaProducer<String, String>(properties);

       for (int i = 0; i < 10; i++) {
           //不帶回呼的方法
           //指定key,使用key對磁區數量取余確定
          //produce.send(new ProducerRecord<String, String>("atguigu","key-->" + i,"message" + i));
          produce.send(new ProducerRecord<String, String>("atguigu",0,"key-->" + i,"value-->" + i));
       }
       //關閉
       produce.close();
   }
}

(4) 異步發送與同步發送

public class ProducerDemo3 {
   public static void main(String[] args) throws ExecutionException, InterruptedException {
       //獲取配置引數
       Properties properties = new Properties();
       //kafka集群
       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
       properties.put(ProducerConfig.ACKS_CONFIG,"-1");
       //重試次數
       properties.put(ProducerConfig.RETRIES_CONFIG,3);
       //批次大小
       properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
       //等待時間
       properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
       //緩沖區大小
       properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       //創建生產者物件
       KafkaProducer<String, String> produce = new KafkaProducer<String, String>(properties);

       for (int i = 0; i < 10; i++) {
           //待回呼的方法
           //不指定key,默認使用粘性磁區
           //異步發送
//           produce.send(new ProducerRecord<String, String>("atguigu","value-->" + i),
//                   new Callback() {
//                       public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//                           System.out.println("訊息發送完成");
//                           if(e != null){
//                               //資訊發送失敗
//                               System.out.println(e.getMessage());
//                           }else {
//                               //資訊發送成功
//                               System.out.println(recordMetadata.topic() + " : " + recordMetadata.partition() + " : " + recordMetadata.offset());
//                           }
//                       }
//                   });
//            System.out.println("訊息發送出去");
           //同步發送
           produce.send(new ProducerRecord<String, String>("atguigu", "value-->" + i), new Callback() {
               public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                   System.out.println("訊息發送完成....");
                   if(e != null){
                       //資訊發送失敗
                       System.out.println(e.getMessage());
                   }else {
                       //資訊發送成功
                       System.out.println(recordMetadata.topic() + " : " + recordMetadata.partition() + " : " + recordMetadata.offset());
                   }
               }
           }).get();
           System.out.println("訊息發送出去.....");
       }
       //關閉
       produce.close();
   }
}

4.1.3 磁區器

1)默認的磁區器 DefaultPartitioner
2)自定義磁區器

public class Mypartitioner implements Partitioner {
   /**
    * 計算某條訊息要發送到那個磁區
    * @param topic 主題
    * @param key  訊息的key
    * @param keyBytes  訊息key序列化后的位元組陣列
    * @param value  訊息的value
    * @param valueBytes  訊息的value 序列化后的位元組陣列
    * @param cluster
    * @return
    */
   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
       if(value.toString().contains("joe")){
           return 0;
       }else if(value.toString().contains("dog")){
           return 1;
       }else {
           return 2;
       }
   }

   public void close() {

   }

   public void configure(Map<String, ?> map) {

   }
}

(3) 指定自定義的磁區器

public class ProducerDemo4 {
   public static void main(String[] args) {
       //獲取配置引數
       Properties properties = new Properties();
       //kafka集群
       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
       properties.put(ProducerConfig.ACKS_CONFIG,"-1");
       //重試次數
       properties.put(ProducerConfig.RETRIES_CONFIG,3);
       //批次大小
       properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
       //等待時間
       properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
       //緩沖區大小
       properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
       //指定磁區器
       properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.partitioner.Mypartitioner");
       //序列化
       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       //創建生產者物件
       KafkaProducer<String, String> produce = new KafkaProducer<String, String>(properties);

       for (int i = 0; i < 10; i++) {
           if(i % 2 == 0){
               produce.send(new ProducerRecord<String, String>("atguigu","joe" + i));
           }else {
               produce.send(new ProducerRecord<String, String>("atguigu","dog" + i));
           }
       }
       //關閉
       produce.close();
   }
}

4.2 Consumer API

Consumer消費資料時的可靠性是很容易保證的,因為資料在Kafka中是持久化的,故不用擔心資料丟失問題,
由于consumer在消費程序中可能會出現斷電宕機等故障,consumer恢復后,需要從故障前的位置的繼續消費,所以consumer需要實時記錄自己消費到了哪個offset,以便故障恢復后繼續消費,
所以offset的維護是Consumer消費資料是必須考慮的問題,

4.2.1 自動提交offset

1)撰寫代碼
需要用到的類:
KafkaConsumer:需要創建一個消費者物件,用來消費資料
ConsumerConfig:獲取所需的一系列配置引數
ConsuemrRecord:每條資料都要封裝成一個ConsumerRecord物件
為了使我們能夠專注于自己的業務邏輯,Kafka提供了自動提交offset的功能,
自動提交offset的相關引數:
enable.auto.commit:是否開啟自動提交offset功能
auto.commit.interval.ms:自動提交offset的時間間隔
2)消費者自動提交offset且重置offset

public class ConsumerDemo {
   public static void main(String[] args) {
       //設定配置項
       Properties prop = new Properties();
       prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
       prop.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu-group");
       //自動提交
       prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
       //自動提交的事件間隔
       prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
       //設定key
       prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
       //設定value
       prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
       //offset重置問題
       /**
        * auto,offset.reset:
        * 滿足兩種情況會重置offset:
        * 1.當啟動的消費者之前沒有在kafka中有消費記錄(新的組新的人)
        * 2.當啟動的消費者要消費的offset在kafka中已經不存在(例如超時7天會被洗掉)
        *
        * 重置到什么位置:
        * earliest:目前kafka中topic的磁區中最小的offset
        * latest:  目前lafla中topic的磁區中最大的offset
        */
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
       //創建消費者物件
       KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

       //訂閱主題
       List<String> topics = new ArrayList<String>();
       topics.add("atguigu");
       topics.add("first");
       //若不存在的主題,它會自動創建
       //topics.add("second");
       consumer.subscribe(topics);

       //消費資料
       while(true){
           ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

           for (ConsumerRecord<String, String> record : records) {
               System.out.println("offset : " + record.offset()  + "key : " + record.key() + "value" + record.value());
           }
       }
   }
}

4.2.2 手動提交offset

雖然自動提交offset十分簡潔便利,但由于其是基于時間提交的,開發人員難以把握offset提交的時機,因此Kafka還提供了手動提交offset的API,
手動提交offset的方法有兩種:分別是commitSync(同步提交)commitAsync(異步提交),兩者的相同點是,都會將本次poll的一批資料最高的偏移量提交;不同點是,commitSync阻塞當前執行緒,一直到提交成功,并且會自動失敗重試(由不可控因素導致,也會出現提交失敗);而commitAsync則沒有失敗重試機制,故有可能提交失敗,
1)同步提交offset
由于同步提交有失敗重試機制,故更加可靠,以下為同步提交與異步提交offset的示例,

public class ConsumerDemo1 {
   public static void main(String[] args) {
       //設定配置項
       Properties prop = new Properties();
       prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
       prop.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu-group");
       //手動提交
       prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

       //設定key
       prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
       //設定value
       prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

       //創建消費者物件
       KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

       //訂閱主題
       List<String> topics = new ArrayList<String>();
       topics.add("atguigu");
       topics.add("first");
       //若不存在的主題,它會自動創建
       //topics.add("second");
       consumer.subscribe(topics);

       //消費資料
       while(true){
           ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

           for (ConsumerRecord<String, String> record : records) {
               System.out.println("offset : " + record.offset()  + "key : " + record.key() + "value" + record.value());
           }
           //同步提交,當前執行緒會阻塞直到offset提交成功
           //consumer.commitSync();
           //異步提交
           consumer.commitAsync(new OffsetCommitCallback() {
               @Override
               public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                   if(exception != null){
                       System.out.println(exception.getMessage());
                   }else {
                       System.out.println(offsets);
                   }
               }
           });
       }
   }
}

手動提交的話主要是在特殊業務中才會使用,我們一般使用自動提交就可以了,
3)資料漏消費和重復消費分析
無論是同步提交還是異步提交offset,都有可能造成資料的漏消費或者重復消費,**先提交offset后消費,有可能造成漏消費;而先消費后提交offset,有可能會造成資料的重復消費,

4.3 自定義Interceptor

4.3.1 攔截器原理

Produce攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用于實作clients端的定制化控制化邏輯,
對于produce而言,interceptor使得用戶在訊息發送前以及producer回呼邏輯前有機會對訊息做一些定制化需求,比如修改訊息等,同時,producer允許用戶指定多個interceptor按序作用于同一條訊息而形成一個攔截鏈(interceptor),interceptor的實作介面是org.apache.kafka.clients.producer.ProducerInterceptor,其定義方法包括:
(1)configure(configs)
獲取配置資訊和初始化資料時呼叫,
(2)onSend(ProducerRecord):
該方法封裝進KafkaProducer.send方法中,即它運行在用戶主執行緒中,Producer確保在訊息被序列化以及計算磁區前呼叫該方法,用戶可以在該方法中對訊息做任何操作,但最好保證不要修改訊息所屬的topic和磁區,否則會影響目標磁區的計算,
(3)onAcknowledgement(RecordMetadata, Exception):
該方法會在訊息從RecordAccumulator成功發送到Kafka Broker之后,或者在發送程序中失敗時呼叫,并且通常都是在producer回呼邏輯觸發之前,onAcknowledgement運行在producer的IO執行緒中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的訊息發送效率,
(4)close:
關閉interceptor,主要用于執行一些資源清理作業
如前所述,interceptor可能被運行在多個執行緒中,因此在具體實作時用戶需要自行確保執行緒安全,另外倘若指定了多個interceptor,則producer將按照指定順序呼叫它們,并僅僅是捕獲每個interceptor可能拋出的例外記錄到錯誤日志中而非在向上傳遞,這在使用程序中要特別留意,

4.3.2 攔截器案例

1)需求:
實作一個簡單的雙interceptor組成的攔截鏈,第一個interceptor會在訊息發送前將時間戳資訊加到訊息value的最前部;第二個interceptor會在訊息發送后更新成功發送訊息數或失敗發送訊息數,
2)案例實操
在這里插入圖片描述
(1)增加時間戳攔截器

public class TimeInterceptor implements ProducerInterceptor<String,String> {
   /**
    * 在訊息發送前將時間戳資訊加到訊息value的最前部
    * @param record
    * @return
    */
   @Override
   public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
       //獲取訊息的value
       String value = record.value();
       value = System.currentTimeMillis() + " >> " + value;

       //封裝新的訊息
       ProducerRecord<String,String> newRecord = new ProducerRecord<String, String>(record.topic(),record.partition(),record.key(),value);
       return newRecord;

   }

   @Override
   public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

   }

   @Override
   public void close() {

   }

   @Override
   public void configure(Map<String, ?> configs) {

   }
}

(2)統計發送訊息成功和發送失敗訊息數,并在producer關閉時列印這兩個計數器

public class CountInterceptor implements ProducerInterceptor<String,String> {
   private Integer success = 0;
   private Integer fail = 0;
   @Override
   public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
       return record;
   }

   /**
    * 在訊息發送后更新成功發送訊息數或失敗發送訊息數,
    * @param metadata
    * @param exception
    */
   @Override
   public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
       if(exception != null){
           //發送失敗
           fail++;
       }else {
           //發送成功
           success++;
       }
   }

   @Override
   public void close() {
       System.out.println("SUCCESS:" + success);
       System.out.println("FAIL:" + fail);
   }

   @Override
   public void configure(Map<String, ?> configs) {

   }
}

(3)producer主程式

public class ProducerDemo {
   public static void main(String[] args) {
       //獲取配置引數
       Properties properties = new Properties();
       //kafka集群
       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
       properties.put(ProducerConfig.ACKS_CONFIG,"-1");
       //重試次數
       properties.put(ProducerConfig.RETRIES_CONFIG,3);
       //批次大小
       properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
       //等待時間
       properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
       //緩沖區大小
       properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
       //指定攔截器
       List<String> interceptors = new ArrayList<>();
       interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");
       interceptors.add("com.atguigu.kafka.interceptor.CountInterceptor");
       properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);
       //指定key和value的序列化器
       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
       //創建生產者物件
       KafkaProducer<String, String> produce = new KafkaProducer<String, String>(properties);

       for (int i = 0; i < 10; i++) {
           //待回呼的方法
           //不指定key,默認使用粘性磁區
          produce.send(new ProducerRecord<String, String>("atguigu","value-->" + i),
                  new Callback() {
                      public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                          if(e != null){
                              //資訊發送失敗
                              System.out.println(e.getMessage());
                          }else {
                              //資訊發送成功
                              System.out.println(recordMetadata.topic() + " : " + recordMetadata.partition() + " : " + recordMetadata.offset());
                          }
                      }
                  });
       }
       //關閉
       produce.close();
   }
}

3)測驗
(1)我們可以在啟動之前寫的消費者,運行測驗一下,消費到的資料為:

offset : 35,key : null,value1601287826845 >> value-->0
offset : 36,key : null,value1601287827143 >> value-->1
offset : 37,key : null,value1601287827143 >> value-->2
offset : 38,key : null,value1601287827143 >> value-->3
offset : 39,key : null,value1601287827143 >> value-->4
offset : 40,key : null,value1601287827143 >> value-->5
offset : 41,key : null,value1601287827143 >> value-->6
offset : 42,key : null,value1601287827143 >> value-->7
offset : 43,key : null,value1601287827143 >> value-->8
offset : 44,key : null,value1601287827143 >> value-->9

5. Kafka監控

1)修改kafka啟動命令
修改kafka-server-start.sh命令中

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
   export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
   export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
   export JMX_PORT="9999"
   #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

注意:修改之后在啟動kafka之前要分發給其他節點
2)上傳壓縮包kafka-eagle-bin-1.4.5.tar.gz到集群/opt/software目錄
3)解壓到本地

[atguigu@hadoop102 software]$ tar -zxvf kafka-eagle-bin-1.4.5.tar.gz

4)進入剛才解壓的目錄

[atguigu@hadoop102 kafka-eagle-bin-1.4.5]$ ll
總用量 82932
-rw-rw-r--. 1 atguigu atguigu 84920710 8月  13 23:00 kafka-eagle-web-1.4.5-bin.tar.gz

5)將kafka-eagle-web-1.3.7-bin.tar.gz解壓至/opt/module

[atguigu@hadoop102 kafka-eagle-bin-1.4.5]$ tar -zxvf kafka-eagle-web-1.4.5-bin.tar.gz -C /opt/module/

6)修改名稱

[atguigu@hadoop102 module]$ mv kafka-eagle-web-1.4.5/ eagle

7)給啟動檔案執行權限

[atguigu@hadoop102 eagle]$ cd bin/
[atguigu@hadoop102 bin]$ ll
總用量 12
-rw-r--r--. 1 atguigu atguigu 1848 8月  22 2017 ke.bat
-rw-r--r--. 1 atguigu atguigu 7190 7月  30 20:12 ke.sh
[atguigu@hadoop102 bin]$ chmod 777 ke.sh

8)修改組態檔 conf/system-config.properties

######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181

######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka

######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false

######################################
# kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456

9)添加環境變數

export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin

注意:source /etc/profile
10)啟動

[atguigu@hadoop102 eagle]$ bin/ke.sh start
... ...
... ...
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://192.168.202.102:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************
[atguigu@hadoop102 eagle]$

注意:啟動之前需要先啟動ZK以及KAFKA
11)登錄頁面查看監控資料
http://192.168.202.102:8048/ke

6. Flume對接Kafka

我們先來看一下拓撲圖
在這里插入圖片描述
上圖中清晰的展示了Flume和Kafka的對接是如何實作的,但是我們不需要如此復雜,Kafka中的sink給我們提供了相應的方法,我們只需要如下圖所示就可以了:
在這里插入圖片描述

6.1 配置flume

這些配置flume的官網都會有…

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F  /opt/module/data/flume.log

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) 啟動kafka消費者
3) 進入flume根目錄下,啟動flume

$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

4) 向 /opt/module/data/flume.log里追加資料,查看kafka消費者消費情況

$ echo hello >> /opt/module/data/flume.log

6.2 資料分離

0)需求: 將flume采集的資料按照不同的型別輸入到不同的topic中
將日志資料中帶有atguigu的,輸入到Kafka的first主題中,
將日志資料中帶有shangguigu的,輸入到Kafka的second主題中,
其他的資料輸入到Kafka的third主題中
1)撰寫Flume的Interceptor
0)需求: 將flume采集的資料按照不同的型別輸入到不同的topic中
將日志資料中帶有joe的,輸入到Kafka的first主題中,
將日志資料中帶有dog的,輸入到Kafka的atguigu主題中,
其他的資料輸入到Kafka的third主題中
1)撰寫Flume的Interceptor

public class FlumeKafkaInterceptor implements Interceptor {
   @Override
   public void initialize() {

  }

   /**
    * 如果包含“joe”就發送到first主題
    * 如果包含“dog”就發送到atguigu主題
    * 其他的資料發送到third主題
    * @param event
    * @return
    */
   @Override
   public Event intercept(Event event) {
       //獲取event的header
       Map<String, String> headers = event.getHeaders();
       //獲取event的body
       String body = new String(event.getBody());
       if(body.contains("joe")){
           headers.put("topic","first");
       }else if(body.contains("dog")) {
           headers.put("topic","atguigu");
       }
       return event;
   }

   @Override
   public List<Event> intercept(List<Event> events) {
       for (Event event : events) {
           intercept(event);
       }
       return events;
   }

   @Override
   public void close() {

   }
   //獲取FlumeKafkaIntercceptor的物件
   public static class MyBuilder implements Builder{

       @Override
       public Interceptor build() {
           return new FlumeKafkaInterceptor();
       }

       @Override
       public void configure(Context context) {

       }
   }
}

2)將寫好的interceptor打包上傳到Flume安裝目錄的lib目錄下
3)配置flume

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 6666


# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = third
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.kafka.flumeInterceptor.FlumeKafkaInterceptor$MyBuilder

# # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4) 啟動kafka消費者
5) 進入flume根目錄下,啟動flume

$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

6) 向6666埠寫資料,查看kafka消費者消費情況

其實Kafka主要是在我們在收集資料時與Flume一起使用,從而起到一個快取從而可以收集大量資料,Kafka的主要特點就是吞吐量大,,哈哈,學到這里Kafka結束,想學其他的大資料框架,大家接著跟我學哈😄😄😄😄😄😄😄😄

轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/138612.html

標籤:其他

上一篇:資料結構基礎筆記、基礎知識總結、周周練匯總,通過代碼,更快速掌握資料結構和演算法知識!

下一篇:驚艷!阿里出產的MyCat性能筆記,帶你領略什么叫細節爆炸

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • vue移動端上拉加載

    可能做得過于簡單或者比較low,請各位大佬留情,一起探討技術 ......

    uj5u.com 2020-09-10 04:38:07 more
  • 優美網站首頁,頂部多層導航

    一個個人用的瀏覽器首頁,可以把一下常用的網站放在這里,平常打開會比較方便。 第一步,HTML代碼 <script src=https://www.cnblogs.com/szharf/p/"js/jquery-3.4.1.min.js"></script> <div id="navigate"> <ul> <li class="labels labels_1"> ......

    uj5u.com 2020-09-10 04:38:47 more
  • 頁面為要加<!DOCTYPE html>

    最近因為寫一個js函式,需要用到$(window).height(); 由于手寫demo的時候,過于自信,其實對前端方面的認識也不夠體系,用文本檔案直接敲出來的html代碼,第一行沒有加上<!DOCTYPE html> 導致了$(window).height();的結果直接是整個document的高 ......

    uj5u.com 2020-09-10 04:38:52 more
  • WordPress網站程式手動升級要做好資料備份

    WordPress博客網站程式在進行升級前,必須要做好網站資料的備份,這個問題良家佐言是遇見過的;在剛開始接觸WordPress博客程式的時候,因為升級問題和博客網站的修改的一些嘗試,良家佐言是吃盡了苦頭。因為購買的是西部數碼的空間和域名,每當佐言把自己的WordPress博客網站搞到一塌糊涂的時候 ......

    uj5u.com 2020-09-10 04:39:30 more
  • WordPress程式不能升級為5.4.2版本的原因

    WordPress是一款個人博客系統,受到英文博客愛好者和中文博客愛好者的追捧,并逐步演化成一款內容管理系統軟體;它是使用PHP語言和MySQL資料庫開發的,用戶可以在支持PHP和MySQL資料庫的服務器上使用自己的博客。每一次WordPress程式的更新,就會牽動無數WordPress愛好者的心, ......

    uj5u.com 2020-09-10 04:39:49 more
  • 使用CSS3的偽元素進行首字母下沉和首行改變樣式

    網頁中常見的一種效果,首字改變樣式或者首行改變樣式,效果如下圖。 代碼: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, ......

    uj5u.com 2020-09-10 04:40:09 more
  • 關于a標簽的講解

    什么是a標簽? <a> 標簽定義超鏈接,用于從一個頁面鏈接到另一個頁面。 <a> 元素最重要的屬性是 href 屬性,它指定鏈接的目標。 a標簽的語法格式:<a href=https://www.cnblogs.com/summerxbc/p/"指定要跳轉的目標界面的鏈接">需要展示給用戶看見的內容</a> a標簽 在所有瀏覽器中,鏈接的默認外觀如下: 未被訪問的鏈接帶 ......

    uj5u.com 2020-09-10 04:40:11 more
  • 前端輪播圖

    在需要輪播的頁面是引入swiper.min.js和swiper.min.css swiper.min.js地址: 鏈接:https://pan.baidu.com/s/15Uh516YHa4CV3X-RyjEIWw 提取碼:4aks swiper.min.css地址 鏈接:https://pan.b ......

    uj5u.com 2020-09-10 04:40:13 more
  • 如何設定html中的背景圖片(全屏顯示,且不拉伸)

    1 <style>2 body{background-image:url(https://uploadbeta.com/api/pictures/random/?key=BingEverydayWallpaperPicture); 3 background-size:cover;background ......

    uj5u.com 2020-09-10 04:40:16 more
  • Java學習——HTML詳解(上)

    HTML詳解 初識HTML Hyper Text Markup Language(超文本標記語言) 1 <!--DOCTYPE:告訴瀏覽器我們要使用什么規范--> 2 <!DOCTYPE html> 3 <html lang="en"> 4 <head> 5 <!--meta 描述性的標簽,描述一些 ......

    uj5u.com 2020-09-10 04:40:33 more
最新发布
  • 我的第一個NPM包:panghu-planebattle-esm(胖虎飛機大戰)使用說明

    好家伙,我的包終于開發完啦 歡迎使用胖虎的飛機大戰包!! 為你的主頁添加色彩 這是一個有趣的網頁小游戲包,使用canvas和js開發 使用ES6模塊化開發 效果圖如下: (覺得圖片太sb的可以自己改) 代碼已開源!! Git: https://gitee.com/tang-and-han-dynas ......

    uj5u.com 2023-04-20 07:59:23 more
  • 生產事故-走近科學之消失的JWT

    入職多年,面對生產環境,盡管都是小心翼翼,慎之又慎,還是難免捅出簍子。輕則滿頭大汗,面紅耳赤。重則系統停擺,損失資金。每一個生產事故的背后,都是寶貴的經驗和教訓,都是專案成員的血淚史。為了更好地防范和遏制今后的各類事故,特開此專題,長期更新和記錄大大小小的各類事故。有些是親身經歷,有些是經人耳傳口授 ......

    uj5u.com 2023-04-18 07:55:04 more
  • 記錄--Canvas實作打飛字游戲

    這里給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 打開游戲界面,看到一個畫面簡潔、卻又富有挑戰性的游戲。螢屏上,有一個白色的矩形框,里面不斷下落著各種單詞,而我需要迅速地輸入這些單詞。如果我輸入的單詞與螢屏上的單詞匹配,那么我就可以獲得得分;如果我輸入的單詞錯誤或者時間過長,那么我就會輸 ......

    uj5u.com 2023-04-04 08:35:30 more
  • 了解 HTTP 看這一篇就夠

    在學習網路之前,了解它的歷史能夠幫助我們明白為何它會發展為如今這個樣子,引發探究網路的興趣。下面的這張圖片就展示了“互聯網”誕生至今的發展歷程。 ......

    uj5u.com 2023-03-16 11:00:15 more
  • 藍牙-低功耗中心設備

    //11.開啟藍牙配接器 openBluetoothAdapter //21.開始搜索藍牙設備 startBluetoothDevicesDiscovery //31.開啟監聽搜索藍牙設備 onBluetoothDeviceFound //30.停止監聽搜索藍牙設備 offBluetoothDevi ......

    uj5u.com 2023-03-15 09:06:45 more
  • canvas畫板(滑鼠和觸摸)

    <!DOCTYPE html> <html> <head> <meta charset="utf-8"> <title>canves</title> <style> #canvas { cursor:url(../images/pen.png),crosshair; } #canvasdiv{ bo ......

    uj5u.com 2023-02-15 08:56:31 more
  • 手機端H5 實作自定義拍照界面

    手機端 H5 實作自定義拍照界面也可以使用 MediaDevices API 和 <video> 標簽來實作,和在桌面端做法基本一致。 首先,使用 MediaDevices.getUserMedia() 方法獲取攝像頭媒體流,并將其傳遞給 <video> 標簽進行渲染。 接著,使用 HTML 的 < ......

    uj5u.com 2023-01-12 07:58:22 more
  • 記錄--短視頻滑動播放在 H5 下的實作

    這里給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 短視頻已經無數不在了,但是主體還是使用 app 來承載的。本文講述 H5 如何實作 app 的視頻滑動體驗。 無聲勝有聲,一圖頂百辯,且看下圖: 網址鏈接(需在微信或者手Q中瀏覽) 從上圖可以看到,我們主要實作的功能也是本文要講解的有: ......

    uj5u.com 2023-01-04 07:29:05 more
  • 一文讀懂 HTTP/1 HTTP/2 HTTP/3

    從 1989 年萬維網(www)誕生,HTTP(HyperText Transfer Protocol)經歷了眾多版本迭代,WebSocket 也在期間萌芽。1991 年 HTTP0.9 被發明。1996 年出現了 HTTP1.0。2015 年 HTTP2 正式發布。2020 年 HTTP3 或能正... ......

    uj5u.com 2022-12-24 06:56:02 more
  • 【HTML基礎篇002】HTML之form表單超詳解

    ??一、form表單是什么

    ??二、form表單的屬性

    ??三、input中的各種Type屬性值

    ??四、標簽 ......

    uj5u.com 2022-12-18 07:17:06 more