计算机网络基础知识详解
目录
1. 计算机网络概述
1.1 什么是计算机网络?
计算机网络是指将地理位置不同的具有独立功能的多台计算机及其外部设备,通过通信线路连接起来,在网络操作系统、网络管理软件及网络通信协议的管理和协调下,实现资源共享和信息传递的计算机系统。
1.2 网络的分类
按覆盖范围分类:
- 局域网(LAN):覆盖范围小,通常在几公里以内
- 城域网(MAN):覆盖一个城市,范围在几十公里
- 广域网(WAN):覆盖范围大,可以是一个国家或全球
按拓扑结构分类:
- 总线型:所有设备连接在一条总线上
- 星型:所有设备连接到中心节点
- 环型:设备连接成环状
- 网状型:设备之间相互连接
1.3 网络的基本组成
- 网络节点:计算机、服务器、路由器等
- 传输介质:双绞线、光纤、无线电波等
- 网络协议:TCP/IP、HTTP、FTP等
- 网络服务:文件共享、打印服务、邮件服务等
2. OSI七层模型详解
OSI(Open System Interconnection)开放系统互连参考模型是国际标准化组织(ISO)制定的一个用于计算机或通信系统间互联的标准体系。
2.1 七层模型结构
从上到下依次为:
第7层:应用层(Application Layer)
功能:为应用程序提供网络服务接口
主要协议:
- HTTP/HTTPS(网页浏览)
- FTP(文件传输)
- SMTP(邮件发送)
- POP3/IMAP(邮件接收)
- DNS(域名解析)
- SSH(安全远程登录)
工作原理:直接为用户的应用程序提供服务,是用户与网络的接口。
第6层:表示层(Presentation Layer)
功能:数据格式转换、加密解密、压缩解压
主要任务:
- 数据格式转换(如EBCDIC到ASCII)
- 数据加密和解密
- 数据压缩和解压缩
实例:JPEG、GIF图片格式,SSL/TLS加密
第5层:会话层(Session Layer)
功能:建立、管理和终止会话连接
主要任务:
- 建立会话连接
- 保持会话连接
- 同步会话
- 终止会话连接
实例:SQL、RPC(远程过程调用)
第4层:传输层(Transport Layer)
功能:提供端到端的可靠数据传输
主要协议:
- TCP(传输控制协议):可靠的、面向连接的协议
- UDP(用户数据报协议):不可靠的、无连接的协议
关键概念:
- 端口号:用于标识应用程序
- 分段和重组:将数据分成适合传输的段
- 流量控制:防止发送方发送过快
- 错误控制:检测和纠正传输错误
第3层:网络层(Network Layer)
功能:路径选择、路由及逻辑寻址
主要协议:
- IP(Internet Protocol)
- ICMP(Internet控制消息协议)
- ARP(地址解析协议)
- RARP(反向地址解析协议)
关键设备:路由器
主要任务:
- IP地址分配
- 路由选择
- 拥塞控制
第2层:数据链路层(Data Link Layer)
功能:提供错误检测和纠正,确保数据的可靠传输
子层划分:
- LLC(逻辑链路控制)子层
- MAC(媒体访问控制)子层
关键概念:
- MAC地址:物理地址,全球唯一
- 帧:数据链路层的数据单位
- 错误检测:CRC循环冗余校验
关键设备:交换机、网桥
第1层:物理层(Physical Layer)
功能:在物理媒体上传输原始比特流
主要任务:
- 定义电气特性(电压、电流)
- 定义机械特性(接口形状、引脚)
- 定义功能特性(各引脚的功能)
- 定义过程特性(建立连接的步骤)
传输介质:
- 双绞线(UTP/STP)
- 同轴电缆
- 光纤
- 无线电波
2.2 数据封装过程
数据在OSI模型中传输时,每一层都会添加自己的控制信息:
应用层数据
↓ 加上表示层头部
表示层数据
↓ 加上会话层头部
会话层数据
↓ 加上传输层头部(TCP/UDP头)
传输层段(Segment)
↓ 加上网络层头部(IP头)
网络层包(Packet)
↓ 加上数据链路层头部和尾部
数据链路层帧(Frame)
↓ 转换为比特流
物理层比特(Bits)
3. TCP/IP协议族
3.1 TCP/IP模型概述
TCP/IP模型是实际使用的网络协议模型,相比OSI七层模型更加简洁实用,分为四层:
- 应用层(对应OSI的应用层、表示层、会话层)
- 传输层(对应OSI的传输层)
- 网际层(对应OSI的网络层)
- 网络接口层(对应OSI的数据链路层、物理层)
3.2 TCP协议详解
TCP的特点
- 面向连接:通信前必须建立连接
- 可靠传输:确保数据准确到达
- 流量控制:防止接收方被淹没
- 拥塞控制:防止网络拥塞
- 有序传输:数据按顺序到达
TCP三次握手
建立连接的过程:
客户端 服务器
| |
|----SYN(seq=x)---------->| 第一次握手
| |
|<---SYN+ACK(seq=y,ack=x+1)| 第二次握手
| |
|----ACK(ack=y+1)-------->| 第三次握手
| |
|<====连接建立完成========>|
详细解释:
- 客户端发送SYN包,请求建立连接
- 服务器回复SYN+ACK包,同意连接
- 客户端发送ACK包,确认连接
TCP四次挥手
断开连接的过程:
客户端 服务器
| |
|----FIN(seq=x)---------->| 第一次挥手
| |
|<---ACK(ack=x+1)---------| 第二次挥手
| |
|<---FIN(seq=y)-----------| 第三次挥手
| |
|----ACK(ack=y+1)-------->| 第四次挥手
| |
|<====连接断开完成========>|
TCP报文格式
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 源端口号 | 目标端口号 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 序列号(Sequence Number) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 确认号(Acknowledgment Number) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 数据 | |U|A|P|R|S|F| |
| 偏移 | 保留 |R|C|S|S|Y|I| 窗口大小 |
| | |G|K|H|T|N|N| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 校验和 | 紧急指针 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 选项(长度可变) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 数据 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
3.3 UDP协议详解
UDP的特点
- 无连接:发送数据前不需要建立连接
- 不可靠:不保证数据一定到达
- 面向报文:应用层给多少数据就发送多少
- 没有拥塞控制:网络拥塞不影响发送速率
- 支持一对一、一对多、多对多通信
UDP报文格式
0 7 8 15 16 23 24 31
+--------+--------+--------+--------+
| 源端口 | 目标端口 |
+--------+--------+--------+--------+
| 长度 | 校验和 |
+--------+--------+--------+--------+
| |
| 数据 |
| |
+-----------------------------------+
TCP vs UDP 使用场景
TCP适用场景:
- 文件传输(FTP)
- 电子邮件(SMTP)
- 网页浏览(HTTP/HTTPS)
- 远程登录(SSH)
UDP适用场景:
- 实时视频/音频流
- 在线游戏
- DNS查询
- DHCP
4. IP地址与子网划分
4.1 IP地址基础
IPv4地址
IPv4地址是32位二进制数,通常用点分十进制表示:
二进制:11000000.10101000.00000001.00000001
十进制:192.168.1.1
IP地址分类
A类地址:
- 范围:1.0.0.0 到 126.255.255.255
- 默认子网掩码:255.0.0.0
- 私有地址:10.0.0.0 到 10.255.255.255
B类地址:
- 范围:128.0.0.0 到 191.255.255.255
- 默认子网掩码:255.255.0.0
- 私有地址:172.16.0.0 到 172.31.255.255
C类地址:
- 范围:192.0.0.0 到 223.255.255.255
- 默认子网掩码:255.255.255.0
- 私有地址:192.168.0.0 到 192.168.255.255
D类地址(组播):
- 范围:224.0.0.0 到 239.255.255.255
E类地址(保留):
- 范围:240.0.0.0 到 255.255.255.255
4.2 子网掩码
子网掩码用于划分网络部分和主机部分:
IP地址: 192.168.1.100
子网掩码: 255.255.255.0
----------------
网络地址: 192.168.1.0
主机地址: 0.0.0.100
广播地址: 192.168.1.255
4.3 CIDR(无类域间路由)
CIDR表示法:IP地址/前缀长度
例如:192.168.1.0/24 表示前24位是网络部分
常见CIDR对应关系:
/8 = 255.0.0.0 可用主机数:16,777,214
/16 = 255.255.0.0 可用主机数:65,534
/24 = 255.255.255.0 可用主机数:254
/25 = 255.255.255.128 可用主机数:126
/26 = 255.255.255.192 可用主机数:62
/27 = 255.255.255.224 可用主机数:30
/28 = 255.255.255.240 可用主机数:14
/29 = 255.255.255.248 可用主机数:6
/30 = 255.255.255.252 可用主机数:2
4.4 子网划分实例
需求:将192.168.1.0/24划分为4个子网
解决方案:
- 需要4个子网,需要借用2位主机位(2² = 4)
- 新的子网掩码:255.255.255.192(/26)
- 每个子网有62个可用主机地址
划分结果:
- 子网1:192.168.1.0/26(192.168.1.1-192.168.1.62)
- 子网2:192.168.1.64/26(192.168.1.65-192.168.1.126)
- 子网3:192.168.1.128/26(192.168.1.129-192.168.1.190)
- 子网4:192.168.1.192/26(192.168.1.193-192.168.1.254)
4.5 IPv6地址
IPv6使用128位地址,用冒号分隔的十六进制表示:
完整格式:2001:0db8:0000:0000:0000:8a2e:0370:7334
简化格式:2001:db8::8a2e:370:7334
IPv6地址类型:
- 单播地址:标识单个接口
- 组播地址:标识一组接口
- 任播地址:标识一组接口中的任意一个
5. 路由基础知识
5.1 什么是路由?
路由是指分组从源到目的地时,决定端到端路径的网络层功能。路由器是执行路由功能的关键设备。
5.2 路由表
路由表是路由器中存储的一张表,包含以下信息:
目的网络 子网掩码 下一跳 接口 度量值
0.0.0.0 0.0.0.0 192.168.1.1 eth0 0
192.168.1.0 255.255.255.0 0.0.0.0 eth0 0
10.0.0.0 255.0.0.0 192.168.1.254 eth0 1
5.3 路由类型
静态路由
管理员手动配置的路由:
优点:
- 没有路由协议开销
- 安全性高
- 路径可预测
缺点:
- 不能自动适应网络变化
- 配置繁琐
- 不适合大型网络
配置示例(Linux):
# 添加静态路由
ip route add 10.0.0.0/8 via 192.168.1.254
# 查看路由表
ip route show
动态路由
通过路由协议自动学习的路由:
常见动态路由协议:
RIP(路由信息协议)
- 基于距离向量算法
- 最大跳数15
- 适用于小型网络
OSPF(开放最短路径优先)
- 基于链路状态算法
- 支持大型网络
- 收敛速度快
BGP(边界网关协议)
- 用于自治系统之间
- 互联网核心路由协议
- 基于路径向量算法
5.4 路由选择过程
- 查找最长匹配:选择掩码最长的匹配项
- 比较管理距离:不同路由来源的优先级
- 比较度量值:相同协议内的路径开销
管理距离示例:
直连路由:0
静态路由:1
OSPF:110
RIP:120
5.5 默认路由
默认路由(0.0.0.0/0)是当没有更具体的路由匹配时使用的路由:
# 设置默认网关
ip route add default via 192.168.1.1
6. 网关详解
6.1 什么是网关?
网关是一个网络连接到另一个网络的"关口"。在TCP/IP协议中,网关通常指默认网关。
6.2 网关的类型
默认网关
- 本地网络中的设备要与外部网络通信时使用
- 通常是路由器的IP地址
- 每个网段通常只有一个默认网关
应用网关
- 在应用层工作
- 可以理解应用协议
- 常见例子:HTTP代理、邮件网关
6.3 网关的工作原理
主机A (192.168.1.100) → 默认网关 (192.168.1.1) → 路由器 → 目标主机
工作流程:
- 主机判断目标IP是否在同一网段
- 如果不在,将数据包发送给默认网关
- 网关根据路由表转发数据包
- 经过多个路由器最终到达目标
6.4 NAT(网络地址转换)
NAT通常在网关设备上实现,用于:
- 节省公网IP地址
- 隐藏内部网络结构
- 提供基本的安全防护
NAT类型:
- 静态NAT:一对一映射
- 动态NAT:多对多映射
- PAT(端口地址转换):多对一映射
PAT工作示例:
内网:192.168.1.100:3000 → 公网:203.0.113.1:10000
内网:192.168.1.101:3000 → 公网:203.0.113.1:10001
7. 常用网络协议
7.1 应用层协议
HTTP/HTTPS
HTTP(超文本传输协议):
- 默认端口:80
- 无状态协议
- 请求方法:GET、POST、PUT、DELETE等
HTTPS(安全的HTTP):
- 默认端口:443
- 使用SSL/TLS加密
- 提供身份验证和数据完整性
HTTP请求格式:
GET /index.html HTTP/1.1
Host: www.example.com
User-Agent: Mozilla/5.0
Accept: text/html
HTTP响应格式:
HTTP/1.1 200 OK
Content-Type: text/html
Content-Length: 1234
<html>...</html>
FTP(文件传输协议)
- 控制连接端口:21
- 数据连接端口:20(主动模式)
- 支持断点续传
- 明文传输(FTPS/SFTP提供加密)
DNS(域名系统)
- 默认端口:53
- 使用UDP协议(大数据使用TCP)
- 分层结构:根域名服务器 → 顶级域名服务器 → 权威域名服务器
DNS查询过程:
- 查询本地DNS缓存
- 查询本地DNS服务器
- 递归或迭代查询
- 返回IP地址
DHCP(动态主机配置协议)
- 自动分配IP地址
- 客户端端口:68
- 服务器端口:67
DHCP工作过程:
- DHCP Discover:客户端广播发现DHCP服务器
- DHCP Offer:服务器提供IP地址
- DHCP Request:客户端请求分配
- DHCP ACK:服务器确认分配
7.2 传输层协议补充
端口号分类
- 熟知端口:0-1023(需要管理员权限)
- 注册端口:1024-49151
- 动态端口:49152-65535
常用端口号:
FTP数据:20 SSH:22
FTP控制:21 Telnet:23
SMTP:25 DNS:53
HTTP:80 POP3:110
HTTPS:443 IMAP:143
MySQL:3306 RDP:3389
7.3 网络层协议补充
ICMP(Internet控制消息协议)
用于传递错误消息和其他需要注意的信息:
常见ICMP消息类型:
- Echo Request/Reply(ping命令使用)
- Destination Unreachable
- Time Exceeded
- Redirect
ARP(地址解析协议)
将IP地址解析为MAC地址:
ARP工作过程:
- 发送ARP请求(广播)
- 目标主机回复ARP响应
- 缓存ARP表项
查看ARP表:
# Windows
arp -a
# Linux
ip neigh show
8. 网络设备介绍
8.1 物理层设备
中继器(Repeater)
- 放大和再生信号
- 延长传输距离
- 不能连接不同类型的网络
集线器(Hub)
- 多端口中继器
- 共享带宽
- 所有端口在同一冲突域
8.2 数据链路层设备
网桥(Bridge)
- 连接两个局域网
- 基于MAC地址转发
- 可以隔离冲突域
交换机(Switch)
- 多端口网桥
- 每个端口独立冲突域
- 支持全双工通信
- 具有MAC地址表
交换机工作原理:
- 学习:记录源MAC地址和端口
- 转发:查表转发到目标端口
- 泛洪:未知目标时广播
- 更新:定期更新MAC地址表
8.3 网络层设备
路由器(Router)
- 连接不同网络
- 基于IP地址转发
- 隔离广播域
- 支持多种路由协议
路由器vs交换机:
- 路由器工作在网络层,交换机工作在数据链路层
- 路由器基于IP转发,交换机基于MAC转发
- 路由器可以连接不同类型网络,交换机通常用于局域网
8.4 高层设备
防火墙(Firewall)
- 网络安全设备
- 基于规则过滤流量
- 可工作在多个层次
防火墙类型:
- 包过滤防火墙
- 状态检测防火墙
- 应用层防火墙
负载均衡器
- 分发流量到多个服务器
- 提高可用性和性能
- 支持多种负载均衡算法
9. Python网络编程实例
9.1 Socket编程基础
TCP服务器示例
import socket
import threading
def handle_client(client_socket, address):
"""处理客户端连接"""
print(f"接受来自 {address} 的连接")
try:
while True:
# 接收数据
data = client_socket.recv(1024)
if not data:
break
print(f"收到来自 {address} 的数据: {data.decode('utf-8')}")
# 发送响应
response = f"服务器已收到: {data.decode('utf-8')}"
client_socket.send(response.encode('utf-8'))
except Exception as e:
print(f"处理客户端 {address} 时出错: {e}")
finally:
client_socket.close()
print(f"关闭与 {address} 的连接")
def start_tcp_server(host='127.0.0.1', port=9999):
"""启动TCP服务器"""
# 创建socket对象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 允许端口重用
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定地址和端口
server_socket.bind((host, port))
# 开始监听,最大连接数为5
server_socket.listen(5)
print(f"TCP服务器在 {host}:{port} 上监听...")
try:
while True:
# 接受客户端连接
client_socket, address = server_socket.accept()
# 为每个客户端创建新线程
client_thread = threading.Thread(
target=handle_client,
args=(client_socket, address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\n服务器关闭")
finally:
server_socket.close()
# 启动服务器
if __name__ == "__main__":
start_tcp_server()
TCP客户端示例
import socket
def tcp_client(host='127.0.0.1', port=9999):
"""TCP客户端"""
# 创建socket对象
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# 连接服务器
client_socket.connect((host, port))
print(f"已连接到服务器 {host}:{port}")
while True:
# 获取用户输入
message = input("请输入消息 (输入 'quit' 退出): ")
if message.lower() == 'quit':
break
# 发送数据
client_socket.send(message.encode('utf-8'))
# 接收响应
response = client_socket.recv(1024)
print(f"服务器响应: {response.decode('utf-8')}")
except Exception as e:
print(f"客户端错误: {e}")
finally:
client_socket.close()
print("客户端关闭")
# 运行客户端
if __name__ == "__main__":
tcp_client()
UDP服务器示例
import socket
def udp_server(host='127.0.0.1', port=9999):
"""UDP服务器"""
# 创建UDP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定地址和端口
server_socket.bind((host, port))
print(f"UDP服务器在 {host}:{port} 上监听...")
try:
while True:
# 接收数据和客户端地址
data, address = server_socket.recvfrom(1024)
print(f"收到来自 {address} 的数据: {data.decode('utf-8')}")
# 发送响应
response = f"UDP服务器已收到: {data.decode('utf-8')}"
server_socket.sendto(response.encode('utf-8'), address)
except KeyboardInterrupt:
print("\nUDP服务器关闭")
finally:
server_socket.close()
# 启动UDP服务器
if __name__ == "__main__":
udp_server()
UDP客户端示例
import socket
def udp_client(host='127.0.0.1', port=9999):
"""UDP客户端"""
# 创建UDP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
while True:
# 获取用户输入
message = input("请输入消息 (输入 'quit' 退出): ")
if message.lower() == 'quit':
break
# 发送数据
client_socket.sendto(message.encode('utf-8'), (host, port))
# 接收响应
response, server_address = client_socket.recvfrom(1024)
print(f"服务器响应: {response.decode('utf-8')}")
except Exception as e:
print(f"UDP客户端错误: {e}")
finally:
client_socket.close()
print("UDP客户端关闭")
# 运行UDP客户端
if __name__ == "__main__":
udp_client()
9.2 HTTP编程
简单的HTTP服务器
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
class SimpleHTTPHandler(BaseHTTPRequestHandler):
"""简单的HTTP请求处理器"""
def do_GET(self):
"""处理GET请求"""
if self.path == '/':
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
html = """
<html>
<head><title>简单HTTP服务器</title></head>
<body>
<h1>欢迎访问简单HTTP服务器</h1>
<p>这是一个用Python实现的简单HTTP服务器</p>
</body>
</html>
"""
self.wfile.write(html.encode('utf-8'))
elif self.path == '/api/info':
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
data = {
'server': 'Python HTTP Server',
'version': '1.0',
'status': 'running'
}
self.wfile.write(json.dumps(data).encode('utf-8'))
else:
self.send_response(404)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(b'404 Not Found')
def do_POST(self):
"""处理POST请求"""
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
response = {
'received': post_data.decode('utf-8'),
'length': content_length
}
self.wfile.write(json.dumps(response).encode('utf-8'))
def run_http_server(host='127.0.0.1', port=8000):
"""运行HTTP服务器"""
server = HTTPServer((host, port), SimpleHTTPHandler)
print(f"HTTP服务器运行在 http://{host}:{port}/")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nHTTP服务器关闭")
server.shutdown()
# 启动HTTP服务器
if __name__ == "__main__":
run_http_server()
HTTP客户端示例
import requests
import urllib.request
import json
# 使用requests库(需要安装:pip install requests)
def http_client_requests():
"""使用requests库的HTTP客户端"""
# GET请求
print("=== GET请求示例 ===")
response = requests.get('https://api.github.com/users/github')
print(f"状态码: {response.status_code}")
print(f"响应内容: {response.json()['name']}")
# POST请求
print("\n=== POST请求示例 ===")
data = {'name': 'test', 'value': 123}
headers = {'Content-Type': 'application/json'}
response = requests.post(
'https://httpbin.org/post',
data=json.dumps(data),
headers=headers
)
print(f"状态码: {response.status_code}")
print(f"响应内容: {response.json()['data']}")
# 使用urllib(Python内置库)
def http_client_urllib():
"""使用urllib的HTTP客户端"""
# GET请求
print("=== urllib GET请求示例 ===")
with urllib.request.urlopen('https://api.github.com/users/github') as response:
data = json.loads(response.read().decode('utf-8'))
print(f"状态码: {response.status}")
print(f"用户名: {data['name']}")
# POST请求
print("\n=== urllib POST请求示例 ===")
data = json.dumps({'name': 'test', 'value': 123}).encode('utf-8')
req = urllib.request.Request(
'https://httpbin.org/post',
data=data,
headers={'Content-Type': 'application/json'}
)
with urllib.request.urlopen(req) as response:
result = json.loads(response.read().decode('utf-8'))
print(f"状态码: {response.status}")
print(f"响应内容: {result['data']}")
# 运行示例
if __name__ == "__main__":
print("使用requests库:")
http_client_requests()
print("\n\n使用urllib库:")
http_client_urllib()
9.3 网络工具实现
Ping工具实现
import socket
import struct
import time
import select
import sys
def checksum(data):
"""计算校验和"""
sum = 0
n = len(data)
m = n % 2
for i in range(0, n - m, 2):
sum += (data[i]) + ((data[i + 1]) << 8)
if m:
sum += (data[-1])
sum = (sum >> 16) + (sum & 0xffff)
sum += (sum >> 16)
return (~sum) & 0xffff
def create_icmp_packet(id, sequence):
"""创建ICMP数据包"""
# ICMP类型(8=echo request), 代码(0), 校验和(初始为0), ID, 序列号
header = struct.pack('!BBHHH', 8, 0, 0, id, sequence)
# 数据部分
data = struct.pack('!d', time.time())
# 计算校验和
chksum = checksum(header + data)
# 重新打包,包含正确的校验和
header = struct.pack('!BBHHH', 8, 0, chksum, id, sequence)
return header + data
def parse_icmp_packet(packet, id):
"""解析ICMP数据包"""
# IP头部通常是20字节
icmp_header = packet[20:28]
type, code, checksum, packet_id, sequence = struct.unpack(
'!BBHHH', icmp_header
)
if type == 0 and packet_id == id: # Echo Reply
# 提取时间戳
timestamp = struct.unpack('!d', packet[28:36])[0]
return time.time() - timestamp
return None
def ping(host, timeout=1, count=4):
"""Ping指定主机"""
try:
# 获取主机IP
dest_ip = socket.gethostbyname(host)
print(f"正在 Ping {host} [{dest_ip}]:")
# 创建原始套接字(需要管理员权限)
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
sock.settimeout(timeout)
# 进程ID作为ICMP ID
icmp_id = sys.getpid() & 0xFFFF
# 统计信息
sent = 0
received = 0
min_time = float('inf')
max_time = 0
total_time = 0
for sequence in range(count):
# 创建ICMP包
packet = create_icmp_packet(icmp_id, sequence)
# 发送数据包
sock.sendto(packet, (dest_ip, 0))
sent += 1
send_time = time.time()
# 等待响应
try:
while True:
ready = select.select([sock], [], [], timeout)
if ready[0]:
recv_packet, addr = sock.recvfrom(1024)
recv_time = time.time()
# 解析响应
delay = parse_icmp_packet(recv_packet, icmp_id)
if delay is not None:
delay_ms = delay * 1000
received += 1
total_time += delay_ms
min_time = min(min_time, delay_ms)
max_time = max(max_time, delay_ms)
print(f"来自 {addr[0]} 的回复: 时间={delay_ms:.2f}ms")
break
else:
print("请求超时")
break
except socket.timeout:
print("请求超时")
# 等待一秒再发送下一个
if sequence < count - 1:
time.sleep(1)
# 统计结果
print(f"\n{host} 的 Ping 统计信息:")
print(f" 数据包: 已发送 = {sent},已接收 = {received},"
f"丢失 = {sent - received} ({(sent - received) / sent * 100:.0f}% 丢失)")
if received > 0:
print(f"往返行程的估计时间(以毫秒为单位):")
print(f" 最短 = {min_time:.2f}ms,最长 = {max_time:.2f}ms,"
f"平均 = {total_time / received:.2f}ms")
sock.close()
except PermissionError:
print("错误: 需要管理员权限才能创建原始套接字")
except socket.gaierror:
print(f"错误: 无法解析主机 {host}")
except Exception as e:
print(f"错误: {e}")
# 使用示例
if __name__ == "__main__":
# 注意:需要管理员权限运行
ping("www.google.com", count=4)
端口扫描器
import socket
import threading
import queue
import sys
class PortScanner:
"""端口扫描器"""
def __init__(self, host, start_port=1, end_port=1024, threads=100):
self.host = host
self.start_port = start_port
self.end_port = end_port
self.threads = threads
self.open_ports = []
self.port_queue = queue.Queue()
# 获取主机IP
try:
self.ip = socket.gethostbyname(host)
print(f"扫描主机: {host} ({self.ip})")
except socket.gaierror:
print(f"错误: 无法解析主机 {host}")
sys.exit(1)
def scan_port(self):
"""扫描单个端口的线程函数"""
while True:
port = self.port_queue.get()
if port is None:
break
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
try:
result = sock.connect_ex((self.ip, port))
if result == 0:
self.open_ports.append(port)
try:
# 尝试获取服务名称
service = socket.getservbyport(port)
print(f"端口 {port} 开放 - 服务: {service}")
except:
print(f"端口 {port} 开放")
except:
pass
finally:
sock.close()
self.port_queue.task_done()
def scan(self):
"""执行端口扫描"""
print(f"扫描端口范围: {self.start_port}-{self.end_port}")
print("扫描中...\n")
# 创建工作线程
threads = []
for _ in range(self.threads):
t = threading.Thread(target=self.scan_port)
t.daemon = True
t.start()
threads.append(t)
# 添加端口到队列
for port in range(self.start_port, self.end_port + 1):
self.port_queue.put(port)
# 等待所有端口扫描完成
self.port_queue.join()
# 停止所有线程
for _ in range(self.threads):
self.port_queue.put(None)
for t in threads:
t.join()
# 显示结果
print(f"\n扫描完成!")
print(f"开放的端口数: {len(self.open_ports)}")
if self.open_ports:
print(f"开放的端口: {sorted(self.open_ports)}")
# 使用示例
if __name__ == "__main__":
scanner = PortScanner("scanme.nmap.org", 1, 1000, threads=50)
scanner.scan()
9.4 网络诊断工具
网络连通性测试
import socket
import subprocess
import platform
import re
class NetworkDiagnostics:
"""网络诊断工具"""
@staticmethod
def check_internet_connection():
"""检查互联网连接"""
try:
# 尝试连接到Google的DNS服务器
socket.create_connection(("8.8.8.8", 53), timeout=3)
return True
except OSError:
return False
@staticmethod
def get_local_ip():
"""获取本地IP地址"""
try:
# 创建一个UDP套接字
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 连接到外部地址(不会真正发送数据)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
return local_ip
except:
return "127.0.0.1"
@staticmethod
def traceroute(host, max_hops=30):
"""追踪路由(简化版)"""
print(f"追踪到 {host} 的路由:")
system = platform.system().lower()
if system == "windows":
cmd = f"tracert -h {max_hops} {host}"
else:
cmd = f"traceroute -m {max_hops} {host}"
try:
# 执行命令
process = subprocess.Popen(
cmd.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
# 读取输出
for line in process.stdout:
print(line.strip())
process.wait()
except Exception as e:
print(f"追踪路由失败: {e}")
@staticmethod
def dns_lookup(hostname):
"""DNS查询"""
print(f"查询 {hostname} 的DNS信息:")
try:
# 获取IP地址
ip = socket.gethostbyname(hostname)
print(f"IP地址: {ip}")
# 获取完整的主机信息
host_info = socket.gethostbyname_ex(hostname)
print(f"主机名: {host_info[0]}")
print(f"别名: {host_info[1]}")
print(f"IP地址列表: {host_info[2]}")
except socket.gaierror as e:
print(f"DNS查询失败: {e}")
@staticmethod
def get_network_interfaces():
"""获取网络接口信息"""
system = platform.system().lower()
if system == "windows":
cmd = "ipconfig /all"
else:
cmd = "ifconfig -a"
try:
result = subprocess.run(
cmd.split(),
capture_output=True,
text=True
)
print("网络接口信息:")
print(result.stdout)
except Exception as e:
print(f"获取网络接口信息失败: {e}")
@staticmethod
def bandwidth_test(host="www.google.com", port=80, duration=10):
"""简单的带宽测试"""
import time
print(f"测试到 {host}:{port} 的连接速度...")
try:
# 创建连接
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
# 构造HTTP请求
request = b"GET / HTTP/1.1\r\nHost: " + host.encode() + b"\r\n\r\n"
# 测试下载速度
start_time = time.time()
bytes_received = 0
sock.send(request)
sock.settimeout(1)
while time.time() - start_time < duration:
try:
data = sock.recv(4096)
if not data:
break
bytes_received += len(data)
except socket.timeout:
break
elapsed_time = time.time() - start_time
speed = (bytes_received * 8) / (elapsed_time * 1024 * 1024) # Mbps
print(f"接收字节: {bytes_received}")
print(f"耗时: {elapsed_time:.2f} 秒")
print(f"平均速度: {speed:.2f} Mbps")
sock.close()
except Exception as e:
print(f"带宽测试失败: {e}")
# 运行诊断
if __name__ == "__main__":
diag = NetworkDiagnostics()
print("=== 网络诊断工具 ===\n")
# 检查互联网连接
print("1. 检查互联网连接:")
if diag.check_internet_connection():
print(" ✓ 互联网连接正常\n")
else:
print(" ✗ 无法连接到互联网\n")
# 获取本地IP
print("2. 本地IP地址:")
print(f" {diag.get_local_ip()}\n")
# DNS查询
print("3. DNS查询测试:")
diag.dns_lookup("www.github.com")
print()
# 追踪路由(可选)
# print("4. 路由追踪:")
# diag.traceroute("www.google.com", max_hops=10)
9.5 实用网络工具类
综合网络工具库
import socket
import struct
import fcntl
import array
import platform
import subprocess
import ipaddress
class NetworkUtils:
"""网络实用工具类"""
@staticmethod
def is_valid_ipv4(ip):
"""验证IPv4地址"""
try:
ipaddress.IPv4Address(ip)
return True
except:
return False
@staticmethod
def is_valid_ipv6(ip):
"""验证IPv6地址"""
try:
ipaddress.IPv6Address(ip)
return True
except:
return False
@staticmethod
def cidr_to_netmask(cidr):
"""CIDR转子网掩码"""
try:
network = ipaddress.IPv4Network(f"0.0.0.0/{cidr}")
return str(network.netmask)
except:
return None
@staticmethod
def netmask_to_cidr(netmask):
"""子网掩码转CIDR"""
try:
return ipaddress.IPv4Network(f"0.0.0.0/{netmask}").prefixlen
except:
return None
@staticmethod
def calculate_subnet(ip, cidr):
"""计算子网信息"""
try:
network = ipaddress.IPv4Network(f"{ip}/{cidr}", strict=False)
return {
"网络地址": str(network.network_address),
"广播地址": str(network.broadcast_address),
"子网掩码": str(network.netmask),
"主机数量": network.num_addresses - 2,
"可用地址范围": f"{network.network_address + 1} - {network.broadcast_address - 1}",
"CIDR": network.prefixlen
}
except Exception as e:
return {"错误": str(e)}
@staticmethod
def get_mac_address(interface=None):
"""获取MAC地址"""
if platform.system() == "Windows":
# Windows下获取MAC地址
import uuid
mac = uuid.getnode()
return ":".join(["{:02x}".format((mac >> i) & 0xff)
for i in range(0, 48, 8)][::-1])
else:
# Linux/Unix下获取指定接口的MAC地址
if interface is None:
interface = "eth0"
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
info = fcntl.ioctl(s.fileno(), 0x8927,
struct.pack('256s', interface[:15].encode()))
return ':'.join(['%02x' % b for b in info[18:24]])
except:
return None
@staticmethod
def port_service_map():
"""常用端口服务映射"""
return {
20: "FTP-DATA",
21: "FTP",
22: "SSH",
23: "Telnet",
25: "SMTP",
53: "DNS",
67: "DHCP-Server",
68: "DHCP-Client",
80: "HTTP",
110: "POP3",
119: "NNTP",
123: "NTP",
143: "IMAP",
161: "SNMP",
194: "IRC",
443: "HTTPS",
445: "SMB",
465: "SMTPS",
514: "Syslog",
515: "Printer",
993: "IMAPS",
995: "POP3S",
1080: "SOCKS",
1433: "MSSQL",
1521: "Oracle",
3306: "MySQL",
3389: "RDP",
5432: "PostgreSQL",
5900: "VNC",
6379: "Redis",
8080: "HTTP-Proxy",
8443: "HTTPS-Alt",
27017: "MongoDB"
}
@staticmethod
def bytes_to_human_readable(bytes, decimal_places=2):
"""字节转人类可读格式"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes < 1024.0:
return f"{bytes:.{decimal_places}f} {unit}"
bytes /= 1024.0
return f"{bytes:.{decimal_places}f} PB"
# 使用示例
if __name__ == "__main__":
utils = NetworkUtils()
print("=== 网络工具演示 ===\n")
# IP地址验证
test_ip = "192.168.1.1"
print(f"1. IP地址验证:")
print(f" {test_ip} 是有效的IPv4地址: {utils.is_valid_ipv4(test_ip)}")
print(f" {test_ip} 是有效的IPv6地址: {utils.is_valid_ipv6(test_ip)}\n")
# CIDR和子网掩码转换
print("2. CIDR和子网掩码转换:")
print(f" /24 对应的子网掩码: {utils.cidr_to_netmask(24)}")
print(f" 255.255.255.0 对应的CIDR: {utils.netmask_to_cidr('255.255.255.0')}\n")
# 子网计算
print("3. 子网计算:")
subnet_info = utils.calculate_subnet("192.168.1.100", 24)
for key, value in subnet_info.items():
print(f" {key}: {value}")
print()
# 端口服务映射
print("4. 常用端口服务:")
services = utils.port_service_map()
for port in [80, 443, 22, 3306]:
print(f" 端口 {port}: {services.get(port, '未知')}")
print()
# 字节转换
print("5. 字节转换:")
sizes = [1024, 1048576, 1073741824]
for size in sizes:
print(f" {size} 字节 = {utils.bytes_to_human_readable(size)}")
总结
本教程详细介绍了计算机网络的基础知识,从理论到实践,涵盖了:
- 基础概念:网络的定义、分类和组成
- OSI七层模型:每一层的功能和协议
- TCP/IP协议族:TCP和UDP的详细对比
- IP地址和子网:IPv4/IPv6地址和子网划分
- 路由和网关:路由原理和网关功能
- 常用协议:HTTP、FTP、DNS等应用层协议
- 网络设备:交换机、路由器等设备的工作原理
- Python网络编程:Socket编程和实用工具实现