本文將嘗試通過 MapReduce 模型實作一個簡單的 WordCount 演算法,區別于傳統使用 Hadoop 等大資料框架,本文使用云函式 SCF 與物件存盤 COS 來實作,
MapReduce 在維基百科中的解釋如下:
MapReduce 是 Google 提出的一個軟體架構,用于大規模資料集(大于 1TB)的并行運算,概念「Map(映射)」和「Reduce(歸納)」,及他們的主要思想,都是從函式式編程語言借來的,還有從矢量編程語言借來的特性,
通過這段描述,我們知道,MapReduce 是面向大資料并行處理的計算模型、框架和平臺,在傳統學習中,通常會在 Hadoop 等分布式框架下進行 MapReduce 相關作業,隨著云計算的逐漸發展,各個云廠商也都先后推出了在線的 MapReduce 業務,
理論基礎
在開始之前,我們根據 MapReduce 的要求,先繪制一個簡單的流程圖:

在這個結構中,我們需要 2 個云函式分別作 Mapper 和 Reducer;以及 3 個物件存盤的存盤桶,分別作為輸入的存盤桶、中間臨時快取存盤桶和結果存盤桶,在實體前,由于我們的函式即將部署在廣州區,因此在廣州區建立 3 個存盤桶:
物件存盤1 ap-guangzhou srcmr
物件存盤2 ap-guangzhou middlestagebucket
物件存盤3 ap-guangzhou destcmr
為了讓整個 Mapper 和 Reducer 邏輯更加清晰,在開始之前先對傳統的 WordCount 結構進行改造,使其更加適合云函式,同時合理分配
Mapper 和 Reducer 的作業:

功能實作
撰寫 Mapper 相關邏輯,代碼如下:
# -*- coding: utf8 -*-
import datetime
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from qcloud_cos_v5 import CosServiceError
import re
import os
import sys
import logging
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
region = u'ap-guangzhou' # 根據實際情況,修改地域
middle_stage_bucket = 'middlestagebucket' # 根據實際情況,修改bucket名
def delete_file_folder(src):
if os.path.isfile(src):
try:
os.remove(src)
except:
pass
elif os.path.isdir(src):
for item in os.listdir(src):
itemsrc = https://www.cnblogs.com/serverlesscloud/p/os.path.join(src, item)
delete_file_folder(itemsrc)
try:
os.rmdir(src)
except:
pass
def download_file(cos_client, bucket, key, download_path):
logger.info("Get from [%s] to download file [%s]" % (bucket, key))
try:
response = cos_client.get_object(Bucket=bucket, Key=key, )
response['Body'].get_stream_to_file(download_path)
except CosServiceError as e:
print(e.get_error_code())
print(e.get_error_msg())
return -1
return 0
def upload_file(cos_client, bucket, key, local_file_path):
logger.info("Start to upload file to cos")
try:
response = cos_client.put_object_from_local_file(
Bucket=bucket,
LocalFilePath=local_file_path,
Key='{}'.format(key))
except CosServiceError as e:
print(e.get_error_code())
print(e.get_error_msg())
return -1
logger.info("Upload data map file [%s] Success" % key)
return 0
def do_mapping(cos_client, bucket, key, middle_stage_bucket, middle_file_key):
src_file_path = u'/tmp/' + key.split('/')[-1]
middle_file_path = u'/tmp/' + u'mapped_' + key.split('/')[-1]
download_ret = download_file(cos_client, bucket, key, src_file_path) # download src file
if download_ret == 0:
inputfile = open(src_file_path, 'r') # open local /tmp file
mapfile = open(middle_file_path, 'w') # open a new file write stream
for line in inputfile:
line = re.sub('[^a-zA-Z0-9]', ' ', line) # replace non-alphabetic/number characters
words = line.split()
for word in words:
mapfile.write('%s\t%s' % (word, 1)) # count for 1
mapfile.write('\n')
inputfile.close()
mapfile.close()
upload_ret = upload_file(cos_client, middle_stage_bucket, middle_file_key,
middle_file_path) # upload the file's each word
delete_file_folder(src_file_path)
delete_file_folder(middle_file_path)
return upload_ret
else:
return -1
def map_caller(event, context, cos_client):
appid = event['Records'][0]['cos']['cosBucket']['appid']
bucket = event['Records'][0]['cos']['cosBucket']['name'] + '-' + appid
key = event['Records'][0]['cos']['cosObject']['key']
key = key.replace('/' + str(appid) + '/' + event['Records'][0]['cos']['cosBucket']['name'] + '/', '', 1)
logger.info("Key is " + key)
middle_bucket = middle_stage_bucket + '-' + appid
middle_file_key = '/' + 'middle_' + key.split('/')[-1]
return do_mapping(cos_client, bucket, key, middle_bucket, middle_file_key)
def main_handler(event, context):
logger.info("start main handler")
if "Records" not in event.keys():
return {"errorMsg": "event is not come from cos"}
secret_id = ""
secret_key = ""
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, )
cos_client = CosS3Client(config)
start_time = datetime.datetime.now()
res = map_caller(event, context, cos_client)
end_time = datetime.datetime.now()
print("data mapping duration: " + str((end_time - start_time).microseconds / 1000) + "ms")
if res == 0:
return "Data mapping SUCCESS"
else:
return "Data mapping FAILED"
同樣的方法,建立 reducer.py 檔案,撰寫 Reducer 邏輯,代碼如下:
# -*- coding: utf8 -*-
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from qcloud_cos_v5 import CosServiceError
from operator import itemgetter
import os
import sys
import datetime
import logging
region = u'ap-guangzhou' # 根據實際情況,修改地域
result_bucket = u'destmr' # 根據實際情況,修改bucket名
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)
def delete_file_folder(src):
if os.path.isfile(src):
try:
os.remove(src)
except:
pass
elif os.path.isdir(src):
for item in os.listdir(src):
itemsrc = https://www.cnblogs.com/serverlesscloud/p/os.path.join(src, item)
delete_file_folder(itemsrc)
try:
os.rmdir(src)
except:
pass
def download_file(cos_client, bucket, key, download_path):
logger.info("Get from [%s] to download file [%s]" % (bucket, key))
try:
response = cos_client.get_object(Bucket=bucket, Key=key, )
response['Body'].get_stream_to_file(download_path)
except CosServiceError as e:
print(e.get_error_code())
print(e.get_error_msg())
return -1
return 0
def upload_file(cos_client, bucket, key, local_file_path):
logger.info("Start to upload file to cos")
try:
response = cos_client.put_object_from_local_file(
Bucket=bucket,
LocalFilePath=local_file_path,
Key='{}'.format(key))
except CosServiceError as e:
print(e.get_error_code())
print(e.get_error_msg())
return -1
logger.info("Upload data map file [%s] Success" % key)
return 0
def qcloud_reducer(cos_client, bucket, key, result_bucket, result_key):
word2count = {}
src_file_path = u'/tmp/' + key.split('/')[-1]
result_file_path = u'/tmp/' + u'result_' + key.split('/')[-1]
download_ret = download_file(cos_client, bucket, key, src_file_path)
if download_ret == 0:
map_file = open(src_file_path, 'r')
result_file = open(result_file_path, 'w')
for line in map_file:
line = line.strip()
word, count = line.split('\t', 1)
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
logger.error("error value: %s, current line: %s" % (ValueError, line))
continue
map_file.close()
delete_file_folder(src_file_path)
sorted_word2count = sorted(word2count.items(), key=itemgetter(1))[::-1]
for wordcount in sorted_word2count:
res = '%s\t%s' % (wordcount[0], wordcount[1])
result_file.write(res)
result_file.write('\n')
result_file.close()
upload_ret = upload_file(cos_client, result_bucket, result_key, result_file_path)
delete_file_folder(result_file_path)
return upload_ret
def reduce_caller(event, context, cos_client):
appid = event['Records'][0]['cos']['cosBucket']['appid']
bucket = event['Records'][0]['cos']['cosBucket']['name'] + '-' + appid
key = event['Records'][0]['cos']['cosObject']['key']
key = key.replace('/' + str(appid) + '/' + event['Records'][0]['cos']['cosBucket']['name'] + '/', '', 1)
logger.info("Key is " + key)
res_bucket = result_bucket + '-' + appid
result_key = '/' + 'result_' + key.split('/')[-1]
return qcloud_reducer(cos_client, bucket, key, res_bucket, result_key)
def main_handler(event, context):
logger.info("start main handler")
if "Records" not in event.keys():
return {"errorMsg": "event is not come from cos"}
secret_id = "SecretId"
secret_key = "SecretKey"
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, )
cos_client = CosS3Client(config)
start_time = datetime.datetime.now()
res = reduce_caller(event, context, cos_client)
end_time = datetime.datetime.now()
print("data reducing duration: " + str((end_time - start_time).microseconds / 1000) + "ms")
if res == 0:
return "Data reducing SUCCESS"
else:
return "Data reducing FAILED"
部署與測驗
遵循 Serverless Framework 的 yaml 規范,撰寫 serveerless.yaml:
WordCountMapper:
component: "@serverless/tencent-scf"
inputs:
name: mapper
codeUri: ./code
handler: index.main_handler
runtime: Python3.6
region: ap-guangzhou
description: 網站監控
memorySize: 64
timeout: 20
events:
- cos:
name: srcmr-1256773370.cos.ap-guangzhou.myqcloud.com
parameters:
bucket: srcmr-1256773370.cos.ap-guangzhou.myqcloud.com
filter:
prefix: ''
suffix: ''
events: cos:ObjectCreated:*
enable: true
WordCountReducer:
component: "@serverless/tencent-scf"
inputs:
name: reducer
codeUri: ./code
handler: index.main_handler
runtime: Python3.6
region: ap-guangzhou
description: 網站監控
memorySize: 64
timeout: 20
events:
- cos:
name: middlestagebucket-1256773370.cos.ap-guangzhou.myqcloud.com
parameters:
bucket: middlestagebucket-1256773370.cos.ap-guangzhou.myqcloud.com
filter:
prefix: ''
suffix: ''
events: cos:ObjectCreated:*
enable: true
完成之后,通過 sls --debug 指令進行部署,部署成功之后,進行基本的測驗:
- 準備一個英文檔案:

