
Prerequisites

Before calling the 365 Open Platform MCP server, complete the following prerequisites.

1. Create an open platform application and obtain its AK/SK, which later capability calls require. For details on creating the application, see: Creating an enterprise self-built application.

2. In the Permission Management menu of the application detail page, apply for the permissions of the relevant application capabilities.

For the permissions each MCP capability depends on, see the capability list.

3. Request a user access credential; see the user authorization flow for details.

4. Before access, confirm that API request signing is disabled (request signing is not yet supported).

(Screenshot: disabling API request signing)

Integration flow

Once the prerequisites are complete, you can call the MCP server.

Python example

Environment setup

Install uv, a Python environment management tool

macOS/Linux

bash
curl -LsSf https://astral.sh/uv/install.sh | sh

Windows

powershell
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"

初始化项目

macOS/Linux

bash
# Create the project
uv init mcp-client
cd mcp-client

# Create and activate a virtual environment
uv venv
source .venv/bin/activate

# Install dependencies
uv add "mcp[cli]" httpx

# Create the client implementation file
touch client.py

Windows

powershell
# Create the project
uv init mcp-client
cd mcp-client

# Create and activate a virtual environment
uv venv
.venv\Scripts\activate

# Install dependencies (quote the extra to avoid shell bracket parsing)
uv add "mcp[cli]" httpx

# Create the client implementation file
new-item client.py

Building a Streamable HTTP client (recommended)

Because the MCP server is generated by wrapping the WPS open APIs, the credentials required to access those APIs are also required when accessing the MCP server. The open platform currently passes the Authorization information through an HTTP header. When building an MCP HTTP client, you can pass the credential as in the code below. The URL for the HTTP transport is https://openapi.wps.cn/mcp/message

python
async with streamablehttp_client(url="https://openapi.wps.cn/mcp/message", headers={
    "Authorization": f"Bearer {access_token}"
}) as (
    read_stream,
    write_stream,
    _,
):
    async with ClientSession(read_stream, write_stream) as session:
        result = await session.initialize()
        assert isinstance(result, InitializeResult)
        # The assertions below come from the SDK example and assume a server
        # named "StatelessServer" exposing an "echo" tool; adjust them to match
        # the tools actually exposed by the WPS MCP server.
        assert result.serverInfo.name == "StatelessServer"
        tool_result = await session.call_tool("echo", {"message": "hello"})
        assert len(tool_result.content) == 1
        assert isinstance(tool_result.content[0], TextContent)
        assert tool_result.content[0].text == "Echo: hello"

        for i in range(3):
            tool_result = await session.call_tool("echo", {"message": f"test_{i}"})
            assert len(tool_result.content) == 1
            assert isinstance(tool_result.content[0], TextContent)
            assert tool_result.content[0].text == f"Echo: test_{i}"
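
Before calling a specific tool, you will usually want to discover which capabilities the server exposes. Below is a minimal self-contained sketch using the SDK's list_tools(); the tools actually returned depend on the permissions you applied for, and "your_access_token" is a placeholder.

python
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def list_wps_tools(access_token: str) -> None:
    async with streamablehttp_client(
        url="https://openapi.wps.cn/mcp/message",
        headers={"Authorization": f"Bearer {access_token}"},
    ) as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            response = await session.list_tools()
            for tool in response.tools:
                # name, description, and inputSchema follow the mcp.types.Tool model
                print(tool.name, "-", tool.description)

asyncio.run(list_wps_tools("your_access_token"))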

Below is a complete code example that uses an MCP client to access the MCP HTTP server.

python
# client.py 


import asyncio
import json
import os
import sys
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

from openai import AsyncOpenAI
from dotenv import load_dotenv

load_dotenv()  # load environment variables from .env

async def streamable_http_tools(method, **kwargs):
    async with streamablehttp_client(url=sys.argv[1], headers={
        "Authorization": f"Bearer {sys.argv[2]}"
    }) as (
        read_stream,
        write_stream,
        _,
    ):
        async with ClientSession(read_stream, write_stream) as session:
            if method == "initialize":
                return await session.initialize()
            elif method == "list":
                return await session.list_tools()
            elif method == "call":
                return await session.call_tool(**kwargs)

class MCPClient:
    def __init__(self):
        self.openai = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"))

    async def connect_to_http_server(self, server_url: str, access_token: str):
        """Connect to an MCP server running with HTTP transport.

        Note: streamable_http_tools reads the URL and token from sys.argv,
        so the arguments here are informational only.
        """
        await streamable_http_tools("initialize")

    async def process_query(self, query: str) -> str:
        """Process a query using OpenAI API and available tools"""
        messages = [
            {
                "role": "user",
                "content": query
            }
        ]
        response = await streamable_http_tools("list")
        available_tools = [{ 
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        } for tool in response.tools]
        print("available_tools: ", available_tools)

        # Initial OpenAI API call
        completion = await self.openai.chat.completions.create(
            model=os.getenv("OPENAI_MODEL"),
            max_tokens=1000,
            messages=messages,
            tools=available_tools
        )

        # Process response and handle tool calls
        tool_results = []
        final_text = []
        
        assistant_message = completion.choices[0].message
        
        if assistant_message.tool_calls:
            for tool_call in assistant_message.tool_calls:
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)

                result = await streamable_http_tools("call", name=tool_name, arguments=tool_args)
                tool_results.append({"call": tool_name, "result": result})
                final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

                messages.extend([
                    {
                        "role": "assistant",
                        "content": None,
                        "tool_calls": [tool_call]
                    },
                    {
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": result.content[0].text
                    }
                ])

                print(f"Tool {tool_name} returned: {result.content[0].text}")
                print("messages", messages)
                # Get next response from OpenAI
                completion = await self.openai.chat.completions.create(
                    model=os.getenv("OPENAI_MODEL"),
                    max_tokens=1000,
                    messages=messages,
                )  
                if isinstance(completion.choices[0].message.content, (dict, list)):
                    final_text.append(str(completion.choices[0].message.content))
                else:
                    final_text.append(completion.choices[0].message.content)
        else: 
            if isinstance(assistant_message.content, (dict, list)):
                final_text.append(str(assistant_message.content))
            else:
                final_text.append(assistant_message.content)

        return "\n".join(final_text)

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        
        while True:
            try:
                query = input("\nQuery: ").strip()
                
                if query.lower() == 'quit':
                    break
                    
                response = await self.process_query(query)
                print("\n" + response)
                    
            except Exception as e:
                print(f"\nError: {str(e)}")

async def main():
    if len(sys.argv) < 3:
        print("Usage: python client.py <server_url> <your_access_token>")
        sys.exit(1)

    client = MCPClient()
    try:
        await client.connect_to_http_server(server_url=sys.argv[1], access_token=sys.argv[2])
        await client.chat_loop()
    except KeyboardInterrupt:
        print("\nClient is closing...")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        print("Client is closed.")


if __name__ == "__main__":
    asyncio.run(main())

Configuration and running

In the same directory as client.py, create a configuration file named .env and fill in the configuration of your chosen LLM. Example:

ini
# OpenAI-compatible API configuration
OPENAI_API_KEY=your-llm-key
OPENAI_BASE_URL=your-llm-base-url
OPENAI_MODEL=your-llm-model

In the mcp-client directory you created, run the following command (your_access_token is the user credential you requested):

bash
uv run client.py https://openapi.wps.cn/mcp/message {your_access_token}

A successful run looks like this:

(Screenshot: example of a successful run)

Once the client is running, you can start the conversation.
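
If startup fails with an authorization error, it can help to test the credential against the endpoint directly, outside the MCP SDK. The sketch below sends the raw JSON-RPC initialize request that the SDK normally issues on your behalf; the protocolVersion and clientInfo values are illustrative assumptions, and httpx is already a project dependency.

python
# sanity_check.py - a minimal credential check, not an official client
import sys

import httpx

resp = httpx.post(
    "https://openapi.wps.cn/mcp/message",
    headers={
        "Authorization": f"Bearer {sys.argv[1]}",
        "Content-Type": "application/json",
        # Streamable HTTP servers may reply with JSON or an SSE stream,
        # so advertise both content types.
        "Accept": "application/json, text/event-stream",
    },
    json={
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2025-03-26",  # assumed; the SDK negotiates this
            "capabilities": {},
            "clientInfo": {"name": "sanity-check", "version": "0.0.1"},
        },
    },
)
# A 200 response with a result payload means the token and endpoint work;
# 401/403 usually indicates a credential or permission problem.
print(resp.status_code, resp.text)

Run it from the project directory with uv run sanity_check.py {your_access_token}.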

Building an SSE client (to be deprecated)

SSE will be taken offline soon; connecting via the HTTP transport is recommended instead. The SSE URL is https://openapi.wps.cn/mcp/sse

python
# requires: from mcp.client.sse import sse_client
self._streams_context = sse_client(url="https://openapi.wps.cn/mcp/sse", headers={
    "Authorization": f"Bearer {access_token}"
})
streams = await self._streams_context.__aenter__()

self._session_context = ClientSession(*streams)
self.session: ClientSession = await self._session_context.__aenter__()
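
Because this snippet enters the stream and session context managers by hand rather than with async with, exit them in reverse order on shutdown. A minimal cleanup sketch, using the attribute names from the snippet above:

python
async def cleanup(self):
    # Exit the manually entered contexts in reverse order of creation.
    if self._session_context:
        await self._session_context.__aexit__(None, None, None)
    if self._streams_context:
        await self._streams_context.__aexit__(None, None, None)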