设计模式之观察者模式

发布于:2025-02-11 ⋅ 阅读:(97) ⋅ 点赞:(0)

1、概念

        观察者模式:当某个状态更改时,会及时的通知订阅这个状态对象,从而进行业务上的处理。比如我们常见的消息中心(MQ),Redis的发布订阅模式,都可以看作是观察者模式。

2、优缺点

        优点:1、对象解耦。2、具有自动促发机制。

        缺点:1、通知对象过多,会造成性能问题,一般采用异步方式。2、避免循环依赖(防止出现死循环),3、无法获知变化细节。

3、使用场景

  • 具有监听功能的场景。
  • 发布订阅的场景
  • 具有批量通知的场景

4、实现方式

4.1、需求

        实现一个可以支持订阅通知的功能。即:用户可以订阅某个主题,当主题有内容时,各个用户可以实时查看到具体的内容。

        分析:从上面需求来看,发现有两个点:1、主题,主要有内容,还可以添加订阅用户,以及通知用户。2、用户类,主要接收定义的主题内容。

4.2、基础功能

        基于上诉的分析,我们需要创建一个主题类与订阅者类。

4.2.1、主题类

主题:

import com.test.design.observer.basic.receive.Subscriber;
import java.util.ArrayList;
import java.util.List;

public class Topic {

    /**
     * 订阅者集合
     */
    List<Subscriber> subscriberList = new ArrayList<>();

    /**
     * 主题名
     */
    private String topicName;

    /**
     * 消息
     */
    private String message;

    /**
     * 绑定主题名
     *
     * @param topicName 主题名
     */
    public Topic(String topicName) {
        this.topicName = topicName;
    }

    /**
     * 设置发送的消息
     *
     * @param message 消息内容
     */
    public void setMessage(String message) {
        this.message = message;
        // 通知订阅者
        notifySubscribe();
    }

    /**
     * 添加订阅者
     *
     * @param subscriber 订阅者
     */
    public void addSubscribe(Subscriber subscriber) {
        if (subscriber == null) {
            return;
        }
        // 获取订阅者集合
        if (subscriberList == null) {
            subscriberList = new ArrayList<>();
        }
        // 添加订阅者
        subscriberList.add(subscriber);
    }

    /**
     * 通知订阅者
     * 注意:通知采用内部,以设置成私有的
     */
    private void notifySubscribe() {
        // 获取主题的订阅者们
        if (subscriberList == null) {
            return;
        }
        // 遍历订阅者,一个一个的通知
        for (Subscriber subscriber : subscriberList) {
            subscriber.receiveMessage(message);
        }
        // 清空消息
        message = null;
    }


}

 4.2.2、订阅者类

/**
 * 订阅者接口
 */
public interface Subscriber {

    /**
     * 接收消息
     *
     * @param message 消息内容
     */
    void receiveMessage(String message);

}

4.3、观察者模式(非官方)

4.3.1、实现订阅者

import com.test.design.observer.basic.receive.Subscriber;
import com.test.design.observer.basic.topic.Topic;

/**
 * 订阅者实现类
 */
public class UserSubscriber implements Subscriber {

    /**
     * 主题
     */
    public Topic topic;

    /**
     * 订阅者名称
     */
    public String subscribeName;

    /**
     * 构造器,指定用户名
     * @param subscribeName 订阅者名称
     */
    public UserSubscriber(String subscribeName) {
        this.subscribeName = subscribeName;
    }

    /**
     * 绑定主题
     *
     * @param topic 主题
     */
    public void subscribeTopic(Topic topic) {
        this.topic = topic;
    }

    @Override
    public void receiveMessage(String message) {
        System.out.println(subscribeName + "接收到:" + message + " 内容了");
    }
}

4.3.2、测试+效果

import com.test.design.observer.basic.receive.Subscriber;
import com.test.design.observer.basic.receive.concrete.UserSubscriber;
import com.test.design.observer.basic.topic.Topic;

public class ObserverTest {

    public static void main(String[] args) throws InterruptedException {
        // 教育类主题
        Topic eduTopic = new Topic("edu_topic");
        // 新闻类主题
        Topic newsTopic = new Topic("news_topic");
        // 创建5个用户
        Subscriber userSubscribe1 = new UserSubscriber("用户1");
        Subscriber userSubscribe2 = new UserSubscriber("用户2");
        Subscriber userSubscribe3 = new UserSubscriber("用户3");
        Subscriber userSubscribe4 = new UserSubscriber("用户4");
        Subscriber userSubscribe5 = new UserSubscriber("用户5");
        // 分别将用户1-3订阅教育类,3-5订阅新闻类,用户3同时订阅了教育与新闻类
        eduTopic.addSubscribe(userSubscribe1);
        eduTopic.addSubscribe(userSubscribe2);
        eduTopic.addSubscribe(userSubscribe3);

        newsTopic.addSubscribe(userSubscribe3);
        newsTopic.addSubscribe(userSubscribe4);
        newsTopic.addSubscribe(userSubscribe5);


        System.out.println("-------- 教育类 ----------");
        // 发送教育类消息
        eduTopic.setMessage("教育部即将下发教育总结");
        // 休息三秒
        Thread.sleep(3000);
        System.out.println("-------- 新闻类 ----------");
        // 发送新闻类消息
        newsTopic.setMessage("明天天气🌤");
    }

}

 

