【文档搜索引擎】使用多线程优化流程

发布于:2024-12-19 ⋅ 阅读:(13) ⋅ 点赞:(0)

枚举所有文件

// 1. 枚举出所有的文件  
ArrayList<File> files = new ArrayList<>();  
enumFile(INPUT_PATH, files);
  • 直接调用 enumFile 方法即可

循环遍历文件(多线程)

此处为了能够通过多线程来制作,所以直接使用线程池来完成

// 2. 循环遍历文件(多线程,引入线程池)  
ExecutorService executorService = Executors.newFixedThreadPool(4);  
for(File f : files){  
    executorService.submit(new Runnable() {  
        @Override  
        public void run() {  
            // 打印要解析的文件路径  
            System.out.println("解析 " + f.getAbsolutePath());  
            parseHTML(f);  
        }  
    });  
}
  • 通过 submit 添加任务,我们通过 Runnable 来去描述具体任务的内容
  • 此时线程池中就有四个线程来执行遍历操作

保存索引

问题描述

当我们执行 save 的时候,可能会出现线程池里面的线程还没有执行完的情况

  • 我们是通过 submit 来往线程池中提交任务的(这是一个极快的操作,只是把这个 Runnable 对象放到了阻塞队列中)

是否存在可能:这边的一万多次循环 submit 都已经完了,但是线程池这里还没有把这一万多个文档都解析完的情况?

  • 可能性是非常大的

领导给下属们布置任务,很快就将任务布置完了。最后任务布置完了,但是任务还没完成的可能性太大了

所以我们在遍历文件的时候,不能在任务布置完之后就立刻保存索引,要保证所有的任务都执行完(所有的线程把所有的文档任务处理完),才能执行 save

  • 不然 save 的就只是一小部分,而不是完整的

解决方法

CountDownLatch 类似于跑步比赛裁判,只要所有的选手都撞线了,就认为这场比赛结束了

CountDownLatch latch = new CountDownLatch(files.size()); // 选手(文件)的个数  
ExecutorService executorService = Executors.newFixedThreadPool(4);  
for(File f : files){  
    executorService.submit(new Runnable() {  
        @Override  
        public void run() {  
            // 打印要解析的文件路径  
            System.out.println("解析 " + f.getAbsolutePath());  
            parseHTML(f);  
            latch.countDown();  // 通知 CountDownLatch,选手已经到达终点,已经撞线  
        }  
    });  
}  
// 死等。会阻塞,直到所有的选手都调用 countDown 撞线之后,才会返回
latch.await(); 
  1. 在构造 CountDownLatch 的时候指定一下比赛选手的个数(所有文档的个数)
  2. 通过 await() 来等待所有的选手都撞线通过,这里我们采用死等的方式
    • 不带参数:一直死等,直到所有的选手都撞线,即使有一个选手中途退赛了,也继续等
    • 带参数:选择一个最大等待时间,到达后就不等了

线程安全问题

[!quote] 线程安全
多个线程尝试修改同一个对象

在循环遍历文件的过程中,我们调用了 parseHTML 方法,而在 parserHTML 方法中又调用了 addDoc 等方法。我们就得保证这些部分的代码是线程安全的,在多线程环境下执行不会出现问题

private void parseHTML(File f) {  
    String title = parseTitle(f);   
    String url = parseUrl(f);
    String content = parseContent(f);  
    index.addDoc(title,url,content);  
}
  • parseTitle()parserUrl():不会有线程安全问题,它就取了 file 里面的一个属性,不涉及到操作一些公共对象,也就不会涉及到线程安全问题
  • parseContent():每个文件之间各读各的,也没有干扰,所以也不会引发线程安全问题
  • addDoc():涉及到构建正排和构建倒排

引发原因

image.png|606

  • 四个线程都会调用 addDoc() 方法,而在 addDoc() 方法中又会调用 buildForward()buildInverted() 方法。
    • buildForward() 会修改到 forwardIndexbuildInverted() 会修改到 invertedIndex
  • 四个线程同时修改两个结构,就符合“多个线程同时修改变量”的条件,势必会引发线程安全问题

解决办法

通过加锁的方式来解决线程安全问题

方法一:给 addDoc() 加锁

可以直接给 addDoc() 方法前面加上 synchronized 吗?

public synchronized void addDoc(){}
  • 这样当然可以避免线程安全问题,但是如果给 addDoc() 加锁的话,那么四个线程就不能同时调用 addDoc() 方法了
  • 只有前面三个解析 HTML 属性的方法可以并发执行,而 addDoc() 不可以,多多少少是对程序的执行效率是有影响的

所以,如果直接把 synchronized 加到 parserHTML 或者 addDoc 这样的方法上,其实是非常不科学的。

  • 这样做的话,锁的粒度太粗了,并发程度不高,能提高的效率有限
  • 因此,需要把加锁的粒度尽量搞细一点,只在必要的地方加锁

方法二:给 buildForward()buildInverted() 里面加锁

  1. buildForward()
private DocInfo buildForward(String title, String url, String content) {  
    DocInfo docInfo = new DocInfo();  
    docInfo.setTitle(title);  
    docInfo.setUrl(url);  
    docInfo.setContent(content);  
    
    docInfo.setDocId(forwardIndex.size());  
    forwardIndex.add(docInfo);  
    return docInfo;  
}
  • 在这段代码中,主要就是最后两行逻辑会引发线程安全问题,所以我们只需要将其加上锁就可以了
