封裝的Redis佇列
MyRedisQueue.py
#!usr/bin/env python2.7
# -*- coding: utf-8 -*-
import redis
class RedisQueue(object):
def __init__(self, name, namespace='queue', **redis_kwargs):
# redis的默認引數為:host='localhost', port=6379, db=0, 其中db為定義redis database的數量
# r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 加上decode_responses=True,寫入的鍵值對中的value為str型別,不加這個引數寫入的則為位元組型別
# host是redis主機,需要redis服務端和客戶端都啟動 redis默認埠是6379
self.__db = redis.Redis(**redis_kwargs)
self.key = '%s:%s' % (namespace, name)
def qsize(self):
return self.__db.llen(self.key) # 回傳佇列里面list內元素的數量
def put(self, item, timeout=None):
self.__db.rpush(self.key, item) # 添加新元素到佇列最右方
if isinstance(timeout, int):
self.__db.expire(self.key, timeout)
def get_wait(self, timeout=None):
# 回傳佇列第一個元素,如果為空則等待至有元素被加入佇列(超時時間閾值為timeout,如果為None則一直等待)
item = self.__db.blpop(self.key, timeout=timeout)
# if item:
# item = item[1] # 回傳值為一個tuple
return item
def get_nowait(self):
# 直接回傳佇列第一個元素,如果佇列為慷訓傳的是None
item = self.__db.lpop(self.key)
return item
def get_all(self):
items = []
while True:
result = self.get_nowait()
if result:
items.append(eval(result))
else:
break
return items
接收端
#!usr/bin/env python2.7
# -*- coding: utf-8 -*-
from MyRedisQueue import RedisQueue
queue_name = "q1"
retCode = {"status": {"code": 0, "msg": "success"}}
redis_queue = RedisQueue(queue_name)
ret = redis_queue.get_wait(30) # 阻塞等待30s,直到佇列中有元素進來
if ret is None:
retCode["status"]["code"] = 2
retCode["status"]["msg"] = "超時未回應"
發送端
#!usr/bin/env python2.7
# -*- coding: utf-8 -*-
from MyRedisQueue import RedisQueue
queue_name = "q1"
redis_queue = RedisQueue(queue_name)
redis_queue.put("all done")
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/190815.html
標籤:Python
