Move root car to pycar, put other pycar back to car.

This commit is contained in:
=
2020-05-29 21:50:46 +09:30
parent 0bd92e731f
commit 858cbcb2ff
98 changed files with 0 additions and 387 deletions

View File

View File

@@ -0,0 +1,212 @@
import math
class Group:
def __init__(self, number, points=[]):
self._points = points
self._number = number
self._minX = None
self._maxX = None
self._minY = None
self._maxY = None
def add_point(self, point):
self._points.append(point)
self._update_min_max(point)
def get_points(self):
return self._points
@property
def number(self):
return self._number
@number.setter
def number(self, number):
self._number = number
def _update_min_max(self, new_point):
"""
Updates the min and max points for this group.
This is to determine when assigning groups whether the
same group is selected.
"""
converted_point = convert_lidar_to_cartesian(new_point)
if self._minX is None or self._minX > converted_point[0]:
self._minX = converted_point[0]
if self._maxX is None or self._maxX < converted_point[0]:
self._maxX = converted_point[0]
if self._minY is None or self._minY > converted_point[1]:
self._minY = converted_point[1]
if self._maxY is None or self._maxY < converted_point[1]:
self._maxY = converted_point[1]
def get_minX(self):
return self._minY
def get_maxX(self):
return self._maxY
def get_minY(self):
return self._minY
def get_maxY(self):
return self._maxY
def convert_lidar_to_cartesian(new_point):
x = new_point[2] * math.sin(new_point[1])
y = new_point[2] * math.cos(new_point[1])
return (x, y)
def convert_cartesian_to_lidar(x, y):
"""
Converts a point on the grid (with car as the origin) to a lidar tuple (distance, angle)
Parameters
----------
x
Horizontal component of point to convert.
y
Vertical component of point to convert.
Returns
-------
converted
A tuple (distance, angle) that represents the point. Angle is in degrees.
"""
# Angle depends on x/y position.
# if x is positive and y is positive, then angle = tan-1(y/x)
# if x is positive and y is negative, then angle = 360 + tan-1(y/x)
# if x is negative and y is positive, then angle = 180 + tan-1(y/x)
# if x is negative and y is negative, then angle = 180 + tan-1(y/x)
return (math.sqrt(x ** 2 + y ** 2), math.degrees(math.atan(y/x)) + (180 if x < 0 else 270 if y < 0 else 0))
def calc_groups(scan):
"""
Calculates groups of points from a lidar scan. The scan should
already be sorted.
Parameters
----------
scan: Iterable
The lidar scan data to get groups of.
Should be of format: (quality, angle, distance)
Returns
-------
list
List of groups that were found.
"""
prevPoint = None
currentGroup = None
allGroups = []
currentGroupNumber = 0
# assume the list is already sorted.
for point in scan:
if prevPoint is None:
prevPoint = point
continue
# Distances are in mm.
# within 1cm makes a group. Will need to play around with this.
if (point[2] - prevPoint[2]) ** 2 < 10 ** 2:
if currentGroup is None:
currentGroup = Group(currentGroupNumber)
allGroups.append(currentGroup)
currentGroup.add_point(point)
else:
if currentGroup is not None:
currentGroupNumber += 1
currentGroup = None
prevPoint = point
return allGroups
def find_centre(group):
"""
Gets a tuple (x,y) of the centre of the group.
Parameters
----------
group: Group
A group of points to find the centre of.
Returns
-------
tuple (x,y)
The centre in the form of a tuple (x,y)
"""
return ((group.get_maxX() + group.get_minX()) / 2, (group.get_maxY() + group.get_minY()) / 2)
def assign_groups(prev_groups, new_groups):
"""
Assigns group numbers to a new scan based on the groups of an old scan.
"""
for group in prev_groups:
old_centre = find_centre(group)
for new_group in new_groups:
new_centre = find_centre(new_group)
# They are considered the same if the new group and old group centres are within 5cm.
if ((new_centre[0] - old_centre[0]) ** 2 + (new_centre[1] - old_centre[1]) ** 2) < 50 ** 2:
new_group.number = group.number
return new_groups
def updateCarVelocity(oldGroup, newGroup):
"""
Return a tuple (DistanceChange, AngleChange) indicating how the tracked groups have changed, which can
be used to then update the steering/throttle of the car (or other vehicle that
may be used)
Parameters
----------
oldGroup: Group
The positioning of points for the group in the last scan.
newGroup: Group
The positioning of points for the group in the latest scan.
Returns
-------
tuple (DistanceChange, AngleChange)
A tuple containing how the groups' centres changed in the form (distance,angle)
"""
old_polar = convert_cartesian_to_lidar(*find_centre(oldGroup))
new_centre = convert_cartesian_to_lidar(*find_centre(newGroup))
return (new_centre[0] - old_polar[0], new_centre[1] - old_polar[1])
def dualServoChange(newCentre, changeTuple):
"""
Gets a tuple (throttleChange, steeringChange) indicating the change that should be applied to the current
throttle/steering of an rc car that uses dual servos.
Parameters
---------
newCentre
Tuple (distance, angle) of the new centre of the tracked group.
changeTuple
Tuple (distanceChange, angleChange) from the old centre to the new centre.
Returns
-------
tuple
Tuple of (throttleChange, steeringChange) to apply to the 2 servos.
"""
return ((changeTuple[0] / 3) - (newCentre[0] / 4) + 1, 0)

