主頁 >  其他 > 四萬字32圖,Kafka知識體系保姆級教程寶典

四萬字32圖,Kafka知識體系保姆級教程寶典

2021-10-28 07:31:56 其他

目錄

一、訊息佇列

1. 訊息佇列的介紹

2. 訊息佇列的應用場景

3. 訊息佇列的兩種模式

4. 常用的訊息佇列介紹

5. Pulsar

6. Kafka與Pulsar對比

7. 其他訊息佇列與Kafka對比

二、Kafka基礎

1. kafka的基本介紹

2. kafka的好處

3. 分布式的發布與訂閱系統

4. kafka的主要應用場景

三、Kafka架構及組件

1. kafka架構

2. Kafka 主要組件

四、Kafka集群操作

1. 創建topic

2. 查看主題命令

3. 生產者生產資料

4. 消費者消費資料

5. 運行describe topics命令

6. 增加topic磁區數

7. 增加配置

8. 洗掉配置

9. 洗掉topic

五、Kafka的JavaAPI操作

1. 生產者代碼

2. 消費者代碼

3. kafka Streams API開發

六、Kafka中的資料不丟失機制

1. 生產者生產資料不丟失

2. broker中資料不丟失

3. 消費者消費資料不丟失

七、Kafka組態檔說明

八、CAP理論

1. 分布式系統當中的CAP理論

2. Partition tolerance

3. Consistency

4. Availability

九、Kafka中的CAP機制

十、Kafka監控及運維

1. kafka-eagle概述

2. 環境和安裝

十一、Kafka大廠面試題

Kafka 涉及的知識點如下圖所示,本文將逐一講解:

本檔案參考了關于 Kafka 的官網及其他眾多資料整理而成,為了整潔的排版及舒適的閱讀,對于模糊不清晰的圖片及黑白圖片進行重新繪制成了高清彩圖

本文首發在公眾號【五分鐘學大資料】,公眾號已經總結包括Hadoop,Hive,Spark、Flink,Kafka等超全的五萬字吐血系列教程,關注公眾號即可獲取!

一、訊息佇列

1. 訊息佇列的介紹

訊息(Message)是指在應用之間傳送的資料,訊息可以非常簡單,比如只包含文本字串,也可以更復雜,可能包含嵌入物件, 訊息佇列(Message Queue)是一種應用間的通信方式,訊息發送后可以立即回傳,有訊息系統來確保資訊的可靠專遞,訊息發布者只管把訊息發布到MQ中而不管誰來取,訊息使用者只管從MQ中取訊息而不管誰發布的,這樣發布者和使用者都不用知道對方的存在,

2. 訊息佇列的應用場景

訊息佇列在實際應用中包括如下四個場景:

  • 應用耦合:多應用間通過訊息佇列對同一訊息進行處理,避免呼叫介面失敗導致整個程序失敗;

  • 異步處理:多應用對訊息佇列中同一訊息進行處理,應用間并發處理訊息,相比串行處理,減少處理時間;

  • 限流削峰:廣泛應用于秒殺或搶購活動中,避免流量過大導致應用系統掛掉的情況;

  • 訊息驅動的系統:系統分為訊息佇列、訊息生產者、訊息消費者,生產者負責產生訊息,消費者(可能有多個)負責對訊息進行處理;

下面詳細介紹上述四個場景以及訊息佇列如何在上述四個場景中使用:

  1. 異步處理

具體場景:用戶為了使用某個應用,進行注冊,系統需要發送注冊郵件并驗證短信,對這兩個操作的處理方式有兩種:串行及并行,

  • 串行方式:新注冊資訊生成后,先發送注冊郵件,再發送驗證短信;

    在這種方式下,需要最終發送驗證短信后再回傳給客戶端,

  • 并行處理:新注冊資訊寫入后,由發短信和發郵件并行處理;

    在這種方式下,發短信和發郵件 需處理完成后再回傳給客戶端, 假設以上三個子系統處理的時間均為50ms,且不考慮網路延遲,則總的處理時間:

    串行:50+50+50=150ms
    并行:50+50 = 100ms

  • 若使用訊息佇列:

在寫入訊息佇列后立即回傳成功給客戶端,則總的回應時間依賴于寫入訊息佇列的時間,而寫入訊息佇列的時間本身是可以很快的,基本可以忽略不計,因此總的處理時間相比串行提高了2倍,相比并行提高了一倍;

  1. 應用耦合

具體場景:用戶使用QQ相冊上傳一張圖片,人臉識別系統會對該圖片進行人臉識別,一般的做法是,服務器接收到圖片后,圖片上傳系統立即呼叫人臉識別系統,呼叫完成后再回傳成功,如下圖所示:

該方法有如下缺點:

  • 人臉識別系統被調失敗,導致圖片上傳失敗;

  • 延遲高,需要人臉識別系統處理完成后,再回傳給客戶端,即使用戶并不需要立即知道結果;

  • 圖片上傳系統與人臉識別系統之間互相呼叫,需要做耦合;

若使用訊息佇列:

客戶端上傳圖片后,圖片上傳系統將圖片資訊如uin、批次寫入訊息佇列,直接回傳成功;而人臉識別系統則定時從訊息佇列中取資料,完成對新增圖片的識別,

此時圖片上傳系統并不需要關心人臉識別系統是否對這些圖片資訊的處理、以及何時對這些圖片資訊進行處理,事實上,由于用戶并不需要立即知道人臉識別結果,人臉識別系統可以選擇不同的調度策略,按照閑時、忙時、正常時間,對佇列中的圖片資訊進行處理,

  1. 限流削峰

具體場景:購物網站開展秒殺活動,一般由于瞬時訪問量過大,服務器接收過大,會導致流量暴增,相關系統無法處理請求甚至崩潰,而加入訊息佇列后,系統可以從訊息佇列中取資料,相當于訊息佇列做了一次緩沖,

該方法有如下優點:

  • 請求先入訊息佇列,而不是由業務處理系統直接處理,做了一次緩沖,極大地減少了業務處理系統的壓力;

  • 佇列長度可以做限制,事實上,秒殺時,后入佇列的用戶無法秒殺到商品,這些請求可以直接被拋棄,回傳活動已結束或商品已售完資訊;

4.訊息驅動的系統

具體場景:用戶新上傳了一批照片,人臉識別系統需要對這個用戶的所有照片進行聚類,聚類完成后由對賬系統重新生成用戶的人臉索引(加快查詢),這三個子系統間由訊息佇列連接起來,前一個階段的處理結果放入佇列中,后一個階段從佇列中獲取訊息繼續處理,

該方法有如下優點:

  • 避免了直接呼叫下一個系統導致當前系統失敗;

  • 每個子系統對于訊息的處理方式可以更為靈活,可以選擇收到訊息時就處理,可以選擇定時處理,也可以劃分時間段按不同處理速度處理;

3. 訊息佇列的兩種模式

訊息佇列包括兩種模式,點對點模式(point to point, queue)和發布/訂閱模式(publish/subscribe,topic)

1) 點對點模式

點對點模式下包括三個角色:

  • 訊息佇列

  • 發送者 (生產者)

  • 接收者(消費者)

    訊息發送者生產訊息發送到queue中,然后訊息接收者從queue中取出并且消費訊息,訊息被消費以后,queue中不再有存盤,所以訊息接收者不可能消費到已經被消費的訊息,

點對點模式特點:

  • 每個訊息只有一個接收者(Consumer)(即一旦被消費,訊息就不再在訊息佇列中);

  • 發送者和接發收者間沒有依賴性,發送者發送訊息之后,不管有沒有接收者在運行,都不會影響到發送者下次發送訊息;

  • 接收者在成功接收訊息之后需向佇列應答成功,以便訊息佇列洗掉當前接收的訊息;

2) 發布/訂閱模式

發布/訂閱模式下包括三個角色:

  • 角色主題(Topic)

  • 發布者(Publisher)

  • 訂閱者(Subscriber)

    發布者將訊息發送到Topic,系統將這些訊息傳遞給多個訂閱者,

發布/訂閱模式特點:

  • 每個訊息可以有多個訂閱者;

  • 發布者和訂閱者之間有時間上的依賴性,針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的訊息,

  • 為了消費訊息,訂閱者需要提前訂閱該角色主題,并保持在線運行;

4. 常用的訊息佇列介紹

1) RabbitMQ

RabbitMQ 2007年發布,是一個在AMQP(高級訊息佇列協議)基礎上完成的,可復用的企業訊息系統,是當前最主流的訊息中間件之一,

2) ActiveMQ

ActiveMQ是由Apache出品,ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實作,它非常快速,支持多種語言的客戶端和協議,而且可以非常容易的嵌入到企業的應用環境中,并有許多高級功能,

