流量分配的艺术: 如何设计一款负载均衡组件

发布于:2025-03-23 ⋅ 阅读:(22) ⋅ 点赞:(0)

1、概述

       前面我们设计了一款分布式服务框架,完成了服务注册、服务发现、远程调用的基本功能,但是作为一款分布式服务框架他还需要一些高级特性,比如负载均衡。所以今天我们来分析常用的负载均衡策略,并设计一款通用的负载均衡组件,可以集成到 Cheese 中,让我们的 Cheese 具备负载均衡的能力。 

2、细说负载均衡

        首先我们都知道负载均衡是一种集群环境中策略,它可以按照某种算法将网络流量(计算任务)动态的分配到集群里面的各个节点,从而提高系统的可用性稳定性。

        比如一个集群环境中查询用户信息的服务有3个节点,当客户端并行发起10次远程调用,假如请求都打到了一个客户端上,你觉得合理吗。就像某些领导分配任务的时候  明明有四个干活的小伙伴,但是把任务全部分配给你了,就是逮着你使劲薅,就问你心态炸不炸。

        作为牛马,超负荷运行,你可以抗,可以内卷,甚至可以用自己的健康交换。但是服务器可不惯着领导,超负荷运行的话他直接不干了(宕机)。所以这里面需要一种流量合理分配的机制,也就是负载均衡。

        同时也希望大家理论结合实际,去好好理解一下 负载均衡,拒绝内卷,拒绝超负荷运行,为自己身心健康争取一种合理的负载均衡策略

3、常用的负载均衡算法

        聊完了概念,我们接下来就聊一下实现方案,常用的方案有 随机策略、轮询策略、随机权重策略、权重轮询策略、一致性哈希策略以及最小连接数策略。接下来我们分别介绍他们的核心逻辑

3.1、随机负载均衡

        随机负载均衡是一种比较简单的负载均衡方式,也就是从服务列表中随机选择一个节点,典型的实现方案就是生成一个范围内的随机数。相关代码如下

  public ServiceInstance select(List<ServiceInstance> instances) {
        if (instances.isEmpty())
            throw new IllegalArgumentException("No instances available");
        int index = ThreadLocalRandom.current().nextInt(instances.size());
        return instances.get(index);
    }

3.2、轮询负载均衡

        轮询负载均衡的核心逻辑就是服务列表里面的服务轮流来,通俗的解释就是一人一次轮着来,相关代码也比较简单,需要注意的是要保证线程安全

public class RoundRobinLoadBalancer {
    private final AtomicInteger counter = new AtomicInteger(0);
    public ServiceInstance select(List<ServiceInstance> instances) {
        if (instances.isEmpty())
            throw new IllegalArgumentException("No instances available");
        int index = counter.getAndIncrement() % instances.size();
        return instances.get(index);
    }
}

3.3、随机权重负载均衡

        随机权重的意思其实也是随机选择,但是增加了权重的逻辑,使得权重高的服务被选中的概率要大一些,要实现这种算法,也很简单,我们可以根据权重在服务列表中添加多个重复的节点,举个例子:比如有四个服务节点,Server1、Server2、Server3 和 Server4 他们的权重分别是1、2、3、4。这个时候总的权重就是 10  我们可以新建一个数组,往里面添加1个Server1、2个Server2、3个Server3 和4个Server4。然后随机从这个数组中选择一个服务节点。

相关代码是实现也比较简单,如下所示 

public class WeightedRandomLoadBalancer {

    private final Random random = new Random();
    private volatile ConcurrentHashMap<String, Integer> weightMap;

    public WeightedRandomLoadBalancer() {
        //初始化 操作
        initWeightConfigs();
    }
    
    private synchronized void initWeightConfigs() {
        if (weightMap == null) {
            List<WeightConfig> weightConfigs = LoadWeightConfig.getWeightConfigs();
            weightMap = new ConcurrentHashMap<>();
            for (WeightConfig weightConfig : weightConfigs) {
                String key = weightConfig.getInstance().getIp() + ":" + weightConfig.getInstance().getPort();
                weightMap.put(key, weightConfig.getWeight());
            }
        }
    }
    public ServiceInstance select(List<ServiceInstance> instances) {
        if (weightMap == null) {
            initWeightConfigs();
        }
        List<ServiceInstance> weightedServers = new ArrayList<>();
        for (ServiceInstance instance : instances) {
            String key = instance.getIp() + ":" + instance.getPort();
            Integer weight = weightMap.get(key);
            if (weight != null) {
                for (int i = 0; i < weight; i++) {
                    weightedServers.add(instance); // 按权重重复加入服务器
                }
            }
        }
        int index = random.nextInt(weightedServers.size());
        return weightedServers.get(index);
    }
}

