1.專案介紹
本專案採用的資料為通話記錄資料,例如(張三 李四 2021-04-23 12:32:13 2942)表示張三在2021-04-23 12:32:13撥打電話給李四,通話時長為2942秒
- 資料來源:由程式自行模擬產生資料,再交給Kafka的生產者發送
- Kafka的消費者端接的是HBase資料庫
- MapReduce讀取HBase中的資料進行分析
- 再將分析的資料匯入MySQL
2.各類別介紹
Produce模塊

- DataProduce:主要負責產生模擬資料
- Main:程式的入口
- testAPI:進行功能測試
- KafkaUtils:將資料發送到Kafka的topic
Consumer模塊

- Main:程式的入口
- HBaseConsumer:消費者拉取資料
- HBaseDao:HBase的客戶端物件,負責建立表並匯入資料
- HBaseUtils:主要是創建RowKey,還有一些建表和命名空間的操作
Analysis模塊

- HashUtils:將每個Cell中的資料(列名與值)存入HashMap中
- MysqlUtils:主要負責MySQL的連接以及清空表操作
- CountMap:統計每對用戶之間的通話次數
- DBWrite:實作了Writable、DBWritable,用於序列化以及寫入資料庫
3.專案各模塊
專案分為三個模塊,分別是produce、consumer、analysis

- produce:實作資料生產
- consumer:Kafka將資料寫入HBase
- analysis:利用MapReduce分析資料將結果匯入Mysql
3.1 produce
3.1.1 entry
/**
 * Entry point of the produce module.
 *
 * <p>Generates simulated call records and publishes them to Kafka; all of the work is
 * delegated to {@code KafkaUtils.writeDataIntoKafka()}.
 */
public class Main {

    /**
     * Starts the producer.
     *
     * @param args unused
     * @throws ParseException       propagated from the record generator
     * @throws InterruptedException propagated if the producer is interrupted while pacing
     */
    public static void main(String[] args)
            throws ParseException, InterruptedException {
        KafkaUtils.writeDataIntoKafka();
    }
}
3.1.2 dataProduce
/**
 * Produces one simulated call record.
 *
 * <p>The record has the form {@code "<caller> <callee> <yyyy-MM-dd> <HH:mm:ss> <durationSeconds>"},
 * e.g. {@code "張三 李四 2021-02-03 13:43:25 1204"}. The date is zero-padded because it is
 * produced by {@code SimpleDateFormat("yyyy-MM-dd").format(...)}.
 *
 * @param startTime lower bound (inclusive) of the call date, parsed with pattern {@code yyyy-MM-dd}
 * @param endTime   upper bound (exclusive) of the call date, same pattern
 * @return a space-separated call record
 * @throws ParseException        if either bound cannot be parsed
 * @throws IllegalStateException if fewer than two phone numbers are available, since a
 *                               distinct caller/callee pair could never be drawn
 */
public String produce(String startTime, String endTime) throws ParseException {
    // NOTE(review): initMetaData() is defined elsewhere and runs on every call; confirm it is idempotent.
    initMetaData();
    // Without this guard the do/while below would spin forever.
    if (telePhone.size() < 2) {
        throw new IllegalStateException(
                "At least two phone numbers are required to pick a distinct caller and callee");
    }
    // Pick a random caller.
    int callerIndex = (int) (Math.random() * telePhone.size());
    String callerName = phoneToName.get(telePhone.get(callerIndex));
    // Pick a random callee that differs from the caller.
    int calleeIndex;
    do {
        calleeIndex = (int) (Math.random() * telePhone.size());
    } while (callerIndex == calleeIndex);
    String calleeName = phoneToName.get(telePhone.get(calleeIndex));
    // Parse the date range.
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    Date startDate = sdf.parse(startTime);
    Date endDate = sdf.parse(endTime);
    // Choose a random instant in [startDate, endDate) and keep only its calendar date.
    long randomTs = startDate.getTime()
            + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
    String resultTimeString = sdf.format(new Date(randomTs));
    // Random time of day as HH:mm:ss, formatted in a single call.
    // The original nested String.format re-used the assembled text as a format string.
    int hour = (int) (Math.random() * 24);
    int minute = (int) (Math.random() * 60);
    int second = (int) (Math.random() * 60);
    String specificTime = String.format("%02d:%02d:%02d", hour, minute, second);
    // Call duration in seconds, in [0, 3600).
    int duration = (int) (Math.random() * 3600);
    return callerName + " " + calleeName + " " + resultTimeString + " " + specificTime + " " + duration;
}
3.1.3 kafkaUtils
/**
 * Generates 1000 simulated call records and publishes each to Kafka.
 *
 * <p>Records are sent one per second to the {@code telecom} topic on {@code hadoop102:9092}.
 * Call dates fall in [2021-1-1, 2021-5-1).
 *
 * @throws ParseException       if the hard-coded date bounds cannot be parsed
 * @throws InterruptedException if interrupted while pacing between records
 */
