【SpringAI】9.创建本地mcp服务(演示通过mcp实现联网搜索)

发布于:2025-08-09 ⋅ 阅读:(34) ⋅ 点赞:(0)

前言

上一篇演示了接入公网的高德地图sse服务,有人说我贴的代码不全,确实有些自定义工具类我不可能全部复制过来,复杂的功能大家一般也都会拆分开避免单文件过大,要查看完整代码还是去看完整项目的好,
这篇文章接入本地/内网的mcp服务实现联网搜索

1,先看最终效果

在这里插入图片描述
在这里插入图片描述

2,新建一个mcp服务项目或模块

引入pom依赖

<dependency>
			<groupId>org.springframework.ai</groupId>
			<artifactId>spring-ai-starter-mcp-server-webmvc</artifactId>
            <version>${spring-ai.version}</version>
		</dependency>
        <!-- Spring AI MCP 核心包 (手动实现SSE) -->
        <dependency>
            <groupId>org.springframework.ai</groupId>
            <artifactId>spring-ai-mcp</artifactId>
            <version>${spring-ai.version}</version>
        </dependency>
        <!-- Spring AI Model (基础接口) -->
        <dependency>
            <groupId>org.springframework.ai</groupId>
            <artifactId>spring-ai-model</artifactId>
            <version>${spring-ai.version}</version>
        </dependency>

application.yml添加必要的mcp服务信息,
服务端口假设9099

server:
  port: 9099
spring:
  ai:
    mcp:
      server:
        enabled: true
        type: SYNC
        name: "LocalMcpServer"
        version: "1.0.0"
        stdio: false
        sse-message-endpoint: "/mcp/message"
        sse-endpoint: "/sse"

        # MCP 服务能力配置
        capabilities:
          tool: true
          resource: true
          prompt: true
          completion: false
          roots: false
          sampling: false

3,创建sse接收端点

private final ConcurrentMap<String, SseEmitter> clients = new ConcurrentHashMap<>();

    /**
     * SSE端点 - 匹配MCP客户端期望的路径
     */
    @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter sse(@RequestParam(value = "clientId", defaultValue = "default") String clientId) {
        logger.info("🔗 新SSE连接: clientId={}", clientId);
        
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE); // 无超时
        clients.put(clientId, emitter);
        
        // 发送初始事件
        try {
            emitter.send(SseEmitter.event()
                .name("connect")
                .data("SSE连接已建立 - clientId: " + clientId));
                
            emitter.send(SseEmitter.event()
                .name("mcp-ready")
                .data("{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\",\"params\":{}}"));
        } catch (IOException e) {
            logger.error("发送初始SSE事件失败: {}", e.getMessage());
            clients.remove(clientId);
            emitter.complete();
        }
        
        // 设置完成和错误处理
        emitter.onCompletion(() -> {
            logger.info("🔌 SSE连接关闭: clientId={}", clientId);
            clients.remove(clientId);
        });
        
        emitter.onError((ex) -> {
            logger.error("❌ SSE连接错误: clientId={}, error={}", clientId, ex.getMessage());
            clients.remove(clientId);
        });
        
        emitter.onTimeout(() -> {
            logger.warn("⏰ SSE连接超时: clientId={}", clientId);
            clients.remove(clientId);
        });
        
        return emitter;
    }

4,获取联网搜索的key

https://www.searchapi.io/
在这里插入图片描述
登录注册即可拿到一个key,实际使用时直接

curl --get https://www.searchapi.io/api/v1/search \
  -d engine="baidu" \
  -d q="ERNIE Bot"

api_key可以放请求头,实测也可以直接当普通参数与q一起拼接到url后面,
更多的可选参数可以参考:https://www.searchapi.io/docs/baidu#api-parameters-search-query
将拿到的key配置到yml中随时读取

5,添加联网搜索工具类

/**
 * 联网搜索工具配置类,提供基于SearchAPI的搜索功能。
 */
@Configuration
public class SearchToolsConfiguration {

    private static final Logger logger = LoggerFactory.getLogger(SearchToolsConfiguration.class);
    private static final int DEFAULT_NUM_RESULTS = 50;
    private static final int MAX_NUM_RESULTS = 100;

    @Value("${search.url}")
    private String searchApiUrl;

