IotMain:
package com;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.fastjson.JSONObject;
import com.process.MyProcessFunction;
import com.sink.LogSink;
import com.source.DataSourceGet;
import com.utils.PropertiesUtils;
/**
 * Launcher class: assembles the Flink streaming pipeline and submits it.
 *
 * @author duhai
 * @date 2021-10-08
 */
public class IotMain {
    /**
     * Builds the pipeline (source -> {@link MyProcessFunction} -> {@link LogSink}) and executes it
     * under the job name "iot test".
     *
     * <p>Author's note: the code that runs before {@code env.execute} and the code that runs after
     * it live in two separate environments, and objects created in one are not shared with the
     * other. For example, {@code PropertiesUtils.initPropertiesByKey(...)} below initializes
     * {@link PropertiesUtils} for this method, but it has to be initialized once more inside the
     * Flink pipeline, otherwise it cannot be used there and an error is raised.
     *
     * <p>Classes used before {@code execute}: {@link DataSourceGet} and the
     * {@code KafkaProduceUtils} inside it; they can use the {@link PropertiesUtils} initialized
     * here.
     *
     * <p>Classes used after {@code execute}: {@code FibonacciSource}, {@link MyProcessFunction},
     * {@link LogSink} and whatever they create. These need their own additional
     * {@link PropertiesUtils} initialization, i.e. they use the {@code KafkaProduceUtils}
     * initialized inside {@code FibonacciSource}.
     *
     * <p>For the same reason, log messages emitted before {@code env.execute} do not show up in the
     * Flink Dashboard logs.
     *
     * @param args command-line arguments in {@code key=value} form (see {@link #getFormArgs})
     * @throws Exception if building or executing the job fails
     */
    public static void main(final String[] args) throws Exception {
        // 0. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5000 ms.
        env.enableCheckpointing(5000);

        // 1. Parse the arguments and load the configuration for this (client-side) JVM.
        final JSONObject argsObject = getFormArgs(args);
        PropertiesUtils.initPropertiesByKey(argsObject.getString("prop.path"));
        System.out.println(argsObject.toJSONString());

        // 2. Wire source -> process -> sink and run.
        DataStreamSource<Tuple2<Integer, Integer>> dataStreamSource = env
                .addSource(new DataSourceGet().getFibonacciSource(argsObject));
        SingleOutputStreamOperator<String> process = dataStreamSource.process(new MyProcessFunction());
        process.addSink(new LogSink());
        env.execute("iot test");
    }

    /**
     * Collects {@code key=value} command-line arguments into a JSON object.
     *
     * <p>Example: for
     * {@code flink-1.13.2/bin/flink run -d /data/flink/demo/iot-rule-engine-criterion-1.0.0-SNAPSHOT.jar prop.path=/data/flink/demo}
     * the result contains {@code prop.path -> /data/flink/demo}.
     *
     * <p>Arguments without {@code '='} or with an empty value are ignored.
     *
     * @param args raw command-line arguments
     * @return the parsed key/value pairs; empty when nothing matched
     */
    private static JSONObject getFormArgs(final String[] args) {
        final JSONObject argsObject = new JSONObject();
        for (final String arg : args) {
            // Split on the FIRST '=' only, so values that themselves contain '='
            // (for instance URLs with query strings) are kept instead of being dropped.
            final String[] keyValue = arg.split("=", 2);
            if (keyValue.length == 2 && !keyValue[1].isEmpty()) {
                argsObject.put(keyValue[0], keyValue[1]);
            }
        }
        return argsObject;
    }
}
DataSourceGet:
package com.source;
import com.alibaba.fastjson.JSONObject;
import com.utils.KafkaProduceUtils;
/**
 * Creates the source function used by the job.
 */
public class DataSourceGet {
    /** Kafka helper used to publish one message before the source is created. */
    private final KafkaProduceUtils kafkaProduceUtils = new KafkaProduceUtils();

