Source code for mjolnir.parameters

from __future__ import annotations

import abc
import time
import warnings
from contextlib import nullcontext, contextmanager
from typing import Any, Callable, Sequence, TypedDict, TypeVar, Self, cast

import numpy as np
from numpy import typing as npt
from qcodes import validators
from qcodes.instrument import InstrumentBase, ChannelList
from qcodes.parameters import (GroupedParameter, DelegateGroup, DelegateParameter, Parameter,
                               ParamRawDataType, GroupParameter, MultiParameter,
                               ElapsedTimeParameter, DelegateGroupParameter, ParamDataType,
                               ParameterBase)
from qcodes.parameters.cache import _Cache
from qcodes_contrib_drivers.drivers.QDevil import QDAC2
from qutil import functools, itertools

from .helpers import calculate_ramp_time_with_filter, find_param_source

_paramT = TypeVar('_paramT', bound=Parameter)


[docs] def with_root_instrument(cls: type[_paramT], root_instrument: InstrumentBase) -> type[_paramT]: """Patches a Parameter with a custom root_instrument.""" def fget(self) -> InstrumentBase | None: return root_instrument return type(cls.__name__, (cls,), {'root_instrument': property(fget, doc=cls.root_instrument.__doc__)})
[docs] def with_underlying_instrument(cls: type[_paramT], underlying_instrument: InstrumentBase) -> type[_paramT]: """Patches a Parameter with a custom underlying_instrument.""" def fget(self) -> InstrumentBase | None: return underlying_instrument return type(cls.__name__, (cls,), {'underlying_instrument': property(fget, doc=cls.underlying_instrument.__doc__)})
[docs] def with_delegates_with_side_effect(cls: type[Parameter]) -> type[Parameter]: """Patches a Parameter to execute side effects of its delegates. Only works with Parameters whose set_raw method may be overridden. TODO: Unused as of now. """ def __init__(self, *args, **kwargs) -> None: cls.__init__(self, *args, **kwargs) self.set = functools.chain(self.set, self.execute_side_effects) def execute_side_effects(self, *_) -> None: for delegate in self._delegates: try: # delegate may have also been patched to have delegates itself delegate.execute_side_effects() except AttributeError: # delegate is a simple DelegateParameterWithSideEffect delegate.set_side_effect(delegate, delegate.cache.get(False), delegate.cache.get(True)) def register_delegate(self, delegate: DelegateParameterWithSetSideEffect): self._delegates.add(delegate) class Dict(TypedDict): __init__: Callable[[Self, ...], None] _delegates: set[DelegateParameterWithSetSideEffect] register_delegate: Callable[[Self, DelegateParameterWithSetSideEffect], None] execute_side_effects: Callable[[Self], None] dct: Dict = {'__init__': __init__, '_delegates': set(), 'register_delegate': register_delegate, 'execute_side_effects': execute_side_effects} return type(cls.__name__, (cls,), dct)
[docs] class DelegateParameterWithUncertainty(DelegateParameter, MultiParameter): """A DelegateParameter that calls its source for n_avg_callback() times and returns a tuple of (mean, std).""" def __init__(self, name: str, source: Parameter, n_avg_callback: Callable[[], int], get_context=None, **kwargs: Any): super().__init__(name, source=source, names=(f'{source.name}_mean', f'{source.name}_std'), labels=(f'{source.label} mean', f'{source.label} std'), units=(source.unit, source.unit), shapes=((), ()), setpoints=((), ()), **kwargs) self._n_avg_callback = n_avg_callback self.get_context = get_context if get_context is not None else nullcontext self.cache = _Cache(self, max_val_age=kwargs.get('max_val_age', None))
[docs] def get_raw(self) -> tuple[float, float]: with self.get_context(): vals = np.array([self.source.get() for _ in range(self.n_avg)]) return vals.mean(), vals.std()
@property def n_avg(self) -> int: return self._n_avg_callback()
[docs] class DelegateParameterWithSetSideEffect(DelegateParameter): """Modifies the built-in :class:`~qcodes:qcodes.parameters.DelegateParameter` to allow for side effects on set events. If *source* was patched using :func:`with_delegates_with_side_effect`, instances of this class are registered as delegates thereof, meaning the set side effects are also executed if the source was set. Parameters ---------- set_side_effect : A callable that is run *after* every set event. Receives the parameter instance, the previous, and the set value as arguments. execute_before : Run the side effect before setting the parameter or after. """ def __init__(self, name: str, source: Parameter | None, *args: Any, set_side_effect: Callable[ [Self, ParamRawDataType, ParamRawDataType], Any ] = lambda *_: ..., execute_before: bool = False, run_side_effect_on_instantiation: bool = False, **kwargs: Any): super().__init__(name, source, *args, **kwargs) self.register_with_source() self.set_side_effect = set_side_effect self.execute_before = execute_before if run_side_effect_on_instantiation and self.cache.valid: # Run the side effect in case the source has been set before # registering self.set_side_effect(self, self.cache.get(), self.cache.get())
[docs] def set_raw(self, value: ParamRawDataType) -> None: old_value = self.cache.get(False) if self.execute_before: self.set_side_effect(self, old_value, value) super().set_raw(value) if not self.execute_before: if hasattr(self.source, '_delegates'): # side effect should have been triggered by the source parameter. pass else: self.set_side_effect(self, old_value, value)
[docs] def register_with_source(self): """Register with source parameter. Used with :func:`with_delegates_with_side_effect`. """ try: self.source.register_delegate(self) except AttributeError: # Source has not been made delegate-aware pass
class _QdacErrorMixin: """Mix in method to check for errors after communicating with the instrument.""" _qdac: QDAC2.QDac2 def _handle_qdac_errors(self): match error := self._qdac.error(): case '0, "No error"': return case _: self._qdac.log.error(error)
[docs] class LeakageCurrentsParameter(_QdacErrorMixin, MultiParameter): """Groups several QDAC2 current sensors into a single parameter.""" def __init__(self, name: str, params: Sequence[Parameter], instrument: InstrumentBase | None = None, **kwargs: Any): self.source_params = [find_param_source(param) for param in params] self._channels = [param.instrument for param in self.source_params] assert len(set(param.underlying_instrument for param in self.source_params)) == 1, \ 'All params should be part of the same underlying instrument' self._qdac = cast(QDAC2.QDac2, self.source_params[0].underlying_instrument) super().__init__(name, names=tuple(param.name for param in params), shapes=tuple(() for _ in params), labels=tuple(param.label for param in params), units=tuple(param.unit for param in params), instrument=instrument, **kwargs) @property def counts(self) -> tuple[int, ...]: return tuple(channel.measurement_count.get() for channel in self._channels) @property def shapes(self) -> tuple[tuple[int], ...]: return tuple((counts,) if counts > 1 else () for counts in self.counts) @shapes.setter def shapes(self, val): # hacky override to always make this dynamically dependent on counts pass @property def setpoints(self) -> tuple[Sequence, ...]: # hacky override to always make this dynamically dependent on counts return tuple((np.arange(c),) if c > 1 else () for c in self.counts) @setpoints.setter def setpoints(self, val): pass
[docs] def get_raw(self) -> tuple[ParamRawDataType, ...]: v = self._qdac.ask("READ:CURR? (@{})".format( ','.join(str(ch.number) for ch in self._channels) )) self._handle_qdac_errors() res_tups = tuple(itertools.batched_variable(QDAC2.comma_sequence_to_list_of_floats(v), self.counts)) return tuple(tup if len(tup) > 1 else tup[0] for tup in res_tups)
[docs] class VirtualParameterContext(_QdacErrorMixin, GroupedParameter, QDAC2.Arrangement_Context, metaclass=abc.ABCMeta): """A GroupedParameter that combines several physical parameters and generates virtual parameters from them. It also inherits from the QDAC2's :class:`~qcodes_contrib_drivers:qcodes_contrib_drivers.drivers.QDevil.QDAC2.Arrangement_Context` to provide methods for virtual sweeps etc. """ def __init__(self, name: str, group: DelegateGroup, unit: str | None = None, label: str | None = None, output_triggers: dict[str, int] | None = None, internal_triggers: Sequence[str] | None = None, outer_trigger_channel: int | None = None, **kwargs: Any): GroupedParameter.__init__(self, name, group, unit, label, **kwargs) channels = [param.instrument for param in group.source_parameters] channel_numbers = [int(ch.name_parts[-1][-2:]) for ch in channels] QDAC2.Arrangement_Context.__init__(self, cast(QDAC2.QDac2, channels[0].parent), dict(zip(self.contact_names, channel_numbers)), output_triggers, internal_triggers, outer_trigger_channel) current_params = [] for (param_name, parameter), channel in zip(self.parameters.items(), self.channels): parameter.label = f'{self.instrument.label} {self.label} {param_name}' current_param = DelegateParameter( f'{self.name}_{param_name}_current', source=channel.read_current_A, label=f'{self.instrument.label} {self.name} {param_name} current', bind_to_instrument=True, instrument=self.instrument ) current_params.append(current_param) setattr(self, f'{param_name}_current', current_param) # XXX: Modifying the class seems hacky. setattr(self.__class__, param_name, self._generate_group_parameter_property(param_name)) currents_param = LeakageCurrentsParameter( f'{self.name}_currents', current_params, bind_to_instrument=True, instrument=self.instrument ) setattr(self, f'{self.name}_currents', currents_param) def __repr__(self) -> str: source_params = ", ".join(str(_) for _ in self.source_parameters) return f"<{self.__class__.__name__}(name={self.name}, source_parameters={source_params})>" @property @abc.abstractmethod def contact_names(self) -> Sequence[str]: pass def _generate_group_parameter_property(self, name) -> property: def fget(self) -> GroupParameter: return self.group.parameters[name] return property(fget) def _effectuate_virtual_voltages(self) -> None: """Overridden to step gates in parallel.""" slew_rates = self.channels.dc_slew_rate_V_per_s.get_latest() starts = self.channels.dc_constant_V.get() ends = self.actual_voltages() # Some of the channels might not be connected to the breakout box. RCs = itertools.replace_except((ch.bob_filter_RC.get_latest() for ch in self.channels), AttributeError, replacement=0.0) ramp_time = max(calculate_ramp_time_with_filter(slew_rate, abs(end - start), RC, 0.99) for slew_rate, start, end, RC in zip(slew_rates, starts, ends, RCs)) self._assert_in_bounds(ends) for i, (channel, voltage) in enumerate(zip(self.channel_numbers, ends)): self._qdac.write(f'SOUR{channel}:VOLT:TRIG {voltage}') self._handle_qdac_errors() self._qdac.write(f'SOUR:DC:INIT {self._all_channels_as_suffix()}') self._handle_qdac_errors() time.sleep(ramp_time) def _virtual_from_actual(self, actual: npt.ArrayLike) -> npt.NDArray[np.float64]: return np.linalg.solve(self._correction, actual) def _assert_in_bounds(self, voltages): for channel, parameter, voltage in zip(self.channels, self.parameters.values(), voltages): rng = channel.output_range.get_latest().lower() assert ( getattr(channel, f'output_{rng}_range_minimum_V').get_latest() <= voltage <= getattr(channel, f'output_{rng}_range_maximum_V').get_latest() ), f'{channel.name} out of bounds: {voltage}' parameter.validate(voltage) @property def root_instrument(self) -> InstrumentBase: return self._qdac @property def underlying_instrument(self) -> InstrumentBase: return self._qdac @functools.cached_property def channels(self) -> ChannelList: return self._qdac.channels[tuple(n - 1 for n in self.channel_numbers)] @property def current_parameters(self) -> tuple[Parameter, ...]: return tuple(getattr(self, f'{param_name}_current') for param_name in self.parameters)
[docs] def virtual_voltage(self, contact: str) -> float: """Virtual voltage applied to contact.""" self._virtual_voltages[:] = self._virtual_from_actual(self.get()) # Cast to python float from numpy dtype return float(super().virtual_voltage(contact))
_VirtualParameterContextT = TypeVar('_VirtualParameterContextT', bound=VirtualParameterContext)
[docs] class MembraneGateParameterContext(VirtualParameterContext): """Abstraction of a gate pair on two sides of a membrane. TODO: Could remove one layer of nesting and have all four gates of a trap make up one logical group. Would allow for virtual gates in the canonical sense. In that case, one could probably do away with the GroupedParameterChannel hacking. TODO: Document TODO: triggers """ top: DelegateGroupParameter bottom: DelegateGroupParameter def __init__(self, name: str, group: DelegateGroup, unit: str | None = None, label: str | None = None, output_triggers: dict[str, int] | None = None, internal_triggers: Sequence[str] | None = None, outer_trigger_channel: int | None = None, **kwargs: Any): super().__init__(name, group, unit, label, output_triggers, internal_triggers, outer_trigger_channel, **kwargs) # Set up the virtual gate matrix # DM --> "Virtual T gate" # CM --> "Virtual B gate" # # (T) 1 (1 1) (CM) # | | = - | | | | # (B) 2 (1 -1) (DM) self.initiate_correction('common_mode', [0.5, 0.5]) self.initiate_correction('difference_mode', [0.5, -0.5]) self.difference_mode = VirtualParameter( f'{self.name}_difference_mode', get_cmd=lambda: self.virtual_voltage('difference_mode'), set_cmd=lambda v: self.set_virtual_voltage('difference_mode', v), vals=validators.Numbers( self.top.source.vals.min_value - self.bottom.source.vals.max_value, self.top.source.vals.max_value - self.bottom.source.vals.min_value, ), unit='V', label=f"{self.instrument.label} {self.name} difference mode", instrument=self.instrument, parameter_context=self, ) self.common_mode = VirtualParameter( f'{self.name}_common_mode', get_cmd=lambda: self.virtual_voltage('common_mode'), set_cmd=lambda v: self.set_virtual_voltage('common_mode', v), vals=validators.Numbers( self.top.source.vals.min_value + self.bottom.source.vals.min_value, self.top.source.vals.max_value + self.bottom.source.vals.max_value, ), unit='V', label=f"{self.instrument.label} {self.name} common mode", instrument=self.instrument, parameter_context=self, ) @property def contact_names(self) -> Sequence[str]: return 'common_mode', 'difference_mode'
[docs] class VirtualParameter(Parameter): """A parameter part of a :class:`VirtualParameterContext`.""" def __init__(self, name: str, parameter_context: _VirtualParameterContextT, **kwargs: Any) -> None: super().__init__(name, **kwargs) self._parameter_context = parameter_context @property def parameter_context(self) -> _VirtualParameterContextT: return self._parameter_context
[docs] class TimeParameter(ElapsedTimeParameter): """A parameter that tracks elapsed time and can sleep until the set time has elapsed. Setting to, e.g., 10 will sleep until the time since instantiation or the last call to :meth:`reset_clock` is 10 seconds. """
[docs] def set_raw(self, value: ParamRawDataType) -> None: elapsed = time.perf_counter() - self.t0 if elapsed < value: time.sleep(value - elapsed) else: warnings.warn('More time elapsed than I would have waited for.', RuntimeWarning)
class _PowerMixin: """Overrides :meth:`set_to` to disable tuning when returning to previous value. Instead, it just restores the position of the ND-filter. """ @contextmanager def set_to(self: ParameterBase, value: ParamDataType, allow_changes: bool = True): if not allow_changes: raise NotImplementedError('allow_changes must be True') with self.instrument.nd_filter.position.restore_at_exit(allow_changes=True): yield self.set(value)
[docs] class PowerParameter(_PowerMixin, Parameter): pass
[docs] class DelegatePowerParameter(_PowerMixin, DelegateParameter): pass