Binary file not shown.

View File

@@ -0,0 +1,43 @@
#!/usr/bin/env python3
'''Animates distances and measurment quality'''
from car.tracking.mock_lidar import MockLidar
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.animation as animation
import car.tracking.lidar_loader as loader
PORT_NAME = '/dev/ttyUSB0'
DMAX = 4000
IMIN = 0
IMAX = 50
def update_line(num, iterator, line):
scan = next(iterator)
offsets = np.array([(np.radians(meas[1]), meas[2]) for meas in scan])
line.set_offsets(offsets)
intens = np.array([meas[0] for meas in scan])
line.set_array(intens)
return line,
def run():
lidar = MockLidar(loader.load_scans_bytes_file("tracking/out.pickle"))
fig = plt.figure()
ax = plt.subplot(111, projection='polar')
line = ax.scatter([0, 0], [0, 0], s=5, c=[IMIN, IMAX],
cmap=plt.cm.Greys_r, lw=0)
ax.set_rmax(DMAX)
ax.grid(True)
iterator = lidar.iter_scans()
ani = animation.FuncAnimation(fig, update_line,
fargs=(iterator, line), interval=50)
plt.show()
lidar.stop()
lidar.disconnect()
if __name__ == '__main__':
run()

View File

@@ -0,0 +1,55 @@
"""
Animates distances and angle of lidar
Uses model-free algorithms to track grouping of points (objects/groups)
"""
from tracking.mock_lidar import MockLidar
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.animation as animation
import car.tracking.lidar_loader as loader
import car.tracking.algorithms as alg
PORT_NAME = '/dev/ttyUSB0'
DMAX = 4000
IMIN = 0
IMAX = 50
def update_line(num, iterator, line, prev_groups):
scan = next(iterator)
# Now update the groups, and then update the maps with different colours for different groups.
if(prev_groups.groups is None):
prev_groups = alg.calc_groups(scan)
groups = alg.assign_groups(prev_groups, alg.calc_groups(scan))
offsets = np.array([(np.radians(meas[1]), meas[2]) for meas in scan])
line.set_offsets(offsets)
intens = np.array([meas[0] for meas in scan])
line.set_array(intens)
# Set the colour matrix: Just set the colours to 2 * np.pi * group number (for every group number)
# line.set_color()
return line,
class Bunch:
def __init__(self, **kwds):
self.__dict__.update(kwds)
def run():
lidar = MockLidar(loader.load_scans_bytes_file("tracking/out.pickle"))
fig = plt.figure()
ax = plt.subplot(111, projection='polar')
line = ax.scatter([0, 0], [0, 0], s=5, c=[IMIN, IMAX],
cmap=plt.cm.Greys_r, lw=0)
ax.set_rmax(DMAX)
ax.grid(True)
prev_groups = Bunch(groups=None)
iterator = lidar.iter_scans()
ani = animation.FuncAnimation(fig, update_line,
fargs=(iterator, line, prev_groups), interval=50)
plt.show()
lidar.stop()
lidar.disconnect()
if __name__ == '__main__':
run()

View File

