Source code for fast_tracking

__author__ = 'David Tadres'
__project__ = 'PiVR'

from PIL import Image, ImageTk
from skimage.transform import resize as resize_image
import tkinter as tk
import numpy as np
from scipy import ndimage
from skimage.measure import regionprops, label
import operator
from skimage import morphology
import os
import time
import skimage
import traceback
from skimage.draw import line

import tracking_help_classes as thc

# When True, FastTrackingControl allocates an array in which the loop
# time of each frame can be stored (see 'loop_time_measurement').
TESTING = True

# Check whether the processor is an ARM processor (used by the
# Raspberry Pi) or not.
# os.uname() only exists on Unix-like systems. On Windows the attribute
# is missing, which raises an AttributeError - in that case the code is
# certainly not running on a Raspberry Pi. Only that specific error is
# caught so that unrelated problems are not silently hidden.
try:
    # index 4 of the uname result is the machine/hardware identifier
    RASPBERRY = os.uname()[4][:3] == 'arm'
except AttributeError:
    RASPBERRY = False


def _skimage_major_minor(version_string):
    """
    Return the (major, minor) numbers of a version string as a tuple
    of two integers.

    Only the leading digits of the first two dot-separated parts are
    used, so strings such as '0.9.3', '0.13rc1' or '1.0.0' are all
    handled. Missing or non-numeric parts count as 0.
    """
    numbers = []
    for part in version_string.split('.')[:2]:
        digits = ''
        for char in part:
            if not char.isdigit():
                break
            digits += char
        numbers.append(int(digits) if digits else 0)
    while len(numbers) < 2:
        numbers.append(0)
    return tuple(numbers)


# Currently the newest skimage version that can easily be installed on
# the Pi using Raspbian is 0.12 - the thin function was only introduced
# in version 0.13. In order to keep it simple just use the backup
# function (skeletonize) if the user is running an older version.
# The version is compared as a (major, minor) tuple: slicing the
# version string at fixed positions breaks for single digit minor
# versions (e.g. '0.9.3') and for any major version other than 0.
if _skimage_major_minor(skimage.__version__) >= (0, 13):
    THINING_FUNCTION = 'Thin'
else:
    THINING_FUNCTION = 'skeletonize'

