Add 'car/' from commit 'eee0e8dc445691e600680f4abc77f2814b20b054'

git-subtree-dir: car
git-subtree-mainline: 1d29a5526c
git-subtree-split: eee0e8dc44
This commit is contained in:
Piv
2020-04-19 11:07:44 +09:30
93 changed files with 8401 additions and 0 deletions

0
car/tracking/__init__.py Normal file
View File

212
car/tracking/algorithms.py Normal file
View File

@@ -0,0 +1,212 @@
import math
class Group:
def __init__(self, number, points=[]):
self._points = points
self._number = number
self._minX = None
self._maxX = None
self._minY = None
self._maxY = None
def add_point(self, point):
self._points.append(point)
self._update_min_max(point)
def get_points(self):
return self._points
@property
def number(self):
return self._number
@number.setter
def number(self, number):
self._number = number
def _update_min_max(self, new_point):
"""
Updates the in and max points for this group.
This is to determine when assigning groups whether the
same group is selected.
"""
converted_point = convert_lidar_to_cartesian(new_point)
if self._minX is None or self._minX > converted_point[0]:
self._minX = converted_point[0]
if self._maxX is None or self._maxX < converted_point[0]:
self._maxX = converted_point[0]
if self._minY is None or self._minY > converted_point[1]:
self._minY = converted_point[1]
if self._maxY is None or self._maxY < converted_point[1]:
self._maxY = converted_point[1]
def get_minX(self):
return self._minY
def get_maxX(self):
return self._maxY
def get_minY(self):
return self._minY
def get_maxY(self):
return self._maxY
def convert_lidar_to_cartesian(new_point):
x = new_point[2] * math.sin(new_point[1])
y = new_point[2] * math.cos(new_point[1])
return (x, y)
def convert_cartesian_to_lidar(x, y):
"""
Converts a point on the grid (with car as the origin) to a lidar tuple (distance, angle)
Parameters
----------
x
Horizontal component of point to convert.
y
Vertical component of point to convert.
Returns
-------
converted
A tuple (distance, angle) that represents the point. Angle is in degrees.
"""
# Angle depends on x/y position.
# if x is positive and y is positive, then angle = tan-1(y/x)
# if x is positive and y is negative, then angle = 360 + tan-1(y/x)
# if x is negative and y is positive, then angle = 180 + tan-1(y/x)
# if x is negative and y is negative, then angle = 180 + tan-1(y/x)
return (math.sqrt(x ** 2 + y ** 2), math.degrees(math.atan(y/x)) + (180 if x < 0 else 270 if y < 0 else 0))
def calc_groups(scan):
"""
Calculates groups of points from a lidar scan. The scan should
already be sorted.
Parameters
----------
scan: Iterable
The lidar scan data to get groups of.
Should be of format: (quality, angle, distance)
Returns
-------
list
List of groups that were found.
"""
prevPoint = None
currentGroup = None
allGroups = []
currentGroupNumber = 0
# assume the list is already sorted.
for point in scan:
if prevPoint is None:
prevPoint = point
continue
# Distances are in mm.
# within 1cm makes a group. Will need to play around with this.
if (point[2] - prevPoint[2]) ** 2 < 10 ** 2:
if currentGroup is None:
currentGroup = Group(currentGroupNumber)
allGroups.append(currentGroup)
currentGroup.add_point(point)
else:
if currentGroup is not None:
currentGroupNumber += 1
currentGroup = None
prevPoint = point
return allGroups
def find_centre(group):
"""
Gets a tuple (x,y) of the centre of the group.
Parameters
----------
group: Group
A group of points to find the centre of.
Returns
-------
tuple (x,y)
The centre in the form of a tuple (x,y)
"""
return ((group.get_maxX() + group.get_minX()) / 2, (group.get_maxY() + group.get_minY()) / 2)
def assign_groups(prev_groups, new_groups):
"""
Assigns group numbers to a new scan based on the groups of an old scan.
"""
for group in prev_groups:
old_centre = find_centre(group)
for new_group in new_groups:
new_centre = find_centre(new_group)
# They are considered the same if the new group and old group centres are within 5cm.
if ((new_centre[0] - old_centre[0]) ** 2 + (new_centre[1] - old_centre[1]) ** 2) < 50 ** 2:
new_group.number = group.number
return new_groups
def updateCarVelocity(oldGroup, newGroup):
"""
Return a tuple (DistanceChange, AngleChange) indicating how the tracked groups have changed, which can
be used to then update the steering/throttle of the car (or other vehicle that
may be used)
Parameters
----------
oldGroup: Group
The positioning of points for the group in the last scan.
newGroup: Group
The positioning of points for the group in the latest scan.
Returns
-------
tuple (DistanceChange, AngleChange)
A tuple containing how the groups' centres changed in the form (distance,angle)
"""
old_polar = convert_cartesian_to_lidar(*find_centre(oldGroup))
new_centre = convert_cartesian_to_lidar(*find_centre(newGroup))
return (new_centre[0] - old_polar[0], new_centre[1] - old_polar[1])
def dualServoChange(newCentre, changeTuple):
"""
Gets a tuple (throttleChange, steeringChange) indicating the change that should be applied to the current
throttle/steering of an rc car that uses dual servos.
Parameters
---------
newCentre
Tuple (distance, angle) of the new centre of the tracked group.
changeTuple
Tuple (distanceChange, angleChange) from the old centre to the new centre.
Returns
-------
tuple
Tuple of (throttleChange, steeringChange) to apply to the 2 servos.
"""
return ((changeTuple[0] / 3) - (newCentre[0] / 4) + 1, 0)