4.3.3、结论

        符合在测试类中的预期,用户1-3接收到了教育类消息,用户3-5接收到了新闻类消息。通过上面的代码,可以很直观的看到主题与订阅者之间的关系,订阅者必须在主题中进行绑定操作,否则无法进行通知。但是上面也有特别大的问题,需要我们仔细的去思考,第一个是并发的发送消息时,会产生并发问题。第二个,通知方法可以通过反射调用,跨越了私有权限。

        那下面我们就解决上诉的两个问题。

4.3.4、功能升级

4.3.4.1、问题一:解决并发情况下,添加订阅者与通知订阅者。

既然是并发问题,那么我们只能通过加锁或采用并发包的工具类管理。那下面我们开始改造。

主题类 支持并发:

       注释很重要,很重要,很重要。

package com.test.design.observer.basic.topic;

import com.test.design.observer.basic.receive.Subscriber;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class Topic {

    /**
     * 订阅者集合,这里采用并发集合,避免并发添加订阅者造成并发数据丢失问题
     * 注意:这里主要避免并发添加订阅者问题
     */
    CopyOnWriteArrayList<Subscriber> subscriberList = new CopyOnWriteArrayList<>();

    /**
     * 添加排队等待机制
     * 注意:这里主要解决并发发送消息问题
     */
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue();

    /**
     * 是否正在执行通知,默认没有通知
     * 注意:这里时并发重复通知调用订阅者与queue配合使用,保证并发安全性
     */
    AtomicBoolean notify = new AtomicBoolean(Boolean.FALSE);

    /**
     * 主题名
     */
    private String topicName;

    /**
     * 消息
     */
    private String message;

    /**
     * 绑定主题名
     *
     * @param topicName 主题名
     */
    public Topic(String topicName) {
        this.topicName = topicName;
    }

    /**
     * 设置发送的消息
     *
     * @param message 消息内容
     */
    public void setMessage(String message) {
        // 将要发送的消息放入队列中
        queue.add(message);
        // 既然没有人在促发通知了,那么我就去促发一次
        // 注意:这里采用CAS保证只有一个线程可以设置成运行通知状态
        if (notify.compareAndSet(Boolean.TRUE, Boolean.FALSE)) {
            // 通知订阅者
            asyncNotifySubscribe();
        }
    }

    /**
     * 添加订阅者
     *
     * @param subscriber 订阅者
     */
    public void addSubscribe(Subscriber subscriber) {
        if (subscriber == null) {
            return;
        }
        // 添加订阅者
        subscriberList.add(subscriber);
    }

    /**
     * 通知订阅者
     * 通知采用内部,以设置成私有的
     * 注意:本次优化会改成异步调用,避免造成主线程一直阻塞使用,
     */
    private void asyncNotifySubscribe() {
        // 这里可以使用线程池,我在这就不使用了
        new Thread(()->{
            // 通知一次
            notifyMessage();
            // 再次检查一次,避免刚好还未释放标记锁,就已经添加消息了
            notifyMessage();
            // 释放标记,可以主动通知了
            // 注意:这里无法再次采用CAS,本身以获取到了,直接设置即可
            notify.set(Boolean.FALSE);
        }).start();
    }

    /**
     * 通知消息
     */
    private void notifyMessage() {
        String messageInfo = null;
        // 不为空,说明还有内容在等待通知
        while ((messageInfo = queue.poll()) != null) {
            // 获取主题的订阅者们
            if (subscriberList == null) {
                return;
            }
            // 遍历订阅者,一个一个的通知
            for (Subscriber subscriber : subscriberList) {
                subscriber.receiveMessage(messageInfo);
            }
        }
    }

}
4.3.4.2、问题二:反射跨越权限调用,造成分发消息。

        反射这是Java本身自带的功能,目前无法很直观的屏蔽,但是我们可以设置流水线,必须经过什么才能访问,这样也能避免反射带来的影响。

主题类新增反射标记 同时 setMessage()方法设置false

 /**
     * 反射状态设置为True,只有通过setMessage方法才可以直接访问通知。
     */
    AtomicBoolean reflectStatus = new AtomicBoolean(Boolean.TRUE);

  /**
     * 设置发送的消息
     *
     * @param message 消息内容
     */
    public void setMessage(String message) {
        // 将要发送的消息放入队列中
        queue.add(message);
        // 将反射标记设置成False
        reflectStatus.set(Boolean.FALSE);
        // 既然没有人在促发通知了,那么我就去促发一次
        // 注意:这里采用CAS保证只有一个线程可以设置成运行通知状态
        if (notify.compareAndSet(Boolean.TRUE, Boolean.FALSE)) {
            // 通知订阅者
            asyncNotifySubscribe();
        }
    }

