import time
from queue import Empty, Full, Queue
from threading import Thread

import cv2
import numpy as np


class VideoGet:
    """Read frames from a video source on a background thread.

    Each successfully grabbed frame is copied and pushed onto the queue
    supplied by the caller. When the queue is full the oldest entry is
    discarded so the newest frame can be stored.

    Code taken from Najam R Syed, available here:
    https://github.com/nrsyed/computer-vision/tree/master/multithread
    """

    def __init__(self, q, src):
        """Open ``src`` and publish the first frame if one could be read.

        Must provide a source so we don't accidentally start the camera
        at work.

        Args:
            q: Queue-like object that receives copies of grabbed frames.
            src: Anything accepted by ``cv2.VideoCapture``; it is kept so
                the stream can be reopened after a failed read.
        """
        self._stream = cv2.VideoCapture(src)
        (self.grabbed, self.frame) = self._stream.read()
        self.stopped = False
        self.q = q
        self.src = src
        # Set by start(); lets stop() know whether get() owns the cleanup.
        self._thread = None
        if self.grabbed and self.frame is not None:
            self._publish(self.frame)

    def start(self):
        """Run :meth:`get` on a new thread and return ``self``."""
        self._thread = Thread(target=self.get, args=())
        self._thread.start()
        return self

    def get(self):
        """Read frames until :meth:`stop` is called.

        If the previous read failed, the capture is released and reopened
        from ``self.src`` before the next read. Only frames that were
        actually grabbed are queued. The capture is released when the
        loop exits.
        """
        try:
            while not self.stopped:
                if not self.grabbed:
                    print('frame not grabbed')
                    # Start a new feed.
                    self._stream.release()
                    self._stream = cv2.VideoCapture(self.src)
                (self.grabbed, self.frame) = self._stream.read()
                if self.grabbed and self.frame is not None:
                    self._publish(self.frame)
                # Approximately 30fps; also keeps the reconnect path from
                # spinning when the source stays unavailable.
                time.sleep(0.03)
        finally:
            self._stream.release()

    def stop(self):
        """Ask the capture loop to finish.

        When no capture thread was ever started there is no loop to
        release the stream, so it is released here instead.
        """
        self.stopped = True
        if self._thread is None:
            self._stream.release()

    def _publish(self, frame):
        """Queue a copy of ``frame``, dropping the oldest entry if full."""
        if self.q.full():
            try:
                self.q.get_nowait()
            except Empty:
                # A consumer emptied the queue after the full() check.
                pass
        try:
            self.q.put_nowait(np.copy(frame))
        except Full:
            # Something else refilled the queue; drop this frame rather
            # than block the capture loop.
            pass