Source code for control_file

__author__ = 'David Tadres'
__project__ = 'PiVR'

import json
import os
import time
import tkinter as tk
from os import chdir, makedirs
from pathlib import Path

import numpy as np
from scipy.ndimage.interpolation import affine_transform
from import imread

import fast_tracking
import pre_experiment

# this try-except statement checks if the processor is a ARM processor
# (used by the Raspberry Pi) or not.
# Since this command only works in Linux it is caught using
# try-except otherwise it's throw an error in a Windows system.
    if os.uname()[4][:3] == 'arm':
        # This will yield True for both a Raspberry and for M1 Chip
        # Apple devices.
        # Use this code snippet
        # (from
        import re
        CPUINFO_PATH = Path("/proc/cpuinfo")
        if CPUINFO_PATH.exists():
            with open(CPUINFO_PATH) as f:
                cpuinfo =
            if"^Model\s*:\s*Raspberry Pi", cpuinfo, flags=re.M) is not None:
                # if True, is Raspberry Pi
                RASPBERRY = True
                LINUX = True
        else: # Test if one more intendation necessary or not. On Varun's computer
            # is Apple M1 chip (or other Arm CPU device).
            RASPBERRY = False
            LINUX = True
        # is either Mac or Linux
        RASPBERRY = False
        LINUX = True

except AttributeError:
    # is Windows
    RASPBERRY = False
    LINUX = False

