1. 前言
官方對于pyflink的介紹太少了,
最近有用pyflink處理資料,最終需要sink到redis中,研究有果,分享于人,以此為例,拋磚引玉,
簡單介紹一下如何為pyflink開發jar包,
2. 分析
我們的需要是在python中能夠加入sink,在python中這樣呼叫,
data_stream.add_sink(RedisSink("localhost", 6379, "result"))
我們觀察到add_sink接收的引數是一個SinkFunction,這是一個java類,所以我們要用java來實作這個類,然后再在python中引入,給到add_sink方法呼叫,
在java版本的Flink中,有一個已經用RichSinkFunction實作的RedisSink,顯而易見RichSinkFunction是SinkFunction的子類,構建java不是本文的重點,感興趣的可以參考如下這個包的源代碼,
package org.apache.flink.streaming.connectors.redis;
java實際要做的是自身的完善,并回傳python需要的類,
3. java代碼
package simple_redis_sink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class SimpleRedisSink {
public RedisSink redisSink;
// 處理資料,我這里是用的redis的hset,所以需要name, key, value三個引數,其中name我是當作類別(表名)來用的,
public static final class RedisSinkExample implements RedisMapper<Tuple2<String, String>> {
public String name;
public RedisSinkExample(String name) {
this.name = name;
}
# SinkRedis初始化時呼叫,getCommandDescription,設定插入redis的方式,
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, this.name);
}
// data是一個(key, value)的Tuple,invoke方法中取出key
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
// data是一個(key, value)的Tuple,invoke方法中取出value
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
// 初始化,這里將會是python呼叫的入口,host、port、name是python傳過來的
public SimpleRedisSink(String host, int port, String name) {
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(host).setPort(port).build();
this.redisSink = new RedisSink(conf, new SimpleRedisSink.RedisSinkExample(name));
}
// 回傳RedisSink類
public RedisSink get() {
return this.redisSink;
}
}
4. 編譯生成jar
Project structure->Artifacts->+->JAR->From modules with dependencies...- 在彈出的
Create JAR from Modules對話框中,選擇extact to the target JAR一項, - 在
output layout中點擊根部,檢查一下Manifest File和Class Path是否配置正確,


Artifacts配置完成后,在主選單選擇Build->Build Artifacts...,彈出對話框中選擇Build選項即可,

5. pyflink中使用jar包
首先需要在在env中加載jar包,
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///Users/bitekong/PycharmProjects/jyyc_dp_stream/simple_redis_sink.jar")
在python中撰寫py4j類,打通python與java的方法呼叫,
class RedisSink(SinkFunction):
def __init__(self, host, port, name):
gateway = get_gateway()
j_redis_sink = gateway.jvm.simple_redis_sink.SimpleRedisSink(host, port, name).get()
super(RedisSink, self).__init__(sink_func=j_redis_sink)
接下來就可以在data_stream中將剛剛寫好的RedisSink類加入sink啦:
data_stream.add_sink(RedisSink("localhost", 6379, "result"))
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/296595.html
標籤:python