BIN
car/tracking/all_scans.txt Normal file

Binary file not shown.

43
car/tracking/animate.py Executable file
View File

@@ -0,0 +1,43 @@
#!/usr/bin/env python3
'''Animates distances and measurment quality'''
from tracking.mock_lidar import MockLidar
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.animation as animation
import tracking.lidar_loader as loader
PORT_NAME = '/dev/ttyUSB0'
DMAX = 4000
IMIN = 0
IMAX = 50
def update_line(num, iterator, line):
scan = next(iterator)
offsets = np.array([(np.radians(meas[1]), meas[2]) for meas in scan])
line.set_offsets(offsets)
intens = np.array([meas[0] for meas in scan])
line.set_array(intens)
return line,
def run():
lidar = MockLidar(loader.load_scans_bytes_file("tracking/out.pickle"))
fig = plt.figure()
ax = plt.subplot(111, projection='polar')
line = ax.scatter([0, 0], [0, 0], s=5, c=[IMIN, IMAX],
cmap=plt.cm.Greys_r, lw=0)
ax.set_rmax(DMAX)
ax.grid(True)
iterator = lidar.iter_scans()
ani = animation.FuncAnimation(fig, update_line,
fargs=(iterator, line), interval=50)
plt.show()
lidar.stop()
lidar.disconnect()
if __name__ == '__main__':
run()

View File

@@ -0,0 +1,55 @@
"""
Animates distances and angle of lidar
Uses model-free algorithms to track grouping of points (objects/groups)
"""
from tracking.mock_lidar import MockLidar
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.animation as animation
import tracking.lidar_loader as loader
import tracking.algorithms as alg
PORT_NAME = '/dev/ttyUSB0'
DMAX = 4000
IMIN = 0
IMAX = 50
def update_line(num, iterator, line, prev_groups):
scan = next(iterator)
# Now update the groups, and then update the maps with different colours for different groups.
if(prev_groups.groups is None):
prev_groups = alg.calc_groups(scan)
groups = alg.assign_groups(prev_groups, alg.calc_groups(scan))
offsets = np.array([(np.radians(meas[1]), meas[2]) for meas in scan])
line.set_offsets(offsets)
intens = np.array([meas[0] for meas in scan])
line.set_array(intens)
# Set the colour matrix: Just set the colours to 2 * np.pi * group number (for every group number)
# line.set_color()
return line,
class Bunch:
def __init__(self, **kwds):
self.__dict__.update(kwds)
def run():
lidar = MockLidar(loader.load_scans_bytes_file("tracking/out.pickle"))
fig = plt.figure()
ax = plt.subplot(111, projection='polar')
line = ax.scatter([0, 0], [0, 0], s=5, c=[IMIN, IMAX],
cmap=plt.cm.Greys_r, lw=0)
ax.set_rmax(DMAX)
ax.grid(True)
prev_groups = Bunch(groups=None)
iterator = lidar.iter_scans()
ani = animation.FuncAnimation(fig, update_line,
fargs=(iterator, line, prev_groups), interval=50)
plt.show()
lidar.stop()
lidar.disconnect()
if __name__ == '__main__':
run()

View File

View File

@@ -0,0 +1,13 @@
from tracking.devices.mock_lidar import MockLidar
from rplidar import RPLidar
import tracking.lidar_loader as loader
connection = "TEST"
# connection = '/dev/ttyUSB0'
def get_lidar():
# Need a way to configure this, maybe with environment variables
if connection == 'TEST':
return MockLidar(loader.load_scans_bytes_file("tracking/out.pickle"))
else:
return RPLidar(connection)

View File

@@ -0,0 +1,43 @@
"""
This module contains a MockLidar class, for use in place of RPLidar.
Importantly, it implements iter_scans, so it can be substituted for RPLidar
in the lidar_cache for testing (or anywhere else the rplidar may be used)
"""
import tracking.lidar_loader as loader
class MockLidar:
def __init__(self, scan_iter=None):
"""
Create mock lidar with an iterator that can be used as fake (or reused) scan data.
Examples
--------
lidar = MockLidar(scans)
first_scan = next(lidar.iter_scans(measurements=100))
Parameters
----------
scan_iter: Iterable
An iterator that will generate/provide the fake/old scan data.
"""
self._iter = scan_iter
def iter_scans(self, min_len=100):
return iter(self._iter)
def get_health(self):
return "Mock Lidar has scans" if self._iter is not None else "Mock lidar won't work properly!"
def get_info(self):
return self.get_health()
def stop(self):
pass
def disconnect(self):
pass

View File

