# Source code for analysis_scripts

# Module metadata: author of this script and the project it belongs to.
__author__ = 'David Tadres'
__project__ = 'PiVR'


import tkinter as tk
import numpy as np
import os
from matplotlib.figure import Figure
import matplotlib.backends.backend_tkagg as tkagg
from matplotlib.backends.backend_agg import FigureCanvasAgg
from tkinter import messagebox
from imageio import imread
from glob import glob
import json
from scipy.signal import medfilt
from scipy.spatial.distance import  cdist
import pandas as pd

# Work out which platform the script is running on.
# os.uname() only exists on POSIX systems; on Windows the attribute is
# missing and the lookup raises AttributeError, which is used here as
# the "not Linux" signal. A machine name starting with 'arm' is taken
# to mean the ARM processor of a Raspberry Pi.
try:
    _machine = os.uname().machine
except AttributeError:
    RASPBERRY = False
    LINUX = False
    DIRECTORY_INDICATOR = '\\'
else:
    RASPBERRY = _machine.startswith('arm')
    LINUX = True
    DIRECTORY_INDICATOR = '/'

class AnalysisDistanceToSource():
    """
    For our lab, a typical experiment would be the presentation of an
    odor source to an animal. By analyzing the behavior, for example
    the attraction of the animal towards the source, we can learn a
    lot about the underlying biology that manifests itself in that
    behavior.

    To easily enable the analysis of such an experiment, the user has
    the option to automatically analyze these experiments. This class
    is at the heart of the analysis.

    As each experiment (across trials) can have the source at
    different positions in the image, the user is first presented with
    the background image. The user then selects the source upon which
    the distance to the source is calculated for each timepoint of the
    experiment.

    The output is a csv file with the distance to source for each
    analyzed experiment and a plot indicating the median and the
    individual trajectories.

    Constructor arguments:

    path -- folder containing the experiment (or, if multiple_files is
        true, the folder containing the experiment folders).
    multiple_files -- if true, every sub-folder of path whose name
        contains string is analyzed one after the other.
    string -- substring used to select the sub-folders.
    size_of_window -- object with a get() method returning 'Small',
        'Medium' or 'Large' (for example a tkinter StringVar). Any
        other value falls back to the 'Medium' figure size.
    """

    # Figure size (width, height) in inches for each window size option.
    _FIGURE_SIZES = {
        'Small': ((16/9) * 3, 3),
        'Medium': ((16/9) * 6, 5),
        'Large': ((16/9) * 12, 12),
    }

    def __init__(self, path, multiple_files, string, size_of_window):
        self.path = path + DIRECTORY_INDICATOR
        self.multiple_files = multiple_files
        self.string = string
        self.size_of_window = size_of_window

        # An unknown window size used to leave the figure size
        # undefined; fall back to the medium size instead.
        figure_width, figure_height = self._FIGURE_SIZES.get(
            size_of_window.get(), self._FIGURE_SIZES['Medium'])

        # Reference to the SelectSource instance that tracks the
        # source the user clicks on
        self.arrow = None
        self.counter = 0
        self.previously_defined_source = False

        # The script will go into the first folder (if there are more
        # than one) and read the background image to display it to
        # the user.
        os.chdir(self.path)
        if self.multiple_files:
            self.dataset_names = [
                p.replace(DIRECTORY_INDICATOR, '')
                for p in glob('*' + string + '*/')]
            if not self.dataset_names:
                raise FileNotFoundError(
                    'No folder containing "' + string + '" found in '
                    + self.path)
            self.current_path = self.path + self.dataset_names[0]
            os.chdir(self.current_path)
        else:
            self.current_path = self.path

        bg = self._read_background()
        experiment_info = self._load_experiment_settings()

        # Creation of the Tkinter window
        self.child = tk.Toplevel()
        self.child.grab_set()
        self.child.wm_title('Analysis')

        self.child_frame = tk.Frame(self.child)
        self.child_frame.grid(row=0, column=0)

        # print path of the current folder:
        self.path_text_variable = tk.StringVar()
        self.path_label = tk.Label(
            self.child_frame, textvariable=self.path_text_variable)
        self.path_label.grid(row=0, column=0, columnspan=2)
        self._update_path_label()

        # create the frame for the canvas and the buttons
        bound_figure = tk.Frame(self.child_frame)
        bound_figure.grid(row=1, column=0, rowspan=1, columnspan=2)
        bound_figure.columnconfigure(1, weight=1)
        bound_figure.rowconfigure(1, weight=1)

        # create the figure and plot it
        self.fig = Figure(figsize=(figure_width, figure_height))
        self.ax = self.fig.add_subplot(111)
        self.image = self.ax.imshow(bg, vmin=0, vmax=255, cmap='Greys_r')

        # check if this file has been analyzed previously and place
        # the arrow appropriately if yes
        if 'Source x' in experiment_info:
            self._show_previous_source(experiment_info,
                                       'Previously selected source')

        # bind the plot to the GUI - do it in a new frame due to the
        # inherent pack method of NavigationToolbar
        self.canvas = tkagg.FigureCanvasTkAgg(self.fig, master=bound_figure)
        self.canvas.draw()

        # Add the toolbar
        toolbar = tkagg.NavigationToolbar2Tk(self.canvas, bound_figure)
        toolbar.update()
        self.canvas.get_tk_widget().pack(
            side="top", fill='both', expand=True)

        self.select_button = tk.Button(
            self.child_frame, text='Select Source',
            command=self.select_func)
        self.select_button.grid(row=2, column=0)

        self.go_button = tk.Button(
            self.child_frame, text='Analyze', command=self.go)
        self.go_button.grid(row=2, column=1)

        self.no_json_yet = True
        # todo - probably only needed during development
        self.re_select_source = True

        # WM_DELETE_WINDOW defines what happens when the user
        # explicitly closes the window using the window manager.
        def on_closing():
            if messagebox.askokcancel("Quit without "
                                      "\n saving",
                                      "Do you want to quit "
                                      "\nwithout saving your "
                                      "settings?"):
                self.child.destroy()

        self.child.protocol("WM_DELETE_WINDOW", on_closing)

    @staticmethod
    def _read_background():
        """Read the background image of the current working directory."""
        try:
            return imread('Background.jpg')
        except FileNotFoundError:
            # Todo - get rid of this at one point! Will break
            # backward compatibility, though...
            return imread('Overview of SmAl-VR tracking.png')

    @staticmethod
    def _load_experiment_settings():
        """Load experiment_settings.json of the current working directory."""
        with open('experiment_settings.json', 'r') as file:
            return json.load(file)

    def _update_path_label(self):
        """Show (the last 60 characters of) self.current_path in the GUI."""
        if len(self.current_path) > 60:
            self.path_text_variable.set(
                'Path: ...' + self.current_path[-60:])
        else:
            self.path_text_variable.set('Path: ' + self.current_path)

    def _show_previous_source(self, experiment_info, label):
        """Draw a blue arrow at the source stored in experiment_info."""
        self.arrow_annotation = self.ax.annotate(
            label,
            xy=(experiment_info['Source x'],
                experiment_info['Source y']),
            xytext=(experiment_info['Source x'] + 50,
                    experiment_info['Source y'] - 50),
            arrowprops=dict(facecolor='blue', shrink=0.05))
        self.previously_defined_source = True

    @staticmethod
    def _grow_rows(array, n_rows):
        """Return a copy of array extended to n_rows rows, padded with NaN."""
        grown = np.full((n_rows, array.shape[1]), np.nan)
        grown[:array.shape[0], :] = array
        return grown

    def select_func(self):
        """
        Callback of the 'Select Source' button: let the user click on
        the image to define (or re-define) the source.
        """
        if self.arrow is not None:
            # stop listening with the previous selector and hide the
            # arrow it may have drawn
            self.arrow.disonnect()
            try:
                self.arrow.arrow_annotation.set_visible(False)
            except AttributeError:
                # the user never clicked, so no arrow was drawn
                pass

        # if there was a previously defined source and the user wants
        # to re-define the location, the previously defined arrow has
        # to be made invisible. The flag must be cleared as well,
        # otherwise the stored source would be used instead of the
        # newly selected one.
        if self.previously_defined_source:
            self.arrow_annotation.set_visible(False)
            self.previously_defined_source = False

        self.ax.figure.canvas.draw()
        self.arrow = SelectSource(ax=self.ax)

    def go(self):
        """
        Callback of the 'Analyze' button: run the analysis if a source
        is known, otherwise tell the user to select one.
        """
        if self.previously_defined_source:
            # before displaying the next background image (after
            # analysing the current folder) the arrow is turned
            # invisible
            self.arrow_annotation.set_visible(False)
            self.distance_to_source_analysis()
            return

        if self.arrow is None:
            # 'Select Source' has never been pressed for this folder
            messagebox.showerror('No Source Selected ',
                                 'You have to select a source!'
                                 '\nPlease do so by pressing the'
                                 '\n"Select Source" button and'
                                 '\nindicating the source on the figure')
            return

        if self.arrow.x_press is None:
            # the user has pressed the 'select source' button but
            # never clicked anywhere on the figure
            messagebox.showerror('No Source Selected ',
                                 'You have to select a source!'
                                 '\nAfter pressing the'
                                 '\n"Select Source" button indicate'
                                 '\n the source on the figure')
            return

        # Disconnect the selector from the figure: in case of multiple
        # folders the user has to explicitly press the 'select source'
        # button again in order to place the arrow.
        self.arrow.disonnect()
        self.distance_to_source_analysis()

    def _store_distances(self, distance_px, distance_mm):
        """
        Write the distances of the current folder into column
        self.counter of the overview arrays. Experiments with fewer
        frames than the others are padded with NaN; if the current
        experiment has more frames, the arrays are grown and the
        older columns are padded with NaN.
        """
        n_frames = distance_px.shape[0]
        n_datasets = len(self.dataset_names)

        if self.counter == 0:
            # in the first round, create NaN filled arrays to keep
            # the distances to source in memory
            self.distance_to_source = np.full(
                (n_frames, n_datasets), np.nan)
            self.distance_to_source_mm = np.full(
                (n_frames, n_datasets), np.nan)

        n_rows = self.distance_to_source.shape[0]
        if n_frames < n_rows:
            print(self.dataset_names[self.counter]
                  + ' has less values than the rest')
        elif n_frames > n_rows:
            print(self.dataset_names[self.counter]
                  + ' has more values than the rest')
            self.distance_to_source = self._grow_rows(
                self.distance_to_source, n_frames)
            self.distance_to_source_mm = self._grow_rows(
                self.distance_to_source_mm, n_frames)

        self.distance_to_source[:n_frames, self.counter] = distance_px
        self.distance_to_source_mm[:n_frames, self.counter] = distance_mm

    def distance_to_source_analysis(self):
        """
        Calculate the distance between the animal and the source for
        every frame of the experiment in the current working
        directory. Writes 'distance_to_source.csv', stores the source
        position in 'experiment_settings.json' and saves a plot.
        """
        experiment_info = self._load_experiment_settings()

        # save the x and y position of the source
        if self.previously_defined_source:
            source_x = experiment_info['Source x']
            source_y = experiment_info['Source y']
        else:
            source_x = self.arrow.x_press
            source_y = self.arrow.y_press
            experiment_info['Source x'] = source_x
            experiment_info['Source y'] = source_y

        # we'll use y and then x coordinates, makes it less confusing
        # when working with images
        source = np.asarray((source_y, source_x))

        # load the saved array with the coordinates of the centroid
        try:
            centroids = np.load('smAL-VR-centroids.npy')
        except FileNotFoundError:
            centroids = np.load('centroids.npy')

        # Filter the centroid positions using a median filter with a
        # window size of three to smoothen the noisy coordinates. The
        # kernel size might need to be adjusted depending on the
        # framerate and the speed of movement of the animal.
        medfilt_centroids = np.asarray((medfilt(centroids[:, 0], 3),
                                        medfilt(centroids[:, 1], 3)))

        # euclidean distance between the user defined source and the
        # centroid of the animal, in pixel ...
        distance_px = cdist(medfilt_centroids.T,
                            source[np.newaxis, :])[:, 0]
        # ... and divided by pixel/mm to get mm
        distance_mm = distance_px / experiment_info['Pixel per mm']

        if self.multiple_files:
            # keep all distances in memory for an overview plot
            self._store_distances(distance_px, distance_mm)
            distance_to_source_pd = pd.DataFrame(
                self.distance_to_source_mm[:, self.counter])
        else:
            self.distance_to_source = distance_px
            self.distance_to_source_mm = distance_mm
            distance_to_source_pd = pd.DataFrame(
                self.distance_to_source_mm)

        distance_to_source_pd.to_csv(
            os.getcwd() + DIRECTORY_INDICATOR + 'distance_to_source.csv',
            sep=',')

        # todo to be discussed - do we want to keep everything tidy
        # and only have one file but lose old information (at the
        # moment that's only the source) or do we not want to touch
        # the original files coming from the Raspberry?
        with open('experiment_settings.json', 'w') as file:
            json.dump(experiment_info, file, sort_keys=True, indent=4)

        plot_distance_to_source(path=os.getcwd(),
                                length=experiment_info['Recording time'],
                                fps=experiment_info['Framerate'],
                                distance=distance_to_source_pd,
                                figname='Distance_to_source')

        if self.multiple_files:
            # after analyzing the current folder, move to the next
            # one and read the background image.
            self.read_next_background_image()
        else:
            messagebox.showinfo('Folder analyzed', 'Finished analyzing!')
            self.child.destroy()

    def read_next_background_image(self):
        """
        Move on to the next folder and display its background image.
        After the last folder the overview plot is created and the
        window is closed.
        """
        self.counter += 1
        print('At folder #' + repr(self.counter) + ' of '
              + repr(len(self.dataset_names)))

        if self.counter != len(self.dataset_names):
            # update the displayed path label
            self.current_path = \
                self.path + self.dataset_names[self.counter]
            self._update_path_label()

            os.chdir(self.current_path)
            self.image.set_data(self._read_background())

            # hide the arrow selected for the previous folder
            try:
                self.arrow.arrow_annotation.set_visible(False)
            except AttributeError:
                pass
            # Reset the arrow
            self.arrow = None

            # if the user has already defined where the source was
            # last time, re-use this information
            experiment_info = self._load_experiment_settings()
            if 'Source x' in experiment_info:
                self._show_previous_source(experiment_info,
                                           'Previously Selected Source')
            else:
                self.previously_defined_source = False

            self.ax.figure.canvas.draw()
        else:
            # All folders are analyzed. The metadata for the overview
            # plot is taken from the last experiment.
            for dataset_name in self.dataset_names:
                print(dataset_name)
            os.chdir(self.path + self.dataset_names[-1])
            experiment_info = self._load_experiment_settings()

            os.chdir(self.path)
            # NOTE: unlike AnalysisVRDistanceToSource no combined csv
            # file is written here, only the overview plot.
            plot_distance_to_source(
                path=os.getcwd(),
                length=experiment_info['Recording time'],
                fps=experiment_info['Framerate'],
                distance=self.distance_to_source_mm,
                figname='Median_Distance_to_source')

            messagebox.showinfo('Folder analyzed', 'Finished analyzing!')
            self.child.destroy()
