Port moisture to gpiod.

This commit is contained in:
Phil Howard
2025-02-26 14:11:00 +00:00
parent dd0e25da5b
commit d98b694622

View File

@@ -1,6 +1,13 @@
import time
import select
import threading
from datetime import timedelta
import atexit
import gpiodevice
from gpiod import LineSettings
from gpiod.line import Edge, Bias
import RPi.GPIO as GPIO
MOISTURE_1_PIN = 23
MOISTURE_2_PIN = 8
@@ -25,46 +32,55 @@ class Moisture(object):
"""
self._gpio_pin = [MOISTURE_1_PIN, MOISTURE_2_PIN, MOISTURE_3_PIN, MOISTURE_INT_PIN][channel - 1]
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(self._gpio_pin, GPIO.IN)
self._count = 0
self._reading = 0
self._history = []
self._history_length = 200
self._last_pulse = time.time()
self._new_data = False
self._wet_point = wet_point if wet_point is not None else 0.7
self._dry_point = dry_point if dry_point is not None else 27.6
self._time_last_reading = time.time()
self._last_reading = 0
self._int, self._offset = gpiodevice.get_pin(self._gpio_pin, f"grow-m-ch{channel}", LineSettings(
edge_detection=Edge.RISING, bias=Bias.PULL_DOWN, debounce_period=timedelta(milliseconds=10)
))
self._poll_thread_event = threading.Event()
self._poll_thread = threading.Thread(target=self._thread_poll)
self._poll_thread.start()
atexit.register(self._thread_stop)
def __del__(self):
self._thread_stop()
def _thread_stop(self):
self._poll_thread_event.set()
self._poll_thread.join()
def _thread_poll(self):
poll = select.poll()
try:
GPIO.add_event_detect(self._gpio_pin, GPIO.RISING, callback=self._event_handler, bouncetime=1)
except RuntimeError as e:
if self._gpio_pin == 8:
raise RuntimeError("""Unable to set up edge detection on BCM8.
poll.register(self._int.fd, select.POLLIN)
except TypeError:
return
while not self._poll_thread_event.wait(1.0):
if not poll.poll(0):
# No pulses in 1s, this is not a valid reading
continue
Please ensure you add the following to /boot/config.txt and reboot:
dtoverlay=spi0-cs,cs0_pin=14 # Re-assign CS0 from BCM 8 so that Grow can use it
""")
else:
raise e
self._time_start = time.time()
def _event_handler(self, pin):
self._count += 1
self._last_pulse = time.time()
if self._time_elapsed >= 1.0:
self._reading = self._count / self._time_elapsed
# We got pulses, since we waited for 1s we can be relatively
# sure the number of pulses is approximately the sensor frequency
events = self._int.read_edge_events()
self._last_reading = time.time()
self._reading = len(events) # Pulse frequency
self._history.insert(0, self._reading)
self._history = self._history[:self._history_length]
self._count = 0
self._time_last_reading = time.time()
self._new_data = True
poll.unregister(self._int.fd)
@property
def history(self):
history = []
@@ -76,10 +92,6 @@ dtoverlay=spi0-cs,cs0_pin=14 # Re-assign CS0 from BCM 8 so that Grow can use it
return history
@property
def _time_elapsed(self):
return time.time() - self._time_last_reading
def set_wet_point(self, value=None):
"""Set the sensor wet point.
@@ -123,7 +135,7 @@ dtoverlay=spi0-cs,cs0_pin=14 # Re-assign CS0 from BCM 8 so that Grow can use it
@property
def active(self):
"""Check if the moisture sensor is producing a valid reading."""
return (time.time() - self._last_pulse) < 1.0 and self._reading > 0 and self._reading < 28
return (time.time() - self._last_reading) <= 1.0 and self._reading > 0 and self._reading < 28
@property
def new_data(self):