"""Source code for image_data_handling."""

__author__ = 'David Tadres'
__project__ = 'PiVR'

from glob import glob
import numpy as np
import os
import tkinter as tk
from tkinter import messagebox
from natsort import natsorted
import zipfile
import imageio
import json
from scipy.io import savemat
from pathlib import Path

# Platform detection.
# os.uname() only exists on POSIX systems; on Windows the attribute
# lookup raises AttributeError, which is used here to tell the two
# families apart. On POSIX the first three characters of the machine
# field decide whether this is an ARM processor (as used by the
# Raspberry Pi).
try:
    RASPBERRY = os.uname()[4][:3] == 'arm'
except AttributeError:
    # No os.uname(): not a POSIX system, use the backslash separator.
    RASPBERRY = False
    LINUX = False
    DIRECTORY_INDICATOR = '\\'
else:
    LINUX = True
    DIRECTORY_INDICATOR = '/'

# cv2 is only imported when not running on the Raspberry Pi.
if not RASPBERRY:
    import cv2
class PackingImages():
    """
    After running an experiment with the full frame recording option,
    it is often problematic to move the folder around. The reason is
    that for the OS it is usually harder (i.e. slower) to move
    thousands of small files around compared to a single file with the
    same size.

    This class collects images and essentially creates a single file
    from them.

    All work happens while the object is constructed: for each folder
    the 'experiment_settings.json' file is read and
    :meth:`pack_up_images` is called.

    :param controller: object exposing ``all_common_variables``; the
        loaded settings are stored on it as ``experimental_metadata``
        and ``image_sequence_format_options`` is read from it.
    :param path: folder of the experiment, or the parent folder when
        ``multiplefolders`` is set.
    :param multiplefolders: if truthy, every entry of ``folders`` is
        processed as a subfolder of ``path``.
    :param folders: names of the subfolders to process.
    :param zip: if truthy, write the images into 'images.zip'.
    :param delete: if truthy, delete the single images afterwards.
    :param npy: if truthy, save the images as 'all_images.npy'.
    :param mat: if truthy, save the images as 'all_images.mat'.
    :param color_mode: ``'greyscale'`` stores one channel per image,
        any other value stores all channels.
    """

    def __init__(self, controller, path, multiplefolders, folders,
                 zip, delete, npy, mat, color_mode):
        self.controller = controller
        self.path = path
        self.zip = zip
        self.delete_images = delete
        self.save_npy = npy
        self.save_mat = mat
        self.color_mode = color_mode

        if multiplefolders:
            print('will pack up images in ' + repr(len(folders))
                  + ' folders')
            for i_folder in folders:
                print("Working on Folder: " + i_folder)
                # pack_up_images() works on the current working
                # directory, so move into the folder first.
                os.chdir(path + '//' + i_folder)
                self._load_settings_and_pack(
                    path + '//' + i_folder + '/experiment_settings.json')
        else:
            print('Will pack up images in folder ' + path)
            self._load_settings_and_pack(
                path + '/experiment_settings.json')

    def _load_settings_and_pack(self, settings_path):
        """
        Load the json file at ``settings_path`` into the controller and
        pack up the images of the current working directory.

        A missing file is reported with a message box instead of an
        exception so that the remaining folders are still processed.
        """
        try:
            with open(settings_path, 'r') as file:
                self.controller.all_common_variables.experimental_metadata = \
                    json.load(file)
            self.pack_up_images()
        except FileNotFoundError:
            messagebox.showerror(
                'Experiment_settings.json missing',
                'The file "Experiment_settings.json is '
                'missing.\nUnable to pack up images')

    def pack_up_images(self):
        """
        Pack the images found in the current working directory.

        The expected number of images is 'Recording time' times
        'Framerate' as found in the experiment settings. The first
        image format with exactly that many files is used. Depending
        on the options given at construction the images are written
        to 'images.zip', 'all_images.npy' and/or 'all_images.mat' and
        finally deleted.

        Problems (missing settings entries, wrong number of images) are
        reported with message boxes; nothing is returned.
        """
        variables = self.controller.all_common_variables
        metadata = variables.experimental_metadata

        abort = False

        try:
            variables.framerate_read_from_experiment_settings = \
                metadata['Framerate']
        except KeyError:
            variables.framerate_read_from_experiment_settings = None
            abort = True
            messagebox.showerror(
                'Could not find "Framerate" entry in json file',
                "Unable to find the 'Framerate' "
                "entry in the "
                "'experiment_settings.json' file")

        try:
            variables.recording_time_read_from_experiment_settings = \
                metadata['Recording time']
        except KeyError:
            variables.recording_time_read_from_experiment_settings = None
            abort = True
            # Name the entry that is actually missing.
            messagebox.showerror(
                'Could not find "Recording Time" '
                'entry in json file',
                "Unable to find the 'Recording time' "
                "entry in the "
                "'experiment_settings.json' file")

        if abort:
            return

        expected_number_of_images = \
            variables.recording_time_read_from_experiment_settings * \
            variables.framerate_read_from_experiment_settings

        # Find the image format by counting the files for each of the
        # known extensions.
        image_format = None
        for image_extension in variables.image_sequence_format_options:
            image_names = [p.replace('\\', '')
                           for p in glob('*' + image_extension)]
            if len(image_names) == expected_number_of_images:
                # success - found the image format.
                image_format = image_extension
                print('image format found: ' + image_format)
            elif len(image_names) > expected_number_of_images:
                # This can happen when other files with the same
                # extension are in the folder. There is no way of
                # knowing which files belong to the experiment.
                messagebox.showerror(
                    'Too many images',
                    'Expected ' + repr(expected_number_of_images)
                    + ' images.\n'
                    'Found ' + repr(len(image_names)) + ' images.\n'
                    'Please remove excess '
                    'images.')

        if image_format is None:
            messagebox.showerror(
                'Could not convert images',
                'Could not convert images as not enough or too many of\n'
                'the following file extensions where found:\n'
                '' + repr(variables.image_sequence_format_options))
            return

        # sort the names 'naturally' (1,2,3...)
        image_names = natsorted(
            [p.replace('\\', '') for p in glob('*' + image_format)])

        # The pixel data is only needed for the npy/mat output; do not
        # load thousands of images into RAM just to zip them.
        keep_in_memory = self.save_npy or self.save_mat
        greyscale = self.color_mode == 'greyscale'
        images = None

        # A zipfile without compression yields a single file for all
        # the images - copying thousands of tiny files takes forever.
        zf = zipfile.ZipFile('images.zip', mode='w') if self.zip else None
        try:
            for i, image_name in enumerate(image_names):
                if keep_in_memory:
                    im = imageio.imread(image_name)
                    if images is None:
                        # Use the first image as a template and
                        # preallocate the array with the datatype of
                        # the image (usually uint8).
                        if greyscale:
                            shape = (im.shape[0], im.shape[1],
                                     len(image_names))
                        else:
                            shape = (im.shape[0], im.shape[1],
                                     im.shape[2], len(image_names))
                        images = np.zeros(shape, dtype=im.dtype)
                    if greyscale:
                        # If the image already is greyscale use it,
                        # otherwise take the data from channel 0.
                        try:
                            images[:, :, i] = im
                        except ValueError:
                            images[:, :, i] = im[:, :, 0]
                    else:
                        images[:, :, :, i] = im
                if zf is not None:
                    zf.write(image_name)
        finally:
            # close the zip file writing process
            if zf is not None:
                zf.close()

        if self.save_npy:
            np.save('all_images.npy', images)

        if self.save_mat:
            savemat('all_images.mat', mdict={'images': images})

        if self.delete_images:
            if not (self.zip or self.save_npy or self.save_mat):
                # Without any packed copy, deleting the images would
                # destroy the data of the experiment.
                messagebox.showerror(
                    'Images not deleted',
                    'No zip, npy or mat file was requested.\n'
                    'The original images were kept as deleting them '
                    'would leave no copy of the data.')
            else:
                # Deleting is done in a separate loop on purpose: the
                # images are only removed after all output files have
                # been written without an error.
                for image_name in image_names:
                    os.remove(image_name)
