Flink使用KafkaSource從Kafka訊息佇列中讀取資料
使用KafkaSource從Kafka訊息佇列中讀取資料
1.KafkaSource創建的DataStream是一個並行的DataStream
2.KafkaSource創建的DataStream是一個無限的資料流
使用步驟:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
1.匯入依賴:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.2</version>
</dependency>
2.new FlinkKafkaConsumer
3.呼叫env的addSource傳入FlinkKafkaConsumer實體
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
/**
* 使用KafkaSource從Kafka訊息佇列中讀取資料
* 1.KafkaSource創建的DataStream是一個并行的DataStream
* 2.KafkaSource創建的DataStream是一個無限的資料流
*
* 使用步驟:
* https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
*
* 1.匯入依賴:
* <dependency>
* <groupId>org.apache.flink</groupId>
* <artifactId>flink-connector-kafka_2.11</artifactId>
* <version>1.13.2</version>
* </dependency>
*
* 2.new FlinkKafkaConsumer
*
* 3.呼叫env的addSource傳入FlinkKafkaConsumer實體
*
*
*
*/
public class KafkaSource {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
//設定Evn的并行度
env.setParallelism(2);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
properties.setProperty("group.id", "test777");
properties.setProperty("auto.offset.reset", "earliest"); //如果沒有記錄歷史偏移量就從頭讀
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
"worldcount",
new SimpleStringSchema(),
properties
);
//呼叫env的addSource方法創建DataStream
DataStreamSource<String> lines = env.addSource(flinkKafkaConsumer);
System.out.println("kafkaSource的并行度:" + lines.getParallelism());
lines.print();
env.execute();
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/401585.html
標籤:其他
下一篇:Flink 用戶電商行為分析專案
