Source code for analysis_scripts

__author__ = 'David Tadres'
__project__ = 'PiVR'


import tkinter as tk
import numpy as np
import os
from matplotlib.figure import Figure
import matplotlib.backends.backend_tkagg as tkagg
from matplotlib.backends.backend_agg import FigureCanvasAgg
from tkinter import messagebox
from imageio import imread
from glob import glob
import json
from scipy.spatial.distance import cdist
import pandas as pd
from pathlib import Path

try:
    import cv2
    CV2_INSTALLED = True
except ModuleNotFoundError:
    CV2_INSTALLED = False

# This try-except statement checks whether the processor is an ARM
# processor (used by the Raspberry Pi) or not.
# Since os.uname() only exists on Unix-like systems, the call is
# wrapped in try-except; otherwise it would throw an error on Windows.
try:
    if os.uname()[4][:3] == 'arm':
        # This will yield True both for a Raspberry Pi and for Apple
        # M1 devices.
        # Use this code snippet
        # (from https://raspberrypi.stackexchange.com/questions/5100/detect-that-a-python-program-is-running-on-the-pi)
        import re
        CPUINFO_PATH = Path("/proc/cpuinfo")
        if CPUINFO_PATH.exists():
            with open(CPUINFO_PATH) as f:
                cpuinfo = f.read()
            if re.search(r"^Model\s*:\s*Raspberry Pi", cpuinfo, flags=re.M) is not None:
                # if True, is Raspberry Pi
                RASPBERRY = True
                LINUX = True
            else:
                # /proc/cpuinfo exists but does not identify a
                # Raspberry Pi (some other ARM Linux device)
                RASPBERRY = False
                LINUX = True
        else:
            # No /proc/cpuinfo, e.g. an Apple M1 chip (or other ARM
            # CPU device).
            RASPBERRY = False
            LINUX = True
    else:
        # is either Mac or Linux
        RASPBERRY = False
        LINUX = True

    DIRECTORY_INDICATOR = '/'
except AttributeError:
    # is Windows
    RASPBERRY = False
    LINUX = False
    DIRECTORY_INDICATOR = '\\'
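
The detection above relies on os.uname() raising AttributeError on Windows. For reference only, a roughly equivalent check can be written with platform.machine() and platform.system(), which exist on every operating system; the _ALT names below are illustrative and not part of PiVR:

import platform

machine = platform.machine().lower()   # e.g. 'armv7l', 'aarch64', 'x86_64', 'AMD64'
ON_WINDOWS = platform.system() == 'Windows'
ON_ARM = machine.startswith('arm') or machine.startswith('aarch')

LINUX_ALT = not ON_WINDOWS                             # same meaning as LINUX above
DIRECTORY_INDICATOR_ALT = '\\' if ON_WINDOWS else '/'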

[docs]
class AnalysisDistanceToSource():
    """
    For our lab, a typical experiment would be the presentation of an
    odor source to an animal. By analyzing the behavior, for example
    the attraction of the animal towards the source, we can learn a
    lot about the underlying biology that manifests itself in that
    behavior.

    To easily enable the analysis of such an experiment, the user has
    the option to automatically analyze these experiments. This class
    is at the heart of the analysis.

    As each experiment (across trials) can have the source at
    different positions in the image, the user is first presented
    with the background image. The user then selects the source upon
    which the distance to the source is calculated for each timepoint
    of the experiment.

    The output is a csv file with the distance to source for each
    analyzed experiment and a plot indicating the median and the
    individual trajectories.

    .. note::

        Up to v.1.5.0 (27th of March 2021) the centroid position was
        median filtered with window size 3. This was removed for
        v.1.5.1. Users should implement their own filters.
    """
    def __init__(self, path, multiple_files, string, size_of_window,
                 controller):
        self.path = path + DIRECTORY_INDICATOR
        self.multiple_files = multiple_files
        self.string = string
        self.size_of_window = size_of_window
        self.controller = controller

        if size_of_window.get() == 'Small':
            figure_width = (16/9) * 3
            figure_height = 3
        elif size_of_window.get() == 'Medium':
            figure_width = (16/9) * 6
            figure_height = 5
        elif size_of_window.get() == 'Large':
            figure_width = (16/9) * 12
            figure_height = 12

        # This variable is the reference to the placed arrow
        self.arrow = None
        self.counter = 0
        self.previously_defined_source = False

        # The script will go into the first folder (if there is more
        # than one) and read the background image to display it to
        # the user.
        os.chdir(self.path)
        if self.multiple_files:
            self.dataset_names = [p.replace(DIRECTORY_INDICATOR, '')
                                  for p in glob('*' + string + '*/')]
            os.chdir(self.path + self.dataset_names[0])
            self.current_path = self.path + self.dataset_names[0]
        else:
            os.chdir(self.path)
            self.current_path = self.path

        # read image
        try:
            bg = imread('Background.tiff')
        except FileNotFoundError:
            try:
                bg = imread('Background.jpg')
            except FileNotFoundError:
                bg = imread('Overview of SmAl-VR tracking.png')

        # read experiment settings
        with open('experiment_settings.json', 'r') as file:
            experiment_info = json.load(file)

        # If undistortion has been performed online, the background
        # image needs to be undistorted now. Call the function from
        # start_GUI to do so.
        resolution = experiment_info['Resolution']
        if 'Online undistort performed' in experiment_info:
            if experiment_info['Online undistort performed'] == 'True':
                if CV2_INSTALLED:
                    self.controller.all_common_functions.grab_undistort_files(
                        resolution=resolution)
                    dst = self.controller.all_common_variables.undistort_dst_file
                    mtx = self.controller.all_common_variables.undistort_mtx_file
                    newcameramtx = self.controller.all_common_variables.newcameramtx
                    bg = cv2.undistort(bg, mtx, dst, None,
                                       newcameramtx)

        # Creation of the Tkinter window
        self.child = tk.Toplevel()
        self.child.grab_set()
        #self.child.geometry(str(screenwidth)+'x'+str(screenheight))
        self.child.wm_title('Analysis')
        #self.child.columnconfigure(0, weight=1)
        #self.child.rowconfigure(0, weight=1)
        self.child_frame = tk.Frame(self.child)
        self.child_frame.grid(row=0, column=0)
        #self.child_frame.columnconfigure(1, weight=1)
        #self.child_frame.rowconfigure(1, weight=1)

        # print path of the current folder:
        self.path_text_variable = tk.StringVar()
        self.path_label = tk.Label(
            self.child_frame, textvariable=self.path_text_variable)
        self.path_label.grid(row=0, column=0, columnspan=2)
        if len(self.current_path) > 60:
            self.path_text_variable.set(
                'Path: ...' + self.current_path[-60:])
        else:
            self.path_text_variable.set('Path: ' + self.current_path)

        # create the frame for the canvas and the buttons
        bound_figure = tk.Frame(self.child_frame)
        bound_figure.grid(row=1, column=0, rowspan=1, columnspan=2)
        bound_figure.columnconfigure(1, weight=1)
        bound_figure.rowconfigure(1, weight=1)

        # create the figure and plot it
        self.fig = Figure(figsize=(figure_width, figure_height))
        self.ax = self.fig.add_subplot(111)
        self.image = self.ax.imshow(bg, vmin=0, vmax=255,
                                    cmap='Greys_r')

        # check if this file has been analyzed previously and place
        # the arrow appropriately if yes
        if 'Source x' in experiment_info:
            self.arrow_annotation = self.ax.annotate(
                'Previously selected source',
                xy=(experiment_info['Source x'],
                    experiment_info['Source y']),
                xytext=(experiment_info['Source x'] + 50,
                        experiment_info['Source y'] - 50),
                arrowprops=dict(facecolor='blue', shrink=0.05))
            self.previously_defined_source = True

        # bind the plot to the GUI - do it in a new frame due to the
        # inherent pack method of NavigationToolbar
        self.canvas = tkagg.FigureCanvasTkAgg(self.fig,
                                              master=bound_figure)
        self.canvas.draw()
        # Add the toolbar
        toolbar = tkagg.NavigationToolbar2Tk(self.canvas, bound_figure)
        toolbar.update()
        self.canvas.get_tk_widget().pack(side="top", fill='both',
                                         expand=True)

        self.select_button = tk.Button(self.child_frame,
                                       text='Select Source',
                                       command=self.select_func)
        self.select_button.grid(row=2, column=0)

        self.go_button = tk.Button(self.child_frame,
                                   text='Analyze',
                                   command=self.go)
        self.go_button.grid(row=2, column=1)

        self.no_json_yet = True
        self.re_select_source = True  # todo - probably only needed
        # during development

        # Here, the term protocol refers to the interaction between
        # the application and the window manager. The most commonly
        # used protocol is called WM_DELETE_WINDOW and is used to
        # define what happens when the user explicitly closes a
        # window using the window manager.
        # (http://effbot.org/tkinterbook/tkinter-events-and-bindings.htm#protocols)
        def on_closing():
            if tk.messagebox.askokcancel(
                    "Quit without \n saving",
                    "Do you want to quit "
                    "\nwithout saving your "
                    "settings?"):
                self.child.destroy()
                # root.destroy()

        self.child.protocol("WM_DELETE_WINDOW", on_closing)

    def select_func(self):
        try:
            self.arrow.arrow_annotation.set_visible(False)
        except AttributeError:
            pass
        # if there was a previously defined source and the user wants
        # to re-define the location, the previously defined arrow has
        # to be made invisible
        if self.previously_defined_source:
            self.arrow_annotation.set_visible(False)
        self.arrow = SelectSource(ax=self.ax)

    def go(self):
        if self.previously_defined_source:
            # before displaying the next background image (after
            # analyzing the current folder) the arrow is turned
            # invisible
            self.arrow_annotation.set_visible(False)
            # now the function that actually calculates the distance
            # to source is called
            self.distance_to_source_analysis()
        else:
            try:
                # call the last instance of the select source and
                # disconnect it from the figure. The reason for this
                # is that in case of multiple folders the user has to
                # explicitly press the 'Select Source' button in
                # order to place the arrow
                self.arrow.disonnect()
                if self.arrow.x_press is not None:
                    self.distance_to_source_analysis()
                else:
                    # if the user has pressed the 'Select Source'
                    # button but never clicked anywhere on the
                    # figure, throw this error. The user is returned
                    # to the overview as no analysis can be done this
                    # way.
                    messagebox.showerror(
                        'No Source Selected ',
                        'You have to select a source!'
                        '\nAfter pressing the'
                        '\n"Select Source" button indicate'
                        '\n the source on the figure')
            except AttributeError:
                messagebox.showerror(
                    'No Source Selected ',
                    'You have to select a source!'
                    '\nPlease do so by pressing the'
                    '\n"Select Source" button and'
                    '\nindicating the source on the figure')

    def distance_to_source_analysis(self):
        with open('experiment_settings.json', 'r') as file:
            experiment_info = json.load(file)

        # save the x and y position of the source
        if self.previously_defined_source:
            source_x = experiment_info['Source x']
            source_y = experiment_info['Source y']
        else:
            experiment_info['Source x'] = self.arrow.x_press
            experiment_info['Source y'] = self.arrow.y_press
            source_x = self.arrow.x_press
            source_y = self.arrow.y_press

        # find all files in folder
        all_files_in_folder = [p.replace(DIRECTORY_INDICATOR, '')
                               for p in glob('*')]

        # find the 'data.csv' file
        #for filename in all_files_in_folder:
        #    if 'data.csv' in filename:
        #        data = pd.read_csv(filename)

        # Since there's the suboptimal naming convention that when
        # collecting a video a 'data.csv' is written to denote info
        # about the stimulus, and when then doing analysis the result
        # is also called 'data.csv', the loop below first collects
        # all data.csv names and selects the newest one (as analysis
        # must have been done after the data collection).
        files_of_interest = []
        for i in all_files_in_folder:
            if 'data.csv' in i:
                files_of_interest.append(i)
                #data_name = i
        if len(files_of_interest) == 1:
            data_name = files_of_interest[0]
        else:
            files_of_interest.sort()
            data_name = files_of_interest[-1]

        data = pd.read_csv(data_name, sep=',')

        # we'll use y and then x coordinates, makes it less confusing
        # when working with images
        source = np.asarray((source_y, source_x))

        # Be explicit: find all rows where 'Time' is zero. This
        # happens when no information is present.
        data[data['Time'] == 0] = np.nan
        # Then be explicit about the centroid coordinates
        centroids = np.asarray((data['Y-Centroid'],
                                data['X-Centroid']))

        # if there is more than one folder that needs to be analyzed,
        # keep all distances in memory for an overview plot
        if self.multiple_files:
            if self.counter == 0:
                # in the first round, create an empty array to keep
                # the distances to source in memory
                self.distance_to_source = np.zeros((
                    centroids.shape[1], len(self.dataset_names)))
                self.distance_to_source_mm = np.zeros((
                    centroids.shape[1], len(self.dataset_names)))
                self.distance_to_source.fill(np.nan)
                self.distance_to_source_mm.fill(np.nan)
            try:
                self.distance_to_source[:, self.counter] = \
                    cdist(centroids.T, source[np.newaxis, :])[:, 0]
                # divide the pixel by pixel/mm to get mm
                self.distance_to_source_mm[:, self.counter] = \
                    self.distance_to_source[:, self.counter] \
                    / experiment_info['Pixel per mm']
            except ValueError:
                # sometimes the experiments that are being analyzed
                # do not have the exact same amount of frames.
                # In that case:
                if self.distance_to_source.shape[0] > centroids.shape[1]:
                    print(self.dataset_names[self.counter]
                          + ' has less values than the rest')
                    # if the currently analyzed folder is smaller
                    # than the one(s) before, just fill as much as
                    # possible and the rest will be indicated as
                    # missing values (numpy.nan)
                    self.distance_to_source[0:centroids.shape[1],
                                            self.counter] = \
                        cdist(centroids.T,
                              source[np.newaxis, :])[:, 0]
                    self.distance_to_source_mm[0:centroids.shape[1],
                                               self.counter] = \
                        self.distance_to_source[:centroids.shape[1],
                                                self.counter] \
                        / experiment_info['Pixel per mm']
                else:
                    print(self.dataset_names[self.counter]
                          + ' has more values than the rest')
                    # if the new folder has more entries it becomes
                    # more difficult. First copy the original
                    temp = self.distance_to_source.copy()
                    self.distance_to_source = np.zeros((
                        centroids.shape[1], len(self.dataset_names)))
                    self.distance_to_source[0:temp.shape[0], :] = \
                        temp.copy()
                    self.distance_to_source[:, self.counter] = \
                        cdist(centroids.T,
                              source[np.newaxis, :])[:, 0]

                    temp = self.distance_to_source_mm.copy()
                    self.distance_to_source_mm = np.zeros((
                        centroids.shape[1], len(self.dataset_names)))
                    self.distance_to_source_mm[0:temp.shape[0], :] = \
                        temp.copy()
                    self.distance_to_source_mm[:, self.counter] = \
                        self.distance_to_source[:, self.counter] \
                        / experiment_info['Pixel per mm']

            distance_to_source_pd = pd.DataFrame(
                self.distance_to_source_mm[:, self.counter])
            distance_to_source_pd.to_csv(
                os.getcwd() + DIRECTORY_INDICATOR
                + 'distance_to_source.csv', sep=',')

            # todo to be discussed - do we want to keep everything
            # tidy and only have one file but lose old analysis
            # information (at the moment that's only the source) or
            # do we not want to touch the original files coming from
            # the Raspberry and also conserve whatever we analyzed
            # before - space is usually not an issue. A way would be
            # to always save the old file in a folder 'old' with the
            # current datetime attached to the filename. But don't
            # know how useful that is?
            with open('experiment_settings.json', 'w') as file:
                json.dump(experiment_info, file, sort_keys=True,
                          indent=4)

            # this should be fine! According to the docs
            # (https://pandas.pydata.org/pandas-docs/stable
            # /visualization.html) pandas does not just drop the
            # datapoint that is a NaN, but actually leaves it as a
            # NaN: "Line: Leave gaps at NaNs". This means that if at
            # second 5 we have a couple of points missing we don't
            # mess up the whole time series afterwards!
            plot_distance_to_source(
                path=os.getcwd(),
                length=experiment_info['Recording time'],
                fps=experiment_info['Framerate'],
                distance=distance_to_source_pd,
                figname='Distance_to_source')

            # after analyzing the current folder, move to the next
            # one and read the background image.
            self.read_next_background_image()
        else:
            #centroids = np.asarray((centroids[:,0],
            #                        centroids[:,1]))
            self.distance_to_source = cdist(
                centroids.T, source[np.newaxis, :])[:, 0]
            # divide the pixel by pixel/mm to get mm
            self.distance_to_source_mm = self.distance_to_source \
                / experiment_info['Pixel per mm']

            distance_to_source_pd = pd.DataFrame(
                self.distance_to_source_mm)
            distance_to_source_pd.to_csv(
                os.getcwd() + DIRECTORY_INDICATOR
                + 'distance_to_source.csv', sep=',')

            # this should be fine! According to the docs
            # (https://pandas.pydata.org/pandas-docs/stable
            # /visualization.html) pandas does not just drop the
            # datapoint that is a NaN, but actually leaves it as a
            # NaN: "Line: Leave gaps at NaNs". This means that if at
            # second 5 we have a couple of points missing we don't
            # mess up the whole time series afterwards!
            plot_distance_to_source(
                path=os.getcwd(),
                length=experiment_info['Recording time'],
                fps=experiment_info['Framerate'],
                distance=distance_to_source_pd,
                figname='Distance_to_source')

            # todo to be discussed - do we want to keep everything
            # tidy and only have one file but lose old analysis
            # information (at the moment that's only the source) or
            # do we not want to touch the original files coming from
            # the Raspberry and also conserve whatever we analyzed
            # before - space is usually not an issue. A way would be
            # to always save the old file in a folder 'old' with the
            # current datetime attached to the filename. But don't
            # know how useful that is?
            with open('experiment_settings.json', 'w') as file:
                json.dump(experiment_info, file, sort_keys=True,
                          indent=4)

            # after saving both the source location
            # (experiment_settings) and the distance to source for
            # every time point (data.csv) a plot is created
            messagebox.showinfo('Folder analyzed',
                                'Finished analyzing!')
            self.child.destroy()

    def read_next_background_image(self):
        # add one to counter
        self.counter += 1
        print('At folder #' + repr(self.counter) + ' of '
              + repr(len(self.dataset_names)))
        # check if we have already analyzed the last folder - if not,
        # go into the next folder, read the background image and
        # display it for the user to select the source if necessary
        if self.counter != len(self.dataset_names):
            # update the displayed path label
            self.current_path = self.path \
                                + self.dataset_names[self.counter]
            if len(self.current_path) > 60:
                self.path_text_variable.set(
                    'Path: ...' + self.current_path[-60:])
            else:
                self.path_text_variable.set(
                    'Path: ' + self.current_path)

            os.chdir(self.path + self.dataset_names[self.counter])
            try:
                bg = imread('Background.tiff')
            except FileNotFoundError:
                try:
                    bg = imread('Background.jpg')
                except FileNotFoundError:
                    # Todo - get rid of this at one point! Will break
                    # backward compatibility, though...
                    bg = imread('Overview of SmAl-VR tracking.png')

            # IF we uncomment this the user will have to select the
            # source every time a new folder is analyzed. Not sure we
            # should do that!
            try:
                self.arrow.arrow_annotation.set_visible(False)
            except AttributeError:
                pass
            # Reset the arrow
            self.arrow = None

            # open the json file of the experiment
            with open('experiment_settings.json', 'r') as file:
                experiment_info = json.load(file)

            # If undistortion has been performed online, the
            # background image needs to be undistorted now. Call the
            # function from start_GUI to do so.
            resolution = experiment_info['Resolution']
            if 'Online undistort performed' in experiment_info:
                if experiment_info['Online undistort performed'] == 'True':
                    if CV2_INSTALLED:
                        self.controller.all_common_functions.grab_undistort_files(
                            resolution=resolution)
                        dst = self.controller.all_common_variables.undistort_dst_file
                        mtx = self.controller.all_common_variables.undistort_mtx_file
                        newcameramtx = self.controller.all_common_variables.newcameramtx
                        bg = cv2.undistort(bg, mtx, dst, None,
                                           newcameramtx)

            # After either undistorting or not, set the image data.
            self.image.set_data(bg)

            # if the user has already defined where the source was
            # last time, re-use this information
            if 'Source x' in experiment_info:
                already_analyzed = True
                self.arrow_annotation = self.ax.annotate(
                    'Previously Selected Source',
                    xy=(experiment_info['Source x'],
                        experiment_info['Source y']),
                    xytext=(experiment_info['Source x'] + 50,
                            experiment_info['Source y'] - 50),
                    arrowprops=dict(facecolor='blue', shrink=0.05))
                self.previously_defined_source = True
            else:
                self.previously_defined_source = False

            self.ax.figure.canvas.draw()
        else:
            for i in range(len(self.dataset_names)):
                os.chdir(self.path + self.dataset_names[i])
                print(self.dataset_names[i])
                # next, the data.csv file is being read...
                # find all files in folder
                all_files_in_folder = [
                    p.replace(DIRECTORY_INDICATOR, '')
                    for p in glob('*')]
                # find the 'data.csv' file..
                for filename in all_files_in_folder:
                    if 'data.csv' in filename:
                        data_file_name = filename
                # .. and read it
                experimental_data = pd.read_csv(data_file_name)

                with open('experiment_settings.json', 'r') as file:
                    experiment_info = json.load(file)

            os.chdir(self.path)

            # convert numpy to pandas
            all_distance_to_source_data_pd = pd.DataFrame(
                self.distance_to_source_mm)
            all_distance_to_source_data_pd.columns = \
                self.dataset_names

            # plt.show()
            plot_distance_to_source(
                path=os.getcwd(),
                length=experiment_info['Recording time'],
                fps=experiment_info['Framerate'],
                distance=all_distance_to_source_data_pd,
                figname='Median_Distance_to_source')

            messagebox.showinfo('Folder analyzed',
                                'Finished analyzing!')
            self.child.destroy()
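
The heart of distance_to_source_analysis() is the cdist() call between the centroid trajectory and the selected source. A minimal stand-alone sketch of just that step, using made-up centroid coordinates, source position and pixel-per-mm calibration (none of these values come from a real experiment):

import numpy as np
import pandas as pd
from scipy.spatial.distance import cdist

# hypothetical trajectory: 5 frames of (Y, X) centroid positions in pixels
data = pd.DataFrame({'Y-Centroid': [100, 102, 105, 109, 114],
                     'X-Centroid': [200, 198, 195, 191, 186]})
source = np.asarray((120, 150))     # (source_y, source_x), assumed values
pixel_per_mm = 10.0                 # assumed calibration

centroids = np.asarray((data['Y-Centroid'], data['X-Centroid']))
# cdist expects points as rows, so transpose the (2, n) array
distance_px = cdist(centroids.T, source[np.newaxis, :])[:, 0]
distance_mm = distance_px / pixel_per_mm
print(distance_mm)                  # one distance per frame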
class SelectSource(object):
    """
    This class connects the mouse click by the user with the
    displayed canvas. It enables the user to click on the presented
    image and then saves the x and y position as the source.
    """
    def __init__(self, ax):
        self.ax = ax
        self.x_press = None
        self.y_press = None
        self.cidpress = self.ax.figure.canvas.mpl_connect(
            'button_press_event', self.on_press)
        print(self.cidpress)

    def on_press(self, event):
        print('Left mouse button pressed at x='
              + repr(event.xdata)[0:5]
              + ' y=' + repr(event.ydata)[0:5])
        self.x_press = event.xdata
        self.y_press = event.ydata

        try:
            self.arrow_annotation.remove()
        except AttributeError:
            pass

        self.arrow_annotation = self.ax.annotate(
            'Source',
            xy=(self.x_press, self.y_press),
            xytext=(self.x_press + 50, self.y_press - 50),
            arrowprops=dict(facecolor='black', shrink=0.05))

        self.ax.figure.canvas.draw()

    def disonnect(self):
        self.ax.figure.canvas.mpl_disconnect(self.cidpress)
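
SelectSource only needs a matplotlib Axes attached to an interactive canvas, so it can also be tried outside the Tk window. A minimal usage sketch with pyplot (pyplot is not used by PiVR itself; this is illustration only):

import numpy as np
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
ax.imshow(np.zeros((480, 640)), cmap='Greys_r')   # placeholder "background"

selector = SelectSource(ax=ax)
plt.show()   # click on the image to place the 'Source' arrow

# after the window is closed, the clicked position (if any) is available
print(selector.x_press, selector.y_press)
selector.disonnect()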
[docs]
class AnalysisVRDistanceToSource():
    """
    After running a virtual reality experiment with a **single point
    source** we are often interested in the distance to this source.
    For example, when expressing the optogenetic tool Chrimson in the
    olfactory system of fruit fly larvae, they will ascend a virtual
    odor gradient much like a real odor source.

    To easily enable the analysis of such an experiment, the user has
    the option to automatically analyze these experiments. This class
    is at the heart of the analysis.

    The user just has to select the folder containing the
    experiments. This class will automatically detect the maximum
    intensity point in virtual space and calculate the distance to
    that point for the duration of the experiment.

    The output is a csv file with the distance to the **single**
    point of maximum virtual stimulus for each analyzed experiment
    and a plot indicating the median and the individual trajectories.
    """
    def __init__(self, path, multiple_files, string, controller):
        self.path = path + DIRECTORY_INDICATOR
        self.string = string
        self.multiple_files = multiple_files
        self.counter = 0

        if self.multiple_files:
            self.dataset_names = [p.replace(DIRECTORY_INDICATOR, '')
                                  for p in glob('*' + string + '*/')]

            for i_folder in range(len(self.dataset_names)):
                print('Folder ' + repr(i_folder + 1) + ' of '
                      + repr(len(self.dataset_names)))
                # calculate the distance to the maximum intensity
                # pixel
                dist = self.vr_distance_analysis(
                    current_folder=self.dataset_names[i_folder])

                if i_folder == 0:
                    all_dist_to_source = np.zeros(
                        (dist.shape[0], len(self.dataset_names)))

                # save all the distances in one numpy array to create
                # the final plot
                all_dist_to_source[:, i_folder] = dist

                if i_folder == len(self.dataset_names) - 1:
                    # Take metadata from the last experiment - THIS
                    # will break if experiments with different
                    # metadata are used, of course!!
                    with open('experiment_settings.json', 'r') as file:
                        experiment_info = json.load(file)
                    recording_time = experiment_info['Recording time']
                    framerate = experiment_info['Framerate']

                    # Save all_dist
                    os.chdir(path)
                    all_dist_to_source_mm_pd = pd.DataFrame(
                        all_dist_to_source)
                    # name the columns after the dataset names
                    all_dist_to_source_mm_pd.columns = \
                        self.dataset_names
                    all_dist_to_source_mm_pd.to_csv(
                        'all_distance_to_VR_max.csv', sep=',')

                    plot_distance_to_source(
                        path,
                        length=recording_time,
                        fps=framerate,
                        distance=all_dist_to_source_mm_pd,
                        figname='Median_Distance_to_VR_max')
        else:
            self.vr_distance_analysis(current_folder=None)

        messagebox.showinfo('Done', 'Analysis finished')

    def vr_distance_analysis(self, current_folder):
        """
        Reads data of one VR experiment, extracts the x/y position of
        the maximum light intensity of the arena and calculates the
        distance of the centroid to this point over time.
        """
        if current_folder is not None:
            current_folder_path = self.path \
                                  + DIRECTORY_INDICATOR \
                                  + current_folder
        else:
            current_folder_path = self.path
        os.chdir(current_folder_path)

        files = [p.replace(DIRECTORY_INDICATOR, '')
                 for p in glob('*')]

        with open('experiment_settings.json', 'r') as file:
            experiment_info = json.load(file)
        recording_time = experiment_info['Recording time']
        framerate = experiment_info['Framerate']
        pixel_per_mm = experiment_info['Pixel per mm']

        # easiest to define False and only change it if both
        # conditions are fulfilled.
        undistort_image = False
        if 'Online undistort performed' in experiment_info:
            if experiment_info['Online undistort performed'] == 'True':
                if CV2_INSTALLED:
                    undistort_image = True

        for i_file in files:
            if 'data.csv' in i_file:
                # print(i_file)
                data = pd.read_csv(i_file)
            if '640x480' in i_file or '1024x768' in i_file \
                    or '1296x972' in i_file or '1920x1080' in i_file:
                arena = np.genfromtxt(i_file, delimiter=',')

        # find the coordinates with the max value
        # only take the first, discard the rest. That's why only
        # gaussian gradients should be used here!
        source_y = np.where(arena == np.max(arena))[0][0]
        source_x = np.where(arena == np.max(arena))[1][0]
        source = np.asarray((source_y, source_x))

        centroids = np.asarray((data['Y-Centroid'],
                                data['X-Centroid']))
        distance_to_source = cdist(centroids.T,
                                   source[np.newaxis, :])[:, 0]
        distance_to_source_mm = distance_to_source / pixel_per_mm

        # Save the distance to source data in a csv file in each
        # folder
        distance_to_source_mm_pd = pd.DataFrame(
            distance_to_source_mm)
        distance_to_source_mm_pd.to_csv('distance_to_VR_max.csv',
                                        sep=',')

        plot_distance_to_source(
            path=current_folder_path,  # self.path +
            # DIRECTORY_INDICATOR + current_folder,
            length=recording_time,
            fps=framerate,
            distance=distance_to_source_mm_pd,
            figname='Distance_to_VR_max')

        return distance_to_source_mm
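
vr_distance_analysis() locates the single brightest pixel of the arena file and measures all centroid distances to it. A small sketch of that lookup on a synthetic Gaussian arena (array shape, peak position and centroid are made up):

import numpy as np
from scipy.spatial.distance import cdist

# synthetic 480x640 'arena' with a Gaussian peak at (y=120, x=500)
yy, xx = np.mgrid[0:480, 0:640]
arena = np.exp(-((yy - 120) ** 2 + (xx - 500) ** 2) / (2 * 50.0 ** 2))

# same lookup as in vr_distance_analysis(): take the first maximum only
source_y = np.where(arena == np.max(arena))[0][0]
source_x = np.where(arena == np.max(arena))[1][0]
source = np.asarray((source_y, source_x))
print(source)   # -> [120 500]

# distance of an assumed centroid at (y=300, x=100) to the peak, in pixels
centroid = np.asarray([[300, 100]])
print(cdist(centroid, source[np.newaxis, :])[0, 0])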
def plot_distance_to_source(path, length, fps, distance, figname):
    fig = Figure(figsize=(5, 5))
    # The canvas has to be called explicitly if not working with
    # pyplot...
    canvas = FigureCanvasAgg(fig)
    ax = fig.add_subplot(111)
    x_values = np.arange(0, length, 1/fps)
    if distance.ndim == 1:
        ax.plot(x_values[0:distance.shape[0]], distance.values,
                c='r')
    else:
        ax.plot(x_values[0:distance.shape[0]], distance.values,
                c='k', alpha=0.3)
        ax.plot(x_values[0:distance.shape[0]],
                np.nanmedian(distance, axis=1), c='r')
    ax.set_xlabel('Time [s]')
    if distance.ndim <= 2:
        ax.set_ylabel('Centroid Distance to source [mm]')
        fig.savefig(path + DIRECTORY_INDICATOR + figname + '.png',
                    dpi=300)
    else:
        ax.set_ylabel('Median Centroid Distance to source [mm]')
        fig.savefig(path + DIRECTORY_INDICATOR + figname + '.png',
                    dpi=300)
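
A short usage sketch for plot_distance_to_source() with synthetic data, assuming a 60 s recording at 30 fps; the trial names and the shrinking-distance trend are invented purely for illustration:

import os
import numpy as np
import pandas as pd

length, fps = 60, 30          # assumed recording time [s] and framerate
n_frames = length * fps
# three hypothetical trials, distance shrinking towards the source plus noise
fake = np.linspace(80, 10, n_frames)[:, np.newaxis] \
       + np.random.normal(0, 2, (n_frames, 3))
distance = pd.DataFrame(fake, columns=['trial_1', 'trial_2', 'trial_3'])

plot_distance_to_source(path=os.getcwd(), length=length, fps=fps,
                        distance=distance, figname='Example_distance')
# -> writes 'Example_distance.png' in the current working directory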