3.4、权重轮询负载均衡

        权重轮询的设计思想和权重随机其实是一样的,我们依然可以通过循环,按权重重复添加对应的服务,但是我们可以换个思路,比如设置一个区间。

        还是以四个服务节点,权重分别是1、2、3、4为例,我们将请求数抽象成一个横向的坐标,当前请求数(第几次)如果 落在了 [0,1) 区间,就交给 Server1 处理,如果落在了 [1,3) 就交给 Server2 处理,同理 [3,6) 对应 Server3 、[6,10) 对应 Server4。思路设计好了 ,代码也不难

public class WeightedRoundRobinLoadBalancer {

    private AtomicInteger index = new AtomicInteger(0);

    private volatile Integer totleCount = 0;
    private volatile ConcurrentHashMap<String, Integer> weightMap;

    public WeightedRoundRobinLoadBalancer() {
        //初始化 操作
        initWeightConfigs();
    }


    private synchronized void initWeightConfigs() {
        if (weightMap == null) {
            List<WeightConfig> weightConfigs = LoadWeightConfig.getWeightConfigs();
            weightMap = new ConcurrentHashMap<>();
            for (WeightConfig weightConfig : weightConfigs) {
                String key = weightConfig.getInstance().getIp() + ":" + weightConfig.getInstance().getPort();
                weightMap.put(key, weightConfig.getWeight());
            }
        }
    }

    public ServiceInstance select(List<ServiceInstance> instances) {

        if (weightMap == null) {
            initWeightConfigs();
        } 
        for (ServiceInstance instance : instances) {
            String key = instance.getIp() + ":" + instance.getPort();
            Integer weight = weightMap.get(key);
            totleCount += weight;
        }

        int andIncrement = index.getAndIncrement();
        int select = andIncrement % totleCount;
        for (ServiceInstance instance : instances) {
            String key = instance.getIp() + ":" + instance.getPort();
            Integer weight = weightMap.get(key);
            if (select < weight)
                return instance;
            else {
                select -= weight;
            }
        }
        return null;
    }
}

3.5、一致性哈希负载均衡

        一致性哈希负载均衡算法就稍微复杂一下,本质上根据请求中的某个元素(可以是ip 也可以是请求的数据) j计算出一个数值,然后按顺时针方向,选择距离这个数值最近的一个节点。 这里你需要知道三个核心概念,哈希算法、哈希环以及虚拟节点

3.5.1、哈希算法

        首先来说哈希算法,目前比较常见的哈希算法有 MD5、SHA-256、BLAKE2等等,哈希算法的作用主要是应用于数据存储、数据验证、密码学等领域,本质上是和负载均衡无关,但是我们这里使用到了。

3.5.2、哈希环

        哈希环,故名思意。就是将所有的哈希值串成一个环,怎么理解呢。还是假设有4个服务节点,Server1、Server2、Server3、Server4。如果我们使用 MD5 算法生成一个字节序列,取前8位字节转换成Long类型的值作为哈希值,假设他们的值分别是 150、400、600、850。 这个时候的哈希环就是这样的

        我们假设整体的区间是 [0,1000) 。 上图中描述了 四个节点大概所在的位置,这个时候 很容易看的出来 Server1 负责的区间是 [850,1000) U [0,150)  server2 负责的区间是 [150,400) 同样的 Server 3和 Server4 分别对应 [400,600)  和 [600,850)。这个时候很显然不是均匀的分布,所以需要更进一步的细分粒度。这个时候就要引入虚拟节点。

3.5.3、虚拟节点

        前面我们的四个节点在Hash环上分布的不均匀,也就是存在倾斜的现象。这个时候我们就需要引入虚拟节点来解决这个问题了,假设每个服务节点对应两个虚拟节点,如下所示

我们再按照这8个虚拟节点的哈希值将他们放到环上,效果是这样的

看起来分布的还是不均匀,但是比上一次的是不是要更加分散一些。假设我们每个服务节点引入100个虚拟节点,那么这些节点肯定就会分布的更加均匀,所以虚拟节点主要就是起到了一个稀释的作用。

3.5.4、算法设计 

        相信你对哈希算法和哈希环已经有了一定的认知了,接下来我们回到算法设计上,我们要实现的需求就是 当一个请求过来的时候我们通过 该请求的ip和端口 计算出一个数值,然后在哈希环上寻找这个值对应的位置,最后按顺时针方法选择一个虚拟节点,然后将请求交给这个虚拟节点对应的物理节点处理。整个过程分为以下四步

1、这里我们使用主机的ip和端口来计算哈希值,哈希算法选择MD5 。

2、每个节点引入100个虚拟节点。

3、当请求来了的时候 根据ip和端口计算 哈希值,找到环上最近的虚拟节点

4、通过虚拟节点找到对应的物理节点

3.5.5、编码落地

        根据上面分析的思路,以下是相关代码的实现

/**
 * @Description
 * @Author wcan
 * @Date 2025/2/27 下午 12:40
 * @Version 1.0
 */
public class ConsistentHashingLoadBalancer extends AbstractLoadBalance {

    // 哈希环(使用 ConcurrentSkipListMap 实现有序的环结构)
    private final ConcurrentSkipListMap<Long, ServiceInstance> virtualNodes = new ConcurrentSkipListMap<>();

