5.6 Compiling Saved Data
7.1.2 MeasurementClass.py
time_var.set(
str(round(_results['time'].value / 60)) + ' mins remaining')
,→
elif _results['time'].value != 0:
time_var.set('<1 min remaining')
# ======================================= Test Version Below ============================================ #
def testf1(output, delay, resources):
time.sleep(0.01)
print(f'testf1 {output} with {delay}') print(resources)
def testf2(output, delay, resources):
time.sleep(0.01)
print(f'testf2 {output} with {delay}')
def testx(output, delay, resources):
time.sleep(0.01)
print(float(output), float(output % 2 + randint(0, 100)), delay, 'testx')
,→
return float(output), float(output % 2 + randint(0, 100)), delay
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
import csv
import importlib
import sys
import time
import tkinter as tk

import numpy as np
from pymeasure import instruments

from GaussMeterClass import GaussMeter
class Measurement():
# initialize this class, includes passed arguments
def __init__(self, order_dict, gui_results,
gui_graph, pause_flag, quit_flag, save_dir, save_name,
,→
loop, prog_queue, resources, kwargs):
self.kwargs = kwargs
sys.path.append(order_dict['module_path']) # add path to import module
,→
# import specified module
m_func = importlib.import_module(order_dict['module_name'], order_dict['module_path'])
# results data, all shared between processes
self.x = gui_results['x_data'] # a shared array of floats self.y = gui_results['y_data'] # a shared array of floats
# a shared array of floats, for guassmeter readings self.x2 = gui_results['x2_data']
# a shared value (int), tells how many data points have been stored
,→
self.counter = gui_results['counter']
# a shared value (int) -- percent of the task completed (shown in progress bar)
,→
self.p = gui_results['progress']
# a shared value (int) -- tells time remaining self.t = gui_results['time']
self.queue = prog_queue # queue to GUI process
self.pause = pause_flag # flag for if stop is pressed self.quit = quit_flag # flag for if quit is pressed
# resource command functions
# get the specific function from module try:
except Exception as err:
self.queue.put('Resource functions not found: ', err) self.quit.set()
# Save function info
self.graph = gui_graph # dictionary of graph titles, also used for saving
,→
self.f1_header = self.graph['fixed_param_1']
self.f2_header = self.graph['fixed_param_2']
self.directory = save_dir # directory where files are saved self.name = save_name # name of file
self.loop = loop # loop type for save file
# int with list of num of measurements, updates every time build_array is called
,→
self.tot_measurement_len = 1
self.progress_counter = 0 # increments and updates progress as time goes on
,→
self.time_counter = 0 # increments and lets user know estimated time remaining
,→
# dictionary with keys that tell which key to use from kwargs for min, max, step, loop direction
,→
self.order = order_dict
# dictionary of resources with names and address, when initialized address becomes instance
,→
self.measurement_resources = resources self.map_sensitivity()
# map sensitivity to a numerical value
def map_sensitivity(self):
    """Replace ``self.kwargs['Sensitivity']`` with its entry in ``sensitivity_dict``.

    Does nothing when ``self.kwargs`` has no ``'Sensitivity'`` key. When the
    key is present, the stored value is used as a lookup key into
    ``sensitivity_dict`` and overwritten in place with the result, which is
    then printed.

    NOTE(review): ``sensitivity_dict`` is not defined in the visible part of
    the file; a value missing from it will raise ``KeyError`` here.
    """
    if "Sensitivity" not in self.kwargs:
        return
    self.kwargs['Sensitivity'] = sensitivity_dict[self.kwargs['Sensitivity']]
    print('sensitivity set to: ', self.kwargs['Sensitivity'])
