diff --git a/qiskit_dynamics/pulse/pulse_to_signals.py b/qiskit_dynamics/pulse/pulse_to_signals.py index 12350e8da..4aafe7491 100644 --- a/qiskit_dynamics/pulse/pulse_to_signals.py +++ b/qiskit_dynamics/pulse/pulse_to_signals.py @@ -38,21 +38,19 @@ class InstructionToSignals: """Converts pulse instructions to Signals to be used in models. - The :class:`InstructionsToSignals` class converts a pulse schedule to a list - of signals that can be given to a model. This conversion is done by calling - the :meth:`get_signals` method on a schedule. The converter applies to instances - of :class:`Schedule`. Instances of :class:`ScheduleBlock` must first be - converted to :class:`Schedule` using the :meth:`block_to_schedule` in - Qiskit pulse. - - The converter can be initialized - with the optional arguments ``carriers`` and ``channels``. These arguments - change the returned signals of :meth:`get_signals`. When ``channels`` is given - then only the signals specified by name in ``channels`` are returned. The - ``carriers`` dictionary allows the user to specify the carrier frequency of - the channels. Here, the keys are the channel name, e.g. ``d12`` for drive channel - number 12, and the values are the corresponding frequency. If a channel is not - present in ``carriers`` it is assumed that the carrier frequency is zero. + The :class:`InstructionsToSignals` class converts a pulse schedule to a list of signals that can + be given to a model. This conversion is done by calling the :meth:`get_signals` method on a + schedule. The converter applies to instances of :class:`Schedule`. Instances of + :class:`ScheduleBlock` must first be converted to :class:`Schedule` using the + :meth:`block_to_schedule` in Qiskit pulse. + + The converter can be initialized with the optional arguments ``carriers`` and ``channels``. + These arguments change the returned signals of :meth:`get_signals`. When ``channels`` is given + then only the signals specified by name in ``channels`` are returned. The ``carriers`` + dictionary allows the user to specify the carrier frequency of the channels. Here, the keys are + the channel name, e.g. ``d12`` for drive channel number 12, and the values are the corresponding + frequency. If a channel is not present in ``carriers`` it is assumed that the carrier frequency + is zero. """ def __init__( @@ -65,15 +63,15 @@ def __init__( Args: dt: Length of the samples. This is required by the converter as pulse - schedule are specified in units of dt and typically do not carry the - value of dt with them. + schedule are specified in units of dt and typically do not carry the value of dt + with them. carriers: A dict of carrier frequencies. The keys are the names of the channels - and the values are the corresponding carrier frequency. + and the values are the corresponding carrier frequency. channels: A list of channels that the :meth:`get_signals` method should return. - This argument will cause :meth:`get_signals` to return the signals in the - same order as the channels. Channels present in the schedule but absent - from channels will not be included in the returned object. If None is given - (the default) then all channels present in the pulse schedule are returned. + This argument will cause :meth:`get_signals` to return the signals in the same order + as the channels. Channels present in the schedule but absent from channels will not + be included in the returned object. If None is given (the default) then all channels + present in the pulse schedule are returned. """ self._dt = dt @@ -84,21 +82,22 @@ def get_signals(self, schedule: Schedule) -> List[DiscreteSignal]: """ Args: schedule: The schedule to represent in terms of signals. Instances of - :class:`ScheduleBlock` must first be converted to :class:`Schedule` - using the :meth:`block_to_schedule` in Qiskit pulse. + :class:`ScheduleBlock` must first be converted to :class:`Schedule` using the + :meth:`block_to_schedule` in Qiskit pulse. Returns: a list of piecewise constant signals. """ - signals, phases, frequency_shifts = {}, {}, {} + signals, phases, frequency_shifts, phase_accumulations = {}, {}, {}, {} if self._channels is not None: schedule = schedule.filter(channels=[self._get_channel(ch) for ch in self._channels]) - for idx, chan in enumerate(schedule.channels): + for chan in schedule.channels: phases[chan.name] = 0.0 frequency_shifts[chan.name] = 0.0 + phase_accumulations[chan.name] = 0.0 carrier_freq = self._carriers.get(chan.name, 0.0) @@ -113,9 +112,7 @@ def get_signals(self, schedule: Schedule) -> List[DiscreteSignal]: chan = inst.channel.name phi = phases[chan] freq = frequency_shifts[chan] - if isinstance(inst, Play): - start_idx = len(signals[chan].samples) # get the instruction samples inst_samples = None @@ -125,10 +122,12 @@ def get_signals(self, schedule: Schedule) -> List[DiscreteSignal]: inst_samples = inst.pulse.get_waveform().samples # build sample array to append to signal - samples = [] - for idx, sample in enumerate(inst_samples): - time = self._dt * (idx + start_idx) - samples.append(sample * np.exp(2.0j * np.pi * freq * time + 1.0j * phi)) + times = self._dt * (start_sample + np.arange(len(inst_samples))) + samples = inst_samples * np.exp( + 2.0j * np.pi * freq * times + + 1.0j * phi + + 2.0j * np.pi * phase_accumulations[chan] + ) signals[chan].add_samples(start_sample, samples) if isinstance(inst, ShiftPhase): @@ -136,11 +135,17 @@ def get_signals(self, schedule: Schedule) -> List[DiscreteSignal]: if isinstance(inst, ShiftFrequency): frequency_shifts[chan] += inst.frequency + phase_accumulations[chan] -= inst.frequency * start_sample * self._dt if isinstance(inst, SetPhase): phases[chan] = inst.phase if isinstance(inst, SetFrequency): + phase_accumulations[chan] -= ( + (inst.frequency - (frequency_shifts[chan] + signals[chan].carrier_freq)) + * start_sample + * self._dt + ) frequency_shifts[chan] = inst.frequency - signals[chan].carrier_freq # ensure all signals have the same number of samples @@ -166,7 +171,6 @@ def get_signals(self, schedule: Schedule) -> List[DiscreteSignal]: ) return_signals.append(signal) - return return_signals @staticmethod @@ -188,7 +192,7 @@ def get_awg_signals( Args: signals: A list of signals for which to create I and Q. if_modulation: The intermediate frequency with which the AWG modulates the pulse - envelopes. + envelopes. Returns: iq signals: A list of signals which is twice as long as the input list of signals. diff --git a/releasenotes/notes/fix_delay_and_phase_accumulation-25c8b4c501110c50.yaml b/releasenotes/notes/fix_delay_and_phase_accumulation-25c8b4c501110c50.yaml new file mode 100644 index 000000000..00f14af56 --- /dev/null +++ b/releasenotes/notes/fix_delay_and_phase_accumulation-25c8b4c501110c50.yaml @@ -0,0 +1,8 @@ +--- +fixes: + - | + :class:`.InstructionToSignals` has been updated to fix issues with phase accumulation + resulting from ``SetFrequency`` and ``ShiftFrequency`` instructions. A phase accumulation + term has been added so that the digitized carrier frequency is continuous across these + instructions. A related phase error when generating samples for channels subject to + ``SetFrequency`` and ``ShiftFrequency`` instructions has also been fixed. diff --git a/test/dynamics/pulse/test_pulse_to_signals.py b/test/dynamics/pulse/test_pulse_to_signals.py index 934c7a110..3aa61e0e8 100644 --- a/test/dynamics/pulse/test_pulse_to_signals.py +++ b/test/dynamics/pulse/test_pulse_to_signals.py @@ -23,6 +23,7 @@ ControlChannel, MeasureChannel, Play, + Delay, Drag, ShiftFrequency, SetFrequency, @@ -122,6 +123,98 @@ def test_set_frequency(self): for idx in range(10): self.assertEqual(signals[0].samples[idx], np.exp(2.0j * idx * np.pi * -1.0 * 0.222)) + def test_set_and_shift_frequency(self): + """Test that ShiftFrequency after SetFrequency is properly converted. It confirms + implementation of phase accumulation is correct.""" + + duration = 20 + unit_dt = 0.222 + sched = Schedule() + sched += SetFrequency(5.5, DriveChannel(0)) + sched += Play(Constant(duration=duration, amp=1.0), DriveChannel(0)) + sched += SetFrequency(6, DriveChannel(0)) + sched += Play(Constant(duration=duration, amp=1.0), DriveChannel(0)) + sched += ShiftFrequency(-0.5, DriveChannel(0)) + sched += Play(Constant(duration=duration, amp=1.0), DriveChannel(0)) + + freq_shift = 0.5 + phase_accumulation = 0.0 + all_samples = np.exp(2j * np.pi * freq_shift * unit_dt * np.arange(0, duration)) + + freq_shift = 1.0 + phase_accumulation -= (6.0 - 5.5) * duration * unit_dt + all_samples = np.append( + all_samples, + np.exp( + 2j + * np.pi + * (freq_shift * unit_dt * np.arange(duration, 2 * duration) + phase_accumulation) + ), + ) + + freq_shift = 0.5 + phase_accumulation -= -0.5 * 2 * duration * unit_dt + all_samples = np.append( + all_samples, + np.exp( + 2j + * np.pi + * ( + freq_shift * unit_dt * np.arange(2 * duration, 3 * duration) + + phase_accumulation + ) + ), + ) + + converter = InstructionToSignals(dt=unit_dt, carriers={"d0": 5.0}) + signals = converter.get_signals(sched) + self.assertAllClose(signals[0].samples, all_samples) + + def test_delay(self): + """Test that Delay is properly reflected.""" + + sched = Schedule() + sched += Play(Constant(duration=10, amp=1.0), DriveChannel(0)) + sched += Delay(10, DriveChannel(0)) + sched += Play(Constant(duration=10, amp=1.0), DriveChannel(0)) + + converter = InstructionToSignals(dt=0.222, carriers={"d0": 5.0}) + signals = converter.get_signals(sched) + samples_with_delay = np.array([1] * 10 + [0] * 10 + [1] * 10) + for idx in range(30): + self.assertEqual(signals[0].samples[idx], samples_with_delay[idx]) + + def test_delay_and_shift_frequency(self): + """Test that delay after SetFrequency is properly converted. + It confirms implementation of phase accumulation is correct.""" + + duration = 20 + unit_dt = 0.222 + sched = Schedule() + sched += Play(Constant(duration=duration, amp=1.0), DriveChannel(0)) + sched += ShiftFrequency(1.0, DriveChannel(0)) + sched += Delay(duration, DriveChannel(0)) + sched += Play(Constant(duration=duration, amp=1.0), DriveChannel(0)) + + freq_shift = 1.0 + phase_accumulation = -1.0 * duration * unit_dt + phase_accumulation = -1.0 * duration * unit_dt + all_samples = np.append( + np.append(np.ones(duration), np.zeros(duration)), + np.exp( + 2j + * np.pi + * ( + freq_shift * unit_dt * np.arange(2 * duration, 3 * duration) + + phase_accumulation + ) + ), + ) + + converter = InstructionToSignals(dt=unit_dt, carriers={"d0": 5.0}) + signals = converter.get_signals(sched) + self.assertAllClose(signals[0].samples, all_samples) + def test_uneven_pulse_length(self): """Test conversion when length of pulses on a schedule is uneven."""