    private final AtomicBoolean loadFlag = new AtomicBoolean(false);

    // 每个物理节点对应的虚拟节点数量
    private static final int VIRTUAL_NODE_COUNT = 100;

    @Override
    public ServiceInstance select(List<ServiceInstance> instances) {
        this.init(instances);
        return null;
    }

    public ServiceInstance select(String key) {
        return getNode(key);
    }

    @Override
    public void init(List<ServiceInstance> instances) {
        if (instances == null || instances.isEmpty()) {
            throw new IllegalArgumentException("Instances list cannot be null or empty");
        }
        if (loadFlag.compareAndSet(false, true)) {
            // 循环添加到哈希环
            instances.forEach(this::addNode);
        }
    }

    /**
     * 添加物理节点到哈希环
     * @param node 物理节点
     */
    public void addNode(ServiceInstance node) {
        for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
            // 生成虚拟节点名称(如 "Server-A#VN1")
            String virtualNodeName = node.getIp() + "#VN" + i;
            long hash = getHash(virtualNodeName);
            virtualNodes.put(hash, node);
        }
    }

    /**
     * 移除物理节点
     * @param node 物理节点名称
     */
    public void removeNode(String node) {
        for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
            String virtualNodeName = node + "#VN" + i;
            long hash = getHash(virtualNodeName);
            virtualNodes.remove(hash);
        }
    }

    /**
     * 根据请求Key获取目标节点
     * @param requestKey 请求的Key
     * @return 目标节点
     */
    public ServiceInstance getNode(String requestKey) {
        if (virtualNodes.isEmpty()) {
            return null;
        }
        long hash = getHash(requestKey);
        // 查找第一个大于等于该哈希值的节点
        Map.Entry<Long, ServiceInstance> entry = virtualNodes.ceilingEntry(hash);
        if (entry == null) {
            // 如果没找到,则回到环的起点
            entry = virtualNodes.firstEntry();
        }
        return entry.getValue();
    }

    /**
     * 使用MD5哈希算法(更均匀的分布)
     * @param key 输入字符串
     * @return 哈希值
     */
    private long getHash(String key) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(key.getBytes());
            // 取前8字节转换为long型
            return bytesToLong(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 algorithm not found", e);
        }
    }

    /**
     * 将字节数组转换为long
     */
    private static long bytesToLong(byte[] bytes) {
        long result = 0;
        for (int i = 0; i < 8; i++) {
            result <<= 8;
            result |= (bytes[i] & 0xFF);
        }
        return result;
    }
}

3.6、最小连接数负载均衡

        接着我们来看看最小连接数负载均衡,从字面意思上就可以理解这种算法的核心 就是让负载最轻的节点处理请求,这样可以充分的利用集群环境整体的软硬件资源。

3.6.1、理论分析

        初步一想,你可能觉得要实现这个算法不难,无非就是选择集群环境中连接数最少的一个节点就行了。但是我们需要考虑怎么去统计连接数呢,负载均衡组件很容易知道一次请求的开始,但是要怎么知道该请求结束了呢,而且还需要注意不能和应用耦合。这些问题 仔细想想 还是很复杂的。下面我整理了一张逻辑结果图。

        还需要注意一种极端情况,假设某时刻 Server1 和 Server2 连接数都是 1,Server3和 Server4 的连接数是2和3。根据最小连接数的定义 我们应该选择连接数最小的,那这个时候我们是选择 Server1 呢还是 Server2呢,这里我们可以随机选一个,但是更好的做法是引入权重,假设Server1 机器的硬件配置更好点,那就可以设置权重 2:1。当出现相同数值的时候优先 Server1。

 3.6.2、落地实践

      下面我们想想怎么设计一个 最小连接数的负载均衡组件,这里我的思路是首先 负载均衡器需要维护一个数据结构,存放的是所有主机的连接数。当一次请求来了的时候 就需要在对应的主机增加一个连接数,当请求结束后,再减掉一个连接数。

/**
 * @ClassName LeastConnectionsLoadBalancer
 * @Description 最小连接数负载均衡算法
 * @Author wcan
 * @Date 2025/3/10 下午 12:37
 * @Version 1.0
 */
public class LeastConnectionsLoadBalancer extends AbstractLoadBalance{

    // 服务器列表(线程安全)
    private final List<ServiceInstance> servers = new CopyOnWriteArrayList<>();

    private volatile ConcurrentHashMap<String, Integer> weightMap;

    private final AtomicBoolean loadFlag = new AtomicBoolean(false);

    private synchronized void initWeightConfigs() {
        if (weightMap == null) {
            List<WeightConfig> weightConfigs = LoadWeightConfig.getWeightConfigs();
            weightMap = new ConcurrentHashMap<>();
            for (WeightConfig weightConfig : weightConfigs) {
                String key = weightConfig.getInstance().getIp() + ":" + weightConfig.getInstance().getPort();
                weightMap.put(key, weightConfig.getWeight());
            }
        }
    }