public static void writeDataIntoKafka() throws ParseException, InterruptedException {
    Properties properties = new Properties();
    // Kafka broker to connect to.
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    // Keys and values are plain strings.
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    String topic = "telecom";
    // Typed producer (the original used a raw type). Try-with-resources closes it, which flushes
    // pending sends, even when produce() or sleep() throws.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
        for (int i = 0; i < 1000; i++) {
            String value = dataProduce.produce("2021-1-1", "2021-5-1");
            // Pace the stream at roughly one record per second.
            Thread.sleep(1000);
            producer.send(new ProducerRecord<>(topic, value));
        }
    }
}
3.2 consumer
3.2.1 entry
public class Main {
public static void main(String[] args) throws IOException, InterruptedException, ParseException {
//創建HBase消費者
HBaseConsumer hBaseConsumer = new HBaseConsumer();
//從Kafka中獲取資料輸到HBase
hBaseConsumer.getDataFromKafka();
}
}
3.2.2 hbase
3.2.2.1 HBaseConsumer
/**
 * Consumes call records from Kafka and writes each one to HBase.
 *
 * <p>Reads the {@code telecom} topic and stores records via {@code HBaseDao.put(String)}.
 */
public class HBaseConsumer {

    /** Upper bound on records returned by one poll; see where it is configured. */
    private static final int MAX_POLL_RECORDS = 10;

    /**
     * Runs the consume loop until an exception is thrown.
     *
     * <p>Each record is printed, followed by a 1 s pause, and then written to HBase.
     *
     * @throws InterruptedException if interrupted during the per-record pause
     * @throws IOException          if HBase access fails
     * @throws ParseException       if a record's timestamp cannot be parsed
     */
    public void getDataFromKafka() throws InterruptedException, IOException, ParseException {
        Properties properties = new Properties();
        // Kafka broker to connect to.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Offsets are committed automatically every 100 ms.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        // Consumer group name.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");
        // Keys and values are plain strings.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Every record is followed by a 1 s pause. With the default batch size (500 records) the
        // loop could stay away from poll() longer than max.poll.interval.ms (5 min by default).
        // The consumer would then be evicted from its group and re-read the same uncommitted batch.
        // Capping the batch keeps the time between polls at roughly MAX_POLL_RECORDS seconds.
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
        // Try-with-resources closes the consumer (leaving the group cleanly) if the loop throws.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList("telecom"));
            // Writes records into HBase.
            HBaseDao hBaseDao = new HBaseDao();
            while (true) {
                // NOTE(review): poll(long) is deprecated since Kafka 2.0; switch to
                // poll(Duration) once the client version is confirmed.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    String value = record.value();
                    System.out.println(value);
                    Thread.sleep(1000);
                    hBaseDao.put(value);
                }
            }
        }
    }
}
3.2.2.2 HBaseDao
/**
 * Writes call records into the HBase table {@code teleRecord}.
 *
 * <p>The constructor opens a connection. If the table does not exist yet, it also creates
 * the {@code telecom} namespace and the table with column families {@code f1} and {@code f2}.
 * Call {@link #close()} to release the table handle and the connection.
 */