@@ -0,0 +1,26 @@
from .mock_lidar import MockLidar
from .. import lidar_loader as loader
import os
MOCK_DEVICE = "LIDAR_MOCK"
def get_lidar(device=None):
actual_device = None
try:
actual_device = device if device is not None else os.environ["CAR_LIDAR"]
except KeyError:
print(
'No lidar device specified and the CAR_LIDAR environment variable is not set.')
if actual_device == MOCK_DEVICE:
return MockLidar(loader.load_scans_bytes_file("car/src/car/tracking/out.pickle"))
elif actual_device != '':
try:
from rplidar import RPLidar
return RPLidar(device)
except ImportError:
print('Could not import RPLidar. Have you downloaded rplidar?')
else:
print('No valid lidar device found. Please choose ' +
MOCK_DEVICE + ' or a dn address for the lidar device.')
return None

View File

@@ -0,0 +1,43 @@
"""
This module contains a MockLidar class, for use in place of RPLidar.
Importantly, it implements iter_scans, so it can be substituted for RPLidar
in the lidar_cache for testing (or anywhere else the rplidar may be used)
"""
import car.tracking.lidar_loader as loader
class MockLidar:
def __init__(self, scan_iter=None):
"""
Create mock lidar with an iterator that can be used as fake (or reused) scan data.
Examples
--------
lidar = MockLidar(scans)
first_scan = next(lidar.iter_scans(measurements=100))
Parameters
----------
scan_iter: Iterable
An iterator that will generate/provide the fake/old scan data.
"""
self._iter = scan_iter
def iter_scans(self, min_len=100):
return iter(self._iter)
def get_health(self):
return "Mock Lidar has scans" if self._iter is not None else "Mock lidar won't work properly!"
def get_info(self):
return self.get_health()
def stop(self):
pass
def disconnect(self):
pass

View File

@@ -0,0 +1,52 @@
import datetime
class RecordingLidarDecorator:
def __init__(self, lidar):
self._lidar = lidar
self._scans = []
self._record = False
@property
def record(self):
return self._record
@record.setter
def record(self, value):
self.record = value
def save_data(self, filename):
with open(filename, 'w') as f:
for scan in self._scans:
f.write("%s\n" % scan)
def iter_scans(self, min_len=100):
# Need to customise the iterable.
return RecordingIterator(self._lidar.iter_scans(min_len), self._scans)
def get_health(self):
return self._lidar.get_health()
def get_info(self):
return self._lidar.get_info()
def stop(self):
return self._lidar.stop()
def disconnect(self):
return self._lidar.disconnect()
class RecordingIterator:
def __init__(self, iterator, scan_list):
self._iterator = iterator
self._scans = scan_list
def __iter__(self):
return self
def __next__(self):
nextIter = next(self._iterator)
self._scans.append((nextIter, str(datetime.datetime.now())))
return nextIter

View File

@@ -0,0 +1,95 @@
from threading import Thread
from car.tracking import algorithms
import car.tracking.lidar_tracker_pb2 as tracker_pb
import zmq
from car.tracking.devices.mock_lidar import MockLidar
import car.tracking.lidar_loader as lidar_loader
import time
class LidarCache():
"""
A class that retrieves scans from the lidar,
runs grouping algorithms between scans and
keeps a copy of the group data.
"""
def __init__(self, lidar, measurements=100):
self.lidar = lidar
self.measurements = measurements
print('Info: ' + str(self.lidar.get_info()))
print('Health: ' + str(self.lidar.get_health()))
self.run = True
self.tracking_group_number = -1
self.currentGroups = None
self._group_listeners = []
def start_cache(self):
self.thread = Thread(target=self.do_scanning)
self.thread.start()
def do_scanning(self):
"""Performs scans whilst cache is running, and will pass calculated groups data to the sender.
Parameters
----------
listener:
Any object that includes the onGroupsChanged method.
"""
# Batch over scans, so we don't need to do our own batching to determine groups
# TODO: Implement custom batching, as iter_scans can be unreliable
for scan in self.lidar.iter_scans(min_len=self.measurements):
print('Got %d measurments' % (len(scan)))
if len(scan) < self.measurements:
# Poor scan, likely since it was the first scan.
continue
if not self.run:
break
# Now process the groups.
if self.currentGroups is not None:
self.currentGroups = algorithms.assign_groups(
self.currentGroups, algorithms.calc_groups(scan))
else:
self.currentGroups = algorithms.calc_groups(scan)
self.fireGroupsChanged()
def fireGroupsChanged(self):
# Send the updated groups to 0MQ socket.
# Rename this to be a generic listener method, rather than an explicit 'send' (even though it can be treated as such already)
pointScan = tracker_pb.PointScan()
for group in self.currentGroups:
for point in group.get_points():
pointScan.points.append(tracker_pb.Point(
angle=point[1], distance=point[2], group_number=group.number))
for listener in self._group_listeners:
listener(pointScan)
def add_groups_changed_listener(self, listener):
"""
Add a listener for a change in scans. THis will provide a tuple with the new group
scans, which can then be sent off to a network listener for display, or to update the
vehicle with a new velocity.
Parameters
----------
listener
An function that takes a PointScan proto object as its argument.
"""
self._group_listeners.append(listener)
def stop_scanning(self):
self.run = False
if __name__ == '__main__':
lidar = MockLidar(iter(lidar_loader.load_scans_bytes_file('car/src/car/tracking/out.pickle')))
cache = LidarCache(lidar)
cache.add_groups_changed_listener(lambda a : print(a))
cache.start_cache()
time.sleep(1)
cache.stop_scanning()

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
"""
This module is a utility to load and save lidar
scans to disk.
As such, it is useful for testing, to create real lidar
data that can be reused later, without needing to connect the lidar.
"""
import pickle
def get_scans(num_scans, device='/dev/ttyUSB0', measurements_per_scan=100):
from rplidar import RPLidar
lidar = RPLidar(device)
scans = lidar.iter_scans(measurements_per_scan)
return [next(scans) for i in range(0, num_scans)]
def save_scans_bytes(scans, filename='out.pickle'):
with open(filename, 'wb') as f:
pickle.dump(scans, f)
def load_scans_bytes_file(filename):
with open(filename, 'rb') as f:
return pickle.load(f)