    /**
     * 添加服务器
     * @param name 服务器名称
     * @param weight 权重(默认为1)
     */
    public void addServer(String name, int weight) {
        servers.add(new ServiceInstance(weight,name));
    }


    /**
     * 选择目标服务器(线程安全)
     * @return 服务器名称
     */
    public ServiceInstance selectServer() {
        if (servers.isEmpty()) {
            throw new IllegalStateException("No servers available");
        }

        // 遍历找出当前连接数/权重最小的服务器
        ServiceInstance selected = null;
        double minRatio = Double.MAX_VALUE;

        for (ServiceInstance server : servers) {
            double ratio = server.getActiveConnections() / (double) server.getWeight();
            if (ratio < minRatio) {
                minRatio = ratio;
                selected = server;
            }
        }

        // 增加该服务器的连接数
        selected.incrementConnections();
        return selected;
    }

    /**
     * 释放服务器连接(请求完成时调用)
     * @param serverName 服务器名称
     */
    public void releaseConnection(String serverName) {
        for (ServiceInstance server : servers) {
            if (server.getName().equals(serverName)) {
                server.decrementConnections();
                break;
            }
        }
    }
    @Override
    public void init(List<ServiceInstance> instances) {
        instances.forEach(instance -> {
            String name = instance.getName();
            String key = instance.getIp() + ":" + instance.getPort();
            if (weightMap != null) {
                Integer weight = weightMap.get(key);
                if (weight != null) {
                    addServer(name, weight);
                }
            }
        });
    }

    @Override
    public ServiceInstance select(List<ServiceInstance> instances) {
        if (loadFlag.compareAndSet(false, true)) {
            init(instances);
        }
        return selectServer();
    }
}

4、最佳技术选项

        我们已经学习了6种常见的负载均衡策略,并且基于这六种策略分别实现了相关的功能,接下来我们就来综合比对一下这6种负载均衡算法的优缺点

4.1、负载均衡策略对比分析

策略 核心原理 优点 缺点 典型使用场景
随机策略 完全随机分配请求 ✅ 实现简单
✅ 无状态,性能高
❌ 无法感知服务器状态
❌ 负载可能不均衡
测试环境、服务器性能完全相同的简单场景
轮询策略 按顺序依次分配请求 ✅ 绝对公平分配
✅ 实现简单
❌ 无视服务器性能差异
❌ 无法应对突发流量
短连接服务(HTTP API)、服务器配置完全一致的环境
随机权重策略 按权重概率随机分配请求 ✅ 适配异构服务器
✅ 比纯随机更均衡
❌ 瞬时负载可能倾斜
❌ 无法动态调整权重
混合云环境(部分物理机+虚拟机)
权重轮询策略 按权重比例轮询分配请求 ✅ 精确控制流量比例
✅ 长期负载均衡
❌ 无法应对短时波动
❌ 需要预设静态权重
CDN节点分配、数据库读写分离
一致性哈希策略 基于哈希环定位目标节点 ✅ 动态扩缩容影响小
✅ 支持会话保持
❌ 实现复杂
❌ 需虚拟节点防倾斜
分布式缓存(Redis集群)、文件存储分片
最小连接数策略 优先选择当前连接数最少的服务器 ✅ 动态感知负载
✅ 适配长连接场景
❌ 需维护连接状态
❌ 短连接场景效果有限
WebSocket服务、数据库连接池、视频流媒体服务器

4.2、最佳技术选型

        从上图中我们可以得出各种算法的优缺点,并且我还列出来典型的应用场景。根据上面的信息我们会发现没有完美的方案,他们都有缺点,那我们该如何选型呢。

       还是那句话,没有根据自己的业务系统,权衡利弊,只要保证利大于弊,并且要尽可能的多“大”一些,这就是最佳的技术选型。这里我根据 是否需要会话保持、服务器的性能、流量波动以及请求类型 总结出了一张图,你可以针对性的参考

        最后举个例子,在我们的分布式系统中 对外暴露的微服务网关我们可以采用 一致性哈希 + 最小连接数的负载均衡策略,内部业务系统的微服务 我们可以使用 权重轮询的策略  在配合健康检查机制。

5、负载均衡组件设计

        我们将上述的实现设计成一款通用的负载均衡组件,后面集成到 Cheese 中。

5.1、服务实例对象设计

        为了方便后面的算法处理,我们需要将每个服务节点定义成一个对象,这个对象的属性有 IP、端口、当前连接数、权重配比等等,相关代码如下

package org.wcan.lb;

import java.util.concurrent.atomic.AtomicInteger;

public class ServiceInstance {
    private String ip;
    private int port;
    private int weight;
    private String name ;
    private final AtomicInteger activeConnections = new AtomicInteger(0);

    public ServiceInstance(int weight, String name) {
        this.weight = weight;
        this.name = name;
    }

