Source code for mjolnir.measurements.handler

"""Stateful control of optical measurements.

TODO:
    - quantify-core might be suitable to replace this.

Notes
-----
DAC trigger output::

    # Route external trigger 1 to internal trigger 1
    dac.write('OUTP:TRIG1:SOUR INT1')
    # Width 1 ms
    dac.write('OUTP:TRIG1:WIDT 1e-3')
    # Fire internal trigger 1 (triggers external trigger 1)
    dac.write('TINT:SIGN: 1')

"""
import abc
import contextlib
import dataclasses
import json
import numbers
import random
import time
from datetime import timedelta
from types import ModuleType
from typing import Any, Callable, Literal, Sequence, TypeVar, cast

import git
import qcodes
import qcodes_contrib_drivers
import qutil
from git import InvalidGitRepositoryError
from qcodes.dataset import (SequentialParamsCaller, ThreadPoolParamsCaller,
                            initialise_or_create_database_at, load_or_create_experiment)
from qcodes.dataset.data_set_protocol import DataSetProtocol, values_type
from qcodes.dataset.experiment_container import Experiment
from qcodes.dataset.measurements import DataSaver, Measurement
from qcodes.parameters import ParameterBase
from qcodes.utils import DelayedKeyboardInterrupt, NumpyJSONEncoder
from qcodes_contrib_drivers.drivers.Andor.Andor_iDus4xx import AndorIDus4xx
from qcodes_contrib_drivers.drivers.SwabianInstruments.Swabian_Instruments_Time_Tagger import (
    CountRateMeasurement, TimeTagger, tt
)
from qcodes_contrib_drivers.drivers.SwabianInstruments.private.time_tagger import (
    TimeTaggerMeasurement
)
from qutil import itertools, ui

import mjolnir
from .. import _INSULTS
from ..helpers import camel_to_snake, find_param_source, timed
from ..instruments.logical_instruments import TrapSample
from ..measurements.measures import Measure, MeasureSet
from ..measurements.sweeps import GridSweep, SweepList, SweepProtocol
from ..parameters import (DelegateParameterWithSetSideEffect, LeakageCurrentsParameter,
                          TimeParameter)

_T = TypeVar('_T', bound=numbers.Number)
_ActionsT = tuple[Callable[..., Any], Sequence[Any]]
OutType = list[tuple[ParameterBase, values_type]]


