#This class provides usefull functions for gernerall purposes
#Class functions:
#create_new_file,
#IO-functions
# open_file,
# close_file,
# flush_to_file,
# close_file,
# read_from_file,
# write_to_file,
#
#Ramping
# ramp_voltage
import os, sys, os.path, re
from time import sleep
import time
import threading
import logging, json
from PyQt5 import QtCore
import numpy as np
from numpy.linalg import solve, norm, det, qr, inv
import datetime
import pyqtgraph as pg
import VisaConnectWizard
from PyQt5.QtWidgets import QApplication
#from __future__ import print_function # Needed for the rtd functions that its written in 3
l = logging.getLogger(__name__)
lock = threading.Lock()
[docs]class help_functions:
"""Helpfull function is a multipurpose class with a variety of function and tasks.
It contains functions regarding error handling an file generation as well as generating ramps (e.g. for bias ramps)
For more information see he docs for the functions itself"""
def __init__(self ):
pass
[docs] def write_init_file(self, name, data, path = ""):
"""
This function writes init files for devices and default files
:param name: Name of the file to be written to
:param data: Data in Dict format
:param path: Path where to write
:return:
"""
# find the resource and exclude it from the file
data = data.copy()
#Removes the Visa resource if needed
try:
data.remove("Visa_Resource")
except:
pass
if os.path.isfile(os.path.abspath(str(path) + str(name.split(".")[0]) + ".json")):
os.remove(os.path.abspath(path + str(name.split(".")[0]) + ".json"))
#directory = path[:len(path)-len(path.split("/")[-1])]
file = self.create_new_file(str(name.split(".")[0]), path, os_file=False, suffix=".json")
json.dump(data, file, indent=4, ensure_ascii=False)
self.close_file(file)
elif not os.path.isfile(os.path.abspath(path + str(name.split(".")[0]) + ".json")):
#directory = path[:len(path) - len(path.split("/")[-1])]
file = self.create_new_file(str(name.split(".")[0]), path, os_file=False, suffix=".json")
json.dump(data, file, indent=4, ensure_ascii=False)
self.close_file(file)
# Debricated
#for items in data.items():
# if type(items[1]) != type([]):
# string = str(items[0]) + " = \"" + str(items[1]) + "\"\n"
# os.write(file, str(string))
# else:
# string = str(items[0]) + " = \""
# for i in items[1]:
# string += str(i).strip("'").strip("[").strip("]") + ","
# string = string[:-1]
# string += "\"\n"
# print string
# os.write(file, string)
else:
return -1
# This function works as a decorator to measure the time of function to execute
[docs] def timeit(self, method):
"""
Intended to be used as decorator for functions. It returns the time needed for the function to run
:param method: method to be timed
:return: time needed by method
"""
def timed(*args, **kw):
start_time = time.time()
result = method(*args, **kw)
end_time = time.time()
exce_time = end_time-start_time
return result, exce_time
return timed # here the memberfunction timed will be called
# These functions are for reading and writing to files------------------------------------
#-----------------------------------------------------------------------------------------
# This function works as a decorator to raise errors if python interpretor does not do that automatically
[docs] def raise_exception(self, method):
"""
Intended to be used as decorator for pyQt functions in the case that errors are not correctly passed to the python interpret
"""
def raise_ex(*args, **kw):
try:
#Try running the method
result = method(*args, **kw)
# raise the exception and print the stack trace
except Exception as error:
print("Some error occured in the function " + str(method.__name__) +". With Error:", repr(error)) # this is optional but sometime the raise does not work
raise # this raises the error with stack backtrack
return result
return raise_ex# here the memberfunction timed will be called
# This function works as a decorator to raise errors if python interpretor does not do that automatically
[docs] def run_with_lock(self, method):
"""
Intended to be used as decorator for functions which need to be threadsave. Warning: all functions acquire the same lock, be carefull.
"""
def with_lock(*args, **kw):
try:
# Try running the method
with lock:
l.debug("Lock acquired by program: " + str(method.__name__))
result = method(*args, **kw)
l.debug("Lock released by program: " + str(method.__name__))
# raise the exception and print the stack trace
except Exception as error:
print("A lock could not be acquired in " + str(method.__name__) +". With Error:", repr(error)) # this is optional but sometime the raise does not work
raise # this raises the error with stack backtrace
return result
return with_lock # here the memberfunction timed will be called
# Creates a new file
[docs] def create_new_file(self, filename="default.txt", filepath = "default_path", os_file=True, suffix = ".txt"):
"""
Simply creates a file
:param filename:
:param filepath:
:param os_file:
:param suffix:
:return:
"""
counter = 0
if filepath == "default_path":
filepath = ""
elif filepath == "":
pass
else:
filepath += "/"
filename += str(suffix)
#First check if Filename already exists, when so, add a counter to the file.
if os.path.isfile(os.path.abspath(filepath+filename)):
print("Warning filename " + str(filename) + " already exists!")
l.warning("Warning filename " + str(filename) + " already exists!")
filename = filename[:-4] + "_" + str(counter) + ".txt" # Adds sufix to filename
while os.path.isfile(os.path.abspath(filepath+filename)): # checks if file exists
filename = filename[:-5] + str(counter) + ".txt" # if exists than change the last number in filename string
counter += 1
print("Filename changed to " + filename + ".")
l.info("Filename changed to " + filename + ".")
if os_file:
fd = os.open(os.path.abspath(filepath+filename), os.O_WRONLY | os.O_CREAT) # Creates the file
else:
fd = open(os.path.abspath(filepath+filename), "w")
l.info("Generated file: " + str(filename))
print("Generated file: " + str(filename))
return fd
# Opens a file for reading and writing
[docs] def open_file(self, filename="default.txt", filepath = "default_path"):
"""
Just opens a file and returns the file pointer
:return: File
"""
if filepath == "default_path":
filepath = ""
try:
fd = open(filepath + filename, 'r+') #Opens file for reading and writing
return fd
except IOError:
print(str(filepath + filename) + " is not an existing file.")
# Closes a file (just needs the file pointer)
[docs] def close_file(self, file):
"""
Closed the file specified in param file
"""
try:
try:
os.close(file)
except:
file.close()
except GeneratorExit:
print("Closing the file: " + str(file) + " was not possible")
except:
print("Unknown error occured, while closing file " + str(file) + "Error: ", sys.exc_info()[0])
# This flushes a string to a file
[docs] def flush_to_file(self, file, message):
"""
Flushes data to a opend file
Only strings or numbers allowed, Lists will work too but may cause data scrambling
Only use this with created files from function 'create_new_file'
"""
os.write(file, str(message)) #Writes the message to file
os.fsync(file) # ensures that the data is written on HDD
[docs] def write_to_file(self, content, filename="default.txt", filepath = "default_path"):
"""
This writes content to a file. Be aware, input must be of type 'list' each entry containing the information of one line
"""
file = self.open_file(filename, filepath)
try:
for line in content:
file.write(str(line))
except IOError:
print("Writing to file " + filename + " was not possible")
except:
print("Unknown error occured, while writing to file " + str(filename) + "Error: ", sys.exc_info()[0])
self.close_file(file)
[docs] def read_from_file(self, filename="default.txt", filepath = "default_path"):
"""
Gives you the content of the file in an list, each list entry is one line of the file (datatype=string)
Warning: File gets closed after reading
"""
file = self.open_file(filename, filepath)
try:
return file.readlines()
except IOError:
print("Could not read from file.")
return []
except:
print("Unknown error occured, while reading from file " + str(filename) + "Error: ", sys.exc_info()[0])
self.close_file(file)
# These functions are for reading and writing to files------------------------------------
# -------------------------------------------------------------------------------------end
[docs] def ramp_voltage_job(self, queue, resource, order, voltage_Start, voltage_End, step, wait_time = 0.2, complience=100e-6):
"""
Only use this function for simple ramping for the main, never inside a measurement!!!
"""
job = {"Measurement": {"ramp_voltage": {"Resource": resource,
"Order": order,
"StartVolt": voltage_Start,
"EndVolt": voltage_End,
"Steps": step,
"Wait": wait_time,
"Complience": complience}}}
queue.put(job)
# These function is for ramping up or down (not final)
[docs] def ramp_voltage(self, VisaResource, instrument, order_code, max_value, ramp_steps, time_to_wait, ramp_up=True):
"""
Deprecated
This function ramps a value by sending commands after a timeout. Usage, ramping voltage for a device
:param VisaResource: Device which gets the commands
:param instrument:
:param order_code: Code which has to be send
:param max_value: Maximum value from where to be ramped
:param ramp_steps: Stepssize
:param time_to_wait: waitingtime
:param ramp_up: Whether or not to ramp from abs up or down
:return:
"""
print("Warning old ramp function was used!!!")
step_size = round(max_value / ramp_steps, 0)
instrument_to_write = VisaResource.myInstruments[instrument]
for voltage in range(ramp_steps + 1):
if ramp_up:
VisaResource.write(instrument_to_write, str(order_code) + " " + str(voltage * step_size))
sleep(time_to_wait)
else:
VisaResource.write(instrument_to_write, str(order_code) + " " + str((ramp_steps - voltage) * step_size))
sleep(time_to_wait)
if ramp_up:
VisaResource.write(instrument_to_write, str(order_code) + " " + str(max_value))
else:
VisaResource.write(instrument_to_write, str(order_code) + " " + str(0))
# These function is for ramping up or down
[docs] def int2dt(self, ts, ts_mult = 1e3):
"""
Convert seconds value into datatime struct which can be used for x-axis labeeling
"""
return datetime.datetime.utcfromtimestamp(float(ts) / ts_mult)
[docs] def get_timestring_from_int(self, time_array, format = "%H:%M:%S"):
"""
Converts int time to timestring
"""
list = []
for value in time_array:
list.append((value, self.int2dt(value,1).strftime(format)))
return list
[docs] def get_thicks_for_timestamp_plot(self, time_array, max_number_of_thicks = 10, format = "%H:%M:%S"):
"""
This gives back a list of tuples for the thicks
"""
final_thicks = []
if len(time_array) <= max_number_of_thicks:
final_thicks = self.get_timestring_from_int(time_array, format)
else:
length = len(time_array)
delta = int(length/max_number_of_thicks)
for i in range(0, length, delta):
final_thicks.append((time_array[i], self.int2dt(time_array[i],1).strftime(format)))
return final_thicks
[docs] class CAxisTime(pg.AxisItem):
"""Over riding the tickString method by extending the class"""
# @param[in] values List of time.
# @param[in] scale Not used.
# @param[in] spacing Not used.
[docs] def tickStrings(self, values, scale, spacing):
"""Generate the string labeling of X-axis from the seconds value of Y-axis"""
# sending a list of values in format "HH:MM:SS.SS" generated from Total seconds.
return [(self.int2dt(value).strftime("%H:%M:%S.%f"))[:-4] for value in values]
[docs] def int2dt(ts, ts_mult=1e3):
"""Convert seconds value into datatime struct which can be used for x-axis labeeling"""
return (datetime.utcfromtimestamp(float(ts) / ts_mult))
hf = help_functions()
[docs]class newThread(threading.Thread): # This class inherite the functions of the threading class
'''Creates new threads easy, it needs the thread ID a name, the function/class,
which should run in a seperated thread and the arguments passed to the object'''
[docs] def __init__(self, threadID, name, object__, *args): # Init where the threadID and Thread Name are defined
"""
:param threadID: ID the thread should have
:param name: Give the thread a name you want
:param object__: The object which should run in the new thread
:param args: Arguments passed to object__
"""
threading.Thread.__init__(self) # Opens the threading class
self.threadID = threadID
self.name = name
self.object__= object__
self.args = args
def run_process(self, object__, args): # Just for clarification, not necessary.
"""
"""
return object__(*args)
[docs] def run(self):
"""Starts running the thread"""
print ("Starting thread: " + self.name) # run() is a member function of Thread() class. This will be called, when object thread will be started via thread.start()
l.info("Starting thread: " + self.name)
self.object__ = self.run_process(self.object__, self.args)
[docs] def get(self):
'''returns the Object'''
return self.object__
class newThread_(QtCore.QThread): # This class inherite the functions of the threading class
'''Creates new threads easy, it needs the thread ID, a name, the function/class, which should run in a seperated thread
This is the same thing like newThread but instead QTCore modules are used.'''
def __init__(self, threadID, name, object__, *args): # Init where the threadID and Thread Name are defined
QtCore.QThread.__init__(self) # Opens the threading class
self.threadID = threadID
self.name = name
self.object__= object__
self.args = args
def run_process(self, object__, args): # Just for clarification, not necessary.
return object__(*args)
def run(self):
print ("Starting thread: " + self.name) # run() is a member function of Thread() class. This will be called, when object thread will be started via thread.start()
l.info("Starting thread: " + self.name)
self.object__ = self.run_process(self.object__, self.args)
@staticmethod
def get():
'''returns the Object''' # Not working, do not use
newThread_.object__()
[docs]class LogFile:
"""
This class handles the Logfile for the whole framework
"""
def __init__(self, logging_level = "debug"):
"""
Initiates the logfile with the logging level
:param logging_level: None, debug, info, warning, error critical
"""
self.LOG_FORMAT = "%(levelname)s %(asctime)s in function %(funcName)s - %(message)s"
self.file_PATH = os.path.normpath(os.path.realpath(__file__)[:-30] + "/Logfiles/QTC_Logfile.log") # Filepath to Logfile directory
self.file_directory = os.path.normpath(os.path.realpath(__file__)[:-30] + "/Logfiles")
self.logging_level = logging_level
self.log_LEVELS = {"NOTSET": 0, "DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}
self.welcome_string = "\n" \
"\t\t\t\t\t\t\t\t __ ______ ______ ______ __ __ ______ ______ ______ ______\n \
/\ \ /\ __ \ /\ ___\ /\ ___\ /\ \ /\ \ /\ ___\ /\ __ \ /\__ _\ /\ ___\ \n \
\ \ \____ \ \ \/\ \ \ \ \__ \ \ \ __\ \ \ \ \ \ \____ \ \ __\ \ \ \/\_\ \/_/\ \/ \ \ \____ \n \
\ \_____\ \ \_____\ \ \_____\ \ \_\ \ \_\ \ \_____\ \ \_____\ \ \___\_\ \ \_\ \ \_____\ \n \
\/_____/ \/_____/ \/_____/ \/_/ \/_/ \/_____/ \/_____/ \/___/_/ \/_/ \/_____\n\n\n"
self.get_logging_level()
try:
os.remove(self.file_PATH) # IS needed, because sometimes other modules like the visa modules would override the file and all logs are lost, so the log file is in appending mode and no data loss.
except Exception as e:
print ("Old Logfile could not be deleted: " + str(e))
# Check if the folder already exists or create the folder
if os.path.isdir(self.file_directory):
logging.basicConfig(filename=self.file_PATH, level=self.logging_level, format = self.LOG_FORMAT, filemode= 'a') # Overrides the old file (this is just for initialization)
else:
os.makedirs(self.file_directory)
logging.basicConfig(filename=self.file_PATH, level=self.logging_level, format = self.LOG_FORMAT, filemode= 'a') # Overrides the old file (this is just for initialization)
# Create a logger Object
self.LOG = logging.getLogger()
# Print welcome message
self.LOG.critical(self.welcome_string)
#Simply changes the string input to a int for the logging level
[docs] def get_logging_level(self):
''' Checks the logging level input. If it is a string -> convert. Otherwise, do nothing'''
if type(self.logging_level) == type("string"):
self.logging_level=self.log_LEVELS.get(self.logging_level.upper(), "DEBUG")
else:
pass
[docs]class Framework:
"""
Generall class for handling all Framework related tasks, like updating the GUI etc.
"""
def __init__(self, values_from_GUI):
"""
:param values_from_GUI: A tuple of (list of functions, udate intervall)
"""
self.functions, self.update_interval = values_from_GUI()
self.timer = None
[docs] def start_timer(self): # Bug timer gets not called due to a lock somewhere else
"""
Simply starts the timer
:return: timer
"""
l.info("Framework initialized")
timer = QtCore.QTimer()
timer.timeout.connect(self.update_)
timer.start(self.update_interval)
self.timer = timer
return timer
[docs] def update_(self):
"""
Periodically updates the functions in self.functions list
:return: None
"""
#start = time.time()
for function in self.functions:
try:
function()
except:
l.error("Could not update framework " + function)
#end = time.time()
#print end - start
class transformation_old:
def vector_transform(self,v,T):
'''This function transform one vector into the basis of another'''
return T.dot(v)
def transformation_matrix(self, a, b):
'''Calculates the transformation matrix of the system.
Via the equation T = P^-1 Q where P and Q are comming from the linear system s*T + V0 = v
a and must be the linear hull (or simple three linear independent vecotrs)'''
P = np.transpose(np.array([[a[1][0]-a[0][0],a[2][0]-a[0][0]],
[a[1][1]-a[0][1],a[2][1]-a[0][1]]]))
#[a[1][2]-a[0][2],a[2][2]-a[0][2]]]))
print(P.shape)
Pinvert = inv(P)
Q = np.transpose(np.array([ [b[1][0]-b[0][0],b[2][0]-b[0][0]],
[b[1][1]-b[0][1],b[2][1]-b[0][1]],
[b[1][2]-b[0][2],b[2][2]-b[0][2]]
]))
print(Q.shape)
#T = solve(P, Q)
T = Pinvert.dot(Q)
print(T)
#print T.dot(np.transpose(a)[1])
#V0 = np.transpose(b)[1] - np.dot(np.transpose(a)[1],T)
#print V0
return
def linear_dependency(self, x):
'''Calculates if the matrizes a and b are linear independet.
a and b must be matrizes of type np.array([a1,a2,a3]), where ai are the corresponding linearhull.
'''
# Test linear dependency
detx = det(x)
if detx != 0:
return True
else:
return False
def qr_factorisation(self,x):
'''Calculates the qr factorisation of a matrix (linear indenendecy is requiered)
Basically it just calculates a Orthogonal matrix q and a upper triangula matrix r.
A=Q*R is the basic principal behind it.'''
q,r = qr(x)
return q
def renorm(self,x):
'''Renormalise the basis vektors of a matrix'''
for i in range(len(x)):
x[i]=x[i]/norm(x[i])
return x
[docs]class measurement_job_generation:
"""This class handles all measurement generation items"""
def __init__(self, main_variables, queue_to_measurement_event_loop):
"""
:param main_variables: Simply the state machine variables ('defaults')
:param queue_to_measurement_event_loop:
"""
self.variables = main_variables["Defaults"]
self.queue_to_measure = queue_to_measurement_event_loop
self.final_job = {}
[docs] def generate_job(self, additional_settings_dict):
'''
This function handles all the work need to be done in order to generate a job
:param additional_settings_dict: If any additional settings are in place
'''
self.final_job = additional_settings_dict
header = "# Measurement file: \n " \
"# Project: " + self.variables["Current_project"] + "\n " \
"# Sensor Type: " + self.variables["Current_sensor"] + "\n " \
"# ID: " + self.variables["Current_filename"] + "\n " \
"# Operator: " + self.variables["Current_operator"] + "\n " \
"# Date: " + str(time.asctime()) + "\n\n"
IVCV_dict = self.generate_IVCV("") # here additional header can be added
strip_dict = self.generate_strip("")
if IVCV_dict:
self.final_job.update({"IVCV": IVCV_dict})
if strip_dict:
self.final_job.update({"stripscan": strip_dict})
# Check if filepath is a valid path
if self.variables["Current_filename"] and os.path.isdir(self.variables["Current_directory"]):
self.final_job.update({"Header": header})
self.queue_to_measure.put({"Measurement": self.final_job})
print ("Sendet job: " + str({"Measurement": self.final_job}))
else:
l.error("Please enter a valid path and name for the measurement file.")
print ("Please enter a valid path and name for the measurement file.")
[docs] def generate_IVCV(self, header):
'''
This function generates all that has to do with IV or CV
:param header: An additional header
:return: the job dictionary
'''
final_dict = {}
file_header = header
if self.variables["IV_measure"][0]:
file_header += "voltage[V]".ljust(24) + "current[A]".ljust(24)
if self.variables["CV_measure"][0]:
file_header += "voltage[V]".ljust(24) + "capacitance[F]".ljust(24)
file_header += "temperature[deg]".ljust(24) + "humidity[rel%]".ljust(24)
final_dict.update({"header": file_header})
if self.variables["IV_measure"][0]:
values = self.variables["IV_measure"]
final_dict.update({"IV": {"StartVolt": 0, "EndVolt": values[1], "Complience": str(values[2])+ "e-6", "Steps": values[3]}})
if self.variables["CV_measure"][0]:
values = self.variables["CV_measure"]
final_dict.update({"CV": {"StartVolt": 0, "EndVolt": values[1], "Complience": str(values[2])+ "e-6", "Steps": values[3]}})
if len(final_dict) > 1:
return final_dict
else:
return {} # If the dict consits only of one element (only the header)
[docs] def generate_strip(self, header):
'''
This function generate all tha has to do with strip scans
:param header: Additional header
:return: strip job dictionary
'''
final_dict = {}
all_measurements = ["Rint", "Istrip", "Idiel", "Rpoly","Cac", "Cint", "Idark", "Cback", "CintAC"] # warning if this is not included here no job will generated. is intentional
def generate_dict(values):
''' Generates a simple dict for strip scan measurements'''
if values[0]: # Checks if the checkbox is true or false, so if the measurement should be condcuted or not
return {"measure_every": values[1], "start_strip": values[2], "end_strip": values[3]}
else:
return {}
# First check if strip scan should be done or not
if self.variables["Stripscan_measure"][0]:
final_dict.update({"StartVolt": 0, "EndVolt": self.variables["Stripscan_measure"][1], "Complience": str(self.variables["Stripscan_measure"][2])+ "e-6", "Steps": self.variables["Stripscan_measure"][3]})
for elemets in all_measurements:
dict = generate_dict(self.variables[elemets + "_measure"])
if dict:
final_dict.update({elemets: dict})
final_dict.update({"Additional Header": header})
if len(final_dict) > 2:
return final_dict
else:
return {} # If the dict consits only of one element (only the header)
[docs]class table_control_class:
'''This class handles all interactions with the table. Movement, status etc.
This class is designed to be running in different instances.'''
def __init__(self, main_variables, device, queue_to_GUI, shell):
"""
:param main_variables: Defaults dict
:param device: The VISA device object
:param queue_to_GUI:
:param shell: The UniDAQ shell object
"""
self.variables = main_variables["Defaults"]
self.device = device
self.table_ready = self.variables["table_ready"]
self.queue = queue_to_GUI
self.vcw = VisaConnectWizard.VisaConnectWizard()
self.shell = None
try:
self.visa_resource = self.device["Visa_Resource"]
self.table_ready = True
except:
print ("Warning table control could not be initialized!")
self.table_ready = False
self.queue.put({"RequestError": "Table seems not to be connected!"})
l.error("Table control could not be initialized!")
self.zmove = self.variables["height_movement"]
def __build_command(self, order, values , command_order = 1):
'''
This function builds the correct orders together, it always returns a list,
if a order needs to be sended several times with different values
:param order: The actual command
:param values: List of values which need to be send alongside order
:param command_order: If the commands in the list should be sended reversed or not (-1, 1)
'''
if values:
if type(values) != list: # ensures we have a list
values_list = [values]
else:
values_list = values
full_command_list = []
if int(command_order) == 1:
for item in values_list:
full_command_list.append(str(order)+ " " + str(item).strip())
return full_command_list
elif int(command_order) == -1:
for item in values_list:
full_command_list.append(str(item) + " " + str(order))
return full_command_list
else:
l.error("Something went wrong with the building of the table command!")
else:
if type(order) != list: # ensures we have a list
return [order]
else:
return order
[docs] def get_current_position(self):
'''Queries the current position of the table and writes it to the state machine'''
if self.table_ready:
while True: # Loop as long as there is a valid position from the corvus
try:
command = self.__build_command(self.device["get_position"], "", self.device["command_order"])
string = self.vcw.query(self.visa_resource, command[0],self.device.get("execution_terminator", "")).strip()
list = re.split('\s+', string)[:3]
self.device["x_pos"] = float(list[0])
self.device["y_pos"] = float(list[1])
self.device["z_pos"] = float(list[2])
return [float(i) for i in list]
except:
l.error("The corvus has replied with a non valid position string: " + str(string))
[docs] def check_if_ready(self, timeout = 0, maxcounter = -1):
'''
This function checks if the movement of the table is done or not
:param timeout: Timeout how long to wait between the attempts
:param maxcounter: How often does the function try to get an answer, -1 infinite
:return: 0 if ok error if not
'''
# Alot of time can be wasted by the timeout of the visa read order
ready_command = self.__build_command(self.device["all_done"], "", self.device["command_order"])
counter = 0 # counter how often the read attempt will be carried out
cal_not_done = True
self.vcw.write(self.visa_resource, ready_command[0], self.device.get("execution_terminator", ""))
while cal_not_done:
done = self.vcw.read(self.visa_resource)
if maxcounter != -1:
counter += 1
if counter > maxcounter:
cal_not_done = False #exits the loop after some attempts
if float(done) == -1: # case when corvus is busy and does not respond to anything
pass
elif float(done) == 1: # motors are in movement
self.vcw.write(self.visa_resource, ready_command[0], self.device.get("execution_terminator", "")) # writes the command again
elif float(done) == 2: # joystick active
cal_not_done = False
return {"RequestError": "Joystick of table control is active."}
elif float(done) == 4: # joystick active
cal_not_done = False
return {"RequestError": "Table control is not switched on."}
elif float(done) > 4: # joystick active
cal_not_done = False
return {"RequestError": "The table control reported an unknown error, with errorcode: " + str(done)}
elif float(done) == 0: # when corvus is read again
self.get_current_position()
cal_not_done = False
return 0
QApplication.processEvents() # Updates the GUI, maybe after some iterations
sleep(timeout)
[docs] def initiate_table(self):
'''
This function triggers the table initiation
:return: 0 if ok error if not
'''
if self.table_ready and not self.variables["table_is_moving"]:
self.variables["table_is_moving"] = True
commands = self.__build_command(self.device["calibrate_motor"], "", self.device["command_order"])
for order in commands:
self.vcw.write(self.visa_resource, order, self.device.get("execution_terminator", ""))
errorcode = self.check_if_ready()
if errorcode == 0:
pos = self.get_current_position()
if commands[0] == order:
self.device["table_xmin"] = float(pos[0])
self.device["table_ymin"] = float(pos[1])
self.device["table_zmin"] = float(pos[2])
else:
self.device["table_xmax"] = float(pos[0])
self.device["table_ymax"] = float(pos[1])
self.device["table_zmax"] = float(pos[2])
else:
return errorcode
self.variables["table_is_moving"] = False
return 0
[docs] def check_position(self, desired_pos):
'''
This function checks if two positions are equal or not
:param desired_pos: The position it should be
:return: 0 if ok error if not
'''
new_pos = self.get_current_position()
for i, pos in enumerate(new_pos):
if abs(float(pos) - float(desired_pos[i])) > 0.5: # up to a half micrometer
errorcode = {"MeasError": "Table movement failed. Position: " + str(new_pos) + " is not equal to desired position: " + str(desired_pos)}
l.error("Table movement failed. Position: " + str(new_pos) + " is not equal to desired position: " + str(desired_pos))
return errorcode
return 0
def __already_there(self, pad_file, strip, transfomation_class, T, V0):
'''
Checks if we are already at the strip we want to move to
:param pad_file:
:param strip:
:param transfomation_class:
:param T:
:param V0:
:return: True if at strip
'''
pos = self.get_current_position() # table position
pad_pos = [float(x) for x in pad_file["data"][strip][1:4]] # where it should be in sensor system
table_pos = transfomation_class.vector_trans(pad_pos, T, V0) # Transforms the vektor to the right basis
deltapos = [abs(x1 - x2) for (x1, x2) in zip(pos, table_pos)]
for delta in deltapos:
if delta >= 0.5: # Checks if the position is correct in a range of 0.5 um
return False
return True
[docs] def move_to_strip(self, pad_file, strip, transfomation_class, T, V0, height_movement = 800):
'''
Moves to a specific strip
:param transfomation_class:
:param height_movement: How much should the table move down
:return: None or errorcode
'''
error = None
if transformation != []:
if not self.__already_there(pad_file, strip, transfomation_class, T, V0):
pad_pos = pad_file["data"][strip][1:4]
l.info("Moving to strip: {} at position {},{},{}.".format(strip, pad_pos[0], pad_pos[1], pad_pos[2]))
table_abs_pos = transfomation_class.vector_trans(pad_pos, T, V0)
error = self.move_to(table_abs_pos, move_down=True, lifting = height_movement)
self.variables["current_strip"] = int(strip+1)
return error
else:
return {"RequestError": "No Transformation Matrix found! Is the alignment done?"}
[docs] def relative_move_to(self, position, move_down = True, lifting = 800):
'''
This function moves the table to the desired position (relative). position is a list of coordinates
The move down parameter can prohibit a down and up movement with the move, !!!!USE WITH CARE!!!!
:return: none or error code
'''
error = self.move_to(position, move_down, lifting, True)
return error
[docs] def move_to(self, position, move_down = True, lifting = 800, relative_move = False):
'''
This function moves the table to the desired position. position is a list of coordinates
The move down parameter can prohibits a down and up movement with the move, !!!!USE WITH CARE!!!!
:return: None or errorcode
'''
if self.table_ready and not self.variables["table_is_moving"]:
# get me the current position
old_pos = self.get_current_position()
#Move the table down if necessary
if move_down:
error = self.move_down(lifting)
if error != 0:
return error
# Change the state of the table
self.variables["table_is_moving"] = True
pos_string = ""
# Build the position to move to
for i, pos in enumerate(position): # This is that the table is in the down position when moved
if move_down and i==2 and not relative_move:
pos_string += str(float(pos)-float(lifting)) + " "
else:
pos_string += str(pos) + " "
# Move the table to the position
if relative_move:
move_command = self.__build_command(self.device["relative_move_to"], pos_string, self.device["command_order"])
else:
move_command = self.__build_command(self.device["move_to"], pos_string, self.device["command_order"])
self.vcw.write(self.visa_resource, move_command[0], self.device.get("execution_terminator", ""))
error = self.check_if_ready()
if error != 0:
return error
# State that the table is not moving anymore
self.variables["table_is_moving"] = False
# Move the table back up again
if move_down:
error = self.move_up(lifting)
if error != 0:
return error
# Finally make sure the position is correct
if relative_move:
error = self.check_position([sum(x) for x in zip(old_pos, position)])
else:
error = self.check_position(position)
if error != 0:
return error
self.variables["table_is_moving"] = False
return 0
[docs] def move_up(self, lifting):
'''
This function moves the table up
:param lifting: hight movement
:return: none or errorcode
'''
if not self.variables["Table_state"]:
errorcode = self.move_to([0,0,lifting], False, 0, True)
if not errorcode:
self.variables["Table_state"] = True
return errorcode
return 0
[docs] def move_down(self, lifting):
'''
This function moves the table down
:param lifting: hight movement
:return: none or errorcode
'''
if self.variables["Table_state"]:
errorcode = self.move_to([0,0,-lifting], False, 0, True)
if not errorcode:
self.variables["Table_state"] = False
return errorcode
return 0
@hf.raise_exception
def set_joystick(self, bool):
'''This enables or disables the joystick'''
if self.table_ready:
if bool:
command = self.__build_command(self.device["set_joystick"], "1", self.device["command_order"])
else:
command = self.__build_command(self.device["set_joystick"], "0", self.device["command_order"])
self.vcw.write(self.visa_resource, command[0], self.device.get("execution_terminator", ""))
[docs] def set_joystick_speed(self, speed):
'''This sets the speed for the joystick'''
if self.table_ready:
command = self.__build_command(self.device["set_joy_speed"], str(speed), self.device["command_order"])
self.vcw.write(self.visa_resource, command[0], self.device.get("execution_terminator", ""))
[docs] def set_axis(self, axis_list):
'''This sets the axis on or off. axis_list must contain a list of type [x=bool, y=bool, z=bool]'''
if self.table_ready:
final_axis_list = []
for i, axis in enumerate(axis_list):
if axis:
final_axis_list.append("1 " + str(i+1))
else:
final_axis_list.append("0 " + str(i+1))
command = self.__build_command(self.device["set_axis"], final_axis_list, self.device["command_order"])
for com in command:
self.vcw.write(self.visa_resource, com, self.device.get("execution_terminator", ""))
[docs] def stop_move(self):
'''This function stops the table movement immediately'''
if self.table_ready:
command = self.__build_command(self.device["abort_movement"], "", self.device["command_order"])
self.visa_resource.write(command)
[docs]class switching_control:
"""
This class handles all switching controls of all switching matrices
"""
def __init__(self, settings, devices, queue_to_main, shell):
'''
This class handles all switching actions
:param settings: default settings ( state machine )
:param devices: devices dict
:param shell: UniDAQ shell object
'''
self.settings = settings
self.message_to_main = queue_to_main
self.devices = devices
self.vcw = VisaConnectWizard.VisaConnectWizard()
self.shell = None
[docs] def reset_switching(self, device="all"):
'''
This function resets all switching or one device switching
:param device: all oder device object:
'''
if device == "all": # opens all switches in all relays
for dev in self.devices.values():
if "Switching relay" in dev["Device_type"]:
self.change_switching(dev, []) # Opens all closed switches
else:
self.change_switching(device, [])
[docs] def check_switching_action(self):
"""Checks what channels are closed on all switching devices"""
for devices in self.devices.values():
if "Switching relay" in devices["Device_type"] and "Visa_Resource" in devices:
current_switching = str(self.vcw.query(devices["Visa_Resource"], devices["check_all_closed_channel"], devices.get("execution_terminator", ""))).strip()
current_switching = self.pick_switch_response(devices, current_switching)
self.settings["Defaults"]["current_switching"][devices["Display_name"]] = current_switching
return current_switching
[docs] def apply_specific_switching(self, switching_dict):
"""This function takes a dict of type {"Switching": [/switch nodes], ....} and switches to these specific type"""
num_devices = len(switching_dict) # how many devices need to be switched
devices_found = 0
for devices in self.devices.values(): # Loop over all devices values (we need the display name)
for display_names in switching_dict: # extract all devices names for identification
if display_names in devices["Display_name"]:
switching_success = self.change_switching(devices, switching_dict[display_names])
devices_found += 1
if num_devices != devices_found:
l.error("At least one switching was not possible, no devices for switching included/connected")
self.message_to_main.put({"MeasError": "At least one switching was not possible, no devices for switching included/connected"})
switching_success = False
return switching_success
[docs] def switch_to_measurement(self, measurement):
'''
This function switches all switching systems to a specific measurement type
:param measurement: string e.g. "IV", "CV" must be defined in the switching dict
:return: true or false, so if switching was successfull or not
'''
# First find measurement
switching_success = False
if measurement in self.settings["Switching"]:
# When measurement was found
for name, switch_list in self.settings["Switching"][measurement].items():
device_found = False
for devices in self.devices.values():
if name in devices["Display_name"]:
device = devices
switching_success = self.change_switching(device, switch_list)
device_found = True
if not device_found:
self.message_to_main.put({"RequestError": "Switching device: " + str(name) + " was not found in active resources. No switching done!"})
return False
return switching_success
else:
l.error("Measurement " + str(measurement) + " switching could not be found in defined switching schemes.")
self.message_to_main.put({"MeasError": "Measurement " + str(measurement) + " switching could not be found in defined switching schemes."})
return False
def __send_switching_command(self, device, order, list_of_commands, syntax):
"""Sends a switching command"""
if list_of_commands:
if list_of_commands[0]:
mult_commands = ""
for channel in list_of_commands:
mult_commands += channel + ","
command = mult_commands[:-1] # Get rid of last comma
if len(syntax) > 1:
command = syntax[0] + command + syntax[1] # for the syntax
command = self.build_command(device, (order, command))
self.vcw.write(device["Visa_Resource"], command, device.get("execution_terminator", "")) # Write new switching
[docs] def pick_switch_response(self, device, current_switching):
'''
This function picks the string response and returns a list.
:param current_switching: is a string containing the current switching
'''
syntax_list = device.get("switch_syntax", "")
if syntax_list:
syntax_list = syntax_list.split("channel")# gets me header an footer from syntax
# Warning 7001 keithley sometimes seperates values by , and sometimes by : !!!!!!!!!
# Sometimes it also happens that it mixes everything -> this means that the channel from to are closed etc.
# Todo: implement the : exception
if ":" in current_switching:
l.error("The switching syntax for this is not yet implemented, discrepancies do occure from displayed to actually switched case. TODO")
if "," in current_switching: # if this shitty mix happens
current_switching = current_switching.replace(",", ":")
if len(syntax_list) > 1:
current_switching = current_switching[len(syntax_list[0]): -len(syntax_list[1])]
return current_switching.strip().split(":") # now we have the right commands
else:
return current_switching.strip().split(":") # now we have the right commands
if "," in current_switching:
if ":" in current_switching: # if this shitty mix happens
current_switching = current_switching.replace(":", ",")
if len(syntax_list) > 1:
current_switching = current_switching[len(syntax_list[0]): -len(syntax_list[1])]
#current_switching = current_switching.split(syntax_list[0]).split(syntax_list[1])
return current_switching.strip().split(",") # now we have the right commands
else:
return current_switching.strip().split(",") # now we have the right commands
elif "@" in current_switching: # if no switching at all happens
if len(syntax_list) > 1:
current_switching = current_switching[len(syntax_list[0]): -len(syntax_list[1])]
return current_switching.strip().split() # now we have the right commands
else:
return current_switching.strip().split() # now we have the right commands
else:
return current_switching.strip().split() # now we have the right commands
@hf.raise_exception
@hf.run_with_lock
def change_switching(self, device, config): # Has to be a string command or a list of commands containing strings!!
'''
Fancy name, but just sends the swithing command
:param config: the list of nodes which need to be switched
'''
# TODO: check when switching was not possible that the programm shutsdown! Now the due to the brandbox this is switched off
if type(config) == unicode or type(config) == str:
configs = [config]
else:
configs = config
if "Visa_Resource" in device: #Searches for the visa resource
resource = device["Visa_Resource"]
else:
self.message_to_main.put({"RequestError": "The VISA resource for device " + str(device["Display_name"]) + " could not be found. No switching possible."})
return -1
current_switching = str(self.vcw.query(resource, device["check_all_closed_channel"], device.get("execution_terminator", ""))).strip() # Get current switching
syntax_list = device.get("switch_syntax", "")
if syntax_list:
syntax_list = syntax_list.split("channel")# gets me header an footer from syntax
current_switching = self.pick_switch_response(device, current_switching)
to_open_channels = list(set(current_switching) - (set(configs))) # all channels which need to be closed
to_close_channels = configs
# Close channels
self.__send_switching_command(device, device["set_close_channel"], to_close_channels, syntax_list)
sleep(0.01) # Just for safety reasons because the brandbox is very slow
# Open channels
self.__send_switching_command(device, device["set_open_channel"], to_open_channels, syntax_list)
# Check if switching is done (basically the same proceedure like before only in the end there is a check
device_not_ready = True
counter = 0
while device_not_ready:
current_switching = None
all_done = str(self.vcw.query(resource, device["operation_complete"], device.get("execution_terminator", ""))).strip()
if all_done == "1" or all_done == "Done":
current_switching = str(self.vcw.query(resource, device["check_all_closed_channel"], device.get("execution_terminator", ""))).strip()
syntax_list = device.get("switch_syntax", "")
if syntax_list:
syntax_list = syntax_list.split("channel") # gets me header an footer from syntax
current_switching = self.pick_switch_response(device, current_switching)
self.settings["Defaults"]["current_switching"][device["Display_name"]] = current_switching
command_diff = list(set(configs).difference(set(current_switching)))
if len(command_diff) != 0: #Checks if all the right channels are closed
l.error("Switching to " + str(configs) + " was not possible. Difference read: " + str(current_switching))
print ("Switching to " + str(configs) + " was not possible. Difference read: " + str(current_switching))
device_not_ready = False
return False
device_not_ready = False
return True
if counter > 5:
device_not_ready = False
counter += 1
l.error("No response from switching system: " + device["Display_name"])
self.message_to_main.put({"RequestError": "No response from switching system: " + device["Display_name"]})
return False
[docs] def build_command(self, device, command_tuple):
"""
Builds the command correctly, so that device recognise the command
:param device: device dictionary
:param command_tuple: command, value
"""
if type(command_tuple) == unicode:
command_tuple = (str(command_tuple),) # so the correct casting is done
for key, value in device.items():
try:
if command_tuple[0] == value: # finds the correct value
command_keyword = str(key.split("_")[-1])
if ("CSV_command_" + command_keyword) in device: # searches for CSV command in dict
data_struct = device["CSV_command_" + command_keyword].split(",")
final_string = str(command_tuple[0]) + " " # so the key word
for i, given_orders in enumerate(command_tuple[1:]): # so if more values are given, these will be processed first
final_string += str(given_orders).upper() + "," # so the value
if len(command_tuple[1:]) <= len(data_struct): # searches if additional data needs to be added
for data in data_struct[i+1:]:
# print device[command_keyword + "_" + data]
if command_keyword + "_" + data in device:
final_string += str(device[command_keyword + "_" + data]).upper() + ","
else:
final_string += ","# if no such value is in the dict
return final_string[:-1] # because there is a colon to much in it
else:
if len(command_tuple) > 1:
return str(command_tuple[0]) + " " + str(command_tuple[1])
else:
return str(command_tuple[0])
except Exception as e:
#print e
pass
if __name__ == "__main__":
trans = transformation()
# Just for testing the transformation
a1 = [350, 250, 12]
a2 = [600, 250, 15]
a3 = [350, 400, 12]
b1 = [0, 0, 0]
b2 = [250, 0, 0]
b3 = [0, 150, 0]
T, V0 = trans.transformation_matrix(b1,b2,b3,a1,a2,a3)
print (T)
print (V0)
print (trans.vector_trans([12,23, 0], T,V0))