from __future__ import annotations
import abc
import time
import warnings
from contextlib import nullcontext, contextmanager
from typing import Any, Callable, Sequence, TypedDict, TypeVar, Self, cast
import numpy as np
from numpy import typing as npt
from qcodes import validators
from qcodes.instrument import InstrumentBase, ChannelList
from qcodes.parameters import (GroupedParameter, DelegateGroup, DelegateParameter, Parameter,
ParamRawDataType, GroupParameter, MultiParameter,
ElapsedTimeParameter, DelegateGroupParameter, ParamDataType,
ParameterBase)
from qcodes.parameters.cache import _Cache
from qcodes_contrib_drivers.drivers.QDevil import QDAC2
from qutil import functools, itertools
from .helpers import calculate_ramp_time_with_filter, find_param_source
_paramT = TypeVar('_paramT', bound=Parameter)
[docs]
def with_root_instrument(cls: type[_paramT], root_instrument: InstrumentBase) -> type[_paramT]:
"""Patches a Parameter with a custom root_instrument."""
def fget(self) -> InstrumentBase | None:
return root_instrument
return type(cls.__name__, (cls,),
{'root_instrument': property(fget, doc=cls.root_instrument.__doc__)})
[docs]
def with_underlying_instrument(cls: type[_paramT],
underlying_instrument: InstrumentBase) -> type[_paramT]:
"""Patches a Parameter with a custom underlying_instrument."""
def fget(self) -> InstrumentBase | None:
return underlying_instrument
return type(cls.__name__, (cls,),
{'underlying_instrument': property(fget, doc=cls.underlying_instrument.__doc__)})
[docs]
def with_delegates_with_side_effect(cls: type[Parameter]) -> type[Parameter]:
"""Patches a Parameter to execute side effects of its delegates.
Only works with Parameters whose set_raw method may be overridden.
TODO:
Unused as of now.
"""
def __init__(self, *args, **kwargs) -> None:
cls.__init__(self, *args, **kwargs)
self.set = functools.chain(self.set, self.execute_side_effects)
def execute_side_effects(self, *_) -> None:
for delegate in self._delegates:
try:
# delegate may have also been patched to have delegates itself
delegate.execute_side_effects()
except AttributeError:
# delegate is a simple DelegateParameterWithSideEffect
delegate.set_side_effect(delegate,
delegate.cache.get(False),
delegate.cache.get(True))
def register_delegate(self, delegate: DelegateParameterWithSetSideEffect):
self._delegates.add(delegate)
class Dict(TypedDict):
__init__: Callable[[Self, ...], None]
_delegates: set[DelegateParameterWithSetSideEffect]
register_delegate: Callable[[Self, DelegateParameterWithSetSideEffect], None]
execute_side_effects: Callable[[Self], None]
dct: Dict = {'__init__': __init__, '_delegates': set(), 'register_delegate': register_delegate,
'execute_side_effects': execute_side_effects}
return type(cls.__name__, (cls,), dct)
[docs]
class DelegateParameterWithUncertainty(DelegateParameter, MultiParameter):
"""A DelegateParameter that calls its source for n_avg_callback()
times and returns a tuple of (mean, std)."""
def __init__(self, name: str, source: Parameter, n_avg_callback: Callable[[], int],
get_context=None, **kwargs: Any):
super().__init__(name,
source=source,
names=(f'{source.name}_mean', f'{source.name}_std'),
labels=(f'{source.label} mean', f'{source.label} std'),
units=(source.unit, source.unit),
shapes=((), ()),
setpoints=((), ()),
**kwargs)
self._n_avg_callback = n_avg_callback
self.get_context = get_context if get_context is not None else nullcontext
self.cache = _Cache(self, max_val_age=kwargs.get('max_val_age', None))
[docs]
def get_raw(self) -> tuple[float, float]:
with self.get_context():
vals = np.array([self.source.get() for _ in range(self.n_avg)])
return vals.mean(), vals.std()
@property
def n_avg(self) -> int:
return self._n_avg_callback()
[docs]
class DelegateParameterWithSetSideEffect(DelegateParameter):
"""Modifies the built-in
:class:`~qcodes:qcodes.parameters.DelegateParameter` to allow for
side effects on set events.
If *source* was patched using :func:`with_delegates_with_side_effect`,
instances of this class are registered as delegates thereof, meaning
the set side effects are also executed if the source was set.
Parameters
----------
set_side_effect :
A callable that is run *after* every set event. Receives
the parameter instance, the previous, and the set value as
arguments.
execute_before :
Run the side effect before setting the parameter or after.
"""
def __init__(self, name: str, source: Parameter | None, *args: Any,
set_side_effect: Callable[
[Self, ParamRawDataType, ParamRawDataType], Any
] = lambda *_: ...,
execute_before: bool = False,
run_side_effect_on_instantiation: bool = False,
**kwargs: Any):
super().__init__(name, source, *args, **kwargs)
self.register_with_source()
self.set_side_effect = set_side_effect
self.execute_before = execute_before
if run_side_effect_on_instantiation and self.cache.valid:
# Run the side effect in case the source has been set before
# registering
self.set_side_effect(self, self.cache.get(), self.cache.get())
[docs]
def set_raw(self, value: ParamRawDataType) -> None:
old_value = self.cache.get(False)
if self.execute_before:
self.set_side_effect(self, old_value, value)
super().set_raw(value)
if not self.execute_before:
if hasattr(self.source, '_delegates'):
# side effect should have been triggered by the source parameter.
pass
else:
self.set_side_effect(self, old_value, value)
[docs]
def register_with_source(self):
"""Register with source parameter.
Used with :func:`with_delegates_with_side_effect`.
"""
try:
self.source.register_delegate(self)
except AttributeError:
# Source has not been made delegate-aware
pass
class _QdacErrorMixin:
"""Mix in method to check for errors after communicating with the
instrument."""
_qdac: QDAC2.QDac2
def _handle_qdac_errors(self):
match error := self._qdac.error():
case '0, "No error"':
return
case _:
self._qdac.log.error(error)
[docs]
class LeakageCurrentsParameter(_QdacErrorMixin, MultiParameter):
"""Groups several QDAC2 current sensors into a single parameter."""
def __init__(self, name: str, params: Sequence[Parameter],
instrument: InstrumentBase | None = None, **kwargs: Any):
self.source_params = [find_param_source(param) for param in params]
self._channels = [param.instrument for param in self.source_params]
assert len(set(param.underlying_instrument for param in self.source_params)) == 1, \
'All params should be part of the same underlying instrument'
self._qdac = cast(QDAC2.QDac2, self.source_params[0].underlying_instrument)
super().__init__(name,
names=tuple(param.name for param in params),
shapes=tuple(() for _ in params),
labels=tuple(param.label for param in params),
units=tuple(param.unit for param in params),
instrument=instrument,
**kwargs)
@property
def counts(self) -> tuple[int, ...]:
return tuple(channel.measurement_count.get() for channel in self._channels)
@property
def shapes(self) -> tuple[tuple[int], ...]:
return tuple((counts,) if counts > 1 else () for counts in self.counts)
@shapes.setter
def shapes(self, val):
# hacky override to always make this dynamically dependent on counts
pass
@property
def setpoints(self) -> tuple[Sequence, ...]:
# hacky override to always make this dynamically dependent on counts
return tuple((np.arange(c),) if c > 1 else () for c in self.counts)
@setpoints.setter
def setpoints(self, val):
pass
[docs]
def get_raw(self) -> tuple[ParamRawDataType, ...]:
v = self._qdac.ask("READ:CURR? (@{})".format(
','.join(str(ch.number) for ch in self._channels)
))
self._handle_qdac_errors()
res_tups = tuple(itertools.batched_variable(QDAC2.comma_sequence_to_list_of_floats(v),
self.counts))
return tuple(tup if len(tup) > 1 else tup[0] for tup in res_tups)
[docs]
class VirtualParameterContext(_QdacErrorMixin, GroupedParameter, QDAC2.Arrangement_Context,
metaclass=abc.ABCMeta):
"""A GroupedParameter that combines several physical parameters and
generates virtual parameters from them.
It also inherits from the QDAC2's
:class:`~qcodes_contrib_drivers:qcodes_contrib_drivers.drivers.QDevil.QDAC2.Arrangement_Context`
to provide methods for virtual sweeps etc.
"""
def __init__(self, name: str, group: DelegateGroup,
unit: str | None = None,
label: str | None = None,
output_triggers: dict[str, int] | None = None,
internal_triggers: Sequence[str] | None = None,
outer_trigger_channel: int | None = None,
**kwargs: Any):
GroupedParameter.__init__(self, name, group, unit, label, **kwargs)
channels = [param.instrument for param in group.source_parameters]
channel_numbers = [int(ch.name_parts[-1][-2:]) for ch in channels]
QDAC2.Arrangement_Context.__init__(self, cast(QDAC2.QDac2, channels[0].parent),
dict(zip(self.contact_names, channel_numbers)),
output_triggers, internal_triggers,
outer_trigger_channel)
current_params = []
for (param_name, parameter), channel in zip(self.parameters.items(), self.channels):
parameter.label = f'{self.instrument.label} {self.label} {param_name}'
current_param = DelegateParameter(
f'{self.name}_{param_name}_current',
source=channel.read_current_A,
label=f'{self.instrument.label} {self.name} {param_name} current',
bind_to_instrument=True,
instrument=self.instrument
)
current_params.append(current_param)
setattr(self, f'{param_name}_current', current_param)
# XXX: Modifying the class seems hacky.
setattr(self.__class__, param_name,
self._generate_group_parameter_property(param_name))
currents_param = LeakageCurrentsParameter(
f'{self.name}_currents',
current_params,
bind_to_instrument=True,
instrument=self.instrument
)
setattr(self, f'{self.name}_currents', currents_param)
def __repr__(self) -> str:
source_params = ", ".join(str(_) for _ in self.source_parameters)
return f"<{self.__class__.__name__}(name={self.name}, source_parameters={source_params})>"
@property
@abc.abstractmethod
def contact_names(self) -> Sequence[str]:
pass
def _generate_group_parameter_property(self, name) -> property:
def fget(self) -> GroupParameter:
return self.group.parameters[name]
return property(fget)
def _effectuate_virtual_voltages(self) -> None:
"""Overridden to step gates in parallel."""
slew_rates = self.channels.dc_slew_rate_V_per_s.get_latest()
starts = self.channels.dc_constant_V.get()
ends = self.actual_voltages()
# Some of the channels might not be connected to the breakout box.
RCs = itertools.replace_except((ch.bob_filter_RC.get_latest() for ch in self.channels),
AttributeError, replacement=0.0)
ramp_time = max(calculate_ramp_time_with_filter(slew_rate, abs(end - start), RC, 0.99)
for slew_rate, start, end, RC in zip(slew_rates, starts, ends, RCs))
self._assert_in_bounds(ends)
for i, (channel, voltage) in enumerate(zip(self.channel_numbers, ends)):
self._qdac.write(f'SOUR{channel}:VOLT:TRIG {voltage}')
self._handle_qdac_errors()
self._qdac.write(f'SOUR:DC:INIT {self._all_channels_as_suffix()}')
self._handle_qdac_errors()
time.sleep(ramp_time)
def _virtual_from_actual(self, actual: npt.ArrayLike) -> npt.NDArray[np.float64]:
return np.linalg.solve(self._correction, actual)
def _assert_in_bounds(self, voltages):
for channel, parameter, voltage in zip(self.channels, self.parameters.values(), voltages):
rng = channel.output_range.get_latest().lower()
assert (
getattr(channel, f'output_{rng}_range_minimum_V').get_latest()
<= voltage
<= getattr(channel, f'output_{rng}_range_maximum_V').get_latest()
), f'{channel.name} out of bounds: {voltage}'
parameter.validate(voltage)
@property
def root_instrument(self) -> InstrumentBase:
return self._qdac
@property
def underlying_instrument(self) -> InstrumentBase:
return self._qdac
@functools.cached_property
def channels(self) -> ChannelList:
return self._qdac.channels[tuple(n - 1 for n in self.channel_numbers)]
@property
def current_parameters(self) -> tuple[Parameter, ...]:
return tuple(getattr(self, f'{param_name}_current') for param_name in self.parameters)
[docs]
def virtual_voltage(self, contact: str) -> float:
"""Virtual voltage applied to contact."""
self._virtual_voltages[:] = self._virtual_from_actual(self.get())
# Cast to python float from numpy dtype
return float(super().virtual_voltage(contact))
_VirtualParameterContextT = TypeVar('_VirtualParameterContextT', bound=VirtualParameterContext)
[docs]
class MembraneGateParameterContext(VirtualParameterContext):
"""Abstraction of a gate pair on two sides of a membrane.
TODO:
Could remove one layer of nesting and have all four gates of a
trap make up one logical group. Would allow for virtual gates
in the canonical sense. In that case, one could probably do
away with the GroupedParameterChannel hacking.
TODO:
Document
TODO:
triggers
"""
top: DelegateGroupParameter
bottom: DelegateGroupParameter
def __init__(self, name: str, group: DelegateGroup,
unit: str | None = None,
label: str | None = None,
output_triggers: dict[str, int] | None = None,
internal_triggers: Sequence[str] | None = None,
outer_trigger_channel: int | None = None,
**kwargs: Any):
super().__init__(name, group, unit, label, output_triggers, internal_triggers,
outer_trigger_channel, **kwargs)
# Set up the virtual gate matrix
# DM --> "Virtual T gate"
# CM --> "Virtual B gate"
#
# (T) 1 (1 1) (CM)
# | | = - | | | |
# (B) 2 (1 -1) (DM)
self.initiate_correction('common_mode', [0.5, 0.5])
self.initiate_correction('difference_mode', [0.5, -0.5])
self.difference_mode = VirtualParameter(
f'{self.name}_difference_mode',
get_cmd=lambda: self.virtual_voltage('difference_mode'),
set_cmd=lambda v: self.set_virtual_voltage('difference_mode', v),
vals=validators.Numbers(
self.top.source.vals.min_value - self.bottom.source.vals.max_value,
self.top.source.vals.max_value - self.bottom.source.vals.min_value,
),
unit='V',
label=f"{self.instrument.label} {self.name} difference mode",
instrument=self.instrument,
parameter_context=self,
)
self.common_mode = VirtualParameter(
f'{self.name}_common_mode',
get_cmd=lambda: self.virtual_voltage('common_mode'),
set_cmd=lambda v: self.set_virtual_voltage('common_mode', v),
vals=validators.Numbers(
self.top.source.vals.min_value + self.bottom.source.vals.min_value,
self.top.source.vals.max_value + self.bottom.source.vals.max_value,
),
unit='V',
label=f"{self.instrument.label} {self.name} common mode",
instrument=self.instrument,
parameter_context=self,
)
@property
def contact_names(self) -> Sequence[str]:
return 'common_mode', 'difference_mode'
[docs]
class VirtualParameter(Parameter):
"""A parameter part of a :class:`VirtualParameterContext`."""
def __init__(self, name: str, parameter_context: _VirtualParameterContextT,
**kwargs: Any) -> None:
super().__init__(name, **kwargs)
self._parameter_context = parameter_context
@property
def parameter_context(self) -> _VirtualParameterContextT:
return self._parameter_context
[docs]
class TimeParameter(ElapsedTimeParameter):
"""A parameter that tracks elapsed time and can sleep until the set
time has elapsed.
Setting to, e.g., 10 will sleep until the time since instantiation
or the last call to :meth:`reset_clock` is 10 seconds.
"""
[docs]
def set_raw(self, value: ParamRawDataType) -> None:
elapsed = time.perf_counter() - self.t0
if elapsed < value:
time.sleep(value - elapsed)
else:
warnings.warn('More time elapsed than I would have waited for.', RuntimeWarning)
class _PowerMixin:
"""Overrides :meth:`set_to` to disable tuning when returning to
previous value.
Instead, it just restores the position of the ND-filter.
"""
@contextmanager
def set_to(self: ParameterBase, value: ParamDataType, allow_changes: bool = True):
if not allow_changes:
raise NotImplementedError('allow_changes must be True')
with self.instrument.nd_filter.position.restore_at_exit(allow_changes=True):
yield self.set(value)
[docs]
class PowerParameter(_PowerMixin, Parameter): pass
[docs]
class DelegatePowerParameter(_PowerMixin, DelegateParameter): pass