public class HBaseDao {
    // Namespace created on first start.
    // NOTE(review): the table is created as plain "teleRecord", i.e. outside this namespace, and the
    // MapReduce jobs read "teleRecord" too, so this namespace currently holds nothing.
    private String nameSpace;
    // Name of the HBase table that records are written to.
    private String tableName;
    // Shared HBase client configuration.
    public static Configuration conf;
    // Handle to the target table.
    private Table table;
    // Connection that owns the table handle.
    private Connection connection;
    // Parses the combined "date time" string of a record, e.g. "2021-03-21 12:23:04".
    // The pattern must cover the time of day: SimpleDateFormat.parse(String) silently ignores
    // trailing text. The former date-only pattern therefore truncated every timestamp to
    // midnight, in both the row key and build_time_ts.
    private SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // Compact timestamp embedded in the row key, e.g. "20210321122304".
    private SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmmss");

    static {
        conf = HBaseConfiguration.create();
    }

    /**
     * Connects to HBase and makes sure the target table exists.
     *
     * @throws IOException if the connection or table creation fails
     */
    public HBaseDao() throws IOException {
        nameSpace = "telecom";
        tableName = "teleRecord";
        connection = ConnectionFactory.createConnection(conf);
        if (!HBaseUtils.isExistTable(conf, tableName)) {
            HBaseUtils.initNamespace(conf, nameSpace);
            HBaseUtils.createTable(conf, tableName, "f1", "f2");
        }
        table = connection.getTable(TableName.valueOf(tableName));
    }

    /**
     * Parses one record and writes it as a single row in column family {@code f1}.
     *
     * @param value record of the form {@code "caller callee yyyy-MM-dd HH:mm:ss duration"}
     * @throws IllegalArgumentException if {@code value} has fewer than five space-separated fields
     * @throws ParseException           if the date/time fields cannot be parsed
     * @throws IOException              if the HBase write fails
     */
    public void put(String value) throws ParseException, IOException {
        String[] splitValues = value.split(" ");
        if (splitValues.length < 5) {
            throw new IllegalArgumentException("Expected 5 space-separated fields, got: " + value);
        }
        String caller = splitValues[0];
        String callee = splitValues[1];
        String duration = splitValues[4];
        // e.g. "2021-03-21 12:23:04"
        String buildTime = splitValues[2] + " " + splitValues[3];
        java.util.Date buildDate = sdf1.parse(buildTime);
        // e.g. "20210321122304", used to build the row key
        String buildTimeReplace = sdf2.format(buildDate);
        // Epoch milliseconds as a decimal string.
        String buildTimeTs = String.valueOf(buildDate.getTime());
        // Row key layout: caller_time_callee_flag_duration
        String rowKey = HBaseUtils.createRowKey(caller, callee, buildTimeReplace, "1", duration);
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time"), Bytes.toBytes(buildTime));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time_ts"), Bytes.toBytes(buildTimeTs));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes("1"));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration));
        table.put(put);
    }

    /**
     * Releases the table handle and the connection.
     *
     * <p>The connection is closed even if closing the table fails.
     *
     * @throws IOException if closing either resource fails
     */
    public void close() throws IOException {
        try {
            if (table != null) {
                table.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
3.2.3 hbaseUtils
/**
 * Static helpers for HBase administration (tables, namespaces) and row-key construction.
 *
 * <p>Each admin helper opens its own short-lived connection. Try-with-resources closes it
 * even when the admin call throws.
 */
public class HBaseUtils {

    /**
     * Reports whether a table exists.
     *
     * @param conf      HBase client configuration
     * @param tableName table name, optionally namespace-qualified
     * @return {@code true} if the table exists
     * @throws IOException if the cluster cannot be queried
     */
    public static boolean isExistTable(Configuration conf, String tableName) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        }
    }

    /**
     * Reports whether a namespace exists.
     *
     * <p>The previous implementation never queried the cluster and always returned {@code false}.
     *
     * @param conf      HBase client configuration
     * @param nameSpace namespace name
     * @return {@code true} if a namespace with exactly this name exists
     * @throws IOException if the cluster cannot be queried
     */
    public static boolean isExistTableSpace(Configuration conf, String nameSpace) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            for (NamespaceDescriptor descriptor : admin.listNamespaceDescriptors()) {
                if (descriptor.getName().equals(nameSpace)) {
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Creates a namespace; does nothing if it already exists.
     *
     * @param conf      HBase client configuration
     * @param nameSpace namespace name
     * @throws IOException if the namespace cannot be created for any other reason
     */
    public static void initNamespace(Configuration conf, String nameSpace) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            admin.createNamespace(NamespaceDescriptor.create(nameSpace).build());
        } catch (NamespaceExistException ignored) {
            // Already present: creating the namespace is intentionally idempotent.
        }
    }

    /**
     * Creates a table with the given column families.
     *
     * @param conf      HBase client configuration
     * @param tableName table name
     * @param cfs       column family names
     * @throws IOException if the table cannot be created (including when it already exists)
     */
    public static void createTable(Configuration conf, String tableName, String... cfs) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            for (String cf : cfs) {
                hTableDescriptor.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(hTableDescriptor);
        }
    }

    /**
     * Builds a row key of the form {@code caller_buildTime_callee_flag_duration}.
     *
     * <p>Example: {@code "張三_20210321122304_李四_1_2942"}. A {@code null} part is rendered as
     * {@code "null"}, as before.
     */
    public static String createRowKey(String caller, String callee, String buildTime, String flag, String duration) {
        return String.join("_", caller, buildTime, callee, flag, duration);
    }
}
3.3 analysis
3.3.1 hashUtils
/**
 * Helpers for flattening HBase cells into plain string maps.
 */
