TCP socket API 详解
下面介绍程序中用到的socket API,这些函数都在sys/socket.h中。
socket():
socket()打开一个网络通讯端口,如果成功的话,就像open()一样返回一个文件描述符;
应用程序可以像读写文件一样用read/write在网络上收发数据;
如果socket()调用出错则返回-1;
对于IPv4, family参数指定为AF_INET;
对于TCP协议,type参数指定为SOCK_STREAM, 表示面向流的传输协议
protocol参数的介绍从略,指定为0即可。
bind():
服务器程序所监听的网络地址和端口号通常是固定不变的,客户端程序得知服务器程序的地址和端口号后
就可以向服务器发起连接; 服务器需要调用bind绑定一个固定的网络地址和端口号;
bind()成功返回0,失败返回-1。
bind()的作用是将参数sockfd和myaddr绑定在一起, 使sockfd这个用于网络通讯的文件描述符监听
myaddr所描述的地址和端口号;
前面讲过,struct sockaddr *是一个通用指针类型,myaddr参数实际上可以接受多种协议的sockaddr结构体,而它们的长度各不相同,所以需要第三个参数addrlen指定结构体的长度;
我们的程序中对myaddr参数是这样初始化的:
1. 将整个结构体清零;
2. 设置地址类型为AF_INET;
3. 网络地址为INADDR_ANY, 这个宏表示本地的任意IP地址,因为服务器可能有多个网卡,每个网卡也可能绑
定多个IP 地址, 这样设置可以在所有的IP地址上监听,直到与某个客户端建立了连接时才确定下来到底用
哪个IP 地址;
4. 端口号为SERV_PORT, 我们定义为9999;
listen():
listen()声明sockfd处于监听状态, 并且最多允许有backlog个客户端处于连接等待状态, 如果接收到更多
的连接请求就忽略, 这里设置不会太大(一般是5), 具体细节同学们课后深入研究;
listen()成功返回0,失败返回-1;
accept():
三次握手完成后, 服务器调用accept()接受连接;
如果服务器调用accept()时还没有客户端的连接请求,就阻塞等待直到有客户端连接上来;
addr是一个传出参数,accept()返回时传出客户端的地址和端口号;
如果给addr 参数传NULL,表示不关心客户端的地址;
addrlen参数是一个传入传出参数(value-result argument), 传入的是调用者提供的, 缓冲区addr的长度
以避免缓冲区溢出问题, 传出的是客户端地址结构体的实际长度(有可能没有占满调用者提供的缓冲区);
我们的服务器程序结构是这样的:
理解accept的返回值:
饭店拉客的例子
步骤 | 饭店场景 | accept() 对应行为 |
---|---|---|
1 | 饭店开门营业(bind() + listen() ) |
服务器启动,监听端口 |
2 | 顾客在门口排队(客户端 connect() ) |
客户端发起连接请求 |
3 | 服务员(accept() )出来接待 |
accept() 从连接队列取出一个请求 |
4 | 服务员带顾客进单间(返回 new_socket ) |
accept() 返回一个新的 socket 用于通信 |
5 | 单间专属服务(recv() /send() ) |
用 new_socket 和客户端通信 |
6 | 其他顾客继续排队 | 服务器继续 accept() 处理新连接 |
accept
的返回值
成功时:
返回一个新的 socket(
new_socket
),代表一个独立的连接通道(就像单间)。原
server_socket
仍然保持监听(饭店大门继续开放,不影响新顾客进来)。后续的
recv()
/send()
都通过new_socket
进行(专属服务)。失败时:
返回-1
(并设置errno
),比如:
ENOMEM
:饭店没位置了(内存不足)。
EINTR
:服务员被叫走了(信号中断)。
EBADF
:饭店大门坏了(server_socket
无效)。
connect
客户端需要调用connect()连接服务器;
connect和bind的参数形式一致, 区别在于bind的参数是自己的地址, 而connect的参数是对方的地址;
connect()成功返回0,出错返回-1;
对一个套接字进行多线程的并发的读和写,但是不能同时读
发送和接收是分开的
传输层协议 有连接 可靠传输 面向字节流
没人访问服务器就一直等待(被动的),不能退出,必须随时随地应对客户端的请求
listen:将套接字设置为监听状态
netstat -nltp
n能显示成数字的就显示成数字
l:listen状态
t:tcp
p:显示对应的进程
tcp面向链接,在进行通信之前要把链接建立起来
accept:成功返回整数的文件描述符,失败-1
TCP程序
本地回环(Loopback)是什么?
定义:回环地址(通常是
127.0.0.1
)是操作系统提供的虚拟接口,用于本机内部通信。特点:
绑定到
127.0.0.1
的服务只能通过本机访问(如curl 127.0.0.1
)。绑定到
0.0.0.0
的服务可监听所有网卡(包括本地IP和公网IP)。
多个套接字
被监听的套接字从底层把新的连接获取上来
真正提供io、通讯服务的是accept返回的套接字
telnet:可以完成指定服务的远程登录telnet ip 端口号
在套接字中,正常的通信内容我们所用的接口会默认给主机序列转网络序列
客户端要绑定ip端口号,但是不是显示的,由操作系统随机选择
tcp面向连接,所以客户端要连接,不用listen、accept,要connect,客户端在发起connect时进行自动随机bind,
要传入服务端的ip和端口号
子进程再创建子进程成功直接退出,由孙子进程和父进程并发访问
偶发情况
当向文件描述符里写时,如果双方链接被释放掉了,程序会出现问题(类似管道写端没了,读端直接被操作系统关闭)
防止向已经被关闭的文件描述符写入:signal(SIGPIPE,SIG_IGN),防止程序因向已关闭的连接写入数据而意外终止
守护进程
int daemon()
linux一个用户创建一个session(会话),每个session一个bash
./process &后台启动
jobs查任务
fg+几号任务:将这个任务提到前台
bg+几号任务:将放到后台的进程重新拿到前台
如果一个前台任务暂停了,系统要把shell提到前台来,把暂停进程自动放到后台
在命令行中,前台进程必须要存在,否则所有进程全都是后台,就没有进程从键盘获取输入,就没有反应了
PGID:进程组id,共用进程组组长pid
任务要指派给进程组
多个任务(进程组),在同一个session内启动的sid一样
守护进程(本质是孤儿进程),防止在bash退出后进程组也退出
创建守护进程不能是组长 :if(fork()>0) exit(0); setsid();
/dev/null:进行垃圾处理
日志与守护要在同一个目录(2.50)
main
#include "TcpServer.hpp"
#include <iostream>
#include <memory>
void Usage(std::string proc)
{
std::cout << "\n\rUsage: " << proc << " port[1024+]\n" << std::endl;
}
// 下节课:守护进程化
//./tcpserver 8080
int main(int argc, char *argv[])
{
if(argc != 2)
{
Usage(argv[0]);
exit(UsageError);
}
uint16_t port = std::stoi(argv[1]);
lg.Enable(Classfile);
// std::unique_ptr<TcpServer> tcp_svr(new TcpServer(port, "127.0.0.1"));
std::unique_ptr<TcpServer> tcp_svr(new TcpServer(port));
tcp_svr->InitServer();
tcp_svr->Start();
return 0;
}
tcpserve
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <signal.h>
#include "Log.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Daemon.hpp"
const int defaultfd = -1;
const std::string defaultip = "0.0.0.0";
const int backlog = 10; // 但是一般不要设置的太大
extern Log lg;
enum
{
UsageError = 1,
SocketError,
BindError,
ListenError,
};
class TcpServer;
class ThreadData
{
public:
ThreadData(int fd, const std::string &ip, const uint16_t &p, TcpServer *t): sockfd(fd), clientip(ip), clientport(p), tsvr(t)
{}
public:
int sockfd;
std::string clientip;
uint16_t clientport;
TcpServer *tsvr;
};
class TcpServer
{
public:
TcpServer(const uint16_t &port, const std::string &ip = defaultip) : listensock_(defaultfd), port_(port), ip_(ip)
{
}
void InitServer()
{
listensock_ = socket(AF_INET, SOCK_STREAM, 0);
if (listensock_ < 0)
{
lg(Fatal, "create socket, errno: %d, errstring: %s", errno, strerror(errno));
exit(SocketError);
}
lg(Info, "create socket success, listensock_: %d", listensock_);
int opt = 1;
setsockopt(listensock_, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, &opt, sizeof(opt)); // 防止偶发性的服务器无法进行立即重启(tcp协议的时候再说)
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port_);
inet_aton(ip_.c_str(), &(local.sin_addr));
// local.sin_addr.s_addr = INADDR_ANY;
if (bind(listensock_, (struct sockaddr *)&local, sizeof(local)) < 0)
{
lg(Fatal, "bind error, errno: %d, errstring: %s", errno, strerror(errno));
exit(BindError);
}
lg(Info, "bind socket success, listensock_: %d", listensock_);
// Tcp是面向连接的,服务器一般是比较“被动的”,服务器一直处于一种,一直在等待连接到来的状态
if (listen(listensock_, backlog) < 0)
{
lg(Fatal, "listen error, errno: %d, errstring: %s", errno, strerror(errno));
exit(ListenError);
}
lg(Info, "listen socket success, listensock_: %d", listensock_);
}
// static void *Routine(void *args)
// {
// pthread_detach(pthread_self());
// ThreadData *td = static_cast<ThreadData *>(args);
// td->tsvr->Service(td->sockfd, td->clientip, td->clientport);//???
// delete td;
// return nullptr;
// }
void Start()
{
Daemon();
ThreadPool<Task>::GetInstance()->Start();
// for fork();
// signal(SIGCHLD, SIG_IGN);
lg(Info, "tcpServer is running....");
for (;;)
{
// 1. 获取新连接
struct sockaddr_in client;
socklen_t len = sizeof(client);
int sockfd = accept(listensock_, (struct sockaddr *)&client, &len);
if (sockfd < 0)
{
lg(Warning, "accept error, errno: %d, errstring: %s", errno, strerror(errno)); //?
continue;
}
uint16_t clientport = ntohs(client.sin_port);
char clientip[32];
inet_ntop(AF_INET, &(client.sin_addr), clientip, sizeof(clientip));
// 2. 根据新连接来进行通信
lg(Info, "get a new link..., sockfd: %d, client ip: %s, client port: %d", sockfd, clientip, clientport);
// std::cout << "hello world" << std::endl;
// version 1 -- 单进程版
// Service(sockfd, clientip, clientport);
// close(sockfd);
// version 2 -- 多进程版
// pid_t id = fork();
// if(id == 0)
// {
// // child
// close(listensock_);
// if(fork() > 0) exit(0);
// Service(sockfd, clientip, clientport); //孙子进程, system 领养
// close(sockfd);
// exit(0);
// }
// close(sockfd);
// // father
// pid_t rid = waitpid(id, nullptr, 0);
// (void)rid;
// version 3 -- 多线程版本
// ThreadData *td = new ThreadData(sockfd, clientip, clientport, this);
// pthread_t tid;
// pthread_create(&tid, nullptr, Routine, td);
// version 4 --- 线程池版本
Task t(sockfd, clientip, clientport);
ThreadPool<Task>::GetInstance()->Push(t);
}
}
// void Service(int sockfd, const std::string &clientip, const uint16_t &clientport)
// {
// // 测试代码
// char buffer[4096];
// while (true)
// {
// ssize_t n = read(sockfd, buffer, sizeof(buffer));
// if (n > 0)
// {
// buffer[n] = 0;
// std::cout << "client say# " << buffer << std::endl;
// std::string echo_string = "tcpserver echo# ";
// echo_string += buffer;
// write(sockfd, echo_string.c_str(), echo_string.size());
// }
// else if (n == 0)
// {
// lg(Info, "%s:%d quit, server close sockfd: %d", clientip.c_str(), clientport, sockfd);
// break;
// }
// else
// {
// lg(Warning, "read error, sockfd: %d, client ip: %s, client port: %d", sockfd, clientip.c_str(), clientport);
// break;
// }
// }
// }
~TcpServer() {}
private:
int listensock_;
uint16_t port_;
std::string ip_;
};
client
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
void Usage(const std::string &proc)
{
std::cout << "\n\rUsage: " << proc << " serverip serverport\n"
<< std::endl;
}
// ./tcpclient serverip serverport
int main(int argc, char *argv[])
{
if (argc != 3)
{
Usage(argv[0]);
exit(1);
}
std::string serverip = argv[1];
uint16_t serverport = std::stoi(argv[2]);
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverport);
inet_pton(AF_INET, serverip.c_str(), &(server.sin_addr));
while (true)
{
int cnt = 5;
int isreconnect = false;
int sockfd = 0;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
std::cerr << "socket error" << std::endl;
return 1;
}
do
{
// tcp客户端要不要bind?1 要不要显示的bind?0 系统进行bind,随机端口
// 客户端发起connect的时候,进行自动随机bind
int n = connect(sockfd, (struct sockaddr *)&server, sizeof(server));
if (n < 0)
{
isreconnect = true;
cnt--;
std::cerr << "connect error..., reconnect: " << cnt << std::endl;
sleep(2);
}
else
{
break;
}
} while (cnt && isreconnect);
if (cnt == 0)
{
std::cerr << "user offline..." << std::endl;
break;
}
// while (true)
// {
std::string message;
std::cout << "Please Enter# ";
std::getline(std::cin, message);
int n = write(sockfd, message.c_str(), message.size());
if (n < 0)
{
std::cerr << "write error..." << std::endl;
// break;
}
char inbuffer[4096];
n = read(sockfd, inbuffer, sizeof(inbuffer));
if (n > 0)
{
inbuffer[n] = 0;
std::cout << inbuffer << std::endl;
}
else{
// break;
}
// }
close(sockfd);
}
return 0;
}
threadpool
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
static const int defalutnum = 10;
template <class T>
class ThreadPool
{
public:
void Lock()
{
pthread_mutex_lock(&mutex_);
}
void Unlock()
{
pthread_mutex_unlock(&mutex_);
}
void Wakeup()
{
pthread_cond_signal(&cond_);
}
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_);
}
bool IsQueueEmpty()
{
return tasks_.empty();
}
std::string GetThreadName(pthread_t tid)
{
for (const auto &ti : threads_)
{
if (ti.tid == tid)
return ti.name;
}
return "None";
}
public:
static void *HandlerTask(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
std::string name = tp->GetThreadName(pthread_self());
while (true)
{
tp->Lock();
while (tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->Pop();
tp->Unlock();
t();
}
}
void Start()
{
int num = threads_.size();
for (int i = 0; i < num; i++)
{
threads_[i].name = "thread-" + std::to_string(i + 1);
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
}
}
T Pop()
{
T t = tasks_.front();
tasks_.pop();
return t;
}
void Push(const T &t)
{
Lock();
tasks_.push(t);
Wakeup();
Unlock();
}
static ThreadPool<T> *GetInstance()
{
if (nullptr == tp_) // ???
{
pthread_mutex_lock(&lock_);
if (nullptr == tp_)
{
std::cout << "log: singleton create done first!" << std::endl;
tp_ = new ThreadPool<T>();
}
pthread_mutex_unlock(&lock_);
}
return tp_;
}
private:
ThreadPool(int num = defalutnum) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
ThreadPool(const ThreadPool<T> &) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // a=b=c
private:
std::vector<ThreadInfo> threads_;
std::queue<T> tasks_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
static ThreadPool<T> *tp_;
static pthread_mutex_t lock_;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
init
#include <iostream>
#include <string>
#include <fstream>
#include <unordered_map>
#include "Log.hpp"
const std::string dictname = "./dict.txt";
const std::string sep = ":";
static bool Split(std::string &s, std::string *part1, std::string *part2)
{
auto pos = s.find(sep);
if (pos == std::string::npos)
return false;
*part1 = s.substr(0, pos);
*part2 = s.substr(pos + 1);
return true;
}
class Init
{
public:
Init()
{
std::ifstream in(dictname);
if(!in.is_open())
{
lg(Fatal, "ifstream open %s error", dictname.c_str());
exit(1);
}
std::string line;
while(std::getline(in,line))
{
std::string part1,part2;
Split(line, &part1, &part2);
dict.insert({part1, part2});
}
in.close();
}
std::string translation(const std::string &key)
{
auto iter = dict.find(key);
if(iter == dict.end()) return "Unknow";
else return iter->second;
}
private:
std::unordered_map<std::string, std::string> dict;
};
task
#pragma once
#include <iostream>
#include <string>
#include "Log.hpp"
#include "Init.hpp"
extern Log lg;
Init init;
class Task
{
public:
Task(int sockfd, const std::string &clientip, const uint16_t &clientport)
: sockfd_(sockfd), clientip_(clientip), clientport_(clientport)
{
}
Task()
{
}
void run()
{
// 测试代码
char buffer[4096];
// Tcp是面向字节流的,你怎么保证,你读取上来的数据,是"一个" "完整" 的报文呢?
ssize_t n = read(sockfd_, buffer, sizeof(buffer)); // BUG?
if (n > 0)
{
buffer[n] = 0;
std::cout << "client key# " << buffer << std::endl;
std::string echo_string = init.translation(buffer);
// sleep(5);
// // close(sockfd_);
// lg(Warning, "close sockfd %d done", sockfd_);
// sleep(2);
n = write(sockfd_, echo_string.c_str(), echo_string.size()); // 100 fd 不存在
if(n < 0)
{
lg(Warning, "write error, errno : %d, errstring: %s", errno, strerror(errno));
}
}
else if (n == 0)
{
lg(Info, "%s:%d quit, server close sockfd: %d", clientip_.c_str(), clientport_, sockfd_);
}
else
{
lg(Warning, "read error, sockfd: %d, client ip: %s, client port: %d", sockfd_, clientip_.c_str(), clientport_);
}
close(sockfd_);
}
void operator()()
{
run();
}
~Task()
{
}
private:
int sockfd_;
std::string clientip_;
uint16_t clientport_;
};