6 Commits

Author SHA1 Message Date
Phil Howard
c119e5f7bf CI: Make ruff happy. 2025-02-26 14:46:38 +00:00
Phil Howard
c452289fa6 CI: Apply isort suggestions. 2025-02-26 14:30:17 +00:00
Phil Howard
ff8bd71fd5 CI: Fixup tests. 2025-02-26 14:11:49 +00:00
Phil Howard
3ff4f65a61 Switch PWM to use an Event. 2025-02-26 14:11:19 +00:00
Phil Howard
d98b694622 Port moisture to gpiod. 2025-02-26 14:11:00 +00:00
Phil Howard
dd0e25da5b CI: Fix ruff invocation. 2025-02-26 14:10:47 +00:00
6 changed files with 94 additions and 80 deletions

View File

@@ -1,6 +1,12 @@
import atexit
import select
import threading
import time
from datetime import timedelta
import RPi.GPIO as GPIO
import gpiodevice
from gpiod import LineSettings
from gpiod.line import Bias, Edge
MOISTURE_1_PIN = 23
MOISTURE_2_PIN = 8
@@ -25,46 +31,55 @@ class Moisture(object):
"""
self._gpio_pin = [MOISTURE_1_PIN, MOISTURE_2_PIN, MOISTURE_3_PIN, MOISTURE_INT_PIN][channel - 1]
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(self._gpio_pin, GPIO.IN)
self._count = 0
self._reading = 0
self._history = []
self._history_length = 200
self._last_pulse = time.time()
self._new_data = False
self._wet_point = wet_point if wet_point is not None else 0.7
self._dry_point = dry_point if dry_point is not None else 27.6
self._time_last_reading = time.time()
self._last_reading = 0
self._int, self._offset = gpiodevice.get_pin(self._gpio_pin, f"grow-m-ch{channel}", LineSettings(
edge_detection=Edge.RISING, bias=Bias.PULL_DOWN, debounce_period=timedelta(milliseconds=10)
))
self._poll_thread_event = threading.Event()
self._poll_thread = threading.Thread(target=self._thread_poll)
self._poll_thread.start()
atexit.register(self._thread_stop)
def __del__(self):
self._thread_stop()
def _thread_stop(self):
self._poll_thread_event.set()
self._poll_thread.join()
def _thread_poll(self):
poll = select.poll()
try:
GPIO.add_event_detect(self._gpio_pin, GPIO.RISING, callback=self._event_handler, bouncetime=1)
except RuntimeError as e:
if self._gpio_pin == 8:
raise RuntimeError("""Unable to set up edge detection on BCM8.
poll.register(self._int.fd, select.POLLIN)
except TypeError:
return
while not self._poll_thread_event.wait(1.0):
if not poll.poll(0):
# No pulses in 1s, this is not a valid reading
continue
Please ensure you add the following to /boot/config.txt and reboot:
dtoverlay=spi0-cs,cs0_pin=14 # Re-assign CS0 from BCM 8 so that Grow can use it
""")
else:
raise e
self._time_start = time.time()
def _event_handler(self, pin):
self._count += 1
self._last_pulse = time.time()
if self._time_elapsed >= 1.0:
self._reading = self._count / self._time_elapsed
# We got pulses, since we waited for 1s we can be relatively
# sure the number of pulses is approximately the sensor frequency
events = self._int.read_edge_events()
self._last_reading = time.time()
self._reading = len(events) # Pulse frequency
self._history.insert(0, self._reading)
self._history = self._history[:self._history_length]
self._count = 0
self._time_last_reading = time.time()
self._new_data = True
poll.unregister(self._int.fd)
@property
def history(self):
history = []
@@ -76,10 +91,6 @@ dtoverlay=spi0-cs,cs0_pin=14 # Re-assign CS0 from BCM 8 so that Grow can use it
return history
@property
def _time_elapsed(self):
return time.time() - self._time_last_reading
def set_wet_point(self, value=None):
"""Set the sensor wet point.
@@ -123,7 +134,7 @@ dtoverlay=spi0-cs,cs0_pin=14 # Re-assign CS0 from BCM 8 so that Grow can use it
@property
def active(self):
"""Check if the moisture sensor is producing a valid reading."""
return (time.time() - self._last_pulse) < 1.0 and self._reading > 0 and self._reading < 28
return (time.time() - self._last_reading) <= 1.0 and self._reading > 0 and self._reading < 28
@property
def new_data(self):

View File

