• 沒有找到結果。

5.6 Compiling Saved Data

7.1.1 GUIBaseClass.py

import os import time

import tkinter as tk from tkinter import ttk

from tkinter import messagebox from tkinter import filedialog import matplotlib

import multiprocessing as mp from random import randint

from pymeasure import instruments

from MeasurementClass import Measurement

# has to be called before pyplot, backends are imported matplotlib.use("TkAgg")

import matplotlib.pyplot as plt

import matplotlib.animation as animation

from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg

class GUIBase(tk.Tk): # Base GUI class with all features universal to code environment

,→

# initialize this class, includes passed arguments

def __init__(self, _graph, _machines, order_dict, *args):

# initialize the tk.Tk class portion tk.Tk.__init__(self, _graph['gui_title'])

tk.Tk.wm_title(self, _graph['gui_title']) tk.Tk.columnconfigure(self, 0, weight=1) tk.Tk.rowconfigure(self, 0, weight=1)

# dictionaries of measurement settings, graph settings and results

,

self.settings = args self.graph = _graph

self.machines = _machines

self.results = {'x_data': mp.Array('d', 1500), 'y_data': mp.Array('d', 1500),

# for cases where Gaussmeter data is recorded

,

'x2_data': mp.Array('d', 1500),

# counts the number of data points to graph 'counter': mp.Value('i', 0),

# int value of the percentage done the total measurement loop is

# directions for measure module import (since can't be pickled) and loop direction commands

,

self.order = order_dict

# make sure that there is a Measurements folder that exists if os.path.isdir(os.path.expanduser('~\\Documents') +

# loop and file name initial values, later changed to widgets

,

self.file_name = 'test_file'

self.loop = ['low-high', 'zero-zero', 'high-low', 'standard']

# multiprocessing.Process controls, for quit and pause functions

,→

self.measure_process = '' # initialized when measurement button is pressed

,

# flag which will pause process at the top of the measurement loop

,

self.pause_flag = mp.Event()

# flag that kills the measurement process after safely releasing resources

,

self.quit_flag = mp.Event()

# initialize the plot figure and axes necessary for drawing self.fig = plt.Figure(figsize=(6, 5), dpi=100)

self.ax = self.fig.add_subplot(111)

# Main frame that will hold the frames for each specific group of widgets

# The four frames that house each different section of widgets

,

plot_frame = ttk.Frame(base_frame, borderwidth=5) settings_frame = ttk.Frame(base_frame, borderwidth=5) info_frame = ttk.Frame(base_frame, borderwidth=5) buttons_frame = ttk.Frame(base_frame, borderwidth=5) progress_frame = ttk.Frame(base_frame, borderwidth=5)

# Use grid to place all frames on the base_frame plot_frame.grid(row=0, column=0, sticky='nsew') settings_frame.grid(row=0, column=1, sticky='nsew') progress_frame.grid(row=1, column=0, sticky='nsew') info_frame.grid(row=2, column=0, sticky='nsew')

buttons_frame.grid(row=1, column=1, rowspan=2, sticky='nsew')

# creates and grids the listbox and scroll bar self.datalog = tk.Listbox(info_frame, height=5) self.y_scroll = ttk.Scrollbar(info_frame,

# test that title is indeed string try:

if type(self.graph['graph_title'] + "\n" +

self.graph['fixed_param_1'] + " " + self.graph['fixed_param_2']) is str:

,→

,

self.ax.set_title(self.graph['graph_title'] + "\n" +

self.graph['fixed_param_1'] + " " + self.graph['fixed_param_2'])

,

else:

raise Exception(

'Graph Title or Fixed Parameters not a string.') except Exception as err:

print('An exception occured for graph titles: ' + str(err))

,

self.datalog.insert(

'end', 'An exception occured for graph titles: ' + str(err))

,

self.datalog.see('end')

# test that x and y axis labels are indeed strings try:

if type(self.graph['x_title'] + self.graph['y_title']) is str:

,

self.ax.set_xlabel(self.graph['x_title']) self.ax.set_ylabel(self.graph['y_title']) else:

raise Exception('X or Y Axis label not a string.') except Exception as err2:

print('An exception occured for axis labels: ' + str(err2))

,

self.datalog.insert(

'end', 'An exception occured for axis labels: ' + str(err2))

,

self.datalog.see('end')

# Draw the plot on canvas

plot_canvas = FigureCanvasTkAgg(self.fig, plot_frame) plot_canvas.draw()

plot_canvas.get_tk_widget().grid(row=0, column=0, pady=0, padx=0, sticky='nsew')

,→

# variable that updates in animation, letting user how long left there is

,→

self.time_var = tk.StringVar() self.time_var.set('Time Remaining')

# creates and grids a progress bar and estimated time remaining label

# make test settings for dic in self.settings:

try:

if type(dic) is dict:

# for each dictionary in the list, build items self.make_form(settings_frame, dic)

except Exception as err3:

print(

'An exception occured with the list of measurement parameters: ' + str(err3))

,→

self.datalog.insert('end',

'An exception occured with the list of measurement parameters: ' + str(err3))

,→

self.datalog.see('end')

# make loop parameters buttons/labels/entries (the misc commands)

,

misc_lf = ttk.LabelFrame(settings_frame, text='Measurement Options')

,→

misc_lf.grid(ipadx=2, ipady=5, sticky='nsew') file_name_lbl = ttk.Label(

misc_lf, width=20, text='File Name:', anchor='w') file_name_ent = ttk.Entry(misc_lf, width=15)

file_name_ent.insert(0, self.file_name)

# update dictionary value to the entry self.file_name = file_name_ent

file_name_lbl.grid(row=0, column=0, sticky='nsew') file_name_ent.grid(row=0, column=1, sticky='nsew') loop_lbl = ttk.Label(misc_lf, width=20,

text='Loop Type:', anchor='w') loop_box = ttk.Combobox(misc_lf, width=10,

state='readonly', values=self.loop) loop_box.set('low-high')

self.loop = loop_box # update dictionary value to the box loop_lbl.grid(row=1, column=0, sticky='nsew')

loop_box.grid(row=1, column=1, sticky='nsew')

# make the buttons

self.measure_button = ttk.Button(

buttons_frame, text='Measure', command=lambda:

self.measure_method())

,

self.output_button = ttk.Button(

buttons_frame, text='Output', command=lambda:

self.output_method())

,

self.dir_button = ttk.Button(

buttons_frame, text='Change Directory', command=lambda:

self.change_directory_method())

# weights columns/rows that are of weight 1, frames with specific weights are written explicitly

,→

# necessary to keep the scroll bar tiny info_frame.columnconfigure(0, weight=1) info_frame.columnconfigure(1, weight=0)

# necessary to keep the label tiny

progress_frame.columnconfigure(0, weight=1) progress_frame.columnconfigure(1, weight=0)

# test that all instruments are on and the gpib addresses are correct

,

self.check_resources()

# takes the input parent and dictionary and creates label and entry widgets for them

,→

def make_form(self, parent, dictionary):

lf = ttk.LabelFrame(parent, text=dictionary['title']) lf.grid(ipadx=2, ipady=2, sticky='nsew')

mod = 0 # modifies the counter to account for the title which does not take up a row

,→

for key, value in dictionary.items():

if key == 'title': # ignores the title key in the dictionary

,→

pass

elif dictionary['title'] == 'Lockin':

if key == 'Mode':

lab = ttk.Label(lf, width=20, text=key + ':', anchor='w')

if value in bx.cget('values'):

bx.set(value) else:

self.datalog.insert(

'end', value + ' not a recognized mode setting.')

elif key == 'Sensitivity':

lab = ttk.Label(lf, width=20, text=key + ':', anchor='w')

,

bx = ttk.Combobox(lf, width=10, state='readonly', values=('2nV', '5nV', '10nV', '20nV', '50nV', '100nV', '200nV',

'500nV', '1uV', '2uV',

,→

,

'5uV', '10uV', '20uV', '50uV', '100uV', '200uV', '500uV', '1mV', '2mV', '5mV', '10mV', '20mV', '50mV', '100mV', '200mV', '500mV', '1V'))

,→

, ,

if value in bx.cget('values'):

bx.set(value) else:

self.datalog.insert(

'end', value + ' is not a recognized sensitivity setting.')

lab = ttk.Label(lf, width=20, text=key + ':', anchor='w')

,

ent = ttk.Entry(lf, width=15)

ent.insert(0, str(value))

lab.grid(row=mod, column=0, sticky='nsew') ent.grid(row=mod, column=1, sticky='nsew')

# set dictionary value to entry widget dictionary[key] = ent

mod += 1 else:

lab = ttk.Label(lf, width=20, text=key + ':', anchor='w')

,→

ent = ttk.Entry(lf, width=15) ent.insert(0, str(value))

lab.grid(row=mod, column=0, sticky='nsew') ent.grid(row=mod, column=1, sticky='nsew')

dictionary[key] = ent # set dictionary value to entry widget

,→

mod += 1

# takes a given frame and gives all columns a weight of 1 def weight_col(self, frame):

for x in range(frame.grid_size()[0]):

frame.columnconfigure(x, weight=1)

# takes a given frame and gives all rows a weight of 1 def weight_row(self, frame):

for x in range(frame.grid_size()[1]):

frame.rowconfigure(x, weight=1)

# take list of resources and checks if there is a device at the address that is on

,

def check_resources(self):

available_resources = instruments.list_resources()

# note, this fails to identify specific machines, it only identifies addresses

,

for key, value in self.machines.items():

try:

if value in available_resources:

pass else:

raise Exception(key + ' not found at ' + str(value))

,→

except Exception as err:

self.datalog.insert(

'end', 'Resource not detected: ' + str(err)) self.datalog.see('end')

# =====================Button Controls Here===================== #

,

# measure loop, this will acutally just spawn a thread that runs the target function passed to it

,

def measure_method(self):

try:

# makes sure there are no other measurement processes alive

,

if len(mp.active_children()) == 0:

self.datalog.insert('end', 'Starting Measurement') self.datalog.see('end')

# make sure pause is set to false self.pause_flag.set()

self.quit_flag.clear() # quits on true

# enables the stop button

self.stop_button.state(['!disabled'])

# initialize process and queue for measurement loop

# get the loop parameters from the tkinter entryboxes and pass them as a dictionary of kwargs to measure process

,

,

loop_params = {}

for dic in self.settings:

for key, value in dic.items():

if key != 'title':

loop_params[key] = value.get() else:

pass

m = Measurement(self.order, self.results, self.graph, self.pause_flag,

# checks to see if measurement is finished self.after(100, self.process_queue)

else:

raise Exception(

'Multiple Active Measurement Processes Detected!')

,

except Exception as err:

print('An Exception Occured starting measurement: ' + str(err))

,

self.datalog.insert(

'end', 'An Exception Occured starting measurement: ' + str(err))

,

self.datalog.see('end')

# Calls updates after processing the queue, runs every 100 ms def process_queue(self):

try:

if not self.measure_process.is_alive():

self.stop_button.state(['disabled'])

while not self.queue.empty(): # empty out any leftover messages from the queue

,

self.queue.get()

elif not self.queue.empty(): # prints the message passed from the measurement

,

msg = self.queue.get(0)

# resets the system if job is finished

if type(msg) is str and msg == 'Job Finished':

self.datalog.insert('end', msg)

self.measure_process.join() # safely releases process

,

elif type(msg) is dict:

# update dictionary values to updated fields self.graph.update(msg)

# output field for given time def output_method(self):

# pop up a window with input of time, direction, strength t = tk.Toplevel()

z_lim = 1 x_lim = 12

# find the lockin dictionary for dic in self.settings:

if dic['title'] == 'Lockin':

d = dic

# get the conversion factors and channels for key, value in d.items():

if 'Hx Dac' in key:

x_dac = value.get() elif 'Hz Dac' in key:

z_dac = value.get() elif 'Hx Conversion' in key:

x_con = float(value.get()) elif 'Hz Conversion' in key:

z_con = float(value.get())

# see what directions are exceptable for the output if x_dac != -1 and z_dac != -1:

self.datalog.insert('end', 'No magnet control settings found!')

,→

self.datalog.see('end') t.destroy()

# try setting up pymeasure lockin instance try:

lockin = instruments.signalrecovery.DSP7265(

self.machines['dsp_lockin'])

# direction label and combobox output_lbl = ttk.Label(

t, width=15, text='Output direction:', anchor='w') output_direction = ttk.Combobox(

t, width=15, state='readonly', values=(v))

output_direction.set(v[0])

t, width=15, text='Output Strength (Oe)', anchor='w') s_ent = ttk.Entry(t, width=15)

