Source code for arcjetCV.utils.processor

import cv2 as cv
import numpy as np
import os, sys
from arcjetCV.segmentation.contour.contour import (
    contoursHSV,
    contoursGRAY,
    contoursCNN,
    getEdgeFromContour,
    contoursAutoHSV,
    getPoints,
)
from arcjetCV.segmentation.contour.cnn import CNN
from arcjetCV.utils.utils import clahe_normalize, annotate_image_with_frame_number
from arcjetCV.utils.output import OutputListJSON
from arcjetCV.utils.video import Video


[docs] class ArcjetProcessor: """ Video frame processor Primary image processing class: used to read in video data, extract leading edges, hold processed arrays, and output processed data to file. """ def __init__(self, videometa): """ Initializes the ArcjetProcessor object. :param videometa: dictionary containing video metadata """ self.flow_dir = videometa["FLOW_DIRECTION"] self.h = videometa["HEIGHT"] self.w = videometa["WIDTH"] self.crop = videometa.crop_range() self.cnn = CNN()
[docs] def update_video_meta(self, videometa): """ Updates video metadata. :param videometa: dictionary containing updated video metadata """ self.flow_dir = videometa["FLOW_DIRECTION"] self.h = videometa["HEIGHT"] self.w = videometa["WIDTH"] self.crop = videometa.crop_range()
[docs] def get_flow_direction(self, frame): """ Infers flow direction from the provided frame. :param frame: opencv image :returns flowDirection: string, "left or "right" TODO: add support for top/bottom directions """ if len(frame.shape) == 3: gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY) else: gray = frame # smooth image to remove speckles/text gray = cv.GaussianBlur(gray, (15, 15), 0) # find location of max intensity _, _, _, max_loc = cv.minMaxLoc(gray) width_img, width_loc = frame.shape[1], max_loc[1] flux_loc = width_loc / width_img # Bright location generally indicates flow direction if flux_loc > 0.5: flow_direction = "left" elif flux_loc < 0.5: flow_direction = "right" return flow_direction
[docs] def get_image_flags(self, frame, argdict): """ Uses histogram of 8-bit grayscale image (0,255) to classify image type. :param frame: opencv image :param argdict: dictionary to store flags :returns: dictionary of flags """ try: # Attempt to retrieve the model fraction value from argdict, defaults to 0.005 if not found modelfraction = argdict["MODEL_FRACTION"] except KeyError: modelfraction = 0.005 ### Gray value histogram # Convert the input image to grayscale gray_ = cv.cvtColor(frame, cv.COLOR_BGR2GRAY) # Apply Gaussian blur to the grayscale image to reduce noise for better analysis gray = cv.GaussianBlur(gray_, (5, 5), 0) # Store the minimum and maximum pixel values of the grayscale image in argdict argdict["PIXEL_MIN"] = gray.min() argdict["PIXEL_MAX"] = gray.max() ### grayscale histogram histr = cv.calcHist([gray], None, None, [256], (0, 256)) imgsize = gray.size ### classification criteria # bright pixels occupy more than one histogram bin (not a single shade) modelvis = ((histr[12:250] / imgsize) > 0.00).sum() != 1 # bright pixels exceed a threshold fraction of the full image modelvis *= (histr[50:250].sum() / imgsize) > modelfraction argdict["MODEL_VISIBLE"] = modelvis argdict["OVEREXPOSED"] = (histr[243:].sum() / imgsize) > modelfraction argdict["UNDEREXPOSED"] = histr[150:].sum() / imgsize < modelfraction return argdict
[docs] def segment(self, img_crop, argdict): """ Segments image using one of several methods specified in argdict. :param img_crop: cropped opencv image :param argdict: dictionary containing segmentation method and related parameters :returns: contour_dict: dictionary containing contours flags: dictionary containing flags """ # Check the segmentation method specified in argdict and execute the corresponding block if argdict["SEGMENT_METHOD"] == "AutoHSV": # If the method is AutoHSV, call the contoursAutoHSV function with the cropped image contour_dict, flags = contoursAutoHSV(img_crop, flags=argdict) elif argdict["SEGMENT_METHOD"] == "HSV": # If the method is HSV, first try to retrieve HSV range values from argdict try: self.HSVModelRange = argdict["HSV_MODEL_RANGE"] self.HSVShockRange = argdict["HSV_SHOCK_RANGE"] except KeyError: # If the ranges are not specified, use default values and print a message self.HSVModelRange = [(0, 0, 150), (121, 125, 255)] self.HSVShockRange = [(125, 40, 85), (170, 80, 230)] print( f"HSVRange not provided, using default value of self.HSVModelRange: {self.HSVModelRange}, self.HSVShockRange: {self.HSVShockRange}" ) # Normalize the cropped image for better segmentation img_clahe = clahe_normalize(img_crop) # Call the contoursHSV function with the normalized image and HSV ranges contour_dict, flags = contoursHSV( img_clahe, log=None, minHSVModel=self.HSVModelRange[0], maxHSVModel=self.HSVModelRange[1], minHSVShock=self.HSVShockRange[0], maxHSVShock=self.HSVShockRange[1], ) elif argdict["SEGMENT_METHOD"] == "GRAY": # If the method is GRAY, try to retrieve the threshold value from argdict try: thresh = argdict["THRESHOLD"] except: # Use a default threshold if not specified and print a message thresh = 240 print(f"Threshold not provided, using default value of {thresh}") # Normalize the cropped image img_clahe = clahe_normalize(img_crop) # Call the contoursGRAY function with the normalized image and the threshold contour_dict, flags = contoursGRAY(img_clahe, thresh=thresh, log=None) elif argdict["SEGMENT_METHOD"] == "CNN": # If the method is CNN, call the contoursCNN function with the cropped image and the CNN model contour_dict, flags = contoursCNN(img_crop, self.cnn) else: # If none of the specified methods match, return None to indicate failure return # Update the original argdict with the flags returned from the segmentation function argdict.update(flags) # Return the dictionary of contours and the updated argdict return contour_dict, argdict
[docs] def get_edges_metrics(self, contour_dict, argdict, offset): """ Retrieves edges and metrics from contour dictionary. :param contour_dict: dictionary containing contours :param argdict: dictionary containing metrics :param offset: tuple containing offset values :returns: edges: dictionary containing edges argdict: updated dictionary containing metrics """ edges = {} for key in contour_dict.keys(): c = contour_dict[key] if c is not None and len(c) > 0: ### get contour area M = cv.moments(c) argdict[key + "_AREA"] = M["m00"] ### get centroid if M["m00"] > 0: argdict[key + "_CENTROID_X"] = int(M["m10"] / M["m00"]) argdict[key + "_CENTROID_Y"] = int(M["m01"] / M["m00"]) else: argdict[key + "_CENTROID_X"] = np.nan argdict[key + "_CENTROID_Y"] = np.nan ### get front edge edges[key] = getEdgeFromContour( c, self.flow_dir, offset=(self.crop[0][0] - offset[0], self.crop[1][0] - offset[1]), ) if len(edges[key]) > 0: if key == "MODEL": outputs = getPoints( edges[key], flow_direction=self.flow_dir, r=[-0.95, -0.50, 0, 0.50, 0.95], prefix="MODEL", ) else: # SHOCK outputs = getPoints( edges[key], flow_direction=self.flow_dir, r=[0], prefix="SHOCK", ) argdict.update(outputs) else: edges[key] = None return edges, argdict
[docs] def make_crop_square(self, frame): """ Makes the provided frame square by cropping or padding. :param frame: opencv image :returns: square_frame: square opencv image offset: list containing offset values """ # Crop the frame based on predefined crop coordinates stored in self.crop # self.crop is expected to be a tuple or list with two elements, each an (start, end) pair for y and x dimensions respectively cropped_frame = frame[ self.crop[0][0] : self.crop[0][1], self.crop[1][0] : self.crop[1][1] ] # Check if the cropped frame is grayscale (i.e., has only one channel) if len(cropped_frame.shape) == 2 or cropped_frame.shape[2] == 1: # Convert grayscale to RGB cropped_frame = cv.cvtColor(cropped_frame, cv.COLOR_GRAY2RGB) # Determine the height and width of the cropped frame cropped_height, cropped_width = cropped_frame.shape[:2] # Calculate the side length of the new square frame as the max of cropped height and width square_side = max(cropped_height, cropped_width) # Initialize a square frame filled with zeros (black) of the same type as the cropped frame square_frame = np.zeros( (square_side, square_side, 3), dtype=cropped_frame.dtype ) # Calculate starting points (y, x) to paste the cropped frame into the square frame # so that it is centered within the square frame start_y = (square_side - cropped_height) // 2 start_x = (square_side - cropped_width) // 2 # Paste the cropped frame into the square frame at the calculated starting points square_frame[ start_y : start_y + cropped_height, start_x : start_x + cropped_width ] = cropped_frame # Return the square frame along with the offset values indicating where the cropped frame # was placed within the square frame return square_frame, [start_y, start_x]
[docs] def process(self, frame, argdict): """ Processes the given frame. :param frame: opencv image :param argdict: dictionary containing segmentation parameters :returns: edges: dictionary containing edges argdict: updated dictionary containing metrics Example: ```python processor = ArcjetProcessor(videometa) frame = cv.imread('frame.jpg') argdict = {"SEGMENT_METHOD": "AutoHSV", "MODEL_FRACTION": 0.005} edges, argdict = processor.process(frame, argdict) ``` """ # Determine the flow direction of the frame if not already set if self.flow_dir is None: self.flow_dir = self.get_flow_direction(frame) # Make the frame square to ensure consistent processing, obtaining the cropped frame and offset frame_crop, offset = self.make_crop_square(frame) # Update argdict with image flags based on the cropped frame argdict = self.get_image_flags(frame_crop, argdict) # Segment the cropped frame, updating argdict with segmentation results contour_dict, argdict = self.segment(frame_crop, argdict) # Calculate edge metrics based on contours, updating argdict further edges, argdict = self.get_edges_metrics(contour_dict, argdict, offset) # Return the edges and a copy of the updated argdict return edges, argdict.copy()
[docs] def process_all( self, video: Video, options, first_frame, last_frame, frame_stride, output_prefix="", write_json=True, write_video=False, display_shock=True, ): """ Processes all frames in the video. :param video: video object (defined in utils/video.py) :param options: dictionary containing segmentation options :param first_frame: index of the first frame to process :param last_frame: index of the last frame to process :param frame_stride: stride for frame processing :param write_json: boolean indicating whether to write processed data to JSON file :param write_video: boolean indicating whether to write processed video Example: ```python video = Video('input_video.mp4') options = {"SEGMENT_METHOD": "AutoHSV", "MODEL_FRACTION": 0.005} processor = ArcjetProcessor(videometa) processor.process_all(video, options, 0, 100, 1, 'output.json', write_video=True) ``` """ # Initialize video writer if write_video is True if write_video: video_output_name = "video_out_%s_%i_%i.m4v" % ( output_prefix, first_frame, last_frame, ) video.get_writer(video_output_name) # Setup output JSON file if output_prefix == "": output_prefix = video.name self.filename = "%s_%i_%i.json" % (output_prefix, first_frame, last_frame) out_json = OutputListJSON(os.path.join(video.folder, self.filename)) # Iterate over frames from first_frame to last_frame, with steps of frame_stride for frame_index in range(first_frame, last_frame + 1, frame_stride): try: frame = video.get_frame(frame_index) # If a frame cannot be retrieved, print an error message and return if frame is None: print(f"Failed at frame {frame_index}") continue # Update options with the current frame index options["INDEX"] = frame_index # Process the current frame, obtaining contours and updated argdict contour_dict, argdict = self.process(frame, options) # Draw model and shock contours on the frame for visualization color_map = { "MODEL": (0, 255, 0), "SHOCK": (255, 0, 255), } # Define colors for MODEL and SHOCK if display_shock: for key, contours in contour_dict.items(): cv.drawContours( frame, contours, -1, color_map.get(key, (255, 0, 255)), 2 ) else: # Draw only the MODEL contours if display_shock is False cv.drawContours( frame, contour_dict["MODEL"], -1, color_map["MODEL"], 2 ) # Annotate the frame with its index for reference annotate_image_with_frame_number(frame, frame_index) argdict.update(contour_dict) # update output dictionary out_json.append(argdict.copy()) # Add processed frame to video output if write_video: frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB) video.writer.write(frame) # Print processing progress sys.stdout.write( f"\rProcessing video using {options['SEGMENT_METHOD']} ... " + f"{min(((((frame_index - first_frame) / frame_stride) + 1) / np.ceil((last_frame - first_frame + 1) / frame_stride)) * 100, 100):.1f}%" ) except Exception as e: print(f"Failed at frame {frame_index} with error:\n" + str(e)) if write_json: out_json.write() if write_video: video.close_writer() return out_json