from threading import Thread
import time
import timeit

import zmq

from car.tracking import algorithms
from car.tracking.devices.mock_lidar import MockLidar
import car.tracking.lidar_loader as lidar_loader
import car.tracking.lidar_tracker_pb2 as tracker_pb


class LidarCache:
    """
    Retrieve scans from a lidar, group them and cache the latest groups.

    Scans are read on a background thread (see ``start_cache``). Each
    accepted scan is turned into groups by ``car.tracking.algorithms`` and
    the result is kept in ``currentGroups``. Registered listeners are then
    called with a ``PointScan`` proto built from those groups.
    """

    def __init__(self, lidar, measurements=100):
        """
        Parameters
        ----------
        lidar
            Device object providing ``get_info()``, ``get_health()`` and
            ``iter_scans(min_len=...)``.
        measurements
            Minimum number of measurements a scan must contain to be
            processed.
        """
        self.lidar = lidar
        self.measurements = measurements
        print('Info: ' + str(self.lidar.get_info()))
        print('Health: ' + str(self.lidar.get_health()))
        # Cleared by stop_scanning() to make the scanning loop exit.
        self.run = True
        self.tracking_group_number = -1
        # None until the first scan has been processed.
        self.currentGroups = None
        self._group_listeners = []

    def start_cache(self):
        """Start ``do_scanning`` on a new background thread."""
        self.thread = Thread(target=self.do_scanning)
        self.thread.start()

    def do_scanning(self):
        """
        Process scans until the cache is stopped.

        For every scan with at least ``self.measurements`` measurements the
        groups are recalculated, stored in ``currentGroups`` and pushed to
        all registered group-change listeners.
        """
        # Batch over scans, so we don't need to do our own batching to
        # determine groups.
        # TODO: Implement custom batching, as iter_scans can be unreliable
        for scan in self.lidar.iter_scans(min_len=self.measurements):
            # Check the stop flag first so that a run of rejected scans
            # cannot keep the loop alive after stop_scanning().
            if not self.run:
                break

            print('Got %d measurements' % (len(scan)))
            if len(scan) < self.measurements:
                # Poor scan, likely since it was the first scan.
                continue

            # perf_counter is monotonic, so the reported duration is not
            # affected by system clock adjustments.
            start_time = time.perf_counter()

            # Now process the groups.
            new_groups = algorithms.calc_groups(scan)
            if self.currentGroups is not None:
                self.currentGroups = algorithms.assign_groups(
                    self.currentGroups, new_groups)
            else:
                self.currentGroups = new_groups

            print("total time: " + str(time.perf_counter() - start_time))
            self._fireGroupsChanged()

    def _fireGroupsChanged(self):
        """Call every registered listener with the current ``PointScan``."""
        # Build the proto once and hand the same object to all listeners.
        point_scan = self.current_scan
        for listener in self._group_listeners:
            listener(point_scan)

    def add_groups_changed_listener(self, listener):
        """
        Add a listener for a change in scans.

        The listener is called with the new group scan, which can then be
        sent off to a network listener for display, or used to update the
        vehicle with a new velocity. Adding a listener that is already
        registered has no effect.

        Parameters
        ----------
        listener
            A function that takes a PointScan proto object as its argument.
        """
        if listener not in self._group_listeners:
            self._group_listeners.append(listener)

    def clear_listeners(self):
        """
        Clear all group change listeners.
        """
        self._group_listeners = []

    @property
    def current_scan(self):
        """
        Build a ``PointScan`` proto from the currently cached groups.

        Returns
        -------
        tracker_pb.PointScan
            One ``Point`` per point of every cached group, or an empty
            ``PointScan`` if no scan has been processed yet.
        """
        point_scan = tracker_pb.PointScan()
        # Take a single reference so the scanning thread replacing
        # currentGroups cannot change what is iterated here.
        groups = self.currentGroups
        if groups is None:
            # Nothing cached yet; iterating None would raise TypeError.
            return point_scan

        for group in groups:
            for point in group.get_points():
                # NOTE(review): indices 1 and 2 are used as angle and
                # distance; index 0 is unused here - confirm the tuple
                # layout against the lidar driver.
                point_scan.points.append(tracker_pb.Point(
                    angle=point[1],
                    distance=point[2],
                    group_number=group.number))
        return point_scan

    def stop_scanning(self):
        """Ask the scanning loop to exit before handling its next scan."""
        self.run = False


if __name__ == '__main__':
    lidar = MockLidar(iter(lidar_loader.load_scans_bytes_file(
        'pycar/src/car/tracking/out.pickle')))
    cache = LidarCache(lidar)
    # cache.add_groups_changed_listener(lambda a: print(a))
    cache.start_cache()