    public ServiceInstance(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public ServiceInstance() {

    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public int getWeight() {
        return weight;
    }

    public void setWeight(int weight) {
        this.weight = weight;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getUrl() {
        return "http://" + ip + ":" + port;
    }


    public void incrementConnections() {
        activeConnections.incrementAndGet();
    }

    public void decrementConnections() {
        activeConnections.decrementAndGet();
    }

    // Getters
    public int getActiveConnections() {
        return activeConnections.get();
    }


    @Override
    public String toString() {
        return "ServiceInstance{" +
                "ip='" + ip + '\'' +
                ", port=" + port +
                ", currentConnections=" + activeConnections.get() +
                '}';
    }
}

5.2、权重加载设计

       我们在服务实例对象中设计了一个权重的属性,主要的作用是来封装服务的权重值,通常情况下我们将权重通过配置化的形式集成到项目中,因此除了一个实例对象之外我们需要设计一个加载配置的功能,这里先以读取配置文件为例。相关代码如下

package org.wcan.lb.weight;

import org.wcan.lb.ServiceInstance;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * @Description 加载权重配置
 *   权重配置文件格式
 *
 * @Author wcan
 * @Date 2025/2/21 下午 17:39
 * @Version 1.0
 */
public class LoadWeightConfig {
    private static List<ServiceInstance> serviceInstances;

    static {
        Properties properties = new Properties();
        try (InputStream input = LoadWeightConfig.class.getClassLoader().getResourceAsStream("weight.properties")) {
            if (input == null) {
                throw  new RuntimeException("Sorry, unable to find weight.properties");
            }
            properties.load(input);
        } catch (Exception e) {
            throw  new RuntimeException(e.getMessage());
        }
        int size = properties.size();
        serviceInstances = new ArrayList<ServiceInstance>(size);
        for (int i = 0; i < size; i++) {
            String key = "serverInfo" + (i + 1);
            String value = properties.getProperty(key);
            String[] weightAndUrl = value.split("&");
            String url = weightAndUrl[0];
            String weightValue = weightAndUrl[1];
            ServiceInstance serviceInstance = new ServiceInstance();
            String[] ipAndPort = url.split(":");
            serviceInstance.setIp(ipAndPort[0]);
            serviceInstance.setPort(Integer.parseInt(ipAndPort[1]));
            serviceInstance.setWeight(Integer.valueOf(weightValue));
            serviceInstances.add(serviceInstance);
        }
    }

    public static List<ServiceInstance> getWeightConfigs() {
        return serviceInstances;
    }
}

5.3、负载均衡策略设计

        这里我们需要实现六种负载均衡算法,我们抽取这六种算法的共性,设计一个顶层的接口,然后根据不同的负载均衡算法的特点来实现,也就是我们常见的策略模式。

        同时我们应该可以想到,不管是哪一种算法,都需要先初始化,再调用获取服务的方法,所以这里我们可以同时引入一个抽象类,作为模板。

        最后我们可以设计一个工厂,对外屏蔽掉创建实例的过程,外部仅需要传入特定的参数 就能返回对应的负载均衡的实现对象。整个设计结合了  策略模式、模板方法设计模式以及工厂模式,类图如下

5.4、编码落地

5.4.1、顶层接口设计

        顶层接口我们主要定义两个查询方法,其中一个是空参,一个是带参数,代码如下

package org.wcan.lb.strategy;

import org.wcan.lb.ServiceInstance;

import java.util.List;

/**
 * @Description 负载均衡策略接口
 * @Author wcan
 * @Date 2025/2/21 下午 16:38
 * @Version 1.0
 */
public interface LoadBalanceStrategy {
    public ServiceInstance select(List<ServiceInstance> instances);
    public ServiceInstance select();
}

5.4.2、抽象模板设计

package org.wcan.lb.strategy;

import org.wcan.lb.ServiceInstance;
import org.wcan.lb.weight.LoadWeightConfig;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @ClassName AbstractLoadBalance
 * @Description TODO
 * @Author wcan
 * @Date 2025/3/19 下午 22:47
 * @Version 1.0
 */
public abstract class AbstractLoadBalance implements LoadBalanceStrategy {
    List<ServiceInstance> serviceInstances = null;
    private final AtomicBoolean loadFlag = new AtomicBoolean(false);
    public void init() {
        if (null == serviceInstances) {
            serviceInstances = LoadWeightConfig.getWeightConfigs();
        }
    }
    public abstract ServiceInstance select(List<ServiceInstance> instances);
    /**
    * @Description 默认情况下 所有的负载均衡策略都是需要先初始化服务列表,
    * @Param []
    * @return org.wcan.lb.ServiceInstance
    * @Date 2025/3/21 下午 22:45
    * @Author wcan
    * @Version 1.0
    */
    public ServiceInstance select() {
        if (loadFlag.compareAndSet(false, true)) {
            init();
        }
        return select(serviceInstances);
    }

    public ServiceInstance select(String key) {
        return null;
    }

}

5.4.3、负载均衡实现

        以下分别是6种负载均衡策略的代码实现

5.4.3.1、随机负载均衡
package org.wcan.lb.strategy;

import org.wcan.lb.ServiceInstance;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * @ClassName RandomStrategy
 * @Description 随机策略实现
 * @Author wcan
 * @Date 2025/2/21 下午 16:41
 * @Version 1.0
 */
public class RandomLoadBalancer extends AbstractLoadBalance {
    @Override
    public ServiceInstance select(List<ServiceInstance> instances) {
        if (instances.isEmpty())
            throw new IllegalArgumentException("No instances available");
        int index = ThreadLocalRandom.current().nextInt(instances.size());
        return instances.get(index);
    }
}
5.4.3.2、轮询负载均衡
package org.wcan.lb.strategy;

import org.wcan.lb.ServiceInstance;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName RoundRobinStrategy
 * @Description 轮询策略实现
 * @Author wcan
 * @Date 2025/2/21 下午 16:40
 * @Version 1.0
 */
public class RoundRobinLoadBalancer extends AbstractLoadBalance {
    private final AtomicInteger counter = new AtomicInteger(0);
    @Override
    public ServiceInstance select(List<ServiceInstance> instances) {
        if (instances.isEmpty())
            throw new IllegalArgumentException("No instances available");
        int index = counter.getAndIncrement() % instances.size();
        return instances.get(index);
    }
}
5.4.3.3、随机权重负载均衡
package org.wcan.lb.strategy;

import org.wcan.lb.ServiceInstance;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @ClassName WeightedRandomLoadBalancer
 * @Description 随机权重 负载均衡策略
 * @Author wcan
 * @Date 2025/2/24 上午 0:16
 * @Version 1.0
 */
public class WeightedRandomLoadBalancer extends AbstractLoadBalance {
    private final Random random = new Random();

    @Override
    public ServiceInstance select(List<ServiceInstance> instances) {
        List<ServiceInstance> weightedServers = new ArrayList<>();
        for (ServiceInstance instance : instances) {
            int weight = instance.getWeight();
            if (weight > 0) {
                for (int i = 0; i < weight; i++) {
                    weightedServers.add(instance); // 按权重重复加入服务器
                }
            }
        }
        int index = random.nextInt(weightedServers.size());
        return weightedServers.get(index);
    }
}
5.4.3.4、权重轮询负载均衡
package org.wcan.lb.strategy;

import org.wcan.lb.ServiceInstance;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class WeightedRoundRobinLoadBalancer extends AbstractLoadBalance {

    private AtomicInteger index = new AtomicInteger(0);

    private volatile Integer totleCount = 0;

    @Override
    public ServiceInstance select(List<ServiceInstance> instances) {
        totleCount = instances.stream().mapToInt(ServiceInstance::getWeight).sum();
        int andIncrement = index.getAndIncrement();
        int select = andIncrement % totleCount;
        for (ServiceInstance instance : instances) {
            int weight = instance.getWeight();
            if (select < weight)
                return instance;
            else {
                select -= weight;
            }
        }
        return null;
    }
}
5.4.3.5、最小连接数

        本案例 仅实现了基本的获取服务的逻辑过程,断开连接计数 需要根据具体的情况去实现

package org.wcan.lb.strategy;

import org.wcan.lb.ServiceInstance;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @ClassName LeastConnectionsLoadBalancer
 * @Description 最小连接数负载均衡算法
 * @Author wcan
 * @Date 2025/3/10 下午 12:37
 * @Version 1.0
 */
public class LeastConnectionsLoadBalancer extends AbstractLoadBalance{

    // 服务器列表(线程安全)
    private final List<ServiceInstance> servers = new CopyOnWriteArrayList<>();

    private final AtomicBoolean loadFlag = new AtomicBoolean(false);

    /**
     * 选择目标服务器(线程安全)
     * @return 服务器名称
     */
    public ServiceInstance selectServer() {
        if (servers.isEmpty()) {
            throw new IllegalStateException("No servers available");
        }

        // 遍历找出当前连接数/权重最小的服务器
        ServiceInstance selected = null;
        double minRatio = Double.MAX_VALUE;

        for (ServiceInstance server : servers) {
            double ratio = server.getActiveConnections() / (double) server.getWeight();
            if (ratio < minRatio) {
                minRatio = ratio;
                selected = server;
            }
        }

        // 增加该服务器的连接数
        selected.incrementConnections();
        return selected;
    }

    /**
     * 释放服务器连接(请求完成时调用)
     * @param serverName 服务器名称
     */
    public void releaseConnection(String serverName) {
        for (ServiceInstance server : servers) {
            if (server.getName().equals(serverName)) {
                server.decrementConnections();
                break;
            }
        }
    }
    @Override
    public void init() {
        super.init();
        servers.addAll(serviceInstances);
    }

    @Override
    public ServiceInstance select(List<ServiceInstance> instances) {
        if (loadFlag.compareAndSet(false, true)) {
            init();
        }
        return selectServer();
    }
}
5.4.3.6、一致性哈希

        一致性哈希负载均衡和前面5种有点不同,他需要通过一个入参来计算哈希值,这个入参可以是客户端主机ip 或者是 某个请求参数。

package org.wcan.lb.strategy;

import org.wcan.lb.ServiceInstance;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @Description
 * @Author wcan
 * @Date 2025/2/27 下午 12:40
 * @Version 1.0
 */
public class ConsistentHashingLoadBalancer extends AbstractLoadBalance {

    // 哈希环(使用 ConcurrentSkipListMap 实现有序的环结构)
    private final ConcurrentSkipListMap<Long, ServiceInstance> virtualNodes = new ConcurrentSkipListMap<>();

    private final AtomicBoolean loadFlag = new AtomicBoolean(false);

    // 每个物理节点对应的虚拟节点数量
    private static final int VIRTUAL_NODE_COUNT = 100;

    @Override
    public ServiceInstance select(List<ServiceInstance> instances) {

        return null;
    }
    @Override
    public ServiceInstance select(String key) {
        this.init();
        return getNode(key);
    }

    @Override
    public void init( ) {
        super.init();
        if (serviceInstances == null || serviceInstances.isEmpty()) {
            throw new IllegalArgumentException("Instances list cannot be null or empty");
        }
        if (loadFlag.compareAndSet(false, true)) {
            // 循环添加到哈希环
            serviceInstances.forEach(this::addNode);
        }
    }

    /**
     * 添加物理节点到哈希环
     * @param node 物理节点
     */
    public void addNode(ServiceInstance node) {
        for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
            // 生成虚拟节点名称(如 "Server-A#VN1")
            String virtualNodeName = node.getIp() + "#VN" + i;
            long hash = getHash(virtualNodeName);
            virtualNodes.put(hash, node);
        }
    }

    /**
     * 移除物理节点
     * @param node 物理节点名称
     */
    public void removeNode(String node) {
        for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
            String virtualNodeName = node + "#VN" + i;
            long hash = getHash(virtualNodeName);
            virtualNodes.remove(hash);
        }
    }

    /**
     * 根据请求Key获取目标节点
     * @param requestKey 请求的Key
     * @return 目标节点
     */
    public ServiceInstance getNode(String requestKey) {
        if (virtualNodes.isEmpty()) {
            return null;
        }
        long hash = getHash(requestKey);
        // 查找第一个大于等于该哈希值的节点
        Map.Entry<Long, ServiceInstance> entry = virtualNodes.ceilingEntry(hash);
        if (entry == null) {
            // 如果没找到,则回到环的起点
            entry = virtualNodes.firstEntry();
        }
        return entry.getValue();
    }

    /**
     * 使用MD5哈希算法(更均匀的分布)
     * @param key 输入字符串
     * @return 哈希值
     */
    private long getHash(String key) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(key.getBytes());
            // 取前8字节转换为long型
            return bytesToLong(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 algorithm not found", e);
        }
    }

