import zmq from breezyslam.algorithms import RMHC_SLAM from breezyslam.sensors import RPLidarA1 as LaserModel from rplidar import RPLidar as Lidar from .SlamController_pb2 import SlamScan, SlamLocation # Left here as was used in the example, configure as necessary. # MAP_SIZE_PIXELS = 500 # MAP_SIZE_METERS = 10 # LIDAR_DEVICE = '/dev/ttyUSB0' class SlamStreamer: can_scan = False def __init__(self, map_pixels=None, map_meters=None, lidar_connection=None, port=None): self._map_pixels = map_pixels self._map_meters = map_meters self._lidar_connection = lidar_connection self._port = port self._zmq_context = self._create_context() def start(self): ''' Does scanning and constructs the slam map, and pushes to subscribers through a zmq pub socket. This is done on the main thread, so you'll need to run this method on a separate thread yourself. All constructor parameters must be set prior to calling this method, and changing those values after calling this method will have no effect. ''' print('Starting to stream') self._socket = self._start_socket( self._create_socket(self._zmq_context), self._port) print('Started and bound zmq socket.') # Adapted from BreezySLAM rpslam example. # Connect to Lidar unit lidar = Lidar(self._lidar_connection) # Create an RMHC SLAM object with a laser model and optional robot model slam = RMHC_SLAM(LaserModel(), self._map_pixels, self._map_meters) # Initialize empty map mapbytes = bytearray(self.map_pixels * self.map_pixels) # Create an iterator to collect scan data from the RPLidar iterator = lidar.iter_scans() # First scan is crap, so ignore it next(iterator) while self.can_scan: # Extract (quality, angle, distance) triples from current scan items = [item for item in next(iterator)] # Extract distances and angles from triples distances = [item[2] for item in items] angles = [item[1] for item in items] print('Updating map') # Update SLAM with current Lidar scan and scan angles slam.update(distances, scan_angles_degrees=angles) print('Map updated') self._push_map(slam.getmap(mapbytes), slam.getpos()) def _push_map(self, mapbytes, location): ''' Pushes a scan over zmq using protocol buffers. map should be the result of slam.getmap. location should be a tuple, the result of slam.getpos() ''' protoScan = SlamScan(map=bytes(mapbytes), location=SlamLocation(x=location[0], y=location[1], theta=location[3])) print('Sending map') self._socket.send_multipart(b'slam_map', protoScan.SerializeToString()) def stop_scanning(self): self.can_scan = False def _create_context(self): return zmq.Context.instance() def _create_socket(self, context): return context.socket(zmq.PUB) def _start_socket(self, socket, port): socket.bind('tcp://*:' + str(self._port)) return socket # Properties @property def map_pixels(self): return self._map_pixels @map_pixels.setter def map_pixels(self, value): self._map_pixels = value @property def map_meters(self): return self._map_meters @map_meters.setter def map_meters(self, value): self._map_meters = value @property def lidar_connection(self): return self._lidar_connection @lidar_connection.setter def lidar_connection(self, value): self._lidar_connection = value @property def port(self): return self._port @port.setter def port(self, value): self._port = value