Make pumps mutually exclusive

Forces only one pump to be active at a time, to avoid the sudden inrush current browning out the Pi or causing weirdness when multiple pumps fire and drop the voltage.
This commit is contained in:
Phil Howard
2020-10-21 13:04:41 +01:00
parent 0dc091d05f
commit 85db231475
2 changed files with 56 additions and 9 deletions

View File

@@ -10,6 +10,9 @@ PUMP_PWM_FREQ = 10000
PUMP_MAX_DUTY = 90
global_lock = threading.Lock()
class Pump(object):
"""Grow pump driver."""
@@ -42,8 +45,11 @@ class Pump(object):
"""Set pump speed (PWM duty cycle)."""
if speed > 1.0 or speed < 0:
raise ValueError("Speed must be between 0 and 1")
if not global_lock.acquire(blocking=False):
return False
self._pwm.ChangeDutyCycle(int(PUMP_MAX_DUTY * speed))
self._speed = speed
return True
def get_speed(self):
"""Return Pump speed (PWM duty cycle)."""
@@ -55,6 +61,7 @@ class Pump(object):
if self._timeout is not None:
self._timeout.cancel()
self._timeout = None
global_lock.release()
def dose(self, speed, timeout=0.1, blocking=True, force=False):
"""Pulse the pump for timeout seconds.
@@ -64,19 +71,22 @@ class Pump(object):
:param force: Applies only to non-blocking. If true, any previous dose will be replaced
"""
if blocking:
self.set_speed(speed)
time.sleep(timeout)
self.stop()
return True
if self.set_speed(speed):
time.sleep(timeout)
self.stop()
return True
else:
if self._timeout is not None:
if self._timeout.is_alive():
if force:
self._timeout.cancel()
else:
return False
self._timeout = threading.Timer(timeout, self.stop)
self.set_speed(speed)
self._timeout.start()
return True
if self.set_speed(speed):
self._timeout.start()
return True
return False

View File

@@ -0,0 +1,37 @@
import time
def test_pumps_are_mutually_exclusive(GPIO, smbus):
from grow.pump import Pump, global_lock
ch1 = Pump(channel=1)
ch2 = Pump(channel=2)
ch3 = Pump(channel=3)
ch1.dose(speed=0.5, timeout=1.0, blocking=False)
assert global_lock.locked() is True
assert ch2.dose(speed=0.5) is False
assert ch2.dose(speed=0.5, blocking=False) is False
assert ch3.dose(speed=0.5) is False
assert ch3.dose(speed=0.5, blocking=False) is False
def test_pumps_run_sequentially(GPIO, smbus):
from grow.pump import Pump, global_lock
ch1 = Pump(channel=1)
ch2 = Pump(channel=2)
ch3 = Pump(channel=3)
assert ch1.dose(speed=0.5, timeout=0.1, blocking=False) is True
assert global_lock.locked() is True
time.sleep(0.3)
assert ch2.dose(speed=0.5, timeout=0.1, blocking=False) is True
assert global_lock.locked() is True
time.sleep(0.3)
assert ch3.dose(speed=0.5, timeout=0.1, blocking=False) is True
assert global_lock.locked() is True
time.sleep(0.3)