Source code for lakeshore336.Lakeshore336

""" Driver for the Lakeshore 336 Temperature Controller.

The :class:`LakeShore336` main class manages the interface
to the device and implements some of the available
operations through Ethernet communication. A custom
exception :class:`StateError` is used for internal
error management.
"""

# Imports
from typing import List
import configparser
from enum import Enum

# Third party
from lab_utils.custom_logging import getLogger
from lakeshore import Model336, InstrumentException


class Channel:
    """ Simple container to hold the configuration and the latest
    readout of one input channel.
    """

    #: The channel ID (A to D).
    channel_id: str = None
    #: Data from the gauge should be recorded.
    logging: bool = False
    #: Label of the gauge, to be used when logging to a database.
    label: str = None
    #: Maximum sensor temperature, when it is exceeded all control outputs are shut down.
    temp_limit: float = 0.0
    #: Latest temperature readout value.
    data: float = None

    def __init__(
            self,
            channel_id: str,
            logging: bool = False,
            label: str = None,
            temp_limit: float = 0.0,
    ):
        """ Initializes the :class:`Channel` object.

        Parameters
        ----------
        channel_id : str
            Channel ID, 'A' to 'D'.

        logging : bool, optional
            Data from the channel is to be logged, default is 'False'.

        label : str, optional
            Channel label, default is 'None'.

        temp_limit : float, optional
            Temperature limit above which control outputs are disabled,
            default is 0.0 (none). Must lie within 0.0 and 350.0.

        Raises
        ------
        :class:`ValueError`
            Channel property out of bounds, e.g. negative temperature limit
            or a channel ID which is not a single letter 'A' to 'D'.
        """

        # Sanity checks
        # A membership test (instead of ord()) also rejects empty strings,
        # multi-character strings and None with the documented ValueError.
        if channel_id not in ('A', 'B', 'C', 'D'):
            raise ValueError('Invalid channel ID {}'.format(channel_id))

        # The chained comparison is False for NaN, so NaN limits are rejected too
        if not 0.0 <= temp_limit <= 350.0:
            raise ValueError('Invalid temperature limit {}'.format(temp_limit))

        # Assignments
        self.channel_id = channel_id
        self.logging = logging
        self.label = label
        self.temp_limit = temp_limit
class HeaterOutputMode(Enum):
    """ Operating modes of a control output.

    NOTE(review): the integer values look like the mode codes used by the
    instrument's output-mode remote command -- confirm against the manual.
    """

    off = 0
    closed_loop = 1
    zone = 2
    open_loop = 3
    monitor_out = 4
    warmup = 5
class HeaterRange(Enum):
    """ Power ranges of a heater output.

    NOTE(review): the integer values look like the codes used by the
    instrument's heater-range remote command -- confirm against the manual.
    """

    off = 0
    low = 1
    medium = 2
    high = 3
class Heater:
    """ Simple container to hold heater information. """

    # General configuration
    #: The heater ID (1 or 2).
    heater_id: int = None
    #: Heater is active
    active: bool = False
    #: Heater resistance: 25 or 50 Ohm.
    resistance: int = 50
    #: Heater resistance code (1 for 25 Ohm, 2 for 50 Ohm)
    resistance_code: int = 2
    #: Maximum heater current.
    max_current: float = 1
    #: Input channel to be used for control (in PID mode only)
    control_input: str = 'A'

    # Data
    #: Latest heater power value.
    data: float = None

    # Device state
    #: Heater mode (see :class:`HeaterOutputMode`)
    mode: HeaterOutputMode = HeaterOutputMode.off
    #: Manual control of the heater output
    manual_output: float = 0.0
    #: Heater range (see :class:`HeaterRange`)
    range: HeaterRange = HeaterRange.off
    #: Target set point
    set_point: float = 0.0
    #: Proportional (gain) control parameter.
    P: int = 0
    #: Integral (reset) control parameter.
    I: int = 0
    #: Derivative (rate) control parameter.
    D: int = 0

    def __init__(
            self,
            heater_id: int,
            active: bool = False,
            resistance: int = 50,
            max_current: float = 1.0,
            control_input: str = 'A',
    ):
        """ Initializes the :class:`Heater` object.

        Parameters
        ----------
        heater_id : int
            Heater ID, 1 or 2.

        active : bool, optional
            The heater is active, default is 'False'

        resistance : int, optional
            Heater resistance: 25 or 50 Ohm, default is 50 Ohm.

        max_current : float, optional
            Maximum heater current, default is 1.0 A.
            Must lie within 0.0 and 1.0.

        control_input : str, optional
            Input channel used for control, 'A' to 'D', default is 'A'.

        Raises
        ------
        :class:`ValueError`
            Heater property out of bounds, e.g. negative resistance.
        """

        # Sanity checks
        if heater_id not in (1, 2):
            raise ValueError('Invalid heater ID {}'.format(heater_id))

        if resistance not in (25, 50):
            raise ValueError('Invalid heater resistance {}'.format(resistance))

        # The chained comparison is False for NaN, so NaN currents are rejected too
        if not 0.0 <= max_current <= 1.0:
            raise ValueError('Invalid heater maximum current {}'.format(max_current))

        # A membership test (instead of ord()) also rejects None, empty and
        # multi-character strings with the documented ValueError.
        if control_input not in ('A', 'B', 'C', 'D'):
            raise ValueError('Invalid control input ID {}'.format(control_input))

        # Assignments
        self.heater_id = heater_id
        self.active = active
        self.resistance = resistance
        # Explicit mapping avoids the float round trip of int(resistance / 25)
        self.resistance_code = 1 if resistance == 25 else 2
        self.max_current = max_current
        self.control_input = control_input
