Skip to content
Snippets Groups Projects
Select Git revision
  • 47d6d87f78ee8059d3cbd14801d4f3e367a1ca0c
  • master default protected
2 results

threadpi.c

Blame
  • run-solleroot.py 11.29 KiB
    # python3
    import gatt
    import threading
    ##import time 
    
    # BLE UUID's
    root_identifier_uuid = '48c5d828-ac2a-442d-97a3-0c9822b04979'
    uart_service_uuid = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
    tx_characteristic_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' # Write
    rx_characteristic_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' # Notify
    
    # message types
    DATA_TYPE_COLOR_SENSOR = 4
    DATA_TYPE_BUMPER = 12
    DATA_TYPE_LIGHT = 13
    DATA_TYPE_TOUCH = 17
    DATA_TYPE_CLIFF = 20
    
    # color classifications
    color_black = [17, 68, 65, 20, 49, 67, 52, 16, 1]
    color_red = [34, 2, 32, 33, 18]
    color_green = [51, 48, 3]
    color_blue = [4, 64, 68]
                   
    # uncategorized
    # color_white = [0,
    # 33, 18
    
    class BluetoothDeviceManager(gatt.DeviceManager):
        robot = None # root robot device
    
        def device_discovered(self, device):
            print("[%s] Discovered: %s" % (device.mac_address, device.alias()))
            self.stop_discovery() # Stop searching
            self.robot = RootDevice(mac_address=device.mac_address, manager=self)
            self.robot.print_message()
            self.robot.connect()
    
    class RootDevice(gatt.Device):
        def __init__(self, mac_address, manager, managed=True):
            gatt.Device.__init__(self, mac_address, manager, managed=True)
            
            # states for our robot primatives
            self.currently_adjusting_for_perimeter = False
            self.edge_following_enabled = False
            self.patterning_lines_enabled = False
        
            # variables in robot object for each robot
            self.message = "no message yet"
            self.last_packet = []
            self.dash_count = 0
        
        def print_message(self):
            print(self.message)
        
        def connect_succeeded(self):
            super().connect_succeeded()
            print("[%s] Connected" % (self.mac_address))
    
        def connect_failed(self, error):
            super().connect_failed(error)
            print("[%s] Connection failed: %s" % (self.mac_address, str(error)))
    
        def disconnect_succeeded(self):
            super().disconnect_succeeded()
            print("[%s] Disconnected" % (self.mac_address))
    
        def services_resolved(self):
            super().services_resolved()
            print("[%s] Resolved services" % (self.mac_address))
    
            self.uart_service = next(
                s for s in self.services
                if s.uuid == uart_service_uuid)
    
            self.tx_characteristic = next(
                c for c in self.uart_service.characteristics
                if c.uuid == tx_characteristic_uuid)
    
            self.rx_characteristic = next(
                c for c in self.uart_service.characteristics
                if c.uuid == rx_characteristic_uuid)
    
            self.rx_characteristic.enable_notifications() # listen to RX messages
    
        def characteristic_value_updated(self, characteristic, value):
            new_data = []
            type = ""
            for byte in value:
                new_data.append(byte)
    #        print ("Messages from Root:")
            # Print sensor data
            if new_data[0] == DATA_TYPE_COLOR_SENSOR : type = "Color Sensor"
            if new_data[0] == DATA_TYPE_BUMPER: type = "Bumper"
            if new_data[0] == DATA_TYPE_LIGHT: type = "Light Sensor"
            if new_data[0] == DATA_TYPE_TOUCH: type = "Touch Sensor"
            if new_data[0] == DATA_TYPE_CLIFF: type = "Cliff Sensor"
    ##        print("{}: {}".format(type, new_data))
            
    ##        #attempt to back out of cliff even
    ##        if new_data[0] == DATA_TYPE_CLIFF:
    ##            self.drive_backwards()
    ##            print("end cliff event")
                
            # Perimeter detect, then other color functions
            # check if you're at the perimeter every time there is new color data
            if new_data[0] == DATA_TYPE_COLOR_SENSOR:
                # save COLOR INFO to object for future use
                self.last_packet = new_data
                self.adjust_for_perimeter_if_needed(new_data)
                return  # If you're busy adjusting for perimeter,
                        # any senor data is used in that function and
                        # then no longer valid
            if type == "Color Sensor" and self.edge_following_enabled:
                print("new data - calling edge following")
                self.follow_edge(new_data)
          
        def adjust_for_perimeter_if_needed(self, message):
            # Check to see if you're at perimeter!
            # If you are, turn and update your state
    ##        print("IN ADJUST PERIMETER")
    ##        print("{}: {}".format(type, message))
            
            n = 0
            for i in range(3,18):
                if message[i] in color_red:
                    n += 1
            if n <= 3:
                self.currently_adjusting_for_perimeter = False
                self.drive_forward()
                return
            if not self.currently_adjusting_for_perimeter:
    ##            self.pen_up()
                self.currently_adjusting_for_perimeter = True
                
            if n > 13: #number of sensors read the color
                self.drive_backwards()
                print("in all colors are red - driving backwards")
                return
            n = 0
            for i in range(3,8):
                if message[i] in color_red:
                    n += 1
            if n > 3:
                self.steer(90,10)
                print("steering left")
                return    
            n = 0
            for i in range(13,18):
                if message[i] in color_red:
                    n += 1
            if n > 3:
                self.steer(10,90)
                print("steering right")
                return
                
        def do_dash_movement(self):
            threading.Timer(1.0, self.do_dash_movement).start()
           # if self.total_dash_count >= MAX_TOTAL_DASHES:
               # return
            if(self.dash_count % 2 == 0):
                print("pen-down, driving")
                self.pen_down()
              #  self.total_dash_count += 1
            else:
                print("pen up, drive forward")
                self.pen_up()
            self.dash_count += 1
            
    
        def follow_edge(self, message):
            print("IN FOLLOW EDGE")
            
            n = 0
            for i in range(9,12)
                if message[i] in color_black:
                    n += 1
            if n > 1
                self.drive_forward:
                return
                
            left_n = 0
            for i in range(3,8):
                if message[i] in color_black:
                    left_n += 1
            right_n = 0
            for i in range(13,18):
                if message[i] in color_black:
                    right_n += 1
            if left_n > 21 and right_n > 1:
                self.drive_forward()
                return
            if left_n > 1:
                self.steer(70,10)
                print("steering left")
                return    
            if right_n > 1:
                self.steer(10,70)
                print("steering right")
                return
            self.drive_forward()
    
    
        def drive_forward(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xD1])
    
        def drive_left(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x8A])
    
        def drive_right(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x25])
    
        def stop(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7E])
    
        def drive_backwards(self):
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, 0xFF, 0xFF, 0xFF, 0x9C, 0xFF, 0xFF, 0xFF, 0x9C, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x71])
    
        def pen_up(self):
            self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
    
        def pen_down(self):
            self.tx_characteristic.write_value([0x02, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
            
            
        def turn_rate(self, rate):
            left = 0
            right = 0
            if rate >= 0:
                left = rate
            if rate < 0:
                right = -1*rate
            leftbytes = left.to_bytes(4,byteorder='big',signed=True)  # need to convert to byte string
            rightbytes = right.to_bytes(4,byteorder='big',signed=True)
            # note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
    
        def steer(self, left, right):
            leftbytes = left.to_bytes(4,byteorder='big',signed=True)  # need to convert to byte string
            rightbytes = right.to_bytes(4,byteorder='big',signed=True)
            # note that we're not dynamically calculating the CRC at the end, so just leaving it 0 (unchecked)
            self.tx_characteristic.write_value([0x01, 0x04, 0x00, leftbytes[0], leftbytes[1], leftbytes[2], leftbytes[3], rightbytes[0], rightbytes[1], rightbytes[2], rightbytes[3], 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0])
    
    
    def drive_root(command):
        angle = 0
    ##    last_packet = manager.robot.last_packet
        
        if command == "f":
            print ("Drive forward")
            manager.robot.drive_forward()
        if command == "b":
            print ("Drive backwards")
            manager.robot.drive_backwards()
        if command == "r":
            print ("Drive right")
            manager.robot.drive_right()
        if command == "l":
            print ("Drive left")
            manager.robot.drive_left()
        if command == "s":
            print ("Stop")
            manager.robot.stop()
        if command == "u":
            print ("Pen up")
            manager.robot.pen_up()
        if command == "d":
            print ("Pen down")
            manager.robot.pen_down()
        if command == "t":
            print ("Enter turn rate (up to +-90):")
            char = input()
            rate = int(char)
            print ("Turning ", rate)
            manager.robot.turn_rate(rate)
        if command == "p":
            print ("Steer")
            manager.robot.steer(30,40)
        if command == "clear":
            print ("clear all")
            manager.robot.stop()
            manager.robot.edge_following_enabled = False
            manager.robot.patterning_lines_enabled = False
        if command == "edge":
            print ("edge")
            manager.robot.edge_following_enabled = True
            manager.robot.drive_forward()
        if command == "pattern lines":
            print ("pattern lines")
            manager.robot.drive_forward()
            manager.robot.do_dash_movement()
            
    # start here if run as program / not imported as module
    if __name__ == "__main__":
        manager = BluetoothDeviceManager(adapter_name = 'hci0')
        manager.start_discovery(service_uuids=[root_identifier_uuid])
        thread = threading.Thread(target = manager.run)
        thread.start()
        char = ""
        try:
            while manager.robot is None:
                pass # wait for a root robot to be discovered
            print("Press letter (f,b,l,r) to drive robot (t) to turn, (s) to stop, (u or d) raise pen up or down, (q) to quit")
            while char != "q":
                char = input() # wait for keyboard input
                print('waiting for keyboard input')
                drive_root(char)
                print('yay!')
            print("Quitting")
            manager.stop()
            manager.robot.disconnect()
            thread.join()
        except KeyboardInterrupt:
            pass