    /**
     * Publishes the value "123" to the topic "dh_test_dh" and then builds the source.
     *
     * @param argsObject parsed job arguments, handed unchanged to the {@code FibonacciSource}
     *                   constructor
     * @return a newly created {@code FibonacciSource}
     */
    public FibonacciSource getFibonacciSource(final JSONObject argsObject) {
        kafkaProduceUtils.producer("dh_test_dh", "123");
        return new FibonacciSource(argsObject);
    }
}
MyProcessFunction:
package com.process;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;
import com.utils.JedisUtil;
/**
 * Processing step of the pipeline: looks up a value in Redis for every incoming tuple, logs it,
 * and emits the tuple rendered as text.
 *
 * @author duhai
 * @date 2021-10-08
 */
public class MyProcessFunction extends ProcessFunction<Tuple2<Integer, Integer>, String> {
    private static final long serialVersionUID = 665249935064432746L;

    private static Logger logger = Logger.getLogger(MyProcessFunction.class);

    /** Redis helper used for the per-element lookup. */
    private JedisUtil jedisUtil = new JedisUtil();

    /**
     * Handles one element.
     *
     * @param data    incoming pair; {@code f0} is rendered as the key and {@code f1} as the value
     * @param context processing context (not used here)
     * @param out     collector receiving the string {@code "key:<f0>,value:<f1>"}
     * @throws Exception if the Redis lookup or the emission fails
     */
    @Override
    public void processElement(final Tuple2<Integer, Integer> data,
            final ProcessFunction<Tuple2<Integer, Integer>, String>.Context context,
            final Collector<String> out) throws Exception {
        final Integer first = data.f0;
        final Integer second = data.f1;

        // Fetch the rule stored in Redis (hash "afcs", field "event_47_post").
        final String rule = jedisUtil.hget("afcs", "event_47_post");
        if (logger.isInfoEnabled()) {
            logger.info("##### redisValue為:" + rule + "#####");
        }

        final StringBuilder rendered = new StringBuilder("key:");
        rendered.append(first).append(",value:").append(second);
        out.collect(rendered.toString());
    }
}
LogSink:
package com.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.log4j.Logger;
/**
 * Sink of the pipeline: writes every received record to the log.
 *
 * @author duhai
 * @date 2021-10-09
 */
public class LogSink extends RichSinkFunction<String> {
    private static final long serialVersionUID = -8897600652941199311L;

    private static Logger logger = Logger.getLogger(LogSink.class);

    /**
     * Lifecycle hook intended for sink initialization; currently only delegates to the parent.
     *
     * @param parameters configuration passed on to the parent implementation
     * @throws Exception if the parent implementation fails
     */
    @Override
    public void open(final Configuration parameters) throws Exception {
        super.open(parameters);
    }

    /**
     * Handles one record by logging it at INFO level, framed by blank lines.
     *
     * @param value   the record; its type matches the type argument of
     *                {@code RichSinkFunction<String>}
     * @param context additional context about the record (not used here)
     * @throws Exception declared by the overridden method
     */
    @Override
    public void invoke(final String value, final Context context) throws Exception {
        final String line = "\r\n##### value:" + value + " #####\r\n";
        logger.info(line);
    }

    /**
     * Lifecycle hook intended for releasing resources; currently only delegates to the parent.
     *
     * @throws Exception if the parent implementation fails
     */
    @Override
    public void close() throws Exception {
        super.close();
    }
}
JedisUtil:
package com.utils;
import java.io.Serializable;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.util.SafeEncoder;
/**
 * Redis helper built on a single, lazily created {@link JedisPool} shared by all instances.
 *
 * <p>Connection settings are read from {@link PropertiesUtils}, which therefore has to be
 * initialized before the first Redis call.
 *
 * @author duhai
 * @date 2021-10-08
 */
public class JedisUtil implements Serializable {
    private static final long serialVersionUID = 2712033148155101965L;

