This file contains the python script for importing and processing the raw data from the FF2 study. This file does not contain the lines used for trouble shooting errors in the data. Created April 7, 2025 Notebook 1: Import FFE Summaries import glob import os import pandas as pd import numpy as np import matplotlib.pyplot as plt from datetime import datetime from datetime import timedelta pd.options.mode.chained_assignment = None offset = 12 ### Configure Paths folder='path' dir_prefix = 'prefix' subject_folders = glob.glob(os.path.join(folder,dir_prefix)) print('Using contents of directories: ') PythonOutputDirectory = 'path' for each in subject_folders: if '_00_' in each: subject_folders.remove(each) subject_folders ### Define Function def load_data_sequence(files_list): # read in the files using pandas # begin with first, then read in the others to concatenate df = pd.read_csv(files_list[0], skiprows=4, delimiter = ',', names=['num','data','green','red','zero3'], engine='python') for i,each in enumerate(files_list[1:]): dfi = pd.read_csv(each,skiprows=4, delimiter = ',', names=['num','data','green','red','zero3'], engine='python') df = pd.concat([df,dfi]) df = df.drop(['num','zero3'], axis = 1) return df def find_ind_strt_index(data,new_list,date,cpc,session): d = [] select_list=[] for i in range(len(new_list[:-1])): if len(data.iloc[new_list[i]:new_list[i+1]])>2: select_list.append(new_list[i]) d.append({'date': date,'Session':session,'Number': len(select_list),'Indv_index': new_list[i],'CPC': cpc}) select_list.append(new_list[-1]) d.append({'date': date,'Session':session,'Number': len(select_list),'Indv_index': new_list[-1],'CPC': cpc}) return d,select_list def merge_ambient_mask_df(mask_data,ambient_data): ## assume that both cpc were started at the same time. ## row numbers should be the same for both data frames if len(mask_data) != len(ambient_data): diff = len(mask_data) - len(ambient_data) if diff < 0: print('Ambient data has more rows') ambient_data = ambient_data.iloc[:diff] else: print('Mask data has more rows') mask_data = mask_data.iloc[:-diff] FFE_df= pd.concat([ambient_data,mask_data], axis = 1) FFE_df= FFE_df.reset_index() FFE_df['FFE'] = 100-FFE_df.loc[:,'mask']/FFE_df.loc[:,'ambient']*100 return(FFE_df) def find_test_starts(indv_FFE_df,subject): ## list of rows that indicate new individual new_ind_list=list(indv_FFE_df.index.values[(indv_FFE_df['green_ambient']>0) & (indv_FFE_df['red_ambient']==0) & (indv_FFE_df['red_mask']==0)]) print(new_ind_list) print(indv_FFE_df.iloc[new_ind_list]) ## Selects index of row to start new individual new_ind_select_list=[] if(subject == "FF2-013"): print("Subject FF2_013") del new_ind_list[12] for i in range(len(new_ind_list[:-1])): if len(indv_FFE_df.iloc[new_ind_list[i]:new_ind_list[i+1]])>2 or len(indv_FFE_df.iloc[new_ind_list[i-1]:new_ind_list[i]])>2: new_ind_select_list.append(new_ind_list[i]) print(new_ind_select_list) new_ind_select_list2=[] for index in range(1,len(new_ind_select_list)): a = new_ind_select_list[index] b = new_ind_select_list[index-1] if a-b>20: new_ind_select_list2.append(new_ind_select_list[index]) start_positions=[new_ind_select_list[0]] start_positions.extend(new_ind_select_list2) ## First start is a leftover piece of the individual start and needs to be deleted. if subject == "FF2-009": start_positions.pop(0) if len(start_positions)!=12: print('Test number wrong! Check for markers!') start_positions.append(new_ind_list[-1]) #new_ind_select_list.append(new_ind_list[-1]) start_positions_offset = [x+offset for x in start_positions] if len(start_positions_offset) == 12: print("Using Ambient starts") return start_positions_offset else: print("Here are the mask starts!") new_ind_list_mask=list(indv_FFE_df.index.values[(indv_FFE_df['green_mask']>0) & (indv_FFE_df['red_mask']==0) & (indv_FFE_df['red_ambient']==0)]) print(new_ind_list_mask) print(indv_FFE_df.iloc[new_ind_list_mask]) new_ind_select_list=[] for i in range(len(new_ind_list_mask[:-1])): if len(indv_FFE_df.iloc[new_ind_list_mask[i]:new_ind_list_mask[i+1]])>2 or len(indv_FFE_df.iloc[new_ind_list_mask[i-1]:new_ind_list_mask[i]])>2: new_ind_select_list.append(new_ind_list_mask[i]) print(new_ind_select_list) new_ind_select_list2=[] for index in range(1,len(new_ind_select_list)): a = new_ind_select_list[index] b = new_ind_select_list[index-1] if a-b>20: new_ind_select_list2.append(new_ind_select_list[index]) start_positions_mask=[new_ind_select_list[0]] start_positions_mask.extend(new_ind_select_list2) print(start_positions_mask) if len(start_positions_mask)!=12: print('Test number wrong for masks as well! Check for markers!') start_positions_mask.append(new_ind_list_mask[-1]) print(start_positions_mask) start_positions_offset_mask = [x+offset for x in start_positions_mask] if len(start_positions_offset_mask) == 12: print("Using mask starts") return start_positions_offset_mask def get_activity_data_arrays(start_positions,tot_data): mask_trials = 4 tstills_inds = np.array([0+i*3 for i in range(mask_trials)]) tlr_inds = tstills_inds+1 tud_inds = tstills_inds+2 # then get actual indices of times in data for those events stills_inds = [start_positions[tind] for tind in tstills_inds] lr_inds = [start_positions[tind] for tind in tlr_inds] ud_inds = [start_positions[tind] for tind in tud_inds] # extract the measurements from FFE data by measurement indices stills = np.array([tot_data['FFE'][ind:ind+30] for ind in stills_inds]) lrs = np.array([tot_data['FFE'][ind:ind+30] for ind in lr_inds]) uds = np.array([tot_data['FFE'][ind:ind+30] for ind in ud_inds]) return stills,lrs,uds,stills_inds,lr_inds,ud_inds def generate_basic_summary(start_positions,tot_data): # generate the summaries for each subject # create an empty data frame in which to store the data (previously loaded from file) summary_df = pd.DataFrame() # make column of measurement labels mask_trials = 4 summary_df['Condition'] = pd.Series(['KN95','KN95_clip','surgical','surgical_clip']) # first get indices for where to look in times_df for *event* indices stills,lrs,uds,stills_inds,lr_inds,ud_inds = get_activity_data_arrays(start_positions,tot_data) tots = np.hstack((stills,lrs,uds)) #print(np.shape(tots)) #for i in range(7): # print(bends[i]) # print(reads[i]) # print(lrs[i]) # print(uds[i]) #print(summary_df) #print(np.shape(bends)) summary_df['Stationary Mean'] = [np.mean(each) for each in stills] summary_df['Stationary SD'] = [np.std(each) for each in stills] summary_df['LR Mean'] =[np.mean(each) for each in lrs] summary_df['LR SD'] = [np.std(each) for each in lrs] summary_df['UD Mean'] = [np.mean(each) for each in uds] summary_df['UD SD'] = [np.std(each) for each in uds] summary_df['Overall Mean'] = [np.mean(each,axis=0) for each in tots] summary_df['Overall SD'] = [np.std(each,axis=0) for each in tots] return summary_df def indv_summary_construct(ambient_indv_list,mask_indv_list, new_ind_select_ambient_list,new_ind_select_mask_list, ambient_data, mask_data, subj_num_df): summary_df=[] ## join session, number, subj_id, and date subj_num_df=subj_num_df.merge(mask_indv_list, left_on=['Session','Number'], right_on=['Session','Number']) for i in range(len(new_ind_select_ambient_list)): subject = subj_num_df['Subject ID'][i] try: indv_mask_df=mask_data.iloc[new_ind_select_mask_list[i]:new_ind_select_mask_list[i+1]] except: indv_mask_df=mask_data.iloc[new_ind_select_mask_list[i]:mask_data.index[-1]] try: indv_ambient_df=ambient_data.iloc[new_ind_select_ambient_list[i]:new_ind_select_ambient_list[i+1]] except: indv_ambient_df=ambient_data.iloc[new_ind_select_ambient_list[i]:ambient_data.index[-1]] ## merge mask and ambient indv_FFE_df=merge_ambient_mask_df(indv_mask_df, indv_ambient_df) indv_FFE_df.is_copy = None indv_FFE_df.reset_index(drop = True, inplace = True) ## Find Test Starts print("Subject:",subject) start_positions = find_test_starts(indv_FFE_df,subject) print("Start Positions:",start_positions) print(indv_FFE_df.iloc[start_positions]) indv_summary_df = generate_basic_summary(start_positions, indv_FFE_df) indv_summary_df['Subject']=subject indv_summary_df.to_csv(PythonOutputDirectory+'path'+subject+'filename.csv',index=False) summary_df.append(indv_summary_df) return summary_df ## Construct Data Frame 1st method ## Selects index of row to start new individual new_ind_select_ambient_list=[] date_indv_ambient=[] ## data set with IDs and dates subj_num_df=pd.read_excel("path", sheet_name = '',usecols="") for i,subject_folder in enumerate(subject_folders): ## indentifying date for folder date = subject_folder.split('\\')[-1][4:] session = i+1 print(date) try: date_obj = datetime.strptime(date,'%m_%d_%Y') except: date_alt = datetime.strptime(date,'%Y_%m_%d') date_obj = datetime.strptime(date_alt.strftime('%m_%d_%Y'),'%m_%d_%Y') print(date_obj) if date_obj < datetime.strptime('date','%m_%d_%Y'): #if date_obj == datetime.strptime('07_24_2024','%m_%d_%Y'): ## import CPC data ambient_files = glob.glob(os.path.join(subject_folder +'\\ambient\\*')) mask_files = glob.glob(os.path.join(subject_folder + '\\mask\\*')) ambient_data = load_data_sequence(ambient_files) print(ambient_data.head()) mask_data = load_data_sequence(mask_files) print(mask_data.shape) ## assume that both cpc were started at the same time. ## change names of identifying columns in mask df to check for consistency mask_data.rename({'data':'mask','green':'green_mask','red':'red_mask'},axis = 1,inplace = True) mask_data = mask_data.reset_index(drop = True) ambient_data.rename({'data':'ambient','green':'green_ambient','red':'red_ambient'},axis = 1,inplace = True) ambient_data = ambient_data.reset_index(drop = True) print(ambient_data.head()) ##Search for individuals ## list of rows that indicate new individual ### ambient cpc = "Ambient" new_ind_list_ambient=list(ambient_data.index.values[(ambient_data['green_ambient']>0) & (ambient_data['red_ambient']>0)]) print(new_ind_list_ambient) ambient_indv_list,new_ind_select_ambient_list = find_ind_strt_index(ambient_data,new_ind_list_ambient,date,cpc,session) ambient_indv_session_df=pd.DataFrame(ambient_indv_list, columns=['date','Session','Number','Indv_index','CPC']) print(ambient_indv_session_df) print(new_ind_select_ambient_list) ### mask new_ind_list_mask=list(mask_data.index.values[(mask_data['green_mask']>0) & (mask_data['red_mask']>0)]) print(new_ind_list_mask) cpc = "Mask" mask_indv_list,new_ind_select_mask_list = find_ind_strt_index(mask_data,new_ind_list_mask,date,cpc,session) mask_indv_session_df=pd.DataFrame(mask_indv_list, columns=['date','Session','Number','Indv_index','CPC']) print(mask_indv_session_df) print(new_ind_select_mask_list) ## Need to also check for individuals that are not marked in one dataset but marked in the other. if len(new_ind_select_ambient_list)==len(new_ind_select_mask_list): print("Ambient and Mask has same number of individuals") summary_df=indv_summary_construct(ambient_indv_session_df,mask_indv_session_df,new_ind_select_ambient_list,new_ind_select_mask_list, ambient_data, mask_data,subj_num_df) else: print("The number of individuals in ambient and mask data sets are not the same") avg_ambient = ambient_data['ambient'].mean() sd_ambient = ambient_data['ambient'].std() max_ambient = ambient_data['ambient'].max() min_ambient = ambient_data['ambient'].min() median_ambient = ambient_data['ambient'].median() print('mean: ', avg_ambient, '\nstandard deviation: ', sd_ambient, '\nmax: ',max_ambient, '\nminimum: ', min_ambient, '\nmedian: ', median_ambient) ## remove values where air cleaner is on ambient_data_stable=ambient_data.loc[ambient_data['ambient']>10000] avg_ambient = ambient_data_stable['ambient'].mean() sd_ambient = ambient_data_stable['ambient'].std() max_ambient = ambient_data_stable['ambient'].max() min_ambient = ambient_data_stable['ambient'].min() median_ambient = ambient_data_stable['ambient'].median() print('mean: ', avg_ambient, '\nstandard deviation: ', sd_ambient, '\nmax: ',max_ambient, '\nminimum: ', min_ambient, '\nmedian: ', median_ambient) summary_df=indv_summary_construct_alt(ambient_indv_session_df,mask_indv_session_df,new_ind_select_ambient_list,new_ind_select_mask_list, ambient_data, mask_data, subj_num_df, avg_ambient, sd_ambient) else: print("New Method of Collection. Continue to next chunk.") ## Define functions new method def load_data_sequence_new(files_list): # read in the files using pandas df = pd.read_csv(files_list[0], skiprows=17, engine='python') #df = df.drop(['num','zero3'], axis = 1) if len(files_list) > 1: for i,each in enumerate(files_list[1:]): dfi = pd.read_csv(each,skiprows=17, delimiter = ',', #names=['num','data','green','red','zero3'], engine='python') df = pd.concat([df,dfi]) return df def load_data_sequence_NoRowsRmv(files_list): # read in the files using pandas df = pd.read_csv(files_list[0], engine='python') #df = df.drop(['num','zero3'], axis = 1) return df ## For November 13, 2024 def load_data_sequence(files_list): # read in the files using pandas # begin with first, then read in the others to concatenate df = pd.read_csv(files_list[0], skiprows=17, delimiter = ',', #names=['num','data','green','red','zero3'], engine='python') for i,each in enumerate(files_list[1:]): dfi = pd.read_csv(each,skiprows=17, delimiter = ',', #names=['num','data','green','red','zero3'], engine='python') df = pd.concat([df,dfi]) #df = df.drop(['num','zero3'], axis = 1) return df def find_ind_strt_index(data,new_list,date,cpc,session): d = [] select_list=[] session_check = [36] session_check2 = [13] index_check = [18863, 18868] index_check2 = [1586,1583] #print(index_check) #print(session_check) for i in range(len(new_list[:-1])): if len(data.iloc[new_list[i]:new_list[i+1]])>2: select_list.append(new_list[i]) print(new_list[i]) if session in session_check2 and new_list[i] in index_check2: del select_list[1] continue ## restart for FF2-098, need to remove unfinished session if session in session_check and new_list[i] in index_check: print("Index is present") del select_list[3] continue d.append({'date': date,'Session':session,'Number': len(select_list),'Indv_index': new_list[i],'CPC': cpc}) if session == 58 and cpc == "Ambient": select_list.pop(4) d.pop(4) select_list.append(new_list[-1]) d.append({'date': date,'Session':session,'Number': len(select_list),'Indv_index': new_list[-1],'CPC': cpc}) return d,select_list def merge_ambient_mask_df(mask_data,ambient_data,time_adj): ## assume that both cpc were started at the same time. ## row numbers should be the same for both data frames ambient_data = ambient_data[ambient_data['ambient'].notna()] mask_data = mask_data[mask_data['mask'].notna()] ambient_data['Time'] = ambient_data['Time'] + pd.Timedelta(seconds = time_adj) #ambient_data['Time'] = ambient_data['Time'].dt.strftime('%H:%M:%s') ambient_data['Time'] = ambient_data['Time'].astype(str) mask_data['Time'] = mask_data['Time'].astype(str) mask_data['Time'] = mask_data['Time'].str.pad(width = 8, side = 'left',fillchar = '0') ambient_data['Time']=ambient_data['Time'].str[-8:] print(ambient_data.head()) print(mask_data.head()) print(ambient_data.dtypes) print(mask_data.dtypes) #FFE_df = pd.merge(ambient_data,mask_data, on = 'Time') FFE_df = pd.merge(ambient_data,mask_data, on = 'Time', how = 'outer') print("Just merged!!!") print(FFE_df.head()) print(FFE_df.tail()) #FFE_df= pd.concat([ambient_data,mask_data], axis = 1) #FFE_df= FFE_df.reset_index() FFE_df['FFE'] = 100-FFE_df.loc[:,'mask']/FFE_df.loc[:,'ambient']*100 return(FFE_df) def find_test_starts(indv_FFE_df,subject): ## list of rows that indicate new individual new_ind_list=list(indv_FFE_df.index.values[(indv_FFE_df['green_ambient']>0) & (indv_FFE_df['red_ambient']==0) & (indv_FFE_df['red_mask']==0)]) print(new_ind_list) print(indv_FFE_df.iloc[new_ind_list]) ## Selects index of row to start new individual new_ind_select_list=[] for i in range(len(new_ind_list[:-1])): if len(indv_FFE_df.iloc[new_ind_list[i]:new_ind_list[i+1]])>4 or len(indv_FFE_df.iloc[new_ind_list[i-1]:new_ind_list[i]])>4: new_ind_select_list.append(new_ind_list[i]) print(new_ind_select_list) new_ind_select_list2=[] for index in range(1,len(new_ind_select_list)): a = new_ind_select_list[index] b = new_ind_select_list[index-1] if a-b>20: new_ind_select_list2.append(new_ind_select_list[index]) start_positions=[new_ind_select_list[0]] if subject == "FF2-147": start_positions.insert(0,1) start_positions.extend(new_ind_select_list2) if len(start_positions)!=12: print('Test number wrong! Check for markers!') start_positions.append(new_ind_list[-1]) #new_ind_select_list.append(new_ind_list[-1]) if subject == "FF2-097": start_positions = start_positions[:12] print(start_positions) ## Extra tests due to restart of following subject start_positions_offset = [x+offset for x in start_positions] if len(start_positions_offset) == 12: print("Using Ambient starts") return start_positions_offset else: print("Here are the mask starts!") new_ind_list_mask=list(indv_FFE_df.index.values[(indv_FFE_df['green_mask']>0) & (indv_FFE_df['red_mask']==0) & (indv_FFE_df['red_ambient']==0)]) print(new_ind_list_mask) print(indv_FFE_df.iloc[new_ind_list_mask]) new_ind_select_list=[] for i in range(len(new_ind_list_mask[:-1])): if len(indv_FFE_df.iloc[new_ind_list_mask[i]:new_ind_list_mask[i+1]])>4 or len(indv_FFE_df.iloc[new_ind_list_mask[i-1]:new_ind_list_mask[i]])>4: new_ind_select_list.append(new_ind_list_mask[i]) print(new_ind_select_list) new_ind_select_list2=[] for index in range(1,len(new_ind_select_list)): a = new_ind_select_list[index] b = new_ind_select_list[index-1] if a-b>20: new_ind_select_list2.append(new_ind_select_list[index]) start_positions_mask=[new_ind_select_list[0]] start_positions_mask.extend(new_ind_select_list2) print(start_positions_mask) if len(start_positions_mask)!=12: print('Test number wrong for masks as well! Check for markers!') start_positions_mask.append(new_ind_list_mask[-1]) if subject == "": print("Signal Error. Need to replace lost start") start_positions_mask.insert(5,373) print(start_positions_mask) start_positions_offset_mask = [x+offset for x in start_positions_mask] if len(start_positions_offset_mask) == 12: print("Using mask starts") return start_positions_offset_mask def find_test_starts_mask_stops(indv_FFE_df): new_ind_list=list(indv_FFE_df.index.values[(indv_FFE_df['green_mask']==0) & (indv_FFE_df['red_mask']>0)]) # & (indv_FFE_df['red_ambient']==0) print(new_ind_list) print(indv_FFE_df.iloc[new_ind_list]) ## Selects index of row to start new test new_ind_select_list=[] for i in range(len(new_ind_list[:-1])): if len(indv_FFE_df.iloc[new_ind_list[i]:new_ind_list[i+1]])>2 or len(indv_FFE_df.iloc[new_ind_list[i-1]:new_ind_list[i]])>2: new_ind_select_list.append(new_ind_list[i]) print(new_ind_select_list) new_ind_select_list2=[] for index in range(1,len(new_ind_select_list)): a = new_ind_select_list[index] b = new_ind_select_list[index-1] if a-b>20: new_ind_select_list2.append(new_ind_select_list[index]) if new_ind_list[-1] == 738: new_ind_select_list2.append(new_ind_list[-1]) ## Subject FF2-139 missed a start marker start_positions=[new_ind_select_list[0]] start_positions.extend(new_ind_select_list2) start_positions_offset = [x+offset-30 for x in start_positions] print(start_positions_offset) return(start_positions_offset) def get_activity_data_arrays(start_positions,tot_data): mask_trials = 4 tstills_inds = np.array([0+i*3 for i in range(mask_trials)]) tlr_inds = tstills_inds+1 tud_inds = tstills_inds+2 # then get actual indices of times in data for those events stills_inds = [start_positions[tind] for tind in tstills_inds] lr_inds = [start_positions[tind] for tind in tlr_inds] ud_inds = [start_positions[tind] for tind in tud_inds] # extract the measurements from FFE data by measurement indices stills = np.array([tot_data['FFE'][ind:ind+30] for ind in stills_inds]) lrs = np.array([tot_data['FFE'][ind:ind+30] for ind in lr_inds]) uds = np.array([tot_data['FFE'][ind:ind+30] for ind in ud_inds]) return stills,lrs,uds,stills_inds,lr_inds,ud_inds def generate_basic_summary(start_positions,tot_data): # generate the summaries for each subject # create an empty data frame in which to store the data (previously loaded from file) summary_df = pd.DataFrame() # make column of measurement labels mask_trials = 4 summary_df['Condition'] = pd.Series(['KN95','KN95_clip','surgical','surgical_clip']) # first get indices for where to look in times_df for *event* indices stills,lrs,uds,stills_inds,lr_inds,ud_inds = get_activity_data_arrays(start_positions,tot_data) tots = np.hstack((stills,lrs,uds)) summary_df['Stationary Mean'] = [np.mean(each) for each in stills] summary_df['Stationary SD'] = [np.std(each) for each in stills] summary_df['LR Mean'] =[np.mean(each) for each in lrs] summary_df['LR SD'] = [np.std(each) for each in lrs] summary_df['UD Mean'] = [np.mean(each) for each in uds] summary_df['UD SD'] = [np.std(each) for each in uds] summary_df['Overall Mean'] = [np.mean(each,axis=0) for each in tots] summary_df['Overall SD'] = [np.std(each,axis=0) for each in tots] return summary_df def indv_summary_construct(ambient_indv_list,mask_indv_list, new_ind_select_ambient_list,new_ind_select_mask_list, ambient_data, mask_data,subj_num_df): summary_df=[] ## join session, number, subj_id, and date subj_num_df=subj_num_df.merge(mask_indv_list, left_on=['Session','Number'], right_on=['Session','Number']) print(subj_num_df.tail()) print("Here are the individuals: ",new_ind_select_ambient_list) for i in range(len(new_ind_select_ambient_list)): subject = subj_num_df['Subject ID'][i] time_adj=subj_num_df['Time_Adj'][i] print(subject) print("Adjust Time: ",time_adj) subject_skip_list = ["",""] if subject in subject_skip_list: continue try: indv_mask_df=mask_data.iloc[new_ind_select_mask_list[i]:new_ind_select_mask_list[i+1]] except: indv_mask_df=mask_data.iloc[new_ind_select_mask_list[i]:mask_data.index[-1]] try: indv_ambient_df=ambient_data.iloc[new_ind_select_ambient_list[i]:new_ind_select_ambient_list[i+1]] except: indv_ambient_df=ambient_data.iloc[new_ind_select_ambient_list[i]:ambient_data.index[-1]] indv_ambient_df['New_Time'] = indv_ambient_df['Time'] ## merge mask and ambient print("About to merge!!!") indv_FFE_df=merge_ambient_mask_df(indv_mask_df, indv_ambient_df,time_adj) print("After Merge: ",indv_FFE_df.head()) indv_FFE_df.is_copy = None indv_FFE_df.reset_index(drop = True, inplace = True) ## Find Test Starts print(indv_FFE_df.shape) print(indv_FFE_df.head(10)) print(indv_FFE_df.tail(10)) mask_stops_indv = [] # if subject in mask_stops_indv: start_positions = find_test_starts_mask_stops(indv_FFE_df) else: start_positions = find_test_starts(indv_FFE_df,subject) print("Before :",start_positions) if subject =="": time_list = [] new_ind_select_list=list(indv_FFE_df.index.values[(indv_FFE_df['Time'].isin(time_list))]) print(new_ind_select_list) start_positions = [x+offset for x in new_ind_select_list] print("After :",start_positions) if subject =="": time_list = [] new_ind_select_list=list(indv_FFE_df.index.values[(indv_FFE_df['Time'].isin(time_list))]) #print(new_ind_select_list) start_positions = [x+offset for x in new_ind_select_list] #print("After :",start_positions) print(indv_FFE_df.iloc[start_positions]) indv_summary_df = generate_basic_summary(start_positions, indv_FFE_df) #indv_summary_df['Subject']=subject subject = subj_num_df['Subject ID'][i] indv_summary_df['Subject']=subject missing_knClipUD = [] ## These subjects were cut off during the KN95 with clip UD test. if subject in missing_knClipUD: indv_summary_df.at[1,'Overall Mean'] = (indv_summary_df.at[1,'Stationary Mean'] + indv_summary_df.at[1,'LR Mean'])/2 indv_summary_df.at[1,'Overall SD'] = (indv_summary_df.at[1,'Stationary SD'] + indv_summary_df.at[1,'LR SD'])/2 #indv_summary_df.to_csv(PythonOutputDirectory+'FFE_individual_summaries\\'+subject+'_FFE_summary.csv',index=False) indv_summary_df.to_csv(PythonOutputDirectory+'path',index=False) ## maybe concat is better to add rows summary_df.append(indv_summary_df) return summary_df ## Construct Data Frame New Method ## Selects index of row to start new individual new_ind_select_ambient_list=[] date_indv_ambient=[] ## data set with IDs and dates subj_num_df=pd.read_excel("path", sheet_name = '',usecols="") for i,subject_folder in enumerate(subject_folders): ## indentifying date for folder date = subject_folder.split('\\')[-1][4:] session = i+1 #print("Session: ",session) print(date) try: date_obj = datetime.strptime(date,'%m_%d_%Y') except: date_alt = datetime.strptime(date,'%Y_%m_%d') date_obj = datetime.strptime(date_alt.strftime('%m_%d_%Y'),'%m_%d_%Y') print(date_obj) ############################### Change Date to update new data #################################### if date_obj == datetime.strptime('','%m_%d_%Y'): print("Analyzing this date") else: continue ######################################################################################################## ## import CPC data ambient_files = glob.glob(os.path.join(subject_folder +'\\ambient\\*')) mask_files = glob.glob(os.path.join(subject_folder + '\\mask\\*')) # if date_obj == datetime.strptime('10_29_2024','%m_%d_%Y'): # ambient_data = load_data_sequence_NoRowsRmv(ambient_files) # else: ambient_data = load_data_sequence_new(ambient_files) # if date_obj < datetime.strptime('08_25_2024','%m_%d_%Y'): # ambient_data = ambient_data.drop(ambient_data.columns[[-1]],axis = 1) ######## For 11-13-2024######## #ambient_data = load_data_sequence(ambient_files) print(ambient_data.head()) mask_data = load_data_sequence_new(mask_files) print(mask_data.shape) ## assume that both cpc were started at the same time. ## change names of identifying columns in mask df to check for consistency mask_data.rename({'Concentration (#/cm³)':'mask','Analog 1':'green_mask','Analog 2':'red_mask'},axis = 1,inplace = True) mask_data = mask_data.reset_index(drop = True) ambient_data.rename({'Concentration (#/cm³)':'ambient','Analog 1':'green_ambient','Analog 2':'red_ambient'},axis = 1,inplace = True) ambient_data = ambient_data.reset_index(drop = True) print(ambient_data.head()) ##Search for individuals ## list of rows that indicate new individual ### ambient cpc = "Ambient" new_ind_list_ambient=list(ambient_data.index.values[(ambient_data['green_ambient']>0) & (ambient_data['red_ambient']>0)]) print(new_ind_list_ambient) if session == 50: del new_ind_list_ambient[0] ambient_indv_list,new_ind_select_ambient_list = find_ind_strt_index(ambient_data,new_ind_list_ambient,date,cpc,session) ambient_indv_session_df=pd.DataFrame(ambient_indv_list, columns=['date','Session','Number','Indv_index','CPC']) print(ambient_indv_session_df) print(new_ind_select_ambient_list) ### mask new_ind_list_mask=list(mask_data.index.values[(mask_data['green_mask']>0) & (mask_data['red_mask']>0)]) print(new_ind_list_mask) cpc = "Mask" ### Restart for participant 129 if session == 50: del new_ind_list_mask[0] mask_indv_list,new_ind_select_mask_list = find_ind_strt_index(mask_data,new_ind_list_mask,date,cpc,session) print(mask_indv_list) mask_indv_session_df=pd.DataFrame(mask_indv_list, columns=['date','Session','Number','Indv_index','CPC']) print(mask_indv_session_df) print(new_ind_select_mask_list) if session == 63: mask_indv_session_df = mask_indv_session_df.iloc[:-1] del new_ind_select_mask_list[-1] print(mask_indv_session_df) print(new_ind_select_mask_list) if date_obj == datetime.strptime('','%m_%d_%Y'): new_ind_select_mask_list.pop(-2) new_ind_select_ambient_list.pop(-2) ## Need to also check for individuals that are not marked in one dataset but marked in the other. if len(new_ind_select_ambient_list)==len(new_ind_select_mask_list): print("Ambient and Mask has same number of individuals") summary_df=indv_summary_construct(ambient_indv_session_df,mask_indv_session_df,new_ind_select_ambient_list,new_ind_select_mask_list, ambient_data, mask_data,subj_num_df) else: print("The number of individuals in ambient and mask data sets are not the same") avg_ambient = ambient_data['ambient'].mean() sd_ambient = ambient_data['ambient'].std() max_ambient = ambient_data['ambient'].max() min_ambient = ambient_data['ambient'].min() median_ambient = ambient_data['ambient'].median() print('mean: ', avg_ambient, '\nstandard deviation: ', sd_ambient, '\nmax: ',max_ambient, '\nminimum: ', min_ambient, '\nmedian: ', median_ambient) ## remove values where air cleaner is on ambient_data_stable=ambient_data.loc[ambient_data['ambient']>10000] avg_ambient = ambient_data_stable['ambient'].mean() sd_ambient = ambient_data_stable['ambient'].std() max_ambient = ambient_data_stable['ambient'].max() min_ambient = ambient_data_stable['ambient'].min() median_ambient = ambient_data_stable['ambient'].median() print('mean: ', avg_ambient, '\nstandard deviation: ', sd_ambient, '\nmax: ',max_ambient, '\nminimum: ', min_ambient, '\nmedian: ', median_ambient) summary_df=indv_summary_construct_alt(ambient_indv_session_df,mask_indv_session_df,new_ind_select_ambient_list,new_ind_select_mask_list, ambient_data, mask_data, subj_num_df, avg_ambient, sd_ambient) ## Notebook 2: FF2_Save_Mesh_landmarks_as_npz ## Import import numpy as np import matplotlib.pyplot as plt import matplotlib.image as mpimg from time import time import scipy as sp plt.ion() %matplotlib notebook %matplotlib inline import os folders = os.listdir('path') #folders = folders[1:-3] # Drop "BillPractice" and stuff at end output_directory = 'path' folders # This function loads the data def get_data(folder,obj_too = True): directory = 'path' ##### LOAD FACIAL LANDMARKS with open(directory+folder+'path') as file: lines = file.readlines() comment_lines = [] data_start_lines = [] for i in range(len(lines)): if '#' in lines[i]: comment_lines.append(i) if 'data' in lines[i]: data_start_lines.append(i) p2f = "".join(lines[data_start_lines[0]:comment_lines[2]]) p2f = p2f.replace("\n","").replace(" ","").replace("data:","").replace("[","").replace("]","").split(",") p2f = np.array(p2f,dtype=float) p2f = p2f.reshape((int(len(p2f)/2.0),2)) p3f = "".join(lines[data_start_lines[1]:comment_lines[3]]) p3f = p3f.replace("\n","").replace(" ","").replace("data:","").replace("[","").replace("]","").split(",") p3f = np.array(p3f,dtype=float) p3f = p3f.reshape((int(len(p3f)/3.0),3)) ######################## SAME BUT EARS try: with open(directory+folder+'path') as file: lines = file.readlines() comment_lines = [] data_start_lines = [] for i in range(len(lines)): if '#' in lines[i]: comment_lines.append(i) if 'data' in lines[i]: data_start_lines.append(i) p2e = "".join(lines[data_start_lines[0]:comment_lines[2]]) p2e = p2e.replace("\n","").replace(" ","").replace("data:","").replace("[","").replace("]","").split(",") p2e = np.array(p2e,dtype=float) p2e = p2e.reshape((int(len(p2e)/2.0),2)) p3e = "".join(lines[data_start_lines[1]:]) p3e = p3e.replace("\n","").replace(" ","").replace("data:","").replace("[","").replace("]","").split(",") p3e = np.array(p3e,dtype=float) p3e = p3e.reshape((int(len(p3e)/3.0),3)) except: p2e = 'NONE' p3e = 'NONE' ###OBJ FILE v_array = [] # vertex coordinates (x,y,z) vt_array = [] # point in the texture map f_array = [] # face commands with open(directory+folder+'\\headscan.obj') as file: lines = file.readlines() for line in lines: split_line = line.split() if split_line[0] == 'v': v_array.append(split_line[1:]) elif split_line[0] == 'vt': vt_array.append(split_line[1:]) elif split_line[0] == 'f': split_line=[each.split("/") for each in split_line[1:]] f_array.append(split_line) v_array = np.array(v_array).astype(float) vt_array = np.array(vt_array).astype(float) f_array = np.array(f_array).astype(float) obj_array = np.array([v_array,vt_array,f_array],dtype=object) ###Image file img=mpimg.imread(directory+folder+'\\headscan.jpg') return p2f,p3f,p2e,p3e,obj_array,img output_folder = 'path' processed_files = os.listdir(output_folder) processed_file_numbers = [file[0:3] for file in processed_files] data_folder_numbers = [folder[4:7] for folder in folders] numbers_to_process = list(set(data_folder_numbers) - set(processed_file_numbers)) folders_to_process = [] print('Data for the following subjects need to be processed:') print(numbers_to_process) # shameful, ugly loop for each in folders: for number in numbers_to_process: if number in each: folders_to_process.append(each) start_time = time() folders = np.array(folders,dtype=str) datas = [] for each in folders_to_process:#_subset: data = get_data(each) datas.append(data) #plot_landmarks(data[0],data[1],data[2],data[3],data[4],data[5],each,True,True,[2478,1382]) end_time = time() print('loading time (minutes)') print((end_time-start_time)/60) for i,data in enumerate(datas): np.savez_compressed(output_directory+folders_to_process[i][4:7],flm2 = data[0],flm3 = data[1], elm2 = data[2], elm3 = data[3], mesh = data[4]) ## Notebook 3: Extract Landmark-only distance measurements ## Import import numpy as np import matplotlib.pyplot as plt import matplotlib.pyplot as plt import matplotlib.image as mpimg from time import time import scipy as sp import pandas as pd plt.ion() %matplotlib notebook %matplotlib inline import os folder = 'path' files = os.listdir(folder) print(files) ## define functions def get_distance_3d(x1,x2): return ((x1[0]-x2[0])**2+(x1[1]-x2[1])**2 + (x1[2]-x2[2])**2)**0.5 def plot_landmarks_3D(flm3,elm3,title,plot_ears=False,plot_marker = [0,0]): xyz = [0,0,0] #fig = plt.figure() ax = plt.figure(figsize=[6,6]).add_subplot(projection='3d') #ax = fig2.figure(projection='3d') #ax.set_title( # '\npupil distance (mm): '+str(pupil_distance_3d)+ # '\ntemple distance (mm): '+str(temple_distance_3d)+ # '\nforehead to nose (mm): '+str(forehead_to_bridge_3d)+ # '\near crest to crest (mm): '+str(ear_crest_distance_3d)+ # '\near lobe to lobe (mm): '+str(ear_lobe_distance_3d)+ #'\nleft lobe to bridge (mm): '+str(left_lobe_to_bridge_3d) # ,fontsize=10) #plot all facelm ax.scatter(flm3[:,0],flm3[:,1],flm3[:,2],s=2,c='green') #plot special face lm ax.scatter(flm3[38,0],flm3[38,1],flm3[38,2],s=6,c='violet') ax.scatter(flm3[39,0],flm3[39,1],flm3[39,2],s=6,c='violet') ax.scatter(flm3[0,0],flm3[0,1],flm3[0,2],s=3,c = 'red') ax.scatter(flm3[12,0],flm3[12,1],flm3[12,2],s=4,c = 'red') #center forehead ax.scatter(flm3[14,0],flm3[14,1],flm3[14,2],s=4,c = 'magenta') #bridge nose ax.scatter(flm3[49,0],flm3[49,1],flm3[49,2],s=4, c = 'blue') #plot all earlm ax.scatter(elm3[:,0],elm3[:,1],elm3[:,2],s=2,c='blue') #ear crest ax.scatter(elm3[1,0],elm3[1,1],elm3[1,2],s=4,c='yellow') ax.scatter(elm3[17,0],elm3[17,1],elm3[17,2],s=4,c='yellow') #ear lobe ax.scatter(elm3[9,0],elm3[9,1],elm3[9,2],s=4,c='gold') ax.scatter(elm3[25,0],elm3[25,1],elm3[25,2],s=4,c='gold') ax.set_xlabel('x') ax.set_ylabel('y') ax.set_zlabel('z') for i,each in enumerate(flm3): ax.text(each[0],each[1],each[2], str(i)) for i, each in enumerate(elm3): ax.text(each[0],each[1],each[2],str(i),color='red') plt.show() ## load flm, elm data specifically p3fs = [] p3es = [] for file in files: p3f = np.load(folder+file,allow_pickle=True)['flm3'] p3e = np.load(folder+file,allow_pickle=True)['elm3'] p3fs.append(p3f) p3es.append(p3e) bizygos = [] menSels = [] RELens = [] lipLens = [] subjects = [] for i in range(len(p3fs)): subjects.append(int(files[i][0:3])) bizygos.append(get_distance_3d(p3fs[i][1],p3fs[i][11])) menSels.append(get_distance_3d(p3fs[i][89],p3fs[i][6])) RELens.append(get_distance_3d(p3es[i][9],p3es[i][2])) lipLens.append(get_distance_3d(p3fs[i][59],p3fs[i][65])) lm_df = pd.DataFrame(np.transpose([subjects,bizygos,menSels,RELens,lipLens]),columns=['subject','Bizyg_1_11','MenSel_6_89','EarLength_2_9','LipLength_59_65']) lm_df.to_csv('path',index=False) ## Notebook 4 Load Camera Outputs, project contours import numpy as np import matplotlib.pyplot as plt import matplotlib.pyplot as plt import matplotlib.image as mpimg from time import time import scipy as sp plt.ion() %matplotlib notebook %matplotlib inline import os folder = 'path' files = os.listdir(folder) files ## Define Functions def get_distance(x1,x2): return ((x1[0]-x2[0])**2+(x1[1]-x2[1])**2)**0.5 def get_distance_3d(x1,x2): return ((x1[0]-x2[0])**2+(x1[1]-x2[1])**2 + (x1[2]-x2[2])**2)**0.5 ### Functions strictly for 3D calculations def rotation_matrix_from_vectors(vec1, vec2): # courtesy of https://stackoverflow.com/questions/45142959/calculate-rotation-matrix-to-align-two-vectors-in-3d-space """ Find the rotation matrix that aligns vec1 to vec2 :param vec1: A 3d "source" vector :param vec2: A 3d "destination" vector :return mat: A transform matrix (3x3) which when applied to vec1, aligns it with vec2. """ a, b = (vec1 / np.linalg.norm(vec1)).reshape(3), (vec2 / np.linalg.norm(vec2)).reshape(3) v = np.cross(a, b) c = np.dot(a, b) s = np.linalg.norm(v) kmat = np.array([[0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0]]) rotation_matrix = np.eye(3) + kmat + kmat.dot(kmat) * ((1 - c) / (s ** 2)) return rotation_matrix def rotate_translate_project(p0,p1,p2,data3D,max_proj_dist,verbose = False): # rotates all points so that plane normal is z-hat, then subtracts z target_vec = np.array([0,0,1]) plane_normal = np.cross(p1-p0,p2-p0)/np.linalg.norm(np.cross(p1-p0,p2-p0)) displacement_vec = p0 if verbose: print('normal: ',plane_normal) print('displacement: ',displacement_vec) if np.shape(data3D)[1] != 3: # need rows of [x,y,z] data3D = np.transpose(data3D) # first translate by negative displacement vector if verbose: print(data3D) data3D = data3D - displacement_vec if verbose: print('after subtracting displacement') print(data3D) # define rotation matrix to align plane normal with 0,0,1 mRot = rotation_matrix_from_vectors(target_vec,plane_normal) # apply the rotation to the data data3D = data3D.dot(mRot) if verbose: print('after applying rotation') print(data3D) # subset to only those where z < max_proj_dist projs = data3D[np.where(np.abs(data3D[:,2]) < max_proj_dist)] if verbose: print('projections') print(projs) projs[:,2] = 0 projs = np.transpose(projs) return projs start_time = time() p3fs = [] p3es = [] meshes = [] for file in files: p3e = np.load(folder+file,allow_pickle=True)['elm3'] meshes.append(np.load(folder+file,allow_pickle=True)['mesh'][0]) p3fs.append(np.load(folder+file,allow_pickle=True)['flm3']) p3es.append(p3e) end_time = time() print('loading time (minutes)') print((end_time-start_time)/60) contours = [] flm_projs = [] fig = plt.figure(figsize=[6,6]) for i in range(len(meshes)): # Using ear lobes and nose bridge for points p0 = p3fs[i][49] p1 = p3es[i][15] p2 = p3es[i][31] flm_to_project = np.array([p3fs[i][38],p3fs[i][39],p3fs[i][0],p3fs[i][12],p3fs[i][34],p3fs[i][44],p3fs[i][48],p3fs[i][49],p3fs[i][50]]) projs_and_dists = [] pupil_projs_and_dists = [] contour_n = rotate_translate_project(p0,p1,p2,meshes[i],1) flm_proj = rotate_translate_project(p0,p1,p2,flm_to_project,105) contours.append(contour_n) flm_projs.append(flm_proj) plt.scatter(contour_n[0],contour_n[1]) plt.scatter(flm_proj[0],flm_proj[1]) plt.title('Projected Contour') plt.xlabel('x\'') plt.ylabel('z\'') plt.show() fig = plt.figure(figsize=[8,8]) for i in range(len(contours)): plt.plot(contours[i][0],contours[i][1],label=str(files[i].replace('_HD_s1_yz_obj',''))) plt.title('Projected Contour') #plt.legend() plt.xlabel('x\'') plt.ylabel('z\'') plt.show() def get_deriv(x,y): x_diffs = x[1:]-x[:-1] y_diffs = y[1:]-y[:-1] deriv = y_diffs/x_diffs return deriv def cart2pol(x, y): rho = np.sqrt(x**2 + y**2) phi = np.arctan2(y, x) return(rho, phi) def pol2cart(rho, phi): x = rho * np.cos(phi) y = rho * np.sin(phi) return(x, y) def contour_cart_to_pol(contour): contour = np.transpose(contour) polar = [cart2pol(point[0],point[1]) for point in contour] polar = np.transpose(polar) return polar def contour_pol_to_cart(contour): contour = np.transpose(contour) cart = [pol2cart(point[0],point[1]) for point in contour] cart = np.transpose(cart) return cart # Calculate the center of mass for all contours contours_dummy = contours.copy() filts = [] for i,contour in enumerate(contours_dummy): #if not files[i][0:3] in exceptions: cm_x = np.mean(contour[0]) cm_y = np.mean(contour[1]) contour_offset = [contour[0]- cm_x,contour[1]- cm_y] contour_polar = contour_cart_to_pol(contour_offset) contour_polar = contour_polar[:,contour_polar[1,:].argsort()] sorted_contour = contour_pol_to_cart(contour_polar) filt = sp.signal.savgol_filter(sorted_contour,15,2) filt = [filt[0] + cm_x,filt[1] + cm_y] filts.append(filt) for i in range(len(files)): np.savetxt('path'+files[i][:3]+'file.txt',filts[i]) np.savetxt('path'+files[i][:3]+'file.txt',flm_projs[i]) ### Notebook 5 import numpy as np import pandas as pd import matplotlib.pyplot as plt import matplotlib.pyplot as plt import matplotlib.image as mpimg import scipy as sp plt.ion() %matplotlib notebook %matplotlib inline import os # where the previously projected contours and landmarks are: contour_dir = 'path' # actual mesh (camera output) directory: folder = 'path' files = os.listdir(folder) files file_prefixes = [name[:3] for name in files] contours = [] projected_lm = [] for each in file_prefixes: contours.append(np.loadtxt(contour_dir+each+'SG-smoothed.txt')) x = np.loadtxt(contour_dir+each+'projected_flm.txt') y = x[:2] projected_lm.append(y) # Rotate by 180 degrees rot180 = np.array([[-1,0],[0,-1]]) for i in range(len(contours)): #if files[i] == '042.npz': # continue contours[i] = np.transpose(contours[i]) #contours[i] = contours[i].dot(rot180) contours[i] = np.transpose(contours[i]) contours[i] = np.array(contours[i]) contours[i] = contours[i][:,np.where(contours[i][1]>-100)[0]] projected_lm[i] = np.transpose(projected_lm[i]) #projected_lm[i] = projected_lm[i].dot(rot180) projected_lm[i] = np.transpose(projected_lm[i]) # initial example points p1 = projected_lm[0][:,3] # cheek p2 = projected_lm[0][:,7] # nose # contour for example contour = contours[0] def get_distance(p1,p2): return ((p1[0]-p2[0])**2+(p1[1]-p2[1])**2)**0.5 def calc_mb(p1,p2): m = (p2[1]-p1[1])/(p2[0]-p1[0]) b = p2[1]-m*p2[0] return m,b def calc_line(array_x,m,b): array_x = np.array(array_x) array_y = m*array_x+b return [array_x,array_y] def get_init_approx_tangent(contour,p1,p2): #use distances in X - therefore, we drop the lower half of the contour so as to not get wrong Y # get the absolute difference between the input point and the contour p2_dists = np.abs(contour[0]-p2[0]) # find the smallest distance p2_min_dist_index = np.where(p2_dists == p2_dists.min())[0][0] # re-assign the original point for line calculation p2 = contour[:,p2_min_dist_index] p1_dists = np.abs(contour[0]-p1[0]) p1_min_dist_index = np.where(p1_dists == p1_dists.min())[0][0] p1 = contour[:,p1_min_dist_index] m,b = calc_mb(p1,p2) line = calc_line(contour[0],m,b) return line,m,b def calc_line_dist_to_point(m,b,p1): # inverse slope m2 = -1.0/m # calculate new b for line with slope m2 intersecting p b2 = p1[1] - m2*p1[0] # find intersection x = (b2-b)/(m-m2) y = m*x+b p2 = [x,y] return get_distance(p1,p2),p2 def find_approx_tangent(p1,p2,contour,peak_threshold = 5, max_arc_length = 20): # get the initial line contour_x = contour[0] line,m,b = get_init_approx_tangent(contour,p1,p2) line = np.transpose(line) contour = np.transpose(contour) dists = [[],[]] pts = [[],[]] pts1_end_x = 0 # for each point on the contour, calculate the distance from the initial tangent approximation to the contour # save those for which the contour is above the tangent for i,point in enumerate(contour): dist,point_on_line = calc_line_dist_to_point(m,b,point) if point_on_line[1] < point[1] and pts1_end_x == 0: # if past first point, and distance between current point and previous point < threshold dists[0].append(dist) pts[0].append(point) if (len(pts[0]) > 1 and np.abs(point[0]-pts[0][-2][0]) > peak_threshold) or (len(pts[0]) > 1 and pts[0][0][0]-point[0] > max_arc_length): dists[0].pop(-1) pts[0].pop(-1) dists[1].append(dist) pts[1].append(point) pts1_end_x = point[0] elif point_on_line[1] < point[1] and pts1_end_x != 0: dists[1].append(dist) pts[1].append(point) dists = np.array([dists[0],dists[1]],dtype=object) pts = np.array([pts[0],pts[1]],dtype=object) max_index_1 = np.where(dists[0] == np.max(dists[0]))[0][0] max_index_2 = np.where(dists[1] == np.max(dists[1]))[0][0] np1 = pts[0][max_index_1] np2 = pts[1][max_index_2] new_m,new_b = calc_mb(np1,np2) new_line = calc_line(contour_x,new_m,new_b) return np1,np2,new_line def find_approx_tangent_alt(p1,p2,contour,peak_threshold = 5, max_arc_length = 20): # get the initial line contour_x = contour[0] line,m,b = get_init_approx_tangent(contour,p1,p2) line = np.transpose(line) contour = np.transpose(contour) dists = [[],[]] pts = [[],[]] pts1_end_x = 0 # for each point on the contour, calculate the distance from the initial tangent approximation to the contour # save those for which the contour is above the tangent for i,point in enumerate(contour): dist,point_on_line = calc_line_dist_to_point(m,b,point) if point_on_line[1] < point[1] and pts1_end_x == 0: # if past first point, and distance between current point and previous point < threshold dists[0].append(dist) pts[0].append(point) if (len(pts[0]) > 1 and np.abs(point[0]-pts[0][-2][0]) > peak_threshold) or (len(pts[0]) > 1 and pts[0][0][0]-point[0] > max_arc_length): dists[0].pop(-1) pts[0].pop(-1) dists[1].append(dist) pts[1].append(point) pts1_end_x = point[0] elif point_on_line[1] < point[1] and pts1_end_x != 0: dists[1].append(dist) pts[1].append(point) dists = np.array([dists[0],dists[1]],dtype=object) pts = np.array([pts[0],pts[1]],dtype=object) max_index_1 = np.where(dists[0] == np.max(dists[0]))[0][0] max_index_2 = np.where(dists[1] == np.max(dists[1]))[0][0] np1 = pts[0][max_index_1] #np2 = pts[1][max_index_2] np2_alt = p2 new_m,new_b = calc_mb(np1,np2_alt) new_line = calc_line(contour_x,new_m,new_b) return np1,np2_alt,new_line def get_curve_length(curve): # https://stackoverflow.com/questions/63986448/finding-arc-length-in-a-curve-created-by-numpy-array return np.sum(np.sqrt(np.sum((curve[:-1] - curve[1:])**2,axis=1))) subject_nums = [] gaps = [] curves = [] for i in range(len(contours)): ## points where landmark should be used instead. first_point_list=['014.npz','017.npz','106.npz','145.npz','170.npz'] fig = plt.figure() p1 = projected_lm[i][:,2] # cheek p2 = projected_lm[i][:,7] # nose line,m,b = get_init_approx_tangent(contours[i], p1,p2) try: if files[i] in first_point_list: np1,np2,new_line = find_approx_tangent_alt(p1,p2,contours[i]) else: np1,np2,new_line = find_approx_tangent(p1,p2,contours[i]) except ValueError: pass new_m,new_b = calc_mb(np1,np2) new_line = calc_line(contours[i][0],new_m,new_b) # Calculate area - first determine indices of points between np1 & np2 for newline and contour if files[i] in first_point_list: print('new_line:', new_line, 'np2: ', np2, 'np1: ', np1, 'p2:',p2) #nll = np.where((new_line[0]>=np2[0]) & (new_line[0] <= np1[0]))[0][0] #nlr = np.where((new_line[0]>=np2[0]) & (new_line[0] <= np1[0]))[0][-1] #cl = np.where((contours[0][0]>=np2[0]) & (contours[0][0] <= np1[0]))[0][0] #cr = np.where((contours[0][0]>=np2[0]) & (contours[0][0] <= np1[0]))[0][-1] else: nll = np.where((new_line[0]>=np2[0]) & (new_line[0] <= np1[0]))[0][0] nlr = np.where((new_line[0]>=np2[0]) & (new_line[0] <= np1[0]))[0][-1] cl = np.where((contours[0][0]>=np2[0]) & (contours[0][0] <= np1[0]))[0][0] cr = np.where((contours[0][0]>=np2[0]) & (contours[0][0] <= np1[0]))[0][-1] # interpolate for evenly spaced points so we can integrate contour_even_func = sp.interpolate.interp1d(contours[i][0], contours[i][1], kind='nearest') x_new = np.linspace(np1[0],np2[0],100) if files[i] in first_point_list: x_new = list(reversed(x_new)) #print("x_new",x_new) line_even_x,line_even = calc_line(x_new,new_m,new_b) #print(line_even_x, line_even) contour_even = contour_even_func(x_new) #print(contour_even) # "integrate" contour_int = np.trapz(contour_even, x=x_new) tangent_int = np.trapz(line_even,x=x_new) #print("Contour_int:",contour_int, "tangent_int:", tangent_int) gap_area = np.round(contour_int - tangent_int,2) # curve length curve_length = np.round(get_curve_length(np.transpose([x_new,contour_even])),2) # append arrays curves.append(curve_length) gaps.append(gap_area) subject_nums.append(str(int(files[i][0:3]))) # Plot for QA plt.plot(contours[i][0],contours[i][1],label='contour') plt.scatter(p1[0],p1[1],color='b',label='landmark') plt.plot(line[0],line[1],label='first guess') plt.scatter(p2[0],p2[1],color='b') plt.scatter(np1[0],np1[1],color='r',label = 'tangent pt') plt.scatter(np2[0],np2[1],color='r') plt.plot(new_line[0],new_line[1],label='tangent') plt.plot(x_new,line_even,label='tangent even-spaced') plt.plot(x_new,contour_even,label='contour even-spaced') plt.title('subject '+str(int(files[i][0:3]))+',\n gap area: '+str(gap_area)+' mm$^2$\n'+'curve length: '+str(curve_length)+' mm') plt.legend() plt.show() meas_df = pd.DataFrame(np.transpose([subject_nums,gaps,curves]),columns = ['subject','nose_gap_area','nose_gap_curve_length']) meas_df.to_csv('path',index=False) ## Notebook 6 Prognathism from npz import numpy as np import pandas as pd import matplotlib.pyplot as plt import matplotlib.pyplot as plt import matplotlib.image as mpimg from time import time import scipy as sp plt.ion() %matplotlib notebook %matplotlib inline import os folder = 'path' files = os.listdir(folder) files def get_distance(x1,x2): return ((x1[0]-x2[0])**2+(x1[1]-x2[1])**2)**0.5 def get_distance_3d(x1,x2): return ((x1[0]-x2[0])**2+(x1[1]-x2[1])**2 + (x1[2]-x2[2])**2)**0.5 def rotation_matrix_from_vectors(vec1, vec2): # courtesy of https://stackoverflow.com/questions/45142959/calculate-rotation-matrix-to-align-two-vectors-in-3d-space """ Find the rotation matrix that aligns vec1 to vec2 :param vec1: A 3d "source" vector :param vec2: A 3d "destination" vector :return mat: A transform matrix (3x3) which when applied to vec1, aligns it with vec2. """ a, b = (vec1 / np.linalg.norm(vec1)).reshape(3), (vec2 / np.linalg.norm(vec2)).reshape(3) v = np.cross(a, b) c = np.dot(a, b) s = np.linalg.norm(v) kmat = np.array([[0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0]]) rotation_matrix = np.eye(3) + kmat + kmat.dot(kmat) * ((1 - c) / (s ** 2)) return rotation_matrix def rotate_translate_project(p0,p1,p2,data3D,max_proj_dist,verbose = False,gave_norm_and_offset = False,return_inverse = False): # rotates all points so that plane normal is z-hat, then subtracts z # this function minimally returns the projected contour and the projected contour rotated/translated back to its starting point # optionally, it returns the unprojected object (that has been rotated and translated) # optional statements: # verbose: print calculation steps # gave_norm_and_offset: p0 = displacement vector to plane, p1 = norm of plane, p2 is ignored # return_unprojected: return data3D after translating, rotating to orient with plane, but without projecting onto plane had_to_transpose = False # works by rotating input mesh (data3D) so plane norm is aligned with target_vector (z-hat) target_vec = np.array([0,0,1]) # calculate the plane normal - cross the two vectors p1-p0 and p2-p0, then take norm. plane_normal = np.cross(p1-p0,p2-p0)/np.linalg.norm(np.cross(p1-p0,p2-p0)) # displacement vector is how much the plane is offset from the origin, where "origin" of plane is p0 displacement_vec = p0 if gave_norm_and_offset: # this means that p0 is the offset, p1 is plane_normal, and p2 is unused displacement_vec = p0 plane_normal = p1 if np.shape(data3D)[0] == 3 and np.shape(data3D)[1] == 3: print('Careful! Input is 3x3 - if results are strange, try transposing input first!') if np.shape(data3D)[1] != 3: # need rows of [x,y,z] data3D = np.transpose(data3D) had_to_transpose = True # BEGIN MESH TRANSFORMATIONS # translate entire data set such that plane normal is at origin data3D_disp = data3D - displacement_vec # define rotation matrix to align plane normal with 0,0,1 mRot = rotation_matrix_from_vectors(target_vec,plane_normal) mRot_Inv = np.linalg.inv(mRot) # apply the rotation to the data - have checked this syntax with dummy matrices/vectors - nColumns must be 3 data3D_rot_disp = data3D_disp.dot(mRot) # project - subset to only those where z < max_proj_dist, and then subtract off z-component projs = data3D_rot_disp[np.where(np.abs(data3D_rot_disp[:,2]) < max_proj_dist)] projs[:,2] = 0 # Now inverse projs_unrot = projs.dot(mRot_Inv) projs_unrot_disp = projs_unrot + displacement_vec #projs_unrot_disp = np.transpose(projs_unrot_disp) if verbose: print('Transposed input data?') print(had_to_transpose) print('normal:') print(plane_normal) print('displacement:') print(displacement_vec) print('rotation matrix:') print(mRot) print('after subtracting displacement:') print(data3D_disp) print('after applying rotation:') print(data3D_rot_disp) print('check with plane norm:') print(plane_normal.dot(mRot)) print('projections:') print(projs) print('Unrotate projected:') print(projs_unrot) print('Translate back to origin:') print(projs_unrot_disp.transpose()) #if return_unprojected: # return projs,projs_unrot_disp,np.array(data3D_rot_disp).transpose() #else: # return projs,projs_unrot_disp if return_inverse: return projs,[mRot_Inv,displacement_vec] else: return projs def get_sagittal_from_34_44(flm34,flm44): offset = (flm34+flm44)/2.0 # vector from origin to plane center norm = (flm34-flm44)/np.linalg.norm(flm34-flm44) # vector defining the plane normal return offset, norm def get_plane_from_bisection(p1,p2): offset = (p1+p2)/2.0 # vector from origin to plane center norm = (p1-p2)/np.linalg.norm(p1-p2) # vector defining the plane normal return offset, norm def cart2pol(x, y): rho = np.sqrt(x**2 + y**2) phi = np.arctan2(y, x) return(rho, phi) def pol2cart(rho, phi): x = rho * np.cos(phi) y = rho * np.sin(phi) return(x, y) def contour_cart_to_pol(contour): contour = np.transpose(contour) polar = [cart2pol(point[0],point[1]) for point in contour] polar = np.transpose(polar) return polar def contour_pol_to_cart(contour): contour = np.transpose(contour) cart = [pol2cart(point[0],point[1]) for point in contour] cart = np.transpose(cart) return cart start_time = time() flm3s = [] elm3s = [] meshes = [] # exceptions = ['Fit042_HD_s1_yz_obj'] # this folder does not have ELM - function will return "NO_EAR_LANDMARKS" for file in files: elm3 = np.load(folder+file,allow_pickle=True)['elm3'] meshes.append(np.load(folder+file,allow_pickle=True)['mesh'][0]) flm3s.append(np.load(folder+file,allow_pickle=True)['flm3']) elm3s.append(elm3) end_time = time() print('loading time (minutes)') print((end_time-start_time)/60) def find_contour_minmax_near_point(obj,guess,plane_def_vecs,contour_offset,buffer,min_or_max,sg_win,sg_deg,plot=[0,0,0],title=''): if len(plane_def_vecs) == 3: use_offset_and_norm = False else: use_offset_and_norm = True plane_def_vecs = np.vstack((plane_def_vecs,[0,0,0])) # dummy point, will not be used # Get projection contour,invT = rotate_translate_project(plane_def_vecs[0], plane_def_vecs[1],plane_def_vecs[2],obj,1.0,False,use_offset_and_norm,True) [guess_p] = rotate_translate_project(plane_def_vecs[0], plane_def_vecs[1],plane_def_vecs[2],[guess],8.0,False,use_offset_and_norm,False) # contour is in the projected, convenient coordinate frame # Make x,y,z as one row each contour = contour.transpose() # Calculate the center of mass and apply the offset cm_x = -contour_offset[0] #np.mean(contour[0])-contour_offset[0] cm_y = -contour_offset[1] #np.mean(contour[1])-contour_offset[1] # offset contour/flm to approximate contour center of mass contour_x = contour[0] - cm_x contour_y = contour[1] - cm_y guess_p_x = guess_p[0] - cm_x guess_p_y = guess_p[1] - cm_y contour_offset = [contour_x,contour_y] guess_p_offset = [guess_p_x,guess_p_y] # convert contour to polar coordinates contour_polar = contour_cart_to_pol(contour_offset) # contour_polar = contour_polar.transpose() guess_p_polar = contour_cart_to_pol(np.array([guess_p_offset]).transpose()) # sort contours by phi contour_polar = contour_polar[:,contour_polar[1,:].argsort()] # convert back to cartesian sorted_contour = contour_pol_to_cart(contour_polar) # Get adjacent points adjacent = contour_polar.transpose()[np.where(np.abs(contour_polar[1]-guess_p_polar[1]) <= buffer)] adjacent = adjacent.transpose() #print(sg_win) #print(adjacent) adjacent = sp.signal.savgol_filter(adjacent,sg_win,sg_deg) if min_or_max == 0: minima_indices = sp.signal.argrelmin(adjacent[0]) if len(minima_indices[0]) == 0: print('NO LOCAL MINIMUM FOUND - USING MINIMUM OF RANGE - INSPECT OUTPUT') minima_values = np.min(adjacent[0]) else: minima_values = adjacent[0][minima_indices] target_pol = adjacent.transpose()[np.where(adjacent[0] == np.amin(minima_values))] else: maxima_indices = sp.signal.argrelmax(adjacent[0]) if len(maxima_indices[0]) == 0: print('NO LOCAL MAXIMUM FOUND - USING MAXIMUM OF RANGE - INSPECT OUTPUT') maxima_values = np.max(adjacent[0]) else: maxima_values = adjacent[0][maxima_indices] target_pol = adjacent.transpose()[np.where(adjacent[0] == np.amax(maxima_values))] target_pol = target_pol.transpose() target_cart = contour_pol_to_cart(target_pol) # go back to original coordiante system sorted_contour_OC = [sorted_contour[0]+cm_x,sorted_contour[1]+cm_y] sorted_contour_OC = np.vstack((sorted_contour_OC,np.zeros(np.shape(sorted_contour[0])))) sorted_contour_OC = sorted_contour_OC.transpose().dot(invT[0]) + invT[1] target_OC = np.array([target_cart[0]+cm_x,target_cart[1]+cm_y,target_cart[0]*0]) target_OC = target_OC.transpose().dot(invT[0]) + invT[1] # get contour displacement and norm cm_OC_x = np.mean(sorted_contour_OC[0]) cm_OC_y = np.mean(sorted_contour_OC[1]) cm_OC_z = np.mean(sorted_contour_OC[2]) contour_disp_OC = [cm_OC_x,cm_OC_y,cm_OC_z] # consider the center of mass as the origin for the plane # return if plot[0]: #print('Using offset, norm: ',use_offset_and_norm) #print('Defining with: ',plane_def_vecs) # Plot in polar coordinates fig = plt.figure(figsize=[4,4]) plt.scatter(contour_polar[1],contour_polar[0],s=4) plt.scatter(adjacent[1],adjacent[0]) plt.scatter(guess_p_polar[1],guess_p_polar[0],s=6,c='g') plt.scatter(target_pol[1],target_pol[0]) plt.title(title) plt.xlabel(r'$\theta$') plt.ylabel(r'$\rho$') #plt.ylim([60,350]) plt.show() if plot[1]: # plot in projected coordinates fig = plt.figure(figsize=[4,4]) plt.scatter(sorted_contour[0],sorted_contour[1],s=4) plt.scatter(target_cart[0],target_cart[1]) plt.title(title) plt.xlabel(r'x\'') plt.ylabel(r'z\'') #plt.ylim([60,350]) plt.show() if plot[2]: fig = plt.figure() ax = fig.add_subplot(projection='3d') ax.scatter(sorted_contour_OC.transpose()[0],sorted_contour_OC.transpose()[1],sorted_contour_OC.transpose()[2],s=5,c='k') ax.scatter(obj[0].transpose()[0][0::90],obj[0].transpose()[1][0::90],obj[0].transpose()[2][0::90],s=2) ax.scatter(target_OC[0],target_OC[1],target_OC[2],s=35,c='r') plt.title(title) ax.set_xlabel('x') ax.set_ylabel('y') ax.set_zlabel('z') ax.set_title('plane contour ') plt.show() return np.concatenate(target_OC).ravel(),sorted_contour_OC plt.close('all') def find_2D_extrema(obj,guess,plane_defs_array,cm_offset_array,buffer_array,min_max_array,sg_window_array,sg_deg_array,delta_limit = 0.1, plot_res=False, save_fig = False, fig_title = '',plot_steps = False): axis_0 = np.array(plane_defs_array[0][1:]) axis_1 = np.array(plane_defs_array[1][1:]) delta = 999 # this will be the distance from the previous solution - stop optimizing when sufficiently small deltas = [] guess_1 = guess plot_settings = [0,0,0] if plot_steps: plot_settings = [1,1,0] for i in range(10): guess_0,contour_0 = find_contour_minmax_near_point(obj,guess_1,plane_defs_array[0],cm_offset_array[0],buffer_array[0],min_max_array[0],sg_window_array[0],sg_deg_array[0],plot=plot_settings) plane_defs_array[1][0] = guess_0 deltas.append(np.linalg.norm(guess_1-guess_0)) guess_1,contour_1 = find_contour_minmax_near_point(obj,guess_0,plane_defs_array[1],cm_offset_array[1],buffer_array[1],min_max_array[1],sg_window_array[1],sg_deg_array[1],plot=plot_settings) plane_defs_array[0][0] = guess_1 deltas.append(np.linalg.norm(guess_1-guess_0)) if deltas[-1] < 1.0: break if i == 9 and deltas[-1] >= 1.0: # loop finished before delta < 1.0 print('Ran for limit - inspect results') #fig = plt.figure() #plt.scatter(np.arange(len(deltas)),deltas) #plt.show() if plot_res: fig = plt.figure() ax = fig.add_subplot(projection='3d') ax.scatter(contour_0.transpose()[0],contour_0.transpose()[1],contour_0.transpose()[2],s=5,c='k') ax.scatter(contour_1.transpose()[0],contour_1.transpose()[1],contour_1.transpose()[2],s=5,c='k') ax.scatter(obj.transpose()[0][0::90],obj.transpose()[1][0::90],obj.transpose()[2][0::90],s=2) ax.scatter(axis_0.transpose()[0],axis_0.transpose()[1],axis_0.transpose()[2],c='g',s=20) ax.scatter(axis_1.transpose()[0],axis_1.transpose()[1],axis_1.transpose()[2],c='y',s=40) ax.scatter(guess_1[0],guess_1[1],guess_1[2],s=35,c='r') ax.set_xlabel('x') ax.set_ylabel('y') ax.set_zlabel('z') ax.azim = 0 ax.set_box_aspect(None,zoom=1) #ax.dist = 10 ax.elev = 10 ax.set_title('plane contour ') plt.show() if save_fig: fig_dir = 'path' fig_title = 'need_name'#int(files[i][0:3]) plt.savefig(fig_dir+fig_title+'.png') return guess_1 sellions = [] zeros_array = np.zeros((len(files),2)).astype(int) # to make uniform array of values, we will just a value to this sg_windows = zeros_array+17 ## Was 20 sg_degrees = zeros_array+2 buffers = zeros_array+0.27 cm_offsets = np.zeros((len(files),2,2))+[[75,0],[0,100]] for i in range(len(files)): print(i,files[i]) offset,norm = get_plane_from_bisection(flm3s[i][1],flm3s[i][11]) guess = flm3s[i][89] p1 = np.cross(norm,guess-offset)+offset p2 = -np.cross(norm,guess-offset)+offset #if i == 38: # sellions.append(np.nan) # continue # setting up for sellion plane_defs = [[guess,p1,p2],[guess,elm3s[i][15],elm3s[i][31]]] min_max_arrays = [0,1] # vertical plane first, then horizontal plane, so minimize then maximize sellions.append(find_2D_extrema(meshes[i],guess,plane_defs,cm_offsets[i],buffers[i],min_max_arrays,sg_windows[i],sg_degrees[i],plot_res = False)) plt.close('all') nose_tips = [] zeros_array = np.zeros((len(files),2)).astype(int) # to make uniform array of values, we will just a value to this sg_windows = zeros_array+6 #20 sg_degrees = zeros_array+2 buffers = zeros_array+0.3 #0.27 cm_offsets = np.zeros((len(files),2,2))+[[30,-10],[-0,50]] #[[30,-10],[0,40]] #cm_offsets[42] = [[30,-0],[0,40]] min_max_arrays = [1,1] # vertical plane first, then horizontal plane, so minimize then maximize for i in range(len(files)): print(i,files[i]) offset,norm = get_plane_from_bisection(flm3s[i][1],flm3s[i][11]) guess = flm3s[i][52] p1 = np.cross(norm,guess-offset)+offset p2 = -np.cross(norm,guess-offset)+offset if i == 38: nose_tips.append(np.nan) continue # setting up for sellion plane_defs = [[guess,p1,p2],[guess,elm3s[i][15],elm3s[i][31]]] nose_tips.append(find_2D_extrema(meshes[i],guess,plane_defs,cm_offsets[i],buffers[i],min_max_arrays,sg_windows[i],sg_degrees[i],plot_res = False,save_fig = True, fig_title = 'nose_tips'+str(i))) plt.close('all') chins = [] zeros_array = np.zeros((len(files),2)).astype(int) # to make uniform array of values, we will just a value to this sg_windows = zeros_array+18 sg_degrees = zeros_array+3 buffers = zeros_array+[0.4,0.4] cm_offsets = np.zeros((len(files),2,2))+[[70,-50],[0,80]] # Modified parameters - these values have been adjusted to help find the extrema #cm_offsets[3] = [[60,-40],[0,30]] #cm_offsets[9] = [[60,-40],[0,30]] #cm_offsets[28] = [[60,-40],[0,30]] #cm_offsets[21] = [[40,-40],[0,30]] #cm_offsets[31] = [[50,-20],[0,70]] #cm_offsets[33] = [[50,-20],[0,70]] #cm_offsets[40] = [[50,-40],[0,70]] #cm_offsets[58] = [[50,-40],[0,70]] min_max_arrays = [1,1] # vertical plane first, then horizontal plane, so minimize then maximize #zeros_array = np.zeros((len(files),2)).astype(int) # to make uniform array of values, we will just a value to this #sg_windows = zeros_array+20 #sg_degrees = zeros_array+3 #buffers = zeros_array+0.57 #cm_offsets = np.zeros((len(files),2,2))+[0,70] #min_max_arrays = [1,1] # vertical plane first, then horizontal plane, so minimize then maximize #cm_offsets[53] = [[0,120],[0,30]] for i in range(len(files)): print(i) offset,norm = get_plane_from_bisection(flm3s[i][1],flm3s[i][11]) guess = flm3s[i][6]#previously used 62 p1 = np.cross(norm,guess-offset)+offset p2 = -np.cross(norm,guess-offset)+offset if i == 38: chins.append(np.nan) continue # setting up for sellion plane_defs = [[guess,p1,p2],[guess,elm3s[i][15],elm3s[i][31]]] chins.append(find_2D_extrema(meshes[i],guess,plane_defs,cm_offsets[i],buffers[i],min_max_arrays,sg_windows[i],sg_degrees[i],plot_res = True)) upper_lips = [] for i in range(len(files)): upper_lips.append(flm3s[i][62]) Frankfort_horizontals = [] for i,file in enumerate(files): if i == 38: Frankfort_horizontals.append(np.nan) continue p0 = flm3s[i][49] p1 = elm3s[i][15] p2 = elm3s[i][31] plane_normal = np.cross(p1-p0,p2-p0)/np.linalg.norm(np.cross(p1-p0,p2-p0)) Frankfort_horizontals.append(plane_normal) subjects = [] for i in range(len(files)): subjects.append(int(files[i][0:3])) ear_bisector = [] for i, file in enumerate(files): if i == 38: ear_bisector.append(np.nan) continue ear_bisector.append((np.array(elm3s[i][15])+np.array(elm3s[i][31]))/2.0) def get_angle_between_vecs(v1,v2): uv1 = v1 / np.linalg.norm(v1) uv2 = v2 / np.linalg.norm(v2) dot_product = np.dot(uv1, uv2) angle = np.arccos(dot_product) return angle*180.0/np.pi def get_angle_between_vec_and_plane(v1,n): uv1 = v1 / np.linalg.norm(v1) uv2 = v2 / np.linalg.norm(v2) dot_product = np.dot(uv1, uv2) angle = np.arccos(dot_product) return angle*180.0/np.pi def calculate_sphenomaxillary_angle(row): return get_angle_between_vecs(row['sel-lip'],row['Frankfort_H']) def get_norm_of_vector_difference(row,v1_name,v2_name): return np.linalg.norm(row[v1_name]-row[v2_name]) #df = pd.DataFrame(np.transpose([subjects,sellions,nose_tips,chins,upper_lips,Frankfort_horizontals,ear_bisector]),columns=['subject','sellion','nose_tip','chin','upper_lip','Frankfort_H','ear_bisector']) columns = ['subject','sellion','nose_tip','chin','upper_lip','Frankfort_H','ear_bisector'] data = np.array([subjects,sellions,nose_tips,chins,upper_lips,Frankfort_horizontals,ear_bisector], object) df = pd.DataFrame(data,index=columns).T df['sel-earb'] = df.apply(lambda row: get_norm_of_vector_difference(row,'sellion','ear_bisector'),axis=1) df['nose-earb'] = df.apply(lambda row: get_norm_of_vector_difference(row,'nose_tip','ear_bisector'),axis=1) df['chin-earb'] = df.apply(lambda row: get_norm_of_vector_difference(row,'chin','ear_bisector'),axis=1) df['lip-earb'] = df.apply(lambda row: get_norm_of_vector_difference(row,'upper_lip','ear_bisector'),axis=1) df['ment-sel'] = df.apply(lambda row: get_norm_of_vector_difference(row,'chin','sellion'),axis=1) df['sel-lip'] = df['upper_lip']-df['sellion'] df['sphenomaxillary_angle'] = df.apply(lambda row: calculate_sphenomaxillary_angle(row),axis=1) df['ratio_sel-nose_tip'] = df['sel-earb']/df['nose-earb'] df['ratio_sel-lip'] = df['sel-earb']/df['lip-earb'] df['ratio_sel-chin'] = df['sel-earb']/df['chin-earb'] prog_df = df[['subject','sel-earb', 'nose-earb', 'chin-earb', 'lip-earb','ment-sel', 'sphenomaxillary_angle', 'ratio_sel-nose_tip', 'ratio_sel-lip', 'ratio_sel-chin']] prog_df.to_csv('path',index=False) ## Notebook 7: Generate Table import glob import os import pandas as pd import numpy as np import matplotlib.pyplot as plt import csv import xlrd import openpyxl from datetime import datetime from sklearn.preprocessing import StandardScaler import statsmodels.api as sm import statsmodels.discrete.discrete_model as sd import statsmodels.formula.api as smf PythonOutputDirectory = 'path' caliper_dir = 'path' FFE_dir = PythonOutputDirectory+'FFE_individual_summaries\\' extracted3D_dir = PythonOutputDirectory+'Extracted_Contours\\' caliper_df=pd.read_excel(caliper_dir+'file.xlsx') FFE_dfs = [] for each in glob.glob(FFE_dir+'\\*'): df = pd.read_csv(each) FFE_dfs.append(df) # merge all FFE dfs into one FFE_df = pd.concat(FFE_dfs) nose_contour_df = pd.read_csv(extracted3D_dir+'file.csv') face_df = caliper_df[['Subject ID','Date','Age','Sex','Race','Ethnicity','HT (cm)','WT (kg)','BMI','BZB_AVG', 'EBR_AVG','NLN_AVG','NECR_AVG','MSL_AVG']] face_df.rename(columns = {'Subject ID':'Subject', 'HT (cm)':'Height_cm', 'WT (kg)':'Weight_kg', 'BZB_AVG':'Bizygomatic_Breadth','EBR_AVG':'Ear_Breadth','NLN_AVG':'Nose_Length', 'NECR_AVG':'Neck_Circumference', 'MSL_AVG':'Face_Length'},inplace = True) face_df[['Bizygomatic_Breadth','Neck_Circumference']] = face_df[['Bizygomatic_Breadth','Neck_Circumference']].mul(10) FFE_df[['Subject']] = FFE_df[['Subject']].astype('str') FFE_df['Subject'] = FFE_df['Subject'].str.zfill(3) #FFE_df['Subject'] = ('FF2-'+ FFE_df['Subject']) nose_contour_df[['subject']] = nose_contour_df[['subject']].astype('str') nose_contour_df['subject'] = nose_contour_df['subject'].str.zfill(3) nose_contour_df['subject'] = ('FF2-'+ nose_contour_df['subject']) est_list = ["FF2-010", "FF2-011"] def est_list_check(row): return np.where(any(row['Subject'] in x for x in est_list),"Yes","No") total_table = FFE_df.merge(face_df,how='inner',left_on='Subject',right_on='Subject') total_table = total_table.merge(nose_contour_df,how='inner',left_on='Subject',right_on='subject') ## mark subjects with missing ambient data total_table['Est_AVG_FFE']=total_table.apply(est_list_check,axis=1) total_table.to_csv(PythonOutputDirectory+'file.csv',index=False)