微服务和kafka

发布于:2024-06-30 ⋅ 阅读:(8) ⋅ 点赞:(0)

一、微服务简介

1.单体架构

分布式--微服务--云原生

传统架构(单机系统),一个项目一个工程:比如商品、订单、支付、库存、登录、注册等等,统一部署,一个进程

all in one的架构方式,把所有的功能单元放在一个应用里。然后把整个应用部署到一台服务器上。如果负载能力不行,将整个应用进行水平复制,进行扩展(扩展是指模块扩展,不方便做开发测试),然后通过负载均衡实现访问。

Java实现:JSP、Servlet,打包成一个jar、war部署易于开发和测试:也十分方便部署;当需要扩展时,只需要将war复制多份,然后放到多个服务器上,再做个负载均衡就可以了。如果某个功能模块出问题,有可能全站不可访问,修改Bug后、某模块功能修改或升级后,需要停掉整个服务,重新整体重新打包、部署这个应用war包,功能模块相互之间耦合度高,相互影响,不适合当今互联网业务功能的快速迭代。特别是对于一个大型应用,我们不可能吧所有内容都放在一个应用里面,我们如何维护、如何分工合作都是问题。如果项目庞大,管理难度大

web应用服务器:开源的tomcat、jetty、glassfish。商用的有weblogic、websphere、Jboss

2.微服务

Microservices Guide

每个模块跑一个单独应用,微服务关注的业务逻辑(restful、API、http、grpc、tcp、http),

属于SOA(Service Oriented Architecture)的子集

微服务化的核心就是将传统的一站式应用,根据业务拆分成一个一个的服务,彻底去掉耦合,每一个微服务提供单个业务功能,一个服务只做一件事。每个服务都围绕着具体业务进行构建,并且能够被独立地部署到生产环境、类生产环境等

从技术角度讲就是一种小而独立的处理过程,类似与进程的概念,能够自行单独启动或销毁

微服务架构(分布式系统),各个模块/服务,各自独立出来,"让专业的人干专业的事",独立部署。分布式系统中,不同的服务可以使用各自独立的数据库。

服务之间采用轻量级的通信机制(通常是基于HTTP的RESTful API)。

微服务设计的思想改变了原有的企业研发团队组织架构。传统的研发组织架构是水平架构,前端、后端、DBA、测试分别有自己对应的团队,属于水平团队组织架构。而微服务的设计思想对团队的划分有着一定的影响,使得团队组织架构的划分更倾向于垂直架构,比如用户业务是一个团队来负责,支付业务是一个团队来负责。但实际上在企业中并不会把团队组织架构拆分得这么绝对,垂直架构只是一种理想的架构

微服务的实现框架有多种,不同的应用架构,部署方式也有不同 

3.单体架构和微服务比较

单体架构的主要特征:

1.紧密耦合的组件:在单体架构中,组件之间紧密耦合,这使得修改和扩展应用程序的各个部分而不影响整个系统变得更加困难。

2.单一代码库:应用程序的所有部分都位于单一的代码库中,这对于开发和部署非常方便。

3.共享资源:组件共享相同的资源,如内存和CPU,这可能导致性能瓶颈和争用问题。

4.有限的可扩展性:单体应用程序在水平方向上进行扩展可能具有挑战性,因为扩展一个组件可能需要扩展整个应用程序。

5.复杂性:随着应用程序的增长,由于复杂性增加,维护和理解可能变得困难。

微服务架构的主要特征:

1.松散耦合:微服务之间松散耦合,允许每个服务独立开发、部署和扩展,而不影响其他服务。

2.分布式系统:微服务通过网络通信,通常使用API,这需要仔细考虑网络和通信模式。

3.独立部署:服务可以独立部署,实现持续交付和更快的发布周期。

4.专业化服务:每个微服务专注于特定的业务功能,使代码库更易于管理和维护。

5.可扩展性:微服务可以单独扩展,根据需求有效地分配资源。

6.多语言架构:不同的微服务可以使用最适合其需求的不同编程语言和技术进行开发。 

4.微服务的优缺点

微服务优点:

每个服务足够内聚,足够小,代码容易理解。这样能聚焦一个简单唯一的业务功能或业务需求。开发简单、开发效率提高,一个服务可能就是专业的只干一件事,微服务能够被小团队单独开发,这个小团队可以是2到5人的开发人员组成

微服务是松耦合的,是有功能意义的服务,无论是在开发阶段或部署阶段都是独立的。

微服务能使用不同的语言开发,易于和第三方集成,微服务运行容易且灵活的方式集成自动部署,通过持续集成工具,如:Jenkins、Hudson、Bamboo

微服务易于被一个开发人员理解、修改和维护,这样小团队能够更关注自己的工作成果,无需通过合作才能体现价值

微服务允许你利用融合最新技术。微服务只是业务逻辑的代码,不会和HTML/CSS或其他界面组件混合,即前后端分离

每个微服务都有自己的存储能力,可以有自己的数据库,也可以有统一数据库

微服务缺点:(耦合性低,通信比较复杂,通过url)

微服务把原有的一个项目拆分成多个独立工程,增加了开发(编写API)、测试、运维(上线,需要会docker)、监控等的复杂度

微服务架构需要保证不同服务之间的数据一致性,引入了分布式事务和异步补偿机制,为设计和开发带来一定挑战

开发人员和运维需要处理分布式系统的复杂性,需要更强的技术能力

微服务适用于复杂的大系统,对于小型应用使用微服务,进行盲目的拆分只会增加其维护和开发成本

微服务技术栈

微服务治理:

注册中心:将所有API、网关、Url,接口、端口所有调用的关系,放到注册中心去,所有客户端先访问一个中间件发现中心,然后发现中心可以找到对应的调用关系,然后客户再次访问就可以访问到对应的api节点

一旦微服务发生变更,会同步到注册中心/目录下

配置中心是一种统一管理各种应用配置的基础服务组件。12

配置中心的主要作用是将配置从应用中抽取出来,进行统一管理,从而优雅地解决配置的动态变更、权限管理、持久化、运维成本等问题。通过集中管理配置,配置中心将应用的配置作为一个单独的服务抽离出来,同时也需要解决新的问题,如版本管理(为了支持回滚)、权限管理等。在系统架构中,配置中心是整个微服务基础架构体系中的一个组件,虽然其功能看似简单,即配置的管理和存取,但它是整个微服务架构中不可或缺的一环。

微服务网关:做业务的扩展,做OA系统(办公、审计)

API网关:将多个不同业务的API整合在一起,业务的扩展、动态自制的管理反向代理、网关、负载均衡;API是入口,它还可以做限流(蓝绿),流量控制、熔断降级

微服务、分布式本质是区域中心化

异步通信技术:削峰、限流

把5个技术栈整合在一起,就会用到微服务框架

5.常见的微服务框架

Dubbo

阿里开源贡献给了ASF,目前已经是Apache的顶级项目,一款高性能的Java RPC服务框架,微服务生态体系中的一个重要组件

将单体程序分解成多个功能服务模块,模块间使用Dubbo框架提供的高性能RPC通信

内部协调使用 Zookeeper,实现服务注册、服务发现和服务治理

Spring cloud

一个完整的微服务解决方案,相当于Dubbo的超集

微服务框架,将单体应用拆分为粒度更小的单一功能服务

基于HTTP协议的REST(Representational State Transfer 表述性状态转移)风格实现模块间通信

二、ZooKeeper

1.zookeeper介绍(注册中心、配置中心)

ZooKeeper 是一个开源的分布式协调服务,ZooKeeper框架最初是在“Yahoo!"上构建的,用于以简单而稳健的方式访问他们的应用程序。 后来,Apache ZooKeeper成为Hadoop,HBase和其他分布式框架使用的有组织服务的标准。 例如,Apache HBase使用ZooKeeper跟踪分布式数据的状态。ZooKeeper 的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。

ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁(为了保持一致性,API、URL、配置文件内容保持一致)和分布式队列等功能。