    /**
     * 将字节数组转换为long
     */
    private static long bytesToLong(byte[] bytes) {
        long result = 0;
        for (int i = 0; i < 8; i++) {
            result <<= 8;
            result |= (bytes[i] & 0xFF);
        }
        return result;
    }
}

5.5.4、抽象工厂实现

        到这里核心的代码已经编写完成了,我们的负载均衡组件已经具备了6种负载均衡策略了,下面我们来设计一个工厂,用来对外提供获取服务实例的方法。

        首先最简洁明了的写法就是直接在工厂里面 new 对象就好了,相关代码如下

package org.wcan.lb;

import org.wcan.lb.strategy.*;

public class StrategyFactory {
    public static AbstractLoadBalance createStrategy(String strategyType) {
        switch (strategyType.toLowerCase()) {
            case "1":
                return new RoundRobinLoadBalancer();
            case "2":
                return new RandomLoadBalancer();
            case "3":
                return new WeightedRandomLoadBalancer();
            case "4":
                return new WeightedRoundRobinLoadBalancer();
            case "5":
                return new LeastConnectionsLoadBalancer();
            case "6":
                return new ConsistentHashingLoadBalancer();
            default:
                throw new IllegalArgumentException("Unsupported strategy type");
        }
    }
}

        但是怎么说呢,这个工厂简单的有些离谱了,以至于让一个新手也能看懂,作为一个优秀的高级开发,代码轻易的让别人看懂了 是一件耻辱的事,好吧,下面我们改造成更 “高级” 的写法

        首先定义一个函数式接口