3) RocketMQ

RocketMQ出自 阿里公司的開源產品,用 Java 語言實作,在設計時參考了 Kafka,并做出了自己的一些改進,訊息可靠性上比 Kafka 更好,RocketMQ在阿里集團被廣泛應用在訂單,交易,充值,流計算,訊息推送,日志流式處理等,

4) Kafka

Apache Kafka是一個分布式訊息發布訂閱系統,它最初由LinkedIn公司基于獨特的設計實作為一個分布式的提交日志系統( a distributed commit log),,之后成為Apache專案的一部分,Kafka系統快速、可擴展并且可持久化,它的磁區特性,可復制和可容錯都是其不錯的特性,

5. Pulsar

Apahce Pulasr是一個企業級的發布-訂閱訊息系統,最初是由雅虎開發,是下一代云原生分布式訊息流平臺,集訊息、存盤、輕量化函式式計算為一體,采用計算與存盤分離架構設計,支持多租戶、持久化存盤、多機房跨區域資料復制,具有強一致性、高吞吐、低延時及高可擴展性等流資料存盤特性,

Pulsar 非常靈活:它既可以應用于像 Kafka 這樣的分布式日志應用場景,也可以應用于像 RabbitMQ 這樣的純訊息傳遞系統場景,它支持多種型別的訂閱、多種交付保證、保留策略以及處理模式演變的方法,以及其他諸多特性,

1. Pulsar 的特性

  • 內置多租戶:不同的團隊可以使用相同的集群并將其隔離,解決了許多管理難題,它支持隔離、身份驗證、授權和配額;

  • 多層體系結構:Pulsar 將所有 topic 資料存盤在由 Apache BookKeeper 支持的專業資料層中,存盤和訊息傳遞的分離解決了擴展、重新平衡和維護集群的許多問題,它還提高了可靠性,幾乎不可能丟失資料,另外,在讀取資料時可以直連 BookKeeper,且不影響實時攝取,例如,可以使用 Presto 對 topic 執行 SQL 查詢,類似于 KSQL,但不會影響實時資料處理;

  • 虛擬 topic:由于采用 n 層體系結構,因此對 topic 的數量沒有限制,topic 及其存盤是分離的,用戶還可以創建非持久性 topic;

  • N 層存盤:Kafka 的一個問題是,存盤費用可能變高,因此,它很少用于存盤"冷"資料,并且訊息經常被洗掉,Apache Pulsar 可以借助分層存盤自動將舊資料卸載到 Amazon S3 或其他資料存盤系統,并且仍然向客戶端展示透明視圖;Pulsar 客戶端可以從時間開始節點讀取,就像所有訊息都存在于日志中一樣;

2. Pulsar 存盤架構

Pulsar 的多層架構影響了存盤資料的方式,Pulsar 將 topic 磁區劃分為分片(segment),然后將這些分片存盤在 Apache BookKeeper 的存盤節點上,以提高性能、可伸縮性和可用性

Pulsar 的無限分布式日志以分片為中心,借助擴展日志存盤(通過 Apache BookKeeper)實作,內置分層存盤支持,因此分片可以均勻地分布在存盤節點上,由于與任一給定 topic 相關的資料都不會與特定存盤節點進行捆綁,因此很容易替換存盤節點或縮擴容,另外,集群中最小或最慢的節點也不會成為存盤或帶寬的短板,

Pulsar 架構能實作磁區管理,負載均衡,因此使用 Pulsar 能夠快速擴展并達到高可用,這兩點至關重要,所以 Pulsar 非常適合用來構建關鍵任務服務,如金融應用場景的計費平臺,電子商務和零售商的交易處理系統,金融機構的實時風險控制系統等,

通過性能強大的 Netty 架構,資料從 producers 到 broker,再到 bookie 的轉移都是零拷貝,不會生成副本,這一特性對所有流應用場景都非常友好,因為資料直接通過網路或磁盤進行傳輸,沒有任何性能損失,

3. Pulsar 訊息消費

Pulsar 的消費模型采用了流拉取的方式,流拉取是長輪詢的改進版,不僅實作了單個呼叫和請求之間的零等待,還可以提供雙向訊息流,通過流拉取模型,Pulsar 實作了端到端的低延遲,這種低延遲比所有現有的長輪詢訊息系統(如 Kafka)都低,

6. Kafka與Pulsar對比

1. Pulsar 的主要優勢:

  • 更多功能:Pulsar Function、多租戶、Schema registry、n 層存盤、多種消費模式和持久性模式等;

  • 更大的靈活性:3 種訂閱型別(獨占,共享和故障轉移),用戶可以在一個訂閱上管理多個 topic;

  • 易于操作運維:架構解耦和 n 層存盤;

  • 與 Presto 的 SQL 集成,可直接查詢存盤而不會影響 broker;

  • 借助 n 層自動存盤選項,可以更低成本地存盤;

2. Pulsar 的劣勢

Pulsar 并不完美,Pulsar 也存在一些問題:

  • 相對缺乏支持、檔案和案例;

  • n 層體系結構導致需要更多組件:BookKeeper;

  • 插件和客戶端相對 Kafka 較少;

  • 云中的支持較少,Confluent 具有托管云產品,

3. 什么時候應該考慮 Pulsar

  • 同時需要像 RabbitMQ 這樣的佇列和 Kafka 這樣的流處理程式;

  • 需要易用的地理復制;

  • 實作多租戶,并確保每個團隊的訪問權限;

  • 需要長時間保留訊息,并且不想將其卸載到另一個存盤中;

  • 需要高性能,基準測驗表明 Pulsar 提供了更低的延遲和更高的吞吐量;

總之,Pulsar還比較新,社區不完善,用的企業比較少,網上有價值的討論和問題的解決比較少,遠沒有Kafka生態系統龐大,且用戶量非常龐大,目前Kafka依舊是大資料領域訊息佇列的王者!所以我們還是以Kafka為主!

7. 其他訊息佇列與Kafka對比

二、Kafka基礎

1. kafka的基本介紹

官網:http://kafka.apache.org/

kafka是最初由linkedin公司開發的,使用scala語言撰寫,kafka是一個分布式,磁區的,多副本的,多訂閱者的日志系統(分布式MQ系統),可以用于搜索日志,監控日志,訪問日志等,

Kafka is a distributed,partitioned,replicated commit logservice,它提供了類似于JMS的特性,但是在設計實作上完全不同,此外它并不是JMS規范的實作,kafka對訊息保存時根據Topic進行歸類,發送訊息者成為Producer,訊息接受者成為Consumer,此外kafka集群有多個kafka實體組成,每個實體(server)成為broker,無論是kafka集群,還是producer和consumer都依賴于zookeeper來保證系統可用性集群保存一些meta資訊,

2. kafka的好處

  • 可靠性:分布式的,磁區,復本和容錯的,

  • 可擴展性:kafka訊息傳遞系統輕松縮放,無需停機,

  • 耐用性:kafka使用分布式提交日志,這意味著訊息會盡可能快速的保存在磁盤上,因此它是持久的,

  • 性能:kafka對于發布和定于訊息都具有高吞吐量,即使存盤了許多TB的訊息,他也爆出穩定的性能,

  • kafka非常快:保證零停機和零資料丟失,

3. 分布式的發布與訂閱系統

apache kafka是一個分布式發布-訂閱訊息系統和一個強大的佇列,可以處理大量的資料,并使能夠將訊息從一個端點傳遞到另一個端點,kafka適合離線和在線訊息消費,kafka訊息保留在磁盤上,并在集群內復制以防止資料丟失,kafka構建在zookeeper同步服務之上,它與apache和spark非常好的集成,應用于實時流式資料分析,

4. kafka的主要應用場景

1. 指標分析

kafka 通常用于操作監控資料,這設計聚合來自分布式應用程式的統計資訊, 以產生操作的資料集中反饋

2. 日志聚合解決方法

kafka可用于跨組織從多個服務器收集日志,并使他們以標準的格式提供給多個服務器,

3. 流式處理

流式處理框架(spark,storm,?ink)重主題中讀取資料,對齊進行處理,并將處理后的資料寫入新的主題,供 用戶和應用程式使用,kafka的強耐久性在流處理的背景關系中也非常的有用,

三、Kafka架構及組件

1. kafka架構

  1. 生產者API

允許應用程式發布記錄流至一個或者多個kafka的主題(topics),

  1. 消費者API

允許應用程式訂閱一個或者多個主題,并處理這些主題接收到的記錄流,

3, StreamsAPI

允許應用程式充當流處理器(stream processor),從一個或者多個主題獲取輸入流,并生產一個輸出流到一個或 者多個主題,能夠有效的變化輸入流為輸出流,

  1. ConnectAPI

