基于storm的電商銷售量大屏
- 一、專案需求
- 二、專案架構模型
- 三、代碼結構
- 1.訂單物體類
- 2.模擬訊息生產者
- 3.訊息消費者
- ProcessOrderBolt
- RealBoardTopology
- 4.Redis使用工具類
- 四、環境搭建
- 1.storm集群搭建
- 2.kafka搭建
- (1)下載并解壓
- (2)安裝zookeeper
- (3)配置kafka
- 3.redis集群搭建
- 五、環境啟動
- 1.啟動zookeeper
- 2.啟動kafka
- 3.啟動redis
- 4.啟動storm
- 六、運行專案
- 1.提交拓撲到storm集群
- 2.啟動生產者
- 3.查看效果
- 七、進行可視化實時資料展示
- 1.后端從資料庫讀取資料
- 2.前端從后端請求資料并進行可視化
一、專案需求
對某一電商平臺每一天的銷售量,銷售總額進行統計并進行實時可視化展示
二、專案架構模型
storm+kafka+redis
生產者不斷生成訂單的資訊發送給kafka,然后kafka將資料發送給storm,storm進行處理后發送到redis進行存盤
三、代碼結構
整個專案的下載鏈接:
鏈接:https://pan.baidu.com/s/1nee5NXWWZUNPBt3111tLRQ
提取碼:n4j5
整個專案分為四個部分

