"""Stateful control of optical measurements.
TODO:
- quantify-core might be suitable to replace this.
Notes
-----
DAC trigger output::
# Route external trigger 1 to internal trigger 1
dac.write('OUTP:TRIG1:SOUR INT1')
# Width 1 ms
dac.write('OUTP:TRIG1:WIDT 1e-3')
# Fire internal trigger 1 (triggers external trigger 1)
dac.write('TINT:SIGN: 1')
"""
import abc
import contextlib
import dataclasses
import json
import numbers
import random
import time
from datetime import timedelta
from types import ModuleType
from typing import Any, Callable, Literal, Sequence, TypeVar, cast
import git
import qcodes
import qcodes_contrib_drivers
import qutil
from git import InvalidGitRepositoryError
from qcodes.dataset import (SequentialParamsCaller, ThreadPoolParamsCaller,
initialise_or_create_database_at, load_or_create_experiment)
from qcodes.dataset.data_set_protocol import DataSetProtocol, values_type
from qcodes.dataset.experiment_container import Experiment
from qcodes.dataset.measurements import DataSaver, Measurement
from qcodes.parameters import ParameterBase
from qcodes.utils import DelayedKeyboardInterrupt, NumpyJSONEncoder
from qcodes_contrib_drivers.drivers.Andor.Andor_iDus4xx import AndorIDus4xx
from qcodes_contrib_drivers.drivers.SwabianInstruments.Swabian_Instruments_Time_Tagger import (
CountRateMeasurement, TimeTagger, tt
)
from qcodes_contrib_drivers.drivers.SwabianInstruments.private.time_tagger import (
TimeTaggerMeasurement
)
from qutil import itertools, ui
import mjolnir
from .. import _INSULTS
from ..helpers import camel_to_snake, find_param_source, timed
from ..instruments.logical_instruments import TrapSample
from ..measurements.measures import Measure, MeasureSet
from ..measurements.sweeps import GridSweep, SweepList, SweepProtocol
from ..parameters import (DelegateParameterWithSetSideEffect, LeakageCurrentsParameter,
TimeParameter)
_T = TypeVar('_T', bound=numbers.Number)
_ActionsT = tuple[Callable[..., Any], Sequence[Any]]
OutType = list[tuple[ParameterBase, values_type]]
[docs]
@dataclasses.dataclass
class MeasurementHandler(abc.ABC):
"""Abstract base class for specifying measurement protocols.
Subclasses override methods of this class which get run at different
points during :meth:`measure` and :meth:`loop`. See those methods
for documentation on when they are run and what they should do.
Parameters
----------
station :
The station hosting the logical instruments that control the
experiment
sample_name :
A string identifier for the sample. Defaults to the name of a
:class:`~mjolnir.instruments.logical_instruments.TrapSample`
that is a member of ``station``, if any.
experiment :
A qcodes experiment. Defaults to "optics".
measure_leakage :
Automatically measure the leakage current if a QDAC channel is
swept.
critical_leakage :
Current (in Ampere) that determines a critical leakage threshold
above which sweeps are aborted.
measurement_nplc :
The number of power line cycles (PLC) to average currents over.
measurement_count :
The number of current samples taken.
database_location :
The location of the database. Defaults to the qcodes default.
"""
station: qcodes.Station
sample_name: str | None = None
experiment: Experiment | str = dataclasses.field(default='optics', repr=False)
measure_leakage: bool = True
critical_leakage: float = 100e-9
measurement_nplc: int = 1
measurement_count: int = 1
database_location: str = qcodes.config.core.db_location
def __post_init__(self):
initialise_or_create_database_at(self.database_location)
if self.sample_name is None:
for name, component in self.station.components.items():
if isinstance(component, TrapSample):
self.sample_name = name
break
else:
raise ValueError('Give a sample name.')
if isinstance(self.experiment, str):
self.experiment = load_or_create_experiment(self.experiment, self.sample_name,
load_last_duplicate=True)
@property
def metadata(self) -> dict[str, dict[str, Any]]:
"""Exports git commit hashes and this class instance."""
metadata = {'git': {}, 'MeasurementHandler': {}}
for package in [qcodes, qcodes_contrib_drivers, qutil, mjolnir]:
try:
metadata['git'][package.__name__] = self._get_hexsha(package)
except InvalidGitRepositoryError:
metadata['git'][package.__name__] = None
for key, val in self.__dict__.items():
try:
metadata['MeasurementHandler'][key] = json.dumps(val)
except TypeError:
pass
return metadata
@property
def sample(self) -> TrapSample:
"""The logical instrument representing the current sample."""
return cast(TrapSample, self.station.get_component(self.sample_name))
[docs]
def cya(self) -> bool:
"""Teardown actions to perform when measure(cya=True)."""
tic = time.perf_counter()
print('cya() triggered. Giving a grace period of 10 seconds to abort.')
try:
while time.perf_counter() - tic < 10:
pass
except KeyboardInterrupt:
print('cya() aborted.')
return False
print(random.choice(_INSULTS))
return True
@staticmethod
def _fopi_metadata(x, y, zs=None) -> dict[str, dict[str, str]]:
metadata = {
'fopi': {
"preferred_x_axis": x.full_name
}
}
if zs is not None:
metadata['fopi']["preferred_y_axis"] = y.full_name
metadata['fopi']["preferred_z_axes"] = [z.full_name for z in zs]
else:
metadata['fopi']["preferred_y_axis"] = y[0].full_name
return metadata
@staticmethod
def _get_hexsha(module: ModuleType) -> str:
return git.Repo(module.__file__, search_parent_directories=True).head.object.hexsha
@staticmethod
def _loop_desc(sweep, j) -> str:
"""Progressbar description."""
return "Sweeping ({}) ({}/{})".format(', '.join((param.label for param in sweep.param)),
j + 1,
len(list(sweep)))
[docs]
def get_leakage_measures(self, sweeps: SweepList) -> MeasureSet:
"""Qdac channel leakage measures conditional on the flag."""
if self.measure_leakage:
return sweeps.leak_measures
return MeasureSet()
[docs]
def get_leakage_parameter_contexts(self, sweeps: SweepList) -> dict[ParameterBase, int]:
"""Parses the :class:`SweepList` to find QDAC-II current parameters."""
try:
params = _extract_qdac_current_params(self.get_leakage_measures(sweeps).params)
physical_channels = self.sample.dac.channels[tuple(
int(find_param_source(leak_param).instrument.name[-2:]) - 1
for leak_param in params
)]
except AttributeError:
# Not a TrapSample with QDAC-II channels
return {}
if physical_channels:
return {physical_channels.measurement_nplc: self.measurement_nplc,
physical_channels.measurement_count: self.measurement_count}
else:
return {}
[docs]
def leakage_limit_exceeded(self, params) -> tuple[bool, list[float]]:
"""Measures leakage on the QDAC channels underlying *params*
and tests if the critical limit is exceeded.
Returns the boolean and the list of leakage currents.
"""
# Wait for the current sensors to stabilize and then read
time.sleep(max(((find_param_source(param).instrument.measurement_nplc.get_latest() + 2)
/ 50 for param in _extract_qdac_current_params(params)),
default=0))
if itertools.absmax(
itertools.chain.from_iterable(vals := [param() for param in params]), default=0
) > self.critical_leakage:
return True, vals
return False, vals
[docs]
def loop(self, datasaver: DataSaver, params_caller_context: Callable[[], OutType],
sweeps: SweepList, measures: MeasureSet, *, i: int = 0, show_progress: bool = True):
"""The main measurement loop.
This function recurses into *sweeps*, measures *measures* at the
lowest recursion level (the innermost loop), and saves the
results into *datasaver*.
Sweeps can specify initialization procedures as well, see
:mod:`.sweeps`.
Measures can specify a live-plotting procedure that is passed
current measurement data at every point, see :mod:`.measures`.
"""
leakage_measures = self.get_leakage_measures(sweeps)
measures_without_leakage = measures - leakage_measures
# Loop over different sweep directions of the ith sweep
for j, setpoints in enumerate(sweep := sweeps[i]):
# Go to start. Might be different for n-directional sweeps
sweep.initialize(j)
# Loop over all setpoints of one sweep direction
for k, setpoint in enumerate(ui.progressbar(setpoints, desc=self._loop_desc(sweep, j),
leave=None, disable=not show_progress)):
# set returns the setpoint or the measured value depending on :attr:`get_after_set`
sweeps.current_setpoints[i] = sweep.set(*setpoint)
# Does nothing if no leak_params
abort, leakage = self.leakage_limit_exceeded(leakage_measures.params)
if abort:
sweep.initialize(j)
print(f'Aborted while sweeping {sweep} at {sweeps.current_setpoints} '
f'because leakage was too large: {leakage}')
break
if i == len(sweeps) - 1:
# Innermost loop, record result
assert len(sweeps.current_setpoints) == len(sweeps)
assert len(leakage_measures) == len(leakage)
datasaver.add_result(
*params_caller_context(),
*zip(sweeps.params, itertools.flatten(sweeps.current_setpoints)),
*zip(leakage_measures.params, leakage),
*zip(measures_without_leakage.delegates,
measures_without_leakage.get_delegates())
)
# Plot if any
try:
with DelayedKeyboardInterrupt():
measures.plot_callback(j, k)
except RuntimeError:
pass
else:
# Move on to loop further inwards
self.loop(datasaver, params_caller_context, sweeps, measures,
i=i+1, show_progress=show_progress)
[docs]
@timed
def measure(self,
sweeps: SweepList | SweepProtocol | None = None,
measures: MeasureSet | Measure | ParameterBase | None = None,
*,
add_before_run: Sequence[_ActionsT] = (),
add_after_run: Sequence[_ActionsT] = (),
parameter_contexts: dict[ParameterBase, Any] | None = None,
return_to_start: bool = True,
preallocate_results: bool = True,
cya: bool = False,
live_plot: bool = True,
show_progress: bool = True,
threaded_acquisition: bool = True,
write_in_background: bool = True,
metadata: tuple[str, Any] | Any | None = None,
default_sweeps_kwargs: dict[Any, Any] | None = None,
loop_kwargs: dict[Any, Any] | None = None,
initialize_plotting_kwargs: dict[Any, Any] | None = None,
**initialization_settings) -> DataSetProtocol:
"""The measurement entrypoint.
This function takes care of all measurement set-up, tear-down,
as well as running the actual measurement loop.
Parameters
----------
sweeps :
A :class:`~.sweeps.SweepList` defining the nD sweep to
execute, or a single sweep implementing the
:class:`~.sweeps.SweepProtocol`. If ``None``, only whatever
the subclass implementation of :meth:`add_default_sweeps`
adds is used.
measures :
A :class:`~.measures.MeasureSet` defining the parameters to
measure at each point of the sweep, or a single
:class:`~qcodes:qcodes.parameters.ParameterBase` or
:class:`~.measures.Measure` to get. If ``None``, only
whatever the subclass implementation of
:meth:`add_default_measures` adds is used.
add_before_run :
A sequence of two-tuples (callable, args) to run before the
measurement is started.
add_after_run :
A sequence of two-tuples (callable, args) to run before the
measurement is started.
parameter_contexts :
A mapping of
:class:`~qcodes:qcodes.parameters.Parameter` to values that
the parameters should be set to for the duration of the
measurement.
return_to_start :
Set parameters to their value before the measurement after
it is done.
preallocate_results :
Preallocate measurement results in the database. Should
normally only be set to a different value by subclasses.
cya :
Run :meth:`cya` after finishing the measurement.
live_plot :
Run the :attr:`~.measures.MeasureSet.plot_callback` hooks
each time a data point is taken. Individual measures can be
excluded by setting their
:attr:`~.measures.Measure.live_plot` attribute to False.
show_progress :
Show a progress bar for the outermost sweep loop.
threaded_acquisition :
Use :class:`~qcodes:qcodes.dataset.ThreadPoolParamsCaller`
to acquire data asynchronously.
write_in_background :
Write data to the db in a background thread.
metadata :
Runtime metadata either as a tuple (tag, metadata) or just
metadata.
default_sweeps_kwargs :
Kwargs that are passed on to :meth:`add_default_sweeps`.
loop_kwargs :
Kwargs that are passed on to :meth:`loop`.
initialize_plotting_kwargs :
Kwargs that are passed on to
:meth:`~.measures.MeasureSet.initialize_plotting`.
**initialization_settings :
Kwargs that are passed on to :meth:`initialize`.
Returns
-------
dataset :
The :class:`~qcodes:qcodes.dataset.data_set.DataSet` holding
the measurement.
"""
if sweeps is None:
sweeps = SweepList([])
elif isinstance(sweeps, SweepProtocol):
sweeps = SweepList([sweeps])
elif not isinstance(sweeps, SweepList):
raise TypeError('Expected sweeps to be AbstractSweep or SweepList, not '
f'{type(sweeps)}')
if measures is None:
measures = MeasureSet()
elif isinstance(measures, ParameterBase):
measures = MeasureSet([Measure(measures)])
elif isinstance(measures, Measure):
measures = MeasureSet([measures])
elif not isinstance(measures, MeasureSet):
raise TypeError('Expected measures to be None, Measure, or MeasureSet, not '
f'{type(measures)}')
if metadata is not None:
if isinstance(metadata, Sequence) and len(metadata) == 2:
tag, metadata = metadata
else:
tag = 'runtime_metadata'
metadata = cast(str, json.dumps(metadata, cls=NumpyJSONEncoder))
# --> Subclass implements
sweeps = self.add_default_sweeps(sweeps, measures, **(default_sweeps_kwargs or {}))
measures = self.add_default_measures(sweeps, measures)
add_before_run = self.add_before_run(sweeps, measures) + tuple(add_before_run)
add_after_run = self.add_after_run(sweeps, measures) + tuple(add_after_run)
# <--
measurement = Measurement(
self.experiment, self.station,
name='_'.join(itertools.chain([self.type] if self.type else [],
map(str, sweeps.params)))
)
for sweep_param in sweeps.params:
measurement.register_parameter(sweep_param)
for measure in measures:
if isinstance(measure.param, ParameterBase):
measurement.register_parameter(measure.param, setpoints=sweeps.params)
for delegate in measure.delegates:
measurement.register_parameter(delegate, setpoints=sweeps.params)
elif callable(measure.param):
measurement.register_custom_parameter(measure.param.__qualname__,
setpoints=sweeps.params)
for func, args in add_before_run:
measurement.add_before_run(func, args)
for func, args in add_after_run:
measurement.add_after_run(func, args)
if return_to_start:
for sweep_param in sweeps.params:
measurement.add_after_run(sweep_param.set, (sweep_param.get(),))
# --> Subclass implements
self.initialize(**initialization_settings)
self.run_assertions(sweeps, measures)
# Set default contexts last so that custom contexts can not be blocked if the same
# parameter is used in different contexts (eg., the shutter position)
# TODO: Does this make sense?
parameter_contexts = (
(parameter_contexts or {})
| (self.get_leakage_parameter_contexts(sweeps) if self.measure_leakage else {})
| self.get_default_parameter_contexts(sweeps, measures)
)
custom_metadata = json.dumps(
self.metadata
| self.get_custom_metadata(sweeps, measures)
| {'measurement_initialization_settings': initialization_settings,
'measurement_parameter_contexts': {param.full_name: val
for param, val
in parameter_contexts.items()}},
cls=NumpyJSONEncoder
)
# <--
if preallocate_results:
# Needed to specify shape in incomplete measurements (eg b/c of break conditions)
# Do so after initializations (shape might change)
measurement.set_shapes({full_name: sweeps.shape + shape
for full_name, shape in zip(
measures.full_names + measures.full_names_delegates,
measures.shapes + measures.shapes_delegates
)})
# Only register non-leakage measures with *ParamsCaller as leakage is measured separately
measures_without_leakage = measures - self.get_leakage_measures(sweeps)
if threaded_acquisition and len(measures_without_leakage):
params_caller = ThreadPoolParamsCaller(*measures_without_leakage.params)
else:
params_caller = SequentialParamsCaller(*measures_without_leakage.params)
if live_plot:
measures.initialize_plotting(sweeps, **(initialize_plotting_kwargs or {}))
else:
measures.reset_plotting()
try:
with contextlib.ExitStack() as stack:
for parameter, value in parameter_contexts.items():
stack.enter_context(parameter.set_to(value, allow_changes=True))
params_caller_context = stack.enter_context(params_caller)
datasaver = stack.enter_context(measurement.run(write_in_background))
datasaver.dataset.add_metadata('custom_metadata', custom_metadata)
if metadata is not None:
datasaver.dataset.add_metadata(tag, metadata)
self.loop(datasaver, params_caller_context, sweeps, measures,
show_progress=show_progress, **(loop_kwargs or {}))
except KeyboardInterrupt:
print('Interrupted.')
finally:
# To make sure cya always succeeds, we extract it from the various measurement contexts
# which might block parameters used in cya
if cya:
self.cya()
try:
return datasaver.dataset
except UnboundLocalError as err:
raise RuntimeError from err
@property
@abc.abstractmethod
def type(self) -> str:
"""An identifier for the type of measurement this class executes."""
pass
[docs]
@abc.abstractmethod
def add_default_sweeps(self, sweeps: SweepList, measures: MeasureSet,
**default_sweeps_kwargs) -> SweepList:
"""Add default sweep to the list *sweeps*."""
pass
[docs]
@abc.abstractmethod
def add_default_measures(self, sweeps: SweepList, measures: MeasureSet) -> MeasureSet:
"""Add default parameters to measure to the set *measures*.
:attr:`sweeps.leak_measures` is kept separate to be able to act
conditioned on their value.
"""
pass
[docs]
@abc.abstractmethod
def add_before_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]:
"""Default actions to run before starting the measurement."""
pass
[docs]
@abc.abstractmethod
def add_after_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]:
"""Default actions to run after finishing the measurement."""
pass
[docs]
@abc.abstractmethod
def initialize(self, **initialization_settings):
"""Initialize instruments for the measurement."""
pass
[docs]
@abc.abstractmethod
def run_assertions(self, sweeps: SweepList, measures: MeasureSet):
"""Assert conditions necessary for the measurement."""
assert len(sweeps), 'Not sweeping anything'
assert len(measures), 'Not measuring anything'
[docs]
@abc.abstractmethod
def get_default_parameter_contexts(self, sweeps: SweepList,
measures: MeasureSet) -> dict[ParameterBase, Any]:
"""Default parameter values for each measurement."""
pass
[docs]
class DefaultMeasurementHandler(MeasurementHandler):
"""The default measurement handler, which does nothing on the side.
"""
@property
def type(self) -> str:
return ''
[docs]
def add_default_sweeps(self, sweeps: SweepList, measures: MeasureSet,
**default_sweeps_kwargs) -> SweepList:
return sweeps
[docs]
def add_default_measures(self, sweeps: SweepList, measures: MeasureSet) -> MeasureSet:
measures |= self.get_leakage_measures(sweeps)
return measures
[docs]
def add_before_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]:
return ()
[docs]
def add_after_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]:
return ()
[docs]
def initialize(self, **initialization_settings):
pass
[docs]
def run_assertions(self, sweeps: SweepList, measures: MeasureSet):
super().run_assertions(sweeps, measures)
[docs]
def get_default_parameter_contexts(self, sweeps: SweepList,
measures: MeasureSet) -> dict[ParameterBase, Any]:
return {}
[docs]
@dataclasses.dataclass
class CcdMeasurementHandler(DefaultMeasurementHandler):
"""Handler for measurements using the CCD."""
single_track_settings: dict[int, tuple[int, int]] = dataclasses.field(
default_factory=lambda: {600: (132, 18), 1800: (116, 19)}
)
"""Settings for single-track acquisition mode of the CCD."""
@property
def ccd(self) -> AndorIDus4xx:
return self.station.detection_path.ccd
@property
def type(self) -> str:
return 'ccd'
[docs]
def cya(self) -> bool:
"""Disables CCD cooler and white light."""
if not super().cya():
return False
try:
self.station.detection_path.ccd.warm_up()
except Exception as err:
print(f'Failed to disable the CCD cooler: {err}')
try:
self.station.excitation_path.white_light.enabled(False)
except Exception as err:
print(f'Failed to disable the white light: {err}')
return True
[docs]
def add_default_measures(self, sweeps: SweepList, measures: MeasureSet) -> MeasureSet:
measures = super().add_default_measures(sweeps, measures)
if self.ccd.ccd_data not in (find_param_source(p) for p in measures.params):
measures |= self.ccd.ccd_data_rate_bg_corrected
return measures
[docs]
def add_before_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]:
return ((self.assert_background_acquired, ()),)
[docs]
def initialize(self, **initialization_settings):
self.initialize_ccd_single_acquisition(**initialization_settings)
[docs]
def initialize_ccd_single_acquisition(
self,
exposure_time: float = 1.0,
preamp_gain: float = 1.0,
horizontal_shift_speed: float = 0.1,
vertical_shift_speed: float | None = None,
read_mode: str = 'single track',
acquisition_mode: Literal['single scan', 'accumulate'] = 'accumulate',
number_accumulations: int = 2,
number_kinetics: int = 1,
accumulation_cycle_time: float = 0.,
kinetic_cycle_time: float = 0.,
**_
):
"""Set the CCD up for acquisition of a single buffer."""
self.ccd.preamp_gain(preamp_gain)
self.ccd.horizontal_shift_speed(horizontal_shift_speed)
self.ccd.vertical_shift_speed(vertical_shift_speed or
self.ccd.fastest_recommended_vertical_shift_speed())
self.ccd.read_mode(read_mode)
self.ccd.single_track_settings(self.single_track_settings[
int(self.station.detection_path.active_grating().name_parts[-1].split('_')[-1])
])
self.ccd.acquisition_mode(acquisition_mode)
self.ccd.number_accumulations(number_accumulations)
self.ccd.number_kinetics(number_kinetics)
self.ccd.exposure_time(exposure_time)
self.ccd.accumulation_cycle_time(accumulation_cycle_time)
self.ccd.kinetic_cycle_time(kinetic_cycle_time)
if (
(acquisition_mode == 'accumulate' and number_accumulations > 1)
or (
acquisition_mode == 'kinetics'
and (number_accumulations > 1 or number_kinetics > 1)
)
):
self.ccd.cosmic_ray_filter_mode(True)
# assert_background_acquired is run in a add_before_run hook so that all parameter contexts
# are taken care of
[docs]
def initialize_ccd_buffered_acquisition(self,
n_pts: int,
exposure_time: float = 1.0,
preamp_gain: float = 1.0,
horizontal_shift_speed: float = 0.1,
vertical_shift_speed: float | None = None,
delay: float = 0.0,
**_) -> float:
"""Set the CCD up for buffered acquisition. Unused."""
self.ccd.preamp_gain(preamp_gain)
self.ccd.horizontal_shift_speed(horizontal_shift_speed)
self.ccd.vertical_shift_speed(vertical_shift_speed or
self.ccd.fastest_recommended_vertical_shift_speed())
self.ccd.trigger_mode('external')
self.ccd.read_mode('single track')
self.ccd.single_track_settings(self.single_track_settings)
self.ccd.acquisition_mode('kinetics')
self.ccd.number_kinetics(n_pts)
self.ccd.exposure_time(exposure_time)
self.ccd.kinetic_cycle_time(self.ccd.exposure_time() + delay)
return self.ccd.kinetic_cycle_time.get()
[docs]
def get_default_parameter_contexts(self, sweeps, measures) -> dict[ParameterBase, Any]:
return (
super().get_default_parameter_contexts(sweeps, measures)
| {self.station.detection_path.active_detection_path: 'ccd'}
)
[docs]
@abc.abstractmethod
def assert_background_acquired(self):
"""This method needs to be run after all parameters (possibly in
parameter contexts) are set. Hence, it cannot be included in
:meth:`run_assertions`."""
pass
[docs]
@dataclasses.dataclass
class LaserCcdMeasurementHandler(CcdMeasurementHandler):
"""Handler for measurements with Laser+CCD."""
@property
def type(self) -> str:
return 'laser_ccd'
[docs]
def cya(self) -> bool:
"""Disables CCD cooler."""
if not super().cya():
return False
try:
self.station.excitation_path.disable_laser()
except Exception as err:
print(f'Failed to disable the laser: {err}')
return True
[docs]
def add_default_measures(self, sweeps: SweepList, measures: MeasureSet) -> MeasureSet:
measures = super().add_default_measures(sweeps, measures)
if self.station.excitation_path.power not in (find_param_source(p) for p in sweeps.params):
measures |= self.station.excitation_path.power_at_sample
return measures
[docs]
def add_before_run(self, sweeps: SweepList, measures: MeasureSet) -> tuple[_ActionsT, ...]:
actions = super().add_before_run(sweeps, measures)
return actions + ((self.station.excitation_path.wait_for_power_to_settle, (1e-2, 200)),)
[docs]
def run_assertions(self, sweeps: SweepList, measures: MeasureSet):
super().run_assertions(sweeps, measures)
# If we sweep the laser, it's not necessary to check if it's locked beforehand.
if not any(
any(find_param_source(param).underlying_instrument
is self.station.excitation_path.laser
for param in sweep.param)
for sweep in sweeps
):
if not self.station.excitation_path.laser.lock():
print('Locking laser.')
tic = time.time()
timeout = 10
self.station.excitation_path.laser.lock(True)
while (time.time() - tic) < timeout:
if self.station.excitation_path.laser.lock():
break
else:
raise AssertionError('Could not lock laser within 5s.')
[docs]
def get_default_parameter_contexts(self, sweeps: SweepList,
measures: MeasureSet) -> dict[ParameterBase, Any]:
return (
super().get_default_parameter_contexts(sweeps, measures)
| {self.station.excitation_path.shutter.position: 'open'}
)
[docs]
def assert_background_acquired(self):
# background_is_valid calls ccd.get_acquisition_timings() so params are up to date
if not self.ccd.background_is_valid:
# Need to hackily override the default behavior of a parameter context (which is to
# block) changes, so that this action can be run after the setting of parameter
# contexts which might include an open shutter
old_value = self.station.excitation_path.shutter.position.settable
try:
self.station.excitation_path.shutter.position._settable = True
with self.station.excitation_path.close_shutter():
print('Acquiring background.')
self.ccd.background.get()
finally:
self.station.excitation_path.shutter.position._settable = old_value
def _extract_active_tagger_channels(measures: MeasureSet) -> list[int]:
channels = []
for module in [module for measure in measures if
isinstance(module := measure.param.instrument, TimeTaggerMeasurement)]:
if hasattr(module, 'channels'):
# Generic
channels.extend(module.channels())
if hasattr(module, 'start_channel'):
# LogBins histogram
channels.append(module.start_channel())
if hasattr(module, 'click_channel'):
# LogBins histogram
channels.append(module.click_channel())
if hasattr(module, 'get_channel'):
# Combiner virtual channel
channels.append(module.get_channel())
channels = set(channels)
channels.discard(tt.CHANNEL_UNUSED)
return list(channels)
def _extract_qdac_current_params(params: Sequence[ParameterBase]) -> list[ParameterBase]:
return list(itertools.collapse((param.source_params
if isinstance(param, LeakageCurrentsParameter) else param
for param in params),
base_type=ParameterBase))