我想做一個小函式,在后臺為我的應用程式做一些獲取作業,所以我希望我的應用程式在呼叫這個函式后能繼續正常執行(所以這應該是無阻塞的)。我想讓它每隔X秒在一個預定的執行緒上運行。 為了做到這一點,我有以下內容:
def start(self)。
sync_thread = threading.Timer(30, self.readMessages(self.queue, self.client)
sync_thread.start()
其中queue和client在__init__函式中被初始化并分配給self。
執行緒的第一次啟動和對我的readMessages函式的呼叫運行良好,盡管目前在30秒后我得到了以下錯誤:
Exception in thread Thread-4:
回溯(最近一次呼叫)。
檔案 "/usr/local/lib/python3.8/threading.py", 行 932, in _bootstrap_inner
self.run()
檔案 "/usr/local/lib/python3.8/threading.py", line 1254, in run
self.function(*self.args, **self.kwargs)
型別錯誤。'NoneType' object is not callable
有什么想法嗎,或者有什么更好的方法來做這件事?
uj5u.com熱心網友回復:
你得到了TypeError: 'NoneType' object is not callable,因為threading.Timer例程正試圖呼叫你傳遞的函式,然而,正如@CherryDT在評論中提到的,你沒有傳遞任何函式,而是傳遞了你的類中readMessage方法的回傳值。
你需要做的是:
def MinPrinter。
def __init__(self, xs):
self.xs = xs
def callback(self, xs)。# This is your 'readMessages' function.
print(min(xs))。
def start(self, n)。
t = threading.Timer(n, self.callback, args=(self.xs,))
t.start()
mp = MinPrinter([ 1, 2, 0, 3, - 1]
mp.start(30)
# 將在30秒后列印。
-1
引數需要用args引數來指定。另外,你也可以使用kwargs引數傳遞關鍵字引數(需要傳遞一個字典)。
uj5u.com熱心網友回復:
嘗試
def start(self)。
sync_thread = threading.Timer(30, self.readMessages, args=(self.queue,
self.client))
sync_thread.start()
你應該傳遞一個可呼叫的物件給timer,當你使用self.readMessages(self.queue, self.client)時,你正在執行這個函式,而不是傳遞它。
無論如何,使用Timer不會給你帶來你想要的東西。它將在30秒后呼叫函式self.readMessages一次,但它不會每30秒重復一次。
一個(非常)簡單的方法
from threading import Thread
from sched import scheduler
import time
class SchedTask(Thread)。
def __init__(self, interval, task, task_args=()) 。
super().__init__()
self.scd = scheduler(time.time, time.sleep)
self.interval = interval
self.task = task
self.task_args = task_args
def schedual_task(self)。
self.scd.enter(self.interval, 1, self.schedual_task)
self.task(self.task_args)
self.scd.run()
def run(self)。
self.schedual_task()
然后在你的代碼中你可以這樣使用它
def t(*args)。
print("我是一個任務")
st = SchedTask(5,t)
st.start()
#rest of your code你的代碼的其余部分
...
有很多方法可以做到這一點,但這只是一個簡單的例子。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/308341.html
標籤:
