__author__ = 'David Tadres'
__project__ = 'PiVR'
import operator
import os
import time
import tkinter as tk
import traceback
import numpy as np
import skimage
from PIL import Image, ImageTk
from scipy import ndimage
from skimage import morphology
from skimage.draw import line
from skimage.measure import regionprops, label
from skimage.transform import resize as resize_image
from pathlib import Path
try:
import cv2
except ModuleNotFoundError:
pass # No need to do anything as nothing using cv2 is called if the module
# can't be imported in start_GUI.py
import tracking_help_classes as thc
TESTING_LOOP_TIME = False
# this try-except statement checks if the processor is an ARM processor
# (used by the Raspberry Pi) or not.
# Since this command only works on Linux it is caught using
# try-except, otherwise it would throw an error on a Windows system.
try:
if os.uname()[4][:3] == 'arm':
# This will yield True both for a Raspberry Pi and for Apple
# M1 devices.
# Use the code snippet below
# (from https://raspberrypi.stackexchange.com/questions/5100/detect-that-a-python-program-is-running-on-the-pi)
import re
CPUINFO_PATH = Path("/proc/cpuinfo")
if CPUINFO_PATH.exists():
with open(CPUINFO_PATH) as f:
cpuinfo = f.read()
if re.search(r"^Model\s*:\s*Raspberry Pi", cpuinfo, flags=re.M) is not None:
# if True, is Raspberry Pi
RASPBERRY = True
LINUX = True
else: # Not a Raspberry Pi but still an ARM CPU, e.g. an Apple M1
# chip (as on Varun's computer) or another ARM CPU device.
RASPBERRY = False
LINUX = True
else:
# is either Mac or Linux
RASPBERRY = False
LINUX = True
DIRECTORY_INDICATOR = '/'
except AttributeError:
# is Windows
RASPBERRY = False
LINUX = False
DIRECTORY_INDICATOR = '\\'
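# Example values (for illustration only): os.uname()[4] is typically
# 'armv7l' on 32-bit Raspberry Pi OS and 'arm64' on an Apple M1 Mac, so both
# start with 'arm'; on Windows os.uname() does not exist, which raises the
# AttributeError handled above.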
'''
################################################################
# START OF NEW CODE
################################################################
# When implementing the head-tail classification fix uncomment the
# below and comment the original
THINING_FUNCTION = 'thin'
try:
if int(skimage.__version__[2:4]) < 13:
tk.messagebox.showerror('Fatal Error',
'You must have skimage version >= 0.13 '
'installed to use PiVR')
import sys
sys.exit()
except ValueError: # This would happen if the version slice above cannot be parsed as an int.
print('check skimage.__version__ in fast_tracking.py')
################################################################
# END OF NEW CODE
################################################################
'''
# Currently the newest skimage version that can easily be put on the Pi
# using Raspbian is 0.12 - the thin function was only introduced
# in version 0.13. In order to keep it simple just use the
# backup function if the user is on 0.12, but print a statement.
skimage_version = tuple(int(part) for part in skimage.__version__.split('.')[:2])
if skimage_version >= (0, 13):
THINING_FUNCTION = 'thin'
else:
THINING_FUNCTION = 'skeletonize'
# print('Skimage version: ' + skimage.__version__ + '! In order'
#       ' to use the better thin function update to at least 0.13')
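# Example (for illustration): skimage.__version__ == '0.14.2' parses to
# (0, 14) >= (0, 13), so morphology.thin is used; '0.12.3' parses to
# (0, 12) and falls back to morphology.skeletonize.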
class FastTrackingControl():
"""
This class controls the tracking algorithm.
It was necessary to create a second class as the 'record_video'
function of picamera requires its own class to deliver the images.
This function could be cleaned up further by removing a few extra
variables; however, the current version performs well.
"""
def __init__(self,
genotype='Unknown',
recording_framerate=2,
display_framerate = None,
resolution=None,
recordingtime=None,
initial_data=None,
boxsize=20,
signal=None,
frames_to_define_orientation=5,
debug_mode=None,
debug_mode_resize=1,
repair_ht_swaps=True,
cam=None,
dir=None,
pixel_per_mm=None,
model_organism = 'Not in List',
vr_arena = None,
#vr_GPIO = None,
pwm_object = None,
time_dependent_file=None,
high_power_led_bool=False,
offline_analysis=False,
minimal_speed_for_moving=0.5,
organisms_and_heuristics=None,
post_hoc_tracking=False,
datetime=None,
output_channel_one=[],
output_channel_two=[],
output_channel_three=[],
output_channel_four=[],
simulated_online_analysis=False,
overlay_bool=False,
controller=None,
time_delay_due_to_animal_detection=0,
vr_update_rate = 1,
pwm_range = 100,
video_filename = 'test.yuv',
pts_filename = 'pts_test.txt',
pi_time_filename = 'system_time_test.txt',
vr_stim_location = 'NA',
save_centroids_npy = False,
save_heads_npy = False,
save_tails_npy = False,
save_midpoints_npy = False,
save_bbox_npy=False,
save_stim_npy=False,
save_thresh_npy=False,
save_skeleton_npy=False,
undistort_dst=None,
undistort_mtx=None,
newcameramtx=None
):
self.genotype = genotype
self.recording_framerate = recording_framerate
self.display_framerate = display_framerate
self.resolution = resolution
self.recordingtime = recordingtime
self.initial_data = initial_data
self.boxsize = boxsize
self.signal = signal
self.frames_to_define_orientation = frames_to_define_orientation
self.debug_mode = debug_mode
self.resize = int(debug_mode_resize)
self.repair_ht_swaps = repair_ht_swaps
# self.camera will be the picamera object on the RPi or it is
# a numpy array/list of images if offline analysis
self.cam = cam
if RASPBERRY:
# set camera to the desired framerate - this is necessary as the
# user might not have pressed 'update framerate'
# after entering a different framerate number
self.cam.framerate = self.recording_framerate
# confirm that the framerate has been changed
# print('Framerate set to: ' + repr(self.camera.framerate))
# For clarity, in offline analysis the images are
# stored in an array called images.
self.dir = dir
self.pixel_per_mm = pixel_per_mm
self.model_organism = model_organism
self.vr_arena = vr_arena
# vr_GPIO = None,
self.pwm_object = pwm_object
self.time_dependent_file = time_dependent_file
self.high_power_led_bool = high_power_led_bool
self.offline_analysis = offline_analysis
self.minimal_speed_for_moving = minimal_speed_for_moving
self.organisms_and_heuristics = organisms_and_heuristics
self.post_hoc_tracking = post_hoc_tracking
self.datetime = datetime
self.output_channel_one = output_channel_one
self.output_channel_two = output_channel_two
self.output_channel_three = output_channel_three
self.output_channel_four = output_channel_four
self.simulated_online_analysis = simulated_online_analysis
self.overlay_bool = overlay_bool
self.controller = controller
self.time_delay_due_to_animal_detection = \
time_delay_due_to_animal_detection
self.vr_update_rate = vr_update_rate
self.pwm_range = pwm_range
self.vr_stim_location = vr_stim_location
self.save_centroids_npy = save_centroids_npy
self.save_heads_npy = save_heads_npy
self.save_tails_npy = save_tails_npy
self.save_midpoints_npy = save_midpoints_npy
self.save_bbox_npy = save_bbox_npy
self.save_stim_npy = save_stim_npy
self.save_thresh_npy = save_thresh_npy
self.save_skeleton_npy = save_skeleton_npy
self.undistort_dst = undistort_dst
self.undistort_mtx = undistort_mtx
self.newcameramtx = newcameramtx
self.i_tracking = 0
# check if pictures come in as numpy array or as list of
# single pictures
if offline_analysis or simulated_online_analysis:
if type(cam) is np.ndarray:
self.images_as_npy = True
elif type(cam) is list:
self.images_as_npy = False
# This addresses #66: It's better to have zeros in data.csv in the
# first few rows than to have frame number divergence between
# data.csv and the time dependent stimulus!
self.i_tracking = self.initial_data.counter
# If on the Raspberry the assumption is that it will only be
# used to track the animal not to analyze data
# post-hoc!
if RASPBERRY and not self.offline_analysis:
# how many frames does the user want
self.total_frame_number = recordingtime * recording_framerate
# Not true anymore - will record as many frames as it can
# in a given time!
#print('Will record a total of ' + repr(
# self.total_frame_number) + ' frames.')
# else, if not on a Raspberry Pi, the assumption is that the user
# wants to do a post-hoc analysis - it should be possible to
# decouple the framerate that was recorded
# (e.g. 2fps) from the display framerate at which the user
# watches the animal behave (e.g. 10fps) to speed up the analysis.
elif offline_analysis or simulated_online_analysis:
if not RASPBERRY:
# Identify how many frames there are
if self.images_as_npy:
self.total_frame_number = \
cam.shape[2] # - (initial_data.counter)
# Note: The total_frame_number was changed to address
# Issue #66: It's better to have empty rows at the
# beginning of a post-hoc experiment than to have
# time between tracked and stimulus file break!
else:
self.total_frame_number = \
len(cam) - (initial_data.counter)
self.display_framerate = display_framerate
self.interval = (1 / self.display_framerate) * 1000
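# Example (for illustration): a display framerate of 10 fps gives an
# interval of (1 / 10) * 1000 = 100 ms between displayed frames.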
delay = \
int(self.total_frame_number
/ self.display_framerate * 1000 + 200)
# needed for the loop
self.start_at_frame = initial_data.counter
# needed to subtract the original time delay in case
# this is a simulated experiment
self.delay_due_to_detection = 0
# this array will hold the timestamps recorded during the
# experiment
self.real_time = np.zeros((self.total_frame_number))
# display_time is an array that will just be filled with the
# expected time passed for a given frame (not the real time!)
if not RASPBERRY:
self.display_time = np.zeros((self.total_frame_number))
self.display_time[0] = 0 # todo - pretty sure that's not correct
# Search Box is the part of the image the algorithm will be
# looking for an animal in the next frame. It has four
# variables, row_min (minimal y value), row_max (maximal y
# value), col_min (minimal x value) and col_max (maximal x
# value). The search box is defined as the bounding box of
# the animal + a boxsize which should be dynamic in the future!
# must be possible to go negative (hence the signed dtype) -
# with an unsigned dtype a value just below 0 would wrap
# around to a huge positive number!
self.search_boxes = np.zeros((self.total_frame_number, 4),
dtype=np.int16)
# pre-allocation of the empty array - might be good to let the
# user decide if skel needs to be pre-allocated. For my
# experiments memory will never be an issue, but if someone
# wants to track a fast animal for a long time, memory might
# run out. In numpy even the bool arrays are 8 bit (and not
# 1 bit). For example, with a boxsize of 25 and 9000 frames
# (9000/30fps = 5 minutes) the preallocation for EACH array is
# 22.5Mb. If someone were to run an experiment for one hour
# (3600 seconds * 30fps = 108000 frames), each array would be
# 270Mb. That will probably lead to a memory error.
# Todo in the future we must implement bitarray (or similar)
# which will allow true 1 bit arrays for the binary arrays
# OR: Don't save images and only save centroid/head for this
# kind of long experiment.
size_of_expected_animals = \
organisms_and_heuristics[model_organism][
'max_skeleton_length_mm'] * pixel_per_mm
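# Example (for illustration, with made-up numbers): a
# 'max_skeleton_length_mm' of 5 mm at 10 pixel/mm gives
# size_of_expected_animals = 50, so the cropped image arrays below are
# preallocated as 100 x 100 pixels per frame.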
self.image_raw = np.zeros(
(int(np.ceil(size_of_expected_animals * 2)),
int(np.ceil(size_of_expected_animals * 2)),
self.total_frame_number), dtype=np.uint8)
self.image_thresh = np.zeros((
int(np.ceil(size_of_expected_animals * 2)),
int(np.ceil(size_of_expected_animals * 2)),
self.total_frame_number), dtype=np.bool_)
self.image_skel = np.zeros((
int(np.ceil(size_of_expected_animals * 2)),
int(np.ceil(size_of_expected_animals * 2)),
self.total_frame_number), dtype=np.bool_)
self.local_threshold = np.zeros((self.total_frame_number))
# preallocate bounding box and centroid array
self.bounding_boxes = np.zeros((
4, self.total_frame_number), dtype=np.uint16)
self.centroids = np.zeros((
self.total_frame_number, 2), dtype=np.int16)
self.midpoints = np.zeros((
self.total_frame_number, 2), dtype=np.int16)
self.length_skeleton = np.zeros((
self.total_frame_number), dtype=np.uint16)
# if self.Drosophila_larva: # todo - always keep on
self.tails = np.zeros((
self.total_frame_number, 2), dtype=np.int16)
self.heads = np.zeros((
self.total_frame_number, 2), dtype=np.int16)
self.endpoints = np.zeros((
2, 2, self.total_frame_number), dtype=int)
self.ht_swap = np.zeros((
self.total_frame_number), dtype=np.bool_)
# irrespective if there's a stimulation or not this array is
# preallocated - it allows for saving the pandas array later
if self.vr_arena is not None:
self.stimulation = np.zeros((self.total_frame_number),
dtype=np.float32)
elif self.time_dependent_file is not None:
self.stimulation = np.zeros((self.total_frame_number, 4),
dtype=np.float32)
else:
self.stimulation = None
if post_hoc_tracking:
# this is for the case where the user is analyzing a
# video/full frame recording to get all the heuristic rules
# needed to track an animal in the future. For each frame save:
# 0) the filled area in pixel
# 1) the filled area divided by pixel/mm
# 2) the major over minor axis
# 3) the eccentricity
# 4) the length - this is going to be very useful for the
# boxsize and the searchboxsize
# 5) length of skeleton divided by pixel/mm
# 6) speed in pixel per frame
# 7) speed in mm/frame (just #6 divided by pixel per mm)
# 8) speed in mm/s (#7 * frames per second)
self.heuristic_parameters = np.zeros((9, cam.shape[2]))
self.post_hoc_tracking = post_hoc_tracking
else:
self.heuristic_parameters = None
self.post_hoc_tracking = False
# even though the user won't actually see anything (except in
# offline analysis with debug mode on) this child window is
# created to disable the main window. This makes it
# impossible for the user to change anything in the main GUI
# while the program is running
self.child = tk.Toplevel()
self.child.grab_set()
# debug mode can be chosen by the user to display three
# images while tracking (all only the search box):
# the raw image, the thresholded image and on the far right
# the raw image with the detected bounding box
if debug_mode:
# set a title for the window
self.child.wm_title('Debug Window')
# the function to call if the user presses the 'x' (
# cancel) button on the top right of the debug window
self.child.protocol("WM_DELETE_WINDOW", self.on_closing)
# self.child.geometry("100x100") # You want the size of
# the app to be 500x500
self.child.attributes("-topmost", True)
self.title_raw = tk.Label(self.child,
text='Original Image')
self.title_raw.grid(row=0, column=0)
self.title_binary = tk.Label(
self.child,
text='Binary Image')
self.title_binary.grid(row=0, column=1)
self.below_binary = tk.Label(
self.child,
text='Grey indicates Search Box'
'\nWhite indicates pixels below threshold (in '
'search box)'
'\nBlack indicates pixels not considered ('
'outside search box)',
anchor='w',
justify='left')
self.below_binary.grid(row=2, column=1)
self.title_detected = tk.Label(
self.child,
text='Detected animal')
self.title_detected.grid(row=0, column=2)
self.below_detected = tk.Label(
self.child,
text='Filled Area: '
'\nEccentricity: '
'\nMajor over minor axis: ',
anchor='w',
justify='left')
self.below_detected.grid(row=2, column=2)
# a canvas for the raw image
# Need to just fix the window size!
# resolution = [640, 480]
# The window size should not be too large, otherwise one can't
# see everything at once. Just fix it assuming that everyone has
# at least a 1024x768 screen nowadays.
self.canvas_width = 320 #(1024/3) ~=341, 320 is half the
# width of the input frame
self.canvas_height = 240
self.child_canvas_top_left = tk.Canvas(
self.child,
width=self.canvas_width,
height=self.canvas_height)
self.child_canvas_top_left.grid(row=1, column=0)
# disable change of size when image size changes
self.child_canvas_top_left.grid_propagate(0)
# a canvas for the binary image
self.child_canvas_top_middle = tk.Canvas(
self.child,
width=self.canvas_width,
height=self.canvas_height)
self.child_canvas_top_middle.grid(row=1, column=1)
# disable change of size when image size changes
self.child_canvas_top_middle.grid_propagate(0)
# a canvas for the raw image with bounding box
self.child_canvas_top_right = tk.Canvas(
self.child,
width=self.canvas_width,
height=self.canvas_height)
self.child_canvas_top_right.grid(row=1, column=2)
self.child_canvas_top_right.grid_propagate(0)
# Show the time/frames remaining until the window closes
self.time_remaining_label = tk.Label(self.child, text='')
self.time_remaining_label.grid(row=3, column=0,
columnspan=3)
self.pause_debug_var = tk.IntVar()
self.pause_button = tk.Button(self.child,
text='Pause tracking',
command=self.pause_debug)
self.pause_button.grid(row=2, column=0)
else:
self.time_remaining_label = None
self.child_canvas_top_left = None
self.child_canvas_top_middle = None
self.child_canvas_top_right = None
self.canvas_width = None
self.canvas_height = None
self.below_detected = None
self.pause_debug_var = None
# When not testing, just pass the None around.
self.loop_time_measurement = None
if TESTING_LOOP_TIME:
# In case of testing, save the loop time in the
# experimental folder
self.loop_time_measurement = np.zeros((
recordingtime*recording_framerate,2))
self.loop_time_measurement.fill(np.nan)
if RASPBERRY:
self.run_experiment()
if offline_analysis or simulated_online_analysis:
self.offline_analysis_func()
def offline_analysis_func(self):
"""
This function is called when the user selects either the
"Tools->Analysis->Single Animal tracking" or the
"Debug->Simulate Online Tracking" option. It calls the
identical animal tracking function as the live version,
the only difference being the way the images are being provided.
While in the live version, the images are streamed from the
camera, in the simulated online version the images are
provided as a numpy array.
"""
try:
FastTrackingVidAlg(
genotype=self.genotype,
recording_framerate=self.recording_framerate,
display_framerate=self.display_framerate,
resolution=self.resolution,
recordingtime=self.recordingtime,
initial_data=self.initial_data,
boxsize=self.boxsize,
signal=self.signal,
frames_to_define_orientation=self.frames_to_define_orientation,
debug_mode=self.debug_mode,
debug_mode_resize=self.resize,
repair_ht_swaps=self.repair_ht_swaps,
cam=self.cam,
dir=self.dir,
pixel_per_mm=self.pixel_per_mm,
model_organism=self.model_organism,
vr_arena=self.vr_arena,
# vr_GPIO = None,
pwm_object=self.pwm_object,
time_dependent_file=self.time_dependent_file,
high_power_led_bool=self.high_power_led_bool,
offline_analysis=self.offline_analysis,
minimal_speed_for_moving=self.minimal_speed_for_moving,
organisms_and_heuristics=self.organisms_and_heuristics,
post_hoc_tracking=self.post_hoc_tracking,
datetime=self.datetime,
output_channel_one=self.output_channel_one,
output_channel_two=self.output_channel_two,
output_channel_three=self.output_channel_three,
output_channel_four=self.output_channel_four,
simulated_online_analysis=self.simulated_online_analysis,
overlay_bool=self.overlay_bool,
controller=self.controller,
time_delay_due_to_animal_detection=self.time_delay_due_to_animal_detection,
vr_update_rate=self.vr_update_rate,
pwm_range=self.pwm_range,
#video_filename='test.yuv',
#pts_filename='pts_test.txt',
#pi_time_filename='system_time_test.txt',
real_time=self.real_time,
i_tracking=self.i_tracking,
total_frame_number=self.total_frame_number,
search_boxes=self.search_boxes,
image_raw=self.image_raw,
image_skel=self.image_skel,
image_thresh=self.image_thresh,
local_threshold=self.local_threshold,
bounding_boxes=self.bounding_boxes,
centroids=self.centroids,
midpoints=self.midpoints,
length_skeleton=self.length_skeleton,
tails=self.tails,
heads=self.heads,
endpoints=self.endpoints,
ht_swap=self.ht_swap,
stimulation=self.stimulation,
heuristic_parameters=self.heuristic_parameters,
time_remaining_label=self.time_remaining_label,
child_canvas_top_left=self.child_canvas_top_left,
child_canvas_top_middle=self.child_canvas_top_middle,
child_canvas_top_right=self.child_canvas_top_right,
child = self.child,
canvas_height=self.canvas_height,
canvas_width=self.canvas_width,
below_detected=self.below_detected,
pause_debug_var = self.pause_debug_var,
save_centroids_npy=self.save_centroids_npy,
save_heads_npy=self.save_heads_npy,
save_tails_npy=self.save_tails_npy,
save_midpoints_npy=self.save_midpoints_npy,
save_bbox_npy=self.save_bbox_npy,
save_stim_npy=self.save_stim_npy,
save_thresh_npy=self.save_thresh_npy,
save_skeleton_npy=self.save_skeleton_npy,
undistort_dst=self.undistort_dst,
undistort_mtx=self.undistort_mtx,
newcameramtx=self.newcameramtx
)
except Exception as caught_error:
self.error_message_func(error_stack=caught_error)
# Note how many frames have been recorded in what time
captured_images = np.count_nonzero(self.real_time)
expected_no_of_images = self.recordingtime \
* self.recording_framerate
print('Crashed at frame ' + repr(captured_images) + ' of '
+ repr(expected_no_of_images) )
# after either successfully finishing or unexpectedly
# finishing early with the offline tracking, call the
# function that will save all the data
# This used to be the two commented-out calls below - had to change it
# so that the "Tools>Single Animal Tracking" option run on more than
# one folder saves the data in the correct folder!
#self.child.after(0, self.after_tracking)
#self.child.after(300, lambda : self.child.destroy())
self.after_tracking()
def run_experiment(self):
"""
This function is called during live tracking on the PiVR.
Essentially, it starts to record a video but provides a custom
output. See `here
<https://picamera.readthedocs.io/en/release-1.13/recipes2.html#custom-outputs>`__.
The video records frames in the **YUV** format. See `here
<https://picamera.readthedocs.io/en/release-1.13/recipes2.html#unencoded-image-capture-yuv-format>`__
for an explanation of that particular format.
YUV was chosen as it encodes a greyscale version of the image
(the Y' component) at full resolution (e.g. 307'200 bytes for
a 640x480 image) while the U and the V components, which
essentially encode the color of the image, only have a quarter
of the resolution (e.g. 76'800 bytes for a 640x480 image). As
the color is discarded anyway, this allows a more efficient
usage of the Raspberry Pi's buffer compared to using,
for example, RGB.
"""
try:
self.cam.start_recording(
FastTrackingVidAlg(
genotype=self.genotype,
recording_framerate=self.recording_framerate,
display_framerate = self.display_framerate,
resolution=self.resolution,
recordingtime=self.recordingtime,
initial_data=self.initial_data,
boxsize=self.boxsize,
signal=self.signal,
frames_to_define_orientation=self.frames_to_define_orientation,
debug_mode=self.debug_mode,
debug_mode_resize=self.resize,
repair_ht_swaps=self.repair_ht_swaps,
cam=self.cam,
dir=self.dir,
pixel_per_mm=self.pixel_per_mm,
model_organism = self.model_organism,
vr_arena = self.vr_arena,
#vr_GPIO = None,
pwm_object =self.pwm_object,
time_dependent_file=self.time_dependent_file,
high_power_led_bool=self.high_power_led_bool,
offline_analysis=self.offline_analysis,
minimal_speed_for_moving=self.minimal_speed_for_moving,
organisms_and_heuristics=self.organisms_and_heuristics,
post_hoc_tracking=self.post_hoc_tracking,
datetime=self.datetime,
output_channel_one=self.output_channel_one,
output_channel_two=self.output_channel_two,
output_channel_three=self.output_channel_three,
output_channel_four=self.output_channel_four,
simulated_online_analysis=self.simulated_online_analysis,
overlay_bool=self.overlay_bool,
controller=self.controller,
time_delay_due_to_animal_detection=self.time_delay_due_to_animal_detection,
vr_update_rate = self.vr_update_rate,
pwm_range = self.pwm_range,
# video_filename = 'test.yuv' # Just in case we ever want to also record video while tracking!
real_time=self.real_time,
i_tracking = self.i_tracking,
total_frame_number=self.total_frame_number,
search_boxes = self.search_boxes,
image_raw = self.image_raw,
image_thresh = self.image_thresh,
image_skel = self.image_skel,
local_threshold = self.local_threshold,
bounding_boxes = self.bounding_boxes,
centroids = self.centroids,
midpoints = self.midpoints,
length_skeleton = self.length_skeleton,
tails = self.tails,
heads = self.heads,
endpoints = self.endpoints,
ht_swap = self.ht_swap,
stimulation = self.stimulation,
heuristic_parameters = self.heuristic_parameters,
loop_time_measurement = self.loop_time_measurement,
vr_stim_location = self.vr_stim_location,
save_centroids_npy=self.save_centroids_npy,
save_heads_npy=self.save_heads_npy,
save_tails_npy=self.save_tails_npy,
save_midpoints_npy=self.save_midpoints_npy,
save_bbox_npy=self.save_bbox_npy,
save_stim_npy=self.save_stim_npy,
save_thresh_npy=self.save_thresh_npy,
save_skeleton_npy=self.save_skeleton_npy,
undistort_dst=self.undistort_dst,
undistort_mtx=self.undistort_mtx,
newcameramtx=self.newcameramtx
),
format='yuv'
)
self.cam.wait_recording(self.recordingtime)
except Exception as caught_error:
self.cam.preview_window = (0, 0, 180, 180)
if self.overlay_bool:
self.controller.all_common_variables.overlay.window = (
0, 0, 180, 180)
# todo - write more information, e.g. at what frame did
# the experiment stop. This might be useful for analysis.
# also todo: What do these cryptic error messages mean?
# Have a FAQ ready OR automatically go into traceback
# and look for known errors and suggest what the error
# could be based on the position.
# self.child.after(0, lambda : self.error_message_func(error_stack=caught_error))
self.error_message_func(error_stack=caught_error)
finally:
# clean up.
try:
self.cam.stop_recording()
except: # Not good to have a bare except, but here it
# should really only
# catch PiCameraErrors!
pass
# Note how many frames have been recorded in what time
captured_images = np.count_nonzero(self.real_time)
expected_no_of_images = self.recordingtime * self.recording_framerate
print('Captured ' + repr(captured_images) + ' of ' +
repr(expected_no_of_images) + ' images at %.2ffps' %
(self.recording_framerate))
# TODO save this somewhere! Maybe in the json file
# TODO also do tkinter messagebox!
# after either successfully finishing or unexpectedly
# finishing early with the offline tracking, call the
# function that will save all the data
#self.child.after(0, self.after_tracking)
self.child.after(0, self.after_tracking)
self.child.after(300, lambda : self.child.destroy())
def on_closing(self):
"""
Function to use when the user clicks on the X to close the
window.
This should never be called in a live experiment as there is
simply no option to click to close a window.
Will ask if the user wants to quit the experiment.
Will offer to save the data collected so far.
"""
if tk.messagebox.askokcancel("Cancel",
"Do you want to cancel?"):
self.offline_analysis_running = False
save = tk.messagebox.askquestion("Save?",
"Do you want to save\n"
"the analysis completed\n"
"so far?")
if save == 'yes':
thc.Save(heads=self.heads,
tails=self.tails,
centroids=self.centroids,
image_skel=self.image_skel,
image_raw=self.image_raw,
image_thresh=self.image_thresh,
local_threshold=self.local_threshold,
background=self.initial_data.smoothed_goodbackground,
real_time=self.real_time,
pixel_per_mm=self.pixel_per_mm,
bounding_boxes=self.bounding_boxes,
stimulation=self.stimulation,
arena=self.vr_arena,
heuristic_data=self.heuristic_parameters,
datetime=self.datetime,
midpoints=self.midpoints,
recording_time=self.recordingtime,
framerate=self.recording_framerate,
time_dep_stim_file=self.time_dependent_file,
#HP_setup=self.high_power_led_bool,
pwm_range=self.pwm_range,
save_centroids_npy=self.save_centroids_npy,
save_heads_npy=self.save_heads_npy,
save_tails_npy=self.save_tails_npy,
save_midpoints_npy=self.save_midpoints_npy,
save_bbox_npy=self.save_bbox_npy,
save_stim_npy=self.save_stim_npy,
save_thresh_npy=self.save_thresh_npy,
save_skeleton_npy=self.save_skeleton_npy,
undistort_dst=self.undistort_dst,
undistort_mtx=self.undistort_mtx,
newcameramtx=self.newcameramtx
)
# save the data
# pause the script to give time to save
self.child.after(100)
self.experiment_stopped_early = True
# Set the main window active again
self.child.grab_release()
# close the tkinter window
self.child.destroy()
def error_message_func(self, error_stack):
"""
This function is called if the recording cannot continue
until the end as defined by frame rate * recording_length.
It will write the error into a file called "DATE-ERROR.txt"
and will place it in the experiment folder along with the other files of the given trial.
"""
captured_images = np.count_nonzero(self.real_time)
expected_no_of_images = self.recordingtime * self.recording_framerate
with open(self.datetime + '_ERROR.txt', 'a') as file:
file.write('Unexpected Error at frame ' + repr(
captured_images) + ' of ' +
repr(int(
expected_no_of_images)) + '\n\n')
file.write('Traceback (most recent call last): ' + str(
error_stack) + '\n\n')
if "IndexError: index -1 is out of bounds for axis 0 " \
"with size 0" in traceback.format_exc() \
and "self.filled_area = areas_sorted['filled " \
"area'][-1]" in traceback.format_exc():
file.write('Error #1\n'
'The following 2 lines:'
'\n'"self.filled_area = "
"areas_sorted['filled area'][-1]"
"\nIndexError: index -1 is out of "
"bounds for axis 0 with size 0"
"\nindicate that the animal was not "
"found in the region of interest."
"\nThis can happen if the animal moves "
"faster than expected."
"\nTo circumvent this problem increase "
"the 'max_speed_animal_mm_per_s' parameter"
"in the 'list_of_available_organisms.json' "
"file"
"\nAlternatively, your animal might be "
"able to hide under some obstruction. "
"If that is the case please clear the "
"arena so that the camera can always see "
"at least parts of the animal"
"\n")
# elif.... put all the known errors and print possible solutions
else:
file.write('Error that has not been classified yet!\n\n')
file.write('Full error traceback below: \n\n')
file.write(traceback.format_exc())
tk.messagebox.showerror('Error',
'The experiment stopped earlier than requested.'
'\nSee the ERROR.txt file in the experiment folder'
'\nfor a detailed traceback for debugging purposes'
'\n'
)
def after_tracking(self):
"""
When live tracking is done, the GPIOs must be turned off.
Then save the data that was just collected by calling the
'Save' class in tracking_help_classes (the same code that runs
when pressing the 'save' button).
"""
if self.high_power_led_bool:
# turn off GPIOs of Channel one - keep in loop instead of just saying GPIO17 off for forward compatibility!!
# list comprehension of Channel 1
[self.pwm_object.set_PWM_dutycycle(
user_gpio=self.output_channel_one[i_stim][0],
dutycycle=self.pwm_range)
for i_stim in range(len(self.output_channel_one))]
# list comprehension of Channel 2
[self.pwm_object.set_PWM_dutycycle(
user_gpio=self.output_channel_two[i_stim][0],
dutycycle=self.pwm_range)
for i_stim in range(len(self.output_channel_two))]
# list comprehension of Channel 3
[self.pwm_object.set_PWM_dutycycle(
user_gpio=self.output_channel_three[i_stim][0],
dutycycle=self.pwm_range)
for i_stim in range(len(self.output_channel_three))]
# list comprehension of Channel 4
[self.pwm_object.set_PWM_dutycycle(
user_gpio=self.output_channel_four[i_stim][0],
dutycycle=self.pwm_range)
for i_stim in range(len(self.output_channel_four))]
print('High powered LED are all turned off')
else:
# turn off GPIOs of Channel one - keep in loop instead of
# just saying GPIO17 off for forward compatibility!!
# list comprehension of Channel 1
[self.pwm_object.set_PWM_dutycycle(
user_gpio=self.output_channel_one[i_stim][0],
dutycycle=0)
for i_stim in range(len(self.output_channel_one))]
# list comprehension of Channel 2
[self.pwm_object.set_PWM_dutycycle(
user_gpio=self.output_channel_two[i_stim][0],
dutycycle=0)
for i_stim in range(len(self.output_channel_two))]
# list comprehension of Channel 3
[self.pwm_object.set_PWM_dutycycle(
user_gpio=self.output_channel_three[i_stim][0],
dutycycle=0)
for i_stim in range(len(self.output_channel_three))]
# list comprehension of Channel 4
[self.pwm_object.set_PWM_dutycycle(
user_gpio=self.output_channel_four[i_stim][0],
dutycycle=0)
for i_stim in range(len(self.output_channel_four))]
print('Normal LED turned off')
if self.offline_analysis: # todo why different from online?
thc.Save(heads=self.heads,
tails=self.tails,
centroids=self.centroids,
image_skel=self.image_skel,
image_raw=self.image_raw,
image_thresh=self.image_thresh,
local_threshold=self.local_threshold,
background=self.initial_data.smoothed_goodbackground,
real_time=self.real_time,
pixel_per_mm=self.pixel_per_mm,
bounding_boxes=self.bounding_boxes,
stimulation=self.stimulation,
arena=self.vr_arena,
heuristic_data=self.heuristic_parameters,
datetime=self.datetime,
midpoints=self.midpoints,
recording_time=self.recordingtime,
framerate=self.recording_framerate,
save_centroids_npy=self.save_centroids_npy,
save_heads_npy=self.save_heads_npy,
save_tails_npy=self.save_tails_npy,
save_midpoints_npy=self.save_midpoints_npy,
save_bbox_npy=self.save_bbox_npy,
save_stim_npy=self.save_stim_npy,
save_thresh_npy=self.save_thresh_npy,
save_skeleton_npy=self.save_skeleton_npy,
undistort_dst=self.undistort_dst,
undistort_mtx=self.undistort_mtx,
newcameramtx=self.newcameramtx
)
self.child.destroy()
else:
self.child.after(0, lambda: thc.Save(
heads=self.heads,
tails=self.tails,
centroids=self.centroids,
image_skel=self.image_skel,
image_raw=self.image_raw,
image_thresh=self.image_thresh,
local_threshold=self.local_threshold,
background=self.initial_data.smoothed_goodbackground,
real_time=self.real_time,
pixel_per_mm=self.pixel_per_mm,
bounding_boxes=self.bounding_boxes,
stimulation=self.stimulation,
arena=self.vr_arena,
heuristic_data=self.heuristic_parameters,
datetime=self.datetime,
midpoints=self.midpoints,
time_delay_due_to_animal_detection=self.time_delay_due_to_animal_detection,
loop_time=self.loop_time_measurement,
recording_time=self.recordingtime,
framerate=self.recording_framerate,
time_dep_stim_file=self.time_dependent_file,
high_power_led_bool=self.high_power_led_bool,
pwm_range=self.pwm_range,
save_centroids_npy=self.save_centroids_npy,
save_heads_npy=self.save_heads_npy,
save_tails_npy=self.save_tails_npy,
save_midpoints_npy=self.save_midpoints_npy,
save_bbox_npy=self.save_bbox_npy,
save_stim_npy=self.save_stim_npy,
save_thresh_npy=self.save_thresh_npy,
save_skeleton_npy=self.save_skeleton_npy,
undistort_dst=self.undistort_dst,
undistort_mtx=self.undistort_mtx,
newcameramtx=self.newcameramtx
)
)
# give 300ms to save, then destroy the child window and
# go back to the main window. Should be fine, even if
# it takes longer as the main window will just be frozen
# until saving is complete
self.child.after(300, lambda: self.child.destroy())
def pause_debug(self):
if self.pause_debug_var.get():
self.pause_debug_var.set(False)
self.pause_button.config(text='Pause tracking')
else:
self.pause_debug_var.set(True)
self.pause_button.config(text='Continue tracking')
class FastTrackingVidAlg(object):
"""
This class takes either a camera object (so far only from the
RPi camera) or images in a 3D numpy array (y, x and time). When run
on the RPi it is assumed that a live experiment is running. The camera
frame rate will be set to the frame rate the user wants (if the user
asks for a higher frame rate than the camera can deliver, the program
will throw an error directly in the GUI). The camera will then deliver
each image into an in-memory stream. The images will then be
formatted to be 2D with the right resolution.
(For future improvement: To increase speed one could only take
the bytes that are actually needed (we do have the search_box)).
"""
def __init__(self,
genotype='Unknown',
recording_framerate=2,
display_framerate = None,
resolution=None,
recordingtime=None,
initial_data=None,
boxsize=20,
signal=None,
frames_to_define_orientation=5,
debug_mode=None,
debug_mode_resize=1,
repair_ht_swaps=True,
cam=None,
dir=None,
pixel_per_mm=None,
model_organism = 'Not in List',
vr_arena = None,
#vr_GPIO = None,
pwm_object = None,
time_dependent_file=None,
high_power_led_bool=False,
offline_analysis=False,
minimal_speed_for_moving=0.5,
organisms_and_heuristics=None,
post_hoc_tracking=False,
datetime=None,
output_channel_one=[],
output_channel_two=[],
output_channel_three=[],
output_channel_four=[],
simulated_online_analysis=False,
overlay_bool=False,
controller=None,
time_delay_due_to_animal_detection=0,
vr_update_rate = 1,
pwm_range = 40000,
video_filename = 'test.yuv',
real_time= None,
i_tracking = None,
total_frame_number = 10,
search_boxes = None,
image_raw = None,
image_thresh = None,
image_skel = None,
local_threshold = None,
bounding_boxes = None,
centroids = None,
midpoints = None,
length_skeleton = None,
tails = None,
heads = None,
endpoints = None,
ht_swap = None,
stimulation = None,
heuristic_parameters = None,
time_remaining_label = None,
child_canvas_top_left = None,
child_canvas_top_middle = None,
child_canvas_top_right = None,
child = None,
loop_time_measurement = None,
canvas_width = None ,
canvas_height = None,
below_detected = None,
pause_debug_var = None,
vr_stim_location = 'NA',
save_centroids_npy=False,
save_heads_npy=False,
save_tails_npy=False,
save_midpoints_npy=False,
save_bbox_npy=False,
save_stim_npy=False,
save_thresh_npy=False,
save_skeleton_npy=False,
undistort_dst=None,
undistort_mtx=None,
newcameramtx=None
):
#self.video_output = io.open(video_filename, 'wb')
self.start_time = None
self.total_frame_number = total_frame_number
self.i_tracking = i_tracking
self.search_boxes = search_boxes
self.image_raw = image_raw
self.image_thresh = image_thresh
self.image_skel = image_skel
self.local_threshold = local_threshold
self.bounding_boxes = bounding_boxes
self.centroids = centroids
self.midpoints = midpoints
self.length_skeleton = length_skeleton
self.tails = tails
self.heads = heads
self.endpoints = endpoints
self.ht_swap = ht_swap
self.stimulation = stimulation
self.heuristic_parameters = heuristic_parameters
self.post_hoc_tracking = post_hoc_tracking
self.time_remaining_label = time_remaining_label
self.child_canvas_top_left = child_canvas_top_left
self.child_canvas_top_middle = child_canvas_top_middle
self.child_canvas_top_right = child_canvas_top_right
self.child = child
self.canvas_width = canvas_width
self.canvas_height = canvas_height
self.below_detected = below_detected
self.pause_debug_var = pause_debug_var
self.vr_stim_location = vr_stim_location
self.save_centroids_npy = save_centroids_npy
self.save_heads_npy = save_heads_npy
self.save_tails_npy = save_tails_npy
self.save_midpoints_npy = save_midpoints_npy
self.save_bbox_npy = save_bbox_npy
self.save_stim_npy = save_stim_npy
self.save_thresh_npy = save_thresh_npy
self.save_skeleton_npy = save_skeleton_npy
self.undistort_dst = undistort_dst
self.undistort_mtx = undistort_mtx
self.newcameramtx = newcameramtx
self.display_framerate = display_framerate
self.real_time = real_time
self.cam = cam
if offline_analysis or simulated_online_analysis:
self.images = cam
# If a post-hoc analysis is being done it usually takes
# several frames until the animal has been identified
# and until it has moved from its original position. Counter
# will take care that the index is correct
#self.counter = initial_data.counter
# initiate the variables that will be used later
self.previous_channel_one_value = None
self.previous_channel_two_value = None
self.previous_channel_three_value = None
self.previous_channel_four_value = None
self.datetime = datetime
self.genotype = genotype
self.recording_framerate = recording_framerate
self.recording_time = recordingtime
if RASPBERRY:
# print(self.cam.resolution)
# set the resolution of the images. The resolution used to come in
# as a string, e.g. '640x480' (see the commented-out parsing below);
# it is now taken directly from the camera object.
#self.width = int(resolution.split("x")[0])
#self.height = int(resolution.split("x")[1])
self.width, self.height = self.cam.resolution
# IMPORTANT:
# Width is always padded to a multiple of 32, and height to a multiple of 16.
# So 1296x972 will appear as a buffer of 1312x976.
self.width = ((self.width + 31) // 32) * 32
self.height = ((self.height + 15) // 16) * 16
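# Example (for illustration): a requested resolution of 1296x972 is padded
# to 1312x976 (1312 = 41 * 32, 976 = 61 * 16), while 640x480 is already
# aligned and stays unchanged.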
self.offline_analysis = offline_analysis
# check if pictures come in as numpy array or as list of single pictures
# TODO I think this can be deleted!
#if offline_analysis or simulated_online_analysis:
# if type(cam) is np.ndarray:
# self.images_as_npy = True
# elif type(cam) is list:
# self.images_as_npy = False
# first_roi is an instance of a class - take y (row) min and
# max and x (column) min and max of the search box
self.first_row_min = initial_data.first_roi.row_min
self.first_row_max = initial_data.first_roi.row_max
self.first_col_min = initial_data.first_roi.col_min
self.first_col_max = initial_data.first_roi.col_max
# and both y and x of the centroid
self.first_centroid = [initial_data.first_roi.centroid_row,
initial_data.first_roi.centroid_col]
# boxsize has been dynamically defined in start_GUI
self.boxsize = boxsize
# if Raspberry or self.offline_analysis and not post_hoc_tracking:
# read background image, also from class instance
self.smoothed_background = initial_data.smoothed_goodbackground
# elif post_hoc_tracking:
# self.smoothed_background = initial_data.mean_image
# If animals are illuminated from below they are usually dark
# in the camera. If they are illuminated from the side and
# if the bottom is black they appear white. Depending on this
# experimental parameter the binary image needs to be created
# using greater than (if the animal is white) or less than (if
# the animal is dark) the calculated threshold.
self.signal = signal
if self.signal == 'white':
self.compare = operator.gt
self.box_intensity = 255
elif self.signal == 'dark':
self.compare = operator.lt
self.box_intensity = 0
else:
tk.messagebox.showerror('Error',
'Signal has to be either "white" or "dark".\n'
'Please adjust code.\n'
'Program will exit after pressing "Ok"')
import sys
# Todo: this is overkill! Could also just go back to main Gui
sys.exit()
# If head and tail are being defined several heuristic rules
# are needed. If for a number of frames, defined by this
# parameter frames_to_define_orientation, the tail moves
# backwards, it is assumed that the head/tail classification
# is the wrong way around.
self.frames_to_define_orientation = frames_to_define_orientation
# In order to repair the head/tail swaps, this bool needs to
# be True as well.
self.repair_ht_swaps = repair_ht_swaps
# debug mode is only available in post-hoc analysis
self.debug_mode = debug_mode
if self.debug_mode:
self.current_animal_characteristics = None
# TODO Delete if not necessary!
#if self.offline_analysis or simulated_online_analysis:
# self.images = cam
# # If a post-hoc analysis is being done it usually takes
# # several frames until the animal has been identified
# # and until it has moved from its original position. counter
# # will take care that the index is correct
# self.counter = initial_data.counter
# the directory of the experiment
self.path = dir
# turn bool on if debug mode is requested
self.debug_mode = debug_mode
if debug_mode:
self.resize = int(debug_mode_resize)
self.pixel_per_mm = pixel_per_mm
# Stimulation
# take both the vr_arena and time_dep_file
self.vr_arena = vr_arena
self.time_dependent_stim_file = time_dependent_file
if self.vr_arena is not None:
self.VR_ARENA = True
# self.VR_GPIO = vr_GPIO
elif time_dependent_file is not None:
self.time_dependent_stim = True
# self.VR_GPIO = vr_GPIO
self.VR_ARENA = False
else:
print('vr arena not in tracking file')
self.VR_ARENA = False
self.time_dependent_stim = False
# self.VR_GPIO = None
self.pwm_object = pwm_object
# get the lists with the output
self.output_channel_one = output_channel_one
self.output_channel_two = output_channel_two
self.output_channel_three = output_channel_three
self.output_channel_four = output_channel_four
# access to overlay to change size
self.overlay_bool = overlay_bool
self.controller = controller
# get the time it took to detect the animal - will be saved
# for user access later on
self.time_delay_due_to_animal_detection = time_delay_due_to_animal_detection
# heuristics might hold for a large number of model
# organisms! Initiate heads and tails always
# if self.Drosophila_larva:
self.heads = heads
self.tails = tails
# Initiate centroid a
self.centroids = centroids
self.midpoints = midpoints
# After how many frames should the multidimensional (3rd
# Dimension = Time) arena be updated?
self.vr_arena_multidimension_update = (1 / vr_update_rate) / (1 / self.recording_framerate)
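# Example (for illustration): vr_update_rate = 1 Hz at a recording
# framerate of 30 fps gives (1 / 1) / (1 / 30) = 30, i.e. the
# time-varying arena advances every 30 frames.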
# Todo
self.vr_arena_multidimension_counter = 0
self.pwm_range = pwm_range
# Needs to be manually entered, but will make sure that the
# light is switched off at the end of the experiment
self.high_power_LED_bool = high_power_led_bool
print('self.high_power_LED_bool: ' + repr(self.high_power_LED_bool))
# initialize an empty list
self.time_difference = []
# initialize variables for the local image
self.smoothed_current_local_image = None
# and the local thresholded image
self.current_image_thresholded = None
# Set this bool to False and only switch if exception happens
self.experiment_stopped_early = False
if RASPBERRY and not self.offline_analysis:
# if live experiment start at index 0
self.start_frame = 0
else:
# if not a live experiment, the start frame will be
# defined by how many frames have already been used to
# characterize the animal
self.start_frame = initial_data.counter # initial_data.frames_until_detection might be more explicit
# what's the last frame
self.end_frame = self.total_frame_number - 1
self.search_boxes[self.i_tracking, :] = \
[int(np.round(self.first_row_min - self.boxsize)),
int(np.round(self.first_row_max + self.boxsize)),
int(np.round(self.first_col_min - self.boxsize)),
int(np.round(self.first_col_max + self.boxsize))]
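# Example (for illustration, with made-up numbers): with boxsize = 20 px,
# an initial animal bounding box spanning rows 100-140 and columns
# 200-260 yields a first search box of rows 80-160 and columns 180-280.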
# Needed for head/tail classification - how fast,
# in mm/seconds, does the animal need to move to count it as
# 'moving'. Empirical parameter
self.minimal_speed_for_moving = minimal_speed_for_moving
# while trying to assign head and tail there are 2 heuristics
# that need to be fulfilled: 1) there can only be two endpoints
# in the skeleton and 2) the aspect ratio must be above
# (currently) 1.25 (Todo > make either dynamic or user
# changeable). Whenever these heuristics are not True,
# skip that frame for head/tail assignment for now and fix
# it as soon as a head/tail can be assigned again.
self.to_fix = []
# the 'i' of the for loop that will run during the actual
# experiment
#self.i_tracking = 0
# create an in memory Bytes stream. Will be used by the
# camera to deliver images
# TODO Delete if shonw not to necessary!
#self.stream = io.BytesIO()
# initialize array that will be used as image
self.array = None
self.current_frame = None
# if doing offline analysis it is possible that the user wants
# to see the debug mode. It is also possible the user closes
# the window. In that case it is necessary to break the loop,
# ideally using a while loop. This is the switch
if self.offline_analysis or simulated_online_analysis:
self.offline_analysis_running = True
# After construction of this class (in if debug_mode,
# the window) call either the online tracking function or the
# offline tracking function
#if Raspberry:
# # functions in tkinter should be called with the .after
# method, not directly! 0 stands for time in ms
# self._tracking = self.child.after(0, self.online_tracking)
if offline_analysis or simulated_online_analysis:
# # self._tracking = self.child.after(0,
# # self.offline_tracking)
# # if this class is called more than once, e.g. because I
# # want to batch analyze some videos, the
# # after method waits too long, i.e. the main GUI can do
# # stuff (like read the next video) before the function
# # is actually called!
self.offline_tracking_func()
self.loop_time_measurement = loop_time_measurement
#if TESTING_LOOP_TIME:
# print('start_recording now')
# self.online_tracking()
def offline_tracking_func(self):
while self.offline_analysis_running:
#if self.counter == self.images.shape[2]-1:
if self.i_tracking == self.images.shape[2]-1:
self.offline_analysis_running = False
start_time = time.time()
print('Working on Frame#' + repr(self.i_tracking))
self.animal_tracking()
# allow for the debug mode to show
if self.debug_mode:
self.update_debug()
if self.pause_debug_var.get():
self.child.wait_variable(self.pause_debug_var)
self.i_tracking += 1
if self.debug_mode:
# time to wait
analysis_time_s = time.time() - start_time
time_to_wait_ms = int(round(
(1/self.display_framerate - analysis_time_s) * 1000
))
if time_to_wait_ms > 0:
self.child.after(time_to_wait_ms)
else:
pass
def write(self, buf):
"""
This function is called by the Custom output of the
`picamera video recorder
<https://picamera.readthedocs.io/en/release-1.13/recipes2.html#custom-outputs>`_.
and (1) prepares the image for the tracking algorithm and (2)
calls the tracking function: :func:`animal_tracking`.
**Image preparation**
#. Receive the buffer object prepared by the GPU which
contains the YUV image and put it into a numpy array in
uint8 number space.
#. Shorten the array to the Y values. As currently only
640x480px images can be used the array is shortened to
307'200 bytes (from 460'800 bytes).
#. The image, which so far has just been a 1D stream of uint8
values, is then organized into the 2D image.
#. Save the (GPU -> real time) timestamp of the current frame.
#. Call the :func:`animal_tracking` function.
"""
if TESTING_LOOP_TIME:
try:
self.loop_time_measurement[self.i_tracking, 0] = time.time()
except IndexError:
pass
# go to beginning of the memory object
#buf.seek(0)
# get the interior of the yuv buffer as uint8
#array = np.fromstring(buf.getvalue(), dtype=np.uint8)
array = np.frombuffer(buf, dtype=np.uint8)
## Get the Y (luminence) values. Discard the two chrominance values (U and V)
array = array[0:int(self.width * self.height)]
#array = self.array[0:int(self.width * self.height)]
## organize as the image with the desired resolution
self.array = array.reshape((self.height, self.width))
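# Example (for illustration): a 640x480 frame (already aligned to the
# padding requirements) arrives as a YUV420 buffer of
# 640 * 480 * 1.5 = 460800 bytes; only the first 640 * 480 = 307200 bytes
# (the full-resolution greyscale Y plane) are kept and reshaped above.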
#self.video_output.write(buf)
if self.cam.frame.complete and self.cam.frame.timestamp:
if self.start_time is None:
self.start_time = self.cam.frame.timestamp
try:
self.real_time[self.i_tracking] = self.cam.frame.timestamp \
- self.start_time
except IndexError:
# if too many frames - TODO make better
pass
if self.i_tracking < self.real_time.shape[0]:
# Had a bunch of errors as self.cam.wait_recording seems
# to be rather imprecise, therefore only call the
# animal_tracking function if there's still space in the
# arrays (otherwise it would always throw an error...)
self.animal_tracking()
if TESTING_LOOP_TIME:
try:
self.loop_time_measurement[self.i_tracking, 1] = time.time()
except IndexError:
pass
self.i_tracking +=1
def animal_tracking(self):
"""
Main function in single animal tracking. After detection in
:meth:`Pre-Experiment` of the animal this function will be
called on each frame to:
#. Identify the animal,
#. Define where to look for the animal in the next frame
#. Define head, tail, centroid and midpoint
#. If requested, present a stimulus by changing the
dutycycle on the requested GPIO
Below is the list in a bit more detail:
#. Ensure that the search box is not outside the image.
#. Subtract the current search box image from the background
search box.
#. Calculate the threshold to binarize the subtracted image.
#. Use the regionprops function of the scikit-image library
to find blobs
http://scikit-image.org/docs/dev/api/skimage.measure.html#skimage.measure.regionprops
#. Select the largest blob as the animal
#. Define the NEXT Search Box
#. Save the current bounding box, centroid position and the
raw image.
#. Skeletonize the binary image and find the endpoints.
#. By comparing the endpoint positions to the previous tail
position, assign the closer endpoint as the tail.
#. If virtual reality experiment: Use the head position to
define position in virtual space and update stimulus in
Channel 1 accordingly using a change in dutycycle of the
GPIO.
#. If time dependent stimulus: Update the dutycycle for all
the defined channels.
"""
if RASPBERRY:
# print('Frame' + repr(i_tracking))
# with PiRGBArray(self.camera) as output:
# self.camera.capture(output, 'rgb', use_video_port=True)
current_frame = self.array
else:
#current_frame = self.images[:, :, self.counter]
current_frame = self.images[:, :, self.i_tracking]
#self.counter += 1
# on the edge - this becomes necessary when the animal is
# close to the boundary of the frame. Due to the boxsize it
# can happen that the animal is still well in the frame,
# but the boxsize asks the program to look for the animal
# outside of the frame - which would lead to an error
if self.search_boxes[self.i_tracking, 0] < 0:
self.search_boxes[self.i_tracking, 0] = 0
if self.search_boxes[self.i_tracking, 1] \
> self.smoothed_background.shape[0]:
self.search_boxes[self.i_tracking, 1] \
= self.smoothed_background.shape[0] - 1
if self.search_boxes[self.i_tracking, 2] < 0:
self.search_boxes[self.i_tracking, 2] = 0
if self.search_boxes[self.i_tracking, 3] \
> self.smoothed_background.shape[1]:
self.search_boxes[self.i_tracking, 3] \
= self.smoothed_background.shape[1]
# filter the image to get rid of camera noise. Only take the
# search box
self.smoothed_current_local_image = ndimage.filters.gaussian_filter(
thc.CallBoundingBox(current_frame,
self.search_boxes[self.i_tracking, :]
).sliced_image, sigma=1)
# take only the slice from the background image that is
# necessary to compare
smoothed_background_local_image = thc.CallBoundingBox(
self.smoothed_background,
self.search_boxes[self.i_tracking, :]
).sliced_image
# We have to change the datatype of the numpy array from
# originally unsigned int 8 (goes from 0 to 255) to signed
# int 16 (goes from -32768 to 32767). The reason is that
# if we subtract two uint8 pictures, 200 - 201 wraps around to
# 255 while 200 - 199 = 1. This leads the histogram of
# intensities to have 2 background peaks, one around 0 and the
# other around 255. In int16 space, on the other hand,
# we'll have the background mean at around 0 while the animal
# will be in the negative range
subtracted_current_frame = self.smoothed_current_local_image.astype(
np.int16) - smoothed_background_local_image.astype(np.int16)
# calculate the local threshold by calculating the mean pixel
# intensity (of the subtracted small image) and subtracting
# from it 3 times the standard deviation of the subtracted
# small image. The *3* times was chosen empirically.
current_thresh = thc.MeanThresh(subtracted_current_frame,
self.signal,
3)
# if len(regionprops(label(subtracted_current_frame
# > current_thresh.thresh))) == 0:
# # In case there are zero connected pixels, we assume
# that's because the 3 STDs are too much and go with
# # only two # todo, really? Only see the invert=True as different!
# current_thresh = thc.MeanThresh(subtracted_current_frame,
# self.signal, 3,invert=True)
# calculate the binary local image
self.current_image_thresholded = self.compare(
subtracted_current_frame, current_thresh.thresh)
# use the regionprops function to identify blobs and characterize them
# http://scikit-image.org/docs/dev/api/skimage.measure.html#skimage.measure.regionprops
animal_properties_current = regionprops(label(
self.current_image_thresholded))
# find the LARGEST blob in the search image and set it as the animal
current_animal = thc.DescribeLargestObject(
animal_properties_current,
self.search_boxes[self.i_tracking, 0::2])
# 21/4/27:Had a strange problem with detection of animal.
# The line below was an attempt to figure out where the problem was
# but I could not reproduce the problem. If the problem arises again
# I can uncomment the below
#print('current_animal.row_max' + repr(current_animal.row_max))
if self.debug_mode:
self.current_animal_characteristics = current_animal
# The if makes sure that in the offline case the last frame
# that was recorded can still be analyzed.
if not self.i_tracking == self.end_frame:
# use the bounding box of the largest blob (defined as
# animal) plus a given boxsize as the next search box
self.search_boxes[self.i_tracking + 1, :] \
= [int(np.round(current_animal.row_min - self.boxsize)),
int(np.round(current_animal.row_max + self.boxsize)),
int(np.round(current_animal.col_min - self.boxsize)),
int(np.round(current_animal.col_max + self.boxsize))]
# get the current width and height of the bounding box
current_width = current_animal.row_max - current_animal.row_min
current_height = current_animal.col_max - current_animal.col_min
# check if the detected blob that has been defined as the
# animal can be saved in the preallocated array. If not,
# only display an error message if debug mode is on, as this
# will completely halt the experiment until the user clicks ok!
if current_width > self.image_raw.shape[0] \
or current_height > self.image_raw.shape[1]:
if self.debug_mode:
tk.messagebox.showerror(
'Blob too large',
'The expected size of the animal is smaller than:' +
repr(self.image_raw.shape[0]) + ' x ' + repr(
self.image_raw.shape[1]) +
'pixel\n'
'The detected blob, however, is ' + repr(
current_width) +
' pixels wide and\n'
+ repr(current_height) + ' pixels high.\n'
'The pixel/mm is set to ' + repr(
self.pixel_per_mm) + '.\n'
'This image will not be saved!\n'
'In the future please increase '
'the expected size of the animal',
parent=self.child)
else:
# save the raw image in the preallocated image array.
# This will fail if your animal is bigger than 2* the
# bounding box!
self.image_raw[0:current_width, 0:current_height, self.i_tracking] = \
thc.CallImageROI(current_frame, current_animal).small_image.copy()
# save the binary image in the preallocated image array
# todo - get rid of this and just save the value for the
# threshold. Can be reconstructed later if user wants to
# view it. Will save a lot of memory.
self.image_thresh[0:current_width, 0:current_height, self.i_tracking] = \
thc.CallImageROI(self.current_image_thresholded, current_animal,
sliced_input_image=self.search_boxes[self.i_tracking,
0::2]).small_image.copy()
self.local_threshold[self.i_tracking] = current_thresh.thresh.copy() # need to make explicit copy...
################################################################
# the call below shouldn't be necessary because the image is
# already smoothed! But it might be advantageous in difficult
# environments - it is, however, definitely going to cost CPU power!
# Note: if turned on, get rid of the following function
# because it would overwrite the image again!
# self.image_thresh[0:current_width, 0:current_height, i_tracking] = \
# binary_fill_holes(thc.CallImageROI(self.current_image_thresholded, current_animal,
# sliced_input_image=search_boxes[i_tracking, 0::2]).small_image).copy()
# Todo: maybe implement a button so that user can choose?
################################################################
# save the bounding box coordinates. This is necessary to
# reconstruct the image as only this part of the image is
# being saved.
self.bounding_boxes[:, self.i_tracking] = \
[current_animal.row_min, current_animal.row_max,
current_animal.col_min, current_animal.col_max].copy()
if self.newcameramtx is not None:
self.centroids[self.i_tracking, :] = thc.undistort_points(
current_animal.centroid_col,
current_animal.centroid_row,
self.undistort_mtx,
self.undistort_dst,
self.newcameramtx)
else:
# ... and the centroid
self.centroids[self.i_tracking, :] = \
current_animal.centroid_row, current_animal.centroid_col
# identify head and tail
# find skeleton
#######
# This needs to be changed to 'thin'. Discuss whether this should be
# rolled out with the other improvements of the HT classification
#######
if THINING_FUNCTION == 'Thin':#'thin': # todo - find a better way and test before publication if necessary!
self.image_skel[:, :, self.i_tracking] \
= morphology.thin(self.image_thresh[:, :, self.i_tracking])
else:
self.image_skel[:, :, self.i_tracking] \
= morphology.skeletonize(self.image_thresh[:, :, self.i_tracking])
# how many points are there in the skeleton (also gives me the length)
self.length_skeleton[self.i_tracking] = len(np.nonzero(
self.image_skel[:, :, self.i_tracking])[0])
# todo: This is very explicit - change?
skeleton_y = np.nonzero(self.image_skel[:, :, self.i_tracking])[0]
skeleton_x = np.nonzero(self.image_skel[:, :, self.i_tracking])[1]
# this loop produces an array, organized like the current skeleton, that for each skeleton point counts how many
# skeleton points (including the point itself) lie within one pixel in any direction. Taken from the original SOS.
connect = np.zeros(self.length_skeleton[self.i_tracking])
for i_skel in range(self.length_skeleton[self.i_tracking]):
connect[i_skel] = np.sum(
np.logical_and(skeleton_x >= skeleton_x[i_skel] - 1,
np.logical_and(skeleton_x <= skeleton_x[i_skel] + 1,
np.logical_and(skeleton_y >= skeleton_y[i_skel] - 1,
skeleton_y <= skeleton_y[i_skel] + 1)
)))
skeleton_end_points = np.where(connect == 2)[0]
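# A skeleton endpoint touches exactly one other skeleton pixel, so its
# 3x3 neighbourhood count (which includes the pixel itself) equals 2.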
'''
################################################################
# START OF NEW CODE
################################################################
#####
# This looks good. Don't implement yet. Discuss whether
# this improvement will be rolled out with the other
# improvements of the HT classification
#####
# try to always get a skeleton:
# If there are more than two endpoints, assume that the head
# and tail are at the extremities. This helps with difficult
# images where the skeleton has 3 or more endpoints.
# 1) calculate distance to previous tail. Keep point with
# smallest distance
# 2) calculate skeleton point with largest distance to point
# defined in 1) and keep that.
# Make this optional - there are problems with this kind of
# analysis: it yields a wrong HT classification in some cases
# that were not a problem before.
if skeleton_end_points.shape[0] > 2:
print(skeleton_end_points)
#1
for i_skel in range(skeleton_end_points.shape[0]):
# need to have coordinates in whole image frame as
# 'self.tails' is already in that coordinate system.
y = current_animal.row_min + skeleton_y[skeleton_end_points[i_skel]]
x = current_animal.col_min + skeleton_x[skeleton_end_points[i_skel]]
dist_to_prev_tail = np.sqrt(
(self.tails[self.i_tracking-1, 0] - y) ** 2 +
(self.tails[self.i_tracking-1, 1] - x) ** 2)
if i_skel == 0:
closest_to_tail = dist_to_prev_tail
keep_endpoint_1 = i_skel
else:
if dist_to_prev_tail < closest_to_tail:
closest_to_tail = dist_to_prev_tail
keep_endpoint_1 = i_skel
#2
y_1 = skeleton_y[skeleton_end_points[keep_endpoint_1]]
x_1 = skeleton_x[skeleton_end_points[keep_endpoint_1]]
for i_skel in range(skeleton_end_points.shape[0]):
# here we can look at local coordinates (NOT global
# image frame)
y = skeleton_y[skeleton_end_points[i_skel]]
x = skeleton_x[skeleton_end_points[i_skel]]
dist_to_skel_endpoint_1 = np.sqrt(
(y_1 - y)**2 + (x_1 - x)**2)
if i_skel == 0:
further_away = dist_to_skel_endpoint_1
keep_endpoint_2 = i_skel
else:
if dist_to_skel_endpoint_1 > further_away:
further_away = dist_to_skel_endpoint_1
keep_endpoint_2 = i_skel
print(dist_to_skel_endpoint_1)
skeleton_end_points = np.array((skeleton_end_points[keep_endpoint_1],
skeleton_end_points[keep_endpoint_2]))
################################################################
# END OF NEW CODE
################################################################
'''
aspect_ratio = current_animal.major_axis / current_animal.minor_axis
if len(skeleton_end_points) == 2:
self.endpoints[0, :, self.i_tracking] = \
current_animal.row_min + skeleton_y[skeleton_end_points[0]], \
current_animal.col_min + skeleton_x[skeleton_end_points[0]]
self.endpoints[1, :, self.i_tracking] = \
current_animal.row_min + skeleton_y[skeleton_end_points[1]], \
current_animal.col_min + skeleton_x[skeleton_end_points[1]]
# Use "illustrate_midpoint_vs_centroid" jupyter notebook to
# sanity check!
# need to add midpoint of the skeleton
# take half the length of the skeleton
temp_midpoint = self.length_skeleton[self.i_tracking] / 2
# if the skeleton length is an even number add one (e.g. the
# center point of 4 points can either be position 2
# or position 3.)
if temp_midpoint % 2 == 0:
temp_midpoint += 1
# If the skeleton length is an odd number, we are already
# at the center
else:
pass
#temp_midpoint += 0.5
# Since it's also possible to end up with X.5, use int(round())
# to make sure we have an integer
temp_midpoint = int(round(temp_midpoint))
try:
# Bugfix - skeleton_y and skeleton_x can be unsorted.
# np.sort makes sure we actually take the center point!
self.midpoints[self.i_tracking, :] = \
current_animal.row_min + np.sort(skeleton_y)[int(temp_midpoint)], \
current_animal.col_min + np.sort(skeleton_x)[int(temp_midpoint)]
except IndexError: # if animal is extremely small!
if not RASPBERRY:
print('midpoint assigned as centroid')
self.midpoints[self.i_tracking, :] = \
current_animal.row_min, \
current_animal.col_min
# Undistort points if the user asks for it
if self.newcameramtx is not None:
self.midpoints[self.i_tracking, :] = thc.undistort_points(
self.midpoints[self.i_tracking,1],
self.midpoints[self.i_tracking,0],
self.undistort_mtx,
self.undistort_dst,
self.newcameramtx)
# rules that must be fulfilled to assign head and tail in an image:
# 1): the aspect ratio (major axis / minor axis) of the animal
# must be higher than a given value
# 2): we need exactly two endpoints on the skeleton (when the
# binary image has holes the skeleton can become
# circular)
# 3): we need a minimum length of the skeleton.
if aspect_ratio > 1.25 and len(skeleton_end_points) == 2 and \
self.length_skeleton[self.i_tracking] > 1 / \
2 * np.nanmean(self.length_skeleton[self.i_tracking - 3:self.i_tracking]):
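# Illustrative example for rule 3: if the mean skeleton length over
# the previous three frames is 40 px, the current skeleton has to be
# longer than 20 px for head/tail assignment to proceed.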
# in case we have not yet assigned the head (i.e. at the
# beginning of the experiment), the endpoint with the
# shortest distance to the centroid of the originally
# detected larva is defined as the tail
if self.i_tracking == self.start_frame:
# if self.tails[self.i_tracking-1,0] == 0:
if np.linalg.norm(
self.first_centroid - self.endpoints[0, :,self.i_tracking]) \
< np.linalg.norm(self.first_centroid
- self.endpoints[1, :, self.i_tracking]): # endpoint 0 is the tail
self.tails[self.i_tracking, :] \
= self.endpoints[0, :, self.i_tracking]
self.heads[self.i_tracking, :] \
= self.endpoints[1, :, self.i_tracking]
else:
self.tails[self.i_tracking, :] \
= self.endpoints[1, :, self.i_tracking]
self.heads[self.i_tracking, :] \
= self.endpoints[0, :, self.i_tracking]
elif self.tails[self.i_tracking - 1, 0] == 0:
# This happens when the larva was donut-shaped in the preceding frames.
# Idea: the last verified centroid should always be closer to the tail than to the head, which can move
# much more
# TODO: I can improve this by not only taking the
# centroid but also the last assigned tail position
# into account. Find condition and test.
if np.linalg.norm(self.endpoints[0, :, self.i_tracking]
- self.centroids[self.to_fix[0] - 1, :]) \
< np.linalg.norm(self.endpoints[1, :, self.i_tracking]
- self.centroids[self.to_fix[0] - 1, :]):
self.tails[self.i_tracking, :] \
= self.endpoints[0, :, self.i_tracking]
self.heads[self.i_tracking, :] \
= self.endpoints[1, :, self.i_tracking]
else:
self.tails[self.i_tracking, :] \
= self.endpoints[1, :, self.i_tracking]
self.heads[self.i_tracking, :] \
= self.endpoints[0, :, self.i_tracking]
# we also fix, in retrospect, the lost self.heads and self.tails with the centroid coordinate
# so that plotting is easy afterwards
self.tails[self.to_fix, :] \
= self.centroids[self.to_fix, :].copy()
self.heads[self.to_fix, :]\
= self.centroids[self.to_fix, :].copy()
self.to_fix = []
else:
if np.linalg.norm(
self.tails[self.i_tracking - 1, :]
- self.endpoints[0, :, self.i_tracking]) \
< np.linalg.norm(self.tails[self.i_tracking - 1, :]
- self.endpoints[1, :, self.i_tracking]):
self.tails[self.i_tracking, :] \
= self.endpoints[0, :, self.i_tracking]
self.heads[self.i_tracking, :] \
= self.endpoints[1, :, self.i_tracking]
else:
self.tails[self.i_tracking, :] \
= self.endpoints[1, :, self.i_tracking]
self.heads[self.i_tracking, :] \
= self.endpoints[0, :, self.i_tracking]
# the if clause below checks if the tail is in front of
# the centroid, essentially checking if there's a
# potential head/tail swap in the assignment.
# The first if just checks whether enough frames have
# been collected already to compare with the current frame.
if self.i_tracking > self.recording_framerate:
# First we calculate the distance of the centroid
# between the past 1 second and the current frame.
# This will be needed to calculate speed
distance_centroid = np.linalg.norm(
self.centroids[self.i_tracking - self.recording_framerate, :]
- self.centroids[self.i_tracking, :])
# print('Distance in pixels: ' + repr(distance_centroid))
# print('Distance in mm: ' + repr(distance_centroid/self.pixel_per_mm))
# next there is a filter for the minimum speed the
# centroid needs to have in order to consider the
# animal 'running' (see the 0.25 threshold below).
# speed mm per sec = distance centroid / px_per_mm / framerate
current_centroid_speed = (distance_centroid
/ self.pixel_per_mm) \
/ self.recording_framerate
if self.tails[self.i_tracking - 1, 0] != 0 \
and self.tails[self.i_tracking, 0] != 0 \
and current_centroid_speed > 0.25:
# We'll check if a tail has been assigned or if
# the curvature of the animal was too great.
# Then it checks if the distance travelled was
# enough to justify looking for a H/T swap.
# This first checks in which direction the
# centroid is traveling relative to the frame
# before:
# theta_centroid = centroid_current - centroid_past
direction_centroid = np.arctan2(
self.centroids[self.i_tracking,0]
- self.centroids[
self.i_tracking
- self.frames_to_define_orientation, 0],
self.centroids[
self.i_tracking, 1]
- self.centroids[self.i_tracking
- self.frames_to_define_orientation, 1])
# if the tail is in front of the centroid (i.e. a
# potential H/T swap), the direction from the tail
# to the centroid points roughly opposite to the
# direction of movement. We compute that direction as
# theta_tail = centroid_current - tail_current
direction_tail_minus_centroid = np.arctan2(
self.centroids[self.i_tracking, 0]
- self.tails[self.i_tracking, 0],
self.centroids[self.i_tracking, 1]
- self.tails[self.i_tracking, 1])
# Next we'll normalize, i.e. we bring the
# movement of the centroid onto the horizontal axis
# and let the direction of the tail relative to
# the centroid follow
if direction_centroid \
- direction_tail_minus_centroid \
< -np.pi or direction_centroid \
- direction_tail_minus_centroid \
> np.pi:
normalized_angle = direction_centroid \
+ direction_tail_minus_centroid
else:
normalized_angle = direction_centroid \
- direction_tail_minus_centroid
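# Worked example (illustrative numbers): if the centroid moves at
# +3.0 rad and the tail lies at -3.0 rad relative to the centroid,
# the raw difference is 6.0 (> pi), so the sum (0.0) is used instead -
# both directions point almost the same way, i.e. no head/tail swap.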
# Now we can just check if the normalized angle
# exceeds the tolerance we allow it
if normalized_angle > 1 / 2 * np.pi \
or normalized_angle < - 1 / 2 * np.pi:
if not RASPBERRY:
print('found HT swap frame ' + repr(self.i_tracking))
print('movement: ' + repr(direction_centroid))
print('tail dir: ' + repr(direction_tail_minus_centroid))
self.ht_swap[self.i_tracking] = 1
if self.repair_ht_swaps:
# In order to be robust against noise the
# head/tail swap needs to persist for
# at least one second - Todo: results for
# framerate 1 may be problematic!
if (self.ht_swap[self.i_tracking
- self.recording_framerate:self.i_tracking] == 1).all():
if not RASPBERRY:
print('now I could repair the ht swaps')
tails_temp = np.zeros((
self.recording_framerate, 2),
dtype=np.int16);
heads_temp = np.zeros((
self.recording_framerate, 2),
dtype=np.int16)
np.copyto(tails_temp,
self.heads[int(
self.i_tracking
- self.recording_framerate
+ 1):self.i_tracking + 1, :])
np.copyto(heads_temp,
self.tails[int(
self.i_tracking
- self.recording_framerate
+ 1):self.i_tracking + 1, :])
np.copyto(
self.tails[int(
self.i_tracking
- self.recording_framerate
+ 1):self.i_tracking + 1, :],
tails_temp)
np.copyto(self.heads[int(
self.i_tracking
- self.recording_framerate
+ 1):self.i_tracking + 1, :],
heads_temp)
self.ht_swap[self.i_tracking
- self.recording_framerate
+ 1:self.i_tracking + 1] = 0
else:
# need to assign the centroid position to the head/tail -
# otherwise in the VR setting we have huge jumps!
self.tails[self.i_tracking, :] = self.centroids[self.i_tracking, :]
self.heads[self.i_tracking, :] = self.centroids[self.i_tracking, :]
self.to_fix.append(self.i_tracking)
# After final assignment of tail and head, correct using undistortPoints
if self.newcameramtx is not None:
# First the Tails
self.tails[self.i_tracking, :] = thc.undistort_points(
self.tails[self.i_tracking,1],
self.tails[self.i_tracking,0],
self.undistort_mtx,
self.undistort_dst,
self.newcameramtx)
#POI_t = np.zeros((1, 1, 2), dtype=np.float32)
#POI_t[0,0,:] = self.tails[self.i_tracking, 1], self.tails[self.i_tracking, 0]
#corrected_tails = cv2.undistortPoints(POI_t,
# self.undistort_mtx,
# self.undistort_dst,
# None,
# self.newcameramtx)
#self.tails[self.i_tracking, :] = corrected_tails[0][0][1], corrected_tails[0][0][0]
# Then the heads
self.heads[self.i_tracking, :] = thc.undistort_points(
self.heads[self.i_tracking,1],
self.heads[self.i_tracking,0],
self.undistort_mtx,
self.undistort_dst,
self.newcameramtx)
#POI_h = np.zeros((1, 1, 2), dtype=np.float32)
#POI_h[0,0,:] = self.heads[self.i_tracking, 1], self.heads[self.i_tracking, 0]
#corrected_heads = cv2.undistortPoints(POI_h,
# self.undistort_mtx,
# self.undistort_dst,
# None,
# self.newcameramtx)
#self.heads[self.i_tracking, :] = corrected_heads[0][0][1], corrected_heads[0][0][0]
if self.VR_ARENA:
if len(self.vr_arena.shape) > 2:
# If the VR arena is a multidimensional array, assume
# that the last index (-1) is a time index.
# First we need to update the counter, if appropriate,
# using the modulo operator. Only update if i_tracking
# is not zero, otherwise the first programmed arena
# would never be shown!
if self.i_tracking != 0:
if self.i_tracking \
% self.vr_arena_multidimension_update == 0:
self.vr_arena_multidimension_counter += 1
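# Illustrative example (assuming vr_arena_multidimension_update is
# given in frames): with a value of 60 the time index of the arena
# advances once every 60 tracked frames, i.e. every 2 s at 30 fps.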
if self.vr_stim_location == 'Head':
current_stim_temp = self.vr_arena[
self.heads[self.i_tracking, 0],
self.heads[self.i_tracking, 1],
self.vr_arena_multidimension_counter
% self.vr_arena.shape[2]
]
elif self.vr_stim_location == 'Centroid':
current_stim_temp = self.vr_arena[
self.centroids[self.i_tracking, 0],
self.centroids[self.i_tracking, 1],
self.vr_arena_multidimension_counter
% self.vr_arena.shape[2]
]
elif self.vr_stim_location == 'Midpoint':
current_stim_temp = self.vr_arena[
self.midpoints[self.i_tracking, 0],
self.midpoints[self.i_tracking, 1],
self.vr_arena_multidimension_counter
% self.vr_arena.shape[2]
]
elif self.vr_stim_location == 'Tail':
current_stim_temp = self.vr_arena[
self.tails[self.i_tracking, 0],
self.tails[self.i_tracking, 1],
self.vr_arena_multidimension_counter
% self.vr_arena.shape[2]
]
# This should be relatively safe - if the user presents a
# uint16 arena this will throw an error as soon
# as the pwm_range is exceeded!
current_stim = current_stim_temp * self.pwm_range / 255
# It is currently not possible to use an arena coded
# in uint16 number space! But here would be a good
# point to start changing the code if needed!
# Normalize to 65535 instead of 255!
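# Illustrative example of the scaling above: an 8-bit arena value of
# 128 with self.pwm_range == 255 stays 128; if self.pwm_range were
# 1000, the same value would map to a duty cycle of ~502.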
for i_stim in range(len(self.output_channel_one)):
# then we just call the same function but with a
# different slice, again using the modulo
# operator. This means that if the index we're
# trying to call is higher than the available
# indices we just start again at 0
self.pwm_object.set_PWM_dutycycle(
user_gpio=self.output_channel_one[i_stim][0],
dutycycle=current_stim)
self.stimulation[self.i_tracking] = current_stim_temp
else:
if self.vr_stim_location == 'Head':
current_stim_value = self.vr_arena[
self.heads[self.i_tracking, 0],
self.heads[self.i_tracking, 1]]
elif self.vr_stim_location == 'Centroid':
current_stim_value = self.vr_arena[
self.centroids[self.i_tracking, 0],
self.centroids[self.i_tracking, 1]]
elif self.vr_stim_location == 'Midpoint':
current_stim_value = self.vr_arena[
self.midpoints[self.i_tracking, 0],
self.midpoints[self.i_tracking, 1]]
elif self.vr_stim_location == 'Tail':
current_stim_value = self.vr_arena[
self.tails[self.i_tracking, 0],
self.tails[self.i_tracking, 1]]
# don't always update - only if the value changed - at
# least in high-intensity light I clearly see
# flickering, which might come from the updating.
'''
if self.previous_channel_one_value \
!= self.vr_arena[self.heads[self.i_tracking, 0],
self.heads[self.i_tracking, 1]]:
'''
if self.previous_channel_one_value != \
current_stim_value:
# print('value changed')
# Todo: Have to write this in the instructions:
# VR arena is always on Channel 1!
for i_stim in range(len(self.output_channel_one)):
'''
# print(self.output_channel_one[i_stim][0])
self.pwm_object.set_PWM_dutycycle(
user_gpio=self.output_channel_one[i_stim][0],
dutycycle=self.vr_arena[self.heads[self.i_tracking, 0],
self.heads[self.i_tracking, 1]
])
'''
self.pwm_object.set_PWM_dutycycle(
user_gpio=self.output_channel_one[i_stim][0],
dutycycle=current_stim_value)
self.stimulation[self.i_tracking] = current_stim_value
self.previous_channel_one_value = current_stim_value
elif self.time_dependent_stim:
# THIS IS NEW and should address #69: with an unexpected
# framerate the time-dependent stimulus was starting to show
# unexpected behavior. By using the real camera time we
# can tie the stimulus to time and keep the stimulus
# consistent between experiments.
# note - self.real_time comes in us. We need to convert to seconds
current_time = self.real_time[self.i_tracking] / 1e6
# np.searchsorted takes care of the fact that the current
# time is never going to
# be exactly the same as any number here
if 'Time [s]' in self.time_dependent_stim_file:
stim_index = np.searchsorted(
self.time_dependent_stim_file['Time [s]'], current_time)[0]
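# Illustrative example: with 'Time [s]' = [0.0, 0.5, 1.0, ...] and
# current_time = 0.7 s, np.searchsorted returns the insertion index 2
# (the position where 0.7 would be placed to keep the column sorted).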
else:
# for backward compatibility keep the original way of
# presenting the time dependent stimulus
stim_index = self.i_tracking
try:
self.previous_channel_one_value = \
self.update_pwm_dutycycle_time_dependent(
previous_channel_value=self.previous_channel_one_value,
output_channel_list=self.output_channel_one,
output_channel_name='Channel 1',
stim_index=stim_index)
# Save the actual stimulus
self.stimulation[self.i_tracking, 0] = self.previous_channel_one_value
except KeyError:
pass
# repeat for channel 2
try:
self.previous_channel_two_value = \
self.update_pwm_dutycycle_time_dependent(
previous_channel_value=self.previous_channel_two_value,
output_channel_list=self.output_channel_two,
output_channel_name='Channel 2',
stim_index=stim_index)
# Save the actual stimulus
self.stimulation[self.i_tracking, 1] = self.previous_channel_two_value
except KeyError:
pass
# repeat for channel 3
try:
self.previous_channel_three_value = \
self.update_pwm_dutycycle_time_dependent(
previous_channel_value=self.previous_channel_three_value,
output_channel_list=self.output_channel_three,
output_channel_name='Channel 3',
stim_index=stim_index)
# Save the actual stimulus
self.stimulation[self.i_tracking, 2] = self.previous_channel_three_value
except KeyError:
pass
# and finally for channel 4
try:
self.previous_channel_four_value = \
self.update_pwm_dutycycle_time_dependent(
previous_channel_value=self.previous_channel_four_value,
output_channel_list=self.output_channel_four,
output_channel_name='Channel 4',
stim_index=stim_index)
# Save the actual stimulus
self.stimulation[self.i_tracking, 3] = self.previous_channel_four_value
except KeyError:
pass
if self.post_hoc_tracking:
self.heuristic_parameters[0, self.i_tracking] \
= current_animal.filled_area
self.heuristic_parameters[1, self.i_tracking] \
= current_animal.filled_area / self.pixel_per_mm
self.heuristic_parameters[2, self.i_tracking] \
= current_animal.major_axis / current_animal.minor_axis
self.heuristic_parameters[3, self.i_tracking] \
= current_animal.eccentricity
self.heuristic_parameters[4, self.i_tracking] \
= self.length_skeleton[self.i_tracking]
# this is inefficient, but very explicit!
self.heuristic_parameters[5, self.i_tracking] \
= self.heuristic_parameters[4, self.i_tracking] / \
self.pixel_per_mm
# Only calculate speed at second analyzed frame!
if self.i_tracking > self.start_frame:
# First, calculate the speed per frame just by
# calculating the distance between the current centroid
# position and the previous centroid position. This
# will yield the distance in pixel/frame
self.heuristic_parameters[6, self.i_tracking] \
= np.linalg.norm(
self.centroids[self.i_tracking, :]
- self.centroids[self.i_tracking - 1,
:])
# this is inefficient, but very explicit!
# then get the distance in mm per frame by dividing
# the above by the pixel per mm
self.heuristic_parameters[7, self.i_tracking] \
= self.heuristic_parameters[6, self.i_tracking] \
/ self.pixel_per_mm
# this is inefficient, but very explicit!
# then get the distance in mm per second by
# multiplying the above by the recording framerate
self.heuristic_parameters[8, self.i_tracking] \
= self.heuristic_parameters[7, self.i_tracking] \
* self.recording_framerate
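# Worked example (illustrative numbers): a centroid displacement of
# 2 px between consecutive frames at 8 px/mm and 30 frames/s gives
# 2 px/frame, 2 / 8 = 0.25 mm/frame and 0.25 * 30 = 7.5 mm/s.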
# should be covered by frame.timestamp - TEST!
if RASPBERRY:
pass
# self.real_time[self.i_tracking] = time.time()
# if the experiment is analyzed afterwards, there is no way
# of knowing what the real time was, so it is just assumed
# that the camera was perfect and gave exactly the framerate that was requested
elif not RASPBERRY:
if self.i_tracking == self.start_frame:
self.delay_due_to_detection = self.i_tracking \
/ self.recording_framerate
# self.real_time[self.i_tracking] = time.time()
# The timestamp on the Raspberry Pi comes in us. Easy to adapt here
# as the conversion to seconds only happens in thc.save, which doesn't
# know whether post-hoc or real tracking is being done.
self.real_time[self.i_tracking] = \
((self.i_tracking / self.recording_framerate) - \
self.time_delay_due_to_animal_detection) * 1e6
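# Illustrative example: in post-hoc analysis, frame 30 at 30 frames/s
# with zero detection delay is assigned a real_time of 1.0 s, stored
# as 1,000,000 us.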
[docs] def error_message_func(self, error_stack):
'''
Let the user know that something went wrong!
:return:
'''
captured_images = np.count_nonzero(self.real_time)
expected_no_of_images = self.total_frame_number
with open(self.datetime + '_ERROR.txt', 'a') as file:
file.write('Unexpected Error at frame ' + repr(
captured_images) + ' of ' +
repr(int(
expected_no_of_images)) + '\n\n')
file.write('Traceback (most recent call last): ' + str(
error_stack) + '\n\n')
if "IndexError: index -1 is out of bounds for axis 0 with size 0" in traceback.format_exc() and \
"self.filled_area = areas_sorted['filled area'][-1]" in traceback.format_exc():
file.write('Error #1\n'
'The following 2 lines:\n'
"self.filled_area = areas_sorted['filled area'][-1]\n"
"IndexError: index -1 is out of bounds for axis 0 with size 0\n"
'indicate that the animal was not found in the region of interest.\n'
'This can happen if the animal moves faster than expected.\n'
'To circumvent this problem increase the "max_speed_animal_mm_per_s" parameter '
'in the "list_of_available_organisms.json" file\n'
'Alternatively, your animal might be able to hide under some obstruction. If that is the case '
'please clear the arena so that the camera can always see at least parts of the animal\n')
# elif.... put all the known errors and print possible solutions
else:
file.write('Error that has not been classified yet!\n\n')
file.write('Full error traceback below: \n\n')
file.write(traceback.format_exc())
tk.messagebox.showerror('Error',
'The experiment stopped earlier than requested.\n'
'See the ERROR.txt file in the experiment folder\n'
'for a detailed traceback for debugging purposes\n'
)
[docs] def update_debug(self):
"""
This will only work in post-hoc analysis, NOT on the
Raspberry Pi. In principle we could implement a ton more
information, specifically we can always print:
1) filled area
2) eccentricity
3) major over minor axis
Might be good for visualization, but these parameters are
anyway saved if the user wants them.
"""
try:
# display how many frames are left for the analysis
self.time_remaining_label.configure(
text='Frames remaining: '
+ repr(self.end_frame-self.i_tracking),
font='Helvetica 14 bold'
)
#original_image = self.images[:,:,self.counter].copy()
original_image = self.images[:, :, self.i_tracking].copy()
original_image_resized = resize_image(
image=original_image,
output_shape=(self.canvas_height, self.canvas_width),
preserve_range=True,
mode='reflect')
# in order to display an image in labels and frames,
# the PhotoImage method is called. It can take a numpy array
self.photo_raw = ImageTk.PhotoImage(
image=Image.fromarray(original_image_resized))
# set the PhotoImage object into the top left canvas
self.child_canvas_top_left.create_image(
0, 0, image=self.photo_raw, anchor=tk.NW)
# The binary image... The tracking algorithm does not
# subtract the whole image but only a part of it,
# specifically the search box.
# To keep the plot tidy (constant width/height) it is
# best to show the whole image, indicate the search
# box and show its binary content.
binary_image = np.zeros((original_image.shape[0],
original_image.shape[1]
))
# The binary image is first copied. Has to be uint8 as
# PhotoImage wants 0s to display black...
subtracted_image = self.current_image_thresholded.astype(np.uint8).copy()
# .. and 255s to display white
subtracted_image[np.where(subtracted_image == 0)] = 127
subtracted_image[np.where(subtracted_image == 1)] = 255
sb_row_min = self.search_boxes[self.i_tracking,0]
sb_row_max = self.search_boxes[self.i_tracking,1]
sb_col_min = self.search_boxes[self.i_tracking,2]
sb_col_max = self.search_boxes[self.i_tracking,3]
binary_image[sb_row_min:sb_row_max,
sb_col_min:sb_col_max] = subtracted_image.copy()
binary_image_resized = resize_image(
image=binary_image,
output_shape=(self.canvas_height, self.canvas_width),
preserve_range=True,
mode='reflect')
# create the PhotoImage object
self.photo_subtracted = ImageTk.PhotoImage(
image=Image.fromarray(binary_image_resized))
# and set it in the middle canvas
self.child_canvas_top_middle.create_image(
0, 0, image=self.photo_subtracted, anchor=tk.NW)
self.below_detected.configure(
text='Filled Area: ' + repr(
self.current_animal_characteristics.filled_area) +
'\nEccentricity: ' + repr(
self.current_animal_characteristics.eccentricity) +
'\nMajor over minor axis: ' + repr(
self.current_animal_characteristics.major_axis
/self.current_animal_characteristics.minor_axis))
# The image on the right will be the raw image with the
# detected blob drawn a bounding box around it
#detected_raw_box = self.images[:, :, self.counter].copy()
detected_raw_box = self.images[:, :, self.i_tracking].copy()
# draw top horizontal line
rr, cc = line(int(self.bounding_boxes[0, self.i_tracking]),
int(self.bounding_boxes[2, self.i_tracking]),
int(self.bounding_boxes[0, self.i_tracking]),
int(self.bounding_boxes[3, self.i_tracking]))
detected_raw_box[rr, cc] = self.box_intensity
# draw right vertical line
rr, cc = line(int(self.bounding_boxes[0, self.i_tracking]),
int(self.bounding_boxes[3, self.i_tracking]),
int(self.bounding_boxes[1, self.i_tracking]),
int(self.bounding_boxes[3, self.i_tracking]))
detected_raw_box[rr, cc] = self.box_intensity
# draw bottom horizontal line
rr, cc = line(int(self.bounding_boxes[1, self.i_tracking]),
int(self.bounding_boxes[2, self.i_tracking]),
int(self.bounding_boxes[1, self.i_tracking]),
int(self.bounding_boxes[3, self.i_tracking]))
detected_raw_box[rr, cc] = self.box_intensity
# draw left vertical line
rr, cc = line(int(self.bounding_boxes[0, self.i_tracking]),
int(self.bounding_boxes[2, self.i_tracking]),
int(self.bounding_boxes[1, self.i_tracking]),
int(self.bounding_boxes[2, self.i_tracking]))
detected_raw_box[rr, cc] = self.box_intensity
detected_box_resized = resize_image(
image=detected_raw_box,
output_shape=(self.canvas_height, self.canvas_width),
preserve_range=True,
mode='reflect')
# create a PhotoImage object
self.photo_raw_detected = ImageTk.PhotoImage(
image=Image.fromarray(detected_box_resized))
# and position it in the right canvas
self.child_canvas_top_right.create_image(
0, 0, image=self.photo_raw_detected, anchor=tk.NW)
# update the child - without this nothing will be shown.
# http://effbot.org/tkinterbook/widget.htm#Tkinter.Widget.update-method
# tried update_idletasks() > won't show anything, have to
# go with update
self.child.update()
except IndexError:
# don't want the experiment to break just because debug mode
# can't update
pass
[docs] def update_pwm_dutycycle_time_dependent(self,
previous_channel_value,
output_channel_list,
output_channel_name,
stim_index):
"""
A convenience function for the time-dependent stimulation.
Takes the list with the gpios for a given channel and,
in a for loop, updates those gpios for that channel.
In the first iteration of the loop it will just set the pwm
dutycycle according to whatever dutycycle is specified.
As this function is called as 'previous_channel_x_value
= update_pwm_dutycycle...' it then updates the
previous_channel_x_value for the next iteration.
:param previous_channel_value: As the GPIO dutycycle should
only be updated when the value changes, this holds the
previous value
:param output_channel_list: list of gpios for a given channel,
e.g. GPIO 17 would be [[17,1250]] (1250 is the frequency,
not used here)
:param output_channel_name: the channel as a string, e.g. 'Channel 1'
:param stim_index: index into the time-dependent stimulation file
for the current frame
:return:
"""
if self.i_tracking == self.start_frame or \
previous_channel_value \
!= self.time_dependent_stim_file[output_channel_name][stim_index]:
for i_stim in range(len(output_channel_list)):
self.pwm_object.set_PWM_dutycycle(
user_gpio=output_channel_list[i_stim][0],
dutycycle=self.time_dependent_stim_file[output_channel_name][
stim_index])
return (self.time_dependent_stim_file[output_channel_name][stim_index])
[docs] def flush(self):
"""
Unsure if needed. Test whether it can be removed.
"""
pass
#self.video_output.flush()
#self.pts_output.flush()
#self.pi_time_output.flush()
[docs] def close(self):
"""
Unsure if needed. Test whether it can be removed.
"""
pass
#self.video_output.close()
#self.pts_output.close()
#self.pi_time_output.close()