Fixup factories to support environment variables

This commit is contained in:
Piv
2020-04-20 18:11:34 +09:30
parent a84af91323
commit d9c2f2a143
8 changed files with 65 additions and 24 deletions

View File

@@ -0,0 +1,22 @@
from .mockvehicle import MockVehicle
import os
def get_vehicle(motor_pin=19, steering_pin=18):
ENV_CAR = None
try:
ENV_CAR = os.environ['CAR_VEHICLE']
except KeyError:
print('Failed to find CAR_VEHICLE environment variable. Using mock.')
return MockVehicle()
if ENV_CAR == "CAR_2D":
try:
from .vehicle import Vehicle
return Vehicle(motor_pin, steering_pin)
except ImportError:
print(
'Could not import CAR_2D vehicle. Have you installed the GPIOZERO package?')
elif ENV_CAR == "CAR_MOCK":
return MockVehicle(motor_pin, steering_pin)
else:
print('No valid vehicle found. Have you set the CAR_VEHICLE environment variable?')

View File

@@ -5,6 +5,7 @@ class MockVehicle:
def __init__(self, motor_pin=19, servo_pin=18):
self.motor_pin = motor_pin
self.steering_pin = servo_pin
print('Using Mock Vehicle')
@property
def throttle(self):

View File

@@ -14,6 +14,7 @@ def _safely_set_servo_value(servo, value):
return False
return True
def _is_pin_valid(pin):
if isinstance(pin, int):
if pin < 2 or pin > 21:
@@ -26,6 +27,8 @@ def _is_pin_valid(pin):
# TODO: Allow a vector to be set to change the throttle/steering, for vehicles that don't use
# two servos for controls (e.g. drone, dog)
class Vehicle:
def __init__(self, motor_pin=19, servo_pin=18):
subprocess.call(['sudo', 'pigpiod'])
@@ -39,7 +42,8 @@ class Vehicle:
def initialise_motor(self):
self._motor_servo = Servo(
self._motor_pin, pin_factory=Device.pin_factory)
self._steering_servo = Servo(self._steering_pin, pin_factory=Device.pin_factory)
self._steering_servo = Servo(
self._steering_pin, pin_factory=Device.pin_factory)
@property
def throttle(self):
@@ -73,11 +77,13 @@ class Vehicle:
@steering_pin.setter
def steering_pin(self, value):
self._steering_pin = value if _is_pin_valid(value) else self._steering_pin
self._steering_pin = value if _is_pin_valid(
value) else self._steering_pin
def stop(self):
self.throttle = 0
self.steering = 0
def change_with_vector(self, vector):
# TBD how this will work. Could just be (magnitude,angle) change
pass

View File

@@ -7,7 +7,7 @@ import time
import grpc
import car.control.motorService_pb2_grpc as motorService_pb2_grpc
from car.control.gpio.vehicle import Vehicle
import car.control.gpio.factory as vehicle_factory
from car.control.motor_servicer import MotorServicer
from car.slam.slam_servicer import SlamServicer
import car.slam.SlamController_pb2_grpc as SlamController_pb2_grpc
@@ -22,12 +22,14 @@ class CarServer():
def start_server(self):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))
motorService_pb2_grpc.add_CarControlServicer_to_server(self.create_motor_servicer(), server)
motorService_pb2_grpc.add_CarControlServicer_to_server(
self.create_motor_servicer(), server)
SlamController_pb2_grpc.add_SlamControlServicer_to_server(
self.create_slam_servicer(), server)
lidar_tracker_pb2_grpc.add_PersonTrackingServicer_to_server(
self.create_lidar_servicer(), server)
# Disable tls for local testing.
# Disable tls for local testing for now.
# server.add_secure_port('[::]:50051', self.create_credentials())
server.add_insecure_port('[::]:50051')
server.start()
@@ -44,7 +46,7 @@ class CarServer():
return LidarServicer()
def create_credentials(self):
# Relativise this stuff.
# Relativise this stuff. Should add to source directory.
pvtKeyPath = '/home/pi/tls/device.key'
pvtCertPath = '/home/pi/tls/device.crt'
@@ -55,8 +57,7 @@ class CarServer():
if __name__ == '__main__':
vehicle = Vehicle()
server = CarServer(vehicle)
server = CarServer(vehicle_factory.get_vehicle())
# Can't remember why I do this, is it even needed?
service_thread = Thread(target=server.start_server)

View File

@@ -1,5 +1,6 @@
import car.slam.SlamController_pb2_grpc as grpc
import car.slam.SlamController_pb2 as proto
import car.empty_pb2 as empty
import car.slam.slam_streamer as slam
from multiprocessing import Process
@@ -21,11 +22,11 @@ class SlamServicer(grpc.SlamControlServicer):
self.slam.map_meters = request.map_size_meters
self.slam_thread = Process(target=self.slam.start)
self.slam_thread.start()
return proto.Empty()
return empty.Empty()
def stop_streaming(self, request, context):
if self.slam_thread is not None:
self.slam.stop_scanning()
self.slam_thread.join()
self.slam = None
return proto.Empty()
return empty.Empty()

View File

@@ -1,18 +1,27 @@
from car.tracking.devices.mock_lidar import MockLidar
import car.tracking.lidar_loader as loader
from .mock_lidar import MockLidar
from .. import lidar_loader as loader
import os
# connection = "TEST"
connection = '/dev/ttyUSB0'
MOCK_DEVICE = "LIDAR_MOCK"
RPLIDAR = "LIDAR_RPLIDAR"
def get_lidar():
# Need a way to configure this, maybe with environment variables
if connection == 'TEST':
return MockLidar(loader.load_scans_bytes_file("tracking/out.pickle"))
else:
def get_lidar(device=None, connection='/dev/ttyUSB0'):
actual_device = None
try:
actual_device = device if device is not None else os.environ["CAR_LIDAR"]
except KeyError:
print(
'No lidar device specified and the CAR_LIDAR environment variable is not set.')
if actual_device == MOCK_DEVICE:
return MockLidar(loader.load_scans_bytes_file("car/tracking/out.pickle"))
elif actual_device == RPLIDAR:
try:
from rplidar import RPLidar
return RPLidar(connection)
except ImportError:
print('Could not import RPLidar, using mock with testing data.')
return MockLidar(loader.load_scans_bytes_file("tracking/out.pickle"))
print('Could not import RPLidar. Have you downloaded rplidar?')
else:
print('No valid lidar device found. Please choose one of ' +
MOCK_DEVICE + ' or ' + RPLIDAR)
return None

View File

@@ -6,11 +6,11 @@ As such, it is useful for testing, to create real lidar
data that can be reused later, without needing to connect the lidar.
"""
from rplidar import RPLidar
import pickle
def get_scans(num_scans, device='/dev/ttyUSB0', measurements_per_scan=100):
from rplidar import RPLidar
lidar = RPLidar(device)
scans = lidar.iter_scans(measurements_per_scan)
return [next(scans) for i in range(0, num_scans)]