Source code for measurement_plugins.forge_tools

"""This file contains a class containing all tools necessary for forging your own
Measurement plugins"""

import numpy as np
from time import sleep, time
from scipy import stats

try:
    from ..utilities import build_command
except:
    pass
import logging


[docs]class tools(object): """ Some tools for forging your own measurement plugins. It needs the framework and the event_loop object """ def __init__(self, framework=None, event_loop=None): self.framework = framework self.event_loop = event_loop self.vcw = framework["VCW"] self.settings = framework["Configs"]["config"] self.queue_to_main = framework["Message_to_main"] self.toolslog = logging.getLogger(__name__)
[docs] def steady_state_check( self, device, command="get_read", max_slope=0.001, wait=0.2, samples=4, Rsq=0.95, compliance=50e-6, do_anyway=False, check_compliance=True, iteration=7, wait_time_factor=1.0 ): """ This function reads values from a device and fits a linear fit to it. If the fit exceeds a maximum slope it waits a specified time and does ot again. If the slope condition is not reached after a few attempts the function returns False. Otherwise it returns True and indicates a equilibrium has been reached. :param device: The device object which should be queried :param command: The command which should be queried :param max_slope: The maximum slope, x-axis is measured in seconds, so a slope of 1e-9 is a change of 1n per second :param wait: How long to wait between attempts :param samples: How many samples should be used :param Rsq: Minimum R^2 value :param compliance: The compliance value which the value should not exceed :param do_anyway: If the result should be ignored :param check_compliance: If the program should check the compliance value :param iteration: number of iterations :param wait_time_factor: factor the waiting time is increased with every iteration :return: Bool - True means steady state reached """ # TODO: I have the feeling that this function is not exactly doing what she is supposed to do, check! steady_state = False bad_fit = False high_error = False max_iterations = iteration if do_anyway: self.toolslog.warning( "Overwriting steady_state_check is not advised. Use with caution" ) stop = False counter = 0 # Todo: the stop signal does not work correctly here. while not steady_state: if counter > max_iterations: # If too many attempts where made self.toolslog.info( "Attempt to reach steady state was not successfully after {} times for device {}".format( max_iterations, device["Device_name"] ) ) return False counter += 1 values = np.zeros(samples) times = np.zeros(samples) comm = build_command(device, command) if compliance and check_compliance: if self.check_compliance(device, float(compliance), command=command): self.stop_measurement() return False self.toolslog.debug( "Conducting steady state check. Iteration={}...".format(counter) ) for i in range(samples): start = time() values[i] = float( str(self.vcw.query(device, comm).split()[0]).split(",")[0] ) times[i] = time() if (time() - start) <= wait*wait_time_factor: sleep(abs(time() - start - wait)*wait_time_factor) slope, intercept, r_value, p_value, std_err = stats.linregress( np.append([0], np.diff(times)), values ) self.toolslog.debug( "Slope parameters: slope={}, intercept={}, r^2={}, err={}".format( slope, intercept, r_value * r_value, std_err ) ) bad_fit = True if r_value * r_value < Rsq else False high_error = True if std_err * 2.5 > abs(slope) else False if std_err <= 1e-6 and abs(slope) <= abs(max_slope): self.toolslog.debug( "Steady state in device {} was reached with slope={} at Iteration={}".format( device["Device_name"], slope, counter ) ) if bad_fit: self.toolslog.debug( "Steady state check on device {} yielded bad fit conditions. Results may be compromised! R^2={} at iteration={}".format( device["Device_name"], round(r_value * r_value, 2), counter ) ) high_error_slope = abs(slope) + 2.5 * std_err > abs(max_slope) # If fit errors are high, the 2.5x the error is bigger as the slope and 0.5% of the maximum_error_slope is bigger as the actuall value if ( high_error and high_error_slope and abs(slope) + 2.5 * std_err > abs(intercept) ): self.toolslog.warning( "Steady state check on device {} yielded high fit error conditions. Results may be compromised! std={} at iteration={}".format( device["Device_name"], std_err, counter ) ) return True else: self.toolslog.debug( "Steady state was not reached due to high error and steep slope. Iteration={}".format( counter ) ) return False
[docs] def ramp_value_log10(self, min_value, max_value, deltasteps): """ This function takes a min and max value, deltasteps and generates a list of values in log10 format with each deltasteps values per decade :param min_value: Start value :param max_value: End Value :param deltasteps: How many steps per decade :return: List """ # Todo: from max to min is not working yet if max_value > min_value: positive = True else: positive = False # Make it linear min = np.log10([abs(min_value)])[0] max = np.log10([abs(max_value)])[0] # Find absolute delta abs_delta = abs(min - max) delta_steps = round(abs(abs_delta / deltasteps), 5) ramp_list = [min + delta_steps * step for step in range(int(deltasteps))] to_big = True while to_big and len(ramp_list) > 1: if ramp_list[-1] >= max_value: ramp_list = ramp_list[:-1] else: to_big = False if positive: ramp_list.append(max) else: ramp_list.append(min) # Now make a log skale out of it ramp_list = map(lambda x: round(10 ** x, 0), ramp_list) # if not positive: # Reverses the ramp list # ramp_list = [x for x in reversed(ramp_list)] return ramp_list
[docs] def refine_ramp(self, ramp, start, stop, step): """ Refines a ramp of values eg. for IVCV, in the beginning it makes sense to refine the ramp :param ramp: A list of values :param start: Start point of refinement (must be inside of ramp) :param stop: End point of refinement (must be inside of ramp) :param step: The step size of the refinement :return: Updated list """ if ( ramp[0] * start >= 0 and ramp[-1] * stop >= 0 and abs(ramp[0]) <= abs(start) and abs(ramp[-1]) >= abs(stop) ): # Todo: if the refined array has positive and negative values it does not work currently ramp = np.array(ramp) ref_ramp = self.ramp_value(start, stop, step) to_delete = np.logical_and(abs(ramp) >= abs(start), abs(ramp) <= abs(stop)) start_ind = np.nonzero(to_delete)[0][ 0 ] # Finds the index where I have to insert the new array del_list = ramp[~to_delete].tolist() for ind, elem in enumerate(ref_ramp): del_list.insert(start_ind + ind, elem) return del_list else: self.toolslog.error( "Refining of array not possible. Boundaries for refinement must be inside source array. Returning input array" ) return ramp
[docs] def ramp_value(self, min_value, max_value, deltaI): """ Generates a list of values in the defined stepsize between the and min/max values :param min_value: Start point of list :param max_value: End point of list :param deltaI: Stepsize between points :return: List """ deltaI = abs(deltaI) # Find out if positive or negative ramp if max_value > min_value: positive = True else: positive = False # Find absolute delta abs_delta = abs(min_value - max_value) delta_steps = round(abs(abs_delta / deltaI), 0) if positive: ramp_list = [min_value + deltaI * step for step in range(int(delta_steps))] to_big = True while to_big and len(ramp_list) > 1: if ramp_list[-1] > max_value: ramp_list = ramp_list[:-1] else: to_big = False else: ramp_list = [min_value - deltaI * step for step in range(int(delta_steps))] to_big = True while to_big and len(ramp_list) > 1: if ramp_list[-1] < max_value: ramp_list = ramp_list[:-1] else: to_big = False if len(ramp_list) > 1: if ramp_list[-1] != max_value: ramp_list.append(max_value) else: ramp_list.append(max_value) return ramp_list
[docs] def do_ramp_value( self, resource, order, voltage_Start, voltage_End, step, wait_time=0.05, compliance=None, set_value=None, ): """ This functions ramps a value from one point to another. It actually sends the commands to the device :param resource: Device object :param order: The command to set tje voltage :param voltage_Start: Start point :param voltage_End: End point :param step: Stepsize :param wait_time: Wait between values :param compliance: Compliance, if None the compliance check will be skipped :param set_value: If you want to set a value each interation in the state machine (must be a callable function) :return: True """ self.toolslog.debug("Start ramping...") voltage_End = float(voltage_End) voltage_Start = float(voltage_Start) step = float(step) wait_time = float(wait_time) voltage_step_list = self.ramp_value(voltage_Start, voltage_End, step) # Check if current bias voltage is inside this ramp and delete if necessary # bias_voltage = float(self.settings["settings"]["bias_voltage"]) # for i, voltage in enumerate(voltage_step_list): # if abs(voltage) > abs(bias_voltage): # voltage_step_list = voltage_step_list[i:] # break for voltage in voltage_step_list: self.change_value(resource, order, voltage) if set_value: set_value(voltage) if ( voltage != 0 and compliance ): # Otherwise measurement can take to long, which leads to a potential timeout error if self.check_compliance(resource, compliance): return False # self.settings["settings"]["bias_voltage"] = float(voltage) # changes the bias voltage sleep(wait_time) return True
[docs] def query_value(self, device_dict, order, value=""): """ This function simply queries a command to a device. :param device_dict: The device object :param order: The command to be send :param value: The value for the command :return: the return from the device """ command = build_command(device_dict, (order, value)) return self.vcw.query(device_dict, command)
[docs] def change_value_query( self, device_dict, order, value="", answer="1", ignore_answer=False, ): """ This function query a command to a device and waits for a return value and compares it with the answer statement, if they are the same a true is returned otherwise a None :param device_dict: Device object :param order: The command to be send :param value: The value :param answer: The answer it should have :param ignore_answer: Should the answer be ignored :return: Answer or None """ answ = "" if not ignore_answer: if type(order) == list: for com in order: command = build_command(device_dict, (com, value)) answ = self.vcw.query( device_dict, command ) # writes the new order to the device else: command = build_command(device_dict, (order, value)) answ = self.vcw.query( device_dict, command ) # writes the new order to the device answ = str(answ).strip() if answ == answer: return None else: return answ # For errorhandling it is the return from the device which was not the expected answer else: self.toolslog.critical( "Overwrite in progress in change_value_query, no check of correct answer is done!!!!" ) command = build_command(device_dict, (order, value)) self.vcw.write(device_dict, command) return None
[docs] def send_to_device(self, device_dict, command, seperate=None): """ This command just sends the command directly to the device. Warning it is not recommended to use this function. Use this function only if you must! Use change_value instead. :param device_dict: Dictionary of the device :param command: The command you want to send to the device :param seperate: if specified, the string gets split and the command will be send separately at the separator :return: None """ try: if seperate: for x in str(command).split(seperate): self.vcw.write(device_dict, str(x)) else: self.vcw.write( device_dict, str(command) ) # writes the new order to the device except Exception as e: self.toolslog.error( "Could not send {command!s} to device {device!s}, error {error!s} occured".format( command=command, device=device_dict, error=e ) )
[docs] def query_device(self, device_dict, command): """ This command just sends the command directly to the device, and waits for an answer. Warning it is not recommended to use this function. Use this function only if you must! Use query_value instead :param device_dict: Dictionary of the device :param command: The command you want to send to the device :return: Return string from the device """ try: return self.vcw.query( device_dict, str(command) ) # writes the new order to the device except Exception as e: self.toolslog.error( "Could not send {command!s} to device {device!s}, error {error!s} occured".format( command=command, device=device_dict, error=e ) )
[docs] def change_value(self, device_dict, order, value=""): """ This function simply sends a command to a device. :param device_dict: The device object :param order: The command to be send :param value: The value for the command :return: None """ command = build_command(device_dict, (order, value)) self.vcw.write(device_dict, command) return None
[docs] def check_compliance(self, device, compliance=None, command="get_read"): """ This function checks if the current compliance is reached. :param device: The device object :param compliance: The compliance value :param command: The command to check :return: Bool """ try: if compliance == None: self.toolslog.error( "No compliance set for measurement, default compliance is used! This may cause deamage to the sensor!" ) compliance = device["default_compliance"] except: self.toolslog.error("Device " + str(device) + " has no compliance set!") command = build_command(device, command) # value = float(str(self.vcw.query(device, command)).split(",")[0]) #237SMU value = str(self.vcw.query(device, command)).split("\t") if len(value) > 1: self.settings["settings"]["bias_voltage"] = str( value[1] ).strip() # changes the bias voltage if 0.0 < (abs(float(value[0])) - abs(float(compliance) * 0.99)): self.toolslog.error( "compliance reached in instrument " + str(device["Device_name"]) + " at: " + str(value[0]) + ". compliance at " + str(compliance) ) # self.queue_to_main.put({"MeasError": "Compliance reached. Value. " + str(value[0]) + " A"}) return True else: return False
[docs] def config_setup(self, device, commands=(), delay=0.0): """ This function configures the setup for a specific measurement. Commands is a list of tuples, containing (command, values) if no value is defined only command will be send :param device: The device object :param commands: A list of tuples with (command, values). If no value is defined only command will be send :param delay: The delay between each command :return: None """ for command in commands: final_string = build_command(device, command) self.vcw.write( device, str(final_string) ) # finally writes the command to the device sleep(delay)
[docs] def stop_measurement(self): """Sends a signal to the framework to stop every measurement""" self.toolslog.critical("Measurement stop sended by tools functions...") order = {"ABORT_MEASUREMENT": True} # just for now self.queue_to_main.put(order)
[docs] def capacitor_discharge( self, device_dict, relay_dict, Setterminal=None, terminal=None, do_anyway=False, ): """ This function is rather special for SQC. It checks if the capacitor of the decouple box is correctly discharged :param device_dict: Device object :param relay_dict: Relay object :param Setterminal: set terminla command :param terminal: terminal Front or back :param do_anyway: Do it anyway :return: Bool """ # Todo: this function needs clean up and generalization self.toolslog.info("Discharging capacitors...") answer_from_device = "OK" if not device_dict or not relay_dict: self.toolslog.info( "No discharge device/switching specified, skipping discharge and returning True..." ) return True # First switch the smu terminals front/rear if Setterminal: self.change_value(device_dict, Setterminal, terminal) # Set the switching for the discharge (a relay must be switched in the decouple box by applying 5V # sleep(1.) # Slow switching shit on BB all_ok = False while not all_ok: error = self.change_value_query(relay_dict, "set_discharge", "ON", answer_from_device) if error: self.queue_to_main.put( { "RequestError": "Capacitor discharged failed! Switching the discharge relay failed! Expected reply from device would be: " + str(answer_from_device) + " got " + str(error) + " instead." } ) self.toolslog.error( "Capacitor discharged failed! Switching the discharge relay failed! Expected reply from device would be: " + str(answer_from_device) + " got " + str(error) + " instead." ) all_ok = False else: all_ok = True sleep(1.0) # relay is really slow self.change_value(device_dict, "set_source_current") self.change_value(device_dict, "set_measure_voltage") self.change_value(device_dict, "set_output", "ON") counter = 0 while True: counter += 1 voltage = [] for i in range(3): command = build_command(device_dict, "get_read") voltage.append(float(self.vcw.query(device_dict, command))) if sum(voltage) / len(voltage) <= 0.3: # this is when we break the loop self.change_value(device_dict, "set_output", "OFF") self.change_value(device_dict, "set_source_voltage") self.change_value(device_dict, "set_measure_current") self.change_value(device_dict, Setterminal, "REAR") # return to default mode for this switching sleep(1.0) # Slow switching shit on BB error = self.change_value_query( relay_dict, "set_discharge", "OFF", answer_from_device ) if error: self.queue_to_main.put( { "RequestError": "Capacitor discharged failed! Switching the discharge" " relay failed! Expected reply from device would be: " + str(answer_from_device) + " got " + str(error) + " instead." } ) self.toolslog.error( "Capacitor discharged failed! Switching the discharge relay failed! Expected reply from device would be: " + str(answer_from_device) + " got " + str(error) + " instead." ) return False sleep(1.0) # relay is really slow return True else: self.queue_to_main.put( { "Info": "Capacitor discharged failed: " + str(counter) + " times, with a voltage of " + str(sum(voltage) / len(voltage)) } ) if counter >= 5: self.queue_to_main.put( { "FatalError": "The capacitor discharge failed more than 5 times. Discharge the capacitor manually!" } ) # Set output to on for reading mode command = build_command( device_dict, ("set_output", "OFF") ) # switch back to default terminal self.vcw.write( device_dict, command ) # writes the new order to the device # return to default mode for this switching error = self.change_value_query( relay_dict, "set_discharge", "OFF", "OK" ) if error: self.queue_to_main.put( { "RequestError": "Capacitor discharged failed! Switching the discharge relay failed! Expected reply from device would be: " + str("OK") + " got " + str(error) + " instead." } ) self.toolslog.error( "Capacitor discharged failed! Switching the discharge relay failed! Expected reply from device would be: " + str("OK") + " got " + str(error) + " instead." ) return False