Docker TorchServe workflow deployment procedure

Published: 2024-10-13

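1. Deploy the required models first. For the steps, see: Docker TorchServe model deployment procedure

Set up every model the workflow uses, such as ocr_detection, ocr_judge, ocr_text, xxx_detection …

2. Workflow files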

2.1 Serial execution, example 1

(1) xxx_workflow.yaml

models:
    #global model params
    batch-size: 1
    max-batch-delay : 10
    retry-attempts : 3
    timeout-ms : 5000
    ocr_detection:
       url : ocr_detection.mar #local or public URI
    ocr_judge:
       url : ocr_judge.mar
    ocr_text:
       url : ocr_text.mar    
 
dag:
  ocr_detection : [first_processing]
  first_processing: [ocr_judge]
  ocr_judge : [second_processing]
  second_processing : [ocr_text]

(2) xxx_workflow_handler.py

import json

def first_processing(data, context):
    # Runs between ocr_detection and ocr_judge: decode the upstream output
    if data:
        image = data[0].get("data") or data[0].get("body")
        image = image.decode('utf-8')
        image = json.loads(image)
        return [{"image": image}] # [{"image":["xxx","xxx"]}]
    return None

def second_processing(data, context):
    # Runs between ocr_judge and ocr_text: same decoding as first_processing
    if data:
        image = data[0].get("data") or data[0].get("body")
        image = image.decode('utf-8')
        image = json.loads(image)
        return [{"image": image}] # [{"image":["xxx","xxx"]}]
    return None
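The function nodes above can be checked locally before packaging. The following is a minimal sketch, assuming the upstream model returns a JSON-encoded list as bytes under the "body" key; the mock payload is hypothetical and only mirrors the shape noted in the code comment above.

```python
import json

def first_processing(data, context):
    # Same logic as the handler function above, copied so this sketch runs on its own
    if data:
        image = data[0].get("data") or data[0].get("body")
        image = image.decode("utf-8")
        image = json.loads(image)
        return [{"image": image}]
    return None

# Hypothetical upstream output: a JSON list of strings, delivered as bytes
mock_upstream_output = json.dumps(["xxx", "xxx"]).encode("utf-8")

result = first_processing([{"body": mock_upstream_output}], context=None)
print(result)  # [{'image': ['xxx', 'xxx']}]

# An empty request list falls through to the None branch
print(first_processing([], context=None))  # None
```

Calling the function with context=None works here only because the function never reads the context; a handler that does read it would need a stand-in object.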

2.2 Serial execution, example 2

(1) xxx_workflow.yaml

models:
    #global model params
    batch-size: 1
    max-batch-delay : 10
    retry-attempts : 3
    timeout-ms : 5000
    xxx_detection:
       url : xxx_detection.mar #local or public URI
    xxx_recognition:
       url : xxx_recognition.mar
     
 
dag:
  xxx_detection : [xxx_recognition]
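Each key in the dag section names a node, and its list names the nodes that receive that node's output. To sanity-check a dag before packaging it, the mapping can be sorted topologically. This is a rough sketch using only the Python standard library (graphlib, Python 3.9+); the helper name execution_order is hypothetical and is not part of TorchServe.

```python
from graphlib import TopologicalSorter

def execution_order(dag):
    """Return one valid execution order for a {node: [successors]} mapping."""
    sorter = TopologicalSorter()
    for node, successors in dag.items():
        sorter.add(node)
        for successor in successors:
            # graphlib expects predecessors, so register node as a
            # dependency of each of its successors
            sorter.add(successor, node)
    # static_order() raises graphlib.CycleError if the mapping has a cycle
    return list(sorter.static_order())

# The dag from example 2
example_2 = {"xxx_detection": ["xxx_recognition"]}
print(execution_order(example_2))

# The dag from example 1
example_1 = {
    "ocr_detection": ["first_processing"],
    "first_processing": ["ocr_judge"],
    "ocr_judge": ["second_processing"],
    "second_processing": ["ocr_text"],
}
print(execution_order(example_1))
```

For a purely serial chain the order is unique, so the printed list should match the order in which the nodes are expected to run.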

2.3 Parallel execution, example

(1) xxx_workflow.yaml

models:
    #global model params
    batch-size: 1
    max-batch-delay : 10
    retry-attempts : 3
    timeout-ms : 5000
    xxx_branch_1:
       url : xxx_branch_1.mar #local or public URI
     
    xxx_branch_2:
       url : xxx_branch_2.mar

    xxx_branch_3:
       url : xxx_branch_3.mar
 
dag:
  pre_processing : [xxx_branch_1,xxx_branch_2,xxx_branch_3]
  xxx_branch_1 : [aggregate_func]
  xxx_branch_2 : [aggregate_func]
  xxx_branch_3 : [aggregate_func]

(2) xxx_workflow_handler.py

import json

def pre_processing(data, context):
    '''
    Empty node as a starting node since the DAG doesn't support multiple start nodes
    '''
    if data:
        text = data[0].get("data") or data[0].get("body")
        return [text]
    return None


def aggregate_func(data, context):
    '''
    Changes the output keys obtained from the individual model
    to be more appropriate for the workflow output
    '''
    if data:
        xxx_branch_1_result = json.loads(data[0].get("xxx_branch_1"))
        xxx_branch_2_result = json.loads(data[0].get("xxx_branch_2"))
        xxx_branch_3_result = json.loads(data[0].get("xxx_branch_3"))
        response = {
                "xxx_branch_1_result": xxx_branch_1_result,
                "xxx_branch_2_result": xxx_branch_2_result,
                "xxx_branch_3_result": xxx_branch_3_result
        }
        return [response]
    return None
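The aggregation node can also be exercised locally. The sketch below assumes the node receives one dict per request, keyed by the upstream node names, with JSON-encoded bytes as values; that input shape and the "label" fields in the mock data are assumptions for illustration, not captured TorchServe output.

```python
import json

def aggregate_func(data, context):
    # Same logic as the handler above, copied so this sketch runs on its own
    if data:
        response = {
            "xxx_branch_1_result": json.loads(data[0].get("xxx_branch_1")),
            "xxx_branch_2_result": json.loads(data[0].get("xxx_branch_2")),
            "xxx_branch_3_result": json.loads(data[0].get("xxx_branch_3")),
        }
        return [response]
    return None

# Hypothetical branch outputs, one entry per upstream node
mock_branch_outputs = [{
    "xxx_branch_1": json.dumps({"label": "a"}).encode("utf-8"),
    "xxx_branch_2": json.dumps({"label": "b"}).encode("utf-8"),
    "xxx_branch_3": json.dumps({"label": "c"}).encode("utf-8"),
}]

merged = aggregate_func(mock_branch_outputs, context=None)
print(merged)
```

If one branch is missing from the input, data[0].get(...) returns None and json.loads raises a TypeError, so a production handler may want an explicit check per branch.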

3. Package the workflow files

torch-workflow-archiver -f --workflow-name xxx_workflow --spec-file xxx_workflow.yaml --handler xxx_workflow_handler.py --export-path /home/model-server/model-store/
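When several workflows are packaged from a script, the same command can be assembled in Python. This is a small sketch that uses only the flags shown in the command above; the helper name build_archive_command is hypothetical, and the command is printed rather than executed so the sketch runs without the archiver installed.

```python
import shlex

def build_archive_command(workflow_name, spec_file, handler, export_path):
    """Assemble the torch-workflow-archiver call as an argument list."""
    return [
        "torch-workflow-archiver",
        "-f",  # overwrite an existing archive with the same name
        "--workflow-name", workflow_name,
        "--spec-file", spec_file,
        "--handler", handler,
        "--export-path", export_path,
    ]

command = build_archive_command(
    workflow_name="xxx_workflow",
    spec_file="xxx_workflow.yaml",
    handler="xxx_workflow_handler.py",
    export_path="/home/model-server/model-store/",
)
print(shlex.join(command))

# To actually run it inside the container:
# import subprocess
# subprocess.run(command, check=True)
```

Passing an argument list instead of a single shell string avoids quoting problems if a path ever contains spaces.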

4. Management API

(1) List the registered workflows
curl "http://localhost:8381/workflows"
(2) Register a workflow and allocate resources for it

Register the .war file. Note: the .war file must be located in the model-store folder, i.e. /path/model-server/model-store

curl -X POST "$server/workflows?url=$model_url&workflow_name=$model_name"
# Example
curl -X POST "localhost:8381/workflows?url=xxx_workflow.war&workflow_name=xxx_workflow"
(3) Check the workflow status
curl http://localhost:8381/workflows/xxx_workflow
(4) Unregister the workflow
curl -X DELETE http://localhost:8381/workflows/xxx_workflow
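The same management calls can be issued from Python. The following is a minimal sketch with the standard library only, assuming the management port 8381 used in the curl commands above; the helper names register_url, workflow_url and call are hypothetical. Only the URL construction runs at import time, so the sketch works without a live server.

```python
import urllib.request
from urllib.parse import urlencode

MANAGEMENT_URL = "http://localhost:8381"  # management port used in this post

def register_url(war_name, workflow_name, base=MANAGEMENT_URL):
    """Build the registration URL with properly encoded query parameters."""
    query = urlencode({"url": war_name, "workflow_name": workflow_name})
    return f"{base}/workflows?{query}"

def workflow_url(workflow_name, base=MANAGEMENT_URL):
    """Build the URL used for the status (GET) and unregister (DELETE) calls."""
    return f"{base}/workflows/{workflow_name}"

def call(method, url):
    """Send the request and return (status code, body text)."""
    request = urllib.request.Request(url, method=method)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status, response.read().decode("utf-8")

print(register_url("xxx_workflow.war", "xxx_workflow"))
print(workflow_url("xxx_workflow"))

# With TorchServe running, the four operations would look like:
# call("GET", f"{MANAGEMENT_URL}/workflows")
# call("POST", register_url("xxx_workflow.war", "xxx_workflow"))
# call("GET", workflow_url("xxx_workflow"))
# call("DELETE", workflow_url("xxx_workflow"))
```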

5. Workflow inference

# -*- coding: utf-8 -*-
import base64
import io
import json

import requests
from PIL import Image

def get_image_bytes(image_path):
    # Open the image and convert it to RGB
    with Image.open(image_path) as img:
        rgb_img = img.convert("RGB")
        # Save the image into an in-memory byte stream as JPEG
        byte_stream = io.BytesIO()
        rgb_img.save(byte_stream, format="JPEG")
        # Return the full contents of the byte stream
        return byte_stream.getvalue()

image_path = '/path/img1.jpg'

image_bytes = get_image_bytes(image_path)
image_base64 = base64.b64encode(image_bytes).decode('utf-8')
data = {'data': json.dumps({'image': image_base64})}
response = requests.post('http://localhost:8380/wfpredict/xxx_workflow', data=data)
print(response)
print(response.status_code)
# Parse the body as JSON rather than calling eval() on server-supplied text
# (assumes the workflow returns a JSON response)
print(response.json())

References:
README and examples under /serve inside the container
https://pytorch.org/serve/workflows.html