class LakeShore336(Model336):  # noqa (ignore CamelCase convention)
    """ Driver implementation for the Lakeshore 336 Temperature Controller.

    The driver is based upon the official implementation
    :class:`~lakeshore.model_336.Model336`
    """

    # Connection configuration
    ip_address: str = '10.42.43.200'    #: The device IP address
    timeout: float = 1.0                #: Time-out for Ethernet connection error.

    # Device setup
    config_file: str = 'conf/lakeshore336.ini'  #: Device configuration file
    # The two class-level lists are placeholders only and are never mutated:
    # every instance gets its own lists in __init__() / config().
    channel_info: List[Channel] = []    #: Channel information, loaded from the configuration file.
    number_of_channels: int = 4         #: Available input channels.
    heater_info: List[Heater] = []      #: Heater information, loaded from the configuration file.
    number_of_heaters: int = 2          #: Available control outputs.

    def __init__(self,
                 config_file: str = None,
                 ip_address: str = None,
                 timeout: float = None,
                 ):
        """ Initializes the :class:`LakeShore336` object. It calls
        the :meth:`config` method to set up the device if a
        :paramref:`~LakeShore336.__init__.config_file` is given.

        Upon initialization, the parent driver :class:`~lakeshore.Model336`
        will immediately attempt to connect over TCP to the device and raise
        an :class:`~lakeshore.InstrumentException` otherwise.

        Parameters
        ----------
        config_file : str, optional
            Configuration file, default is 'None'.

        ip_address : str, optional
            IP address of the device, default is 'None'

        timeout : float, optional
            Ethernet communication time out, default is 'None'

        Raises
        ------
        :class:`configparser.Error`
            Configuration file error

        :class:`lakeshore.InstrumentException`
            Generic device error
        """

        # Initialize sensor and heater lists.
        # Fresh per-instance lists: appending to the class-level lists would
        # share (and keep growing) them across all instances.
        self.channel_info = [
            Channel(chr(ord('A') + idx)) for idx in range(self.number_of_channels)
        ]
        self.heater_info = [
            Heater(idx + 1) for idx in range(self.number_of_heaters)
        ]

        # Load config file, if given
        if config_file is not None:
            self.config(config_file)

        # Assign attributes, if given
        # They override the configuration file
        if ip_address is not None:
            self.ip_address = ip_address
        if timeout is not None:
            self.timeout = timeout

        # Call the parent class initializer, which attempts connection to the device over TCP
        getLogger().info('Connecting to Lakeshore 336 Temperature Controller at IP %s', self.ip_address)
        self._connect()

        # Print out some confirmation message
        getLogger().info(
            'Connection successful. Model: %s. Serial number: %s. Firmware: %s',
            self.model_number,
            self.serial_number,
            self.firmware_version,
        )

        # Apply configuration
        self.apply_config()

    def _connect(self):
        """ Runs the parent initializer with the current :attr:`ip_address`
        and :attr:`timeout`, then re-installs the project logger.

        Shared by :meth:`__init__` and :meth:`reconnect` so that both apply
        the logger override (the parent initializer presumably installs its
        own logger, which is why the override has to follow it).

        Raises
        ------
        :class:`lakeshore.InstrumentException`
            Generic device error
        """
        Model336.__init__(
            self,
            ip_address=self.ip_address,
            timeout=self.timeout
        )

        # Override logger
        self.logger = getLogger()

    def query(self, query_string: str) -> str:
        """ Override parent method due to excessive logging.

        Parameters
        ----------
        query_string : str
            A serial query ending in a question mark.

        Raises
        ------
        :class:`lakeshore.InstrumentException`
            Generic device error

        Returns
        -------
        str
            The instrument query response as a string.
        """

        # Check that a valid connection is active
        if self.device_tcp is None:
            raise InstrumentException("No connections configured")

        # Query the instrument over TCP.
        with self.dut_lock:
            getLogger().debug('Sending query over TCP to %s: %s', self.serial_number, query_string)
            response = self._tcp_query(query_string)
            getLogger().debug('Received response from %s: %s', self.serial_number, response)

        return response

    def command(self, command_string: str):
        """ Override parent method due to excessive logging.

        Parameters
        ----------
        command_string : str
            A serial command.

        Raises
        ------
        :class:`lakeshore.InstrumentException`
            Generic device error
        """

        # Check that a valid connection is active
        if self.device_tcp is None:
            raise InstrumentException("No connections configured")

        # Send the command to the instrument over TCP.
        with self.dut_lock:
            getLogger().debug('Sending command over TCP to %s: %s', self.serial_number, command_string)
            self._tcp_command(command_string)

    def reconnect(self):
        """ Closes the TCP connection, reloads device
        configuration and connects to the device again.

        Raises
        ------
        :class:`lakeshore.InstrumentException`
            Generic device error

        :class:`configparser.Error`
            Configuration file error
        """
        self.disconnect_tcp()
        self.config()
        self._connect()
        self.apply_config()

    def config(self, new_config_file: str = None):
        """ Loads the Lakeshore 336 configuration from a file. If
        :paramref:`~LakeShore336.config.new_config_file` is not
        given, the latest :attr:`config_file` is re-loaded;
        if it is given and the file is successfully parsed,
        :attr:`config_file` is updated to the new value.

        The file is parsed completely into local objects first and the
        instance is only updated once everything is valid, so a failure
        leaves the previous configuration (connection settings, channels
        and heaters) untouched.

        Parameters
        ----------
        new_config_file : str, optional
            New configuration file to be loaded.

        Raises
        ------
        :class:`configparser.Error`
            Configuration file error
        """

        # Update configuration file, if given
        if new_config_file is None:
            new_config_file = self.config_file

        # Initialize config parser and read file
        getLogger().info("Loading configuration file %s", new_config_file)
        config_parser = configparser.ConfigParser()

        # read() silently skips unreadable files and returns the list of
        # files it could parse, so an empty result means nothing was loaded
        if not config_parser.read(new_config_file):
            raise configparser.Error('Could not read configuration file {}'.format(new_config_file))

        try:
            # Load Ethernet communication configuration
            ip_address = config_parser.get(section='Connection', option='ip_address')
            timeout = config_parser.getfloat(section='Connection', option='timeout')

            # Load input channels information
            channels = [
                self._load_channel(config_parser, idx)
                for idx in range(self.number_of_channels)
            ]

            # Load control output channels information
            heaters = [
                self._load_heater(config_parser, idx, channels)
                for idx in range(self.number_of_heaters)
            ]
        except ValueError as e:
            # getfloat() / getint() / getboolean() raise ValueError on
            # malformed values; report them as configuration errors
            raise configparser.Error(
                'Invalid value in configuration file {}: {}'.format(new_config_file, e)
            ) from e

        # If everything worked, commit the new state in one go
        self.ip_address = ip_address
        self.timeout = timeout
        self.channel_info = channels
        self.heater_info = heaters

        # Update local config_file for future calls
        self.config_file = new_config_file

    def _load_channel(self, config_parser: configparser.ConfigParser, index: int) -> Channel:
        """ Builds the :class:`Channel` number `index` (0-based) from its
        'Sensor_<ID>' section, or with default values if the section is missing.

        Raises
        ------
        :class:`configparser.Error`
            Channel property rejected by :class:`Channel`
        """
        channel_id = chr(ord('A') + index)
        sec_name = 'Sensor_{}'.format(channel_id)
        log = False
        lab = None
        temp_limit = 0.0

        if config_parser.has_section(sec_name):
            log = config_parser.getboolean(sec_name, 'logging')
            lab = config_parser.get(sec_name, 'label')
            temp_limit = config_parser.getfloat(sec_name, 'temp_limit')
            getLogger().info('Found sensor %d: %s, %s, %f K', index + 1, str(log), lab, temp_limit)
        else:
            getLogger().info('%s not found', sec_name)

        try:
            return Channel(
                channel_id=channel_id,
                logging=log,
                label=lab,
                temp_limit=temp_limit
            )
        except ValueError as e:
            raise configparser.Error('{}'.format(e)) from e

    def _load_heater(
            self,
            config_parser: configparser.ConfigParser,
            index: int,
            channels: List[Channel],
    ) -> Heater:
        """ Builds the :class:`Heater` number `index` (0-based) from its
        'Heater_<ID>' section, or with default values if the section is
        missing. An active heater must be controlled by one of the given
        `channels` which has logging enabled.

        Raises
        ------
        :class:`configparser.Error`
            Heater property rejected by :class:`Heater`, or the control
            input of an active heater is not a logging channel.
        """
        heater_id = index + 1
        sec_name = 'Heater_{}'.format(heater_id)
        act = False
        control_input = None
        heater_args = {
            'heater_id': heater_id,
            'active': False,
            'resistance': 50,
            'max_current': 1.0,
        }

        # Read info
        if config_parser.has_section(sec_name):
            act = config_parser.getboolean(sec_name, 'active')
            res = config_parser.getint(sec_name, 'resistance')
            max_cur = config_parser.getfloat(sec_name, 'max_current')
            control_input = config_parser.get(sec_name, 'control_input')
            getLogger().info('Found heater %d: %d Ohm, %f A, input %s', heater_id, res, max_cur, control_input)
            heater_args.update(active=act, resistance=res, max_current=max_cur)

            # Hand the control input over to the heater object (it used to be
            # parsed and then dropped). Unknown inputs are not forwarded: for
            # an active heater they are reported by the check further down,
            # for an inactive heater the Heater default is kept as before.
            if any(ch.channel_id == control_input for ch in channels):
                heater_args['control_input'] = control_input
        else:
            getLogger().info('%s not found', sec_name)

        # Initialize heater object
        try:
            heater = Heater(**heater_args)
        except ValueError as e:
            raise configparser.Error('{}'.format(e)) from e

        # Check control input is configured AND logging
        if act and not any(ch.channel_id == control_input and ch.logging for ch in channels):
            raise configparser.Error('Heater {} is controlled by Input {}, which is not active'.format(
                heater_id,
                control_input
            ))

        return heater

    def apply_config(self):
        """ Applies the configuration to the device.

        - Input sensors: label and temperature limit of every
          channel which has a label.
        - Control outputs: resistance and maximum current of every heater.

        Raises
        ------
        :class:`lakeshore.InstrumentException`
            Generic device error
        """

        # Configure device input channels
        getLogger().info('Configuring device input channels')
        for ch in self.channel_info:
            # Channel enabled? Channels without label are left untouched
            if ch.label is None:
                continue

            # Set channel label and read it back
            self.command('INNAME {},{}'.format(ch.channel_id, ch.label))
            lab = self.query('INNAME? {}'.format(ch.channel_id))

            # Compare ignoring case and surrounding whitespace
            # (the device reply may be padded -- TODO confirm)
            if lab.strip().casefold() != ch.label.strip().casefold():
                getLogger().warning(
                    'Mismatch of Sensor %s Label: %s (device) and %s (user)',
                    ch.channel_id, lab, ch.label
                )

            # Set limit temperature
            self.command('TLIMIT {},{}'.format(ch.channel_id, ch.temp_limit))

        # Configure device output channels
        getLogger().info('Configuring device output channels')
        for h in self.heater_info:
            # Page 133
            self.command('HTRSET {},{},{},{},{}'.format(
                h.heater_id,            # The heater ID
                h.resistance_code,      # Resistance code (1 for 25 Ohm, 2 for 50 Ohm)
                0,                      # Max current mode, 0 for user-defined
                h.max_current,          # User-defined max current
                2                       # Heater output display, 1 for current and 2 for power
            ))

    def retrieve_data(self):
        """ Queries the device for the latest temperature and heater data.

        The readout of every logging channel and of every active heater is
        stored in its `data` attribute; values which cannot be parsed are
        reported with a warning and stored as 'None'.

        Raises
        ------
        :class:`lakeshore.InstrumentException`
            Generic device error, or the temperature reply does not
            hold one value per channel.
        """

        # Query for all temperatures
        readings = self.query('KRDG? 0').split(sep=',')
        if len(readings) != self.number_of_channels:
            raise InstrumentException(
                'Invalid length ({}) of temperature query response'.format(len(readings))
            )

        for raw, ch in zip(readings, self.channel_info):
            if not ch.logging:
                continue
            try:
                ch.data = float(raw)
            except ValueError:
                # Report the offending token, not the whole reply
                getLogger().warning('Invalid temperature string for sensor %s: %s', ch.label, raw)
                ch.data = None

        # Query for heater outputs
        for h in self.heater_info:
            if not h.active:
                continue
            output = self.query('HTR? {}'.format(h.heater_id))
            try:
                h.data = float(output)
            except ValueError:
                getLogger().warning('Invalid power output string for heater %s: %s', h.heater_id, output)
                h.data = None

    def update(self):
        """ Placeholder, currently does nothing. """
        pass