C语言:流量控制

发布于:2024-07-01 ⋅ 阅读:(10) ⋅ 点赞:(0)

前言

流量控制可以让发送端根据接收端的实际接受能力控制发送的数据量。它的具体操作是,接收端主机向发送端主机通知自己可以接收数据的大小,于是发送端会发送不会超过该大小的数据,该限制大小即为窗口大小,即窗口大小由接收端主机决定。如播放视频,音频文件时,需要对发送的数据进行流控。

mycat实现

#include <stdlib.h>
#include <stdio.h>
#include <error.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>

#define BUF_SIZE    1024*1
#define BUF_SIZE    100
int main(int argc,char **argv)
{

    int fps,fpd=1;
    int ret;
    char buf[BUF_SIZE];
    int len,pos;
    if(argc <2)
    {
        fprintf(stderr,"Usage:%s <src_file> <dest_file>\n",argv[0]);
        exit(1);
    }
    do
    {
        fps = open(argv[1],O_RDONLY);
        if(fps <0)
        {
            if(errno != EINTR)
            {
                perror("open");
                exit(1);
            }
        }
    }while(fps<0);
    
    while(1)
    {
        len = read(fps,buf,BUF_SIZE);
        if(len < 0)
        {
            if(errno == EINTR)
                continue;
            perror("read()");
            break;
        }
        if(len ==0)
            break;

        pos = 0;
        while(len > 0)
        {
            ret = write(fpd,buf+pos,len);
            sleep(1);
            if(ret <0)
            {
                if(errno == EINTR)
                    continue;
                perror("write()");
                exit(1);
            }
            pos += ret;
            len-=ret;
        }
    
    }

    close(fps);

    return 0;
}

漏桶实现

#include <stdlib.h>
#include <stdio.h>
#include <error.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>


#define CPS 100
#define BUF_SIZE    CPS

static volatile int loop = 0;

static void alrm_handler(int num)
{
    alarm(1);
    loop = 1;
}

int main(int argc,char **argv)
{

    int fps,fpd=1;
    int ret;
    char buf[BUF_SIZE];
    int len,pos;
    if(argc <2)
    {
        fprintf(stderr,"Usage:%s \n",argv[0]);
        exit(1);
    }
    signal(SIGALRM,alrm_handler);
    alarm(1);

    do
    {
        fps = open(argv[1],O_RDONLY);
        if(fps <0)
        {
            if(errno != EINTR)
            {
                perror("open");
                exit(1);
            }
        }
    }while(fps<0);
    
    while(1)
    {
        //while(!loop);
        while(loop == 0)
            pause();   //用于等待一个打断的到来。不加则是忙等。

        loop = 0;
        while(1)
        {
        len = read(fps,buf,BUF_SIZE);
        if(len < 0)
        {
            if(errno == EINTR)
                continue;
            perror("read()");
            break;
        }
        break;
        }
        if(len ==0)
            break;

        pos = 0;
        while(len > 0)
        {
            ret = write(fpd,buf+pos,len);
            if(ret <0)
            {
                if(errno == EINTR)
                    continue;
                perror("write()");
                exit(1);
            }
            pos += ret;
            len-=ret;
        }
    
    }

    close(fps);

    return 0;
}

如果读取的是打印机类的设备,并且当时打印机上面没有数据,那么程序就会一直循环于 read处

所以 漏桶的缺陷就是 如果没有数据的时候 就会一直循环等待,直到有数据,如果忽然来的数据量很大,也不能快速的去读数据,只能慢慢的一秒10个字节的去读n次

令牌桶实现

令牌桶的优势,就是 当没有数据可读的时候,会积攒自己的权限,意思是 如果之前30秒一直没有数据,读空了30秒,那么就存下30个权限,等到有数据的时候,快速使用前面30个权限,快速连续读30次。

#include <stdlib.h>
#include <stdio.h>
#include <error.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>


#define CPS 10
#define BUF_SIZE    CPS
#define BURST 100

static volatile sig_atomic_t  token  = 0; //信号原子类型,保证取值和赋值一定是一条指令去运行的

static void alrm_handler(int num)
{
    alarm(1);
    token++;
    if(token >=  BURST)
        token =BURST;
}

int main(int argc,char **argv)
{

    int fps,fpd=1;
    int ret;
    char buf[BUF_SIZE];
    int len,pos;
    if(argc <2)
    {
        fprintf(stderr,"Usage:%s \n",argv[0]);
        exit(1);
    }
    signal(SIGALRM,alrm_handler);
    alarm(1);

    do
    {
        fps = open(argv[1],O_RDONLY);
        if(fps <0)
        {
            if(errno != EINTR)
            {
                perror("open");
                exit(1);
            }
        }
    }while(fps<0);
    
    while(1)
    {
        while(token <= 0)
            pause();   //用于等待一个打断的到来。不加则是忙等。

        token--; // 操作不原子,可以出现--的时候出现++的情况

        while(1)
        {
        len = read(fps,buf,BUF_SIZE);  //读的时候被信号打断时token会++
        if(len < 0)
        {
            if(errno == EINTR)
                continue;
            perror("read()");
            break;
        }
        break;
        }
        if(len ==0)
            break;

        pos = 0;
        while(len > 0)
        {
            ret = write(fpd,buf+pos,len);
            if(ret <0)
            {
                if(errno == EINTR)
                    continue;
                perror("write()");
                exit(1);
            }
            pos += ret;
            len-=ret;
        }
    
    }

    close(fps);

    return 0;
}

