libco源码剖析(1) - 共享栈与协程的创建
前言
libco是腾讯开源的一个协程库,我会出一系列文章对libco的核心原理源码进行介绍,主要以源码注释为主,本文主要介绍共享栈相关源码,主要包括共享栈的创建,在协程创建过程中是怎样分配共享栈的,以及在协程切换时是怎样保存、恢复共享栈的数据的。
共享栈
协程栈模型有两种方式,独立栈和共享栈:
每个协程都拥有一个独立的栈帧, 协程切换时会保存当前协程栈中的所有数据, 并加载新的栈帧对象
. 这样做的优点是: 协程调度可以在内存中的任意位置、任意时刻进行; 但是缺点也很明显:随着并发量的增加, 协程的数目越来越多, 当前内存中的协程栈(无论是occupy还是suspend)越来越多, 内存瓶颈开始显现, 且内存切换本身也是不小的开销(寄存器恢复、数据拷贝).
所以, stackful coroutine一般有栈大小的限制(libco是128K).
那么能不能所有
正在执行的协程都共享一个栈呢?
这就是共享栈的思路. 共享栈将协程划分为协程组, 同一个协程组中的协程共享同一块内存,在协程切换的时候将当前内存中的数据保存到运行协程的buffer中, 并将新调度协程buffer中的数据拷贝到共享栈中
. 如此既可以减少内存开销, 同时运行协程又没有了栈大小的限制. 共享栈的缺点是:协程调度产生的局部变量都在共享栈上, 一旦新的协程运行后共享栈中的数据就会被覆盖, 先前协程的局部变量也就不再有效, 进而无法实现参数传递、嵌套调用等高级协程交互.
libco支持两种方式,默认使用独立栈,使用共享栈需要手动指定。
以下两个结构体分别是每个共享栈内存模型的结构体和组织所有共享栈的结构体:
/**
* 一个共享栈的结构体,每个共享栈的内存所在
* 一个进程或者线程栈的地址,是从高位到低位安排数据的,所以stack_bp是栈底,stack_buffer是栈顶
*/
struct stStackMem_t
{
stCoRoutine_t* occupy_co; // 当前正在使用该共享栈的协程
int stack_size; // 栈的大小
char* stack_bp; // stack_buffer + stack_size 栈底
char* stack_buffer; // 栈的内容,也就是栈顶
};
/*
* 所有共享栈的结构体,这里的共享栈是个数组,每个元素分别是个共享栈
*/
struct stShareStack_t
{
unsigned int alloc_idx; // 目前正在使用的那个共享栈的index
int stack_size; // 共享栈的大小,这里的大小指的是一个stStackMem_t*的大小
int count; // 共享栈的个数,共享栈可以为多个,所以以下为共享栈的数组
stStackMem_t** stack_array; //栈的内容,这里是个数组,元素是stStackMem_t*
};
下面两个函数用来创建共享栈以及分别匹配内存:
/**
* 创建一个共享栈
* @param count 创建共享栈的个数
* @param stack_size 每个共享栈的大小
*/
stShareStack_t* co_alloc_sharestack(int count, int stack_size)
{
stShareStack_t* share_stack = (stShareStack_t*)malloc(sizeof(stShareStack_t));
share_stack->alloc_idx = 0;
share_stack->stack_size = stack_size;
//alloc stack array
share_stack->count = count;
stStackMem_t** stack_array = (stStackMem_t**)calloc(count, sizeof(stStackMem_t*)); //见下文介绍
for (int i = 0; i < count; i++)
{
stack_array[i] = co_alloc_stackmem(stack_size); //co_alloc_stackmem用于分配每个共享栈内存, 实现见下
}
share_stack->stack_array = stack_array;
return share_stack;
}
/**
* 分配一个栈内存
* @param stack_size的大小
*/
stStackMem_t* co_alloc_stackmem(unsigned int stack_size)
{
stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t));
stack_mem->occupy_co= NULL; //当前没有协程使用该共享栈
stack_mem->stack_size = stack_size;
stack_mem->stack_buffer = (char*)malloc(stack_size); //栈顶,低内存空间
stack_mem->stack_bp = stack_mem->stack_buffer + stack_size; //栈底,高地址空间
return stack_mem;
}
calloc函数,其原型void *calloc(size_t n, size_t size);其比malloc函数多一个参数,并不需要人为的计算空间的大小,比如如果他要申请20个int类型空间,会int *p = (int *)calloc(20, sizeof(int)),这样就省去了人为空间计算的麻烦。但这并不是他们之间最重要的区别,malloc申请后空间的值是随机的,并没有进行初始化,而calloc却在申请后,对空间逐一进行初始化,并设置值为0。
协程的创建
在了解协程的创建之前需要了解协程相关的结构体:
//协程结构体
struct stCoRoutine_t
{
stCoRoutineEnv_t *env; // 协程所在的运行环境,可以理解为,该协程所属的协程管理器
pfn_co_routine_t pfn; // 协程所对应的函数
void *arg; // 函数参数
coctx_t ctx; // 协程上下文,包括寄存器和栈
// 以下用char表示了bool语义,节省空间
char cStart; // 是否已经开始运行了
char cEnd; // 是否已经结束
char cIsMain; // 是否是主协程
char cEnableSysHook; // 是否要打开钩子标识,默认是关闭的
char cIsShareStack; // 是否要采用共享栈
void *pvEnv;
stStackMem_t* stack_mem; // 栈内存
char* stack_sp; //协程在共享栈中已经使用的栈顶
unsigned int save_size; // save_buffer的长度
char* save_buffer; // 当协程挂起时,栈的内容会栈暂存到save_buffer中
stCoSpec_t aSpec[1024];
};
typedef void *(*pfn_co_routine_t)(void *); //协程函数类型
/*
* 协程属性
* 用于每个协程自定义一些配置
*/
struct stCoRoutineAttr_t
{
int stack_size; // 如果是共享栈模式,则该只不用指定。如果不是共享栈模式,则必需指定
stShareStack_t* share_stack; //不是共享栈模式,为NULL
stCoRoutineAttr_t()
{
// 默认是128k
stack_size = 128 * 1024;
// 默认不是共享栈
share_stack = NULL;
}
}__attribute__ ((packed));
每个线程都有一份 stCoRoutineEnv_t 对象(协程的运行环境),在线程第一次创建协程时被自动创建,同时也会创建主协程,并将指向主协程的指针放到 pCallStack[0]
里:
/*
* 线程所管理的协程的运行环境
* 一个线程只有一个这个属性
*/
struct stCoRoutineEnv_t
{
// 这里实际上维护的是个调用栈
// 最后一位是当前运行的协程,前一位是当前协程的父协程(即,resume该协程的协程)
// 可以看出来,libco只能支持128层协程的嵌套调用。这个绝对够了
stCoRoutine_t *pCallStack[ 128 ];
int iCallStackSize; // 当前调用栈长度
stCoEpoll_t *pEpoll; //主要是epoll,作为协程的调度器
//当采用共享栈模式时,用于共享栈数据的保存与回复
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};
接下来是协程创建的具体过程,需要重点关注共享栈的分配:
/**
* 创建一个协程对象
*
* @param ppco - (output) 协程的地址,未初始化,需要在此函数中将其申请内存空间以及初始化工作
* @param attr - (input) 协程属性,目前主要是共享栈
* @param pfn - (input) 协程所运行的函数
* @param arg - (input) 协程运行函数的参数
*/
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
// 查找当前线程的管理环境
if( !co_get_curr_thread_env() )
{
// 如果找不到,则初始化协程
co_init_curr_thread_env();
}
// 根据协程的运行环境,来创建一个协程,见下个函数
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
return 0;
}
/**
* 根据协程管理器env, 新建一个协程
*
* @param env - (input) 协程所在线程的环境
* @param attr - (input) 协程属性,目前主要是共享栈
* @param pfn - (input) 协程所运行的函数
* @param arg - (input) 协程运行函数的参数
*/
struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
pfn_co_routine_t pfn,void *arg )
{
// 初始化属性。并且给默认值
stCoRoutineAttr_t at;
if( attr )
{
memcpy( &at,attr,sizeof(at) );
}
if( at.stack_size <= 0 )
{
at.stack_size = 128 * 1024; // 默认的为128k
}
else if( at.stack_size > 1024 * 1024 * 8 )
{
at.stack_size = 1024 * 1024 * 8;
}
if( at.stack_size & 0xFFF )
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
}
stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
memset( lp,0,(long)(sizeof(stCoRoutine_t)));
lp->env = env;
lp->pfn = pfn;
lp->arg = arg;
stStackMem_t* stack_mem = NULL;
if( at.share_stack )
{
// 如果采用了共享栈模式,则获取到其中一个共享栈的内存,见下个函数
stack_mem = co_get_stackmem( at.share_stack);
at.stack_size = at.share_stack->stack_size;
}
else
{
// 如果没有采用共享栈,则分配内存
stack_mem = co_alloc_stackmem(at.stack_size);
}
lp->stack_mem = stack_mem;
// 设置该协程的context
lp->ctx.ss_sp = stack_mem->stack_buffer; // 栈地址
lp->ctx.ss_size = at.stack_size; // 栈大小
lp->cStart = 0;
lp->cEnd = 0;
lp->cIsMain = 0;
lp->cEnableSysHook = 0; // 默认不开启hook
lp->cIsShareStack = at.share_stack != NULL;
// 仅在共享栈的时候有意义
lp->save_size = 0;
lp->save_buffer = NULL;
return lp;
}
/*
* 在共享栈中,获取协程的栈内存
*/
static stStackMem_t* co_get_stackmem(stShareStack_t* share_stack)
{
if (!share_stack)
{
return NULL;
}
// 轮询的使用shared_stack
int idx = share_stack->alloc_idx % share_stack->count;
share_stack->alloc_idx++;
return share_stack->stack_array[idx];
}
共享栈数据的保存与恢复
在协程切换时,会将共享栈的数据保存到上一个协程(occupy_co)的save_buffer中,将接下来要执行的协程(pending_co)save_buffer中的数据拷贝到共享栈中。
注意,在coctx_swap(&(curr->ctx),&(pending_co->ctx) );
函数前后,协程经历了切换,函数之前,coctx_swap会进入到pending_co的协程环境中运行,函数之后已经yield回此协程了,才会执行接下来的语句。
*
* 1. 将当前的运行上下文保存到curr中
* 2. 将当前的运行上下文替换为pending_co中的上下文
* @param curr
* @param pending_co
*/
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
stCoRoutineEnv_t* env = co_get_curr_thread_env();
//get curr stack sp
//c变量的作用是为了找到目前的栈顶,因为c变量是最后一个放入栈中的内容。
char c;
curr->stack_sp= &c;
if (!pending_co->cIsShareStack)
{
// 如果没有采用共享栈,清空pending_co和occupy_co
env->pending_co = NULL;
env->occupy_co = NULL;
}
else
{
// 如果采用了共享栈
env->pending_co = pending_co;
//get last occupy co on the same stack mem
// occupy_co指的是,和pending_co共同使用一个共享栈的协程
// 把它取出来是为了先把occupy_co的内存保存起来
stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
//set pending co to occupy thest stack mem;
// 将该共享栈的占用者改为pending_co
pending_co->stack_mem->occupy_co = pending_co;
env->occupy_co = occupy_co;
if (occupy_co && occupy_co != pending_co)
{
// 如果上一个使用协程不为空, 则需要把它的栈内容保存起来,见下个函数。
save_stack_buffer(occupy_co);
}
}
// swap context
coctx_swap(&(curr->ctx),&(pending_co->ctx) );
// 上一步coctx_swap会进入到pending_co的协程环境中运行
// 到这一步,已经yield回此协程了,才会执行下面的语句
// 而yield回此协程之前,env->pending_co会被上一层协程设置为此协程
// 因此可以顺利执行: 将之前保存起来的栈内容,恢复到运行栈上
//stack buffer may be overwrite, so get again;
stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
stCoRoutine_t* update_occupy_co = curr_env->occupy_co;
stCoRoutine_t* update_pending_co = curr_env->pending_co;
// 将栈的内容恢复,如果不是共享栈的话,每个协程都有自己独立的栈空间,则不用恢复。
if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
{
// resume stack buffer
if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
{
// 将之前保存起来的栈内容,恢复到运行栈上
memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
}
}
}
/**
* 将原本占用共享栈的协程的内存保存起来。
* @param occupy_co 原本占用共享栈的协程
*/
void save_stack_buffer(stCoRoutine_t* occupy_co)
{
///copy out
stStackMem_t* stack_mem = occupy_co->stack_mem;
// 计算出栈的大小
int len = stack_mem->stack_bp - occupy_co->stack_sp;
if (occupy_co->save_buffer)
{
free(occupy_co->save_buffer), occupy_co->save_buffer = NULL;
}
occupy_co->save_buffer = (char*)malloc(len); //malloc buf;
occupy_co->save_size = len;
// 将当前运行栈的内容,拷贝到save_buffer中
memcpy(occupy_co->save_buffer, occupy_co->stack_sp, len);
}
参考
https://github.com/chenyahui/AnnotatedCode/tree/master/libco
https://blog.csdn.net/weibo1230123/article/details/81503135