Zookeeper 一个最常用的使用场景就是用于担任服务生产者和服务消费者的注册中心(提供发布订阅服务)。 服务生产者将自己提供的服务注册到Zookeeper中心,服务的消费者在进行服务调用的时候先到Zookeeper中查找服务,获取到服务生产者的详细信息之后,再去调用服务生产者的内容与数据。如下图所示,在 Dubbo架构中 Zookeeper 就担任了注册中心这一角色。

2.zookeeper工作原理

ZooKeeper 是一个分布式服务框架,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:命名服务、状态同步、配置中心、集群管理等。

①:ZooKeeper 功能

命名服务

命名服务是分布式系统中比较常见的一类场景。命名服务是分布式系统最基本的公共服务之一。在分布式系统中,被命名的实体通常可以是集群中的机器、提供的服务地址或远程对象等——这些我们都可以统称它们为名字(Name),其中较为常见的就是一些分布式服务框架(如RPC、RMI)中的服务地址列表,通过使用命名服务,客户端应用能够根据指定名字来获取资源的实体、服务地址和提供者的信息等。

Zookeeper 数据模型(树状结构)

在 Zookeeper 中,节点分为两类,第一类是指构成Zookeeper集群的主机,称之为主机节点;第二类则是指内存中zookeeper数据模型中的数据单元,用来存储各种数据内容,称之为数据节点 ZNode。Zookeeper内部维护了一个层次关系(树状结构)的数据模型,它的表现形式类似于Linux的文件系统,甚至操作的种类都一致。

Zookeeper数据模型中有自己的根目录(/),根目录下有多个子目录,每个子目录后面有若干个文件,由斜杠(/)进行分割的路径,就是一个ZNode,每个 ZNode上都会保存自己的数据内容和一系列属性信息.(把状态,相关模块都放在注册中心)

状态同步

每个节点除了存储数据内容和 node 节点状态信息之外,还存储了已经注册的APP 的状态信息,当有些节点或APP 不可用,就将当前状态同步给其他服务。

配置中心

现在我们大多数应用都是采用的是分布式开发的应用,搭建到不同的服务器上,我们的配置文件,同一个应用程序的配置文件一样,还有就是多个程序存在相同的配置,当我们配置文件中有个配置属性需要改变,我们需要改变每个程序的配置属性,这样会很麻烦的去修改配置,那么可用使用ZooKeeper 来实现配置中心, ZooKeeper 采用的是推拉相结合的方式:客户端向服务端注册自己需要关注的节点,一旦该节点的数据发生变更,那么服务端就会向相应的客户端发送Watcher事件通知,客户端接收到这个消息通知后,需要主动到服务端获取最新的数据。

Apollo(阿波罗)是携程框架部门研发的开源配置管理中心,此应用比较流行

集群管理

所谓集群管理,包括集群监控与集群控制两大块,前者侧重对集群运行时状态的收集,后者则是对集群进行操作与控制,在日常开发和运维过程中,我们经常会有类似于如下的需求:

希望知道当前集群中究竟有多少机器在工作。

对集群中每台机器的运行时状态进行数据收集。对集群中机器进行上下线操作。

ZooKeeper 具有以下两大特性:

客户端如果对ZooKeeper 的一个数据节点注册 Watcher监听,那么当该数据节点的内容或是其子节点列表发生变更时,ZooKeeper服务器就会向已注册订阅的客户端发送变更通知。对在ZooKeeper上创建的临时节点,一旦客户端与服务器之间的会话失效,那么该临时节点也就被自动清除。

Watcher(事件监听器)是 Zookeeper 中的一个很重要的特性。Zookeeper 允许用户在指定节点上注册一些 Watcher,并且在一些特定事件触发的时候, ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 Zookeeper 实现分布式协调服务的重要特性。

ZooKeeper 服务流程

1. 生产者启动

2. 生产者注册至zookeeper

3. 消费者启动并订阅频道

4. zookeeper 通知消费者事件

