mirror of
				https://github.com/pimoroni/grow-python
				synced 2025-10-25 15:19:23 +00:00 
			
		
		
		
	Switch pump to class
This commit is contained in:
		| @@ -1,78 +1,63 @@ | ||||
| import time | ||||
| import atexit | ||||
| import threading | ||||
| import RPi.GPIO as GPIO | ||||
|  | ||||
| PUMP_1_PIN = 17 | ||||
| PUMP_2_PIN = 27 | ||||
| PUMP_3_PIN = 22 | ||||
|  | ||||
| _is_setup = False | ||||
| PUMP_PWM_FREQ = 10000 | ||||
| PUMP_MAX_DUTY = 60 | ||||
|  | ||||
|  | ||||
| class Pump(object): | ||||
|     __slots__ = 'out1', 'out2', 'out3' | ||||
|     def __init__(self, channel): | ||||
|         self._gpio_pin = [PUMP_1_PIN, PUMP_2_PIN, PUMP_3_PIN][channel] | ||||
|  | ||||
|     def __init__(self, out1, out2, out3): | ||||
|         self.out1 = out1 | ||||
|         self.out2 = out2 | ||||
|         self.out3 = out3 | ||||
|         GPIO.setmode(GPIO.BCM) | ||||
|         GPIO.setwarnings(False) | ||||
|         GPIO.setup(self._gpio_pin, GPIO.OUT, initial=GPIO.LOW) | ||||
|         self._pwm = GPIO.PWM(self._gpio_pin, PUMP_PWM_FREQ) | ||||
|         self._pwm.start(0) | ||||
|  | ||||
|     def __repr__(self): | ||||
|         fmt = """Pump 1: {out1} | ||||
| Pump 1: {out2} Ohms | ||||
| Pump 1: {out3} Ohms""" | ||||
|          | ||||
|         return fmt.format( | ||||
|             out1=self.out1, | ||||
|             out2=self.out2, | ||||
|             out3=self.out3) | ||||
|         self._timeout = None | ||||
|  | ||||
|     __str__ = __repr__ | ||||
|     def set_speed(self, speed): | ||||
|         """Set pump speed (PWM duty cycle).""" | ||||
|         if speed > 1.0 or speed < 0: | ||||
|             raise ValueError("Speed must be between 0 and 1") | ||||
|         self._pwm.ChangeDutyCycle(int(PUMP_MAX_DUTY * speed)) | ||||
|         self._speed = speed | ||||
|  | ||||
|     def get_speed(self): | ||||
|         """Return Pump speed (PWM duty cycle).""" | ||||
|         return self._speed | ||||
|  | ||||
| def setup(): | ||||
|     global _is_setup | ||||
|     global _pump | ||||
|      | ||||
|     if _is_setup: | ||||
|         return | ||||
|     _is_setup = True | ||||
|     def stop(self): | ||||
|         """Stop the pump.""" | ||||
|         self.set_speed(0) | ||||
|  | ||||
|     GPIO.setwarnings(False) | ||||
|     GPIO.setmode(GPIO.BCM) | ||||
|     GPIO.setup(PUMP_1_PIN, GPIO.OUT) | ||||
|     GPIO.input(PUMP_1_PIN, 0) | ||||
|     GPIO.setup(PUMP_2_PIN, GPIO.OUT) | ||||
|     GPIO.input(PUMP_2_PIN, 0) | ||||
|     GPIO.setup(PUMP_3_PIN, GPIO.OUT) | ||||
|     GPIO.input(PUMP_3_PIN, 0) | ||||
|      | ||||
|     atexit.register(cleanup) | ||||
|     def pulse(self, speed, timeout=0.1, blocking=True, force=False): | ||||
|         """Pulse the pump for timeout seconds. | ||||
|  | ||||
|         :param timeout: Timeout, in seconds, of the pump pulse | ||||
|         :param blocking: If true, function will block until pump has stopped | ||||
|  | ||||
| def set_pump_on(channel): | ||||
|     """Set wet point for a moisture channel.""" | ||||
|     _pump[channel] = 1 | ||||
|         """ | ||||
|         if blocking: | ||||
|             self.set_speed(speed) | ||||
|             time.sleep(timeout) | ||||
|             self.stop() | ||||
|             return True | ||||
|         else: | ||||
|             if self._timeout is not None: | ||||
|                 if self._timeout.is_alive(): | ||||
|                     if force: | ||||
|                         self._timeout.cancel() | ||||
|                     else: | ||||
|                         return False | ||||
|             self._timeout = threading.Timer(timeout, self.stop) | ||||
|             self.set_speed(speed) | ||||
|             self._timeout.start() | ||||
|             return True | ||||
|  | ||||
| def set_pump_off(channel, value): | ||||
|     """Set wet point for a moisture channel.""" | ||||
|     _pump[channel] = 0 | ||||
|  | ||||
| def read_pump(channel): | ||||
|     """Get current value for the additional ADC pin.""" | ||||
|     setup() | ||||
|     return _pump[channel] | ||||
|  | ||||
| def cleanup(): | ||||
|     GPIO.output(PUMP_1_PIN, 0) | ||||
|     GPIO.output(PUMP_2_PIN, 0) | ||||
|     GPIO.output(PUMP_3_PIN, 0) | ||||
|  | ||||
| def read_all(): | ||||
|     """Return pump state""" | ||||
|     setup() | ||||
|     in1 = _pump[1] | ||||
|     in2 = _pump[2] | ||||
|     in3 = _pump[3] | ||||
|  | ||||
|     return _pump(in1, in2, in3) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user