允許構建和運行可重用的生產者或者消費者,能夠把kafka主題連接到現有的應用程式或資料系統,例如:一個連接到關系資料庫的連接器可能會獲取每個表的變化,

Kafka 架構

Kafka 架構

注:在Kafka 2.8.0 版本,移除了對Zookeeper的依賴,通過KRaft進行自己的集群管理,使用Kafka內部的Quorum控制器來取代ZooKeeper,因此用戶第一次可在完全不需要ZooKeeper的情況下執行Kafka,這不只節省運算資源,并且也使得Kafka效能更好,還可支持規模更大的集群,

過去Apache ZooKeeper是Kafka這類分布式系統的關鍵,ZooKeeper扮演協調代理的角色,所有代理服務器啟動時,都會連接到Zookeeper進行注冊,當代理狀態發生變化時,Zookeeper也會儲存這些資料,在過去,ZooKeeper是一個強大的工具,但是畢竟ZooKeeper是一個獨立的軟體,使得Kafka整個系統變得復雜,因此官方決定使用內部Quorum控制器來取代ZooKeeper,

這項作業從去年4月開始,而現在這項作業取得部分成果,用戶將可以在2.8版本,在沒有ZooKeeper的情況下執行Kafka,官方稱這項功能為Kafka Raft元資料模式(KRaft),在KRaft模式,過去由Kafka控制器和ZooKeeper所操作的元資料,將合并到這個新的Quorum控制器,并且在Kafka集群內部執行,當然,如果使用者有特殊使用情境,Quorum控制器也可以在專用的硬體上執行,

好,說完在新版本中移除zookeeper這個事,咱們在接著聊kafka的其他功能:

kafka支持訊息持久化,消費端是主動拉取資料,消費狀態和訂閱關系由客戶端負責維護,訊息消費完后,不會立即洗掉,會保留歷史訊息,因此支持多訂閱時,訊息只會存盤一份就可以,

  1. broker:kafka集群中包含一個或者多個服務實體(節點),這種服務實體被稱為broker(一個broker就是一個節點/一個服務器);

  2. topic:每條發布到kafka集群的訊息都屬于某個類別,這個類別就叫做topic;

  3. partition:partition是一個物理上的概念,每個topic包含一個或者多個partition;

  4. segment:一個partition當中存在多個segment檔案段,每個segment分為兩部分,.log檔案和 .index 檔案,其中 .index 檔案是索引檔案,主要用于快速查詢, .log 檔案當中資料的偏移量位置;

  5. producer:訊息的生產者,負責發布訊息到 kafka 的 broker 中;

  6. consumer:訊息的消費者,向 kafka 的 broker 中讀取訊息的客戶端;

  7. consumer group:消費者組,每一個 consumer 屬于一個特定的 consumer group(可以為每個consumer指定 groupName);

  8. .log:存放資料檔案;

  9. .index:存放.log檔案的索引資料,

2. Kafka 主要組件

1. producer(生產者)

producer主要是用于生產訊息,是kafka當中的訊息生產者,生產的訊息通過topic進行歸類,保存到kafka的broker里面去,

2. topic(主題)

  1. kafka將訊息以topic為單位進行歸類;

  2. topic特指kafka處理的訊息源(feeds of messages)的不同分類;

  3. topic是一種分類或者發布的一些列記錄的名義上的名字,kafka主題始終是支持多用戶訂閱的;也就是說,一 個主題可以有零個,一個或者多個消費者訂閱寫入的資料;

  4. 在kafka集群中,可以有無數的主題;

  5. 生產者和消費者消費資料一般以主題為單位,更細粒度可以到磁區級別,

3. partition(磁區)

kafka當中,topic是訊息的歸類,一個topic可以有多個磁區(partition),每個磁區保存部分topic的資料,所有的partition當中的資料全部合并起來,就是一個topic當中的所有的資料,

一個broker服務下,可以創建多個磁區,broker數與磁區數沒有關系;
在kafka中,每一個磁區會有一個編號:編號從0開始,
每一個磁區內的資料是有序的,但全域的資料不能保證是有序的,(有序是指生產什么樣順序,消費時也是什么樣的順序)

4. consumer(消費者)

consumer是kafka當中的消費者,主要用于消費kafka當中的資料,消費者一定是歸屬于某個消費組中的,

5. consumer group(消費者組)

消費者組由一個或者多個消費者組成,同一個組中的消費者對于同一條訊息只消費一次

每個消費者都屬于某個消費者組,如果不指定,那么所有的消費者都屬于默認的組,

每個消費者組都有一個ID,即group ID,組內的所有消費者協調在一起來消費一個訂閱主題( topic)的所有磁區(partition),當然,每個磁區只能由同一個消費組內的一個消費者(consumer)來消費,可以由不同的消費組來消費,

partition數量決定了每個consumer group中并發消費者的最大數量,如下圖:

示例 1

示例 1

如上面左圖所示,如果只有兩個磁區,即使一個組內的消費者有4個,也會有兩個空閑的,
如上面右圖所示,有4個磁區,每個消費者消費一個磁區,并發量達到最大4,

在來看如下一幅圖:

示例 2

示例 2

如上圖所示,不同的消費者組消費同一個topic,這個topic有4個磁區,分布在兩個節點上,左邊的 消費組1有兩個消費者,每個消費者就要消費兩個磁區才能把訊息完整的消費完,右邊的 消費組2有四個消費者,每個消費者消費一個磁區即可,

總結下kafka中磁區與消費組的關系

消費組: 由一個或者多個消費者組成,同一個組中的消費者對于同一條訊息只消費一次, 某一個主題下的磁區數,對于消費該主題的同一個消費組下的消費者數量,應該小于等于該主題下的磁區數

如:某一個主題有4個磁區,那么消費組中的消費者應該小于等于4,而且最好與磁區數成整數倍 1 2 4 這樣,同一個磁區下的資料,在同一時刻,不能同一個消費組的不同消費者消費

總結:磁區數越多,同一時間可以有越多的消費者來進行消費,消費資料的速度就會越快,提高消費的性能

6. partition replicas(磁區副本)

kafka 中的磁區副本如下圖所示:

kafka 磁區副本

kafka 磁區副本

副本數(replication-factor):控制訊息保存在幾個broker(服務器)上,一般情況下副本數等于broker的個數,

一個broker服務下,不可以創建多個副本因子,創建主題時,副本因子應該小于等于可用的broker數

副本因子操作以磁區為單位的,每個磁區都有各自的主副本和從副本;

主副本叫做leader,從副本叫做 follower(在有多個副本的情況下,kafka會為同一個磁區下的所有磁區,設定角色關系:一個leader和N個 follower),處于同步狀態的副本叫做in-sync-replicas(ISR);

follower通過拉的方式從leader同步資料, 消費者和生產者都是從leader讀寫資料,不與follower互動

副本因子的作用:讓kafka讀取資料和寫入資料時的可靠性,

副本因子是包含本身,同一個副本因子不能放在同一個broker中,

如果某一個磁區有三個副本因子,就算其中一個掛掉,那么只會剩下的兩個中,選擇一個leader,但不會在其他的broker中,另啟動一個副本(因為在另一臺啟動的話,存在資料傳遞,只要在機器之間有資料傳遞,就會長時間占用網路IO,kafka是一個高吞吐量的訊息系統,這個情況不允許發生)所以不會在另一個broker中啟動,

如果所有的副本都掛了,生產者如果生產資料到指定磁區的話,將寫入不成功,

lsr表示:當前可用的副本,

7. segment檔案

一個partition當中由多個segment檔案組成,每個segment檔案,包含兩部分,一個是 .log 檔案,另外一個是 .index 檔案,其中 .log 檔案包含了我們發送的資料存盤,.index 檔案,記錄的是我們.log檔案的資料索引值,以便于我們加快資料的查詢速度,

索引檔案與資料檔案的關系

既然它們是一一對應成對出現,必然有關系,索引檔案中元資料指向對應資料檔案中message的物理偏移地址,

比如索引檔案中 3,497 代表:資料檔案中的第三個message,它的偏移地址為497,

再來看資料檔案中,Message 368772表示:在全域partiton中是第368772個message,

注:segment index file 采取稀疏索引存盤方式,減少索引檔案大小,通過mmap(記憶體映射)可以直接記憶體操作,稀疏索引為資料檔案的每個對應message設定一個元資料指標,它比稠密索引節省了更多的存盤空間,但查找起來需要消耗更多的時間,

.index 與 .log 對應關系如下:

.index 與 .log

.index 與 .log

