Files
picar/SlamController/slam_streamer.py

138 lines
4.1 KiB
Python

import zmq
from breezyslam.algorithms import RMHC_SLAM
from breezyslam.sensors import RPLidarA1 as LaserModel
from rplidar import RPLidar as Lidar
from .SlamController_pb2 import SlamScan, SlamLocation
# Left here as was used in the example, configure as necessary.
# MAP_SIZE_PIXELS = 500
# MAP_SIZE_METERS = 10
# LIDAR_DEVICE = '/dev/ttyUSB0'
class SlamStreamer:
can_scan = False
def __init__(self, map_pixels=None, map_meters=None, lidar_connection=None, port=None):
self._map_pixels = map_pixels
self._map_meters = map_meters
self._lidar_connection = lidar_connection
self._port = port
self._zmq_context = self._create_context()
def start(self):
'''
Does scanning and constructs the slam map,
and pushes to subscribers through a zmq pub socket.
This is done on the main thread, so you'll need
to run this method on a separate thread yourself.
All constructor parameters must be set prior
to calling this method, and changing those values after
calling this method will have no effect.
'''
self.can_scan = True
print('Starting to stream')
self._socket = self._start_socket(
self._create_socket(self._zmq_context), self._port)
print('Started and bound zmq socket.')
# Adapted from BreezySLAM rpslam example.
# Connect to Lidar unit
lidar = Lidar(self._lidar_connection)
print('Initialised lidar')
# Create an RMHC SLAM object with a laser model and optional robot model
slam = RMHC_SLAM(LaserModel(), self._map_pixels, self._map_meters)
print('initialised slam')
# Initialize empty map
mapbytes = bytearray(self.map_pixels * self.map_pixels)
print('Initialised byte []')
# Create an iterator to collect scan data from the RPLidar
iterator = lidar.iter_scans()
print('Scanning')
# First scan is crap, so ignore it
next(iterator)
print('Going to next scan')
while self.can_scan:
# Extract (quality, angle, distance) triples from current scan
items = [item for item in next(iterator)]
# Extract distances and angles from triples
distances = [item[2] for item in items]
angles = [item[1] for item in items]
print('Updating map')
# Update SLAM with current Lidar scan and scan angles
slam.update(distances, scan_angles_degrees=angles)
print('Map updated')
slam.getmap(mapbytes)
self._push_map(mapbytes, slam.getpos())
def _push_map(self, mapbytes, location):
'''
Pushes a scan over zmq using protocol buffers.
map should be the result of slam.getmap.
location should be a tuple, the result of slam.getpos()
'''
protoScan = SlamScan(map=bytes(mapbytes),
location=SlamLocation(x=location[0], y=location[1], theta=location[3]))
print('Sending map')
self._socket.send_multipart(b'slam_map', protoScan.SerializeToString())
def stop_scanning(self):
self.can_scan = False
def _create_context(self):
return zmq.Context.instance()
def _create_socket(self, context):
return context.socket(zmq.PUB)
def _start_socket(self, socket, port):
socket.bind('tcp://*:' + str(self._port))
return socket
# Properties
@property
def map_pixels(self):
return self._map_pixels
@map_pixels.setter
def map_pixels(self, value):
self._map_pixels = value
@property
def map_meters(self):
return self._map_meters
@map_meters.setter
def map_meters(self, value):
self._map_meters = value
@property
def lidar_connection(self):
return self._lidar_connection
@lidar_connection.setter
def lidar_connection(self, value):
self._lidar_connection = value
@property
def port(self):
return self._port
@port.setter
def port(self, value):
self._port = value