- 背景:
- 開發背景:公司相關報表需求需要將訂單業務資料同步至RocketMQ中,由于需要保證開發的一致性(多個部門協同開發),所以采用讀取Hive離線資料的方式通過PythonAPI寫入RocketMQ中,便于其他開發同事呼叫~
- 開發環境:
- 本地除錯
系統 Mac Python 3.7.5 rocketmq 0.4.4 (Python模塊) rocketmq-client-python 2.0.0 (Python模塊) - 服務器
系統 CentOS Linux release 7.4.1708 Python 3.7.3 rocketmq 0.4.4 (Python模塊) rocketmq-client-python 2.0.0 (Python模塊)
- 本地除錯
- Python相關代碼
- Produce端
import json from rocketmq.client import Producer, Message #BDP_Process_GroupID:groupid #max_message_size:訊息位元組長度限制,此處為1M,如果訊息過大,可以修改此處引數 producer = Producer('BDP_Process',max_message_size=1024*1024) producer.set_namesrv_addr('xxxx:9876')#ip:prot #producer.set_name_server_address('xxxx:9876')#linux和mac代碼不通,此處為mac系統書寫格式 #producer.set_namesrv_addr('xxxx:9876,xxxx:9876,xxxx:9876')#集群模式可以這樣寫 producer.start() msg_body = { "id":212331, "orderId":"320106004011202105318032", "storeId":"X12N", "riqi":"2020-12-12 22:12:32", "processTime":"1595311611000" } #將字典封裝成訊息并修改訊息編碼,此處不修改編碼可能會中文亂碼 data = json.dumps(msg_body,ensure_ascii=False).encode('utf-8') msg = Message('YOUR-TOPIC')#訊息主題 msg.set_keys(msg_body.get('orderId'))#訊息TAG,用于訊息過濾對訊息的整體分類 msg.set_tags('BPD_Process')#Message索引鍵 msg.set_body(data)#訊息主體 ret = producer.send_sync(msg)#發送異步訊息 print(ret.status, ret.msg_id, ret.offset) producer.shutdown() - PushConsumer端
import time from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(msg.id, msg.body) return ConsumeStatus.CONSUME_SUCCESS #BDP_Process_GroupID:groupid consumer = PushConsumer('BDP_Process_GroupID') consumer.set_name_server_address('xxxx:9876')#ip:prot consumer.subscribe('YOUR-TOPIC', callback)#Topic consumer.start() while True: time.sleep(3600) consumer.shutdown() - Produce端讀取Hive
import re import sys import time import json import logging from pyhive import hive,presto #qry為查詢hive的SQL def Hive(self,qry): try: connect = hive.Connection( host=hive_host, port=hive_port, database=hive_database, username=hive_username, password=hive_password, auth='LDAP',#網路通信的方式 configuration={"mapreduce.job.queuename":"root.EXTRACT"}#佇列 ) cursor = connect.cursor() cursor.execute(qry) fetchall = cursor.fetchall() result = [list(i) for i in fetchall] return result except Exception as e: if (str(e) == "No result set"): logger.info("該sql無回傳值:\t%s" % qry) logger.info(e) else: logger.warning("Hive Connect ERROR\n") logger.info("該sql有誤:%s" % qry) logger.info(e)
- Produce端
- 開發程序中遇到的問題整理
- Python開發RocketMQ代碼前,需要先安裝對應的輕量級client,鏈接地址https://github.com/apache/rocketmq-client-python
- rocketmq-client-python2.0.0目前只支持Mac、Linux系統
- 開發前期請先確認rocketMq相關訪問的埠(9876、10911(VIP訊息埠))、Topic是否創建,以及網路是否可以訪問
- Linux和Mac代碼有不同的地方,例如set_name_server_address和set_namesrv_addr,消費端也有不同的地方,但是因為沒有涉及到消費端的代碼除錯,此處就不一一列出了,這個是很容易被誤導的地方,
- 總結
- 在著手代碼開發前,先把網路相關的埠、IP梳理清楚,是否可以直接訪問,這樣會省很多開發時間
- 開發程序中盡量與其他同事溝通好,看雙方訊息協議、資料型別等是否都一致,都需要確認好,這樣會少走很多彎路,
- 開發中不反對百度,但是百度很可能解決不了你的問題,你可以在官網看看是否有相關報錯的issue,或者本地自己一步步Debug,這樣你下次會更快的定位到問題的根本原因,
博客到此就告一段落了,如果代碼或者書寫哪里有問題,大家可以評論留言,博主看到后會修改代碼,也謝謝各位啦~
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/289516.html
標籤:其他
上一篇:Hbase
