目录
进程间通信
进程间通信目的
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程间通信发展
管道
System V进程间通信
POSIX进程间通信
进程间通信分类
管道
匿名管道pipe
命名管道
System V IPC
System V 消息队列
System V 共享内存
System V 信号量
POSIX IPC
消息队列
共享内存
信号量
互斥量
条件变量
读写锁
进程间通信的本质:
让不同的进程看到同一份资源
资源:特定形式的内存空间
资源由操作系统提供 (如果一个进程提供,那么资源属于谁就有问题,如果进程独有就会破坏进程独立性)
进程访问这个空间,进行通信,本质就是访问操作系统,进程代表的时用户,资源从创建、使用、释放——要用系统调用接口
从底层设计,接口设计都要有操作系统独立设计
一般操作系统会有一个独立的通信模块——隶属于文件系统——IPC通信模块——进程间通信室友标准的——systemV(本机内部)&&posix(网络)
管道
什么是管道
管道是Unix中最古老的进程间通信的形式。 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
管道的特征
具有血缘关系的进程进行进程间通信
管道只能单单向通信
父子进程是会进程协同的,同步与互斥的——保护管道文件的数据安全
管道是面向字节流的(不管,一股脑往里放)
管道是基于文件的,而文件的生命周期是随进程的(文件如果没有被关闭,进程结束,文件被操作系统回收)
匿名管道
建立
每个文件会提供三个核心东西:
inode(文件的属性)
file_operators访问底层方法集struct_file里包含一组函数指针指向不同的文件方法
文件页缓冲区
磁盘中文件通过属性和数据块预加载到内存,修改在内存修改,发现数据为脏,进行写入即文件无论读写都要先加载到内存中(文件缓冲页)
文件和进程等级相同(创建子进程时不会拷贝一份文件,文件是操作系统打开的),但是文件描述符表指向的内容是一样的
管道是内存级文件
文件有引用计数不用担心一个关了文件直接关闭
管道把同一个文件以读写方式打开,要读关写,要写关读
父子兄弟关系可以通信
接口
open打开的是磁盘级文件(需要提供方文件路径),不能用于管道通信
#include<unistd.h>
功能:创建⼀⽆名管道
原型
int pipe(int fd[2]);
参数:
fd:⽂件描述符数组,其中fd[0]表⽰读端, fd[1]表⽰写端
返回值:成功返回0,失败返回错误代码
pipe以读写方式把文件打开并把文件struct_file文件缓冲区创建好
pipefd【0】读 pipefd【1】写
分别以读写方式打开的文件描述符数字带出来让用户使用
创建子进程会拷贝file_struct,不会拷贝struct file(进程与文件具有相同地位)
父进程创建管道
#include <iostream>
#include <unistd.h>
#define N 2
using namespace std;
int main()
{
int pipefd[N] = {0};
int n = pipe(pipefd);
if (n < 0)
return 1;
cout << "pipefd[0]: " << pipefd[0] << " , pipefd[1]: " << pipefd[1] << endl;
return 0;
}
测试管道读写
#include <iostream>
#include <cstdio>
#include <string>
#include <cstring>
#include <cstdlib> //stdlib.h
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#define N 2
#define NUM 1024
using namespace std;
void Writer(int wfd)
{
string s = "hello I am child";
pid_t self = getpid();
int number = 0;
char buffer[NUM];
while(true)
{
buffer[0] = 0; // 字符串清空, 只是为了提醒阅读代码的人,我把这个数组当做字符串了
snprintf(buffer,sizeof(buffer),"%s-%d-%d",s.c_str(),self,number++);
cout<<buffer<<endl;
sleep(1);
}
}
void Reader(int rfd)
{
}
int main()
{
int pipefd[N] = {0};
int n = pipe(pipefd);
if (n < 0)
return 1;
// cout << "pipefd[0]: " << pipefd[0] << " , pipefd[1]: " << pipefd[1] << endl;
//child->w father->r
pid_t id = fork();
if(id < 0 )
{
return 2;
}
if(id == 0)
{
close(pipefd[0]);
// IPC code
Writer(pipefd[1]);
}
close(pipefd[1]);
//IPC code
Reader(pipefd[0]);
pid_t rid = waitpid(id,nullptr,0);
if(rid<0)
{
return 3;
}
return 0;
}
父子进程读写
#include <iostream>
#include <cstdio>
#include <string>
#include <cstring>
#include <cstdlib> //stdlib.h
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#define N 2
#define NUM 1024
using namespace std;
void Writer(int wfd)
{
//构建发送字符串
string s = "hello I am child";
pid_t self = getpid();
int number = 0;
char buffer[NUM];
while(true)
{
buffer[0] = 0; // 字符串清空, 只是为了提醒阅读代码的人,我把这个数组当做字符串了
snprintf(buffer,sizeof(buffer),"%s-%d-%d",s.c_str(),self,number++);
//cout<<buffer<<endl;
//发送/写入给父进程
write(wfd,buffer,strlen(buffer));
sleep(1);
}
}
void Reader(int rfd)
{
char buffer[NUM];
while(true)
{
buffer[0] = 0;
ssize_t n = read(rfd,buffer,sizeof(buffer));
if(n>0)
{
buffer[n] = 0;
cout<<"father get amessage["<<getpid()<<"]"<<buffer<<endl;
}
}
}
int main()
{
int pipefd[N] = {0};
int n = pipe(pipefd);
if (n < 0)
return 1;
// cout << "pipefd[0]: " << pipefd[0] << " , pipefd[1]: " << pipefd[1] << endl;
//child->w father->r
pid_t id = fork();
if(id < 0 )
{
return 2;
}
if(id == 0)
{
close(pipefd[0]);
// IPC code
Writer(pipefd[1]);
}
close(pipefd[1]);
//IPC code
Reader(pipefd[0]);
pid_t rid = waitpid(id,nullptr,0);
if(rid<0)
{
return 3;
}
return 0;
}
操作系统不相信用户,必须调用系统调用(资源由系统提供 )
两次拷贝:
buffer(用户级缓冲区)拷贝到文件缓冲区调用read拷贝到父进程应用层缓冲区
管道固定大小
void Writer(int wfd)
{
//构建发送字符串
string s = "hello I am child";
pid_t self = getpid();
int number = 0;
char buffer[NUM];
while(true)
{
//发送/写入给父进程
char c = 'a';
write(wfd,&c,1);
number++;
cout<<number<<endl;
}
}
void Reader(int rfd)
{
sleep(50);
64kb
当进程间通信写入数据大小小于PIPE_BUFF时整体读入
只要小于PIPE_BUFF就是原子的
管道通信四种情况(根据上边代码修改即可看到现象)
读写端正常,管道为空,读端要阻塞
读写端正常,管道如果被写满,写端就要阻塞
读端正常度,写端关闭,读端就会读到0,表明读到了文件(pipe)的结尾,不会被阻塞
写端正常,读端关闭,操作系统要通过信号杀掉正在写入的进程(因为操作系统是不会做抵消、浪费等类似的工作,如果做了就是操作系统的bug)
使用管道实现一个简易的进程池
输入参数:const &
输出参数: *
输入输出参数: &
文件描述符可以被子进程继承
这个代码会使得读端永远是3,而写端会是456789……(第一次34,写关3留4,下一次申请就是35)
下边代码则是使得原本0从键盘接受现在变为从管道接受(淡化了管道概念,可以不用传参)
边关边退时会使得子进程继承父进程写端,文件不会关闭,进行阻塞
for(const auto &c : channels)
{
close(c._cmdfd);
waitpid(c._slaverid, nullptr, 0);
}
负载均衡:
随机数
轮询
初始化时管道建立好还在往后创建进程,文件描述符有问题,子进程会继承父进程的写端
代码
ProcessPool.cc
#include "Task.hpp"
#include <string>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/wait.h>
const int processnum = 10;
std::vector<task_t> tasks;
// 先描述
class channel
{
public:
channel(int cmdfd, int slaverid, const std::string &processname)
:_cmdfd(cmdfd), _slaverid(slaverid), _processname(processname)
{}
public:
int _cmdfd; // 发送任务的文件描述符
pid_t _slaverid; // 子进程的PID
std::string _processname; // 子进程的名字 -- 方便我们打印日志
// int _cmdcnt;
};
void slaver()
{
// read(0)
while(true)
{
int cmdcode = 0;
int n = read(0, &cmdcode, sizeof(int)); // 如果父进程不给子进程发送数据呢??阻塞等待!
if(n == sizeof(int))
{
//执行cmdcode对应的任务列表
std::cout <<"slaver say@ get a command: "<< getpid() << " : cmdcode: " << cmdcode << std::endl;
if(cmdcode >= 0 && cmdcode < tasks.size()) tasks[cmdcode]();
}
if(n == 0) break;
}
}
// 输入:const &
// 输出:*
// 输入输出:&
void InitProcessPool(std::vector<channel> *channels)
{
// version 2: 确保每一个子进程都只有一个写端
std::vector<int> oldfds;
for(int i = 0; i < processnum; i++)
{
int pipefd[2]; // 临时空间
int n = pipe(pipefd);
assert(!n); // 演示就可以
(void)n;
pid_t id = fork();
if(id == 0) // child
{
std::cout << "child: " << getpid() << " close history fd: ";
for(auto fd : oldfds) {
std::cout << fd << " ";
close(fd);
}
std::cout << "\n";
close(pipefd[1]);
dup2(pipefd[0], 0);
close(pipefd[0]);
slaver();
std::cout << "process : " << getpid() << " quit" << std::endl;
// slaver(pipefd[0]);
exit(0);
}
// father
close(pipefd[0]);
// 添加channel字段了
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1], id, name));
oldfds.push_back(pipefd[1]);
sleep(1);
}
}
void Debug(const std::vector<channel> &channels)
{
// test
for(const auto &c :channels)
{
std::cout << c._cmdfd << " " << c._slaverid << " " << c._processname << std::endl;
}
}
void Menu()
{
std::cout << "################################################" << std::endl;
std::cout << "# 1. 刷新日志 2. 刷新出来野怪 #" << std::endl;
std::cout << "# 3. 检测软件是否更新 4. 更新用的血量和蓝量 #" << std::endl;
std::cout << "# 0. 退出 #" << std::endl;
std::cout << "#################################################" << std::endl;
}
void ctrlSlaver(const std::vector<channel> &channels)
{
int which = 0;
// int cnt = 5;
while(true)
{
int select = 0;
Menu();
std::cout << "Please Enter@ ";
std::cin >> select;
if(select <= 0 || select >= 5) break;
// select > 0&& select < 5
// 1. 选择任务
// int cmdcode = rand()%tasks.size();
int cmdcode = select - 1;
// 2. 选择进程
// int processpos = rand()%channels.size();
std::cout << "father say: " << " cmdcode: " <<
cmdcode << " already sendto " << channels[which]._slaverid << " process name: "
<< channels[which]._processname << std::endl;
// 3. 发送任务
write(channels[which]._cmdfd, &cmdcode, sizeof(cmdcode));
which++;
which %= channels.size();
// cnt--;
// sleep(1);
}
}
void QuitProcess(const std::vector<channel> &channels)
{
for(const auto &c : channels){
close(c._cmdfd);
waitpid(c._slaverid, nullptr, 0);
}
// version1
// int last = channels.size()-1;
// for(int i = last; i >= 0; i--)
// {
// close(channels[i]._cmdfd);
// waitpid(channels[i]._slaverid, nullptr, 0);
// }
// for(const auto &c : channels) close(c._cmdfd);
// // sleep(5);
// for(const auto &c : channels) waitpid(c._slaverid, nullptr, 0);
// // sleep(5);
}
int main()
{
LoadTask(&tasks);
srand(time(nullptr)^getpid()^1023); // 种一个随机数种子
// 在组织
std::vector<channel> channels;
// 1. 初始化 --- bug?? -- 找一下这个问题在哪里?然后提出一些解决方案!
InitProcessPool(&channels);
// Debug(channels);
// 2. 开始控制子进程
ctrlSlaver(channels);
// 3. 清理收尾
QuitProcess(channels);
return 0;
}
Task.hpp
#pragma once
#include <iostream>
#include <vector>
typedef void (*task_t)();
void task1()
{
std::cout << "lol 刷新日志" << std::endl;
}
void task2()
{
std::cout << "lol 更新野区,刷新出来野怪" << std::endl;
}
void task3()
{
std::cout << "lol 检测软件是否更新,如果需要,就提示用户" << std::endl;
}
void task4()
{
std::cout << "lol 用户释放技能,更新用的血量和蓝量" << std::endl;
}
void LoadTask(std::vector<task_t> *tasks)
{
tasks->push_back(task1);
tasks->push_back(task2);
tasks->push_back(task3);
tasks->push_back(task4);
}
命名管道
毫不相干的进程进行进程间通信
文件只会打开一个,缓冲区等都是共享的(都两个进程打开同一个文件了,不在意错乱了)
操作系统识别到是管道就不进行刷盘,进行内存间通信(内存级文件)
如何知道打开的是同一个文件:同路径下同一个文件名即路径+文件名(具有唯一性)
命名管道通过使用路径+文件名的方式让不同进程看到同一份文件资源进而实现进程间通信
mkfifo创建管道
两个进程通信创建一个管道即可(某一个进程)
名字加权限
两个终端分别执行代码
while :; do echo "hello" ; sleep 1;done >> myfifo
cat < myfifo
发现文件大小始终为0
删除管道文件
unlink
注意:
服务端打开管道后,需要等待写入方打开后才会打开文件,往后执行(读端也一样)
int fd = open(FIFO_FILE, O_WRONLY);
if(fd < 0)
{
perror("open");
exit(FIFO_OPEN_ERR);
}
cout << "client open file done" << endl;
string line;
while(true)
{
cout << "Please Enter@ ";
getline(cin, line);
write(fd, line.c_str(), line.size());
}
close(fd);
return 0;
int fd = open(FIFO_NAME,O_RDONLY);
if(fd<0)
{
perror("open");
exit(2);
}
cout<<"server open file done"<<endl;
可变参数
va_list(char*)
使用...参数一定要压栈,产生临时变量
(int n ,...)
int sum(int n, ...)
{
va_list s; // char*
va_start(s, n);
int sum = 0;
while(n)
{
sum += va_arg(s, int); // printf("hello %d, hello %s, hello %c, hello %d,", 1, "hello", 'c', 123);
n--;
}
va_end(s); //s = NULL
return sum;
}
形参要进行实例化,调用的时候就要形成他的栈帧结构,从右向左依次入栈,形成栈结构
示例代码(日志)
log.hpp
#pragma once
#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#define SIZE 1024
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define Onefile 2
#define Classfile 3
#define LogFile "log.txt"
class Log
{
public:
Log()
{
printMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
printMethod = method;
}
std::string levelToString(int level)
{
switch (level)
{
case Info:
return "Info";
case Debug:
return "Debug";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "None";
}
}
// void logmessage(int level, const char *format, ...)
// {
// time_t t = time(nullptr);
// struct tm *ctime = localtime(&t);
// char leftbuffer[SIZE];
// snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
// ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
// ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
// // va_list s;
// // va_start(s, format);
// char rightbuffer[SIZE];
// vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
// // va_end(s);
// // 格式:默认部分+自定义部分
// char logtxt[SIZE * 2];
// snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
// // printf("%s", logtxt); // 暂时打印
// printLog(level, logtxt);
// }
void printLog(int level, const std::string &logtxt)
{
switch (printMethod)
{
case Screen:
std::cout << logtxt << std::endl;
break;
case Onefile:
printOneFile(LogFile, logtxt);
break;
case Classfile:
printClassFile(level, logtxt);
break;
default:
break;
}
}
void printOneFile(const std::string &logname, const std::string &logtxt)
{
std::string _logname = path + logname;
int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt"
if (fd < 0)
return;
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void printClassFile(int level, const std::string &logtxt)
{
std::string filename = LogFile;
filename += ".";
filename += levelToString(level); // "log.txt.Debug/Warning/Fatal"
printOneFile(filename, logtxt);
}
~Log()
{
}
void operator()(int level, const char *format, ...)
{
time_t t = time(nullptr);
struct tm *ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
va_end(s);
// 格式:默认部分+自定义部分
char logtxt[SIZE * 2];
snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
// printf("%s", logtxt); // 暂时打印
printLog(level, logtxt);
}
private:
int printMethod;
std::string path;
};
serve.cc
#include "comm.hpp"
#include "log.hpp"
using namespace std;
// 管理管道文件
int main()
{
Init init;
Log log;
// log.Enable(Onefile);
log.Enable(Onefile);
// 打开管道
int fd = open(FIFO_FILE, O_RDONLY); // 等待写入方打开之后,自己才会打开文件,向后执行, open 阻塞了!
if (fd < 0)
{
log(Fatal, "error string: %s, error code: %d", strerror(errno), errno);
exit(FIFO_OPEN_ERR);
}
log(Info, "server open file done, error string: %s, error code: %d", strerror(errno), errno);
log(Warning, "server open file done, error string: %s, error code: %d", strerror(errno), errno);
log(Fatal, "server open file done, error string: %s, error code: %d", strerror(errno), errno);
log(Debug, "server open file done, error string: %s, error code: %d", strerror(errno), errno);
// 开始通信
while (true)
{
char buffer[1024] = {0};
int x = read(fd, buffer, sizeof(buffer));
if (x > 0)
{
buffer[x] = 0;
cout << "client say# " << buffer << endl;
}
else if (x == 0)
{
log(Debug, "client quit, me too!, error string: %s, error code: %d", strerror(errno), errno);
break;
}
else
break;
}
close(fd);
return 0;
}
comm.hpp
#pragma once
#include <iostream>
#include <string>
#include <cerrno>
#include <cstring>
#include <cstdlib>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#define FIFO_FILE "./myfifo"
#define MODE 0664
enum
{
FIFO_CREATE_ERR = 1,
FIFO_DELETE_ERR,
FIFO_OPEN_ERR
};
class Init
{
public:
Init()
{
// 创建管道
int n = mkfifo(FIFO_FILE, MODE);
if (n == -1)
{
perror("mkfifo");
exit(FIFO_CREATE_ERR);
}
}
~Init()
{
int m = unlink(FIFO_FILE);
if (m == -1)
{
perror("unlink");
exit(FIFO_DELETE_ERR);
}
}
};
client.cc
#include <iostream>
#include "comm.hpp"
using namespace std;
int main()
{
int fd = open(FIFO_FILE, O_WRONLY);
if(fd < 0)
{
perror("open");
exit(FIFO_OPEN_ERR);
}
cout << "client open file done" << endl;
string line;
while(true)
{
cout << "Please Enter@ ";
getline(cin, line);
write(fd, line.c_str(), line.size());
}
close(fd);
return 0;
}
Makefile
.PHONY:all
all:server client
server:server.cc
g++ -o $@ $^ -g -std=c++11
client:client.cc
g++ -o $@ $^ -g -std=c++11
.PHONY:clean
clean:
rm -f server client
systemV共享内存
共享内存区是最快的IPC形式。一旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到 内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据
示意图
查看系统内IPC资源的指令
共享内存数据结构
struct shmid_ds {
struct ipc_perm shm_perm; /* operation perms */
int shm_segsz; /* size of segment (bytes) */
__kernel_time_t shm_atime; /* last attach time */
__kernel_time_t shm_dtime; /* last detach time */
__kernel_time_t shm_ctime; /* last change time */
__kernel_ipc_pid_t shm_cpid; /* pid of creator */
__kernel_ipc_pid_t shm_lpid; /* pid of last operator */
unsigned short shm_nattch; /* no. of current attaches */
unsigned short shm_unused; /* compatibility */
void *shm_unused2; /* ditto - used by DIPC */
void *shm_unused3; /* unused */
};
共享内存的创建
依旧进程地址空间,物理内存中开辟一段空间,由页表映射到共享区,给应用层返回连续空间(虚拟)起始地址,多个进程分别映射即可看到同一份空间(共享内存)
key的介绍
key(在内核中具有唯一性,能够让不同进程进行唯一性标识)来保证不同进程看到同一个共享内存,知道这个共享内存存不存在,链表结构管理,遍历发现没有相同的key就创建
如何生成key
将pathname和proj_id用一套算法生成一个重复率低的key
注意
操作系统不清除哪两个进程之间要通信,所以这个key要由用户创建 ,第一个key自动生成,用户约定好的,两个生成的key一样(看到的pathname和proj_id一样)
key:操作系统内标定唯一性,创建好共享内存后就不关心key了
shmid:只在你的进程内,标定资源的唯一性
IPC_CREAT 创建一个共享内存(不存在创建,存在直接获取)
IPC_CREAT|IPC_EXCL 不存在就创建,存在就返回错误返回,确保我们申请成功了,这个共享内存一定是新的
IPC_EXCL不单独使用
共享内存大小一般设置为4096,如果4097,会开4096*2的大小,但是用户只能用4097大小
shmget函数
功能:用来创建共享内存 原型 int shmget(key_t key, size_t size, int shmflg); 参数 key:这个共享内存段名字 size:共享内存大小 shmflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的 返回值:成功返回一个非负整数,即该共享内存段的标识码;失败返回-1
示例代码
挂接 /去关联
shmat函数
功能:将共享内存段连接到进程地址空间 原型 void *shmat(int shmid, const void *shmaddr, int shmflg);
参数 shmid: 共享内存标识 shmaddr:指定连接的地址 shmflg:它的两个可能取值是SHM_RND和SHM_RDONLY 返回值:成功返回一个指针,指向共享内存第一个节;失败返回-1
shmdt函数
功能:将共享内存段与当前进程脱离 原型 int shmdt(const void *shmaddr); 参数 shmaddr: 由shmat所返回的指针 返回值:成功返回0;失败返回-1 注意:将共享内存段与当前进程脱离不等于删除共享内存段
说明:
shmaddr为NULL,核心自动选择一个地址 shmaddr不为NULL且shmflg无SHM_RND标记,则以shmaddr为连接地址。 shmaddr不为NULL且shmflg设置了SHM_RND标记,则连接的地址会自动向下调整为SHMLBA的整数倍。公式:shmaddr - (shmaddr % SHMLBA) shmflg=SHM_RDONLY,表示连接操作用来只读共享内存
共享内存的释放
当前进程释放
首先要去关联,然后释放共享内存
ipcrm -m shmid(key是给操作系统用的,用户层用shmid)
shmctl函数
功能:用于控制共享内存 原型 int shmctl(int shmid, int cmd, struct shmid_ds *buf); 参数 shmid:由shmget返回的共享内存标识码 cmd:将要采取的动作(有三个可取值) buf:指向一个保存着共享内存的模式状态和访问权限的数据结构 返回值:成功返回0;失败返回-1
cmd:
共享内存生命周期
共享内存生命周期是随内核的,用户不主动关闭会一直存在,除非内核重启(不主动释放会有内存泄漏问题)
注意
共享内存没有同步互斥之类的保护机制
共享内存是所有进程间通信中速度最快的(拷贝少)
共享内存中所有数据由用户自己维护
示例代码:
共享内存可以通过管道来同步
comm.hpp
#include <sys/stat.h>
#include "log.hpp"
using namespace std;
Log log;
// 共享内存的大小一般建议是4096的整数倍
// 4097,实际上操作系统给你的是4096*2的大小
const int size = 4096;
const string pathname="/home/whb";
const int proj_id = 0x6666;
key_t GetKey()
{
key_t k = ftok(pathname.c_str(), proj_id);
if(k < 0)
{
log(Fatal, "ftok error: %s", strerror(errno));
exit(1);
}
log(Info, "ftok success, key is : 0x%x", k);
return k;
}
int GetShareMemHelper(int flag)
{
key_t k = GetKey();
int shmid = shmget(k, size, flag);
if(shmid < 0)
{
log(Fatal, "create share memory error: %s", strerror(errno));
exit(2);
}
log(Info, "create share memory success, shmid: %d", shmid);
return shmid;
}
int CreateShm()
{
return GetShareMemHelper(IPC_CREAT | IPC_EXCL | 0666);
}
int GetShm()
{
return GetShareMemHelper(IPC_CREAT);
}
#define FIFO_FILE "./myfifo"
#define MODE 0664
enum
{
FIFO_CREATE_ERR = 1,
FIFO_DELETE_ERR,
FIFO_OPEN_ERR
};
class Init
{
public:
Init()
{
// 创建管道
int n = mkfifo(FIFO_FILE, MODE);
if (n == -1)
{
perror("mkfifo");
exit(FIFO_CREATE_ERR);
}
}
~Init()
{
int m = unlink(FIFO_FILE);
if (m == -1)
{
perror("unlink");
exit(FIFO_DELETE_ERR);
}
}
};
#endif
log.hpp
#pragma once
#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#define SIZE 1024
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define Onefile 2
#define Classfile 3
#define LogFile "log.txt"
class Log
{
public:
Log()
{
printMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
printMethod = method;
}
std::string levelToString(int level)
{
switch (level)
{
case Info:
return "Info";
case Debug:
return "Debug";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "None";
}
}
// void logmessage(int level, const char *format, ...)
// {
// time_t t = time(nullptr);
// struct tm *ctime = localtime(&t);
// char leftbuffer[SIZE];
// snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
// ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
// ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
// // va_list s;
// // va_start(s, format);
// char rightbuffer[SIZE];
// vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
// // va_end(s);
// // 格式:默认部分+自定义部分
// char logtxt[SIZE * 2];
// snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
// // printf("%s", logtxt); // 暂时打印
// printLog(level, logtxt);
// }
void printLog(int level, const std::string &logtxt)
{
switch (printMethod)
{
case Screen:
std::cout << logtxt << std::endl;
break;
case Onefile:
printOneFile(LogFile, logtxt);
break;
case Classfile:
printClassFile(level, logtxt);
break;
default:
break;
}
}
void printOneFile(const std::string &logname, const std::string &logtxt)
{
std::string _logname = path + logname;
int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt"
if (fd < 0)
return;
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void printClassFile(int level, const std::string &logtxt)
{
std::string filename = LogFile;
filename += ".";
filename += levelToString(level); // "log.txt.Debug/Warning/Fatal"
printOneFile(filename, logtxt);
}
~Log()
{
}
void operator()(int level, const char *format, ...)
{
time_t t = time(nullptr);
struct tm *ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
va_end(s);
// 格式:默认部分+自定义部分
char logtxt[SIZE * 2];
snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer);
// printf("%s", logtxt); // 暂时打印
printLog(level, logtxt);
}
private:
int printMethod;
std::string path;
};
processa.cc
#include "comm.hpp"
extern Log log;
int main()
{
Init init;
int shmid = CreateShm();
char *shmaddr = (char*)shmat(shmid, nullptr, 0);
// ipc code 在这里!!
// 一旦有人把数据写入到共享内存,其实我们立马能看到了!!
// 不需要经过系统调用,直接就能看到数据了!
int fd = open(FIFO_FILE, O_RDONLY); // 等待写入方打开之后,自己才会打开文件,向后执行, open 阻塞了!
if (fd < 0)
{
log(Fatal, "error string: %s, error code: %d", strerror(errno), errno);
exit(FIFO_OPEN_ERR);
}
struct shmid_ds shmds;
while(true)
{
char c;
ssize_t s = read(fd, &c, 1);
if(s == 0) break;
else if(s < 0) break;
cout << "client say@ " << shmaddr << endl; //直接访问共享内存
sleep(1);
shmctl(shmid, IPC_STAT, &shmds);
cout << "shm size: " << shmds.shm_segsz << endl;
cout << "shm nattch: " << shmds.shm_nattch << endl;
printf("shm key: 0x%x\n", shmds.shm_perm.__key);
cout << "shm mode: " << shmds.shm_perm.mode << endl;
}
shmdt(shmaddr);
shmctl(shmid, IPC_RMID, nullptr);
close(fd);
return 0;
}
processb.cc
#include "comm.hpp"
int main()
{
int shmid = GetShm();
char *shmaddr = (char*)shmat(shmid, nullptr, 0);
int fd = open(FIFO_FILE, O_WRONLY); // 等待写入方打开之后,自己才会打开文件,向后执行, open 阻塞了!
if (fd < 0)
{
log(Fatal, "error string: %s, error code: %d", strerror(errno), errno);
exit(FIFO_OPEN_ERR);
}
// 一旦有了共享内存,挂接到自己的地址空间中,你直接把他当成你的内存空间来用即可!
// 不需要调用系统调用
// ipc code
while(true)
{
cout << "Please Enter@ ";
fgets(shmaddr, 4096, stdin);
write(fd, "c", 1); // 通知对方
}
shmdt(shmaddr);
close(fd);
return 0;
}
makefile
.PHONY:all
all:processa processb
processa:processa.cc
g++ -o $@ $^ -g -std=c++11
processb:processb.cc
g++ -o $@ $^ -g -std=c++11
.PHONY:clean
clean:
rm -f processa processb
消息队列
消息队列(Message Queue)是一种进程间通信或同一进程内不同线程间的通信方式,它提供了异步通信协议,允许发送者和接收者不需要同时与队列交互。消息存储在队列中,直到接收者处理它们。
接口
消息队列的接口基本与共享内存相似
IPC资源的管理
管理操作系统内所有IPC资源,先描述,将对资源增删查改转换成对数组的增删查改,每一种ipc资源只是把结构体中第一个字段指针传到数组里,数组下标是shmid
perm里添加标志位,表明是哪一种ipc资源,方便地址强转去访问
信号量
信号量也是进程间通信一部分
原因·:
通信不仅仅是通信数据,互相协同也是
要协同,本质也是通信,信号量首先要被所有的通信进程看到
数据不一致问题
AB两个进程,看到同一份资源,共享资源,如果不加保护,会导致数据不一样问题
A在写的时候,B也在读取,写了一部分便被读取,就会导致数据不一致问题(hello Linux只读取到hello)
互斥
加锁——互斥访问,使得任何时刻只允许一个执行流访问共享资源(互斥)
临界资源
共享的,任何时刻只允许一个执行流访问(就是执行访问代码)的资源(一般是内存空间)
临界区
访问临界资源的代码叫做临界区(不是所有代码都访问临界资源)
理解信号量
本质是一把计数器,,描述临界资源中资源数量的多少
临界资源被分为多个小块,每个进程访问的是一小块而不是整块临界资源
防止多个执行流访问同一块资源
引入一个计数器,每申请一块计数器就减减,当计数器小于等于0时,资源就被申请完了,再有执行流申请就不给了。
每个执行流想访问共享内存中的一部分时,不是直接访问,而是先申请计数器资源,申请成功后就有了访问资源的权限,但是还没有访问,申请计数器资源是对资源的预定机制,计数器可以有效保证进入共享资源的执行流数量
我们把值只能为1,0的两态计数器叫做二元信号量,本质是一个锁
设置信号量:semop
释放信号量:semctl
mmap函数(共享内存)