在云服務器上做的,由于白嫖的云服務器性能比較差,就設計了如下架構,
功能與設計
(大資料集群+架構設計+功能分析與設計)
總體架構圖

功能:
訂單成交量統計分析
歷史成交總金額
熱門分類的實時和離線統計分析
熱門商品的實時和離線統計分析
活躍用戶統計分析
專案實作
SpringBoot tmall商城部署
在服務器git拉取tmall springboot專案到本地,配置mysql,創建對應資料庫,運行sql檔案,復制資料庫,運行springboot專案,生成日志檔案到/root/log/info/下
flume采集
flume采集資料有兩個流向,一個存入hdfs,另一個為kafkachannel,
資料存入hdfs的用作離線分析,kafkachannel則將資料給到sparkstreaming實時處理
資料流向

flume采集方案組態檔如下:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a3.sources = r3
a3.sinks = sinkhdfs
a3.channels = ch1 kafka-channel
# Define an Avro source called avro-source1 on a3 and tell it
a3.sources.r3.channels = ch1 kafka-channel
#a3.sources.r3.type = spooldir
#a3.sources.r3.spoolDir = /root/logs/info
#a3.sources.r3.ignorePattern = ^(.)*\\.tmp$
a3.sources.r3.type = exec
a3.sources.r3.command = tail -F /root/logs/info/info.log
# Define a memory channel called ch1 on a3
a3.channels.ch1.type = memory
a3.channels.ch1.capacity = 10000000
a3.channels.ch1.transactionCapacity = 100000
a3.channels.ch1.keep-alive = 10
a3.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
a3.channels.kafka-channel.kafka.bootstrap.servers = master:9092,slave2:9092
a3.channels.kafka-channel.kafka.topic = tmalllog
a3.channels.kafka-channel.kafka.producer.acks = 1
a3.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a3.channels.kafka-channel.parseAsFlumeEvent = false
kafka-streaming實時處理
需搭建zookeeper、kafka集群,消費來自kafka生產者的訊息
撰寫sparkstreaming應用程式
(1)添加kafka的pom依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.hgu</groupId>
<artifactId>sparkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.3</spark.version>
<hadoop.version>2.9.2</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>provided</scope>
</dependency>
<!-- 匯入scala的依賴 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 匯入spark的依賴 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.3</version>
</dependency>
<!-- 指定hadoop-client API的版本 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<!-- 編譯scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- 編譯java的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
<filter>
<artifact>junit:junit</artifact>
<includes>
<include>junit/framework/**</include>
<include>org/junit/**</include>
</includes>
<excludes>
<exclude>org/junit/experimental/**</exclude>
<exclude>org/junit/runners/**</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
實時處理代碼
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.regexp_extract
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import java.util.{Date, Properties}
object Kafka_spark_streaming {
def main(args: Array[String]): Unit = {
// offset保存路徑
val checkpointPath = "file:///export/data/kafka/checkpoint/kafka-direct"
val conf = new SparkConf()
.setAppName("ScalaKafkaStreaming")
.setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint(checkpointPath)
val spark: SparkSession = new SparkSession.Builder().master("local").appName("sqlDemo").getOrCreate()
val bootstrapServers = "master:9092,slave1:9092,slave2:9092"
val groupId = "flume"
val topicName = "tmalllog"
val maxPoll = 500
val kafkaParams = Map(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
case class schema(mytime: String, action: String, frequency: Int)
val kafkaTopicDS = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set(topicName), kafkaParams))
import spark.implicits._
val properties = new Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "kun/roo123")
properties.setProperty("driver", "com.mysql.jdbc.Driver")
val uri = "jdbc:mysql://slave2:3306/tmalldata?useSSL=false"
kafkaTopicDS.foreachRDD(
foreachFunc = rdd => if (!rdd.isEmpty()) {
//資料業務邏輯處理
val now: Long = new Date().getTime
val now2: String = now.toString
val action_df = rdd.map(_.value)
.map(_.split("-"))
.filter(x => x.length == 3)
.map(x => x(2))
.map(x => (x, 1))
.reduceByKey(_ + _)
.map(x => (now2, x._1, x._2))
.toDF("mytime", "action", "frequency")
val top_category = action_df.select("*").where("action like '%分類ID為%'").orderBy(action_df("frequency").desc)
if (top_category.count()>0){
top_category.show()
top_category.write.mode("append").jdbc(uri, "category", properties)}
val product_Popular_Buy = action_df.select("*").where("action like '%通過產品ID獲取產品資訊%'").orderBy(action_df("frequency").desc)
if (product_Popular_Buy.count()>0){product_Popular_Buy.show()
product_Popular_Buy.write.mode("append").jdbc(uri, "product", properties)}
val Active_users = action_df.select("*").where("action like '%用戶已登錄,用戶ID%'").orderBy(action_df("frequency"))
if(Active_users.count()>0){
Active_users.show()
Active_users.write.mode("append").jdbc(uri, "activeusers", properties)}
val money = action_df.select("*").where("action like '%總共支付金額為%'").orderBy(action_df("frequency").desc)
val money2 = money.withColumn("single_transaction", regexp_extract($"action", "/d+", 0))
if(money2.count()>0){
money2.show()
money2.write.mode("append").jdbc(uri, "trading", properties)
}
}
)
ssc.start()
ssc.awaitTermination()
}
}
運行查看控制臺輸出結果

再idea打包jar,上傳服務器,sparksubmit提交任務可將實時資料寫入mysql資料庫
當前登錄用戶

當前熱門分類

當前熱門商品

訂單數量

spark離線資料分析
3.4 創建hive表磁區

訂單總量統計
熱門分類統計
熱門商品統計
活躍用戶統計
四、資料可視化
五、 經驗總結
云服務器環境搭建問題(主要)
1、zookeeper、kafka、hadoop集群搭建時公網ip與內網ip的問題,導致無法識別,
通過對網上資料及博客的查閱發現是云服務器中只有一塊內網網卡,外網地址不是直接配置在云服務器中,程式無法系結公網IP地址,所以需要對/etc/hosts檔案進行修改,如下,
解決方法:本服務器配內網ip,其他服務器配公網ip
2、hadoop高可用問題,hadoop高可用環境導致flume日志采集失敗,
解決方法:添加hadoop組態檔到flume/lib下,
3、埠問題,因為服務器的埠未打開,導致部分行程無法啟動,
解決方法:在服務器安全組以及寶塔開放埠,
搭建環境常用的默認埠
hadoop 9000 50070 50010
zookeeper 2181 2888 3888
kafka 9092
mysql 3306
spark 7077
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/287585.html
標籤:其他
上一篇:kudu從0到1
下一篇:架構設計初識
