模块 nonebot_plugin_marshoai.extensions.mcp_extension.server
class Tool
func __init__(self, name: str, description: str, input_schema: dict[str, Any]) -> None
源代码 或 在GitHub上查看
python
def __init__(self, name: str, description: str, input_schema: dict[str, Any]) -> None:
self.name: str = name
self.description: str = description
self.input_schema: dict[str, Any] = input_schema
func format_for_llm(self) -> str
说明: 为 llm 生成工具描述
:return: 工具描述
源代码 或 在GitHub上查看
python
def format_for_llm(self) -> str:
args_desc = []
if 'properties' in self.input_schema:
for param_name, param_info in self.input_schema['properties'].items():
arg_desc = f"- {param_name}: {param_info.get('description', 'No description')}"
if param_name in self.input_schema.get('required', []):
arg_desc += ' (required)'
args_desc.append(arg_desc)
return f'Tool: {self.name}\nDescription: {self.description}\nArguments:{chr(10).join(args_desc)}'
class Server
func __init__(self, name: str, config: mcpConfig) -> None
源代码 或 在GitHub上查看
python
def __init__(self, name: str, config: mcpConfig) -> None:
self.name: str = name
self.config: mcpConfig = config
self.session: ClientSession | None = None
self._cleanup_lock: asyncio.Lock = asyncio.Lock()
self.exit_stack: AsyncExitStack = AsyncExitStack()
self._transport_initializers = {'stdio': self._initialize_stdio, 'sse': self._initialize_sse, 'streamable_http': self._initialize_streamable_http}
async func initialize(self) -> None
说明: 初始化实例
源代码 或 在GitHub上查看
python
async def initialize(self) -> None:
transport = self.config.type
initializer = self._transport_initializers[transport]
read, write = await initializer()
session = await self.exit_stack.enter_async_context(ClientSession(read, write))
await session.initialize()
self.session = session
async func list_tools(self) -> list[Tool]
说明: 从 MCP 服务器获得可用工具列表
:return: 工具列表
:raises RuntimeError: 如果服务器未启动
源代码 或 在GitHub上查看
python
async def list_tools(self) -> list[Tool]:
if not self.session:
raise RuntimeError(f'Server {self.name} not initialized')
tools_response = await self.session.list_tools()
tools: list[Tool] = []
for item in tools_response:
if isinstance(item, tuple) and item[0] == 'tools':
tools.extend((Tool(tool.name, tool.description, tool.inputSchema) for tool in item[1]))
return tools
async func execute_tool(self, tool_name: str, arguments: Optional[dict[str, Any]] = None, retries: int = 2, delay: float = 1.0) -> Any
说明: 执行一个 MCP 工具
:param tool_name: 工具名称 :param arguments: 工具参数 :param retries: 重试次数 :param delay: 重试间隔
:return: 工具执行结果
:raises RuntimeError: 如果服务器未初始化 :raises Exception: 工具在所有重试中均失败
源代码 或 在GitHub上查看
python
async def execute_tool(self, tool_name: str, arguments: Optional[dict[str, Any]]=None, retries: int=2, delay: float=1.0) -> Any:
if not self.session:
raise RuntimeError(f'Server {self.name} not initialized')
attempt = 0
while attempt < retries:
try:
logging.info(f'Executing {tool_name}...')
result = await self.session.call_tool(tool_name, arguments)
return result
except Exception as e:
attempt += 1
logging.warning(f'Error executing tool: {e}. Attempt {attempt} of {retries}.')
if attempt < retries:
logging.info(f'Retrying in {delay} seconds...')
await asyncio.sleep(delay)
else:
logging.error('Max retries reached. Failing.')
raise
async func cleanup(self) -> None
说明: Clean up server resources.
源代码 或 在GitHub上查看
python
async def cleanup(self) -> None:
async with self._cleanup_lock:
try:
await self.exit_stack.aclose()
self.session = None
except Exception as e:
logging.error(f'Error during cleanup of server {self.name}: {e}')