# EasyLLM

**Repository Path**: janelu9/EasyLLM

## Basic Information

- **Project Name**: EasyLLM
- **Primary Language**: Python
- **License**: Apache-2.0
- **Default Branch**: main

## Statistics

- **Created**: 2025-06-03
- **Last Updated**: 2025-09-04

## README

# *EasyLLM*

Train large language models faster, more easily, and at lower cost.

✦ Both GPU and NPU are supported.

✦ Pre-trains directly on entire large datasets of token ids converted by PySpark.

✦ Fast fine-tuning with no redundant computation.

✦ Makes PCIe as fast as NVLink for models at the scale of 20 billion parameters or below.

✦ Minimalist implementation of Sequence Parallelism (4D Parallelism for extra-long context).

✦ High-performance full-parameter fine-tuning of Vision Language Models.

✦ Low communication overhead and dynamic expert balancing in Mixture of Experts training.

✦ Fast Reinforcement Learning, thanks to optimizations such as asynchronous inference and training.

## Installation

```shell
git clone --depth 1 https://github.com/janelu9/EasyLLM.git
cd EasyLLM
pip wheel -e . --no-deps && pip install jllm-*-py3-none-any.whl
```

## Quick Start

### Data Conversion

Convert the raw data to token ids stored in parquet files.

```shell
python -m jllm.raw2ids \
    --tokenizer DeepSeek-R1 \
    -i dataset0.jsonl \
    -o dataset0_DeepSeek-R1 \
    --max_len 8193 -C
```

- **Pre-train** samples should be separated by *`'\n\n'`* in text files, or be the value of the key *`'text'`* in jsonl files.
- **Fine-tune** samples should have the format *`[{'system':content},{'user':content},{'assistant':content},...]`* in each row of the jsonl files. The key *`'system'`* is optional.
- **RLHF** samples have the format *`[index,{'user':content}]`*, where *`index`* is an integer ID.
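The three row formats above can be sketched in Python as follows. This is a minimal illustration, not part of jllm: the sample contents, the file names and the `write_jsonl` helper are hypothetical, and it assumes each row is encoded as standard JSON (double quotes), although the formats above are written in Python-like notation.

```python
import json
import os
import tempfile

# Hypothetical sample rows; the contents are placeholders, not real data.
pretrain_rows = [
    {"text": "First pre-training document."},
    {"text": "Second pre-training document."},
]
finetune_rows = [
    [
        {"system": "You are a helpful assistant."},  # the 'system' turn is optional
        {"user": "What is pipeline parallelism?"},
        {"assistant": "It splits a model's layers across devices."},
    ],
]
rlhf_rows = [
    [0, {"user": "Compute 12 * 7."}],  # [index, {'user': content}]
    [1, {"user": "Name the capital of France."}],
]


def write_jsonl(path, rows):
    """Write one JSON-encoded row per line (assumed encoding, see lead-in)."""
    with open(path, "w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")


# Write each dataset into a scratch directory so the demo leaves no files behind.
out_dir = tempfile.mkdtemp()
paths = {}
for name, rows in (
    ("pretrain", pretrain_rows),
    ("finetune", finetune_rows),
    ("rlhf", rlhf_rows),
):
    paths[name] = os.path.join(out_dir, f"{name}.jsonl")
    write_jsonl(paths[name], rows)
```

Each resulting file could then be passed to `jllm.raw2ids` through `-i`, as in the command above.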
**For Vision Language Model:**

```shell
python -m jllm.raw2ids \
    --tokenizer Qwen2.5-VL-7B-Instruct \
    -i dataset_vl.jsonl \
    --image_path images \
    --max_len 32769
```

The folder *`images`* stores all the image data. The format of *`dataset_vl.jsonl`* is like:

*`[{'user':['Give a description of these pictures please.\n ....','image0.jpg',...]},{'assistant':'This is ....'}]`*

### Model Training

#### Large Language Model:

```shell
DISTRIBUTED_ARGS=(
    --nproc_per_node $GPUS_PER_NODE
    --nnodes $NUM_NODES
    --master_addr $MASTER_ADDR
    --master_port $MASTER_PORT
)
torchrun "${DISTRIBUTED_ARGS[@]}" \
    -m jllm.train_pipe \
    --model DeepSeek-R1 \
    --num_train_epochs 3 \
    --train_data dataset0_DeepSeek-R1 \
    --num_partitions 4 \
    --pipe_parallel_size 16 \
    --tensor_parallel_size 8 \
    --expert_parallel_size 2 \
    --micro_batch_size 1 \
    --global_batch_size 256 \
    --partition_method 9,5 \
    --only_ckpt_model \
    --max_num_checkpoints 2 \
    --learning_rate 1e-5 \
    --checkpoint checkpoint
```

#### **Vision Language Model**:

```shell
torchrun "${DISTRIBUTED_ARGS[@]}" \
    -m jllm.train_pipe \
    --model Qwen2.5-VL-7B-Instruct \
    --num_train_epochs 3 \
    --train_data dataset_vl_Qwen2.5-VL-7B-Instruct \
    --pipe_parallel_size 4 \
    --tensor_parallel_size 4 \
    --encoder_pipe_parallel_size 2 \
    --micro_batch_size 1 \
    --global_batch_size 64 \
    --only_ckpt_model \
    --max_num_checkpoints 2 \
    --partition_method fast \
    --no_pin_memory \
    --checkpoint_grad_interval 1 \
    --checkpoint checkpoint
```

You can also submit the training task with `deepspeed` and a hostfile:

```shell
# Write the hostfile: one line per node with its number of slots.
HOSTFILE=hostfile
cat > ${HOSTFILE} <<EOF
10.0.0.0 slots=$GPUS_PER_NODE
10.0.0.1 slots=$GPUS_PER_NODE
EOF
deepspeed -H ${HOSTFILE} \
    --module jllm.train_pipe \
    ...
```

If you are using shared storage, model weights from HuggingFace will be converted automatically.
You can also do this manually when each node has its own independent storage:

```shell
python -m jllm.hf2ds -p 16 -t 8 -e 4 --partition_method 8,6 -m DeepSeek-R1 -o trained_model
```

`--partition_method 8,6` means there are 8 sub-layers in the first pipeline stage and 6 sub-layers in the last pipeline stage. In this code base, one decoder layer contains two sub-layers (one attention layer and one MLP or MoE layer).

***Note**: Arguments `train_data` and `eval_data` also support `jsonl` files. Run `python -m jllm.train_pipe -h` for more arguments.*

Generally, every GPU process reads one piece of data, which means one node with 8 GPUs needs to allocate a total of 8x CPU memory for data. With the optimizations in this project, they need just 1x if these GPUs belong to one pipeline. **I strongly recommend training your model with the faster and lower-cost Pipeline Parallelism** rather than ZeRO.

The pipeline engine can directly load and save model weights in HuggingFace's format. It can also load weights from a checkpoint. To resume an interrupted run, do not modify any training-related configuration.

By default, the engine saves checkpoints through a background process to leave more time for training. **Don't save checkpoints too frequently**, or you may run out of CPU memory, unless you disable background checkpointing with `--background_executor none`.

Setting `--partition_method` to `fast` gives faster training when GPU memory is sufficient.

#### **Reinforcement Learning** (GRPO):

1. Define a reward function in a Python file, which should include a `reward_func`:

```python
# reward.py
import numpy as np

# One ground-truth answer per line, indexed by the prompt index.
with open('truth.txt', 'r') as f:
    truth = f.read().splitlines()

def reward_func(index, text=None, token_ids=None):
    '''
    Args:
        index: int
            Unique index of the training prompt.
        text: List[str]
            One group of responses generated by the trained actor.
        token_ids: List[List[int]]
            One group of token ids corresponding to the responses.
    Returns:
        scores: ndarray (group_size,)
            The reward scores of this group.
    '''
    ## For example ##
    print('responses:', text)
    print('truth:', truth[index])
    scores = np.random.rand(len(text))  # placeholder: random scores
    return scores
```

2. Start the inference engines and the GRPO training task according to node ranks.

```shell
GPUS_PER_NODE=8
MASTER_ADDR='ip of first node'
MASTER_PORT=6000
RAY_ADDR='ip of last node'
INFER_NODES=1
INFER_START_RANK=$((NUM_NODES - INFER_NODES))

if [[ $NODE_RANK -eq $INFER_START_RANK ]]; then
    echo "Starting inference node (Rank $NODE_RANK)"
    ray start --head --port 6380
    python -m jllm.sync_ray $INFER_NODES
    python -m jllm.vllm --model Qwen3-32B \
        --max_model_len 4096 \
        --max_num_seqs 8 \
        --vllm_tp 8 \
        --ray_gpus $((INFER_NODES*8)) \
        --vllm_mem 0.8
elif [[ $NODE_RANK -gt $INFER_START_RANK ]]; then
    python -m jllm.wait_port $RAY_ADDR 6380
    ray start --address="$RAY_ADDR:6380"
else
    export HCCL_IF_BASE_PORT=$((NODE_RANK * 16 + 20000)) # avoid ray's port range.
    echo "Starting training node (Rank $NODE_RANK)"
    echo "Waiting for inference node to start..."
    python -m jllm.wait_port $RAY_ADDR 8000
    ray start --address="$RAY_ADDR:6380" \
        --num-gpus=0 \
        --num-cpus=1 \
        --memory=$((1 * 1024**3)) \
        --object-store-memory=$((4 * 1024**3)) \
        --resources='{"NPU":0}'
    TRAIN_NODES=$((NUM_NODES - INFER_NODES))
    WORLD_SIZE=$((GPUS_PER_NODE * TRAIN_NODES))
    DISTRIBUTED_ARGS=(
        --nproc_per_node $GPUS_PER_NODE
        --nnodes $TRAIN_NODES
        --node_rank $NODE_RANK
        --master_addr $MASTER_ADDR
        --master_port $MASTER_PORT
    )
    echo "Starting training with $TRAIN_NODES nodes"
    torchrun "${DISTRIBUTED_ARGS[@]}" \
        -m jllm.train_pipe \
        --model Qwen3-32B \
        --num_train_epochs 2 \
        --train_data rlhf_Qwen3-32B \
        --pipe_parallel_size 4 \
        --tensor_parallel_size 8 \
        --micro_batch_size 2 \
        --global_batch_size 2048 \
        --partition_method mem \
        --only_ckpt_model \
        --max_num_checkpoints 2 \
        --learning_rate 1e-5 \
        --checkpoint checkpoint \
        --checkpoint_grad_interval 4 \
        --rlhf \
        --num_generations 32 \
        --max_model_len 4096 \
        --vllm_sync_stage 1 \
        --ray_ip $RAY_ADDR \
        --reward_func reward.py \
        --num_vllm_engines $INFER_NODES
    python -c "import requests;requests.post('http://"$RAY_ADDR":8000/shutdown')"
fi
```
Figure 1. Top: synchronous inference and training. Bottom: periodic asynchronous inference and training.
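The `reward_func` in step 1 returns random scores as a placeholder. A more concrete variant might look like the sketch below, which gives 1.0 to a response whose last non-empty line exactly matches the reference answer and 0.0 otherwise. The in-memory `truth` list and the exact-match rule are assumptions made for illustration; only the function signature and the `(group_size,)` return shape come from the example above.

```python
import numpy as np

# Hypothetical ground-truth answers, indexed by the prompt `index`.
# The example in step 1 loads them from truth.txt instead.
truth = ["84", "Paris"]


def reward_func(index, text=None, token_ids=None):
    """Return one score per response: 1.0 for an exact final-line match, else 0.0."""
    reference = truth[index].strip()
    scores = np.zeros(len(text), dtype=np.float32)
    for i, response in enumerate(text):
        # Keep only non-empty lines and compare the last one with the reference.
        lines = [line.strip() for line in response.splitlines() if line.strip()]
        if lines and lines[-1] == reference:
            scores[i] = 1.0
    return scores


# One group of three responses to prompt 0.
group = ["12 * 7 = 84\n84", "83", ""]
example_scores = reward_func(0, text=group)
```

`token_ids` is left unused here. A reward that depends on response length or on specific tokens could read it instead of `text`.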
### Checkpoint Conversion

If the argument `--only_ckpt_model` is enabled, the engine checkpoints only the model's weights, directly in HF's format.

You can also convert model weights from a DeepSpeed checkpoint to HF's format with `jllm.train_pipe`, for example:

```shell
DISTRIBUTED_ARGS=(
    --nproc_per_node 8
    --nnodes 32
    --master_addr $MASTER_ADDR
    --master_port $MASTER_PORT
)
torchrun "${DISTRIBUTED_ARGS[@]}" \
    --module jllm.train_pipe \
    --model DeepSeek-R1 \
    --train_data dataset0_DeepSeek-R1 \
    --pipe_parallel_size 16 \
    --tensor_parallel_size 8 \
    --expert_parallel_size 2 \
    --partition_method 9,5 \
    --num_train_epochs 0 \
    --from_ckpt checkpoint --tag 1000 \
    --output_dir output_path
```

Providing just enough devices to cover one data-parallel replica is sufficient.

### Weight Merging

To concatenate the weights when `tensor_parallel_size>1`:

```shell
python -m jllm.cat2hf \
    -C checkpoint_model \
    -H huggingface_model
```

## Supported Models

| Model | Training Speed (tokens/s) |
| :------------------------------------------------: | :-----------------------: |
| qwen3/qwen3-moe | - |
| deepseek-v3-685b (includes multi-token prediction) | - |
| qwen2.5-vl | - |
| qwen2-vl | - |
| internvl2 | - |
| internlm2 | - |
| qwen2/qwen2-moe | - |
| ~~qwen-14b~~ | ~~80749.57(old)~~ |
| ~~baichuan-13b~~ | ~~79765.50(old)~~ |
| llama-13b | 92749.82(old) |

***Note**: The training speed of each model was measured on 64 NVIDIA A100-PCIE-40GB GPUs linked by 100Gb/s InfiniBand, with data type bfloat16 and a batch token size of 2048\*2048 (batch_size\*sequence_length, batch_size = micro_batch_size \* gradient_accumulation_steps).*

| Model | Training Speed (tokens/s) |
| :------: | :-----------------------: |
| llama-7b | 26335.232 |

*8 NVIDIA A100-PCIE-40GB GPUs, bfloat16, 2304\*2048 tokens/batch.*

| Model | Training Speed (tokens/s) |
| :---------: | :-----------------------: |
| Qwen2.5-72b | 125327.23 |

*512 air-cooled **Ascend-910B-64GB NPUs**, bfloat16, 4096\*4096 tokens/batch.*

## Advanced Tutorial For Data Processing

This step is recommended especially when your data are too big to be loaded into CPU memory at once, such as during pretraining. Here are two methods.

### Python

#### Conversion

```shell
python -m jllm.raw2ids \
    --tokenizer DeepSeek-R1 \
    -i dataset0.jsonl \
    -o dataset0_DeepSeek-R1 \
    --max_len 4097 \
    --type pretain \
    -n 32768 \
    --stack
```

#### Shuffle

If you have multiple datasets, you shouldn't skip this step. It shuffles all the datasets globally by row, as Spark does.

First, move all the datasets stored in parquet folders into one directory, such as `datasets`:

```shell
datasets
├── dataset0_DeepSeek-R1
│   ├── dataset0-00000-00000.gzip.parquet
│   ├── dataset0-00000-00001.gzip.parquet
│   ├── dataset0-00001-00000.gzip.parquet
│   ├── dataset0-00001-00001.gzip.parquet
│   └── dataset0_info.json
└── dataset1_DeepSeek-R1
    ├── dataset1-00000-00000.gzip.parquet
    ├── dataset1-00000-00001.gzip.parquet
    ├── dataset1-00001-00000.gzip.parquet
    ├── dataset1-00001-00001.gzip.parquet
    └── dataset1_info.json
```

Then run the following command to shuffle the rows within each dataset and distribute them to new blocks.

```shell
python -m jllm.shuffle_datasets -d datasets -o shuffled_datasets -n 4
```

Each dataset is shuffled and written to `shuffled_datasets` as a multiple of `num_block` parquet files:

```shell
shuffled_datasets/
├── dataset0_DeepSeek-R1-00000-00000.gzip.parquet
├── dataset0_DeepSeek-R1-00000-00001.gzip.parquet
├── dataset0_DeepSeek-R1-00000-00002.gzip.parquet
├── dataset0_DeepSeek-R1-00000-00003.gzip.parquet
├── dataset1_DeepSeek-R1-00000-00000.gzip.parquet
├── dataset1_DeepSeek-R1-00000-00001.gzip.parquet
├── dataset1_DeepSeek-R1-00000-00002.gzip.parquet
├── dataset1_DeepSeek-R1-00000-00003.gzip.parquet
├── dataset0..._info.json
└── dataset1..._info.json
```

### PySpark

You can also use **PySpark** to do these steps.
jllm can directly read token ids from the parquet files written by **[Spark](https://spark.apache.org)**.

Shuffle and convert raw `jsonl` data to token ids in `parquet` with PySpark:

```shell
tokenizer="DeepSeek-R1"
spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --queue default \
    --archives hdfs://tokenizer.tgz#python_env \
    --num-executors 32 \
    --executor-memory 32G \
    --executor-cores 32 \
    --driver-memory 8G \
    --name 'raw2ids' \
    --conf spark.yarn.executor.memoryOverhead=128 \
    --conf spark.driver.maxResultSize=4G \
    --conf spark.memory.storageFraction=0.8 \
    --conf spark.sql.metadataCacheTTLSeconds=86400 \
    --conf spark.yarn.priority=100 \
    --conf spark.speculation=true \
    --conf spark.hadoop.hive.exec.dynamic.partition=true \
    --conf spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_env/tokenizer/bin/python \
    --files hdfs://${tokenizer}.tgz \
    --py-files hdfs://pyspark.zip \
    jllm.raw2ids_spark \
    --num_partitions 500 \
    --tokenizer ${tokenizer} \
    --max_seq_length 4097 \
    --input_path hdfs://localhost:9000/jsonl \
    --output_path hdfs://localhost:9000/parquet
```

Then transfer the parquet files to your training cluster's storage. The training data folder should look like this:

```shell
train_data/
├── part-00000-xxx.snappy.parquet
├── part-00100-xxx.snappy.parquet
│   ...
└── data_info.json
```

`data_info.json` is a required file that you must create manually in this folder:

```shell
{
    "num_samples": ${num_samples},
    "max_len": ${max_seq_length},
    "max_num_blocks": ${max_num_blocks},
    "fields": [
        "input_ids",
        "cu_seqlens"
    ]
}
```

The values of `num_samples` and `max_num_blocks` are printed at the end of the YARN logs once the Spark job completes successfully.

## Citation

If you find EasyLLM useful or use EasyLLM's code in your research, please cite it in your publications.
```bibtex
@misc{EasyLLM,
  author = {Jian Lu},
  title = {EasyLLM: Training Large Language Model faster, easily and low-cost.},
  year = {2023},
  publisher = {GitHub},
  journal = {GitHub repository},
  howpublished = {\url{https://github.com/janelu9/EasyLLM.git}},
}
```

## Acknowledgment

This repository benefits from [DeepSpeed](https://github.com/microsoft/DeepSpeed), [Flash-Attention](https://github.com/Dao-AILab/flash-attention.git), [vLLM](https://github.com/vllm-project/vllm), [megatron_core](https://github.com/NVIDIA/Megatron-LM/tree/main/megatron/core/tensor_parallel).