Files
picar/SlamController/slam_streamer.py

127 lines
3.9 KiB
Python

import zmq
from breezyslam.algorithms import RMHC_SLAM
from breezyslam.sensors import RPLidarA1 as LaserModel
from rplidar import RPLidar as Lidar
from .SlamController_pb2 import SlamScan, SlamLocation
# Left here as was used in the example, configure as necessary.
# MAP_SIZE_PIXELS = 500
# MAP_SIZE_METERS = 10
# LIDAR_DEVICE = '/dev/ttyUSB0'
class SlamStreamer:
can_scan = False
def __init__(self, map_pixels = None, map_meters = None, lidar_connection = None, port = None):
self._map_pixels = map_pixels
self._map_meters = map_meters
self._lidar_connection = lidar_connection
self._port = port
self._zmq_context = self._create_context()
def start(self):
'''
Does scanning and constructs the slam map,
and pushes to subscribers through a zmq pub socket.
This is done on the main thread, so you'll need
to run this method on a separate thread yourself.
All constructor parameters must be set prior
to calling this method, and changing those values after
calling this method will have no effect.
'''
print('Starting to stream')
self._socket = self._start_socket(self._create_socket(self._zmq_context), self._port)
# Block until user opens zmq.
self._socket.recv()
print('Received message from ZMQ')
# Adapted from BreezySLAM rpslam example.
# Connect to Lidar unit
lidar = Lidar(self._lidar_connection)
# Create an RMHC SLAM object with a laser model and optional robot model
slam = RMHC_SLAM(LaserModel(), self._map_pixels, self._map_meters)
# Initialize empty map
mapbytes = bytearray(self.map_pixels * self.map_pixels)
# Create an iterator to collect scan data from the RPLidar
iterator = lidar.iter_scans()
# First scan is crap, so ignore it
next(iterator)
while self.can_scan:
# Extract (quality, angle, distance) triples from current scan
items = [item for item in next(iterator)]
# Extract distances and angles from triples
distances = [item[2] for item in items]
angles = [item[1] for item in items]
print('Updating map')
# Update SLAM with current Lidar scan and scan angles
slam.update(distances, scan_angles_degrees=angles)
print('Map updated')
self._push_map(slam.getmap(mapbytes), slam.getpos())
def _push_map(self, mapbytes, location):
'''
Pushes a scan over zmq using protocol buffers.
map should be the result of slam.getmap.
location should be a tuple, the result of slam.getpos()
'''
protoScan = SlamScan(map = bytes(mapbytes), \
location = SlamLocation(x = location[0], y = location[1], theta = location[3]))
print('Sending map')
self._socket.send(protoScan.SerializeToString())
def stop_scanning(self):
self.can_scan = False
def _create_context(self):
return zmq.Context.instance()
def _create_socket(self, context):
return context.socket(zmq.PAIR)
def _start_socket(self, socket, port):
socket.bind('tcp://*:' + str(self._port))
return socket
# Properties
@property
def map_pixels(self):
return self._map_pixels
@map_pixels.setter
def map_pixels(self, value):
self._map_pixels = value
@property
def map_meters(self):
return self._map_meters
@map_meters.setter
def map_meters(self, value):
self._map_meters = value
@property
def lidar_connection(self):
return self._lidar_connection
@lidar_connection.setter
def lidar_connection(self, value):
self._lidar_connection = value
@property
def port(self):
return self._port
@port.setter
def port(self, value):
self._port = value