1.訂單物體類
domain下的PaymentInfo類,這里定義了一個訂單的各種屬性和方法,后面用到的主要是商鋪編號、商品編號、商品價格,其中為了便于可視化展示,商鋪編號在20以內取,商品編號在50以內取
package domain;
import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
public class PaymentInfo implements Serializable{
private static final long serialVersionUID = -7958315778386204397L;
private String orderId;//訂單編號
private Date createOrderTime;//訂單創建時間
private String paymentId;//支付編號
private Date paymentTime;//支付時間
private int productId;//商品編號
private String productName;//商品名稱
private long productPrice;//商品價格
private long promotionPrice;//促銷價格
private String shopId;//商鋪編號
private String shopName;//商鋪名稱
private String shopMobile;//店鋪電話
private long payPrice;//訂單支付價格
private int num;//訂單數量
/**
* <Province>19</Province>
* <City>1657</City>
* <County>4076</County>
*/
private String province; //省
private String city; //市
private String county;//縣
//102,144,114
private String catagorys; //分類
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getCounty() {
return county;
}
public void setCounty(String county) {
this.county = county;
}
public String getCatagorys() {
return catagorys;
}
public void setCatagorys(String catagorys) {
this.catagorys = catagorys;
}
public PaymentInfo() {
}
public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, int productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
this.orderId = orderId;
this.createOrderTime = createOrderTime;
this.paymentId = paymentId;
this.paymentTime = paymentTime;
this.productId = productId;
this.productName = productName;
this.productPrice = productPrice;
this.promotionPrice = promotionPrice;
this.shopId = shopId;
this.shopName = shopName;
this.shopMobile = shopMobile;
this.payPrice = payPrice;
this.num = num;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Date getCreateOrderTime() {
return createOrderTime;
}
public void setCreateOrderTime(Date createOrderTime) {
this.createOrderTime = createOrderTime;
}
public String getPaymentId() {
return paymentId;
}
public void setPaymentId(String paymentId) {
this.paymentId = paymentId;
}
public Date getPaymentTime() {
return paymentTime;
}
public void setPaymentTime(Date paymentTime) {
this.paymentTime = paymentTime;
}
public int getProductId() {
return productId;
}
public void setProductId(int productId) {
this.productId = productId;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public long getProductPrice() {
return productPrice;
}
public void setProductPrice(long productPrice) {
this.productPrice = productPrice;
}
public long getPromotionPrice() {
return promotionPrice;
}
public void setPromotionPrice(long promotionPrice) {
this.promotionPrice = promotionPrice;
}
public String getShopId() {
return shopId;
}
public void setShopId(String shopId) {
this.shopId = shopId;
}
public String getShopName() {
return shopName;
}
public void setShopName(String shopName) {
this.shopName = shopName;
}
public String getShopMobile() {
return shopMobile;
}
public void setShopMobile(String shopMobile) {
this.shopMobile = shopMobile;
}
public long getPayPrice() {
return payPrice;
}
public void setPayPrice(long payPrice) {
this.payPrice = payPrice;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
@Override
public String toString() {
return "PaymentInfo{" +
"orderId='" + orderId + '\'' +
", createOrderTime=" + createOrderTime +
", paymentId='" + paymentId + '\'' +
", paymentTime=" + paymentTime +
", productId='" + productId + '\'' +
", productName='" + productName + '\'' +
", productPrice=" + productPrice +
", promotionPrice=" + promotionPrice +
", shopId='" + shopId + '\'' +
", shopName='" + shopName + '\'' +
", shopMobile='" + shopMobile + '\'' +
", payPrice=" + payPrice +
", num=" + num +
'}';
}
public String random() {
this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
this.productPrice = new Random().nextInt(1000);
this.promotionPrice = new Random().nextInt(500);
this.payPrice = new Random().nextInt(480);
this.shopId = new Random().nextInt(20)+"";
this.productId = new Random().nextInt(50);
this.catagorys = new Random().nextInt(10000)+","+new Random().nextInt(10000)+","+new Random().nextInt(10000);
this.province = new Random().nextInt(23)+"";
this.city = new Random().nextInt(265)+"";
this.county = new Random().nextInt(1489)+"";
String date = "2017-11-11 12:22:12";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
this.createOrderTime = simpleDateFormat.parse(date);
} catch (Exception e) {
e.printStackTrace();
}
JSONObject obj = new JSONObject();
String jsonString = obj.toJSONString(this);
return jsonString;
// return new Gson().toJson(this);
}
}
2.模擬訊息生產者
producer目錄下的PayMentInfoProducer,用來模擬生產者不斷隨機產生訂單資料,需要配置kafka集群的埠和IP
package producer;
import domain.PaymentInfo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class PayMentInfoProducer {
public static void main(String[] args){
//1、準備組態檔
Properties props = new Properties();
props.put("bootstrap.servers", "storm-master:9092,storm-slave1:9092,storm-slave2:9092");
/**
* 當生產者將ack設定為“全部”(或“-1”)時,min.insync.replicas指定必須確認寫入被認為成功的最小副本數,
* 如果這個最小值不能滿足,那么生產者將會引發一個例外(NotEnoughReplicas或NotEnoughReplicasAfterAppend),
* 當一起使用時,min.insync.replicas和acks允許您執行更大的耐久性保證,
* 一個典型的情況是創建一個復制因子為3的主題,將min.insync.replicas設定為2,并使用“全部”選項來產生,
* 這將確保生產者如果大多數副本沒有收到寫入引發例外,
*/
props.put("acks", "all");
/**
* 設定一個大于零的值,將導致客戶端重新發送任何失敗的記錄
*/
props.put("retries", 0);
/**
* 只要有多個記錄被發送到同一個磁區,生產者就會嘗試將記錄一起分成更少的請求,
* 這有助于客戶端和服務器的性能,該配置以位元組為單位控制默認的批量大小,
*/
props.put("batch.size", 16384);
/**
*在某些情況下,即使在中等負載下,客戶端也可能希望減少請求的數量,
* 這個設定通過添加少量的人工延遲來實作這一點,即不是立即發出一個記錄,
* 而是等待達到給定延遲的記錄,以允許發送其他記錄,以便發送可以一起批量發送
*/
props.put("linger.ms", 1);
/**
* 生產者可用于緩沖等待發送到服務器的記錄的總位元組數,
* 如果記錄的發送速度比發送給服務器的速度快,那么生產者將會阻塞,max.block.ms之后會拋出例外,
* 這個設定應該大致對應于生產者將使用的總記憶體,但不是硬性限制,
* 因為不是所有生產者使用的記憶體都用于緩沖,
* 一些額外的記憶體將被用于壓縮(如果壓縮被啟用)以及用于維護正在進行的請求,
*/
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2、創建KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
while (true){
//3、發送資料
kafkaProducer.send(new ProducerRecord<String, String>("itcast_order",new PaymentInfo().random()));
System.out.println("發送資料");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
3.訊息消費者
kafkaStorm下面的兩個類
ProcessOrderBolt
這里寫了在存入redis資料庫時進行的操作,以及定義了存入的資料格式,包括:
- 每個店鋪的總銷售額
- 每個店鋪的銷售數量
- 平臺總銷售額度
- 平臺商品銷售數量
- 商品的維度
- 每個商品的銷售數量
package kafkaStorm;
import domain.PaymentInfo;
import producer.PayMentInfoProducer;
import util.JedisUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
public class ProcessOrderBolt extends BaseBasicBolt{
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
Object value = input.getValue(4);
//通過fastjson,將我們的json字串轉為物件
JSONObject obj = new JSONObject();
PaymentInfo parseObject = obj.parseObject(value.toString(), PaymentInfo.class);
//拿到了paymentInfo就可以從paymentinf里面獲取我們需要計算的一些指標
// 計算店鋪的維度
String shopId = parseObject.getShopId();
Jedis conn = JedisUtil.getConn();
// 每個店鋪的總銷售額
conn.incrBy("shop_price"+shopId,parseObject.getPayPrice());
// conn.incrBy("itcast:order:"+shopId+":user:2018-02-26",1);
// 每個店鋪的銷售數量
conn.incrBy("shop_num"+shopId,1);
//平臺維度的資料統計
// 平臺總銷售額度
conn.incrBy("total_price",parseObject.getPayPrice());
// conn.incrBy("itcast:order:total:user:date",1);
// 平臺商品銷售數量
conn.incrBy("total_num",1);
//商品的維度
int productId = parseObject.getProductId();
// 每個商品的總銷售額
conn.incrBy("product_price"+productId,parseObject.getPayPrice());
// conn.incrBy("itcast:order:"+productId+":user:date",1);
// 每個商品的銷售數量
conn.incrBy("product_num"+productId,1);
conn.close();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
RealBoardTopology
構建拓撲結構,需要配置kafka集群的埠和IP
package kafkaStorm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
public class RealBoardTopology {
public static void main(String[] args) throws Exception {
//從kafka當中獲取資料
TopologyBuilder builder = new TopologyBuilder();
KafkaSpoutConfig.Builder<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder("storm-master:9092,storm-slave1:9092,storm-slave2:9092", "itcast_order");
kafkaSpoutConfig.setGroupId("itcast_order_group");
KafkaSpoutConfig<String, String> build = kafkaSpoutConfig.build();
KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(build);
builder.setSpout("kafkaSpout",kafkaSpout,5);
builder.setBolt("processBolt",new ProcessOrderBolt(),8).localOrShuffleGrouping("kafkaSpout");
Config config = new Config();
if(args.length > 0){
config.setNumWorkers(2);
config.setDebug(false);
StormSubmitter.submitTopology(args[0],config,builder.createTopology());
}else{
LocalCluster cluster = new LocalCluster();
config.setDebug(true);
cluster.submitTopology("realBoard",config,builder.createTopology());
}
}
}
4.Redis使用工具類
util目錄下的JedisUtil類,需要配置redis服務器的埠和IP
package util;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class JedisUtil {
private static JedisPool pool = null;
/**
* 獲取jedis連接池
* */
public static JedisPool getPool(){
if(pool == null){
//創建jedis連接池配置
JedisPoolConfig config = new JedisPoolConfig();
//最大連接數
config.setMaxTotal(20);
//最大空閑連接
config.setMaxIdle(5);
//創建redis連接池
// 使用jedisPool的時候,timeout一定要給出來,如果不給,redis很大概率會報錯,超時
pool = new JedisPool(config,"172.25.0.5",6379,3000);
}
return pool;
}
/**
* 獲取jedis連接
* */
public static Jedis getConn(){
return getPool().getResource();
}
/**
* 測驗連接
* @param args
*/
public static void main(String[] args) {
Jedis jedis = getPool().getResource();
jedis.incrBy("mine", 5);
jedis.close();
}
}
四、環境搭建
1.storm集群搭建
博客:https://blog.csdn.net/weixin_43622131/article/details/111057523
2.kafka搭建
(1)下載并解壓
下載鏈接:
鏈接:https://pan.baidu.com/s/1nnlJB0vnLO7_NBKZgJ9BtA
提取碼:w4a6
(2)安裝zookeeper
在上述配置storm教程中已經配置了zookeeper
(3)配置kafka
首先配置主節點:
組態檔為server.properties

在zookeeper.connect中配置上zookeeper集群的IP和埠

將主節點的broker.id設定為1

配置完成,將整個kafka發送到從節點,從節點的組態檔只需要修改broker.id,從2開始依次設定即可
啟動kafka:
首先啟動zookeeper集群,然后進入kafka安裝目錄的bin目錄執行下面命令(集群中的機器都要執行)
./kafka-server-start.sh -daemon ../config/server.properties &
3.redis集群搭建
博客:https://blog.csdn.net/weixin_43622131/article/details/105820078
五、環境啟動
1.啟動zookeeper
每個節點都要執行下面命令,該命令在zookeeper安裝目錄下的bin目錄中
./zkServer.sh start
2.啟動kafka
每個節點都要執行下面命令,該命令在kafka安裝目錄下的bin目錄中
./kafka-server-start.sh -daemon ../config/server.properties &
3.啟動redis
這里為了方便使用了redis的單機版
啟動一個即可
redis-server redis1.conf
使用下面命令訪問資料庫
redis-cli -c -h 172.25.0.5 -p 6379
4.啟動storm
在master上啟動
storm nimbus &
storm ui &
storm logviewer &
在slave上啟動
storm supervisor &
storm logviewer &
全部啟動成功后查看行程:
master

slave

六、運行專案
首先將專案打成jar包
1.提交拓撲到storm集群
./storm jar /home/zf/storm_bigwork.jar kafkaStorm.RealBoardTopology realboard2
要運行此專案需要在storm的lib目錄下添加該jar包

下載鏈接:
鏈接:https://pan.baidu.com/s/1kU7q3YdTwfsPWOWyjYCPqA
提取碼:jdf0
2.啟動生產者
java -cp /home/zf/storm_bigwork.jar producer.PayMentInfoProducer start
3.查看效果

查看資料


七、進行可視化實時資料展示
展示視頻鏈接:
鏈接:https://pan.baidu.com/s/14dN-2D7HLQ2kBaSCut-2BQ
提取碼:tno9
中間是該平臺的總銷售額,左邊是前十名店鋪

前面已經實作了將流資料處理后放入redis資料庫中,下面是從資料庫中讀取資料并通過前端進行實時展示
1.后端從資料庫讀取資料
這里使用了flask框架,從redis資料庫中讀取資料進行排序后取前十,設定好路由供前端獲取資料
代碼鏈接:
鏈接:https://pan.baidu.com/s/1j8mpiyg_VJJhU7Y5KxVI1w
提取碼:855n
from flask import Flask
from flask_cors import CORS
from flask import render_template
import redis
from flask import Flask, jsonify,request
app = Flask(__name__)
CORS(app)
# 獲取總銷售額
@app.route('/total_price', methods=['GET', 'POST'])
def sum_total():
r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
total_price = r.get("total_price")
# return render_template('index.html', total_price=total_price)
# print("success")
return total_price
# 獲取各個店鋪的總銷售額
@app.route('/shop_price', methods=['GET', 'POST'])
def sum_shop():
r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
keys = r.keys()
# print(keys[0][:4])
shop_key = []
for i in keys:
# print(i[:10])
if i[:10] == b"shop_price":
shop_key.append(i)
data = []
for i in shop_key:
tmp = {};
tmp["stock"] = "shop" + str(i[10:])[1:].strip("'")
tmp["fundPost"] = int(r.get(i))
data.append(tmp)
data = sorted(data, key=lambda x: x['fundPost'], reverse=True)
return jsonify(data[:10])
# 獲取各個商品的銷售量
@app.route('/product_num', methods=['GET', 'POST'])
def product_num():
r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
keys = r.keys()
# print(keys[0][:4])
product_key = []
for i in keys:
# print(i[:10])
if i[:11] == b"product_num":
product_key.append(i)
data = []
for i in product_key:
tmp = {};
tmp["stock"] = "product" + str(i[11:])[1:].strip("'")
tmp["fundPost"] = int(r.get(i))
data.append(tmp)
data = sorted(data, key=lambda x: x['fundPost'], reverse=True)
return jsonify(data[:10])
if __name__ == '__main__':
app.run(host='127.0.0.1', port=5000)

2.前端從后端請求資料并進行可視化
代碼鏈接:
鏈接:https://pan.baidu.com/s/1Tg7NGCtDPFLaCFIM6phC3w
提取碼:r6m2
使用的可視化框架是echarts,請求后端資料使用的是Ajax,用法如下
$.ajax({
type: 'GET',
url: 'http://127.0.0.1:5000/total_price',
async: false,
success: function(data) {
let option = myChart.getOption();
option.series[3].startAngle = option.series[3].startAngle - 1;
option.series[6].data[0].value = Number(data);
myChart.setOption(option);
}
})
實作資料的實時重繪是通過javascript的setInterval函式來實作的,剩下的具體實作可以看看完整的代碼,
整體實作下來確實需要花費一些時間,但是堅持做下來是一定能成功的,如果遇到一些小bug可以與我交流,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/248568.html
標籤:其他
