Source code for mjolnir.instruments.logical_instruments

from __future__ import annotations

import atexit
import importlib
import time
import warnings
from collections import deque
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Any, Literal, Mapping, MutableMapping, Sequence, TypeVar
from unittest import mock

import git
import numpy as np
import numpy.typing as npt
from qcodes.instrument import ChannelTuple, Instrument, InstrumentBase
from qcodes.instrument.delegate import DelegateInstrumentChannel
from qcodes.instrument_drivers.oxford import OxfordMercuryiPS, OxfordTriton
from qcodes.parameters import (DelegateParameter, GroupedParameter, ManualParameter, Parameter,
                               ParameterBase, create_on_off_val_mapping)
from qcodes.station import Station
from qcodes.utils.attribute_helpers import strip_attrs
from qcodes.validators import validators
from qcodes_contrib_drivers.drivers.Andor.Andor_iDus4xx import AndorIDus4xx
from qcodes_contrib_drivers.drivers.Attocube.AMC100 import AttocubeAMC100
from qcodes_contrib_drivers.drivers.Aviosys import IP_Power_9258S
from qcodes_contrib_drivers.drivers.Horiba.Horiba_FHR import HoribaFHR
from qcodes_contrib_drivers.drivers.LighthousePhotonics.Lighthouse_Photonics_Sprout_G import (
    LighthousePhotonicsSproutG
)
from qcodes_contrib_drivers.drivers.M2.M2_Solstis_3 import M2Solstis3
from qcodes_contrib_drivers.drivers.QDevil import QDAC2
from qcodes_contrib_drivers.drivers.SwabianInstruments.Swabian_Instruments_Time_Tagger import (
    TimeTagger, tt)
from qcodes_contrib_drivers.drivers.Thermotek.Thermotek_T255p import ThermotekT255p
from qcodes_contrib_drivers.drivers.Thorlabs.Thorlabs_K10CR1.kinesis import ThorlabsK10CR1
from qcodes_contrib_drivers.drivers.Thorlabs.Thorlabs_MFF10x.kinesis import ThorlabsMFF10x
from qcodes_contrib_drivers.drivers.Thorlabs.Thorlabs_PM100D.tlpm import ThorlabsPM100D
from qutil import const, functools, io, itertools, typecheck, ui

from .physical_instruments import BreakoutBox, mercury_ips_limits
from ..calibration import CcdCalibrationHandler, PowerCalibrationHandler, RejectionFeedbackHandler
from ..helpers import (make_dc_constant_V_blocking, make_dc_constant_V_nonblocking,
                       update_qdac2_line_label)
from ..parameters import (DelegateParameterWithSetSideEffect, DelegateParameterWithUncertainty,
                          DelegatePowerParameter, MembraneGateParameterContext, PowerParameter,
                          with_underlying_instrument)

_DelegateInstrumentChannelT = TypeVar('_DelegateInstrumentChannelT',
                                      bound=DelegateInstrumentChannel)
_InstrumentT = TypeVar('_InstrumentT', bound=InstrumentBase)


