Select Git revision
-
Neil Gershenfeld authoredNeil Gershenfeld authored
run-solleroot.py 11.29 KiB
# python3
import gatt
import threading
##import time
# BLE UUID's
root_identifier_uuid = '48c5d828-ac2a-442d-97a3-0c9822b04979'
uart_service_uuid = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
tx_characteristic_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' # Write
rx_characteristic_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' # Notify
# message types
DATA_TYPE_COLOR_SENSOR = 4
DATA_TYPE_BUMPER = 12
DATA_TYPE_LIGHT = 13
DATA_TYPE_TOUCH = 17
DATA_TYPE_CLIFF = 20
# color classifications
color_black = [17, 68, 65, 20, 49, 67, 52, 16, 1]
color_red = [34, 2, 32, 33, 18]
color_green = [51, 48, 3]
color_blue = [4, 64, 68]
# uncategorized
# color_white = [0,
# 33, 18
class BluetoothDeviceManager(gatt.DeviceManager):
robot = None # root robot device
def device_discovered(self, device):
print("[%s] Discovered: %s" % (device.mac_address, device.alias()))
self.stop_discovery() # Stop searching
self.robot = RootDevice(mac_address=device.mac_address, manager=self)
self.robot.print_message()
self.robot.connect()
class RootDevice(gatt.Device):
def __init__(self, mac_address, manager, managed=True):
gatt.Device.__init__(self, mac_address, manager, managed=True)
# states for our robot primatives
self.currently_adjusting_for_perimeter = False
self.edge_following_enabled = False
self.patterning_lines_enabled = False
# variables in robot object for each robot
self.message = "no message yet"
self.last_packet = []
self.dash_count = 0
def print_message(self):
print(self.message)
def connect_succeeded(self):
super().connect_succeeded()
print("[%s] Connected" % (self.mac_address))
def connect_failed(self, error):
super().connect_failed(error)
print("[%s] Connection failed: %s" % (self.mac_address, str(error)))
def disconnect_succeeded(self):
super().disconnect_succeeded()
print("[%s] Disconnected" % (self.mac_address))
def services_resolved(self):
super().services_resolved()
print("[%s] Resolved services" % (self.mac_address))
self.uart_service = next(
s for s in self.services
if s.uuid == uart_service_uuid)
self.tx_characteristic = next(
c for c in self.uart_service.characteristics
if c.uuid == tx_characteristic_uuid)
self.rx_characteristic = next(
c for c in self.uart_service.characteristics
if c.uuid == rx_characteristic_uuid)
self.rx_characteristic.enable_notifications() # listen to RX messages
def characteristic_value_updated(self, characteristic, value):
new_data = []
type = ""
for byte in value:
new_data.append(byte)
# print ("Messages from Root:")
# Print sensor data
if new_data[0] == DATA_TYPE_COLOR_SENSOR : type = "Color Sensor"
if new_data[0] == DATA_TYPE_BUMPER: type = "Bumper"
if new_data[0] == DATA_TYPE_LIGHT: type = "Light Sensor"
if new_data[0] == DATA_TYPE_TOUCH: type = "Touch Sensor"
if new_data[0] == DATA_TYPE_CLIFF: type = "Cliff Sensor"
## print("{}: {}".format(type, new_data))
## #attempt to back out of cliff even
## if new_data[0] == DATA_TYPE_CLIFF:
## self.drive_backwards()
## print("end cliff event")
# Perimeter detect, then other color functions
# check if you're at the perimeter every time there is new color data
if new_data[0] == DATA_TYPE_COLOR_SENSOR:
# save COLOR INFO to object for future use
self.last_packet = new_data
self.adjust_for_perimeter_if_needed(new_data)
return # If you're busy adjusting for perimeter,
# any senor data is used in that function and
# then no longer valid
if type == "Color Sensor" and self.edge_following_enabled:
print("new data - calling edge following")
self.follow_edge(new_data)
def adjust_for_perimeter_if_needed(self, message):
# Check to see if you're at perimeter!
# If you are, turn and update your state
## print("IN ADJUST PERIMETER")
## print("{}: {}".format(type, message))
n = 0
for i in range(3,18):
if message[i] in color_red:
n += 1
if n <= 3:
self.currently_adjusting_for_perimeter = False
self.drive_forward()
return
if not self.currently_adjusting_for_perimeter:
## self.pen_up()
self.currently_adjusting_for_perimeter = True
if n > 13: #number of sensors read the color
self.drive_backwards()
print("in all colors are red - driving backwards")
return
n = 0
for i in range(3,8):
if message[i] in color_red:
n += 1
if n > 3:
self.steer(90,10)
print("steering left")
return
n = 0
for i in range(13,18):
if message[i] in color_red:
n += 1
if n > 3:
self.steer(10,90)
print("steering right")
return
def do_dash_movement(self):
threading.Timer(1.0, self.do_dash_movement).start()
# if self.total_dash_count >= MAX_TOTAL_DASHES:
# return
if(self.dash_count % 2 == 0):
print("pen-down, driving")
self.pen_down()
# self.total_dash_count += 1
else:
print("pen up, drive forward")
self.pen_up()
self.dash_count += 1
def follow_edge(self, message):
print("IN FOLLOW EDGE")
n = 0
for i in range(9,12)
if message[i] in color_black:
n += 1
if n > 1
self.drive_forward:
return
left_n = 0
for i in range(3,8):
if message[i] in color_black:
left_n += 1
right_n = 0
for i in range(13,18):
if message[i] in color_black:
right_n += 1
if left_n > 21 and right_n > 1:
self.drive_forward()
return
if left_n > 1:
self.steer(70,10)
print("steering left")
return
if right_n > 1:
self.steer(10,70)
print("steering right")
return
self.drive_forward()
def drive_forward(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xD1])
def drive_left(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x8A])
def drive_right(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x25])
def stop(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7E])
def drive_backwards(self):
self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0xFF, 0xFF, 0xFF, 0x9C, 0xFF, 0xFF, 0xFF, 0x9C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x71])
def pen_up(self):
self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
def pen_down(self):
self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
def turn_rate(self, rate):
left = 0
right = 0
if rate >= 0:
left = rate
if rate < 0:
right = -1*rate
leftbytes = left.to_bytes(4,byteorder='big',signed=True) # need to convert to byte string
rightbytes = right.to_bytes(4,byteorder='big',signed=True)
# note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
def steer(self, left, right):
leftbytes = left.to_bytes(4,byteorder='big',signed=True) # need to convert to byte string
rightbytes = right.to_bytes(4,byteorder='big',signed=True)
# note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
def drive_root(command):
angle = 0
## last_packet = manager.robot.last_packet
if command == "f":
print ("Drive forward")
manager.robot.drive_forward()
if command == "b":
print ("Drive backwards")
manager.robot.drive_backwards()
if command == "r":
print ("Drive right")
manager.robot.drive_right()
if command == "l":
print ("Drive left")
manager.robot.drive_left()
if command == "s":
print ("Stop")
manager.robot.stop()
if command == "u":
print ("Pen up")
manager.robot.pen_up()
if command == "d":
print ("Pen down")
manager.robot.pen_down()
if command == "t":
print ("Enter turn rate (up to +-90):")
char = input()
rate = int(char)
print ("Turning ", rate)
manager.robot.turn_rate(rate)
if command == "p":
print ("Steer")
manager.robot.steer(30,40)
if command == "clear":
print ("clear all")
manager.robot.stop()
manager.robot.edge_following_enabled = False
manager.robot.patterning_lines_enabled = False
if command == "edge":
print ("edge")
manager.robot.edge_following_enabled = True
manager.robot.drive_forward()
if command == "pattern lines":
print ("pattern lines")
manager.robot.drive_forward()
manager.robot.do_dash_movement()
# start here if run as program / not imported as module
if __name__ == "__main__":
manager = BluetoothDeviceManager(adapter_name = 'hci0')
manager.start_discovery(service_uuids=[root_identifier_uuid])
thread = threading.Thread(target = manager.run)
thread.start()
char = ""
try:
while manager.robot is None:
pass # wait for a root robot to be discovered
print("Press letter (f,b,l,r) to drive robot (t) to turn, (s) to stop, (u or d) raise pen up or down, (q) to quit")
while char != "q":
char = input() # wait for keyboard input
print('waiting for keyboard input')
drive_root(char)
print('yay!')
print("Quitting")
manager.stop()
manager.robot.disconnect()
thread.join()
except KeyboardInterrupt:
pass