View File

@@ -0,0 +1,71 @@
import car.tracking.lidar_tracker_pb2 as lidar_tracker_pb2
from car.tracking.lidar_tracker_pb2_grpc import PersonTrackingServicer
from car.tracking.lidar_cache import LidarCache
from multiprocessing import Process
import car.messaging.message_factory as mf
import car.tracking.devices.factory as lidar_factory
from car.tracking.devices.recording_lidar import RecordingLidarDecorator
from car.messaging import messages
import car.tracking.algorithms as alg
import os
import google.protobuf.empty_pb2 as empty
class LidarServicer(PersonTrackingServicer):
def __init__(self, vehicle=None):
self._lidar = RecordingLidarDecorator(
lidar_factory.get_lidar())
self.cache = LidarCache(self._lidar, measurements=100)
self.cache.add_groups_changed_listener(self.onGroupsChanged)
self._mFactory = None
self._port = 50052 if 'CAR_ZMQ_PORT' not in os.environ else os.environ[
'CAR_ZMQ_PORT']
self._vehicle = vehicle
self._tracked_group = None
self._should_stream = False
def set_tracking_group(self, request, context):
# Invalid groups should stop tracking
self._tracked_group = None if request.value < 0 else request.value
return empty.Empty()
def stop_tracking(self, request, context):
self._should_stream = False
self.cache.stop_scanning()
return empty.Empty()
def start_tracking(self, request, context):
"""Starts the lidar cache, streaming on the provided port."""
self._should_stream = True
self.cache.start_cache()
return empty.Empty()
def record(self, request, context):
# TODO: Fix this to not require
if request.value:
self.cache.start_cache()
else:
self.cache.stop_scanning()
self._lidar.record = request.value
return empty.Empty()
def save_lidar(self, request, context):
self._lidar.save_data(request.file)
return empty.Empty()
def onGroupsChanged(self, message):
if self._mFactory is None:
# Create the zmq socket in the thread that it will be used, just to be safe.
self._mFactory = mf.getZmqPubSubStreamer(self._port)
if self._should_stream:
self._mFactory.send_message_topic(
"lidar_map", messages.ProtoMessage(message=message.SerializeToString()))
if self._tracked_group is not None and self._vehicle is not None:
# Update vehicle to correctly follow the tracked group.
# Leave for now, need to work out exactly how this will change.
# alg.dualServoChange(alg.find_centre())
pass

View File

@@ -0,0 +1,5 @@
from tracking.lidar_cache import LidarCache
import Messaging.message_factory as mf

Binary file not shown.

View File

@@ -0,0 +1,4 @@
To load the lidar dummy scans in all_scans.txt,
use python pickle:
with open('path/to/all_scans.txt', 'rb') as fp:
all_scans = pickle.load(fp)