5. 消费者调用生产者

6. 监控中心负责统计和监控服务状态

3.zookeeper单机部署

官方文档:https://zookeeper.apache.org/doc/r3.9.0/zookeeperStarted.html#sc_InstallingSingleMode

配置 Java 环境

官方依赖介绍  ZooKeeper: Because Coordinating Distributed Systems is a Zoo

案例: 安装单机 zookeeper

wget -P /usr/local/src https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/apache-zookeeper3.9.0-bin.tar.gz

tar xf /usr/local/src/apache-zookeeper-3.9.0-bin.tar.gz -C /usr/local/  ---解压包

ln -s /usr/local/apache-zookeeper-3.9.0-bin /usr/local/zookeeper  ---做软链接

echo 'PATH=/usr/local/zookeeper/bin:$PATH' > /etc/profile.d/zookeeper.sh  ---做环境变量

. /etc/profile.d/zookeeper.sh   ---运行环境变量

cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg ---做数据目录

 

tickTime=2000  #"滴答时间",用于配置Zookeeper中最小的时间单元长度,单位毫秒,是其它时间配置的基础

initLimit=10   #初始化时间,包含启动和数据同步,其值是tickTime的倍数

syncLimit=5    #正常工作,心跳监测的时间间隔,其值是tickTime的倍数

dataDir=/tmp/zookeeper #配置Zookeeper服务存储数据的目录,基于安全,可以修改为dataDir=/usr/local/zookeeper/data

#dataLogdir=/usr/local/zookeeper/logs #可以指定日志路径

clientPort=2181 #配置当前Zookeeper服务对外暴露的端口,用户客户端和服务端建立连接会话

autopurge.snapRetainCount=3 #3.4.0中的新增功能:启用后,ZooKeeper 自动清除功能,会将只保留此最新3个快照和相应的事务日志,并分别保留在dataDir 和dataLogDir中,删除其余部分,默认值为3,最小值为3

autopurge.purgeInterval=24  #3.4.0及之后版本,ZK提供了自动清理日志和快照文件的功能,这个参数指定了清理频率,单位是小时,需要配置一个1或更大的整数,默认是 0,表示不开启自动清理功能

启动 ZooKeeper

zkServer.sh start-foreground

zkServer.sh start

4.zookeeper集群部署

ZooKeeper集群用于解决单点和单机性能及数据高可用等问题

 

zookeeper集群基于Master/Slave的模型,处于主要地位(处理写操作)的主机称为Master节点,处于次要地位(处理读操作)的主机称为 slave节点,生产中读取的方式一般是以异步复制方式来实现的。

对于n台server,每个server都知道彼此的存在。只要有>n/2台server节点可用,整个zookeeper系统保持可用。保证zookeep是基数节点,不可以为偶数

因此zookeeper集群通常由奇数台Server节点组成

当进行写操作时,由leader完成,并且同步到其它follower节点,当在保证写操作在所有节点的总数过半后,才会认为写操作成功

下图表示读的比例越高,性能越好

机器越多,写性能就越差

集群角色 

 

领导者(Leader)    负责处理写入请求的,事务请求的唯一调度和处理者,负责进行投票发起和决议,更新系统状态

跟随者(Follower)   接收客户请求并向客户端返回结果,在选Leader过程中参与投票

观察者(Observer)   转交客户端写请求给leader节点,和同步leader状态

                           和Follower唯一区别就是不参与Leader投票,也不参与写操作的"过半写成功"策略

学习者(Learner)    和leader进行状态同步的节点统称Learner,包括:Follower和Observer

客户端(client)      请求发起方

选举过程

节点角色状态:

LOOKING:寻找 Leader 状态,处于该状态需要进入选举流程

LEADING:领导者状态,处于该状态的节点说明是角色已经是Leader

FOLLOWING:跟随者状态,表示 Leader已经选举出来,当前节点角色是follower

OBSERVER:观察者状态,表明当前节点角色是 observer

选举 ID

