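Cluster plan (my poor little laptop has to run 8 CentOS VMs):
Flink runs in on-YARN mode. Because machine resources are limited, ClickHouse (ck) is installed on a single node only.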
| Hostname | IP | Installed software | Running processes |
| zcx1 | 192.168.220.128 | hadoop flink kafka | NameNode DFSZKFailoverController(zkfc) JobManager TaskManager |
| zcx2 | 192.168.220.129 | hadoop flink | NameNode DFSZKFailoverController(zkfc) JobManager TaskManager |
| zcx3 | 192.168.220.130 | hadoop flink | ResourceManager TaskManager |
| zcx4 | 192.168.220.131 | hadoop zookeeper kafka | DataNode NodeManager JournalNode QuorumPeerMain |
| zcx5 | 192.168.220.132 | hadoop zookeeper kafka | DataNode NodeManager JournalNode QuorumPeerMain |
| zcx6 | 192.168.220.133 | hadoop zookeeper kafka | DataNode NodeManager JournalNode QuorumPeerMain |
| zcx7 | 192.168.220.134 | hadoop | ResourceManager |
| ck3 | 192.168.220.142 | clickhouse | clickhouse-server |
Main Flink code
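import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Kafka_To_Flink_To_Clickhouse {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        // Kafka source: read topic "zcx1" from broker zcx4:9092, starting at the earliest offset.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "zcx4:9092");
        FlinkKafkaConsumer<String> stringFlinkKafkaConsumer = new FlinkKafkaConsumer<>("zcx1", new SimpleStringSchema(), properties);
        stringFlinkKafkaConsumer.setStartFromEarliest();
        DataStreamSource<String> topic = env.addSource(stringFlinkKafkaConsumer);
        // Identity map: a placeholder where per-record parsing or cleanup would go.
        SingleOutputStreamOperator<String> map = topic.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return s;
            }
        });
        // Register the stream as table "zcx1" with a single column "name", then query it.
        tenv.registerDataStream("zcx1", map, "name");
        Table result = tenv.sqlQuery("select name from zcx1");
        // Convert each row to a Zcx1 object (class not shown in this post; it exposes a `name` field).
        DataStream<Zcx1> rowDataStream = tenv.toDataStream(result, Zcx1.class);
        rowDataStream.print();
        rowDataStream.addSink(new MyClickhouseUtil());
        env.execute();
    }
}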
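The job above converts each row to a `Zcx1` object, but the post never shows that class. Here is a minimal sketch of what it might look like, assuming it only carries the single `name` column that the SQL query selects and that the sink reads via `value.name`. The constructors and `toString` are my own additions for illustration.

```java
/**
 * Hypothetical sketch of the Zcx1 row type; the original post does not show it.
 * To use it with Flink, declare it `public` in its own file Zcx1.java.
 */
class Zcx1 {
    /** Matches the single "name" column selected by the SQL query. */
    public String name;

    /** No-argument constructor, so Flink can instantiate the class. */
    public Zcx1() {}

    /** Convenience constructor for tests and manual construction. */
    public Zcx1(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Zcx1{name='" + name + "'}";
    }
}
```

With a `toString` like this, `rowDataStream.print()` would show readable rows in the TaskManager output instead of object hash codes.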
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MyClickhouseUtil extends RichSinkFunction<Zcx1> {
    Connection connection;
    PreparedStatement preparedStatement;

    // Insert one row per incoming record.
    @Override
    public void invoke(Zcx1 value, Context context) throws Exception {
        preparedStatement.setString(1, value.name);
        // preparedStatement.setInt(2,value.num);
        preparedStatement.execute();
    }

    // Open the JDBC connection once per parallel sink instance.
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        connection = DriverManager.getConnection("jdbc:clickhouse://192.168.220.142:8123/default", "default", "GsAdBi/O");
        preparedStatement = connection.prepareStatement("insert into zcx1 values(?)");
    }

    // Close the statement before the connection that owns it.
    @Override
    public void close() throws Exception {
        if (null != preparedStatement) {
            preparedStatement.close();
        }
        if (null != connection) {
            connection.close();
        }
    }
}
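The sink above issues one INSERT per record, which is fine for a demo but adds up quickly, since ClickHouse generally prefers fewer, larger inserts. Below is a rough sketch of a buffering helper that could sit in front of the JDBC call. `BatchBuffer`, its batch size, and the flush callback are hypothetical names of mine, not part of the original code or of any Flink or ClickHouse API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers rows and hands them to a flush callback in batches. */
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    /** Adds one row and flushes automatically once batchSize rows are buffered. */
    void add(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hands all buffered rows to the callback; does nothing when the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(buffer)); // pass a copy so the callback may keep it
        buffer.clear();
    }
}
```

In the sink, `invoke` would call `add(value.name)`, the callback would loop over the batch with `preparedStatement.addBatch()` followed by one `executeBatch()`, and `close` would call `flush()` before closing the statement. Rows still sitting in the buffer are lost if the job fails, so this trades some safety for throughput.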
Test
Produce data with the Kafka producer

The data is written to ClickHouse in real time

