Format files and add gRPC stream support.

This commit is contained in:
Piv
2020-05-03 16:10:40 +09:30
parent ff73d09855
commit 7fb20f9d86
14 changed files with 346 additions and 105 deletions

View File

@@ -15,7 +15,6 @@ import androidx.preference.PreferenceManager;
import com.google.protobuf.Empty; import com.google.protobuf.Empty;
import org.vato.carcontroller.Int32Value;
import org.vato.carcontroller.PersonTrackingGrpc; import org.vato.carcontroller.PersonTrackingGrpc;
import org.vato.carcontroller.PointScan; import org.vato.carcontroller.PointScan;
import org.vato.carcontroller.Updaters.AbstractUpdater; import org.vato.carcontroller.Updaters.AbstractUpdater;
@@ -27,7 +26,8 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
public class LidarView extends SurfaceView implements AbstractUpdater.MapChangedListener<PointScan> { public class LidarView extends SurfaceView
implements AbstractUpdater.MapChangedListener<PointScan> {
private static final String LIDAR_TOPIC = "lidar_map"; private static final String LIDAR_TOPIC = "lidar_map";
@@ -60,11 +60,13 @@ public class LidarView extends SurfaceView implements AbstractUpdater.MapChanged
String host = prefs.getString("host", "10.0.0.53"); String host = prefs.getString("host", "10.0.0.53");
port = prefs.getString("zmqPort", "5050"); port = prefs.getString("zmqPort", "5050");
String gRPCPort = prefs.getString("port", "50051"); String gRPCPort = prefs.getString("port", "50051");
lidar = new ZmqUpdater<>(PointScan.getDefaultInstance().getParserForType(), LIDAR_TOPIC, host, port); lidar = new ZmqUpdater<>(PointScan.getDefaultInstance().getParserForType(), LIDAR_TOPIC,
host, port);
lidar.addMapChangedListener(this); lidar.addMapChangedListener(this);
surfaceHolder = getHolder(); surfaceHolder = getHolder();
lidarThread = new Thread(lidar); lidarThread = new Thread(lidar);
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, Integer.parseInt(gRPCPort)).usePlaintext().build(); ManagedChannel channel = ManagedChannelBuilder.forAddress(host, Integer.parseInt(gRPCPort))
.usePlaintext().build();
stub = PersonTrackingGrpc.newStub(channel); stub = PersonTrackingGrpc.newStub(channel);
} }
@@ -127,7 +129,8 @@ public class LidarView extends SurfaceView implements AbstractUpdater.MapChanged
Canvas canvas = surfaceHolder.lockCanvas(); Canvas canvas = surfaceHolder.lockCanvas();
canvas.save(); canvas.save();
canvas.drawColor(Color.WHITE); canvas.drawColor(Color.WHITE);
for (Point point : points.getPointsList().stream().map(Point::fromProtoPoint).collect(Collectors.toList())) { for (Point point : points.getPointsList().stream().map(Point::fromProtoPoint).collect(
Collectors.toList())) {
// Now for each point, draw a circle for the point (so it's big enough) in the correct spot, // Now for each point, draw a circle for the point (so it's big enough) in the correct spot,
// and create a colour for that point to paint it correctly. // and create a colour for that point to paint it correctly.
// TODO: Dynamically change the colour of the paint object based on the point group number. // TODO: Dynamically change the colour of the paint object based on the point group number.
@@ -156,7 +159,8 @@ public class LidarView extends SurfaceView implements AbstractUpdater.MapChanged
} }
static Point fromHist(double distance, double angle, Point offset) { static Point fromHist(double distance, double angle, Point offset) {
return new Point(distance * Math.sin(angle) + offset.x, distance * Math.cos(angle) + offset.y); return new Point(distance * Math.sin(angle) + offset.x,
distance * Math.cos(angle) + offset.y);
} }
} }

View File