public class HashUtils {

    /**
     * Stores one cell in {@code hashMap} as column qualifier → value.
     *
     * <p>Both are decoded with {@code Bytes.toString}. An existing entry with the same
     * qualifier is overwritten.
     *
     * @param cell    source cell
     * @param hashMap destination map, modified in place
     */
    public static void putValue(Cell cell, HashMap<String, String> hashMap) {
        byte[] qualifierBytes = CellUtil.cloneQualifier(cell);
        byte[] valueBytes = CellUtil.cloneValue(cell);
        String columnName = Bytes.toString(qualifierBytes);
        String columnValue = Bytes.toString(valueBytes);
        hashMap.put(columnName, columnValue);
    }
}
3.3.2 mysqlUtils
/**
 * Shared JDBC connection to the local MySQL database {@code mydb}, plus small table helpers.
 *
 * <p>The connection is opened once in the static initializer. If the driver is missing or the
 * connection fails, the error is printed and {@link #connection} stays {@code null}.
 */
public class MysqlUtils {
    // Shared connection; null if initialization failed.
    public static Connection connection;
    // NOTE(review): hard-coded credentials; consider reading them from configuration.
    public static String userName = "root";
    public static String passwd = "123456";
    // Kept for source compatibility only. deleteData() now uses a local statement that is
    // always closed, instead of leaking one PreparedStatement per call.
    public static PreparedStatement ps = null;

    // Load the MySQL driver and open the shared connection (best effort: errors are printed).
    static {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        try {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb" +
                            "?useSSL=false" +
                            "&allowPublicKeyRetrieval=true" +
                            "&serverTimezone=UTC",
                    userName,
                    passwd);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes every row of {@code tableName}.
     *
     * @param tableName table to empty; must be a plain identifier (letters, digits, underscore)
     * @throws IllegalArgumentException if {@code tableName} is null or not a plain identifier
     * @throws SQLException             if no connection is available or the delete fails
     */
    public static void deleteData(String tableName) throws SQLException {
        // Table names cannot be bound as '?' parameters, so restrict them to plain identifiers
        // before they are formatted into the SQL text.
        if (tableName == null || !tableName.matches("[A-Za-z0-9_]+")) {
            throw new IllegalArgumentException("Invalid table name: " + tableName);
        }
        if (connection == null) {
            throw new SQLException("No MySQL connection available; the connection failed during class initialization");
        }
        String sql = String.format("delete from %s", tableName);
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.executeUpdate();
        }
    }
}
3.3.3 hbaseToMR
3.3.3.1 callCount
3.3.3.1.1 Map
/**
 * Mapper that emits {@code ("caller-callee", 1)} for every scanned HBase row.
 *
 * <p>Example output: {@code ("張三-李四", 1)}. Caller and callee come from the
 * {@code caller} and {@code callee} columns.
 */
