Motion detection in video footage has long been a popular topic among machine learning enthusiasts, and it has many uses. It can drive automatic video analysis in surveillance systems, capture movement on a wildlife camera, or count the vehicles passing by on a highway.
In this article, we will build a fully working motion detector. It will not only detect motion but also track the motion of different objects. Along the way, we will learn a lot about image processing techniques in OpenCV. By the end, you will have all the code required to build your own motion detector.
Problem Statement
We will build a computer vision system for motion detection. It will detect people moving in video footage. Each detected person gets a bounding box in a different color, and the path of the motion is drawn in the same color as the bounding box.
We will detect motion by comparing each video frame with a recent reference frame, which is refreshed every few frames. The regions that differ from the reference frame are flagged as motion. To predict the path of the motion, a Kalman filter is used. The final result will look like this:
Coding Our Motion Detector with Python
First, we walk through the code for motion detection. Then, we write scripts for tracking the path of moving objects. Finally, we use those scripts to draw the path of each object on the frame. So, let’s begin.
1. Reading the video from location
We use cv2.VideoCapture to read the video from a file location. Change the path to point to your own video.
import cv2
import imutils
import numpy as np

cap = cv2.VideoCapture('video.mp4')

# State used by the motion-detection step inside the loop
first_frame = None
delay_counter = 0
2. Preparing the frames
while cap.isOpened():
    # Capture frame-by-frame
    ret, frame = cap.read()
    if not ret:  # no frame returned: end of video or read error
        break

    frame = imutils.resize(frame, width=750)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray = cv2.GaussianBlur(gray, (21, 21), 0)
In this block of code, we read each frame of the video and resize it to a width of 750 pixels. Then, we convert it to a grayscale image, so every pixel becomes a single intensity value in the range 0-255. Finally, the frame is blurred with a 21x21 Gaussian kernel to smooth out noise.
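To make the grayscale step concrete, here is a minimal NumPy-only sketch of the weighted sum that cv2.COLOR_BGR2GRAY applies (0.299 R + 0.587 G + 0.114 B). The helper name bgr_to_gray and the tiny 2x2 test frame are made up for this illustration; in the detector itself, cv2.cvtColor does this work.

```python
import numpy as np


def bgr_to_gray(frame_bgr):
    """Approximate cv2.COLOR_BGR2GRAY with a weighted sum of the channels."""
    weights = np.array([0.114, 0.587, 0.299])  # B, G, R order, as OpenCV stores frames
    gray = frame_bgr.astype(np.float64) @ weights
    # Round and clip back into the 0-255 range of an 8-bit image
    return np.clip(np.round(gray), 0, 255).astype(np.uint8)


# Hypothetical 2x2 colour frame (BGR), black by default
frame = np.zeros((2, 2, 3), dtype=np.uint8)
frame[0, 0] = (255, 255, 255)  # white
frame[0, 1] = (0, 0, 255)      # pure red
frame[1, 0] = (0, 255, 0)      # pure green

gray = bgr_to_gray(frame)
print(gray.shape)  # (2, 2): one intensity value per pixel
```

Green contributes the most to the result and blue the least, which is why a pure green pixel ends up brighter in grayscale than a pure red one.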
3. Detecting motion in the frames
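We begin by storing the current frame as the reference frame (first_frame) if none has been set yet. Both first_frame (initially None) and delay_counter (initially 0) must be defined before the while loop. Once the delay counter exceeds 5 frames, we reset it and replace the reference frame with the current frame. Motion is detected by comparing the reference frame with the current frame (next_frame). The difference between the two frames is computed with the absdiff method.
    if first_frame is None:
        first_frame = gray
    next_frame = gray

    # Refresh the reference frame every few frames
    delay_counter += 1
    if delay_counter > 5:
        delay_counter = 0
        first_frame = next_frame

    # Per-pixel absolute difference between reference and current frame
    frame_delta = cv2.absdiff(first_frame, next_frame)
    thresh = cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1]
    thresh = cv2.dilate(thresh, None, iterations=2)
    # Two return values, as in OpenCV 4.x (OpenCV 3.x returns three)
    cnts, _ = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL,
                               cv2.CHAIN_APPROX_SIMPLE)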
The threshold() method turns every pixel whose difference value is greater than the threshold of 25 white (255) and all other pixels black. The thresholded image is then dilated to fill small holes, and finally we find the contours of the regions where motion is detected.
This is what it looks like:
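The differencing and thresholding steps can be sketched without OpenCV. The NumPy-only example below mimics what absdiff and a binary threshold compute on two tiny synthetic frames. The function name motion_mask and the 6x6 frames are invented for this illustration, and the bounding box is taken from the white pixels directly rather than from contours.

```python
import numpy as np


def motion_mask(prev_gray, curr_gray, thresh=25):
    """Return a mask that is 255 where the frames differ by more than thresh."""
    # Widen to int16 first so the subtraction cannot wrap around in uint8
    delta = np.abs(curr_gray.astype(np.int16) - prev_gray.astype(np.int16))
    return np.where(delta > thresh, 255, 0).astype(np.uint8)


prev_gray = np.full((6, 6), 10, dtype=np.uint8)  # static background
curr_gray = prev_gray.copy()
curr_gray[2:4, 1:3] = 200                        # a bright 2x2 "object" appears
curr_gray[5, 5] = 20                             # small flicker, below the threshold

mask = motion_mask(prev_gray, curr_gray)

# Bounding box (x, y, w, h) around all white pixels
ys, xs = np.nonzero(mask)
x, y = int(xs.min()), int(ys.min())
w, h = int(xs.max()) - x + 1, int(ys.max()) - y + 1
print(x, y, w, h)  # 1 2 2 2
```

The flicker at the corner changes by only 10 intensity levels, so it stays black in the mask. This is the role of the threshold value 25 in the detector.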
4. Finding centroids of the objects in motion
For each contour with an area greater than 1000 pixels, we compute the centroid of its bounding box and store it. At the same time, we store the corner coordinates of the bounding box that the centroid belongs to.
    centers = []
    coords = []
    for c in cnts:
        # Ignore small contours, which are usually noise
        if cv2.contourArea(c) > 1000:
            (x, y, w, h) = cv2.boundingRect(c)
            # Centre of the bounding box as a 2x1 column vector
            b = np.array([[x + w // 2], [y + h // 2]])
            centers.append(np.round(b))
            coords.append([x, y, x + w, y + h])
We have successfully detected bounding boxes and their centroids for the motion detected in the video footage. So, the first half of our task is completed. Now, we proceed towards tracking the objects in motion.
Kalman Filter
The Kalman filter algorithm estimates the state of a system from a series of observations or measurements. It is a useful tool in a variety of applications, including object tracking, autonomous navigation systems, and economic forecasting.
The Kalman filter is essentially a set of mathematical equations that implement a predictor-corrector type estimator that is optimal in the sense that it minimizes the estimated error covariance when some presumed conditions are met [1].
Mathematical Equation [1]:
The Kalman filter addresses the general problem of trying to estimate the state $x \in \mathbb{R}^n$ of a discrete-time controlled process that is governed by the linear stochastic difference equation

$$x_k = A x_{k-1} + B u_k + w_{k-1}$$

with a measurement $y \in \mathbb{R}^m$ that is

$$y_k = H x_k + v_k$$

The random variables $w_k$ and $v_k$ represent the process and measurement noise respectively. They are assumed to be independent of each other, white, and with normal probability distributions

$$p(w) \sim N(0, Q), \qquad p(v) \sim N(0, R)$$
A more detailed description of how the Kalman filter works can be found in [1].
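As a rough illustration of the equations above, the following sketch runs one predict-correct cycle per measurement for a one-dimensional object moving at constant velocity. The state holds position and velocity, there is no control input (so the B u term is dropped), and the values chosen for dt, Q, R and the initial covariance are assumptions made only for this example. It is not the filter used in kalman_filter.py below.

```python
import numpy as np


def kalman_step(x, P, y, A, H, Q, R):
    """One predict + correct cycle of a linear Kalman filter (no control input)."""
    # Predict: propagate the state and its covariance through the process model
    x_pred = A @ x
    P_pred = A @ P @ A.T + Q
    # Correct: blend the prediction with the measurement y
    S = H @ P_pred @ H.T + R             # innovation covariance
    K = P_pred @ H.T @ np.linalg.inv(S)  # Kalman gain
    x_new = x_pred + K @ (y - H @ x_pred)
    P_new = (np.eye(len(x)) - K @ H) @ P_pred
    return x_new, P_new


dt = 1.0
A = np.array([[1.0, dt], [0.0, 1.0]])  # position += velocity * dt
H = np.array([[1.0, 0.0]])             # only the position is measured
Q = 0.01 * np.eye(2)                   # assumed process noise
R = np.array([[1.0]])                  # assumed measurement noise

x = np.array([[0.0], [0.0]])           # initial guess: at rest at the origin
P = 10.0 * np.eye(2)                   # large initial uncertainty

for k in range(1, 21):
    y = np.array([[2.0 * k]])          # noise-free positions, 2 units per step
    x, P = kalman_step(x, P, y, A, H, Q, R)

print(x.ravel())  # position close to 40, velocity close to 2
```

Although only positions are measured, the filter recovers the velocity as well, which is what lets a tracker predict where an object will be in the next frame.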
Tracking Path of Moving Object
We provide two Python scripts named tracker.py and kalman_filter.py. The scripts contain the code to track each object by its centroid, using the Kalman filter algorithm. The code is explained within the scripts with the help of comments.
tracker.py
# Import python libraries
import numpy as np
from kalman_filter import KalmanFilter
from scipy.optimize import linear_sum_assignment


class Track(object):
    """Track class for every object to be tracked
    Attributes:
        None
    """

    def __init__(self, prediction, trackIdCount):
        """Initialize variables used by Track class
        Args:
            prediction: predicted centroids of object to be tracked
            trackIdCount: identification of each track object
        Return:
            None
        """
        self.track_id = trackIdCount  # identification of each track object
        self.KF = KalmanFilter()  # KF instance to track this object
        self.prediction = np.asarray(prediction)  # predicted centroids (x,y)
        self.skipped_frames = 0  # number of frames skipped undetected
        self.trace = []  # trace path


class Tracker(object):
    """Tracker class that updates track vectors of object tracked
    Attributes:
        None
    """

    def __init__(self, dist_thresh, max_frames_to_skip, max_trace_length,
                 trackIdCount):
        """Initialize variable used by Tracker class
        Args:
            dist_thresh: distance threshold. When exceeds the threshold,
                         track will be deleted and new track is created
            max_frames_to_skip: maximum allowed frames to be skipped for
                                the track object undetected
            max_trace_length: trace path history length
            trackIdCount: identification of each track object
        Return:
            None
        """
        self.dist_thresh = dist_thresh
        self.max_frames_to_skip = max_frames_to_skip
        self.max_trace_length = max_trace_length
        self.tracks = []
        self.trackIdCount = trackIdCount

    def Update(self, detections):
        """Update tracks vector using following steps:
            - Create tracks if no tracks vector found
            - Calculate cost using the distance between
              predicted vs detected centroids
            - Using Hungarian Algorithm assign the correct
              detected measurements to predicted tracks
              https://en.wikipedia.org/wiki/Hungarian_algorithm
            - Identify tracks with no assignment, if any
            - If tracks are not detected for long time, remove them
            - Now look for un_assigned detects
            - Start new tracks
            - Update KalmanFilter state, lastResults and tracks trace
        Args:
            detections: detected centroids of object to be tracked
        Return:
            None
        """
        # Create tracks if no tracks vector found
        if len(self.tracks) == 0:
            for i in range(len(detections)):
                track = Track(detections[i], self.trackIdCount)
                self.trackIdCount += 1
                self.tracks.append(track)

        # Calculate cost using the Euclidean distance between
        # predicted vs detected centroids
        N = len(self.tracks)
        M = len(detections)
        cost = np.zeros(shape=(N, M))  # Cost matrix
        for i in range(len(self.tracks)):
            for j in range(len(detections)):
                try:
                    diff = self.tracks[i].prediction - detections[j]
                    distance = np.sqrt(diff[0][0] * diff[0][0] +
                                       diff[1][0] * diff[1][0])
                    cost[i][j] = distance
                except (IndexError, TypeError):
                    # Skip malformed detections; their cost stays 0
                    pass

        # Halve the distances; dist_thresh is compared against this value
        cost = 0.5 * cost

        # Using Hungarian Algorithm assign the correct detected measurements
        # to predicted tracks
        assignment = [-1] * N
        row_ind, col_ind = linear_sum_assignment(cost)
        for i in range(len(row_ind)):
            assignment[row_ind[i]] = col_ind[i]

        # Identify tracks with no assignment, if any
        un_assigned_tracks = []
        for i in range(len(assignment)):
            if assignment[i] != -1:
                # check for cost distance threshold.
                # If cost is very high then un_assign (delete) the track
                if cost[i][assignment[i]] > self.dist_thresh:
                    assignment[i] = -1
                    un_assigned_tracks.append(i)
            else:
                self.tracks[i].skipped_frames += 1

        # If tracks are not detected for long time, remove them
        del_tracks = []
        for i in range(len(self.tracks)):
            if self.tracks[i].skipped_frames > self.max_frames_to_skip:
                del_tracks.append(i)
        # Delete from the highest index down so that the remaining
        # indices stay valid
        for idx in sorted(del_tracks, reverse=True):
            if idx < len(self.tracks):
                del self.tracks[idx]
                del assignment[idx]
            else:
                print("ERROR: id is greater than length of tracks")

        # Now look for un_assigned detects
        un_assigned_detects = []
        for i in range(len(detections)):
            if i not in assignment:
                un_assigned_detects.append(i)

        # Start new tracks
        for i in un_assigned_detects:
            track = Track(detections[i], self.trackIdCount)
            self.trackIdCount += 1
            self.tracks.append(track)

        # Update KalmanFilter state, lastResults and tracks trace
        for i in range(len(assignment)):
            self.tracks[i].KF.predict()

            if assignment[i] != -1:
                self.tracks[i].skipped_frames = 0
                self.tracks[i].prediction = self.tracks[i].KF.correct(
                    detections[assignment[i]], 1)
            else:
                self.tracks[i].prediction = self.tracks[i].KF.correct(
                    np.array([[0], [0]]), 0)

            # Drop the oldest points once the trace is too long
            excess = len(self.tracks[i].trace) - self.max_trace_length
            if excess > 0:
                del self.tracks[i].trace[:excess]

            self.tracks[i].trace.append(self.tracks[i].prediction)
            self.tracks[i].KF.lastResult = self.tracks[i].prediction
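The matching step at the heart of Update can be illustrated in isolation. The sketch below builds the distance matrix between predicted and detected centroids and then finds the cheapest one-to-one matching by brute force. For small inputs this gives the same minimum-cost result that linear_sum_assignment finds much more efficiently. The function name assign_detections, the sample coordinates, and the threshold of 50 are made up for this example. Unlike tracker.py, the sketch does not halve the cost and it assumes there are at least as many detections as tracks.

```python
from itertools import permutations

import numpy as np


def assign_detections(predictions, detections, dist_thresh):
    """Match tracks to detections by minimum total Euclidean distance.

    Entry i of the result is the detection index given to track i, or -1
    if its best match is farther away than dist_thresh.
    Assumes there are at least as many detections as tracks.
    """
    # cost[i, j] = distance between predicted centroid i and detection j
    cost = np.linalg.norm(predictions[:, None, :] - detections[None, :, :], axis=2)
    n_tracks, n_dets = cost.shape
    # Try every way of giving each track a distinct detection
    best = min(
        permutations(range(n_dets), n_tracks),
        key=lambda cols: sum(cost[i, j] for i, j in enumerate(cols)),
    )
    return [j if cost[i, j] <= dist_thresh else -1 for i, j in enumerate(best)]


predictions = np.array([[10.0, 10.0], [100.0, 100.0]])               # two existing tracks
detections = np.array([[98.0, 103.0], [12.0, 9.0], [300.0, 300.0]])  # three detections

assignment = assign_detections(predictions, detections, dist_thresh=50.0)
unassigned = [j for j in range(len(detections)) if j not in assignment]
print(assignment)  # [1, 0]
print(unassigned)  # [2]
```

Track 0 is matched to the detection at (12, 9) and track 1 to the one at (98, 103). The detection at (300, 300) is left over, which is the situation in which the tracker starts a new track.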
kalman_filter.py
import numpy as np


class KalmanFilter(object):
    """Kalman Filter class keeps track of the estimated state of
    the system and the variance or uncertainty of the estimate.
    Predict and Correct methods implement the functionality
    Reference: https://en.wikipedia.org/wiki/Kalman_filter
    Attributes:
        None
    """

    def __init__(self):
        """Initialize variable used by Kalman Filter class
        Args:
            None
        Return:
            None
        """
        self.dt = 0.005  # delta time

        self.A = np.array([[1, 0], [0, 1]])  # matrix in observation equations
        self.u = np.zeros((2, 1))  # previous state vector

        # (x,y) tracking object center
        self.b = np.array([[0], [255]])  # vector of observations

        self.P = np.diag((3.0, 3.0))  # covariance matrix
        self.F = np.array([[1.0, self.dt], [0.0, 1.0]])  # state transition mat

        self.Q = np.eye(self.u.shape[0])  # process noise matrix
        self.R = np.eye(self.b.shape[0])  # observation noise matrix
        self.lastResult = np.array([[0], [255]])

    def predict(self):
        """Predict state vector u and variance of uncertainty P (covariance).
            where,
            u: previous state vector
            P: previous covariance matrix
            F: state transition matrix
            Q: process noise matrix
        Equations:
            u'_{k|k-1} = Fu'_{k-1|k-1}
            P_{k|k-1} = FP_{k-1|k-1} F.T + Q
            where,
                F.T is F transpose
        Args:
            None
        Return:
            vector of predicted state estimate
        """
        # Predicted state estimate
        self.u = np.round(np.dot(self.F, self.u))
        # Predicted estimate covariance
        self.P = np.dot(self.F, np.dot(self.P, self.F.T)) + self.Q
        self.lastResult = self.u  # same last predicted result
        return self.u

    def correct(self, b, flag):
        """Correct or update state vector u and variance of uncertainty P
        (covariance).
            where,
            u: predicted state vector u
            A: matrix in observation equations
            b: vector of observations
            P: predicted covariance matrix
            Q: process noise matrix
            R: observation noise matrix
        Equations:
            C = AP_{k|k-1} A.T + R
            K_{k} = P_{k|k-1} A.T(C.Inv)
            u'_{k|k} = u'_{k|k-1} + K_{k}(b_{k} - Au'_{k|k-1})
            P_{k|k} = P_{k|k-1} - K_{k}(CK.T)
            where,
                A.T is A transpose
                C.Inv is C inverse
        Args:
            b: vector of observations
            flag: if true, correct using the detection b;
                  otherwise reuse the last predicted result
        Return:
            corrected state vector u
        """
        if not flag:  # update using prediction
            self.b = self.lastResult
        else:  # update using detection
            self.b = b

        C = np.dot(self.A, np.dot(self.P, self.A.T)) + self.R
        K = np.dot(self.P, np.dot(self.A.T, np.linalg.inv(C)))

        self.u = np.round(self.u + np.dot(K, (self.b - np.dot(self.A,
                                                              self.u))))
        self.P = self.P - np.dot(K, np.dot(C, K.T))
        self.lastResult = self.u
        return self.u
Now, in the final part of the article, we first draw the tracking lines. Then, we draw a bounding box for each object. The bounding box and tracking line of an object share the same color, which distinguishes it from the other objects.
We begin by importing the Tracker class from the tracker.py script. The following code goes before the while loop, i.e. before while cap.isOpened():
from tracker import Tracker

traces = []
# Arguments: dist_thresh, max_frames_to_skip, max_trace_length, trackIdCount
tracker = Tracker(200, 30, 20, 100)

# Variables initialization
skip_frame_count = 0
track_colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0),
                (0, 255, 255), (255, 0, 255), (255, 127, 255),
                (127, 0, 255), (127, 0, 127)]
After adding this code, we write the following code inside the while cap.isOpened() loop.
    # If centroids are detected then track them
    if len(centers) > 0:
        # Track objects using the Kalman filter
        tracker.Update(centers)

        # Collect the trace segments of every track and remember
        # where each track color was last seen
        colors = {}
        for track in tracker.tracks:
            if len(track.trace) > 1:
                clr = track.track_id % len(track_colors)
                for j in range(len(track.trace) - 1):
                    x1, y1 = track.trace[j][0][0], track.trace[j][1][0]
                    x2, y2 = track.trace[j + 1][0][0], track.trace[j + 1][1][0]
                    traces.append([x1, y1, x2, y2, clr])
                if clr not in colors:
                    colors[clr] = [x2, y2]

        # Draw every stored trace segment in the color of its track
        for trace in traces:
            frame = cv2.line(frame, (int(trace[0]), int(trace[1])),
                             (int(trace[2]), int(trace[3])),
                             track_colors[trace[4]], 2)

        # Give each bounding box the color of the nearest track end point
        if colors:
            c = []
            for z in range(len(coords)):
                # Flatten the 2x1 centroid so it can be compared with [x2, y2]
                center = np.array(centers[z]).ravel()
                v = min(colors, key=lambda k: np.linalg.norm(
                    center - np.array(colors[k])))
                # If the color is already taken, fall back to the next one
                c.append((v + 1) % len(track_colors) if v in c else v)
            for z in range(len(coords)):
                cv2.rectangle(frame, (coords[z][0], coords[z][1]),
                              (coords[z][2], coords[z][3]),
                              track_colors[c[z]], 2)
The above code block draws the tracking paths and bounding boxes on the frame. The frame can then be shown with the cv2.imshow() method, followed by a call to cv2.waitKey() so that the window is refreshed. When the loop ends, release the video with cap.release().
Conclusion
In this article, we’ve examined how motion detection works in a video. We also worked with image processing techniques such as blurring, dilating, and finding differences between frames. In addition, we saw how a Kalman filter can be used to track moving objects. Finally, we learned how to draw bounding boxes and lines on the image with distinct colors.
References
[1] Mohamed Laaraiedh. “Implementation of Kalman Filter with Python Language”. IETR Labs, University of Rennes 1. https://arxiv.org/ftp/arxiv/papers/1204/1204.0375.pdf