"""Path-based relation prediction over a knowledge graph.

The module bundles the configuration, four "operators" (path extraction,
path encoding, path scoring and relation prediction) and the training /
evaluation pipeline together with a script entry point.
"""

import heapq
import json
import logging
import os
from collections import defaultdict, deque
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset


class Config:
    """Hyper-parameters and filesystem locations used by the pipeline."""

    # Path extraction
    MAX_PATH_LENGTH = 3
    MAX_PATHS_PER_PAIR = 50
    BEAM_WIDTH = 100  # not referenced anywhere in this file

    # Embeddings
    EMBEDDING_DIM = 100
    MARGIN = 1.0
    LEARNING_RATE = 0.001

    # Model
    ATTENTION_HEADS = 4
    DROPOUT_RATE = 0.1

    # Training
    BATCH_SIZE = 64
    EPOCHS = 50
    VALIDATION_SPLIT = 0.1

    # Data locations
    DATA_DIR = "./data/"
    MODEL_DIR = "./models/"


class PathExtractor:
    """Extract relation paths between entity pairs with two BFS passes."""

    def __init__(self, triplets: List[Tuple], max_length: int = 3, max_paths: int = 50):
        """
        Build the adjacency index used by the searches.

        Args:
            triplets: knowledge-graph triples ``[(head, relation, tail)]``
            max_length: maximum number of relations in a path
            max_paths: maximum number of paths returned per entity pair
        """
        self.max_length = max_length
        self.max_paths = max_paths
        # (entity, 'out') -> {(relation, tail)} and (entity, 'in') -> {(relation, head)}
        self.entity_to_relations = defaultdict(set)
        # relation -> {(head, tail)}
        self.relation_to_entities = defaultdict(set)
        self._build_graph(triplets)

    def _build_graph(self, triplets: List[Tuple]):
        """Index every triple by outgoing edge, incoming edge and relation."""
        for h, r, t in triplets:
            self.entity_to_relations[(h, 'out')].add((r, t))
            self.entity_to_relations[(t, 'in')].add((r, h))
            self.relation_to_entities[r].add((h, t))

    def extract_paths(self, head: str, tail: str) -> List[List[str]]:
        """
        Return the relation paths that lead from ``head`` to ``tail``.

        Args:
            head: head entity
            tail: tail entity

        Returns:
            De-duplicated list of paths, each a relation sequence in
            head-to-tail order, truncated to ``max_paths`` entries. Empty when
            ``head == tail``.
        """
        if head == tail:
            return []

        forward_paths = self._bfs_search(head, tail, direction='forward')
        # The backward search starts at the tail and walks edges against their
        # direction, so it collects relations in tail-to-head order. Reverse
        # each path so that both searches describe the same head-to-tail walk;
        # otherwise reversed copies survive de-duplication as bogus paths.
        backward_paths = [
            path[::-1]
            for path in self._bfs_search(tail, head, direction='backward')
        ]

        unique_paths = self._remove_duplicate_paths(forward_paths + backward_paths)
        return unique_paths[:self.max_paths]

    def _bfs_search(self, start: str, target: str, direction: str = 'forward') -> List[List[str]]:
        """
        Breadth-first search for relation sequences from ``start`` to ``target``.

        Args:
            start: entity the search starts from
            target: entity that terminates a path
            direction: ``'forward'`` follows outgoing edges, anything else
                follows incoming edges

        Returns:
            Relation sequences in traversal order (at most ``max_paths``).
        """
        queue = deque([(start, [])])
        visited = {(start, ())}
        found_paths = []

        while queue:
            current_entity, current_path = queue.popleft()

            # Paths already at the length limit are not expanded further.
            if len(current_path) >= self.max_length:
                continue

            for relation, next_entity in self._get_neighbors(current_entity, direction):
                new_path = current_path + [relation]
                path_key = (next_entity, tuple(new_path))
                if path_key in visited:
                    continue
                visited.add(path_key)

                if next_entity == target:
                    found_paths.append(new_path)
                    if len(found_paths) >= self.max_paths:
                        return found_paths

                # The search continues through the target as well, so longer
                # paths that revisit it can still be found.
                queue.append((next_entity, new_path))

        return found_paths

    def _get_neighbors(self, entity: str, direction: str) -> Set[Tuple]:
        """Return ``{(relation, neighbour)}`` for ``entity`` in the given direction."""
        side = 'out' if direction == 'forward' else 'in'
        # .get() avoids inserting empty sets into the defaultdict on a miss.
        return self.entity_to_relations.get((entity, side), set())

    def _remove_duplicate_paths(self, paths: List[List[str]]) -> List[List[str]]:
        """Drop repeated paths while keeping first-occurrence order."""
        return [list(path) for path in dict.fromkeys(map(tuple, paths))]

    def extract_all_pairs_paths(self, entity_pairs: List[Tuple[str, str]]) -> Dict[Tuple[str, str], List[List[str]]]:
        """
        Extract paths for many entity pairs.

        Args:
            entity_pairs: list of ``(head, tail)`` pairs

        Returns:
            Mapping ``(head, tail) -> paths``; pairs without any path are omitted.
        """
        results = {}
        for head, tail in entity_pairs:
            paths = self.extract_paths(head, tail)
            if paths:
                results[(head, tail)] = paths
        return results


