1. Kafka:构建TB级异步消息系统
1.0 同步/异步消息的区别
同步消息系统:
1)定义:双向通信,需要保证发送方向接受者发送消息,接受者接收消息,这个时间段内两边不能做其他的事情,需要同时在线。发送程序和接收程序必须同步运行,否则会发送失败。
2)**优缺点:**时效性较强,可以立即得到结果;耦合度高,性能和吞吐能力下降。
3)应用场景:时序上有严格执行关系; 需要进行原子操作;没有耗时的操作
4)实现方式:消息处理=BIO(同步阻塞IO)与NIO(同步非阻塞IO),消息传输=TCP/IP与UDP/IP。Socket进行实现。异步消息系统:
1)定义:单向通信,发送端发送消息不需要接收方在线,可以后续推送。
2)优缺点:执行效率高,节约时间,耦合度低;缺点是不利于对流程进行控制,且实现更加复杂。
3)应用场景:没有严格时序关系,没有原子操作,有耗时的操作。
4)实现方式:AOI(异步阻塞IO);消息队列,Kafka
1.1 项目的目的
**异步消息系统:**以异步方式实现消息的传递。
- 需要实现功能:当其他用户给你点赞,发帖子和评论的时候,你能够在系统消息栏接收到消息。
- 实现方法有很多种,比较传统的是阻塞队列实现,大型项目中常常使用Kafka进行实现。
1. 2 阻塞队列实现异步消息系统

目的:解决线程通信的问题,比较原始的方法。
问题:生产者和消费者两者消耗不匹配,会占用系统资源,故需要阻塞队列(生产者生产一些消息后,如果消费者还没接收,就会存入阻塞队列中存起来,之后生产者就去做别的事情了,消费者慢慢消费)
使用对象:BlockingQueue(使用其接口)
(1):解决线程通信问题
(2):阻塞方法:put和take生产者和消费者模式
(1):生产者:产生数据的线程
(2):消费者:使用数据的线程消息队列实现类
(1):ArrayBlockingQueue:测试中用到的
(2):LinkedBlockingQueue
(3):PriorityBlockingQueue,SynchronousQueue,DelayQueue等阻塞队列实现消息队列步骤
1)生产者实现Runnable接口,重写run方法,通过put把消息放进去。
2)消费者实现Runnable接口,重写run方法,通过take接收消息。
3)调用函数,新建一个消息队列,BlockingQueue。新建一个线程生产者,对消息进行生产放入消息队列中。新建多干个线程消费者,把消息队列中的消息进行输出。
1.4 Kafka入门


- Kafka简介:
(1):是一个分布式的流媒体平台
(2):应用:消息系统,日志收集,用户行为追踪和流式处理 - Kafka特点
(1):高吞吐量;消息持久化(存在数据库中);高可靠性;高扩展性 - Kafka术语
(1)Broker:服务器
(2)Zookeeper:独立软件,管理集群(集群中,不同节点的通信)
(3)Topic:发布订阅模式的空间
(4)Partition:对Topic的分区,提高并发能力
(5)Offset:消息在分区内存存放的索引序列
(6)Leader Replica:主副本,调用数据
(7)FollowerReplica:从副本,主要是对主副本的文件备份,提高容错率。
5. 使用kafka
1)安装程序
2)配置zookeeper.properties与service.properties
3)启动zookeeper:使用之前的配置文件
F:\kafka\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
4)启动kafka:使用之前的配置文件
F:\kafka\kafka>bin\windows\kafka-server-start.bat config\server.properties
1.5 Spring整合Kafka
- 引入依赖:spring-kafka
- 配置Kafka:配置server和consumer(生产者和消费者)
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
- 访问Kafka:
(1):生产者(主动发消息):
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
(2):消费者(被动调用,接收消息):(使用注解表明主题)
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
1.6 发送系统通知
1.6.1 介绍

- : 为什么使用消息队列
1)如果采用消息队列,那么评论、点赞、关注三类不同的事,可以定义三类不同的主题(评论、点赞、关注)
2)发生相应的事件可以将其包装成一条消息放入对应的队列里。那么当前的线程可以继续处理下一条消息,
3)不用处理后续的业务(后续由消费者线程处理)
4)面向事件驱动编程(管道中传递的是事件) - 触发事件:
(1):评论后,发布通知
(2):点赞后,发布通知
(3):关注后,发布通知 - 处理事件:
(1):封装事件对象,而不是字符串,更加灵活
(2):开发事件的生产者
(3):开发事件的消费者
1.6.2 实现发送消息和接收消息功能
- 创建Event实体类:上面三个事件其实很相似,故可以抽象出一个事件实体类(传递的信息对象就是一个个Event实体对象)