public class CountMap extends TableMapper<Text, IntWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        HashMap<String, String> columns = new HashMap<>();
        Cell[] cells = value.rawCells();
        for (int i = 0; i < cells.length; i++) {
            HashUtils.putValue(cells[i], columns);
        }
        String pair = columns.get("caller") + "-" + columns.get("callee");
        context.write(new Text(pair), new IntWritable(1));
    }
}
3.3.3.1.2 Reduce
/**
 * Reducer that totals the counts for one {@code "caller-callee"} key.
 *
 * <p>Writes {@code (key, total)} to MySQL through {@code DBWrite}, e.g. {@code ("張三-李四", 23)}.
 */
public class CountReduce extends Reducer<Text, IntWritable, DBWrite, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        // Sum the emitted values rather than counting them. That way the total stays correct if a
        // combiner, or a mapper emitting values other than 1, is ever introduced.
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(new DBWrite(key.toString(), count), NullWritable.get());
    }
}
3.3.3.1.3 Driver
/**
 * Hadoop {@link Tool} that counts calls per caller/callee pair.
 *
 * <p>Reads the HBase table {@code teleRecord} and writes the result to the MySQL table
 * {@code callcounts} (columns {@code callercallee}, {@code counts}). The MySQL table is
 * emptied before the job is submitted.
 */
public class CountDriver implements Tool {
    // Job configuration, injected by ToolRunner through setConf().
    public static Configuration conf = null;
    // Target MySQL table.
    public static String mysqlTableName = "callcounts";
    // Target MySQL columns, in the order DBWrite binds them.
    public static String[] fieldNames = {"callercallee", "counts"};
    // JDBC driver class.
    public static String driverClass = "com.mysql.cj.jdbc.Driver";
    // JDBC URL.
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";
    // MySQL user name.
    // NOTE(review): hard-coded credentials; consider reading them from configuration.
    public static String userName = "root";
    // MySQL password.
    public static String passwd = "123456";

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @return 0 on success, 1 if the job failed
     */
    @Override
    public int run(String[] strings) throws Exception {
        // Must run before Job.getInstance(conf), which works on a copy of the configuration.
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        // Start from an empty result table.
        MysqlUtils.deleteData(mysqlTableName);
        Job job = Job.getInstance(conf);
        job.setJarByClass(CountDriver.class);
        // Full scan of teleRecord feeding CountMap.
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                CountMap.class,
                Text.class,
                IntWritable.class,
                job);
        job.setReducerClass(CountReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);
        // Reducer output goes straight to MySQL.
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    /** Stores the configuration and points the HBase client at the ZooKeeper quorum. */
    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    /**
     * Runs the tool and exits with its status.
     *
     * <p>Any exception is reported with exit status 1. Previously the exception was only
     * printed and the process exited with 0, or lingered on non-daemon threads.
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        int exitCode;
        try {
            exitCode = ToolRunner.run(conf, new CountDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
            exitCode = 1;
        }
        System.exit(exitCode);
    }
}
3.3.3.1.4 DBWrite
/**
 * Output record for the MySQL-bound reducers.
 *
 * <p>Each record is one (text key, numeric value) pair, bound to the two columns configured
 * with {@code DBOutputFormat.setOutput}. The value is stored as a {@code long}, so the same class
 * serves call counts ({@code int}) and duration sums ({@code long}, or their decimal
 * {@code String} form).
 */
public class DBWrite implements Writable, DBWritable {
    // First column: e.g. "caller-callee", a caller name, or a date.
    String caller_callee = "";
    // Second column: a count or a duration sum in seconds.
    long count = 0;

    /** No-arg constructor required by Hadoop for reflective instantiation. */
    public DBWrite() {}