class PathEncoder(nn.Module):
    """Encode a relation path with embeddings, self-attention and mean pooling."""

    def __init__(self, num_relations: int, embedding_dim: int = 100,
                 num_heads: int = 4, dropout: float = 0.1, max_positions: int = 10):
        """
        Args:
            num_relations: number of distinct relations (index ``num_relations``
                is reserved for padding)
            embedding_dim: size of relation and position embeddings
            num_heads: attention heads; must divide ``embedding_dim``
            dropout: dropout applied inside the attention layer
            max_positions: longest path length the position table supports
        """
        super().__init__()
        self.embedding_dim = embedding_dim

        # Relation embeddings; the extra last row is the padding entry.
        self.relation_embeddings = nn.Embedding(
            num_relations + 1,
            embedding_dim,
            padding_idx=num_relations
        )

        # Learned position embeddings.
        self.position_embeddings = nn.Embedding(max_positions, embedding_dim)

        self.attention = nn.MultiheadAttention(
            embed_dim=embedding_dim,
            num_heads=num_heads,
            dropout=dropout,
            batch_first=True
        )

        self.feed_forward = nn.Sequential(
            nn.Linear(embedding_dim, embedding_dim * 2),
            nn.ReLU(),
            nn.Linear(embedding_dim * 2, embedding_dim)
        )

        self.layer_norm1 = nn.LayerNorm(embedding_dim)
        self.layer_norm2 = nn.LayerNorm(embedding_dim)

    def forward(self, path_indices: torch.Tensor, path_lengths: torch.Tensor) -> torch.Tensor:
        """
        Encode a batch of padded paths.

        Args:
            path_indices: relation indices ``[batch_size, max_path_len]``
            path_lengths: true path lengths ``[batch_size]``

        Returns:
            Path encodings ``[batch_size, embedding_dim]``; a zero-length path
            yields a zero vector.

        Raises:
            ValueError: if the padded length exceeds the position table.
        """
        batch_size, max_len = path_indices.shape
        device = path_indices.device
        path_lengths = path_lengths.to(device)

        num_positions = self.position_embeddings.num_embeddings
        if max_len > num_positions:
            raise ValueError(
                f"path length {max_len} exceeds the {num_positions} supported positions"
            )

        relation_embeds = self.relation_embeddings(path_indices)  # [batch, seq_len, dim]

        positions = torch.arange(max_len, device=device)
        position_embeds = self.position_embeddings(positions).unsqueeze(0)  # broadcast over batch
        embeddings = relation_embeds + position_embeds

        attention_mask = self._create_attention_mask(path_lengths, max_len, device)
        if max_len > 0:
            # Position 0 is already unmasked for every non-empty path. Forcing
            # it open also for zero-length rows prevents a fully masked row,
            # which would turn the attention softmax into NaN; those rows are
            # zeroed by the pooling mask below anyway.
            attention_mask[:, 0] = False

        attn_output, _ = self.attention(
            embeddings, embeddings, embeddings,
            key_padding_mask=attention_mask
        )
        embeddings = self.layer_norm1(embeddings + attn_output)

        ff_output = self.feed_forward(embeddings)
        embeddings = self.layer_norm2(embeddings + ff_output)

        # Mean-pool over the real (non-padding) positions only.
        mask = (positions.unsqueeze(0) < path_lengths.unsqueeze(1)).float().unsqueeze(-1)
        masked_embeddings = embeddings * mask
        path_encodings = torch.sum(masked_embeddings, dim=1) / path_lengths.unsqueeze(1).clamp(min=1)

        return path_encodings

    def _create_attention_mask(self, lengths: torch.Tensor, max_len: int, device: torch.device) -> torch.Tensor:
        """Return a bool mask ``[len(lengths), max_len]`` that is True at padding positions."""
        lengths = torch.as_tensor(lengths, device=device)
        positions = torch.arange(max_len, device=device).unsqueeze(0)
        return positions >= lengths.unsqueeze(1)

    def compute_path_similarity(self, head: torch.Tensor, tail: torch.Tensor,
                                path_encoding: torch.Tensor) -> torch.Tensor:
        """
        Score how well ``head + path`` approximates ``tail`` (TransE style).

        Args:
            head: head entity embeddings ``[batch_size, dim]``
            tail: tail entity embeddings ``[batch_size, dim]``
            path_encoding: path encodings ``[batch_size, dim]``

        Returns:
            Negative L2 distance ``[batch_size]`` (higher is more similar).
        """
        predicted_tail = head + path_encoding
        return -torch.norm(predicted_tail - tail, p=2, dim=1)


