线程邮箱是一个更高阶的线程通信方法。最近项目中涉及到采集端需要多线程任务,发现网上关于线程邮箱的帖子视频很少,而且都是一知半解。经过多天的学习和整理,现将线程邮箱的内容整理如下:
前言:
一、线程邮箱用来干什么的?(线程邮箱的作用)
线程邮箱的作用:
1、使用线程邮箱可以避免资源间竞争,减少损耗的时间,提高系统的执行效率。
2、线程邮箱区别于“定义全局变量”的通信方式。
全局变量实现线程通信的原因:“线程间的共享堆区、静态变量区、常量区的特点”。
3、减少资源损耗,避免重复加锁解锁。
二、线程邮箱为什么可以实现以上功能?(线程邮箱的结构特点)
1、首先线程邮箱是一条链表。链表中保存了关于线程的相关信息(线程名、线程tid、线程的函数指针、线程中队列)
2、在线程当中其数据存储在队列中。因此在链表的基础上还要加上队列结构来存储线程数据。
队列中保存的数据有:发送方的线程名,接收方的线程名,和发送的数据。
具体结构:
以上一个形象的线程邮箱结构。
主体部分:
线程邮箱的头文件mailbox.h主要包含线程中所需要的结构体。分别作用已在代码注释中标注清楚。
/*************************************************************************
> File Name: mailbox.h
> Author: qiao
> Mail: qiaoyanrong@foxmail.com
> faction: 线程邮箱头文件
************************************************************************/
#ifndef _MAILBOX_H
#define _MAILBOX_H
#include <pthread.h>
#define ENTER_CRITICAL_AREA(mutex) do{pthread_mutex_lock(mutex);}while(0)
#define QUIT_CRITICAL_AREA(mutex) do{pthread_mutex_unlock(mutex);}while(0)
typedef char DATATYPE[256];
typedef void *(*th_fun)(void *);
//保存线程ID以及发送接收方的名字(邮件的信息)
typedef struct mail_infermation
{
pthread_t id_of_sender;
char name_of_sender[256];
pthread_t id_of_recver;
char name_of_recver[256];
DATATYPE data;
}MAIL_DATA;
//队列的结构 (存储发送接收双方的信息)
typedef struct queue
{
MAIL_DATA data;
struct queue *pnext;
}queue_t; //Que
//线程链表的主体
typedef struct thread_node
{
pthread_t tid;
char name[256];
queue_t *mail_head;
queue_t *mail_tail;
th_fun th;
}LIST_DATA;
//线程邮箱链表节点
typedef struct LinkNode
{
LIST_DATA elem;
struct LinkNode *pnext;
}MAILBOX;
//线程邮箱链表节点标签
typedef struct mail_box_system
{
pthread_mutex_t mutex;
MAILBOX *thread_list;
}MBS;
extern MAILBOX *end_list;
extern MBS *create_mail_box_system(void);
extern char *get_th_name(MBS *mbs);
extern int register_to_mail_system(MBS *mbs, char *name, th_fun th);
extern int send_msg(MBS *mbs, char *recvname, DATATYPE data);
extern int recv_msg(MBS *mbs, char *sendname, DATATYPE data);
extern int wait_all_end(MBS *mbs);
extern int destroy_mail_box_system(MBS *mbs);
#endif
mainbox.c主函数文件
/*************************************************************************
> File Name: mailbox.c
> Author: yas
> Mail: rage_yas@hotmail.com
>
************************************************************************/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include "mailbox.h"
#include "mailbox_queue.h"
#include "mailbox_list.h"
MBS *create_mail_box_system(void)
{
int ret = 0;
MBS *ptmp = malloc(sizeof(MBS));
if (NULL == ptmp)
{
perror("fail to create_mail_box_system");
return NULL;
}
//初始化互斥锁
ret = pthread_mutex_init(&ptmp->mutex, NULL);
if (0 != ret)
{
perror("fail to pthread_mutex_init");
return NULL;
}
//创建线程链表
ptmp->thread_list = malloc(sizeof(MAILBOX));
ptmp->thread_list->pnext = NULL;
printf("mail box create successfully!\n");
return ptmp;
}
//注册邮箱:将线程相关信息加入到链表中
//参数:链表标签,线程名,线程函数指针
int register_to_mail_system(MBS *mbs, char *name, th_fun th)
{
MAILBOX *ptmp = malloc(sizeof(MAILBOX));
if (NULL == ptmp)
{
perror("fail to register");
return -1;
}
strcpy(ptmp->elem.name, name);
ptmp->elem.th = th;
init_que(ptmp); //创建邮箱需要的队列
list_add(mbs->thread_list, ptmp); //将注册的邮箱插入链表
int ret = pthread_create(&ptmp->elem.tid, NULL, th, NULL);
if (0 != ret)
{
perror("fail to pthread_create");
return -1;
}
printf("register mail system |%s| ok !\n", ptmp->elem.name);
return 0;
}
//发送函数:将相关数据发送至线程名为recvname中的线程
int send_msg(MBS *mbs, char *recvname, DATATYPE data)
{
MAIL_DATA *ptmp = malloc(sizeof(MAIL_DATA));
strcpy(ptmp->data, data);
//获取线程的tid
ptmp->id_of_sender = pthread_self();
//链表中遍历查找接收方名字
MAILBOX *find = list_for_each(mbs->thread_list, recvname);
if (NULL == find)
{
printf("can't find recv mailbox\n");
}
//查找发送方的名字并保存
char *name = get_th_name(mbs);
strcpy(ptmp->name_of_sender, name);
strcpy(ptmp->name_of_recver, recvname);
//入队上锁防止此时被资源竞争
ENTER_CRITICAL_AREA(&mbs->mutex);
in_queue(find, ptmp);
QUIT_CRITICAL_AREA(&mbs->mutex);
return 0;
}
//接收消息
int recv_msg(MBS *mbs, char *sendname, DATATYPE data)
{
MAIL_DATA *ptmp = malloc(sizeof(MAIL_DATA));
pthread_t tid = pthread_self();
//链表中遍历查找自身tid(找到自己所在的地方)
MAILBOX *find = mbs->thread_list->pnext;
while (find != NULL)
{
if (find->elem.tid == tid)
{
break;
}
find = find->pnext;
}
if (find != NULL && find->elem.tid == tid) //如果找到了
{
while (1)
{
//出队拿数据
if (find->elem.mail_head != find->elem.mail_tail)
{
ENTER_CRITICAL_AREA(&mbs->mutex);
out_queue(find, ptmp);
QUIT_CRITICAL_AREA(&mbs->mutex);
break;
}
}
}
strcpy(sendname, ptmp->name_of_sender);
strcpy(data, ptmp->data);
free(ptmp);
return 0;
}
//链表的尾节点
MAILBOX *end_list = NULL;
//通过线程tid找到相应的名字
char *get_th_name(MBS *mbs)
{
pthread_t tid = pthread_self();
MAILBOX *find = mbs->thread_list;
MAILBOX *end = end_list; //全局变量初始为空
while (find != end)
{
if (find->elem.tid == tid)
{
break;
}
find = find->pnext;
}
if (find->elem.tid == tid)
{
return find->elem.name;
}
else
{
return NULL;
}
}
//遍历pthread_join销毁线程
int wait_all_end(MBS *mbs)
{
MAILBOX *find = mbs->thread_list->pnext;
//MAILBOX *end = end_list;
//while (find != end)
while (find->pnext != NULL)
{
pthread_join(find->elem.tid, NULL);
find = find->pnext;
}
pthread_join(find->elem.tid, NULL);
return 0;
}
int destroy_mail_box_system(MBS *mbs)
{
pthread_mutex_destroy(&mbs->mutex);
MAILBOX *ptmp = NULL;
MAILBOX *find = mbs->thread_list;
while (find != NULL)
{
ptmp = find;
find = find->pnext;
free(ptmp);
}
free(find);
return 0;
}
链表函数mailbox_list.c
#include<stdio.h>
#include <string.h>
#include "mailbox.h"
//线程邮箱入链操作
void list_add(MAILBOX *head, MAILBOX *info)
{
info->pnext = head->pnext;
head->pnext = info;
}
//链表的遍历寻找指定的线程名
MAILBOX *list_for_each(MAILBOX *head, char *name)
{
MAILBOX *ptmp = NULL;
ptmp = head;
while(ptmp != NULL)
{
if (strncmp(ptmp->elem.name, name, strlen(name)) == 0)
{
return ptmp;
}
ptmp = ptmp->pnext;
}
return NULL;
}
mailbox_queue.c
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "mailbox.h"
int init_que(MAILBOX *list_head)
{
queue_t *ptmp = malloc(sizeof(queue_t));
if (NULL == ptmp)
{
perror("fail to init_que malloc");
}
ptmp->pnext = NULL;
list_head->elem.mail_head = ptmp;
list_head->elem.mail_tail = ptmp;
return 0;
}
int in_queue(MAILBOX *list_head, MAIL_DATA *data)
{
queue_t *ptmp = malloc(sizeof(queue_t));
memcpy(&ptmp->data, data, sizeof(MAIL_DATA));
//ptmp->data = data;
ptmp->pnext = NULL;
list_head->elem.mail_tail->pnext = ptmp;
list_head->elem.mail_tail = list_head->elem.mail_tail->pnext;
return 0;
}
int out_queue(MAILBOX *list_head, MAIL_DATA *data)
{
if (list_head->elem.mail_head == list_head->elem.mail_tail)
{
printf("queue is empty.\n");
return -1;
}
if (list_head->elem.mail_head->pnext == list_head->elem.mail_tail)
{
list_head->elem.mail_tail = list_head->elem.mail_head;
}
queue_t *del = list_head->elem.mail_head->pnext;
list_head->elem.mail_head->pnext = del->pnext;
//memcpy(data, &del->data, sizeof(MAIL_DATA));
*data = del->data;
free(del);
return 0;
}
示例main.c
void *data_collect_th(void *arg)
{
while (1)
{
printf("this is the show th\n");
sleep(3);
send_msg(mbs, "show", "aabb");
send_msg(mbs, "show", "1111");
send_msg(mbs, "show", "2222");
send_msg(mbs, "show", "3333");
send_msg(mbs, "show", "4444");
send_msg(mbs, "show", "5555");
}
return NULL;
}
void *show_th(void *arg)
{
while(1)
{
printf("this is the show th\n");
char sendname[256];
DATATYPE data;
recv_msg(mbs, sendname, data);
printf("show recv msg from %s msg is %s\n", sendname, data);
sleep(1);
}
return NULL;
}
void *sock_th(void *arg)
{
while(1)
{
printf("this is the sock th\n");
DATATYPE data;
char sendname[256];
recv_msg(mbs, sendname, data);
send_msg(mbs, "show", "my is sock");
printf("sock recv msg from %s msg is %s\n", sendname, data);
sleep(1);
}
return NULL;
}
int main(void)
{
//MBS *mbs = NULL;
mbs = create_mail_box_system();
//printf("mbs = %p", mbs);
register_to_mail_system(mbs, "show", show_th);
register_to_mail_system(mbs, "sock", sock_th);
register_to_mail_system(mbs, "data", data_collect_th);
wait_all_end(mbs);
destroy_mail_box_system(mbs);
printf("Hello World!");
return 0;
}
以上就是完成代码,需要的相关mailbox_list.h和mailbox_queue.h都是非常简单的头文件添加,不在此多做赘述。