diff --git a/ing b/ing new file mode 100644 index 0000000000000000000000000000000000000000..51447d1c6db89334746caeb4277a52ae7c0e4e26 --- /dev/null +++ b/ing @@ -0,0 +1,266 @@ +import numpy as np +import networkx as nx +import matplotlib.pyplot as plt +from typing import Dict, Callable, Tuple + +plt.rcParams['font.sans-serif'] = ['SimHei'] +plt.rcParams['axes.unicode_minus'] = False + + +class MineVentilationSolver: + def __init__(self, nodes: list, branches: list): + """ + 初始化通风网络求解器 + + 参数: + nodes: 节点列表 [{'id': int, 'x': float, 'y': float, ...}] + branches: 分支列表 [{'id': str, 'start_node': int, 'end_node': int, 'resistance': float, ...}] + """ + self.nodes = nodes + self.branches = branches + + # 建立索引映射 + self.node_index = {n['id']: i for i, n in enumerate(nodes)} + self.branch_index = {b['id']: i for i, b in enumerate(branches)} + + # 构建网络矩阵 + self.incidence_matrix = self._build_incidence_matrix() + self.loop_matrix, self.cotree_edges = self._build_loop_matrix() + + # 物理参数 + self.R = np.array([b['resistance'] for b in branches]) # 分支阻力 + + def _build_incidence_matrix(self) -> np.ndarray: + """构造节点-分支关联矩阵 (n x m)""" + mat = np.zeros((len(self.nodes), len(self.branches))) + for i, b in enumerate(self.branches): + mat[self.node_index[b['start_node']], i] = 1 + mat[self.node_index[b['end_node']], i] = -1 + return mat + + def _build_loop_matrix(self) -> Tuple[np.ndarray, list]: + """生成独立回路矩阵 (k x m) 和余树枝列表""" + G = nx.Graph() + for b in self.branches: + G.add_edge(b['start_node'], b['end_node'], id=b['id']) + + # 获取生成树 + tree = nx.minimum_spanning_tree(G) + cotree_edges = [e for e in G.edges() if not tree.has_edge(*e)] + + # 构建回路矩阵 + loop_mat = [] + for e in cotree_edges: + u, v = e + path = nx.shortest_path(tree, u, v) + loop = path + [path[0]] + + # 标记回路方向 + loop_row = np.zeros(len(self.branches)) + for i in range(len(loop) - 1): + n1, n2 = loop[i], loop[i + 1] + for j, b in enumerate(self.branches): + if (b['start_node'], b['end_node']) == (n1, n2): + loop_row[j] = 1 + elif (b['start_node'], b['end_node']) == (n2, n1): + loop_row[j] = -1 + loop_mat.append(loop_row) + + return np.array(loop_mat), [G.edges[e]['id'] for e in cotree_edges] + + def set_fan_characteristics(self, fan_data: Dict[str, Tuple[Callable, Callable]]): + """ + 设置风机特性曲线 + + 参数: + fan_data: { + 分支ID: ( + 风压函数 H(Q), + 风压导数函数 dH/dQ + ) + } + """ + self.H_funcs = [None] * len(self.branches) + self.dH_funcs = [None] * len(self.branches) + for bid, (h_func, dh_func) in fan_data.items(): + idx = self.branch_index[bid] + self.H_funcs[idx] = h_func + self.dH_funcs[idx] = dh_func + + def solve(self, initial_flows: Dict[str, float] = None, tol: float = 1e-6, max_iter: int = 100) -> Dict[str, float]: + """ + 求解通风网络风量分布 + + 参数: + initial_flows: 初始风量 {分支ID: 风量值} + tol: 收敛容差 + max_iter: 最大迭代次数 + + 返回: + {分支ID: 风量值} + """ + # 初始化风量 + Q = self._initialize_flows(initial_flows) + + # 牛顿迭代 + A = self.incidence_matrix[1:, :] # 去掉参考节点 + B = self.loop_matrix + + for iter in range(max_iter): + # 计算压力损失 + delta_P = np.zeros(len(self.branches)) + for j in range(len(self.branches)): + H = self.H_funcs[j](Q[j]) if self.H_funcs[j] is not None else 0 + delta_P[j] = self.R[j] * Q[j] ** 2 - H + + # 构建残差 + F = np.concatenate([A @ Q, B @ delta_P]) + + # 打印迭代过程信息 + print(f"迭代次数: {iter + 1}, 残差范数: {np.linalg.norm(F)}") + print("当前风量:") + for i, b in enumerate(self.branches): + print(f"{b['id']}: {Q[i]:.4f} m³/s") + + # 收敛检查 + if np.linalg.norm(F) < tol: + print(f"收敛于第 {iter + 1} 次迭代") + return {b['id']: Q[i] for i, b in enumerate(self.branches)} + + # 构造雅可比矩阵 + D = np.array([ + 2 * self.R[j] * Q[j] - (self.dH_funcs[j](Q[j]) if self.dH_funcs[j] is not None else 0) + for j in range(len(self.branches)) + ]) + J = np.vstack([A, B @ np.diag(D)]) + + # 求解更新量 + dQ = np.linalg.lstsq(J, -F, rcond=None)[0] + Q += dQ + + # 物理约束 + Q = np.maximum(Q, 0.01) # 风量必须为正 + + raise RuntimeError(f"未能在 {max_iter} 次迭代内收敛") + + def _initialize_flows(self, initial_flows: Dict[str, float] = None) -> np.ndarray: + """生成初始风量分布""" + if initial_flows is None: + return np.ones(len(self.branches)) * 5.0 # 默认初始值 + + # 基于指定分支风量生成初始值 + Q = np.zeros(len(self.branches)) + for bid, flow in initial_flows.items(): + Q[self.branch_index[bid]] = flow + + # 对未指定的分支使用回路方程估算 + for i, b in enumerate(self.branches): + if b['id'] not in initial_flows: + # 简单估算:风量与阻力成反比 + Q[i] = 10 / (self.R[i] + 0.01) + + return Q + + def visualize(self, solution: Dict[str, float] = None): + """可视化通风网络""" + G = nx.DiGraph() + pos = {} + for node in self.nodes: + G.add_node(node['id']) + pos[node['id']] = (node['x'], node['y']) + + edge_colors = [] + edge_widths = [] + edge_labels = {} + for b in self.branches: + G.add_edge(b['start_node'], b['end_node'], id=b['id']) + edge_labels[(b['start_node'], b['end_node'])] = f"{b['id']}\nR={b['resistance']:.3f}" + + # 标记余树枝 + if b['id'] in self.cotree_edges: + edge_colors.append('red') + edge_widths.append(2.5) + else: + edge_colors.append('green') + edge_widths.append(2) + + # 显示计算结果 + if solution: + edge_labels[(b['start_node'], b['end_node'])] += f"\nQ={solution[b['id']]:.2f}" + + plt.figure(figsize=(12, 10)) + nx.draw_networkx_nodes(G, pos, node_size=700, node_color='lightblue') + nx.draw_networkx_edges( + G, pos, + edge_color=edge_colors, + width=edge_widths, + arrows=True, + arrowstyle='->', + arrowsize=20 + ) + nx.draw_networkx_labels(G, pos, font_size=12) + nx.draw_networkx_edge_labels( + G, pos, + edge_labels=edge_labels, + font_size=9, + bbox=dict(facecolor='white', alpha=0.8) + ) + + plt.title("矿井通风网络\n(红色: 余树枝, 绿色: 树边)", fontsize=14) + plt.axis('off') + plt.tight_layout() + plt.show() + + +# 使用示例 +if __name__ == "__main__": + # 定义网络结构 + nodes = [ + {'id': 1, 'x': 0, 'y': 0, 'pressure': 0}, # 参考节点 + {'id': 2, 'x': -1, 'y': 1}, + {'id': 3, 'x': 1, 'y': 1}, + {'id': 4, 'x': 0, 'y': 2} + ] + + branches = [ + {'id': 'B1', 'start_node': 1, 'end_node': 2, 'resistance': 0.38}, + {'id': 'B2', 'start_node': 1, 'end_node': 3, 'resistance': 0.50}, + {'id': 'B3', 'start_node': 2, 'end_node': 3, 'resistance': 0.65}, + {'id': 'B4', 'start_node': 2, 'end_node': 4, 'resistance': 0.20}, + {'id': 'B5', 'start_node': 3, 'end_node': 4, 'resistance': 0.085}, + {'id': 'B6', 'start_node': 4, 'end_node': 1, 'resistance': 0.01} + ] + + # 创建求解器 + solver = MineVentilationSolver(nodes, branches) + + # 设置风机特性 (示例: B6安装风机) + solver.set_fan_characteristics({ + 'B6': ( + lambda Q: -61.2432 + 16.2027 * Q - 0.3433 * Q ** 2, # 风机特性曲线 H = 150 - 0.05Q² + lambda Q: 16.2027 - 0.6866 * Q # 导数 dH/dQ = -0.1Q + ) + }) + + # 设置初始风量 (可选) + initial_flows = { + 'B6': 30.0, + 'B1': 16.0, + 'B4': 14.0, + } + + # 求解网络 + try: + solution = solver.solve(initial_flows=initial_flows) + + # 打印结果 + print("\n风量分布计算结果:") + for bid, flow in solution.items(): + print(f"{bid}: {flow:.4f} m³/s") + + # 可视化 + solver.visualize(solution) + + except RuntimeError as e: + print("求解失败:", e) + solver.visualize() # 显示网络结构用于调试