class PathScorer(nn.Module):
    """Score the paths of each entity pair with an attention mechanism."""

    def __init__(self, embedding_dim: int = 100):
        super().__init__()
        self.embedding_dim = embedding_dim

        # Attention projections
        self.query = nn.Linear(embedding_dim * 3, embedding_dim)
        self.key = nn.Linear(embedding_dim, embedding_dim)
        self.value = nn.Linear(embedding_dim, embedding_dim)

        # Scoring network over [head, path, tail]
        self.scoring_network = nn.Sequential(
            nn.Linear(embedding_dim * 3, embedding_dim),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(embedding_dim, 1)
        )

        # Embedding table for 5 path types
        self.path_type_embeddings = nn.Embedding(5, embedding_dim)

    def forward(self, head_emb: torch.Tensor, tail_emb: torch.Tensor,
                path_encodings: List[torch.Tensor], path_types: torch.Tensor = None) -> Tuple[
        torch.Tensor, List[torch.Tensor]]:
        """
        Score every path of every entity pair in the batch.

        Args:
            head_emb: head entity embeddings ``[batch_size, dim]``
            tail_emb: tail entity embeddings ``[batch_size, dim]``
            path_encodings: one tensor ``[num_paths_i, dim]`` per pair; the
                list is not modified
            path_types: optional type index per real path ``[total_paths]``,
                laid out pair after pair

        Returns:
            scores: concatenated path scores; a pair without paths contributes
                exactly one score computed from an all-zero dummy path
            attention_weights: list with one softmax weight vector per pair
        """
        batch_size = head_emb.shape[0]

        all_scores = []
        all_attention_weights = []
        type_offset = 0  # index of the current pair's first entry in path_types

        for i in range(batch_size):
            paths = path_encodings[i]
            real_count = len(paths)

            if real_count == 0:
                # Use a local dummy path instead of writing it back into the
                # caller's list.
                paths = head_emb.new_zeros(1, self.embedding_dim)
            elif path_types is not None:
                types = path_types[type_offset:type_offset + real_count]
                paths = paths + self.path_type_embeddings(types)
            type_offset += real_count

            num_paths = len(paths)
            h_i = head_emb[i].unsqueeze(0).expand(num_paths, -1)  # [num_paths, dim]
            t_i = tail_emb[i].unsqueeze(0).expand(num_paths, -1)  # [num_paths, dim]

            attention_scores = self._compute_attention(h_i, t_i, paths)  # [num_paths]

            combined = torch.cat([h_i, paths, t_i], dim=1)  # [num_paths, dim*3]
            path_scores = self.scoring_network(combined).squeeze(-1)  # [num_paths]

            # Gate the raw score with the attention score.
            final_scores = path_scores * torch.sigmoid(attention_scores)

            all_scores.append(final_scores)
            all_attention_weights.append(torch.softmax(attention_scores, dim=0))

        # torch.cat rejects an empty list, so an empty batch is handled here.
        scores = torch.cat(all_scores, dim=0) if all_scores else head_emb.new_zeros(0)

        return scores, all_attention_weights

    def _compute_attention(self, head: torch.Tensor, tail: torch.Tensor,
                           paths: torch.Tensor) -> torch.Tensor:
        """Return one attention score per path ``[num_paths]`` for a single pair."""
        query_input = torch.cat([head, tail, paths], dim=1)  # [num_paths, dim*3]
        query = self.query(query_input)  # [num_paths, dim]

        key = self.key(paths)  # [num_paths, dim]
        value = self.value(paths)  # [num_paths, dim]

        # Scaled dot-product attention among the paths of this pair.
        scale = self.embedding_dim ** 0.5
        attention_logits = torch.matmul(query, key.transpose(0, 1)) / scale  # [num_paths, num_paths]
        attention_weights = torch.softmax(attention_logits, dim=1)
        context = torch.matmul(attention_weights, value)  # [num_paths, dim]

        return torch.sum(query * context, dim=1)  # [num_paths]

    def aggregate_path_scores(self, scores: torch.Tensor, path_counts: List[int]) -> torch.Tensor:
        """
        Aggregate the path scores of each entity pair with log-sum-exp.

        Args:
            scores: concatenated path scores ``[total_paths]``
            path_counts: number of consecutive entries in ``scores`` that
                belong to each pair. ``forward`` emits one dummy score for a
                pair without paths, so pass 1 for such pairs when aggregating
                its output directly.

        Returns:
            Aggregated scores ``[len(path_counts)]``; a count of 0 yields 0.
        """
        if not path_counts:
            return scores.new_zeros(0)

        aggregated = []
        start_idx = 0
        for count in path_counts:
            if count == 0:
                aggregated.append(scores.new_zeros(()))
            else:
                pair_scores = scores[start_idx:start_idx + count]
                aggregated.append(torch.logsumexp(pair_scores, dim=0))
            start_idx += count

        return torch.stack(aggregated)


