mirror of
https://github.com/pimoroni/grow-python
synced 2025-10-25 15:19:23 +00:00
Compare commits
6 Commits
9eda924814
...
c119e5f7bf
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c119e5f7bf | ||
|
|
c452289fa6 | ||
|
|
ff8bd71fd5 | ||
|
|
3ff4f65a61 | ||
|
|
d98b694622 | ||
|
|
dd0e25da5b |
@@ -1,6 +1,12 @@
|
||||
import atexit
|
||||
import select
|
||||
import threading
|
||||
import time
|
||||
from datetime import timedelta
|
||||
|
||||
import RPi.GPIO as GPIO
|
||||
import gpiodevice
|
||||
from gpiod import LineSettings
|
||||
from gpiod.line import Bias, Edge
|
||||
|
||||
MOISTURE_1_PIN = 23
|
||||
MOISTURE_2_PIN = 8
|
||||
@@ -25,46 +31,55 @@ class Moisture(object):
|
||||
"""
|
||||
self._gpio_pin = [MOISTURE_1_PIN, MOISTURE_2_PIN, MOISTURE_3_PIN, MOISTURE_INT_PIN][channel - 1]
|
||||
|
||||
GPIO.setwarnings(False)
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
GPIO.setup(self._gpio_pin, GPIO.IN)
|
||||
|
||||
self._count = 0
|
||||
self._reading = 0
|
||||
self._history = []
|
||||
self._history_length = 200
|
||||
self._last_pulse = time.time()
|
||||
self._new_data = False
|
||||
self._wet_point = wet_point if wet_point is not None else 0.7
|
||||
self._dry_point = dry_point if dry_point is not None else 27.6
|
||||
self._time_last_reading = time.time()
|
||||
|
||||
self._last_reading = 0
|
||||
|
||||
self._int, self._offset = gpiodevice.get_pin(self._gpio_pin, f"grow-m-ch{channel}", LineSettings(
|
||||
edge_detection=Edge.RISING, bias=Bias.PULL_DOWN, debounce_period=timedelta(milliseconds=10)
|
||||
))
|
||||
|
||||
self._poll_thread_event = threading.Event()
|
||||
self._poll_thread = threading.Thread(target=self._thread_poll)
|
||||
self._poll_thread.start()
|
||||
|
||||
atexit.register(self._thread_stop)
|
||||
|
||||
def __del__(self):
|
||||
self._thread_stop()
|
||||
|
||||
def _thread_stop(self):
|
||||
self._poll_thread_event.set()
|
||||
self._poll_thread.join()
|
||||
|
||||
def _thread_poll(self):
|
||||
poll = select.poll()
|
||||
try:
|
||||
GPIO.add_event_detect(self._gpio_pin, GPIO.RISING, callback=self._event_handler, bouncetime=1)
|
||||
except RuntimeError as e:
|
||||
if self._gpio_pin == 8:
|
||||
raise RuntimeError("""Unable to set up edge detection on BCM8.
|
||||
poll.register(self._int.fd, select.POLLIN)
|
||||
except TypeError:
|
||||
return
|
||||
while not self._poll_thread_event.wait(1.0):
|
||||
if not poll.poll(0):
|
||||
# No pulses in 1s, this is not a valid reading
|
||||
continue
|
||||
|
||||
Please ensure you add the following to /boot/config.txt and reboot:
|
||||
|
||||
dtoverlay=spi0-cs,cs0_pin=14 # Re-assign CS0 from BCM 8 so that Grow can use it
|
||||
|
||||
""")
|
||||
else:
|
||||
raise e
|
||||
|
||||
self._time_start = time.time()
|
||||
|
||||
def _event_handler(self, pin):
|
||||
self._count += 1
|
||||
self._last_pulse = time.time()
|
||||
if self._time_elapsed >= 1.0:
|
||||
self._reading = self._count / self._time_elapsed
|
||||
# We got pulses, since we waited for 1s we can be relatively
|
||||
# sure the number of pulses is approximately the sensor frequency
|
||||
events = self._int.read_edge_events()
|
||||
self._last_reading = time.time()
|
||||
self._reading = len(events) # Pulse frequency
|
||||
self._history.insert(0, self._reading)
|
||||
self._history = self._history[:self._history_length]
|
||||
self._count = 0
|
||||
self._time_last_reading = time.time()
|
||||
self._new_data = True
|
||||
|
||||
poll.unregister(self._int.fd)
|
||||
|
||||
@property
|
||||
def history(self):
|
||||
history = []
|
||||
@@ -76,10 +91,6 @@ dtoverlay=spi0-cs,cs0_pin=14 # Re-assign CS0 from BCM 8 so that Grow can use it
|
||||
|
||||
return history
|
||||
|
||||
@property
|
||||
def _time_elapsed(self):
|
||||
return time.time() - self._time_last_reading
|
||||
|
||||
def set_wet_point(self, value=None):
|
||||
"""Set the sensor wet point.
|
||||
|
||||
@@ -123,7 +134,7 @@ dtoverlay=spi0-cs,cs0_pin=14 # Re-assign CS0 from BCM 8 so that Grow can use it
|
||||
@property
|
||||
def active(self):
|
||||
"""Check if the moisture sensor is producing a valid reading."""
|
||||
return (time.time() - self._last_pulse) < 1.0 and self._reading > 0 and self._reading < 28
|
||||
return (time.time() - self._last_reading) <= 1.0 and self._reading > 0 and self._reading < 28
|
||||
|
||||
@property
|
||||
def new_data(self):
|
||||
|
||||
11
grow/pwm.py
11
grow/pwm.py
@@ -1,5 +1,5 @@
|
||||
import time
|
||||
from threading import Thread
|
||||
from threading import Event, Thread
|
||||
|
||||
import gpiod
|
||||
import gpiodevice
|
||||
@@ -11,19 +11,18 @@ OUTL = gpiod.LineSettings(direction=Direction.OUTPUT, output_value=Value.INACTIV
|
||||
class PWM:
|
||||
_pwms: list = []
|
||||
_t_pwm: Thread = None
|
||||
_pwm_running: bool = False
|
||||
_t_pwm_event: Event = Event()
|
||||
|
||||
@staticmethod
|
||||
def start_thread():
|
||||
if PWM._t_pwm is None:
|
||||
PWM._pwm_running = True
|
||||
PWM._t_pwm = Thread(target=PWM._run)
|
||||
PWM._t_pwm.start()
|
||||
|
||||
@staticmethod
|
||||
def stop_thread():
|
||||
if PWM._t_pwm is not None:
|
||||
PWM._pwm_running = False
|
||||
PWM._t_pwm_event.set()
|
||||
PWM._t_pwm.join()
|
||||
PWM._t_pwm = None
|
||||
|
||||
@@ -34,13 +33,13 @@ class PWM:
|
||||
@staticmethod
|
||||
def _remove(pwm):
|
||||
index = PWM._pwms.index(pwm)
|
||||
del PWM._pwms[index]
|
||||
PWM._pwms.pop(index)
|
||||
if len(PWM._pwms) == 0:
|
||||
PWM.stop_thread()
|
||||
|
||||
@staticmethod
|
||||
def _run():
|
||||
while PWM._pwm_running:
|
||||
while not PWM._t_pwm_event.is_set():
|
||||
PWM.run()
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -26,12 +26,24 @@ def cleanup():
|
||||
|
||||
|
||||
@pytest.fixture(scope='function', autouse=False)
|
||||
def GPIO():
|
||||
def gpiod():
|
||||
"""Mock gpiod module."""
|
||||
gpiod = mock.MagicMock()
|
||||
sys.modules['gpiod'] = gpiod
|
||||
yield gpiod
|
||||
del sys.modules['gpiod']
|
||||
sys.modules["gpiod"] = mock.Mock()
|
||||
sys.modules["gpiod.line"] = mock.Mock()
|
||||
yield sys.modules["gpiod"]
|
||||
del sys.modules["gpiod.line"]
|
||||
del sys.modules["gpiod"]
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=False)
|
||||
def gpiodevice():
|
||||
gpiodevice = mock.Mock()
|
||||
gpiodevice.get_pins_for_platform.return_value = [(mock.Mock(), 0), (mock.Mock(), 0), (mock.Mock(), 0)]
|
||||
gpiodevice.get_pin.return_value = (mock.Mock(), 0)
|
||||
|
||||
sys.modules["gpiodevice"] = gpiodevice
|
||||
yield gpiodevice
|
||||
del sys.modules["gpiodevice"]
|
||||
|
||||
|
||||
@pytest.fixture(scope='function', autouse=False)
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import time
|
||||
|
||||
|
||||
def test_pumps_actually_stop(gpiod, smbus2):
|
||||
def test_pumps_actually_stop(gpiod, gpiodevice, smbus2):
|
||||
from grow.pump import Pump
|
||||
from grow.pwm import PWM
|
||||
|
||||
ch1 = Pump(channel=1)
|
||||
|
||||
@@ -10,9 +11,13 @@ def test_pumps_actually_stop(gpiod, smbus2):
|
||||
time.sleep(0.1)
|
||||
assert ch1.get_speed() == 0
|
||||
|
||||
PWM.stop_thread()
|
||||
|
||||
def test_pumps_are_mutually_exclusive(gpiod, smbus2):
|
||||
|
||||
|
||||
def test_pumps_are_mutually_exclusive(gpiod, gpiodevice, smbus2):
|
||||
from grow.pump import Pump, global_lock
|
||||
from grow.pwm import PWM
|
||||
|
||||
ch1 = Pump(channel=1)
|
||||
ch2 = Pump(channel=2)
|
||||
@@ -28,9 +33,13 @@ def test_pumps_are_mutually_exclusive(gpiod, smbus2):
|
||||
assert ch3.dose(speed=0.5) is False
|
||||
assert ch3.dose(speed=0.5, blocking=False) is False
|
||||
|
||||
PWM.stop_thread()
|
||||
|
||||
def test_pumps_run_sequentially(gpiod, smbus2):
|
||||
|
||||
|
||||
def test_pumps_run_sequentially(gpiod, gpiodevice, smbus2):
|
||||
from grow.pump import Pump, global_lock
|
||||
from grow.pwm import PWM
|
||||
|
||||
ch1 = Pump(channel=1)
|
||||
ch2 = Pump(channel=2)
|
||||
@@ -45,3 +54,5 @@ def test_pumps_run_sequentially(gpiod, smbus2):
|
||||
assert ch3.dose(speed=0.5, timeout=0.1, blocking=False) is True
|
||||
assert global_lock.locked() is True
|
||||
time.sleep(0.3)
|
||||
|
||||
PWM.stop_thread()
|
||||
|
||||
@@ -1,21 +1,12 @@
|
||||
import mock
|
||||
|
||||
|
||||
def test_moisture_setup(gpiod, smbus2):
|
||||
def test_moisture_setup(gpiod, gpiodevice, smbus2):
|
||||
from grow.moisture import Moisture
|
||||
|
||||
ch1 = Moisture(channel=1)
|
||||
ch2 = Moisture(channel=2)
|
||||
ch3 = Moisture(channel=3)
|
||||
|
||||
GPIO.setup.assert_has_calls([
|
||||
mock.call(ch1._gpio_pin, GPIO.IN),
|
||||
mock.call(ch2._gpio_pin, GPIO.IN),
|
||||
mock.call(ch3._gpio_pin, GPIO.IN)
|
||||
])
|
||||
_ = Moisture(channel=1)
|
||||
_ = Moisture(channel=2)
|
||||
_ = Moisture(channel=3)
|
||||
|
||||
|
||||
def test_moisture_read(gpiod, smbus2):
|
||||
def test_moisture_read(gpiod, gpiodevice, smbus2):
|
||||
from grow.moisture import Moisture
|
||||
|
||||
assert Moisture(channel=1).saturation == 1.0
|
||||
@@ -27,24 +18,14 @@ def test_moisture_read(gpiod, smbus2):
|
||||
assert Moisture(channel=3).moisture == 0
|
||||
|
||||
|
||||
def test_pump_setup(gpiod, smbus2):
|
||||
from grow.pump import PUMP_PWM_FREQ, Pump
|
||||
def test_pump_setup(gpiod, gpiodevice, smbus2):
|
||||
from grow.pump import Pump
|
||||
from grow.pwm import PWM
|
||||
|
||||
ch1 = Pump(channel=1)
|
||||
ch2 = Pump(channel=2)
|
||||
ch3 = Pump(channel=3)
|
||||
_ = Pump(channel=1)
|
||||
_ = Pump(channel=2)
|
||||
_ = Pump(channel=3)
|
||||
|
||||
GPIO.setup.assert_has_calls([
|
||||
mock.call(ch1._gpio_pin, GPIO.OUT, initial=GPIO.LOW),
|
||||
mock.call(ch2._gpio_pin, GPIO.OUT, initial=GPIO.LOW),
|
||||
mock.call(ch3._gpio_pin, GPIO.OUT, initial=GPIO.LOW)
|
||||
])
|
||||
# Threads. Not even once.
|
||||
PWM.stop_thread()
|
||||
|
||||
GPIO.PWM.assert_has_calls([
|
||||
mock.call(ch1._gpio_pin, PUMP_PWM_FREQ),
|
||||
mock.call().start(0),
|
||||
mock.call(ch2._gpio_pin, PUMP_PWM_FREQ),
|
||||
mock.call().start(0),
|
||||
mock.call(ch3._gpio_pin, PUMP_PWM_FREQ),
|
||||
mock.call().start(0)
|
||||
])
|
||||
|
||||
Reference in New Issue
Block a user