    /** Shared pool; static, so it is created at most once per class loader. */
    private static volatile JedisPool jedisPool = null;

    /**
     * Returns the shared connection pool, creating it on first use.
     *
     * @return the shared pool
     * @throws NumberFormatException if the port, database or max-idle setting is not a number
     */
    public JedisPool getJedisPool() {
        if (jedisPool == null) {
            // The pool is static, so the lock has to be shared by all instances as well.
            // Locking on "this" would let two different instances build two pools.
            synchronized (JedisUtil.class) {
                if (jedisPool == null) {
                    String redisHost = PropertiesUtils.getPropertiesByKey("redis.host");
                    String redisPort = PropertiesUtils.getPropertiesByKey("redis.port");
                    String redisPass = PropertiesUtils.getPropertiesByKey("redis.password");
                    String database = PropertiesUtils.getPropertiesByKey("redis2.database");
                    String redisPoolMaxIdle = PropertiesUtils.getPropertiesByKey("redis.jedis.pool.max-idle");

                    JedisPoolConfig config = new JedisPoolConfig();
                    // Upper bound on connections handed out by the pool (-1 would mean unlimited).
                    config.setMaxTotal(500);
                    // Upper bound on idle connections kept in the pool.
                    config.setMaxIdle(Integer.parseInt(redisPoolMaxIdle));
                    // Longest wait (ms) for a connection when borrowing before the borrow fails.
                    config.setMaxWaitMillis(1000 * 100);
                    // Validate connections when they are borrowed.
                    config.setTestOnBorrow(true);

                    // A missing password is reported as "" by PropertiesUtils; pass null in that
                    // case so no AUTH is attempted against a server without a password.
                    String password = redisPass.isEmpty() ? null : redisPass;
                    jedisPool = new JedisPool(config, redisHost, Integer.parseInt(redisPort), 10000, password,
                            Integer.parseInt(database));
                }
            }
        }
        return jedisPool;
    }

    /**
     * Borrows a connection from the shared pool. The caller must close it (or pass it to
     * {@link #returnJedis(Jedis)}) when done.
     *
     * @return a connection from the pool
     */
    public Jedis getJedis() {
        return getJedisPool().getResource();
    }

    /**
     * Releases a connection obtained from {@link #getJedis()}; meant to be called in a
     * {@code finally} block.
     *
     * @param jedis the connection to release; {@code null} is ignored
     */
    public void returnJedis(final Jedis jedis) {
        if (null != jedis) {
            jedis.close();
        }
    }

    /**
     * Deletes the entries stored under the given keys.
     *
     * @param keys one or more keys
     * @return number of entries deleted
     */
    public long del(final String... keys) {
        try (Jedis jedis = getJedis()) {
            return jedis.del(keys);
        }
    }

    /**
     * Removes a field from a hash.
     *
     * @param key   key of the hash
     * @param fieid name of the field
     * @return status code: 1 on success, 0 on failure
     */
    public long hdel(final String key, final String fieid) {
        try (Jedis jedis = getJedis()) {
            return jedis.hdel(key, fieid);
        }
    }

    /**
     * Stores a field/value pair in a hash.
     *
     * @param key   key of the hash
     * @param fieid name of the field
     * @param value value to store
     * @return status code: 1 when the field was created, 0 otherwise (an existing field is
     *         updated and also yields 0)
     */
    public long hset(final String key, final String fieid, final String value) {
        try (Jedis jedis = getJedis()) {
            return jedis.hset(key, fieid, value);
        }
    }

    /**
     * Reads the value of one field of a hash.
     *
     * @param key   key of the hash
     * @param fieid name of the field
     * @return the value returned by Redis for that field
     */
    public String hget(final String key, final String fieid) {
        try (Jedis jedis = getJedis()) {
            return jedis.hget(key, fieid);
        }
    }