@@ -0,0 +1,84 @@
from threading import Thread
from tracking import algorithms
import tracking.lidar_tracker_pb2 as tracker_pb
import zmq
class LidarCache():
"""
A class that retrieves scans from the lidar,
runs grouping algorithms between scans and
keeps a copy of the group data.
"""
def __init__(self, lidar, measurements=100):
self.lidar = lidar
self.measurements = measurements
print('Info: ' + self.lidar.get_info())
print('Health: ' + self.lidar.get_health())
self.run = True
self.tracking_group_number = -1
self.currentGroups = None
self._group_listeners = []
def start_cache(self):
self.thread = Thread(target=self.do_scanning)
self.thread.start()
def do_scanning(self):
"""Performs scans whilst cache is running, and will pass calculated groups data to the sender.
Parameters
----------
listener:
Any object that includes the onGroupsChanged method.
"""
# Batch over scans, so we don't need to do our own batching to determine groups
# TODO: Implement custom batching, as iter_scans can be unreliable
for scan in self.lidar.iter_scans(min_len=self.measurements):
print('Got %d measurments' % (len(scan)))
if len(scan) < self.measurements:
# Poor scan, likely since it was the first scan.
continue
if not self.run:
break
# Now process the groups.
if self.currentGroups is not None:
self.currentGroups = algorithms.assign_groups(
self.currentGroups, algorithms.calc_groups(scan))
else:
self.currentGroups = algorithms.calc_groups(scan)
self.fireGroupsChanged()
def fireGroupsChanged(self):
# Send the updated groups to 0MQ socket.
# Rename this to be a generic listener method, rather than an explicit 'send' (even though it can be treated as such already)
pointScan = tracker_pb.PointScan()
for group in self.currentGroups:
for point in group.get_points():
pointScan.points.append(tracker_pb.Point(
angle=point[1], distance=point[2], group_number=group.number))
for listener in self._group_listeners:
listener.onGroupsChanged(pointScan)
def add_groups_changed_listener(self, listener):
"""
Add a listener for a change in scans. THis will provide a tuple with the new group
scans, which can then be sent off to a network listener for display, or to update the
vehicle with a new velocity.
Parameters
----------
listener
An object that implements the onGroupsChanged(message) method.
"""
self._group_listeners.append(listener)
def stop_scanning(self):
self.run = False

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
This module is a utility to load and save lidar
scans to disk.
As such, it is useful for testing, to create real lidar
data that can be reused later, without needing to connect the lidar.
"""
from rplidar import RPLidar
import pickle
def get_scans(num_scans, device='/dev/ttyUSB0', measurements_per_scan=100):
lidar = RPLidar(device)
scans = lidar.iter_scans(measurements_per_scan)
return [next(scans) for i in range(0, num_scans)]
def save_scans_bytes(scans, filename='out.pickle'):
with open(filename, 'wb') as f:
pickle.dump(scans, f)
def load_scans_bytes_file(filename):
with open(filename, 'rb') as f:
return pickle.load(f)

View File

@@ -0,0 +1,44 @@
import tracking.lidar_tracker_pb2 as lidar_tracker_pb2
from tracking.lidar_tracker_pb2_grpc import PersonTrackingServicer
from tracking.lidar_cache import LidarCache
from multiprocessing import Process
import messaging.message_factory as mf
import tracking.devices.factory as lidar_factory
from messaging import messages
import tracking.algorithms as alg
class LidarServicer(PersonTrackingServicer):
def __init__(self, vehicle=None):
# TODO: Put the rplidar creation in a factory or something, to make it possible to test this servicer.
# Also, it would allow creating the service without the lidar being connected.
self.cache = LidarCache(lidar_factory.get_lidar(), measurements=100)
self.cache.add_groups_changed_listener(self)
self._mFactory = None
self._port = None
self._vehicle = vehicle
self._tracked_group = None
def set_tracking_group(self, request, context):
self._tracked_group = request.value
def stop_tracking(self, request, context):
self.cache.stop_scanning()
def start_tracking(self, request, context):
"""Starts the lidar cache, streaming on the provided port."""
self._port = request.value
self.cache.start_cache()
def onGroupsChanged(self, message):
if self._mFactory is None:
# Create the zmq socket in the thread that it will be used, just to be safe.
self._mFactory = mf.getZmqPubSubStreamer(self._port)
self._mFactory.send_message_topic("lidar_map", messages.ProtoMessage(message=message.SerializeToString()))
if self._tracked_group is not None and self._vehicle is not None:
# Update vehicle to correctly follow the tracked group.
# Leave for now, need to work out exactly how this will change.
# alg.dualServoChange(alg.find_centre())
pass

View File

@@ -0,0 +1,5 @@
from tracking.lidar_cache import LidarCache
import Messaging.message_factory as mf

BIN
car/tracking/out.pickle Normal file

Binary file not shown.

4
car/tracking/readme.txt Normal file
View File

@@ -0,0 +1,4 @@
To load the lidar dummy scans in all_scans.txt,
use python pickle:
with open('path/to/all_scans.txt', 'rb') as fp:
all_scans = pickle.load(fp)