ZXID(zookeeper transaction id):每个改变 Zookeeper状态的操作都会形成一个对应的zxid。

ZXID最大的节点优先选为Leader

myid:服务器的唯一标识(SID),通过配置 myid 文件指定,集群中唯一,当ZXID一样时,myid大的节点优先选为Leader

ZooKeeper 集群选举过程:

当集群中的 zookeeper 节点启动以后,会根据配置文件中指定的 zookeeper节点地址进行leader 选择操作,过程如下:

每个zookeeper 都会发出投票,由于是第一次选举leader,因此每个节点都会把自己当做leader 角色进行选举,每个zookeeper 的投票中都会包含自己的myid和zxid,此时zookeeper 1 的投票为myid 为 1,初始zxid有一个初始值0x0,后期会随着数据更新而自动变化,zookeeper2 的投票为myid 为2,初始zxid 为初始生成的值。每个节点接受并检查对方的投票信息,比如投票时间、是否状态为LOOKING状态的投票。对比投票,优先检查zxid,如果zxid 不一样则 zxid 大的为leader,如果zxid相同则继续对比myid,myid 大的一方为 leader成为 Leader 的必要条件: Leader 要具有最高的zxid;当集群的规模是 n 时,集群中大多数的机器(至少n/2+1)得到响应并follower 选出的 Leader。

心跳机制:Leader 与 Follower 利用 PING 来感知对方的是否存活,当 Leader无法响应PING 时,将重新发起 Leader 选举。当 Leader 服务器出现网络中断、崩溃退出与重启等异常情况时,ZAB(Zookeeper Atomic Broadcast) 协议就会进入恢复模式并选举产生新的Leader服务器。这个过程大致如下:

Leader Election(选举阶段):节点在一开始都处于选举阶段,只要有一个节点得到超半数节点的票数,它就可以当选准 leader。

Discovery(发现阶段):在这个阶段,followers 跟准 leader 进行通信,同步 followers 最近接收的事务提议。

Synchronization(同步阶段):同步阶段主要是利用 leader 前一阶段获得的最新提议历史,同步集群中所有的副本。同步完成之后 准 leader 才会成为真正的 leader。

Broadcast(广播阶段) :到了这个阶段,Zookeeper 集群才能正式对外提供事务服务,并且leader 可以进行消息广播。同时如果有新的节点加入,还需要对新节点进行同步

ZAB 协议介绍

ZAB(ZooKeeper Atomic Broadcast 原子广播) 协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议。 在 ZooKeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性,基于该协议,ZooKeeper 实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性。

ZooKeeper 集群特性

整个集群中只要有超过集群数量一半的 zookeeper工作是正常的,那么整个集群对外就是可用的假如有 2 台服务器做了一个 Zookeeper 集群,只要有任何一台故障或宕机,那么这个 ZooKeeper集群就不可用了,因为剩下的一台没有超过集群一半的数量,但是假如有三台zookeeper 组成一个集群, 那么损坏一台就还剩两台,大于 3台的一半,所以损坏一台还是可以正常运行的,但是再损坏一台就只剩一台集群就不可用了。那么要是 4 台组成一个zookeeper集群,损坏一台集群肯定是正常的,那么损坏两台就还剩两台,那么2台不大于集群数量的一半,所以 3 台的 zookeeper 集群和 4 台的 zookeeper集群损坏两台的结果都是集群不可用,以此类推 5 台和 6 台以及 7 台和 8台都是同理

另外偶数节点可以会造成"脑裂"现象,所以这也就是为什么集群一般都是奇数的原因。

ZooKeeper 集群部署

运行脚本进行安装

配置文件和单机模式略微不同之处

 nc 访问 ZooKeeper

ZooKeeper支持某些特定的四字命令字母与其的交互。它们大多是查询命令,用来获取 ZooKeeper服务的当前状态及相关信息。用户在客户端可以通过 netcat telnetzookeeper发送下面命令

conf #输出相关服务配置的详细信息

cons #列出所有连接到服务器的客户端的完全的连接/会话的详细信息