    /**
     * Reads all fields and values of a hash.
     *
     * @param key key of the hash
     * @return the field/value pairs returned by Redis for that hash
     */
    public Map<String, String> hgetAll(final String key) {
        try (Jedis jedis = getJedis()) {
            return jedis.hgetAll(key);
        }
    }

    /**
     * Reads the value stored under a key.
     *
     * @param key the key
     * @return the value returned by Redis for that key
     */
    public String get(final String key) {
        try (Jedis jedis = getJedis()) {
            return jedis.get(key);
        }
    }

    /**
     * Stores a value under a key, overwriting any existing value.
     *
     * @param key   the key
     * @param value the value
     * @return status reply from Redis
     */
    public String set(final String key, final String value) {
        return set(SafeEncoder.encode(key), SafeEncoder.encode(value));
    }

    /**
     * Stores a value under a key, overwriting any existing value.
     *
     * @param key   the key as raw bytes
     * @param value the value as raw bytes
     * @return status reply from Redis
     */
    public String set(final byte[] key, final byte[] value) {
        try (Jedis jedis = getJedis()) {
            return jedis.set(key, value);
        }
    }
}
KafkaProduceUtils:
package com.utils;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;
/**
 * Kafka producer helper built on a single, lazily created {@link KafkaProducer} shared by all
 * instances.
 *
 * <p>Connection settings ({@code kafka.second.*}) are read from {@link PropertiesUtils}, which
 * therefore has to be initialized before the first message is sent.
 *
 * @author duhai
 * @date 2020-05-11
 */
public class KafkaProduceUtils implements Serializable {
    private static final long serialVersionUID = 3442009340575361234L;

    private static Logger logger = Logger.getLogger(KafkaProduceUtils.class);

    /** Shared producer; static, so it is created at most once per class loader. */
    private static volatile KafkaProducer<String, String> kafkaProducer;

    /**
     * Returns the shared producer, creating it on first use.
     *
     * @return the shared producer
     * @throws NumberFormatException if the retries setting is not a number
     */
    public KafkaProducer<String, String> getkafkaProducer() {
        if (kafkaProducer == null) {
            // The producer is static, so the lock has to be shared by all instances as well.
            // Locking on "this" would let two different instances create two producers.
            synchronized (KafkaProduceUtils.class) {
                if (kafkaProducer == null) {
                    logger.info("##### new KafkaProducer 開始#####");
                    String bootstrap = PropertiesUtils.getPropertiesByKey("kafka.second.bootstrap-servers");
                    String retry = PropertiesUtils.getPropertiesByKey("kafka.second.producer.retries");
                    String acks = PropertiesUtils.getPropertiesByKey("kafka.second.producer.acks");

                    final Map<String, Object> props = new HashMap<String, Object>();
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
                    props.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retry));
                    props.put(ProducerConfig.ACKS_CONFIG, acks);
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

                    kafkaProducer = new KafkaProducer<String, String>(props);
                    logger.info("##### new KafkaProducer 結束#####");
                }
            }
        }
        return kafkaProducer;
    }

    /**
     * Sends one message (without a key) to the given topic using the shared producer.
     *
     * @param topic destination topic
     * @param value message payload
     */
    public void producer(final String topic, final String value) {
        logger.info("##### kafkaProduceUtils開始,topic=" + topic + ",value=" + value);
        // Hand the record to the producer.
        getkafkaProducer().send(new ProducerRecord<String, String>(topic, value));
        logger.info("##### kafkaProduceUtils結束 #####");
    }
}
PropertiesUtils:
package com.utils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
/**
 * Loads {@code environment.properties}, preferring an external file and falling back to the copy
 * on the classpath, and serves lookups from it.
 *
 * @author duhai
 * @date 2021-10-13
 */
public class PropertiesUtils implements Serializable {
    private static final long serialVersionUID = 315601047419047701L;

    private static Logger logger = Logger.getLogger(PropertiesUtils.class);

    /** Currently loaded properties; {@code null} until {@link #initPropertiesByKey} has run. */
    private static volatile Properties properties = null;

