diff --git a/vllm_mindspore/__init__.py b/vllm_mindspore/__init__.py index a2d0c6e0380473208305e4defcf8aab4f57c0003..b20703aa7ab6928fa589a966107c5ce3dd1aff72 100644 --- a/vllm_mindspore/__init__.py +++ b/vllm_mindspore/__init__.py @@ -37,6 +37,8 @@ from vllm_mindspore.scripts import env_setup env_setup() +is_dispatch_req_all_depend_core_client = True + # 2. update the log configuration ahead of other modifications. import vllm_mindspore.logger # noqa F401 @@ -81,9 +83,10 @@ vllm.engine.arg_utils.EngineArgs._is_v1_supported_oracle = ( vllm.engine.arg_utils.EngineArgs._set_default_args_v1 = _set_default_args_v1 import vllm.v1.engine.core -from vllm_mindspore.v1.engine.core import shutdown +from vllm_mindspore.v1.engine.core import shutdown, init_engine_core vllm.v1.engine.core.DPEngineCoreProc.shutdown = shutdown +vllm.v1.engine.core.EngineCoreProc.__init__ = init_engine_core from vllm_mindspore.v1.core.kv_cache_utils import get_kv_cache_config @@ -563,10 +566,12 @@ vllm.entrypoints.cli.serve.CoreEngine = MsCoreEngine vllm.v1.engine.core_client.CoreEngine = MsCoreEngine vllm.v1.utils.CoreEngine = MsCoreEngine -from vllm.v1.engine.core_client import DPAsyncMPClient +if is_dispatch_req_all_depend_core_client: + from vllm.v1.engine.core_client import DPAsyncMPClient -DPAsyncMPClient.get_core_engine_for_request = get_core_engine_for_request -DPAsyncMPClient.add_request_async = add_request_async -DPAsyncMPClient.process_engine_outputs = staticmethod(process_engine_outputs) + DPAsyncMPClient.get_core_engine_for_request = get_core_engine_for_request + DPAsyncMPClient.add_request_async = add_request_async + DPAsyncMPClient.process_engine_outputs = staticmethod( + process_engine_outputs) check_ready() diff --git a/vllm_mindspore/v1/engine/core.py b/vllm_mindspore/v1/engine/core.py index 6f83e8241db161c498009fe16c54e4e1f1729262..ea63ade8bd0ad831d49a7014ffdbd2e2a64c97c2 100644 --- a/vllm_mindspore/v1/engine/core.py +++ b/vllm_mindspore/v1/engine/core.py @@ -18,9 +18,15 @@ # See 
the License for the specific language governing permissions and # limitations under the License. +import multiprocessing +import queue +import threading +from typing import Union + from vllm.config import VllmConfig +from vllm.v1.engine import EngineCoreOutputs, EngineCoreRequestType +from vllm.v1.engine.core import EngineCoreProc from vllm.v1.executor.abstract import Executor -from vllm.v1.utils import EngineZmqAddresses from vllm_mindspore.config import stateless_destroy_socket_process_group @@ -32,26 +38,57 @@ def shutdown(self): stateless_destroy_socket_process_group(dp_group) -def init_dp_engine_core_actor( +def init_engine_core( self, vllm_config: VllmConfig, on_head_node: bool, - addresses: EngineZmqAddresses, + handshake_address: str, executor_class: type[Executor], log_stats: bool, - dp_rank: int = 0, - local_dp_rank: int = 0, + engine_index: int = 0, ): - self.addresses = addresses - vllm_config.parallel_config.data_parallel_rank = dp_rank - vllm_config.parallel_config.data_parallel_rank_local = \ - local_dp_rank + # vLLM-MindSpore Plugin: When communicating between processes, it is + # necessary to use an inter-process-safe queue. + self.input_queue = multiprocessing.Queue() + self.output_queue = queue.Queue[Union[tuple[int, EngineCoreOutputs], + bytes]]() + executor_fail_callback = lambda: self.input_queue.put_nowait( + (EngineCoreRequestType.EXECUTOR_FAILED, b'')) + + self.engine_index = engine_index + identity = self.engine_index.to_bytes(length=2, byteorder="little") + self.engines_running = False + + with self._perform_handshake(handshake_address, identity, on_head_node, + vllm_config) as addresses: + self.client_count = len(addresses.outputs) + + # Set up data parallel environment. 
+ self.has_coordinator = addresses.coordinator_output is not None + self._init_data_parallel(vllm_config) + + super(EngineCoreProc, self).__init__(vllm_config, executor_class, + log_stats, executor_fail_callback) - # vllm-mindspore: Do not reset current_platform.device_control_env_var for - # mp executor when data parallel use ray, because setting it after - # `import mindspore` will error in set device. + self.step_fn = (self.step if self.batch_queue is None else + self.step_with_batch_queue) - from vllm.v1.engine.core import DPEngineCoreActor + # vLLM-MindSpore Plugin: Calling process_input_sockets through a thread + # may become too slow when the main process is under high load. Change it + # to a new sub-process to speed up message processing. + multiprocessing.Process(target=self.process_input_sockets, + args=(addresses.inputs, + addresses.coordinator_input, identity), + daemon=True).start() - super(DPEngineCoreActor, self).__init__(vllm_config, on_head_node, "", - executor_class, log_stats) + # Background Threads and Queues for IO. These enable us to + # overlap ZMQ socket IO with GPU since they release the GIL, + # and to overlap some serialization/deserialization with the + # model forward pass. + # Threads handle Socket <-> Queues and core_busy_loop uses Queue. + self.output_thread = threading.Thread(target=self.process_output_sockets, + args=(addresses.outputs, + addresses.coordinator_output, + self.engine_index), + daemon=True) + self.output_thread.start()