@@ -1,5 +1,5 @@
import time
from threading import Thread
from threading import Event, Thread
import gpiod
import gpiodevice
@@ -11,19 +11,18 @@ OUTL = gpiod.LineSettings(direction=Direction.OUTPUT, output_value=Value.INACTIV
class PWM:
_pwms: list = []
_t_pwm: Thread = None
_pwm_running: bool = False
_t_pwm_event: Event = Event()
@staticmethod
def start_thread():
if PWM._t_pwm is None:
PWM._pwm_running = True
PWM._t_pwm = Thread(target=PWM._run)
PWM._t_pwm.start()
@staticmethod
def stop_thread():
if PWM._t_pwm is not None:
PWM._pwm_running = False
PWM._t_pwm_event.set()
PWM._t_pwm.join()
PWM._t_pwm = None
@@ -34,13 +33,13 @@ class PWM:
@staticmethod
def _remove(pwm):
index = PWM._pwms.index(pwm)
del PWM._pwms[index]
PWM._pwms.pop(index)
if len(PWM._pwms) == 0:
PWM.stop_thread()
@staticmethod
def _run():
while PWM._pwm_running:
while not PWM._t_pwm_event.is_set():
PWM.run()
@staticmethod

View File

@@ -26,12 +26,24 @@ def cleanup():
@pytest.fixture(scope='function', autouse=False)
def GPIO():
def gpiod():
"""Mock gpiod module."""
gpiod = mock.MagicMock()
sys.modules['gpiod'] = gpiod
yield gpiod
del sys.modules['gpiod']
sys.modules["gpiod"] = mock.Mock()
sys.modules["gpiod.line"] = mock.Mock()
yield sys.modules["gpiod"]
del sys.modules["gpiod.line"]
del sys.modules["gpiod"]
@pytest.fixture(scope="function", autouse=False)
def gpiodevice():
gpiodevice = mock.Mock()
gpiodevice.get_pins_for_platform.return_value = [(mock.Mock(), 0), (mock.Mock(), 0), (mock.Mock(), 0)]
gpiodevice.get_pin.return_value = (mock.Mock(), 0)
sys.modules["gpiodevice"] = gpiodevice
yield gpiodevice
del sys.modules["gpiodevice"]
@pytest.fixture(scope='function', autouse=False)

View File

@@ -1,8 +1,9 @@
import time
def test_pumps_actually_stop(gpiod, smbus2):
def test_pumps_actually_stop(gpiod, gpiodevice, smbus2):
from grow.pump import Pump
from grow.pwm import PWM
ch1 = Pump(channel=1)
@@ -10,9 +11,13 @@ def test_pumps_actually_stop(gpiod, smbus2):
time.sleep(0.1)
assert ch1.get_speed() == 0
PWM.stop_thread()
def test_pumps_are_mutually_exclusive(gpiod, smbus2):
def test_pumps_are_mutually_exclusive(gpiod, gpiodevice, smbus2):
from grow.pump import Pump, global_lock
from grow.pwm import PWM
ch1 = Pump(channel=1)
ch2 = Pump(channel=2)
@@ -28,9 +33,13 @@ def test_pumps_are_mutually_exclusive(gpiod, smbus2):
assert ch3.dose(speed=0.5) is False
assert ch3.dose(speed=0.5, blocking=False) is False
PWM.stop_thread()
def test_pumps_run_sequentially(gpiod, smbus2):
def test_pumps_run_sequentially(gpiod, gpiodevice, smbus2):
from grow.pump import Pump, global_lock
from grow.pwm import PWM
ch1 = Pump(channel=1)
ch2 = Pump(channel=2)
@@ -45,3 +54,5 @@ def test_pumps_run_sequentially(gpiod, smbus2):
assert ch3.dose(speed=0.5, timeout=0.1, blocking=False) is True
assert global_lock.locked() is True
time.sleep(0.3)
PWM.stop_thread()

View File

@@ -1,21 +1,12 @@
import mock
def test_moisture_setup(gpiod, smbus2):
def test_moisture_setup(gpiod, gpiodevice, smbus2):
from grow.moisture import Moisture
ch1 = Moisture(channel=1)
ch2 = Moisture(channel=2)
ch3 = Moisture(channel=3)
GPIO.setup.assert_has_calls([
mock.call(ch1._gpio_pin, GPIO.IN),
mock.call(ch2._gpio_pin, GPIO.IN),
mock.call(ch3._gpio_pin, GPIO.IN)
])
_ = Moisture(channel=1)
_ = Moisture(channel=2)
_ = Moisture(channel=3)
def test_moisture_read(gpiod, smbus2):
def test_moisture_read(gpiod, gpiodevice, smbus2):
from grow.moisture import Moisture
assert Moisture(channel=1).saturation == 1.0
@@ -27,24 +18,14 @@ def test_moisture_read(gpiod, smbus2):
assert Moisture(channel=3).moisture == 0
def test_pump_setup(gpiod, smbus2):
from grow.pump import PUMP_PWM_FREQ, Pump
def test_pump_setup(gpiod, gpiodevice, smbus2):
from grow.pump import Pump
from grow.pwm import PWM
ch1 = Pump(channel=1)
ch2 = Pump(channel=2)
ch3 = Pump(channel=3)
_ = Pump(channel=1)
_ = Pump(channel=2)
_ = Pump(channel=3)
GPIO.setup.assert_has_calls([
mock.call(ch1._gpio_pin, GPIO.OUT, initial=GPIO.LOW),
mock.call(ch2._gpio_pin, GPIO.OUT, initial=GPIO.LOW),
mock.call(ch3._gpio_pin, GPIO.OUT, initial=GPIO.LOW)
])
# Threads. Not even once.
PWM.stop_thread()
GPIO.PWM.assert_has_calls([
mock.call(ch1._gpio_pin, PUMP_PWM_FREQ),
mock.call().start(0),
mock.call(ch2._gpio_pin, PUMP_PWM_FREQ),
mock.call().start(0),
mock.call(ch3._gpio_pin, PUMP_PWM_FREQ),
mock.call().start(0)
])

View File

@@ -20,7 +20,7 @@ commands =
python -m build --no-isolation
python -m twine check dist/*
isort --check .
ruff .
ruff check .
codespell .
deps =
check-manifest