s_ent.insert(0, 100)

s_lbl.grid(row=1, column=0, ipadx=2, ipady=2, sticky='nsew')

,

s_ent.grid(row=1, column=1, ipadx=2, ipady=2, sticky='nsew')

,→

# output time

ot_lbl = ttk.Label(t, width=15, text='Output Time (s)', anchor='w')

,

ot_ent = ttk.Entry(t, width=15) ot_ent.insert(0, 1)

ot_lbl.grid(row=2, column=0, ipadx=2, ipady=2, sticky='nsew')

,

ot_ent.grid(row=2, column=1, ipadx=2, ipady=2, sticky='nsew')

,

out = ttk.Button(t, text='Output', command=lambda:

out_field())

,

out.grid(row=3, columnspan=2, ipadx=5, ipady=5, sticky='nsew')

,→

except Exception as err:

self.datalog.insert(

'end', "Failed to find lockin at " + self.machines['dsp_lockin'] + ':' + str(err))

,

self.datalog.see('end') t.destroy()

def out_field(): # outputs a field in a given direction if not above the limit

,→

self.datalog.insert(

'end', 'Output %s direction field for %s seconds' % (output_direction.get(), ot_ent.get()))

,→

if output_direction.get() == 'z':

if float(s_ent.get()) / z_con <= z_lim:

setattr(lockin,

z_dac, float(s_ent.get()) / z_con) time.sleep(float(ot_ent.get()))

setattr(lockin, z_dac, 0)

else:

self.datalog.insert(

'end', 'Output exceeds Hz amp limit of %d V'

% z_lim)

,

self.datalog.see('end') else:

if float(s_ent.get()) / float(x_con) <= x_lim:

setattr(lockin,

x_dac, float(s_ent.get()) / x_con) time.sleep(float(ot_ent.get()))

setattr(lockin, x_dac, 0) else:

self.datalog.insert(

'end', 'Output exceeds Hx amp limit of %d V'

% x_lim)

,→

self.datalog.see('end') lockin.shutdown()

# change directory, sweet and simple def change_directory_method(self):

self.directory = tk.filedialog.askdirectory() self.datalog.insert('end', self.directory)

# stop, first will pause and then can end the measure process and shut down the instruementss

,

def stop_method(self):

self.pause_flag.clear() # pauses process

self.datalog.insert('end', 'Pausing Process...') self.datalog.see('end')

q1 = tk.messagebox.askquestion('Loop Paused', 'Do you wish to stop?')

self.quit_flag.set() # kills the process

# process queue will join thread and is called every 0.1 seconds

,→

time.sleep(0.2)

# reset the progress bar, time remaining self.results['progress'].value = 0