class SelectSource(object):
    """
    This class connects the mouse click by the user with the
    displayed canvas.

    It enables the user to click on the presented image and then
    saves the x and y position as the source. The position of the
    last click is available as x_press and y_press; both stay None
    until the user has clicked inside the image.
    """

    def __init__(self, ax):
        self.ax = ax
        self.x_press = None
        self.y_press = None
        # arrow drawn at the last clicked position (None until then)
        self.arrow_annotation = None
        self.cidpress = self.ax.figure.canvas.mpl_connect(
            'button_press_event', self.on_press)

    def on_press(self, event):
        """Store the clicked position and draw an arrow pointing at it."""
        # A click outside of the axes carries no data coordinates;
        # ignore it instead of failing on None + 50 below.
        if event.xdata is None or event.ydata is None:
            return

        # str(float(...)) keeps the printout short and independent of
        # the repr of numpy scalars
        print('Left mouse button pressed at x='
              + str(float(event.xdata))[0:5]
              + ' y=' + str(float(event.ydata))[0:5])

        self.x_press = event.xdata
        self.y_press = event.ydata

        # only one arrow at a time: remove the previous one
        if self.arrow_annotation is not None:
            self.arrow_annotation.remove()

        self.arrow_annotation = self.ax.annotate(
            'Source',
            xy=(self.x_press, self.y_press),
            xytext=(self.x_press + 50, self.y_press - 50),
            arrowprops=dict(facecolor='black', shrink=0.05))
        self.ax.figure.canvas.draw()

    def disconnect(self):
        """Stop listening to mouse clicks on the figure."""
        self.ax.figure.canvas.mpl_disconnect(self.cidpress)

    def disonnect(self):
        """Misspelled name of disconnect(), kept for existing callers."""
        self.disconnect()