[docs]class FastTrackingControl(): """ This class controls the tracking algorithm. It was necessary to create a second class as the 'record_video' function of picamera needed it's own class to deliver images to. This script needs to be cleaned up. I'm sure I can get rid of quite a bit of variables or at least quite a bit of variable passing around! """ def __init__(self, genotype='Unknown', recording_framerate=2, display_framerate = None, resolution=None, recordingtime=None, initial_data=None, boxsize=20, signal=None, frames_to_define_orientation=5, debug_mode=None, debug_mode_resize=1, repair_ht_swaps=True, cam=None, dir=None, pixel_per_mm=None, model_organism = 'Not in List', vr_arena = None, #vr_GPIO = None, pwm_object = None, time_dependent_file=None, high_power_led_bool=False, offline_analysis=False, minimal_speed_for_moving=0.5, organisms_and_heuristics=None, post_hoc_tracking=False, datetime=None, output_channel_one=[], output_channel_two=[], output_channel_three=[], output_channel_four=[], simulated_online_analysis=False, overlay_bool=False, controller=None, time_delay_due_to_animal_detection=0, vr_update_rate = 1, pwm_range = 100, video_filename = 'test.yuv', pts_filename = 'pts_test.txt', pi_time_filename = 'system_time_test.txt', vr_stim_location = 'NA'): self.genotype = genotype self.recording_framerate = recording_framerate self.display_framerate = display_framerate self.resolution = resolution self.recordingtime = recordingtime self.initial_data = initial_data self.boxsize = boxsize self.signal = signal self.frames_to_define_orientation = frames_to_define_orientation self.debug_mode = debug_mode self.resize = int(debug_mode_resize) self.repair_ht_swaps = repair_ht_swaps # self.camera will be the picamera object on the RPi or it is # a numpy array/list of images if offline analysis self.cam = cam if RASPBERRY: # set camera to desired framerate - this is neccessary as # user might not have pressed 'update framerate' # after entering a different 
framerate number self.cam.framerate = self.recording_framerate # confirm that framerate has beend changed # print('Framerate set to: ' + repr(self.camera.framerate)) # In order for clarity in offline analysis the images are # stored in a array called images. self.dir = dir self.pixel_per_mm = pixel_per_mm self.model_organism = model_organism self.vr_arena = vr_arena # vr_GPIO = None, self.pwm_object = pwm_object self.time_dependent_file = time_dependent_file self.high_power_led_bool = high_power_led_bool self.offline_analysis = offline_analysis self.minimal_speed_for_moving = minimal_speed_for_moving self.organisms_and_heuristics = organisms_and_heuristics self.post_hoc_tracking = post_hoc_tracking self.datetime = datetime self.output_channel_one = output_channel_one self.output_channel_two = output_channel_two self.output_channel_three = output_channel_three self.output_channel_four = output_channel_four self.simulated_online_analysis = simulated_online_analysis self.overlay_bool = overlay_bool self.controller = controller self.time_delay_due_to_animal_detection = \ time_delay_due_to_animal_detection self.vr_update_rate = vr_update_rate self.pwm_range = pwm_range self.vr_stim_location = vr_stim_location self.i_tracking = 0 # check if pictures come in as numpy array or as list of # single pictures if offline_analysis or simulated_online_analysis: if type(cam) is np.ndarray: self.images_as_npy = True elif type(cam) is list: self.images_as_npy = False # If on the Raspberry the assumption is that it will only be # used to track the animal not to analyze data # post-hoc! if RASPBERRY and not self.offline_analysis: # how many frames does the user want self.total_frame_number = recordingtime * recording_framerate # Not true anymore - will record as many frames as it can # in a given time! 
#print('Will record a total of ' + repr( # self.total_frame_number) + ' frames.') # else if not on Raspberry, assumption is that the user # wants to do a post-analysis - it should be possible to # decouple the framerate that has been recorded # (e.g 2fps) and the display framerate the user can watch the # animal behave (e.g. at 10frames) to speed up the analysis. elif offline_analysis or simulated_online_analysis: if not RASPBERRY: # Identify the number of frames are there if self.images_as_npy: self.total_frame_number = \ cam.shape[2] - (initial_data.counter) else: self.total_frame_number = \ len(cam) - (initial_data.counter) self.display_framerate = display_framerate self.interval = (1 / self.display_framerate) * 1000 delay = \ int(self.total_frame_number / self.display_framerate * 1000 + 200) # need for loop self.start_at_frame = initial_data.counter # needed to subtract the original time delay in case # there this is a simulated experiment self.delay_due_to_detection = 0 # this array will hold the timestamps recorded during the # experiment self.real_time = np.zeros((self.total_frame_number)) # display time is array that will just be filled with # expected time passed for a given from (not the real time!) if not RASPBERRY: self.display_time = np.zeros((self.total_frame_number)) self.display_time[0] = 0 # todo - pretty sure that's not correct # Search Box is the part of the image the algorithm will be # looking for an animal in the next frame. It has four # variables, row_min (minimal y value), row_max (maximal y # value), col_min (minimal x value) and col_max (maximal x # value). The search box is defined as the bounding box of # the animal + a boxsize which should be dynamic in the future! # must be possible to go negative, otherwise if goes below 0 # goes to 32K! self.search_boxes = np.zeros((self.total_frame_number, 4), dtype=np.int16) # pre-allocation of the empty array - might be good to let # user decide if skel needs to be pre-allocated. 
For my # experiments memory will never be an issue but if someone # want to track a fast animal for a long time memory might # run out. In numpy even the bool arrays are 8 bit (and not # 1bit). For example a boxsize of 25 and 9000 frames ( # 9000/30fps=5minutes) the preallocation for EACH array is # 22.5Mb. If someone where to run and experiment for one hour # (3600 seconds * 30fps = 108000 frames). Each array would be # 270Mb. That will probably lead to memory error. # Todo in the future we must implement bitarray (or similar) # which will allow true 1Bit arrays for the binary arrays # OR: Don't save images and only save centroid/head for this # kind of long experiment. size_of_expected_animals = \ organisms_and_heuristics[model_organism][ 'max_skeleton_length_mm'] * pixel_per_mm self.image_raw = np.zeros( (int(np.ceil(size_of_expected_animals * 2)), int(np.ceil(size_of_expected_animals * 2)), self.total_frame_number), dtype=np.uint8) self.image_thresh = np.zeros(( int(np.ceil(size_of_expected_animals * 2)), int(np.ceil(size_of_expected_animals * 2)), self.total_frame_number), dtype=np.bool_) self.image_skel = np.zeros(( int(np.ceil(size_of_expected_animals * 2)), int(np.ceil(size_of_expected_animals * 2)), self.total_frame_number), dtype=np.bool_) # preallocate bounding box and centroid array self.bounding_boxes = np.zeros(( 4, self.total_frame_number), dtype=np.uint16) self.centroids = np.zeros(( self.total_frame_number, 2), dtype=np.int16) self.midpoints = np.zeros(( self.total_frame_number, 2), dtype=np.int16) self.length_skeleton = np.zeros(( self.total_frame_number), dtype=np.uint16) # if self.Drosophila_larva: # todo - always keep on self.tails = np.zeros(( self.total_frame_number, 2), dtype=np.int16) self.heads = np.zeros(( self.total_frame_number, 2), dtype=np.int16) self.endpoints = np.zeros(( 2, 2, self.total_frame_number), dtype=int) self.ht_swap = np.zeros(( self.total_frame_number), dtype=np.bool_) # irrespective if there's a stimulation or not this 
array is # preallocated - it allows for saving the pandas array later self.stimulation = np.zeros((self.total_frame_number), dtype=np.float32) if post_hoc_tracking: # this is for the case that the user is testing a # video/full frame recording to get all the heuristic rules # to track an animal in the future for each frame save: # 0) the filled area in pixel # 1) the filled area divided by pixel/mm # 2) the major over minor axis # 3) the eccentricty # 4) the length - this is going to be very useful for the # boxsize and the searchboxsize # 5) length of skeleton divided by pixel/mm # 6) speed in pixel per frame # 7) speed in mm/frame (just #5 divided by pixel per frame) # 8) speed in mm/s (7 * frames per second) self.heuristic_parameters = np.zeros((9, cam.shape[2])) self.post_hoc_tracking = post_hoc_tracking else: self.heuristic_parameters = None self.post_hoc_tracking = False # even though the user won't actually see anything (except in # offline analysis and debug mode on) this child window is # created to turn off the main window. 
This will make it # impossible to change anything in the main GUI for the user # while the program is running self.child = tk.Toplevel() self.child.grab_set() # debug mode can be chosen by the user to display three # images while tracking (all only the search box): # the raw image, the thresholded image and on the far right # the raw image with the detected bounding box if debug_mode: # set a title for the window self.child.wm_title('Debug Window') # the function to call if the user presses the 'x' ( # cancel) button on the top right of the debug window self.child.protocol("WM_DELETE_WINDOW", self.on_closing) # self.child.geometry("100x100") # You want the size of # the app to be 500x500 self.child.attributes("-topmost", True) self.title_raw = tk.Label(self.child, text='Original Image') self.title_raw.grid(row=0, column=0) self.title_binary = tk.Label( self.child, text='Binary Image') self.title_binary.grid(row=0, column=1) self.below_binary = tk.Label( self.child, text='Grey indicates Search Box' '\nWhite indicates pixels below threshold (in ' 'search box)' '\nBlack indicates pixels not considered (' 'outside search box)', anchor='w', justify='left') self.below_binary.grid(row=2, column=1) self.title_detected = tk.Label( self.child, text='Detected animal') self.title_detected.grid(row=0, column=2) self.below_detected = tk.Label( self.child, text='Filled Area: ' '\nEccentricity: ' '\nMajor over minor axis: ', anchor='w', justify='left') self.below_detected.grid(row=2, column=2) # a canvas for the raw image # Need to just fix the window size! # resolution = [640, 480] # Window size should never be too large as one can't see # anything. Just fix it assuming that everyone has a # 1024x784 screen nowadays. 
self.canvas_width = 320 #(1024/3) ~=341, 320 is half the # width of the input frame self.canvas_height = 240 self.child_canvas_top_left = tk.Canvas( self.child, width=self.canvas_width, height=self.canvas_height) self.child_canvas_top_left.grid(row=1, column=0) # disable change of size when image size changes self.child_canvas_top_left.grid_propagate(0) # a canvas for the binary image self.child_canvas_top_middle = tk.Canvas( self.child, width=self.canvas_width, height=self.canvas_height) self.child_canvas_top_middle.grid(row=1, column=1) # disable change of size when image size changes self.child_canvas_top_middle.grid_propagate(0) # a canvas for the raw image with bounding box self.child_canvas_top_right = tk.Canvas( self.child, width=self.canvas_width, height=self.canvas_height) self.child_canvas_top_right.grid(row=1, column=2) self.child_canvas_top_right.grid_propagate(0) # Show the time/frames that is remaining until the window closes self.time_remaining_label = tk.Label(self.child, text='') self.time_remaining_label.grid(row=3, column=0, columnspan=3) self.pause_debug_var = tk.IntVar() self.pause_button = tk.Button(self.child, text='Pause tracking', command=self.pause_debug) self.pause_button.grid(row=2, column=0) else: self.time_remaining_label = None self.child_canvas_top_left = None self.child_canvas_top_middle = None self.child_canvas_top_right = None self.canvas_width = None self.canvas_height = None self.below_detected = None self.pause_debug_var = None # In case of not-testing, just pass the None around. self.loop_time_measurement = None if TESTING: # In case of testing, save the loop time in the # experimental folder self.loop_time_measurement = np.zeros(( recordingtime*recording_framerate,2)) self.loop_time_measurement.fill(np.nan) if RASPBERRY: self.run_experiment() if offline_analysis or simulated_online_analysis: self.offline_analysis_func()
[docs] def offline_analysis_func(self): """ This function is called when the user selects either the "Tools->Analysis->Single Animal tracking" or the "Debug->Simulate Online Tracking" option. It calls the identical animal tracking function as the live version, the only difference being the way the images are being provided. While in the live version, the images are streamed from the camera, in the simulated online version the images are provided as a numpy array. """ try: FastTrackingVidAlg( genotype=self.genotype, recording_framerate=self.recording_framerate, display_framerate=self.display_framerate, resolution=self.resolution, recordingtime=self.recordingtime, initial_data=self.initial_data, boxsize=self.boxsize, signal=self.signal, frames_to_define_orientation=self.frames_to_define_orientation, debug_mode=self.debug_mode, debug_mode_resize=self.resize, repair_ht_swaps=self.repair_ht_swaps, cam=self.cam, dir=self.dir, pixel_per_mm=self.pixel_per_mm, model_organism=self.model_organism, vr_arena=self.vr_arena, # vr_GPIO = None, pwm_object=self.pwm_object, time_dependent_file=self.time_dependent_file, high_power_led_bool=self.high_power_led_bool, offline_analysis=self.offline_analysis, minimal_speed_for_moving=self.minimal_speed_for_moving, organisms_and_heuristics=self.organisms_and_heuristics, post_hoc_tracking=self.post_hoc_tracking, datetime=self.datetime, output_channel_one=self.output_channel_one, output_channel_two=self.output_channel_two, output_channel_three=self.output_channel_three, output_channel_four=self.output_channel_four, simulated_online_analysis=self.simulated_online_analysis, overlay_bool=self.overlay_bool, controller=self.controller, time_delay_due_to_animal_detection=self.time_delay_due_to_animal_detection, vr_update_rate=self.vr_update_rate, pwm_range=self.pwm_range, #video_filename='test.yuv', #pts_filename='pts_test.txt', #pi_time_filename='system_time_test.txt', real_time=self.real_time, i_tracking=self.i_tracking, 
total_frame_number=self.total_frame_number, search_boxes=self.search_boxes, image_raw=self.image_raw, image_thresh=self.image_thresh, image_skel=self.image_skel, bounding_boxes=self.bounding_boxes, centroids=self.centroids, midpoints=self.midpoints, length_skeleton=self.length_skeleton, tails=self.tails, heads=self.heads, endpoints=self.endpoints, ht_swap=self.ht_swap, stimulation=self.stimulation, heuristic_parameters=self.heuristic_parameters, time_remaining_label=self.time_remaining_label, child_canvas_top_left=self.child_canvas_top_left, child_canvas_top_middle=self.child_canvas_top_middle, child_canvas_top_right=self.child_canvas_top_right, child = self.child, canvas_height = self.canvas_height, canvas_width = self.canvas_width, below_detected = self.below_detected, pause_debug_var = self.pause_debug_var ) except Exception as caught_error: self.error_message_func(error_stack=caught_error) # Note how many frames have been recorded in what time captured_images = np.count_nonzero(self.real_time) expected_no_of_images = self.recordingtime \ * self.recording_framerate print('Crashed at frame ' + repr(captured_images) + ' of ' + repr(expected_no_of_images) ) # after either successfully finishing or unexpectedly # finishing early with the offline tracking, call the # function that will save all the data #self.child.after(0, self.after_tracking) # Was the below, had to change it to get the "Tools>Single # Animal Tracking" option for more than one folder to save # the data in the correct folder! #self.child.after(0, self.after_tracking) #self.child.after(300, lambda : self.child.destroy()) self.after_tracking()
[docs] def run_experiment(self): """ This function is called during live tracking on the PiVR. Essentially, it start to record a video but provides a custom output. See `here <https://picamera.readthedocs.io/en/release-1.13/recipes2 .html#custom-outputs>`__. The video records frames in the **YUV** format. See `here <https://picamera.readthedocs.io/en/release-1.13/recipes2 .html#unencoded-image-capture-yuv-format>`__. for explanation of that particular format. YUV was chosen as it encodes a greyscale version of the image (the Y' component) at full resolution (e.g. 307'200bytes for a 640x480) image while the U and the V component, which essentially encode the color of the image only have a quarter of the resolution (e.g. 76'800bytes for a 640x480 image). As the color is anyway discarded, this allows a more efficient usage of the Raspberry Pi's buffer compared to using, for example RGB. """ try: self.cam.start_recording( FastTrackingVidAlg( genotype=self.genotype, recording_framerate=self.recording_framerate, display_framerate = self.display_framerate, resolution=self.resolution, recordingtime=self.recordingtime, initial_data=self.initial_data, boxsize=self.boxsize, signal=self.signal, frames_to_define_orientation=self.frames_to_define_orientation, debug_mode=self.debug_mode, debug_mode_resize=self.resize, repair_ht_swaps=self.repair_ht_swaps, cam=self.cam, dir=self.dir, pixel_per_mm=self.pixel_per_mm, model_organism = self.model_organism, vr_arena = self.vr_arena, #vr_GPIO = None, pwm_object =self.pwm_object, time_dependent_file=self.time_dependent_file, high_power_led_bool=self.high_power_led_bool, offline_analysis=self.offline_analysis, minimal_speed_for_moving=self.minimal_speed_for_moving, organisms_and_heuristics=self.organisms_and_heuristics, post_hoc_tracking=self.post_hoc_tracking, datetime=self.datetime, output_channel_one=self.output_channel_one, output_channel_two=self.output_channel_two, output_channel_three=self.output_channel_three, 
output_channel_four=self.output_channel_four, simulated_online_analysis=self.simulated_online_analysis, overlay_bool=self.overlay_bool, controller=self.controller, time_delay_due_to_animal_detection=self.time_delay_due_to_animal_detection, vr_update_rate = self.vr_update_rate, pwm_range = self.pwm_range, # video_filename = 'test.yuv' # Just in case we ever want to also record video while tracking! real_time=self.real_time, i_tracking = self.i_tracking, total_frame_number=self.total_frame_number, search_boxes = self.search_boxes, image_raw = self.image_raw, image_thresh = self.image_thresh, image_skel = self.image_skel, bounding_boxes = self.bounding_boxes, centroids = self.centroids, midpoints = self.midpoints, length_skeleton = self.length_skeleton, tails = self.tails, heads = self.heads, endpoints = self.endpoints, ht_swap = self.ht_swap, stimulation = self.stimulation, heuristic_parameters = self.heuristic_parameters, loop_time_measurement = self.loop_time_measurement, vr_stim_location = self.vr_stim_location ), format='yuv' ) self.cam.wait_recording(self.recordingtime) except Exception as caught_error: self.cam.preview_window = (0, 0, 180, 180) if self.overlay_bool: self.controller.all_common_variables.overlay.window = ( 0, 0, 180, 180) # todo - write more information, e.g. at what frame did # the experiment stop. This might be useful for analysis. # also todo: What do these cryptic error messages mean? # Have a FAQ ready OR automatically go into traceback # and look for known errors and suggest what the error # could be based on the position. # self.child.after(0, lambda : self.error_message_func(error_stack=caught_error)) self.error_message_func(error_stack=caught_error) finally: # clean up. try: self.cam.stop_recording() except: # Not good to have blank exception, but here it # should relally only # catch PiCameraErrors! 
pass # Note how many frames have been recorded in what time captured_images = np.count_nonzero(self.real_time) expected_no_of_images = self.recordingtime * self.recording_framerate # Note how many frames have been recorded in what time print('Captured ' + repr(captured_images) + ' of ' + repr(expected_no_of_images) + ' images at %.2ffps' % (self.recording_framerate)) # TODO save this somwhere! Maybe in the json file # TODO also do tkinter messagebox! # after either successfully finishing or unexpectedly # finishing early with the offline tracking, call the # function that will save all the data #self.child.after(0, self.after_tracking) self.child.after(0, self.after_tracking) self.child.after(300, lambda : self.child.destroy())
[docs] def on_closing(self): """ Function to use when the user clicks on the X to close the window. This should never be called in a live experiment as there is simply no option to click to close a window. Will ask if user wants to quit the experiment. Will save the experiment so far """ if tk.messagebox.askokcancel("Cancel", "Do you want to cancel?"): self.offline_analysis_running = False save = tk.messagebox.askquestion("Save?", "Do you want to save\n" "the analysis completed\n" "so far?") if save == 'yes': thc.Save(heads=self.heads, tails=self.tails, centroids=self.centroids, image_skel=self.image_skel, image_raw=self.image_raw, image_thresh=self.image_thresh, background=self.initial_data.smoothed_goodbackground, real_time=self.real_time, pixel_per_mm=self.pixel_per_mm, bounding_boxes=self.bounding_boxes, stimulation=self.stimulation, arena=self.vr_arena, heuristic_data=self.heuristic_parameters, datetime=self.datetime, midpoints=self.midpoints, recording_time=self.recordingtime, framerate=self.recording_framerate, time_dep_stim_file=self.time_dependent_file) # save the data # pause the script to give time to save self.child.after(100) self.experiment_stopped_early = True # Set the main window active again self.child.grab_release() # close the tkinter window self.child.destroy()
[docs] def error_message_func(self, error_stack): """ This function is called if the recording can not continue until the end as defined by framerate * recording_length. It will write the error into a file called "DATE-ERROR.txt" and put it in the experimental folder. """ captured_images = np.count_nonzero(self.real_time) expected_no_of_images = self.recordingtime * self.recording_framerate with open(self.datetime + '_ERROR.txt', 'a') as file: file.write('Unexpected Error at frame ' + repr( captured_images) + ' of ' + repr(int( expected_no_of_images)) + '\n\n') file.write('Traceback (most recent call last): ' + str( error_stack) + '\n\n') if "IndexError: index -1 is out of bounds for axis 0 " \ "with size 0" in traceback.format_exc() \ and "self.filled_area = areas_sorted['filled " \ "area'][-1]" in traceback.format_exc(): file.write('Error #1\n' 'The following 2 lines:' '\n'"self.filled_area = " "areas_sorted['filled area'][-1]" "\nIndexError: index -1 is out of " "bounds for axis 0 with size 0" "\nindicate that the animal was not " "found in the region of interest." "\nThis can happen if the animal moves " "faster than expected." "\nTo circumvent this problem increase " "the 'max_speed_animal_mm_per_s' parameter" "in the 'list_of_available_organisms.json' " "file" "\nAlternatively, your animal might be " "able to hide under some obstruction. " "If that is the case please clear the " "arena so that the camera can always see " "at least parts of the animal" "\n") # elif.... put all the known errors and print possible solutions else: file.write('Error that has not been classfied yet!\n\n') file.write('Full error traceback below: \n\n') file.write(traceback.format_exc()) tk.messagebox.showerror('Error', 'The experiment stopped earlier than requested.' '\nSee the ERROR.txt file in the experiment folder' '\nfor a detailed traceback for debugging purposes' '\n' )
[docs] def after_tracking(self): """ When live tracking is done, the GPIOs must be turned off. Then save the data that was just collected by calling the function 'save' in tracking_help_classes. """ if self.high_power_led_bool: # turn of GPIOs of Channel one - keep in loop instead of just saying GPIO17 off for forward compatibility!! # list comprehension of Channel 1 [self.pwm_object.set_PWM_dutycycle( user_gpio=self.output_channel_one[i_stim][0], dutycycle=self.pwm_range) for i_stim in range(len(self.output_channel_one))] # list comprehension of Channel 2 [self.pwm_object.set_PWM_dutycycle( user_gpio=self.output_channel_two[i_stim][0], dutycycle=self.pwm_range) for i_stim in range(len(self.output_channel_two))] # list comprehension of Channel 3 [self.pwm_object.set_PWM_dutycycle( user_gpio=self.output_channel_three[i_stim][0], dutycycle=self.pwm_range) for i_stim in range(len(self.output_channel_three))] # list comprehension of Channel 4 [self.pwm_object.set_PWM_dutycycle( user_gpio=self.output_channel_four[i_stim][0], dutycycle=self.pwm_range) for i_stim in range(len(self.output_channel_four))] print('High powered LED are all turned off') else: # turn of GPIOs of Channel one - keep in loop instead of # just saying GPIO17 off for forward compatibility!! 
# list comprehension of Channel 1 [self.pwm_object.set_PWM_dutycycle( user_gpio=self.output_channel_one[i_stim][0], dutycycle=0) for i_stim in range(len(self.output_channel_one))] # list comprehension of Channel 2 [self.pwm_object.set_PWM_dutycycle( user_gpio=self.output_channel_two[i_stim][0], dutycycle=0) for i_stim in range(len(self.output_channel_two))] # list comprehension of Channel 3 [self.pwm_object.set_PWM_dutycycle( user_gpio=self.output_channel_three[i_stim][0], dutycycle=0) for i_stim in range(len(self.output_channel_three))] # list comprehension of Channel 4 [self.pwm_object.set_PWM_dutycycle( user_gpio=self.output_channel_four[i_stim][0], dutycycle=0) for i_stim in range(len(self.output_channel_four))] print('Normal LED turned off') if self.offline_analysis: # todo why different from online? thc.Save(heads=self.heads, tails=self.tails, centroids=self.centroids, image_skel=self.image_skel, image_raw=self.image_raw, image_thresh=self.image_thresh, background=self.initial_data.smoothed_goodbackground, real_time=self.real_time, pixel_per_mm=self.pixel_per_mm, bounding_boxes=self.bounding_boxes, stimulation=self.stimulation, arena=self.vr_arena, heuristic_data=self.heuristic_parameters, datetime=self.datetime, midpoints=self.midpoints, recording_time=self.recordingtime, framerate=self.recording_framerate ) self.child.destroy() else: self.child.after(0, lambda: thc.Save( heads=self.heads, tails=self.tails, centroids=self.centroids, image_skel=self.image_skel, image_raw=self.image_raw, image_thresh=self.image_thresh, background=self.initial_data.smoothed_goodbackground, real_time=self.real_time, pixel_per_mm=self.pixel_per_mm, bounding_boxes=self.bounding_boxes, stimulation=self.stimulation, arena=self.vr_arena, heuristic_data=self.heuristic_parameters, datetime=self.datetime, midpoints=self.midpoints, time_delay_due_to_animal_detection=self.time_delay_due_to_animal_detection, loop_time=self.loop_time_measurement, recording_time=self.recordingtime, 
framerate=self.recording_framerate, time_dep_stim_file=self.time_dependent_file ) ) # give 300ms to save, then destroy the child window and # go back to the main window. Should be fine, even if # it takes longer as the main window will just be frozen # until saving is complete self.child.after(300, lambda: self.child.destroy())
def pause_debug(self):
    """
    Toggle the pause state of the debug display.

    Flips the boolean held in ``self.pause_debug_var`` and relabels
    ``self.pause_button`` so that its text always names the action
    the next click will perform.
    """
    was_paused = self.pause_debug_var.get()
    # Store an explicit bool: True pauses, False resumes.
    self.pause_debug_var.set(False if was_paused else True)
    self.pause_button.config(
        text='Pause tracking' if was_paused else 'Continue tracking')
class FastTrackingVidAlg(object):
    """
    This class takes either a camera object (so far only from the
    RPi camera) or images in a 3D numpy array (y, x and time).

    When run on the RPi it is assumed that a live experiment is
    running. The camera framerate will be set to the framerate the
    user wants (if the user asks for a higher framerate than the
    camera can give, the program will throw an error directly in the
    GUI).

    The camera will then deliver each image into an in-memory
    stream. The images will then be formatted to be in 2D with the
    right resolution.

    (For future improvement: to increase speed one could only take
    the bytes that are actually needed (we do have the search_box)).
    """

    def __init__(self,
                 genotype='Unknown',
                 recording_framerate=2,
                 display_framerate=None,
                 resolution=None,
                 recordingtime=None,
                 initial_data=None,
                 boxsize=20,
                 signal=None,
                 frames_to_define_orientation=5,
                 debug_mode=None,
                 debug_mode_resize=1,
                 repair_ht_swaps=True,
                 cam=None,
                 dir=None,
                 pixel_per_mm=None,
                 model_organism='Not in List',
                 vr_arena=None,
                 pwm_object=None,
                 time_dependent_file=None,
                 high_power_led_bool=False,
                 offline_analysis=False,
                 minimal_speed_for_moving=0.5,
                 organisms_and_heuristics=None,
                 post_hoc_tracking=False,
                 datetime=None,
                 output_channel_one=None,
                 output_channel_two=None,
                 output_channel_three=None,
                 output_channel_four=None,
                 simulated_online_analysis=False,
                 overlay_bool=False,
                 controller=None,
                 time_delay_due_to_animal_detection=0,
                 vr_update_rate=1,
                 pwm_range=40000,
                 video_filename='test.yuv',
                 real_time=None,
                 i_tracking=None,
                 total_frame_number=10,
                 search_boxes=None,
                 image_raw=None,
                 image_thresh=None,
                 image_skel=None,
                 bounding_boxes=None,
                 centroids=None,
                 midpoints=None,
                 length_skeleton=None,
                 tails=None,
                 heads=None,
                 endpoints=None,
                 ht_swap=None,
                 stimulation=None,
                 heuristic_parameters=None,
                 time_remaining_label=None,
                 child_canvas_top_left=None,
                 child_canvas_top_middle=None,
                 child_canvas_top_right=None,
                 child=None,
                 loop_time_measurement=None,
                 canvas_width=None,
                 canvas_height=None,
                 below_detected=None,
                 pause_debug_var=None,
                 vr_stim_location='NA'
                 ):
        """
        Store the experiment settings and the preallocated result
        arrays, write the first search box and, for offline or
        simulated online analysis, immediately run the tracking loop.

        Only the parameters whose handling is not a plain
        ``self.x = x`` are listed here:

        :param recording_framerate: Frames per second; also used to
            derive ``vr_arena_multidimension_update``.
        :param initial_data: Result of the animal detection. Must
            provide ``first_roi`` (with ``row_min``, ``row_max``,
            ``col_min``, ``col_max``, ``centroid_row``,
            ``centroid_col``) and ``smoothed_goodbackground``; for
            offline/simulated analysis also ``counter``.
        :param boxsize: Margin added on each side of the first ROI
            to obtain the first search box.
        :param signal: ``'white'`` (animal brighter than background,
            threshold with ``>``) or ``'dark'`` (threshold with
            ``<``). Any other value shows an error dialog and exits
            the program.
        :param debug_mode: If truthy, ``self.resize`` and
            ``self.current_animal_characteristics`` are initialised.
        :param cam: On the Raspberry Pi an object exposing
            ``resolution``; for offline/simulated analysis the image
            stack, indexed as ``[:, :, frame]``.
        :param dir: Stored as ``self.path``.
        :param vr_arena: If not None, ``self.VR_ARENA`` is True.
        :param time_dependent_file: Only considered when no
            ``vr_arena`` is given; then ``self.time_dependent_stim``
            is True if it is not None.
        :param offline_analysis: Together with
            ``simulated_online_analysis`` decides whether
            :func:`offline_tracking_func` is run from here.
        :param output_channel_one: Like channels two to four:
            ``None`` (the default) is replaced by a fresh empty
            list.
        :param vr_update_rate: Update rate of a time dependent
            (3D) arena; must not be zero.
        :param i_tracking: Index of the first frame to be written
            in the preallocated arrays.
        :param total_frame_number: ``self.end_frame`` is this value
            minus one.
        :param search_boxes: Preallocated array; row ``i_tracking``
            is overwritten with the first search box
            ``[row_min, row_max, col_min, col_max]``.

        ``resolution``, ``model_organism``,
        ``organisms_and_heuristics`` and ``video_filename`` are
        accepted for interface compatibility but not used here.
        """
        self.start_time = None
        self.total_frame_number = total_frame_number
        self.i_tracking = i_tracking

        # preallocated arrays that are filled while tracking
        self.search_boxes = search_boxes
        self.image_raw = image_raw
        self.image_thresh = image_thresh
        self.image_skel = image_skel
        self.bounding_boxes = bounding_boxes
        self.length_skeleton = length_skeleton
        self.endpoints = endpoints
        self.ht_swap = ht_swap
        self.stimulation = stimulation
        self.real_time = real_time
        # heuristics might hold for a large number of model
        # organisms! Initiate heads, tails, centroids and midpoints
        # always
        self.heads = heads
        self.tails = tails
        self.centroids = centroids
        self.midpoints = midpoints
        # Assigned before the offline loop below is started so that
        # the attribute already exists while frames are processed.
        self.loop_time_measurement = loop_time_measurement

        self.heuristic_parameters = heuristic_parameters
        self.post_hoc_tracking = post_hoc_tracking

        # GUI handles
        self.time_remaining_label = time_remaining_label
        self.child_canvas_top_left = child_canvas_top_left
        self.child_canvas_top_middle = child_canvas_top_middle
        self.child_canvas_top_right = child_canvas_top_right
        self.child = child
        self.canvas_width = canvas_width
        self.canvas_height = canvas_height
        self.below_detected = below_detected
        self.pause_debug_var = pause_debug_var

        self.vr_stim_location = vr_stim_location
        self.display_framerate = display_framerate

        self.cam = cam
        run_offline_loop = offline_analysis or simulated_online_analysis
        if run_offline_loop:
            self.images = cam
            # If a post-hoc analysis is being done it usually takes
            # several frames until the animal has been identified
            # and until it moved from its original position. Counter
            # will take care that the index is correct
            self.counter = initial_data.counter

        # initiate the variables that will be used later
        self.previous_channel_one_value = None
        self.previous_channel_two_value = None
        self.previous_channel_three_value = None
        self.previous_channel_four_value = None

        self.datetime = datetime
        self.genotype = genotype
        self.recording_framerate = recording_framerate
        self.recording_time = recordingtime

        if RASPBERRY:
            # take the image size directly from the camera object
            self.width, self.height = self.cam.resolution

        self.offline_analysis = offline_analysis

        # first_roi is an instance of a class - take y (row) min and
        # max and x (column) min and max of the search box
        first_roi = initial_data.first_roi
        self.first_row_min = first_roi.row_min
        self.first_row_max = first_roi.row_max
        self.first_col_min = first_roi.col_min
        self.first_col_max = first_roi.col_max
        # and both y and x of the centroid
        self.first_centroid = [first_roi.centroid_row,
                               first_roi.centroid_col]

        # boxsize has been dynamically defined in start_GUI
        self.boxsize = boxsize

        # read background image, also from class instance
        self.smoothed_background = initial_data.smoothed_goodbackground

        # If animals are illuminated from below they are usually dark
        # in the camera. If they are illuminated from the side and
        # if the bottom is black they appear white. Depending on this
        # experimental parameter the binary image needs to be created
        # using greater than (if animal is white) or lower than (if
        # animal is dark) than the calculated threshold.
        self.signal = signal
        if self.signal == 'white':
            self.compare = operator.gt
            self.box_intensity = 255
        elif self.signal == 'dark':
            self.compare = operator.lt
            self.box_intensity = 0
        else:
            # 'import tkinter' alone does not load the messagebox
            # submodule, therefore import it explicitly here.
            from tkinter import messagebox
            import sys
            messagebox.showerror(
                'Error',
                'Signal has to be either "white" or "dark".\n'
                'Please adjust code.\n'
                'Program will exit after pressing "Ok"')
            # Todo: this is overkill! Could also just go back to
            #  main Gui
            sys.exit()

        # If head and tail are being defined several heuristic rules
        # are needed. If for a number of frames, defined by this
        # parameter frames_to_define_orientation, the tail moves
        # backwards, it is assumed that the head/tail classification
        # is the wrong way around.
        self.frames_to_define_orientation = frames_to_define_orientation
        # In order to repair the head/tail swaps, this bool needs to
        # be True as well.
        self.repair_ht_swaps = repair_ht_swaps

        # debug mode is only available in post-hoc analysis
        self.debug_mode = debug_mode
        if self.debug_mode:
            self.current_animal_characteristics = None
            self.resize = int(debug_mode_resize)

        # the directory of the experiment
        self.path = dir

        self.pixel_per_mm = pixel_per_mm

        # Stimulation: take both the vr_arena and time_dep_file. A
        # VR arena takes precedence over a time dependent stimulus.
        # Both flags are always defined.
        self.vr_arena = vr_arena
        self.time_dependent_stim_file = time_dependent_file
        self.VR_ARENA = self.vr_arena is not None
        self.time_dependent_stim = \
            (not self.VR_ARENA) and time_dependent_file is not None
        if not self.VR_ARENA and not self.time_dependent_stim:
            print('vr arena not in tracking file')

        self.pwm_object = pwm_object

        # get the lists with the output; never share one default
        # list object between instances
        self.output_channel_one = \
            [] if output_channel_one is None else output_channel_one
        self.output_channel_two = \
            [] if output_channel_two is None else output_channel_two
        self.output_channel_three = \
            [] if output_channel_three is None else output_channel_three
        self.output_channel_four = \
            [] if output_channel_four is None else output_channel_four

        # access to overlay to change size
        self.overlay_bool = overlay_bool
        self.controller = controller

        # get the time it took to detect the animal - will be saved
        # for user access later on
        self.time_delay_due_to_animal_detection = \
            time_delay_due_to_animal_detection

        # After how many frames should the multidimensional (3rd
        # Dimension = Time) arena be updated? Equal to
        # (1 / vr_update_rate) / (1 / recording_framerate) but with a
        # single division, so that whole-number ratios are exact when
        # animal_tracking compares i_tracking modulo this value to 0.
        self.vr_arena_multidimension_update = \
            self.recording_framerate / vr_update_rate
        self.vr_arena_multidimension_counter = 0

        self.pwm_range = pwm_range

        # Needs to be manually entered, but will make sure that the
        # light is switched off at the end of the experiment
        self.high_power_LED_bool = high_power_led_bool
        print('self.high_power_LED_bool: '
              + repr(self.high_power_LED_bool))

        # initialize an empty list
        self.time_difference = []

        # initialize variables for the local image
        self.smoothed_current_local_image = None
        # and the local thresholded image
        self.current_image_thresholded = None

        # Set this bool to False and only switch if exception happens
        self.experiment_stopped_early = False

        if RASPBERRY and not self.offline_analysis:
            # if live experiment start at index 0
            self.start_frame = 0
        else:
            # if not a live experiment, the start frame will be
            # defined by how many frames have already been used to
            # characterize the animal
            self.start_frame = initial_data.counter
        # what's the last frame
        self.end_frame = self.total_frame_number - 1

        # first search box: first ROI enlarged by boxsize
        self.search_boxes[self.i_tracking, :] = [
            int(np.round(self.first_row_min - self.boxsize)),
            int(np.round(self.first_row_max + self.boxsize)),
            int(np.round(self.first_col_min - self.boxsize)),
            int(np.round(self.first_col_max + self.boxsize))]

        # Needed for head/tail classification - how fast,
        # in mm/seconds, does the animal need to move to count it as
        # 'moving'. Empirical parameter
        self.minimal_speed_for_moving = minimal_speed_for_moving

        # while trying to assign head and tail there are 2 heuristics
        # that need to be fulfilled: 1) there can only be two
        # endpoints in the skeleton and 2) the aspect ratio must be
        # above (currently) 1.25 (Todo > make either dynamic or user
        # changeable). Whenever these heuristics are not True,
        # skip that frame for head/tail assignment for now and fix
        # it as soon as a head/tail can be assigned again.
        self.to_fix = []

        # initialize array that will be used as image
        self.array = None
        self.current_frame = None

        if run_offline_loop:
            # If doing offline analysis it is possible that the user
            # wants to see the debug mode. It is also possible the
            # user closes the window. In that case it is necessary
            # to break the loop. This is the switch.
            self.offline_analysis_running = True
            # The loop is called directly instead of via the tkinter
            # 'after' method: if this class is called more than
            # once, e.g. to batch analyze some videos, 'after' waits
            # too long, i.e. the main GUI can do stuff (like read the
            # next video) before the function is actually called!
            self.offline_tracking_func()

    def offline_tracking_func(self):
        """
        Run :func:`animal_tracking` on the stored images until the
        last image of ``self.images`` has been handed over or until
        ``self.offline_analysis_running`` is set to False.

        In debug mode the debug window is updated after each frame,
        the loop blocks while ``self.pause_debug_var`` is set, and
        the loop is throttled to ``self.display_framerate``.
        """
        last_image_index = self.images.shape[2] - 1
        while self.offline_analysis_running:
            if self.counter == last_image_index:
                # the frame processed below is the last one
                self.offline_analysis_running = False

            start_time = time.time()
            print('Working on Frame#' + repr(self.i_tracking))
            self.animal_tracking()

            # allow for the debug mode to show
            if self.debug_mode:
                self.update_debug()
                if self.pause_debug_var.get():
                    # block until the pause variable is changed
                    self.child.wait_variable(self.pause_debug_var)

            self.i_tracking += 1

            if self.debug_mode:
                # wait for whatever is left of the display interval
                analysis_time_s = time.time() - start_time
                time_to_wait_ms = int(round(
                    (1 / self.display_framerate - analysis_time_s)
                    * 1000))
                if time_to_wait_ms > 0:
                    self.child.after(time_to_wait_ms)
def write(self, buf):
    """
    This function is called by the custom output of the
    `picamera video recorder
    <https://picamera.readthedocs.io/en/release-1.13/recipes2.html#custom-outputs>`_.
    It (1) prepares the image for the tracking algorithm and
    (2) calls the tracking function: :func:`animal_tracking`.

    **Image preparation**

    #. Receive the buffer object prepared by the GPU which
       contains the YUV image and put it into a numpy array in
       uint8 number space.
    #. Shorten the array to the Y values, i.e. the first
       ``width * height`` bytes. As currently only 640x480px images
       can be used, the array is shortened to 307'200 bytes (from
       460'800 bytes).
    #. The image, which so far has just been a 1D stream of uint8
       values, is then organized into the 2D image ``self.array``.
    #. Save the timestamp of the current frame, relative to the
       timestamp of the first frame, in ``self.real_time``.
    #. Call the :func:`animal_tracking` function.

    Timestamping, tracking and the increment of ``self.i_tracking``
    only happen if the camera reports a complete frame with a
    truthy timestamp.
    """
    if TESTING:
        try:
            self.loop_time_measurement[self.i_tracking, 0] = time.time()
        except IndexError:
            pass

    # Interpret the buffer as uint8 and keep only the Y (luminance)
    # values; the two chrominance planes (U and V) are discarded.
    n_luma_bytes = int(self.width * self.height)
    luma = np.frombuffer(buf, dtype=np.uint8)[0:n_luma_bytes]
    # organize as the image with the desired resolution
    self.array = luma.reshape((self.height, self.width))

    if not (self.cam.frame.complete and self.cam.frame.timestamp):
        return

    if self.start_time is None:
        # the first timestamp defines time zero
        self.start_time = self.cam.frame.timestamp

    try:
        self.real_time[self.i_tracking] = \
            self.cam.frame.timestamp - self.start_time
    except IndexError:
        # if too many frames - TODO make better
        pass

    # The recording wait seems to be rather imprecise, therefore
    # only call the animal_tracking function if there's still space
    # in the arrays (otherwise it would always throw an error).
    if self.i_tracking < self.real_time.shape[0]:
        self.animal_tracking()

    if TESTING:
        try:
            self.loop_time_measurement[self.i_tracking, 1] = time.time()
        except IndexError:
            pass

    self.i_tracking += 1
[docs] def animal_tracking(self): """ Main function in single animal tracking. After detection in :meth:`Pre-Experiment` of the animal this function will be called on each frame to: #. Identify the animal, #. Define where to look for the animal in the next frame #. Define head, tail, centroid and midpoint #. If requested, present a stimulus by changing the dutycycle on the requested GPIO Below a the list in a bit more detail: #. Ensure that the search box is not outside the image. #. Subtract the current search box image from the background search box. #. Calculate the threshold to binarize the subtracted image. #. Use the regionprops function of the scikit-image library to find blobs http://scikit-image.org/docs/dev/api/skimage.measure.html#skimage.measure.regionprops #. Select the largest blob as the animal #. Define the NEXT Search Box #. Save the current bounding box, centroid position and the raw image. #. Skeletonize the binary image and find the endpoints. #. By comparing the endpoint positions to the position of the previous tail position, assign the closer endpoint as the the tail. #. If virtual reality experiment: Use the head position to define position in virtual space and update stimulus in Channel 1 accordingly using a change in dutycycle of the GPIO. #. If time dependent stimulus: Update the dutycyle for all the defined channels. """ if RASPBERRY: # print('Frame' + repr(i_tracking)) # with PiRGBArray(self.camera) as output: # self.camera.capture(output, 'rgb', use_video_port=True) current_frame = self.array else: current_frame = self.images[:, :, self.counter] self.counter += 1 # on the edge - this becomes necessary when the animal is # close to the boundary of the frame. 
Due to the boxsize it # can happen that the animal is still well in the frame, # but the boxsize ask the program to look for the larva # outside of the frame - which will lead to an error if self.search_boxes[self.i_tracking, 0] < 0: self.search_boxes[self.i_tracking, 0] = 0 if self.search_boxes[self.i_tracking, 1] \ > self.smoothed_background.shape[0]: self.search_boxes[self.i_tracking, 1] \ = self.smoothed_background.shape[0] - 1 if self.search_boxes[self.i_tracking, 2] < 0: self.search_boxes[self.i_tracking, 2] = 0 if self.search_boxes[self.i_tracking, 3] \ > self.smoothed_background.shape[1]: self.search_boxes[self.i_tracking, 3] \ = self.smoothed_background.shape[1] # filter the image to get rid of camera noise. Only take the # search box self.smoothed_current_local_image = ndimage.filters.gaussian_filter( thc.CallBoundingBox(current_frame, self.search_boxes[self.i_tracking, :] ).sliced_image, sigma=1) # take only the slice from the backgroudn image that is # necessary to compare smoothed_background_local_image = thc.CallBoundingBox( self.smoothed_background, self.search_boxes[self.i_tracking, :] ).sliced_image # We have to change the datatype of the numpy array from # originally unsigned int 8 (goes from 0 to 255) to signed # int 16 (goes from -32768 to 32767). The reason being that # if we subtract a two uint8 pictures in case we have 200-201 # = 255 while 200 - 199 = 1. This leads the histogram of # intensites to have 2 background peaks, one around 0 and the # other around 255. In int16 space, on the other hand, # we'll have the background mean at around 0 while the animal # will be in the negative range subtracted_current_frame = self.smoothed_current_local_image.astype( np.int16) - smoothed_background_local_image.astype(np.int16) # calculate the local threshold by calculating the mean pixel # intensity (of the subtracted small image) and subtracting # from it 3 times the standard deviation of the subtracted # small image. 
The *3* times was chosen empirically. current_thresh = thc.MeanThresh(subtracted_current_frame, self.signal, 3) # if len(regionprops(label(subtracted_current_frame # > current_thresh.thresh))) == 0: # # In case there are zero connected pixels, we assume # that's because the 3 STDs are too much and go with # # only two # todo, really? Only see the invert=True as different! # current_thresh = thc.MeanThresh(subtracted_current_frame, # self.signal, 3,invert=True) # calculate the binary local image self.current_image_thresholded = self.compare( subtracted_current_frame, current_thresh.thresh) # use the regionprops function to identify blobs and characterize them # http://scikit-image.org/docs/dev/api/skimage.measure.html#skimage.measure.regionprops animal_properties_current = regionprops(label( self.current_image_thresholded)) # find the LARGEST blob in the search image and set it as the animal current_animal = thc.DescribeLargestObject( animal_properties_current, self.search_boxes[self.i_tracking, 0::2]) if self.debug_mode: self.current_animal_characteristics = current_animal # The if makes sure that in the offline case the last frame # that was recorded can still be analyzed. 
if not self.i_tracking == self.end_frame: # use the bounding box of the largest blob (defined as # animal) plus a given boxsize as the next search box self.search_boxes[self.i_tracking + 1, :] \ = [int(np.round(current_animal.row_min - self.boxsize)), int(np.round(current_animal.row_max + self.boxsize)), int(np.round(current_animal.col_min - self.boxsize)), int(np.round(current_animal.col_max + self.boxsize))] # get the current width and height of the bounding box current_width = current_animal.row_max - current_animal.row_min current_height = current_animal.col_max - current_animal.col_min # check if the detected blob that has been defined as the # animal can be saved in the preallocated array If not, # only display an error message if debug mode is on as this # will completely halt the experiment until the user clicks ok! if current_width > self.image_raw.shape[0] \ or current_height > self.image_raw.shape[1]: if self.debug_mode: tk.messagebox.showerror( 'Blob too large', 'The expected size of the animal is smaller than:' + repr(self.image_raw.shape[0]) + ' x ' + repr( self.image_raw.shape[1]) + 'pixel\n' 'The detected blob, however, is ' + repr( current_width) + ' pixels wide and\n' + repr(current_height) + ' pixels high.\n' 'The pixel/mm is set to ' + repr( self.pixel_per_mm) + '.\n' 'This image will not be saved!\n' 'In the future please increase ' 'the expected size of the animal', parent=self.child) else: # save the raw image in the preallocated image array. # This will fail if your animal is bigger than 2* the # bounding box! self.image_raw[0:current_width, 0:current_height, self.i_tracking] = \ thc.CallImageROI(current_frame, current_animal).small_image.copy() # save the binary image in the preallocated image array # todo - get rid of this and just save the value for the # threshold. Can be reconstructed later if user wants to # view it. Will save a lot of memory. 
self.image_thresh[0:current_width, 0:current_height, self.i_tracking] = \ thc.CallImageROI(self.current_image_thresholded, current_animal, sliced_input_image=self.search_boxes[self.i_tracking, 0::2]).small_image.copy() ################################################################ # the call below shouldn't be necessary because the image is # already smoothed! but it might be advantagous in difficult # environments - but it's definitely going to cost CPU power! # Note: if turned on, get rid of the following function # because it would overwrite the image again! # self.image_thresh[0:current_width, 0:current_height, i_tracking] = \ # binary_fill_holes(thc.CallImageROI(self.current_image_thresholded, current_animal, # sliced_input_image=search_boxes[i_tracking, 0::2]).small_image).copy() # Todo: maybe implement a button so that user can choose? ################################################################ # save the bounding box coordinates. This is necessary to # reconstruct the image as only this part of the image is # being saved. self.bounding_boxes[:, self.i_tracking] = \ [current_animal.row_min, current_animal.row_max, current_animal.col_min, current_animal.col_max].copy() # ... and the centroid self.centroids[self.i_tracking, :] = \ current_animal.centroid_row, current_animal.centroid_col # identify head and tail # find skeleton if THINING_FUNCTION == 'thin': # todo - find a better way # and test before publication if necessary! self.image_skel[:, :, self.i_tracking] \ = morphology.thin(self.image_thresh[:, :, self.i_tracking]) else: self.image_skel[:, :, self.i_tracking] \ = morphology.skeletonize(self.image_thresh[:, :, self.i_tracking]) # how many points are there in the skeleton (also gives me the length) self.length_skeleton[self.i_tracking] = len(np.nonzero( self.image_skel[:, :, self.i_tracking])[0]) # todo: This is very explicit - change? 
skeleton_y = np.nonzero(self.image_skel[:, :, self.i_tracking])[0] skeleton_x = np.nonzero(self.image_skel[:, :, self.i_tracking])[1] # this loop will give me an array, which is organized like the current_skeleton, and tells me how many # points each point is connected to +1 in all 4 directions possible. Taken from original SOS. connect = np.zeros(self.length_skeleton[self.i_tracking]) for i_skel in range(self.length_skeleton[self.i_tracking]): connect[i_skel] = np.sum( np.logical_and(skeleton_x >= skeleton_x[i_skel] - 1, np.logical_and(skeleton_x <= skeleton_x[i_skel] + 1, np.logical_and(skeleton_y >= skeleton_y[i_skel] - 1, skeleton_y <= skeleton_y[i_skel] + 1) ))) skeleton_end_points = np.where(connect == 2)[0] aspect_ratio = current_animal.major_axis / current_animal.minor_axis if len(skeleton_end_points) == 2: self.endpoints[0, :, self.i_tracking] = \ current_animal.row_min + skeleton_y[skeleton_end_points[0]], \ current_animal.col_min + skeleton_x[skeleton_end_points[0]] self.endpoints[1, :, self.i_tracking] = \ current_animal.row_min + skeleton_y[skeleton_end_points[1]], \ current_animal.col_min + skeleton_x[skeleton_end_points[1]] # TODO: Test this - I feel something is wrong # Try: not adding 1 and 0.5 to the temp_midpoint. # Try: rounding (not just int) when looking for skeleton index # Use "illustrate_midpoint_vs_centroid" jupyter notebook to # sanity check! # need to add midpoint of the skeleton # take half the length of the skeleton temp_midpoint = self.length_skeleton[self.i_tracking] / 2 # if the skeleton length is an even number add one (e.g. the # center point of 4 points can either be position 2 # or position 3.) if temp_midpoint % 2 == 0: temp_midpoint += 1 # The skeleton length is an odd number, add half (e.g. 
# center point of 5 numbers is 3 else: temp_midpoint += 0.5 try: self.midpoints[self.i_tracking, :] = \ current_animal.row_min + skeleton_y[int(temp_midpoint)], \ current_animal.col_min + skeleton_x[int(temp_midpoint)] except IndexError: # if animal is extremely small! self.midpoints[self.i_tracking, :] = \ current_animal.row_min, current_animal.col_min # rules to be able to get to assign head and tail to an image: # 1): aspect ratio (major axis / minor axis of the animal # must be higher than a given value # 2): We need exactly two endpoints on the skeleton (when one # has holes in the binary image it can become a # a circular skeleton # 3): We need a minimum length of the skeleton. if aspect_ratio > 1.25 and len(skeleton_end_points) == 2 and \ self.length_skeleton[self.i_tracking] > 1 / \ 2 * np.nanmean(self.length_skeleton[self.i_tracking - 3:self.i_tracking]): # in case we have not yet assigned the head, at the # beginning of the experiment which endpoint has the # shortest distance to the centroid of the original larva # = tail if self.i_tracking == self.start_frame: # if self.tails[self.i_tracking-1,0] == 0: if np.linalg.norm( self.first_centroid - self.endpoints[0, :,self.i_tracking]) \ < np.linalg.norm(self.first_centroid - self.endpoints[1, :, self.i_tracking]): # endpoint 0 is the tail self.tails[self.i_tracking, :] \ = self.endpoints[0, :, self.i_tracking] self.heads[self.i_tracking, :] \ = self.endpoints[1, :, self.i_tracking] else: self.tails[self.i_tracking, :] \ = self.endpoints[1, :, self.i_tracking] self.heads[self.i_tracking, :] \ = self.endpoints[0, :, self.i_tracking] elif self.tails[self.i_tracking - 1, 0] == 0: # This happens when you have a donut shaped larva in the frames before. 
# idea: The last verified centroid should always be closer to the tail than the head, which can move # much more if np.linalg.norm(self.endpoints[0, :, self.i_tracking] - self.centroids[self.to_fix[0] - 1, :]) \ < np.linalg.norm(self.endpoints[1, :, self.i_tracking] - self.centroids[self.to_fix[0] - 1, :]): self.tails[self.i_tracking, :] \ = self.endpoints[0, :, self.i_tracking] self.heads[self.i_tracking, :] \ = self.endpoints[1, :, self.i_tracking] else: self.tails[self.i_tracking, :] \ = self.endpoints[1, :, self.i_tracking] self.heads[self.i_tracking, :] \ = self.endpoints[0, :, self.i_tracking] # we'll also fix, in retrospect, the lost self.heads and self.tails with the centroid coordinate # to easily be able to plot afterwards self.tails[self.to_fix, :] \ = self.centroids[self.to_fix, :].copy() self.heads[self.to_fix, :]\ = self.centroids[self.to_fix, :].copy() self.to_fix = [] else: if np.linalg.norm( self.tails[self.i_tracking - 1, :] - self.endpoints[0, :, self.i_tracking]) \ < np.linalg.norm(self.tails[self.i_tracking - 1, :] - self.endpoints[1, :, self.i_tracking]): self.tails[self.i_tracking, :] \ = self.endpoints[0, :, self.i_tracking] self.heads[self.i_tracking, :] \ = self.endpoints[1, :, self.i_tracking] else: self.tails[self.i_tracking, :] \ = self.endpoints[1, :, self.i_tracking] self.heads[self.i_tracking, :] \ = self.endpoints[0, :, self.i_tracking] # the if clause below checks if the tail is in front of # the the centroid, essentially checking if there's a # potential head/tail swap in the assignment. # First if just checks if there have been enough frames # collected already to compare with the current frame. if self.i_tracking > self.recording_framerate: # First we calculate the distance of the centroid # between the past 1 second and the current frame. 
# This will be needed to calculate speed distance_centroid = np.linalg.norm( self.centroids[self.i_tracking - self.recording_framerate, :] - self.centroids[self.i_tracking, :]) # print('Distance in pixels: ' + repr(distance_centroid)) # print('Distance in mm: ' + repr(distance_centroid/self.pixel_per_mm)) # next there is a filter for the minimum speed of the # centroid needs to have in order to consider the # animal 'running'. For now this is set to 0.5mm/s # speed mm per sec = distance centroid / px_per_mm / framerate current_centroid_speed = (distance_centroid / self.pixel_per_mm) \ / self.recording_framerate if self.tails[self.i_tracking - 1, 0] != 0 \ and self.tails[self.i_tracking, 0] != 0 \ and current_centroid_speed > 0.25: # We'll check if a tail has been asigned or if # the curvature of the animal was too great. # Then it checks if the distance travelled was # enough to justify looking for a H/T swap # This first checks in which direction the # centroid is traveling relative to the frame # before: # theta_centroid = centroid _current - centroid_past direction_centroid = np.arctan2( self.centroids[self.i_tracking,0] - self.centroids[ self.i_tracking - self.frames_to_define_orientation, 0], self.centroids[ self.i_tracking, 1] - self.centroids[self.i_tracking - self.frames_to_define_orientation, 1]) # if the centroiddirection of centroid if the # trail is in front of the centroid (H/T swa is # in front of the tail, we'll always get a # similar angle as the y) we should get # the opposite angle theta_tail = centroid_current # - tail_current direction_tail_minus_centroid = np.arctan2( self.centroids[self.i_tracking, 0] - self.tails[self.i_tracking, 0], self.centroids[self.i_tracking, 1] - self.tails[self.i_tracking, 1]) # Next we'll normalize, i.e. 
we bring the # movement of the centroid onto the horizonal axis # and let the direction of the tail relative to # the centroid follow if direction_centroid \ - direction_tail_minus_centroid \ < -np.pi or direction_centroid \ - direction_tail_minus_centroid \ > np.pi: normalized_angle = direction_centroid \ + direction_tail_minus_centroid else: normalized_angle = direction_centroid \ - direction_tail_minus_centroid # Now we can just check if the normalized angle is # bigger or smaller than given freedom we give it if normalized_angle > 1 / 2 * np.pi \ or normalized_angle < - 1 / 2 * np.pi: print('found HT swap frame ' + repr(self.i_tracking)) print('movement: ' + repr(direction_centroid)) print('tail dir: ' + repr(direction_tail_minus_centroid)) self.ht_swap[self.i_tracking] = 1 if self.repair_ht_swaps: # In order to be robust against noise the # heat tail swap needs to persistent for # at least one second - Todo Results for # framerate 1 may be problematic! if (self.ht_swap[self.i_tracking - self.recording_framerate:self.i_tracking] == 1).all(): print('now I could repair the ht swaps') tails_temp = np.zeros(( self.recording_framerate, 2), dtype=np.int16); heads_temp = np.zeros(( self.recording_framerate, 2), dtype=np.int16) np.copyto(tails_temp, self.heads[int( self.i_tracking - self.recording_framerate + 1):self.i_tracking + 1, :]) np.copyto(heads_temp, self.tails[int( self.i_tracking - self.recording_framerate + 1):self.i_tracking + 1, :]) np.copyto( self.tails[int( self.i_tracking - self.recording_framerate + 1):self.i_tracking + 1, :], tails_temp) np.copyto(self.heads[int( self.i_tracking - self.recording_framerate + 1):self.i_tracking + 1, :], heads_temp) self.ht_swap[self.i_tracking - self.recording_framerate + 1:self.i_tracking + 1] = 0 else: # need to assign the centroid position to the head/tail - # otherwise in the VR setting we have huge jumps! 
self.tails[self.i_tracking, :] = self.centroids[self.i_tracking, :] self.heads[self.i_tracking, :] = self.centroids[self.i_tracking, :] self.to_fix.append(self.i_tracking) if self.VR_ARENA: if len(self.vr_arena.shape) > 2: # If the VR_arena is a multidimensional array assume # that the last index(-1) is a time index. # First we need to update the counter if it is # appropriate to do so using the modulo operator only # update if i_tracking is not zero! otherwise the # first programmed arena will never be shown! if self.i_tracking != 0: if self.i_tracking \ % self.vr_arena_multidimension_update == 0: self.vr_arena_multidimension_counter += 1 if self.vr_stim_location == 'Head': current_stim_temp = self.vr_arena[ self.heads[self.i_tracking, 0], self.heads[self.i_tracking, 1], self.vr_arena_multidimension_counter % self.vr_arena.shape[2] ] elif self.vr_stim_location == 'Centroid': current_stim_temp = self.vr_arena[ self.centroids[self.i_tracking, 0], self.centroids[self.i_tracking, 1], self.vr_arena_multidimension_counter % self.vr_arena.shape[2] ] elif self.vr_stim_location == 'Midpoint': current_stim_temp = self.vr_arena[ self.midpoints[self.i_tracking, 0], self.midpoints[self.i_tracking, 1], self.vr_arena_multidimension_counter % self.vr_arena.shape[2] ] elif self.vr_stim_location == 'Tail': current_stim_temp = self.vr_arena[ self.tails[self.i_tracking, 0], self.tails[self.i_tracking, 1], self.vr_arena_multidimension_counter % self.vr_arena.shape[2] ] # This should be relatively save - if user presents a # uint16 arena this will throw an error as soon as # as the pwm_range is exceeded! current_stim = current_stim_temp * self.pwm_range / 255 # It is currently not possible to use a arena coded # in uint16 number space! But here would be a good # point to start changing the code if needed! # Normalize to 65535 instead of 255! 
for i_stim in range(len(self.output_channel_one)): # then we just call the same function but with a # different slice, again using the modulo # operator. This means that if the index we're # trying to call is higher than the available # indices we just start again at 0 self.pwm_object.set_PWM_dutycycle( user_gpio=self.output_channel_one[i_stim][0], dutycycle=current_stim) self.stimulation[self.i_tracking] = current_stim_temp else: if self.vr_stim_location == 'Head': current_stim_value = self.vr_arena[ self.heads[self.i_tracking, 0], self.heads[self.i_tracking, 1]] elif self.vr_stim_location == 'Centroid': current_stim_value = self.vr_arena[ self.centroids[self.i_tracking, 0], self.centroids[self.i_tracking, 1]] elif self.vr_stim_location == 'Midpoint': current_stim_value = self.vr_arena[ self.midpoints[self.i_tracking, 0], self.midpoints[self.i_tracking, 1]] elif self.vr_stim_location == 'Tail': current_stim_value = self.vr_arena[ self.tails[self.i_tracking, 0], self.tails[self.i_tracking, 1]] # dont always update - only if value changed - at # least in high intensity light I clearly see # flickering which might come from the updating. ''' if self.previous_channel_one_value \ != self.vr_arena[self.heads[self.i_tracking, 0], self.heads[self.i_tracking, 1]]: ''' if self.previous_channel_one_value != \ current_stim_value: # print('value changed') # Todo: Have to write this in the instructions: # VR arena is always on Channel 1! 
for i_stim in range(len(self.output_channel_one)): ''' # print(self.output_channel_one[i_stim][0]) self.pwm_object.set_PWM_dutycycle( user_gpio=self.output_channel_one[i_stim][0], dutycycle=self.vr_arena[self.heads[self.i_tracking, 0], self.heads[self.i_tracking, 1] ]) ''' self.pwm_object.set_PWM_dutycycle( user_gpio=self.output_channel_one[i_stim][0], dutycycle=current_stim_value) self.stimulation[self.i_tracking] = current_stim_value self.previous_channel_one_value = current_stim_value elif self.time_dependent_stim: try: self.previous_channel_one_value = \ self.update_pwm_dutycycle_time_dependent( previous_channel_value=self.previous_channel_one_value, output_channel_list=self.output_channel_one, output_channel_name='Channel 1') except KeyError: pass # repeat for channel 2 try: self.previous_channel_two_value = \ self.update_pwm_dutycycle_time_dependent( previous_channel_value=self.previous_channel_two_value, output_channel_list=self.output_channel_two, output_channel_name='Channel 2') except KeyError: pass # repeat for channel 3 try: self.previous_channel_three_value = \ self.update_pwm_dutycycle_time_dependent( previous_channel_value=self.previous_channel_three_value, output_channel_list=self.output_channel_three, output_channel_name='Channel 3') except KeyError: pass # and finally for channel 4 try: self.previous_channel_four_value = \ self.update_pwm_dutycycle_time_dependent( previous_channel_value=self.previous_channel_four_value, output_channel_list=self.output_channel_four, output_channel_name='Channel 4') except KeyError: pass if self.post_hoc_tracking: self.heuristic_parameters[0, self.i_tracking] \ = current_animal.filled_area self.heuristic_parameters[1, self.i_tracking] \ = current_animal.filled_area / self.pixel_per_mm self.heuristic_parameters[2, self.i_tracking] \ = current_animal.major_axis / current_animal.minor_axis self.heuristic_parameters[3, self.i_tracking] \ = current_animal.eccentricity self.heuristic_parameters[4, self.i_tracking] \ = 
self.length_skeleton[self.i_tracking] # this is inefficient, but very explicit! self.heuristic_parameters[5, self.i_tracking] \ = self.heuristic_parameters[4, self.i_tracking] / \ self.pixel_per_mm # Only calculate speed at second analyzed frame! if self.i_tracking > self.start_frame: # First, calculate the speed per frame just by # calculating the distance between the current centroid # position and the previous centroid position. This # will yield the distance in pixel/frame self.heuristic_parameters[6, self.i_tracking] \ = np.linalg.norm( self.centroids[self.i_tracking, :] - self.centroids[self.i_tracking - 1, :]) # this is inefficient, but very explicit! # then get the distance in mm per frame by dividing # the above by the pixel per mm self.heuristic_parameters[7, self.i_tracking] \ = self.heuristic_parameters[6, self.i_tracking] \ / self.pixel_per_mm # this is inefficient, but very explicit! # then get the distance in mm per second by # multiplying the above by the recording framerate self.heuristic_parameters[8, self.i_tracking] \ = self.heuristic_parameters[7, self.i_tracking] \ * self.recording_framerate # should be covered by frame.timestamp - TEST! if RASPBERRY: pass # self.real_time[self.i_tracking] = time.time() # if the experiment is analyzed afterwards, there is no way # of knowing what the real time was, so it is just assumed # that the camera was perfect and gave exactly the framerate that was requested elif not RASPBERRY: if self.i_tracking == self.start_frame: self.delay_due_to_detection = self.i_tracking \ / self.recording_framerate # self.real_time[self.i_tracking] = time.time() self.real_time[self.i_tracking] = \ self.i_tracking / (self.recording_framerate - self.time_delay_due_to_animal_detection)
def error_message_func(self, error_stack):
    """
    Let the user know that something went wrong.

    Appends a report to the file ``self.datetime + '_ERROR.txt'`` and
    afterwards opens a tkinter error dialog. The report contains:

    1) the number of non-zero entries in ``self.real_time`` (used as
       the count of frames captured so far) and
       ``self.total_frame_number`` as the expected total,
    2) ``error_stack`` as handed in by the caller,
    3) a hint with a possible solution if the active exception is
       recognised, otherwise a note that it is unclassified,
    4) the text returned by ``traceback.format_exc()``.

    :param error_stack: description of the error supplied by the
        caller. Written to the report using ``str()``.
    :return: None
    """
    # 'import tkinter as tk' does not load the 'messagebox' submodule,
    # so 'tk.messagebox' only resolves if some other module happened to
    # import it first. Import it explicitly so that reporting an error
    # can not itself fail with an AttributeError.
    from tkinter import messagebox

    captured_images = np.count_nonzero(self.real_time)
    expected_no_of_images = self.total_frame_number

    # Format the active exception only once and reuse the text below.
    full_traceback = traceback.format_exc()

    # Error #1: the sorted list of candidate areas was empty.
    animal_not_found = (
        "IndexError: index -1 is out of bounds for axis 0 with size 0"
        in full_traceback
        and "self.filled_area = areas_sorted['filled area'][-1]"
        in full_traceback)

    with open(self.datetime + '_ERROR.txt', 'a') as error_file:
        error_file.write(
            'Unexpected Error at frame ' + repr(captured_images)
            + ' of ' + repr(int(expected_no_of_images)) + '\n\n')
        error_file.write(
            'Traceback (most recent call last): ' + str(error_stack)
            + '\n\n')

        if animal_not_found:
            # Note the trailing spaces where a sentence continues in the
            # next literal - adjacent literals are joined without any
            # separator.
            error_file.write(
                'Error #1\n'
                'The following 2 lines:\n'
                "self.filled_area = areas_sorted['filled area'][-1]\n"
                "IndexError: index -1 is out of bounds for axis 0 with size 0\n"
                'indicate that the animal was not found in the region of interest.\n'
                'This can happen if the animal moves faster than expected.\n'
                'To circumvent this problem increase the "max_speed_animal_mm_per_s" parameter '
                'in the "list_of_available_organisms.json" file\n'
                'Alternatively, your animal might be able to hide under some obstruction. If that is the case '
                'please clear the arena so that the camera can always see at least parts of the animal\n')
        # elif.... put all the known errors and print possible solutions
        else:
            error_file.write('Error that has not been classified yet!\n\n')

        error_file.write('Full error traceback below: \n\n')
        error_file.write(full_traceback)

    # The report is closed (and therefore written to disk) before the
    # dialog is shown.
    messagebox.showerror(
        'Error',
        'The experiment stopped earlier than requested.\n'
        'See the ERROR.txt file in the experiment folder\n'
        'for a detailed traceback for debugging purposes\n')
def update_debug(self):
    """
    Redraw the debug window for the frame that is currently tracked.

    According to the original author's note this only works in post-hoc
    analysis, NOT on the Raspberry Pi.

    The following widgets are updated, in this order:

    1) the 'Frames remaining' label,
    2) the top left canvas: the raw frame
       ``self.images[:, :, self.counter]``,
    3) the top middle canvas: a full size image that is 0 everywhere
       except inside the current search box, where the thresholded
       image is pasted (0 -> 127, 1 -> 255),
    4) the label below the detected animal: filled area, eccentricity
       and major over minor axis,
    5) the top right canvas: the raw frame with the outline of the
       bounding box drawn in ``self.box_intensity``.

    Every image is resized to ``(self.canvas_height,
    self.canvas_width)`` before it is displayed. The PhotoImage
    objects are stored on ``self``. An IndexError raised at any point
    is swallowed so that the debug display can not stop the experiment.

    :return: None
    """

    def as_canvas_photo(array):
        # Scale the array to the canvas size and wrap it in a
        # PhotoImage, which is what the canvases can display.
        scaled = resize_image(
            image=array,
            output_shape=(self.canvas_height, self.canvas_width),
            preserve_range=True,
            mode='reflect')
        return ImageTk.PhotoImage(image=Image.fromarray(scaled))

    try:
        # --- how many frames are left for the analysis ---
        frames_left = self.end_frame - self.i_tracking
        self.time_remaining_label.configure(
            text='Frames remaining: ' + repr(frames_left),
            font='Helvetica 14 bold')

        frame_idx = self.i_tracking

        # --- top left canvas: the raw frame ---
        raw_frame = self.images[:, :, self.counter].copy()
        self.photo_raw = as_canvas_photo(raw_frame)
        self.child_canvas_top_left.create_image(
            0, 0, image=self.photo_raw, anchor=tk.NW)

        # --- top middle canvas: the thresholded search box ---
        # Only the search box is thresholded by the tracking algorithm.
        # To keep the display at a constant size it is pasted into an
        # empty image with the dimensions of the full frame.
        full_size_binary = np.zeros(
            (raw_frame.shape[0], raw_frame.shape[1]))
        # uint8 copy of the thresholded search box, remapped so that
        # 0 is shown as 127 and 1 as 255
        search_box_pixels = \
            self.current_image_thresholded.astype(np.uint8).copy()
        search_box_pixels[search_box_pixels == 0] = 127
        search_box_pixels[search_box_pixels == 1] = 255

        row_min, row_max, col_min, col_max = (
            self.search_boxes[frame_idx, column] for column in range(4))
        full_size_binary[row_min:row_max, col_min:col_max] = \
            search_box_pixels

        self.photo_subtracted = as_canvas_photo(full_size_binary)
        self.child_canvas_top_middle.create_image(
            0, 0, image=self.photo_subtracted, anchor=tk.NW)

        # --- text below the detected animal ---
        animal = self.current_animal_characteristics
        description = (
            'Filled Area: ' + repr(animal.filled_area)
            + '\nEccentricity: ' + repr(animal.eccentricity)
            + '\nMajor over minor axis: '
            + repr(animal.major_axis / animal.minor_axis))
        self.below_detected.configure(text=description)

        # --- top right canvas: raw frame plus bounding box ---
        boxed_frame = self.images[:, :, self.counter].copy()
        top, bottom, left, right = (
            int(self.bounding_boxes[row, frame_idx]) for row in range(4))
        box_edges = (
            (top, left, top, right),        # top horizontal line
            (top, right, bottom, right),    # right vertical line
            (bottom, left, bottom, right),  # bottom horizontal line
            (top, left, bottom, left),      # left vertical line
        )
        for start_row, start_col, stop_row, stop_col in box_edges:
            rr, cc = line(start_row, start_col, stop_row, stop_col)
            boxed_frame[rr, cc] = self.box_intensity

        self.photo_raw_detected = as_canvas_photo(boxed_frame)
        self.child_canvas_top_right.create_image(
            0, 0, image=self.photo_raw_detected, anchor=tk.NW)

        # Without an explicit update nothing is shown. The original
        # author reports that update_idletasks() is not sufficient.
        self.child.update()
    except IndexError:
        # The experiment must not break just because the debug display
        # could not be updated.
        pass
def update_pwm_dutycycle_time_dependent(self, previous_channel_value,
                                        output_channel_list,
                                        output_channel_name):
    """
    Convenience function for the time dependent stimulation.

    Looks up the dutycycle requested for ``output_channel_name`` at the
    current frame (``self.i_tracking``) in
    ``self.time_dependent_stim_file``. If the current frame is
    ``self.start_frame``, or if the requested dutycycle differs from
    ``previous_channel_value``, every GPIO listed in
    ``output_channel_list`` is set to the requested dutycycle.

    The requested dutycycle is returned in either case. The caller is
    expected to store it and pass it back in as
    ``previous_channel_value`` on the next call.

    :param previous_channel_value: the dutycycle returned by the
        previous call. The GPIOs are only updated if the value changed.
    :param output_channel_list: list of GPIOs for the given channel,
        e.g. GPIO 17 would be [[17,1250]]. Only the first entry of each
        item (the GPIO) is used here.
    :param output_channel_name: the channel as a string, e.g.
        'Channel 1'
    :return: the dutycycle requested for this channel at the current
        frame
    """
    requested_dutycycle = \
        self.time_dependent_stim_file[output_channel_name][self.i_tracking]

    first_analyzed_frame = self.i_tracking == self.start_frame
    if first_analyzed_frame \
            or previous_channel_value != requested_dutycycle:
        for gpio_entry in output_channel_list:
            self.pwm_object.set_PWM_dutycycle(
                user_gpio=gpio_entry[0],
                dutycycle=requested_dutycycle)

    return requested_dutycycle
def flush(self):
    """
    Placeholder that does nothing.

    The original author was unsure whether this method is needed and
    left a note to test whether it can be removed.

    :return: None
    """
    # Calls that were disabled by the original author:
    # self.video_output.flush()
    # self.pts_output.flush()
    # self.pi_time_output.flush()
    return None
def close(self):
    """
    Placeholder that does nothing.

    The original author was unsure whether this method is needed and
    left a note to test whether it can be removed.

    :return: None
    """
    # Calls that were disabled by the original author:
    # self.video_output.close()
    # self.pts_output.close()
    # self.pi_time_output.close()
    return None