diff --git "a/2207010323_\345\274\240\344\270\226\350\210\252/.keep" "b/2207010323_\345\274\240\344\270\226\350\210\252/.keep" new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git "a/2207010323_\345\274\240\344\270\226\350\210\252/2207010323 -\345\274\240\344\270\226\350\210\252.docx" "b/2207010323_\345\274\240\344\270\226\350\210\252/2207010323 -\345\274\240\344\270\226\350\210\252.docx" new file mode 100644 index 0000000000000000000000000000000000000000..27433c60d64eaaf2b0a761a392620b201b1099b9 Binary files /dev/null and "b/2207010323_\345\274\240\344\270\226\350\210\252/2207010323 -\345\274\240\344\270\226\350\210\252.docx" differ diff --git "a/2207010323_\345\274\240\344\270\226\350\210\252/main_experiment.py" "b/2207010323_\345\274\240\344\270\226\350\210\252/main_experiment.py" new file mode 100644 index 0000000000000000000000000000000000000000..031353c3d58ff76a9b1ca57b78aa8b9d32140ff7 --- /dev/null +++ "b/2207010323_\345\274\240\344\270\226\350\210\252/main_experiment.py" @@ -0,0 +1,342 @@ +""" +医学图像处理算子开发实验主程序 +完整的实验流程,包括数据加载、预处理、分割和评估 +""" + +import numpy as np +import matplotlib.pyplot as plt +import cv2 +import os +from medical_image_operators import ( + AnisotropicDiffusion, + EdgeIndicator, + LevelSetSegmentation, + MedicalImageOperator, + visualize_results +) + +def run_experiment_2d_image(): + """ + 在2D合成图像上测试算法 + """ + print("="*60) + print("实验1: 2D合成医学图像处理") + print("="*60) + + # 创建模拟医学图像(带噪声的圆形目标) + image_size = 256 + image = np.zeros((image_size, image_size), dtype=np.uint8) + + # 创建目标区域(模拟肿瘤或器官) + center = (128, 128) + radius = 50 + y, x = np.ogrid[:image_size, :image_size] + mask = (x - center[1])**2 + (y - center[0])**2 <= radius**2 + image[mask] = 180 + + # 添加高斯噪声 + noise = np.random.normal(0, 25, image.shape) + noisy_image = np.clip(image.astype(float) + noise, 0, 255).astype(np.uint8) + + # 添加乘性噪声(模拟超声) + noisy_image = MedicalImageOperator.add_speckle_noise(noisy_image, 0.1) + + print(f"图像尺寸: {noisy_image.shape}") + print(f"噪声水平: PSNR = {compute_psnr(image, noisy_image):.2f} dB") + + # Step 1: 各向异性扩散去噪 + print("\n[Step 1] 执行各向异性扩散去噪...") + ad = AnisotropicDiffusion(noisy_image) + denoised = ad.diffuse(num_iters=15, K=25, delta=0.15, mode='perona-malik-2') + + psnr_after = compute_psnr(image, denoised) + ssim_after = compute_ssim(image, denoised) + print(f"去噪后: PSNR = {psnr_after:.2f} dB, SSIM = {ssim_after:.4f}") + + # Step 2: 计算边缘指示函数 + print("\n[Step 2] 计算边缘指示函数...") + edge_map = EdgeIndicator.compute_edge_enhanced_by_diffusion( + denoised, num_iters=5, K=20 + ) + print(f"边缘指示函数范围: [{edge_map.min():.3f}, {edge_map.max():.3f}]") + + # Step 3: 水平集分割 + print("\n[Step 3] 执行水平集分割...") + lse = LevelSetSegmentation(denoised, edge_map) + + # 初始化(在目标内部) + lse.initialize_circle(center=(120, 120), radius=30) + + # 演化 + final_segmentation = lse.evolve(num_iters=150, visualize=False) + + # Step 4: 评估 + print("\n[Step 4] 评估分割结果...") + dice = MedicalImageOperator.evaluate_dice_coefficient(final_segmentation, mask.astype(np.uint8)) + print(f"Dice系数: {dice:.4f}") + + # 可视化 + visualize_results( + noisy_image, denoised, edge_map, + final_segmentation, mask.astype(np.uint8) + ) + + return { + 'psnr': psnr_after, + 'ssim': ssim_after, + 'dice': dice + } + +def run_experiment_ct_colon(): + """ + 在CT结肠数据集上测试算法(模拟真实场景) + """ + print("\n" + "="*60) + print("实验2: CT结肠息肉检测与分割") + print("="*60) + + # 如果实际数据不可用,创建模拟CT图像 + print("注意: 使用模拟CT数据,实际应用请替换为真实DICOM文件") + + # 创建模拟CT结肠图像(含息肉) + image_size = 512 + ct_image = np.zeros((image_size, image_size), dtype=np.uint8) + + # 结肠壁 + y, x = np.ogrid[:image_size, :image_size] + center_dist = np.sqrt((x - image_size//2)**2 + (y - image_size//2)**2) + colon_wall = (center_dist > 180) & (center_dist < 200) + ct_image[colon_wall] = 150 + + # 息肉(附着在结肠壁上的凸起) + polyp_center = (image_size//2 - 190, image_size//2) + polyp_mask = ((x - polyp_center[1])**2 + (y - polyp_center[0])**2) <= 15**2 + ct_image[polyp_mask] = 200 + + # 添加CT噪声 + noise = np.random.normal(0, 15, ct_image.shape) + ct_noisy = np.clip(ct_image + noise, 0, 255).astype(np.uint8) + + # Step 1: 各向异性扩散 + print("\n[Step 1] CT图像各向异性扩散...") + ad = AnisotropicDiffusion(ct_noisy) + ct_denoised = ad.diffuse(num_iters=20, K=30, delta=0.1, mode='perona-malik-1') + + # Step 2: 边缘指示函数 + print("\n[Step 2] 计算边缘指示函数...") + edge_map = EdgeIndicator.compute_edge_indicator(ct_denoised, sigma=1.0, alpha=2.5) + + # Step 3: 水平集分割息肉 + print("\n[Step 3] 水平集分割息肉...") + lse = LevelSetSegmentation(ct_denoised, edge_map) + + # 在息肉附近初始化 + lse.initialize_circle(center=(image_size//2 - 190, image_size//2), radius=10) + + # 演化(增加气球力以检测弱边缘) + lse.nu = 1.5 # 增强气球力 + polyp_segmentation = lse.evolve(num_iters=200, visualize=False) + + # 可视化 + plt.figure(figsize=(15, 5)) + + plt.subplot(131) + plt.imshow(ct_noisy, cmap='gray') + plt.plot(polyp_center[1], polyp_center[0], 'r+', markersize=20, label='Polyp Center') + plt.title('CT Colon Image with Polyp') + plt.axis('off') + plt.legend() + + plt.subplot(132) + plt.imshow(ct_denoised, cmap='gray') + plt.contour(polyp_segmentation, levels=[0.5], colors='r', linewidths=2) + plt.title('Segmentation Result') + plt.axis('off') + + plt.subplot(133) + plt.imshow(edge_map, cmap='hot') + plt.contour(polyp_segmentation, levels=[0.5], colors='cyan', linewidths=2) + plt.title('Edge Map with Contour') + plt.axis('off') + + plt.tight_layout() + plt.show() + + return ct_denoised, polyp_segmentation + +def run_parameter_sensitivity_analysis(): + """ + 参数敏感性分析实验 + """ + print("\n" + "="*60) + print("实验3: 参数敏感性分析") + print("="*60) + + # 创建测试图像 + image_size = 256 + image = np.zeros((image_size, image_size), dtype=np.uint8) + y, x = np.ogrid[:image_size, :image_size] + mask = (x - 128)**2 + (y - 128)**2 <= 40**2 + image[mask] = 150 + + noise = np.random.normal(0, 20, image.shape) + noisy_image = np.clip(image + noise, 0, 255).astype(np.uint8) + + # 测试不同K值对去噪的影响 + K_values = [10, 20, 30, 40, 50] + psnr_values = [] + + plt.figure(figsize=(15, 10)) + + for i, K in enumerate(K_values): + ad = AnisotropicDiffusion(noisy_image) + denoised = ad.diffuse(num_iters=10, K=K, delta=0.15, mode='perona-malik-2') + + psnr = compute_psnr(image, denoised) + psnr_values.append(psnr) + + plt.subplot(2, 3, i+1) + plt.imshow(denoised, cmap='gray') + plt.title(f'K={K}, PSNR={psnr:.2f}dB') + plt.axis('off') + + plt.subplot(2, 3, 6) + plt.plot(K_values, psnr_values, 'bo-', linewidth=2) + plt.xlabel('K value') + plt.ylabel('PSNR (dB)') + plt.title('Parameter Sensitivity: K vs PSNR') + plt.grid(True) + plt.tight_layout() + plt.show() + + # 找到最优K值 + best_k = K_values[np.argmax(psnr_values)] + print(f"最优K值: {best_k}, 最高PSNR: {max(psnr_values):.2f} dB") + + return best_k + +def compute_psnr(original: np.ndarray, processed: np.ndarray) -> float: + """ + 计算峰值信噪比 + + Args: + original: 原始图像 + processed: 处理后图像 + + Returns: + PSNR值(dB) + """ + mse = np.mean((original.astype(float) - processed.astype(float))**2) + if mse == 0: + return float('inf') + + max_pixel = 255.0 + psnr = 20 * np.log10(max_pixel / np.sqrt(mse)) + return psnr + +def compute_ssim(img1: np.ndarray, img2: np.ndarray, + window_size: int = 11, L: int = 255) -> float: + """ + 计算结构相似性指数 + + Args: + img1, img2: 输入图像 + window_size: 滑动窗口大小 + L: 动态范围 + + Returns: + SSIM值 + """ + # 简化的SSIM实现 + from scipy import signal + + # 转为浮点数 + img1 = img1.astype(np.float64) + img2 = img2.astype(np.float64) + + # 高斯窗口 + sigma = 1.5 + gauss = cv2.getGaussianKernel(window_size, sigma) + window = gauss @ gauss.T + + # 常数 + C1 = (0.01 * L)**2 + C2 = (0.03 * L)**2 + + # 均值 + mu1 = signal.convolve2d(img1, window, boundary='symm', mode='same') + mu2 = signal.convolve2d(img2, window, boundary='symm', mode='same') + + # 方差和协方差 + mu1_sq = mu1**2 + mu2_sq = mu2**2 + mu1_mu2 = mu1 * mu2 + + sigma1_sq = signal.convolve2d(img1**2, window, boundary='symm', mode='same') - mu1_sq + sigma2_sq = signal.convolve2d(img2**2, window, boundary='symm', mode='same') - mu2_sq + sigma12 = signal.convolve2d(img1*img2, window, boundary='symm', mode='same') - mu1_mu2 + + # SSIM公式 + ssim_map = ((2*mu1_mu2 + C1)*(2*sigma12 + C2)) / \ + ((mu1_sq + mu2_sq + C1)*(sigma1_sq + sigma2_sq + C2)) + + return ssim_map.mean() + +def save_results_to_markdown(results: dict, filename: str = 'experiment_results.md'): + """ + 保存实验结果到Markdown文件 + """ + with open(filename, 'w', encoding='utf-8') as f: + f.write("# 医学图像处理算子实验结果\n\n") + f.write("## 实验配置\n\n") + f.write("- 各向异性扩散迭代次数: 15\n") + f.write("- 水平集演化次数: 150\n") + f.write("- 时间步长: 0.5\n\n") + + f.write("## 实验结果\n\n") + for exp_name, metrics in results.items(): + f.write(f"### {exp_name}\n\n") + for metric, value in metrics.items(): + f.write(f"- **{metric}**: {value:.4f}\n") + f.write("\n") + + print(f"\n实验结果已保存到 {filename}") + +def main(): + """ + 主函数:运行完整实验流程 + """ + print("开始医学图像处理算子开发实验...") + print("="*60) + + # 创建输出目录 + os.makedirs('results', exist_ok=True) + + # 实验1: 2D合成图像 + print("\n>>> 运行实验1: 2D合成医学图像...") + results_exp1 = run_experiment_2d_image() + + # 实验2: CT结肠图像 + print("\n>>> 运行实验2: CT结肠息肉分割...") + ct_result, polyp_mask = run_experiment_ct_colon() + + # 实验3: 参数敏感性分析 + print("\n>>> 运行实验3: 参数敏感性分析...") + best_k = run_parameter_sensitivity_analysis() + + # 汇总结果 + all_results = { + 'Experiment 1 - 2D Synthetic': results_exp1, + 'Experiment 2 - CT Colon': {'Best K': best_k} + } + + # 保存结果 + save_results_to_markdown(all_results, 'results/experiment_report.md') + + print("\n" + "="*60) + print("所有实验完成!") + print("请查看生成的结果图像和 results/experiment_report.md 文件") + print("="*60) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git "a/2207010323_\345\274\240\344\270\226\350\210\252/medical_image_operators.py" "b/2207010323_\345\274\240\344\270\226\350\210\252/medical_image_operators.py" new file mode 100644 index 0000000000000000000000000000000000000000..9a1fa888e5bfc1c256f1ef16f162022f332db193 --- /dev/null +++ "b/2207010323_\345\274\240\344\270\226\350\210\252/medical_image_operators.py" @@ -0,0 +1,607 @@ +""" +医学图像处理算子库 +包含各向异性扩散去噪和改进水平集分割算法 +""" + +import numpy as np +import cv2 +import SimpleITK as sitk +from scipy import ndimage +from scipy.ndimage import gaussian_filter +import matplotlib.pyplot as plt +from typing import Tuple, Optional + +class AnisotropicDiffusion: + """各向异性扩散滤波算子""" + + def __init__(self, image: np.ndarray): + """ + 初始化各向异性扩散滤波器 + + Args: + image: 输入灰度图像(2D numpy数组) + """ + self.image = image.astype(np.float32) + self.height, self.width = image.shape + + def compute_diffusion_coefficient(self, gradient: np.ndarray, + mode: str = 'perona-malik-1', + K: float = 20.0) -> np.ndarray: + """ + 计算扩散系数 + + Args: + gradient: 梯度模值图像 + mode: 扩散系数类型 ('perona-malik-1' 或 'perona-malik-2') + K: 梯度阈值参数 + + Returns: + 扩散系数图像 + """ + if mode == 'perona-malik-1': + # g1 = exp(-(grad/K)^2) + return np.exp(-(gradient**2) / (K**2)) + elif mode == 'perona-malik-2': + # g2 = 1 / (1 + (grad/K)^2) + return 1.0 / (1.0 + (gradient**2) / (K**2)) + else: + raise ValueError("Unsupported diffusion coefficient mode") + + def compute_gradients(self, image: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + """ + 计算图像梯度(中心差分) + + Returns: + grad_x: x方向梯度 + grad_y: y方向梯度 + grad_mag: 梯度模值 + """ + # 使用中心差分计算梯度 + grad_x = np.zeros_like(image) + grad_y = np.zeros_like(image) + + # x方向梯度 + grad_x[:, 1:-1] = (image[:, 2:] - image[:, :-2]) / 2.0 + grad_x[:, 0] = image[:, 1] - image[:, 0] # 前向差分 + grad_x[:, -1] = image[:, -1] - image[:, -2] # 后向差分 + + # y方向梯度 + grad_y[1:-1, :] = (image[2:, :] - image[:-2, :]) / 2.0 + grad_y[0, :] = image[1, :] - image[0, :] + grad_y[-1, :] = image[-1, :] - image[-2, :] + + grad_mag = np.sqrt(grad_x**2 + grad_y**2) + + return grad_x, grad_y, grad_mag + + def diffuse(self, num_iters: int = 15, K: float = 20.0, + delta: float = 0.15, mode: str = 'perona-malik-1') -> np.ndarray: + """ + 执行各向异性扩散 + + Args: + num_iters: 迭代次数 + K: 梯度阈值 + delta: 时间步长 + mode: 扩散系数类型 + + Returns: + 去噪后的图像 + """ + result = self.image.copy() + + for it in range(num_iters): + # 计算梯度 + grad_x, grad_y, grad_mag = self.compute_gradients(result) + + # 计算扩散系数 + diffusion_coeff = self.compute_diffusion_coefficient(grad_mag, mode, K) + + # 计算扩散流 + diffusion_x = diffusion_coeff * grad_x + diffusion_y = diffusion_coeff * grad_y + + # 计算散度(空间导数) + div = np.zeros_like(result) + + # x方向散度分量 + div_x = np.zeros_like(result) + div_x[:, 1:-1] = diffusion_x[:, 2:] - diffusion_x[:, 1:-1] + div_x[:, 0] = diffusion_x[:, 1] - diffusion_x[:, 0] + div_x[:, -1] = diffusion_x[:, -1] - diffusion_x[:, -2] + + # y方向散度分量 + div_y = np.zeros_like(result) + div_y[1:-1, :] = diffusion_y[2:, :] - diffusion_y[1:-1, :] + div_y[0, :] = diffusion_y[1, :] - diffusion_y[0, :] + div_y[-1, :] = diffusion_y[-1, :] - diffusion_y[-2, :] + + div = div_x + div_y + + # 更新图像 + result += delta * div + + # 边界条件处理(Neumann边界条件) + result[:, 0] = result[:, 1] + result[:, -1] = result[:, -2] + result[0, :] = result[1, :] + result[-1, :] = result[-2, :] + + print(f"Iteration {it+1}/{num_iters}, Diffusion energy: {np.sum(np.abs(div)):.4f}") + + return np.clip(result, 0, 255).astype(np.uint8) + + +class EdgeIndicator: + """边缘指示函数计算器""" + + @staticmethod + def compute_edge_indicator(image: np.ndarray, + sigma: float = 1.0, + alpha: float = 1.0) -> np.ndarray: + """ + 计算边缘指示函数 g = 1 / (1 + |∇I|^α) + + Args: + image: 输入图像 + sigma: 高斯平滑参数 + alpha: 指数参数 + + Returns: + 边缘指示函数图像 + """ + # 高斯平滑 + smoothed = gaussian_filter(image, sigma, mode='constant') + + # 计算梯度 + grad_x = ndimage.sobel(smoothed, axis=0, mode='constant') + grad_y = ndimage.sobel(smoothed, axis=1, mode='constant') + grad_mag = np.sqrt(grad_x**2 + grad_y**2) + + # 边缘指示函数 + edge_indicator = 1.0 / (1.0 + (grad_mag / 255.0)**alpha) + + return edge_indicator + + @staticmethod + def compute_edge_enhanced_by_diffusion(image: np.ndarray, + num_iters: int = 10, + K: float = 20.0) -> np.ndarray: + """ + 先进行各向异性扩散,再计算边缘指示函数 + + Args: + image: 输入图像 + num_iters: 扩散迭代次数 + K: 扩散阈值 + + Returns: + 增强后的边缘指示函数 + """ + # 各向异性扩散去噪 + ad = AnisotropicDiffusion(image) + diffused = ad.diffuse(num_iters=num_iters, K=K) + + # 在扩散后的图像上计算边缘指示函数 + edge_map = EdgeIndicator.compute_edge_indicator(diffused, sigma=0.5, alpha=2.0) + + return edge_map + + +class LevelSetSegmentation: + """改进的水平集分割算子""" + + def __init__(self, image: np.ndarray, edge_indicator: np.ndarray): + """ + 初始化水平集分割器 + + Args: + image: 原始图像 + edge_indicator: 边缘指示函数 + """ + self.image = image.astype(np.float32) + self.edge_indicator = edge_indicator + self.phi = None # 水平集函数 + + # 参数设置 + self.dt = 0.5 # 时间步长 + self.mu = 0.2 # 长度正则化系数 + self.nu = 1.0 # 气球力系数 + self.epsilon = 1.0 # Dirac函数宽度 + + # 窄带宽度 + self.narrow_band_width = 10 + + def initialize_circle(self, center: Tuple[int, int], radius: int): + """ + 用圆形初始轮廓初始化水平集函数 + + Args: + center: 圆心坐标 (y, x) + radius: 半径 + """ + h, w = self.image.shape + y, x = np.ogrid[:h, :w] + self.phi = np.sqrt((x - center[1])**2 + (y - center[0])**2) - radius + self.phi = np.sign(self.phi) # 符号距离函数简化版本 + + def initialize_rectangle(self, top_left: Tuple[int, int], + bottom_right: Tuple[int, int]): + """ + 用矩形初始轮廓初始化水平集函数 + + Args: + top_left: 左上角坐标 (y, x) + bottom_right: 右下角坐标 (y, x) + """ + h, w = self.image.shape + self.phi = np.ones((h, w)) + self.phi[top_left[0]:bottom_right[0], + top_left[1]:bottom_right[1]] = -1.0 + self.phi = -self.phi # 内部为负,外部为正 + + def dirac_delta(self, phi: np.ndarray) -> np.ndarray: + """ + Dirac delta函数 + + Args: + phi: 水平集函数 + + Returns: + Dirac函数值 + """ + # 正则化Dirac函数 + epsilon = self.epsilon + return epsilon / (np.pi * (epsilon**2 + phi**2)) + + def heaviside(self, phi: np.ndarray) -> np.ndarray: + """ + Heaviside函数 + + Args: + phi: 水平集函数 + + Returns: + Heaviside函数值 + """ + # 正则化Heaviside函数 + epsilon = self.epsilon + return 0.5 * (1 + (2 / np.pi) * np.arctan(phi / epsilon)) + + def compute_curvature(self, phi: np.ndarray) -> np.ndarray: + """ + 计算水平集函数的曲率 + + Args: + phi: 水平集函数 + + Returns: + 曲率图像 + """ + # 计算一阶导数 + phi_x = ndimage.sobel(phi, axis=1, mode='constant') + phi_y = ndimage.sobel(phi, axis=0, mode='constant') + + # 计算二阶导数 + phi_xx = ndimage.sobel(phi_x, axis=1, mode='constant') + phi_yy = ndimage.sobel(phi_y, axis=0, mode='constant') + phi_xy = ndimage.sobel(phi_x, axis=0, mode='constant') + + # 计算曲率 + numerator = phi_xx * phi_y**2 - 2 * phi_x * phi_y * phi_xy + phi_yy * phi_x**2 + denominator = phi_x**2 + phi_y**2 + 1e-10 # 避免除零 + + curvature = numerator / denominator + + return curvature + + def get_narrow_band(self, phi: np.ndarray) -> np.ndarray: + """ + 获取窄带区域掩码 + + Args: + phi: 水平集函数 + + Returns: + 窄带区域掩码 + """ + # 窄带区域:|phi| < narrow_band_width + narrow_band = np.abs(phi) < self.narrow_band_width + return narrow_band + + def evolve(self, num_iters: int = 200, + visualize: bool = False, + save_interval: int = 50) -> np.ndarray: + """ + 演化水平集函数 + + Args: + num_iters: 演化迭代次数 + visualize: 是否可视化演化过程 + save_interval: 保存中间结果的间隔 + + Returns: + 最终的分割结果 + """ + if self.phi is None: + raise ValueError("必须首先初始化水平集函数") + + if visualize: + plt.figure(figsize=(12, 6)) + + for it in range(num_iters): + # 1. 计算窄带区域 + narrow_band = self.get_narrow_band(self.phi) + if np.sum(narrow_band) < 10: # 停止条件 + print(f"窄带过窄,在第{it}次迭代停止") + break + + # 2. 计算曲率 + curvature = self.compute_curvature(self.phi) + + # 3. 计算边缘指示函数梯度 + g = self.edge_indicator + g_x = ndimage.sobel(g, axis=1, mode='constant') + g_y = ndimage.sobel(g, axis=0, mode='constant') + + # 4. 计算水平集函数梯度 + phi_x = ndimage.sobel(self.phi, axis=1, mode='constant') + phi_y = ndimage.sobel(self.phi, axis=0, mode='constant') + + # 5. 计算各项作用力 + # 曲率项: div(g * ∇phi / |∇phi|) + phi_grad_mag = np.sqrt(phi_x**2 + phi_y**2) + 1e-10 + normal_x = phi_x / phi_grad_mag + normal_y = phi_y / phi_grad_mag + + # 散度 + term_curvature_x = ndimage.sobel(g * normal_x, axis=1, mode='constant') + term_curvature_y = ndimage.sobel(g * normal_y, axis=0, mode='constant') + term_curvature = g * (term_curvature_x + term_curvature_y) + + # 气球力项: g * nu * |∇phi| + term_balloon = g * self.nu * phi_grad_mag + + # 边缘吸引项: ∇g · ∇phi + term_edge = g_x * phi_x + g_y * phi_y + + # 6. 更新水平集函数 + delta_phi = self.dirac_delta(self.phi) + update = delta_phi * (term_curvature + term_balloon + term_edge) + + # 仅在窄带区域更新 + update_mask = self.dt * update * narrow_band + self.phi -= update_mask + + # 7. 重新初始化(每50次迭代一次) + if it % 50 == 0 and it > 0: + self.phi = self.reinitialize_phi(self.phi) + + # 8. 可视化 + if visualize and (it % save_interval == 0 or it == num_iters - 1): + plt.subplot(1, 2, 1) + plt.imshow(self.image, cmap='gray') + plt.contour(self.phi, levels=[0], colors='r', linewidths=2) + plt.title(f'Iteration {it}') + plt.axis('off') + + plt.subplot(1, 2, 2) + plt.imshow(g, cmap='jet') + plt.title('Edge Indicator') + plt.axis('off') + + plt.pause(0.01) + plt.clf() + + # 9. 计算能量(监控收敛) + if it % 20 == 0: + energy = self.compute_energy() + print(f"Iteration {it}/{num_iters}, Energy: {energy:.6f}") + + if visualize: + plt.close() + + # 返回分割结果 + return self.get_segmentation_result() + + def reinitialize_phi(self, phi: np.ndarray, num_iters: int = 5) -> np.ndarray: + """ + 水平集函数重新初始化 + + Args: + phi: 需要重新初始化的水平集函数 + num_iters: 迭代次数 + + Returns: + 符号距离函数 + """ + # 简化版本:使用距离变换 + # 实际应用中可用更精确的PDE方法 + phi_new = phi.copy() + for _ in range(num_iters): + # 计算符号 + sign_phi = np.sign(phi_new) + sign_phi[sign_phi == 0] = 1 + + # 计算梯度 + phi_x = ndimage.sobel(phi_new, axis=1, mode='constant') + phi_y = ndimage.sobel(phi_new, axis=0, mode='constant') + grad_mag = np.sqrt(phi_x**2 + phi_y**2) + 1e-10 + + # Sussman重新初始化 + phi_new -= 0.5 * sign_phi * (grad_mag - 1) + + return phi_new + + def compute_energy(self) -> float: + """ + 计算总能量 + + Returns: + 能量值 + """ + # 长度项能量 + delta_phi = self.dirac_delta(self.phi) + phi_x = ndimage.sobel(self.phi, axis=1, mode='constant') + phi_y = ndimage.sobel(self.phi, axis=0, mode='constant') + length_energy = np.sum(self.edge_indicator * delta_phi * + np.sqrt(phi_x**2 + phi_y**2)) + + # 面积项能量 + area_energy = np.sum(self.heaviside(-self.phi)) + + return length_energy + self.mu * area_energy + + def get_segmentation_result(self) -> np.ndarray: + """ + 获取二值分割结果 + + Returns: + 二值掩码(目标区域为1,背景为0) + """ + if self.phi is None: + raise ValueError("水平集函数未初始化") + + return (self.phi <= 0).astype(np.uint8) + + +class MedicalImageOperator: + """医学图像处理算子综合类""" + + @staticmethod + def load_medical_image(filepath: str, slice_idx: Optional[int] = None) -> np.ndarray: + """ + 加载医学图像文件 + + Args: + filepath: 文件路径(支持DICOM, NIfTI, MHD等) + slice_idx: 如果是3D数据,指定切片索引 + + Returns: + 2D图像数组 + """ + try: + # 尝试使用SimpleITK读取 + image_sitk = sitk.ReadImage(filepath) + image_array = sitk.GetArrayFromImage(image_sitk) + + # 如果是3D数据,选择中间切片 + if image_array.ndim == 3: + if slice_idx is None: + slice_idx = image_array.shape[0] // 2 + image_2d = image_array[slice_idx] + else: + image_2d = image_array + + # 归一化到0-255 + image_2d = np.clip(image_2d, np.percentile(image_2d, 1), + np.percentile(image_2d, 99)) + image_2d = ((image_2d - image_2d.min()) / + (image_2d.max() - image_2d.min()) * 255) + + return image_2d.astype(np.uint8) + + except Exception as e: + print(f"SimpleITK读取失败,尝试OpenCV: {e}") + # 回退到OpenCV + image = cv2.imread(filepath, cv2.IMREAD_GRAYSCALE) + if image is None: + raise ValueError(f"无法读取图像文件: {filepath}") + return image + + @staticmethod + def add_speckle_noise(image: np.ndarray, noise_variance: float = 0.05) -> np.ndarray: + """ + 为图像添加乘性噪声(模拟超声图像) + + Args: + image: 输入图像 + noise_variance: 噪声方差 + + Returns: + 加噪图像 + """ + row, col = image.shape + gauss = np.random.randn(row, col) + gauss = gauss.reshape(row, col) + noisy = image + image * gauss * noise_variance + return np.clip(noisy, 0, 255).astype(np.uint8) + + @staticmethod + def evaluate_dice_coefficient(mask1: np.ndarray, mask2: np.ndarray) -> float: + """ + 计算Dice系数 + + Args: + mask1, mask2: 二值掩码 + + Returns: + Dice系数 + """ + intersection = np.sum(mask1 * mask2) + union = np.sum(mask1) + np.sum(mask2) + + if union == 0: + return 1.0 + + return 2.0 * intersection / union + + +# 辅助可视化函数 +def visualize_results(image: np.ndarray, + denoised: np.ndarray, + edge_map: np.ndarray, + segmentation: np.ndarray, + ground_truth: Optional[np.ndarray] = None): + """ + 可视化处理结果 + + Args: + image: 原始图像 + denoised: 去噪后图像 + edge_map: 边缘指示函数 + segmentation: 分割结果 + ground_truth: 真实标签(可选) + """ + fig, axes = plt.subplots(2, 3, figsize=(18, 12)) + + axes[0, 0].imshow(image, cmap='gray') + axes[0, 0].set_title('Original Image') + axes[0, 0].axis('off') + + axes[0, 1].imshow(denoised, cmap='gray') + axes[0, 1].set_title('Anisotropic Diffusion Denoised') + axes[0, 1].axis('off') + + axes[0, 2].imshow(edge_map, cmap='jet') + axes[0, 2].set_title('Edge Indicator Function') + axes[0, 2].axis('off') + + axes[1, 0].imshow(image, cmap='gray') + axes[1, 0].contour(segmentation, levels=[0.5], colors='r', linewidths=2) + axes[1, 0].set_title('Segmentation Result') + axes[1, 0].axis('off') + + if ground_truth is not None: + axes[1, 1].imshow(ground_truth, cmap='gray') + axes[1, 1].set_title('Ground Truth') + axes[1, 1].axis('off') + + # 计算Dice系数 + dice = MedicalImageOperator.evaluate_dice_coefficient(segmentation, ground_truth) + axes[1, 2].text(0.5, 0.5, f'Dice Coefficient: {dice:.4f}', + fontsize=20, ha='center', va='center') + axes[1, 2].set_title('Evaluation') + axes[1, 2].axis('off') + else: + axes[1, 1].imshow(segmentation, cmap='gray') + axes[1, 1].set_title('Binary Segmentation Mask') + axes[1, 1].axis('off') + + axes[1, 2].imshow(image, cmap='gray') + axes[1, 2].imshow(segmentation, cmap='Reds', alpha=0.3) + axes[1, 2].set_title('Overlay') + axes[1, 2].axis('off') + + plt.tight_layout() + plt.show()【 \ No newline at end of file