@@ -1,29 +1,35 @@
package org.vato.carcontroller; package org.vato.carcontroller;
import android.util.Log;
import com.google.protobuf.Empty; import com.google.protobuf.Empty;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
public class PiLoader implements Runnable { public class PiLoader implements Runnable {
Integer steeringValue = 50; private Integer steeringValue = 50;
Integer throttleValue = 50; private Integer throttleValue = 50;
private ManagedChannel mChannel; private ManagedChannel mChannel;
private CarControlGrpc.CarControlBlockingStub stub; private CarControlGrpc.CarControlBlockingStub stub;
private AtomicBoolean stop = new AtomicBoolean(false); private AtomicBoolean stop = new AtomicBoolean(false);
Thread piUpdaterThread; private Thread piUpdaterThread;
private boolean useGrpcStream;
public PiLoader(String host, Integer port) { public PiLoader(String host, Integer port, boolean useGrpcStream) {
mChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); mChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
// Create an async stub. // Create an async stub.
stub = CarControlGrpc.newBlockingStub(mChannel); stub = CarControlGrpc.newBlockingStub(mChannel);
this.useGrpcStream = useGrpcStream;
} }
private void createAndStart() { private void createAndStart() {
@@ -53,10 +59,23 @@ public class PiLoader implements Runnable {
@Override @Override
public void run() { public void run() {
if (useGrpcStream) {
doStreamUpdates();
} else {
doAtomicUpdates();
}
}
private void doAtomicUpdates() {
// Do atomic if the user hasn't check the option to use_grpc_streams. This method tends to be more compatible.
while (!stop.get() && !Thread.interrupted()) { while (!stop.get() && !Thread.interrupted()) {
try { try {
SteeringResponse steeringResponse = stub.setSteering(SteeringRequest.newBuilder().setSteering((float) steeringValue / 50f - 1).build()); SteeringResponse steeringResponse = stub.setSteering(
ThrottleResponse throttleResponse = stub.setThrottle(ThrottleRequest.newBuilder().setThrottle((float) throttleValue / 50f - 1).build()); SteeringRequest.newBuilder().setSteering((float) steeringValue / 50f - 1)
.build());
ThrottleResponse throttleResponse = stub.setThrottle(
ThrottleRequest.newBuilder().setThrottle((float) throttleValue / 50f - 1)
.build());
} catch (Exception e) { } catch (Exception e) {
System.out.println("Error"); System.out.println("Error");
stop(); stop();
@@ -71,6 +90,55 @@ public class PiLoader implements Runnable {
} }
} }
private void doStreamUpdates() {
// Stream if user sets use_grpc_streams to true. This method is more efficient but less compatible.
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<Empty> responseObserver = new StreamObserver<Empty>() {
@Override
public void onNext(Empty value) {
finishLatch.countDown();
Log.d("PiLoader", "Finished streaming");
}
@Override
public void onError(Throwable t) {
finishLatch.countDown();
Log.d("PiLoader", "Failed to do gRPC Stream.", t);
}
@Override
public void onCompleted() {
finishLatch.countDown();
Log.d("PiLoader", "Finished streaming");
}
};
StreamObserver<Vehicle2DRequest> requestStreamObserver = CarControlGrpc.newStub(mChannel)
.streamVehicle2d(
responseObserver);
while (!stop.get() && !Thread.interrupted() && finishLatch.getCount() > 0) {
requestStreamObserver.onNext(Vehicle2DRequest.newBuilder()
.setThrottle(ThrottleRequest.newBuilder()
.setThrottle(
(float) throttleValue /
50f -
1)
.build())
.setSteering(SteeringRequest.newBuilder()
.setSteering(
(float) steeringValue /
50f -
1)
.build())
.build());
try {
// Use the same update rate as a typical screen refresh rate.
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
// TODO: Handle when interrupted and sleeping.
}
}
}
public void saveRecording() { public void saveRecording() {
// Ideally don't want to use a blocking stub here, android may complain. // Ideally don't want to use a blocking stub here, android may complain.
Empty done = stub.saveRecordedData(SaveRequest.newBuilder().setFile("Test").build()); Empty done = stub.saveRecordedData(SaveRequest.newBuilder().setFile("Test").build());

View File

@@ -69,10 +69,13 @@ public class SlamController extends AppCompatActivity implements SeekBar.OnSeekB
@Override @Override
protected void onResume() { protected void onResume() {
super.onResume(); super.onResume();
SharedPreferences prefs = PreferenceManager.getDefaultSharedPreferences(getApplicationContext()); SharedPreferences prefs = PreferenceManager.getDefaultSharedPreferences(
getApplicationContext());
if (grpcController == null) { if (grpcController == null) {
grpcController = new PiLoader(prefs.getString("host", "10.0.0.53"), Integer.parseInt(prefs.getString("port", "50051"))); grpcController = new PiLoader(prefs.getString("host", "10.0.0.53"),
Integer.parseInt(prefs.getString("port", "50051")),
prefs.getBoolean("use_grpc_streams", false));
} }
// Should call the equivalent of the load method either here or in the loader. // Should call the equivalent of the load method either here or in the loader.
// Test without the grpc for steering. // Test without the grpc for steering.

View File

@@ -20,13 +20,15 @@ import org.vato.carcontroller.SlamDetails;
import org.vato.carcontroller.SlamLocation; import org.vato.carcontroller.SlamLocation;
import org.vato.carcontroller.SlamScan; import org.vato.carcontroller.SlamScan;
import org.vato.carcontroller.Updaters.AbstractUpdater; import org.vato.carcontroller.Updaters.AbstractUpdater;
import org.vato.carcontroller.Updaters.GrpcUpdater;
import org.vato.carcontroller.Updaters.ZmqUpdater; import org.vato.carcontroller.Updaters.ZmqUpdater;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
public class SlamView extends SurfaceView implements AbstractUpdater.MapChangedListener<SlamScan> { public class SlamView extends SurfaceView implements AbstractUpdater.MapChangedListener<SlamScan>,
GrpcUpdater.GrpcUpdateBootstrapper<SlamScan> {
private static final String SLAM_TOPIC = "slam_map"; private static final String SLAM_TOPIC = "slam_map";
@@ -38,6 +40,7 @@ public class SlamView extends SurfaceView implements AbstractUpdater.MapChangedL
private int mapSizePixels; private int mapSizePixels;
private int mapSizeMeters; private int mapSizeMeters;
private String port; private String port;
private boolean useGrpcStreams;
public SlamView(Context context) { public SlamView(Context context) {
super(context); super(context);
@@ -61,13 +64,24 @@ public class SlamView extends SurfaceView implements AbstractUpdater.MapChangedL
String gRPCPort = prefs.getString("port", "50051"); String gRPCPort = prefs.getString("port", "50051");
mapSizePixels = Integer.parseInt(prefs.getString("MAPSIZEPIXELS", "540")); mapSizePixels = Integer.parseInt(prefs.getString("MAPSIZEPIXELS", "540"));
mapSizeMeters = Integer.parseInt(prefs.getString("MAPSIZEMETRES", "10")); mapSizeMeters = Integer.parseInt(prefs.getString("MAPSIZEMETRES", "10"));
slam = new ZmqUpdater<>(SlamScan.getDefaultInstance().getParserForType(), SLAM_TOPIC, host, port); useGrpcStreams = prefs.getBoolean("use_grpc_streams", false);
// TODO: Generify the Updaters to support the strategy pattern.
if (useGrpcStreams) {
slam = new GrpcUpdater<>(SlamScan.getDefaultInstance().getParserForType(),
this);
} else {
slam = new ZmqUpdater<>(SlamScan.getDefaultInstance().getParserForType(), SLAM_TOPIC,
host,
port);
}
slam.addMapChangedListener(this); slam.addMapChangedListener(this);
surfaceHolder = getHolder(); surfaceHolder = getHolder();
paint = new Paint(); paint = new Paint();
paint.setColor(Color.BLUE); paint.setColor(Color.BLUE);
mapThread = new Thread(slam); mapThread = new Thread(slam);
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, Integer.parseInt(gRPCPort)).usePlaintext().build(); ManagedChannel channel = ManagedChannelBuilder.forAddress(host, Integer.parseInt(gRPCPort))
.usePlaintext().build();
stub = SlamControlGrpc.newStub(channel); stub = SlamControlGrpc.newStub(channel);
} }
@@ -75,6 +89,18 @@ public class SlamView extends SurfaceView implements AbstractUpdater.MapChangedL
* Called by MainActivity.onResume() to start a thread. * Called by MainActivity.onResume() to start a thread.
*/ */
public void resume() { public void resume() {
if (useGrpcStreams) {
// TODO: Make a gRPC updater that is a generic <T,U>, one for response proto (like normal) and
// the other for a request proto. onNext() will do the normal map updating.
// GrpcMapThread can just be started for now.
mapThread.start();
} else {
doZmqSlamStream();
}
}
private void doZmqSlamStream() {
// TODO: See if this bootstrapping can be integrated into updaters, as grpc requires a similar thing.
StreamObserver<Empty> response = new StreamObserver<Empty>() { StreamObserver<Empty> response = new StreamObserver<Empty>() {
@Override @Override
public void onNext(Empty value) { public void onNext(Empty value) {
@@ -94,9 +120,9 @@ public class SlamView extends SurfaceView implements AbstractUpdater.MapChangedL
}; };
// use async grpc method, ZMQ doesn't need to connect straight away. // use async grpc method, ZMQ doesn't need to connect straight away.
stub.startMapStreaming(SlamDetails.newBuilder() stub.startMapStreaming(SlamDetails.newBuilder()
.setMapSizePixels(mapSizePixels) .setMapSizePixels(mapSizePixels)
.setMapSizeMeters(mapSizeMeters) .setMapSizeMeters(mapSizeMeters)
.build(), response); .build(), response);
} }
public void stop() { public void stop() {
@@ -119,17 +145,30 @@ public class SlamView extends SurfaceView implements AbstractUpdater.MapChangedL
canvas.save(); canvas.save();
canvas.drawColor(Color.WHITE); canvas.drawColor(Color.WHITE);
// Using width as we want square. // Using width as we want square.
Bitmap bitmap = Bitmap.createBitmap(mapSizePixels, mapSizePixels, Bitmap.Config.ALPHA_8); Bitmap bitmap = Bitmap.createBitmap(mapSizePixels, mapSizePixels,
Bitmap.Config.ALPHA_8);
for (int i = 0; i < mapSizePixels; i++) { for (int i = 0; i < mapSizePixels; i++) {
for (int j = 0; j < mapSizePixels; j++) { for (int j = 0; j < mapSizePixels; j++) {
// 0-255 is appropriate for the config used. // 0-255 is appropriate for the config used.
// Take away from 255 to invert the colours, so walls are the correct colour. // Take away from 255 to invert the colours, so walls are the correct colour.
bitmap.setPixel(i, j, 255 - map.byteAt(i * mapSizePixels + j)); bitmap.setPixel(i, j, map.byteAt(i * mapSizePixels + j));
} }
} }
// TODO: Place the car in the right spot.
canvas.drawBitmap(bitmap, 0, 0, paint); canvas.drawBitmap(bitmap, 0, 0, paint);
canvas.restore(); canvas.restore();
surfaceHolder.unlockCanvasAndPost(canvas); surfaceHolder.unlockCanvasAndPost(canvas);
} }
} }
@Override
public void bootstrap(StreamObserver<SlamScan> responseObserver) {
stub.mapStream(SlamDetails.newBuilder()
.setMapSizePixels(
mapSizePixels)
.setMapSizeMeters(
mapSizeMeters)
.build(), responseObserver);
}
} }

View File

@@ -12,10 +12,6 @@ import androidx.preference.PreferenceManager;
import com.google.protobuf.Empty; import com.google.protobuf.Empty;
import org.vato.carcontroller.LIDAR.LidarTrackingController;
import java.util.stream.Stream;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
public class SimpleController extends AppCompatActivity implements SeekBar.OnSeekBarChangeListener { public class SimpleController extends AppCompatActivity implements SeekBar.OnSeekBarChangeListener {
@@ -49,10 +45,13 @@ public class SimpleController extends AppCompatActivity implements SeekBar.OnSee
@Override @Override
protected void onResume() { protected void onResume() {
super.onResume(); super.onResume();
SharedPreferences prefs = PreferenceManager.getDefaultSharedPreferences(getApplicationContext()); SharedPreferences prefs = PreferenceManager.getDefaultSharedPreferences(
getApplicationContext());
if (grpcController == null) { if (grpcController == null) {
grpcController = new PiLoader(prefs.getString("host", "10.0.0.53"), Integer.parseInt(prefs.getString("port", "50051"))); grpcController = new PiLoader(prefs.getString("host", "10.0.0.53"),
Integer.parseInt(prefs.getString("port", "50051")),
prefs.getBoolean("use_grpc_streams", false));
} }
// Should call the equivalent of the load method either here or in the loader. // Should call the equivalent of the load method either here or in the loader.
grpcController.start(); grpcController.start();
@@ -115,12 +114,14 @@ public class SimpleController extends AppCompatActivity implements SeekBar.OnSee
StreamObserver<Empty> response = new StreamObserver<Empty>() { StreamObserver<Empty> response = new StreamObserver<Empty>() {
@Override @Override
public void onNext(Empty value) { public void onNext(Empty value) {
Toast.makeText(getApplicationContext(), "Started Recording Lidar", Toast.LENGTH_SHORT).show(); Toast.makeText(getApplicationContext(), "Started Recording Lidar",
Toast.LENGTH_SHORT).show();
} }
@Override @Override
public void onError(Throwable t) { public void onError(Throwable t) {
Toast.makeText(getApplicationContext(), "Failed to set lidar recording", Toast.LENGTH_SHORT).show(); Toast.makeText(getApplicationContext(), "Failed to set lidar recording",
Toast.LENGTH_SHORT).show();
} }
@Override @Override

View File

@@ -0,0 +1,56 @@
package org.vato.carcontroller.Updaters;
import android.util.Log;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import io.grpc.stub.StreamObserver;
public class GrpcUpdater<T extends MessageLite> extends AbstractUpdater<T> {
GrpcUpdateBootstrapper<T> bootstrapper;
public GrpcUpdater(Parser parser, GrpcUpdateBootstrapper bootstrapper) {
super(parser);
this.bootstrapper = bootstrapper;
}
@Override
public void stop() {
}
@Override
public void run() {
StreamObserver<T> responseObserver = new StreamObserver<T>() {
@Override
public void onNext(T value) {
fireMapChanged(value);
}
@Override
public void onError(Throwable t) {
Log.d("GRPC_UPDATER", "Failed to get a response", t);
}
@Override
public void onCompleted() {
}
};
bootstrapper.bootstrap(responseObserver);
}
// TODO: Find if there's a way to clean this up so other updaters can easily substitute.
public interface GrpcUpdateBootstrapper<T extends MessageLite> {
/**
* A responseObserver is passed into this method so that it can be called/initialised by
* the client.
*
* @param responseObserver
*/
void bootstrap(StreamObserver<T> responseObserver);
}
}

View File

@@ -28,6 +28,10 @@
android:title="Port number" android:title="Port number"
app:key="port" app:key="port"
app:useSimpleSummaryProvider="true" /> app:useSimpleSummaryProvider="true" />
<SwitchPreference
android:defaultValue="false"
android:key="use_grpc_streams"
android:title="Use gRPC Streams Where Possible" />
</PreferenceCategory> </PreferenceCategory>
<PreferenceCategory android:title="Car Calibration"> <PreferenceCategory android:title="Car Calibration">

View File

@@ -12,7 +12,7 @@ dependencies {
task copyPythonCode(type: Copy, dependsOn: configurations.python){ task copyPythonCode(type: Copy, dependsOn: configurations.python){
// Copy python protobuf code from proto project. // Copy python protobuf code from proto project.
from zipTree(configurations.python.asPath) from zipTree(configurations.python.asPath)
into './src' into 'src'
} }
task build(type: Exec, dependsOn: copyPythonCode) { task build(type: Exec, dependsOn: copyPythonCode) {

View File

@@ -12,6 +12,7 @@ class MotorServicer(motorService_pb2_grpc.CarControlServicer):
def __init__(self, vehicle): def __init__(self, vehicle):
self.vehicle = VehicleRecordingDecorator(vehicle) self.vehicle = VehicleRecordingDecorator(vehicle)
self._timer = None self._timer = None
self._timeout_elapsed = False
def set_throttle(self, request, context): def set_throttle(self, request, context):
# gRPC streams currently don't work between python and android. # gRPC streams currently don't work between python and android.
@@ -26,6 +27,21 @@ class MotorServicer(motorService_pb2_grpc.CarControlServicer):
self.vehicle.steering = request.steering self.vehicle.steering = request.steering
return motorService_pb2.SteeringResponse(steeringSet=True) return motorService_pb2.SteeringResponse(steeringSet=True)
def stream_vehicle_2d(self, request_iterator, context):
print('Starting Vehicle Control Stream')
self._timeout_elapsed = False
for request_2d in request_iterator:
# End the stream if the user fails to respond in time.
if self._timeout_elapsed:
break
print('Setting 2d values to: ' + str((request_2d.throttle.throttle, request_2d.steering.steering)))
self.set_timeout(3)
# TODO: Make a vehicle method set 2d throttle/steering.
self.vehicle.throttle = request_2d.throttle.throttle
self.vehicle.steering = request_2d.steering.steering
return empty.Empty()
def set_timeout(self, min_timeout): def set_timeout(self, min_timeout):
"""Stops the old timer and restarts it to the specified time. """Stops the old timer and restarts it to the specified time.
@@ -39,6 +55,7 @@ class MotorServicer(motorService_pb2_grpc.CarControlServicer):
def timeout_elapsed(self): def timeout_elapsed(self):
"""Election or heartbeat timeout has elapsed.""" """Election or heartbeat timeout has elapsed."""
print("Node timeout elapsed") print("Node timeout elapsed")
self._timeout_elapsed = True
self.vehicle.stop() self.vehicle.stop()
def record(self, request, context): def record(self, request, context):

View File

@@ -0,0 +1,87 @@
import car.tracking.devices.factory as lidar_fact
from breezyslam.algorithms import RMHC_SLAM
from breezyslam.sensors import RPLidarA1 as LaserModel
class SlamProcessor:
def __init__(self, map_pixels=None, map_meters=None):
self._map_pixels = map_pixels
self._map_meters = map_meters
def do_scanning(self):
self.can_scan = True
# Adapted from BreezySLAM rpslam example.
# Connect to Lidar unit. For some reason it likes to be done twice, otherwise it errors out...
lidar = lidar_fact.get_lidar()
lidar = lidar_fact.get_lidar()
print('Initialised lidar')
# Create an RMHC SLAM object with a laser model and optional robot model
slam = RMHC_SLAM(LaserModel(), self._map_pixels, self._map_meters)
print('initialised slam')
# Initialize empty map
mapbytes = bytearray(self.map_pixels * self.map_pixels)
print('Initialised byte []')
# Create an iterator to collect scan data from the RPLidar
iterator = lidar.iter_scans()
print('Scanning')
while self.can_scan:
# Extract (quality, angle, distance) triples from current scan
# This can still sometimes fail, at least on macOS.
try:
items = [item for item in next(iterator)]
except Exception:
# Ignore errors for now.
print('Failed to retrieve some scans from lidar.')
continue
# Extract distances and angles from triples
distances = [item[2] for item in items]
angles = [item[1] for item in items]
print('Updating map')
# Update SLAM with current Lidar scan and scan angles
slam.update(distances, scan_angles_degrees=angles)
print('Map updated')
slam.getmap(mapbytes)
# Make a generator that can be iterated on by this function.
yield (mapbytes, slam.getpos())
# Close the generator when user stops scanning.
raise StopIteration
def stop_scanning(self):
self.can_scan = False
# Properties
@property
def map_pixels(self):
return self._map_pixels
@map_pixels.setter
def map_pixels(self, value):
self._map_pixels = value
@property
def map_meters(self):
return self._map_meters
@map_meters.setter
def map_meters(self, value):
self._map_meters = value
@property
def lidar_connection(self):
return self._lidar_connection
@lidar_connection.setter
def lidar_connection(self, value):
self._lidar_connection = value

View File

@@ -2,8 +2,11 @@ import car.slam.SlamController_pb2_grpc as grpc
import car.slam.SlamController_pb2 as proto import car.slam.SlamController_pb2 as proto
import car.empty_pb2 as empty import car.empty_pb2 as empty
import car.slam.slam_streamer as slam import car.slam.slam_streamer as slam
from .slam_processor import SlamProcessor
from multiprocessing import Process from multiprocessing import Process
import os import os
from car.slam.SlamController_pb2 import SlamLocation, SlamScan
class SlamServicer(grpc.SlamControlServicer): class SlamServicer(grpc.SlamControlServicer):
@@ -11,24 +14,34 @@ class SlamServicer(grpc.SlamControlServicer):
def __init__(self): def __init__(self):
print('Servicer initialised') print('Servicer initialised')
self.slam = slam.SlamStreamer() self.slam = SlamProcessor()
def start_map_streaming(self, request, context): def start_map_streaming(self, request, context):
print('Received Map Start Streaming Request') print('Received Map Start Streaming Request')
slam_streamer = slam.SlamStreamer(self.slam)
if self.slam_thread is None: if self.slam_thread is None:
print('initialising slam_thread') print('initialising slam_thread')
# Don't bother creating and starting slam more than once. # Don't bother creating and starting slam more than once.
self.slam.port = 50052 if "CAR_ZMQ_PORT" not in os.environ else os.environ[ slam_streamer.port = 50052 if "CAR_ZMQ_PORT" not in os.environ else os.environ[
'CAR_ZMQ_PORT'] 'CAR_ZMQ_PORT']
self.slam.map_pixels = request.map_size_pixels self.slam.map_pixels = request.map_size_pixels
self.slam.map_meters = request.map_size_meters self.slam.map_meters = request.map_size_meters
self.slam_thread = Process(target=self.slam.start) self.slam_thread = Process(target=slam_streamer.start)
self.slam_thread.start() self.slam_thread.start()
return empty.Empty() return empty.Empty()
def map_stream(self, request, context):
print('Received Map gRPC Stream Start Request')
self.slam.map_meters = request.map_size_meters
self.slam.map_pixels = request.map_size_pixels
# If expression doesn't work, then just do yields in a for loop.
return (SlamScan(map=bytes(m), location=SlamLocation(x=pos[0], y=pos[1], theta=pos[2])) for m, pos in self.slam.do_scanning())
def stop_streaming(self, request, context): def stop_streaming(self, request, context):
# Need to adapt this to also be able to stop gRPC streamer.
self.slam.stop_scanning()
if self.slam_thread is not None: if self.slam_thread is not None:
self.slam.stop_scanning()
self.slam_thread.join() self.slam_thread.join()
self.slam = None self.slam = None
return empty.Empty() return empty.Empty()

View File

@@ -1,10 +1,9 @@
import zmq import zmq
from breezyslam.algorithms import RMHC_SLAM
from breezyslam.sensors import RPLidarA1 as LaserModel
from car.slam.SlamController_pb2 import SlamScan, SlamLocation from car.slam.SlamController_pb2 import SlamScan, SlamLocation
import car.messaging.message_factory as mf import car.messaging.message_factory as mf
import car.messaging.messages as messages import car.messaging.messages as messages
import car.tracking.devices.factory as lidar_fact import car.tracking.devices.factory as lidar_fact
from .slam_processor import SlamProcessor
# Left here as was used in the example, configure as necessary. # Left here as was used in the example, configure as necessary.
@@ -15,10 +14,12 @@ import car.tracking.devices.factory as lidar_fact
class SlamStreamer: class SlamStreamer:
can_scan = False can_scan = False
def __init__(self, map_pixels=None, map_meters=None, port=None): def __init__(self, processor, port=None):
self._map_pixels = map_pixels """
self._map_meters = map_meters ZMQ implementation to stream slam map.
"""
self._port = port self._port = port
self.processor = processor
def start(self): def start(self):
''' '''
@@ -31,47 +32,12 @@ class SlamStreamer:
to calling this method, and changing those values after to calling this method, and changing those values after
calling this method will have no effect. calling this method will have no effect.
''' '''
self.can_scan = True
print('Starting to stream') print('Starting to stream')
# TODO: Don't get the messenger here, pass it in/specify another way.
self._mFactory = mf.getZmqPubSubStreamer(self._port) self._mFactory = mf.getZmqPubSubStreamer(self._port)
print('Started and bound zmq socket.') print('Started and bound zmq socket.')
for map, pos in self.processor.do_scanning():
# Adapted from BreezySLAM rpslam example. self._push_map(map, pos)
# Connect to Lidar unit. For some reason it likes to be done twice, otherwise it errors out...
lidar = lidar_fact.get_lidar()
lidar = lidar_fact.get_lidar()
print('Initialised lidar')
# Create an RMHC SLAM object with a laser model and optional robot model
slam = RMHC_SLAM(LaserModel(), self._map_pixels, self._map_meters)
print('initialised slam')
# Initialize empty map
mapbytes = bytearray(self.map_pixels * self.map_pixels)
print('Initialised byte []')
# Create an iterator to collect scan data from the RPLidar
iterator = lidar.iter_scans()
print('Scanning')
while self.can_scan:
# Extract (quality, angle, distance) triples from current scan
items = [item for item in next(iterator)]
# Extract distances and angles from triples
distances = [item[2] for item in items]
angles = [item[1] for item in items]
print('Updating map')
# Update SLAM with current Lidar scan and scan angles
slam.update(distances, scan_angles_degrees=angles)
print('Map updated')
slam.getmap(mapbytes)
self._push_map(mapbytes, slam.getpos())
def _push_map(self, mapbytes, location): def _push_map(self, mapbytes, location):
''' '''
@@ -86,32 +52,7 @@ class SlamStreamer:
b'slam_map', protoScan) b'slam_map', protoScan)
def stop_scanning(self): def stop_scanning(self):
self.can_scan = False self.processor.stop_scanning()
# Properties
@property
def map_pixels(self):
return self._map_pixels
@map_pixels.setter
def map_pixels(self, value):
self._map_pixels = value
@property
def map_meters(self):
return self._map_meters
@map_meters.setter
def map_meters(self, value):
self._map_meters = value
@property
def lidar_connection(self):
return self._lidar_connection
@lidar_connection.setter
def lidar_connection(self, value):
self._lidar_connection = value
@property @property
def port(self): def port(self):

View File

@@ -25,6 +25,11 @@ message SteeringResponse{
bool steeringSet = 1; bool steeringSet = 1;
} }
message Vehicle2DRequest{
ThrottleRequest throttle = 1;
SteeringRequest steering = 2;
}
message RecordingReqeust{ message RecordingReqeust{
bool record = 1; bool record = 1;
} }
@@ -36,6 +41,7 @@ message SaveRequest{
service CarControl{ service CarControl{
rpc set_throttle(ThrottleRequest) returns (ThrottleResponse){} rpc set_throttle(ThrottleRequest) returns (ThrottleResponse){}
rpc set_steering(SteeringRequest) returns (SteeringResponse){} rpc set_steering(SteeringRequest) returns (SteeringResponse){}
rpc stream_vehicle_2d(stream Vehicle2DRequest) returns (google.protobuf.Empty){}
rpc record(RecordingReqeust) returns (google.protobuf.Empty) {} rpc record(RecordingReqeust) returns (google.protobuf.Empty) {}
rpc save_recorded_data(SaveRequest) returns (google.protobuf.Empty) {} rpc save_recorded_data(SaveRequest) returns (google.protobuf.Empty) {}
} }

View File

@@ -29,5 +29,7 @@ message SlamScan{
service SlamControl { service SlamControl {
rpc start_map_streaming(SlamDetails) returns (google.protobuf.Empty) {} rpc start_map_streaming(SlamDetails) returns (google.protobuf.Empty) {}
rpc map_stream(SlamDetails) returns (stream SlamScan) {}
rpc stop_streaming(google.protobuf.Empty) returns (google.protobuf.Empty) {} rpc stop_streaming(google.protobuf.Empty) returns (google.protobuf.Empty) {}
} }