Tidy up some lidar stuff for testing and clarity.

This commit is contained in:
Piv
2020-03-09 23:02:19 +10:30
parent ef83365fd8
commit 50d8923ad0
5 changed files with 85 additions and 28 deletions

View File

@@ -27,11 +27,11 @@ class Group:
self._number = number
def _update_min_max(self, new_point):
'''
"""
Updates the in and max points for this group.
This is to determine when assigning groups whether the
same group is selected.
'''
"""
converted_point = convert_lidar_to_cartesian(new_point)
if self._minX is None or self._minX > converted_point[0]:
@@ -66,11 +66,22 @@ def convert_lidar_to_cartesian(new_point):
def calc_groups(scan):
'''
"""
Calculates groups of points from a lidar scan. The scan should
already be sorted.
Should return all groups.
'''
Parameters
----------
scan: Iterable
The lidar scan data to get groups of.
Should be of format: (quality, angle, distance)
Returns
-------
list
List of groups that were found.
"""
prevPoint = None
currentGroup = None
allGroups = []
@@ -104,9 +115,9 @@ def find_centre(group):
def assign_groups(prev_groups, new_groups):
'''
"""
Assigns group numbers to a new scan based on the groups of an old scan.
'''
"""
for group in prev_groups:
old_centre = find_centre(prev_groups)
for new_group in new_groups:
@@ -119,9 +130,9 @@ def assign_groups(prev_groups, new_groups):
def updateCarVelocity(oldGroup, newGroup):
'''
Return a tuple (throttleChange, steeringChange) that should be
applied given the change in the centre of the groups.
'''
"""
Return a vector indicating how the tracked group has changed, which can
be used to then update the steering/throttle of the car (or other vehicle that
may be used)
"""
pass

View File

@@ -1,5 +1,3 @@
import rplidar
from rplidar import RPLidar
from threading import Thread
from tracking import algorithms
import tracking.lidar_tracker_pb2 as tracker_pb
@@ -8,14 +6,14 @@ import Messaging.messages as messages
class LidarCache():
'''
"""
A class that retrieves scans from the lidar,
runs grouping algorithms between scans and
keeps a copy of the group data.
'''
"""
def __init__(self, measurements=100):
self.lidar = RPLidar('/dev/ttyUSB0')
def __init__(self, lidar, measurements=100):
self.lidar = lidar
self.measurements = measurements
print('Info: ' + self.lidar.get_info())
print('Health: ' + self.lidar.get_health())
@@ -29,14 +27,19 @@ class LidarCache():
self.thread.start()
def do_scanning(self, sender):
'''
Performs a scan for the given number of iterations.
'''
"""Performs scans whilst cache is running, and will pass calculated groups data to the sender.
Parameters
----------
sender:
Any class given in messaging.message_factory. This acts as a listener.
"""
# Create the 0MQ socket first. This should not be passed between threads.
self._mFactory = sender
for i, scan in enumerate(self.lidar.iter_scans(min_len=self.measurements)):
print('%d: Got %d measurments' % (i, len(scan)))
for scan in self.lidar.iter_scans(min_len=self.measurements):
print('Got %d measurments' % (len(scan)))
if(not self.run):
break

View File

@@ -1,3 +1,11 @@
# -*- coding: utf-8 -*-
"""
This module is a utility to load and save lidar
scans to disk.
As such, it is useful for testing, to create real lidar
data that can be reused later, without needing to connect the lidar.
"""
from rplidar import RPLidar
import pickle

View File

@@ -3,12 +3,13 @@ from tracking.lidar_tracker_pb2_grpc import PersonTrackingServicer
from tracking.lidar_cache import LidarCache
from multiprocessing import Process
import Messaging.message_factory as mf
from rplidar import RPLidar
class LidarServicer(PersonTrackingServicer):
def __init__(self):
self.cache = LidarCache(measurements=100)
#TODO: Put the rplidar creation in a factory or something, to make it possible to test this servicer.
self.cache = LidarCache(RPLidar('/dev/ttyUSB0'), measurements=100)
def set_tracking_group(self, request, context):
pass
@@ -17,7 +18,5 @@ class LidarServicer(PersonTrackingServicer):
self.cache.stop_scanning()
def start_tracking(self, request, context):
'''
Starts the lidar cache.
'''
"""Starts the lidar cache, streaming on the provided port."""
self.cache.start_cache(mf.getZmqPubSubStreamer(request.value))

36
tracking/mock_lidar.py Normal file
View File

@@ -0,0 +1,36 @@
"""
This module contains a MockLidar class, for use in place of RPLidar.
Importantly, it implements iter_scans, so it can be substituted for RPLidar
in the lidar_cache for testing (or anywhere else the rplidar may be used)
"""
import tracking.lidar_loader as loader
class MockLidar:
def __init__(self, scan_iter=None):
"""
Create mock lidar with an iterator that can be used as fake (or reused) scan data.
Examples
--------
lidar = MockLidar(scans)
first_scan = next(lidar.iter_scans(measurements=100))
Parameters
----------
scan_iter: Iterable
An iterator that will generate/provide the fake/old scan data.
"""
self._iter = scan_iter
def iter_scans(self, measurements=100):
return self._iter
def get_health(self):
return "Mock Lidar has scans" if self._iter is not None else "Mock lidar won't work properly!"
def get_info(self):
return self.get_health()