- 新建Event文件夹,常见方法:EventProducer和EventConsumer
- EventProducer
1)kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
2)只有一个方法,调用kafka,传递主题和信息,然后转换为json数据
3)传递的信息为event对象(即上面定义的)

- EventConsumer
//核心方法:调用kafka的接收消息功能
@KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
public void handleCommentMessage(ConsumerRecord record) {
1)核心方法:调用的是kafka的接收消息功能,对象为recoder
2)具体:把recoder转化为Event对象,然后通过这个对象获取事件到的基本信息,封装成一个message,存入到message数据库表格里面。
3)其中重要信息为:把event的data数据保存好(即在显示栏会显示的信息),然后封装到message中的content里面,便于展示。
1.6.3 发送消息和接收消息功能(三类)
修改CommentController:发送评论
(1):需要把评论的数据封装为一个envet对象,然后调用producer方法。这里需要显示跳转到评论,故可以写一个方法,根据评论的id获取评论信息。修改LikeController:点赞*
(1):需要把点赞的数据封装为一个envet对象,然后调用producer方法。这里需要显示跳转到帖子,这里在点赞开头多传入一个参数,即postId。即从前端页面多返回个帖子的id。修改FollowController:关注*
(1):需要把关注的数据封装为一个envet对象,然后调用producer方法。关注的是人,不需要显示额外信息。运行注意事项
1)如果kafka运行不正常,把数据中的kafka日志删除
1.7 显示系统通知()

1.7.1 通知列表:显示评论,点赞和关注三类通知
- Dao层,MessageMapper,并书写sql进行实现
// 查询某个主题下最新的通知
Message selectLatestNotice(int userId, String topic);
// 查询某个主题所包含的通知数量
int selectNoticeCount(int userId, String topic);
// 查询未读的通知的数量
int selectNoticeUnreadCount(int userId, String topic);
- Service层:MessageService进行书写。直接调用Dao层,进行方法的实现。
- Controller层:MessageController层,书写请求:getNoticeList
1)查询评论类通知
2)查询点赞类通知
3)查询关注类通知
4)把上述数据返回给前端,就能够在消息通知进行展示数据了。
1.7.2. 通知详情:分页显示某一类主题所包含的通知
- Dao层:MessageMapper,并书写sql进行实现
// 查询某个主题所包含的通知列表
List<Message> selectNotices(int userId, String topic, int offset, int limit);
- Service层:MessageService进行书写。直接调用Dao层,进行方法的实现。
- Controller层:MessageController层,书写请求:getNoticeDetail
1)对不同类事件的详情页进行展示。对展示出来的信息标记为已读。
1.7.3. 未读消息:在页面头部显示所有的未读消息数量

- 通过拦截器进行计算,因为在每一个登录后的页面,都会显示未读消息,故在每一次访问页面的时候,都会进行统计未读消息的个数,显示在页面上面。
1)新建:新建一个拦截器MessageIntercepter。
2)实现方法:拦截器实现HandlerInceptor方法
3)重写posthandle方法:即在调用Controller之前进行操作
4)重写内容:先判断是否已经登陆了,再计算私信和系统通知的未读消息,然后传入前端,进行数据展示。
5)配置:在拦截器的配置文件(WebMvcConfig)中书写拦截的范围。
2. Elasticsearch,分布式搜索引擎
2.0 为什么要搜索引擎
- 数据库进行搜索有什么缺陷?
1)数据库搜索可以针对索引进行搜索速度非常快。
2)如果进行模糊搜索,不走索引,就会很慢。
3)对于一个句子,数据库无法进行搜索,并且提取中间的分词进行搜索 - Elasticsearch的优势
1)能够对一个句子进行分词,然后根据所分得词进行联合搜索。
2)由于Elasticsearch是分布式的,所以需要从各个节点都拉取对应的数据,然后最终统一合成给客户端 - Elasticsearch如何工作
1)可以实现快速的“模糊匹配”/“相关性查询”,实际上是你写入数据到Elasticsearch的时候会进行分词。
2.1 Elasticsearch(分布式搜索)
- Elasticserach简介
(1):一个分布式的,Restful风格的搜索引擎
(2):支持对各种类型的数据的检索
(3):搜索速度快,可以提供实时的搜索服务
(4):便于水平扩展,每秒可以处理PB级的海量数据 - Elasticserach术语
(1):索引Index:=MySQL中的数据库Database
(2):索引Index类型:=MySQL中的表Table
(3):文档:=MySQL中的数据行Row
(4):字段:=MySQL中的数据列Column
(5):集群:若干个节点构成集群,对外提供索引和搜索功能。有共同的cluster.name。
(6):节点:一个集群若干个节点,节点即运行Elasticserach的机器。分为主节点和副节点。如果主节点挂了,会选举出一个新的主节点。
(7):分片:一个index类型即表能够存储在多个节点中,这种操作叫做分片。四个接待您
(8):副本:分片分为主分片和副分片,实现高可用性。