完整代码:

package com.test.design.observer.basic.topic;

import com.test.design.observer.basic.receive.Subscriber;

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class Topic {

    /**
     * 订阅者集合,这里采用并发集合,避免并发添加订阅者造成并发数据丢失问题
     * 注意:这里主要避免并发添加订阅者问题
     */
    CopyOnWriteArrayList<Subscriber> subscriberList = new CopyOnWriteArrayList<>();

    /**
     * 添加排队等待机制
     * 注意:这里主要解决并发发送消息问题
     */
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue();

    /**
     * 是否正在执行通知,默认没有通知
     * 注意:这里时并发重复通知调用订阅者与queue配合使用,保证并发安全性
     */
    AtomicBoolean notify = new AtomicBoolean(Boolean.FALSE);

    /**
     * 反射状态设置为True,只有通过setMessage方法才可以直接访问通知。
     */
    AtomicBoolean reflectStatus = new AtomicBoolean(Boolean.TRUE);

    /**
     * 主题名
     */
    private String topicName;

    /**
     * 消息
     */
    private String message;

    /**
     * 绑定主题名
     *
     * @param topicName 主题名
     */
    public Topic(String topicName) {
        this.topicName = topicName;
    }

    /**
     * 设置发送的消息
     *
     * @param message 消息内容
     */
    public void setMessage(String message) {
        // 将要发送的消息放入队列中
        queue.add(message);
        // 将反射标记设置成False
        reflectStatus.set(Boolean.FALSE);
        // 既然没有人在促发通知了,那么我就去促发一次
        // 注意:这里采用CAS保证只有一个线程可以设置成运行通知状态
        if (notify.compareAndSet(Boolean.TRUE, Boolean.FALSE)) {
            // 通知订阅者
            asyncNotifySubscribe();
        }
    }

    /**
     * 添加订阅者
     *
     * @param subscriber 订阅者
     */
    public void addSubscribe(Subscriber subscriber) {
        if (subscriber == null) {
            return;
        }
        // 添加订阅者
        subscriberList.add(subscriber);
    }

    /**
     * 通知订阅者
     * 通知采用内部,以设置成私有的
     * 注意:本次优化会改成异步调用,避免造成主线程一直阻塞使用,
     */
    private void asyncNotifySubscribe() {
        // 这里可以使用线程池,我在这就不使用了
        new Thread(()->{
            // 通知一次
            notifyMessage();
            // 再次检查一次,避免刚好还未释放标记锁,就已经添加消息了
            notifyMessage();
            // 释放标记,可以主动通知了
            // 注意:这里无法再次采用CAS,本身以获取到了,直接设置即可
            notify.set(Boolean.FALSE);
        }).start();
    }

    /**
     * 通知消息
     */
    private void notifyMessage() {
        String messageInfo = null;
        // 不为空,说明还有内容在等待通知
        while ((messageInfo = queue.poll()) != null) {
            // 获取主题的订阅者们
            if (subscriberList == null) {
                return;
            }
            // 遍历订阅者,一个一个的通知
            for (Subscriber subscriber : subscriberList) {
                subscriber.receiveMessage(messageInfo);
            }
        }
    }

}
 4.3.4.3、测试+效果
import com.test.design.observer.basic.receive.Subscriber;
import com.test.design.observer.basic.receive.concrete.UserSubscriber;
import com.test.design.observer.basic.topic.Topic;

public class ObserverTest {

    public static void main(String[] args) throws InterruptedException {

        // 并发测试
        // 教育类主题
        Topic eduTopic = new Topic("edu_topic");
        // 创建3个用户
        Subscriber userSubscribe1 = new UserSubscriber("用户1");
        Subscriber userSubscribe2 = new UserSubscriber("用户2");
        Subscriber userSubscribe3 = new UserSubscriber("用户3");
        // 都订阅教育类主题
        eduTopic.addSubscribe(userSubscribe1);
        eduTopic.addSubscribe(userSubscribe2);
        eduTopic.addSubscribe(userSubscribe3);
        // 开辟100个线程分别发送10条消息
        for (int i = 0; i < 100; i++) {
            Thread thread = new Thread(()->{
                for (int j = 0; j < 10; j++) {
                    eduTopic.setMessage(Thread.currentThread().getName() + " 发送消息 " + j);
                }
            });
            thread.setName("线程" + i);
            thread.start();
        }
    }

}

可以看到总共发送了3000条消息,满足 100(线程)* 10(消息)* 3(订阅者)= 3000;说明消息都通知到各个订阅者了,没有发生消息丢失。 

5、总结

        通过上面分析,可以很直观的看到观察者模式中的核心理念,只要保证主题与订阅者之间的关系绑定,以及促发通知的约定,就能保证消息能够通知到各个订阅者。


网站公告

今日签到

点亮在社区的每一天
去签到