class AnalysisVRDistanceToSource():
    """
    After running a virtual reality experiment with a **single point
    source** we are often interested in the distance to this source.
    For example, when expressing the optogenetic tool Chrimson in the
    olfactory system of fruit fly larva, they will ascend a virtual
    odor gradient which is similar to real odor source.

    To easily enable the analysis of such an experiment, the user has
    the option to automatically analyze these experiments. This class
    is at the heart of the analysis.

    The user just has to select the folder containing the
    experiments. This class will automatically detect the maximum
    intensity point in virtual space and calculate the distance to
    that point for the duration of the experiment.

    The output is a csv file with the distance to the **single** point
    of maximum virtual stimulus for each analyzed experiment and a
    plot indicating the median and the individual trajectories.

    Constructor arguments:

    path -- folder containing the experiment (or, if multiple_files is
        true, the folder containing the experiment folders).
    multiple_files -- if true, every sub-folder of path whose name
        contains string is analyzed.
    string -- substring used to select the sub-folders.
    """

    def __init__(self, path, multiple_files, string):
        self.path = path + DIRECTORY_INDICATOR
        self.string = string
        self.multiple_files = multiple_files
        self.counter = 0

        if self.multiple_files:
            # The folders are searched relative to the working
            # directory, so make sure that is the selected path.
            os.chdir(self.path)
            self.dataset_names = [
                p.replace(DIRECTORY_INDICATOR, '')
                for p in glob('*' + string + '*/')]

            all_dist = []
            for i_folder, folder in enumerate(self.dataset_names):
                print('Folder ' + repr(i_folder+1) + ' of '
                      + repr(len(self.dataset_names)))
                # calculate the distance to the maximum intensity pixel
                all_dist.append(
                    self.vr_distance_analysis(current_folder=folder))

            if all_dist:
                # Take metadata from the last experiment (the working
                # directory is still its folder) - THIS will break if
                # experiments with different metadata are used, of
                # course!!
                with open('experiment_settings.json', 'r') as file:
                    experiment_info = json.load(file)
                recording_time = experiment_info['Recording time']
                framerate = experiment_info['Framerate']

                # Collect all distances in one array to create the
                # final plot. Experiments do not always have the exact
                # same amount of frames; shorter ones are padded with
                # NaN.
                n_rows = max(dist.shape[0] for dist in all_dist)
                all_dist_to_source = np.full(
                    (n_rows, len(all_dist)), np.nan)
                for column, dist in enumerate(all_dist):
                    all_dist_to_source[:dist.shape[0], column] = dist

                # Save all_dist
                os.chdir(path)
                all_dist_to_source_mm_pd = pd.DataFrame(
                    all_dist_to_source)
                # give column name according to dataset_name
                all_dist_to_source_mm_pd.columns = self.dataset_names
                all_dist_to_source_mm_pd.to_csv(
                    'all_distance_to_VR_max.csv', sep=',')

                plot_distance_to_source(
                    self.path,
                    length=recording_time,
                    fps=framerate,
                    distance=all_dist_to_source,
                    figname='Median_Distance_to_VR_max')
        else:
            self.vr_distance_analysis(current_folder=None)

        messagebox.showinfo('Done', 'Analysis finished')

    def vr_distance_analysis(self, current_folder):
        """
        Analyze a single experiment.

        current_folder is the name of the experiment folder inside
        self.path, or None if self.path itself is the experiment
        folder. The working directory is changed to that folder.

        Writes 'distance_to_VR_max.csv' and a plot into the folder and
        returns the distance to the maximum of the virtual arena in mm
        for each row of the data file as a 1D numpy array.

        Raises FileNotFoundError if the folder contains no '*data.csv*'
        file or no arena file with '640x480' in its name.
        """
        if current_folder is not None:
            # self.path already ends with the directory separator
            current_folder_path = self.path + current_folder
        else:
            current_folder_path = self.path
        os.chdir(current_folder_path)

        files = [p.replace(DIRECTORY_INDICATOR, '') for p in glob('*')]

        with open('experiment_settings.json', 'r') as file:
            experiment_info = json.load(file)
        recording_time = experiment_info['Recording time']
        framerate = experiment_info['Framerate']
        pixel_per_mm = experiment_info['Pixel per mm']

        data = None
        arena = None
        for i_file in files:
            if 'data.csv' in i_file:
                data = pd.read_csv(i_file)
            if '640x480' in i_file:
                arena = np.genfromtxt(i_file, delimiter=',')
        if data is None:
            raise FileNotFoundError(
                'No data.csv file found in ' + current_folder_path)
        if arena is None:
            raise FileNotFoundError(
                'No 640x480 arena file found in ' + current_folder_path)

        # find the coordinates with the max value - only take the
        # first, discard the rest. That's why only gradients with a
        # single maximum should be used here!
        max_rows, max_columns = np.where(arena == np.max(arena))
        source = np.asarray((max_rows[0], max_columns[0]))

        # NOTE: the centroids are used unfiltered here (no median
        # filter is applied in this analysis).
        centroids = np.asarray((data['Y-Centroid'], data['X-Centroid']))

        distance_to_source = cdist(centroids.T,
                                   source[np.newaxis, :])[:, 0]
        distance_to_source_mm = distance_to_source / pixel_per_mm

        # Save the distance to source data in a csv file in each folder
        distance_to_source_mm_pd = pd.DataFrame(distance_to_source_mm)
        distance_to_source_mm_pd.to_csv('distance_to_VR_max.csv', sep=',')

        plot_distance_to_source(path=current_folder_path,
                                length=recording_time,
                                fps=framerate,
                                distance=distance_to_source_mm_pd,
                                figname='Distance_to_VR_max')

        return distance_to_source_mm
