workflow:高效的流式工作架构

发布于:2025-05-21 ⋅ 阅读:(19) ⋅ 点赞:(0)
引言

workflow是sougou的一款开源框架 主要是以请求回应的模式解决各自网络/IO任务而发明的

 一.workflow的任务流
1.workflow都封装了哪些任务流

以请求回应的模式来解释

① 网络层

服务端

在服务端的request 相当于发送了一个获取客户端请求的请求,response相当于接收客户端的请求从而处理请求

客户端 

在客户端的request相当于向服务端发送了一个请求,response相当于接收服务端回复的消息

 ② 定时器

发送延时请求---->处理延时任务

③  IO层

发起处理文件IO的请求---->处理文件

④  cpu

发起耗时计算的请求---->处理耗时任务

总结:

耗时等待 :负载均衡 类似于fd通过%4取模的方式负载均衡给四个网络线程 ,当条件满足的时候把任务抛出给工作线程处理

 

耗时计算:当遇到计算任务的时候工作线程会把计算任务抛出给到go线程,go线程池通过任务调度处理计算任务

2.任务是如何来进行组织的呢

在workflow中通常以串联,并联,DAG 三种方式对任务进行组织

①串联

想象成你中学学习物理的时候的串联电路

②并联

想象成你中学学习物理的时候的并联电路

③DAG

在工作流(Workflow)系统中,DAG(有向无环图,Directed Acyclic Graph) 是一个非常核心的概念,它用于表示任务之间的执行依赖关系,画一个简单的图演示一下。

 

任务A 最先执行

任务B 和 任务C 并行执行,但都依赖于 任务A

任务D 要等 任务B 和 任务C 都完成后才执行

学过408中的操作系统信号量的同学应该可以很轻松理解这个模型

 

二.线程模型

main函数入口主线程

4个网络线程

20个工作线程

8个计算线程

 

 三.workflow的三板斧
1.三板斧是哪三板斧

first-->抽象粒度合适的异步任务

second--->通过任务流组织任务:串联,并联,以及DAG

third--->协调任务:counter,conditional,resource pool,message queue

2.message queue 

我们简单介绍一下message queue 消息队列的部分

队列的接口层

 在workflow中消息队列提供了上述这些接口 分别是 队列的创建 ,获取队列 ,往队列里面put 消息  ,设置队列是阻塞还是非阻塞 ,队列的销毁 

队列都有哪些成员

 此队列中有两个队列 分别是put队列 和 get队列 我们了解这两个队列的同时 我们要先理解多消费者和多生成者模型 , 在生产者往队列里面put消息的时候 是通过在put队列里面put操作 ,当消费者消费队列中的消息的时候 通过在get队列里面使用get操作,当get队列里面没有数据的时候,put队列里面的数据会转移到get队列里面供消费者消费,我们这样设计的目的是为了减少生产者和消费者之间的碰撞从而提供效率

 我们从中拿一个消息队列的头插法举个例子

 我们用void** 接收一个偏移后的message 用C语音的void**的好处是可以接收任何类型的数据,更加自由的去操作数据

我们对消息队列进行put操作前 我们要保证原子性 所以我们要通过pthread_mutex_lock 上锁 和pthread_mutex_unloc解锁

 后面我们通过条件变量保证数据的同步性pthread_cond_wait

 3.我们举一个简单的用workflow写的一个http请求回应代码
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <workflow/WFTaskFactory.h>
#include <workflow/WFFacilities.h>

static WFFacilities::WaitGroup waitGroup(1);  // 用于主线程等待任务完成

void http_callback(WFHttpTask *task)
{
    int state = task->get_state();
    int error = task->get_error();

    if (state != WFT_STATE_SUCCESS)
    {
        printf("HTTP request failed. State = %d, Error = %d\n", state, error);
        waitGroup.done();  // 通知主线程任务已完成
        return;
    }

    const void *body;
    size_t body_len;
    task->get_resp()->get_parsed_body(&body, &body_len);
    printf("HTTP Response:\n%.*s\n", (int)body_len, (const char *)body);

    waitGroup.done();  // 通知主线程任务已完成
}

int main()
{
    signal(SIGPIPE, SIG_IGN);  // 忽略 SIGPIPE 信号

    const char *url = "http://www.github.com/";

    WFHttpTask *task = WFTaskFactory::create_http_task(url, 4, 2, http_callback);
    task->start();  // 异步启动任务
    waitGroup.wait();  // 主线程等待任务完成

    return 0;
}

我们把一个url 放入到一个任务流中 并设置回调函数 

任务流对github发送request github收到请求 并 回复response给我们 触发回调

打印http://www.example.com 的 HTTP 响应内容。