    /** Creates a record from a key and an {@code int} value (e.g. a call count). */
    public DBWrite(String caller_callee, int count) {
        this(caller_callee, (long) count);
    }

    /** Creates a record from a key and a {@code long} value (e.g. a duration sum). */
    public DBWrite(String caller_callee, long count) {
        this.caller_callee = caller_callee;
        this.count = count;
    }

    /**
     * Creates a record from a key and a decimal string value.
     *
     * @throws NumberFormatException if {@code count} is not a valid {@code long}
     */
    public DBWrite(String caller_callee, String count) {
        this(caller_callee, Long.parseLong(count));
    }

    /** Hadoop serialization. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(caller_callee);
        out.writeLong(count);
    }

    /** Hadoop deserialization; mirrors {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.caller_callee = in.readUTF();
        this.count = in.readLong();
    }

    /** Binds the two output columns of the INSERT statement. */
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, caller_callee);
        preparedStatement.setLong(2, count);
    }

    /** Reads the two columns back from a result row. */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.caller_callee = resultSet.getString(1);
        this.count = resultSet.getLong(2);
    }
}
3.3.3.2 callerDuration
3.3.3.2.1 Map
/**
 * Mapper that emits {@code (caller, duration)} for each scanned HBase row.
 *
 * <p>Example output: {@code ("張三", 2041)}. Values come from the {@code caller} and
 * {@code duration} columns; the duration must parse as a {@code long}.
 */
public class DurationMap extends TableMapper<Text, LongWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        HashMap<String, String> row = new HashMap<>();
        for (Cell c : value.rawCells()) {
            HashUtils.putValue(c, row);
        }
        Text callerKey = new Text(row.get("caller"));
        long seconds = Long.parseLong(row.get("duration"));
        context.write(callerKey, new LongWritable(seconds));
    }
}
3.3.3.2.2 Reduce
/**
 * Reducer that adds up all call durations (in seconds) of one caller.
 *
 * <p>Writes {@code (caller, total)} to MySQL through {@code DBWrite}, e.g.
 * {@code ("張三", 4204)}. The total is handed to {@code DBWrite} as a decimal string.
 */
public class DurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long totalSeconds = 0L;
        for (LongWritable seconds : values) {
            totalSeconds = totalSeconds + seconds.get();
        }
        String caller = key.toString();
        context.write(new DBWrite(caller, String.valueOf(totalSeconds)), NullWritable.get());
    }
}
3.3.3.2.3 Driver
/**
 * Hadoop {@link Tool} that sums call durations per caller.
 *
 * <p>Reads the HBase table {@code teleRecord} and writes the result to the MySQL table
 * {@code callerdurations} (columns {@code caller}, {@code durations}). The MySQL table is
 * emptied before the job is submitted.
 */