self.datalog.insert('end', 'Resuming measurement...') self.datalog.see('end')

self.pause_flag.set() # resumes measurement

# quit, shuts down all instruments, kills process, closes program

,→

def quit_method(self):

q = tk.messagebox.askquestion('Quit', 'Are you sure you want to quit?')

,→

if q == 'yes':

try:

if len(mp.active_children()) != 0:

self.datalog.insert(

'end', 'Terminating measurement process...') self.quit_flag.set() # kills the process

# process queue will join thread and is called every 0.1 seconds

def animate_plot(i, ax, _graph, _results, progress_bar, time_var, animate_x2=False): # animation function

,

ax.clear() ax.grid(True)

ax.set_title(_graph['graph_title'] + "\n" + _graph['fixed_param_1'] + " " + _graph['fixed_param_2'])

,→

ax.set_xlabel(_graph['x_title']) ax.set_ylabel(_graph['y_title'])

# avoids a situation where len(x_data[:counter]) !=

len(y_data[:counter])

,

tmp_x = _results['x_data'][0:_results['counter'].value]

ax.plot(tmp_x,

_results['y_data'][0:len(tmp_x)], 'b-o', ms=10, mew=0.5) if animate_x2:

if _results['time'].value != 0 and _results['time'].value / 60

>= 1:

,

time_var.set(

str(round(_results['time'].value / 60)) + ' mins remaining')

,

elif _results['time'].value != 0:

time_var.set('<1 min remaining')

# ======================================= Test Version Below

============================================ #

,

def testf1(output, delay, resources):

time.sleep(0.01)

print(f'testf1 {output} with {delay}') print(resources)

def testf2(output, delay, resources):

time.sleep(0.01)

print(f'testf2 {output} with {delay}')

def testx(output, delay, resources):

time.sleep(0.01)

print(float(output), float(output % 2 + randint(0, 100)), delay, 'testx')

,→

return float(output), float(output % 2 + randint(0, 100)), delay

if __name__ == '__main__':

main()

import numpy as np import tkinter as tk

from pymeasure import instruments from GaussMeterClass import GaussMeter

class Measurement():

# initialize this class, includes passed arguments

相關文件