重新推出重制版,單房間推送提醒版本(后文有擴展多房間思路),適合沒有服務器的小伙伴學習使用,
Gitee檔案同步更新:https://gitee.com/LGW_space/dy-live-lintener
語雀檔案更為詳細(包含成品案例):https://www.yuque.com/books/share/fdc7c120-e4eb-47d5-93de-dcf5d347e5a8?# 《斗魚直播提醒服務》
斗魚直播提醒 ——簡化版
功能介紹:
當然在學習Python的道路上肯定會困難,沒有好的學習資料,怎么去學習呢? 學習Python中有不明白推薦加入交流Q群號:928946953 群里有志同道合的小伙伴,互幫互助, 群里有不錯的視頻學習教程和PDF! 還有大牛解答!
主播開播后或更換房間標題后推送提醒到微信(單房間)
> 理論上可以配置多個房間,可以達到和斗魚全房間推送的效果一樣,
專案截圖:
<ignore_js_op>
[TOC]
一、專案準備
1. 用到的技術內容
- 云函式
- 物件存盤(為什么使用物件存盤,下文有介紹)
- python(3.6/3.7)
- WxPusher
2. 需要的工具
- 騰訊云(云函式和云物件存盤)
二、開通物件存盤
這里我們使用物件存盤相當于資料庫的一個作用,為什么使用物件存盤呢?
> 物件存盤采用扁平的檔案組織方式,所以在檔案量上升至千萬、億級別,容量在PB級別的時候,這種檔案組織方式下的性能優勢就顯現出來了,檔案不在有目錄樹深度的問題,歷史和近線資料有同樣的訪問效率,另外,物件存盤多采用分布式架構,簡而言之就是便宜效率高
官方入門步驟
1. 注冊騰訊云
官方的注冊方法
2. 開通 COS 服務
在 騰訊云控制臺 中,選擇【云產品】>【物件存盤】,進入 COS 控制臺,按照界面提示開通 COS 服務,(如果您已開通,請跳過該步驟,)
3. 創建存盤桶
我們需要創建一個用于存放物件的存盤桶:
-
在 物件存盤控制臺 左側導航欄中單擊【存盤桶串列】,進入存盤桶管理頁,
-
單擊【創建存盤桶】,輸入以下配置資訊,其他配置保持默認即可,
- 名稱:輸入存盤桶名稱,名稱設定后不可修改,此處舉例輸入 examplebucket,
- 所屬地域:存盤桶所屬地域,選擇與您業務最近的一個地區,例如廣州地域,
- 訪問權限:存盤桶訪問權限,此處我們保持默認為“私有讀寫”,
-
單擊【確定】,即可創建完成,
三、云函式的開發
云函式我們使用騰訊云函式作為示例,阿里云也同理,(阿里云似乎是不允許觸發器的觸發周期低于1分鐘)
1. 注冊騰訊云
官方的注冊方法
2. 創建云函式
- 登錄騰訊云,在云產品里找到云函式,在【函式服務】里新建云函式
- 選擇【自定義創建】
- 修改【函式名稱】、【地域】、【運行環境(Python3.6)】、【提交方法(在線編輯)】、【執行方法(
index.main_handler)】暫時不用修改默認代碼, - 點擊完成
3. 完善功能
在寫代碼之前,我們需要用到這幾個關鍵資訊:
- appid :你的APPID(賬號資訊里有)
- secret_id : 你的 SecretId (API密鑰管理里獲取)
- secret_key :你的 SecretKey(API密鑰管理里獲取)
- region :存盤桶所在的地域
- BUCKET:存盤桶名稱
- token :沒有可為空
- FILE_NAME :你要上傳的檔案名 如room_info
- WxPusherToken :WxPusher的用戶appToken
3.1 引入所需包,完成基本配置
復制代碼 隱藏代碼 # -*- coding: utf8 -*- from qcloud_cos_v5 import CosConfig from qcloud_cos_v5 import CosS3Client from qcloud_cos_v5 import CosServiceError from qcloud_cos_v5 import CosClientError import datetime import json import logging import requests from urllib.parse import parse_qs logger = logging.getLogger() logger.setLevel(logging.ERROR) # 默認列印 INFO 級別日志,可根據需要調整為 DEBUG、WARNING、ERROR、CRITICAL 級日志 appid = 123123 # Please replace with your APPID. 請替換為您的 APPID secret_id = u'xxx' # Please replace with your SecretId. 請替換為您的 SecretId secret_key = u'xxx' # Please replace with your SecretKey. 請替換為您的 SecretKey region = u'ap-shanghai' # Please replace with the region where COS bucket located. 請替換為您bucket 所在的地域 BUCKET = 'xxx' FILE_NAME = 'xxx' token = '' WxPusherToken = 'xxxxxx' # 配置存盤桶 config = CosConfig(Secret_id=secret_id, Secret_key=secret_key, Region=region, Token=token) client = CosS3Client(config)
注:qcloud_cos_v5 為 COS的SDK
3.2 創建方法 根據房間號獲取直播狀態
斗魚第三方API,通過房間號獲取房間資訊,自行百度獲取,
最后回傳 房間資訊復制代碼 隱藏代碼 def getRoomInfo(): url = '斗魚官方API/房間號' try: r = requests.get(url, timeout=5) except requests.exceptions.RequestException as e: print(str(e)) return 'timeout' else: if r.status_code == 404: return '404' json_str = json.loads(r.text) if isinstance(json_str, dict): if 'error' in json_str.keys() and json_str['error'] == 0: room_info = json_str['data'] return room_info
3.3 創建方法 發送WxPusher訊息
復制代碼 隱藏代碼 def send_WxPusher_msg(c): headers = { 'Content-Type': 'application/json' } url = 'http://wxpusher.zjiecode.com/api/send/message' parameter = { "appToken": WxPusherToken, "content": c, "contentType": 1, # 內容型別 1表示文字 2表示html(只發送body標簽內部的資料即可,不包括body標簽) 3表示markdown "topicIds":[ 你的主題號 ] } r = requests.post(url=url, headers=headers, data=json.dumps(parameter, ensure_ascii=False).encode('utf-8')) json_str = json.loads(r.text) datas = json_str['data'] for data in datas: if data['code'] == 1001: logger.error(data['status'])
3.4 創建方法 更新(上傳)檔案
設定更新時間欄位,用于鎖定更新,在指定時間內不允許更新/重復推送(我猜是騰訊云多執行緒的問題,會導致重復更新、重復推送)
復制代碼 隱藏代碼 def updateLocalInfo(roominfo): DIC = { "room_id": roominfo["room_id"], "room_status": roominfo["room_status"], "room_name": roominfo["room_name"], "update_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") } response = client.put_object( Bucket=BUCKET, Body=DIC, Key=FILE_NAME ) print("更新成功:",response['ETag'])
3.5 創建方法 獲取檔案
url獲取:上傳檔案后,在存盤桶內點擊檔案的
詳情,基本資訊的物件地址資訊
如 https://存盤桶名.cos.地域.myqcloud.com/檔案名復制代碼 隱藏代碼 def getLocalInfo(): url = '檔案的物件地址' r = requests.get(url) info_str = r.text params = parse_qs(info_str) result = {key: params[key][0] for key in params} return result
3.6 創建方法 資訊整合
復制代碼 隱藏代碼 def time_last(time_str): # 計算時間差 t_r = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S") now_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") t_n = datetime.datetime.strptime(now_time_str, "%Y-%m-%d %H:%M:%S") return (t_n - t_r).seconds def checkRoomInfo(): room = getLocalInfo() # 獲取歷史房間資訊 curr_info = getRoomInfo() # 獲取當前真實房間資訊 time_l = time_last(room['update_time']) # 計算當前時間和上次更新時間的時間差 # 直播開播提醒 if curr_info['room_status'] == "1": # 正在直播 if room['room_status'] == "2": # 之前狀態為 未開播 if time_l > 300: # 300秒內無法重復推送 content = '【開播】您關注的主播:' + curr_info['owner_name'] + '開始直播啦' + \ '\n房間標題:' + curr_info['room_name'] + \ '\n正在直播:' + curr_info['cate_name'] send_WxPusher_msg(content) updateLocalInfo(roominfo=curr_info) else: logger.error( '【開播】房間號:' + room['room_id'] + '\n上次更新:' + room['update_time'] + '\n時間差:' + str( time_l)) else: if room['room_status'] == "1": # 關閉了直播 updateLocalInfo(roominfo=curr_info) # 更換房間標題提醒 if curr_info['room_name'] != room['room_name']: if time_l > 300: content1 = '【更換標題】您關注的主播:' + curr_info['owner_name'] + '改變了房間標題' + \ '\n當前房間標題:' + curr_info['room_name'] send_WxPusher_msg(content1) updateLocalInfo(roominfo=curr_info) else: logger.error( '【更換標題】房間號:' + room['room_id'] + '\n上次更新:' + room['update_time'] + '\n時間差:' + str( time_l))
到這所有工能就基本全部完成啦~
開發擴展思路
多房間支持:支持一個房間就一定支持多個房間,
- 下載檔案:檔案內容為字典/json陣列,每一個字典記錄房間的資訊,
- 遍歷:獲取到存盤桶內的房間資訊后,用斗魚API遍歷得到每一個房間的真實資訊,
- 更新/上傳:通過比對真實資訊,更新字典陣列,最后再上傳到COS,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/298835.html
標籤:其他