public class DurationDriver implements Tool {
    // Job configuration, injected by ToolRunner through setConf().
    public static Configuration conf = null;
    // Target MySQL table.
    public static String mysqlTableName = "callerdurations";
    // Target MySQL columns, in the order DBWrite binds them.
    public static String[] fieldNames = {"caller", "durations"};
    // JDBC driver class.
    public static String driverClass = "com.mysql.cj.jdbc.Driver";
    // JDBC URL.
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";
    // MySQL user name.
    // NOTE(review): hard-coded credentials; consider reading them from configuration.
    public static String userName = "root";
    // MySQL password.
    public static String passwd = "123456";

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @return 0 on success, 1 if the job failed
     */
    @Override
    public int run(String[] strings) throws Exception {
        // Must run before Job.getInstance(conf), which works on a copy of the configuration.
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        // Start from an empty result table.
        MysqlUtils.deleteData(mysqlTableName);
        Job job = Job.getInstance(conf);
        job.setJarByClass(DurationDriver.class);
        // Full scan of teleRecord feeding DurationMap.
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                DurationMap.class,
                Text.class,
                LongWritable.class,
                job);
        job.setReducerClass(DurationReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);
        // Reducer output goes straight to MySQL.
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    /** Stores the configuration and points the HBase client at the ZooKeeper quorum. */
    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    /**
     * Runs the tool and exits with its status.
     *
     * <p>Any exception is reported with exit status 1. Previously the exception was only
     * printed and the process exited with 0, or lingered on non-daemon threads.
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        int exitCode;
        try {
            exitCode = ToolRunner.run(conf, new DurationDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
            exitCode = 1;
        }
        System.exit(exitCode);
    }
}
3.3.3.3 dayCountDuration
3.3.3.3.1 Map
/**
 * Mapper that emits {@code (date, duration)} for each scanned HBase row.
 *
 * <p>Example output: {@code ("2021-01-13", 3042)}. The date is the first 10 characters of the
 * {@code build_time} column; the duration comes from the {@code duration} column.
 */
public class dayCountDurationMap extends TableMapper<Text, LongWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        HashMap<String, String> columns = new HashMap<>();
        for (Cell c : value.rawCells()) {
            HashUtils.putValue(c, columns);
        }
        // build_time is written by the consumer as "<date> <time>"; keep the 10-character date prefix.
        String buildTime = columns.get("build_time");
        String day = buildTime.substring(0, 10);
        String rawDuration = columns.get("duration");
        Text dayKey = new Text(day);
        context.write(dayKey, new LongWritable(Long.parseLong(rawDuration)));
    }
}
3.3.3.3.2 Reduce
/**
 * Reducer that adds up all call durations (in seconds) for one day.
 *
 * <p>Writes {@code (date, total)} to MySQL through {@code DBWrite}, e.g.
 * {@code ("2021-01-13", 2042)}.
 */
public class dayCountDurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long dailyTotal = 0L;
        for (LongWritable part : values) {
            dailyTotal = dailyTotal + part.get();
        }
        String day = key.toString();
        context.write(new DBWrite(day, dailyTotal), NullWritable.get());
    }
}
3.3.3.3.3 Driver
/**
 * Hadoop {@link Tool} that sums call durations per calendar day.
 *
 * <p>Reads the HBase table {@code teleRecord} and writes the result to the MySQL table
 * {@code daydurations} (columns {@code date}, {@code durations}). The MySQL table is
 * emptied before the job is submitted.
 */
public class dayCountDurationDriver implements Tool {
    // Job configuration, injected by ToolRunner through setConf().
    public static Configuration conf = null;
    // Target MySQL table.
    public static String mysqlTableName = "daydurations";
    // Target MySQL columns, in the order DBWrite binds them.
    public static String[] fieldNames = {"date", "durations"};
    // JDBC driver class.
    public static String driverClass = "com.mysql.cj.jdbc.Driver";
    // JDBC URL.
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";
    // MySQL user name.
    // NOTE(review): hard-coded credentials; consider reading them from configuration.
    public static String userName = "root";
    // MySQL password.
    public static String passwd = "123456";

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @return 0 on success, 1 if the job failed
     */
    @Override
    public int run(String[] strings) throws Exception {
        // Must run before Job.getInstance(conf), which works on a copy of the configuration.
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        // Start from an empty result table.
        MysqlUtils.deleteData(mysqlTableName);
        Job job = Job.getInstance(conf);
        job.setJarByClass(dayCountDurationDriver.class);
        // Full scan of teleRecord feeding dayCountDurationMap.
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                dayCountDurationMap.class,
                Text.class,
                LongWritable.class,
                job);
        job.setReducerClass(dayCountDurationReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);
        // Reducer output goes straight to MySQL.
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    /** Stores the configuration and points the HBase client at the ZooKeeper quorum. */
    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    /**
     * Runs the tool and exits with its status.
     *
     * <p>Any exception is reported with exit status 1. Previously the exception was only
     * printed and the process exited with 0, or lingered on non-daemon threads.
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        int exitCode;
        try {
            exitCode = ToolRunner.run(conf, new dayCountDurationDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
            exitCode = 1;
        }
        System.exit(exitCode);
    }
}
4.專案原始碼
Github地址
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/281437.html
標籤:其他
上一篇:MapReduce提升