[docs] class ControlTracking(): """ Whenever the tracking algorithm is called, this class controls first the detection algorithm, prepares the virtual arena if necessary, and then calls the tracking algorithm. """ def __init__(self, boxsize = 20, signal = None, cam = None, base_path = None, genotype = None, recording_framerate = 30, resolution = [640, 480], recordingtime=20, pixel_per_mm = None, model_organism = 'Not in list', display_framerate = None, vr_arena = None, pwm_object = None, placed_animal = None, vr_arena_name = None, offline_analysis=False, time_dependent_stim_file=None, time_dependent_stim_file_name=None, vr_arena_multidimensional=False, high_power_led_bool=False, minimal_speed_for_moving=0.25, observation_resize_variable=1, organisms_and_heuristics=None, post_hoc_tracking=False, debug_mode='OFF', animal_detection_mode='Mode 1', output_channel_one=[], output_channel_two=[], output_channel_three=[], output_channel_four=[], simulated_online_analysis=False, overlay_bool=False, controller=None, background_channel = [], background_2_channel = [], background_dutycycle = 0, background_2_dutycycle = 0, vr_update_rate=1, pwm_range=100, adjust_intensity=100, vr_stim_location='NA', version_info=None, save_centroids_npy=False, save_heads_npy=False, save_tails_npy=False, save_midpoints_npy=False, save_bbox_npy=False, save_stim_npy=False, save_thresh_npy=False, save_skeleton_npy=False, undistort_dst=None, undistort_mtx=None, newcameramtx=None, ): self.boxsize = boxsize if signal == 'white' or signal == 'dark': self.signal = signal else: import sys sys.exit('signal must be "bright" or "dark" not ' + repr(signal)) = cam self.genotype = genotype self.recording_framerate = recording_framerate self.pixel_per_mm = pixel_per_mm self.model_organism = model_organism if display_framerate is not None: self.display_framerate = display_framerate else: self.display_framerate = None self.offline_analysis = offline_analysis self.data_as_npy = True if offline_analysis: if type(cam) is np.ndarray: self.data_as_npy=True elif type(cam) is list: self.data_as_npy=False # elif check if it's a video if RASPBERRY: self.Path = base_path self.resolution = resolution self.recordingtime = recordingtime else: self.Path = cam self.recordingtime = 0 if self.data_as_npy: self.resolution = cam.shape[1], cam.shape[0] else: temp=imread(cam[0]) self.resolution = temp[1], temp[0] if debug_mode == 'ON': self.debug_mode = True elif debug_mode == 'OFF': self.debug_mode = False if vr_arena is not None: #print('vr_arena is True') self.VR_ARENA = True self.vr_arena_matrix = vr_arena self.placed_animal = placed_animal self.vr_arena_name = vr_arena_name #print('max value arena ' + repr(np.amax(vr_arena))) self.time_dependent_stim_file = None self.time_dependent_stim_file_name = None elif time_dependent_stim_file is not None: self.time_dependent_stim=True self.time_dependent_stim_file = time_dependent_stim_file self.time_dependent_stim_file_name = time_dependent_stim_file_name #print('time dependent stimulation selected!') self.VR_ARENA = False self.vr_arena_matrix = None self.vr_arena_name = 'None' else: #print('No Stimulation selected') self.VR_ARENA = False self.vr_arena_matrix = None self.time_dependent_stim_file = None self.vr_arena_name = 'None' self.time_dependent_stim_file = None self.time_dependent_stim_file_name = None self.pwm_object = pwm_object self.overlay_bool = overlay_bool self.controller = controller self.minimal_speed_for_moving = minimal_speed_for_moving # todo - implement in organism file self.observation_resize_variable = observation_resize_variable # bool to track if the setup has the current controller or # transistor to control LED output self.high_power_LED_bool = high_power_led_bool # animal detection mode self.animal_detection_mode = animal_detection_mode # get all the outputchannels as lists self.output_channel_one=output_channel_one self.output_channel_two=output_channel_two self.output_channel_three=output_channel_three self.output_channel_four=output_channel_four self.background_channel = background_channel self.background_2_channel = background_2_channel self.background_dutycycle = background_dutycycle self.background_2_dutycycle = background_2_dutycycle self.vr_update_rate = vr_update_rate self.pwm_range = pwm_range self.adjust_intensity = adjust_intensity self.vr_stim_location = vr_stim_location self.version_info = version_info self.save_centroids_npy = save_centroids_npy self.save_heads_npy = save_heads_npy self.save_tails_npy = save_tails_npy self.save_midpoints_npy = save_midpoints_npy self.save_bbox_npy = save_bbox_npy self.save_stim_npy = save_stim_npy self.save_thresh_npy = save_thresh_npy self.save_skeleton_npy = save_skeleton_npy self.undistort_dst = undistort_dst self.undistort_mtx = undistort_mtx self.newcameramtx = newcameramtx self.initial_data = None self.tracking_data = None self.organisms_and_heuristics = organisms_and_heuristics self.post_hoc_tracking = post_hoc_tracking self.simulated_online_analysis = simulated_online_analysis self.datetime = None self.done_offline_analysis = tk.IntVar() self.done_offline_analysis.set(0) self.start_experiment()
[docs] def adjust_arena(self): """ This function translates and rotates the virtual reality if necessary. It also adjusts the desired stimulus intensity. For both translation and rotation the scipy.ndimage.affine_transform function is used: For the translation, the following transformation matrix is used with :math:`{\\zeta}` being the difference between the animal position and the desired animal position: .. math:: \\begin{bmatrix} Y' \\\\ X' \\\\ 0 \\end{bmatrix} = \\begin{bmatrix} 1 & 0 & Y \\zeta \\\\ 0 & 1 & X \\zeta \\\\ 0 & 0 & 0 \\end{bmatrix} To translate and rotate the arena, the following is done: #) Take the position of the animal in the real world and the position of the animal in the virtual reality. Translate the arena by the difference, effectively using the placed animal coordinates as the origin around which the arena is rotated. #) Then translate the arena to the origin of the array at [0,0] #) Rotate the arena by the difference in real movement angle and the desired angle #) Finally, translate the arena back to the desired position, defined by both the real position of the animal and the desired position. This is implemented by the following linear transformation where: :math:`{X \\zeta}` and :math:`{Y \\zeta}` is the difference between the animal position and the desired animal position and, :math:`{\\eta}` is the desired animal position. .. math:: \\begin{bmatrix} Y' \\\\ X' \\\\ 0 \\end{bmatrix} = \\begin{bmatrix} 1 & 0 & Y \\zeta \\\\ 0 & 1 & X \\zeta \\\\ 0 & 0 & 0 \\end{bmatrix} \\cdot \\begin{bmatrix} 1 & 0 & Y \\eta \\\\ 0 & 1 & X \\eta \\\\ 0 & 0 & 0 \\end{bmatrix} \\cdot \\begin{bmatrix} \\cos & -\\sin & 0 \\\\ \\sin & \\cos & 0 \\\\ 0 & 0 & 1 \\end{bmatrix} \\cdot \\begin{bmatrix} 1 & 0 & -Y \\eta \\\\ 0 & 1 & -X \\eta \\\\ 0 & 0 & 0 \\end{bmatrix} """ # Only do translation and rotation for non-dynamic arenas. if len(self.vr_arena_matrix.shape) <= 2: if self.placed_animal is not None: # Difference of the real animal coordinates and the # desired coordinates as defined in the VR arena diff_x = self.initial_data.animal_after_box.centroid_col\ - self.placed_animal[0] diff_y = self.initial_data.animal_after_box.centroid_row\ - self.placed_animal[1] # if len == 2 means that only x and y coordinate but # no orientation has been defined using the VR # Arena. Here a simple translation matrix is defined # # Note that the x and y position is swapped as this # is how numpy accesses images. if len(self.placed_animal) == 2: animal_position_difference = np.array([ [1, 0, diff_y], [0, 1, diff_x], [0, 0, 1] ]) # to get the transformation matrix the inverse of # the above is taken transformation_matrix = \ np.linalg.inv(animal_position_difference) # if len == 3 it means that not only x and y # coordinate, but also the angle at which the animal is # supposed to move relative to the VR Arena has been # defined. if len(self.placed_animal) == 3: # First the difference of the real and the # desired angle needs to be calculated: # The coordinates of the first original animal in # int32 space (originally delivered as int16) first_animal_coordinates = \ [(self.initial_data.first_animal.centroid_row).astype(np.int32), \ (self.initial_data.first_animal.centroid_col).astype(np.int32)] # The coordinates of the animal after it left the original bounding box before_exp_start_animal_coordinates = \ [(self.initial_data.animal_after_box.centroid_row).astype(np.int32), (self.initial_data.animal_after_box.centroid_col).astype(np.int32)] # These two coordinates are used to calculate the # direction angle that the animal is coming from. # The origin is the first animal. real_animal_angle = \ np.arctan2(before_exp_start_animal_coordinates[0] - first_animal_coordinates[0], before_exp_start_animal_coordinates[1] - first_animal_coordinates[1]) # Then the difference in the real angle of the # experiment and the desired angle defined by the VR # Arena is calculated difference_angle = self.placed_animal[2] \ - real_animal_angle # Goal here is to be explicit as speed is # probably not really of the essence as this only # happens once. # First the defined (= placed_animal) # coordinates are set to become the origin rotation_origin = np.array([ [1, 0, -self.placed_animal[1]], # y-coordinate [0, 1, -self.placed_animal[0]], # x-coordinate [0, 0, 1.] ]) # Then the whole arena is rotated at the origin # by the difference in angle between the real animal # and the defined angle (=placed_animal) rotation = np.array([ [np.cos(difference_angle), -np.sin(difference_angle), 0], [np.sin(difference_angle), np.cos(difference_angle), 0], [0, 0, 1] ]) # Then the position of the animal is set back to # the original position rotation_back = np.array([ [1, 0, self.placed_animal[1]], # y-coordinate [0, 1, self.placed_animal[0]], # x-coordinate [0, 0, 1.]]) # Finally the difference between the # x and y position of the real animal and the desired position # is stated animal_position_difference = np.array([ [1, 0, diff_y], [0, 1, diff_x], [0, 0, 1] ]) # Now all these matrices are multplied (@ # operator does same as transform = animal_position_difference @ rotation_back @ rotation @ rotation_origin """ ---- -- -- -- -- -- -- -- -- |y'| |1, 0, Y difference | |1, 0, Y VR animal| |cos, -sin, 0| |1, 0, -Y VR animal| |x'| = |0, 1, X difference | x |0, 1, X VR animal| x |sin, cos, 0| x |0, 1, -X VR animal| |0 | |0, 0, 1 | |0, 0, 0 | |0, 0, 1| |0, 0, 0 | ---- -- -- -- -- -- -- -- -- """ # Need to inverse the transformation matrix transformation_matrix = np.linalg.inv(transform) # Do an affine tranformation of the original arena # using the scipy.ndimage.affine_transform function # self.vr_arena_matrix = affine_transform( self.vr_arena_matrix, transformation_matrix[:2, :2], offset=transformation_matrix[:2, 2], cval=0.0, order=1) # Potential Problem - if rotated and translated there # will be undefined regions (filled with zeros) at the # edge! # Todo: Idea: How about providing the user when drawing # the VR arena with a much larger canvas than what's # necessary for the e.g. 640x480 = 3*640x3*480. Only # show exactly the part that they want to use, # e.g. 640x480. Thanks to the figure toolbar it's # possible to pan etc and the user can look up the areas # that are not visible at the beginning. Once saved, # only the csv file that is different the background # will be saved (and only if the user places and animal) # - This should make it possible to not have any parts of # the arena that are 0 (together with the cval value # If the user is doing a non-dynamic virtual reality # experiment and wants to adjust the delivered intensity, # this is done here. self.vr_arena_matrix = \ self.vr_arena_matrix.astype(np.float32) \ * self.adjust_intensity * 0.01 # Save the arena that will be presented for future reference if RASPBERRY: #print(self.Path + '/' + self.datetime + '_' # + self.genotype + '/' + self.vr_arena_name) np.savetxt(self.Path + '/' + self.datetime + '_' + self.genotype + '/' + self.vr_arena_name, self.vr_arena_matrix, delimiter=',', fmt='%1.2f') # Do internal inversion of the array if high powered PiVR # version is being used if self.high_power_LED_bool: self.high_power_LED_arena_inversion_func() else: # self.vr_arena_matrix) if self.high_power_LED_bool: self.high_power_LED_arena_inversion_func()
[docs] def high_power_LED_arena_inversion_func(self): """ When the high powered PiVR version is used, the software has to handle the unfortunate fact that the LED controller of the high powered PiVR version is completely ON when the GPIO is OFF and vice versa. This of course is the opposite of what happens in the normal version. Internally, the software must therefore invert the arena if that's the case. This function takes care of this. The end user does not need to know this. From their perspective they are able to use the same input arena they would use for the standard version while getting the expected result. """ self.vr_arena_matrix = self.pwm_range - self.vr_arena_matrix
[docs] def high_power_time_dependent_stim_func(self): """ Analog function to :high_power_LED_arena_inversion_func: When the high powered PiVR version is used, the software has to handle the unfortunate fact that the LED controller of the high powered PiVR version is completely ON when the GPIO is OFF and vice versa. This of course is the opposite of what happens in the normal version. Internally, the software must therefore invert the stimulus if that's the case. This function takes care of this. The end user does not need to know this. From their perspective they are able to use the same input arena they would use for the standard version while getting the expected result. """ for current_column in self.time_dependent_stim_file.columns: if 'Channel' in current_column: self.time_dependent_stim_file[current_column] = \ self.pwm_range - self.time_dependent_stim_file[current_column]
[docs] def show_dynamic_vr_arena_update_error(self): """ This function warns the user that an incompatible frame rate/dynamic arena update frequency has been chosen. For example, if the frame rate is 30 frames per second and the update rate is 10Hz the arena will be updated every 3rd frame (30/10=3). This is of course possible. If the frame rate is 30 frames per second and the update rate is set to 20Hz the arena should be updated every 1.5th frame (30/20=1.5). This is not possible. What will happen is that for every other frame the arena will be updated for each frame and the other it will take two frames to update. This will lead to a mean of 1.5 but it's not regular, of course. As this can easily lead to bad data being produced without the user knowing (no explicit error will be thrown) this function informs the user of the mistake so that they can change the settings to either 40 frames per second to keep the 20Hz update rate or to change the update rate. """ top = tk.Toplevel() top.title("Incorrect Update Rate!") tk.Label(top, text='You have chosen a framerate of ' + repr(self.recording_framerate) + '\nand a dynamic arena update rate of ' + repr(self.vr_update_rate) + '.\n This would lead to an update ' 'every ' + repr(round(self.recording_framerate / self.vr_update_rate, 2)) + '\nframes which can not be ' 'deliviered! ' '\n Please choose a framerate that ' 'is compatible' '\nwith the update rate or vice versa.' '\nE.g. 30FPS and VRUpdate 15', justify='left').pack(fill='both') top.geometry("300x160+500+30") tk.Button(top, text="OK", command=top.destroy).pack(pady=5)
[docs] def start_experiment(self): """ This function is called at the end of the initialization of the :py:class:`control_file.ControlTracking` class. It creates the folder where all the experimental data is being saved using a timestamp taken now. It then saves the "experiment_settings.json" file which contains a lot of important information of the current experiment. Then it starts the detection algorithm in :py:class:`pre_experiment.FindAnimal`. If the animal has been detected, the arena will be translated and rotated if requested using the :py:func:`adjust_arena` function. Then the tracking algorithm is called: :py:class:`fast_tracking.FastTrackingControl` """ if self.VR_ARENA: if self.recording_framerate % self.vr_update_rate != 0: # Check for useful user input regarding framerate and vr # Arena update speed! self.show_dynamic_vr_arena_update_error() self.datetime = time.strftime("%Y.%m.%d_%H-%M-%S") animal_detection_start_time = time.time() # Indicate whether online undistortion was performed if self.newcameramtx is None: online_undistort = 'False' else: online_undistort = 'True' # On the Raspberry, create the folder that will be used to # save all the experimental data if RASPBERRY: makedirs(self.Path + '/' + self.datetime + '_' + self.genotype, exist_ok=True) chdir(self.Path + '/' + self.datetime + '_' + self.genotype) if self.high_power_LED_bool: PiVR_LED_version = 'High Powered LED version' else: PiVR_LED_version = 'Standard LED version' # save experiment variables in a json file experiment_info = {'PiVR info (recording)': self.version_info, 'PiVR info (tracking)': self.version_info, 'Camera Shutter Speed [us]':, 'Experiment Date and Time' : self.datetime, 'Framerate': self.recording_framerate, 'Pixel per mm': self.pixel_per_mm, 'Exp. Group': self.genotype, 'Resolution': self.resolution, 'Recording time': self.recordingtime, 'Search box size' : self.boxsize, 'Model Organism': self.model_organism, 'Virtual Reality arena name': self.vr_arena_name, 'VR Stimulus Body Part': self.vr_stim_location, 'Animal Color': self.signal, 'output channel 1' : self.output_channel_one, 'output channel 2' : self.output_channel_two, 'output channel 3' : self.output_channel_three, 'output channel 4' : self.output_channel_four, 'backlight channel' : self.background_channel, 'backlight 2 channel' : self.background_2_channel, 'backlight dutycycle' : self.background_dutycycle, 'backlight 2 dutcycle' : self.background_2_dutycycle, 'Signal': self.signal, 'Online undistort performed': online_undistort, 'PiVR LED version' : PiVR_LED_version } with open((self.Path + '/' + self.datetime + '_' + self.genotype + '/' + 'experiment_settings.json'), 'w') as file: json.dump(experiment_info, file, sort_keys=True, indent=4) # Save the stimulation file in the experimental folder - # this is just the original, not the actual presented as # the stim file can be much longer than the experiment! if self.time_dependent_stim_file is not None: self.time_dependent_stim_file.to_csv( self.Path + '/' + self.datetime + '_' + self.genotype + '/' + self.time_dependent_stim_file_name.split('/')[-1]) # If high-powered LED version is used, it is # necessary to invert the time dependent stimulus # file so that the expected stimulus is presented. self.high_power_time_dependent_stim_func() # Now the detection algorithm is being called self.initial_data = pre_experiment.FindAnimal( boxsize=self.boxsize, signal=self.signal,, debug_mode=self.debug_mode, resolution=self.resolution, recording_framerate=self.recording_framerate, display_framerate=self.display_framerate, model_organism=self.model_organism, offline_analysis=self.offline_analysis, pixel_per_mm=self.pixel_per_mm, organisms_and_heuristics=self.organisms_and_heuristics, post_hoc_tracking=self.post_hoc_tracking, animal_detection_mode=self.animal_detection_mode, simulated_online_analysis=self.simulated_online_analysis, datetime=self.datetime ) # only continue if an animal could be detected! if self.initial_data.animal_detected: # only rotate arena if arena provided if self.VR_ARENA: self.adjust_arena() time_delay_due_to_animal_detection = \ time.time() - animal_detection_start_time if not RASPBERRY or self.offline_analysis \ or self.simulated_online_analysis: self.tracking_data = \ fast_tracking.FastTrackingControl( genotype=self.genotype, recording_framerate=self.recording_framerate, display_framerate=self.display_framerate, resolution=self.resolution, recordingtime=self.recordingtime, initial_data=self.initial_data, boxsize=self.boxsize, signal=self.signal, # frames to define orientation - make dynamic # depending on speed of animal debug_mode=self.debug_mode, debug_mode_resize=self.observation_resize_variable, # repair_ht_swaps todo - make dynamic?, dir=self.Path, pixel_per_mm=self.pixel_per_mm, model_organism=self.model_organism, vr_arena=self.vr_arena_matrix, # todo for debugging kick out again pwm_object=self.pwm_object, # todo, for debugging, kick out again,, time_dependent_file=self.time_dependent_stim_file, # TODO: THIS IS high_power_led_bool=self.high_power_LED_bool, offline_analysis=self.offline_analysis, minimal_speed_for_moving=self.minimal_speed_for_moving, organisms_and_heuristics=self.organisms_and_heuristics, post_hoc_tracking=self.post_hoc_tracking, datetime=self.datetime, simulated_online_analysis=self.simulated_online_analysis, save_centroids_npy=self.save_centroids_npy, save_heads_npy=self.save_heads_npy, save_tails_npy=self.save_tails_npy, save_midpoints_npy=self.save_midpoints_npy, save_bbox_npy=self.save_bbox_npy, save_stim_npy=self.save_stim_npy, save_thresh_npy=self.save_thresh_npy, save_skeleton_npy=self.save_skeleton_npy, undistort_dst=self.undistort_dst, undistort_mtx=self.undistort_mtx, newcameramtx=self.newcameramtx ) self.done_offline_analysis.set(1) else: # The preview window will always be presented, # even if the user has not turned it on before. This # is necessary as it informs the user that the # experiment is running turn_cam_back_off = False if not self.controller.all_common_variables.preview_bool: self.controller.all_common_functions.cam_on_func() turn_cam_back_off = True = (0, 0, 540, 540) try: # and put the overlay in the correct size if self.controller.all_common_variables.overlay_image is not None: self.controller.all_common_variables.overlay.window = \ (0, 0, 540, 540) except AttributeError: # This happens if user first runs VR experiment and then a # non vr experiment. pass self.tracking_data = \ fast_tracking.FastTrackingControl( genotype=self.genotype, recordingtime=self.recordingtime, recording_framerate=self.recording_framerate, display_framerate= self.display_framerate, resolution=self.resolution, initial_data=self.initial_data, boxsize=self.boxsize, signal=self.signal,, dir=self.Path, debug_mode = self.debug_mode, debug_mode_resize=self.observation_resize_variable, pixel_per_mm = self.pixel_per_mm, model_organism = self.model_organism, vr_arena=self.vr_arena_matrix, pwm_object=self.pwm_object, time_dependent_file=self.time_dependent_stim_file, # TODO: THIS IS BUG PRONE!!! MAKE SURE IT WORKS IN SLOW TRACKING TOO!!!! high_power_led_bool=self.high_power_LED_bool, minimal_speed_for_moving=self.minimal_speed_for_moving, organisms_and_heuristics=self.organisms_and_heuristics, post_hoc_tracking=self.post_hoc_tracking, datetime = self.datetime, output_channel_one=self.output_channel_one, output_channel_two=self.output_channel_two, output_channel_three=self.output_channel_three, output_channel_four=self.output_channel_four, overlay_bool = self.overlay_bool, controller=self.controller, time_delay_due_to_animal_detection = time_delay_due_to_animal_detection, vr_update_rate = self.vr_update_rate, pwm_range = self.pwm_range, vr_stim_location = self.vr_stim_location, save_centroids_npy=self.save_centroids_npy, save_heads_npy=self.save_heads_npy, save_tails_npy=self.save_tails_npy, save_midpoints_npy=self.save_midpoints_npy, save_bbox_npy=self.save_bbox_npy, save_stim_npy=self.save_stim_npy, save_thresh_npy=self.save_thresh_npy, save_skeleton_npy=self.save_skeleton_npy, undistort_dst=self.undistort_dst, undistort_mtx=self.undistort_mtx, newcameramtx=self.newcameramtx ) if turn_cam_back_off: self.controller.all_common_functions.cam_off_func() else: = (0, 0, 180, 180) try: if self.controller.all_common_variables.overlay is not None: self.controller.all_common_variables.overlay.window \ = (0, 0, 180, 180) except AttributeError: # This happens if user first runs VR experiment and then a # non vr experiment. pass