場景
有時候需要處理并存盤大量流式資料,以Gps資料為例,面對一天千萬級別的資料,時常要分庫分表,資料偶爾會存在丟失的情況,很難做到既要保證大量資料處理的同時又要保證資料查詢的實時性,這時借助云平臺就是一個很好的做法,
資料流向

大致資料流向資料源->緩沖區->資料庫處理中心->云資料庫
所用Azure云平臺介紹
1.Azure EventHub (云Kakfa)
Azure 事件中心是大資料流式處理平臺和事件引入服務, 它可以每秒接收和處理數百萬個事件, 可以使用任何實時分析提供程式或批處理/存盤配接器轉換和存盤發送到事件中心的資料,
2.Azure Databricks (云Spark)
是一個已針對 Microsoft Azure 云服務平臺進行優化的資料分析平臺, Azure Databricks開發資料密集型應用程式的三個環境:Databricks SQL、Databricks Data Science Engineering 和 & Databricks 機器學習,Databricks SQL 為想要針對資料湖運行 SQL 查詢、創建多種可視化型別以從不同角度探索查詢結果,以及生成和共享儀表板的分析員提供了一個易于使用的平臺,
3.Azure Cosmos DB Cassandra(云資料庫)
Azure Cosmos DB Cassandra API 可以充當為 Apache Cassandra 撰寫的應用的資料存盤, 這意味著通過使用現有的符合 CQLv4 的 Apache 驅動程式,現有 Cassandra 應用程式現在可以與 Azure Cosmos DB Cassandra API 通信, 在許多情況下,只需更改連接字串,就可以從使用 Apache Cassandra 切換為使用 Azure Cosmos DB Cassandra API,通過 Cassandra API 可以使用 Cassandra 查詢語言 (CQL)、基于 Cassandra 的工具(如 cqlsh)和熟悉的 Cassandra 客戶端驅動程式與 Azure Cosmos DB 中存盤的資料進行互動,
技術方案優勢
1.高效實時,高吞吐(理論上只要預算夠無論多大的資料都能處理)
2.只要部署好,根本不用擔心自己搭的服務器會爆炸
3.減少各種環境搭建程序,減少人力機器維護成本
4.后續查詢擴展快速方便,可以根據業務需求做出各種定制
具體講解
- 資料發送部分
資料發送部分主要將資料源發送至EventHub,那么這里以.net為例附上一部分發送代碼,因為是在老專案上改的,用的是.net fk,所以沒有太多的操作空間(不然不兼容),EventHub使用環境在.net fk4.6以上,大家記得更新框架,推薦使用.net core或者.net 5的BackGroundService,
大概說一下思路,一個并發佇列,4執行緒發送,而且做了磁區,可以根據自身語言環境或者需要更改,最好自己寫
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.AppCenter.Crashes;
namespace EHiGPSSocket.FormsApp
{
public static class EventHub
{
private const string ConnectionString =
"Your ConnectionString ";
private const string EventHubName = "EventHubName";
private const int Capacity = 500;
private const int MacCapacity = 200000;
private static EventHubProducerClient _producerClient =
new EventHubProducerClient(ConnectionString, EventHubName);
private static ConcurrentDictionary<string, ConcurrentQueue<string>> DicQueues = new ConcurrentDictionary<string, ConcurrentQueue<string>>();
static EventHub()
{
for (int i = 0; i < 4; i++)
{
Task.Factory.StartNew(SendAsync, CancellationToken.None, TaskCreationOptions.LongRunning);
}
}
public static void Enqueue(string message, string topic)
{
if (!DicQueues.Keys.Contains(topic))
{
var queue = new ConcurrentQueue<string>();
queue.Enqueue(message);
DicQueues.TryAdd(topic, queue);
}
else
{
DicQueues.TryGetValue(topic, out var queue);
if (queue?.Count < MacCapacity)
{
queue?.Enqueue(message);
}
}
}
static async Task SendAsync(object state)
{
while (true)
{
try
{
foreach (var dicQueue in DicQueues)
{
var queue = dicQueue.Value;
if (queue.Count > Capacity)
{
var count = 0;
var list = new List<EventData>();
while (count < Capacity && count < dicQueue.Value.Count)
{
queue.TryDequeue(out var data);
if (!string.IsNullOrEmpty(data))
{
list.Add(new EventData(data));
count++;
}
}
if (_producerClient.IsClosed)
{
_producerClient = new EventHubProducerClient(ConnectionString, EventHubName);
}
await _producerClient.SendAsync(list, new SendEventOptions() { PartitionKey = dicQueue.Key }).ConfigureAwait(false);
}
}
}
catch (Exception e)
{
Crashes.TrackError(e);
}
}
}
}
}
2.EventHub
資料發送至EventHub后,可以看到大致的資料量

3.Azure Databricks
Azure Databricks從EventHub讀取資料
大致的讀取代碼如下Scala
def EventHandle(): Unit = {
val namespaceName = "namespaceName "
val eventHubName = "eventHubName "
val sasKeyName = "sasKeyName "
val sasKey = "sasKey "
val domainName="domainName.chinacloudapi.cn"
val connStr = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
.setEndpoint(namespaceName,domainName)
.setEventHubName(eventHubName)
.setSasKeyName(sasKeyName)
.setSasKey(sasKey)
val customEventhubParameters =EventHubsConf(connStr.toString()).setMaxEventsPerTrigger(100)
customEventhubParameters.setConsumerGroup("$Default")
println(customEventhubParameters.consumerGroup)
println(customEventhubParameters.connectionString)
val carDF = spark.sql("select * from xxxx")
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
val messages =incomingStream.withColumn("Body", $"body".cast(StringType)).withColumn("PartitionKey", $"PartitionKey".cast(StringType)).select("Body","PartitionKey")
val query= messages.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long)=>
batchDF.filter("PartitionKey='DBQ'")
//這里處理DF
.write.cassandraFormat("gpsdata", "gpsprofile").mode("append").save()
}
query.start().awaitTermination()
}
4.Azure Cosmos DB Cassandra
最后丟入Azure Cosmos DB Cassandra

總結
EventHub Azure Databricks Azure Cosmos DB Cassandra
這三個云平臺中需要有很多可以配置的地方,大家參閱官方檔案應該都可以解決,這里只 、提供一些思路和解決方案,具體操作與選型還是要看個人業務需求與技術選型
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/430294.html
標籤:其他
