diff --git a/README.md b/README.md
index 849f423db9e66472a0161ea083dc591e2536fe2a..acc00db1ddfe81d576949d74f82ace3f25d9e3cc 100644
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@ splitter is under development and currently only supports deployment on openEuler (recommended
 1. Install system dependencies
 
 ```
-dnf install python3-dnf git python3-pip cpio
+dnf install python3-dnf git python3-pip cpio binutils
 ```
 
 2. Clone the source repository
diff --git a/bin/Dockerfile b/bin/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..1bf3f67b560d1cb3aca0e4f272521e78d269a0f2
--- /dev/null
+++ b/bin/Dockerfile
@@ -0,0 +1,26 @@
+# Dockerfile for the SDF Generator environment
+
+# Start from a specific openEuler release.
+ARG RELEASE_TAG=24.03-lts
+FROM hub.oepkgs.net/openeuler/openeuler:${RELEASE_TAG}
+
+# Install all the dependencies for the splitter tool.
+RUN dnf install -y \
+    python3-dnf \
+    git \
+    python3-pip \
+    cpio \
+    binutils \
+    file && \
+    dnf clean all
+
+# Copy the splitter source code into the image.
+COPY . /splitter
+
+# Install the splitter tool itself and its dependencies.
+RUN cd /splitter && \
+    pip3 install -i https://repo.huaweicloud.com/repository/pypi/simple .
+
+WORKDIR /splitter
+
+CMD ["/bin/bash"]
\ No newline at end of file
diff --git a/bin/gen-sdf-docker.sh b/bin/gen-sdf-docker.sh
new file mode 100755
index 0000000000000000000000000000000000000000..e986cb29434314aec8e9c9d8915d2d939023b4ec
--- /dev/null
+++ b/bin/gen-sdf-docker.sh
@@ -0,0 +1,117 @@
+#!/usr/bin/env bash
+set -eu
+
+# base directory
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+cd "${SCRIPT_DIR}/.." || exit 1
+BASE_DIR=$(pwd)
+
+ensure_docker_ready() {
+    # Check if Docker command exists
+    if ! command -v docker &> /dev/null; then
+        echo ">>> Docker not found. Installing ..."
+        # Check for root privileges. Exit if not root.
+        if [[ $EUID -ne 0 ]]; then
+            echo "This script must be run as root to install Docker."
+            exit 1
+        fi
+        dnf install -y docker
+        systemctl restart docker
+        echo ">>> Docker installed."
+    fi
+}
+
+# Help Function
+usage() {
+    echo "Usage: $0 -p <package> -r <release> -o <output_dir>"
+    echo "  -p    Required. The name of the RPM package to generate an SDF for (e.g., 'brotli')."
+    echo "  -r    Required. The openEuler release (e.g., '24.03-LTS')."
+    echo "  -o    Required. The directory to save the generated SDF file."
+    echo "  -h    Show this help message."
+    exit 1
+}
+
+# Defaults
+ARCH=$(uname -m)
+RELEASE=""
+PACKAGE_NAME=""
+OUTPUT_DIR=""
+
+# Argument Parsing with getopts
+while getopts ":p:r:o:h" opt; do
+    case ${opt} in
+        p )
+            PACKAGE_NAME=$OPTARG
+            ;;
+        r )
+            RELEASE=$OPTARG
+            ;;
+        o )
+            OUTPUT_DIR=$OPTARG
+            ;;
+        h )
+            usage
+            ;;
+        \? )
+            echo "Invalid Option: -$OPTARG" 1>&2
+            usage
+            ;;
+        : )
+            echo "Invalid Option: -$OPTARG requires an argument" 1>&2
+            usage
+            ;;
+    esac
+done
+
+# Input Validation
+if [[ -z "$PACKAGE_NAME" || -z "$RELEASE" || -z "$OUTPUT_DIR" ]]; then
+    echo "Error: Missing required arguments."
+    usage
+fi
+
+ensure_docker_ready
+
+# Absolute path for the output directory for Docker mount
+mkdir -p "${OUTPUT_DIR}"
+ABS_OUTPUT_DIR="$(cd "$OUTPUT_DIR" && pwd)"
+
+RELEASE_TAG="${RELEASE,,}" # Convert to lowercase for consistency
+CUSTOM_IMAGE_NAME="sdf-generator-base:${RELEASE_TAG}"
+CONTAINER_NAME="sdf-generator-${PACKAGE_NAME}-$$"
+
+echo ">>> Starting SDF Generation for '${PACKAGE_NAME}'"
+echo "    Release:            ${RELEASE}"
+echo "    Release Tag:        ${RELEASE_TAG}"
+echo "    Arch:               ${ARCH}"
+echo "    Output Dir:         ${ABS_OUTPUT_DIR}"
+echo "    Using Docker Image: ${CUSTOM_IMAGE_NAME}"
+
+# Docker Image Build
+echo ">>> Checking for custom base image: ${CUSTOM_IMAGE_NAME}"
+if [[ -z "$(docker images -q "${CUSTOM_IMAGE_NAME}")" ]]; then
+    echo ">>> Base image not found. Building it now..."
+    docker build --no-cache --build-arg RELEASE_TAG="${RELEASE_TAG}" \
+        -t "${CUSTOM_IMAGE_NAME}" \
+        -f "${BASE_DIR}/bin/Dockerfile" \
+        "${BASE_DIR}"
+    echo ">>> Base image built successfully."
+else
+    echo ">>> Base image found."
+fi
+
+echo ">>> Output SDF will be saved to: ${ABS_OUTPUT_DIR}"
+
+# The command to be executed inside the container.
+INSTALL_CMD="dnf install -y ${PACKAGE_NAME}"
+GENERATE_CMD="splitter gen -p ${PACKAGE_NAME} -r ${RELEASE} -o /output -a ${ARCH}"
+FULL_COMMAND="${INSTALL_CMD} && ${GENERATE_CMD}"
+
+echo ">>> Starting Docker container from custom image..."
+docker run --name "${CONTAINER_NAME}" \
+    -v "${ABS_OUTPUT_DIR}:/output" \
+    --rm \
+    "${CUSTOM_IMAGE_NAME}" \
+    /bin/bash -c "${FULL_COMMAND}"
+
+echo ">>> SDF Generation complete. Docker container has been removed."
+echo ">>> Done."
\ No newline at end of file
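The script's Docker flow is: check `docker images -q`, build the image only when it is absent, then run a throwaway (`--rm`) container with the output directory bind-mounted. The same flow can be sketched in Python for callers that want to drive it programmatically — an illustrative sketch only, not part of this patch; the docker CLI invocations mirror the ones above:

```python
# Illustrative sketch (not part of the patch): the script's
# check-image / build-if-missing / run-disposable-container flow,
# expressed with the standard subprocess module.
import subprocess


def ensure_image(image: str, dockerfile: str, context: str, release_tag: str) -> None:
    # `docker images -q <image>` prints nothing when the image is absent.
    have = subprocess.run(
        ["docker", "images", "-q", image], capture_output=True, text=True, check=True
    ).stdout.strip()
    if not have:
        subprocess.run(
            ["docker", "build", "--no-cache",
             "--build-arg", f"RELEASE_TAG={release_tag}",
             "-t", image, "-f", dockerfile, context],
            check=True,
        )


def run_generation(image: str, package: str, release: str, arch: str, output_dir: str) -> None:
    # Mirrors the script's `docker run`: mount the output dir, remove the container on exit.
    cmd = f"dnf install -y {package} && splitter gen -p {package} -r {release} -o /output -a {arch}"
    subprocess.run(
        ["docker", "run", "--rm", "-v", f"{output_dir}:/output", image, "/bin/bash", "-c", cmd],
        check=True,
    )
```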
diff --git a/tests/python/writer_test.py b/tests/python/writer_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..ffe28e80fd1be15f8b227b32b805aee27b88e950
--- /dev/null
+++ b/tests/python/writer_test.py
@@ -0,0 +1,78 @@
+import unittest
+from tools.generator.writer import SDFWriter
+
+
+class TestCompressPaths(unittest.TestCase):
+    def setUp(self):
+        self.writer = SDFWriter(
+            output="test_output",
+            package_name="test_package",
+            classified_slices={},
+            slice_deps={},
+        )
+
+    def test_empty_set(self):
+        """Test an empty set of files"""
+        result = self.writer._compress_paths(set())
+        self.assertEqual(result, set())
+
+    def test_no_so_files(self):
+        """Test the case without .so files"""
+        files = {"file1.txt", "dir/file2.py"}
+        result = self.writer._compress_paths(files)
+        self.assertEqual(result, files)
+
+    def test_single_so_file(self):
+        """Test a single .so file"""
+        files = {"libtest.so"}
+        result = self.writer._compress_paths(files)
+        self.assertEqual(result, files)
+
+    def test_multiple_unrelated_so_files(self):
+        """Test multiple unrelated .so files"""
+        files = {"libA.so", "libB.so", "libC.so"}
+        result = self.writer._compress_paths(files)
+        self.assertEqual(result, files)
+
+    def test_versioned_libs(self):
+        """Test compression of versioned library files"""
+        files = {
+            "libtest.so.1",
+            "libtest.so.1.2",
+            "libtest.so.1.2.3",
+            "other.so",
+        }
+        expected = {"libtest.so.1*", "other.so"}
+        result = self.writer._compress_paths(files)
+        self.assertEqual(result, expected)
+
+    def test_mixed_files(self):
+        """Test mixed file types (.so and other files)"""
+        files = {
+            "libtest.so.1",
+            "libtest.so.1.2",
+            "file.txt",
+            "libother.so",
+            "script.py",
+        }
+        expected = {"libtest.so.1*", "libother.so", "file.txt", "script.py"}
+        result = self.writer._compress_paths(files)
+        self.assertEqual(result, expected)
+
+    def test_multiple_version_groups(self):
+        """Test multiple groups of versioned library files"""
+        files = {"libA.so.1", "libA.so.1.2", "libB.so.1", "libB.so.1.2", "libC.so"}
+        expected = {"libA.so.1*", "libB.so.1*", "libC.so"}
+        result = self.writer._compress_paths(files)
+        self.assertEqual(result, expected)
+
+    def test_partial_matches(self):
+        """Test partial but not exact matches"""
+        files = {"libtest.so.1", "libtestX.so.1.2", "libtest.so.1.2"}
+        expected = {"libtest.so.1*", "libtestX.so.1.2"}
+        result = self.writer._compress_paths(files)
+        self.assertEqual(result, expected)
+
+
+if __name__ == "__main__":
+    unittest.main()
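These tests pin down one rule: the shortest `.so` path that is a strict prefix of longer paths collapses to `prefix*`, while non-library files pass through untouched. A standalone sketch of that rule, runnable without the repository (`compress` is a stand-in here, not the `SDFWriter` method itself):

```python
# Standalone sketch of the prefix rule the tests above exercise.
def compress(files: set[str]) -> set[str]:
    libs = sorted((f for f in files if ".so" in f), key=len)
    others = {f for f in files if ".so" not in f}
    out, consumed = set(), set()
    for i, prefix in enumerate(libs):
        if i in consumed:
            continue
        # Any longer path starting with this one is folded into a wildcard.
        longer = [j for j in range(i + 1, len(libs)) if libs[j].startswith(prefix)]
        consumed.update(longer)
        out.add(f"{prefix}*" if longer else prefix)
    return out | others


# Mirrors test_versioned_libs above:
assert compress({"libtest.so.1", "libtest.so.1.2", "libtest.so.1.2.3", "other.so"}) == {
    "libtest.so.1*",
    "other.so",
}
```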
+ """Test multiple groups of versioned library files""" + files = {"libA.so.1", "libA.so.1.2", "libB.so.1", "libB.so.1.2", "libC.so"} + expected = {"libA.so.1*", "libB.so.1*", "libC.so"} + result = self.writer._compress_paths(files) + self.assertEqual(result, expected) + + def test_partial_matches(self): + """Test partial but not exact matches""" + files = {"libtest.so.1", "libtestX.so.1.2", "libtest.so.1.2"} + expected = {"libtest.so.1*", "libtestX.so.1.2"} + result = self.writer._compress_paths(files) + self.assertEqual(result, expected) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/cmd/gen.py b/tools/cmd/gen.py new file mode 100644 index 0000000000000000000000000000000000000000..8a62c5456bff1981ca0d050585257e7487f93184 --- /dev/null +++ b/tools/cmd/gen.py @@ -0,0 +1,45 @@ +import click +import platform + +from tools.generator.sdfgenerator import SDFGenerator + + +@click.command( + name="gen", + help="Automatically generate a Slice Definition File (SDF) for an openEuler package.", +) +@click.option( + "-r", + "--release", + required=True, + help="This decides which openEuler release you will use, such as `openEuler-24.03-LTS-SP1`.", +) +@click.option( + "-a", + "--arch", + default=None, + help="The architecture. If not provided, it will be auto-detected from the host machine.", +) +@click.option( + "-o", "--output", default=".", help="The directory to save the generated SDF file." +) +@click.option( + "-p", + "--package", + required=True, + help="The name of the RPM package to generate an SDF for (e.g., 'attr').", +) +def gen(release, arch, output, package): + """ + CLI command to orchestrate SDF generation. + """ + if not arch: + arch = platform.machine() + click.echo(f"Architecture not specified, auto-detected: {arch}") + + click.echo( + f"Starting SDF generation for '{package}' on openEuler-{release} ({arch})..." + ) + + generator = SDFGenerator(release=release, arch=arch, output=output, package=package) + generator.gen() diff --git a/tools/generator/__init__.py b/tools/generator/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tools/generator/classifier.py b/tools/generator/classifier.py new file mode 100644 index 0000000000000000000000000000000000000000..ac6bbe915292960a2f4773ac0c5980ad1b0f94bd --- /dev/null +++ b/tools/generator/classifier.py @@ -0,0 +1,136 @@ +from collections import defaultdict +import os +from pathlib import Path +import subprocess + +# Slice Type Definitions +SLICE_TYPE_COPYRIGHT = "_copyright" +SLICE_TYPE_CONFIG = "_config" +SLICE_TYPE_BINS = "_bins" +SLICE_TYPE_LIBS = "_libs" + + +def _get_file_type_desc(filepath: str) -> str: + """ + Uses `file -L` to get the canonical type of a file. + Returns a lower-cased description string. + """ + if not os.path.exists(filepath): + return "" + try: + file_proc = subprocess.run( + ["file", "-L", filepath], + capture_output=True, + text=True, + check=True, + encoding="utf-8", + errors="ignore", + ) + return file_proc.stdout.lower() + except FileNotFoundError as e: + raise RuntimeError( + "FATAL: `file` command not found. Please ensure the `file` package is installed." + ) from e + except subprocess.CalledProcessError as e: + raise RuntimeError( + f"`file` command failed on {filepath}: {e.stderr.strip()}. Halting analysis." + ) from e + + +def _is_copyright(filepath: str, pkg_root_dir: str) -> bool: + """ + Checks if a file is a copyright/license file. 
diff --git a/tools/generator/classifier.py b/tools/generator/classifier.py
new file mode 100644
index 0000000000000000000000000000000000000000..ac6bbe915292960a2f4773ac0c5980ad1b0f94bd
--- /dev/null
+++ b/tools/generator/classifier.py
@@ -0,0 +1,136 @@
+from collections import defaultdict
+import os
+from pathlib import Path
+import subprocess
+
+# Slice Type Definitions
+SLICE_TYPE_COPYRIGHT = "_copyright"
+SLICE_TYPE_CONFIG = "_config"
+SLICE_TYPE_BINS = "_bins"
+SLICE_TYPE_LIBS = "_libs"
+
+
+def _get_file_type_desc(filepath: str) -> str:
+    """
+    Uses `file -L` to get the canonical type of a file.
+    Returns a lower-cased description string.
+    """
+    if not os.path.exists(filepath):
+        return ""
+    try:
+        file_proc = subprocess.run(
+            ["file", "-L", filepath],
+            capture_output=True,
+            text=True,
+            check=True,
+            encoding="utf-8",
+            errors="ignore",
+        )
+        return file_proc.stdout.lower()
+    except FileNotFoundError as e:
+        raise RuntimeError(
+            "FATAL: `file` command not found. Please ensure the `file` package is installed."
+        ) from e
+    except subprocess.CalledProcessError as e:
+        raise RuntimeError(
+            f"`file` command failed on {filepath}: {e.stderr.strip()}. Halting analysis."
+        ) from e
+
+
+def _is_copyright(filepath: str, pkg_root_dir: str) -> bool:
+    """
+    Checks if a file is a copyright/license file.
+    This includes standard paths and special cases found in doc directories.
+    """
+    # Standard prefix-based check
+    if filepath.startswith("/usr/share/licenses/"):
+        return True
+
+    # Case-insensitive check for common license filenames
+    filename = Path(filepath).name.lower()
+    if any(keyword in filename for keyword in ("license", "copying", "copyright", "notice")):
+        # If the filename itself suggests it's a license, it's a strong indicator.
+        # This will catch cases like '/usr/share/doc/c-ares/LICENSE.md'
+        return True
+
+    return False
+
+
+def _is_config(filepath: str, pkg_root_dir: str) -> bool:
+    if filepath.startswith("/etc/ima/"):
+        return False
+    return filepath.startswith("/etc/")
+
+
+def _is_bin(filepath: str, pkg_root_dir: str) -> bool:
+    """
+    Checks if a file is a binary executable.
+    """
+    if not filepath.startswith(("/usr/bin/", "/usr/sbin/", "/usr/libexec/")):
+        return False
+
+    full_path = os.path.join(pkg_root_dir, filepath.lstrip("/"))
+    desc = _get_file_type_desc(full_path)
+    return "elf" in desc and "executable" in desc
+
+
+def _is_library(filepath: str, pkg_root_dir: str) -> bool:
+    """Checks if a file is a shared library."""
+    if not (filepath.startswith("/usr/lib") or filepath.startswith("/lib")):
+        return False
+
+    full_path = os.path.join(pkg_root_dir, filepath.lstrip("/"))
+    desc = _get_file_type_desc(full_path)
+    return "elf" in desc and "shared object" in desc
+
+
+def _get_pkg_files(pkg_root_dir: str) -> list[str]:
+    files = []
+    for root, _, filenames in os.walk(pkg_root_dir):
+        for filename in filenames:
+            full_path = os.path.join(root, filename)
+            rel_path = os.path.relpath(full_path, pkg_root_dir)
+            files.append(f"/{rel_path}")
+    return files
+
+
+# Rule Definition
+CLASSIFICATION_RULES = [
+    (SLICE_TYPE_COPYRIGHT, _is_copyright),
+    (SLICE_TYPE_CONFIG, _is_config),
+    (SLICE_TYPE_BINS, _is_bin),
+    (SLICE_TYPE_LIBS, _is_library),
+]
+
+
+def classify_files(package_name: str, pkg_root_dir: str) -> dict[str, set[str]]:
+    """
+    Classifies a list of files into slices based on a defined set of rules.
+
+    Args:
+        package_name: The name of the package.
+        pkg_root_dir: The root path where files were extracted.
+
+    Returns:
+        A dictionary mapping slice names to a set of file paths.
+    """
+    files = _get_pkg_files(pkg_root_dir)
+    classified_slices = defaultdict(set)
+
+    for filepath in files:
+
+        slice_type_suffix = None
+
+        for suffix, checker_func in CLASSIFICATION_RULES:
+            if checker_func(filepath, pkg_root_dir):
+                slice_type_suffix = suffix
+                break
+
+        # If no rule matched, skip the file
+        if slice_type_suffix is None:
+            continue
+
+        slice_name = f"{package_name}{slice_type_suffix}"
+        classified_slices[slice_name].add(os.path.normpath(filepath))
+
+    return classified_slices
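`CLASSIFICATION_RULES` is first-match-wins: the first checker that returns True claims the file, so a license file under `/etc/` would be claimed by the copyright rule before the config rule sees it. The two purely path-based checkers can be exercised without the `file` binary; a small demonstration, assuming the module is importable (the second argument is unused by these two rules):

```python
# First-match-wins illustration using only the path-based rules;
# _is_bin/_is_library are skipped since they shell out to `file`.
from tools.generator.classifier import _is_config, _is_copyright

samples = {
    "/usr/share/licenses/attr/COPYING": "copyright",
    "/usr/share/doc/c-ares/LICENSE.md": "copyright",  # filename keyword match
    "/etc/attr.conf": "config",
    "/etc/ima/ima-policy": "skipped",  # explicitly excluded by _is_config
}
for path, expected in samples.items():
    if _is_copyright(path, ""):
        got = "copyright"
    elif _is_config(path, ""):
        got = "config"
    else:
        got = "skipped"
    assert got == expected, (path, got)
```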
+ """ + + def __init__( + self, + package_name: str, + pkg_extract_path: str, + classified_slices: dict[str, set[str]], + ): + self.package_name = package_name + self.pkg_extract_path = pkg_extract_path + self.classified_slices = classified_slices + + # Caches to improve performance + self._file_to_pkg_cache = {} + self._ldconfig_cache = None + + def analyze(self) -> dict[str, set[str]]: + """ + Main entry point for the analysis. + Orchestrates the process of analyzing dependencies for all relevant slices. + + Returns: + - A dictionary mapping slice names to sets of dependent slices. + """ + # Step 1: Analyze external binary dependencies (existing logic) + slice_deps = self._analyze_external_dependencies() + + # Step 2: Inject internal dependencies (new logic) + self._inject_internal_dependencies(slice_deps) + + return slice_deps + + def _analyze_external_dependencies(self) -> dict[str, set[str]]: + """ + Analyzes dependencies on external packages using readelf. + This method checks for shared libraries used by binaries + and maps them to their owning packages. + Returns: + - A dictionary mapping slice names to sets of dependent slices. + """ + + slice_deps = defaultdict(set) + + # Pre-load the ldconfig cache once for the entire analysis + self._load_ldconfig_cache() + + # Iterate through classified slices to find binaries and libraries + for slice_name, file_set in self.classified_slices.items(): + if not (slice_name.endswith(SLICE_TYPE_BINS) or slice_name.endswith(SLICE_TYPE_LIBS)): + continue + + for file in file_set: + full_file_path = os.path.join(self.pkg_extract_path, file.lstrip("/")) + + if not os.path.isfile(full_file_path): + continue + + # Get dependencies for a single binary file + needed_libs = self._get_needed_libraries(full_file_path) + + for lib_name in needed_libs: + # Resolve the library name to a package owner + owner_pkg = self._resolve_library_to_package(lib_name) + + if owner_pkg and owner_pkg != self.package_name: + dep_slice = f"{owner_pkg}_libs" + logger.debug( + f"'{slice_name}' dependency found: {file} -> {lib_name} ({dep_slice})" + ) + slice_deps[slice_name].add(dep_slice) + + return slice_deps + + def _inject_internal_dependencies(self, slice_deps: dict[str, set[str]]): + """ + Adds dependencies between slices of the same package (e.g., bins -> config). + """ + logger.debug("Injecting internal dependencies...") + + # Construct the names of the potential slices within this package + config_slice_name = f"{self.package_name}{SLICE_TYPE_CONFIG}" + bins_slice_name = f"{self.package_name}{SLICE_TYPE_BINS}" + libs_slice_name = f"{self.package_name}{SLICE_TYPE_LIBS}" + + # Check if a config slice exists for this package + if config_slice_name in self.classified_slices: + # If bins slice exists, make it depend on the config slice + if bins_slice_name in self.classified_slices: + logger.info(f"Adding internal dependency: {bins_slice_name} -> {config_slice_name}") + slice_deps[bins_slice_name].add(config_slice_name) + + # If libs slice exists, make it depend on the config slice + if libs_slice_name in self.classified_slices: + logger.info(f"Adding internal dependency: {libs_slice_name} -> {config_slice_name}") + slice_deps[libs_slice_name].add(config_slice_name) + + def _load_ldconfig_cache(self): + """ + Executes `ldconfig -p` once and caches its output. 
+ """ + logger.debug("Loading ldconfig cache...") + try: + ldconfig_proc = subprocess.run( + ["ldconfig", "-p"], capture_output=True, text=True, check=True + ) + self._ldconfig_cache = ldconfig_proc.stdout + except FileNotFoundError as e: + raise RuntimeError( + "`ldconfig` command not found. Please ensure the `glibc` package is installed." + ) from e + except subprocess.CalledProcessError as e: + raise RuntimeError( + f"`ldconfig` command failed: {e.stderr.strip()}. Halting analysis." + ) from e + + def _get_pkg_owner(self, file_path: str) -> str: + """Finds the package that owns a file using an instance cache.""" + if file_path in self._file_to_pkg_cache: + return self._file_to_pkg_cache[file_path] + try: + rpm_qf_proc = subprocess.run( + ["rpm", "-qf", file_path], capture_output=True, text=True, check=True + ) + pkg_full_name = rpm_qf_proc.stdout.strip() + rpm_q_qf_proc = subprocess.run( + ["rpm", "-q", "--qf", "%{NAME}", pkg_full_name], + capture_output=True, + text=True, + check=True, + ) + owner_pkg = rpm_q_qf_proc.stdout.strip() + self._file_to_pkg_cache[file_path] = owner_pkg + return owner_pkg + except FileNotFoundError as e: + raise RuntimeError( + "FATAL: `rpm` command not found. Please ensure the `rpm` package is installed." + ) from e + except subprocess.CalledProcessError: + self._file_to_pkg_cache[file_path] = "" + return "" + + def _get_needed_libraries(self, binary_path: str) -> list[str]: + """ + Runs `readelf -d` on a binary. Assumes the file is a valid ELF + as it has been pre-filtered by the classifier. + + Returns: + A list of needed libraries. + """ + needed = [] + try: + readelf_result = subprocess.run( + ["readelf", "-d", binary_path], + capture_output=True, + text=True, + check=True, + encoding="utf-8", + errors="ignore", + ) + for line in readelf_result.stdout.strip().split("\n"): + if "(NEEDED)" in line: + match = re.search(r"\[(.*)\]", line) + if match: + needed.append(match.group(1)) + return needed + except FileNotFoundError as e: + raise RuntimeError( + "FATAL: `readelf` command not found. Please ensure the `binutils` " + "package is installed." + ) from e + except subprocess.CalledProcessError as e: + raise RuntimeError( + f"readelf failed on '{binary_path}' " + f"which was identified as an ELF file. The file might be corrupted. " + f"Halting analysis. Error: {e.stderr.strip()}" + ) from e + + def _resolve_library_to_package(self, lib_name: str) -> str: + """ + Resolves a library name (e.g., 'libc.so.6') to its owning package name (e.g., 'glibc'). + Uses the ldconfig cache to find the library path. 
+ """ + if not self._ldconfig_cache: + return "" + + lib_path_match = re.search( + rf"\s+{re.escape(lib_name)}\s*.*=>\s*(/.*)", self._ldconfig_cache + ) + if not lib_path_match: + logger.warning(f"Could not find path for library '{lib_name}' in ldconfig cache.") + return "" + + lib_path = lib_path_match.group(1) + return self._get_pkg_owner(lib_path) diff --git a/tools/generator/sdfgenerator.py b/tools/generator/sdfgenerator.py new file mode 100644 index 0000000000000000000000000000000000000000..8bbafb87c58b677f17c0e482da722d83476a821a --- /dev/null +++ b/tools/generator/sdfgenerator.py @@ -0,0 +1,69 @@ +import os +import tempfile +from tools.download import rpm +from tools.parse import parse +from tools.splitter.splitter import _architecture_check, _clone_slices +from tools.logger import logger +from tools import SLICE_PATH + +from tools.generator import classifier +from tools.generator.dependency_analyzer import DependencyAnalyzer +from tools.generator.writer import SDFWriter + + +class SDFGenerator: + """Class to generate SDF files for a given package.""" + + # Class attributes for the generator + package: str + release: str + arch: str + output: str + + def __init__(self, release: str, arch: str, output: str, package: str): + self.release = f"openEuler-{release.upper()}" + self.arch = _architecture_check(arch) + self.output = os.path.abspath(output) + self.package = package + + def gen(self): + """ + Main entry point for generating SDF files. + """ + logger.info(f"===== Starting SDF Generation for: {self.package} =====") + + _clone_slices(self.release, SLICE_PATH) + + # Initialize DNF client for downloading + logger.info(f"Downloading package: {self.package}...") + dnf_client = rpm.init_dnf_client(self.arch, self.release, self.output) + local_pkg_path = rpm.download(dnf_client, self.package) + if not local_pkg_path: + logger.error(f"Failed to download package {self.package}.") + rpm.clear(dnf_client) + return + logger.info(f"Package downloaded to: {local_pkg_path}") + + # extracting RPM files + pkg_root_dir = tempfile.TemporaryDirectory() + logger.info(f"Extracting {local_pkg_path} to {pkg_root_dir} for analysis...") + parse.extract_files(local_pkg_path, pkg_root_dir.name, ["/*"]) + + # Classify files into slices + classified_slices = classifier.classify_files(self.package, pkg_root_dir.name) + for slice_name, files in classified_slices.items(): + logger.info(f"Slice '{slice_name}' contains {files} ") + + # Analyze dependencies + analyzer = DependencyAnalyzer(self.package, pkg_root_dir.name, classified_slices) + slice_deps = analyzer.analyze() + for slice_name, deps in slice_deps.items(): + logger.info(f"Slice '{slice_name}' depends on: {deps}") + + # Write the SDF file + writer = SDFWriter(self.output, self.package, classified_slices, slice_deps) + writer.write() + + rpm.clear(dnf_client) + + logger.info(f"===== Finished SDF Generation for: {self.package} =====") diff --git a/tools/generator/writer.py b/tools/generator/writer.py new file mode 100644 index 0000000000000000000000000000000000000000..41f6c7a7f4ac11fa09da5bdc7d35dd11272cd0f7 --- /dev/null +++ b/tools/generator/writer.py @@ -0,0 +1,108 @@ +import yaml +from pathlib import Path +from tools.logger import logger + + +class SDFWriter: + """ + Builds the SDF data structure and writes it to a YAML file. 
+ """ + + def __init__( + self, + output: str, + package_name: str, + classified_slices: dict[str, set[str]], + slice_deps: dict[str, set[str]], + ): + self.output_path = Path(output) / f"{package_name}.yaml" + self.package_name = package_name + self.classified_slices = classified_slices + self.slice_deps = slice_deps + + def write(self): + """ + Main entry point to build the data and write the file. + """ + sdf_data = self._build_sdf_structure() + + self.output_path.parent.mkdir(parents=True, exist_ok=True) + with open(self.output_path, "w", encoding="utf-8") as f: + yaml.dump(sdf_data, f, indent=2, sort_keys=False, default_flow_style=False) + + logger.info(f"SDF file written to: {self.output_path}") + + def _build_sdf_structure(self) -> dict: + """ + Assembles the final SDF data dictionary. + """ + copyright_slice_name = f"{self.package_name}_copyright" + sdf_data = { + "package": self.package_name, + "deps": ( + [copyright_slice_name] + if copyright_slice_name in self.classified_slices + else [] + ), + "slices": {}, + } + + for slice_name, files in sorted(self.classified_slices.items()): + + short_slice_name = slice_name.replace(f"{self.package_name}_", "", 1) + slice_content = {} + + if self.slice_deps.get(slice_name): + slice_content["deps"] = sorted(list(self.slice_deps[slice_name])) + + # Apply path compression before adding to the structure + compressed_files = self._compress_paths(files) + slice_content["contents"] = {"common": sorted(list(compressed_files))} + + sdf_data["slices"][short_slice_name] = slice_content + + return sdf_data + + def _compress_paths(self, file_set: set[str]) -> set[str]: + """ + Performs a robust path compression for versioned shared libraries. + This version uses a direct prefix matching approach. + """ + # We only attempt to compress files that look like libraries + libs = sorted([f for f in file_set if ".so" in f], key=len) + other_files = {f for f in file_set if ".so" not in f} + + if not libs: + return other_files + + # The core idea: iterate through the sorted libs. If a lib is a prefix + # of subsequent libs, it becomes a candidate for a wildcard. + compressed_libs = set() + + # Use a boolean array to mark which libraries have been consumed + # by a wildcard prefix. + consumed = [False] * len(libs) + + for i in range(len(libs)): + if consumed[i]: + continue + + # The current library is a potential prefix + prefix = libs[i] + is_prefix_for_others = False + + for j in range(i + 1, len(libs)): + if libs[j].startswith(prefix): + # If we find at least one longer file that starts with our prefix, + # it confirms this is a valid compression case. + is_prefix_for_others = True + consumed[j] = True # Mark the longer path as consumed + + if is_prefix_for_others: + # Add the prefix with a wildcard + compressed_libs.add(f"{prefix}*") + else: + # If it wasn't a prefix for any other lib, add it as is + compressed_libs.add(prefix) + + return compressed_libs.union(other_files) diff --git a/tools/main.py b/tools/main.py index fa535a426d437d5f382ef25236c35998f4e9c59a..678e0e43de350adde0750e03bb93235e1537dff6 100644 --- a/tools/main.py +++ b/tools/main.py @@ -1,5 +1,6 @@ import click from tools.cmd.cut import cut +from tools.cmd.gen import gen @click.group(help=""" @@ -12,6 +13,7 @@ def entrance(): def _add_commands(): # Unified interface for extension. entrance.add_command(cut) + entrance.add_command(gen) def main(): _add_commands()