上圖左半部分是索引檔案,里面存盤的是一對一對的key-value,其中key是訊息在資料檔案(對應的log檔案)中的編號,比如“1,3,6,8……”, 分別表示在log檔案中的第1條訊息、第3條訊息、第6條訊息、第8條訊息……

那么為什么在index檔案中這些編號不是連續的呢? 這是因為index檔案中并沒有為資料檔案中的每條訊息都建立索引,而是采用了稀疏存盤的方式,每隔一定位元組的資料建立一條索引, 這樣避免了索引檔案占用過多的空間,從而可以將索引檔案保留在記憶體中, 但缺點是沒有建立索引的Message也不能一次定位到其在資料檔案的位置,從而需要做一次順序掃描,但是這次順序掃描的范圍就很小了,

value 代表的是在全域partiton中的第幾個訊息,

以索引檔案中元資料 3,497 為例,其中3代表在右邊log資料檔案中從上到下第3個訊息, 497表示該訊息的物理偏移地址(位置)為497(也表示在全域partiton表示第497個訊息-順序寫入特性),

log日志目錄及組成 kafka在我們指定的log.dir目錄下,會創建一些檔案夾;名字是 (主題名字-磁區名) 所組成的檔案夾, 在(主題名字-磁區名)的目錄下,會有兩個檔案存在,如下所示:

#索引檔案
00000000000000000000.index
#日志內容
00000000000000000000.log

在目錄下的檔案,會根據log日志的大小進行切分,.log檔案的大小為1G的時候,就會進行切分檔案;如下:

-rw-r--r--. 1 root root 389k  1月  17  18:03   00000000000000000000.index
-rw-r--r--. 1 root root 1.0G  1月  17  18:03   00000000000000000000.log
-rw-r--r--. 1 root root  10M  1月  17  18:03   00000000000000077894.index
-rw-r--r--. 1 root root 127M  1月  17  18:03   00000000000000077894.log

在kafka的設計中,將offset值作為了檔案名的一部分,

segment檔案命名規則:partion全域的第一個segment從0開始,后續每個segment檔案名為上一個全域 partion的最大offset(偏移message數),數值最大為64位long大小,20位數字字符長度,沒有數字就用 0 填充,

通過索引資訊可以快速定位到message,通過index元資料全部映射到記憶體,可以避免segment File的IO磁盤操作;

通過索引檔案稀疏存盤,可以大幅降低index檔案元資料占用空間大小,

稀疏索引:為了資料創建索引,但范圍并不是為每一條創建,而是為某一個區間創建; 好處:就是可以減少索引值的數量, 不好的地方:找到索引區間之后,要得進行第二次處理,

8. message的物理結構

生產者發送到kafka的每條訊息,都被kafka包裝成了一個message

message 的物理結構如下圖所示:

.index 與 .log

.index 與 .log

所以生產者發送給kafka的訊息并不是直接存盤起來,而是經過kafka的包裝,每條訊息都是上圖這個結構,只有最后一個欄位才是真正生產者發送的訊息資料,

四、Kafka集群操作

1. 創建topic

創建一個名字為test的主題, 有三個磁區,有兩個副本:

bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic test

2. 查看主題命令

查看kafka當中存在的主題:

bin/kafka-topics.sh  --list --zookeeper node01:2181,node02:2181,node03:2181

3. 生產者生產資料

模擬生產者來生產資料:

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

4. 消費者消費資料

執行以下命令來模擬消費者進行消費資料:

bin/kafka-console-consumer.sh --from-beginning --topic test  --zookeeper node01:2181,node02:2181,node03:2181

5. 運行describe topics命令

執行以下命令運行describe查看topic的相關資訊:

bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test

結果說明:

這是輸出的解釋,第一行給出了所有磁區的摘要,每個附加行提供有關一個磁區的資訊,由于我們只有一個分 區用于此主題,因此只有一行,

“leader”是負責給定磁區的所有讀取和寫入的節點,每個節點將成為隨機選擇的磁區部分的領導者,(因為在kafka中 如果有多個副本的話,就會存在leader和follower的關系,表示當前這個副本為leader所在的broker是哪一個)

“replicas”是復制此磁區日志的節點串列,無論它們是否為領導者,或者即使它們當前處于活動狀態,(所有副本串列0,1,2)

“isr”是“同步”復制品的集合,這是副本串列的子集,該串列當前處于活躍狀態并且已經被領導者捕獲,(可用的串列數)

6. 增加topic磁區數

執行以下命令可以增加topic磁區數:

bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8

7. 增加配置

動態修改kakfa的配置:

bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1

8. 洗掉配置

動態洗掉kafka集群配置:

bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages

9. 洗掉topic

目前洗掉topic在默認情況下知識打上一個洗掉的標記,在重新啟動kafka后才洗掉,

如果需要立即洗掉,則需要在 server.properties中配置:

delete.topic.enable=true

然后執行以下命令進行洗掉topic:

kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName

五、Kafka的JavaAPI操作

1. 生產者代碼

使用生產者,生產資料

/**
* 訂單的生產者代碼,
*/
public class OrderProducer {
public static void main(String[] args) throws InterruptedException {
/* 1、連接集群,通過組態檔的方式
* 2、發送資料-topic:order,value
*/
Properties props = new Properties(); props.put("bootstrap.servers", "node01:9092"); props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432); 
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>
(props);
for (int i = 0; i < 1000; i++) {
// 發送資料 ,需要一個producerRecord物件,最少引數 String topic, V value kafkaProducer.send(new ProducerRecord<String, String>("order", "訂單信
息!"+i));
Thread.sleep(100);
}
}
}

kafka當中的資料磁區:

kafka生產者發送的訊息,都是保存在broker當中,我們可以自定義磁區規則,決定訊息發送到哪個partition里面去進行保存 查看ProducerRecord這個類的原始碼,就可以看到kafka的各種不同磁區策略

kafka當中支持以下四種資料的磁區方式:

//第一種磁區策略,如果既沒有指定磁區號,也沒有指定資料key,那么就會使用輪詢的方式將資料均勻的發送到不同的磁區里面去
  //ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("mypartition", "mymessage" + i);
  //kafkaProducer.send(producerRecord1);
  //第二種磁區策略 如果沒有指定磁區號,指定了資料key,通過key.hashCode  % numPartitions來計算資料究竟會保存在哪一個磁區里面
  //注意:如果資料key,沒有變化   key.hashCode % numPartitions  =  固定值  所有的資料都會寫入到某一個磁區里面去
  //ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("mypartition", "mykey", "mymessage" + i);
  //kafkaProducer.send(producerRecord2);
  //第三種磁區策略:如果指定了磁區號,那么就會將資料直接寫入到對應的磁區里面去
//  ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>("mypartition", 0, "mykey", "mymessage" + i);
 // kafkaProducer.send(producerRecord3);
  //第四種磁區策略:自定義磁區策略,如果不自定義磁區規則,那么會將資料使用輪詢的方式均勻的發送到各個磁區里面去
  kafkaProducer.send(new ProducerRecord<String, String>("mypartition","mymessage"+i));

自定義磁區策略:

public class KafkaCustomPartitioner implements Partitioner {
 @Override
 public void configure(Map<String, ?> configs) {
 }

 @Override
 public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {
  List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
     int partitionNum = partitions.size();
  Random random = new Random();
  int partition = random.nextInt(partitionNum);
     return partition;
 }

 @Override
 public void close() {
  
 }

}

主代碼中添加配置:

@Test
 public void kafkaProducer() throws Exception {
  //1、準備組態檔
     Properties props = new Properties();
     props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
     props.put("acks", "all");
     props.put("retries", 0);
     props.put("batch.size", 16384);
     props.put("linger.ms", 1);
     props.put("buffer.memory", 33554432);
     props.put("partitioner.class", "cn.itcast.kafka.partitioner.KafkaCustomPartitioner");
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     //2、創建KafkaProducer
     KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
     for (int i=0;i<100;i++){
         //3、發送資料
         kafkaProducer.send(new ProducerRecord<String, String>("testpart","0","value"+i));
     }

  kafkaProducer.close();
 }

2. 消費者代碼

消費必要條件:

消費者要從kafka Cluster進行消費資料,必要條件有以下四個:

  1. 地址:bootstrap.servers=node01:9092

  2. 序列化:key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer

  3. 主題(topic):需要制定具體的某個topic(order)即可,

  4. 消費者組:group.id=test

1) 自動提交offset

消費完成之后,自動提交offset:

/**
* 消費訂單資料--- javaben.tojson
*/
public class OrderConsumer {
public static void main(String[] args) {
// 1\連接集群
Properties props = new Properties(); props.put("bootstrap.servers", "hadoop-01:9092"); props.put("group.id", "test");

//以下兩行代碼 ---消費者自動提交offset值 
props.put("enable.auto.commit", "true"); 
props.put("auto.commit.interval.ms",  "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>
(props);
//   2、發送資料 發送資料需要,訂閱下要消費的topic, order kafkaConsumer.subscribe(Arrays.asList("order")); 
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);// jdk queue offer插入、poll獲取元素, blockingqueue put插入原生, take獲取元素
for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println("消費的資料為:" + record.value());
}
}
}
}

2) 手動提交offset

如果Consumer在獲取資料后,需要加入處理,資料完畢后才確認offset,需要程式來控制offset的確認,

關閉自動提交確認選項:props.put("enable.auto.commit", "false");

手動提交offset值:kafkaConsumer.commitSync();

完整代碼如下:

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("group.id", "test");
//關閉自動提交確認選項
props.put("enable.auto.commit", "false"); 
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) { 
insertIntoDb(buffer);
// 手動提交offset值
consumer.commitSync(); 
buffer.clear();
}
}

3) 消費完每個磁區之后手動提交offset

上面的示例使用commitSync將所有已接收的記錄標記為已提交,在某些情況下,可能希望通過明確指定偏移量來更好地控制已提交的記錄,在下面的示例中,我們在完成處理每個磁區中的記錄后提交偏移量:

try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() -1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally { consumer.close();}

注意事項

提交的偏移量應始終是應用程式將讀取的下一條訊息的偏移量, 因此,在呼叫commitSync(偏移量)時,應該在最后處理的訊息的偏移量中添加一個,

4) 指定磁區資料進行消費

  1. 如果行程正在維護與該磁區關聯的某種本地狀態(如本地磁盤上的鍵值存盤),那么它應該只獲取它在磁盤上維護的磁區的記錄,

  2. 如果行程本身具有高可用性,并且如果失敗則將重新啟動(可能使用YARN,Mesos或AWS工具等集群管理框 架,或作為流處理框架的一部分),在這種情況下,Kafka不需要檢測故障并重新分配磁區,因為消耗程序將在另一臺機器上重新啟動,

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); 
props.put("enable.auto.commit", "true");
 props.put("auto.commit.interval.ms", "1000"); 
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//consumer.subscribe(Arrays.asList("foo",  "bar"));

//手動指定消費指定磁區的資料---start 
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0); 
TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0,  partition1));
//手動指定消費指定磁區的資料---end
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); 
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

注意事項

  1. 要使用此模式,只需使用要使用的磁區的完整串列呼叫assign(Collection),而不是使用subscribe訂閱主題,

  2. 主題與磁區訂閱只能二選一,

5) 重復消費與資料丟失

說明:

  1. 已經消費的資料對于kafka來說,會將消費組里面的o?set值進行修改,那什么時候進行修改了?是在資料消費 完成之后,比如在控制臺列印完后自動提交;

  2. 提交程序:是通過kafka將o?set進行移動到下個message所處的o?set的位置,

  3. 拿到資料后,存盤到hbase中或者mysql中,如果hbase或者mysql在這個時候連接不上,就會拋出例外,如果在處理資料的時候已經進行了提交,那么kafka傷的o?set值已經進行了修改了,但是hbase或者mysql中沒有資料,這個時候就會出現資料丟失,

4.什么時候提交o?set值?在Consumer將資料處理完成之后,再來進行o?set的修改提交,默認情況下o?set是 自動提交,需要修改為手動提交o?set值,

  1. 如果在處理代碼中正常處理了,但是在提交o?set請求的時候,沒有連接到kafka或者出現了故障,那么該次修 改o?set的請求是失敗的,那么下次在進行讀取同一個磁區中的資料時,會從已經處理掉的o?set值再進行處理一 次,那么在hbase中或者mysql中就會產生兩條一樣的資料,也就是資料重復,

6) consumer消費者消費資料流程

流程描述

Consumer連接指定的Topic partition所在leader broker,采用pull方式從kafkalogs中獲取訊息,對于不同的消費模式,會將offset保存在不同的地方 官網關于high level API 以及low level API的簡介: http://kafka.apache.org/0100/documentation.html#impl_consumer

高階API(High Level API)

kafka消費者高階API簡單;隱藏Consumer與Broker細節;相關資訊保存在zookeeper中:

/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {

/**
This method is used to get a list of KafkaStreams, which are iterators over
MessageAndMetadata objects from which you can obtain messages and their
associated metadata (currently only topic).
Input: a map of <topic, #streams>
Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
You can also obtain a list of KafkaStreams, that iterate over messages
from topics that match a TopicFilter. (A TopicFilter encapsulates a
whitelist or a blacklist which is a standard Java regex.)
*/
public List<KafkaStream> createMessageStreamsByFilter( TopicFilter topicFilter, int numStreams);
/* Commit the offsets of all messages consumed so far. */ public commitOffsets()
/* Shut down the connector */ public shutdown()
}

說明:大部分的操作都已經封裝好了,比如:當前消費到哪個位置下了,但是不夠靈活(作業程序推薦使用)

低級API(Low Level API):

kafka消費者低級API非常靈活;需要自己負責維護連接Controller Broker,保存offset,Consumer Partition對應關系:

class SimpleConsumer {

/* Send fetch request to a broker and get back a set of messages. */ 
public ByteBufferMessageSet fetch(FetchRequest request);

/* Send a list of fetch requests to a broker and get back a response set. */ public MultiFetchResponse multifetch(List<FetchRequest> fetches);

/**

Get a list of valid offsets (up to maxSize) before the given time.
The result is a list of offsets, in descending order.
@param time: time in millisecs,
if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest
offset available. if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest

available. public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);

* offset
*/

說明:沒有進行包裝,所有的操作有用戶決定,如自己的保存某一個磁區下的記錄,你當前消費到哪個位置,

3. kafka Streams API開發

需求:使用StreamAPI獲取test這個topic當中的資料,然后將資料全部轉為大寫,寫入到test2這個topic當中去,

第一步:創建一個topic

node01服務器使用以下命令來常見一個 topic 名稱為test2:

bin/kafka-topics.sh --create  --partitions 3 --replication-factor 2 --topic test2 --zookeeper node01:2181,node02:2181,node03:2181

第二步:開發StreamAPI

public class StreamAPI {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}

執行上述代碼,監聽獲取 test 中的資料,然后轉成大寫,將結果寫入 test2

第三步:生產資料

node01執行以下命令,向test這個topic當中生產資料:

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

第四步:消費資料

node02執行一下命令消費test2這個topic當中的資料:

bin/kafka-console-consumer.sh --from-beginning  --topic test2 --zookeeper node01:2181,node02:2181,node03:2181

六、Kafka中的資料不丟失機制

1. 生產者生產資料不丟失

發送訊息方式

生產者發送給kafka資料,可以采用同步方式異步方式

同步方式

發送一批資料給kafka后,等待kafka回傳結果:

  1. 生產者等待10s,如果broker沒有給出ack回應,就認為失敗,

  2. 生產者重試3次,如果還沒有回應,就報錯.

異步方式

發送一批資料給kafka,只是提供一個回呼函式:

  1. 先將資料保存在生產者端的buffer中,buffer大小是2萬條 ,

  2. 滿足資料閾值或者數量閾值其中的一個條件就可以發送資料,