def plot_distance_to_source(path, length, fps, distance, figname):
    """
    Plot the distance to source over time and save it as a png file.

    path -- folder in which the figure is saved.
    length -- recording time in seconds. Kept for backward
        compatibility; the time axis is derived from the number of
        samples in distance and fps, which avoids an off-by-one
        mismatch between np.arange(0, length, 1/fps) and the data.
    fps -- framerate of the recording, used to convert the sample
        index into seconds.
    distance -- distances in mm, either a single trajectory (1D array
        or a table with one column) or one column per experiment
        (2D array or DataFrame).
    figname -- file name of the figure without the '.png' extension.

    A single trajectory is drawn in red. For several trajectories the
    individual ones are drawn in transparent black and their median
    (ignoring NaN) in red.
    """
    values = np.asarray(distance, dtype=float)
    # a table with a single column is a single trajectory
    if values.ndim == 2 and values.shape[1] == 1:
        values = values[:, 0]

    fig = Figure(figsize=(5, 5))
    # The canvas has to be created explicitly if not working with
    # pyplot; it attaches itself to the figure.
    FigureCanvasAgg(fig)
    ax = fig.add_subplot(111)

    x_values = np.arange(values.shape[0]) / fps

    if values.ndim == 1:
        ax.plot(x_values, values, c='r')
        ax.set_ylabel('Centroid Distance to source [mm]')
    else:
        ax.plot(x_values, values, c='k', alpha=0.3)
        ax.plot(x_values, np.nanmedian(values, axis=1), c='r')
        ax.set_ylabel('Median Centroid Distance to source [mm]')
    ax.set_xlabel('Time [s]')

    fig.savefig(os.path.join(path, figname + '.png'), dpi=300)