class RelationPredictor(nn.Module):
    """Predict the relation between two entities from their aggregated paths."""

    def __init__(self, num_relations: int, embedding_dim: int = 100):
        super().__init__()
        self.num_relations = num_relations

        # Direct classifier over [head, path, tail]
        self.relation_classifier = nn.Sequential(
            nn.Linear(embedding_dim * 3, embedding_dim * 2),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(embedding_dim * 2, embedding_dim),
            nn.ReLU(),
            nn.Linear(embedding_dim, num_relations)
        )

        # Path encoding -> relation logits
        self.path_to_relation = nn.Sequential(
            nn.Linear(embedding_dim, embedding_dim * 2),
            nn.ReLU(),
            nn.Linear(embedding_dim * 2, num_relations)
        )

        # Compatibility of (head + relation, tail)
        self.compatibility_network = nn.Sequential(
            nn.Linear(embedding_dim * 2, embedding_dim),
            nn.ReLU(),
            nn.Linear(embedding_dim, 1)
        )

        # Dedicated relation embeddings for the compatibility term. The table
        # used to be sliced out of the first classifier layer, whose weight has
        # only ``2 * embedding_dim`` rows, so any graph with more relations
        # than that crashed on a shape mismatch. Registered last so that the
        # initialisation order of the layers above is unchanged.
        # NOTE: this adds a parameter, so checkpoints saved before this change
        # do not load with ``strict=True``.
        self.relation_embeddings = nn.Embedding(num_relations, embedding_dim)

    def forward(self, head_emb: torch.Tensor, tail_emb: torch.Tensor,
                path_encodings: torch.Tensor, candidate_relations: torch.Tensor = None) -> Tuple[torch.Tensor, Dict]:
        """
        Score relations for a batch of entity pairs.

        Args:
            head_emb: head entity embeddings ``[batch_size, dim]``
            tail_emb: tail entity embeddings ``[batch_size, dim]``
            path_encodings: aggregated path encodings ``[batch_size, dim]``
            candidate_relations: optional relation indices
                ``[batch_size, num_candidates]``; when given, only their scores
                are returned

        Returns:
            relation_scores: ``[batch_size, num_relations]`` or
                ``[batch_size, num_candidates]``
            prediction_info: dict with the three component scores and the
                final scores
        """
        batch_size = head_emb.shape[0]

        # Component 1: direct prediction from entities and path.
        combined = torch.cat([head_emb, path_encodings, tail_emb], dim=1)  # [batch_size, dim*3]
        direct_scores = self.relation_classifier(combined)  # [batch_size, num_relations]

        # Component 2: prediction from the path alone.
        path_based_scores = self.path_to_relation(path_encodings)  # [batch_size, num_relations]

        # Component 3: compatibility of head + relation with tail.
        relation_embeddings = self._get_relation_embeddings()  # [num_relations, dim]
        relation_embeddings = relation_embeddings.unsqueeze(0).expand(batch_size, -1, -1)

        head_expanded = head_emb.unsqueeze(1).expand(-1, self.num_relations, -1)
        tail_expanded = tail_emb.unsqueeze(1).expand(-1, self.num_relations, -1)

        compatibility_input = torch.cat([
            head_expanded + relation_embeddings,
            tail_expanded
        ], dim=-1)  # [batch_size, num_relations, dim*2]

        compatibility_scores = self.compatibility_network(compatibility_input).squeeze(-1)

        # Unweighted mean of the three components.
        final_scores = (direct_scores + path_based_scores + compatibility_scores) / 3.0

        if candidate_relations is not None:
            final_scores = torch.gather(final_scores, 1, candidate_relations)

        prediction_info = {
            'direct_scores': direct_scores,
            'path_based_scores': path_based_scores,
            'compatibility_scores': compatibility_scores,
            'final_scores': final_scores
        }

        return final_scores, prediction_info

    def _get_relation_embeddings(self) -> torch.Tensor:
        """Return the relation embedding table ``[num_relations, dim]``."""
        return self.relation_embeddings.weight

    def predict_with_explanation(self, head: str, tail: str, paths: List[List[str]],
                                 head_emb: torch.Tensor, tail_emb: torch.Tensor,
                                 path_encodings: torch.Tensor, top_k: int = 5) -> Dict:
        """
        Predict relations for one entity pair and explain the result.

        Args:
            head: head entity name
            tail: tail entity name
            paths: relation paths found between the two entities
            head_emb: head entity embedding ``[dim]``
            tail_emb: tail entity embedding ``[dim]``
            path_encodings: aggregated path encoding ``[dim]``
            top_k: number of top predictions to report

        Returns:
            Dict with the top predictions, per-path contributions and the mean
            of each score component.
        """
        relation_scores, pred_info = self(
            head_emb.unsqueeze(0),
            tail_emb.unsqueeze(0),
            path_encodings.unsqueeze(0)
        )

        scores = relation_scores.squeeze(0)
        top_scores, top_indices = torch.topk(scores, min(top_k, len(scores)))

        path_contributions = self._analyze_path_contributions(
            head_emb, tail_emb, path_encodings, paths, top_indices
        )

        explanation = {
            'head_entity': head,
            'tail_entity': tail,
            'num_paths_found': len(paths),
            'top_predictions': [
                {
                    'relation_id': idx.item(),
                    'score': score.item(),
                    # Sigmoid of the raw logit, not a softmax probability.
                    'confidence': torch.sigmoid(score).item()
                }
                for idx, score in zip(top_indices, top_scores)
            ],
            'path_contributions': path_contributions,
            'prediction_components': {
                'direct_component': pred_info['direct_scores'].mean().item(),
                'path_based_component': pred_info['path_based_scores'].mean().item(),
                'compatibility_component': pred_info['compatibility_scores'].mean().item()
            }
        }

        return explanation

    def _analyze_path_contributions(self, head_emb: torch.Tensor, tail_emb: torch.Tensor,
                                    path_encodings: torch.Tensor, paths: List[List[str]],
                                    relation_indices: torch.Tensor) -> List[Dict]:
        """
        Estimate how strongly each path supports each candidate relation.

        Args:
            head_emb: head entity embedding ``[dim]``
            tail_emb: tail entity embedding ``[dim]``
            path_encodings: either one aggregated encoding ``[dim]`` shared by
                all paths, or one encoding per path ``[len(paths), dim]``
            paths: relation paths (used for display and counting)
            relation_indices: candidate relation indices ``[k]``

        Returns:
            One dict per path with its support score for every candidate.

        Raises:
            ValueError: if per-path encodings do not match ``len(paths)``.
        """
        per_path = path_encodings.dim() > 1
        if per_path and path_encodings.shape[0] != len(paths):
            raise ValueError(
                f"got {path_encodings.shape[0]} path encodings for {len(paths)} paths"
            )

        relation_ids = relation_indices.tolist()
        rel_embs = self._get_relation_embeddings()[relation_indices]  # [k, dim]
        # The TransE-style compatibility does not depend on the path, so it is
        # computed once for all candidates.
        compatibility = -torch.norm(
            head_emb.reshape(1, -1) + rel_embs - tail_emb.reshape(1, -1), p=2, dim=1
        )  # [k]

        contributions = []
        for i, path in enumerate(paths):
            path_emb = (path_encodings[i] if per_path else path_encodings).reshape(1, -1)
            path_similarity = torch.cosine_similarity(path_emb, rel_embs, dim=1)  # [k]
            path_scores = ((compatibility + path_similarity) / 2).detach().tolist()

            if path_scores:
                best = max(range(len(path_scores)), key=path_scores.__getitem__)
                most_supported = relation_ids[best]
            else:
                most_supported = None

            contributions.append({
                'path': ' → '.join(path),
                'length': len(path),
                'relation_support': dict(zip(relation_ids, path_scores)),
                'most_supported_relation': most_supported
            })

        return contributions


