From 0ca185aefa705c8980ef0ee68ddab5296a52e610 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=98=9A=E5=AF=85?= <13017899+hefei-wu-yanzu@user.noreply.gitee.com> Date: Mon, 25 Aug 2025 15:22:29 +0800 Subject: [PATCH 1/2] first commit --- cqlib_adapter/pennylane_ext/device.py | 198 +++++++++++++++++++++----- requirements.txt | 3 +- 2 files changed, 167 insertions(+), 34 deletions(-) diff --git a/cqlib_adapter/pennylane_ext/device.py b/cqlib_adapter/pennylane_ext/device.py index 4adc1ed..5abfa10 100644 --- a/cqlib_adapter/pennylane_ext/device.py +++ b/cqlib_adapter/pennylane_ext/device.py @@ -16,7 +16,7 @@ import json import os from typing import Dict, List, Union -import cqlib +from pennylane.ops.op_math import Sum, Prod, SProd import numpy as np import pennylane as qml from cqlib import TianYanPlatform @@ -36,7 +36,6 @@ class CQLibDevice(Device): # Device metadata short_name = "cqlib.device" - config_filepath = os.path.join(os.path.dirname(__file__), "cqlib_config.toml") def __init__(self, wires, shots=None, cqlib_backend_name="default", login_key=None): @@ -112,13 +111,6 @@ class CQLibDevice(Device): for circuit in circuits: # Insert basis change gates for PauliX/PauliY measurements new_ops = list(circuit.operations) - for measurement in circuit.measurements: - if isinstance(measurement, qml.measurements.ExpectationMP): - if measurement.obs.name == "PauliX": - new_ops.append(qml.Hadamard(wires=measurement.obs.wires)) - elif measurement.obs.name == "PauliY": - new_ops.append(qml.S(wires=measurement.obs.wires).inv()) - new_ops.append(qml.Hadamard(wires=measurement.obs.wires)) # Convert circuit to QCIS format qasm_str = circuit.to_openqasm() @@ -214,7 +206,7 @@ class CQLibDevice(Device): results = [] for measurement in circuit.measurements: if isinstance(measurement, qml.measurements.ExpectationMP): - results.append(self._process_expectation(measurement, raw_result)) + results.append(self._process_expectation(measurement, raw_result, circuit)) elif isinstance(measurement, qml.measurements.ProbabilityMP): results.append(self._process_probability(measurement, raw_result)) else: @@ -225,32 +217,172 @@ class CQLibDevice(Device): # Return single result directly if only one measurement return results[0] if len(results) == 1 else results - def _process_expectation(self, measurement, raw_result) -> float: - """Processes expectation value measurements. + def _process_expectation(self, measurement, raw_result, circuit=None) -> float: + """Processes expectation value measurements.""" + obs = measurement.obs + + # 统一把 circuit 传下去(用于需要重跑的情况) + return self._expval_recursive(obs, raw_result, circuit) + + def _expval_recursive(self, obs, raw_result, circuit): + # Hamiltonian(旧版常见) + if isinstance(obs, qml.Hamiltonian): + total = 0.0 + for coeff, term in zip(obs.coeffs, obs.ops): + total += coeff * self._expval_recursive(term, raw_result, circuit) + return total + + # 和:Sum(op1, op2, ...) + if isinstance(obs, Sum): + return sum(self._expval_recursive(term, raw_result, circuit) for term in obs.operands) + + # 张量积:Prod(op1, op2, ...) + if isinstance(obs, Prod): + # 注意:纠缠态下 ⟨A⊗B⟩ ≠ ⟨A⟩⟨B⟩ + # 我们要把整个张量算符当成一个“Pauli 字符串”整体来算 + return self._expval_pauli_tensor(obs.operands, raw_result, circuit) + + # 标量乘:SProd(c, op) + if isinstance(obs, SProd): + return float(obs.scalar) * self._expval_recursive(obs.base, raw_result, circuit) + + # 单一可观测量(PauliX/Y/Z/Identity) + return self._process_single_observable(obs, raw_result, circuit) + - Args: - measurement: Expectation measurement operation. - raw_result: Raw result data. + def _process_single_observable(self, obs, raw_result, circuit=None) -> float: + name = getattr(obs, "name", None) - Returns: - float: Processed expectation value. + # 恒等算符:返回 1.0 + if name in ("Identity", "IdentityMP"): + return 1.0 - Raises: - NotImplementedError: If expectation for non-PauliZ observables is requested. - ValueError: If raw_result type is unsupported. - """ - if measurement.obs.name != "PauliZ": - raise NotImplementedError( - f"Expectation for {measurement.obs.name} is not supported" - ) + # 单比特 Pauli:Z/X/Y + if name in ("PauliZ", "PauliX", "PauliY"): + # 单比特时,我们可以把它当作长度为1的“Pauli 字符串”统一处理 + return self._expval_pauli_tensor([obs], raw_result, circuit) + + # 其它算符(如 Projector、Hermitian 等)当前不支持 + raise NotImplementedError(f"Expectation for {name} is not supported") + + + def _expval_pauli_tensor(self, operands, raw_result, circuit): + """计算一个 Pauli 张量(可能跨多比特,可能包含 X/Y/Z)的期望值。""" + + # 收集每个算符的 (axis, wire) + axes = [] # list of ('X'/'Y'/'Z', int_wire) + for term in operands: + n = getattr(term, "name", None) + if n == "PauliX": + axes.append(("X", term.wires[0])) + elif n == "PauliY": + axes.append(("Y", term.wires[0])) + elif n == "PauliZ": + axes.append(("Z", term.wires[0])) + elif n in ("Identity", "IdentityMP"): + # 对应位恒等不影响乘积 + continue + else: + raise NotImplementedError(f"Unsupported operator in tensor: {n}") + + if not axes: + return 1.0 + + # 情况 A:全是 Z —— 直接用已有 raw_result(一次执行的计数/概率) + if all(ax == "Z" for ax, _ in axes): + return self._expval_zz_string_from_raw(raw_result, [w for _, w in axes]) + + # 情况 B:包含 X/Y —— 需要按该项做基变换重跑一次 + if circuit is None: + # 为了健壮:如果未传入 circuit,我们无法重跑,只能报错 + raise NotImplementedError("X/Y terms require circuit to re-execute with basis change") + + return self._expval_via_basis_rotation(circuit, axes) + + def _expval_zz_string_from_raw(self, raw_result, wires): + """从 raw_result(dict 或 list 包含 probability)计算 Z⊗Z⊗... 的期望值。""" + if isinstance(raw_result, dict): + total = sum(raw_result.values()) + if total == 0: + return 0.0 + acc = 0.0 + for bitstring, cnt in raw_result.items(): + # 注意你的位序:你此前用的是 bitstring[::-1] + eigen = 1 + for w in wires: + bit = int(bitstring[-w-1]) # 低位是 wire 0 + eigen *= (1 if bit == 0 else -1) + acc += eigen * cnt + return acc / total + + elif isinstance(raw_result, list): + # 硬件/云返回 probability JSON(字符串或 dict) + prob_dict = raw_result[0]["probability"] + if isinstance(prob_dict, str): + prob_dict = json.loads(prob_dict) + + acc = 0.0 + for bitstring, p in prob_dict.items(): + eigen = 1 + for w in wires: + bit = int(bitstring[-w-1]) + eigen *= (1 if bit == 0 else -1) + acc += eigen * p + return acc - if isinstance(raw_result, list): - return self.process_results(raw_result) - elif isinstance(raw_result, dict): - local_expectation = self.process_results_local(raw_result) - return local_expectation[measurement.wires[0]] else: raise ValueError(f"Unsupported raw_result type: {type(raw_result)}") + + + def _expval_via_basis_rotation(self, circuit: QuantumScript, axes): + """对包含 X/Y 的 Pauli 字符串,复制电路、加入对应基变换并执行一次,再用 Z 基统计计算期望。""" + + # 1) 复制原有操作 + new_ops = list(circuit.operations) + + # 2) 在对应 wire 上加入基变换 + for ax, w in axes: + if ax == "X": + new_ops.append(qml.Hadamard(wires=w)) + elif ax == "Y": + new_ops.append(qml.adjoint(qml.S)(wires=w)) + new_ops.append(qml.Hadamard(wires=w)) + # Z 不需要加门 + + # 3) 构造测量:在所有涉及的 wires 上做 Probability(拿到全局分布最安全) + # 也可以只测所有 qubits 的概率,保持简单:我们直接测全体概率 + new_measurements = [qml.probs(wires=range(self.num_wires))] + + # 4) 组装新 tape 并执行(重用你原有的 QASM 转换/后端路径) + rotated_circuit = qml.tape.QuantumScript(new_ops, new_measurements, shots=circuit.shots) + + # —— 以下与 execute() 里一致:转换为 QCIS 并执行 + qasm_str = rotated_circuit.to_openqasm() + cqlib_circuit = qasm2.loads(qasm_str) + cqlib_qcis = cqlib_circuit.qcis + + if self._is_tianyan_hardware(): + compiled_circuit = transpile_qcis(cqlib_qcis, self.cqlib_backend) + query_id = self.cqlib_backend.submit_experiment(compiled_circuit[0].qcis, num_shots=self.num_shots) + raw = self.cqlib_backend.query_experiment(query_id, readout_calibration=True) + prob = extract_probability(raw, num_wires=self.num_wires) + elif self._is_tianyan_simulator(): + query_id = self.cqlib_backend.submit_experiment(cqlib_qcis, num_shots=self.num_shots) + raw = self.cqlib_backend.query_experiment(query_id) + prob = extract_probability(raw, num_wires=self.num_wires) + else: + sim = StatevectorSimulator(cqlib_circuit) + raw = sim.sample() # dict: bitstring -> count + # 本地模拟器时直接从计数算 + return self._expval_zz_string_from_raw(raw, [w for _, w in axes]) + + # 硬件/云:prob 是 dict[str_bitstring] -> float + # 组合成 list 形式以复用 _expval_zz_string_from_raw 的 list 分支逻辑 + raw_list = [{"probability": prob}] + return self._expval_zz_string_from_raw(raw_list, [w for _, w in axes]) + + + def _process_probability(self, measurement, raw_result) -> np.ndarray: """Processes probability measurements. @@ -389,7 +521,7 @@ class CQLibDevice(Device): Returns: str: String representation of the device. """ - return f"<{self.name()} device (wires={self.num_wires}, shots={self.shots})>" + return f"<{self.name} device (wires={self.num_wires}, shots={self.shots})>" def extract_probability( @@ -400,7 +532,7 @@ def extract_probability( Args: json_data: JSON data containing measurement results, expected to be a list containing dictionaries with 'probability' field. - num_wires: Number of quantum wires (qubits) in the circuit. + num_wires: Number of quantum wires (qubits) in the circuit*(Reserved for future update). Returns: Dict: Probability distribution for each quantum state. @@ -424,4 +556,4 @@ def extract_probability( except (KeyError, TypeError) as error: raise ValueError(f"Invalid probability field format: {error}") from error - return probability_dict \ No newline at end of file + return probability_dict diff --git a/requirements.txt b/requirements.txt index 683e41d..bd76dd4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ cqlib>=1.3 requests>=2.28.2 qiskit>=1.0 -python-dotenv>=1.0 \ No newline at end of file +python-dotenv>=1.0 +pennylane>=0.42.2 \ No newline at end of file -- Gitee From c85be0b68d1782470e67fc49e36f82a6660cd919 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=98=9A=E5=AF=85?= <13017899+hefei-wu-yanzu@user.noreply.gitee.com> Date: Mon, 25 Aug 2025 16:20:48 +0800 Subject: [PATCH 2/2] refactor(tests): split test.py into modular test files - Remove test.py - Add test_backends.py for backend functionality tests - Add test_gradients.py for gradient and optimization tests - Maintain all existing test functionality in new structure --- tests/test_pennylane/test_backends.py | 81 +++++++++ .../{test.py => test_gradients.py} | 163 +++++++----------- 2 files changed, 139 insertions(+), 105 deletions(-) create mode 100644 tests/test_pennylane/test_backends.py rename tests/test_pennylane/{test.py => test_gradients.py} (31%) diff --git a/tests/test_pennylane/test_backends.py b/tests/test_pennylane/test_backends.py new file mode 100644 index 0000000..531def3 --- /dev/null +++ b/tests/test_pennylane/test_backends.py @@ -0,0 +1,81 @@ +# test_backends.py +# This code is part of cqlib. +# +# Copyright (C) 2025 China Telecom Quantum Group. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +import os +import pytest +import pennylane as qml +from pennylane import numpy as np +from pennylane.devices import Device + +# Configuration parameters +TOKEN = os.getenv("CQLIB_TOKEN", None) +SHOTS = 500 +WIRES = 2 +INITIAL_PARAMS = np.array([0.5, 0.8]) + + +def create_device( + backend_name: str, shots: int = SHOTS, wires: int = WIRES +) -> Device: + """Create a quantum device instance. + + Args: + backend_name (str): Name of the quantum backend to use. + shots (int): Number of measurement shots. + wires (int): Number of quantum wires (qubits). + + Returns: + Device: Configured quantum device instance. + """ + return qml.device( + "cqlib.device", + wires=wires, + shots=shots, + cqlib_backend_name=backend_name, + login_key=TOKEN if backend_name != "default" else None, + ) + + +@pytest.mark.parametrize("backend_name", ["tianyan504", "tianyan_sw", "default"]) +def test_backend_runs(backend_name): + """Test that circuits can run successfully on a given backend. + + Args: + backend_name (str): The backend to test. + """ + device = create_device(backend_name) + + @qml.qnode(device) + def circuit_probs(params: np.ndarray) -> np.ndarray: + """Circuit returning probability distribution.""" + qml.RX(params[0], wires=0) + qml.RY(params[1], wires=1) + qml.CNOT(wires=[0, 1]) + return qml.probs(wires=[0, 1]) + + @qml.qnode(device) + def circuit_expval(params: np.ndarray) -> float: + """Circuit returning expectation value.""" + qml.RX(params[0], wires=0) + qml.RY(params[1], wires=1) + qml.CNOT(wires=[0, 1]) + return qml.expval(qml.PauliZ(0)) + + params = INITIAL_PARAMS.copy() + probs = circuit_probs(params) + expval = circuit_expval(params) + + # Assertions + assert np.isclose(np.sum(probs), 1.0, atol=1e-6) + assert isinstance(expval, (float, np.floating, np.ndarray)) + diff --git a/tests/test_pennylane/test.py b/tests/test_pennylane/test_gradients.py similarity index 31% rename from tests/test_pennylane/test.py rename to tests/test_pennylane/test_gradients.py index 449b2c0..2cf67a8 100644 --- a/tests/test_pennylane/test.py +++ b/tests/test_pennylane/test_gradients.py @@ -1,3 +1,4 @@ +# test_gradients.py # This code is part of cqlib. # # Copyright (C) 2025 China Telecom Quantum Group. @@ -9,35 +10,34 @@ # Any modifications or derivative works of this code must retain this # copyright notice, and modified files need to carry a notice indicating # that they have been altered from the originals. - - +import os +import pytest import pennylane as qml from pennylane import numpy as np from pennylane.devices import Device -# Configuration parameters -TOKEN = "your_token" - +# Configuration parameters +TOKEN = os.getenv("CQLIB_TOKEN", None) DEFAULT_BACKEND = "default" SHOTS = 500 WIRES = 2 STEP_SIZE = 0.1 -TRAINING_STEPS = 10 +TRAINING_STEPS = 3 INITIAL_PARAMS = np.array([0.5, 0.8]) def create_device( - backend_name: str, shots: int = SHOTS, wires: int = WIRES + backend_name: str = DEFAULT_BACKEND, shots: int = SHOTS, wires: int = WIRES ) -> Device: - """Creates a quantum device instance. + """Create a quantum device instance. Args: - backend_name: Name of the quantum backend to use. - shots: Number of measurement shots. Defaults to SHOTS. - wires: Number of quantum wires (qubits). Defaults to WIRES. + backend_name (str): Name of the quantum backend to use. + shots (int): Number of measurement shots. + wires (int): Number of quantum wires (qubits). Returns: - qml.Device: Configured quantum device instance. + Device: Configured quantum device instance. """ return qml.device( "cqlib.device", @@ -48,24 +48,23 @@ def create_device( ) -def create_circuit( - device: Device, diff_method: str = "parameter-shift" -) -> qml.QNode: - """Creates a differentiable quantum circuit. +def create_circuit(device: Device, diff_method: str = "parameter-shift") -> qml.QNode: + """Create a differentiable quantum circuit. Args: - device: Quantum device to run the circuit on. - diff_method: Differentiation method to use. Defaults to "parameter-shift". + device (Device): Quantum device to run the circuit on. + diff_method (str): Differentiation method. Returns: - qml.QNode: Configured quantum circuit as a QNode. + qml.QNode: Configured quantum circuit. """ + @qml.qnode(device, diff_method=diff_method) def circuit(params: np.ndarray) -> float: - """Quantum circuit with parameterized rotations and measurement. + """Parameterized quantum circuit. Args: - params: Array of rotation parameters [theta_x, theta_y]. + params (np.ndarray): Rotation parameters [theta_x, theta_y]. Returns: float: Expectation value of PauliZ on wire 0. @@ -78,103 +77,57 @@ def create_circuit( return circuit -def test_backend(backend_name: str) -> None: - """Tests quantum computing functionality on a specific backend. - +@pytest.mark.parametrize("diff_method", ["parameter-shift", "finite-diff"]) +def test_gradient_computation(diff_method): + """Test that gradients can be computed using different methods. + Args: - backend_name: Name of the backend to test. - """ - print(f"\n=== Testing backend: {backend_name} ===") - - # Create device - device = create_device(backend_name) - - # Define probability measurement circuit - @qml.qnode(device) - def circuit_probs(params: np.ndarray) -> np.ndarray: - """Circuit for measuring probability distribution. - - Args: - params: Array of rotation parameters [theta_x, theta_y]. - - Returns: - np.ndarray: Probability distribution over computational basis states. - """ - qml.RX(params[0], wires=0) - qml.RY(params[1], wires=1) - qml.CNOT(wires=[0, 1]) - return qml.probs(wires=[0, 1]) - - # Define expectation value measurement circuit - @qml.qnode(device) - def circuit_expval(params: np.ndarray) -> float: - """Circuit for measuring expectation value. - - Args: - params: Array of rotation parameters [theta_x, theta_y]. - - Returns: - float: Expectation value of PauliZ on wire 0. - """ - qml.RX(params[0], wires=0) - qml.RY(params[1], wires=1) - qml.CNOT(wires=[0, 1]) - return qml.expval(qml.PauliZ(0)) - - # Test circuits - params = INITIAL_PARAMS.copy() - probabilities = circuit_probs(params) - expectation_value = circuit_expval(params) - - print(f"Probability distribution: {probabilities}") - print(f"Expectation value: {expectation_value:.6f}") - - -def optimize_circuit() -> None: - """Performs parameter optimization of the quantum circuit. - - Uses gradient descent optimization to minimize the expectation value - of the quantum circuit with respect to the rotation parameters. + diff_method (str): Differentiation method to test. """ - print("\n=== Starting parameter optimization ===") - - # Create default device and initialize circuit + device = create_device(DEFAULT_BACKEND) + circuit = create_circuit(device, diff_method=diff_method) + + # Test gradient computation + grad_fn = qml.grad(circuit) + gradient = grad_fn(INITIAL_PARAMS) + + # Assertions + assert gradient is not None + assert isinstance(gradient, np.ndarray) + assert gradient.shape == INITIAL_PARAMS.shape + + +def test_optimization_converges(): + """Test that gradient descent optimization reduces expectation value.""" device = create_device(DEFAULT_BACKEND) circuit = create_circuit(device) - # Initialize optimizer and parameters optimizer = qml.GradientDescentOptimizer(stepsize=STEP_SIZE) params = INITIAL_PARAMS.copy() + initial_expval = circuit(params) - # Execute optimization loop - for step in range(TRAINING_STEPS): + for _ in range(TRAINING_STEPS): params = optimizer.step(circuit, params) - expectation = circuit(params) - print( - f"Step {step + 1:2d}: " - f"params = [{params[0]:.6f}, {params[1]:.6f}], " - f"expectation = {expectation:.6f}" - ) + final_expval = circuit(params) - print("\n=== Optimization completed ===") - print(f"Final parameters: [{params[0]:.6f}, {params[1]:.6f}]") - print(f"Final expectation value: {circuit(params):.6f}") + # Assertion: final expectation value should not exceed the initial value + assert final_expval <= initial_expval + 1e-6 -def main() -> None: - """Main function to demonstrate quantum circuit functionality. - - Tests different quantum backends and performs parameter optimization. - """ - # Test different backends - test_backend("tianyan504") - test_backend("tianyan_sw") - test_backend("default") - - # Execute optimization - optimize_circuit() +def test_circuit_differentiability(): + """Test that the circuit is properly differentiable.""" + device = create_device(DEFAULT_BACKEND) + circuit = create_circuit(device) + + # Test that we can compute the gradient + try: + grad_fn = qml.grad(circuit) + gradient = grad_fn(INITIAL_PARAMS) + assert gradient is not None + except Exception as e: + pytest.fail(f"Circuit differentiation failed: {e}") if __name__ == "__main__": - main() \ No newline at end of file + pytest.main([__file__, "-v"]) \ No newline at end of file -- Gitee