    private final RestTemplate restTemplate = new RestTemplate();
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * 搜索工具集合:包含所有联网搜索相关的工具。
     */
    @Bean
    public List<McpServerFeatures.SyncToolSpecification> searchTools() {
        return List.of(createSearchWebTool());
    }

    /**
     * 创建联网搜索工具。
     */
    private McpServerFeatures.SyncToolSpecification createSearchWebTool() {
        logger.info("创建联网搜索工具,API URL: {}", searchApiUrl);

        String schemaJson = """
        {
          "type": "object",
          "properties": {
            "q": {
              "type": "string",
              "description": "搜索查询内容,必填参数"
            },
            "num": {
              "type": "integer",
              "description": "返回结果数量,可选参数,默认50,最大100",
              "minimum": 1,
              "maximum": 100,
              "default": 50
            }
          },
          "required": ["q"]
        }
        """;

        McpSchema.Tool tool = new McpSchema.Tool(
            "search_web",
            "执行联网搜索并返回分页的结构化搜索结果",
            schemaJson
        );

        return new McpServerFeatures.SyncToolSpecification(
            tool,
            (exchange, arguments) -> {
                try {
                    String query = (String) arguments.get("q");
                    if (query == null || query.trim().isEmpty()) {
                        return new McpSchema.CallToolResult("错误:搜索查询内容不能为空", true);
                    }

                    Integer num = DEFAULT_NUM_RESULTS;
                    if (arguments.containsKey("num")) {
                        Object numValue = arguments.get("num");
                        if (numValue instanceof Number) {
                            num = ((Number) numValue).intValue();
                        } else if (numValue instanceof String) {
                            try {
                                num = Integer.parseInt((String) numValue);
                            } catch (NumberFormatException e) {
                                logger.warn("无效的num参数值: {}, 使用默认值: {}", numValue, DEFAULT_NUM_RESULTS);
                            }
                        }
                        // 限制结果数量在合理范围内
                        num = Math.max(1, Math.min(num, MAX_NUM_RESULTS));
                    }

                    logger.info("执行联网搜索: query='{}', num={}", query, num);

                    // 构建完整的搜索URL,使用UriComponents处理特殊字符
                    UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(searchApiUrl);
                    String url = builder.toUriString();

                    // 手动构建查询参数,确保正确编码
                    StringBuilder queryString = new StringBuilder();
                    queryString.append("q=").append(UriUtils.encode(query, "UTF-8"));
                    queryString.append("&num=").append(num);

                    // 如果URL已有查询参数,追加到现有参数
                    String fullUrl = url;
                    if (url.contains("?")) {
                        fullUrl = url + "&" + queryString.toString();
                    } else {
                        fullUrl = url + "?" + queryString.toString();
                    }

                    URI searchUri = URI.create(fullUrl);

                    logger.debug("搜索API请求URL: {}", searchUri);

                    // 调用搜索API
                    String response = restTemplate.getForObject(searchUri, String.class);

                    if (response == null || response.trim().isEmpty()) {
                        return new McpSchema.CallToolResult("错误:搜索API返回空结果", true);
                    }

                    // 解析并格式化结果
                    try {
                        Map<String, Object> searchResult = objectMapper.readValue(response, Map.class);

                        // 检查搜索结果状态
                        if (searchResult.containsKey("search_metadata")) {
                            Map<String, Object> metadata = (Map<String, Object>) searchResult.get("search_metadata");
                            String status = (String) metadata.get("status");
                            if (!"Success".equals(status)) {
                                logger.warn("搜索API返回非成功状态: {}", status);
                                return new McpSchema.CallToolResult("搜索失败,API返回状态: " + status, true);
                            }
                        }

                        // 提取有用的搜索结果
                        List<Map<String, Object>> organicResults = (List<Map<String, Object>>) searchResult.get("organic_results");
                        if (organicResults == null || organicResults.isEmpty()) {
                            return new McpSchema.CallToolResult("无内容", false);
                        }

                        // 直接返回organic_results的原始内容
                        String organicResultsJson = objectMapper.writeValueAsString(organicResults);
                        return new McpSchema.CallToolResult(organicResultsJson, false);

                    } catch (Exception e) {
                        logger.error("解析搜索结果失败", e);
                        // 如果无法解析JSON,直接返回原始响应
                        return new McpSchema.CallToolResult(response, false);
                    }

                } catch (Exception e) {
                    logger.error("联网搜索失败", e);
                    return new McpSchema.CallToolResult("搜索错误: " + e.getMessage(), true);
                }
            }
        );
    }
}

