diff --git a/Messaging/message_factory.py b/Messaging/message_factory.py index 8705c7d..fa24312 100644 --- a/Messaging/message_factory.py +++ b/Messaging/message_factory.py @@ -8,12 +8,10 @@ class ZmqPubSubStreamer: ''' def __init__(self, port): - # Should create the socket here always, since zmq is not thread safe. - # Hopefully whoever uses this is not stupid enough to create it then - # pass it into a thread. self._socket = zmq.Context.instance().socket(zmq.PUB) print('Starting socket with address: ' + 'tcp://*:' + str(port)) self._socket.bind("tcp://*:" + str(port)) + def send_message(self, message): ''' diff --git a/requirements.txt b/requirements.txt index 15e7951..03863bb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,4 @@ paho-mqtt u-msgpack-python grpcio-tools rplidar +pyzmq \ No newline at end of file diff --git a/tracking/lidar_cache.py b/tracking/lidar_cache.py index 22d60ef..4641513 100644 --- a/tracking/lidar_cache.py +++ b/tracking/lidar_cache.py @@ -4,7 +4,6 @@ from threading import Thread from tracking import algorithms import tracking.lidar_tracker_pb2 as tracker_pb import zmq -import Messaging.message_factory as mf import Messaging.messages as messages @@ -14,31 +13,27 @@ class LidarCache(): runs grouping algorithms between scans and keeps a copy of the group data. ''' - run = True - tracking_group_number = -1 - currentGroups = None - groupsChanged = [] - port = None def __init__(self, measurements=100): self.lidar = RPLidar('/dev/ttyUSB0') self.measurements = measurements print('Info: ' + self.lidar.get_info()) print('Health: ' + self.lidar.get_health()) + self.run = True + self.tracking_group_number = -1 + self.currentGroups = None + self.groupsChanged = [] - def start_cache(self): - if self.port is None: - print('ERROR: Port has not been set!') - return - self.thread = Thread(target=self.do_scanning) + def start_cache(self, sender): + self.thread = Thread(target=self.do_scanning, args=[sender]) self.thread.start() - def do_scanning(self): + def do_scanning(self, sender): ''' Performs a scan for the given number of iterations. ''' # Create the 0MQ socket first. This should not be passed between threads. - self._mFactory = mf.getZmqPubSubStreamer(self.port) + self._mFactory = sender for i, scan in enumerate(self.lidar.iter_scans(min_len=self.measurements)): print('%d: Got %d measurments' % (i, len(scan))) @@ -54,6 +49,7 @@ class LidarCache(): def fireGroupsChanged(self): # Send the updated groups to 0MQ socket. + # Rename this to be a generic listener method, rather than an explicit 'send' (even though it can be treated as such already) self._mFactory.send_message_topic("lidar_map", messages.ProtoMessage( message=tracker_pb.PointScan(points=[]).SerializeToString())) diff --git a/tracking/lidar_servicer.py b/tracking/lidar_servicer.py index f2cd66c..e5a5b89 100644 --- a/tracking/lidar_servicer.py +++ b/tracking/lidar_servicer.py @@ -2,6 +2,7 @@ import tracking.lidar_tracker_pb2 as lidar_tracker_pb2 from tracking.lidar_tracker_pb2_grpc import PersonTrackingServicer from tracking.lidar_cache import LidarCache from multiprocessing import Process +import Messaging.message_factory as mf class LidarServicer(PersonTrackingServicer): @@ -19,4 +20,4 @@ class LidarServicer(PersonTrackingServicer): ''' Starts the lidar cache. ''' - self.cache.start_cache() + self.cache.start_cache(mf.getZmqPubSubStreamer(request.value))