令牌桶实现封装

main.c

#include <stdlib.h>
#include <stdio.h>
#include <error.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>
#include "mytbf.h"


#define CPS 10
#define BUF_SIZE    1024
#define BURST 100


int main(int argc,char **argv)
{

    int fps,fpd=1;
    int ret;
    char buf[BUF_SIZE];
    int len,pos;
    if(argc <2)
    {
        fprintf(stderr,"Usage:%s \n",argv[0]);
        exit(1);
    }
    mytbf_t* tbf;
    tbf = mytbf_init(CPS,BURST);
    if(tbf ==NULL)
    {
        fprintf(stderr,"mytbf_init Error\n");
        exit(1);
    }

    do
    {
        fps = open(argv[1],O_RDONLY);
        if(fps <0)
        {
            if(errno != EINTR)
            {
                perror("open");
                exit(1);
            }
        }
    }while(fps<0);
    
    while(1)
    {
        int size;
        size = mytbf_fetchtoken(tbf,BUF_SIZE);
        if(size <0)
        {
            fprintf(stderr,"mytbf_fetchtoken:%s\n",strerror(-size));
            exit(1);
        }

        while(1)
        {
        len = read(fps,buf,size);  //读的时候被信号打断时token会++
        if(len < 0)
        {
            if(errno == EINTR)
                continue;
            perror("read()");
            break;
        }
        break;
        }

        if(len ==0)
            break;
        if(size -len > 0)   //如果读到的token数比取出来的token数小,就把没用到的token归还。
        {
            mytbf_returntoken(tbf,size -len);
        }
        pos = 0;
        while(len > 0)
        {
            ret = write(fpd,buf+pos,len);
            if(ret <0)
            {
                if(errno == EINTR)
                    continue;
                perror("write()");
                exit(1);
            }
            pos += ret;
            len-=ret;
        }
    
    }

    close(fps);
    mytbf_destroy(tbf);
    return 0;
}

mytbf.h

#ifndef MYTBF_H
#define MYTBF_H

#define MYTBF_MAX 1024

typedef  void mytbf_t;

mytbf_t* mytbf_init(int cps ,int burst);  //C语言中,void*可以赋值给任何类型的指针,任何类型的指针也都可以赋值给void*

int mytbf_fetchtoken(mytbf_t*,int);  //获取token

int mytbf_returntoken(mytbf_t*,int);  //返还token

int mytbf_destroy(mytbf_t*);




#endif

mytbf.c

#include <stdlib.h>
#include <stdio.h>
#include <error.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <signal.h>

#include "mytbf.h"

struct mytbf_st
{
    int cps;
    int burst;
    int token;
    int pos;

};
static struct mytbf_st* job[MYTBF_MAX];
static int inited = 0;

typedef void (*sighandler_t)(int);

static sighandler_t alrm_handler_save;

static int get_free_pos(void)
{
    for(int i=0;i< MYTBF_MAX;i++)
    {
        if(job[i]==NULL)
            return i;
    }
    return -1;
}

static void alrm_handler(int num)
{
    alarm(1);
    for(int i=0;i<MYTBF_MAX;i++)
    {
        if(job[i] != NULL)
        {
            job[i]->token += job[i]->cps;
            if(job[i]->token >job[i]->burst )
            {
                job[i]->token = job[i]->burst;
            }
        }
    }

}

static void module_unload()
{
    signal(SIGALRM,alrm_handler_save ); //关闭alarm注册的行为,还原之前的行为
    alarm(0); //关闭时钟信号
    for(int i=0;i<MYTBF_MAX;i++)
        free(job[i]);
}

static void module_load()
{
    alrm_handler_save = signal(SIGALRM,alrm_handler); //定义新的行为,保存旧的行为
    alarm(1);
    atexit(module_unload);
}

mytbf_t* mytbf_init(int cps ,int burst)  //C语言中,void*可以赋值给任何类型的指针,任何类型的指针也都可以赋值给void*
{
    struct mytbf_st*me;
    int pos;
    if(inited == 0)
    {
        module_load();
        inited = 1;
    }

    pos = get_free_pos();
    if(pos < 0)
        return NULL;

    me = malloc(sizeof(*me));
    if(me == NULL)
        return NULL;

    me->cps = cps;
    me->burst = burst;
    me->token = 0;
    me->pos = pos;
    job[pos] = me;

    return me;

}
static int min(int token,int size)
{
	if(token> size)
		return size;
	return token;
}
int mytbf_fetchtoken(mytbf_t*ptr,int size)  //获取token
{
    if(size <= 0)
        return -EINVAL;  //参数非法
    struct mytbf_st*me = ptr;
    while(me->token <= 0 )  //token为空就等待
        pause();
    
   //  int n = min(me->token,size);
    int n = (me->token>size?size:me->token);
    me->token -= n;
    return n;
}

int mytbf_returntoken(mytbf_t*ptr,int size)  //返还token
{
    if(size<=0)
        return -EINVAL;
    struct mytbf_st*me = ptr;

    me->token+= size;
    if(me->token > me->burst)
        me->token  = me->burst;
    return size;
}

int mytbf_destroy(mytbf_t*ptr)
{
    struct mytbf_st *me;
    me = ptr;
    job[me->pos] = NULL;
    free(ptr);
    return 0;
    
}

makefile

all:mytbf
CFLAGS=-g -Wall
mytbf:main.o mytbf.o
	gcc $^ $(CFLAGS) -o $@ 

clean:
	rm -rf *.o mytbf