Integrating flink operator v1.10 with Huawei Cloud Object Storage (OBS)

Published: 2025-03-01

1 Overview

The flink operator and the Flink clusters it manages do not support Huawei Cloud OBS out of the box. You must place a jar in the plugin directory of these Java programs and adjust the Flink configuration before the OBS integration works.
Reference:

https://support.huaweicloud.com/bestpractice-obs/obs_05_1516.html

2 Environment preparation

2.1 Huawei Cloud Kubernetes cluster

Prepare a Kubernetes cluster.

2.2 Flink operator Helm chart download

https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz

2.3 cert-manager YAML download

https://github.com/jetstack/cert-manager/releases/download/v1.17.2/cert-manager.yaml

2.4 Prepare a sample Flink application

https://github.com/apache/flink/tree/master/flink-examples

Compile the official Flink example code into jar packages and upload them to OBS.
With the jars stored in Huawei Cloud OBS, the flink operator can pull them over the OBS protocol and submit them to the Flink cluster, and the cluster's jobmanager and taskmanager can also read from and write to OBS.
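As a sketch of this preparation step, the examples can be built with Maven and uploaded with Huawei's obsutil CLI. The module path and the bucket name `your-bucket` below are illustrative assumptions, not taken from the original setup:

```shell
# Build the official streaming examples inside a clone of the flink repo
# (produces jars such as target/StateMachineExample.jar).
cd flink-examples/flink-examples-streaming
mvn clean package -DskipTests

# Upload the jar to OBS with obsutil; "your-bucket" is a placeholder.
obsutil cp target/StateMachineExample.jar obs://your-bucket/
```

obsutil must be configured with the same AK/SK and endpoint used later in the Flink configuration.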

3 Deployment

3.1 Install cert-manager

This component is a dependency of the flink operator webhook, so install it first.

cd /tmp
wget https://github.com/jetstack/cert-manager/releases/download/v1.17.2/cert-manager.yaml
kubectl apply -f cert-manager.yaml
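Before moving on, it is worth confirming that cert-manager is actually up; a possible check (the `cert-manager` namespace is the default created by the manifest above):

```shell
# Wait until all cert-manager deployments report Available.
kubectl -n cert-manager wait deployment --all \
  --for=condition=Available --timeout=300s
kubectl -n cert-manager get pods
```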


3.2 Install the helm binary

cd /tmp
wget https://get.helm.sh/helm-v3.16.2-linux-amd64.tar.gz
tar xf helm-v3.16.2-linux-amd64.tar.gz
cd linux-amd64
/bin/cp -f helm /usr/bin/
helm env

3.3 Deploy the flink operator

Download the flink operator Helm chart, unpack it, and deploy it into the flink namespace with helm.

cd /tmp
wget https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz
tar xf flink-kubernetes-operator-1.10.0-helm.tgz

Edit the flink-kubernetes-operator/values.yaml file and add the following under the defaultConfiguration.flink-conf.yaml field:

defaultConfiguration:
  flink-conf.yaml: |+
    fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key: *********your AK*********
    fs.obs.secret.key: *********your SK*********
    fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com     # OBS endpoint; adjust to your region

Deploy the Kubernetes resources with the following command:

helm upgrade --install flink-operator -n flink --create-namespace \
--set image.repository=swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator \
--set image.tag=1.10.0 \
./flink-kubernetes-operator/
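A quick sanity check after the install (release and namespace names match the command above):

```shell
helm status flink-operator -n flink
kubectl -n flink get pods
# Expect a flink-kubernetes-operator pod in Running state.
```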

I have packed the flink-obs jars into the image swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45. The image is public, so anyone can pull it.

Next, update the operator Deployment (it needs the initContainer and the obs-plugin volume mount); simply kubectl apply the following:

apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    meta.helm.sh/release-name: flink-operator
    meta.helm.sh/release-namespace: flink
  labels:
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: flink-kubernetes-operator
    app.kubernetes.io/version: 1.10.0
    helm.sh/chart: flink-kubernetes-operator-1.10.0
  name: flink-kubernetes-operator
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: flink-kubernetes-operator
  strategy:
    type: Recreate
  template:
    metadata:
      annotations:
        kubectl.kubernetes.io/default-container: flink-kubernetes-operator
      labels:
        app.kubernetes.io/name: flink-kubernetes-operator
    spec:
      initContainers:
      - image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45
        name: sidecar
        command: ["sh"]
        args: [
                "-c",
                "mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"
        ]
        volumeMounts:
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
      containers:
      - command:
        - /docker-entrypoint.sh
        - operator
        env:
        - name: OPERATOR_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
        - name: HOST_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.hostIP
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: OPERATOR_NAME
          value: flink-kubernetes-operator
        - name: FLINK_CONF_DIR
          value: /opt/flink/conf
        - name: FLINK_PLUGINS_DIR
          value: /opt/flink/plugins
        - name: LOG_CONFIG
          value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
        - name: JVM_ARGS
        image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 3
          httpGet:
            path: /
            port: health-port
            scheme: HTTP
          initialDelaySeconds: 30
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 1
        name: flink-kubernetes-operator
        ports:
        - containerPort: 8085
          name: health-port
          protocol: TCP
        resources: {}
        securityContext: {}
        startupProbe:
          failureThreshold: 30
          httpGet:
            path: /
            port: health-port
            scheme: HTTP
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 1
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /opt/flink/conf
          name: flink-operator-config-volume
        - mountPath: /opt/flink/artifacts
          name: flink-artifacts-volume
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
      - command:
        - /docker-entrypoint.sh
        - webhook
        env:
        - name: WEBHOOK_KEYSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              key: password
              name: flink-operator-webhook-secret
        - name: WEBHOOK_KEYSTORE_FILE
          value: /certs/keystore.p12
        - name: WEBHOOK_KEYSTORE_TYPE
          value: pkcs12
        - name: WEBHOOK_SERVER_PORT
          value: "9443"
        - name: LOG_CONFIG
          value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
        - name: JVM_ARGS
        - name: FLINK_CONF_DIR
          value: /opt/flink/conf
        - name: FLINK_PLUGINS_DIR
          value: /opt/flink/plugins
        - name: OPERATOR_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
        image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0
        imagePullPolicy: IfNotPresent
        name: flink-webhook
        resources: {}
        securityContext: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /certs
          name: keystore
          readOnly: true
        - mountPath: /opt/flink/conf
          name: flink-operator-config-volume
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        runAsGroup: 9999
        runAsUser: 9999
      serviceAccount: flink-operator
      serviceAccountName: flink-operator
      terminationGracePeriodSeconds: 30
      volumes:
      - configMap:
          defaultMode: 420
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-operator.properties
            path: log4j-operator.properties
          - key: log4j-console.properties
            path: log4j-console.properties
          name: flink-operator-config
        name: flink-operator-config-volume
      - emptyDir: {}
        name: flink-artifacts-volume
      - name: keystore
        secret:
          defaultMode: 420
          items:
          - key: keystore.p12
            path: keystore.p12
          secretName: webhook-server-cert
      - name: obs-plugin
        emptyDir: {}
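To verify that the initContainer actually populated the plugin directory, you can list it inside the running operator pod (container and mount names match the manifest above):

```shell
# Confirm the OBS filesystem jars were copied into the plugin directory.
kubectl -n flink exec deploy/flink-kubernetes-operator \
  -c flink-kubernetes-operator -- ls /opt/flink/plugins/obs-fs-hadoop
```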

3.4 Deploy a Flink session cluster

kubectl apply the following resource to deploy a Flink session cluster:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session-cluster
  namespace: flink
spec:
  image: swr.cn-south-1.myhuaweicloud.com/migrator/flink:1.19
  flinkVersion: v1_19
  flinkConfiguration:
    fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key: *********your AK*********
    fs.obs.secret.key: *********your SK*********
    fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com   # OBS endpoint; adjust to your region
  jobManager:
    resource:
      memory: "2048m"
      cpu: 2
  taskManager:
    resource:
      memory: "2048m"
      cpu: 2
  serviceAccount: flink
  podTemplate:
    spec:
      volumes:
      - name: obs-plugin
        emptyDir: {}
      containers:
      # Do not change the main container name
      - name: flink-main-container
        volumeMounts:
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
      initContainers:
      - image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45
        name: sidecar
        command: ["sh"]
        args: [
                "-c",
                "mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"
        ]
        volumeMounts:
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
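Once applied, the deployment status can be checked, and the web UI reached through the REST service the operator creates (the `<name>-rest` service name is the operator's convention, assumed here):

```shell
kubectl -n flink get flinkdeployment flink-session-cluster
# Optionally expose the Flink web UI locally.
kubectl -n flink port-forward svc/flink-session-cluster-rest 8081
```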


4 Submit a Flink job

kubectl apply the following resource:

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example
  namespace: flink
spec:
  deploymentName: flink-session-cluster
  job:
    jarURI: obs://your-bucket/StateMachineExample.jar    # jar location; adjust to your bucket
    parallelism: 1
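The job status can then be followed with (resource name matches the manifest above):

```shell
kubectl -n flink get flinksessionjob basic-session-job-example
# JOB STATUS should eventually report RUNNING.
```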

The Flink job reaches RUNNING state, which shows that the flink operator pulled the jar from Huawei Cloud OBS and submitted it to the Flink cluster. The flink operator logs also contain OBS-related entries.

Summary

This article showed how the flink operator and the Flink clusters it manages can be integrated with Huawei Cloud OBS. Once integrated, not only can job jars be stored in object storage, but Flink job state, inputs, and outputs can be stored there as well.
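As a sketch of the latter, checkpoint and savepoint storage could be pointed at OBS through the same flinkConfiguration block used above; the bucket and paths below are placeholders, not part of the original setup:

```yaml
flinkConfiguration:
  state.checkpoints.dir: obs://your-bucket/flink/checkpoints
  state.savepoints.dir: obs://your-bucket/flink/savepoints
```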