Source code for image_data_handling

__author__ = 'David Tadres'
__project__ = 'PiVR'

from glob import glob
import numpy as np
import os
import tkinter as tk
from tkinter import messagebox

import pandas as pd
from natsort import natsorted
import zipfile
import imageio
import json
from scipy.io import savemat
from pathlib import Path
from skimage import draw as ski_draw

# Platform detection.
#
# Three module-level flags are derived here:
#   RASPBERRY           - True only if /proc/cpuinfo identifies a Raspberry Pi
#   LINUX               - True on any platform that provides os.uname()
#                         (Linux and macOS), False otherwise (Windows)
#   DIRECTORY_INDICATOR - path separator matching the platform
#
# os.uname() does not exist on Windows, so the AttributeError raised there
# is used to recognise Windows.
#
# All three flags are assigned on every path. Previously an ARM Linux
# machine whose /proc/cpuinfo did not mention a Raspberry Pi left RASPBERRY
# and LINUX undefined, which raised a NameError further down in this module.
RASPBERRY = False
try:
    machine = os.uname()[4]
except AttributeError:
    # is Windows
    LINUX = False
    DIRECTORY_INDICATOR = '\\'
else:
    # is either Mac or Linux
    LINUX = True
    DIRECTORY_INDICATOR = '/'
    if machine[:3] == 'arm':
        # This is True for a Raspberry Pi but also for other ARM devices
        # such as Apple M1 machines, so the CPU model has to be checked.
        # (from https://raspberrypi.stackexchange.com/questions/5100/detect-that-a-python-program-is-running-on-the-pi)
        import re
        CPUINFO_PATH = Path("/proc/cpuinfo")
        if CPUINFO_PATH.exists():
            with open(CPUINFO_PATH) as f:
                cpuinfo = f.read()
            RASPBERRY = re.search(r"^Model\s*:\s*Raspberry Pi",
                                  cpuinfo, flags=re.M) is not None
        # If /proc/cpuinfo does not exist (e.g. Apple M1) RASPBERRY
        # simply stays False.

if not RASPBERRY:
    # Only import if not on Raspberry!
    # cv2 (OpenCV) is used by UndistortH264.undistort_video below; on a
    # machine detected as a Raspberry Pi the name cv2 is therefore not
    # available in this module.
    import cv2

