From 50d8923ad08f1e7218f168e4ea2725b30b578506 Mon Sep 17 00:00:00 2001 From: Piv <18462828+Piv200@users.noreply.github.com> Date: Mon, 9 Mar 2020 23:02:19 +1030 Subject: [PATCH] Tidy up some lidar stuff for testing and clarity. --- tracking/algorithms.py | 35 +++++++++++++++++++++++------------ tracking/lidar_cache.py | 25 ++++++++++++++----------- tracking/lidar_loader.py | 8 ++++++++ tracking/lidar_servicer.py | 9 ++++----- tracking/mock_lidar.py | 36 ++++++++++++++++++++++++++++++++++++ 5 files changed, 85 insertions(+), 28 deletions(-) create mode 100644 tracking/mock_lidar.py diff --git a/tracking/algorithms.py b/tracking/algorithms.py index adbbf0b..53d7bc4 100644 --- a/tracking/algorithms.py +++ b/tracking/algorithms.py @@ -27,11 +27,11 @@ class Group: self._number = number def _update_min_max(self, new_point): - ''' + """ Updates the in and max points for this group. This is to determine when assigning groups whether the same group is selected. - ''' + """ converted_point = convert_lidar_to_cartesian(new_point) if self._minX is None or self._minX > converted_point[0]: @@ -66,11 +66,22 @@ def convert_lidar_to_cartesian(new_point): def calc_groups(scan): - ''' + """ Calculates groups of points from a lidar scan. The scan should already be sorted. - Should return all groups. - ''' + + Parameters + ---------- + + scan: Iterable + The lidar scan data to get groups of. + Should be of format: (quality, angle, distance) + + Returns + ------- + list + List of groups that were found. + """ prevPoint = None currentGroup = None allGroups = [] @@ -104,9 +115,9 @@ def find_centre(group): def assign_groups(prev_groups, new_groups): - ''' + """ Assigns group numbers to a new scan based on the groups of an old scan. - ''' + """ for group in prev_groups: old_centre = find_centre(prev_groups) for new_group in new_groups: @@ -119,9 +130,9 @@ def assign_groups(prev_groups, new_groups): def updateCarVelocity(oldGroup, newGroup): - ''' - Return a tuple (throttleChange, steeringChange) that should be - applied given the change in the centre of the groups. - ''' - + """ + Return a vector indicating how the tracked group has changed, which can + be used to then update the steering/throttle of the car (or other vehicle that + may be used) + """ pass diff --git a/tracking/lidar_cache.py b/tracking/lidar_cache.py index 4641513..ebb8443 100644 --- a/tracking/lidar_cache.py +++ b/tracking/lidar_cache.py @@ -1,5 +1,3 @@ -import rplidar -from rplidar import RPLidar from threading import Thread from tracking import algorithms import tracking.lidar_tracker_pb2 as tracker_pb @@ -8,14 +6,14 @@ import Messaging.messages as messages class LidarCache(): - ''' + """ A class that retrieves scans from the lidar, runs grouping algorithms between scans and keeps a copy of the group data. - ''' + """ - def __init__(self, measurements=100): - self.lidar = RPLidar('/dev/ttyUSB0') + def __init__(self, lidar, measurements=100): + self.lidar = lidar self.measurements = measurements print('Info: ' + self.lidar.get_info()) print('Health: ' + self.lidar.get_health()) @@ -29,14 +27,19 @@ class LidarCache(): self.thread.start() def do_scanning(self, sender): - ''' - Performs a scan for the given number of iterations. - ''' + """Performs scans whilst cache is running, and will pass calculated groups data to the sender. + + Parameters + ---------- + sender: + Any class given in messaging.message_factory. This acts as a listener. + + """ # Create the 0MQ socket first. This should not be passed between threads. self._mFactory = sender - for i, scan in enumerate(self.lidar.iter_scans(min_len=self.measurements)): - print('%d: Got %d measurments' % (i, len(scan))) + for scan in self.lidar.iter_scans(min_len=self.measurements): + print('Got %d measurments' % (len(scan))) if(not self.run): break diff --git a/tracking/lidar_loader.py b/tracking/lidar_loader.py index 6e8a867..ced6b1b 100644 --- a/tracking/lidar_loader.py +++ b/tracking/lidar_loader.py @@ -1,3 +1,11 @@ +# -*- coding: utf-8 -*- +""" +This module is a utility to load and save lidar +scans to disk. +As such, it is useful for testing, to create real lidar +data that can be reused later, without needing to connect the lidar. +""" + from rplidar import RPLidar import pickle diff --git a/tracking/lidar_servicer.py b/tracking/lidar_servicer.py index e5a5b89..38c1ed4 100644 --- a/tracking/lidar_servicer.py +++ b/tracking/lidar_servicer.py @@ -3,12 +3,13 @@ from tracking.lidar_tracker_pb2_grpc import PersonTrackingServicer from tracking.lidar_cache import LidarCache from multiprocessing import Process import Messaging.message_factory as mf - +from rplidar import RPLidar class LidarServicer(PersonTrackingServicer): def __init__(self): - self.cache = LidarCache(measurements=100) + #TODO: Put the rplidar creation in a factory or something, to make it possible to test this servicer. + self.cache = LidarCache(RPLidar('/dev/ttyUSB0'), measurements=100) def set_tracking_group(self, request, context): pass @@ -17,7 +18,5 @@ class LidarServicer(PersonTrackingServicer): self.cache.stop_scanning() def start_tracking(self, request, context): - ''' - Starts the lidar cache. - ''' + """Starts the lidar cache, streaming on the provided port.""" self.cache.start_cache(mf.getZmqPubSubStreamer(request.value)) diff --git a/tracking/mock_lidar.py b/tracking/mock_lidar.py new file mode 100644 index 0000000..0ac0a57 --- /dev/null +++ b/tracking/mock_lidar.py @@ -0,0 +1,36 @@ +""" +This module contains a MockLidar class, for use in place of RPLidar. +Importantly, it implements iter_scans, so it can be substituted for RPLidar +in the lidar_cache for testing (or anywhere else the rplidar may be used) +""" + +import tracking.lidar_loader as loader + +class MockLidar: + + def __init__(self, scan_iter=None): + """ + Create mock lidar with an iterator that can be used as fake (or reused) scan data. + + Examples + -------- + lidar = MockLidar(scans) + first_scan = next(lidar.iter_scans(measurements=100)) + + Parameters + ---------- + + scan_iter: Iterable + An iterator that will generate/provide the fake/old scan data. + + """ + self._iter = scan_iter + + def iter_scans(self, measurements=100): + return self._iter + + def get_health(self): + return "Mock Lidar has scans" if self._iter is not None else "Mock lidar won't work properly!" + + def get_info(self): + return self.get_health()