diff --git a/qiskit/transpiler/passes/routing/stochastic_swap.py b/qiskit/transpiler/passes/routing/stochastic_swap.py index 12a640fb7e61..ea0c8d206740 100644 --- a/qiskit/transpiler/passes/routing/stochastic_swap.py +++ b/qiskit/transpiler/passes/routing/stochastic_swap.py @@ -26,7 +26,7 @@ from qiskit.circuit import IfElseOp, WhileLoopOp, ForLoopOp, ControlFlowOp from qiskit._accelerate import stochastic_swap as stochastic_swap_rs -from .utils import combine_permutations, get_swap_map_dag +from .utils import get_swap_map_dag logger = logging.getLogger(__name__) @@ -70,7 +70,8 @@ def __init__(self, coupling_map, trials=20, seed=None, fake_run=False, initial_l self.fake_run = fake_run self.qregs = None self.initial_layout = initial_layout - self._qubit_indices = None + self._qubit_to_int = None + self._int_to_qubit = None def run(self, dag): """Run the StochasticSwap pass on `dag`. @@ -97,7 +98,10 @@ def run(self, dag): canonical_register = dag.qregs["q"] if self.initial_layout is None: self.initial_layout = Layout.generate_trivial_layout(canonical_register) - self._qubit_indices = {bit: idx for idx, bit in enumerate(dag.qubits)} + # Qubit indices are used to assign an integer to each virtual qubit during the routing: it's + # a mapping of {virtual: virtual}, for converting between Python and Rust forms. + self._qubit_to_int = {bit: idx for idx, bit in enumerate(dag.qubits)} + self._int_to_qubit = tuple(dag.qubits) self.qregs = dag.qregs logger.debug("StochasticSwap rng seeded with seed=%s", self.seed) @@ -174,18 +178,18 @@ def _layer_permutation(self, layer_partition, layout, qubit_subset, coupling, tr cdist2 = coupling._dist_matrix**2 int_qubit_subset = np.fromiter( - (self._qubit_indices[bit] for bit in qubit_subset), + (self._qubit_to_int[bit] for bit in qubit_subset), dtype=np.uintp, count=len(qubit_subset), ) int_gates = np.fromiter( - (self._qubit_indices[bit] for gate in gates for bit in gate), + (self._qubit_to_int[bit] for gate in gates for bit in gate), dtype=np.uintp, count=2 * len(gates), ) - layout_mapping = {self._qubit_indices[k]: v for k, v in layout.get_virtual_bits().items()} + layout_mapping = {self._qubit_to_int[k]: v for k, v in layout.get_virtual_bits().items()} int_layout = stochastic_swap_rs.NLayout(layout_mapping, num_qubits, coupling.size()) trial_circuit = DAGCircuit() # SWAP circuit for slice of swaps in this trial @@ -204,16 +208,15 @@ def _layer_permutation(self, layer_partition, layout, qubit_subset, coupling, tr edges, seed=self.seed, ) - # If we have no best circuit for this layer, all of the - # trials have failed + # If we have no best circuit for this layer, all of the trials have failed if best_layout is None: logger.debug("layer_permutation: failed!") return False, None, None, None edges = best_edges.edges() for idx in range(len(edges) // 2): - swap_src = self.initial_layout._p2v[edges[2 * idx]] - swap_tgt = self.initial_layout._p2v[edges[2 * idx + 1]] + swap_src = self._int_to_qubit[edges[2 * idx]] + swap_tgt = self._int_to_qubit[edges[2 * idx + 1]] trial_circuit.apply_operation_back(SwapGate(), [swap_src, swap_tgt], []) best_circuit = trial_circuit @@ -234,24 +237,17 @@ def _layer_update(self, dag, layer, best_layout, best_depth, best_circuit): best_depth (int): depth returned from _layer_permutation best_circuit (DAGCircuit): swap circuit returned from _layer_permutation """ - layout = best_layout - logger.debug("layer_update: layout = %s", layout) + logger.debug("layer_update: layout = %s", best_layout) logger.debug("layer_update: self.initial_layout = %s", self.initial_layout) # Output any swaps if best_depth > 0: logger.debug("layer_update: there are swaps in this layer, depth %d", best_depth) - dag.compose(best_circuit) + dag.compose(best_circuit, qubits={bit: bit for bit in best_circuit.qubits}) else: logger.debug("layer_update: there are no swaps in this layer") # Output this layer - layer_circuit = layer["graph"] - initial_v2p = self.initial_layout.get_virtual_bits() - new_v2p = layout.get_virtual_bits() - initial_order = [initial_v2p[qubit] for qubit in dag.qubits] - new_order = [new_v2p[qubit] for qubit in dag.qubits] - order = combine_permutations(initial_order, new_order) - dag.compose(layer_circuit, qubits=order) + dag.compose(layer["graph"], qubits=best_layout.reorder_bits(dag.qubits)) def _mapper(self, circuit_graph, coupling_graph, trials=20): """Map a DAGCircuit onto a CouplingMap using swap gates. @@ -376,29 +372,18 @@ def _controlflow_layer_update(self, dagcircuit_output, layer_dag, current_layout """ cf_opnode = layer_dag.op_nodes()[0] if isinstance(cf_opnode.op, IfElseOp): - updated_ctrl_op, cf_layout, idle_qubits = self._route_control_flow_multiblock( + new_op, new_qargs, new_layout = self._route_control_flow_multiblock( cf_opnode, current_layout, root_dag ) elif isinstance(cf_opnode.op, (ForLoopOp, WhileLoopOp)): - updated_ctrl_op, cf_layout, idle_qubits = self._route_control_flow_looping( + new_op, new_qargs, new_layout = self._route_control_flow_looping( cf_opnode, current_layout, root_dag ) else: raise TranspilerError(f"unsupported control flow operation: {cf_opnode}") - if self.fake_run: - return cf_layout - - cf_layer_dag = DAGCircuit() - cf_qubits = [qubit for qubit in root_dag.qubits if qubit not in idle_qubits] - qreg = QuantumRegister(len(cf_qubits), "q") - cf_layer_dag.add_qreg(qreg) - for creg in layer_dag.cregs.values(): - cf_layer_dag.add_creg(creg) - cf_layer_dag.apply_operation_back(updated_ctrl_op, cf_layer_dag.qubits, cf_opnode.cargs) - target_qubits = [qubit for qubit in dagcircuit_output.qubits if qubit not in idle_qubits] - order = current_layout.reorder_bits(target_qubits) - dagcircuit_output.compose(cf_layer_dag, qubits=order) - return cf_layout + if not self.fake_run: + dagcircuit_output.apply_operation_back(new_op, new_qargs, cf_opnode.cargs) + return new_layout def _new_seed(self): """Get a seed for a new RNG instance.""" @@ -432,19 +417,19 @@ def _route_control_flow_multiblock(self, node, current_layout, root_dag): Returns: ControlFlowOp: routed control flow operation. - final_layout (Layout): layout after instruction. - list(Qubit): list of idle qubits in controlflow layer. + List[Qubit]: the new physical-qubit arguments that the output `ControlFlowOp` should be + applied to. This might be wider than the input node if internal routing was needed. + Layout: the new layout after the control-flow operation is applied. """ # For each block, expand it up be the full width of the containing DAG so we can be certain # that it is routable, then route it within that. When we recombine later, we'll reduce all # these blocks down to remove any qubits that are idle. block_dags = [] block_layouts = [] - order = [self._qubit_indices[bit] for bit in node.qargs] for block in node.op.blocks: inner_pass = self._recursive_pass(current_layout) full_dag_block = root_dag.copy_empty_like() - full_dag_block.compose(circuit_to_dag(block), qubits=order) + full_dag_block.compose(circuit_to_dag(block), qubits=node.qargs) block_dags.append(inner_pass.run(full_dag_block)) block_layouts.append(inner_pass.property_set["final_layout"].copy()) @@ -455,23 +440,19 @@ def _route_control_flow_multiblock(self, node, current_layout, root_dag): deepest_index = np.argmax([block.depth(recurse=True) for block in block_dags]) final_layout = block_layouts[deepest_index] if self.fake_run: - return None, final_layout, None - p2v = current_layout.get_physical_bits() + return None, None, final_layout idle_qubits = set(root_dag.qubits) for i, updated_dag_block in enumerate(block_dags): if i != deepest_index: - swap_circuit, swap_qubits = get_swap_map_dag( + swap_dag, swap_qubits = get_swap_map_dag( root_dag, self.coupling_map, block_layouts[i], final_layout, seed=self._new_seed(), ) - if swap_circuit.depth(): - virtual_swap_dag = updated_dag_block.copy_empty_like() - order = [p2v[virtual_swap_dag.qubits.index(qubit)] for qubit in swap_qubits] - virtual_swap_dag.compose(swap_circuit, qubits=order) - updated_dag_block.compose(virtual_swap_dag) + if swap_dag.depth(): + updated_dag_block.compose(swap_dag, qubits=swap_qubits) idle_qubits &= set(updated_dag_block.idle_wires()) # Now for each block, expand it to be full width over all active wires (all blocks of a @@ -479,18 +460,8 @@ def _route_control_flow_multiblock(self, node, current_layout, root_dag): block_circuits = [] for i, updated_dag_block in enumerate(block_dags): updated_dag_block.remove_qubits(*idle_qubits) - new_dag_block = DAGCircuit() - new_num_qubits = updated_dag_block.num_qubits() - qreg = QuantumRegister(new_num_qubits, "q") - new_dag_block.add_qreg(qreg) - for creg in updated_dag_block.cregs.values(): - new_dag_block.add_creg(creg) - for inner_node in updated_dag_block.op_nodes(): - new_qargs = [qreg[updated_dag_block.qubits.index(bit)] for bit in inner_node.qargs] - new_dag_block.apply_operation_back(inner_node.op, new_qargs, inner_node.cargs) - block_circuits.append(dag_to_circuit(new_dag_block)) - - return node.op.replace_blocks(block_circuits), final_layout, idle_qubits + block_circuits.append(dag_to_circuit(updated_dag_block)) + return node.op.replace_blocks(block_circuits), block_circuits[0].qubits, final_layout def _route_control_flow_looping(self, node, current_layout, root_dag): """Route a control-flow operation that represents a loop, such as :class:`.ForOpLoop` or @@ -505,45 +476,36 @@ def _route_control_flow_looping(self, node, current_layout, root_dag): Returns: ControlFlowOp: routed control flow operation. - Layout: layout after instruction (this will be the same as the input layout). - list(Qubit): list of idle qubits in controlflow layer. + List[Qubit]: the new physical-qubit arguments that the output `ControlFlowOp` should be + applied to. This might be wider than the input node if internal routing was needed. + Layout: the new layout after the control-flow operation is applied. """ if self.fake_run: - return None, current_layout, None + return None, None, current_layout # Temporarily expand to full width, and route within that. inner_pass = self._recursive_pass(current_layout) - order = [self._qubit_indices[bit] for bit in node.qargs] full_dag_block = root_dag.copy_empty_like() - full_dag_block.compose(circuit_to_dag(node.op.blocks[0]), qubits=order) + full_dag_block.compose(circuit_to_dag(node.op.blocks[0]), qubits=node.qargs) updated_dag_block = inner_pass.run(full_dag_block) # Ensure that the layout at the end of the block is returned to being the layout at the # start of the block again, so the loop works. - swap_circuit, swap_qubits = get_swap_map_dag( + swap_dag, swap_qubits = get_swap_map_dag( root_dag, self.coupling_map, inner_pass.property_set["final_layout"], current_layout, seed=self._new_seed(), ) - if swap_circuit.depth(): - p2v = current_layout.get_physical_bits() - virtual_swap_dag = updated_dag_block.copy_empty_like() - order = [p2v[virtual_swap_dag.qubits.index(qubit)] for qubit in swap_qubits] - virtual_swap_dag.compose(swap_circuit, qubits=order) - updated_dag_block.compose(virtual_swap_dag) + if swap_dag.depth(): + updated_dag_block.compose(swap_dag, qubits=swap_qubits) # Contract the routed block back down to only operate on the qubits that it actually needs. idle_qubits = set(root_dag.qubits) & set(updated_dag_block.idle_wires()) updated_dag_block.remove_qubits(*idle_qubits) - new_dag_block = DAGCircuit() - new_num_qubits = updated_dag_block.num_qubits() - qreg = QuantumRegister(new_num_qubits, "q") - new_dag_block.add_qreg(qreg) - for creg in updated_dag_block.cregs.values(): - new_dag_block.add_creg(creg) - for inner_node in updated_dag_block.op_nodes(): - new_qargs = [qreg[updated_dag_block.qubits.index(bit)] for bit in inner_node.qargs] - new_dag_block.apply_operation_back(inner_node.op, new_qargs, inner_node.cargs) - updated_circ_block = dag_to_circuit(new_dag_block) - return node.op.replace_blocks([updated_circ_block]), current_layout, idle_qubits + updated_circ_block = dag_to_circuit(updated_dag_block) + return ( + node.op.replace_blocks([updated_circ_block]), + updated_dag_block.qubits, + current_layout, + ) diff --git a/qiskit/transpiler/passes/routing/utils.py b/qiskit/transpiler/passes/routing/utils.py index cea36b99c744..675b085113fa 100644 --- a/qiskit/transpiler/passes/routing/utils.py +++ b/qiskit/transpiler/passes/routing/utils.py @@ -12,49 +12,24 @@ """Utility functions for routing""" -from qiskit.transpiler import CouplingMap from qiskit.transpiler.exceptions import TranspilerError from .algorithms import ApproximateTokenSwapper -def combine_permutations(*permutations): - """ - Chain a series of permutations. - - Args: - *permutations (list(int)): permutations to combine - - Returns: - list: combined permutation - """ - order = permutations[0] - for this_order in permutations[1:]: - order = [order[i] for i in this_order] - return order - - def get_swap_map_dag(dag, coupling_map, from_layout, to_layout, seed, trials=4): - """Get the circuit of swaps to go from from_layout to to_layout, and the qubit ordering of the - qubits in that circuit.""" - + """Get the circuit of swaps to go from from_layout to to_layout, and the physical qubits + (integers) that the swap circuit should be applied on.""" if len(dag.qregs) != 1 or dag.qregs.get("q", None) is None: raise TranspilerError("layout transformation runs on physical circuits only") - if len(dag.qubits) > len(coupling_map.physical_qubits): raise TranspilerError("The layout does not match the amount of qubits in the DAG") - - if coupling_map: - graph = coupling_map.graph.to_undirected() - else: - coupling_map = CouplingMap.from_full(len(to_layout)) - graph = coupling_map.graph.to_undirected() - - token_swapper = ApproximateTokenSwapper(graph, seed) + token_swapper = ApproximateTokenSwapper(coupling_map.graph.to_undirected(), seed) # Find the permutation between the initial physical qubits and final physical qubits. permutation = { - pqubit: to_layout.get_virtual_bits()[vqubit] - for vqubit, pqubit in from_layout.get_virtual_bits().items() + pqubit: to_layout[vqubit] for vqubit, pqubit in from_layout.get_virtual_bits().items() } - permutation_circ = token_swapper.permutation_circuit(permutation, trials) - permutation_qubits = [dag.qubits[i] for i in sorted(permutation_circ.inputmap.keys())] - return permutation_circ.circuit, permutation_qubits + # The mapping produced here maps physical qubit indices of the outer dag to the bits used to + # represent them in the inner map. For later composing, we actually want the opposite map. + swap_circuit, phys_to_circuit_qubits = token_swapper.permutation_circuit(permutation, trials) + circuit_to_phys = {inner: outer for outer, inner in phys_to_circuit_qubits.items()} + return swap_circuit, [circuit_to_phys[bit] for bit in swap_circuit.qubits] diff --git a/test/python/transpiler/test_stochastic_swap.py b/test/python/transpiler/test_stochastic_swap.py index 63347a36e9d6..839cf69a63a4 100644 --- a/test/python/transpiler/test_stochastic_swap.py +++ b/test/python/transpiler/test_stochastic_swap.py @@ -13,6 +13,9 @@ """Test the Stochastic Swap pass""" import unittest + +import numpy.random + from ddt import ddt, data from qiskit.transpiler.passes import StochasticSwap from qiskit.transpiler import CouplingMap, PassManager @@ -21,6 +24,10 @@ from qiskit import QuantumRegister, ClassicalRegister, QuantumCircuit from qiskit.test import QiskitTestCase from qiskit.transpiler.passes.utils import CheckMap +from qiskit.circuit.random import random_circuit +from qiskit.providers.fake_provider import FakeMumbai +from qiskit.compiler.transpiler import transpile +from qiskit.circuit import ControlFlowOp, Clbit @ddt @@ -569,12 +576,17 @@ def test_single_gates_omitted(self): after = circuit_to_dag(after) self.assertEqual(expected_dag, after) - def test_controlflow_pre_if_else_route(self): + +@ddt +class TestStochasticSwapControlFlow(QiskitTestCase): + """Tests for control flow in stochastic swap.""" + + def test_pre_if_else_route(self): """test swap with if else controlflow construct""" num_qubits = 5 qreg = QuantumRegister(num_qubits, "q") creg = ClassicalRegister(num_qubits) - coupling = CouplingMap([(i, i + 1) for i in range(num_qubits - 1)]) + coupling = CouplingMap.from_line(num_qubits) qc = QuantumCircuit(qreg, creg) qc.h(0) qc.cx(0, 2) @@ -589,28 +601,26 @@ def test_controlflow_pre_if_else_route(self): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=82).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qreg, creg) expected.h(0) expected.swap(0, 1) expected.cx(1, 2) expected.measure(2, 2) - eqreg = QuantumRegister(2, name="q") - etrue_body = QuantumCircuit(eqreg, creg) + etrue_body = QuantumCircuit(qreg[[3, 4]], creg) etrue_body.x(0) - efalse_body = QuantumCircuit(eqreg, creg) + efalse_body = QuantumCircuit(qreg[[3, 4]], creg) efalse_body.x(1) new_order = [1, 0, 2, 3, 4] expected.if_else((creg[2], 0), etrue_body, efalse_body, qreg[[3, 4]], creg) expected.barrier(qreg) expected.measure(qreg, creg[new_order]) - self.assertEqual(cqc, expected) - check_map_pass = CheckMap(coupling) - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) + self.assertEqual(dag_to_circuit(cdag), expected) - def test_controlflow_pre_if_else_route_post_x(self): + def test_pre_if_else_route_post_x(self): """test swap with if else controlflow construct; pre-cx and post x""" num_qubits = 5 qreg = QuantumRegister(num_qubits, "q") @@ -631,7 +641,9 @@ def test_controlflow_pre_if_else_route_post_x(self): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=431).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qreg, creg) expected.h(0) @@ -639,22 +651,17 @@ def test_controlflow_pre_if_else_route_post_x(self): expected.cx(0, 1) expected.measure(1, 2) new_order = [0, 2, 1, 3, 4] - eqreg = QuantumRegister(2, "q") - etrue_body = QuantumCircuit(eqreg, creg) + etrue_body = QuantumCircuit(qreg[[3, 4]], creg) etrue_body.x(0) - efalse_body = QuantumCircuit(eqreg, creg) + efalse_body = QuantumCircuit(qreg[[3, 4]], creg) efalse_body.x(1) expected.if_else((creg[2], 0), etrue_body, efalse_body, qreg[[3, 4]], creg) expected.x(2) expected.barrier(qreg) expected.measure(qreg, creg[new_order]) + self.assertEqual(dag_to_circuit(cdag), expected) - self.assertEqual(cqc, expected) - check_map_pass = CheckMap(coupling) - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) - - def test_controlflow_post_if_else_route(self): + def test_post_if_else_route(self): """test swap with if else controlflow construct; post cx""" num_qubits = 5 qreg = QuantumRegister(num_qubits, "q") @@ -676,15 +683,16 @@ def test_controlflow_post_if_else_route(self): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=6508).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qreg, creg) expected.h(0) expected.measure(0, 0) - eqreg = QuantumRegister(2, "q") - etrue_body = QuantumCircuit(eqreg, creg) + etrue_body = QuantumCircuit(qreg[[3, 4]], creg) etrue_body.x(0) - efalse_body = QuantumCircuit(eqreg, creg) + efalse_body = QuantumCircuit(qreg[[3, 4]], creg) efalse_body.x(1) expected.barrier(qreg) expected.if_else((creg[0], 0), etrue_body, efalse_body, qreg[[3, 4]], creg) @@ -693,12 +701,9 @@ def test_controlflow_post_if_else_route(self): expected.cx(1, 2) expected.barrier(qreg) expected.measure(qreg, creg[[1, 0, 2, 3, 4]]) - self.assertEqual(cqc, expected) - check_map_pass = CheckMap(coupling) - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) + self.assertEqual(dag_to_circuit(cdag), expected) - def test_controlflow_pre_if_else2(self): + def test_pre_if_else2(self): """test swap with if else controlflow construct; cx in if statement""" num_qubits = 5 qreg = QuantumRegister(num_qubits, "q") @@ -718,7 +723,9 @@ def test_controlflow_pre_if_else2(self): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=38).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qreg, creg) expected.h(0) @@ -726,21 +733,16 @@ def test_controlflow_pre_if_else2(self): expected.swap(0, 1) expected.cx(1, 2) expected.measure(1, 0) - eqreg = QuantumRegister(1, "q") - etrue_body = QuantumCircuit(eqreg, creg) + etrue_body = QuantumCircuit(qreg[[1]], creg) etrue_body.x(0) - efalse_body = QuantumCircuit(eqreg, creg) + efalse_body = QuantumCircuit(qreg[[1]], creg) new_order = [1, 0, 2, 3, 4] expected.if_else((creg[0], 0), etrue_body, efalse_body, qreg[[1]], creg) expected.barrier(qreg) expected.measure(qreg, creg[new_order]) + self.assertEqual(dag_to_circuit(cdag), expected) - self.assertEqual(cqc, expected) - check_map_pass = CheckMap(coupling) - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) - - def test_controlflow_intra_if_else_route(self): + def test_intra_if_else_route(self): """test swap with if else controlflow construct""" num_qubits = 5 qreg = QuantumRegister(num_qubits, "q") @@ -759,7 +761,9 @@ def test_controlflow_intra_if_else_route(self): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=8).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qreg, creg) expected.h(0) @@ -778,13 +782,9 @@ def test_controlflow_intra_if_else_route(self): expected.if_else((creg[0], 0), etrue_body, efalse_body, qreg, creg) new_order = [1, 2, 0, 4, 3] expected.measure(qreg, creg[new_order]) + self.assertEqual(dag_to_circuit(cdag), expected) - self.assertEqual(cqc, expected) - check_map_pass = CheckMap(coupling) - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) - - def test_controlflow_pre_intra_if_else(self): + def test_pre_intra_if_else(self): """test swap with if else controlflow construct; cx in if statement""" num_qubits = 5 qreg = QuantumRegister(num_qubits, "q") @@ -804,12 +804,13 @@ def test_controlflow_pre_intra_if_else(self): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=2, trials=20).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qreg, creg) - eqreg = QuantumRegister(4, "q") - etrue_body = QuantumCircuit(eqreg, creg) - efalse_body = QuantumCircuit(eqreg, creg) + etrue_body = QuantumCircuit(qreg[[1, 2, 3, 4]], creg) + efalse_body = QuantumCircuit(qreg[[1, 2, 3, 4]], creg) expected.h(0) expected.x(1) expected.swap(0, 1) @@ -825,19 +826,15 @@ def test_controlflow_pre_intra_if_else(self): efalse_body.cx(1, 2) expected.if_else((creg[0], 0), etrue_body, efalse_body, qreg[[1, 2, 3, 4]], creg) expected.measure(qreg, creg[[1, 2, 0, 4, 3]]) + self.assertEqual(dag_to_circuit(cdag), expected) - self.assertEqual(cqc, expected) - check_map_pass = CheckMap(coupling) - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) - - def test_controlflow_pre_intra_post_if_else(self): + def test_pre_intra_post_if_else(self): """test swap with if else controlflow construct; cx before, in, and after if statement""" num_qubits = 5 qreg = QuantumRegister(num_qubits, "q") creg = ClassicalRegister(num_qubits) - coupling = CouplingMap([(i, i + 1) for i in range(num_qubits - 1)]) + coupling = CouplingMap.from_line(num_qubits) qc = QuantumCircuit(qreg, creg) qc.h(0) qc.cx(0, 2) @@ -855,7 +852,9 @@ def test_controlflow_pre_intra_post_if_else(self): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=1).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qreg, creg) expected.h(0) @@ -864,29 +863,25 @@ def test_controlflow_pre_intra_post_if_else(self): expected.cx(0, 1) expected.measure(0, 0) etrue_body = QuantumCircuit(qreg, creg) - etrue_body.cx(0, 2) - etrue_body.swap(0, 2) + etrue_body.cx(0, 1) + etrue_body.swap(0, 1) etrue_body.swap(4, 3) - etrue_body.swap(1, 3) + etrue_body.swap(2, 3) efalse_body = QuantumCircuit(qreg, creg) - efalse_body.swap(0, 2) + efalse_body.swap(0, 1) efalse_body.swap(3, 4) - efalse_body.swap(1, 3) - efalse_body.cx(2, 1) - expected.if_else((creg[0], 0), etrue_body, efalse_body, qreg[[0, 2, 1, 3, 4]], creg) + efalse_body.swap(2, 3) + efalse_body.cx(1, 2) + expected.if_else((creg[0], 0), etrue_body, efalse_body, qreg[[0, 1, 2, 3, 4]], creg) expected.swap(1, 2) expected.h(4) expected.swap(3, 4) expected.cx(3, 2) expected.barrier() expected.measure(qreg, creg[[2, 4, 0, 3, 1]]) + self.assertEqual(dag_to_circuit(cdag), expected) - self.assertEqual(cqc, expected) - check_map_pass = CheckMap(coupling) - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) - - def test_controlflow_no_layout_change(self): + def test_no_layout_change(self): """test controlflow with no layout change needed""" num_qubits = 5 qreg = QuantumRegister(num_qubits, "q") @@ -907,7 +902,9 @@ def test_controlflow_no_layout_change(self): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=23).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qreg, creg) expected.h(0) @@ -915,22 +912,17 @@ def test_controlflow_no_layout_change(self): expected.swap(1, 2) expected.cx(0, 1) expected.measure(0, 0) - eqreg = QuantumRegister(2, "q") - etrue_body = QuantumCircuit(eqreg, creg) + etrue_body = QuantumCircuit(qreg[[1, 4]], creg) etrue_body.x(0) - efalse_body = QuantumCircuit(eqreg, creg) + efalse_body = QuantumCircuit(qreg[[1, 4]], creg) efalse_body.x(1) expected.if_else((creg[0], 0), etrue_body, efalse_body, qreg[[1, 4]], creg) expected.barrier(qreg) expected.measure(qreg, creg[[0, 2, 1, 3, 4]]) - - self.assertEqual(cqc, expected) - check_map_pass = CheckMap(coupling) - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) + self.assertEqual(dag_to_circuit(cdag), expected) @data(1, 2, 3) - def test_controlflow_for_loop(self, nloops): + def test_for_loop(self, nloops): """test stochastic swap with for_loop""" # if the loop has only one iteration it isn't necessary for the pass # to swap back to the starting layout. This test would check that @@ -950,7 +942,9 @@ def test_controlflow_for_loop(self, nloops): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=687).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qreg, creg) expected.h(0) @@ -962,18 +956,14 @@ def test_controlflow_for_loop(self, nloops): loop_parameter = None expected.for_loop(range(nloops), loop_parameter, efor_body, qreg, creg) expected.measure(qreg, creg) - self.assertEqual(cqc, expected) - check_map_pass = CheckMap(coupling) - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) + self.assertEqual(dag_to_circuit(cdag), expected) - def test_controlflow_while_loop(self): + def test_while_loop(self): """test while loop""" num_qubits = 4 qreg = QuantumRegister(num_qubits, "q") creg = ClassicalRegister(len(qreg)) coupling = CouplingMap.from_line(num_qubits) - check_map_pass = CheckMap(coupling) qc = QuantumCircuit(qreg, creg) while_body = QuantumCircuit(qreg, creg) while_body.reset(qreg[2:]) @@ -986,7 +976,10 @@ def test_controlflow_while_loop(self): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=58).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) + expected = QuantumCircuit(qreg, creg) ewhile_body = QuantumCircuit(qreg, creg) ewhile_body.reset(qreg[2:]) @@ -1000,11 +993,9 @@ def test_controlflow_while_loop(self): expected.while_loop((creg, 0), ewhile_body, expected.qubits, expected.clbits) expected.barrier() expected.measure(qreg, creg) - check_map_pass.run(circuit_to_dag(expected)) - self.assertEqual(cqc, expected) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) + self.assertEqual(dag_to_circuit(cdag), expected) - def test_controlflow_nested_inner_cnot(self): + def test_nested_inner_cnot(self): """test swap in nested if else controlflow construct; swap in inner""" seed = 1 num_qubits = 3 @@ -1033,7 +1024,9 @@ def test_controlflow_nested_inner_cnot(self): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=seed).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qreg, creg) expected.h(0) @@ -1055,19 +1048,15 @@ def test_controlflow_nested_inner_cnot(self): expected.if_else((creg[0], 0), etrue_body, efalse_body, qreg, creg) expected.measure(qreg, creg) - self.assertEqual(cqc, expected) - check_map_pass = CheckMap(coupling) - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) + self.assertEqual(dag_to_circuit(cdag), expected) - def test_controlflow_nested_outer_cnot(self): + def test_nested_outer_cnot(self): """test swap with nested if else controlflow construct; swap in outer""" seed = 200 num_qubits = 5 qreg = QuantumRegister(num_qubits, "q") creg = ClassicalRegister(num_qubits) coupling = CouplingMap.from_line(num_qubits) - check_map_pass = CheckMap(coupling) qc = QuantumCircuit(qreg, creg) qc.h(0) qc.x(1) @@ -1090,7 +1079,9 @@ def test_controlflow_nested_outer_cnot(self): dag = circuit_to_dag(qc) cdag = StochasticSwap(coupling, seed=seed).run(dag) - cqc = dag_to_circuit(cdag) + check_map_pass = CheckMap(coupling) + check_map_pass.run(cdag) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qreg, creg) expected.h(0) @@ -1104,20 +1095,17 @@ def test_controlflow_nested_outer_cnot(self): efor_body = QuantumCircuit(qreg, creg) efor_body.delay(10, 0) efor_body.barrier(qreg) - efor_body.cx(1, 3) - etrue_body.for_loop(range(3), loop_parameter, efor_body, qreg[[0, 2, 1, 3, 4]], creg) + efor_body.cx(2, 3) + etrue_body.for_loop(range(3), loop_parameter, efor_body, qreg[[0, 1, 2, 3, 4]], creg) efalse_body = QuantumCircuit(qreg, creg) efalse_body.y(0) efalse_body.swap(1, 2) expected.if_else((creg[0], 0), etrue_body, efalse_body, qreg, creg) expected.measure(qreg, creg[[0, 2, 1, 3, 4]]) + self.assertEqual(dag_to_circuit(cdag), expected) - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) - self.assertEqual(cqc, expected) - - def test_controlflow_disjoint_looping(self): + def test_disjoint_looping(self): """Test looping controlflow on different qubit register""" num_qubits = 4 cm = CouplingMap.from_line(num_qubits) @@ -1129,14 +1117,14 @@ def test_controlflow_disjoint_looping(self): cqc = StochasticSwap(cm, seed=0)(qc) expected = QuantumCircuit(qr) - efor_body = QuantumCircuit(3) + efor_body = QuantumCircuit(qr[[0, 1, 2]]) efor_body.swap(1, 2) efor_body.cx(0, 1) efor_body.swap(1, 2) expected.for_loop((0,), None, efor_body, [0, 1, 2], []) self.assertEqual(cqc, expected) - def test_controlflow_disjoint_multiblock(self): + def test_disjoint_multiblock(self): """Test looping controlflow on different qubit register""" num_qubits = 4 cm = CouplingMap.from_line(num_qubits) @@ -1151,16 +1139,16 @@ def test_controlflow_disjoint_multiblock(self): cqc = StochasticSwap(cm, seed=353)(qc) expected = QuantumCircuit(qr, cr) - etrue_body = QuantumCircuit(true_body.qregs[0], cr) + etrue_body = QuantumCircuit(qr[[0, 1, 2]], cr) etrue_body.cx(0, 1) etrue_body.swap(0, 1) - efalse_body = QuantumCircuit(false_body.qregs[0], cr) + efalse_body = QuantumCircuit(qr[[0, 1, 2]], cr) efalse_body.swap(0, 1) efalse_body.cx(1, 2) expected.if_else((cr[0], 1), etrue_body, efalse_body, [0, 1, 2], []) self.assertEqual(cqc, expected) - def test_controlflow_multiple_ops_per_layer(self): + def test_multiple_ops_per_layer(self): """Test circuits with multiple operations per layer""" num_qubits = 6 coupling = CouplingMap.from_line(num_qubits) @@ -1172,20 +1160,128 @@ def test_controlflow_multiple_ops_per_layer(self): with qc.for_loop((0,)): qc.cx(3, 5) cqc = StochasticSwap(coupling, seed=0)(qc) + check_map_pass(cqc) + self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) expected = QuantumCircuit(qr) expected.swap(0, 1) expected.cx(1, 2) - efor_body = QuantumCircuit(3) + efor_body = QuantumCircuit(qr[[3, 4, 5]]) efor_body.swap(1, 2) efor_body.cx(0, 1) efor_body.swap(2, 1) expected.for_loop((0,), None, efor_body, [3, 4, 5], []) - - check_map_pass.run(circuit_to_dag(expected)) - self.assertTrue(check_map_pass.property_set["is_swap_mapped"]) self.assertEqual(cqc, expected) +@ddt +class TestStochasticSwapRandomCircuitValidOutput(QiskitTestCase): + """Assert the output of a transpilation with stochastic swap is a physical circuit.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.backend = FakeMumbai() + cls.coupling_edge_set = set(tuple(x) for x in cls.backend.configuration().coupling_map) + cls.basis_gates = set(cls.backend.configuration().basis_gates) + cls.basis_gates.update(["for_loop", "while_loop", "if_else"]) + + def assert_valid_circuit(self, transpiled): + """Assert circuit complies with constraints of backend.""" + self.assertIsInstance(transpiled, QuantumCircuit) + self.assertIsNotNone(getattr(transpiled, "_layout", None)) + + def _visit_block(circuit, qubit_mapping=None): + for instruction in circuit: + if instruction.operation.name in {"barrier", "measure"}: + continue + self.assertIn(instruction.operation.name, self.basis_gates) + qargs = tuple(qubit_mapping[x] for x in instruction.qubits) + if not isinstance(instruction.operation, ControlFlowOp): + if len(qargs) > 2 or len(qargs) < 0: + raise Exception("Invalid number of qargs for instruction") + if len(qargs) == 2: + self.assertIn(qargs, self.coupling_edge_set) + else: + self.assertLessEqual(qargs[0], 26) + if isinstance(instruction.operation, ControlFlowOp): + for block in instruction.operation.blocks: + new_mapping = { + inner: qubit_mapping[outer] + for outer, inner in zip(instruction.qubits, block.qubits) + } + _visit_block(block, new_mapping) + + # Assert routing ran. + _visit_block( + transpiled, + qubit_mapping={qubit: index for index, qubit in enumerate(transpiled.qubits)}, + ) + + @data(*range(1, 27)) + def test_random_circuit_no_control_flow(self, size): + """Test that transpiled random circuits without control flow are physical circuits.""" + circuit = random_circuit(size, 3, measure=True, seed=12342) + tqc = transpile( + circuit, + self.backend, + routing_method="stochastic", + layout_method="dense", + seed_transpiler=12342, + ) + self.assert_valid_circuit(tqc) + + @data(*range(4, 27)) + def test_random_circuit_for_loop(self, size): + """Test that transpiled random circuits with nested for loops are physical circuits.""" + circuit = random_circuit(size, 3, measure=False, seed=12342) + for_block = random_circuit(3, 2, measure=False, seed=12342) + inner_for_block = random_circuit(2, 1, measure=False, seed=12342) + with circuit.for_loop((1,)): + with circuit.for_loop((1,)): + circuit.append(inner_for_block, [0, 3]) + circuit.append(for_block, [1, 0, 2]) + circuit.measure_all() + + tqc = transpile( + circuit, + self.backend, + basis_gates=list(self.basis_gates), + routing_method="stochastic", + layout_method="dense", + seed_transpiler=12342, + ) + self.assert_valid_circuit(tqc) + + @data(*range(6, 27)) + def test_random_circuit_if_else(self, size): + """Test that transpiled random circuits with if else blocks are physical circuits.""" + circuit = random_circuit(size, 3, measure=True, seed=12342) + if_block = random_circuit(3, 2, measure=True, seed=12342) + else_block = random_circuit(2, 1, measure=True, seed=12342) + + rng = numpy.random.default_rng(seed=12342) + inner_clbit_count = max((if_block.num_clbits, else_block.num_clbits)) + if inner_clbit_count > circuit.num_clbits: + circuit.add_bits([Clbit() for _ in [None] * (inner_clbit_count - circuit.num_clbits)]) + clbit_indices = list(range(circuit.num_clbits)) + rng.shuffle(clbit_indices) + + with circuit.if_test((circuit.clbits[0], True)) as else_: + circuit.append(if_block, [0, 2, 1], clbit_indices[: if_block.num_clbits]) + with else_: + circuit.append(else_block, [2, 5], clbit_indices[: else_block.num_clbits]) + + tqc = transpile( + circuit, + self.backend, + basis_gates=list(self.basis_gates), + routing_method="stochastic", + layout_method="dense", + seed_transpiler=12342, + ) + self.assert_valid_circuit(tqc) + + if __name__ == "__main__": unittest.main()