# NOTE(review): these project-local imports are kept at the position the
# original file had them. When the packages are importable, the names bound
# here replace the same-named classes defined above (PathExtractor,
# PathEncoder, PathScorer, RelationPredictor, Config). TransE, KnowledgeGraph
# and calculate_metrics are only available through these imports.
from operators.path_extractor import PathExtractor  # noqa: E402
from operators.path_encoder import PathEncoder  # noqa: E402
from operators.path_scorer import PathScorer  # noqa: E402
from operators.relation_predictor import RelationPredictor  # noqa: E402
from models.transE import TransE  # noqa: E402
from utils.graph_utils import KnowledgeGraph  # noqa: E402
from utils.metrics import calculate_metrics  # noqa: E402
from config import Config  # noqa: E402

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class KGDataset(Dataset):
    """Dataset view over knowledge-graph triples."""

    def __init__(self, triplets: List[Tuple], entity_vocab: Dict, relation_vocab: Dict):
        self.triplets = triplets
        self.entity_vocab = entity_vocab
        self.relation_vocab = relation_vocab

    def __len__(self):
        return len(self.triplets)

    def __getitem__(self, idx):
        """Return the indexed and the raw string form of triple ``idx``."""
        h, r, t = self.triplets[idx]
        return {
            'head': self.entity_vocab[h],
            'relation': self.relation_vocab[r],
            'tail': self.entity_vocab[t],
            'head_str': h,
            'relation_str': r,
            'tail_str': t
        }