[docs] class Trap(DelegateInstrumentChannel): pass
[docs] class LogicalInstrument(Instrument): """A logical grouping of instruments that behaves like an Instrument.""" def __init__(self, name: str, station: Station, metadata: Mapping[Any, Any] | None = None, label: str | None = None) -> None: super().__init__(name, metadata=metadata, label=label) self._station = station def _initialize_instrument(self, name: str, station: Station, recreate: bool, bind_to_instrument: bool = True) -> _InstrumentT | None: instrument_type = station.config['instruments'][name]['type'] module = importlib.import_module('.'.join(instrument_type.split('.')[:-1])) instrument_class = getattr(module, instrument_type.split('.')[-1]) if recreate: if name in station.components: station.close_and_remove_instrument(name) elif Instrument.exist(name, instrument_class): Instrument.find_instrument(name, instrument_class).close() try: instrument = station.load_instrument(name, revive_instance=True, update_snapshot=False) except KeyError: instrument = station.components.get(name) except Exception as error: if io.query_yes_no(f'Failed to load {name}:\n{str(error)}.\n\nContinue anyway?'): print(f'Continuing. {self.name} might be in an inconsistent state until {name} is ' 'connected.') return else: raise if bind_to_instrument: self.add_submodule(name, instrument) return instrument def _initialize_delegate_instrument_channel( self, name: str, station: Station, cls: type[DelegateInstrumentChannel] = DelegateInstrumentChannel, parameters: (Mapping[str, Sequence[str]] | Mapping[str, str]) | None = None, channels: (Mapping[str, Mapping[str, Any]] | Mapping[str, str]) | None = None, grouped_parameter_names: Mapping[str, Sequence[str] | str | None] | None = None, grouped_parameter_class: type[ParameterBase] | None = GroupedParameter, grouped_parameter_kwargs: Mapping[str, Any] | None = None, initial_values: Mapping[str, Any] | None = None, set_initial_values_on_load: bool = False, setters: Mapping[str, MutableMapping[str, Any]] | None = None, units: Mapping[str, str] | None = None, metadata: Mapping[Any, Any] | None = None, recreate: bool = False, bind_to_instrument: bool = True, **kwargs ) -> _DelegateInstrumentChannelT: # Make sure the physical instrument has been loaded physical_instruments = [] for value in parameters.values(): if isinstance(value, str): inst = self._initialize_instrument(value.split('.')[0], station, recreate, bind_to_instrument=False) physical_instruments.append(inst) elif isinstance(value, Sequence): for v in value: inst = self._initialize_instrument(v.split('.')[0], station, recreate, bind_to_instrument=False) physical_instruments.append(inst) if len(set(physical_instruments)) > 1: warnings.warn('More than one physical instrument detected in parameters for ' f'{cls} {name}. Choosing the first.', UserWarning, stacklevel=2) with mock.patch.object(cls, 'param_cls', with_underlying_instrument(cls.param_cls, physical_instruments[0])): channel = cls(self, name, station=station, parameters=parameters, channels=channels, grouped_parameter_names=grouped_parameter_names, grouped_parameter_class=grouped_parameter_class, grouped_parameter_kwargs=grouped_parameter_kwargs, initial_values=initial_values, set_initial_values_on_load=set_initial_values_on_load, setters=setters, units=units, metadata=metadata, **kwargs) if bind_to_instrument: self.add_submodule(name, channel) return channel def __repr__(self) -> str: submodules = ", ".join(self.submodules.keys()) return f"<{type(self).__name__}(name={self.name}, submodules={submodules})>"
[docs] def get_idn(self) -> dict[str, str | None]: return {'vendor': 'Odin', 'model': self.name, 'serial': None, 'firmware': git.Repo(__file__, search_parent_directories=True).head.object.hexsha}
[docs] def close(self, close_physical_instruments: bool = False) -> None: """Overrides instrument close to offer the possibility to close child instruments.""" def recurse(instrument): try: self._station.close_and_remove_instrument(instrument) except (KeyError, AttributeError): try: return recurse(instrument.parent) except AttributeError: return if close_physical_instruments: for instrument in self.instrument_modules.values(): recurse(instrument) super().close() else: # Do not call the super() method here because it strips stuff from # submodules. So just copied the part of the parent code that doesn't. if hasattr(self, "connection") and hasattr(self.connection, "close"): self.connection.close() strip_attrs(self, whitelist=["_short_name"]) self.remove_instance(self)
[docs] class ExcitationPath(LogicalInstrument): """Abstracts all instruments that are part of the excitation path.""" _power_calibration_handler: PowerCalibrationHandler _rejection_feedback_handler: RejectionFeedbackHandler chiller: ThermotekT255p | None pump_laser: LighthousePhotonicsSproutG | None laser: M2Solstis3 | None powermeter: ThorlabsPM100D | None nd_filter: ThorlabsK10CR1 | None shutter: ThorlabsMFF10x | None rotators: AttocubeAMC100 | None white_light: IP_Power_9258S.PowerChannel | None def __init__(self, name: str, station: Station, white_light_instrument_channel: str = 'A', recreate_physical_instruments: bool = False, metadata: Mapping[Any, Any] | None = None, label: str | None = None) -> None: super().__init__(name, station=station, metadata=metadata, label=label) self.chiller = self._initialize_instrument('chiller', self._station, recreate_physical_instruments) self.pump_laser = self._initialize_instrument('pump_laser', self._station, recreate_physical_instruments) self.laser = self._initialize_instrument('laser', self._station, recreate_physical_instruments) self.powermeter = self._initialize_instrument('powermeter', self._station, recreate_physical_instruments) self.nd_filter = self._initialize_instrument('nd_filter', self._station, recreate_physical_instruments) self.shutter = self._initialize_instrument('shutter', self._station, recreate_physical_instruments) self.rotators = self._initialize_instrument('rotators', self._station, recreate_physical_instruments) self.white_light = self._initialize_delegate_instrument_channel( 'white_light', self._station, parameters={'enabled': f'powerswitch.{white_light_instrument_channel}.power_enabled'}, recreate=recreate_physical_instruments ) # Disable white light and laser+chiller when exiting atexit.register(functools.partial(self.white_light.enabled, False)) atexit.register(functools.partial(self.pump_laser.output_mode, 'idle')) self.wavelength = self.add_parameter( 'wavelength', with_underlying_instrument(DelegateParameterWithSetSideEffect, self.laser), source=self.laser.wavelength_m, set_side_effect=self._wavelength_set_side_effect, execute_before=False, run_side_effect_on_instantiation=False, bind_to_instrument=True, label='Excitation wavelength' ) """The cw-laser's wavelength.""" self.wavelength_constant_power = self.add_parameter( 'wavelength_constant_power', with_underlying_instrument(Parameter, self.laser), get_cmd=self.wavelength.get, set_cmd=self._set_wavelength_constant_power, unit=self.wavelength.unit, label='Excitation wavelength power-locked' ) """The cw-laser's wavelength, where on each set the power is retuned either to the value of the previous point, or to the value of :attr:`wavelength_constant_power_setpoint`.""" self.power = self.add_parameter( 'power', with_underlying_instrument(PowerParameter, self.powermeter), get_cmd=self._get_power, set_cmd=self._set_power, unit='W', label='Power at optical head' ) """The power at the optical head. The setter inverts the power calibration handled by :attr:`power_calibration_handler`. """ self.wavelength_constant_power_setpoint = self.add_parameter( 'wavelength_constant_power_setpoint', ManualParameter, vals=validators.Numbers(0), unit=self.power.unit, label='Excitation wavelength power setpoint' ) """The power setpoint used for :attr:`wavelength_constant_power`.""" self.rejection_feedback = self.add_parameter( 'rejection_feedback', ManualParameter, val_mapping=create_on_off_val_mapping(), label='Active excitation rejection feedback', initial_value=False ) """If active excitation rejection is on.""" self.power_calibration_update_mode = self.add_parameter( 'power_calibration_update_mode', ManualParameter, vals=validators.Enum('full', 'fast'), initial_value='fast' ) """The power calibration update mode. - 'full' performs a sweep of the ND-filter's angle to obtain a calibration function power(angle). - 'fast' uses noisy optimization to minimize the difference between the current power and the setpoint. """ self.power_calibration_strategy = self.add_parameter( 'power_calibration_strategy', ManualParameter, vals=validators.Enum('sequential', 'adaptive'), initial_value='sequential' ) """The power calibration update strategy. Only applies to calibration mode 'full'. - 'sequential' performs a equidistant sweep between bounds. - 'adaptive' uses ``adaptive`` to perform adaptive sampling. """ self.power_calibration_oldest_datetime = self.add_parameter( 'power_calibration_oldest_datetime', ManualParameter, initial_value=datetime(1, 1, 1) ) """Oldest power calibration date.""" self.power_calibration_max_age = self.add_parameter( 'power_calibration_max_age', DelegateParameter, source=self.power_calibration_oldest_datetime, vals=validators.Numbers(), get_parser=lambda x: (datetime.now() - x).total_seconds(), set_parser=lambda x: datetime.now() - timedelta(seconds=x), unit='s', bind_to_instrument=True ) """Oldest power calibration age.""" self.power_n_avg = self.add_parameter( 'power_n_avg', ManualParameter, set_parser=int, initial_value=25 ) """Number of samples to average power over.""" self.power_calibration_tolerance = self.add_parameter( 'power_calibration_tolerance', ManualParameter, initial_value=0.10, label='Relative power calibration tolerance' ) """Relative tolerance of power setting. Used to determine whether power is satisfactorily close to the desired value. """ self.power_at_sample = self.add_parameter( 'power_at_sample', with_underlying_instrument(DelegatePowerParameter, self.powermeter), source=self.power, # Transmission is ~87%, reflection ~5.8% @ 795 nm & vertical polarization scale=15, bind_to_instrument=True, label='Power at sample' ) """The power at the sample.""" self.power_at_sample_avg = self.add_parameter( 'power_at_sample_avg', with_underlying_instrument(DelegateParameterWithUncertainty, self.powermeter), source=self.power_at_sample, n_avg_callback=self.power_n_avg, get_context=self.open_shutter, bind_to_instrument=True, label='Average power at sample' ) """A tuple of (mean, std) of average power at the sample.""" self.active_light_source = self.add_parameter( 'active_light_source', Parameter, get_cmd=self._get_active_light_source, set_cmd=self._set_active_light_source, vals=validators.Enum('laser', 'white_light', 'white light', 'both', None), label='Active light source' ) """The currently active light source.""" def _wavelength_set_side_effect(self, param, old_val, val: float): self.acquire_laser_lock(val, show_progress=False) self.wavelength_constant_power.cache.invalidate() self.powermeter.wavelength.set(val) if self.rejection_feedback(): self.rejection_feedback_handler.calibrate(val, old_val) def _set_wavelength_constant_power(self, val: float): if self.wavelength_constant_power_setpoint.cache.valid: power_setpoint = self.wavelength_constant_power_setpoint() else: with self.open_shutter(): power_setpoint = self.wait_for_power_to_settle().mean() self.wavelength.set(val) self.power.set(power_setpoint) def _get_power(self) -> float: with self.open_shutter(): return self.powermeter.power() def _set_power(self, val: float): return self.power_calibration_handler.update_calibration(target_power=val) def _get_active_light_source(self) -> str | None: white_light_enabled = self.white_light.enabled() laser_enabled = self.pump_laser.enabled() and self.shutter.position() == 'open' if laser_enabled and white_light_enabled: return 'both' if laser_enabled: return 'laser' if white_light_enabled: return 'white_light' return None def _set_active_light_source(self, src: str): if src in ('laser', 'both'): self.enable_laser() if src in ('white_light', 'white light', 'both'): self.white_light.enabled(True) if src not in ('white_light', 'white light', 'both'): self.white_light.enabled(False) if src not in ('laser', 'both'): self.shutter.position('close') @property def power_calibration_handler(self) -> PowerCalibrationHandler: """The power calibration handler.""" try: return self._power_calibration_handler except AttributeError: raise RuntimeError('Please initialize power calibration using ' 'ExcitationPath.initialize_power()') from None @power_calibration_handler.setter def power_calibration_handler(self, handler): self._power_calibration_handler = handler self.metadata['power_calibration_handler'] = handler @property def rejection_feedback_handler(self) -> RejectionFeedbackHandler: """The excitation rejection handler.""" try: return self._rejection_feedback_handler except AttributeError: raise RuntimeError('Please initialize rejection feedback using ' 'ExcitationPath.initialize_rejection_feedback()') from None @rejection_feedback_handler.setter def rejection_feedback_handler(self, handler): self._rejection_feedback_handler = handler self.metadata['rejection_feedback_handler'] = handler
[docs] def enable_laser(self, shutter: str = 'open', block: bool = True, show_progress: bool = True): """Turn on the pump laser. Parameters ---------- shutter : When done, leave the laser shutter open or closed. block : Block the interpreter during ramping. show_progress : Show a progressbar during ramping. """ if self.pump_laser.enabled(): assert self.chiller.enabled() self.shutter.position(shutter) return if io.query_yes_no('Are you in the lab?', default='no'): self.chiller.enabled(True) if block: self.pump_laser.ramp_up(show_progress=show_progress) else: self.pump_laser.enabled(True) self.shutter.position(shutter) else: raise RuntimeError('Safety first! Aborted.')
[docs] def disable_laser(self, shutter: str = 'close'): """Turn of the pump laser and chiller.""" self.pump_laser.enabled(False) self.chiller.enabled(False) self.shutter.position(shutter)
[docs] def initialize_laser(self, shutter: str = 'close', block: bool = True, show_progress: bool = True, wavelength: float | None = None): """Initialize the lasers. Parameters ---------- shutter : When done, leave the laser shutter open or closed. block : Block the interpreter during ramping. show_progress : Show a progressbar during ramping. wavelength : Set the cw laser's wavelength. """ self.enable_laser(shutter, block, show_progress) if wavelength is not None: self.acquire_laser_lock(wavelength, show_progress=show_progress) # Update cache self.wavelength.get()
[docs] def initialize_power(self, logical_station: Station, oldest_calibration_datetime: datetime | None = None, update_calibration: bool | Literal['full', 'fast'] = False, averaging_time: float = 1e-3, **calibration_kwargs): """Initialize the power calibration. Parameters ---------- logical_station : The station hosting the logical instruments. oldest_calibration_datetime : Oldest allowed datetime for the calibration. Defaults to the value of :attr:`power_calibration_oldest_datetime`. update_calibration : Update the calibration on initalization. averaging_time : The power meter's averaging time to set. **calibration_kwargs : Kwargs passed on to :class:`~mjolnir.calibration.PowerCalibrationHandler`. """ self.power_calibration_handler = PowerCalibrationHandler(logical_station, **calibration_kwargs) self.powermeter.averaging_time(averaging_time) self.nd_filter.jog_mode('SingleStep') self.nd_filter.stop_mode('Profiled') # [0, 360) degrees self.nd_filter.set_rotation_modes('RotationalWrapping', 'Quickest') if not self.nd_filter.is_homed(): with self.close_shutter(): self.log.info('Homing ND filter.') self.nd_filter.home() if oldest_calibration_datetime is not None: self.power_calibration_oldest_datetime.set(oldest_calibration_datetime) if update_calibration is True: update_calibration = self.power_calibration_update_mode() if update_calibration: self.initialize_laser() with self.power_calibration_update_mode.set_to(update_calibration): self.power_calibration_handler.update_calibration()
[docs] def initialize_rejection_feedback(self, logical_station, **kwargs): """Initialize the excitation rejection handler. Parameters ---------- logical_station : The station hosting the logical instruments. **kwargs : See :class:`~mjolnir.calibration.RejectionFeedbackHandler`. """ self.rejection_feedback_handler = RejectionFeedbackHandler(logical_station, **kwargs)
[docs] def acquire_laser_lock(self, wavelength: float, max_attempts: int = 10, show_progress: bool = True): """Acquire the laser wavelength-lock. Parameters ---------- wavelength : The wavelength to set. max_attempts : The maximum number of attempts for acquiring the lock. show_progress : Show a progressbar. """ if self.laser.lock() and round(self.laser.wavelength_m(), 3) == round(wavelength, 3): return for i in ui.progressbar(itertools.count(start=1), unit=' attempts', desc='Obtaining lock', disable=not show_progress): self.laser.lock(True) self.laser.wavelength_m(wavelength) time.sleep(1) if self.laser.lock() or i > max_attempts: break else: warnings.warn(f'Could not set laser lock after {i} attempts.', UserWarning) # Update cache self.wavelength.get()
[docs] def wait_for_power_to_settle(self, slope: float = 1e-2, n_samples: int | None = None) -> npt.NDArray[float]: """Block the interpreter until the power readings stabilize. Parameters ---------- slope : The maximum slope of the fitted line to accept. n_samples : The number of rolling samples to use for fitting. Returns ------- data : The last n_samples data points acquired. """ def current_slope(): if len(data) >= n_samples: data.popleft() data.append(self.power()) poly = np.polynomial.Polynomial.fit(np.linspace(0, 1, len(data)), np.array(data) / np.mean(data), deg=[0, 1]) return poly.coef[1] n_samples = self.power_n_avg() if n_samples is None else n_samples data = deque(self.power() for _ in range(n_samples)) while abs(current_slope()) > slope: continue return np.array(data)
[docs] @contextmanager def open_shutter(self): """Temporarily open the shutter and restore the previous state.""" with self.shutter.position.set_to('open', allow_changes=True): yield
[docs] @contextmanager def close_shutter(self): """Temporarily close the shutter and restore the previous state.""" with self.shutter.position.set_to('close', allow_changes=True): yield
[docs] def switch_active_light_source(self): """Switches between laser and white light, does nothing else.""" if self.active_light_source() == 'laser': self.active_light_source('white_light') elif self.active_light_source() == 'white_light': self.active_light_source('laser')
[docs] class DetectionPath(LogicalInstrument): """Abstracts all instruments that are part of the detection path.""" _ccd_calibration_handler: CcdCalibrationHandler ccd: AndorIDus4xx | None spectrometer: HoribaFHR | None tagger: TimeTagger | None def __init__(self, name: str, station: Station, recreate_physical_instruments: bool = False, metadata: Mapping[Any, Any] | None = None, label: str | None = None) -> None: super().__init__(name, station=station, metadata=metadata, label=label) self.ccd = self._initialize_instrument('ccd', self._station, recreate_physical_instruments) self.spectrometer = self._initialize_instrument('spectrometer', self._station, recreate_physical_instruments) self.tagger = self._initialize_instrument('tagger', self._station, recreate_physical_instruments) self.ccd_calibration_oldest_datetime = self.add_parameter( 'ccd_calibration_oldest_datetime', ManualParameter, initial_value=datetime(1970, 1, 3) ) """Oldest CCD calibration date.""" self.ccd_calibration_max_age = self.add_parameter( 'ccd_calibration_max_age', DelegateParameter, source=self.ccd_calibration_oldest_datetime, vals=validators.Numbers(), get_parser=lambda x: (datetime.now() - x).total_seconds(), set_parser=lambda x: datetime.now() - timedelta(seconds=x), unit='s', bind_to_instrument=True ) """Oldest CCD calibration age.""" self.ccd_calibration_update_mode = self.add_parameter( 'ccd_calibration_update_mode', ManualParameter, vals=validators.Enum('full', 'fast', 'dirty', None), initial_value='dirty' ) """The CCD calibration update mode. - 'full' performs a sweep of the laser's wavelength to obtain a calibration function wavelength(pixel). - 'fast' only measures the central wavelength. - 'dirty' simply uses the current calibration and shifts it by the change in wavelength. - None does not perform any recalibration. """ self.ccd_horizontal_axis_scale = self.add_parameter( 'ccd_horizontal_axis_scale', Parameter, set_cmd=self._set_ccd_horizontal_axis, vals=validators.Enum('energy', 'wavelength', None), initial_value='energy', label='Horizontal axis calibration units' ) """Set the unit of calibration for the horizontal axis of the CCD.""" self.central_wavelength = self.add_parameter( 'central_wavelength', with_underlying_instrument(Parameter, self.spectrometer), get_cmd=self._get_central_wavelength, set_cmd=self._set_central_wavelength, scale=1_000, unit='nm', label='Central detection wavelength' ) """The spectrometer grating central wavelength.""" self.bandwidth = self.add_parameter( 'bandwidth', with_underlying_instrument(DelegateParameter, self.spectrometer), source=self.spectrometer.slits.side_exit_slit.position, # dynamic, so cannot use scale get_parser=lambda x: x * self.dispersion, set_parser=lambda x: x / self.dispersion, initial_cache_value=0, unit='nm', label='Monochromator bandwidth', bind_to_instrument=True ) """The wavelength bandwidth of the monochromator.""" self.active_grating = self.add_parameter( 'active_grating', with_underlying_instrument(DelegateParameterWithSetSideEffect, self.spectrometer), source=self.spectrometer.active_grating, set_side_effect=self._grating_set_side_effect, execute_before=False, label='Active grating', snapshot_get=False, bind_to_instrument=True ) """The currently active grating.""" self.active_detection_path = self.add_parameter( 'active_detection_path', with_underlying_instrument(Parameter, self.spectrometer), get_cmd=self.spectrometer.mirrors[0].position.cache.get, set_cmd=self.spectrometer.mirrors[0].position.set, val_mapping={'ccd': 'front', 'apd': 'side'}, post_delay=5, label='Active detection path' ) """The currently active detection path.""" @property def ccd_calibration_handler(self) -> CcdCalibrationHandler: """The :class:`~..calibration.CCDCalibrationHandler`.""" try: return self._ccd_calibration_handler except AttributeError: raise RuntimeError('Please initialize the CCD calibration using ' 'DetectionPath.initialize_ccd()') from None @ccd_calibration_handler.setter def ccd_calibration_handler(self, handler): self._ccd_calibration_handler = handler self.metadata['ccd_calibration_handler'] = handler
[docs] def initialize_spectrometer(self, detection_path: Literal['ccd', 'apd'] = 'ccd', grating: str | int = 600, central_wavelength: float = 815.0, entrance_slit_width: float = 50): """Initialize the spectrometer. Parameters ---------- detection_path : The active detection path to set. grating : The spectrometer grating to use. central_wavelength : The grating's central wavelength to set. entrance_slit_width : The width of the entrance slit. """ self.active_detection_path(detection_path) self.spectrometer.slits.side_exit_slit.init() self.spectrometer.slits.front_entrance_slit.init() self.spectrometer.slits.front_entrance_slit.position(entrance_slit_width) # Cannot use self.central_wavelength at initialization since active grating is unknown self.spectrometer.gratings.get_channel_by_name(f'grating_{grating}').position( central_wavelength * 1e3 ) self.central_wavelength.cache.set(central_wavelength)
[docs] @typecheck.check_literals def initialize_ccd(self, logical_station: Station, cool: bool = True, temperature: float = -70, block: bool = True, target: Literal['reached', 'stabilized'] = 'reached', cooler_mode: Literal['maintain', 'return'] = 'maintain', horizontal_axis: Literal['wavelength', 'energy'] = 'energy', oldest_calibration_datetime: datetime | None = None, show_progress: bool = True, **calibration_kwargs): """Initialize the CCD. Parameters ---------- logical_station : The station hosting the logical instruments. cool : Turn on the thermoelectric cooler of the CCD. temperature : The target temperature when cooling the CCD. block : Block the interpreter during cooling. target : The cooling target to reach. cooler_mode : The CCD cooler mode. horizontal_axis : Calibrate the CCD pixels in terms of wavelength or energy. oldest_calibration_datetime : Oldest CCD calibration date to accept before recalibrating. show_progress : Show a progressbar during cooling. **calibration_kwargs : Kwargs passed on to :class:`~..calibration.CCDCalibrationHandler`. """ self.ccd_calibration_handler = CcdCalibrationHandler(logical_station, **calibration_kwargs) if cool: self.ccd.set_temperature(temperature) if block: self.ccd.cool_down(int(temperature), target, show_progress) else: self.ccd.cooler('On') if oldest_calibration_datetime is not None: self.ccd_calibration_oldest_datetime.set(oldest_calibration_datetime) self.ccd.cooler_mode(cooler_mode) self.ccd_horizontal_axis_scale(horizontal_axis) self.ccd.cosmic_ray_filter_mode(False) self.ccd.acquisition_mode('single scan') self.ccd.read_mode('full vertical binning') self.ccd.exposure_time(0.2) self.ccd.accumulation_cycle_time(0.) self.ccd.kinetic_cycle_time(0.) self.ccd.preamp_gain(1.0) self.ccd.horizontal_shift_speed(0.1) self.ccd.vertical_shift_speed(self.ccd.fastest_recommended_vertical_shift_speed()) self.ccd.get_acquisition_timings() self.ccd.atmcd64d.get_size_of_circular_buffer()
[docs] def initialize_tagger(self, trigger_level: float = 1.125): """Initialize the Time Tagger. Parameters ---------- trigger_level : The trigger level in volts to set for all channels. """ for channel in self.tagger.api.getChannelList(tt.ChannelEdge_Rising): self.tagger.api.setTriggerLevel(channel, trigger_level)
@property def dispersion(self) -> float: """Wavelength resolution in nm per μm.""" try: = abs(np.diff(self.ccd_calibration_handler.calibration.linspace(2)[1]).item()) dx = self.ccd.detector_size.get_latest()[0] return / dx except RuntimeError: # calibration not initialized. return 1 def _grating_set_side_effect(self, param, cache_val, val): if param.source.set_parser(val) != cache_val: self.ccd.background.cache.invalidate() def _ccd_pixel_wavelength_parser(self, px): return self.ccd_calibration_handler.calibration(px) def _get_central_wavelength(self) -> float: if (active_grating := self.spectrometer.active_grating()) is None: raise ValueError('No active grating. Please perform a grating move first.') return active_grating.position.get() def _set_central_wavelength(self, val: float): if (active_grating := self.spectrometer.active_grating()) is None: raise ValueError('No active grating. Please perform a grating move first.') return active_grating.position.set(val) def _set_ccd_horizontal_axis(self, val: Literal['energy', 'wavelength', None]): if val == 'wavelength': self.ccd.horizontal_axis.get_parser = self._ccd_pixel_wavelength_parser self.ccd.horizontal_axis.unit = 'nm' elif val == 'energy': self.ccd.horizontal_axis.get_parser = functools.chain( self._ccd_pixel_wavelength_parser, functools.scaled(1e+9)(const.lambda2eV) ) self.ccd.horizontal_axis.unit = 'eV' elif val is None: self.ccd.horizontal_axis.get_parser = None self.ccd.horizontal_axis.unit = 'px'
[docs] class Sample(LogicalInstrument): """An abstraction of a DUT.""" dac: QDAC2.QDac2 | None bob: BreakoutBox | None fridge: OxfordTriton | None magnet: OxfordMercuryiPS | None def __init__( self, name: str, station: Station, recreate_physical_instruments: bool = False, metadata: Mapping[Any, Any] | None = None, label: str | None = None ) -> None: super().__init__(name, station=station, metadata=metadata, label=label) self.dac = self._initialize_instrument('dac', self._station, recreate=recreate_physical_instruments) self.bob = self._initialize_instrument( 'bob', self._station, recreate=recreate_physical_instruments, # Avoid unnecessary snapshot clutter; BoB setting is stored in # self.dc_lines.bob_filter bind_to_instrument=False ) self.fridge = self._initialize_instrument('fridge', self._station, recreate=recreate_physical_instruments) self.magnet = self._initialize_instrument('magnet', self._station, recreate=recreate_physical_instruments) if self.fridge is not None: self.fridge.T8.label = 'MXC temperature' self.fridge.T13.label = 'Magnet temperature' if self.magnet is not None: self.magnet.set_new_field_limits(mercury_ips_limits) # Just to be safe, add validators self.magnet.GRPX.field_ramp_rate.add_validator(validators.Numbers(0, 0.125/60)) self.magnet.GRPZ.field_ramp_rate.add_validator(validators.Numbers(0, 0.250/60)) self.magnet.GRPX.field_ramp_rate(50e-3/60) self.magnet.GRPZ.field_ramp_rate(100e-3/60)
[docs] def initialize_dac(self, reset_voltages: bool = False, output_range: Literal['low', 'high'] | float = None, output_filter: Literal['dc', 'med', 'high'] | None = None, measurement_range: Literal['low', 'high'] | None = None, dc_slew_rate_V_per_s: float | None = None, step: float | None = None, step_delay: float | None = None): """Initialize the QDAC-II connected to the device. Parameters ---------- reset_voltages : Set voltages to zero before doing anything. output_range : Output range of the DAC channels. output_filter : Output filter of the DAC channels. measurement_range : Measurement range of the DAC channels. dc_slew_rate_V_per_s : Slew rate in V/s of the DAC channels. step : Voltage step size for each set. step_delay : Delay after each step. """ if reset_voltages and io.query_yes_no('Really reset voltages?', default='no'): self.dc_lines.dc_constant_V(0) if output_range is not None: if not all(voltage == 0 for voltage in self.dc_lines.dc_constant_V.get()): raise AssertionError('Did not reset output range because not all voltages were 0.') self.dc_lines.output_range(output_range) if output_filter is not None: self.dc_lines.output_filter(output_filter) if measurement_range is not None: self.dc_lines.measurement_range(measurement_range) if dc_slew_rate_V_per_s is not None: if not all(voltage == 0 for voltage in self.dc_lines.dc_constant_V.get()): raise AssertionError('Did not change slew rate because not all voltages were 0.') self.dc_lines.dc_slew_rate_V_per_s(dc_slew_rate_V_per_s) if step is not None: for line in self.dc_lines: line.dc_constant_V.step = step if step_delay is not None: for line in self.dc_lines: line.dc_constant_V.post_delay = step_delay
@functools.cached_property def gates(self) -> dict[str, Parameter]: """All DC voltages connected to gates.""" return {line.label: line.dc_constant_V for line in self.dc_lines}
[docs] class TrapSample(Sample): """An abstraction of a membrane exciton trap sample.""" def __init__( self, name: str, station: Station, trap_channel_mapping: Mapping[int, Mapping[ Literal['central', 'guard'], Mapping[ Literal['top', 'bottom'], tuple[int, int, str] ] ]], output_triggers: dict[str, int], recreate_physical_instruments: bool = False, metadata: Mapping[Any, Any] | None = None, label: str | None = None ) -> None: super().__init__(name, station=station, recreate_physical_instruments=recreate_physical_instruments, metadata=metadata, label=label) station.dac.free_all_triggers() # TODO: possibly move more stuff to Sample base class self._initialize_dc_lines(trap_channel_mapping or {}) self._initialize_traps(trap_channel_mapping, output_triggers) self.blocking_ramps = self.add_parameter( 'blocking_ramps', Parameter, set_cmd=self._set_blocking_ramps, vals=validators.Bool(), initial_value=True ) """Replace the dc_constant_V param with one that waits for the voltage to settle at a given threshold.""" self.active_trap = self.add_parameter( 'active_trap', ManualParameter, set_parser=self._active_trap_parser, label='Active trap' ) def _initialize_dc_lines(self, channel_mapping): dc_lines = [] for gate_kwargs in channel_mapping.values(): for gate_type, gates in gate_kwargs.items(): for side, (dac_ch, bob_ch, bob_setting) in gates.items(): channel = self.dac.get_component(f'ch{dac_ch:02d}') if 'bob_setting' not in channel.parameters: channel.add_parameter( 'bob_setting', parameter_class=with_underlying_instrument(DelegateParameter, self.bob), source=self.bob.get_component(f'ch{bob_ch:02d}').setting, label='Breakout Box setting', initial_value=bob_setting ) if 'bob_filter_RC' not in channel.parameters: channel.add_parameter( 'bob_filter_RC', parameter_class=with_underlying_instrument(DelegateParameter, self.bob), source=self.bob.get_component(f'ch{bob_ch:02d}').filter_RC, label='Breakout Box filter RC constant' ) # Make labels of channel parameters reflect gate names update_qdac2_line_label(channel, gate_type, side) dc_lines.append(channel) self.add_submodule('dc_lines', ChannelTuple(self, 'dc_lines', QDAC2.QDac2Channel, dc_lines, snapshotable=False)) def _initialize_traps(self, channel_mapping, output_triggers): traps = [] if channel_mapping is not None: for index, gate_kwargs in channel_mapping.items(): trap = self._initialize_delegate_instrument_channel( f'trap_{index}', self._station, cls=Trap, # The physical parameters that will actually be varied parameters={ gate_type: [ f"dac.ch{gates['top'][0]:02d}.dc_constant_V", f"dac.ch{gates['bottom'][0]:02d}.dc_constant_V" ] for gate_type, gates in gate_kwargs.items() }, # A Group is named 'guard' or 'central', and has parameters # named '<group_name>_top' and '<group_name>_bottom' grouped_parameter_names={ gate_type: ['top', 'bottom'] for gate_type in gate_kwargs }, # The MembraneGateParameterContext class will automatically generate # virtual difference and common_mode parameters for each group grouped_parameter_class=MembraneGateParameterContext, # Keyword arguments for the ArrangementContext parent of # MembraneGateParameterContext grouped_parameter_kwargs=dict(output_triggers=output_triggers), units={gate_type: 'V' for gate_type in gate_kwargs}, initial_values=None, set_initial_values_on_load=False, recreate=False, bind_to_instrument=False, label=f'Trap {index}' ) traps.append(trap) self.add_submodule('traps', ChannelTuple(self, 'traps', Trap, traps)) def _set_blocking_ramps(self, val: bool): for line in self.dc_lines: if val: make_dc_constant_V_blocking(line, thresh=0.99) else: make_dc_constant_V_nonblocking(line) def _active_trap_parser(self, value: int | Trap) -> Trap: if isinstance(value, int): return self.traps.get_channel_by_name(f'trap_{value}') return value @property def trap(self) -> Trap: """The currently selected trap.""" return self.active_trap.get()