# save data, including arbitrary data user wants saved def save_function(self, f1, f2):
# save function proper
t = time.strftime('%Y-%m-%d-%H-%M-%S',)
string = self.name + '_' + self.graph['gui_title'] + '_' + self.f1_header + '_' + \
,→
self.f2_header + '_' + self.loop + '_' + \
t + '.csv'
# replace all spaces
f = ''.join(['_' if i == ' ' else i for i in string]) try:
with open(self.directory + '\\' + f, 'w', newline='', encoding='utf-8') as csvfile:
,→
savewriter = csv.writer(csvfile, dialect='excel')
# accounts measurements without gaussmeter measurements
,→
if self.graph['x2_title'] != '':
savewriter.writerow([self.graph['x_title'], Type', 'Loop Type', 'Time'])
,→
for num in range(self.counter.value):
savewriter.writerow(
else: # save files for data with gaussmeter measurements
for num in range(self.counter.value):
savewriter.writerow([self.x[num], self.y[num], f1, f2,
,→
self.name, self.graph['gui_title'], self.loop, t, ])
,→
csvfile.close()
return 'File Saved as ' + self.directory + '\\' + f except Exception as err:
return 'Error, could not save file! ' + str(err)
# import charging delay from file, otherwise use standard
def charging_delay(self, val):
    """Return the delay to use for a given value ``val``.

    Thresholds (checked from largest to smallest):

    * ``val >= 2500``         -> 7.0
    * ``1500 <= val < 2500``  -> 5.0
    * ``1000 <= val < 1500``  -> 3.0
    * ``500 <= val < 1000``   -> 1.0
    * anything below 500      -> ``None``

    NOTE(review): the listing shows no branch for ``val < 500`` (this
    includes every negative value), so the method returns ``None`` there.
    That is preserved here; confirm against the original file whether a
    default delay was intended, since callers pass the result on as a delay.
    """
    if val >= 2500:
        return 7.0
    if val >= 1500:
        return 5.0
    if val >= 1000:
        return 3.0
    if val >= 500:
        return 1.0
    return None
# check resource and attempt to create instance, throw error if can't do that or find matching name/address
,→
for name, address in self.measurement_resources.items():
if name == 'dsp_lockin':
try:
self.measurement_resources[name] = instruments.signalrecovery.DSP7265(
,→
address)
except Exception as err:
self.queue.put('Error in initializing ' + str(name) +
elif name == 'keithley_2000':
try:
self.measurement_resources[name] = instruments.keithley.Keithley2000(
,→
address)
except Exception as err:
self.queue.put('Error in initializing ' + str(name) +
elif name == 'keithley_2400':
try:
self.measurement_resources[name] = instruments.keithley.Keithley2400(
,→
address)
except Exception as err:
self.queue.put('Error in initializing ' + str(name) +
elif name == 'gaussmeter':
try:
self.measurement_resources[name] = GaussMeter(address)
,→
except Exception as err:
self.queue.put('Error in initializing ' + str(name) + elif name == 'sig_gen_8257':
try:
self.measurement_resources[name] = instruments.agilent.Agilent8257D(
,→
address)
except Exception as err:
self.queue.put('Error in initializing ' + str(name) +
# run through instruments list and shut them down
def shutdown_resources(self):
    """Call ``shutdown()`` on every entry of ``self.measurement_resources``.

    Two resource names get extra steps before ``shutdown()``:

    * ``'keithley_2400'``: ramp to 10e-6 (10 microamps) and set the
      compliance voltage to 2.1 V.
    * ``'dsp_lockin'``: set ``dac1`` .. ``dac4`` to 0.

    Any exception raised for one resource is reported on ``self.queue`` and
    printed, and the loop continues with the next resource, so one failing
    instrument does not prevent the others from being shut down.
    """
    # can add exceptions in to alert user if shutdown fails for any instrument
    for name, inst in self.measurement_resources.items():
        try:
            if name == 'keithley_2400':
                inst.ramp_to_current(10e-6)  # set current to 10 microAmps
                inst.compliance_voltage = 2.1  # set compliance to 2.1 V
            elif name == 'dsp_lockin':
                # turn off all dac outputs
                inst.dac1, inst.dac2, inst.dac3, inst.dac4 = 0, 0, 0, 0
            # every resource, special-cased or not, ends with shutdown()
            inst.shutdown()
        except Exception as err:
            self.queue.put('Failed to shut down ' + str(name) + ':' + str(err))
            print(f'Failed to shut down {inst} with error: {err}')
# builds the sequence of values to sweep over, determined by the loop direction
def build_array(self, start, end, step, direction):
    """Return the sweep values for one loop.

    Cases, checked in this order:

    * ``step == 0``: the one-element list ``[end]``.
    * ``start == step == end``: ``np.arange(0, start, 1)``, i.e. the integers
      ``0 .. start-1`` (presumably a "repeat N times" loop -- confirm with
      the callers).
    * ``'low-high'``: ``start`` up to ``end`` and back down to ``start``.
    * ``'zero-zero'``: ``0`` up to ``end``, back to ``0``, down to ``-end``
      and back to ``0``; ``start`` is ignored.
    * ``'high-low'``: ``end`` down to ``start`` and back up to ``end``.
    * anything else: a plain ``start`` .. ``end`` ramp.

    The turning point of each there-and-back sweep appears only once.

    This method only builds and returns the values; it does not touch
    ``self.tot_measurement_len`` (the caller computes that from the lengths).

    NOTE(review): the ramps use ``np.arange(a, b + step, step)`` to include
    the end point; with non-integer steps floating-point rounding can add or
    drop the last element.
    """
    if step == 0:
        return [end]
    if start == step and step == end:  # where just one value is needed
        return np.arange(0, start, 1)

    if direction == 'low-high':
        ramp = np.arange(start, end + step, step)
        return np.hstack((ramp, np.flipud(ramp[:-1])))

    if direction == 'zero-zero':
        ramp = np.arange(0, end + step, step)
        positive_half = np.hstack((ramp, np.flipud(ramp[:-1])))
        # mirror the positive half, skipping its leading 0 so it is not doubled
        return np.hstack((positive_half, positive_half[1:] * -1))

    if direction == 'high-low':
        ramp = np.arange(end, start - step, -step)
        return np.hstack((ramp, np.flipud(ramp[:-1])))

    # case for normal numpy arrays
    return np.arange(start, end + step, step)
# checks if output exceeds limit
def check_amp_limits(self, direction, array):
    """Set the quit flag if ``array`` exceeds the amp limit for ``direction``.

    ``direction`` is ``'none'`` for loops that have no hx or hz component, in
    which case nothing is checked. Otherwise the limit is read from
    ``self.kwargs['<direction> Max']`` and
    ``self.kwargs['<direction> Conversion']`` (both converted with ``float``)
    and the largest element of ``array`` is compared against
    ``conversion / amp_max``.

    On a failed lookup/conversion, or when the limit is exceeded, a message
    is put on ``self.queue`` and ``self.quit`` is set. Nothing is returned.

    Fix: when the kwargs lookup fails the method now returns right after
    reporting the problem. Previously it fell through to the comparison with
    ``conversion``/``amp_max`` unbound and raised ``UnboundLocalError``.

    NOTE(review): only ``np.max(array)`` is compared, so large negative
    values are not caught, and the limit is computed as
    ``conversion / amp_max`` -- confirm both against the meaning of the
    'Max' and 'Conversion' settings.
    """
    if direction == 'none':  # accounts for loops that don't have hx or hz
        return

    try:
        amp_max = float(self.kwargs['%s Max' % direction])
        conversion = float(self.kwargs['%s Conversion' % direction])
    except Exception as err:
        self.queue.put(str(direction) + ' maximum not found! ' + str(err))
        self.quit.set()
        return  # no limit available, so there is nothing to compare against

    peak = np.max(array)
    if peak > conversion / amp_max:
        self.queue.put(str(peak) + ' exceeds ' + str(direction) + ' amp output limit!')
        self.quit.set()
# check the loop direction and return it
def check_direction(self, key):
    """Classify ``self.order[key]`` as an ``'Hx'`` or ``'Hz'`` entry.

    The lookup is a case-insensitive substring test on the string stored
    under ``key``: returns ``'Hx'`` if it contains ``'hx'``, else ``'Hz'`` if
    it contains ``'hz'``, else the string ``'none'``. ``'hx'`` takes priority
    when both are present.
    """
    label = self.order[key].lower()
    if 'hx' in label:
        return 'Hx'
    if 'hz' in label:
        return 'Hz'
    return 'none'
# user passed function, runs the measurement def measure(self):
if self.order['MOKE']:
self.points = {'left': 0, 'top': 0, 'width': 0, 'height': 0 }
self.pause.clear() # sets flag to pause
# will run the tkinter window that selects the measurement points
,→
meas_region(self.points, self.pause)
self.pause.wait() # pause until measure region is selected
,→
else:
pass
# check that there are commands to build this loop, otherwise just run 1 time
,→
if all(elem in self.order for elem in ['fix1_start', 'fix1_stop', 'fix1_step']):
,→
# build the arrays of floats to sweep over fix1_values = self.build_array(
# check if amp limits are protected self.check_amp_limits(
self.check_direction('fix1_start'), fix1_values) else:
self.queue.put(
'No fixed loop one values detected, setting loop to 1')
,→
fix1_values = [1]
# check that there are commands to build this loop, otherwise just run 1 time
,→
if all(elem in self.order for elem in ['fix2_start', 'fix2_stop', 'fix2_step']):
# check if amp limits are protected self.check_amp_limits(
self.check_direction('fix2_start'), fix2_values) else:
self.queue.put(
'No fixed loop two values detected, setting loop to 1')
,→
fix2_values = [1]
# only x_values are loop dependent, the other two are just ordinary arrays from start to stop by step
,→
# update the total measurement length to the proper value self.tot_measurement_len = len(
fix1_values) * len(fix2_values) * len(x_values)
# check if measurement exceeds array length if len(x_values) >= len(self.x):
self.queue.put(
'Measurement exceeds array memory length! Please change in GUIBaseClass')
,→
self.quit.set()
# iteratre through dictionary of machines, turn address time into instance of resource
,→
self.activate_resources() # sets quit flag if fails
# set first fixed values
for f1_count, fix_1 in enumerate(fix1_values):
if self.quit.is_set(): # quits if quit flag is set break
# delay time for charging/ramping to value if necessary if f1_count == 0:
f1_delay = fix_1 else:
f1_delay = abs(fix_1) + abs(fix1_values[f1_count -1])
,→
# run user function
self.fix1func(f1_count, fix_1, self.charging_delay(
f1_delay), self.measurement_resources, self.kwargs)
# set fixed values, update graph
for f2_count, fix_2 in enumerate(fix2_values):
self.pause.wait() # waits for pause flag
if self.quit.is_set(): # check if quit flag is set break
if self.progress_counter == 0: # time one loop t_start = time.time()
# update graph titles
self.graph['fixed_param_1'] = self.f1_header + ': %.
2f' % fix_1
,→
self.graph['fixed_param_2'] = self.f2_header + ': %.
2f' % fix_2
,→
self.queue.put(self.graph)
# calculate delay time if f2_count == 0:
f2_delay = fix_2 else:
f2_delay = abs(fix_2) + abs(fix2_values[f2_count - 1])
,→
# run user function
self.fix2func(fix_2, self.charging_delay(
f2_delay), self.measurement_resources, self.kwargs)
,→
# scan x values
for x_count, x_val in enumerate(x_values):
if self.quit.is_set():
break
# delay time for charging/ramping to value if necessary
,→
if x_count == 0:
x_delay = x_val else:
x_delay = abs(x_val) + abs(x_values[x_count -1])
,→
# run user function depending on if MOKE measurement or not
self.counter.value = x_count + 1 # where data is stored to in the shared array
,→
self.progress_counter += 1
# percent progress completed self.p.value = round(
self.progress_counter / self.tot_measurement_len * 100)
,→
if self.progress_counter == len(x_values + 1):
t_finish = time.time()
time_approximation = t_finish - t_start else:
time_approximation = 0
# estimated time remaing (est. tot time - est. time taken)
,→
self.t.value = round((time_approximation * (len(fix1_values) * len(fix2_values)))
-,→
(time_approximation * (f2_count + len(fix2_values) * f1_count)))
,→
try: # accounts for errors with fix_1, fix_2 b = str(round(fix_1, 2))
s = str(round(fix_2, 2)) except Exception:
b = '' s = ''
# only save if quit flag has not been set if not self.quit.is_set():
# saves data, returns string
self.queue.put(self.save_function(b, s))
self.queue.put('Job Finished') # end measurement
# ============================= MOKE FUNCTIONS ====================================== #
# runs a GUI that has a button which selects the measurement region of the screen
,→
def meas_region(p_dict, flag):
reg = tk.Tk()
reg.title("Determine Capture Area Coordinates") reg.configure(bg='#F2F2F2')
reg.geometry("450x200")
reg.attributes('-alpha', 0.8)
det_btn = tk.Button(master=reg, text='Select Region',
command=lambda: select_coords(reg, p_dict),
# button command updates the dictionary to include the coordinates
def select_coords(master, p_dict, flag):
    """Store the window geometry of ``master`` in ``p_dict`` and release ``flag``.

    Writes ``master``'s current position and size into ``p_dict`` under the
    keys ``'left'``, ``'top'``, ``'width'`` and ``'height'``, sets ``flag``
    and then calls ``master.quit()``.

    Fix: the flag is now *set*. The original called ``flag.clear()`` even
    though its comment said "set the flag", and the caller clears the flag
    before opening the window and then blocks in ``flag.wait()`` -- which
    only returns once the flag is set.
    """
    p_dict['left'] = master.winfo_x()
    p_dict['top'] = master.winfo_y()
    p_dict['width'] = master.winfo_width()
    p_dict['height'] = master.winfo_height()
    flag.set()  # set the flag to let the measurement continue
    master.quit()  # leave the Tk event loop so the caller regains control