class PathReasoningModel(nn.Module):
    """Full model: entity embeddings, path encoder, path scorer, relation predictor."""

    def __init__(self, num_entities: int, num_relations: int, config: Config):
        super().__init__()
        self.config = config

        self.transE = TransE(num_entities, num_relations, config.EMBEDDING_DIM)
        self.path_encoder = PathEncoder(num_relations, config.EMBEDDING_DIM)
        self.path_scorer = PathScorer(config.EMBEDDING_DIM)
        self.relation_predictor = RelationPredictor(num_relations, config.EMBEDDING_DIM)

    def forward(self, head_ids: torch.Tensor, tail_ids: torch.Tensor,
                paths_list: List[List[List[int]]], path_lengths_list: List[List[int]]):
        """
        Run the model on a batch of entity pairs.

        Args:
            head_ids: head entity indices ``[batch_size]``
            tail_ids: tail entity indices ``[batch_size]``
            paths_list: per pair, a list of paths given as relation indices
            path_lengths_list: per pair, the length of each of its paths

        Returns:
            Dict with ``relation_scores``, ``path_scores``,
            ``attention_weights``, ``prediction_info`` and
            ``aggregated_path_embeddings``.
        """
        head_emb = self.transE.entity_embeddings(head_ids)
        tail_emb = self.transE.entity_embeddings(tail_ids)
        device = head_emb.device
        dim = self.config.EMBEDDING_DIM

        # The last row of the relation table is the padding entry.
        pad_index = self.path_encoder.relation_embeddings.num_embeddings - 1

        batch_path_encodings = []
        for paths, lengths in zip(paths_list, path_lengths_list):
            if len(paths) == 0:
                batch_path_encodings.append(head_emb.new_zeros(0, dim))
                continue

            max_len = max(lengths)
            padded_paths = [path + [pad_index] * (max_len - len(path)) for path in paths]

            # Create the inputs on the model's device rather than the default one.
            paths_tensor = torch.tensor(padded_paths, dtype=torch.long, device=device)
            lengths_tensor = torch.tensor(lengths, dtype=torch.long, device=device)

            batch_path_encodings.append(self.path_encoder(paths_tensor, lengths_tensor))

        path_scores, attention_weights = self.path_scorer(
            head_emb, tail_emb, batch_path_encodings
        )

        # Attention-weighted sum of each pair's path encodings.
        aggregated_path_encodings = []
        for encodings, weights in zip(batch_path_encodings, attention_weights):
            if len(encodings) == 0:
                aggregated = head_emb.new_zeros(dim)
            else:
                aggregated = torch.sum(encodings * weights.unsqueeze(-1), dim=0)
            aggregated_path_encodings.append(aggregated)

        aggregated_embeddings = torch.stack(aggregated_path_encodings)

        relation_scores, pred_info = self.relation_predictor(
            head_emb, tail_emb, aggregated_embeddings
        )

        return {
            'relation_scores': relation_scores,
            'path_scores': path_scores,
            'attention_weights': attention_weights,
            'prediction_info': pred_info,
            'aggregated_path_embeddings': aggregated_embeddings
        }


