1 Star 0 Fork 0

皮豪 / lsp-bridge

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
lsp_bridge.py 37.41 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2022 Andy Stewart
#
# Author: Andy Stewart <lazycat.manatee@gmail.com>
# Maintainer: Andy Stewart <lazycat.manatee@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import queue
import shutil
import threading
import traceback
import json
import time
from functools import wraps
from pathlib import Path
from epc.server import ThreadingEPCServer
from core.fileaction import (create_file_action_with_single_server,
create_file_action_with_multi_servers,
FILE_ACTION_DICT, LSP_SERVER_DICT)
from core.lspserver import LspServer
from core.search_file_words import SearchFileWords
from core.search_sdcv_words import SearchSdcvWords
from core.search_list import SearchList
from core.search_tailwindcss_keywords import SearchTailwindKeywords
from core.search_paths import SearchPaths
from core.tabnine import TabNine
from core.codeium import Codeium
from core.copilot import Copilot
from core.utils import *
from core.handler import *
from core.remote_file import (
RemoteFileClient,
FileSyncServer,
FileElispServer,
FileCommandServer,
SendMessageException,
save_ip
)
from core.ctags import Ctags
def threaded(func):
@wraps(func)
def wrapper(*args, **kwargs):
thread = threading.Thread(target=func, args=args, kwargs=kwargs)
thread.start()
if hasattr(args[0], 'thread_queue'):
args[0].thread_queue.append(thread)
return wrapper
REMOTE_FILE_SYNC_CHANNEL = 9999
REMOTE_FILE_COMMAND_CHANNEL = 9998
REMOTE_FILE_ELISP_CHANNEL = 9997
remote_server_ports = [
REMOTE_FILE_SYNC_CHANNEL,
REMOTE_FILE_COMMAND_CHANNEL,
REMOTE_FILE_ELISP_CHANNEL
]
class LspBridge:
def __init__(self, args):
# Check running environment.
self.running_in_server = len(args) == 0
if self.running_in_server:
set_running_in_server()
# Build EPC server.
self.server = ThreadingEPCServer(('127.0.0.1', 0), log_traceback=True)
self.server.allow_reuse_address = True
self.server.register_instance(self) # register instance functions let elisp side call
# Start EPC server.
self.server_thread = threading.Thread(target=self.server.serve_forever)
self.server_thread.start()
# Init variables.
self.thread_queue = []
self.client_dict = {}
self.lsp_client_dict = {}
self.host_names = {}
# Init event loop.
self.event_queue = queue.Queue()
self.event_loop = threading.Thread(target=self.event_dispatcher)
self.event_loop.start()
self.sync_tramp_remote_complete_event = threading.Event()
if self.running_in_server:
self.init_search_backends_complete_event = threading.Event()
# Start remote file server if lsp-bridge running in server.
self.init_remote_file_server()
else:
# Init EPC client and search backends if lsp-bridge running in local.
init_epc_client(int(args[0]))
self.init_search_backends()
eval_in_emacs('lsp-bridge--first-start', self.server.server_address[1])
# event_loop never exit, simulation event loop.
self.event_loop.join()
def message_hostnames(self):
message_emacs(f"host_names:{self.host_names}")
# Functions for initialization
def init_search_backends(self):
# Init tabnine.
self.tabnine = TabNine()
# Init codeium
self.codeium = Codeium()
# Init copilot
self.copilot = Copilot()
# Init search backends.
self.search_file_words = SearchFileWords()
self.search_sdcv_words = SearchSdcvWords()
self.search_list = SearchList()
self.search_tailwind_keywords = SearchTailwindKeywords()
self.search_paths = SearchPaths()
self.ctags = Ctags()
# Build EPC interfaces.
handler_subclasses = list(map(lambda cls: cls.name, Handler.__subclasses__()))
for name in ["change_file", "update_file", "save_file",
"try_completion", "try_formatting",
"change_cursor",
"list_diagnostics",
"try_code_action",
"workspace_symbol"] + handler_subclasses:
self.build_file_action_function(name)
search_backend_export_functions = {
"search_file_words": ["index_files", "change_buffer", "load_file", "close_file", "search"],
"search_sdcv_words": ["search"],
"search_list": ["search", "update"],
"search_tailwind_keywords": ["search"],
"search_paths": ["search"]
}
for search_backend, export_functions in search_backend_export_functions.items():
for name in export_functions:
self.build_prefix_function(search_backend, search_backend, name)
# Set log level.
[enable_lsp_server_log] = get_emacs_vars(["lsp-bridge-enable-log"])
if enable_lsp_server_log:
logger.setLevel(logging.DEBUG)
# All LSP server response running in message_thread.
self.message_queue = queue.Queue()
self.message_thread = threading.Thread(target=self.message_dispatcher)
self.message_thread.start()
if not self.running_in_server:
remote_threads = {
"send_message_dispatcher": {
# Build loop to open remote file.
"remote_file_sender_queue": REMOTE_FILE_SYNC_CHANNEL,
# Build loop to call remote Python command.
"remote_file_command_sender_queue": REMOTE_FILE_COMMAND_CHANNEL,
# Build loop to reply remote rpc command.
"remote_file_elisp_sender_queue": REMOTE_FILE_ELISP_CHANNEL
},
"receive_message_dispatcher": {
# Build loop to receive remote file to local Emacs.
"remote_file_receiver_queue": "handle_remote_file_message",
# Build loop to receive remote Python command response from remote server to local Emacs.
"remote_file_command_receiver_queue": "handle_lsp_message",
# Build loop to receive remote rpc from remote server to local Emacs.
"remote_file_elisp_receiver_queue": "handle_file_elisp_message"
}
}
for dispatcher, args in remote_threads.items():
for q, v in args.items():
newq = queue.Queue()
setattr(self, q, newq)
if not isinstance(v, int):
v = getattr(self, v)
threading.Thread(
target=getattr(self, dispatcher),
args=(newq, v)
).start()
@threaded
def init_remote_file_server(self):
print("* Running lsp-bridge in remote server, use command 'lsp-bridge-open-remote-file' to open remote file.")
# Build loop for remote files management.
self.file_server = FileSyncServer("0.0.0.0", REMOTE_FILE_SYNC_CHANNEL)
# Build loop for call local Emacs function from server.
self.file_elisp_server = FileElispServer("0.0.0.0", REMOTE_FILE_ELISP_CHANNEL, self)
# Build loop for call remote command from local Emacs.
self.file_command_server = FileCommandServer("0.0.0.0", REMOTE_FILE_COMMAND_CHANNEL, self)
set_lsp_bridge_server(self)
# Functions for communication between local and remote server
def get_socket_client(self, server_host, server_port, is_retry=False):
if server_host not in self.host_names:
message_emacs(f"{server_host} is not connected, try reconnect...")
self.sync_tramp_remote_complete_event.clear()
eval_in_emacs('lsp-bridge-remote-reconnect', server_host)
self.sync_tramp_remote_complete_event.wait()
message_emacs(f"{server_host} connected.")
client_id = f"{server_host}:{server_port}"
if client_id in self.client_dict:
return self.client_dict[client_id]
import paramiko
ssh_conf = self.host_names[server_host]
try:
client = RemoteFileClient(
ssh_conf,
server_port,
lambda message: self.receive_remote_message(message, server_port),
)
except paramiko.AuthenticationException:
# cloud not login server
message_emacs(f"login {ssh_conf} failed, please check *lsp-bridge*")
return None
try:
client.create_channel()
except paramiko.ChannelException:
# Channel Exception indicates that we clould not established channel
# the remote process may not exist, try to start the process
if is_retry:
return None
[remote_start_automatically] = get_emacs_vars(["lsp-bridge-remote-start-automatically"])
if not remote_start_automatically:
message_emacs(f"please make sure `lsp_bridge.py` has start at server {server_host} (lsp-bridge-remote-start-automatically is disabled)")
return None
# try to start remote process if it does not exist
message_emacs(f"Start lsp-bridge process on {server_host} automatically...")
client.start_lsp_bridge_process()
# wait a while for the remote process to be ready
time.sleep(2)
# if client is not None, it has been started and put into client_dict
return self.get_socket_client(server_host, server_port, is_retry=True)
else:
client.start()
self.client_dict[client_id] = client
return client
def send_remote_message(self, host, queue, message, wait=False):
queue.put({"host": host, "message": message})
if wait:
queue.join()
def send_message_dispatcher(self, queue, port):
try:
while True:
data = queue.get(True)
server_host = data["host"]
client = self.get_socket_client(server_host, port)
try:
client.send_message(data["message"])
except SendMessageException as e:
# lsp-bridge process might has been restarted, making the orignal socket no longer valid.
logger.exception("Channel %s is broken, message %s, error %s", f"{server_host}:{port}", data["message"], e)
# remove all the clients for server_host from client_dict
# client will be created again when get_socket_client is called
for p in remote_server_ports:
self.client_dict.pop(f"{server_host}:{p}", None)
message_emacs(f"try to recreate channel to {server_host}:{port}")
try:
client = self.get_socket_client(server_host, port)
except Exception as e:
# unable to restore the channel
logger.exception(e)
else:
# try to send the message
client.send_message(data["message"])
eval_in_emacs('lsp-bridge-remote-reconnect', server_host)
except Exception as e:
logger.exception(e)
finally:
queue.task_done()
except:
logger.error(traceback.format_exc())
def receive_remote_message(self, message, server_port):
if server_port == REMOTE_FILE_SYNC_CHANNEL:
self.remote_file_receiver_queue.put(message)
elif server_port == REMOTE_FILE_COMMAND_CHANNEL:
self.remote_file_command_receiver_queue.put(message)
elif server_port == REMOTE_FILE_ELISP_CHANNEL:
self.remote_file_elisp_receiver_queue.put(message)
def receive_message_dispatcher(self, queue, handle_remote_message):
try:
while True:
message = queue.get(True)
handle_remote_message(message)
queue.task_done()
except:
logger.error(traceback.format_exc())
def remote_sync(self, host, remote_info):
client_id = f"{host}:{REMOTE_FILE_ELISP_CHANNEL}"
if client_id not in self.client_dict:
# send "say hello" upon establishing the first connection
self.send_remote_message(
host, self.remote_file_elisp_sender_queue, "Connect", True)
self.send_remote_message(
host, self.remote_file_sender_queue, {
"command": "remote_sync",
"server": host,
"remote_connection_info": remote_info,
})
@threaded
def sync_tramp_remote(self, tramp_file_name, server_username, server_host, ssh_port, path):
# arguments are passed from emacs using standard TRAMP functions tramp-file-name-<field>
if server_host in self.host_names:
server_host = self.host_names[server_host]['hostname']
ssh_conf = self.host_names[server_host]
elif is_valid_ip(server_host):
ssh_conf = {'hostname' : server_host}
else:
import paramiko
alias = server_host
ssh_config = paramiko.SSHConfig()
with open(os.path.expanduser('~/.ssh/config')) as f:
ssh_config.parse(f)
ssh_conf = ssh_config.lookup(alias)
server_host = ssh_conf.get('hostname', server_host)
self.host_names[alias] = ssh_conf
if not is_valid_ip(server_host):
message_emacs("HostName Must be IP format.")
if server_username:
ssh_conf['user'] = server_username
if ssh_port:
ssh_conf['port'] = ssh_port
self.host_names[server_host] = ssh_conf
# this value is like /ssh:user@ip#port:
# it should not be called tramp-method to avoid confusion
# we call it TRAMP connection information
tramp_connection_info = tramp_file_name.rsplit(":", 1)[0] + ":"
self.remote_sync(server_host, tramp_connection_info)
eval_in_emacs("lsp-bridge-update-tramp-file-info", tramp_file_name, tramp_connection_info, server_host, path)
self.sync_tramp_remote_complete_event.set()
# Functions for local to manage remote files
@threaded
def open_remote_file(self, path, jump_define_pos):
if is_valid_ip_path(path):
# open_remote_file path notation
# ip:path
# ip:port:path
path_info = split_ssh_path(path)
if path_info:
(remote_info, server_host, ssh_conf, server_path) = path_info
self.host_names[server_host] = ssh_conf
self.remote_sync(server_host, f"/{remote_info}")
message_emacs(f"Open {remote_info}{server_path}...")
# Add TRAMP-related fields
# The following fields: tramp_method, user, server, port, and path
# are set as buffer-local variables in the buffer created by Emacs.
# These variables facilitate the construction of a TRAMP file name,
# and then allow the buffer to reconnect to a restarted remote lsp-bridge process
# using the same logic with reconnecting a TRAMP remote file buffer.
self.send_remote_message(
server_host, self.remote_file_sender_queue, {
"command": "open_file",
"tramp_method": "ssh",
"user": ssh_conf.get('user'),
"server": server_host,
"port": ssh_conf.get('port'),
"path": server_path,
"jump_define_pos": epc_arg_transformer(jump_define_pos)
})
save_ip(f"{remote_info}")
else:
message_emacs("Please input valid path match rule: 'ip:/path/file'.")
@threaded
def save_remote_file(self, remote_file_host, remote_file_path):
self.send_remote_message(
remote_file_host, self.remote_file_sender_queue, {
"command": "save_file",
"server": remote_file_host,
"path": remote_file_path
})
@threaded
def close_remote_file(self, remote_file_host, remote_file_path):
self.send_remote_message(
remote_file_host, self.remote_file_sender_queue, {
"command": "close_file",
"server": remote_file_host,
"path": remote_file_path
})
# Functions for local to send request to remote server
@threaded
def lsp_request(self, remote_file_host, remote_file_path, method, args):
if method == "change_file":
self.send_remote_message(
remote_file_host, self.remote_file_sender_queue, {
"command": "change_file",
"server": remote_file_host,
"path": remote_file_path,
"args": list(map(epc_arg_transformer, args))
})
self.send_remote_message(
remote_file_host, self.remote_file_command_sender_queue, {
"command": "lsp_request",
"server": remote_file_host,
"path": remote_file_path,
"method": method,
"args": list(map(epc_arg_transformer, args))
})
@threaded
def func_request(self, remote_file_host, remote_file_path, method, args):
self.send_remote_message(
remote_file_host, self.remote_file_command_sender_queue, {
"command": "func_request",
"server": remote_file_host,
"path": remote_file_path,
"method": method,
"args": list(map(epc_arg_transformer, args))
})
# Functions for local to handle messages from remote server
@threaded
def handle_remote_file_message(self, message):
command = message["command"]
if command == "open_file":
if "error" in message:
message_emacs(message["error"])
else:
eval_in_emacs(
"lsp-bridge-open-remote-file--response",
message["tramp_method"],
message["user"],
message["server"],
message["port"],
message["path"],
string_to_base64(message["content"]),
message["jump_define_pos"],
)
message_emacs(f"Open file {message['path']} on {message['server']}")
@threaded
def handle_lsp_message(self, message):
if message["command"] == "eval-in-emacs":
# Execute emacs command from remote server.
eval_sexp_in_emacs(message["sexp"])
@threaded
def handle_file_elisp_message(self, message):
# Receive elisp RPC call from remote server.
log_time(f"Receive server elisp RPC: {message}")
host = message["host"]
# Read elisp code from local Emacs, and sendback to remote server.
if message["command"] == "get_emacs_func_result":
result = get_emacs_func_result(message["method"], *message["args"])
elif message["command"] == "get_emacs_vars":
result = get_emacs_vars(message["args"])
else:
logger.error("Unsupported command %s", message["command"])
result = None
self.send_remote_message(host, self.remote_file_elisp_sender_queue, result)
# Functions for local handling
def event_dispatcher(self):
try:
while True:
message = self.event_queue.get(True)
if message["name"] == "close_file":
self._close_file(message["content"])
elif message["name"] == "action_func":
(func_name, func_args) = message["content"]
getattr(self, func_name)(*func_args)
self.event_queue.task_done()
except:
logger.error(traceback.format_exc())
def message_dispatcher(self):
try:
while True:
message = self.message_queue.get(True)
if message["name"] == "server_process_exit":
self.handle_server_process_exit(message["content"])
else:
logger.error("Unhandled lsp-bridge message: %s" % message)
self.message_queue.task_done()
except:
logger.error(traceback.format_exc())
def rename_file(self, old_filepath, new_filepath):
if is_in_path_dict(FILE_ACTION_DICT, old_filepath):
get_from_path_dict(FILE_ACTION_DICT, old_filepath).rename_file(old_filepath, new_filepath)
def fetch_completion_item_info(self, filepath, item_key, server_name):
if is_in_path_dict(FILE_ACTION_DICT, filepath):
get_from_path_dict(FILE_ACTION_DICT, filepath).completion_item_resolve(item_key, server_name)
def open_file(self, filepath):
project_path = get_project_path(filepath)
multi_lang_server = get_emacs_func_result("get-multi-lang-server", project_path, filepath)
if os.path.splitext(filepath)[-1] == '.org':
single_lang_server = get_emacs_func_result("get-single-lang-server", project_path, filepath)
lang_server_info = load_single_server_info(single_lang_server)
#TODO support diagnostic
lsp_server = self.create_lsp_server(filepath, project_path, lang_server_info, enable_diagnostics=False)
create_file_action_with_single_server(filepath, lang_server_info, lsp_server)
elif multi_lang_server:
# Try to load multi language server when get-multi-lang-server return match one.
multi_lang_server_dir = Path(__file__).resolve().parent / "multiserver"
multi_lang_server_path = multi_lang_server_dir / "{}.json".format(multi_lang_server)
user_multi_lang_server_dir = Path(str(get_emacs_vars(["lsp-bridge-user-multiserver-dir"])[0])).expanduser()
user_multi_lang_server_path = user_multi_lang_server_dir / "{}.json".format(multi_lang_server)
if user_multi_lang_server_path.exists():
multi_lang_server_path = user_multi_lang_server_path
with open(multi_lang_server_path, encoding="utf-8", errors="ignore") as f:
multi_lang_server_info = json.load(f)
servers = self.pick_multi_server_names(multi_lang_server_info)
# Load multi language server only when all language server commands exist.
if self.check_multi_server_command(servers, filepath):
multi_servers = {}
for server_name in servers:
server_path = get_lang_server_path(server_name)
with open(server_path, encoding="utf-8", errors="ignore") as server_path_file:
lang_server_info = read_lang_server_info(server_path_file)
lsp_server = self.create_lsp_server(
filepath,
project_path,
lang_server_info,
server_name in multi_lang_server_info.get("diagnostics", []))
if lsp_server:
multi_servers[lang_server_info["name"]] = lsp_server
else:
return False
self.enjoy_hacking(servers, project_path)
create_file_action_with_multi_servers(filepath, multi_lang_server_info, multi_servers)
else:
# Try load single language server if multi language server load failed.
single_lang_server = get_emacs_func_result("get-single-lang-server", project_path, filepath)
if single_lang_server:
return self.load_single_lang_server(project_path, filepath)
else:
self.turn_off(
filepath,
"ERROR: can't find all command of multi-server for {}, haven't found match single-server".format(filepath))
return False
else:
# Try to load single language server.
return self.load_single_lang_server(project_path, filepath)
return True
def close_file(self, filepath):
# Add queue, make sure close file after other LSP request.
self.event_queue.put({
"name": "close_file",
"content": filepath
})
def _close_file(self, filepath):
if is_in_path_dict(FILE_ACTION_DICT, filepath):
get_from_path_dict(FILE_ACTION_DICT, filepath).exit()
def close_all_files(self):
FILE_ACTION_DICT.clear()
for lsp_server in LSP_SERVER_DICT.values():
lsp_server.exit()
LSP_SERVER_DICT.clear()
def enjoy_hacking(self, servers, project_path):
# Notify user server is ready.
print("Start lsp server ({}) for {}".format(", ".join(servers), project_path))
message_emacs("Active {} '{}', enjoy hacking!".format(
"project" if os.path.isdir(project_path) else "file",
os.path.basename(project_path.rstrip(os.path.sep))))
def load_single_lang_server(self, project_path, filepath):
single_lang_server = get_emacs_func_result("get-single-lang-server", project_path, filepath)
if not single_lang_server:
self.turn_off(filepath, "ERROR: can't find the corresponding server for {}".format(filepath))
return False
lang_server_info = load_single_server_info(single_lang_server)
if ((not os.path.isdir(project_path)) and
"support-single-file" in lang_server_info and
lang_server_info["support-single-file"] is False):
self.turn_off(
filepath,
"ERROR: {} not support single-file, you need put this file in a git repository".format(single_lang_server))
return False
lsp_server = self.create_lsp_server(filepath, project_path, lang_server_info)
if lsp_server:
self.enjoy_hacking([lang_server_info["name"]], project_path)
create_file_action_with_single_server(filepath, lang_server_info, lsp_server)
else:
return False
return True
def turn_off(self, filepath, message):
if os.path.splitext(filepath)[1] != ".txt":
message_emacs(message + ", disable LSP feature.")
eval_in_emacs("lsp-bridge--turn-off-lsp-feature", filepath, get_lsp_file_host())
def check_lang_server_command(self, lang_server_info, filepath, turn_off_on_error=True):
# We merge PATH from `exec-path` variable, to make sure lsp-bridge find LSP server command if it can find by Emacs.
merge_emacs_exec_path()
if len(lang_server_info["command"]) > 0:
server_command = lang_server_info["command"][0]
server_command_path = shutil.which(server_command)
if server_command_path:
# We always replace LSP server command with absolute path of 'which' command.
lang_server_info["command"][0] = server_command_path
# message back the LSP server command path to emacs for verification
# as some languages have runtime environment isolation
# for example python virtualenv, NodeJS nvm, Ruby RVM
message_emacs(f"found language server: {server_command_path}")
return True
else:
error_message = "Error: can't find command '{}' to start LSP server {} ({})".format(
server_command, lang_server_info["name"], filepath)
if turn_off_on_error:
self.turn_off(filepath, error_message)
else:
message_emacs(error_message)
return False
else:
error_message = "Error: {}'s command argument is empty".format(filepath)
if turn_off_on_error:
self.turn_off(filepath, error_message)
else:
message_emacs(error_message)
return False
def check_multi_server_command(self, server_names, filepath):
for server_name in server_names:
server_path = get_lang_server_path(server_name)
with open(server_path, encoding="utf-8", errors="ignore") as server_path_file:
lang_server_info = read_lang_server_info(server_path_file)
if not self.check_lang_server_command(lang_server_info, filepath, False):
return False
return True
def create_lsp_server(self, filepath, project_path, lang_server_info, enable_diagnostics=True):
if not self.check_lang_server_command(lang_server_info, filepath):
return False
lsp_server_name = "{}#{}".format(path_as_key(project_path), lang_server_info["name"])
if lsp_server_name not in LSP_SERVER_DICT:
LSP_SERVER_DICT[lsp_server_name] = LspServer(
message_queue=self.message_queue,
project_path=project_path,
server_info=lang_server_info,
server_name=lsp_server_name,
enable_diagnostics=enable_diagnostics)
return LSP_SERVER_DICT[lsp_server_name]
def pick_multi_server_names(self, multi_lang_server_info):
servers = []
for info in multi_lang_server_info:
info_value = multi_lang_server_info[info]
if isinstance(info_value, str):
servers.append(info_value)
else:
servers += info_value
return list(dict.fromkeys(servers))
def maybe_create_org_babel_server(self, filepath):
action = get_from_path_dict(FILE_ACTION_DICT, filepath)
current_lang_server = get_emacs_func_result("get-single-lang-server",
action.single_server.project_path, filepath)
lsp_server_name = "{}#{}".format(action.single_server.project_path, current_lang_server)
if lsp_server_name != action.single_server.server_name and isinstance(current_lang_server, str):
if lsp_server_name not in action.org_lang_servers:
lang_server_info = load_single_server_info(current_lang_server)
server = self.create_lsp_server(filepath, action.single_server.project_path,
lang_server_info, enable_diagnostics=False)
action.org_lang_servers[lsp_server_name] = server
action.org_server_infos[lsp_server_name] = lang_server_info
server.attach(action)
else:
lang_server_info = action.org_server_infos[lsp_server_name]
server = action.org_lang_servers[lsp_server_name]
action.single_server = server
action.single_server_info = lang_server_info
action.set_lsp_server()
def build_file_action_function(self, name):
def _do(filepath, *args):
if is_remote_path(filepath):
return
open_file_success = True
if not is_in_path_dict(FILE_ACTION_DICT, filepath):
open_file_success = self.open_file(filepath) # _do is called inside event_loop, so we can block here.
elif os.path.splitext(filepath)[-1] == '.org' and get_emacs_vars(['lsp-bridge-enable-org-babel'])[0]:
# check weather need create new lsp server
self.maybe_create_org_babel_server(filepath)
if open_file_success:
action = get_from_path_dict(FILE_ACTION_DICT, filepath)
action.call(name, *args)
setattr(self, "_{}".format(name), _do)
def _do_wrap(*args):
# To prevent long-time calculations from blocking Emacs, we need to put the function into event loop.
self.event_queue.put({
"name": "action_func",
"content": ("_{}".format(name), list(map(epc_arg_transformer, args)))
})
setattr(self, name, _do_wrap)
def build_prefix_function(self, obj_name, prefix, name):
def _do(*args, **kwargs):
getattr(getattr(self, obj_name), name)(*args, **kwargs)
setattr(self, "{}_{}".format(prefix, name), _do)
def tabnine_complete(self, before, after, filename, region_includes_beginning, region_includes_end, max_num_results):
self.tabnine.complete(before, after, filename, region_includes_beginning, region_includes_end, max_num_results)
@threaded
def ctags_complete(self, symbol, filename, cursor_offset):
self.ctags.make_complete(symbol, filename, cursor_offset)
def copilot_complete(self, position, editor_mode, file_path, relative_path, tab_size, text, insert_spaces):
self.copilot.complete(position, editor_mode, file_path, relative_path, tab_size, text, insert_spaces)
def codeium_complete(self, cursor_offset, editor_language, tab_size, text, insert_spaces, prefix, language):
self.codeium.complete(cursor_offset, editor_language, tab_size, text, insert_spaces, prefix, language)
def codeium_completion_accept(self, id):
self.codeium.accept(id)
def codeium_auth(self):
self.codeium.auth()
def copilot_login(self):
self.copilot.login()
def copilot_logout(self):
self.copilot.logout()
def copilot_status(self):
self.copilot.check_status()
def copilot_completion_accept(self, id):
self.copilot.accept(id)
def codeium_get_api_key(self, auth_token):
self.codeium.get_api_key(auth_token)
def handle_server_process_exit(self, server_name):
if server_name in LSP_SERVER_DICT:
log_time("Exit server {}".format(server_name))
del LSP_SERVER_DICT[server_name]
def cleanup(self):
"""Do some cleanup before exit python process."""
close_epc_client()
def start_test(self):
# Called from lsp-bridge-test.el to start test.
from test.test import start_test
start_test(self)
def profile_dump(self):
try:
global profiler
profiler.dump_stats(os.path.expanduser("~/lsp-bridge.prof"))
message_emacs("Output profile data to ~/lsp-bridge.prof, please use snakeviz open it.")
except:
message_emacs("Set option 'lsp-bridge-enable-profile' to 't' and call lsp-bridge-restart-process, then call lsp-bridge-profile-dump again.")
def read_lang_server_info(lang_server_path):
lang_server_info = json.load(lang_server_path)
# Replace template in command options.
command_args = lang_server_info["command"]
for i, arg in enumerate(command_args):
command_args[i] = replace_template(arg)
lang_server_info["command"] = command_args
# Replace template in initializationOptions.
if "initializationOptions" in lang_server_info:
initialization_options_args = lang_server_info["initializationOptions"]
for i, arg in enumerate(initialization_options_args):
if isinstance(initialization_options_args[arg], str):
initialization_options_args[arg] = replace_template(initialization_options_args[arg])
lang_server_info["initializationOptions"] = initialization_options_args
return lang_server_info
def load_single_server_info(lang_server):
lang_server_info_path = ""
if os.path.exists(lang_server) and os.path.dirname(lang_server) != "":
# If lang_server is real file path, we load the LSP server configuration from the user specified file.
lang_server_info_path = lang_server
else:
# Otherwise, we load LSP server configuration from file lsp-bridge/langserver/lang_server.json.
lang_server_info_path = get_lang_server_path(lang_server)
with open(lang_server_info_path, encoding="utf-8", errors="ignore") as f:
return read_lang_server_info(f)
def get_lang_server_path(server_name):
server_dir = Path(__file__).resolve().parent / "langserver"
server_path_current = server_dir / "{}_{}.json".format(server_name, get_os_name())
server_path_default = server_dir / "{}.json".format(server_name)
user_server_dir = Path(str(get_emacs_vars(["lsp-bridge-user-langserver-dir"])[0])).expanduser()
user_server_path_current = user_server_dir / "{}_{}.json".format(server_name, get_os_name())
user_server_path_default = user_server_dir / "{}.json".format(server_name)
if user_server_path_current.exists():
server_path_current = user_server_path_current
elif user_server_path_default.exists():
server_path_current = user_server_path_default
return server_path_current if server_path_current.exists() else server_path_default
if __name__ == "__main__":
if len(sys.argv) >= 3:
import cProfile
profiler = cProfile.Profile()
profiler.run("LspBridge(sys.argv[1:])")
else:
LspBridge(sys.argv[1:])
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/pphboy/lsp-bridge.git
git@gitee.com:pphboy/lsp-bridge.git
pphboy
lsp-bridge
lsp-bridge
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891