SeaTunnel系列之:部署SeaTunnel的Spark和Flink引擎

发布于:2025-04-06 ⋅ 阅读:(23) ⋅ 点赞:(0)

SeaTunnel系列之:部署SeaTunnel的Spark和Flink引擎

运行 SeaTunnel

现在您已经下载了 SeaTunnel 二进制包和连接器插件。接下来,您可以选择不同的引擎选项来运行同步任务。

如果您使用 Flink 运行同步任务,则无需部署 SeaTunnel 引擎服务集群。您可以参考使用 Flink 快速开始来运行您的同步任务。

如果您使用 Spark 运行同步任务,则同样无需部署 SeaTunnel 引擎服务集群。您可以参考使用 Spark 快速开始来运行您的同步任务。

如果您使用内置的 SeaTunnel 引擎(Zeta)来运行任务,则需要先部署 SeaTunnel 引擎服务。请参考使用 SeaTunnel 引擎快速开始。

快速开始使用 Flink

步骤 1: 部署 SeaTunnel 和连接器
在开始之前,请确保已按照部署中的描述下载并部署了 SeaTunnel。

步骤 2: 部署和配置 Flink
请首先下载 Flink(所需版本 ‍>= 1.12.0)。

配置 SeaTunnel: 修改 ${SEATUNNEL_HOME}/config/seatunnel-env.sh 中的设置,并将 FLINK_HOME 设置为 Flink 的部署目录。

步骤 3: 添加作业配置文件以定义作业

编辑 config/v2.streaming.conf.template,该文件确定 Seatunnel 启动后数据输入、处理和输出的方式和逻辑。以下是配置文件的一个示例,与上述提到的示例应用相同。

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    plugin_output = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
  FieldMapper {
    plugin_input = "fake"
    plugin_output = "fake1"
    field_mapper = {
      age = age
      name = new_name
    }
  }
}

sink {
  Console {
    plugin_input = "fake1"
  }
}

第 4 步:运行 SeaTunnel 应用程序

您可以使用以下命令启动应用程序:

对于 Flink 版本在 1.12.x 到 1.14.x 之间的:

cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template

对于 Flink 版本在 1.15.x 到 1.18.x 之间的:

cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template

查看输出:当您运行命令时,可以在控制台中看到其输出。这是判断命令是否成功执行的标志。

SeaTunnel 控制台将打印如下日志:

fields : name, age
types : STRING, INT
row=1 : elWaB, 1984352560
row=2 : uAtnp, 762961563
row=3 : TQEIB, 2042675010
row=4 : DcFjo, 593971283
row=5 : SenEb, 2099913608
row=6 : DHjkg, 1928005856
row=7 : eScCM, 526029657
row=8 : sgOeE, 600878991
row=9 : gwdvw, 1951126920
row=10 : nSiKE, 488708928
row=11 : xubpl, 1420202810
row=12 : rHZqb, 331185742
row=13 : rciGD, 1112878259
row=14 : qLhdI, 1457046294
row=15 : ZTkRx, 1240668386
row=16 : SGZCr, 94186144

快速开始使用 Spark

步骤 1: 部署 SeaTunnel 和连接器
在开始之前,请确保您已按照部署中的描述下载并部署了 SeaTunnel。

步骤 2: 部署和配置 Spark
请先下载 Spark(所需版本 ‍>= 2.4.0)。更多信息请参阅快速开始: 独立模式。
配置 SeaTunnel:更改 ${SEATUNNEL_HOME}/config/seatunnel-env.sh 中的设置,并将 SPARK_HOME 设置为 Spark 的部署目录。

步骤 3: 添加作业配置文件以定义作业
编辑 config/seatunnel.streaming.conf.template,该文件确定 SeaTunnel 启动后数据输入、处理和输出的方式和逻辑。以下是配置文件的一个示例,与上述示例应用相同。

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    plugin_output = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
  FieldMapper {
    plugin_input = "fake"
    plugin_output = "fake1"
    field_mapper = {
      age = age
      name = new_name
    }
  }
}

sink {
  Console {
    plugin_input = "fake1"
  }
}

步骤4:运行SeaTunnel应用程序

您可以通过以下命令启动应用程序:

Spark 2.4.x

cd "apache-seatunnel-${version}"
./bin/start-seatunnel-spark-2-connector-v2.sh \
--master local[4] \
--deploy-mode client \
--config ./config/v2.streaming.conf.template

Spark3.x.x

cd "apache-seatunnel-${version}"
./bin/start-seatunnel-spark-3-connector-v2.sh \
--master local[4] \
--deploy-mode client \
--config ./config/v2.streaming.conf.template

查看输出:当你运行命令时,可以在控制台中看到其输出。这是判断命令是否成功执行的标志。

SeaTunnel 控制台将打印如下日志:

fields : name, age
types : STRING, INT
row=1 : elWaB, 1984352560
row=2 : uAtnp, 762961563
row=3 : TQEIB, 2042675010
row=4 : DcFjo, 593971283
row=5 : SenEb, 2099913608
row=6 : DHjkg, 1928005856
row=7 : eScCM, 526029657
row=8 : sgOeE, 600878991
row=9 : gwdvw, 1951126920
row=10 : nSiKE, 488708928
row=11 : xubpl, 1420202810
row=12 : rHZqb, 331185742
row=13 : rciGD, 1112878259
row=14 : qLhdI, 1457046294
row=15 : ZTkRx, 1240668386
row=16 : SGZCr, 94186144