class PathReasoningPipeline:
    """Data loading, path preparation, training, evaluation and prediction."""

    def __init__(self, config: Config):
        self.config = config
        self.model = None
        self.path_extractor = None
        self.entity_vocab = None
        self.relation_vocab = None

    @staticmethod
    def _read_triplets(path: str) -> List[Tuple[str, str, str]]:
        """Read tab-separated ``head, relation, tail`` lines, skipping blank lines."""
        triplets = []
        with open(path, 'r', encoding='utf-8') as f:
            for line_no, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                fields = line.split('\t')
                if len(fields) != 3:
                    raise ValueError(
                        f"{path}:{line_no}: expected 3 tab-separated fields, got {len(fields)}"
                    )
                triplets.append(tuple(fields))
        return triplets

    def load_data(self, data_path: str):
        """
        Load the train/valid/test triples and build the vocabularies.

        Args:
            data_path: directory containing ``train.txt``, ``valid.txt`` and
                ``test.txt``

        Returns:
            ``(train_triplets, valid_triplets, test_triplets)``

        Raises:
            ValueError: if a non-blank line does not have three fields.
        """
        logger.info("加载数据...")

        train_triplets = self._read_triplets(os.path.join(data_path, "train.txt"))
        valid_triplets = self._read_triplets(os.path.join(data_path, "valid.txt"))
        test_triplets = self._read_triplets(os.path.join(data_path, "test.txt"))

        all_entities = set()
        all_relations = set()
        for triplets in (train_triplets, valid_triplets, test_triplets):
            for h, r, t in triplets:
                all_entities.add(h)
                all_entities.add(t)
                all_relations.add(r)

        # Sort before numbering: set iteration order of strings changes between
        # interpreter runs, which would make the ids stored in a saved
        # checkpoint meaningless in a later process.
        self.entity_vocab = {e: i for i, e in enumerate(sorted(all_entities))}
        self.relation_vocab = {r: i for i, r in enumerate(sorted(all_relations))}

        logger.info(f"实体数量: {len(self.entity_vocab)}")
        logger.info(f"关系数量: {len(self.relation_vocab)}")
        logger.info(f"训练三元组: {len(train_triplets)}")
        logger.info(f"验证三元组: {len(valid_triplets)}")
        logger.info(f"测试三元组: {len(test_triplets)}")

        return train_triplets, valid_triplets, test_triplets

    def prepare_paths(self, triplets: List[Tuple]) -> Dict:
        """
        Extract and index the paths between the entity pairs of ``triplets``.

        The graph is built from ``triplets`` themselves and
        ``self.path_extractor`` is replaced on every call.

        NOTE(review): because each queried pair's own edge is part of that
        graph, the one-hop path made of the triple's relation is always among
        the extracted paths, which leaks the label into the input. Confirm
        whether this is intended before trusting the reported metrics.

        Args:
            triplets: triples whose ``(head, tail)`` pairs are queried

        Returns:
            Mapping ``(head_index, tail_index) -> list of relation-index paths``.
        """
        logger.info("提取路径...")

        self.path_extractor = PathExtractor(
            triplets,
            max_length=self.config.MAX_PATH_LENGTH,
            max_paths=self.config.MAX_PATHS_PER_PAIR
        )

        # Query each distinct pair only once; the result is keyed by pair.
        entity_pairs = list(dict.fromkeys((h, t) for h, _, t in triplets))

        paths_dict = self.path_extractor.extract_all_pairs_paths(entity_pairs)

        indexed_paths = {}
        for (h, t), paths in paths_dict.items():
            indexed_paths[(self.entity_vocab[h], self.entity_vocab[t])] = [
                [self.relation_vocab[r] for r in path]
                for path in paths
            ]

        logger.info(f"提取了 {len(indexed_paths)} 个实体对的路径")

        return indexed_paths

    def _make_batch(self, triplets: List[Tuple], paths: Dict):
        """Turn triples into ``(head, tail, relation)`` tensors plus per-pair path lists."""
        head_ids, tail_ids, relation_ids = [], [], []
        paths_list, path_lengths_list = [], []

        for h, r, t in triplets:
            h_idx = self.entity_vocab[h]
            t_idx = self.entity_vocab[t]

            head_ids.append(h_idx)
            tail_ids.append(t_idx)
            relation_ids.append(self.relation_vocab[r])

            pair_paths = paths.get((h_idx, t_idx), [])
            paths_list.append(pair_paths)
            path_lengths_list.append([len(p) for p in pair_paths])

        return (
            torch.tensor(head_ids, dtype=torch.long),
            torch.tensor(tail_ids, dtype=torch.long),
            torch.tensor(relation_ids, dtype=torch.long),
            paths_list,
            path_lengths_list,
        )

    def train(self, train_triplets: List[Tuple], valid_triplets: List[Tuple],
              train_paths: Dict, valid_paths: Dict):
        """
        Train the model with early stopping on the validation loss.

        The best model is saved to ``<MODEL_DIR>/best_model.pt``.

        Args:
            train_triplets: training triples
            valid_triplets: validation triples
            train_paths: indexed paths for the training pairs
            valid_paths: indexed paths for the validation pairs
        """
        logger.info("开始训练模型...")

        self.model = PathReasoningModel(
            len(self.entity_vocab),
            len(self.relation_vocab),
            self.config
        )

        optimizer = optim.Adam(self.model.parameters(), lr=self.config.LEARNING_RATE)
        criterion = nn.CrossEntropyLoss()

        # torch.save does not create missing directories.
        os.makedirs(self.config.MODEL_DIR, exist_ok=True)

        best_val_loss = float('inf')
        patience = 10
        patience_counter = 0

        batch_size = self.config.BATCH_SIZE
        num_batches = (len(train_triplets) + batch_size - 1) // batch_size

        for epoch in range(self.config.EPOCHS):
            self.model.train()
            total_loss = 0

            for batch_idx in range(num_batches):
                start_idx = batch_idx * batch_size
                batch_triplets = train_triplets[start_idx:start_idx + batch_size]

                head_tensor, tail_tensor, relation_tensor, paths_list, path_lengths_list = \
                    self._make_batch(batch_triplets, train_paths)

                outputs = self.model(head_tensor, tail_tensor, paths_list, path_lengths_list)
                loss = criterion(outputs['relation_scores'], relation_tensor)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

                if batch_idx % 100 == 0:
                    logger.info(f"Epoch {epoch + 1}, Batch {batch_idx}/{num_batches}, Loss: {loss.item():.4f}")

            # max(..., 1) keeps an empty training set from dividing by zero.
            avg_train_loss = total_loss / max(num_batches, 1)
            logger.info(f"Epoch {epoch + 1}, Average Train Loss: {avg_train_loss:.4f}")

            val_loss = self.validate(valid_triplets, valid_paths, criterion)
            logger.info(f"Epoch {epoch + 1}, Validation Loss: {val_loss:.4f}")

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                torch.save(self.model.state_dict(), f"{self.config.MODEL_DIR}/best_model.pt")
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    logger.info(f"早停在 epoch {epoch + 1}")
                    break

        logger.info("训练完成")

    def validate(self, triplets: List[Tuple], paths: Dict, criterion) -> float:
        """
        Return the mean per-triple loss over ``triplets``.

        Args:
            triplets: triples to evaluate
            paths: indexed paths for their entity pairs
            criterion: loss taking ``(scores, target)``

        Returns:
            Mean loss, or 0.0 when ``triplets`` is empty.
        """
        if not triplets:
            return 0.0

        self.model.eval()
        total_loss = 0

        with torch.no_grad():
            for triplet in triplets:
                head_tensor, tail_tensor, relation_tensor, paths_list, path_lengths_list = \
                    self._make_batch([triplet], paths)

                outputs = self.model(head_tensor, tail_tensor, paths_list, path_lengths_list)
                loss = criterion(outputs['relation_scores'], relation_tensor)
                total_loss += loss.item()

        return total_loss / len(triplets)

    def predict(self, head: str, tail: str, top_k: int = 5) -> Dict:
        """
        Predict the relation between two known entities.

        Args:
            head: head entity name
            tail: tail entity name
            top_k: number of predictions to return

        Returns:
            Explanation dict produced by the relation predictor.

        Raises:
            RuntimeError: if the model or the path extractor is not set up yet.
            ValueError: if either entity is not in the vocabulary.
        """
        if self.model is None or self.path_extractor is None:
            raise RuntimeError("predict() requires a trained model and prepared paths")
        if head not in self.entity_vocab or tail not in self.entity_vocab:
            raise ValueError(f"实体 {head} 或 {tail} 不在词汇表中")

        self.model.eval()

        h_idx = self.entity_vocab[head]
        t_idx = self.entity_vocab[tail]

        paths = self.path_extractor.extract_paths(head, tail)
        indexed_paths = [[self.relation_vocab[r] for r in path] for path in paths]

        paths_list = [indexed_paths]
        path_lengths_list = [[len(p) for p in indexed_paths]]

        head_tensor = torch.tensor([h_idx], dtype=torch.long)
        tail_tensor = torch.tensor([t_idx], dtype=torch.long)

        with torch.no_grad():
            outputs = self.model(head_tensor, tail_tensor, paths_list, path_lengths_list)

            head_emb = self.model.transE.entity_embeddings(head_tensor).squeeze(0)
            tail_emb = self.model.transE.entity_embeddings(tail_tensor).squeeze(0)
            path_encodings = outputs['aggregated_path_embeddings'].squeeze(0)

            explanation = self.model.relation_predictor.predict_with_explanation(
                head, tail, paths, head_emb, tail_emb, path_encodings, top_k
            )

        return explanation

    def evaluate(self, test_triplets: List[Tuple], test_paths: Dict) -> Dict:
        """
        Evaluate the model on ``test_triplets``.

        Args:
            test_triplets: triples to evaluate
            test_paths: indexed paths for their entity pairs

        Returns:
            Whatever ``calculate_metrics(predictions, labels)`` returns.
        """
        logger.info("评估模型...")

        self.model.eval()
        all_predictions = []
        all_labels = []

        with torch.no_grad():
            for triplet in test_triplets:
                head_tensor, tail_tensor, relation_tensor, paths_list, path_lengths_list = \
                    self._make_batch([triplet], test_paths)

                outputs = self.model(head_tensor, tail_tensor, paths_list, path_lengths_list)

                pred_probs = torch.softmax(outputs['relation_scores'], dim=1)
                pred_label = torch.argmax(pred_probs, dim=1)

                all_predictions.append(pred_label.item())
                all_labels.append(relation_tensor.item())

        metrics = calculate_metrics(all_predictions, all_labels)

        logger.info(f"评估结果: {metrics}")
        return metrics