private DocInfo buildForward(String title, String url, String content) {  
    DocInfo docInfo = new DocInfo();  
    docInfo.setTitle(title);  
    docInfo.setUrl(url);  
    docInfo.setContent(content);  
    synchronized (this) {  
        docInfo.setDocId(forwardIndex.size());  
        forwardIndex.add(docInfo);  
    }  
    return docInfo;  
}
  • 在这个代码中 Index 这个实例就只有一份,我们针对这个 Index 实例加锁就可以了,也就是 `this
  1. buildInverted()

buildInverted() 里面,将 Map 转换成 Set 遍历这一部分,涉及到对 invertedIndex 的修改,所以这一部分需要加锁

for(Map.Entry<String, WordCnt> entry : wordCntHashMap.entrySet()) {  

    List<Weight> invertedList = invertedIndex.get(entry.getKey());  //这里的get方法
    if(invertedList == null) {  
        ArrayList<Weight> newInvertedList = new ArrayList<>();  
        Weight weight = new Weight();  
        weight.setDocId(docInfo.getDocId());  
        weight.setWeight(entry.getValue().titleCount * 10 + entry.getValue().contentCount);  
        newInvertedList.add(weight);  
        invertedIndex.put(entry.getKey(), newInvertedList);  // 这里的 put 方法  
    }else{  
        Weight weight = new Weight();  
        weight.setDocId(docInfo.getDocId());  
        weight.setWeight(entry.getValue().titleCount
         * 10 + entry.getValue().contentCount);  
        invertedList.add(weight);   //这里的 add 方法
    }  
}
  • 这里的三个方法涉及到线程安全问题,我们直接把这一整块都加上锁

加锁后

for (Map.Entry<String, WordCnt> entry : wordCntHashMap.entrySet()) {  
    
    synchronized (this) {  
        List<Weight> invertedList = invertedIndex.get(entry.getKey());  
        if (invertedList == null) {  
            ArrayList<Weight> newInvertedList = new ArrayList<>();  
            Weight weight = new Weight();  
            weight.setDocId(docInfo.getDocId());  
            weight.setWeight(entry.getValue().titleCount
             * 10 + entry.getValue().contentCount);  
            newInvertedList.add(weight);  
            invertedIndex.put(entry.getKey(), newInvertedList);  
        } else {  
            Weight weight = new Weight();  
            weight.setDocId(docInfo.getDocId());  
            weight.setWeight(entry.getValue().titleCount
             * 10 + entry.getValue().contentCount);  
            invertedList.add(weight);  
        }  
    } 
     
}

加锁对象优化

我们调用 buildForward() ,影响到的是 forwardIndex;调用 buildInverted(),影响到的是 invertedIndex。因为这是两个不同的对象,所以我们就可以对两个对象分别进行加锁,他们之间没有竞争关系,不会出现锁竞争的情况 image.png|500

现在有两个女生:A 和 B。他们都有一些追求者

  • 某个 A 的追求者把 A 约到了,那么此时,其他的追求者就不能再约 A 了
  • 但是另外的一些追求者是可以正常约 B 的
    我们的锁只是确保 A 和 B 同时只能被一个人约

A 和 B 是两个独立的东西,并不是一个整体,所以我们加锁的对象设为 thisIndex 对象) 就不合适了。

  • 正排索引:直接针对正排索引本身(forwardIndex)加锁
  • 倒排索引:直接针对倒排索引本身(invertedIndex)加锁
    这样就能让这个并发达到最完美的效果

不过我们也可以专门去创建两个锁对象,去给两边分别加锁

public Object locker1 = new Object();
public Object locker2 = new Object();

这样写的效果更好

线程池的线程数设置

当前确实是把代码改进成了多线程版本,但是线程池中的线程数目,设置成多少合适呢?

最好是通过实验的方式来确定,不同的代码,并发程度是不一样的

网上有些说法,线程池的线程数目设定成 CPU 核数/CPU 核数的 1.5倍/CPU 核数的 2倍…

  • 这些说法只是一些经验公式,不同的业务代码最终的线程池中线程的数量设置不能一概而论
  • 需要通过测试的方式去找出一个合适的值

不是越多越好,达到一定的值之后,速度就不会有太大的变化了。因为有锁竞争,都得排队等着,还会带来一些额外的线程调度开销。达到阈值后再加就是白白浪费计算机资源了

解决进程退出问题

我们在开始执行代码之后,再索引的制作完毕后,我们发现程序并没有执行完毕,进程还没有退出 image.png|534

main 方法执行完了,为什么进程没有退出呢?

这里就涉及到——守护线程

  • 如果要是一个线程是守护线程(后台线程),此时这个线程的运行状态就不会影响进行结束
  • 如果它要不是一个守护线程(前台线程),此时这个线程的运行状态就会影响到进程结束
  • 默认创建出来的线程都是前台线程,需要通过 setDaemon 方法手动设置,才能成为守护线程

我们代码中创建线程的方式:

ExecutorService executorService = Executors.newFixedThreadPool(6);
  • 这些通过线程池创建的线程,都是前台线程
  • main 执行完了,这些线程仍然在工作,仍然在等待新任务的到来

所以,要想在 main 程序执行完之后,结束进程:

  1. 手动把这里的线程干掉
  2. 将线程设置为守护线程(线程池创建的线程不能直接设置)
// 手动把线程池里面的线程都干掉  
executorService.shutdown();
  • 卸磨杀驴

此时程序运行结束后,就正常退出了image.png|551


网站公告

今日签到

点亮在社区的每一天
去签到