1、概述
前面我们设计了一款分布式服务框架,完成了服务注册、服务发现、远程调用的基本功能,但是作为一款分布式服务框架他还需要一些高级特性,比如负载均衡。所以今天我们来分析常用的负载均衡策略,并设计一款通用的负载均衡组件,可以集成到 Cheese 中,让我们的 Cheese 具备负载均衡的能力。
2、细说负载均衡
首先我们都知道负载均衡是一种集群环境中策略,它可以按照某种算法将网络流量(计算任务)动态的分配到集群里面的各个节点,从而提高系统的可用性、稳定性。
比如一个集群环境中查询用户信息的服务有3个节点,当客户端并行发起10次远程调用,假如请求都打到了一个客户端上,你觉得合理吗。就像某些领导分配任务的时候 明明有四个干活的小伙伴,但是把任务全部分配给你了,就是逮着你使劲薅,就问你心态炸不炸。
作为牛马,超负荷运行,你可以抗,可以内卷,甚至可以用自己的健康交换。但是服务器可不惯着领导,超负荷运行的话他直接不干了(宕机)。所以这里面需要一种流量合理分配的机制,也就是负载均衡。
同时也希望大家理论结合实际,去好好理解一下 负载均衡,拒绝内卷,拒绝超负荷运行,为自己身心健康争取一种合理的负载均衡策略
3、常用的负载均衡算法
聊完了概念,我们接下来就聊一下实现方案,常用的方案有 随机策略、轮询策略、随机权重策略、权重轮询策略、一致性哈希策略以及最小连接数策略。接下来我们分别介绍他们的核心逻辑
3.1、随机负载均衡
随机负载均衡是一种比较简单的负载均衡方式,也就是从服务列表中随机选择一个节点,典型的实现方案就是生成一个范围内的随机数。相关代码如下
public ServiceInstance select(List<ServiceInstance> instances) {
if (instances.isEmpty())
throw new IllegalArgumentException("No instances available");
int index = ThreadLocalRandom.current().nextInt(instances.size());
return instances.get(index);
}
3.2、轮询负载均衡
轮询负载均衡的核心逻辑就是服务列表里面的服务轮流来,通俗的解释就是一人一次轮着来,相关代码也比较简单,需要注意的是要保证线程安全
public class RoundRobinLoadBalancer {
private final AtomicInteger counter = new AtomicInteger(0);
public ServiceInstance select(List<ServiceInstance> instances) {
if (instances.isEmpty())
throw new IllegalArgumentException("No instances available");
int index = counter.getAndIncrement() % instances.size();
return instances.get(index);
}
}
3.3、随机权重负载均衡
随机权重的意思其实也是随机选择,但是增加了权重的逻辑,使得权重高的服务被选中的概率要大一些,要实现这种算法,也很简单,我们可以根据权重在服务列表中添加多个重复的节点,举个例子:比如有四个服务节点,Server1、Server2、Server3 和 Server4 他们的权重分别是1、2、3、4。这个时候总的权重就是 10 我们可以新建一个数组,往里面添加1个Server1、2个Server2、3个Server3 和4个Server4。然后随机从这个数组中选择一个服务节点。
相关代码是实现也比较简单,如下所示
public class WeightedRandomLoadBalancer {
private final Random random = new Random();
private volatile ConcurrentHashMap<String, Integer> weightMap;
public WeightedRandomLoadBalancer() {
//初始化 操作
initWeightConfigs();
}
private synchronized void initWeightConfigs() {
if (weightMap == null) {
List<WeightConfig> weightConfigs = LoadWeightConfig.getWeightConfigs();
weightMap = new ConcurrentHashMap<>();
for (WeightConfig weightConfig : weightConfigs) {
String key = weightConfig.getInstance().getIp() + ":" + weightConfig.getInstance().getPort();
weightMap.put(key, weightConfig.getWeight());
}
}
}
public ServiceInstance select(List<ServiceInstance> instances) {
if (weightMap == null) {
initWeightConfigs();
}
List<ServiceInstance> weightedServers = new ArrayList<>();
for (ServiceInstance instance : instances) {
String key = instance.getIp() + ":" + instance.getPort();
Integer weight = weightMap.get(key);
if (weight != null) {
for (int i = 0; i < weight; i++) {
weightedServers.add(instance); // 按权重重复加入服务器
}
}
}
int index = random.nextInt(weightedServers.size());
return weightedServers.get(index);
}
}
3.4、权重轮询负载均衡
权重轮询的设计思想和权重随机其实是一样的,我们依然可以通过循环,按权重重复添加对应的服务,但是我们可以换个思路,比如设置一个区间。
还是以四个服务节点,权重分别是1、2、3、4为例,我们将请求数抽象成一个横向的坐标,当前请求数(第几次)如果 落在了 [0,1) 区间,就交给 Server1 处理,如果落在了 [1,3) 就交给 Server2 处理,同理 [3,6) 对应 Server3 、[6,10) 对应 Server4。思路设计好了 ,代码也不难
public class WeightedRoundRobinLoadBalancer {
private AtomicInteger index = new AtomicInteger(0);
private volatile Integer totleCount = 0;
private volatile ConcurrentHashMap<String, Integer> weightMap;
public WeightedRoundRobinLoadBalancer() {
//初始化 操作
initWeightConfigs();
}
private synchronized void initWeightConfigs() {
if (weightMap == null) {
List<WeightConfig> weightConfigs = LoadWeightConfig.getWeightConfigs();
weightMap = new ConcurrentHashMap<>();
for (WeightConfig weightConfig : weightConfigs) {
String key = weightConfig.getInstance().getIp() + ":" + weightConfig.getInstance().getPort();
weightMap.put(key, weightConfig.getWeight());
}
}
}
public ServiceInstance select(List<ServiceInstance> instances) {
if (weightMap == null) {
initWeightConfigs();
}
for (ServiceInstance instance : instances) {
String key = instance.getIp() + ":" + instance.getPort();
Integer weight = weightMap.get(key);
totleCount += weight;
}
int andIncrement = index.getAndIncrement();
int select = andIncrement % totleCount;
for (ServiceInstance instance : instances) {
String key = instance.getIp() + ":" + instance.getPort();
Integer weight = weightMap.get(key);
if (select < weight)
return instance;
else {
select -= weight;
}
}
return null;
}
}
3.5、一致性哈希负载均衡
一致性哈希负载均衡算法就稍微复杂一下,本质上根据请求中的某个元素(可以是ip 也可以是请求的数据) j计算出一个数值,然后按顺时针方向,选择距离这个数值最近的一个节点。 这里你需要知道三个核心概念,哈希算法、哈希环以及虚拟节点。
3.5.1、哈希算法
首先来说哈希算法,目前比较常见的哈希算法有 MD5、SHA-256、BLAKE2等等,哈希算法的作用主要是应用于数据存储、数据验证、密码学等领域,本质上是和负载均衡无关,但是我们这里使用到了。
3.5.2、哈希环
哈希环,故名思意。就是将所有的哈希值串成一个环,怎么理解呢。还是假设有4个服务节点,Server1、Server2、Server3、Server4。如果我们使用 MD5 算法生成一个字节序列,取前8位字节转换成Long类型的值作为哈希值,假设他们的值分别是 150、400、600、850。 这个时候的哈希环就是这样的
我们假设整体的区间是 [0,1000) 。 上图中描述了 四个节点大概所在的位置,这个时候 很容易看的出来 Server1 负责的区间是 [850,1000) U [0,150) server2 负责的区间是 [150,400) 同样的 Server 3和 Server4 分别对应 [400,600) 和 [600,850)。这个时候很显然不是均匀的分布,所以需要更进一步的细分粒度。这个时候就要引入虚拟节点。
3.5.3、虚拟节点
前面我们的四个节点在Hash环上分布的不均匀,也就是存在倾斜的现象。这个时候我们就需要引入虚拟节点来解决这个问题了,假设每个服务节点对应两个虚拟节点,如下所示
我们再按照这8个虚拟节点的哈希值将他们放到环上,效果是这样的
看起来分布的还是不均匀,但是比上一次的是不是要更加分散一些。假设我们每个服务节点引入100个虚拟节点,那么这些节点肯定就会分布的更加均匀,所以虚拟节点主要就是起到了一个稀释的作用。
3.5.4、算法设计
相信你对哈希算法和哈希环已经有了一定的认知了,接下来我们回到算法设计上,我们要实现的需求就是 当一个请求过来的时候我们通过 该请求的ip和端口 计算出一个数值,然后在哈希环上寻找这个值对应的位置,最后按顺时针方法选择一个虚拟节点,然后将请求交给这个虚拟节点对应的物理节点处理。整个过程分为以下四步
1、这里我们使用主机的ip和端口来计算哈希值,哈希算法选择MD5 。
2、每个节点引入100个虚拟节点。
3、当请求来了的时候 根据ip和端口计算 哈希值,找到环上最近的虚拟节点
4、通过虚拟节点找到对应的物理节点
3.5.5、编码落地
根据上面分析的思路,以下是相关代码的实现
/**
* @Description
* @Author wcan
* @Date 2025/2/27 下午 12:40
* @Version 1.0
*/
public class ConsistentHashingLoadBalancer extends AbstractLoadBalance {
// 哈希环(使用 ConcurrentSkipListMap 实现有序的环结构)
private final ConcurrentSkipListMap<Long, ServiceInstance> virtualNodes = new ConcurrentSkipListMap<>();
private final AtomicBoolean loadFlag = new AtomicBoolean(false);
// 每个物理节点对应的虚拟节点数量
private static final int VIRTUAL_NODE_COUNT = 100;
@Override
public ServiceInstance select(List<ServiceInstance> instances) {
this.init(instances);
return null;
}
public ServiceInstance select(String key) {
return getNode(key);
}
@Override
public void init(List<ServiceInstance> instances) {
if (instances == null || instances.isEmpty()) {
throw new IllegalArgumentException("Instances list cannot be null or empty");
}
if (loadFlag.compareAndSet(false, true)) {
// 循环添加到哈希环
instances.forEach(this::addNode);
}
}
/**
* 添加物理节点到哈希环
* @param node 物理节点
*/
public void addNode(ServiceInstance node) {
for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
// 生成虚拟节点名称(如 "Server-A#VN1")
String virtualNodeName = node.getIp() + "#VN" + i;
long hash = getHash(virtualNodeName);
virtualNodes.put(hash, node);
}
}
/**
* 移除物理节点
* @param node 物理节点名称
*/
public void removeNode(String node) {
for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
String virtualNodeName = node + "#VN" + i;
long hash = getHash(virtualNodeName);
virtualNodes.remove(hash);
}
}
/**
* 根据请求Key获取目标节点
* @param requestKey 请求的Key
* @return 目标节点
*/
public ServiceInstance getNode(String requestKey) {
if (virtualNodes.isEmpty()) {
return null;
}
long hash = getHash(requestKey);
// 查找第一个大于等于该哈希值的节点
Map.Entry<Long, ServiceInstance> entry = virtualNodes.ceilingEntry(hash);
if (entry == null) {
// 如果没找到,则回到环的起点
entry = virtualNodes.firstEntry();
}
return entry.getValue();
}
/**
* 使用MD5哈希算法(更均匀的分布)
* @param key 输入字符串
* @return 哈希值
*/
private long getHash(String key) {
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] digest = md5.digest(key.getBytes());
// 取前8字节转换为long型
return bytesToLong(digest);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 algorithm not found", e);
}
}
/**
* 将字节数组转换为long
*/
private static long bytesToLong(byte[] bytes) {
long result = 0;
for (int i = 0; i < 8; i++) {
result <<= 8;
result |= (bytes[i] & 0xFF);
}
return result;
}
}
3.6、最小连接数负载均衡
接着我们来看看最小连接数负载均衡,从字面意思上就可以理解这种算法的核心 就是让负载最轻的节点处理请求,这样可以充分的利用集群环境整体的软硬件资源。
3.6.1、理论分析
初步一想,你可能觉得要实现这个算法不难,无非就是选择集群环境中连接数最少的一个节点就行了。但是我们需要考虑怎么去统计连接数呢,负载均衡组件很容易知道一次请求的开始,但是要怎么知道该请求结束了呢,而且还需要注意不能和应用耦合。这些问题 仔细想想 还是很复杂的。下面我整理了一张逻辑结果图。
还需要注意一种极端情况,假设某时刻 Server1 和 Server2 连接数都是 1,Server3和 Server4 的连接数是2和3。根据最小连接数的定义 我们应该选择连接数最小的,那这个时候我们是选择 Server1 呢还是 Server2呢,这里我们可以随机选一个,但是更好的做法是引入权重,假设Server1 机器的硬件配置更好点,那就可以设置权重 2:1。当出现相同数值的时候优先 Server1。
3.6.2、落地实践
下面我们想想怎么设计一个 最小连接数的负载均衡组件,这里我的思路是首先 负载均衡器需要维护一个数据结构,存放的是所有主机的连接数。当一次请求来了的时候 就需要在对应的主机增加一个连接数,当请求结束后,再减掉一个连接数。
/**
* @ClassName LeastConnectionsLoadBalancer
* @Description 最小连接数负载均衡算法
* @Author wcan
* @Date 2025/3/10 下午 12:37
* @Version 1.0
*/
public class LeastConnectionsLoadBalancer extends AbstractLoadBalance{
// 服务器列表(线程安全)
private final List<ServiceInstance> servers = new CopyOnWriteArrayList<>();
private volatile ConcurrentHashMap<String, Integer> weightMap;
private final AtomicBoolean loadFlag = new AtomicBoolean(false);
private synchronized void initWeightConfigs() {
if (weightMap == null) {
List<WeightConfig> weightConfigs = LoadWeightConfig.getWeightConfigs();
weightMap = new ConcurrentHashMap<>();
for (WeightConfig weightConfig : weightConfigs) {
String key = weightConfig.getInstance().getIp() + ":" + weightConfig.getInstance().getPort();
weightMap.put(key, weightConfig.getWeight());
}
}
}
/**
* 添加服务器
* @param name 服务器名称
* @param weight 权重(默认为1)
*/
public void addServer(String name, int weight) {
servers.add(new ServiceInstance(weight,name));
}
/**
* 选择目标服务器(线程安全)
* @return 服务器名称
*/
public ServiceInstance selectServer() {
if (servers.isEmpty()) {
throw new IllegalStateException("No servers available");
}
// 遍历找出当前连接数/权重最小的服务器
ServiceInstance selected = null;
double minRatio = Double.MAX_VALUE;
for (ServiceInstance server : servers) {
double ratio = server.getActiveConnections() / (double) server.getWeight();
if (ratio < minRatio) {
minRatio = ratio;
selected = server;
}
}
// 增加该服务器的连接数
selected.incrementConnections();
return selected;
}
/**
* 释放服务器连接(请求完成时调用)
* @param serverName 服务器名称
*/
public void releaseConnection(String serverName) {
for (ServiceInstance server : servers) {
if (server.getName().equals(serverName)) {
server.decrementConnections();
break;
}
}
}
@Override
public void init(List<ServiceInstance> instances) {
instances.forEach(instance -> {
String name = instance.getName();
String key = instance.getIp() + ":" + instance.getPort();
if (weightMap != null) {
Integer weight = weightMap.get(key);
if (weight != null) {
addServer(name, weight);
}
}
});
}
@Override
public ServiceInstance select(List<ServiceInstance> instances) {
if (loadFlag.compareAndSet(false, true)) {
init(instances);
}
return selectServer();
}
}
4、最佳技术选项
我们已经学习了6种常见的负载均衡策略,并且基于这六种策略分别实现了相关的功能,接下来我们就来综合比对一下这6种负载均衡算法的优缺点
4.1、负载均衡策略对比分析
策略 | 核心原理 | 优点 | 缺点 | 典型使用场景 |
---|---|---|---|---|
随机策略 | 完全随机分配请求 | ✅ 实现简单 ✅ 无状态,性能高 |
❌ 无法感知服务器状态 ❌ 负载可能不均衡 |
测试环境、服务器性能完全相同的简单场景 |
轮询策略 | 按顺序依次分配请求 | ✅ 绝对公平分配 ✅ 实现简单 |
❌ 无视服务器性能差异 ❌ 无法应对突发流量 |
短连接服务(HTTP API)、服务器配置完全一致的环境 |
随机权重策略 | 按权重概率随机分配请求 | ✅ 适配异构服务器 ✅ 比纯随机更均衡 |
❌ 瞬时负载可能倾斜 ❌ 无法动态调整权重 |
混合云环境(部分物理机+虚拟机) |
权重轮询策略 | 按权重比例轮询分配请求 | ✅ 精确控制流量比例 ✅ 长期负载均衡 |
❌ 无法应对短时波动 ❌ 需要预设静态权重 |
CDN节点分配、数据库读写分离 |
一致性哈希策略 | 基于哈希环定位目标节点 | ✅ 动态扩缩容影响小 ✅ 支持会话保持 |
❌ 实现复杂 ❌ 需虚拟节点防倾斜 |
分布式缓存(Redis集群)、文件存储分片 |
最小连接数策略 | 优先选择当前连接数最少的服务器 | ✅ 动态感知负载 ✅ 适配长连接场景 |
❌ 需维护连接状态 ❌ 短连接场景效果有限 |
WebSocket服务、数据库连接池、视频流媒体服务器 |
4.2、最佳技术选型
从上图中我们可以得出各种算法的优缺点,并且我还列出来典型的应用场景。根据上面的信息我们会发现没有完美的方案,他们都有缺点,那我们该如何选型呢。
还是那句话,没有根据自己的业务系统,权衡利弊,只要保证利大于弊,并且要尽可能的多“大”一些,这就是最佳的技术选型。这里我根据 是否需要会话保持、服务器的性能、流量波动以及请求类型 总结出了一张图,你可以针对性的参考
最后举个例子,在我们的分布式系统中 对外暴露的微服务网关我们可以采用 一致性哈希 + 最小连接数的负载均衡策略,内部业务系统的微服务 我们可以使用 权重轮询的策略 在配合健康检查机制。
5、负载均衡组件设计
我们将上述的实现设计成一款通用的负载均衡组件,后面集成到 Cheese 中。
5.1、服务实例对象设计
为了方便后面的算法处理,我们需要将每个服务节点定义成一个对象,这个对象的属性有 IP、端口、当前连接数、权重配比等等,相关代码如下
package org.wcan.lb;
import java.util.concurrent.atomic.AtomicInteger;
public class ServiceInstance {
private String ip;
private int port;
private int weight;
private String name ;
private final AtomicInteger activeConnections = new AtomicInteger(0);
public ServiceInstance(int weight, String name) {
this.weight = weight;
this.name = name;
}
public ServiceInstance(String ip, int port) {
this.ip = ip;
this.port = port;
}
public ServiceInstance() {
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getUrl() {
return "http://" + ip + ":" + port;
}
public void incrementConnections() {
activeConnections.incrementAndGet();
}
public void decrementConnections() {
activeConnections.decrementAndGet();
}
// Getters
public int getActiveConnections() {
return activeConnections.get();
}
@Override
public String toString() {
return "ServiceInstance{" +
"ip='" + ip + '\'' +
", port=" + port +
", currentConnections=" + activeConnections.get() +
'}';
}
}
5.2、权重加载设计
我们在服务实例对象中设计了一个权重的属性,主要的作用是来封装服务的权重值,通常情况下我们将权重通过配置化的形式集成到项目中,因此除了一个实例对象之外我们需要设计一个加载配置的功能,这里先以读取配置文件为例。相关代码如下
package org.wcan.lb.weight;
import org.wcan.lb.ServiceInstance;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* @Description 加载权重配置
* 权重配置文件格式
*
* @Author wcan
* @Date 2025/2/21 下午 17:39
* @Version 1.0
*/
public class LoadWeightConfig {
private static List<ServiceInstance> serviceInstances;
static {
Properties properties = new Properties();
try (InputStream input = LoadWeightConfig.class.getClassLoader().getResourceAsStream("weight.properties")) {
if (input == null) {
throw new RuntimeException("Sorry, unable to find weight.properties");
}
properties.load(input);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
int size = properties.size();
serviceInstances = new ArrayList<ServiceInstance>(size);
for (int i = 0; i < size; i++) {
String key = "serverInfo" + (i + 1);
String value = properties.getProperty(key);
String[] weightAndUrl = value.split("&");
String url = weightAndUrl[0];
String weightValue = weightAndUrl[1];
ServiceInstance serviceInstance = new ServiceInstance();
String[] ipAndPort = url.split(":");
serviceInstance.setIp(ipAndPort[0]);
serviceInstance.setPort(Integer.parseInt(ipAndPort[1]));
serviceInstance.setWeight(Integer.valueOf(weightValue));
serviceInstances.add(serviceInstance);
}
}
public static List<ServiceInstance> getWeightConfigs() {
return serviceInstances;
}
}
5.3、负载均衡策略设计
这里我们需要实现六种负载均衡算法,我们抽取这六种算法的共性,设计一个顶层的接口,然后根据不同的负载均衡算法的特点来实现,也就是我们常见的策略模式。
同时我们应该可以想到,不管是哪一种算法,都需要先初始化,再调用获取服务的方法,所以这里我们可以同时引入一个抽象类,作为模板。
最后我们可以设计一个工厂,对外屏蔽掉创建实例的过程,外部仅需要传入特定的参数 就能返回对应的负载均衡的实现对象。整个设计结合了 策略模式、模板方法设计模式以及工厂模式,类图如下
5.4、编码落地
5.4.1、顶层接口设计
顶层接口我们主要定义两个查询方法,其中一个是空参,一个是带参数,代码如下
package org.wcan.lb.strategy;
import org.wcan.lb.ServiceInstance;
import java.util.List;
/**
* @Description 负载均衡策略接口
* @Author wcan
* @Date 2025/2/21 下午 16:38
* @Version 1.0
*/
public interface LoadBalanceStrategy {
public ServiceInstance select(List<ServiceInstance> instances);
public ServiceInstance select();
}
5.4.2、抽象模板设计
package org.wcan.lb.strategy;
import org.wcan.lb.ServiceInstance;
import org.wcan.lb.weight.LoadWeightConfig;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @ClassName AbstractLoadBalance
* @Description TODO
* @Author wcan
* @Date 2025/3/19 下午 22:47
* @Version 1.0
*/
public abstract class AbstractLoadBalance implements LoadBalanceStrategy {
List<ServiceInstance> serviceInstances = null;
private final AtomicBoolean loadFlag = new AtomicBoolean(false);
public void init() {
if (null == serviceInstances) {
serviceInstances = LoadWeightConfig.getWeightConfigs();
}
}
public abstract ServiceInstance select(List<ServiceInstance> instances);
/**
* @Description 默认情况下 所有的负载均衡策略都是需要先初始化服务列表,
* @Param []
* @return org.wcan.lb.ServiceInstance
* @Date 2025/3/21 下午 22:45
* @Author wcan
* @Version 1.0
*/
public ServiceInstance select() {
if (loadFlag.compareAndSet(false, true)) {
init();
}
return select(serviceInstances);
}
public ServiceInstance select(String key) {
return null;
}
}
5.4.3、负载均衡实现
以下分别是6种负载均衡策略的代码实现
5.4.3.1、随机负载均衡
package org.wcan.lb.strategy;
import org.wcan.lb.ServiceInstance;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* @ClassName RandomStrategy
* @Description 随机策略实现
* @Author wcan
* @Date 2025/2/21 下午 16:41
* @Version 1.0
*/
public class RandomLoadBalancer extends AbstractLoadBalance {
@Override
public ServiceInstance select(List<ServiceInstance> instances) {
if (instances.isEmpty())
throw new IllegalArgumentException("No instances available");
int index = ThreadLocalRandom.current().nextInt(instances.size());
return instances.get(index);
}
}
5.4.3.2、轮询负载均衡
package org.wcan.lb.strategy;
import org.wcan.lb.ServiceInstance;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName RoundRobinStrategy
* @Description 轮询策略实现
* @Author wcan
* @Date 2025/2/21 下午 16:40
* @Version 1.0
*/
public class RoundRobinLoadBalancer extends AbstractLoadBalance {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public ServiceInstance select(List<ServiceInstance> instances) {
if (instances.isEmpty())
throw new IllegalArgumentException("No instances available");
int index = counter.getAndIncrement() % instances.size();
return instances.get(index);
}
}
5.4.3.3、随机权重负载均衡
package org.wcan.lb.strategy;
import org.wcan.lb.ServiceInstance;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
* @ClassName WeightedRandomLoadBalancer
* @Description 随机权重 负载均衡策略
* @Author wcan
* @Date 2025/2/24 上午 0:16
* @Version 1.0
*/
public class WeightedRandomLoadBalancer extends AbstractLoadBalance {
private final Random random = new Random();
@Override
public ServiceInstance select(List<ServiceInstance> instances) {
List<ServiceInstance> weightedServers = new ArrayList<>();
for (ServiceInstance instance : instances) {
int weight = instance.getWeight();
if (weight > 0) {
for (int i = 0; i < weight; i++) {
weightedServers.add(instance); // 按权重重复加入服务器
}
}
}
int index = random.nextInt(weightedServers.size());
return weightedServers.get(index);
}
}
5.4.3.4、权重轮询负载均衡
package org.wcan.lb.strategy;
import org.wcan.lb.ServiceInstance;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class WeightedRoundRobinLoadBalancer extends AbstractLoadBalance {
private AtomicInteger index = new AtomicInteger(0);
private volatile Integer totleCount = 0;
@Override
public ServiceInstance select(List<ServiceInstance> instances) {
totleCount = instances.stream().mapToInt(ServiceInstance::getWeight).sum();
int andIncrement = index.getAndIncrement();
int select = andIncrement % totleCount;
for (ServiceInstance instance : instances) {
int weight = instance.getWeight();
if (select < weight)
return instance;
else {
select -= weight;
}
}
return null;
}
}
5.4.3.5、最小连接数
本案例 仅实现了基本的获取服务的逻辑过程,断开连接计数 需要根据具体的情况去实现
package org.wcan.lb.strategy;
import org.wcan.lb.ServiceInstance;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @ClassName LeastConnectionsLoadBalancer
* @Description 最小连接数负载均衡算法
* @Author wcan
* @Date 2025/3/10 下午 12:37
* @Version 1.0
*/
public class LeastConnectionsLoadBalancer extends AbstractLoadBalance{
// 服务器列表(线程安全)
private final List<ServiceInstance> servers = new CopyOnWriteArrayList<>();
private final AtomicBoolean loadFlag = new AtomicBoolean(false);
/**
* 选择目标服务器(线程安全)
* @return 服务器名称
*/
public ServiceInstance selectServer() {
if (servers.isEmpty()) {
throw new IllegalStateException("No servers available");
}
// 遍历找出当前连接数/权重最小的服务器
ServiceInstance selected = null;
double minRatio = Double.MAX_VALUE;
for (ServiceInstance server : servers) {
double ratio = server.getActiveConnections() / (double) server.getWeight();
if (ratio < minRatio) {
minRatio = ratio;
selected = server;
}
}
// 增加该服务器的连接数
selected.incrementConnections();
return selected;
}
/**
* 释放服务器连接(请求完成时调用)
* @param serverName 服务器名称
*/
public void releaseConnection(String serverName) {
for (ServiceInstance server : servers) {
if (server.getName().equals(serverName)) {
server.decrementConnections();
break;
}
}
}
@Override
public void init() {
super.init();
servers.addAll(serviceInstances);
}
@Override
public ServiceInstance select(List<ServiceInstance> instances) {
if (loadFlag.compareAndSet(false, true)) {
init();
}
return selectServer();
}
}
5.4.3.6、一致性哈希
一致性哈希负载均衡和前面5种有点不同,他需要通过一个入参来计算哈希值,这个入参可以是客户端主机ip 或者是 某个请求参数。
package org.wcan.lb.strategy;
import org.wcan.lb.ServiceInstance;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @Description
* @Author wcan
* @Date 2025/2/27 下午 12:40
* @Version 1.0
*/
public class ConsistentHashingLoadBalancer extends AbstractLoadBalance {
// 哈希环(使用 ConcurrentSkipListMap 实现有序的环结构)
private final ConcurrentSkipListMap<Long, ServiceInstance> virtualNodes = new ConcurrentSkipListMap<>();
private final AtomicBoolean loadFlag = new AtomicBoolean(false);
// 每个物理节点对应的虚拟节点数量
private static final int VIRTUAL_NODE_COUNT = 100;
@Override
public ServiceInstance select(List<ServiceInstance> instances) {
return null;
}
@Override
public ServiceInstance select(String key) {
this.init();
return getNode(key);
}
@Override
public void init( ) {
super.init();
if (serviceInstances == null || serviceInstances.isEmpty()) {
throw new IllegalArgumentException("Instances list cannot be null or empty");
}
if (loadFlag.compareAndSet(false, true)) {
// 循环添加到哈希环
serviceInstances.forEach(this::addNode);
}
}
/**
* 添加物理节点到哈希环
* @param node 物理节点
*/
public void addNode(ServiceInstance node) {
for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
// 生成虚拟节点名称(如 "Server-A#VN1")
String virtualNodeName = node.getIp() + "#VN" + i;
long hash = getHash(virtualNodeName);
virtualNodes.put(hash, node);
}
}
/**
* 移除物理节点
* @param node 物理节点名称
*/
public void removeNode(String node) {
for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
String virtualNodeName = node + "#VN" + i;
long hash = getHash(virtualNodeName);
virtualNodes.remove(hash);
}
}
/**
* 根据请求Key获取目标节点
* @param requestKey 请求的Key
* @return 目标节点
*/
public ServiceInstance getNode(String requestKey) {
if (virtualNodes.isEmpty()) {
return null;
}
long hash = getHash(requestKey);
// 查找第一个大于等于该哈希值的节点
Map.Entry<Long, ServiceInstance> entry = virtualNodes.ceilingEntry(hash);
if (entry == null) {
// 如果没找到,则回到环的起点
entry = virtualNodes.firstEntry();
}
return entry.getValue();
}
/**
* 使用MD5哈希算法(更均匀的分布)
* @param key 输入字符串
* @return 哈希值
*/
private long getHash(String key) {
try {
MessageDigest md5 = MessageDigest.getInstance("MD5");
byte[] digest = md5.digest(key.getBytes());
// 取前8字节转换为long型
return bytesToLong(digest);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 algorithm not found", e);
}
}
/**
* 将字节数组转换为long
*/
private static long bytesToLong(byte[] bytes) {
long result = 0;
for (int i = 0; i < 8; i++) {
result <<= 8;
result |= (bytes[i] & 0xFF);
}
return result;
}
}
5.5.4、抽象工厂实现
到这里核心的代码已经编写完成了,我们的负载均衡组件已经具备了6种负载均衡策略了,下面我们来设计一个工厂,用来对外提供获取服务实例的方法。
首先最简洁明了的写法就是直接在工厂里面 new 对象就好了,相关代码如下
package org.wcan.lb;
import org.wcan.lb.strategy.*;
public class StrategyFactory {
public static AbstractLoadBalance createStrategy(String strategyType) {
switch (strategyType.toLowerCase()) {
case "1":
return new RoundRobinLoadBalancer();
case "2":
return new RandomLoadBalancer();
case "3":
return new WeightedRandomLoadBalancer();
case "4":
return new WeightedRoundRobinLoadBalancer();
case "5":
return new LeastConnectionsLoadBalancer();
case "6":
return new ConsistentHashingLoadBalancer();
default:
throw new IllegalArgumentException("Unsupported strategy type");
}
}
}
但是怎么说呢,这个工厂简单的有些离谱了,以至于让一个新手也能看懂,作为一个优秀的高级开发,代码轻易的让别人看懂了 是一件耻辱的事,好吧,下面我们改造成更 “高级” 的写法
首先定义一个函数式接口
package org.wcan.lb;
import org.wcan.lb.strategy.AbstractLoadBalance;
/**
* @Description
* @Author wcan
* @Date 2025/3/21 下午 21:58
* @Version 1.0
*/
@FunctionalInterface
public interface AbstractLoadBalancerFactory {
AbstractLoadBalance getLoadBalancer();
}
接着就是工厂的实现
package org.wcan.lb;
import org.wcan.lb.strategy.*;
import java.util.HashMap;
import java.util.Map;
public class LoadBalancerFactory {
private static final Map<String, AbstractLoadBalancerFactory> strategyMap = new HashMap<>();
static {
strategyMap.put("1", RoundRobinLoadBalancer::new);
strategyMap.put("2", RandomLoadBalancer::new);
strategyMap.put("3", WeightedRandomLoadBalancer::new);
strategyMap.put("4", WeightedRoundRobinLoadBalancer::new);
strategyMap.put("5", LeastConnectionsLoadBalancer::new);
strategyMap.put("6", ConsistentHashingLoadBalancer::new);
}
public static AbstractLoadBalance createStrategy(String strategyType) {
AbstractLoadBalancerFactory abstractLoadBalancerFactory = strategyMap.get(strategyType.toLowerCase());
if (abstractLoadBalancerFactory != null) {
return abstractLoadBalancerFactory.getLoadBalancer();
} else {
throw new IllegalArgumentException("Unsupported strategy type");
}
}
}
好吧,为了方便后续的集成,我们再来一个对外暴露的静态方法的工具类,后续在使用的时候直接使用这个工具类就行了。
package org.wcan.lb;
import org.wcan.lb.strategy.AbstractLoadBalance;
public class LoadBalancer {
/**
* @Description 根据负载均衡策略 获取对应的负载均衡组件
* @Param [strategyType]
* @return org.wcan.lb.strategy.AbstractLoadBalance
* @Date 2025/3/22 下午 13:48
* @Author wcan
* @Version 1.0
*/
public static AbstractLoadBalance getAbstractLoadBalance(String strategyType){
return LoadBalancerFactory.createStrategy(strategyType);
}
}
5.5、功能测试
前面的功能已经设计完成了,下面我们就可以编写单元测试进行测试了。
import org.junit.Test;
import org.wcan.lb.LoadBalancer;
import org.wcan.lb.ServiceInstance;
import org.wcan.lb.LoadBalancerFactory;
import org.wcan.lb.strategy.AbstractLoadBalance;
import java.util.concurrent.TimeUnit;
/**
* @Description
* @Author wcan
* @Date 2025/3/19 下午 23:05
* @Version 1.0
*/
public class MainTest {
@Test
public void loadBalanceTest() throws InterruptedException {
// testForLoadBalancerFactory();
// testForLoadBalanceFactory();
testForLoadBalancerFactoryHashKey();
}
/**
* @Description LoadBalancer.getAbstractLoadBalance() 直接指定策略 获取服务实例
* @Param []
* @return void
* @Date 2025/3/22 下午 14:04
* @Author wcan
* @Version 1.0
*/
public void testForLoadBalancerFactoryHashKey() throws InterruptedException {
AbstractLoadBalance abstractLoadBalance = LoadBalancer.getAbstractLoadBalance("6");
int i = 0;
while (true){
TimeUnit.SECONDS.sleep(1);
ServiceInstance select = abstractLoadBalance.select(String.valueOf(i%6));
System.out.println("第 "+i++ +" 次请求: "+select);
}
}
/**
* @Description LoadBalancer.getAbstractLoadBalance() 直接指定策略 获取服务实例
* @Param []
* @return void
* @Date 2025/3/22 下午 14:04
* @Author wcan
* @Version 1.0
*/
public void testForLoadBalancerFactory() throws InterruptedException {
AbstractLoadBalance abstractLoadBalance = LoadBalancer.getAbstractLoadBalance("1");
int i = 0;
while (true){
TimeUnit.SECONDS.sleep(1);
ServiceInstance select = abstractLoadBalance.select();
System.out.println("第 "+i++ +" 次请求: "+select);
}
}
public void testForLoadBalanceFactory() throws InterruptedException {
AbstractLoadBalance strategy = LoadBalancerFactory.createStrategy("1");
int i = 0;
while (true){
TimeUnit.SECONDS.sleep(1);
ServiceInstance select = strategy.select();
System.out.println("第 "+i++ +" 次请求: "+select);
}
}
/**
* @Description 一致性哈希算法 需要传入一个入参 来计算哈希值
* @Param []
* @return void
* @Date 2025/3/22 下午 14:02
* @Author wcan
* @Version 1.0
*/
public void testForLoadBalanceFactoryWithKey() throws InterruptedException {
AbstractLoadBalance strategy = LoadBalancerFactory.createStrategy("6");
int i = 0;
while (true){
TimeUnit.SECONDS.sleep(1);
ServiceInstance select = strategy.select(String.valueOf(i%6));
System.out.println("第 "+i++ +" 次请求: "+select);
}
}
}
6、总结
本篇文章给大家介绍了负载均衡常见的六种策略,并且给出了实现思路和落地的代码,在分布式系统设计中,除了远程通信、序列化之外 负载均衡也是一个重要的核心的组件。到现在我们已经自研了远程通信组件、序列化组件和负载均衡组件,后面我们将他们集成在一起,将我们的 cheese彻底打造成一款可投产的 分布式服务框架。