  3. 發送一批資料的大小是500條,

注:如果broker遲遲不給ack,而buffer又滿了,開發者可以設定是否直接清空buffer中的資料,

ack機制(確認機制)

生產者資料發送出去,需要服務端回傳一個確認碼,即ack回應碼;ack的回應有三個狀態值0,1,-1

0:生產者只負責發送資料,不關心資料是否丟失,丟失的資料,需要再次發送

1:partition的leader收到資料,不管follow是否同步完資料,回應的狀態碼為1

-1:所有的從節點都收到資料,回應的狀態碼為-1

如果broker端一直不回傳ack狀態,producer永遠不知道是否成功;producer可以設定一個超時時間10s,超過時間認為失敗,

2. broker中資料不丟失

在broker中,保證資料不丟失主要是通過副本因子(冗余),防止資料丟失,

3. 消費者消費資料不丟失

在消費者消費資料的時候,只要每個消費者記錄好offset值即可,就能保證資料不丟失,也就是需要我們自己維護偏移量(offset),可保存在 Redis 中,

文章首發于公眾號:五分鐘學大資料,深度鉆研大資料技術

七、Kafka組態檔說明

Server.properties組態檔說明

#broker的全域唯一編號,不能重復
broker.id=0

#用來監聽鏈接的埠,producer或consumer將在此埠建立連接
port=9092

#處理網路請求的執行緒數量
num.network.threads=3

#用來處理磁盤IO的執行緒數量
num.io.threads=8

#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400

#接受套接字的緩沖區大小
socket.receive.buffer.bytes=102400

#請求套接字的緩沖區大小
socket.request.max.bytes=104857600

#kafka運行日志存放的路徑
log.dirs=/export/data/kafka/

#topic在當前broker上的分片個數
num.partitions=2

#用來恢復和清理data下資料的執行緒數量
num.recovery.threads.per.data.dir=1

#segment檔案保留的最長時間,超時將被洗掉
log.retention.hours=168

#滾動生成新的segment檔案的最大時間
log.roll.hours=1

#日志檔案中每個segment的大小,默認為1G
log.segment.bytes=1073741824

#周期性檢查檔案大小的時間
log.retention.check.interval.ms=300000

#日志清理是否打開
log.cleaner.enable=true

#broker需要使用zookeeper保存meta資料
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

#zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=6000

#partion buffer中,訊息的條數達到閾值,將觸發flush到磁盤
log.flush.interval.messages=10000

#訊息buffer的時間,達到閾值,將觸發flush到磁盤
log.flush.interval.ms=3000

#洗掉topic需要server.properties中設定delete.topic.enable=true否則只是標記洗掉
delete.topic.enable=true

#此處的host.name為本機IP(重要),如果不改,則客戶端會拋出:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=kafka01

advertised.host.name=192.168.140.128

producer生產者組態檔說明
#指定kafka節點串列,用于獲取metadata,不必全部指定
metadata.broker.list=node01:9092,node02:9092,node03:9092
# 指定磁區處理類,默認kafka.producer.DefaultPartitioner,表通過key哈希到對應磁區
#partitioner.class=kafka.producer.DefaultPartitioner
# 是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮,壓縮后訊息中會有頭來指明訊息壓縮型別,故在消費者端訊息解壓是透明的無需指定,
compression.codec=none
# 指定序列化處理類
serializer.class=kafka.serializer.DefaultEncoder
# 如果要壓縮訊息,這里指定哪些topic要壓縮訊息,默認empty,表示不壓縮,
#compressed.topics=

# 設定發送資料是否需要服務端的反饋,有三個值0,1,-1
# 0: producer不會等待broker發送ack 
# 1: 當leader接收到訊息之后發送ack 
# -1: 當所有的follower都同步訊息成功后發送ack. 
request.required.acks=0 

# 在向producer發送ack之前,broker允許等待的最大時間 ,如果超時,broker將會向producer發送一個error ACK.意味著上一次訊息因為某種原因未能成功(比如follower未能同步成功) 
request.timeout.ms=10000

# 同步還是異步發送訊息,默認“sync”表同步,"async"表異步,異步可以提高發送吞吐量,
也意味著訊息將會在本地buffer中,并適時批量發送,但是也可能導致丟失未發送過去的訊息
producer.type=sync

# 在async模式下,當message被快取的時間超過此值后,將會批量發送給broker,默認為5000ms
# 此值和batch.num.messages協同作業.
queue.buffering.max.ms = 5000

# 在async模式下,producer端允許buffer的最大訊息量
# 無論如何,producer都無法盡快的將訊息發送給broker,從而導致訊息在producer端大量沉積
# 此時,如果訊息的條數達到閥值,將會導致producer端阻塞或者訊息被拋棄,默認為10000
queue.buffering.max.messages=20000

# 如果是異步,指定每次批量發送資料量,默認為200
batch.num.messages=500

# 當訊息在producer端沉積的條數達到"queue.buffering.max.meesages"后 
# 阻塞一定時間后,佇列仍然沒有enqueue(producer仍然沒有發送出任何訊息) 
# 此時producer可以繼續阻塞或者將訊息拋棄,此timeout值用于控制"阻塞"的時間 
# -1: 無阻塞超時限制,訊息不會被拋棄 
# 0:立即清空佇列,訊息被拋棄 
queue.enqueue.timeout.ms=-1


# 當producer接收到error ACK,或者沒有接收到ACK時,允許訊息重發的次數 
# 因為broker并沒有完整的機制來避免訊息重復,所以當網路例外時(比如ACK丟失) 
# 有可能導致broker接收到重復的訊息,默認值為3.
message.send.max.retries=3

# producer重繪topic metada的時間間隔,producer需要知道partition leader的位置,以及當前topic的情況 
# 因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會立即重繪 
# (比如topic失效,partition丟失,leader失效等),此外也可以通過此引數來配置額外的重繪機制,默認值600000 
topic.metadata.refresh.interval.ms=60000

consumer消費者配置詳細說明:

# zookeeper連接服務器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
# zookeeper的session過期時間,默認5000ms,用于檢測消費者是否掛掉
zookeeper.session.timeout.ms=5000
#當消費者掛掉,其他消費者要等該指定時間才能檢查到并且觸發重新負載均衡
zookeeper.connection.timeout.ms=10000
# 指定多久消費者更新offset到zookeeper中,注意offset更新時基于time而不是每次獲得的訊息,一旦在更新zookeeper發生例外并重啟,將可能拿到已拿到過的訊息
zookeeper.sync.time.ms=2000
#指定消費 
group.id=itcast
# 當consumer消費一定量的訊息之后,將會自動向zookeeper提交offset資訊 
# 注意offset資訊并不是每消費一次訊息就向zk提交一次,而是現在本地保存(記憶體),并定期提交,默認為true
auto.commit.enable=true
# 自動更新時間,默認60 * 1000
auto.commit.interval.ms=1000
# 當前consumer的標識,可以設定,也可以有系統生成,主要用來跟蹤訊息消費情況,便于觀察
conusmer.id=xxx 
# 消費者客戶端編號,用于區分不同客戶端,默認客戶端程式自動產生
client.id=xxxx
# 最大取多少塊快取到消費者(默認10)
queued.max.message.chunks=50
# 當有新的consumer加入到group時,將會reblance,此后將會有partitions的消費端遷移到新  的consumer上,如果一個consumer獲得了某個partition的消費權限,那么它將會向zk注冊 "Partition Owner registry"節點資訊,但是有可能此時舊的consumer尚沒有釋放此節點, 此值用于控制,注冊節點的重試次數. 
rebalance.max.retries=5

# 獲取訊息的最大尺寸,broker不會像consumer輸出大于此值的訊息chunk 每次feth將得到多條訊息,此值為總大小,提升此值,將會消耗更多的consumer端記憶體
fetch.min.bytes=6553600

# 當訊息的尺寸不足時,server阻塞的時間,如果超時,訊息將立即發送給consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 如果zookeeper沒有offset值或offset值超出范圍,那么就給個初始的offset,有smallest、largest、anything可選,分別表示給當前最小的offset、當前最大的offset、拋例外,默認largest
auto.offset.reset=smallest
# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder

八、CAP理論

1. 分布式系統當中的CAP理論

分布式系統(distributed system)正變得越來越重要,大型網站幾乎都是分布式的,

分布式系統的最大難點,就是各個節點的狀態如何同步,

為了解決各個節點之間的狀態同步問題,在1998年,由加州大學的計算機科學家 Eric Brewer 提出分布式系統的三個指標,分別是:

  • Consistency:一致性

  • Availability:可用性

  • Partition tolerance:磁區容錯性

Eric Brewer 說,這三個指標不可能同時做到,最多只能同時滿足其中兩個條件,這個結論就叫做 CAP 定理,

CAP理論是指:分布式系統中,一致性、可用性和磁區容忍性最多只能同時滿足兩個

一致性:Consistency

  • 通過某個節點的寫操作結果對后面通過其它節點的讀操作可見

  • 如果更新資料后,并發訪問情況下后續讀操作可立即感知該更新,稱為強一致性

  • 如果允許之后部分或者全部感知不到該更新,稱為弱一致性

  • 若在之后的一段時間(通常該時間不固定)后,一定可以感知到該更新,稱為最終一致性

可用性:Availability

  • 任何一個沒有發生故障的節點必須在有限的時間內回傳合理的結果

磁區容錯性:Partition tolerance