class PackingImages():
    """
    After running an experiment with the full frame recording option,
    it is often problematic to move the folder around. The reason is
    that for the OS it is usually harder (i.e. slower) to move
    thousands of small files around compared to a single file with
    the same size.

    This class collects images and essentially creates a single file
    from them.

    The work is done while the object is constructed: for every
    folder the 'experiment_settings.json' file is read and
    :meth:`pack_up_images` is called.

    :param controller: object providing ``all_common_variables``. The
        loaded json content is stored in its ``experimental_metadata``
        attribute and its ``image_sequence_format_options`` attribute
        is read to learn which file extensions to look for.
    :param path: folder to work on, or, if ``multiplefolders`` is
        true, the parent folder of ``folders``.
    :param multiplefolders: if true, every entry of ``folders`` is
        processed; otherwise only ``path`` is.
    :param folders: names of sub-folders of ``path``.
    :param zip: if true, write all images into 'images.zip'.
    :param delete: if true, delete the single images afterwards.
    :param npy: if true, save the image stack as 'all_images.npy'.
    :param mat: if true, save the image stack as 'all_images.mat'.
    :param color_mode: 'greyscale' stores one channel per image, any
        other value stores all channels.
    """

    def __init__(self, controller, path, multiplefolders, folders,
                 zip, delete, npy, mat, color_mode):
        self.controller = controller
        self.path = path
        self.zip = zip
        self.delete_images = delete
        self.save_npy = npy
        self.save_mat = mat
        self.color_mode = color_mode

        if multiplefolders:
            print('will pack up images in ' + repr(len(folders))
                  + ' folders')
            for i_folder in folders:
                print("Working on Folder: " + i_folder)
                # pack_up_images works on the current working directory
                os.chdir(path + '//' + i_folder)
                self._load_settings_and_pack(
                    path + '//' + i_folder + '/experiment_settings.json')
        else:
            print('Will pack up images in folder ' + path)
            # NOTE(review): no os.chdir happens here, so the images are
            # looked for in whatever the current working directory is -
            # presumably the caller has already changed into `path`.
            self._load_settings_and_pack(
                path + '/experiment_settings.json')

    def _load_settings_and_pack(self, settings_path):
        """
        Read the json file at ``settings_path`` into
        ``controller.all_common_variables.experimental_metadata`` and
        pack up the images. Shows an error dialog if a file is missing.
        """
        try:
            with open(settings_path, 'r') as file:
                self.controller.all_common_variables.experimental_metadata = \
                    json.load(file)
            self.pack_up_images()
        except FileNotFoundError:
            messagebox.showerror(
                'Experiment_settings.json missing',
                'The file "Experiment_settings.json is '
                'missing.\nUnable to pack up images')

    def pack_up_images(self):
        """
        Collect all images of the current working directory into a
        numpy array and, depending on the options given at
        construction, write 'images.zip', 'all_images.npy' and
        'all_images.mat' and delete the original images.

        The image format is found by counting files: the extension for
        which exactly 'Recording time' * 'Framerate' files exist is
        used. If no extension fits, an error dialog is shown and
        nothing is written.
        """
        common = self.controller.all_common_variables
        metadata = common.experimental_metadata

        # only pack up if this isn't turned on
        abort = False

        try:
            common.framerate_read_from_experiment_settings = \
                metadata['Framerate']
        except KeyError:
            common.framerate_read_from_experiment_settings = None
            abort = True
            messagebox.showerror(
                'Could not find "Framerate" entry in json file',
                "Unable to find the 'Framerate' "
                "entry in the "
                "'experiment_settings.json' file")

        try:
            common.recording_time_read_from_experiment_settings = \
                metadata['Recording time']
        except KeyError:
            common.recording_time_read_from_experiment_settings = None
            abort = True
            # This dialog used to name the 'Framerate' entry by mistake
            messagebox.showerror(
                'Could not find "Recording Time" '
                'entry in json file',
                "Unable to find the 'Recording time' "
                "entry in the "
                "'experiment_settings.json' file")

        if abort:
            return

        expected_number_of_images = \
            common.recording_time_read_from_experiment_settings * \
            common.framerate_read_from_experiment_settings

        image_format = None
        # check for images with the extensions listed in
        # image_sequence_format_options
        for image_extension in common.image_sequence_format_options:
            image_names = [p.replace('\\', '')
                           for p in glob('*' + image_extension)]
            if len(image_names) == expected_number_of_images:
                # success - found the image format.
                image_format = image_extension
                print('image format found: ' + image_format)
            elif len(image_names) > expected_number_of_images:
                # this can happen when other jpg or other files are in
                # the folder. Can't know which files are supposed to be
                # part of the experiment and which are not. Can't use
                # this tool then!
                tk.messagebox.showerror(
                    'Too many images',
                    'Expected ' + repr(expected_number_of_images)
                    + ' images.\n'
                    'Found ' + repr(len(image_names)) + ' images.\n'
                    'Please remove excess '
                    'images.')

        if image_format is None:
            tk.messagebox.showerror(
                'Could not convert images',
                'Could not convert images as not enough or too many of\n'
                'the following file extensions where found:\n'
                '' + repr(common.image_sequence_format_options))
            return

        # sort the names 'naturally' (1,2,3...)
        image_names = natsorted(
            [p.replace('\\', '') for p in glob('*' + image_format)])

        # make a zipfile without compression to get a single file for
        # all the images - it takes forever to copy 1000 images even if
        # only 1.5kb in size
        zf = zipfile.ZipFile('images.zip', mode='w') if self.zip else None
        try:
            # run through all the image names, read the image file and
            # put it in the numpy array. Also put it into the zip file.
            for i, image_name in enumerate(image_names):
                im = imageio.imread(image_name)
                if i == 0:
                    # use the first image as a template and preallocate
                    # an empty numpy array with the same datatype as
                    # the image (usually uint8)
                    if self.color_mode == 'greyscale':
                        images = np.zeros(
                            (im.shape[0], im.shape[1], len(image_names)),
                            dtype=im.dtype)
                    else:
                        images = np.zeros(
                            (im.shape[0], im.shape[1], im.shape[2],
                             len(image_names)),
                            dtype=im.dtype)

                if self.color_mode == 'greyscale':
                    # if the image was already in greyscale use that,
                    # otherwise take the data from channel 0
                    try:
                        images[:, :, i] = im
                    except ValueError:
                        images[:, :, i] = im[:, :, 0]
                else:
                    images[:, :, :, i] = im

                if zf is not None:
                    zf.write(image_name)
        finally:
            # close the zip file writing process
            if zf is not None:
                zf.close()

        if self.save_npy:
            np.save('all_images.npy', images)

        if self.save_mat:
            savemat('all_images.mat', mdict={'images': images})

        if self.delete_images:
            # A separate loop is used on purpose: first make sure that
            # there was no error while creating the output files and
            # only THEN delete all the images.
            for image_name in image_names:
                os.remove(image_name)