package org.wcan.lb;

import org.wcan.lb.strategy.AbstractLoadBalance;

/**
 * @Description
 * @Author wcan
 * @Date 2025/3/21 下午 21:58
 * @Version 1.0
 */
@FunctionalInterface
public interface AbstractLoadBalancerFactory {
    AbstractLoadBalance getLoadBalancer();
}

接着就是工厂的实现

package org.wcan.lb;

import org.wcan.lb.strategy.*;

import java.util.HashMap;
import java.util.Map;

public class LoadBalancerFactory {
        private static final Map<String, AbstractLoadBalancerFactory> strategyMap = new HashMap<>();

        static {
            strategyMap.put("1", RoundRobinLoadBalancer::new);
            strategyMap.put("2", RandomLoadBalancer::new);
            strategyMap.put("3", WeightedRandomLoadBalancer::new);
            strategyMap.put("4", WeightedRoundRobinLoadBalancer::new);
            strategyMap.put("5", LeastConnectionsLoadBalancer::new);
            strategyMap.put("6", ConsistentHashingLoadBalancer::new);
        }

        public static AbstractLoadBalance createStrategy(String strategyType) {
            AbstractLoadBalancerFactory abstractLoadBalancerFactory = strategyMap.get(strategyType.toLowerCase());
            if (abstractLoadBalancerFactory != null) {
                return abstractLoadBalancerFactory.getLoadBalancer();
            } else {
                throw new IllegalArgumentException("Unsupported strategy type");
            }
        }
}

         好吧,为了方便后续的集成,我们再来一个对外暴露的静态方法的工具类,后续在使用的时候直接使用这个工具类就行了。