class ConvertH264():
    """
    When recording a video using PiVR there seems to be a problem
    with the encoder: Some of the metadata is not correctly stored,
    most importantly the framerate is usually given as 'inf'.

    This class enables the user to convert the recorded h264 video to
    another video format. This happens by completely decoding the
    video.

    All work happens while the object is constructed: for each folder
    the 'experiment_settings.json' file is read and
    :meth:`convert_video` is called.

    :param path: folder of the experiment, or the parent folder when
        ``multiplefolders`` is set.
    :param multiplefolders: if truthy, every entry of ``folders`` is
        processed as a subfolder of ``path``.
    :param folders: names of the subfolders to process.
    :param save_npy: if truthy, save the frames as '<video name>.npy'.
    :param save_mat: if truthy, save the frames as '<video name>.mat'.
    :param color_mode: ``'greyscale'`` keeps only channel 0 of each
        frame, ``'color'`` keeps all channels.
    :param output_video_format: file extension of the converted video;
        the string ``'None'`` disables the video output.
    :param codec: codec handed to the imageio writer.
    """

    def __init__(self, path, multiplefolders, folders, save_npy,
                 save_mat, color_mode, output_video_format, codec):
        self.path = path
        self.save_npy = save_npy
        self.save_mat = save_mat
        self.color_mode = color_mode
        self.video_format = output_video_format
        self.codec = codec

        if multiplefolders:
            print('Will convert videos in ' + repr(len(folders))
                  + ' folders')
            for i_folder in folders:
                print("Working on Folder: " + i_folder)
                # convert_video() works on the current working
                # directory, so move into the folder first.
                os.chdir(path + '//' + i_folder)
                print(os.getcwd())
                self._load_settings_and_convert(
                    path + '//' + i_folder + '/experiment_settings.json')
        else:
            print('Will convert video in folder ' + path)
            self._load_settings_and_convert(
                path + '/experiment_settings.json')

    def _load_settings_and_convert(self, settings_path):
        """
        Load the json file at ``settings_path`` and convert the video
        of the current working directory.

        A missing file is reported with a message box instead of an
        exception so that the remaining folders are still processed.
        """
        try:
            with open(settings_path, 'r') as file:
                self.experiment_variables = json.load(file)
            self.convert_video()
        except FileNotFoundError:
            messagebox.showerror(
                'Experiment_settings.json missing',
                'The file "Experiment_settings.json is '
                'missing.\nUnable to convert videos')

    def convert_video(self, framerate=None):
        """
        Convert the h264 video found in the current working directory.

        :param framerate: framerate of the OUTPUT video. ``None``
            (default) keeps the 'Framerate' found in
            'experiment_settings.json'.

        Depending on the options given at construction the video is
        re-encoded as '<video name>.<output_video_format>' and/or all
        frames are saved as '<video name>.npy' / '<video name>.mat'.
        If there is no h264 file in the folder an error is shown and
        nothing is written.
        """
        with open('experiment_settings.json', 'r') as file:
            experiment_variables = json.load(file)

        # Currently, this is the only video format this script is
        # looking for!
        video_formats = ['h264']

        # There's no way of knowing the name of the video file, only
        # that it ends with h264 - collect all candidates.
        files = [p.replace('\\', '') for p in glob('*')]
        candidates = [name for name in files
                      if name.split('.')[-1] in video_formats]

        if not candidates:
            messagebox.showerror(
                'No video file found',
                'Could not find a file with one of the following '
                'extensions:\n' + repr(video_formats))
            return

        if len(candidates) > 1:
            # warn the user and continue with the first file found
            print(
                'More than one video file in folder',
                'Found more than one file with one of the'
                'following extensions.\n' + repr(video_formats) + '\n'
                'You may only have a single '
                'video file in a '
                'folder\n'
                'you wish to analyze')

        video_file = candidates[0]
        # Strip only the extension so that names containing additional
        # dots are kept intact.
        video_file_name = os.path.splitext(video_file)[0]

        recorded_framerate = experiment_variables['Framerate']
        if framerate is None:
            framerate = recorded_framerate

        video_object = imageio.get_reader(video_file, 'ffmpeg')
        print('found video object: ' + video_file)

        try:
            if self.video_format != 'None':
                writer = imageio.get_writer(
                    video_file_name + '.' + self.video_format,
                    fps=framerate,
                    codec=self.codec)
                try:
                    for im in video_object:
                        if self.color_mode == 'greyscale':
                            # only take one channel of the three
                            writer.append_data(im[:, :, 0])
                        elif self.color_mode == 'color':
                            writer.append_data(im)
                finally:
                    # Always finalize the output file, even if
                    # decoding fails half way through.
                    writer.close()

            if self.save_npy or self.save_mat:
                # The number of frames is taken from the experiment
                # settings (recorded framerate, not output framerate).
                number_of_frames = int(round(
                    experiment_variables['Recording time']
                    * recorded_framerate))

                # Put all the frames in a numpy array - this will be
                # hard to do with machines with little RAM for long
                # videos/videos with high framerates!
                images = None
                for i, im in enumerate(video_object):
                    if i >= number_of_frames:
                        # More frames than the settings predict.
                        messagebox.showerror(
                            'Index Error!',
                            'Please double check if the video really is '
                            + repr(experiment_variables['Framerate'])
                            + 'fps '
                            'and '
                            + repr(experiment_variables['Recording time'])
                            + 'seconds long\n\n')
                        break
                    if self.color_mode == 'greyscale':
                        if images is None:
                            images = np.zeros(
                                (im.shape[0], im.shape[1],
                                 number_of_frames),
                                dtype=np.uint8)
                        # only take one channel of the three
                        images[:, :, i] = im[:, :, 0]
                    elif self.color_mode == 'color':
                        if images is None:
                            images = np.zeros(
                                (im.shape[0], im.shape[1], im.shape[2],
                                 number_of_frames),
                                dtype=np.uint8)
                        images[:, :, :, i] = im

                if images is None:
                    print('No frames were collected - nothing to save')
                else:
                    if self.save_npy:
                        np.save(video_file_name + '.npy', images)
                    if self.save_mat:
                        savemat(video_file_name + '.mat',
                                mdict={'images': images})
        finally:
            # release the ffmpeg reader
            video_object.close()
