Repackage to latest boilerplate.

This commit is contained in:
Phil Howard
2023-11-17 09:11:10 +00:00
parent 2b0c8aefd7
commit bebe4504ed
28 changed files with 541 additions and 309 deletions

72
grow/__init__.py Normal file
View File

@@ -0,0 +1,72 @@
__version__ = '0.0.2'
import atexit
import threading
import time
import RPi.GPIO as GPIO
class Piezo():
def __init__(self, gpio_pin=13):
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(gpio_pin, GPIO.OUT, initial=GPIO.LOW)
self.pwm = GPIO.PWM(gpio_pin, 440)
self.pwm.start(0)
self._timeout = None
atexit.register(self._exit)
def frequency(self, value):
"""Change the piezo frequency.
Loosely corresponds to musical pitch, if you suspend disbelief.
"""
self.pwm.ChangeFrequency(value)
def start(self, frequency=None):
"""Start the piezo.
Sets the Duty Cycle to 100%
"""
if frequency is not None:
self.frequency(frequency)
self.pwm.ChangeDutyCycle(1)
def stop(self):
"""Stop the piezo.
Sets the Duty Cycle to 0%
"""
self.pwm.ChangeDutyCycle(0)
def beep(self, frequency=440, timeout=0.1, blocking=True, force=False):
"""Beep the piezo for time seconds.
:param freq: Frequency, in hertz, of the piezo
:param timeout: Time, in seconds, of the piezo beep
:param blocking: If true, function will block until piezo has stopped
"""
if blocking:
self.start(frequency=frequency)
time.sleep(timeout)
self.stop()
return True
else:
if self._timeout is not None:
if self._timeout.is_alive():
if force:
self._timeout.cancel()
else:
return False
self._timeout = threading.Timer(timeout, self.stop)
self.start(frequency=frequency)
self._timeout.start()
return True
def _exit(self):
self.pwm.stop()

151
grow/moisture.py Normal file
View File

@@ -0,0 +1,151 @@
import time
import RPi.GPIO as GPIO
MOISTURE_1_PIN = 23
MOISTURE_2_PIN = 8
MOISTURE_3_PIN = 25
MOISTURE_INT_PIN = 4
class Moisture(object):
"""Grow moisture sensor driver."""
def __init__(self, channel=1, wet_point=None, dry_point=None):
"""Create a new moisture sensor.
Uses an interrupt to count pulses on the GPIO pin corresponding to the selected channel.
The moisture reading is given as pulses per second.
:param channel: One of 1, 2 or 3. 4 can optionally be used to set up a sensor on the Int pin (BCM4)
:param wet_point: Wet point in pulses/sec
:param dry_point: Dry point in pulses/sec
"""
self._gpio_pin = [MOISTURE_1_PIN, MOISTURE_2_PIN, MOISTURE_3_PIN, MOISTURE_INT_PIN][channel - 1]
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(self._gpio_pin, GPIO.IN)
self._count = 0
self._reading = 0
self._history = []
self._history_length = 200
self._last_pulse = time.time()
self._new_data = False
self._wet_point = wet_point if wet_point is not None else 0.7
self._dry_point = dry_point if dry_point is not None else 27.6
self._time_last_reading = time.time()
try:
GPIO.add_event_detect(self._gpio_pin, GPIO.RISING, callback=self._event_handler, bouncetime=1)
except RuntimeError as e:
if self._gpio_pin == 8:
raise RuntimeError("""Unable to set up edge detection on BCM8.
Please ensure you add the following to /boot/config.txt and reboot:
dtoverlay=spi0-cs,cs0_pin=14 # Re-assign CS0 from BCM 8 so that Grow can use it
""")
else:
raise e
self._time_start = time.time()
def _event_handler(self, pin):
self._count += 1
self._last_pulse = time.time()
if self._time_elapsed >= 1.0:
self._reading = self._count / self._time_elapsed
self._history.insert(0, self._reading)
self._history = self._history[:self._history_length]
self._count = 0
self._time_last_reading = time.time()
self._new_data = True
@property
def history(self):
history = []
for moisture in self._history:
saturation = float(moisture - self._dry_point) / self.range
saturation = round(saturation, 3)
history.append(max(0.0, min(1.0, saturation)))
return history
@property
def _time_elapsed(self):
return time.time() - self._time_last_reading
def set_wet_point(self, value=None):
"""Set the sensor wet point.
This is the watered, wet state of your soil.
It should be set shortly after watering. Leave ~5 mins for moisture to permeate.
:param value: Wet point value to set in pulses/sec, leave as None to set the last sensor reading.
"""
self._wet_point = value if value is not None else self._reading
def set_dry_point(self, value=None):
"""Set the sensor dry point.
This is the unwatered, dry state of your soil.
It should be set when the soil is dry to the touch.
:param value: Dry point value to set in pulses/sec, leave as None to set the last sensor reading.
"""
self._dry_point = value if value is not None else self._reading
@property
def moisture(self):
"""Return the raw moisture level.
The value returned is the pulses/sec read from the soil moisture sensor.
This value is inversely proportional to the amount of moisture.
Full immersion in water is approximately 50 pulses/sec.
Fully dry (in air) is approximately 900 pulses/sec.
"""
self._new_data = False
return self._reading
@property
def active(self):
"""Check if the moisture sensor is producing a valid reading."""
return (time.time() - self._last_pulse) < 1.0 and self._reading > 0 and self._reading < 28
@property
def new_data(self):
"""Check for new reading.
Returns True if moisture value has been updated since last reading moisture or saturation.
"""
return self._new_data
@property
def range(self):
"""Return the range sensor range (wet - dry points)."""
return self._wet_point - self._dry_point
@property
def saturation(self):
"""Return saturation as a float from 0.0 to 1.0.
This value is calculated using the wet and dry points.
"""
saturation = float(self.moisture - self._dry_point) / self.range
saturation = round(saturation, 3)
return max(0.0, min(1.0, saturation))