  • 部分節點宕機或者無法與其它節點通信時,各磁區間還可保持分布式系統的功能

一般而言,都要求保證磁區容忍性,所以在CAP理論下,更多的是需要在可用性和一致性之間做權衡,

2. Partition tolerance

先看 Partition tolerance,中文叫做"磁區容錯",

大多數分布式系統都分布在多個子網路,每個子網路就叫做一個區(partition),磁區容錯的意思是,區間通信可能失敗,比如,一臺服務器放在中國,另一臺服務器放在美國,這就是兩個區,它們之間可能無法通信,

上圖中,G1 和 G2 是兩臺跨區的服務器,G1 向 G2 發送一條訊息,G2 可能無法收到,系統設計的時候,必須考慮到這種情況,

一般來說,磁區容錯無法避免,因此可以認為 CAP 的 P 總是存在的,即永遠可能存在磁區容錯這個問題

3. Consistency

Consistency 中文叫做"一致性",意思是,寫操作之后的讀操作,必須回傳該值,舉例來說,某條記錄是 v0,用戶向 G1 發起一個寫操作,將其改為 v1,

接下來,用戶的讀操作就會得到 v1,這就叫一致性,

問題是,用戶有可能向 G2 發起讀操作,由于 G2 的值沒有發生變化,因此回傳的是 v0,G1 和 G2 讀操作的結果不一致,這就不滿足一致性了,

為了讓 G2 也能變為 v1,就要在 G1 寫操作的時候,讓 G1 向 G2 發送一條訊息,要求 G2 也改成 v1,

這樣的話,用戶向 G2 發起讀操作,也能得到 v1,

4. Availability

Availability 中文叫做"可用性",意思是只要收到用戶的請求,服務器就必須給出回應, 用戶可以選擇向 G1 或 G2 發起讀操作,不管是哪臺服務器,只要收到請求,就必須告訴用戶,到底是 v0 還是 v1,否則就不滿足可用性,

九、Kafka中的CAP機制

kafka是一個分布式的訊息佇列系統,既然是一個分布式的系統,那么就一定滿足CAP定律,那么在kafka當中是如何遵循CAP定律的呢?kafka滿足CAP定律當中的哪兩個呢?

kafka滿足的是CAP定律當中的CA,其中Partition tolerance通過的是一定的機制盡量的保證磁區容錯性

其中C表示的是資料一致性,A表示資料可用性

kafka首先將資料寫入到不同的磁區里面去,每個磁區又可能有好多個副本,資料首先寫入到leader磁區里面去,讀寫的操作都是與leader磁區進行通信,保證了資料的一致性原則,也就是滿足了Consistency原則,然后kafka通過磁區副本機制,來保證了kafka當中資料的可用性,但是也存在另外一個問題,就是副本磁區當中的資料與leader當中的資料存在差別的問題如何解決,這個就是Partition tolerance的問題,

kafka為了解決Partition tolerance的問題,使用了ISR的同步策略,來盡最大可能減少Partition tolerance的問題

每個leader會維護一個ISR(a set of in-sync replicas,基本同步)串列,

ISR串列主要的作用就是決定哪些副本磁區是可用的,也就是說可以將leader磁區里面的資料同步到副本磁區里面去,決定一個副本磁區是否可用的條件有兩個:

  • replica.lag.time.max.ms=10000 副本磁區與主磁區心跳時間延遲

  • replica.lag.max.messages=4000 副本磁區與主磁區訊息同步最大差

produce 請求被認為完成時的確認值:request.required.acks=0

  • ack=0:producer不等待broker同步完成的確認,繼續發送下一條(批)資訊,

  • ack=1(默認):producer要等待leader成功收到資料并得到確認,才發送下一條message,

  • ack=-1:producer得到follwer確認,才發送下一條資料,

十、Kafka監控及運維

在開發作業中,消費在Kafka集群中訊息,資料變化是我們關注的問題,當業務前提不復雜時,我們可以使用Kafka 命令提供帶有Zookeeper客戶端工具的工具,可以輕松完成我們的作業,隨著業務的復雜性,增加Group和 Topic,那么我們使用Kafka提供命令工具,已經感到無能為力,那么Kafka監控系統目前尤為重要,我們需要觀察 消費者應用的細節,

1. kafka-eagle概述

為了簡化開發者和服務工程師維護Kafka集群的作業有一個監控管理工具,叫做 Kafka-eagle,這個管理工具可以很容易地發現分布在集群中的哪些topic分布不均勻,或者是磁區在整個集群分布不均勻的的情況,它支持管理多個集群、選擇副本、副本重新分配以及創建Topic,同時,這個管理工具也是一個非常好的可以快速瀏覽這個集群的工具,

2. 環境和安裝

1. 環境要求

需要安裝jdk,啟動zk以及kafka的服務

2. 安裝步驟

  1. 下載原始碼包

kafka-eagle官網: http://download.kafka-eagle.org/

我們可以從官網上面直接下載最細的安裝包即可kafka-eagle-bin-1.3.2.tar.gz這個版本即可

代碼托管地址:

https://github.com/smartloli/kafka-eagle/releases

  1. 解壓

這里我們選擇將kafak-eagle安裝在第三臺,

直接將kafka-eagle安裝包上傳到node03服務器的/export/softwares路徑下,然后進行解壓 node03服務器執行一下命令進行解壓,

  1. 準備資料庫

kafka-eagle需要使用一個資料庫來保存一些元資料資訊,我們這里直接使用msyql資料庫來保存即可,在node03服務器執行以下命令創建一個mysql資料庫即可,

進入mysql客戶端:

create database eagle;
  1. 修改kafak-eagle組態檔

執行以下命令修改kafak-eagle組態檔:

vim system-config.properties

修改為如下:

kafka.eagle.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=node01:2181,node02:2181,node03:2181
cluster2.zk.list=node01:2181,node02:2181,node03:2181

kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node03:3306/eagle
kafka.eagle.username=root
kafka.eagle.password=123456
  1. 配置環境變數

kafka-eagle必須配置環境變數,node03服務器執行以下命令來進行配置環境變數: vim /etc/profile

export KE_HOME=/opt//kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2
export PATH=:$KE_HOME/bin:$PATH

修改立即生效,執行: source /etc/profile

  1. 啟動kafka-eagle

執行以下界面啟動kafka-eagle:

cd kafka-eagle-web-1.3.2/bin
chmod u+x ke.sh
./ke.sh start
  1. 主界面

訪問kafka-eagle

http://node03:8048/ke/account/signin?/ke/

用戶名:admin

密碼:123456

十一、Kafka大廠面試題

1. 為什么要使用 kafka?

  1. 緩沖和削峰:上游資料時有突發流量,下游可能扛不住,或者下游沒有足夠多的機器來保證冗余,kafka在中間可以起到一個緩沖的作用,把訊息暫存在kafka中,下游服務就可以按照自己的節奏進行慢慢處理,

  2. 解耦和擴展性:專案開始的時候,并不能確定具體需求,訊息佇列可以作為一個介面層,解耦重要的業務流程,只需要遵守約定,針對資料編程即可獲取擴展能力,

  3. 冗余:可以采用一對多的方式,一個生產者發布訊息,可以被多個訂閱topic的服務消費到,供多個毫無關聯的業務使用,

  4. 健壯性:訊息佇列可以堆積請求,所以消費端業務即使短時間死掉,也不會影響主要業務的正常進行,

  5. 異步通信:很多時候,用戶不想也不需要立即處理訊息,訊息佇列提供了異步處理機制,允許用戶把一個訊息放入佇列,但并不立即處理它,想向佇列中放入多少訊息就放多少,然后在需要的時候再去處理它們,

2. Kafka消費過的訊息如何再消費?

kafka消費訊息的offset是定義在zookeeper中的, 如果想重復消費kafka的訊息,可以在redis中自己記錄offset的checkpoint點(n個),當想重復消費訊息時,通過讀取redis中的checkpoint點進行zookeeper的offset重設,這樣就可以達到重復消費訊息的目的了

3. kafka的資料是放在磁盤上還是記憶體上,為什么速度會快?

kafka使用的是磁盤存盤,

速度快是因為:

  1. 順序寫入:因為硬碟是機械結構,每次讀寫都會尋址->寫入,其中尋址是一個“機械動作”,它是耗時的,所以硬碟 “討厭”隨機I/O, 喜歡順序I/O,為了提高讀寫硬碟的速度,Kafka就是使用順序I/O,

  2. Memory Mapped Files(記憶體映射檔案):64位作業系統中一般可以表示20G的資料檔案,它的作業原理是直接利用作業系統的Page來實作檔案到物理記憶體的直接映射,完成映射之后你對物理記憶體的操作會被同步到硬碟上,

  3. Kafka高效檔案存盤設計: Kafka把topic中一個parition大檔案分成多個小檔案段,通過多個小檔案段,就容易定期清除或洗掉已經消費完檔案,減少磁盤占用,通過索引資訊可以快速定位 message和確定response的 大 小,通過index元資料全部映射到memory(記憶體映射檔案), 可以避免segment file的IO磁盤操作,通過索引檔案稀疏存盤,可以大幅降低index檔案元資料占用空間大小,