-
登錄騰訊云后臺,打開我們最初建立的存盤桶:srcmr,并上傳該檔案;
-
上傳成功之后,稍等片刻即可看到 Reducer 程式已經在 Mapper 執行之后,產出日志:

此時,我們打開結果存盤桶,查看結果:

現在,我們就完成了簡單的詞頻統計功能,
總結
Serverless 架構是適用于大資料處理的,在騰訊云官網,我們也可以看到其關于資料 ETL 處理的場景描述:

本實體中,有一鍵部署多個函式的操作,在實際生產中,每個專案都不會是單個函式單打獨斗的,而是多個函陣列合應用,形成一個 Service 體系,所以一鍵部署多個函式就顯得尤為重要,通過本實體,希望讀者可以對 Serverless 架構的應用場景有更多的了解,并且能有所啟發,將云函式和不同觸發器進行組合,應用在自身業務中,
Serverless Framework 30 天試用計劃
我們誠邀您來體驗最便捷的 Serverless 開發和部署方式,在試用期內,相關聯的產品及服務均提供免費資源和專業的技術支持,幫助您的業務快速、便捷地實作 Serverless!
詳情可查閱:Serverless Framework 試用計劃
One More Thing
3 秒你能做什么?喝一口水,看一封郵件,還是 —— 部署一個完整的 Serverless 應用?
復制鏈接至 PC 瀏覽器訪問:https://serverless.cloud.tencent.com/deploy/express
3 秒極速部署,立即體驗史上最快的 Serverless HTTP 實戰開發!
傳送門:
- GitHub: github.com/serverless
- 官網:serverless.com
歡迎訪問:Serverless 中文網,您可以在 最佳實踐 里體驗更多關于 Serverless 應用的開發!
推薦閱讀:《Serverless 架構:從原理、設計到專案實戰》
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/10725.html
標籤:其他