96
grow/pump.py Normal file
View File

@@ -0,0 +1,96 @@
import atexit
import threading
import time
import RPi.GPIO as GPIO
PUMP_1_PIN = 17
PUMP_2_PIN = 27
PUMP_3_PIN = 22
PUMP_PWM_FREQ = 10000
PUMP_MAX_DUTY = 90
global_lock = threading.Lock()
class Pump(object):
"""Grow pump driver."""
def __init__(self, channel=1):
"""Create a new pump.
Uses soft PWM to drive a Grow pump.
:param channel: One of 1, 2 or 3.
"""
self._gpio_pin = [PUMP_1_PIN, PUMP_2_PIN, PUMP_3_PIN][channel - 1]
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(self._gpio_pin, GPIO.OUT, initial=GPIO.LOW)
self._pwm = GPIO.PWM(self._gpio_pin, PUMP_PWM_FREQ)
self._pwm.start(0)
self._timeout = None
atexit.register(self._stop)
def _stop(self):
self._pwm.stop(0)
GPIO.setup(self._gpio_pin, GPIO.IN)
def set_speed(self, speed):
"""Set pump speed (PWM duty cycle)."""
if speed > 1.0 or speed < 0:
raise ValueError("Speed must be between 0 and 1")
if speed == 0:
global_lock.release()
elif not global_lock.acquire(blocking=False):
return False
self._pwm.ChangeDutyCycle(int(PUMP_MAX_DUTY * speed))
self._speed = speed
return True
def get_speed(self):
"""Return Pump speed (PWM duty cycle)."""
return self._speed
def stop(self):
"""Stop the pump."""
if self._timeout is not None:
self._timeout.cancel()
self._timeout = None
self.set_speed(0)
def dose(self, speed, timeout=0.1, blocking=True, force=False):
"""Pulse the pump for timeout seconds.
:param timeout: Timeout, in seconds, of the pump pulse
:param blocking: If true, function will block until pump has stopped
:param force: Applies only to non-blocking. If true, any previous dose will be replaced
"""
if blocking:
if self.set_speed(speed):
time.sleep(timeout)
self.stop()
return True
else:
if self._timeout is not None:
if self._timeout.is_alive():
if force:
self._timeout.cancel()
self._timeout = threading.Timer(timeout, self.stop)
if self.set_speed(speed):
self._timeout.start()
return True
return False