class ConvertH264():
    """
    When recording a video using PiVR there seems to be a problem
    with the encoder: Some of the metadata is not correctly stored,
    most importantly the frame rate is usually given as 'inf'.

    This class enables the user to convert the recorded h264 video to
    another video format. This happens by completely decoding the
    video.

    The work is done while the object is constructed: for every
    folder 'experiment_settings.json' is read and
    :meth:`convert_video` is called.

    :param path: folder to work on, or, if ``multiplefolders`` is
        true, the parent folder of ``folders``.
    :param multiplefolders: if true, every entry of ``folders`` is
        processed; otherwise only ``path`` is.
    :param folders: names of sub-folders of ``path``.
    :param save_npy: if true, save all frames as '<video name>.npy'.
    :param save_mat: if true, save all frames as '<video name>.mat'.
    :param emboss_stimulus: if true, frames during which 'Channel 1'
        of the 'stimulation_presented' csv file is larger than 0 are
        marked with a rectangle in the written video.
    :param color_mode: 'greyscale' (only channel 0 is kept) or
        'color'.
    :param output_video_format: file extension of the video to write;
        the string 'None' disables writing a video.
    :param codec: codec passed on to the imageio writer.
    """

    def __init__(self, path, multiplefolders, folders, save_npy,
                 save_mat, emboss_stimulus, color_mode,
                 output_video_format, codec):
        self.path = path
        self.save_npy = save_npy
        self.save_mat = save_mat
        self.emboss_stimulus = emboss_stimulus
        self.color_mode = color_mode
        self.video_format = output_video_format
        self.codec = codec
        # Filled per folder when emboss_stimulus is set
        self.stimulus_presented = None

        if multiplefolders:
            print('Will convert videos in ' + repr(len(folders))
                  + ' folders')
            for i_folder in folders:
                print("Working on Folder: " + i_folder)
                # convert_video works on the current working directory
                os.chdir(path + '//' + i_folder)
                print(os.getcwd())
                self._convert_folder(Path(path, i_folder))
        else:
            print('Will convert video in folder ' + path)
            # NOTE(review): no os.chdir happens here - presumably the
            # caller has already changed into `path`.
            self._convert_folder(Path(path))

    def _convert_folder(self, folder):
        """
        Load the settings (and, if requested, the stimulus file) found
        in ``folder`` and convert the video. Shows an error dialog if
        a file is missing.
        """
        try:
            with open(Path(folder, 'experiment_settings.json'), 'r') as file:
                self.experiment_variables = json.load(file)

            if self.emboss_stimulus:
                self.stimulus_presented = None
                for current_file in folder.iterdir():
                    if 'stimulation_presented' in current_file.name:
                        self.stimulus_presented = pd.read_csv(current_file)
                if self.stimulus_presented is None:
                    print('ERROR: Could not find '
                          'stimulation_presented.csv file in folder: '
                          + repr(folder))

            self.convert_video()
        except FileNotFoundError:
            messagebox.showerror(
                'Experiment_settings.json missing',
                'The file "Experiment_settings.json is '
                'missing.\nUnable to convert videos')

    def convert_video(self):
        """
        Convert the h264 video found in the current working directory.

        The framerate and the recording time are taken from
        'experiment_settings.json' in the current working directory.
        Depending on the options given at construction a video file
        and/or a numpy/matlab file containing all frames is written
        next to the original video.
        """
        with open('experiment_settings.json', 'r') as file:
            experiment_variables = json.load(file)
        # The output keeps the framerate the experiment was recorded at
        framerate = experiment_variables['Framerate']

        # Currently, this is the only video format this script is
        # looking for!
        video_formats = ['h264']

        # There's no way of knowing what the name of the video file is,
        # only that it ends with h264
        files = [p.replace('\\', '') for p in glob('*')]
        candidates = [name for name in files
                      if Path(name).suffix[1:] in video_formats]

        if not candidates:
            # Without this guard the code below failed with an
            # undefined variable.
            messagebox.showerror(
                'No video file found',
                'Could not find a file with one of the following '
                'extensions:\n' + repr(video_formats))
            return

        video_name = candidates[0]
        video_object = imageio.get_reader(video_name, 'ffmpeg')
        print('found video object: ' + video_name)
        # Path.stem keeps dots that are part of the file name
        video_file_name = Path(video_name).stem

        if len(candidates) > 1:
            # warn the user, the first video found is converted
            print(
                'More than one video file in folder',
                'Found more than one file with one of the'
                'following extensions.\n' + repr(video_formats) + '\n'
                'You may only have a single '
                'video file in a '
                'folder\n'
                'you wish to analyze')

        try:
            if self.video_format != 'None':
                self._write_video(video_object, video_file_name, framerate)

            if self.save_npy or self.save_mat:
                images = self._collect_frames(
                    video_object,
                    int(round(experiment_variables['Recording time']
                              * framerate)))
                if images is None:
                    print('No frames could be read from ' + video_name)
                else:
                    if self.save_npy:
                        np.save(video_file_name + '.npy', images)
                    if self.save_mat:
                        savemat(video_file_name + '.mat',
                                mdict={'images': images})
        finally:
            video_object.close()

    def _write_video(self, video_object, video_file_name, framerate):
        """Re-encode all frames of ``video_object`` into a new file."""
        stimulus = None
        if self.emboss_stimulus:
            # Stays None if no stimulus file was found; the frames are
            # then written without a marker instead of crashing.
            stimulus = getattr(self, 'stimulus_presented', None)
        if stimulus is not None:
            # The marker is the same for every frame
            rr, cc = ski_draw.rectangle(start=(100, 100),
                                        extent=(100, 100))

        writer = imageio.get_writer(
            video_file_name + '.' + self.video_format,
            fps=framerate, codec=self.codec)
        try:
            for i, im in enumerate(video_object):
                if stimulus is not None:
                    try:
                        if stimulus['Channel 1'].iloc[i] > 0:
                            im[rr, cc, 0] = 0
                    except IndexError:
                        # e.g. video has more frames than the stimulus
                        # file has rows
                        pass
                if self.color_mode == 'greyscale':
                    # only take one channel of the three > grayscale
                    writer.append_data(im[:, :, 0])
                elif self.color_mode == 'color':
                    writer.append_data(im)
        finally:
            writer.close()

    def _collect_frames(self, video_object, number_of_frames):
        """
        Return all frames of ``video_object`` in one uint8 array with
        the frame index as last axis, or None if nothing was read.
        """
        # put all the frames in a numpy array - this will be hard to do
        # with machines with little RAM for long videos/videos with
        # high framerates!
        images = None
        for i, im in enumerate(video_object):
            if i == 0:
                if self.color_mode == 'greyscale':
                    images = np.zeros(
                        (im.shape[0], im.shape[1], number_of_frames),
                        dtype=np.uint8)
                elif self.color_mode == 'color':
                    images = np.zeros(
                        (im.shape[0], im.shape[1], im.shape[2],
                         number_of_frames),
                        dtype=np.uint8)
                else:
                    # unknown color mode - nothing can be stored
                    return None
            try:
                if self.color_mode == 'greyscale':
                    # only take one channel of the three > grayscale
                    images[:, :, i] = im[:, :, 0]
                elif self.color_mode == 'color':
                    images[:, :, :, i] = im
            except IndexError as e:
                # video has more frames than expected from the settings
                print('Potential Error:\n' + repr(e))
        return images