envi #输出关于服务环境的详细信息

dump #列出未经处理的会话和临时节点

stat #查看哪个节点被选择作为Follower或者Leader

ruok #测试是否启动了该Server,若回复imok表示已经启动

mntr #输出一些运行时信息

reqs #列出未经处理的请求

wchs #列出服务器watch的简要信息

wchc #通过session列出服务器watch的详细信息

wchp #通过路径列出服务器watch的详细信息

srvr #输出服务的所有信息

srst #重置服务器统计信息

kill #关掉Server

isro #查看该服务的节点权限信息

将所有四字命令加入白名单

vim conf/zoo.cfg

4lw.commands.whitelist=*

echo stat | nc 127.0.0.1 2181

 

 

改完之后重启服务:systemctl restart zookeeper.service 

图形化客户端 ZooInspector

git clone GitHub - zzhang5/zooinspector: An improved zookeeper inspector   然后结合阿里云加速进行mvn编译即可使用

cd zooinspector/

mvn clean package -Dmaven.test.skip=true

chmod +x target/zooinspector-pkg/bin/zooinspector.sh

target/zooinspector-pkg/bin/zooinspector.sh

三、kafka

消息队列历史

2007 年的时候,Rabbit 技术公司基于Erlang语言开发了符合AMQP 规范RabbitMQ 1.0。

从最开始用在金融行业里面,现在MQ 已经在世界各地的公司中遍地开花。国内的绝大部分大厂都在用MQ,包括头条,美团,滴滴,去哪儿,艺龙,淘宝也有用。

kafka的数据是存储到zookeep中的/目录下,kafka嫁接在zookeep上,kafka的消息是分片的,

MQ 定义

在分布式场景中,相对于大量的用户请求来说,内部的功能主机之间、功能模块之间等,数据传递的数据量是无法想象的,因为一个用户请求,会涉及到各种内部的业务逻辑跳转等操作。那么,在大量用户的业务场景中,如何保证所有的内部业务逻辑请求都处于稳定而且快捷的数据传递呢? 消息队列(Message Queue)技术可以满足此需求

消息队列(Message Queue,简称 MQ)是构建分布式互联网应用的基础设施,通过 MQ 实现的松耦合架构设计可以提高系统可用性以及可扩展性,是适用于现代应用的最佳设计方案。

消息队列是一种异步的服务间通信方式,适用于无服务器和微服务架构。消息在被处理和删除之前一直存储在队列上。每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批处理工作以及缓解高峰期工作负载。

MQ 使用场合

消息队列作为高并发系统的核心组件之一,能够帮助业务系统结构提升开发效率和系统稳定性

消息队列主要有以下应用场景

削峰填谷

诸如电商业务中的秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列可提供削峰填谷的服务来解决该问题。

异步解耦

交易系统作为淘宝等电商的最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列可实现异步通信和应用解耦,确保主站业务的连续性。

顺序收发

细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列提供的顺序消息即保证消息FIFO。

分布式事务一致性(zookeep)

交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

大数据分析

数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用消息队列与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。

分布式缓存同步

电商的大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列构建分布式缓存,实时通知商品数据的变化

蓄流压测

线上有些链路不方便做压力测试,可以通过堆积一定量消息再放开来压测

消息队列在测试环境做压测,先不在生产环境上线

主流 MQ

目前主流的消息队列软件有 kafka、RabbitMQ、ActiveMQ、RocketMQ等,还有相对小众的消息队列软件如ZeroMQ、Apache Qpid 等。

kafka不能做集群,但是可以基于zookeep做集群

Kafka 介绍

Kafka 被称为下一代分布式消息系统,由 Scala 和 Java编写,是非营利性组织ASF(Apache Software Foundation)基金会中的一个开源项目,比如:HTTP Server、Tomcat、Hadoop、ActiveMQ等开源软件都属于 Apache基金会的开源软件,类似的消息系统还有RabbitMQ、ActiveMQ、ZeroMQ。

Kafka用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速性,可在数千家组织中同时投入生产协同工作。