  1. Kafka解決查詢效率的手段之一是將資料檔案分段,比如有100條Message,它們的offset是從0到99,假設將資料檔案分成5段,第一段為0-19,第二段為20-39,以此類推,每段放在一個單獨的資料檔案里面,資料檔案以該段中 小的offset命名,這樣在查找指定offset的 Message的時候,用二分查找就可以定位到該Message在哪個段中,

  2. 為資料檔案建 索引資料檔案分段 使得可以在一個較小的資料檔案中查找對應offset的Message 了,但是這依然需要順序掃描才能找到對應offset的Message, 為了進一步提高查找的效率,Kafka為每個分段后的資料檔案建立了索引檔案,檔案名與資料檔案的名字是一樣的,只是檔案擴展名為.index,

4. Kafka資料怎么保障不丟失?

分三個點說,一個是生產者端,一個消費者端,一個broker端,

  1. 生產者資料的不丟失

kafka的ack機制:在kafka發送資料的時候,每次發送訊息都會有一個確認反饋機制,確保訊息正常的能夠被收到,其中狀態有0,1,-1,

如果是同步模式:
ack設定為0,風險很大,一般不建議設定為0,即使設定為1,也會隨著leader宕機丟失資料,所以如果要嚴格保證生產端資料不丟失,可設定為-1,

如果是異步模式:
也會考慮ack的狀態,除此之外,異步模式下的有個buffer,通過buffer來進行控制資料的發送,有兩個值來進行控制,時間閾值與訊息的數量閾值,如果buffer滿了資料還沒有發送出去,有個選項是配置是否立即清空buffer,可以設定為-1,永久阻塞,也就資料不再生產,異步模式下,即使設定為-1,也可能因為程式員的不科學操作,操作資料丟失,比如kill -9,但這是特別的例外情況,

注:
ack=0:producer不等待broker同步完成的確認,繼續發送下一條(批)資訊,
ack=1(默認):producer要等待leader成功收到資料并得到確認,才發送下一條message,
ack=-1:producer得到follwer確認,才發送下一條資料,

  1. 消費者資料的不丟失

通過offset commit 來保證資料的不丟失,kafka自己記錄了每次消費的offset數值,下次繼續消費的時候,會接著上次的offset進行消費,

而offset的資訊在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消費者在運行程序中掛掉了,再次啟動的時候會找到offset的值,找到之前消費訊息的位置,接著消費,由于 offset 的資訊寫入的時候并不是每條訊息消費完成后都寫入的,所以這種情況有可能會造成重復消費,但是不會丟失訊息,

唯一例外的情況是,我們在程式中給原本做不同功能的兩個consumer組設定 KafkaSpoutConfig.bulider.setGroupid的時候設定成了一樣的groupid,這種情況會導致這兩個組共享同一份資料,就會產生組A消費partition1,partition2中的訊息,組B消費partition3的訊息,這樣每個組消費的訊息都會丟失,都是不完整的, 為了保證每個組都獨享一份訊息資料,groupid一定不要重復才行,

  1. kafka集群中的broker的資料不丟失

每個broker中的partition我們一般都會設定有replication(副本)的個數,生產者寫入的時候首先根據分發策略(有partition按partition,有key按key,都沒有輪詢)寫入到leader中,follower(副本)再跟leader同步資料,這樣有了備份,也可以保證訊息資料的不丟失,

5. 采集資料為什么選擇kafka?

采集層 主要可以使用Flume, Kafka等技術,

Flume:Flume 是管道流方式,提供了很多的默認實作,讓用戶通過參數部署,及擴展API.

Kafka:Kafka是一個可持久化的分布式的訊息佇列, Kafka 是一個非常通用的系統,你可以有許多生產者和很多的消費者共享多個主題Topics,

相比之下,Flume是一個專用工具被設計為旨在往HDFS,HBase發送資料,它對HDFS有特殊的優化,并且集成了Hadoop的安全特性,

所以,Cloudera 建議如果資料被多個系統消費的話,使用kafka;如果資料被設計給Hadoop使用,使用Flume,

6. kafka 重啟是否會導致資料丟失?

  1. kafka是將資料寫到磁盤的,一般資料不會丟失,

  2. 但是在重啟kafka程序中,如果有消費者消費訊息,那么kafka如果來不及提交offset,可能會造成資料的不準確(丟失或者重復消費),

7. kafka 宕機了如何解決?

  1. 先考慮業務是否受到影響

kafka 宕機了,首先我們考慮的問題應該是所提供的服務是否因為宕機的機器而受到影響,如果服務提供沒問題,如果實作做好了集群的容災機制,那么這塊就不用擔心了,

  1. 節點排錯與恢復

想要恢復集群的節點,主要的步驟就是通過日志分析來查看節點宕機的原因,從而解決,重新恢復節點,

8. 為什么Kafka不支持讀寫分離?

在 Kafka 中,生產者寫入訊息、消費者讀取訊息的操作都是與 leader 副本進行互動的,從 而實作的是一種主寫主讀的生產消費模型, Kafka 并不支持主寫從讀,因為主寫從讀有 2 個很明顯的缺點:

  1. 資料一致性問題:資料從主節點轉到從節點必然會有一個延時的時間視窗,這個時間 視窗會導致主從節點之間的資料不一致,某一時刻,在主節點和從節點中 A 資料的值都為 X, 之后將主節點中 A 的值修改為 Y,那么在這個變更通知到從節點之前,應用讀取從節點中的 A 資料的值并不為最新的 Y,由此便產生了資料不一致的問題,

  2. 延時問題:類似 Redis 這種組件,資料從寫入主節點到同步至從節點中的程序需要經歷 網路→主節點記憶體→網路→從節點記憶體 這幾個階段,整個程序會耗費一定的時間,而在 Kafka 中,主從同步會比 Redis 更加耗時,它需要經歷 網路→主節點記憶體→主節點磁盤→網路→從節 點記憶體→從節點磁盤 這幾個階段,對延時敏感的應用而言,主寫從讀的功能并不太適用,

而kafka的主寫主讀的優點就很多了:

  1. 可以簡化代碼的實作邏輯,減少出錯的可能;

  2. 將負載粒度細化均攤,與主寫從讀相比,不僅負載效能更好,而且對用戶可控;

  3. 沒有延時的影響;

  4. 在副本穩定的情況下,不會出現資料不一致的情況,

9. kafka資料磁區和消費者的關系?

每個磁區只能由同一個消費組內的一個消費者(consumer)來消費,可以由不同的消費組的消費者來消費,同組的消費者則起到并發的效果,

10. kafka的資料offset讀取流程

  1. 連接ZK集群,從ZK中拿到對應topic的partition資訊和partition的Leader的相關資訊

  2. 連接到對應Leader對應的broker

  3. consumer將?自?己保存的offset發送給Leader

  4. Leader根據offset等資訊定位到segment(索引?檔案和?日志?檔案)

  5. 根據索引?檔案中的內容,定位到?日志?檔案中該偏移量量對應的開始位置讀取相應?長度的資料并回傳給consumer

11. kafka內部如何保證順序,結合外部組件如何保證消費者的順序?

kafka只能保證partition內是有序的,但是partition間的有序是沒辦法的,愛奇藝的搜索架構,是從業務上把需要有序的打到同?個partition,

12. Kafka訊息資料積壓,Kafka消費能力不足怎么處理?

  1. 如果是Kafka消費能力不足,則可以考慮增加Topic的磁區數,并且同時提升消費組的消費者數量,消費者數=磁區數,(兩者缺一不可)

  2. 如果是下游的資料處理不及時:提高每批次拉取的數量,批次拉取資料過少(拉取資料/處理時間<生產速度),使處理的資料小于生產的資料,也會造成資料積壓,

13. Kafka單條日志傳輸大小

kafka對于訊息體的大小默認為單條最大值是1M但是在我們應用場景中, 常常會出現一條訊息大于1M,如果不對kafka進行配置,則會出現生產者無法將訊息推送到kafka或消費者無法去消費kafka里面的資料, 這時我們就要對kafka進行以下配置:server.properties

replica.fetch.max.bytes: 1048576  broker可復制的訊息的最大位元組數, 默認為1M
message.max.bytes: 1000012   kafka 會接收單個訊息size的最大限制, 默認為1M左右

注意:message.max.bytes必須小于等于replica.fetch.max.bytes,否則就會導致replica之間資料同步失敗,


最后

第一時間獲取最新大資料技術,盡在公眾號:五分鐘學大資料

搜索公眾號:五分鐘學大資料,學更多大資料技術!

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/339086.html

標籤:其他

上一篇:Hadoop安裝MySQL、Hive以及Sqoop(步驟圖文超詳細版)

下一篇:安裝使用反編譯工具ILSPY

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more