def main():
    """Run the whole experiment: load, extract paths, train, evaluate, predict."""
    config = Config()
    pipeline = PathReasoningPipeline(config)

    train_triplets, valid_triplets, test_triplets = pipeline.load_data(config.DATA_DIR)

    logger.info("准备训练路径...")
    train_paths = pipeline.prepare_paths(train_triplets)

    logger.info("准备验证路径...")
    valid_paths = pipeline.prepare_paths(valid_triplets)

    # After this call pipeline.path_extractor is built from the test triples,
    # which is the graph the sample prediction below searches.
    logger.info("准备测试路径...")
    test_paths = pipeline.prepare_paths(test_triplets)

    pipeline.train(train_triplets, valid_triplets, train_paths, valid_paths)

    # Restore the best checkpoint; none exists if the validation loss never
    # improved (for example when it was NaN throughout).
    best_model_path = f"{config.MODEL_DIR}/best_model.pt"
    if os.path.exists(best_model_path):
        pipeline.model.load_state_dict(torch.load(best_model_path))
    else:
        logger.warning("no checkpoint at %s; evaluating the last model state", best_model_path)

    metrics = pipeline.evaluate(test_triplets, test_paths)

    os.makedirs(config.MODEL_DIR, exist_ok=True)
    with open(f"{config.MODEL_DIR}/evaluation_results.json", 'w', encoding='utf-8') as f:
        json.dump(metrics, f, indent=2)

    if len(test_triplets) > 0:
        head, true_rel, tail = test_triplets[0]

        logger.info("\n示例预测:")
        logger.info(f"头实体: {head}")
        logger.info(f"尾实体: {tail}")
        logger.info(f"真实关系: {true_rel}")

        try:
            prediction = pipeline.predict(head, tail, top_k=3)

            # Invert the vocabulary once instead of scanning it per prediction.
            id_to_relation = {v: k for k, v in pipeline.relation_vocab.items()}

            logger.info("预测结果:")
            for pred in prediction['top_predictions']:
                rel_name = id_to_relation[pred['relation_id']]
                logger.info(f" 关系: {rel_name}, 得分: {pred['score']:.4f}, 置信度: {pred['confidence']:.4f}")

            logger.info(f"找到的路径数: {prediction['num_paths_found']}")

            with open(f"{config.MODEL_DIR}/prediction_example.json", 'w', encoding='utf-8') as f:
                json.dump(prediction, f, indent=2, default=str)

        except Exception as e:
            # The sample prediction is best-effort; keep the traceback in the log.
            logger.exception(f"预测失败: {e}")

    logger.info("实验完成")


if __name__ == "__main__":
    main()