In stock
View Purchasing OptionsProject update 6 of 9
This week, we’re showcasing how asynchronous programming with the asyncio library can revolutionize your projects. Using the asyncio library allows multiple tasks to run concurrently, significantly improving the efficiency and performance of your Breadstick. Here’s a breakdown of how it works:
It’s easy to write a program that reads one sensor and drives one actuator, all you have to do is read, write, and repeat. Processors execute lines of code extremely quickly, so it’s common to put a delay in the code loop. The typical time.delay(seconds)
function is a "busy" delay. It prevents our processor from doing anything useful until it’s finished being delayed. Asyncio lets us turn the, "wait here and do nothing for a bit" message into "go do something else for a bit!" This lets you set up a bunch of individual loops, each with its unique delay before its code is run again. For instance, I wanted to update the LED strip 30 times per second but only wanted to have the device print the current variable values to the computer screen every one second; asyncio makes this very easy to do.
Allows the processor to efficiently switch between tasks, maximizing productivity and minimizing idle time.
await asyncio.sleep(5)
translates to, "You can go do something else, but come back after five seconds has passed."
This allows the individual loops to have a shared place to read and write data to.
Allows you to accomplish more complex tasks by scheduling how often each task should take place.
Watch it in action in this demo video:
'''
Raspberry Breadstick Multitasking with asyncio
This code creates a bunch of async loops that are revisited by the processor after a defined amount of time has passed.
Think of each async fuction as its own main loop running in a little container.
await asyncio.sleep(period) in each of those self-contained loops tells the processor it can go do something else for a bit.
A global dictionary is used as a way to share information across each of these self-contained loops.
'''
from board import *
import keypad
import asyncio
from digitalio import DigitalInOut
from adafruit_simplemath import map_range
#DotStar LED Libraries
from adafruit_dotstar import DotStar
from adafruit_fancyled.adafruit_fancyled import CRGB, CHSV
#I2C Acceleromter and Gyroscope
import busio
from adafruit_lsm6ds.lsm6ds3trc import LSM6DS3TRC as LSM6DS
from adafruit_lsm6ds import Rate, AccelRange, GyroRange
#Servo
from pwmio import PWMOut
from adafruit_motor.servo import Servo
#Potentiometer
from analogio import AnalogIn
async def status_update(frequency):
'''Prints current values in the global shared dictionary.'''
period = 1/frequency
global shared
while True:
print()
for key in shared.keys():
print(key, ":", shared[key])
await asyncio.sleep(period)
async def servo_update(frequency, pin, var):
'''Updates servo angle.'''
period = 1/frequency
pwm = PWMOut(pin, duty_cycle=2 ** 15, frequency=50)
servo = Servo(pwm)
new_angle = 0
global shared
while True:
new_angle = map_range(shared[var],0,65535,0,180)
servo.angle = new_angle
await asyncio.sleep(period)
async def pot_update(frequency,pin,var):
'''Reads new potentiometer value.'''
period = 1/frequency
global shared
adc = AnalogIn(pin)
while True:
shared[var] = adc.value
await asyncio.sleep(period)
async def led_update(frequency):
'''Updates LED Strip.'''
period = 1/frequency
global shared
leds = DotStar(DOTSTAR_CLOCK, DOTSTAR_DATA, 24, brightness=0.25, auto_write=False)
offset = 1
while True:
offset +=map_range(shared['gyro_z'],-10,10,-1,1)
leds.fill((0,0,0))
for i in range(map_range(shared['pot2'],0,65535,0,24)):
c = CRGB(CHSV(i/24/5+offset,1.0,0.1))
leds[i] = c.pack()
leds.show()
await asyncio.sleep(period)
async def imu_update(frequency):
'''Reads latest X,Y,Z values from the Gyroscope and Accelerometer.'''
period = 1/frequency
global shared
#Setup I2C Accelerometer and Gyroscope
i2c = busio.I2C(IMU_SCL, IMU_SDA)
IMU = LSM6DS(i2c)
IMU.accelerometer_range = AccelRange.RANGE_4G
print("Accelerometer range set to: %d G" % AccelRange.string[IMU.accelerometer_range])
IMU.gyro_range = GyroRange.RANGE_1000_DPS
print("Gyro range set to: %d DPS" % GyroRange.string[IMU.gyro_range])
IMU.accelerometer_data_rate = Rate.RATE_1_66K_HZ
print("Accelerometer rate set to: %d HZ" % Rate.string[IMU.accelerometer_data_rate])
IMU.gyro_data_rate = Rate.RATE_1_66K_HZ
print("Gyro rate set to: %d HZ" % Rate.string[IMU.gyro_data_rate])
while True:
shared['acc_x'], shared['acc_y'], shared['acc_z'] = IMU.acceleration
shared['gyro_x'], shared['gyro_y'], shared['gyro_z'] = IMU.gyro
await asyncio.sleep(period)
async def main():
'''Main Program Loop. Gets called below by asyncio.run(main()).'''
led_task = asyncio.create_task(led_update(30))
imu_task = asyncio.create_task(imu_update(30))
servo_task = asyncio.create_task(servo_update(50,D9,'pot1'))
pot1_task = asyncio.create_task(pot_update(50,A18,'pot1'))
pot2_task = asyncio.create_task(pot_update(50,A17,'pot2'))
status_task = asyncio.create_task(status_update(1))
await led_task
await imu_task
await servo_task
await pot1_task
await pot2_task
await status_task
#Dictionary of global values accessible to async functions
shared = {}
shared['pot1'] = 0
shared['pot2'] = 0
shared['acc_x'] = 0
shared['acc_y'] = 0
shared['acc_z'] = 0
shared['gyro_x'] = 0
shared['gyro_y'] = 0
shared['gyro_z'] = 0
#Run main loop
asyncio.run(main())
For a deeper understanding of async programming, check out "Python Asynchronous Programming - AsyncIO & Async/Await" by Tech With Tim.