文章目录
前言
如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!
java-Milvus 连接池(多key)与自定义端点监听设计
1. MilvusClientV2Pool 是什么
MilvusClientV2Pool 是 Milvus 官方 Java SDK 提供的一个 连接池管理类,用于管理和复用 Milvus 连接客户端(MilvusClientV2 实例)
2. MilvusClientV2Pool中的key
MilvusClientV2Pool 是一个多键(multi-key)连接池管理器,这里的 key 是用来区分和管理不同“连接组”的标识符。
你可以把它理解为“连接池里的子池”的名字或分类标签,每个 key 对应一组单独的连接资源(即一批可用的 MilvusClientV2 实例),你调用 pool.getClient(key) 时,会从对应 key 的连接子池里获取连接。
3. 连接池设计
admin-module 与 insert-module 公用一个连接池,search-module 单独一个连接池
划分的好处:
模块 | 连接池 | 说明 |
---|---|---|
admin-module + insert-module | 共用一个池 | 都偏向写操作、管理操作,不太频繁。写入时延和并发要求可控。 |
search-module | 独立池 | 查询操作频繁,对并发吞吐和时延更敏感,需要独立池保证稳定性。 |
4. 连接池暴露给 Actuator
添加依赖:
<!-- 添加 Spring Boot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
启用端点配置:
management:
endpoints:
web:
exposure:
include: "*"
Spring Security 配置修改:
/**
* @description: TODO
* @author 杨镇宇
* @date 2024/7/12 16:01
* @version 1.0
*/
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/api/**").permitAll()
.antMatchers("/swagger-ui.html","/doc.html", "/webjars/**", "/swagger-ui/**",
"/swagger-resources/**", "/v2/api-docs").permitAll()
.antMatchers("/actuator/**").permitAll() // ✅ 添加这行:放行 actuator
.anyRequest().authenticated()
.and()
.httpBasic()
.and()
.exceptionHandling()
.authenticationEntryPoint((request, response, authException) -> {
System.out.println("Authentication failed: " + authException.getMessage());
response.setContentType("application/json;charset=UTF-8");
response.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
response.getWriter().write("{\"error\":\"Authentication Failed\"}");
})
.and()
.csrf().disable();
}
}
5. Milvus连接池与自定义 Milvus 连接池端点demo
添加依赖:
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.5.9</version>
</dependency>
注意的是Milvus SDK 本身并不主动声明所有运行必须的依赖,尤其是:
- protobuf-java
- grpc-*
- commons-pool2
- 其他 Milvus SDK 内部依赖的底层库
所以需要处理一下:
参考https://central.sonatype.com/artifact/io.milvus/milvus-sdk-java/2.5.9
的Maven POM File
修改所有运行必须的依赖。
下面是我修改后的:
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.5.9</version>
<exclusions>
<exclusion>
<artifactId>commons-pool2</artifactId>
<groupId>org.apache.commons</groupId>
</exclusion>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.5</version>
</dependency>
代码:
Milvus链接池bean配置:
package org.example.milvus.config;
import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.pool.PoolConfig;
import io.milvus.v2.client.ConnectConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
/**
* admin-module 与 insert-module 公用一个连接池,search-module 单独一个连接池
*/
@Configuration
@Slf4j
public class MilvusConfig {
private final MilvusProperties milvusProperties;
public MilvusConfig(MilvusProperties milvusProperties){
this.milvusProperties = milvusProperties;
}
/**
* 创建并返回一个 ConnectConfig 实例,用于配置连接到 Milvus 的参数。
* 该配置包括连接 URI 和认证 Token,是建立与 Milvus 服务连接的基础。
*
* @return 返回一个初始化好的 ConnectConfig 对象
*/
@Bean
public ConnectConfig connectConfig() {
return ConnectConfig.builder()
.uri(milvusProperties.getUri()) // 设置 Milvus 服务的连接地址
.token(milvusProperties.getToken()) // 设置访问 Milvus 的认证 Token
.build(); // 构建并返回 ConnectConfig 实例
}
/**
* 创建并返回一个 PoolConfig 实例,用于配置连接池参数。
* 此配置定义了连接池中每个 key 的最大空闲连接数、最大总连接数,
* 整体连接池的最大连接数,以及连接等待和回收的相关策略。
*
* @return 返回一个初始化好的 PoolConfig 对象
*/
private static PoolConfig createBasePoolConfig (){
return PoolConfig.builder()
.maxIdlePerKey(10) // 每个 key 的最大空闲连接数
.maxTotalPerKey(20) // 每个 key 的最大总连接数
.maxTotal(50) // 连接池整体的最大连接数
.maxBlockWaitDuration(Duration.ofSeconds(3)) // 最大阻塞等待时间(3秒)
.minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒)
.build(); // 构建并返回 PoolConfig 实例
}
/**
* 创建并返回一个 MilvusClientV2Pool 实例,用于管理与 Milvus 的连接池。
* 该方法使用指定的 PoolConfig 和 ConnectConfig 配置来初始化连接池。
*
* @param connectConfig 连接配置对象,包含连接 Milvus 所需的 URI 和 Token 等信息
* @return 返回一个初始化好的 MilvusClientV2Pool 对象
* @throws ClassNotFoundException 如果类未找到,抛出该异常
* @throws NoSuchMethodException 如果方法不存在,抛出该异常
*/
@Bean(name = "searchMilvusPool")
public MilvusClientV2Pool searchPool( ConnectConfig connectConfig) throws ClassNotFoundException, NoSuchMethodException {
PoolConfig poolConfig = PoolConfig.builder()
.maxIdlePerKey(10) // 每个 key 的最大空闲连接数
.maxTotalPerKey(20) // 每个 key 的最大总连接数
.maxTotal(50) // 连接池整体的最大连接数
.maxBlockWaitDuration(Duration.ofSeconds(3)) // 最大阻塞等待时间(3秒)
.minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒)
.build(); // 构建并返回 PoolConfig 实例
return new MilvusClientV2Pool(
poolConfig,
connectConfig
);
}
/**
* 创建并返回一个用于插入操作的 MilvusClientV2Pool 实例。
* 该方法使用指定的 PoolConfig 和 ConnectConfig 配置来初始化连接池,
* 主要用于管理与 Milvus 服务进行插入操作时的连接资源。
*
* @param connectConfig 连接配置对象,包含连接 Milvus 所需的 URI 和 Token 等信息
* @return 返回一个初始化好的 MilvusClientV2Pool 对象
* @throws ClassNotFoundException 如果类未找到,抛出该异常
* @throws NoSuchMethodException 如果方法不存在,抛出该异常
*/
@Bean(name = "commonMilvusPool")
public MilvusClientV2Pool insertPool( ConnectConfig connectConfig) throws ClassNotFoundException, NoSuchMethodException {
PoolConfig poolConfig = PoolConfig.builder()
.maxIdlePerKey(10) // 每个 key 的最大空闲连接数
.maxTotalPerKey(20) // 每个 key 的最大总连接数
.maxTotal(50) // 连接池整体的最大连接数
.maxBlockWaitDuration(Duration.ofSeconds(3)) // 最大阻塞等待时间(3秒)
.minEvictableIdleDuration(Duration.ofSeconds(10)) // 最小可回收空闲时间(10秒)
.build(); // 构建并返回 PoolConfig 实例
return new MilvusClientV2Pool(
poolConfig,
connectConfig
);
}
}
package org.example.milvus.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @author 杨镇宇
* @date 2025/7/4 10:23
* @version 1.0
*/
@Component
@Data
@ConfigurationProperties(prefix = "milvus")
public class MilvusProperties {
/**
* 设置 Milvus 服务的连接地址
*/
private String uri = "http://localhost:19530";
/**
* 设置访问 Milvus 的认证 Token
*/
private String token = "root:Milvus";
}
yml配置:
milvus:
uri: http://localhost:19530
token: root:Milvus
Milvus 连接池端点:
package org.example.milvus.config;
import com.google.common.collect.Maps;
import io.milvus.pool.MilvusClientV2Pool;
import org.example.milvus.model.MilvusClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* @author 杨镇宇
* @date 2025/7/4 17:13
* @version 1.0
*/
@Component
@Endpoint(id = "milvusPool")
public class MilvusPoolEndpoint {
private final MilvusClientV2Pool searchMilvusPool;
private final MilvusClientV2Pool connectConfig;
// 构造函数注入
public MilvusPoolEndpoint(
@Qualifier("searchMilvusPool") MilvusClientV2Pool searchMilvusPool,
@Qualifier("commonMilvusPool") MilvusClientV2Pool commonMilvusPool) {
this.searchMilvusPool = searchMilvusPool;
this.connectConfig = commonMilvusPool;
}
@ReadOperation
public Map<String, Object> milvusPoolStats() {
Map<String, Object> result = Maps.newHashMap();
List<String> commonKeys = Arrays.asList(MilvusClient.ADMIN_MODULE, MilvusClient.INSERT_MODULE);
List<String> searchKeys = Collections.singletonList(MilvusClient.SEARCH_MODULE);
for (String key : searchKeys) {
result.put("searchPool_activeCount_" + key, searchMilvusPool.getActiveClientNumber(key));
result.put("searchPool_idleCount_" + key, searchMilvusPool.getIdleClientNumber(key));
}
for (String key : commonKeys) {
result.put("commonPool_activeCount_" + key, connectConfig.getActiveClientNumber(key));
result.put("commonPool_idleCount_" + key, connectConfig.getIdleClientNumber(key));
}
return result;
}
}
search-module 这个key的客户端:
package org.example.milvus.model;
/**
* @author 杨镇宇
* @date 2025/7/4 17:15
* @version 1.0
*/
public interface MilvusClient {
String ADMIN_MODULE = "admin-module";
String INSERT_MODULE = "insert-module";
String SEARCH_MODULE = "search-module";
}
package org.example.milvus.model;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.pool.MilvusClientV2Pool;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
/**
* @author 杨镇宇
* @date 2025/7/4 16:44
* @version 1.0
*/
@Component
public class MilvusSearchClient implements MilvusClient{
private final MilvusClientV2 client;
private final MilvusClientV2Pool pool;
public MilvusSearchClient(@Qualifier("searchMilvusPool") MilvusClientV2Pool pool) {
this.pool = pool;
this.client = pool.getClient(SEARCH_MODULE);
}
public MilvusClientV2 getClient() {
return client;
}
@PreDestroy
public void close() {
pool.returnClient(SEARCH_MODULE, client);
}
}
admin-module + insert-module 这两个key的客户端:
package org.example.milvus.model;
import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.v2.client.MilvusClientV2;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
/**
* @author 杨镇宇
* @date 2025/7/4 16:45
* @version 1.0
*/
@Component
public class MilvusInsertClient implements MilvusClient{
private final MilvusClientV2 client;
private final MilvusClientV2Pool pool;
public MilvusInsertClient(@Qualifier("commonMilvusPool") MilvusClientV2Pool pool) {
this.pool = pool;
this.client = pool.getClient(INSERT_MODULE);
}
public MilvusClientV2 getClient() {
return client;
}
@PreDestroy
public void close() {
pool.returnClient(INSERT_MODULE, client);
}
}
package org.example.milvus.model;
import io.milvus.pool.MilvusClientV2Pool;
import io.milvus.v2.client.MilvusClientV2;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
/**
* @author 杨镇宇
* @date 2025/7/4 16:45
* @version 1.0
*/
@Component
public class MilvusAdminClient implements MilvusClient{
private final MilvusClientV2 client;
private final MilvusClientV2Pool pool;
public MilvusAdminClient(@Qualifier("commonMilvusPool") MilvusClientV2Pool pool) {
this.pool = pool;
this.client = pool.getClient(ADMIN_MODULE);
}
public MilvusClientV2 getClient() {
return client;
}
@PreDestroy
public void close() {
pool.returnClient(ADMIN_MODULE, client);
}
}
效果:
https://127.0.0.1:13145/actuator/milvusPool
字段名 | 意义 |
---|---|
searchPool_activeCount_search-module = 1 | 当前 searchMilvusPool 池中 search-module 正在使用的连接数为 1 |
searchPool_idleCount_search-module = 0 | 当前 searchMilvusPool 池中 search-module 空闲连接数为 0 |
commonPool_activeCount_admin-module = 1 | 当前 commonMilvusPool 池中 admin-module 正在使用的连接数为 1 |
commonPool_idleCount_admin-module = 0 | 当前 commonMilvusPool 池中 admin-module 空闲连接数为 0 |
commonPool_activeCount_insert-module = 1 | 当前 commonMilvusPool 池中 insert-module 正在使用的连接数为 1 |
commonPool_idleCount_insert-module = 0 | 当前 commonMilvusPool 池中 insert-module 空闲连接数为 0 |