__author__ = 'David Tadres'
__project__ = 'PiVR'
from glob import glob
import numpy as np
import os
import tkinter as tk
from tkinter import messagebox
from natsort import natsorted
import zipfile
import imageio
import json
from scipy.io import savemat
from pathlib import Path
# this try-except statement checks if the processor is a ARM processor
# (used by the Raspberry Pi) or not.
# Since this command only works in Linux it is caught using
# try-except otherwise it's throw an error in a Windows system.
try:
if os.uname()[4][:3] == 'arm':
# This will yield True for both a Raspberry and for M1 Chip
# Apple devices.
# Use this code snippet
# (from https://raspberrypi.stackexchange.com/questions/5100/detect-that-a-python-program-is-running-on-the-pi)
import re
CPUINFO_PATH = Path("/proc/cpuinfo")
if CPUINFO_PATH.exists():
with open(CPUINFO_PATH) as f:
cpuinfo = f.read()
if re.search(r"^Model\s*:\s*Raspberry Pi", cpuinfo, flags=re.M) is not None:
# if True, is Raspberry Pi
RASPBERRY = True
LINUX = True
else: # Test if one more intendation necessary or not. On Varun's computer
# is Apple M1 chip (or other Arm CPU device).
RASPBERRY = False
LINUX = True
else:
# is either Mac or Linux
RASPBERRY = False
LINUX = True
DIRECTORY_INDICATOR = '/'
except AttributeError:
# is Windows
RASPBERRY = False
LINUX = False
DIRECTORY_INDICATOR = '\\'
if not RASPBERRY:
# Only import if not on Raspberry!
import cv2
[docs]class PackingImages():
"""
After running an experiment with the full frame recording option,
it is often problemtatic to move the folder around.
The reason is that for the OS it is usually harder (i.e. slower)
to move thousands of small files around compared to a single file
with the same size.
This class collects images and essentially creates a single file
from them.
"""
def __init__(self, controller, path, multiplefolders,
folders,zip,delete,npy,mat,color_mode):
self.controller = controller
self.path = path
self.zip = zip
self.delete_images = delete
self.save_npy = npy
self.save_mat = mat
self.color_mode = color_mode
if multiplefolders:
print('will pack up images in ' + repr(len(folders))
+ ' folders')
for i_folder in folders:
print("Working on Folder: " + i_folder)
os.chdir(path + '//' + i_folder)
try:
with open((path + '//' + i_folder +
'/experiment_settings.json'),
'r') as file:
self.controller.all_common_variables.experimental_metadata = \
json.load(file)
self.pack_up_images()
except FileNotFoundError:
messagebox.showerror(
'Experiment_settings.json missing',
'The file "Experiment_settings.json is '
'missing.\nUnable to pack up images')
else:
print('Will pack up images in folder ' + path)
try:
with open((path + '/experiment_settings.json'), 'r') as \
file:
self.controller.all_common_variables.experimental_metadata = \
json.load(file)
self.pack_up_images()
except FileNotFoundError:
messagebox.showerror(
'Experiment_settings.json missing',
'The file "Experiment_settings.json is '
'missing.\nUnable to pack up images')
def pack_up_images(self):
abort = False # only pack up if this isn't turned on
try:
self.controller.all_common_variables.framerate_read_from_experiment_settings = \
self.controller.all_common_variables.experimental_metadata[
'Framerate']
except KeyError:
self.controller.all_common_variables.framerate_read_from_experiment_settings = None
abort = True
messagebox.showerror('Could not find "Framerate" entry in json file',
"Unable to find the 'Framerate' "
"entry in the "
"'experiment_settings.json' file"
)
try:
self.controller.all_common_variables.recording_time_read_from_experiment_settings = \
self.controller.all_common_variables.experimental_metadata[
'Recording time']
except KeyError:
self.controller.all_common_variables.recording_time_read_from_experiment_settings = None
abort = True
messagebox.showerror('Could not find "Recording Time" '
'entry in json file',
"Unable to find the 'Framerate' "
"entry in the "
"'experiment_settings.json' file"
)
if not abort:
expected_number_of_images = \
self.controller.all_common_variables.recording_time_read_from_experiment_settings * \
self.controller.all_common_variables.framerate_read_from_experiment_settings
image_format = None
# check for images with the following extensions:
# self.image_sequence_format_options = ('jpg', 'png', 'rgb', 'yuv', 'rgba')
for image_extension in self.controller.all_common_variables.image_sequence_format_options:
image_names = [p.replace('\\', '') for p in
glob('*' + image_extension)]
if len(image_names) == expected_number_of_images:
# sucess - found the image format.
image_format = image_extension
print('image format found: ' + image_format)
elif len(image_names) > expected_number_of_images:
# this can happen when other jpg or other files are in the folder. Can't know which
# files are supposed to be part of the experiment and which are not. Can't use
# this tool then!
tk.messagebox.showerror('Too many images',
'Expected ' + repr(
expected_number_of_images)
+ ' images.\n'
'Found ' + repr(
len(image_names)) +
' images.\n'
'Please remove excess '
'images.')
if image_format is not None:
image_names = [p.replace('\\', '') for p in
glob('*' + image_format)]
# sort the names 'naturally' (1,2,3...)
image_names = natsorted(image_names)
# make a zipfile without compression to get a single file
# for all the images - it takes forever to copy 1000images
# even if only 1.5kb in size
if self.zip:
zf = zipfile.ZipFile('images.zip', mode='w')
try:
# run through all the image names, read the image
# file and put in the image_raw numpy array. Also
# put into the zip file. In the interest of
# speed, in the first iteration, we preallocate
# the numpy array
for i in range(len(image_names)):
# print('image #' + repr(i))
if i == 0:
# read the first image to get the
# dimensions and the datatype
im = imageio.imread(image_names[i])
if self.color_mode == 'greyscale':
# use the first image as a template and
# preallocate an empty numpy array with
# the same datatype as the image (usually
# uint8)
images = np.zeros((im.shape[0],
im.shape[1],
len(image_names)),
dtype=im.dtype)
# if the image was already in greyscale
# use that, otherwise...
try:
images[:, :, i] = im
# take the data from channel 0
except ValueError:
images[:, :, i] = im[:, :, 0]
else:
images = np.zeros((im.shape[0],
im.shape[1],
im.shape[2],
len(image_names)),
dtype=im.dtype)
images[:,:,:,i] = im
else:
if self.color_mode == 'greyscale':
try:
images[:, :, i] = imageio.imread(
image_names[i])
# take the data from channel 0
except ValueError:
images[:, :, i] = imageio.imread(
image_names[i])[:, :, 0]
else:
images[:,:,:,i] = imageio.imread(
image_names[i])
if self.zip:
zf.write(image_names[i])
finally:
# close the zip file writing process
if self.zip:
zf.close()
if self.save_npy:
np.save('all_images.npy', images)
if self.save_mat:
savemat(
('all_images.mat'),
mdict={'images': images})
if self.delete_images:
# I really don't want to have another loop but
# that's just necessary for safety reasons. If
# first make sure that there's no error with the
# creation of the zip file and only THEN delete
# all the image. Also, this really should not be
# a problem as we have even two copies of the
# images - one in the uncompressed numpy file and
# one in a zip file
for i in range(len(image_names)):
os.remove(image_names[i])
else:
tk.messagebox.showerror(
'Could not convert images',
'Could not convert images as not enough or too many of\n'
'the following file extensions where found:\n'
'' + repr(
self.controller.all_common_variables.image_sequence_format_options))
[docs]class ConvertH264():
"""
When recording a video using PiVR there seems to be a problem
with the encoder: Some of the metadata is not correctly stored,
most importantly the frame rate is usually given as 'inf'.
This class enables the user to convert the recorded h264 video to
another video format. This happens by completely decoding the video
"""
def __init__(self, path, multiplefolders, folders, save_npy,
save_mat, color_mode, output_video_format, codec):
self.path = path
self.save_npy = save_npy
self.save_mat = save_mat
self.color_mode = color_mode
self.video_format = output_video_format
self.codec = codec
if multiplefolders:
print('Will convert videos in '
+ repr(len(folders))
+ ' folders')
for i_folder in folders:
print("Working on Folder: " + i_folder)
os.chdir(path + '//' + i_folder)
print(os.getcwd())
try:
with open((path + '//' + i_folder +
'/experiment_settings.json'),
'r') as file:
self.experiment_variables = \
json.load(file)
self.convert_video()
except FileNotFoundError:
messagebox.showerror(
'Experiment_settings.json missing',
'The file "Experiment_settings.json is '
'missing.\nUnable to convert videos')
else:
print('Will convert video in folder ' + path)
try:
with open((path +'/experiment_settings.json'), 'r') as file:
self.experiment_variables = \
json.load(file)
self.convert_video()
except FileNotFoundError:
messagebox.showerror(
'Experiment_settings.json missing',
'The file "Experiment_settings.json is '
'missing.\nUnable to convert videos')
def convert_video(self):
framerate = None # Set to None to keep original FPS.
# Set to a number to define the OUTPUT FRAMERATE
# video file name - can be changed!
#video_file_name = 'converted_video'
# video format - can be changed!
with open(('experiment_settings.json'), 'r') as file:
experiment_variables = json.load(file)
# What files exist in the folder?
files = [p.replace('\\', '') for p in glob('*')]
# Need this as a bool switch in the first elif
video_object = None
# Currently, this is the only video format this script is
# looking for!
video_formats = ['h264']
# print(files)
# cycle through all the files in the folder as there's no
# way of knowing what the name of the video file is
for i in range(len(files)):
# only know that it ends with h264
if files[i].split('.')[-1] in video_formats:
# if there's more than one h264 file in the folder
# warn the user
if video_object is not None:
print(
'More than one video file in folder',
'Found more than one file with one of the'
'following extensions.\n' +
repr(video_formats) + '\n'
'You may only have a single '
'video file in a '
'folder\n'
'you wish to analyze')
# and break the loop
break
# if a video file is found get a reader object using imageio
video_object = imageio.get_reader(files[i], 'ffmpeg')
print('found video object: ' + files[i])
video_file_name = files[i].split('.')[-2]
if framerate is None:
framerate = experiment_variables['Framerate']
if self.video_format != 'None':
writer = imageio.get_writer(
video_file_name + '.' + self.video_format,
fps=framerate,
codec=self.codec)
for i, im in enumerate(video_object):
if self.color_mode == 'greyscale':
writer.append_data(im[:, :, 0]) # only take one
# channel of the three > grayscale
elif self.color_mode == 'color':
writer.append_data(im)
writer.close()
if self.save_npy or self.save_mat:
video_length = experiment_variables['Recording time']
# put all the frames in a numpy array - this will be
# hard to do with machines with little RAM for long
# videos/videos with high framerates!
for i, im in enumerate(video_object):
if i == 0:
if self.color_mode == 'greyscale':
images = np.zeros((
im.shape[0],
im.shape[1],
int(round(video_length * framerate))),
dtype=np.uint8)
elif self.color_mode == 'color':
images = np.zeros((
im.shape[0],
im.shape[1],
im.shape[2],
int(round(video_length*framerate))),
dtype=np.uint8)
try:
if self.color_mode == 'greyscale':
images[:, :, i] = im[:, :, 0] # only take one
# channel of the three > grayscale
elif self.color_mode == 'color':
images[:, :, :, i] = im
except IndexError as e:
print('Potential Error:\n' + repr(e))
if self.save_npy:
np.save(video_file_name + '.npy', images)
if self.save_mat:
if self.save_mat:
savemat((video_file_name +'.mat'),
mdict={'images': images})
class UndistortH264():
"""
Unfortunately, the RPi camera produces quite distorted images.
Luckily, this can be fixed using and undistort algorithm.
The code has essentially been copy-pasted from:
https://gitlab.com/davidtadres/cameracalibrations
Important: the mtx and dist numpy files have been created using
the standard lens. If a different lens is being used, the user
must follow the protocol in the
https://gitlab.com/davidtadres/cameracalibrations repository and
copy the resulting mtx and dist files into
the PiVR/undistort_matrices folder!
"""
def __init__(self, path, multiplefolders, folders, save_npy,
save_mat, output_video_format, codec, undistort_path):
self.path = path
self.save_npy = save_npy
self.save_mat = save_mat
self.output_video_format = output_video_format
self.codec = codec
self.undistort_path = undistort_path
# Create path to undistort matrices
main_folder = Path(os.path.realpath(__file__)).parents[0]
#print(Path(main_folder, 'undistort_matrices', '640_dist.npy'))
if multiplefolders:
print('Will convert videos in '
+ repr(len(folders))
+ ' folders')
for i_folder in folders:
print("Working on Folder: " + i_folder)
os.chdir(Path(path, i_folder))
print(Path(path, i_folder))
try:
with open(('experiment_settings.json'), 'r') as file:
self.experiment_variables = \
json.load(file)
if '640' in self.experiment_variables['Resolution']:
# Load the undistort matrices saved in the
# 'undistort'matrices' folder
self.mtx = np.load(Path(self.undistort_path,
'640x480_mtx.npy'))
self.dist = np.load(Path(self.undistort_path,
'640x480_dist.npy'))
self.undistort_video()
elif '1024' in self.experiment_variables['Resolution']:
# Load the undistort matrices saved in the
# 'undistort'matrices' folder
self.mtx = np.load(Path(self.undistort_path,
'1024x768_mtx.npy'))
self.dist = np.load(Path(self.undistort_path,
'1024x768_dist.npy'))
self.undistort_video()
elif '1296' in self.experiment_variables['Resolution']:
# Load the undistort matrices saved in the
# 'undistort'matrices' folder
self.mtx = np.load(Path(self.undistort_path,
'1296x972_mtx.npy'))
self.dist = np.load(Path(self.undistort_path,
'1296x972_dist.npy'))
self.undistort_video()
else:
tk.messagebox.showerror('Not implemented',
'You recorded the experiment at ' +
self.experiment_variables['Resolution'] +
'.\nOnly 640x480, 1024x786 and 1296x972\n'
'is currently implemented')
except FileNotFoundError:
messagebox.showerror(
'Experiment_settings.json missing',
'The file "Experiment_settings.json is '
'missing.\nUnable to convert videos')
# Return to parent path
os.chdir(Path(path))
else:
print('Will convert video in folder ' + path)
try:
with open((path +'/experiment_settings.json'), 'r') as file:
self.experiment_variables = \
json.load(file)
if '640' in self.experiment_variables['Resolution']:
# Load the undistort matrices saved in the
# 'undistort'matrices' folder
self.mtx = np.load(Path(self.undistort_path,
'640x480_mtx.npy'))
self.dist = np.load(Path(self.undistort_path,
'640x480_dist.npy'))
self.undistort_video()
elif '1024' in self.experiment_variables['Resolution']:
# Load the undistort matrices saved in the
# 'undistort'matrices' folder
self.mtx = np.load(Path(self.undistort_path,
'1024x768_mtx.npy'))
self.dist = np.load(Path(self.undistort_path,
'1024x768_dist.npy'))
self.undistort_video()
elif '1296' in self.experiment_variables['Resolution']:
# Load the undistort matrices saved in the
# 'undistort'matrices' folder
self.mtx = np.load(Path(self.undistort_path,
'1296x972_mtx.npy'))
self.dist = np.load(Path(self.undistort_path,
'1296x972_dist.npy'))
self.undistort_video()
else:
tk.messagebox.showerror('Not implemented',
'You recorded the experiment at ' +
self.experiment_variables['Resolution'] +
'.\nOnly 640x480, 1024x786 and 1296x972\n'
'is currently implemented')
except FileNotFoundError:
messagebox.showerror(
'Experiment_settings.json missing',
'The file "Experiment_settings.json is '
'missing.\nUnable to convert videos')
def undistort_video(self):
framerate = None # Set to None to keep original FPS.
# Set to a number to define the OUTPUT FRAMERATE
with open(('experiment_settings.json'), 'r') as file:
experiment_variables = json.load(file)
# What files exist in the folder?
files = [p.replace('\\', '') for p in glob('*')]
# Need this as a bool switch in the first elif
video_object = None
# Currently, this is the only video format this script is
# looking for!
video_formats = ['h264']
# cycle through all the files in the folder as there's no
# way of knowing what the name of the video file is
for i in range(len(files)):
# only know that it ends with h264
if files[i].split('.')[-1] in video_formats:
# if there's more than one h264 file in the folder
# warn the user
if video_object is not None:
print(
'More than one video file in folder',
'Found more than one file with one of the'
'following extensions.\n' +
repr(video_formats) + '\n'
'You may only have a single '
'video file in a '
'folder\n'
'you wish to analyze')
# and break the loop
break
# if a video file is found get a reader object using imageio
video_object = imageio.get_reader(files[i], 'ffmpeg')
print('found video object: ' + files[i])
video_file_name = files[i].split('.')[-2] + '_undistorted'
if framerate is None:
framerate = experiment_variables['Framerate']
# Needs to be taken from experiment_settings as imageio seems
# to be unable to correctly read the length of a video
# created on the RPi
video_length = experiment_variables['Framerate'] * \
experiment_variables['Recording time']
# preallocate an empty array with dimension of rows, columns and then frames
images = np.zeros((video_length,
video_object.get_meta_data()['size'][1],
video_object.get_meta_data()['size'][0],
),
dtype=np.uint8)
# put all the frames in a numpy array - this will be hard to
# do with machines with little RAM for long videos/
# videos with high framerates!
for i, im in enumerate(video_object):
# images[:, :, i] = im[:, :, 0] # only take one channel
# of the three > grayscale
if i == 0:
# get shape of first image (doesn't change for the
# rest of the video) and get the optimal camera matrix
h, w = im[:, :, 0].shape[:2]
newcameramtx, roi = cv2.getOptimalNewCameraMatrix(
self.mtx, self.dist,(w, h),1, (w, h))
# instead of calling cv.2undistort for as many frames as there are
# just do the undistortion once (here) and then simply remap (cv2.remap below).
map1, map2 = cv2.initUndistortRectifyMap(cameraMatrix=self.mtx,
distCoeffs=self.dist,
R=None, # I think this is rotation
newCameraMatrix=newcameramtx,
size=(w,h),
m1type=cv2.CV_8U # 8bit, 1-byte unsigned integer, e.g. CV_32F would be 32bit/4-byte floating point
)
# undistort the image
#dst = cv2.undistort(im[:, :, 0], self.mtx, self.dist, None,
# newcameramtx)
dst = cv2.remap(im[:,:,0], map1, map2, interpolation=cv2.INTER_LINEAR)
# crop the image
# x, y, w, h = roi
# dst = dst[y:y + h, x:x + w]
try:
# assign the image to the array
images[i, :, :] = dst
except IndexError as e:
print(e)
messagebox.showerror('Index Error!',
'Please double check if the video really is '
+ repr(experiment_variables['Framerate']) + 'fps '
'and '
+ repr(experiment_variables['Recording time']) +
'seconds long\n\n')
# Stop the loop
break
if self.output_video_format != 'None':
print('Writing the video file')
imageio.mimwrite(video_file_name + '.' + self.output_video_format,
images[0:i],
fps=framerate,
codec=self.codec)
if self.save_npy:
print('Writing the numpy file')
np.save(video_file_name + '.npy', images)
if self.save_mat:
print('Writing the matlab file')
savemat(Path(video_file_name + '.mat'),
mdict={'images': images})