class UndistortH264():
    """
    Unfortunately, the RPi camera produces quite distorted images.
    Luckily, this can be fixed using an undistort algorithm.

    The code has essentially been copy-pasted from:
    https://gitlab.com/davidtadres/cameracalibrations

    Important: the mtx and dist numpy files have been created using
    the standard lens. If a different lens is being used, the user
    must follow the protocol in the
    https://gitlab.com/davidtadres/cameracalibrations repository and
    copy the resulting mtx and dist files into the
    PiVR/undistort_matrices folder!

    All work happens while the object is constructed: the undistort
    matrices are loaded from the 'undistort_matrices' folder next to
    this file, then for each folder the 'experiment_settings.json'
    file is read and :meth:`undistort_video` is called.

    :param path: folder of the experiment, or the parent folder when
        ``multiplefolders`` is set.
    :param multiplefolders: if truthy, every entry of ``folders`` is
        processed as a subfolder of ``path``.
    :param folders: names of the subfolders to process.
    :param save_npy: if truthy, save the frames as '<video name>.npy'.
    :param save_mat: if truthy, save the frames as '<video name>.mat'.
    :param output_video_format: file extension of the undistorted
        video; the string ``'None'`` disables the video output.
    :param codec: codec handed to the imageio writer.
    """

    def __init__(self, path, multiplefolders, folders, save_npy,
                 save_mat, output_video_format, codec):
        self.path = path
        self.save_npy = save_npy
        self.save_mat = save_mat
        self.output_video_format = output_video_format
        self.codec = codec

        # Load the undistort matrices saved in the
        # 'undistort_matrices' folder next to this file.
        main_folder = Path(os.path.realpath(__file__)).parents[0]
        self.mtx = np.load(
            Path(main_folder, 'undistort_matrices', 'mtx.npy'))
        self.dist = np.load(
            Path(main_folder, 'undistort_matrices', 'dist.npy'))

        if multiplefolders:
            print('Will convert videos in ' + repr(len(folders))
                  + ' folders')
            for i_folder in folders:
                print("Working on Folder: " + i_folder)
                # undistort_video() works on the current working
                # directory, so move into the folder first.
                os.chdir(Path(path, i_folder))
                print(Path(path, i_folder))
                self._load_settings_and_undistort(
                    'experiment_settings.json')
                # Return to parent path
                os.chdir(Path(path))
        else:
            print('Will convert video in folder ' + path)
            self._load_settings_and_undistort(
                path + '/experiment_settings.json')

    def _load_settings_and_undistort(self, settings_path):
        """
        Load the json file at ``settings_path`` and undistort the video
        of the current working directory.

        A missing file is reported with a message box instead of an
        exception so that the remaining folders are still processed.
        """
        try:
            with open(settings_path, 'r') as file:
                self.experiment_variables = json.load(file)
            self.undistort_video()
        except FileNotFoundError:
            messagebox.showerror(
                'Experiment_settings.json missing',
                'The file "Experiment_settings.json is '
                'missing.\nUnable to convert videos')

    def undistort_video(self, framerate=None):
        """
        Undistort the h264 video found in the current working
        directory.

        :param framerate: framerate of the OUTPUT video. ``None``
            (default) keeps the 'Framerate' found in
            'experiment_settings.json'.

        Channel 0 of every frame is undistorted with the loaded
        matrices. Depending on the options given at construction the
        result is written as '<video name>.<output_video_format>',
        '<video name>.npy' and/or '<video name>.mat'. If there is no
        h264 file in the folder an error is shown and nothing is
        written.
        """
        with open('experiment_settings.json', 'r') as file:
            experiment_variables = json.load(file)

        # Currently, this is the only video format this script is
        # looking for!
        video_formats = ['h264']

        # There's no way of knowing the name of the video file, only
        # that it ends with h264 - collect all candidates.
        files = [p.replace('\\', '') for p in glob('*')]
        candidates = [name for name in files
                      if name.split('.')[-1] in video_formats]

        if not candidates:
            messagebox.showerror(
                'No video file found',
                'Could not find a file with one of the following '
                'extensions:\n' + repr(video_formats))
            return

        if len(candidates) > 1:
            # warn the user and continue with the first file found
            print(
                'More than one video file in folder',
                'Found more than one file with one of the'
                'following extensions.\n' + repr(video_formats) + '\n'
                'You may only have a single '
                'video file in a '
                'folder\n'
                'you wish to analyze')

        video_file = candidates[0]
        # Strip only the extension so that names containing additional
        # dots are kept intact.
        video_file_name = os.path.splitext(video_file)[0]

        if framerate is None:
            framerate = experiment_variables['Framerate']

        # Needs to be taken from experiment_settings as imageio seems
        # to be unable to correctly read the length of a video
        # created on the RPi. Rounded to an integer because the
        # product may be a float, which is not a valid array dimension.
        video_length = int(round(
            experiment_variables['Framerate']
            * experiment_variables['Recording time']))

        video_object = imageio.get_reader(video_file, 'ffmpeg')
        print('found video object: ' + video_file)

        try:
            # 'size' is indexed so that the array is frames x
            # size[1] x size[0].
            size = video_object.get_meta_data()['size']
            images = np.zeros((video_length, size[1], size[0]),
                              dtype=np.uint8)

            # Put all the frames in a numpy array - this will be hard
            # to do with machines with little RAM for long videos/
            # videos with high framerates!
            frames_stored = 0
            newcameramtx = None
            for i, im in enumerate(video_object):
                if i >= video_length:
                    # More frames than the settings predict.
                    print('frame ' + repr(i) + ' does not fit into '
                          + repr(video_length) + ' expected frames')
                    messagebox.showerror(
                        'Index Error!',
                        'Please double check if the video really is '
                        + repr(experiment_variables['Framerate'])
                        + 'fps '
                        'and '
                        + repr(experiment_variables['Recording time'])
                        + 'seconds long\n\n')
                    # Stop the loop
                    break

                # only take one channel of the three > grayscale
                frame = im[:, :, 0]
                if newcameramtx is None:
                    # The shape doesn't change for the rest of the
                    # video: get the optimal camera matrix only once.
                    h, w = frame.shape[:2]
                    newcameramtx, roi = cv2.getOptimalNewCameraMatrix(
                        self.mtx, self.dist, (w, h), 1, (w, h))

                # undistort the image and assign it to the array
                images[i, :, :] = cv2.undistort(
                    frame, self.mtx, self.dist, None, newcameramtx)
                frames_stored = i + 1
        finally:
            # release the ffmpeg reader
            video_object.close()

        if self.output_video_format != 'None':
            print('Writing the video file')
            # Write exactly the frames that were stored, including
            # the last one.
            imageio.mimwrite(
                video_file_name + '.' + self.output_video_format,
                images[0:frames_stored],
                fps=framerate,
                codec=self.codec)

        if self.save_npy:
            print('Writing the numpy file')
            np.save(video_file_name + '.npy', images)

        if self.save_mat:
            print('Writing the matlab file')
            savemat(Path(video_file_name + '.mat'),
                    mdict={'images': images})