class UndistortH264():
    """
    Unfortunately, the RPi camera produces quite distorted images.
    Luckily, this can be fixed using an undistort algorithm.

    The code has essentially been copy-pasted from:
    https://gitlab.com/davidtadres/cameracalibrations

    Important: the mtx and dist numpy files have been created using
    the standard lens. If a different lens is being used, the user
    must follow the protocol in the
    https://gitlab.com/davidtadres/cameracalibrations repository and
    copy the resulting mtx and dist files into the
    PiVR/undistort_matrices folder!

    The work is done while the object is constructed: for every
    folder 'experiment_settings.json' is read, the matrices fitting
    the recorded resolution are loaded from ``undistort_path`` and
    :meth:`undistort_video` is called.

    :param path: folder to work on, or, if ``multiplefolders`` is
        true, the parent folder of ``folders``.
    :param multiplefolders: if true, every entry of ``folders`` is
        processed; otherwise only ``path`` is.
    :param folders: names of sub-folders of ``path``.
    :param save_npy: if true, save the frames as
        '<video name>_undistorted.npy'.
    :param save_mat: if true, save the frames as
        '<video name>_undistorted.mat'.
    :param output_video_format: file extension of the video to write;
        the string 'None' disables writing a video.
    :param codec: codec passed on to imageio.
    :param undistort_path: folder containing the '<resolution>_mtx.npy'
        and '<resolution>_dist.npy' files.
    """

    # (text searched for in the 'Resolution' entry, file name prefix)
    _UNDISTORT_MATRICES = (
        ('640', '640x480'),
        ('1024', '1024x768'),
        ('1296', '1296x972'),
    )

    def __init__(self, path, multiplefolders, folders, save_npy,
                 save_mat, output_video_format, codec, undistort_path):
        self.path = path
        self.save_npy = save_npy
        self.save_mat = save_mat
        self.output_video_format = output_video_format
        self.codec = codec
        self.undistort_path = undistort_path

        if multiplefolders:
            print('Will convert videos in ' + repr(len(folders))
                  + ' folders')
            for i_folder in folders:
                print("Working on Folder: " + i_folder)
                # undistort_video works on the current working directory
                os.chdir(Path(path, i_folder))
                print(Path(path, i_folder))
                self._undistort_folder('experiment_settings.json')
                # Return to parent path
                os.chdir(Path(path))
        else:
            print('Will convert video in folder ' + path)
            # NOTE(review): no os.chdir happens here - presumably the
            # caller has already changed into `path`.
            self._undistort_folder(path + '/experiment_settings.json')

    def _undistort_folder(self, settings_path):
        """
        Read the json file at ``settings_path``, load the undistort
        matrices fitting the recorded resolution and undistort the
        video. Shows an error dialog if a file is missing or if the
        resolution is not supported.
        """
        try:
            with open(settings_path, 'r') as file:
                self.experiment_variables = json.load(file)
            resolution = self.experiment_variables['Resolution']

            for width, prefix in self._UNDISTORT_MATRICES:
                if width in resolution:
                    # Load the undistort matrices
                    self.mtx = np.load(
                        Path(self.undistort_path, prefix + '_mtx.npy'))
                    self.dist = np.load(
                        Path(self.undistort_path, prefix + '_dist.npy'))
                    self.undistort_video()
                    break
            else:
                tk.messagebox.showerror(
                    'Not implemented',
                    'You recorded the experiment at ' + resolution +
                    '.\nOnly 640x480, 1024x768 and 1296x972\n'
                    'is currently implemented')
        except FileNotFoundError:
            messagebox.showerror(
                'Experiment_settings.json missing',
                'The file "Experiment_settings.json is '
                'missing.\nUnable to convert videos')

    def undistort_video(self):
        """
        Undistort the h264 video found in the current working
        directory using ``self.mtx`` and ``self.dist``.

        Only channel 0 of every frame is used. Depending on the
        options given at construction a video file and/or a
        numpy/matlab file is written, all named
        '<video name>_undistorted'.
        """
        with open('experiment_settings.json', 'r') as file:
            experiment_variables = json.load(file)
        # The output keeps the framerate the experiment was recorded at
        framerate = experiment_variables['Framerate']

        # Currently, this is the only video format this script is
        # looking for!
        video_formats = ['h264']

        # There's no way of knowing what the name of the video file is,
        # only that it ends with h264
        files = [p.replace('\\', '') for p in glob('*')]
        candidates = [name for name in files
                      if Path(name).suffix[1:] in video_formats]

        if not candidates:
            # Without this guard the code below failed on a missing
            # reader object.
            messagebox.showerror(
                'No video file found',
                'Could not find a file with one of the following '
                'extensions:\n' + repr(video_formats))
            return

        video_name = candidates[0]
        video_object = imageio.get_reader(video_name, 'ffmpeg')
        print('found video object: ' + video_name)
        # Path.stem keeps dots that are part of the file name
        video_file_name = Path(video_name).stem + '_undistorted'

        if len(candidates) > 1:
            # warn the user, the first video found is converted
            print(
                'More than one video file in folder',
                'Found more than one file with one of the'
                'following extensions.\n' + repr(video_formats) + '\n'
                'You may only have a single '
                'video file in a '
                'folder\n'
                'you wish to analyze')

        try:
            # Needs to be taken from experiment_settings as imageio
            # seems to be unable to correctly read the length of a
            # video created on the RPi. Has to be an integer to be
            # usable as array dimension.
            video_length = int(round(
                framerate * experiment_variables['Recording time']))

            # preallocate an empty array with dimension of frames, rows
            # and then columns
            size = video_object.get_meta_data()['size']
            images = np.zeros((video_length, size[1], size[0]),
                              dtype=np.uint8)

            # Number of frames actually placed in `images`
            frames_stored = 0

            # put all the frames in a numpy array - this will be hard
            # to do with machines with little RAM for long videos/
            # videos with high framerates!
            for i, im in enumerate(video_object):
                if i == 0:
                    # get shape of first image (doesn't change for the
                    # rest of the video) and get the optimal camera
                    # matrix
                    h, w = im[:, :, 0].shape[:2]
                    newcameramtx, roi = cv2.getOptimalNewCameraMatrix(
                        self.mtx, self.dist, (w, h), 1, (w, h))
                    # instead of calling cv2.undistort for as many
                    # frames as there are just compute the maps once
                    # (here) and then simply remap (cv2.remap below).
                    # CV_16SC2 is requested explicitly; the previously
                    # passed cv2.CV_8U is not a documented map type
                    # and, having the value 0, appears to have fallen
                    # back to this same type (not verified here).
                    map1, map2 = cv2.initUndistortRectifyMap(
                        cameraMatrix=self.mtx,
                        distCoeffs=self.dist,
                        R=None,
                        newCameraMatrix=newcameramtx,
                        size=(w, h),
                        m1type=cv2.CV_16SC2)

                # undistort the image
                dst = cv2.remap(im[:, :, 0], map1, map2,
                                interpolation=cv2.INTER_LINEAR)

                try:
                    # assign the image to the array
                    images[i, :, :] = dst
                except IndexError as e:
                    print(e)
                    messagebox.showerror(
                        'Index Error!',
                        'Please double check if the video really is '
                        + repr(experiment_variables['Framerate']) + 'fps '
                        'and '
                        + repr(experiment_variables['Recording time'])
                        + 'seconds long\n\n')
                    # Stop the loop
                    break
                frames_stored = i + 1
        finally:
            video_object.close()

        if self.output_video_format != 'None':
            print('Writing the video file')
            # Only the frames that were read are written. Slicing with
            # the last loop index used to drop the final frame.
            imageio.mimwrite(
                video_file_name + '.' + self.output_video_format,
                images[0:frames_stored],
                fps=framerate,
                codec=self.codec)

        if self.save_npy:
            print('Writing the numpy file')
            np.save(video_file_name + '.npy', images)

        if self.save_mat:
            print('Writing the matlab file')
            savemat(Path(video_file_name + '.mat'),
                    mdict={'images': images})