(9)写入数据的流程:
新来一个数据,是写入到主分片的,副本分片会备份主分片的信息。
2.2 Spring整合Elasticsearch
- 引入依赖:spring-boot-starter-data-elasticserach
- 配置Elasticsearch:cluster-name集群名字; cluster-nodes节点(是分布式的,不过这里这有一个节点,说明IP地址和端口号9300(TCP端口);注:9200(HTTP端口)
- 注意:redis和Elasticsearch都是基于netty的,故需要配置一下,不然会报错。
- Spring Data Elasticsearch(查找帖子的两种方法)
(1):Elasticsearch Template
(2):Elasticsearch Repository(简单,把帖子转存到Elasticsearch服务器中) - ES链接MYSQL
(1)修改帖子的实体类:加上注解Document,然后就会直接把帖子的表格映射到es服务器中。
(2)字段:添加与es中类型相匹配。尤其注意两个字段title和content,后续主要也是靠这两个进行检索,需要添加存储和搜索的分词器。
@Document(indexName = "discusspost", type = "_doc", shards = 6, replicas = 3)
indexName:索引名字,type:类型;shards:分片;replicas :副本
作用:实体DiscussPost和es中的索引discusspost有了对应的关系。
@Document(indexName = "discusspost", type = "_doc", shards = 6, replicas = 3)
public class DiscussPost {
@Id
private int id;
@Field(type = FieldType.Integer)
private int userId;
// 互联网校招(重要:因为就靠这个搜索)
//analyzer:存储使用的分词器;searchAnalyzer:搜索使用的分词器
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String title;
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String content;
@Field(type = FieldType.Integer)
private int type;
@Field(type = FieldType.Integer)
private int status;
@Field(type = FieldType.Date)
private Date createTime;
@Field(type = FieldType.Integer)
private int commentCount;
@Field(type = FieldType.Double)
private double score;
- 针对es的Dao层
1)只需要新建一个接口,并且继承子接口,里面已经封装好了ES的增删改查操作。其中两个参数分别为:对应的实体类和主键的类型(Integer)
@Repository
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost, Integer> {
}
2)访问ES的逻辑:在ES中搜索,数据来源是mysql数据库里面,然后转存到ES服务器的数据库里;后面的话会先做初始化操作,即把mysql中的数据全部转存到ES服务器中,以后如果mysql有新增,在es没有所搜到的时候,会先存到es中去。
3)增加数据:调用DiscussPostRepository的save方法。(PostMan中的URL:localhost:9230/discusspost/_doc/231,指向对应的索引名字和类型)
4)更新数据:修改一点内容,再次save存储,会进行覆盖。
5)删除:进行删除操作,deleteById和deleteAll。
6)查找(有两个类Dao层可选:Elasticsearch Repository和Elasticsearch Template)
(1)通过方法1:Elasticsearch Repository
(2)指定搜索的对象searchQuery:内容withQuery,排序方法ithSort,分页查询withPageable,哪些属性高亮显示withHighlightFields。
(3)discussRepository.search(searchQuery):通过discussRepository的search,获取搜索结果。
(4)缺点:找到了高亮内容,但是不直接范围,而是分为两部分数据返回。
@Test
public void testSearchByRepository() {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.multiMatchQuery("互联网寒冬", "title", "content"))
.withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(0, 10))
.withHighlightFields(
new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
).build();
// elasticTemplate.queryForPage(searchQuery, class, SearchResultMapper)
// 底层获取得到了高亮显示的值, 但是没有返回.
Page<DiscussPost> page = discussRepository.search(searchQuery);
System.out.println(page.getTotalElements());
System.out.println(page.getTotalPages());
System.out.println(page.getNumber());
System.out.println(page.getSize());
for (DiscussPost post : page) {
System.out.println(post);
}
}
-----------------------------------
(2)通过方法2:Elasticsearch Template
(2)指定搜索的对象searchQuery:内容withQuery,排序方法ithSort,分页查询withPageable,哪些属性高亮显示withHighlightFields。(和上一个方法一样)
(3)Page page = elasticTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() :重写queryForPage方法。
@Test
public void testSearchByTemplate() {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.multiMatchQuery("互联网寒冬", "title", "content"))
.withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC))
.withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC))
.withPageable(PageRequest.of(0, 10))
.withHighlightFields(
new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),
new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")
).build();
Page<DiscussPost> page = elasticTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() {
@Override
public <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> aClass, Pageable pageable) {
SearchHits hits = response.getHits();
if (hits.getTotalHits() <= 0) {
return null;
}
List<DiscussPost> list = new ArrayList<>();
for (SearchHit hit : hits) {
DiscussPost post = new DiscussPost();
String id = hit.getSourceAsMap().get("id").toString();
post.setId(Integer.valueOf(id));
String userId = hit.getSourceAsMap().get("userId").toString();
post.setUserId(Integer.valueOf(userId));
String title = hit.getSourceAsMap().get("title").toString();
post.setTitle(title);
String content = hit.getSourceAsMap().get("content").toString();
post.setContent(content);
String status = hit.getSourceAsMap().get("status").toString();
post.setStatus(Integer.valueOf(status));
String createTime = hit.getSourceAsMap().get("createTime").toString();
post.setCreateTime(new Date(Long.valueOf(createTime)));
String commentCount = hit.getSourceAsMap().get("commentCount").toString();
post.setCommentCount(Integer.valueOf(commentCount));
// 处理高亮显示的结果
HighlightField titleField = hit.getHighlightFields().get("title");
if (titleField != null) {
post.setTitle(titleField.getFragments()[0].toString());
}
HighlightField contentField = hit.getHighlightFields().get("content");
if (contentField != null) {
post.setContent(contentField.getFragments()[0].toString());
}
list.add(post);
}
return new AggregatedPageImpl(list, pageable,
hits.getTotalHits(), response.getAggregations(), response.getScrollId(), hits.getMaxScore());
}
});
System.out.println(page.getTotalElements());
System.out.println(page.getTotalPages());
System.out.println(page.getNumber());
System.out.println(page.getSize());
for (DiscussPost post : page) {
System.out.println(post);
}
}
- 综合
(1)使用Elasticsearch Repository的save,delete等功能完成增删改操作。不推荐使用其search功能,因为高亮词汇不能正常显示。会分批进行输出。
(2)使用Elasticsearch Template,重写queryForPage功能,完成增删改操作。
3.3 开发社区搜索功能
3.3.1 理论步骤
- 搜索服务:
(1):新建帖子时,将帖子保存至Elasticsearch 服务器中
(2):从Elasticsearch 服务器删除帖子
(3):从Elasticsearch 服务器搜索帖子(核心是为了搜索功能) - 发布事件
(1):发布帖子时,将帖子异步的提交到Elasticsearch 服务器
(2):增加评论的时候,将帖子异步的提交到Elasticsearch 服务器
(3):在消费主键中添加一个方法,消费帖子发布事件
(注意):异步方式,不用等待,并行实现功能。 - 显示结果
(1):在控制器中处理搜索请求,在HTML上显示搜索结果。
3.3.2 实际步骤
- Service层(之前展示了Dao层)
1)service层ElasticsearchService:
(1)新增,删除和修改:向es服务器提交数据,即为新增;再提交一次则为修改。使用Elasticsearch Repository的save,delete。
(2)搜索:使用Elasticsearch Repository,重写其中queryForPage方法。
2)修改发帖和评论部分,增加消息发布事件(采用异步,并行处理,加强效率)
(1)在DiscussPostController中,触发事件发布事件
(2)具体内容:eventProducer.fireEvent(event);event里面存入帖子的内容和发布的主题。
(3)在CommentController中,触发帖子评论发布事件
(2)具体内容:eventProducer.fireEvent(event);event里面存入帖子的内容和发布的主题。
3)消费发帖:EventConsumer中新建handlePublishMessage
(1)目的:针对kafka中主题为TOPIC_PUBLISH的事件消息进行消费。
(2)逻辑:先参数校验,然后存入es的数据库中。 - Controller层(调用)
1)新建Controller:SearchController
(1)目的:展现所有结果的展示列表。
(2)逻辑:通过输入需要搜索的关键词,调用搜索的方法,得到搜索的内容,进行封装,展示给页面。
(3)注意:一开始展示的页面时第一页,但是后面如果想更改页面,那么就需要在前端修改page页的内容,传回给后端,后端收到了进行换页展示。 - 前端的数据展示调用(这里就省略一下了)
文章参考:
牛客网Java项目:https://www.nowcoder.com/study/live/246
代码记录:https://gitee.com/xkh-dasiy/newcoder
漫画形式说明Linux的五种IO模型
Kafka【入门】就这一篇!
计网 - 网络 I/O 模型:BIO、NIO 和 AIO 有什么区别?
Elasticsearch术语及概念熟悉
什么是 Elasticsearch?一篇搞懂