package org.wcan.lb;

import org.wcan.lb.strategy.AbstractLoadBalance;

public class LoadBalancer {

    /**
     * @Description  根据负载均衡策略 获取对应的负载均衡组件
     * @Param [strategyType]
     * @return org.wcan.lb.strategy.AbstractLoadBalance
     * @Date 2025/3/22 下午 13:48
     * @Author wcan
     * @Version 1.0
     */
    public static AbstractLoadBalance getAbstractLoadBalance(String strategyType){
        return LoadBalancerFactory.createStrategy(strategyType);
    }
}

5.5、功能测试

前面的功能已经设计完成了,下面我们就可以编写单元测试进行测试了。

import org.junit.Test;
import org.wcan.lb.LoadBalancer;
import org.wcan.lb.ServiceInstance;
import org.wcan.lb.LoadBalancerFactory;
import org.wcan.lb.strategy.AbstractLoadBalance;

import java.util.concurrent.TimeUnit;

/**
 * @Description
 * @Author wcan
 * @Date 2025/3/19 下午 23:05
 * @Version 1.0
 */
public class MainTest {

    @Test
    public void loadBalanceTest() throws InterruptedException {
//        testForLoadBalancerFactory();
//        testForLoadBalanceFactory();
        testForLoadBalancerFactoryHashKey();


    }

    /**
     * @Description LoadBalancer.getAbstractLoadBalance()  直接指定策略 获取服务实例
     * @Param []
     * @return void
     * @Date 2025/3/22 下午 14:04
     * @Author wcan
     * @Version 1.0
     */
    public void testForLoadBalancerFactoryHashKey() throws InterruptedException {
        AbstractLoadBalance abstractLoadBalance = LoadBalancer.getAbstractLoadBalance("6");
        int i = 0;
        while (true){
            TimeUnit.SECONDS.sleep(1);
            ServiceInstance select = abstractLoadBalance.select(String.valueOf(i%6));
            System.out.println("第 "+i++ +" 次请求: "+select);
        }
    }



    /**
     * @Description LoadBalancer.getAbstractLoadBalance()  直接指定策略 获取服务实例
     * @Param []
     * @return void
     * @Date 2025/3/22 下午 14:04
     * @Author wcan
     * @Version 1.0
     */
    public void testForLoadBalancerFactory() throws InterruptedException {
        AbstractLoadBalance abstractLoadBalance = LoadBalancer.getAbstractLoadBalance("1");
        int i = 0;
        while (true){
            TimeUnit.SECONDS.sleep(1);
            ServiceInstance select = abstractLoadBalance.select();
            System.out.println("第 "+i++ +" 次请求: "+select);
        }
    }

    public void testForLoadBalanceFactory() throws InterruptedException {
        AbstractLoadBalance strategy = LoadBalancerFactory.createStrategy("1");
        int i = 0;
        while (true){
            TimeUnit.SECONDS.sleep(1);
            ServiceInstance select = strategy.select();
            System.out.println("第 "+i++ +" 次请求: "+select);
        }
    }

    /**
    * @Description 一致性哈希算法 需要传入一个入参 来计算哈希值
    * @Param []
    * @return void
    * @Date 2025/3/22 下午 14:02
    * @Author wcan
    * @Version 1.0
    */
    public void testForLoadBalanceFactoryWithKey() throws InterruptedException {
        AbstractLoadBalance strategy = LoadBalancerFactory.createStrategy("6");
        int i = 0;
        while (true){
            TimeUnit.SECONDS.sleep(1);
            ServiceInstance select = strategy.select(String.valueOf(i%6));
            System.out.println("第 "+i++ +" 次请求: "+select);
        }
    }
}

6、总结

        本篇文章给大家介绍了负载均衡常见的六种策略,并且给出了实现思路和落地的代码,在分布式系统设计中,除了远程通信、序列化之外 负载均衡也是一个重要的核心的组件。到现在我们已经自研了远程通信组件、序列化组件和负载均衡组件,后面我们将他们集成在一起,将我们的 cheese彻底打造成一款可投产的 分布式服务框架。