mirror of
https://github.com/pimoroni/grow-python
synced 2025-10-25 15:19:23 +00:00
Do some more basic housekeeping to get to a building working version 0.0.1
This commit is contained in:
@@ -1 +1 @@
|
||||
__version__ = '0.0.3'
|
||||
__version__ = '0.0.1'
|
||||
|
||||
@@ -1,140 +0,0 @@
|
||||
"""Read the MICS6814 via an ads1015 ADC"""
|
||||
|
||||
import time
|
||||
import atexit
|
||||
import ads1015
|
||||
import RPi.GPIO as GPIO
|
||||
|
||||
MICS6814_HEATER_PIN = 24
|
||||
MICS6814_GAIN = 6.144
|
||||
|
||||
ads1015.I2C_ADDRESS_DEFAULT = ads1015.I2C_ADDRESS_ALTERNATE
|
||||
_is_setup = False
|
||||
_adc_enabled = False
|
||||
_adc_gain = 6.148
|
||||
|
||||
|
||||
class Mics6814Reading(object):
|
||||
__slots__ = 'oxidising', 'reducing', 'nh3', 'adc'
|
||||
|
||||
def __init__(self, ox, red, nh3, adc=None):
|
||||
self.oxidising = ox
|
||||
self.reducing = red
|
||||
self.nh3 = nh3
|
||||
self.adc = adc
|
||||
|
||||
def __repr__(self):
|
||||
fmt = """Oxidising: {ox:05.02f} Ohms
|
||||
Reducing: {red:05.02f} Ohms
|
||||
NH3: {nh3:05.02f} Ohms"""
|
||||
if self.adc is not None:
|
||||
fmt += """
|
||||
ADC: {adc:05.02f} Volts
|
||||
"""
|
||||
return fmt.format(
|
||||
ox=self.oxidising,
|
||||
red=self.reducing,
|
||||
nh3=self.nh3,
|
||||
adc=self.adc)
|
||||
|
||||
__str__ = __repr__
|
||||
|
||||
|
||||
def setup():
|
||||
global adc, _is_setup
|
||||
if _is_setup:
|
||||
return
|
||||
_is_setup = True
|
||||
|
||||
adc = ads1015.ADS1015(i2c_addr=0x49)
|
||||
adc.set_mode('single')
|
||||
adc.set_programmable_gain(MICS6814_GAIN)
|
||||
adc.set_sample_rate(1600)
|
||||
|
||||
GPIO.setwarnings(False)
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
GPIO.setup(MICS6814_HEATER_PIN, GPIO.OUT)
|
||||
GPIO.output(MICS6814_HEATER_PIN, 1)
|
||||
atexit.register(cleanup)
|
||||
|
||||
|
||||
def enable_adc(value=True):
|
||||
"""Enable reading from the additional ADC pin."""
|
||||
global _adc_enabled
|
||||
_adc_enabled = value
|
||||
|
||||
|
||||
def set_adc_gain(value):
|
||||
"""Set gain value for the additional ADC pin."""
|
||||
global _adc_gain
|
||||
_adc_gain = value
|
||||
|
||||
|
||||
def cleanup():
|
||||
GPIO.output(MICS6814_HEATER_PIN, 0)
|
||||
|
||||
|
||||
def read_all():
|
||||
"""Return gas resistence for oxidising, reducing and NH3"""
|
||||
setup()
|
||||
ox = adc.get_voltage('in0/gnd')
|
||||
red = adc.get_voltage('in1/gnd')
|
||||
nh3 = adc.get_voltage('in2/gnd')
|
||||
|
||||
try:
|
||||
ox = (ox * 56000) / (3.3 - ox)
|
||||
except ZeroDivisionError:
|
||||
ox = 0
|
||||
|
||||
try:
|
||||
red = (red * 56000) / (3.3 - red)
|
||||
except ZeroDivisionError:
|
||||
red = 0
|
||||
|
||||
try:
|
||||
nh3 = (nh3 * 56000) / (3.3 - nh3)
|
||||
except ZeroDivisionError:
|
||||
nh3 = 0
|
||||
|
||||
analog = None
|
||||
|
||||
if _adc_enabled:
|
||||
if _adc_gain == MICS6814_GAIN:
|
||||
analog = adc.get_voltage('ref/gnd')
|
||||
else:
|
||||
adc.set_programmable_gain(_adc_gain)
|
||||
time.sleep(0.05)
|
||||
analog = adc.get_voltage('ref/gnd')
|
||||
adc.set_programmable_gain(MICS6814_GAIN)
|
||||
|
||||
return Mics6814Reading(ox, red, nh3, analog)
|
||||
|
||||
|
||||
def read_oxidising():
|
||||
"""Return gas resistance for oxidising gases.
|
||||
|
||||
Eg chlorine, nitrous oxide
|
||||
"""
|
||||
setup()
|
||||
return read_all().oxidising
|
||||
|
||||
|
||||
def read_reducing():
|
||||
"""Return gas resistance for reducing gases.
|
||||
|
||||
Eg hydrogen, carbon monoxide
|
||||
"""
|
||||
setup()
|
||||
return read_all().reducing
|
||||
|
||||
|
||||
def read_nh3():
|
||||
"""Return gas resistance for nh3/ammonia"""
|
||||
setup()
|
||||
return read_all().nh3
|
||||
|
||||
|
||||
def read_adc():
|
||||
"""Return spare ADC channel value"""
|
||||
setup()
|
||||
return read_all().adc
|
||||
93
library/grow/moisture.py
Normal file
93
library/grow/moisture.py
Normal file
@@ -0,0 +1,93 @@
|
||||
import time
|
||||
import atexit
|
||||
import RPi.GPIO as GPIO
|
||||
|
||||
MOISTURE_1_PIN = 23
|
||||
MOISTURE_2_PIN = 25
|
||||
MOISTURE_3_PIN = 8
|
||||
|
||||
_is_setup = False
|
||||
|
||||
|
||||
class Moisture(object):
|
||||
__slots__ = 'in1', 'in2', 'in3'
|
||||
|
||||
def __init__(self, in1, in2, in3):
|
||||
self.in1 = in1
|
||||
self.in2 = in2
|
||||
self.in3 = in3
|
||||
|
||||
def __repr__(self):
|
||||
fmt = """Moisture 1: {in1} Hz
|
||||
Moisture 1: {in2} Ohms
|
||||
Moisture 1: {in3} Ohms"""
|
||||
|
||||
return fmt.format(
|
||||
in1=self.in1,
|
||||
in2=self.in2,
|
||||
in3=self.in3)
|
||||
|
||||
__str__ = __repr__
|
||||
|
||||
|
||||
def setup():
|
||||
global _is_setup
|
||||
global _moisture
|
||||
|
||||
if _is_setup:
|
||||
return
|
||||
_is_setup = True
|
||||
|
||||
GPIO.setwarnings(False)
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
GPIO.setup(MOISTURE_1_PIN, GPIO.IN)
|
||||
GPIO.input(MOISTURE_1_PIN)
|
||||
GPIO.setup(MOISTURE_2_PIN, GPIO.IN)
|
||||
GPIO.input(MOISTURE_2_PIN)
|
||||
GPIO.setup(MOISTURE_3_PIN, GPIO.IN)
|
||||
GPIO.input(MOISTURE_3_PIN)
|
||||
|
||||
atexit.register(cleanup)
|
||||
|
||||
|
||||
def set_moisture_wet(channel, value):
|
||||
"""Set wet point for a moisture channel."""
|
||||
_moisture[channel].wet = value
|
||||
|
||||
def set_moisture_wet(channel, value):
|
||||
"""Set wet point for a moisture channel."""
|
||||
_moisture[channel].dry = value
|
||||
|
||||
def read_moiture(channel):
|
||||
"""Get current value for the additional ADC pin."""
|
||||
setup()
|
||||
return _moisture[channel].value
|
||||
|
||||
def cleanup():
|
||||
GPIO.output(MOISTURE_1_PIN, 0)
|
||||
GPIO.output(MOISTURE_2_PIN, 0)
|
||||
GPIO.output(MOISTURE_3_PIN, 0)
|
||||
|
||||
def read_all():
|
||||
"""Return gas resistence for oxidising, reducing and NH3"""
|
||||
setup()
|
||||
in1 = adc.get_voltage('in0/gnd')
|
||||
in2 = adc.get_voltage('in1/gnd')
|
||||
in3 = adc.get_voltage('in2/gnd')
|
||||
|
||||
try:
|
||||
in1 = (ox * 56000) / (3.3 - ox)
|
||||
except ZeroDivisionError:
|
||||
in1 = 0
|
||||
|
||||
try:
|
||||
in2 = (red * 56000) / (3.3 - red)
|
||||
except ZeroDivisionError:
|
||||
in2 = 0
|
||||
|
||||
try:
|
||||
in3 = (nh3 * 56000) / (3.3 - nh3)
|
||||
except ZeroDivisionError:
|
||||
in3 = 0
|
||||
|
||||
return _moisture(in1, in2, in3)
|
||||
@@ -1,90 +0,0 @@
|
||||
import sounddevice
|
||||
import numpy
|
||||
|
||||
|
||||
class Noise():
|
||||
def __init__(self,
|
||||
sample_rate=16000,
|
||||
duration=0.5):
|
||||
"""Noise measurement.
|
||||
|
||||
:param sample_rate: Sample rate in Hz
|
||||
:param duraton: Duration, in seconds, of noise sample capture
|
||||
|
||||
"""
|
||||
|
||||
self.duration = duration
|
||||
self.sample_rate = sample_rate
|
||||
|
||||
def get_amplitudes_at_frequency_ranges(self, ranges):
|
||||
"""Return the mean amplitude of frequencies in the given ranges.
|
||||
|
||||
:param ranges: List of ranges including a start and end range
|
||||
|
||||
"""
|
||||
recording = self._record()
|
||||
magnitude = numpy.abs(numpy.fft.rfft(recording[:, 0], n=self.sample_rate))
|
||||
result = []
|
||||
for r in ranges:
|
||||
start, end = r
|
||||
result.append(numpy.mean(magnitude[start:end]))
|
||||
return result
|
||||
|
||||
def get_amplitude_at_frequency_range(self, start, end):
|
||||
"""Return the mean amplitude of frequencies in the specified range.
|
||||
|
||||
:param start: Start frequency (in Hz)
|
||||
:param end: End frequency (in Hz)
|
||||
|
||||
"""
|
||||
n = self.sample_rate // 2
|
||||
if start > n or end > n:
|
||||
raise ValueError("Maxmimum frequency is {}".format(n))
|
||||
|
||||
recording = self._record()
|
||||
magnitude = numpy.abs(numpy.fft.rfft(recording[:, 0], n=self.sample_rate))
|
||||
return numpy.mean(magnitude[start:end])
|
||||
|
||||
def get_noise_profile(self,
|
||||
noise_floor=100,
|
||||
low=0.12,
|
||||
mid=0.36,
|
||||
high=None):
|
||||
"""Returns a noise charateristic profile.
|
||||
|
||||
Bins all frequencies into 3 weighted groups expressed as a percentage of the total frequency range.
|
||||
|
||||
:param noise_floor: "High-pass" frequency, exclude frequencies below this value
|
||||
:param low: Percentage of frequency ranges to count in the low bin (as a float, 0.5 = 50%)
|
||||
:param mid: Percentage of frequency ranges to count in the mid bin (as a float, 0.5 = 50%)
|
||||
:param high: Optional percentage for high bin, effectively creates a "Low-pass" if total percentage is less than 100%
|
||||
|
||||
"""
|
||||
|
||||
if high is None:
|
||||
high = 1.0 - low - mid
|
||||
|
||||
recording = self._record()
|
||||
magnitude = numpy.abs(numpy.fft.rfft(recording[:, 0], n=self.sample_rate))
|
||||
|
||||
sample_count = (self.sample_rate // 2) - noise_floor
|
||||
|
||||
mid_start = noise_floor + int(sample_count * low)
|
||||
high_start = mid_start + int(sample_count * mid)
|
||||
noise_ceiling = high_start + int(sample_count * high)
|
||||
|
||||
amp_low = numpy.mean(magnitude[noise_floor:mid_start])
|
||||
amp_mid = numpy.mean(magnitude[mid_start:high_start])
|
||||
amp_high = numpy.mean(magnitude[high_start:noise_ceiling])
|
||||
amp_total = (amp_low + amp_mid + amp_high) / 3.0
|
||||
|
||||
return amp_low, amp_mid, amp_high, amp_total
|
||||
|
||||
def _record(self):
|
||||
return sounddevice.rec(
|
||||
int(self.duration * self.sample_rate),
|
||||
samplerate=self.sample_rate,
|
||||
blocking=True,
|
||||
channels=1,
|
||||
dtype='float64'
|
||||
)
|
||||
78
library/grow/pump.py
Normal file
78
library/grow/pump.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import time
|
||||
import atexit
|
||||
import RPi.GPIO as GPIO
|
||||
|
||||
PUMP_1_PIN = 17
|
||||
PUMP_2_PIN = 27
|
||||
PUMP_3_PIN = 22
|
||||
|
||||
_is_setup = False
|
||||
|
||||
|
||||
class Pump(object):
|
||||
__slots__ = 'out1', 'out2', 'out3'
|
||||
|
||||
def __init__(self, out1, out2, out3):
|
||||
self.out1 = out1
|
||||
self.out2 = out2
|
||||
self.out3 = out3
|
||||
|
||||
def __repr__(self):
|
||||
fmt = """Pump 1: {out1}
|
||||
Pump 1: {out2} Ohms
|
||||
Pump 1: {out3} Ohms"""
|
||||
|
||||
return fmt.format(
|
||||
out1=self.out1,
|
||||
out2=self.out2,
|
||||
out3=self.out3)
|
||||
|
||||
__str__ = __repr__
|
||||
|
||||
|
||||
def setup():
|
||||
global _is_setup
|
||||
global _pump
|
||||
|
||||
if _is_setup:
|
||||
return
|
||||
_is_setup = True
|
||||
|
||||
GPIO.setwarnings(False)
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
GPIO.setup(PUMP_1_PIN, GPIO.OUT)
|
||||
GPIO.input(PUMP_1_PIN, 0)
|
||||
GPIO.setup(PUMP_2_PIN, GPIO.OUT)
|
||||
GPIO.input(PUMP_2_PIN, 0)
|
||||
GPIO.setup(PUMP_3_PIN, GPIO.OUT)
|
||||
GPIO.input(PUMP_3_PIN, 0)
|
||||
|
||||
atexit.register(cleanup)
|
||||
|
||||
|
||||
def set_pump_on(channel):
|
||||
"""Set wet point for a moisture channel."""
|
||||
_pump[channel] = 1
|
||||
|
||||
def set_pump_off(channel, value):
|
||||
"""Set wet point for a moisture channel."""
|
||||
_pump[channel] = 0
|
||||
|
||||
def read_pump(channel):
|
||||
"""Get current value for the additional ADC pin."""
|
||||
setup()
|
||||
return _pump[channel]
|
||||
|
||||
def cleanup():
|
||||
GPIO.output(PUMP_1_PIN, 0)
|
||||
GPIO.output(PUMP_2_PIN, 0)
|
||||
GPIO.output(PUMP_3_PIN, 0)
|
||||
|
||||
def read_all():
|
||||
"""Return pump state"""
|
||||
setup()
|
||||
in1 = _pump[1]
|
||||
in2 = _pump[2]
|
||||
in3 = _pump[3]
|
||||
|
||||
return _pump(in1, in2, in3)
|
||||
Reference in New Issue
Block a user