diff --git a/aikg/python/ai_kernel_generator/core/agent/conductor.py b/aikg/python/ai_kernel_generator/core/agent/conductor.py index a52c657fb5a7b119e0dfdb82a2358c31986bdcec..43762f5b077b139d875eea9fc00610cae1e5689a 100644 --- a/aikg/python/ai_kernel_generator/core/agent/conductor.py +++ b/aikg/python/ai_kernel_generator/core/agent/conductor.py @@ -131,7 +131,7 @@ class Conductor(AgentBase): return ResultProcessor.get_agent_parser(agent_name, self.workflow_config_path, self.agent_parsers) def record_agent_execution(self, agent_name: str, result: str, prompt: str = "", reasoning: str = "", - error_log: str = "", profile_res=()) -> bool: + error_log: str = "", profile_res: dict = None) -> bool: """ 记录agent执行结果,进行解析并更新任务信息 @@ -141,7 +141,11 @@ class Conductor(AgentBase): prompt: 使用的prompt reasoning: 推理过程 error_log: 错误日志(主要用于verifier) - profile_res: 性能分析结果(主要用于verifier) + profile_res: 性能分析结果字典(主要用于verifier),包含: + - gen_time: 生成代码执行时间(微秒) + - base_time: 基准代码执行时间(微秒) + - speedup: 加速比 + - autotune_summary: autotune配置详情(可选,仅triton+ascend) Returns: bool: 解析是否成功(对于不需要解析器的agent返回True) diff --git a/aikg/python/ai_kernel_generator/core/agent/designer.py b/aikg/python/ai_kernel_generator/core/agent/designer.py index 5f94910d240d86c619e5db59921c6197400bd05e..9b55ae0ed19b47c6d42b8ce6784496100907b488 100644 --- a/aikg/python/ai_kernel_generator/core/agent/designer.py +++ b/aikg/python/ai_kernel_generator/core/agent/designer.py @@ -31,7 +31,18 @@ def get_inspirations(inspirations: List[dict]) -> str: Args: inspirations: 包含字典的列表,每个字典格式为: - {'strategy_mode':xxx, 'impl_code':str, 'profile':float, 'is_parent':bool} + { + 'strategy_mode': str, + 'impl_code': str, + 'sketch': str, + 'profile': { + 'gen_time': float, + 'base_time': float, + 'speedup': float, + 'autotune_summary': str (可选,仅triton+ascend) + }, + 'is_parent': bool + } Returns: str: 拼接后的字符串,包含所有impl_code和profile信息 @@ -49,7 +60,7 @@ def get_inspirations(inspirations: List[dict]) -> str: sketch = inspiration.get('sketch', '') 
impl_code = inspiration.get('impl_code', '') - profile = inspiration.get('profile', float('inf')) + profile = inspiration.get('profile', {}) is_parent = inspiration.get('is_parent', False) # 检测是否有父代 @@ -57,14 +68,19 @@ def get_inspirations(inspirations: List[dict]) -> str: has_parent = True if sketch or impl_code: # 只有当sketch或impl_code不为空时才添加 - # 处理profile信息,支持三元组格式 - if isinstance(profile, (list, tuple)) and len(profile) >= 3: - gen_time, base_time, speedup = profile[0], profile[1], profile[2] + # 处理profile信息(dict格式) + gen_time = profile.get('gen_time', float('inf')) + base_time = profile.get('base_time', 0.0) + speedup = profile.get('speedup', 0.0) + autotune_summary = profile.get('autotune_summary', '') + + if gen_time != float('inf'): profile_text = f"根据此方案草图生成的代码计算耗时: {gen_time:.4f}us, 基准代码耗时: {base_time:.4f}us, 加速比: {speedup:.2f}x" - elif isinstance(profile, (list, tuple)) and len(profile) >= 1: - profile_text = f"代码执行耗时: {profile[0]:.4f}us" + # 如果有autotune信息,添加到profile_text + if autotune_summary: + profile_text += f"\n\nAutotune配置详情:\n{autotune_summary}" else: - profile_text = f"代码执行耗时: {profile:.4f}us" if profile != float('inf') else "代码执行耗时: N/A" + profile_text = "代码执行耗时: N/A" # 如果是父代,添加标记 parent_mark = " 【父代方案】" if is_parent else "" diff --git a/aikg/python/ai_kernel_generator/core/evolve.py b/aikg/python/ai_kernel_generator/core/evolve.py index 390f0b77830dcd7b1bd3b69688075ab5a042f2b6..338d781334baed3b49a761dd062e536258fbea83 100644 --- a/aikg/python/ai_kernel_generator/core/evolve.py +++ b/aikg/python/ai_kernel_generator/core/evolve.py @@ -203,7 +203,11 @@ async def evolve( 'id': parent_implementation.get('id'), 'sketch': parent_implementation.get('sketch', ''), 'impl_code': parent_implementation.get('impl_code', ''), - 'profile': parent_implementation.get('profile', (float('inf'), 0.0, 0.0)), + 'profile': parent_implementation.get('profile', { + 'gen_time': float('inf'), + 'base_time': 0.0, + 'speedup': 0.0 + }), 'strategy_mode': 'evolution', 
'is_parent': True # 标记为父代 } @@ -325,8 +329,12 @@ async def evolve( total_successful_tasks += 1 island_success_count += 1 - # 获取完整的profile三元组 - profile_res = task_info.get("profile_res", (float('inf'), 0.0, 0.0)) + # 获取性能分析结果字典 + profile_res = task_info.get("profile_res", { + 'gen_time': float('inf'), + 'base_time': 0.0, + 'speedup': 0.0 + }) # 收集成功的实现信息 impl_info = { @@ -387,9 +395,8 @@ async def evolve( # 添加到精英池(新生成的实现都有唯一ID,无需去重) elite_pool.extend(island_impls_list) - # 按性能排序精英池 - elite_pool.sort(key=lambda x: x['profile'][0] if isinstance( - x['profile'], (list, tuple)) else x['profile']) + # 按性能排序精英池(gen_time越小越好) + elite_pool.sort(key=lambda x: x.get('profile', {}).get('gen_time', float('inf'))) # 保持精英库大小限制 elite_pool = elite_pool[:elite_size * num_islands] @@ -427,8 +434,12 @@ async def evolve( total_successful_tasks += 1 round_success_count += 1 - # 获取完整的profile三元组 - profile_res = task_info.get("profile_res", (float('inf'), 0.0, 0.0)) + # 获取性能分析结果字典 + profile_res = task_info.get("profile_res", { + 'gen_time': float('inf'), + 'base_time': 0.0, + 'speedup': 0.0 + }) # 收集成功的实现信息 impl_info = { @@ -507,8 +518,7 @@ async def evolve( [(f"failed_task_{i}", False) for i in range(round_total_count - round_success_count)]) # 按性能排序最佳实现(gen_time越小越好) - best_implementations.sort(key=lambda x: x['profile'][0] if isinstance( - x['profile'], (list, tuple)) else x['profile']) + best_implementations.sort(key=lambda x: x.get('profile', {}).get('gen_time', float('inf'))) # 计算最终成功率 final_success_rate = total_successful_tasks / total_tasks if total_tasks > 0 else 0.0 diff --git a/aikg/python/ai_kernel_generator/core/task.py b/aikg/python/ai_kernel_generator/core/task.py index c7b235c0401fe795fcc3a65a65076ddc6579a5a9..d270876e20d957ea540b174bcfb2e66d1607324e 100644 --- a/aikg/python/ai_kernel_generator/core/task.py +++ b/aikg/python/ai_kernel_generator/core/task.py @@ -246,7 +246,7 @@ class Task: self.verifier.run, self.conductor.task_info, current_step, device_id ) - profile_res = 
() + profile_res = {} if verify_res and self.task_type == "profile" and self.backend in ["ascend", "cuda"]: profile_settings = self.config.get("profile_settings", {}) profile_res = await loop.run_in_executor( diff --git a/aikg/python/ai_kernel_generator/core/trace.py b/aikg/python/ai_kernel_generator/core/trace.py index b76c444663a98e764df5739ea468f097ab591aee..960879c1673a043c188fa2d37d2d7a383afc7ddc 100644 --- a/aikg/python/ai_kernel_generator/core/trace.py +++ b/aikg/python/ai_kernel_generator/core/trace.py @@ -17,7 +17,8 @@ 只负责存储原始数据,不进行解析逻辑 """ import os -from dataclasses import dataclass +from dataclasses import dataclass, field +from typing import Optional @dataclass @@ -28,7 +29,7 @@ class AgentRecord: prompt: str = "" reasoning: str = "" error_log: str = "" - profile_res: tuple = () + profile_res: dict = field(default_factory=dict) class Trace: @@ -57,7 +58,7 @@ class Trace: f.write(str(content)) def insert_agent_record(self, agent_name: str, result: str = "", prompt: str = "", reasoning: str = "", - error_log: str = "", profile_res: tuple = ()) -> None: + error_log: str = "", profile_res: Optional[dict] = None) -> None: """ 插入agent执行记录(只保存原始数据,不进行解析) @@ -67,8 +68,11 @@ class Trace: prompt: 使用的prompt reasoning: 推理过程 error_log: 错误日志(主要用于verifier) - profile: 性能数据 + profile_res: 性能数据字典 """ + if profile_res is None: + profile_res = {} + record = AgentRecord( agent_name=agent_name, result=result, diff --git a/aikg/python/ai_kernel_generator/core/verifier/kernel_verifier.py b/aikg/python/ai_kernel_generator/core/verifier/kernel_verifier.py index 482e7f14997088feee7c3e7c9a3024646fb8c5af..c6f7e69b441d5811c1d17111eff217862460264f 100644 --- a/aikg/python/ai_kernel_generator/core/verifier/kernel_verifier.py +++ b/aikg/python/ai_kernel_generator/core/verifier/kernel_verifier.py @@ -426,8 +426,16 @@ class KernelVerifier: except Exception as e: logger.warning(f"[{self.task_id}:{self.op_name}] 保存加速比结果失败: {str(e)}") - def run_profile(self, current_step: int = 0, 
device_id: str = "0", profile_settings: dict = {}): - """运行profile分析""" + def run_profile(self, current_step: int = 0, device_id: str = "0", profile_settings: dict = {}) -> dict: + """运行profile分析 + + Returns: + dict: 性能分析结果,包含以下字段: + - gen_time: 生成代码执行时间(微秒) + - base_time: 基准代码执行时间(微秒) + - speedup: 加速比 + - autotune_summary: autotune配置详情(仅triton DSL) + """ original_cwd = os.getcwd() try: run_times = profile_settings.get("run_times", 50) @@ -443,9 +451,9 @@ class KernelVerifier: # 生成profile脚本并运行 self.gen_profile_project(verify_dir, device_id, warmup_times, run_times) - # 检查是否为triton DSL,如果是则使用do_bench + # 检查是否为triton DSL,如果是则运行脚本获取性能数据 if "triton" in self.dsl: - base_time, gen_time = self.run_triton_do_bench_profile(verify_dir) + base_time, gen_time = self.run_profile_scripts_and_collect_results(verify_dir) elif self.backend == "ascend": _, _, base_prof_path = self.run_msprof(os.path.join(verify_dir, f"profile_{self.op_name}_base.py")) _, _, gen_prof_path = self.run_msprof(os.path.join( @@ -458,11 +466,15 @@ class KernelVerifier: _, _, gen_prof_path = self.run_nsys(os.path.join(verify_dir, f"profile_{self.op_name}_generation.py")) _, _, gen_time = self.analyze_nsys_data(gen_prof_path, warmup_times, run_times, "generation") elif self.backend == "cpu": - # 走的triton验证流程 - base_time, gen_time = self.run_triton_do_bench_profile(verify_dir) + # CPU后端使用脚本方式收集性能数据 + base_time, gen_time = self.run_profile_scripts_and_collect_results(verify_dir) else: logger.warning(f"[{self.task_id}:{self.op_name}] 不支持的backend: {self.backend}") - return float('inf'), 0.0, 0.0 + return { + 'gen_time': float('inf'), + 'base_time': 0.0, + 'speedup': 0.0 + } speedup = base_time / gen_time if gen_time > 0 else 0.0 speedup_percent = speedup * 100.0 @@ -470,10 +482,29 @@ class KernelVerifier: logger.info(f"orig performance is {base_time:.2f} us") logger.info(f"aikg performance is {gen_time:.2f} us") logger.info(f"[{self.task_id}:{self.op_name}] 性能分析完成,加速比(基准为100%): {speedup_percent:.2f} %") - return 
gen_time, base_time, speedup + + # 构建返回结果 + result = { + 'gen_time': gen_time, + 'base_time': base_time, + 'speedup': speedup + } + + # 只在 triton + ascend 情况下添加 autotune_summary + if "triton" in self.dsl and self.backend == "ascend": + autotune_summary = self.read_autotune_results_from_directory(verify_dir) + if autotune_summary: + result['autotune_summary'] = autotune_summary + logger.info(f"[{self.op_name}: {self.task_id}] Autotune配置详情:\n{autotune_summary}") + + return result except Exception as e: logger.warning(f"[{self.task_id}:{self.op_name}] 性能分析失败: {str(e)}") - return float('inf'), 0.0, 0.0 + return { + 'gen_time': float('inf'), + 'base_time': 0.0, + 'speedup': 0.0 + } finally: # 恢复原始工作目录 try: @@ -481,11 +512,11 @@ class KernelVerifier: except Exception: pass - def run_triton_do_bench_profile(self, verify_dir: str) -> Tuple[float, float]: - """使用triton do_bench运行性能分析 + def run_profile_scripts_and_collect_results(self, verify_dir: str) -> Tuple[float, float]: + """运行性能测试脚本并收集结果 Args: - verify_dir: 验证目录 + verify_dir: 验证目录,包含性能测试脚本 Returns: (base_time_us, gen_time_us): 基准时间和生成时间(微秒) @@ -498,23 +529,23 @@ class KernelVerifier: os.chdir(verify_dir) try: - # 运行base profile脚本 + # 步骤1:运行基准性能测试脚本 base_script = f"profile_{self.op_name}_base.py" base_result = run_command(["python", base_script], cmd_msg="base_profile", timeout=300) if not base_result[0]: - logger.error(f"Base profile script execution failed: {base_result[1]}") + logger.error(f"[{self.op_name}: {self.task_id}] 基准性能脚本执行失败: {base_result[1]}") return float('inf'), float('inf') - # 运行generation profile脚本 + # 步骤2:运行生成代码性能测试脚本 gen_script = f"profile_{self.op_name}_generation.py" gen_result = run_command(["python", gen_script], cmd_msg="generation_profile", timeout=300) if not gen_result[0]: - logger.error(f"Generation profile script execution failed: {gen_result[1]}") + logger.error(f"[{self.op_name}: {self.task_id}] 生成代码性能脚本执行失败: {gen_result[1]}") return float('inf'), float('inf') - # 读取保存的时间结果 - 
base_time_us = self.read_triton_profile_result(verify_dir, "base_profile_result.json") - gen_time_us = self.read_triton_profile_result(verify_dir, "generation_profile_result.json") + # 步骤3:从JSON文件读取性能数据 + base_time_us = self.read_profile_result_from_json(verify_dir, "base_profile_result.json") + gen_time_us = self.read_profile_result_from_json(verify_dir, "generation_profile_result.json") return base_time_us, gen_time_us @@ -523,23 +554,97 @@ class KernelVerifier: os.chdir(original_cwd) except Exception as e: - logger.error(f"Triton do_bench profile failed: {e}") + logger.error(f"[{self.op_name}: {self.task_id}] 性能脚本执行和结果收集失败: {e}") return float('inf'), float('inf') - def read_triton_profile_result(self, verify_dir: str, result_file: str) -> float: - """读取triton profile结果文件 + def read_autotune_results_from_directory(self, verify_dir: str) -> str: + """从验证目录读取所有autotune结果并格式化输出 + + 读取指定目录下的所有 autotune_info_case_*.json 文件, + 并以类似 TRITON_PRINT_AUTOTUNING=1 的格式输出。 + + Args: + verify_dir: 验证目录路径 + + Returns: + 格式化的autotune结果字符串,格式如下: + + Case 0: + All config timings for kernel_name: + Config 1: BLOCK_M=128, BLOCK_N=256 -> 145.2300us (BEST) + Config 2: BLOCK_M=64, BLOCK_N=128 -> 178.5600us + ... 
+ """ + from pathlib import Path + + result_lines = [] + + # 查找所有autotune文件 + verify_path = Path(verify_dir) + autotune_files = sorted(verify_path.glob("autotune_info_case_*.json")) + + if not autotune_files: + return "" + + # 逐个读取并格式化 + for autotune_file in autotune_files: + # 提取case索引 + case_idx = autotune_file.stem.split('_')[-1] + + try: + with open(autotune_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + result_lines.append(f"Case {case_idx}:") + + # 遍历每个kernel + for kernel_name, configs in data.items(): + result_lines.append(f"All config timings for {kernel_name}:") + + # 按rank排序输出 + sorted_configs = sorted(configs, key=lambda x: x['rank']) + + for config_info in sorted_configs: + config_str = config_info['config'] + timing_us = config_info['timing_us'] + is_best = config_info['is_best'] + rank = config_info['rank'] + + status = " (BEST)" if is_best else "" + result_lines.append(f" Config {rank}: {config_str} -> {timing_us:.4f}us{status}") + + result_lines.append("") # 空行分隔不同case + + except Exception as e: + logger.warning(f"[{self.op_name}: {self.task_id}] 读取autotune文件失败 {autotune_file.name}: {e}") + + return "\n".join(result_lines) + + def read_profile_result_from_json(self, verify_dir: str, result_file: str) -> float: + """从JSON文件读取性能测试结果 + + 该方法读取性能测试脚本生成的JSON结果文件,提取执行时间。 + + JSON文件格式示例: + { + "execution_time_us": 145.23, + "execution_time_ms": 0.14523, + "method": "triton_do_bench", + "warmup_times": 5, + "run_times": 50 + } Args: verify_dir: 验证目录 - result_file: 结果文件名 + result_file: 结果文件名(如 "base_profile_result.json") Returns: - 执行时间(微秒) + execution_time_us: 执行时间(微秒),失败时返回 float('inf') """ try: result_path = os.path.join(verify_dir, result_file) if not os.path.exists(result_path): - logger.error(f"Profile result file not found: {result_path}") + logger.error(f"[{self.op_name}: {self.task_id}] 性能结果文件不存在: {result_path}") return float('inf') with open(result_path, 'r') as f: @@ -549,11 +654,11 @@ class KernelVerifier: execution_time_us = 
result_data.get("execution_time_us", float('inf')) method = result_data.get("method", "unknown") - logger.info(f"Read profile result from {result_file}: {execution_time_us:.4f} us (method: {method})") + logger.info(f"[{self.op_name}: {self.task_id}] 从 {result_file} 读取性能数据: {execution_time_us:.4f} us (method: {method})") return execution_time_us except Exception as e: - logger.error(f"Failed to read profile result from {result_file}: {e}") + logger.error(f"[{self.op_name}: {self.task_id}] 读取性能结果文件失败 {result_file}: {e}") return float('inf') def run(self, task_info: Dict[str, Any], current_step: int = 0, device_id: int = 0): diff --git a/aikg/python/ai_kernel_generator/resources/docs/triton_docs/basic_docs.md b/aikg/python/ai_kernel_generator/resources/docs/triton_docs/basic_docs.md index faf029df4cf64ba7dbc06ffdeb3ba93bf0f0edc1..6152f6f959fb7e97a8762a4a5fe5c4b820431cfb 100644 --- a/aikg/python/ai_kernel_generator/resources/docs/triton_docs/basic_docs.md +++ b/aikg/python/ai_kernel_generator/resources/docs/triton_docs/basic_docs.md @@ -160,3 +160,36 @@ result = tl.where(condition, true_value, false_value) valid_mask = (offsets < n_elements) & (offsets >= 0) data = tl.load(ptr + offsets, mask=valid_mask, other=0.0) ``` + +## 5. 
autotune使用教程 + +Autotune 是 Triton 的自动性能优化机制,通过尝试不同的配置参数组合,自动找到最优的内核执行配置。示例: + +``` +@triton.autotune( + configs=[ + triton.Config({'BLOCK_M': 128, 'PARALLEL_NUM': 32}), + triton.Config({'BLOCK_M': 128, 'PARALLEL_NUM': 64}), + triton.Config({'BLOCK_M': 64, 'PARALLEL_NUM': 32}), + ], + key=['M'], +) +@triton.jit +def kernel( + input, + output, + BLOCK_M: tl.constexpr, PARALLEL_NUM: tl.constexpr # 注意将autotune的参数作为constexpr输入 +): + pass + +def host_func(input): + output = torch.empty_like(input) + grid = lambda meta: (triton.cdiv(M, meta['BLOCK_M'] * meta['PARALLEL_NUM']),) + kernel[grid]( + input, + output, # 注意调用时不要添加configs里的参数,这部分会在autotune时自动添加 + ) + return output +``` + +**注意** 不要对'num_warps', 'num_ctas', 'num_stages', 'num_buffers_warp_spec', 'num_consumer_groups', 'reg_dec_producer', 'reg_inc_consumer', 'maxnreg'进行修改调优,当前不支持 \ No newline at end of file diff --git a/aikg/python/ai_kernel_generator/resources/docs/triton_docs/meta_prompts.py b/aikg/python/ai_kernel_generator/resources/docs/triton_docs/meta_prompts.py index b61e1b7c18b93076949fa4f4921e24b1ea9b3dd4..77c1c656b988a4b41ac8cc3364d5dfdaba1492b9 100644 --- a/aikg/python/ai_kernel_generator/resources/docs/triton_docs/meta_prompts.py +++ b/aikg/python/ai_kernel_generator/resources/docs/triton_docs/meta_prompts.py @@ -1,5 +1,6 @@ suggestion1 = """ grid级别的设置需要考虑启动和运算开销:将2D的grid设置修改为1D的grid,之后在内核中进行处理,能够显著降低启动开销。 +对于二维输入数据,由于数据是连续存储的,可以将数据处理成(看成)一维形式,统一连续提取、计算,这样可以获得更灵活的切分、操作方式。 """ suggestion2 = """ @@ -9,6 +10,7 @@ suggestion2 = """ suggestion3 = """ 对于复杂的算子,例如某些融合算子,在不同计算阶段需要进行不同轴上的reduce操作的情况,我们可以将复杂的算子拆开进行处理:有的时候一味的融合并不能带来性能收益,反而拆开按顺序单独计算、多次调用不同kernel能够带来更好的性能。 +设计按顺序执行的elementwise操作和reduce操作时,也可以将这部分拆开作为两个kernel,设置不同的grid和blocksize。 """ suggestion4 = """ @@ -19,6 +21,11 @@ suggestion4 = """ suggestion5 = """ 可以尝试在核内计算时尝试更大/更小的Block_size,来平衡并行度和资源占用。 当硬件是NPU时,每次的数据搬运都是以256Bytes为单位的,所以在数据读取和存储时,考虑将数据对齐到256Bytes的倍数,或许可以提升性能。 +数据搬运的带宽性能上限大概是256*256Bytes,可以参照这个上限来设计数据搬运的策略。 +""" + +suggestion6 = """ 
+可以通过添加autotune来优化性能,所以可以在生成时列出多组参数来进行生成,并添加@llm_hint("autotune", autotune_configs)来提示LLM进行优化,列出autotune_configs的具体配置。 """ triton_meta_prompts: list[str] = [ @@ -27,4 +34,5 @@ triton_meta_prompts: list[str] = [ suggestion3, suggestion4, suggestion5, + suggestion6, ] diff --git a/aikg/python/ai_kernel_generator/resources/templates/kernel_verify_template.j2 b/aikg/python/ai_kernel_generator/resources/templates/kernel_verify_template.j2 index d402e55e16b666af2caf62c3b3ebbfa07fff1c3f..23125ab5d701dcdacf80ed24b825bdc5fe8c00c5 100644 --- a/aikg/python/ai_kernel_generator/resources/templates/kernel_verify_template.j2 +++ b/aikg/python/ai_kernel_generator/resources/templates/kernel_verify_template.j2 @@ -333,7 +333,6 @@ def verify_implementations(): # 设备设置 {% if framework == "torch" %} - torch.manual_seed(0) # PyTorch设备设置 if backend == "cuda": os.environ['CUDA_VISIBLE_DEVICES'] = str({{ device_id }}) @@ -351,7 +350,6 @@ def verify_implementations(): {% else %} os.environ['DEVICE_ID'] = str({{ device_id }}) device = torch.device("npu") - torch.npu.manual_seed(0) torch.npu.set_device({{ device_id }}) {% endif %} elif "ascend310" in arch: @@ -368,7 +366,6 @@ def verify_implementations(): {% elif framework == "mindspore" %} # MindSpore设备设置 os.environ['DEVICE_ID'] = str({{ device_id }}) - ms.set_seed(0) if backend == "ascend": device = "Ascend" # 设置Ascend架构 @@ -401,21 +398,25 @@ def verify_implementations(): return x {% endif %} - def verify_single_case(inputs): + def verify_single_case(inputs_for_framework, inputs_for_impl): """验证单个案例的公共逻辑""" - {% if backend == "ascend" %} - torch.npu.manual_seed(0) - {% endif %} # 运行框架实现 - framework_output = framework_model(*inputs) + framework_output = framework_model(*inputs_for_framework) - {% if dsl == "swft" %} + {% if framework == "torch" %} + torch.manual_seed(0) + {% if backend == "ascend" %} + torch.npu.manual_seed(0) + {% endif %} + {% endif %} + + {% if dsl == "swft" %} # 运行SWFT实现 data_dir = os.path.dirname(__file__) # 生成二进制数据文件 - 
gen_binary_data(inputs, framework_output, data_dir) + gen_binary_data(inputs_for_impl, framework_output, data_dir) # 运行SWFT实现 {{ impl_func_name }}(device_id=int({{ device_id }})) @@ -423,8 +424,8 @@ def verify_implementations(): # 加载SWFT输出 impl_output = load_binary_data(data_dir, framework_output) {% elif dsl in ["triton", "cuda_c", "cpp", "tilelang_npuir", "tilelang_cuda"] %} - # 运行Triton实现 - impl_output = {{ impl_func_name }}(*inputs) + # 运行实现 + impl_output = {{ impl_func_name }}(*inputs_for_impl) {% endif %} if not isinstance(framework_output, (list, tuple)): @@ -479,47 +480,95 @@ def verify_implementations(): {% if is_dynamic_shape %} # 动态shape:获取多组输入数据 - inputs_list = get_inputs_dyn_list() + # 设置随机种子以确保数据可复现 + {% if framework == "torch" %} + torch.manual_seed(0) + {% if backend == "ascend" %} + torch.npu.manual_seed(0) + {% endif %} + {% elif framework == "numpy" %} + np.random.seed(0) + {% elif framework == "mindspore" %} + ms.set_seed(0) + {% endif %} + framework_inputs_list = get_inputs_dyn_list() + + # 重新生成输入数据列表用于impl(避免原地操作影响) + {% if framework == "torch" %} + torch.manual_seed(0) + {% if backend == "ascend" %} + torch.npu.manual_seed(0) + {% endif %} + {% elif framework == "numpy" %} + np.random.seed(0) + {% elif framework == "mindspore" %} + ms.set_seed(0) + {% endif %} + impl_inputs_list = get_inputs_dyn_list() # 对每组输入进行验证 - for case_idx, inputs in enumerate(inputs_list): + for case_idx, (inputs_for_framework, inputs_for_impl) in enumerate(zip(framework_inputs_list, impl_inputs_list)): {% if framework == "torch" %} - inputs = [process_input(x) for x in inputs] + inputs_for_framework = [process_input(x) for x in inputs_for_framework] + inputs_for_impl = [process_input(x) for x in inputs_for_impl] {% endif %} - print(f"验证动态shape案例 {case_idx + 1}/{len(inputs_list)}") + print(f"验证动态shape案例 {case_idx + 1}/{len(framework_inputs_list)}") # 使用timeout装饰器包装整个验证过程 @with_timeout({{ timeout }}) def verify_case(): - return verify_single_case(inputs) + return 
verify_single_case(inputs_for_framework, inputs_for_impl) try: verify_result, framework_output = verify_case() print(f"动态shape案例 {case_idx + 1} 验证成功") except TimeoutError: - raise AssertionError(f"动态shape案例 {case_idx + 1} 验证超时({timeout}秒)") + raise AssertionError(f"动态shape案例 {case_idx + 1} 验证超时({{timeout}}秒)") {% else %} # 静态shape:获取单组输入数据 + # 设置随机种子以确保数据可复现 + {% if framework == "torch" %} + torch.manual_seed(0) + {% if backend == "ascend" %} + torch.npu.manual_seed(0) + {% endif %} + {% elif framework == "numpy" %} + np.random.seed(0) + {% elif framework == "mindspore" %} + ms.set_seed(0) + {% endif %} + inputs_for_framework = get_inputs() + {% if framework == "torch" %} + inputs_for_framework = [process_input(x) for x in inputs_for_framework] + {% endif %} + + # 重新生成输入数据用于impl(避免原地操作影响) {% if framework == "torch" %} - inputs = get_inputs() - inputs = [process_input(x) for x in inputs] + torch.manual_seed(0) + {% if backend == "ascend" %} + torch.npu.manual_seed(0) + {% endif %} {% elif framework == "numpy" %} - inputs = get_inputs() + np.random.seed(0) {% elif framework == "mindspore" %} - inputs = get_inputs() + ms.set_seed(0) + {% endif %} + inputs_for_impl = get_inputs() + {% if framework == "torch" %} + inputs_for_impl = [process_input(x) for x in inputs_for_impl] {% endif %} # 使用timeout装饰器包装整个验证过程 @with_timeout({{ timeout }}) def verify_case(): - return verify_single_case(inputs) + return verify_single_case(inputs_for_framework, inputs_for_impl) try: verify_result, framework_output = verify_case() except TimeoutError: - raise AssertionError(f"静态shape验证超时({timeout}秒)") + raise AssertionError(f"静态shape验证超时({{timeout}}秒)") {% endif %} # 构建验证成功信息 diff --git a/aikg/python/ai_kernel_generator/resources/templates/prof_generation_template.j2 b/aikg/python/ai_kernel_generator/resources/templates/prof_generation_template.j2 index dc25f02004760d157d2824ea7c039dd4fa0972c6..e8c849c83a154df5054b9504951b3cfdd347379d 100644 --- 
a/aikg/python/ai_kernel_generator/resources/templates/prof_generation_template.j2 +++ b/aikg/python/ai_kernel_generator/resources/templates/prof_generation_template.j2 @@ -251,7 +251,7 @@ def run_generation_implementations(): return x {% endif %} - def run_benchmark(inputs): + def run_benchmark(inputs, case_idx=0): """运行基准测试""" {% if "triton" in dsl %} {% if backend == "ascend" %} @@ -275,6 +275,16 @@ def run_generation_implementations(): # 获取收集的配置信息 config_timings = get_collected_config_timings() + + # 保存autotune信息到当前文件夹 + if config_timings: + autotune_filename = f"autotune_info_case_{case_idx}.json" + try: + with open(autotune_filename, 'w') as f: + json.dump(config_timings, f, indent=2, ensure_ascii=False) + print(f"[{{ op_name }}] Autotune info saved to {autotune_filename}") + except Exception as e: + print(f"[{{ op_name }}] Warning: Failed to save autotune info: {e}") {% endif %} # 进行最终的性能测试 @@ -363,7 +373,7 @@ def run_generation_implementations(): inputs = [process_input(x) for x in inputs] {% endif %} - execution_time, method = run_benchmark(inputs) + execution_time, method = run_benchmark(inputs, case_idx=case_idx) all_execution_times.append(execution_time) print(f"[{{ op_name }}] Case {case_idx + 1} execution time: {execution_time * 1000:.4f} us") @@ -395,7 +405,7 @@ def run_generation_implementations(): inputs = get_inputs() {% endif %} - execution_time, method = run_benchmark(inputs) + execution_time, method = run_benchmark(inputs, case_idx=0) # 保存时间结果到文件 result_data = { diff --git a/aikg/python/ai_kernel_generator/tools/single_evolve_runner.py b/aikg/python/ai_kernel_generator/tools/single_evolve_runner.py index e7276c5d1578f71fecff78ebfde9b5e01744b40f..0cb87e5c539741d2808c4fe54023e921ec0a9294 100644 --- a/aikg/python/ai_kernel_generator/tools/single_evolve_runner.py +++ b/aikg/python/ai_kernel_generator/tools/single_evolve_runner.py @@ -255,16 +255,18 @@ def print_evolution_result(evolution_result: Dict[str, Any], evolve_config: Evol if 
best_implementations: print(f"\n最佳实现 (前{len(best_implementations)}个):") for i, impl in enumerate(best_implementations, 1): - profile_data = impl.get('profile', float('inf')) - - # 处理profile信息,支持三元组格式 - if isinstance(profile_data, (list, tuple)) and len(profile_data) >= 3: - gen_time, base_time, speedup = profile_data[0], profile_data[1], profile_data[2] - profile_str = f"生成代码: {gen_time:.4f}us, 基准代码: {base_time:.4f}us, 加速比: {speedup:.2f}x" - elif isinstance(profile_data, (list, tuple)) and len(profile_data) >= 1: - profile_str = f"执行时间: {profile_data[0]:.4f}us" - elif profile_data != float('inf'): - profile_str = f"执行时间: {profile_data:.4f}us" + profile_data = impl.get('profile', {}) + + # 处理profile信息(dict格式) + if isinstance(profile_data, dict): + gen_time = profile_data.get('gen_time', float('inf')) + base_time = profile_data.get('base_time', 0.0) + speedup = profile_data.get('speedup', 0.0) + + if gen_time != float('inf'): + profile_str = f"生成代码: {gen_time:.4f}us, 基准代码: {base_time:.4f}us, 加速比: {speedup:.2f}x" + else: + profile_str = "性能: N/A" else: profile_str = "性能: N/A" diff --git a/aikg/python/ai_kernel_generator/utils/collector.py b/aikg/python/ai_kernel_generator/utils/collector.py index 540b72ef3bdc64c5c99a820bcfaae9ad9b0831ef..6e768f23710eabc96769e21054f3277bda02c6f2 100644 --- a/aikg/python/ai_kernel_generator/utils/collector.py +++ b/aikg/python/ai_kernel_generator/utils/collector.py @@ -318,7 +318,7 @@ class Collector: "workflow_name": task_info.get("workflow_name", ""), "framework_code": task_info.get("task_desc", ""), "impl_code": task_info.get("coder_code", ""), - "profile": task_info.get("profile_res", ()) # 保存完整的三元组 + "profile": task_info.get("profile_res", {}) # 保存性能分析结果字典 } # 生成文件名并保存 diff --git a/aikg/python/ai_kernel_generator/utils/evolve_utils.py b/aikg/python/ai_kernel_generator/utils/evolve_utils.py index 01821a54c98a438ae889a8639aa93f57ead46c41..00e41f99357f75d12ba445c7fbd7a39f48bf54a7 100644 --- 
a/aikg/python/ai_kernel_generator/utils/evolve_utils.py +++ b/aikg/python/ai_kernel_generator/utils/evolve_utils.py @@ -182,7 +182,7 @@ def load_best_implementations(storage_dir: str, max_count: int = None) -> List[D logger.warning(f"Failed to load {filepath}: {e}") # 按性能排序(gen_time越小越好) - implementations.sort(key=lambda x: x.get('profile', (float('inf'), 0.0, 0.0))[0]) + implementations.sort(key=lambda x: x.get('profile', {}).get('gen_time', float('inf'))) logger.info(f"Loaded {len(implementations)} implementations from {storage_dir}") @@ -211,8 +211,10 @@ def classify_implementations_by_performance(implementations: List[Dict[str, Any] # 过滤出有效的加速比数据 valid_impls = [] for impl in implementations: - profile = impl.get('profile', (float('inf'), 0.0, 0.0)) - if len(profile) >= 3 and profile[2] != float('inf') and profile[2] > 0: + profile = impl.get('profile', {}) + + speedup = profile.get('speedup', 0.0) + if speedup != float('inf') and speedup > 0: valid_impls.append(impl) if not valid_impls: @@ -220,8 +222,8 @@ def classify_implementations_by_performance(implementations: List[Dict[str, Any] total_count = len(valid_impls) - # 按加速比排序(从高到低) - valid_impls.sort(key=lambda x: x['profile'][2], reverse=True) + # 按生成时间排序(从小到大,越小越好) + valid_impls.sort(key=lambda x: x.get('profile', {}).get('gen_time', float('inf'))) # 分层策略:前30%为好,中间40%为中等,后30%为差 good_count = max(1, int(total_count * 0.3)) @@ -316,7 +318,11 @@ def sample_inspirations(implementations: List[Dict[str, Any]], sample_num: int = # 转换为inspiration格式 inspirations = [] for impl in selected: - profile_tuple = impl.get('profile', (float('inf'), 0.0, 0.0)) + profile_data = impl.get('profile', { + 'gen_time': float('inf'), + 'base_time': 0.0, + 'speedup': 0.0 + }) # 优先使用sketch,如果没有sketch则使用原始代码 sketch = impl.get('sketch', '') @@ -326,7 +332,7 @@ def sample_inspirations(implementations: List[Dict[str, Any]], sample_num: int = 'id': impl.get('id'), # 保留ID信息 'sketch': sketch, # 使用sketch作为inspiration内容 'impl_code': impl_code, # 
使用原始代码作为inspiration内容 - 'profile': profile_tuple, # 保持完整的三元组 + 'profile': profile_data, # 保持完整的性能数据字典 'strategy_mode': 'evolution' } inspirations.append(inspiration) @@ -352,8 +358,8 @@ def migrate_elites(islands: List[List[Dict[str, Any]]], migration_size: int = 1) # 收集所有岛屿的精英 elites = [] for island in islands: - # 每个岛屿选择最好的几个个体 - sorted_island = sorted(island, key=lambda x: x.get('profile', (float('inf'), 0.0, 0.0))[0]) + # 每个岛屿选择最好的几个个体(gen_time越小越好) + sorted_island = sorted(island, key=lambda x: x.get('profile', {}).get('gen_time', float('inf'))) elites.extend(sorted_island[:migration_size]) # 随机打乱精英列表 diff --git a/aikg/python/ai_kernel_generator/utils/result_processor.py b/aikg/python/ai_kernel_generator/utils/result_processor.py index 695c5347857d376c53f3cf3fb49153fd337d4726..cbef4e649fd750eaac59024b44b5a4e56a8ebd74 100644 --- a/aikg/python/ai_kernel_generator/utils/result_processor.py +++ b/aikg/python/ai_kernel_generator/utils/result_processor.py @@ -111,7 +111,7 @@ class ResultProcessor: return False @staticmethod - def update_verifier_result(result: str, error_log: str, task_info: Dict[str, Any], profile_res: Optional[str] = None) -> None: + def update_verifier_result(result: str, error_log: str, task_info: Dict[str, Any], profile_res: Optional[dict] = None) -> None: """ 更新verifier结果 @@ -119,6 +119,11 @@ class ResultProcessor: result: verifier结果 error_log: 错误日志 task_info: 任务信息字典(会被修改) + profile_res: 性能分析结果字典,包含: + - gen_time: 生成代码执行时间(微秒) + - base_time: 基准代码执行时间(微秒) + - speedup: 加速比 + - autotune_summary: autotune配置详情(可选,仅triton+ascend) """ try: # 解析verifier结果 diff --git a/aikg/python/ai_kernel_generator/utils/triton_autotune_patch.py b/aikg/python/ai_kernel_generator/utils/triton_autotune_patch.py index 3d5b9cd74bcdb968c9e0e7df678eb61af167f4a3..835f8af417ca0f5ef888adb1d765ca4f19125efb 100644 --- a/aikg/python/ai_kernel_generator/utils/triton_autotune_patch.py +++ b/aikg/python/ai_kernel_generator/utils/triton_autotune_patch.py @@ -17,6 +17,55 @@ import os 
# 全局变量存储配置信息 _collected_config_timings = {} +# 需要过滤的底层实现参数 +_FILTERED_CONFIG_PARAMS = { + 'num_warps', + 'num_ctas', + 'num_stages', + 'num_buffers_warp_spec', + 'num_consumer_groups', + 'reg_dec_producer', + 'reg_inc_consumer', + 'maxnreg' +} + + +def _filter_config_string(config_str: str) -> str: + """过滤配置字符串,移除底层实现参数 + + 处理终端打印格式: + "BLOCK_B: 32, BLOCK_C: 32, num_warps: 4, num_ctas: 1, ..." + + Args: + config_str: 原始配置字符串 + + Returns: + 过滤后的配置字符串,如 "BLOCK_B: 32, BLOCK_C: 32" + """ + # 分割参数(按逗号分隔) + params = [] + for param in config_str.split(','): + param = param.strip() + if not param: + continue + + # 提取参数名(支持冒号和等号) + if ':' in param: + param_name = param.split(':', 1)[0].strip() + elif '=' in param: + param_name = param.split('=', 1)[0].strip() + else: + # 没有分隔符的参数保留 + params.append(param) + continue + + # 只保留非过滤参数 + if param_name not in _FILTERED_CONFIG_PARAMS: + params.append(param) + + # 重新组装 + return ', '.join(params) + def patch_triton_autotuner(): """动态补丁triton autotuner,添加配置信息收集功能""" @@ -68,9 +117,12 @@ def patch_triton_autotuner(): timing_value = timing[0] if isinstance(timing, list) else timing # profiler_npu返回的已经是微秒,无需转换 timing_us = timing_value + + # 过滤配置字符串 + config_str = _filter_config_string(str(config)) config_data.append({ - "config": str(config), + "config": config_str, "timing_us": float(timing_us), "is_best": is_best, "rank": i + 1 @@ -90,7 +142,9 @@ def patch_triton_autotuner(): status = " (BEST)" if config == self.best_config else "" timing_value = timing[0] if isinstance(timing, list) else timing timing_us = timing_value - print(f" Config {i+1}: {config} -> {timing_us:.4f}us{status}") + # 过滤配置字符串 + config_str = _filter_config_string(str(config)) + print(f" Config {i+1}: {config_str} -> {timing_us:.4f}us{status}") except (TypeError, ValueError, AttributeError): continue