[docs] @dataclasses.dataclass class MeasurementHandler(abc.ABC): """Abstract base class for specifying measurement protocols. Subclasses override methods of this class which get run at different points during :meth:`measure` and :meth:`loop`. See those methods for documentation on when they are run and what they should do. Parameters ---------- station : The station hosting the logical instruments that control the experiment sample_name : A string identifier for the sample. Defaults to the name of a :class:`~mjolnir.instruments.logical_instruments.TrapSample` that is a member of ``station``, if any. experiment : A qcodes experiment. Defaults to "optics". measure_leakage : Automatically measure the leakage current if a QDAC channel is swept. critical_leakage : Current (in Ampere) that determines a critical leakage threshold above which sweeps are aborted. measurement_nplc : The number of power line cycles (PLC) to average currents over. measurement_count : The number of current samples taken. database_location : The location of the database. Defaults to the qcodes default. """ station: qcodes.Station sample_name: str | None = None experiment: Experiment | str = dataclasses.field(default='optics', repr=False) measure_leakage: bool = True critical_leakage: float = 100e-9 measurement_nplc: int = 1 measurement_count: int = 1 database_location: str = qcodes.config.core.db_location def __post_init__(self): initialise_or_create_database_at(self.database_location) if self.sample_name is None: for name, component in self.station.components.items(): if isinstance(component, TrapSample): self.sample_name = name break else: raise ValueError('Give a sample name.') if isinstance(self.experiment, str): self.experiment = load_or_create_experiment(self.experiment, self.sample_name, load_last_duplicate=True) @property def metadata(self) -> dict[str, dict[str, Any]]: """Exports git commit hashes and this class instance.""" metadata = {'git': {}, 'MeasurementHandler': {}} for package in [qcodes, qcodes_contrib_drivers, qutil, mjolnir]: try: metadata['git'][package.__name__] = self._get_hexsha(package) except InvalidGitRepositoryError: metadata['git'][package.__name__] = None for key, val in self.__dict__.items(): try: metadata['MeasurementHandler'][key] = json.dumps(val) except TypeError: pass return metadata @property def sample(self) -> TrapSample: """The logical instrument representing the current sample.""" return cast(TrapSample, self.station.get_component(self.sample_name))
[docs] def cya(self) -> bool: """Teardown actions to perform when measure(cya=True).""" tic = time.perf_counter() print('cya() triggered. Giving a grace period of 10 seconds to abort.') try: while time.perf_counter() - tic < 10: pass except KeyboardInterrupt: print('cya() aborted.') return False print(random.choice(_INSULTS)) return True
@staticmethod def _fopi_metadata(x, y, zs=None) -> dict[str, dict[str, str]]: metadata = { 'fopi': { "preferred_x_axis": x.full_name } } if zs is not None: metadata['fopi']["preferred_y_axis"] = y.full_name metadata['fopi']["preferred_z_axes"] = [z.full_name for z in zs] else: metadata['fopi']["preferred_y_axis"] = y[0].full_name return metadata @staticmethod def _get_hexsha(module: ModuleType) -> str: return git.Repo(module.__file__, search_parent_directories=True).head.object.hexsha @staticmethod def _loop_desc(sweep, j) -> str: """Progressbar description.""" return "Sweeping ({}) ({}/{})".format(', '.join((param.label for param in sweep.param)), j + 1, len(list(sweep)))
[docs] def get_leakage_measures(self, sweeps: SweepList) -> MeasureSet: """Qdac channel leakage measures conditional on the flag.""" if self.measure_leakage: return sweeps.leak_measures return MeasureSet()
[docs] def get_leakage_parameter_contexts(self, sweeps: SweepList) -> dict[ParameterBase, int]: """Parses the :class:`SweepList` to find QDAC-II current parameters.""" try: params = _extract_qdac_current_params(self.get_leakage_measures(sweeps).params) physical_channels = self.sample.dac.channels[tuple( int(find_param_source(leak_param).instrument.name[-2:]) - 1 for leak_param in params )] except AttributeError: # Not a TrapSample with QDAC-II channels return {} if physical_channels: return {physical_channels.measurement_nplc: self.measurement_nplc, physical_channels.measurement_count: self.measurement_count} else: return {}
[docs] def leakage_limit_exceeded(self, params) -> tuple[bool, list[float]]: """Measures leakage on the QDAC channels underlying *params* and tests if the critical limit is exceeded. Returns the boolean and the list of leakage currents. """ # Wait for the current sensors to stabilize and then read time.sleep(max(((find_param_source(param).instrument.measurement_nplc.get_latest() + 2) / 50 for param in _extract_qdac_current_params(params)), default=0)) if itertools.absmax( itertools.chain.from_iterable(vals := [param() for param in params]), default=0 ) > self.critical_leakage: return True, vals return False, vals
[docs] def loop(self, datasaver: DataSaver, params_caller_context: Callable[[], OutType], sweeps: SweepList, measures: MeasureSet, *, i: int = 0, show_progress: bool = True): """The main measurement loop. This function recurses into *sweeps*, measures *measures* at the lowest recursion level (the innermost loop), and saves the results into *datasaver*. Sweeps can specify initialization procedures as well, see :mod:`.sweeps`. Measures can specify a live-plotting procedure that is passed current measurement data at every point, see :mod:`.measures`. """ leakage_measures = self.get_leakage_measures(sweeps) measures_without_leakage = measures - leakage_measures # Loop over different sweep directions of the ith sweep for j, setpoints in enumerate(sweep := sweeps[i]): # Go to start. Might be different for n-directional sweeps sweep.initialize(j) # Loop over all setpoints of one sweep direction for k, setpoint in enumerate(ui.progressbar(setpoints, desc=self._loop_desc(sweep, j), leave=None, disable=not show_progress)): # set returns the setpoint or the measured value depending on :attr:`get_after_set` sweeps.current_setpoints[i] = sweep.set(*setpoint) # Does nothing if no leak_params abort, leakage = self.leakage_limit_exceeded(leakage_measures.params) if abort: sweep.initialize(j) print(f'Aborted while sweeping {sweep} at {sweeps.current_setpoints} ' f'because leakage was too large: {leakage}') break if i == len(sweeps) - 1: # Innermost loop, record result assert len(sweeps.current_setpoints) == len(sweeps) assert len(leakage_measures) == len(leakage) datasaver.add_result( *params_caller_context(), *zip(sweeps.params, itertools.flatten(sweeps.current_setpoints)), *zip(leakage_measures.params, leakage), *zip(measures_without_leakage.delegates, measures_without_leakage.get_delegates()) ) # Plot if any try: with DelayedKeyboardInterrupt(): measures.plot_callback(j, k) except RuntimeError: pass else: # Move on to loop further inwards self.loop(datasaver, params_caller_context, sweeps, measures, i=i+1, show_progress=show_progress)
[docs] @timed def measure(self, sweeps: SweepList | SweepProtocol | None = None, measures: MeasureSet | Measure | ParameterBase | None = None, *, add_before_run: Sequence[_ActionsT] = (), add_after_run: Sequence[_ActionsT] = (), parameter_contexts: dict[ParameterBase, Any] | None = None, return_to_start: bool = True, preallocate_results: bool = True, cya: bool = False, live_plot: bool = True, show_progress: bool = True, threaded_acquisition: bool = True, write_in_background: bool = True, metadata: tuple[str, Any] | Any | None = None, default_sweeps_kwargs: dict[Any, Any] | None = None, loop_kwargs: dict[Any, Any] | None = None, initialize_plotting_kwargs: dict[Any, Any] | None = None, **initialization_settings) -> DataSetProtocol: """The measurement entrypoint. This function takes care of all measurement set-up, tear-down, as well as running the actual measurement loop. Parameters ---------- sweeps : A :class:`~.sweeps.SweepList` defining the nD sweep to execute, or a single sweep implementing the :class:`~.sweeps.SweepProtocol`. If ``None``, only whatever the subclass implementation of :meth:`add_default_sweeps` adds is used. measures : A :class:`~.measures.MeasureSet` defining the parameters to measure at each point of the sweep, or a single :class:`~qcodes:qcodes.parameters.ParameterBase` or :class:`~.measures.Measure` to get. If ``None``, only whatever the subclass implementation of :meth:`add_default_measures` adds is used. add_before_run : A sequence of two-tuples (callable, args) to run before the measurement is started. add_after_run : A sequence of two-tuples (callable, args) to run before the measurement is started. parameter_contexts : A mapping of :class:`~qcodes:qcodes.parameters.Parameter` to values that the parameters should be set to for the duration of the measurement. return_to_start : Set parameters to their value before the measurement after it is done. preallocate_results : Preallocate measurement results in the database. Should normally only be set to a different value by subclasses. cya : Run :meth:`cya` after finishing the measurement. live_plot : Run the :attr:`~.measures.MeasureSet.plot_callback` hooks each time a data point is taken. Individual measures can be excluded by setting their :attr:`~.measures.Measure.live_plot` attribute to False. show_progress : Show a progress bar for the outermost sweep loop. threaded_acquisition : Use :class:`~qcodes:qcodes.dataset.ThreadPoolParamsCaller` to acquire data asynchronously. write_in_background : Write data to the db in a background thread. metadata : Runtime metadata either as a tuple (tag, metadata) or just metadata. default_sweeps_kwargs : Kwargs that are passed on to :meth:`add_default_sweeps`. loop_kwargs : Kwargs that are passed on to :meth:`loop`. initialize_plotting_kwargs : Kwargs that are passed on to :meth:`~.measures.MeasureSet.initialize_plotting`. **initialization_settings : Kwargs that are passed on to :meth:`initialize`. Returns ------- dataset : The :class:`~qcodes:qcodes.dataset.data_set.DataSet` holding the measurement. """ if sweeps is None: sweeps = SweepList([]) elif isinstance(sweeps, SweepProtocol): sweeps = SweepList([sweeps]) elif not isinstance(sweeps, SweepList): raise TypeError('Expected sweeps to be AbstractSweep or SweepList, not ' f'{type(sweeps)}') if measures is None: measures = MeasureSet() elif isinstance(measures, ParameterBase): measures = MeasureSet([Measure(measures)]) elif isinstance(measures, Measure): measures = MeasureSet([measures]) elif not isinstance(measures, MeasureSet): raise TypeError('Expected measures to be None, Measure, or MeasureSet, not ' f'{type(measures)}') if metadata is not None: if isinstance(metadata, Sequence) and len(metadata) == 2: tag, metadata = metadata else: tag = 'runtime_metadata' metadata = cast(str, json.dumps(metadata, cls=NumpyJSONEncoder)) # --> Subclass implements sweeps = self.add_default_sweeps(sweeps, measures, **(default_sweeps_kwargs or {})) measures = self.add_default_measures(sweeps, measures) add_before_run = self.add_before_run(sweeps, measures) + tuple(add_before_run) add_after_run = self.add_after_run(sweeps, measures) + tuple(add_after_run) # <-- measurement = Measurement( self.experiment, self.station, name='_'.join(itertools.chain([self.type] if self.type else [], map(str, sweeps.params))) ) for sweep_param in sweeps.params: measurement.register_parameter(sweep_param) for measure in measures: if isinstance(measure.param, ParameterBase): measurement.register_parameter(measure.param, setpoints=sweeps.params) for delegate in measure.delegates: measurement.register_parameter(delegate, setpoints=sweeps.params) elif callable(measure.param): measurement.register_custom_parameter(measure.param.__qualname__, setpoints=sweeps.params) for func, args in add_before_run: measurement.add_before_run(func, args) for func, args in add_after_run: measurement.add_after_run(func, args) if return_to_start: for sweep_param in sweeps.params: measurement.add_after_run(sweep_param.set, (sweep_param.get(),)) # --> Subclass implements self.initialize(**initialization_settings) self.run_assertions(sweeps, measures) # Set default contexts last so that custom contexts can not be blocked if the same # parameter is used in different contexts (eg., the shutter position) # TODO: Does this make sense? parameter_contexts = ( (parameter_contexts or {}) | (self.get_leakage_parameter_contexts(sweeps) if self.measure_leakage else {}) | self.get_default_parameter_contexts(sweeps, measures) ) custom_metadata = json.dumps( self.metadata | self.get_custom_metadata(sweeps, measures) | {'measurement_initialization_settings': initialization_settings, 'measurement_parameter_contexts': {param.full_name: val for param, val in parameter_contexts.items()}}, cls=NumpyJSONEncoder ) # <-- if preallocate_results: # Needed to specify shape in incomplete measurements (eg b/c of break conditions) # Do so after initializations (shape might change) measurement.set_shapes({full_name: sweeps.shape + shape for full_name, shape in zip( measures.full_names + measures.full_names_delegates, measures.shapes + measures.shapes_delegates )}) # Only register non-leakage measures with *ParamsCaller as leakage is measured separately measures_without_leakage = measures - self.get_leakage_measures(sweeps) if threaded_acquisition and len(measures_without_leakage): params_caller = ThreadPoolParamsCaller(*measures_without_leakage.params) else: params_caller = SequentialParamsCaller(*measures_without_leakage.params) if live_plot: measures.initialize_plotting(sweeps, **(initialize_plotting_kwargs or {})) else: measures.reset_plotting() try: with contextlib.ExitStack() as stack: for parameter, value in parameter_contexts.items(): stack.enter_context(parameter.set_to(value, allow_changes=True)) params_caller_context = stack.enter_context(params_caller) datasaver = stack.enter_context(measurement.run(write_in_background)) datasaver.dataset.add_metadata('custom_metadata', custom_metadata) if metadata is not None: datasaver.dataset.add_metadata(tag, metadata) self.loop(datasaver, params_caller_context, sweeps, measures, show_progress=show_progress, **(loop_kwargs or {})) except KeyboardInterrupt: print('Interrupted.') finally: # To make sure cya always succeeds, we extract it from the various measurement contexts # which might block parameters used in cya if cya: self.cya() try: return datasaver.dataset except UnboundLocalError as err: raise RuntimeError from err
@property @abc.abstractmethod def type(self) -> str: """An identifier for the type of measurement this class executes.""" pass
[docs] @abc.abstractmethod def add_default_sweeps(self, sweeps: SweepList, measures: MeasureSet, **default_sweeps_kwargs) -> SweepList: """Add default sweep to the list *sweeps*.""" pass
[docs] @abc.abstractmethod def add_default_measures(self, sweeps: SweepList, measures: MeasureSet) -> MeasureSet: """Add default parameters to measure to the set *measures*. :attr:`sweeps.leak_measures` is kept separate to be able to act conditioned on their value. """ pass
[docs] @abc.abstractmethod def add_before_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]: """Default actions to run before starting the measurement.""" pass
[docs] @abc.abstractmethod def add_after_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]: """Default actions to run after finishing the measurement.""" pass
[docs] @abc.abstractmethod def initialize(self, **initialization_settings): """Initialize instruments for the measurement.""" pass
[docs] @abc.abstractmethod def run_assertions(self, sweeps: SweepList, measures: MeasureSet): """Assert conditions necessary for the measurement.""" assert len(sweeps), 'Not sweeping anything' assert len(measures), 'Not measuring anything'
[docs] @abc.abstractmethod def get_custom_metadata(self, sweeps: SweepList, measures: MeasureSet) -> dict[str, Any]: """Extract custom metadata to be saved with the measurement.""" pass
[docs] @abc.abstractmethod def get_default_parameter_contexts(self, sweeps: SweepList, measures: MeasureSet) -> dict[ParameterBase, Any]: """Default parameter values for each measurement.""" pass
[docs] class DefaultMeasurementHandler(MeasurementHandler): """The default measurement handler, which does nothing on the side. """ @property def type(self) -> str: return ''
[docs] def add_default_sweeps(self, sweeps: SweepList, measures: MeasureSet, **default_sweeps_kwargs) -> SweepList: return sweeps
[docs] def add_default_measures(self, sweeps: SweepList, measures: MeasureSet) -> MeasureSet: measures |= self.get_leakage_measures(sweeps) return measures
[docs] def add_before_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]: return ()
[docs] def add_after_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]: return ()
[docs] def initialize(self, **initialization_settings): pass
[docs] def run_assertions(self, sweeps: SweepList, measures: MeasureSet): super().run_assertions(sweeps, measures)
[docs] def get_custom_metadata(self, sweeps: SweepList, measures: MeasureSet) -> dict[str, Any]: return self._fopi_metadata(*(sweep.param[0] for sweep in reversed(sweeps)), measures.params)
[docs] def get_default_parameter_contexts(self, sweeps: SweepList, measures: MeasureSet) -> dict[ParameterBase, Any]: return {}
[docs] @dataclasses.dataclass class CcdMeasurementHandler(DefaultMeasurementHandler): """Handler for measurements using the CCD.""" single_track_settings: dict[int, tuple[int, int]] = dataclasses.field( default_factory=lambda: {600: (132, 18), 1800: (116, 19)} ) """Settings for single-track acquisition mode of the CCD.""" @property def ccd(self) -> AndorIDus4xx: return self.station.detection_path.ccd @property def type(self) -> str: return 'ccd'
[docs] def cya(self) -> bool: """Disables CCD cooler and white light.""" if not super().cya(): return False try: self.station.detection_path.ccd.warm_up() except Exception as err: print(f'Failed to disable the CCD cooler: {err}') try: self.station.excitation_path.white_light.enabled(False) except Exception as err: print(f'Failed to disable the white light: {err}') return True
[docs] def add_default_measures(self, sweeps: SweepList, measures: MeasureSet) -> MeasureSet: measures = super().add_default_measures(sweeps, measures) if self.ccd.ccd_data not in (find_param_source(p) for p in measures.params): measures |= self.ccd.ccd_data_rate_bg_corrected return measures
[docs] def add_before_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]: return ((self.assert_background_acquired, ()),)
[docs] def initialize(self, **initialization_settings): self.initialize_ccd_single_acquisition(**initialization_settings)
[docs] def get_custom_metadata(self, sweeps: SweepList, measures: MeasureSet) -> dict[str, dict[str, str]]: return self._fopi_metadata(self.ccd.ccd_data.setpoints[0], sweeps.params[-1], measures.params)
[docs] def initialize_ccd_single_acquisition( self, exposure_time: float = 1.0, preamp_gain: float = 1.0, horizontal_shift_speed: float = 0.1, vertical_shift_speed: float | None = None, read_mode: str = 'single track', acquisition_mode: Literal['single scan', 'accumulate'] = 'accumulate', number_accumulations: int = 2, number_kinetics: int = 1, accumulation_cycle_time: float = 0., kinetic_cycle_time: float = 0., **_ ): """Set the CCD up for acquisition of a single buffer.""" self.ccd.preamp_gain(preamp_gain) self.ccd.horizontal_shift_speed(horizontal_shift_speed) self.ccd.vertical_shift_speed(vertical_shift_speed or self.ccd.fastest_recommended_vertical_shift_speed()) self.ccd.read_mode(read_mode) self.ccd.single_track_settings(self.single_track_settings[ int(self.station.detection_path.active_grating().name_parts[-1].split('_')[-1]) ]) self.ccd.acquisition_mode(acquisition_mode) self.ccd.number_accumulations(number_accumulations) self.ccd.number_kinetics(number_kinetics) self.ccd.exposure_time(exposure_time) self.ccd.accumulation_cycle_time(accumulation_cycle_time) self.ccd.kinetic_cycle_time(kinetic_cycle_time) if ( (acquisition_mode == 'accumulate' and number_accumulations > 1) or ( acquisition_mode == 'kinetics' and (number_accumulations > 1 or number_kinetics > 1) ) ): self.ccd.cosmic_ray_filter_mode(True)
# assert_background_acquired is run in a add_before_run hook so that all parameter contexts # are taken care of
[docs] def initialize_ccd_buffered_acquisition(self, n_pts: int, exposure_time: float = 1.0, preamp_gain: float = 1.0, horizontal_shift_speed: float = 0.1, vertical_shift_speed: float | None = None, delay: float = 0.0, **_) -> float: """Set the CCD up for buffered acquisition. Unused.""" self.ccd.preamp_gain(preamp_gain) self.ccd.horizontal_shift_speed(horizontal_shift_speed) self.ccd.vertical_shift_speed(vertical_shift_speed or self.ccd.fastest_recommended_vertical_shift_speed()) self.ccd.trigger_mode('external') self.ccd.read_mode('single track') self.ccd.single_track_settings(self.single_track_settings) self.ccd.acquisition_mode('kinetics') self.ccd.number_kinetics(n_pts) self.ccd.exposure_time(exposure_time) self.ccd.kinetic_cycle_time(self.ccd.exposure_time() + delay) return self.ccd.kinetic_cycle_time.get()
[docs] def get_default_parameter_contexts(self, sweeps, measures) -> dict[ParameterBase, Any]: return ( super().get_default_parameter_contexts(sweeps, measures) | {self.station.detection_path.active_detection_path: 'ccd'} )
[docs] @abc.abstractmethod def assert_background_acquired(self): """This method needs to be run after all parameters (possibly in parameter contexts) are set. Hence, it cannot be included in :meth:`run_assertions`.""" pass
[docs] @dataclasses.dataclass class LaserCcdMeasurementHandler(CcdMeasurementHandler): """Handler for measurements with Laser+CCD.""" @property def type(self) -> str: return 'laser_ccd'
[docs] def cya(self) -> bool: """Disables CCD cooler.""" if not super().cya(): return False try: self.station.excitation_path.disable_laser() except Exception as err: print(f'Failed to disable the laser: {err}') return True
[docs] def add_default_measures(self, sweeps: SweepList, measures: MeasureSet) -> MeasureSet: measures = super().add_default_measures(sweeps, measures) if self.station.excitation_path.power not in (find_param_source(p) for p in sweeps.params): measures |= self.station.excitation_path.power_at_sample return measures
[docs] def add_before_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]: actions = super().add_before_run(sweeps, measures) return actions + ((self.station.excitation_path.wait_for_power_to_settle, (1e-2, 200)),)
[docs] def run_assertions(self, sweeps: SweepList, measures: MeasureSet): super().run_assertions(sweeps, measures) # If we sweep the laser, it's not necessary to check if it's locked beforehand. if not any( any(find_param_source(param).underlying_instrument is self.station.excitation_path.laser for param in sweep.param) for sweep in sweeps ): if not self.station.excitation_path.laser.lock(): print('Locking laser.') tic = time.time() timeout = 10 self.station.excitation_path.laser.lock(True) while (time.time() - tic) < timeout: if self.station.excitation_path.laser.lock(): break else: raise AssertionError('Could not lock laser within 5s.')
[docs] def get_default_parameter_contexts(self, sweeps: SweepList, measures: MeasureSet) -> dict[ParameterBase, Any]: return ( super().get_default_parameter_contexts(sweeps, measures) | {self.station.excitation_path.shutter.position: 'open'} )
[docs] def assert_background_acquired(self): # background_is_valid calls ccd.get_acquisition_timings() so params are up to date if not self.ccd.background_is_valid: # Need to hackily override the default behavior of a parameter context (which is to # block) changes, so that this action can be run after the setting of parameter # contexts which might include an open shutter old_value = self.station.excitation_path.shutter.position.settable try: self.station.excitation_path.shutter.position._settable = True with self.station.excitation_path.close_shutter(): print('Acquiring background.') self.ccd.background.get() finally: self.station.excitation_path.shutter.position._settable = old_value
[docs] @dataclasses.dataclass class TimeTaggerMeasurementHandler(DefaultMeasurementHandler): """Handler for measurements using the APDs and Time Tagger. Measurements executed with :meth:`measure` by default get a :class:`~..parameters.TimeParameter` sweep that simply waits for a given interval for a certain number of times. Laser wavelength and power are automatically recalibrated at each interval. """ max_duration: timedelta = timedelta(days=1) """The maximum duration the default sweep will run for if not interrupted.""" @property def type(self) -> str: return 'Photon Counting' @property def tagger(self) -> TimeTagger: return self.station.detection_path.tagger
[docs] def get_default_parameter_contexts(self, sweeps: SweepList, measures: MeasureSet) -> dict[ParameterBase, Any]: return {self.station.excitation_path.shutter.position: 'open', self.station.detection_path.active_detection_path: 'apd'}
[docs] def add_default_sweeps(self, sweeps: SweepList, measures: MeasureSet, update_interval: float, parameter_contexts: dict[ParameterBase, Any]) -> SweepList: def side_effect(*_): try: return self.station.excitation_path.wavelength_constant_power(wavelength) except Exception as err: self.station.excitation_path.log.error( "Couldn't set wavelength {} or power {:.2g}".format( wavelength, self.station.excitation_path.wavelength_constant_power_setpoint() ), exc_info=err ) # TODO: should probably check if user specified the constant power setpoint or is # sweeping the power itself. if not any(isinstance(find_param_source(param), TimeParameter) for param in sweeps.params): wavelength = self._extract_wavelength(parameter_contexts) snoozer_with_feedback = DelegateParameterWithSetSideEffect( 'snoozer_with_feedback', source=TimeParameter('snoozer'), set_side_effect=side_effect, execute_before=False, label='Time axis w/ active feedback' ) n_pts = self.max_duration.total_seconds() // update_interval max_duration = n_pts * update_interval sweeps |= GridSweep(snoozer_with_feedback, (0, max_duration), int(n_pts)) elif not any(isinstance(find_param_source(param), TimeParameter) for param in sweeps[-1].param): raise ValueError("TimeParameter must be on the last axis of the sweep") return sweeps
[docs] def add_default_measures(self, sweeps: SweepList, measures: MeasureSet) -> MeasureSet: measures |= self.station.excitation_path.power_at_sample measures |= self.station.excitation_path.wavelength if not any(channel for measure in measures if isinstance(channel := measure.param.instrument, CountRateMeasurement)): count_rate = self.tagger.add_count_rate_measurement() count_rate.channels(_extract_active_tagger_channels(measures)) count_rate.data.metadata.update(_mjolnir_tagger_internal='default_measure') measures |= count_rate.data return measures
[docs] def add_before_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]: add_before_run = [(self.register_tagger_measurements, (measures,))] time_param = itertools.first_true( sweeps.params, default=None, pred=lambda param: isinstance(find_param_source(param), TimeParameter) ) if time_param is not None: add_before_run.append(((find_param_source(time_param)).reset_clock, ())) if any(measure.param.underlying_instrument is self.tagger for measure in measures): add_before_run.append((self.tagger.synchronized_measurements.clear, ())) add_before_run.append((self.tagger.synchronized_measurements.start, ())) return tuple(add_before_run)
[docs] def add_after_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]: add_after_run = [(self.unregister_tagger_measurements, (measures,))] for measure in measures: # Remove channels added by add_default_measures if measure.param.metadata.get('_mjolnir_tagger_internal', None) == 'default_measure': channel_name = camel_to_snake(measure.param.instrument.__class__.__name__) channel_list = self.tagger.submodules.get(f'{channel_name}s') add_after_run.append((channel_list.remove, (measure.param.instrument,))) return tuple(add_after_run)
[docs] def measure(self, sweeps: SweepList | SweepProtocol | None = None, measures: MeasureSet | Measure | ParameterBase | None = None, *, add_before_run: Sequence[_ActionsT] = (), add_after_run: Sequence[_ActionsT] = (), parameter_contexts: dict[ParameterBase, Any] | None = None, cya: bool = False, live_plot: bool = True, update_interval: float = 10., add_result_on_update: bool = False, show_progress: bool = True, threaded_acquisition: bool = True, write_in_background: bool = True, metadata: tuple[str, Any] | None = None, initialize_plotting_kwargs: dict[Any, Any] | None = None, **initialization_settings) -> DataSetProtocol: """The measurement entry point. For most parameters, see the base method. Parameters ---------- update_interval : Interval in seconds to wait before getting new data from the TimeTagger API, as well as updating the live plot, if active. add_result_on_update : Add results obtained during each update interval to the DB. If False (the default), only the last value before the measurement is finished or aborted is added. """ return super().measure(sweeps, measures, add_before_run=add_before_run, add_after_run=add_after_run, parameter_contexts=parameter_contexts, return_to_start=False, preallocate_results=False, cya=cya, live_plot=live_plot, show_progress=show_progress, threaded_acquisition=threaded_acquisition, write_in_background=write_in_background, metadata=metadata, default_sweeps_kwargs=dict( update_interval=update_interval, parameter_contexts=parameter_contexts or {} ), loop_kwargs=dict(add_result_on_update=add_result_on_update), initialize_plotting_kwargs=initialize_plotting_kwargs, **initialization_settings)
[docs] def loop(self, datasaver: DataSaver, params_caller_context: Callable[[], OutType], sweeps: SweepList, measures: MeasureSet, *, i: int = 0, show_progress: bool = True, add_result_on_update: bool = False): """Custom measurement loop for Time Tagger measurements. Measurements can be gracefully terminated by sending a keyboard interrupt. """ results = [] if i == 0: self.clear_tagger_measurements(measures) try: leakage_measures = self.get_leakage_measures(sweeps) measures_without_leakage = measures - leakage_measures # Loop over different sweep directions of the ith sweep for j, setpoints in enumerate(sweep := sweeps[i]): # Go to start. Might be different for n-directional sweeps sweep.initialize(j) # Loop over all setpoints of one sweep direction for k, setpoint in enumerate(ui.progressbar(setpoints, leave=None, desc=self._loop_desc(sweep, j), disable=not show_progress)): # set returns the setpoint or the measured value depending on # sweep.get_after_set sweeps.current_setpoints[i] = sweep.set(*setpoint) # Does nothing if no leak_params abort, leakage = self.leakage_limit_exceeded(leakage_measures.params) if abort: sweep.initialize(j) print(f'Aborted while sweeping {sweep} at {sweeps.current_setpoints} ' f'because leakage was too large: {leakage}') break if i == len(sweeps) - 1: # Innermost loop, record result assert len(sweeps.current_setpoints) == len(sweeps) assert len(leakage_measures) == len(leakage) results = tuple(itertools.chain( params_caller_context(), zip(sweeps.params, itertools.flatten(sweeps.current_setpoints)), zip(leakage_measures.params, leakage), zip(measures_without_leakage.delegates, measures_without_leakage.get_delegates()) )) if add_result_on_update: datasaver.add_result(*results) # Plot if any try: measures.plot_callback(j, k) except RuntimeError: pass else: # Move on to loop further inwards self.loop(datasaver, params_caller_context, sweeps, measures, i=i+1, show_progress=show_progress, add_result_on_update=add_result_on_update) finally: if not add_result_on_update and results: # If the interrupt came after any results were acquired, save them if they # weren't saved periodically. datasaver.add_result(*results)
[docs] def register_tagger_measurements(self, measures: MeasureSet): """Register a TimeTagger measurement with the :class:`~qcodes_contrib_drivers.drivers.SwabianInstruments.private.time_tagger.TimeTaggerSynchronizedMeasurements` object to synchronize measurements.""" for channel in (channel for measure in measures if isinstance(channel := measure.param.instrument, TimeTaggerMeasurement)): self.tagger.synchronized_measurements.register_measurement(channel)
[docs] def unregister_tagger_measurements(self, measures: MeasureSet): """Unregister TimeTagger measurements.""" for channel in (channel for measure in measures if isinstance(channel := measure.param.instrument, TimeTaggerMeasurement)): self.tagger.synchronized_measurements.unregister_measurement(channel)
[docs] @staticmethod def clear_tagger_measurements(measures: MeasureSet): """Clear all TimeTagger measurements.""" for channel in (channel for measure in measures if isinstance(channel := measure.param.instrument, TimeTaggerMeasurement)): channel.clear() channel.start()
def _extract_wavelength(self, parameter_contexts): wavelength = parameter_contexts.get(self.station.excitation_path.wavelength, None) if wavelength is None: wavelength = parameter_contexts.get( self.station.excitation_path.wavelength_constant_power, None ) if wavelength is None: wavelength = self.station.excitation_path.wavelength() return wavelength
def _extract_active_tagger_channels(measures: MeasureSet) -> list[int]: channels = [] for module in [module for measure in measures if isinstance(module := measure.param.instrument, TimeTaggerMeasurement)]: if hasattr(module, 'channels'): # Generic channels.extend(module.channels()) if hasattr(module, 'start_channel'): # LogBins histogram channels.append(module.start_channel()) if hasattr(module, 'click_channel'): # LogBins histogram channels.append(module.click_channel()) if hasattr(module, 'get_channel'): # Combiner virtual channel channels.append(module.get_channel()) channels = set(channels) channels.discard(tt.CHANNEL_UNUSED) return list(channels) def _extract_qdac_current_params(params: Sequence[ParameterBase]) -> list[ParameterBase]: return list(itertools.collapse((param.source_params if isinstance(param, LeakageCurrentsParameter) else param for param in params), base_type=ParameterBase))