我有一個腳本可以啟動兩個類(控制 LED 燈條和溫度/嗡嗡聲傳感器)。每個類都運行一個可以終止的 while 回圈,signal_handler()基本上呼叫sys.exit(0). signal_handler()我正在考慮像處理類本身一樣處理主程式的退出。但是,當我嘗試CTRL C退出腳本時,程式退出并出現錯誤(請參見下面的代碼)并且燈程式沒有正確退出(即,燈在正常退出時應該關閉時仍然亮著)。
import threading
from light_controller import LightController
from thermometer import Thermometer
import signal
def signal_handler():
print("\nhouse.py terminated with Ctrl C.")
if l_thread.is_alive():
l_thread.join()
if t_thread.is_alive():
t_thread.join()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
lights = LightController()
temp = Thermometer()
t_thread = threading.Thread(target = temp.run)
t_thread.daemon = True
t_thread.start()
l_thread = threading.Thread(target = lights.run)
l_thread.daemon = True
l_thread.start()
Thermometer() terminated with Ctrl C.
Exception ignored in: <module 'threading' from '/usr/lib/python3.7/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.7/threading.py", line 1281, in _shutdown
t.join()
File "/usr/lib/python3.7/threading.py", line 1032, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
File "/home/pi/Desktop/house/thermometer.py", line 51, in signal_handler
sys.exit(0)
我的看法是這種情況正在發生,因為我signal_handler()在兩個類和主程式中都有復制。這兩個類都將運行無限回圈并且可能被它們自己使用,所以我寧愿保留signal_handler()這兩個類中的每一個。我不確定是否有可能真正保持這種狀態。我也不知道是否sys.exit()真的是在不引起錯誤的情況下脫身的方法。我可以對主程式使用不同的退出方法house.py而不是 CTRL C。
更新
感謝您的拼寫檢查!
這是類的代碼。
thermometer.py
from luma.core.interface.serial import i2c
from luma.core.render import canvas
from luma.oled.device import ssd1306, ssd1325, ssd1331, sh1106
from luma.core.error import DeviceNotFoundError
import os
import time
import signal
import sys
import socket
from PIL import ImageFont, ImageDraw
# adafruit
import board
import busio
from adafruit_htu21d import HTU21D
class Thermometer(object):
"""docstring for Thermometer"""
def __init__(self):
super(Thermometer, self).__init__()
# TODO: Check for pixelmix.ttf in folder
self.drawfont = "pixelmix.ttf"
self.sleep_secs = 30
try:
signal.signal(signal.SIGINT, self.signal_handler)
self.serial = i2c(port=1, address=0x3C)
self.oled_device = ssd1306(self.serial, rotate=0)
except DeviceNotFoundError:
print("I2C mini OLED display not found.")
sys.exit(1)
try:
# Create library object using our Bus I2C port
#self.i2c_port = busio.I2C(board.SCL, board.SDA)
#self.temp_sensor = HTU21D(self.i2c_port)
print("Running temp in debug mode")
except ValueError:
print("Temperature sensor not found")
sys.exit(1)
def getIP(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
def signal_handler(self, sig, frame):
print("\nThermometer() terminated with Ctrl C.")
sys.exit(0)
def run(self):
try:
while True:
# Measure things
temp_value = 25
hum_value = 50
#temp_value = round(self.temp_sensor.temperature, 1)
#hum_value = round(self.temp_sensor.relative_humidity, 1)
# Display results
with canvas(self.oled_device) as draw:
draw.rectangle(self.oled_device.bounding_box, outline="white", fill="black")
font = ImageFont.truetype(self.drawfont, 10)
ip = self.getIP()
draw.text((5, 5), "IP: " ip, fill="white", font=font)
font = ImageFont.truetype(self.drawfont, 12)
draw.text((5, 20), f"T: {temp_value} C", fill="white", font=font)
draw.text((5, 40), f"H: {hum_value}%", fill="white", font=font)
# TODO ADD SAVING Here
time.sleep(self.sleep_secs)
except SystemExit:
print("Exiting...")
sys.exit(0)
except:
print("Unexpected error:", sys.exc_info()[0])
sys.exit(2)
if __name__ == '__main__':
thermo = Thermometer()
thermo.run()
light_controller.py
import RPi.GPIO as GPIO
import time
import signal
import datetime
import sys
class LightController(object):
"""docstring for LightController"""
def __init__(self):
super(LightController, self).__init__()
signal.signal(signal.SIGTERM, self.safe_exit)
signal.signal(signal.SIGHUP, self.safe_exit)
signal.signal(signal.SIGINT, self.safe_exit)
self.red_pin = 9
self.green_pin = 11
# might be white pin if hooking up a white LED here
self.blue_pin = 10
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(self.red_pin, GPIO.OUT)
GPIO.setup(self.green_pin, GPIO.OUT)
GPIO.setup(self.blue_pin, GPIO.OUT)
self.pwm_red = GPIO.PWM(self.red_pin, 500) # We need to activate PWM on LED so we can dim, use 1000 Hz
self.pwm_green = GPIO.PWM(self.green_pin, 500)
self.pwm_blue = GPIO.PWM(self.blue_pin, 500)
# Start PWM at 0% duty cycle (off)
self.pwm_red.start(0)
self.pwm_green.start(0)
self.pwm_blue.start(0)
self.pin_zip = zip([self.red_pin, self.green_pin, self.blue_pin],
[self.pwm_red, self.pwm_green, self.pwm_blue])
# Config lights on-off cycle here
self.lights_on = 7
self.lights_off = 19
print(f"Initalizing LightController with lights_on: {self.lights_on}h & lights_off: {self.lights_off}h")
print("------------------------------")
def change_intensity(self, pwm_object, intensity):
pwm_object.ChangeDutyCycle(intensity)
def run(self):
while True:
#for pin, pwm_object in self.pin_zip:
# pwm_object.ChangeDutyCycle(100)
# time.sleep(10)
# pwm_object.ChangeDutyCycle(20)
# time.sleep(10)
# pwm_object.ChangeDutyCycle(0)
current_hour = datetime.datetime.now().hour
# evaluate between
if self.lights_on <= current_hour <= self.lights_off:
self.pwm_blue.ChangeDutyCycle(100)
else:
self.pwm_blue.ChangeDutyCycle(0)
# run this once a second
time.sleep(1)
# ------- Safe Exit ---------- #
def safe_exit(self, signum, frame):
print("\nLightController() terminated with Ctrl C.")
sys.exit(0)
if __name__ == '__main__':
controller = LightController()
controller.run()
uj5u.com熱心網友回復:
選項 1:執行緒很難
擴展我所說的“沒有內部回圈”的含義——執行緒很難,所以讓我們做點別的。
- 我在這里添加了
__enter__和__exit__到溫度計和 LightController 類;這使它們可用作背景關系管理器(即與with塊一起使用)。當您擁有“擁有”其他資源的物件時,這很有用;在這種情況下,溫度計擁有串行設備,而燈控制器觸摸 GPIO。 - 然后,不是每個類都有
.run(),它們將永遠停留在哪里,讓我們讓“外部”程式控制它:它在一個永遠的 while 回圈中運行,并要求每個“設備”在再次等待一秒鐘之前做它的事情。(您也可以使用stdlibsched模塊讓類注冊函式以不同的時間間隔運行,或者如果不同的類碰巧需要不同的檢查時間間隔,則更聰明。) - 由于沒有執行緒,因此也不需要設定信號處理程式;程式中的 ctrl c 會
KeyboardInterrupt像常規一樣彈出例外,并且with塊的__exit__處理程式有機會進行清理。
class Thermometer:
def __enter__(self):
self.serial = ...
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# TODO: Cleanup the i2c/ssd devices
pass
def step(self):
""" Measure and draw things """
# Measure things...
# Draw things...
class LightController:
def __enter__(self):
GPIO.setmode(...)
def __exit__(self, exc_type, exc_val, exc_tb):
# TODO: cleanup GPIO
pass
def step(self):
current_hour = datetime.datetime.now().hour
# etc...
def main():
with LightController() as lights, Thermometer() as temp:
while True:
lights.step()
temp.step()
time.sleep(1)
if __name__ == '__main__':
main()
選項 2:執行緒很難,但我們還是要這樣做
另一種選擇是讓您的執行緒合作并在需要時關閉,是使用 anEvent來控制它們的內部回圈。
這里的想法是,不是time.sleep()在回圈中,而是Event.wait()在等待,因為它接受一個可選的超時來等待事件被設定(或不被設定)。事實上,在某些作業系統上,time.sleep()它被實作為讓執行緒等待匿名事件。
當您希望執行緒退出時,您設定停止事件,它們將完成他們正在做的事情。
為了方便起見,我還把這個概念打包成一個“DeviceThread”。
import threading
import time
class DeviceThread(threading.Thread):
interval = 1
def __init__(self, stop_event):
super().__init__(name=self.__class__.__name__)
self.stop_event = stop_event
def step(self):
pass
def initialize(self):
pass
def cleanup(self):
pass
def run(self):
try:
self.initialize()
while not self.stop_event.wait(self.interval):
self.step()
finally:
self.cleanup()
class ThermometerThread(DeviceThread):
def initialize(self):
self.serial = ...
def cleanup(self):
... # close serial port
def step(self):
... # measure and draw
def main():
stop_event = threading.Event()
threads = [ThermometerThread(stop_event)]
for thread in threads:
thread.start()
try:
while True:
# Nothing to do in the main thread...
time.sleep(1)
except KeyboardInterrupt:
print("Caught keyboard interrupt, stopping threads")
stop_event.set()
for thread in threads:
print(f"Waiting for {thread.name} to stop")
thread.join()
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/485009.html
上一篇:是假用堆記憶體共享案例嗎?
