Skip to content

Module nonebot_plugin_marshoai.extensions.mcp_extension.server

class Tool


func __init__(self, name: str, description: str, input_schema: dict[str, Any]) -> None

Source code or View on GitHub
python
def __init__(self, name: str, description: str, input_schema: dict[str, Any]) -> None:
    self.name: str = name
    self.description: str = description
    self.input_schema: dict[str, Any] = input_schema

func format_for_llm(self) -> str

Description: 为 llm 生成工具描述

:return: 工具描述

Source code or View on GitHub
python
def format_for_llm(self) -> str:
    args_desc = []
    if 'properties' in self.input_schema:
        for param_name, param_info in self.input_schema['properties'].items():
            arg_desc = f"- {param_name}: {param_info.get('description', 'No description')}"
            if param_name in self.input_schema.get('required', []):
                arg_desc += ' (required)'
            args_desc.append(arg_desc)
    return f'Tool: {self.name}\nDescription: {self.description}\nArguments:{chr(10).join(args_desc)}'

class Server


func __init__(self, name: str, config: mcpConfig) -> None

Source code or View on GitHub
python
def __init__(self, name: str, config: mcpConfig) -> None:
    self.name: str = name
    self.config: mcpConfig = config
    self.session: ClientSession | None = None
    self._cleanup_lock: asyncio.Lock = asyncio.Lock()
    self.exit_stack: AsyncExitStack = AsyncExitStack()
    self._transport_initializers = {'stdio': self._initialize_stdio, 'sse': self._initialize_sse, 'streamable_http': self._initialize_streamable_http}

async func initialize(self) -> None

Description: 初始化实例

Source code or View on GitHub
python
async def initialize(self) -> None:
    transport = self.config.type
    initializer = self._transport_initializers[transport]
    read, write = await initializer()
    session = await self.exit_stack.enter_async_context(ClientSession(read, write))
    await session.initialize()
    self.session = session

async func list_tools(self) -> list[Tool]

Description: 从 MCP 服务器获得可用工具列表

:return: 工具列表

:raises RuntimeError: 如果服务器未启动

Source code or View on GitHub
python
async def list_tools(self) -> list[Tool]:
    if not self.session:
        raise RuntimeError(f'Server {self.name} not initialized')
    tools_response = await self.session.list_tools()
    tools: list[Tool] = []
    for item in tools_response:
        if isinstance(item, tuple) and item[0] == 'tools':
            tools.extend((Tool(tool.name, tool.description, tool.inputSchema) for tool in item[1]))
    return tools

async func execute_tool(self, tool_name: str, arguments: Optional[dict[str, Any]] = None, retries: int = 2, delay: float = 1.0) -> Any

Description: 执行一个 MCP 工具

:param tool_name: 工具名称 :param arguments: 工具参数 :param retries: 重试次数 :param delay: 重试间隔

:return: 工具执行结果

:raises RuntimeError: 如果服务器未初始化 :raises Exception: 工具在所有重试中均失败

Source code or View on GitHub
python
async def execute_tool(self, tool_name: str, arguments: Optional[dict[str, Any]]=None, retries: int=2, delay: float=1.0) -> Any:
    if not self.session:
        raise RuntimeError(f'Server {self.name} not initialized')
    attempt = 0
    while attempt < retries:
        try:
            logging.info(f'Executing {tool_name}...')
            result = await self.session.call_tool(tool_name, arguments)
            return result
        except Exception as e:
            attempt += 1
            logging.warning(f'Error executing tool: {e}. Attempt {attempt} of {retries}.')
            if attempt < retries:
                logging.info(f'Retrying in {delay} seconds...')
                await asyncio.sleep(delay)
            else:
                logging.error('Max retries reached. Failing.')
                raise

async func cleanup(self) -> None

Description: Clean up server resources.

Source code or View on GitHub
python
async def cleanup(self) -> None:
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
            self.session = None
        except Exception as e:
            logging.error(f'Error during cleanup of server {self.name}: {e}')