Kafka 特点和优势

特点

分布式: 多机实现,不允许单机

分区: 一个消息.可以拆分出多个,分别存储在多个位置

多副本: 防止信息丢失,可以多来几个备份

多订阅者: 可以有很多应用连接kafka,在kafka中没有严格的说法,都叫订阅者

Zookeeper: 早期版本的Kafka依赖于zookeeper, 2021年4月19日Kafka 2.8.0正式发布,此版本包括了很多重要改动,最主要的是kafka通过自我管理的仲裁来替代ZooKeeper,即Kafka将不再需要ZooKeeper!

优势

Kafka 通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB 级别以上的消息存储也能够保持长时间的稳定性能。

高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka 服务器分区消息。

分布式: Kafka 基于分布式集群实现高可用的容错机制,可以实现自动的故障转移

顺序保证:在大多数使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。 Kafka保证一个Partiton内的消息的有序性(分区间数据是无序的,如果对数据的顺序有要求,应将在创建主题时将分区数partitions设置为1)

支持 Hadoop 并行数据加载

通常用于大数据场合,传递单条消息比较大,而Rabbitmq (支持单条小消息)消息主要是传输业务的指令数据,单条数据较小,不适合集群化大规模的场景

Kafka 角色

Producer:Producer即生产者,消息的产生者,是消息的入口。负责发布消息到Kafka broker。

Consumer:消费者,用于消费消息,即处理消息

Consumer group: 每个consumer 属于一个特定的consumer group(可为每个consumer 指定 group name,若不指定 group name 则属于默认的group),使用 consumer high level API 时,同一topic的一条消息只能被同一个consumer group 内的一个consumer 消费,但多个consumer group 可同时消费这一消息。

Broker:Broker是kafka实例,每个服务器上可以有一个或多个kafka的实例,假设每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如: broker-0、broker-1等……

Topic :消息的主题,可以理解为消息的分类,相当于Redis的Key和ES中的索引,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。物理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic的消息虽然保存于一个或多个broker 上, 但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处,topic 在逻辑上对record(记录、日志)进行分组保存,消费者需要订阅相应的topic 才能消费topic中的消息。

Replication: 同样数据的副本,包括leader和follower的副本数,基本于数据安全,建议至少2个,是Kafka的高可靠性的保障,和ES不同的,ES中的副本数不包括主分片数

Partition :是物理上的概念,每个topic 分割为一个或多个partition,即一个topic切分为多份.创建topic时可指定 parition 数量,partition的表现形式就是一个一个的文件夹,该文件夹下存储该partition的数据和索引文件,分区的作用还可以实现负载均衡,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,一般Partition数不要超过节点数据,注意同一个partition数据是有顺序的,但不同的partition则是无序的。为了实现数据的高可用,比如将分区 0 的数据分散到不同的kafka 节点,每一个分区都有一个 broker 作为 Leader 和一个 broker 作为Follower,类似于ES中的主分片和副本分片,

AR: Assigned Replicas,分区中的所有副本的统称,AR= lSR+ OSR

lSR:ln Sync Replicas,所有与leader副本保持同步的副本 follower和leader本身组成的集合,是AR的子集

OSR:out-of-Sync Replied,所有与leader副本同步不能同步的 follower的集合,是AR的子集

分区的优势:

实现存储空间的横向扩容,即将多个kafka服务器的空间结合利用提升性能,多服务器读写

实现高可用,分区leader 分布在不同的kafka 服务器,假设分区因子为 3, 分区 0 的leader为服务器A,则服务器 B 和服务器 C 为 A 的follower,而分区 1 的leader为服务器B,则服务器 A 和C 为服务器B 的follower,而分区 2 的leader 为C,则服务器A 和 B 为C 的follower。

假如设置一个wang的topic,它会生成多个目录,在第一台机器上有两个目录,一个存主本,一个存副本;第二个机器有两个目录,一个存主本,一个存副本;第三个机器有两个目录,都存副本。交叉是为了防止崩,这样做可以保证数据的高可用

