Flume的安装与使用 -- flume自定义拦截器 -- tailDir + Memory + HDFS案例

发布于:2024-09-19 ⋅ 阅读:(16) ⋅ 点赞:(0)


前言

Flume – 一般用于采集日志数据
Sqoop – 一般用于采集数据库中的数据

一、安装

1、上传、解压、重命名

flume下载地址
提取码: 1234

cd /opt/modules
# 上传
# 解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/installs
# 重命名
mv /opt/modules/apache-flume-1.9.0-bin /opt/modules/flume

2、修改环境变量

vi /etc/profire

export FLUME_HOME=/opt/installs/flume
export PATH=$PATH:$FLUME_HOME/bin

3、修改配置文件

cd /opt/installs/flume/conf
mv flume-env.sh.template flume-env.sh

#添加如下配置:
export JAVA_HOME=/opt/installs/jdk

二、案例(tailDir + Memory + HDFS)

Source(数据来源) + channels(通道) + sinks(数据下沉的地方) 组成了flume的agent

tailDir 是用来监控多个文件夹下的多个文件的,只要文件内容发生变化,就会再次的进行数据的抽取

在/opt/installs/flume/conf下创建一个myconf文件夹
并在myconf下创建一个taildir-memory-hdfs.conf 文件

在 taildir-memory-hdfs.conf 文件添加如下内容
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
#  . 代表的意思是一个任意字符   * 代表前面的字符出现0到多次
a1.sources.r1.filegroups.f1 = /home/scripts/datas/.*txt.*


a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume3/logs

执行

cd /opt/installs/flume/conf/myconf/

flume-ng agent -n a1 -c ./ -f taildir-memory-hdfs.conf -Dflume.root.logger=INFO,console

三、flume中的自定义拦截器

1、导包

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

2、导入打包插件

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <!-- 禁止生成 dependency-reduced-pom.xml-->
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <relocations>
                                <relocation>
                                    <!-- 解决包冲突 进行转换-->
                                    <pattern>com.google.protobuf</pattern>
                                    <shadedPattern>shaded.com.google.protobuf</shadedPattern>
                                </relocation>
                            </relocations>
                            <artifactSet>
                                <excludes>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- 某些jar包含具有相同文件名的其他资源(例如属性文件)。 为避免覆盖,您可以选择通过将它们的内容附加到一个文件中来合并它们-->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>mainclass</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

3、代码

//实现Interceptor接口,并实现里面的四个方法
public class DemoInterceptor implements Interceptor{
	// 具体的逻辑代码,根据需求自行编写
}
// 在内部定义一个类,实现Builder接口
    public static class BuilderEvent implements Builder{

        @Override
        public Interceptor build() {
            return new DemoInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }

网站公告

今日签到

点亮在社区的每一天
去签到