import zmq from breezyslam.algorithms import RMHC_SLAM from breezyslam.sensors import RPLidarA1 as LaserModel from rplidar import RPLidar as Lidar from .SlamController_pb2 import SlamScan, SlamLocation # Left here as was used in the example, configure as necessary. # MAP_SIZE_PIXELS = 500 # MAP_SIZE_METERS = 10 # LIDAR_DEVICE = '/dev/ttyUSB0' class SlamStreamer: can_scan = False def __init__(self, map_pixels = None, map_meters = None, lidar_connection = None, port = None): self._map_pixels = map_pixels self._map_meters = map_meters self._lidar_connection = lidar_connection self._port = port self._socket = self._start_socket(self._create_tcp_pub_socket(), port) def start(self): ''' Does scanning and constructs the slam map, and pushes to subscribers through a zmq pub socket. This is done on the main thread, so you'll need to run this method on a separate thread yourself. All constructor parameters must be set prior to calling this method, and changing those values after calling this method will have no effect. ''' # Block until user opens zmq. self._socket.recv() # Adapted from BreezySLAM rpslam example. # Connect to Lidar unit lidar = Lidar(self._lidar_connection) # Create an RMHC SLAM object with a laser model and optional robot model slam = RMHC_SLAM(LaserModel(), self._map_pixels, self._map_meters) # Initialize empty map mapbytes = bytearray(self.map_pixels * self.map_pixels) # Create an iterator to collect scan data from the RPLidar iterator = lidar.iter_scans() # First scan is crap, so ignore it next(iterator) while self.can_scan: # Extract (quality, angle, distance) triples from current scan items = [item for item in next(iterator)] # Extract distances and angles from triples distances = [item[2] for item in items] angles = [item[1] for item in items] # Update SLAM with current Lidar scan and scan angles slam.update(distances, scan_angles_degrees=angles) self._push_map(slam.getmap(mapbytes), slam.getpos()) def _push_map(self, mapbytes, location): ''' Pushes a scan over zmq using protocol buffers. map should be the result of slam.getmap. location should be a tuple, the result of slam.getpos() ''' protoScan = SlamScan(map = bytes(mapbytes), \ location = SlamLocation(x = location[0], y = location[1], theta = location[3])) self._socket.send(protoScan.SerializeToString()) def stop_scanning(self): self.can_scan = False def _create_context(self): return zmq.Context.instance() def _create_tcp_pub_socket(self): return self._create_context().socket(zmq.PAIR) def _start_socket(self, socket, port): socket.bind('tcp://*:' + str(self._port)) return socket # Properties @property def map_pixels(self): return self._map_pixels @map_pixels.setter def map_pixels(self, value): self._map_pixels = value @property def map_meters(self): return self._map_meters @map_meters.setter def map_meters(self, value): self._map_meters = value @property def lidar_connection(self): return self._lidar_connection @lidar_connection.setter def lidar_connection(self, value): self._lidar_connection = value @property def port(self): return self._port @port.setter def port(self, value): self._port = value