    /**
     * Loads the properties.
     *
     * <p>When {@code propPath} is non-empty and {@code <propPath>/environment.properties} exists,
     * that file is read; in every other case {@code environment.properties} is read from the
     * classpath. Load failures are logged and leave the property set empty.
     *
     * @param propPath directory expected to contain {@code environment.properties}; may be
     *                 {@code null} or empty
     */
    public static void initPropertiesByKey(final String propPath) {
        // Fill a local instance first and publish it at the end, so concurrent readers never
        // observe a half-loaded property set.
        final Properties loaded = new Properties();
        try {
            if (StringUtils.isNotEmpty(propPath)) {
                // Prefer the file in the supplied directory.
                final String confPath = propPath + File.separator + "environment.properties";
                final File file = new File(confPath);
                if (file.exists()) {
                    logger.info("組態檔路徑---->>" + confPath);
                    System.out.println("組態檔路徑---->>" + confPath);
                    // try-with-resources: the stream was previously never closed.
                    try (InputStream in = new FileInputStream(file)) {
                        loaded.load(in);
                    }
                } else {
                    // No file at the supplied location: use the classpath copy.
                    logger.info("專案路徑confPath[" + confPath + "]下無連接資訊,默認從classpath路徑下加載properties檔案");
                    System.out.println("專案路徑confPath[" + confPath + "]下無連接資訊,默認從classpath路徑下加載properties檔案");
                    loadFromClasspath(loaded);
                }
            } else {
                // No directory supplied: use the classpath copy.
                logger.info("專案路徑propPath沒有傳入,默認從classpath路徑下加載properties檔案");
                System.out.println("專案路徑propPath沒有傳入,默認從classpath路徑下加載properties檔案");
                loadFromClasspath(loaded);
            }
        } catch (IOException e) {
            // Pass the exception as the throwable argument so the stack trace is logged.
            logger.error("Failed to load environment.properties", e);
        }
        properties = loaded;
    }

    /**
     * Reads {@code environment.properties} from the classpath into {@code target}; logs an error
     * and leaves {@code target} untouched when the resource does not exist.
     */
    private static void loadFromClasspath(final Properties target) throws IOException {
        try (InputStream in = PropertiesUtils.class.getClassLoader()
                .getResourceAsStream("environment.properties")) {
            if (in == null) {
                // getResourceAsStream returns null for a missing resource; loading from null
                // would fail with an uncaught NullPointerException.
                logger.error("environment.properties was not found on the classpath; no properties loaded");
                return;
            }
            target.load(in);
        }
    }

    /**
     * Looks up a property value.
     *
     * @param key property name
     * @return the configured value, or an empty string when the key is not defined
     * @throws NullPointerException if {@link #initPropertiesByKey} has not been called in this JVM
     */
    public static String getPropertiesByKey(final String key) {
        final Properties current = properties;
        if (current == null) {
            // Same exception type as before, but with a message that names the cause.
            throw new NullPointerException(
                    "PropertiesUtils is not initialized; call initPropertiesByKey(...) first");
        }
        String value = current.getProperty(key);
        return value == null ? "" : value;
    }
}
environment.properties
#kafka
kafka.bootstrap-servers=10.11.11.11:9092
kafka.consumer.group-id=aa
#kafka amqp
kafka.second.bootstrap-servers=10.11.11.12:9092
kafka.second.producer.acks=1
kafka.second.producer.retries=0
#redis.host
redis.host=10.11.11.11
redis.port=6379
redis.password=111111
redis2.database=1
redis.jedis.pool.max-idle=8
#rabbitmq.useflag
rabbitmq.queue=test
rabbitmq.host=10.11.11.11
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.vhost=/vadmin_host
#jdbc
jdbc.url=jdbc:mysql://10.11.11.11:3306/aa_aa?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull
jdbc.username=root
jdbc.password=111111
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/317654.html
標籤:其他
上一篇:單片機破 解和防 護建議