Kafka 写入消息的流程

Kafka 部署

当前版本 Kafka 依赖 Zookeeper 服务,但以后将不再依赖

环境说明

#在三个节点提前部署zookeeperkafka三个节点复用

node1:192.168.10.110

node2:192.168.10.110

node3:192.168.10.110

注意:生产中zookeeperkafka一般是分开独立部署的,kafka安装前需要安装java环境;确保三个节点的zookeeper启动

Kafka 节点配置

配置文件说明

#配置文件 ./conf/server.properties内容说明

############################# Server Basics###############################

# broker的id,值为整数,且必须唯一,在一个集群中不能重复

broker.id=1

############################# Socket ServerSettings ######################

# kafka监听端口,默认9092

listeners=PLAINTEXT://10.0.0.101:9092

# 处理网络请求的线程数量,默认为3个

num.network.threads=3

# 执行磁盘IO操作的线程数量,默认为8个

num.io.threads=8

# socket服务发送数据的缓冲区大小,默认100KB

socket.send.buffer.bytes=102400

# socket服务接受数据的缓冲区大小,默认100KB

socket.receive.buffer.bytes=102400

# socket服务所能接受的一个请求的最大大小,默认为100M

socket.request.max.bytes=104857600

############################# Log Basics###################################

# kafka存储消息数据的目录

log.dirs=../data

# 每个topic默认的partition

num.partitions=1

# 设置副本数量为3,当Leader的Replication故障,会进行故障自动转移。

default.replication.factor=3

# 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量

num.recovery.threads.per.data.dir=1

############################# Log FlushPolicy #############################

# 消息刷新到磁盘中的消息条数阈值

log.flush.interval.messages=10000

# 消息刷新到磁盘中的最大时间间隔,1s

log.flush.interval.ms=1000

############################# Log RetentionPolicy #########################

# 日志保留小时数,超时会自动删除,默认为7天

log.retention.hours=168

# 日志保留大小,超出大小会自动删除,默认为1G

#log.retention.bytes=1073741824

# 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件

log.segment.bytes=1073741824

# 每隔多长时间检测数据是否达到删除条件,300s

log.retention.check.interval.ms=300000

############################# Zookeeper ####################################

# Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开

zookeeper.connect=192.168.10.110:2181,192.168.10.120:2181,192.168.10.130:2181

# 连接zookeeper的超时时间,6s

zookeeper.connection.timeout.ms=6000

Kafka 读写数据 

kafka-topics.sh # 消息的管理命令

kafka-console-producer.sh   #生产者的模拟命令

kafka-console-consumer.sh   #消费者的模拟命令

创建 Topic

创建名为 wang,partitions(分区)为3,replication(每个分区的副本数/每个分区的分区因子)为 2 的topic(主题)

/usr/local/kafka/bin/kafka-topics.sh --create --topic wang --bootstrap-server 192.168.10.110:9092 --partitions 3 --replication-factor 2

在各节点上观察生成的相关数据

ls /usr/local/kafka/data/

获取所有 Topic

/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.10.110:9092

验证 Topic 详情

/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrapserver 192.168.10.110:9092 --topic wang

生产 Topic

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.10.110:9092,192.168.10.120:9092,192.168.10.130:9092 --topic wang

消费 Topic

/usr/local/kafka/bin/kafka-console-consumer.sh --topic wang --bootstrap-server 192.168.10.110:9092 --from-beginning

删除 Topic

/usr/local/kafka/bin/kafka-topics.sh --delete --bootstrap-server 192.168.10.110:2181,192.168.10.120:2181,192.168.10.130:2181 --topic wang

 

创建消息

 在toplic中生产一下对应的生产消息

 消费消息

删除wang这条消息

如何在应用程序中关联kafka

首先基于filebeat从应用/web端将数据先收集进来,然后将数据先放入kafka中,然后将其适当的分片,redis做缓存,kafka做消息,然后将基于logstash的INPUT模块去关联kafka,把kafka/redis中消息消费出来,然后输出值es中