6,实际调用

配置与前一章节的类似,在前端的mcp管理页面新增mcp,表单json填入:

{
  "mcpServers": {
    "LocalMcpServer": {
      "url": "http://127.0.0.1:9099",
      "type": "sse",
      "sseEndpoint": "/sse"
    }
  }
}

后端对所有管理的模型做了维护,实际调用是从工厂取出再决定要不要使用mcp工具

/**
     * 输出处理后端流式结果
     * 优化流式输出处理逻辑:
     * 1. 在开始阶段或空字符串输出时,状态保持为"think",表示模型正在思考
     * 2. 只有当检测到有意义的内容(中文、数字、字母等)时,才将状态改为"running"
     * 3. 在流结束时明确添加一个"stop"状态的消息,确保前端能正确处理结束状态
     *
     * @param messageList 模型消息,包括系统提示词、用户提示词、历史对话和媒体文件
     * @param myModel     指定模型对象
     * @param body        用户请求参数
     * @return 处理后的FluxVO流
     */
    private Flux<FluxVO> getFluxVOFlux(List<Message> messageList, AiModel myModel, QuestionVO body) {
        Prompt prompt = new Prompt(messageList);
        AtomicBoolean inThinking = new AtomicBoolean(false);
        StringBuffer outputText = body.getMemory() ? new StringBuffer() : null;
        ChatClient chatModel = myModel.getChatClient();
        // 1. 先构造 Publisher<ChatResponse>
        Flux<ChatResponse> publisher;
        if (body.getUseTools()) {
            List<ToolCallback> toolCallbacks = dynamicMcpClientManager.getAvailableToolCallbacks();
            publisher = chatModel.prompt(prompt).toolCallbacks(toolCallbacks).stream().chatResponse();
        } else {
            publisher = chatModel.prompt(prompt).stream().chatResponse();
        }
        // 主动推送一条“处理中”消息
        Flux<FluxVO> proactiveMsg = Flux.just(
                FluxVO.builder().text("").status("before").build()
        );

        Flux<FluxVO> resp = Flux.from(publisher)
                .doFirst(() -> {
                    System.out.println("-------------开始输出");
                    if (body.getMemory()) {
                        chatMemoryService.saveMessage(body);
                    }
                })
                .map(response -> {
                    String text = response.getResult().getOutput().getText();
                    if (text == null) {
                        text = "";
                    }
                    // 处理工具使用信息
                    if (!response.getResult().getOutput().getToolCalls().isEmpty()) {
                        for (AssistantMessage.ToolCall toolCall : response.getResult().getOutput().getToolCalls()) {
                            System.out.println("==================调用mcp工具====================");
                            System.out.println(toolCall.name());
                        }
                    }
                    if ("<think>".equals(text)) {
                        inThinking.set(true);
                    } else if ("</think>".equals(text)) {
                        inThinking.set(false);
                    }
                    boolean isStop = response.getResult().getMetadata().getFinishReason() != null && !response.getResult().getMetadata().getFinishReason().isEmpty();
                    String status = inThinking.get() ? "think" : (isStop ? "stop" : "running");
                    if (outputText != null) {
                        outputText.append(text);
                    }
                    return FluxVO.builder()
                            .text(text)
                            .status(status)
                            .build();
                })
                .doFinally(signalType -> {
                    System.out.println("-------------流式处理结束");
                    if (body.getMemory() && outputText != null) {
                        chatMemoryService.saveMessage(body.getSessionId(), "ASSISTANT", outputText.toString(), body.getModel());
                    }
                })
                .onErrorResume(error -> {
                    System.err.println("流式处理异常: " + error.getMessage());
                    return Flux.just(FluxVO.builder()
                            .text("AI服务异常,请稍后重试")
                            .status("stop")
                            .build());
                });

        // 先推 proactiveMsg,再推 publisher
        return Flux.concat(proactiveMsg, resp);
    }

7,其他

后端的动态mcp管理类做了重大优化,支持服务运行时动态添加mcp服务、定期健康检查、定时重连等,还是推荐去看我的完整目录
https://gitee.com/luckylanyu/springai-novel