1. Recognizing Cat vs. Non-Cat Images
Datasets:
train_catvnoncat.h5
test_catvnoncat.h5
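Before training, it can help to confirm the layout of the HDF5 files. A minimal inspection sketch, assuming both files sit in the working directory; the dataset keys match the ones used by load_dataset() below:

import h5py

# Print every dataset's name, shape, and dtype in the training file
with h5py.File('train_catvnoncat.h5', 'r') as f:
    for key in f.keys():
        print(key, f[key].shape, f[key].dtype)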
Model 1: Logistic Regression Implemented in NumPy (a single-layer network with no hidden layer)
import numpy as np
import matplotlib.pyplot as plt
import h5py
import scipy
from PIL import Image
from scipy import ndimage
# Set the random seed so results are reproducible
np.random.seed(1)

def load_dataset():
    """Load the cat vs. non-cat dataset."""
    train_dataset = h5py.File('train_catvnoncat.h5', "r")
    train_set_x_orig = np.array(train_dataset["train_set_x"][:])  # train set features
    train_set_y_orig = np.array(train_dataset["train_set_y"][:])  # train set labels
    test_dataset = h5py.File('test_catvnoncat.h5', "r")
    test_set_x_orig = np.array(test_dataset["test_set_x"][:])  # test set features
    test_set_y_orig = np.array(test_dataset["test_set_y"][:])  # test set labels
    classes = np.array(test_dataset["list_classes"][:])  # list of classes
    train_set_y_orig = train_set_y_orig.reshape((1, train_set_y_orig.shape[0]))
    test_set_y_orig = test_set_y_orig.reshape((1, test_set_y_orig.shape[0]))
    return train_set_x_orig, train_set_y_orig, test_set_x_orig, test_set_y_orig, classes
def sigmoid(z):
    """Compute the sigmoid of z."""
    s = 1 / (1 + np.exp(-z))
    return s
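One caveat: np.exp(-z) can overflow and emit warnings for large negative z. That is not an issue at this scale, but a numerically stable variant (a sketch, assuming z is a NumPy array) looks like this:

def stable_sigmoid(z):
    """Numerically stable sigmoid: never exponentiates a positive number."""
    out = np.empty_like(z, dtype=float)
    pos = z >= 0
    out[pos] = 1 / (1 + np.exp(-z[pos]))
    exp_z = np.exp(z[~pos])
    out[~pos] = exp_z / (1 + exp_z)
    return out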
def initialize_with_zeros(dim):
    """Initialize the weight vector w and the bias b with zeros."""
    w = np.zeros((dim, 1))
    b = 0
    assert(w.shape == (dim, 1))
    assert(isinstance(b, float) or isinstance(b, int))
    return w, b
def propagate(w, b, X, Y):
    """Compute the forward and backward propagation."""
    m = X.shape[1]
    # Forward propagation
    A = sigmoid(np.dot(w.T, X) + b)
    cost = -1/m * np.sum(Y * np.log(A) + (1 - Y) * np.log(1 - A))
    # Backward propagation
    dw = 1/m * np.dot(X, (A - Y).T)
    db = 1/m * np.sum(A - Y)
    assert(dw.shape == w.shape)
    assert(db.dtype == float)
    cost = np.squeeze(cost)
    assert(cost.shape == ())
    grads = {"dw": dw, "db": db}
    return grads, cost
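Because propagate() returns both the gradients and the cost, dw is easy to verify against a centered finite difference. A verification sketch (gradient_check and eps are additions, not part of the original listing):

def gradient_check(w, b, X, Y, eps=1e-7):
    """Compare the analytic dw with a numerical approximation."""
    grads, _ = propagate(w, b, X, Y)
    dw_approx = np.zeros_like(w)
    for i in range(w.shape[0]):
        w_plus, w_minus = w.copy(), w.copy()
        w_plus[i] += eps
        w_minus[i] -= eps
        _, cost_plus = propagate(w_plus, b, X, Y)
        _, cost_minus = propagate(w_minus, b, X, Y)
        dw_approx[i] = (cost_plus - cost_minus) / (2 * eps)
    num = np.linalg.norm(grads["dw"] - dw_approx)
    den = np.linalg.norm(grads["dw"]) + np.linalg.norm(dw_approx)
    return num / den  # on the order of eps for a correct gradient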
def optimize(w, b, X, Y, num_iterations, learning_rate, print_cost=False):
    """Optimize w and b with gradient descent."""
    costs = []
    for i in range(num_iterations):
        # Compute the gradients and the cost
        grads, cost = propagate(w, b, X, Y)
        # Retrieve the derivatives
        dw = grads["dw"]
        db = grads["db"]
        # Update the parameters
        w = w - learning_rate * dw
        b = b - learning_rate * db
        # Record the cost every 100 iterations
        if i % 100 == 0:
            costs.append(cost)
        # Print the cost
        if print_cost and i % 100 == 0:
            print(f"Cost after iteration {i}: {cost}")
    params = {"w": w, "b": b}
    grads = {"dw": dw, "db": db}
    return params, grads, costs
def predict(w, b, X):
    """Predict labels using the learned parameters."""
    m = X.shape[1]
    Y_prediction = np.zeros((1, m))
    w = w.reshape(X.shape[0], 1)
    # Compute the predicted probabilities
    A = sigmoid(np.dot(w.T, X) + b)
    # Convert probabilities to 0/1 predictions
    for i in range(A.shape[1]):
        Y_prediction[0, i] = 1 if A[0, i] > 0.5 else 0
    assert(Y_prediction.shape == (1, m))
    return Y_prediction
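The per-element loop in predict() can also be written as a single vectorized comparison, which is the more idiomatic NumPy form (an equivalent sketch, not a change to the code above):

def predict_vectorized(w, b, X):
    """Same result as predict(), thresholding all probabilities at once."""
    A = sigmoid(np.dot(w.reshape(X.shape[0], 1).T, X) + b)
    return (A > 0.5).astype(float)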
def model(X_train, Y_train, X_test, Y_test, num_iterations=2000, learning_rate=0.5, print_cost=False):
    """Put all the pieces together into the full model."""
    # Initialize the parameters
    w, b = initialize_with_zeros(X_train.shape[0])
    # Gradient descent
    parameters, grads, costs = optimize(w, b, X_train, Y_train, num_iterations, learning_rate, print_cost)
    # Retrieve the trained parameters
    w = parameters["w"]
    b = parameters["b"]
    # Predict on the train and test sets
    Y_prediction_test = predict(w, b, X_test)
    Y_prediction_train = predict(w, b, X_train)
    # Print the accuracy
    print(f"Train accuracy: {100 - np.mean(np.abs(Y_prediction_train - Y_train)) * 100}%")
    print(f"Test accuracy: {100 - np.mean(np.abs(Y_prediction_test - Y_test)) * 100}%")
    d = {"costs": costs,
         "Y_prediction_test": Y_prediction_test,
         "Y_prediction_train": Y_prediction_train,
         "w": w,
         "b": b,
         "learning_rate": learning_rate,
         "num_iterations": num_iterations}
    return d
# Load the dataset
train_set_x_orig, train_set_y, test_set_x_orig, test_set_y, classes = load_dataset()

# Display one training example
index = 25
plt.imshow(train_set_x_orig[index])
print(f"y = {train_set_y[:, index]}, it's a '{classes[np.squeeze(train_set_y[:, index])].decode('utf-8')}' picture.")

# Data preprocessing
m_train = train_set_x_orig.shape[0]
m_test = test_set_x_orig.shape[0]
num_px = train_set_x_orig.shape[1]
print(f"Number of training examples: m_train = {m_train}")
print(f"Number of test examples: m_test = {m_test}")
print(f"Height/width of each image: num_px = {num_px}")
print(f"Each image is of size: ({num_px}, {num_px}, 3)")
print(f"train_set_x shape: {train_set_x_orig.shape}")
print(f"train_set_y shape: {train_set_y.shape}")
print(f"test_set_x shape: {test_set_x_orig.shape}")
print(f"test_set_y shape: {test_set_y.shape}")

# Flatten the train and test sets
train_set_x_flatten = train_set_x_orig.reshape(train_set_x_orig.shape[0], -1).T
test_set_x_flatten = test_set_x_orig.reshape(test_set_x_orig.shape[0], -1).T
print(f"train_set_x_flatten shape: {train_set_x_flatten.shape}")
print(f"test_set_x_flatten shape: {test_set_x_flatten.shape}")

# Standardize the dataset
train_set_x = train_set_x_flatten / 255.
test_set_x = test_set_x_flatten / 255.

# Train the model
d = model(train_set_x, train_set_y, test_set_x, test_set_y, num_iterations=2000, learning_rate=0.005, print_cost=True)

# Plot the cost curve
costs = np.squeeze(d['costs'])
plt.plot(costs)
plt.ylabel('cost')
plt.xlabel('iterations (per hundreds)')
plt.title(f"Learning rate = {d['learning_rate']}")
plt.show()
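The PIL and scipy imports at the top are only needed for trying the trained model on an arbitrary image. A sketch of that use, where 'my_image.jpg' is a hypothetical local file:

# Classify your own image with the trained parameters
img = Image.open('my_image.jpg').resize((num_px, num_px))
img_arr = np.array(img)[:, :, :3]  # keep RGB, drop any alpha channel
img_flat = img_arr.reshape(num_px * num_px * 3, 1) / 255.
pred = predict(d["w"], d["b"], img_flat)
print(f"y = {int(np.squeeze(pred))}, the model predicts a "
      f"'{classes[int(np.squeeze(pred))].decode('utf-8')}' picture.")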
Model 2: A Neural Network with One Hidden Layer, Implemented in PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import h5py
import numpy as np
import matplotlib.pyplot as plt
# Set random seeds so results are reproducible
torch.manual_seed(1)
np.random.seed(1)

# Load the dataset
def load_dataset():
    train_dataset = h5py.File('train_catvnoncat.h5', "r")
    train_set_x_orig = np.array(train_dataset["train_set_x"][:])  # train set features
    train_set_y_orig = np.array(train_dataset["train_set_y"][:])  # train set labels
    test_dataset = h5py.File('test_catvnoncat.h5', "r")
    test_set_x_orig = np.array(test_dataset["test_set_x"][:])  # test set features
    test_set_y_orig = np.array(test_dataset["test_set_y"][:])  # test set labels
    classes = np.array(test_dataset["list_classes"][:])  # list of classes
    train_set_y_orig = train_set_y_orig.reshape((1, train_set_y_orig.shape[0]))
    test_set_y_orig = test_set_y_orig.reshape((1, test_set_y_orig.shape[0]))
    return train_set_x_orig, train_set_y_orig, test_set_x_orig, test_set_y_orig, classes
print("load_dataset():success")
# Define the PyTorch model
class CatClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(CatClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)   # input layer -> hidden layer
        self.relu = nn.ReLU()                           # hidden-layer activation
        self.fc2 = nn.Linear(hidden_size, output_size)  # hidden layer -> output layer
        self.sigmoid = nn.Sigmoid()                     # output activation for binary classification

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.sigmoid(x)
        return x
print("CatClassifier():success")
# Data preprocessing
def preprocess_data(train_x_orig, train_y, test_x_orig, test_y):
    # Flatten the datasets
    train_x_flatten = train_x_orig.reshape(train_x_orig.shape[0], -1)
    test_x_flatten = test_x_orig.reshape(test_x_orig.shape[0], -1)
    # Standardize
    train_x = train_x_flatten / 255.
    test_x = test_x_flatten / 255.
    # Convert to PyTorch tensors
    train_x_tensor = torch.FloatTensor(train_x)
    train_y_tensor = torch.FloatTensor(train_y.T)  # reshape to [num_samples, 1]
    test_x_tensor = torch.FloatTensor(test_x)
    test_y_tensor = torch.FloatTensor(test_y.T)
    return train_x_tensor, train_y_tensor, test_x_tensor, test_y_tensor
print("preprocess_data():success")
# Train the model
def train_model(model, train_loader, criterion, optimizer, epochs):
    model.train()
    losses = []
    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            # Forward propagation
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            # Backward propagation and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        # Record the average loss of each epoch
        epoch_loss = running_loss / len(train_loader)
        losses.append(epoch_loss)
        if (epoch + 1) % 100 == 0:
            print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}')
    # Plot the loss curve
    plt.plot(losses)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss')
    plt.show()
    print("train_model():success")
    return model
# Evaluate the model
def evaluate_model(model, data_loader, dataset_name):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            outputs = model(inputs)
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    print(f'{dataset_name} Accuracy: {accuracy:.2f}%')
    return accuracy
# Main function
def main():
    # Load the data
    train_x_orig, train_y, test_x_orig, test_y, classes = load_dataset()
    # Preprocess the data
    train_x, train_y, test_x, test_y = preprocess_data(train_x_orig, train_y, test_x_orig, test_y)
    # Create the data loaders
    train_dataset = TensorDataset(train_x, train_y)
    train_loader = DataLoader(train_dataset, batch_size=train_x.shape[0], shuffle=True)
    test_dataset = TensorDataset(test_x, test_y)
    test_loader = DataLoader(test_dataset, batch_size=test_x.shape[0])
    # Initialize the model
    input_size = train_x.shape[1]  # dimension of a flattened image
    hidden_size = 4                # number of hidden units
    output_size = 1                # output dimension (binary classification)
    model = CatClassifier(input_size, hidden_size, output_size)
    # Define the loss function and the optimizer
    criterion = nn.BCELoss()  # binary cross-entropy loss
    optimizer = optim.SGD(model.parameters(), lr=0.005)  # stochastic gradient descent
    # Train the model
    print("Starting training...")
    model = train_model(model, train_loader, criterion, optimizer, epochs=200)
    # Evaluate the model
    print("\nModel evaluation:")
    evaluate_model(model, train_loader, "Training set")
    evaluate_model(model, test_loader, "Test set")
    return model
if __name__ == "__main__":
    model = main()
    print("main():success")
2. Detecting Malicious Web Traffic
Datasets:
normalTrafficTraining.txt
normalTrafficTest.txt
anomalousTrafficTest.txt
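These files are expected to contain raw HTTP requests separated by blank lines; the split('\n\n') calls in the loaders below rely on exactly that layout. A quick sanity-check sketch, assuming the training file is present:

with open('normalTrafficTraining.txt', 'r', encoding='utf-8') as f:
    records = f.read().split('\n\n')
print(f"{len(records)} records; first record starts with:")
print(records[0][:200])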
Model 1: Logistic Regression Implemented in NumPy (a single-layer network with no hidden layer)
import numpy as np
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
def load_data():
    """Load and preprocess the web traffic data from text files"""
    try:
        # Load normal training data
        with open('normalTrafficTraining.txt', 'r', encoding='utf-8') as f:
            normal_train = f.read().split('\n\n')
        # Load normal test data
        with open('normalTrafficTest.txt', 'r', encoding='utf-8') as f:
            normal_test = f.read().split('\n\n')
        # Load anomalous data
        with open('anomalousTrafficTest.txt', 'r', encoding='utf-8') as f:
            anomalous = f.read().split('\n\n')
        # Create labels (0 = normal, 1 = anomalous)
        X = normal_train + normal_test + anomalous
        y = np.array([0]*len(normal_train) + [0]*len(normal_test) + [1]*len(anomalous))
        return X, y
    except FileNotFoundError as e:
        print(f"Error loading data files: {e}")
        print("Please ensure the following files are in the same directory:")
        print("- normalTrafficTraining.txt")
        print("- normalTrafficTest.txt")
        print("- anomalousTrafficTest.txt")
        raise
def preprocess_data(X, y):
    """Convert text data to numerical features"""
    # Convert text to TF-IDF features
    vectorizer = TfidfVectorizer(max_features=1000)
    X_vec = vectorizer.fit_transform(X).toarray()
    # Split into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X_vec, y, test_size=0.2, random_state=42)
    # Reshape for our model
    X_train = X_train.T
    X_test = X_test.T
    y_train = y_train.reshape(1, -1)
    y_test = y_test.reshape(1, -1)
    return X_train, y_train, X_test, y_test
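Note that the vectorizer above is fit on the full corpus before the split, so test documents influence the TF-IDF vocabulary and IDF weights. A leakage-free variant (a sketch; preprocess_data_no_leak is an addition) splits first and fits on the training texts only:

def preprocess_data_no_leak(X, y):
    """Like preprocess_data, but fits TF-IDF on the training split only."""
    X_train_txt, X_test_txt, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42)
    vectorizer = TfidfVectorizer(max_features=1000)
    X_train = vectorizer.fit_transform(X_train_txt).toarray().T
    X_test = vectorizer.transform(X_test_txt).toarray().T  # transform only
    return X_train, y_train.reshape(1, -1), X_test, y_test.reshape(1, -1)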
def sigmoid(z):
    """Sigmoid function"""
    return 1 / (1 + np.exp(-z))

def initialize_with_zeros(dim):
    """Initialize parameters"""
    w = np.zeros((dim, 1))
    b = 0
    return w, b
def propagate(w, b, X, Y):
    """Forward and backward propagation"""
    m = X.shape[1]
    # Forward propagation
    A = sigmoid(np.dot(w.T, X) + b)
    cost = -1/m * np.sum(Y * np.log(A) + (1 - Y) * np.log(1 - A))
    # Backward propagation
    dw = 1/m * np.dot(X, (A - Y).T)
    db = 1/m * np.sum(A - Y)
    cost = np.squeeze(cost)
    grads = {"dw": dw, "db": db}
    return grads, cost
def optimize(w, b, X, Y, num_iterations, learning_rate, print_cost=False):
    """Optimization function"""
    costs = []
    for i in range(num_iterations):
        grads, cost = propagate(w, b, X, Y)
        dw = grads["dw"]
        db = grads["db"]
        w = w - learning_rate * dw
        b = b - learning_rate * db
        if i % 100 == 0:
            costs.append(cost)
        if print_cost and i % 100 == 0:
            print(f"Cost after iteration {i}: {cost}")
    params = {"w": w, "b": b}
    grads = {"dw": dw, "db": db}
    return params, grads, costs
def predict(w, b, X):
    """Make predictions"""
    m = X.shape[1]
    Y_prediction = np.zeros((1, m))
    w = w.reshape(X.shape[0], 1)
    A = sigmoid(np.dot(w.T, X) + b)
    for i in range(A.shape[1]):
        Y_prediction[0, i] = 1 if A[0, i] > 0.5 else 0
    return Y_prediction
def model(X_train, Y_train, X_test, Y_test, num_iterations=2000, learning_rate=0.5, print_cost=False):
    """Complete model"""
    w, b = initialize_with_zeros(X_train.shape[0])
    parameters, grads, costs = optimize(w, b, X_train, Y_train, num_iterations, learning_rate, print_cost)
    w = parameters["w"]
    b = parameters["b"]
    Y_prediction_test = predict(w, b, X_test)
    Y_prediction_train = predict(w, b, X_train)
    print(f"Train accuracy: {100 - np.mean(np.abs(Y_prediction_train - Y_train)) * 100}%")
    print(f"Test accuracy: {100 - np.mean(np.abs(Y_prediction_test - Y_test)) * 100}%")
    d = {"costs": costs,
         "Y_prediction_test": Y_prediction_test,
         "Y_prediction_train": Y_prediction_train,
         "w": w,
         "b": b,
         "learning_rate": learning_rate,
         "num_iterations": num_iterations}
    return d
# Main execution
if __name__ == "__main__":
    try:
        # Load and preprocess data
        print("Loading data...")
        X, y = load_data()
        print("Preprocessing data...")
        X_train, y_train, X_test, y_test = preprocess_data(X, y)
        # Train model
        print("Training model...")
        d = model(X_train, y_train, X_test, y_test, num_iterations=2000, learning_rate=0.005, print_cost=True)
        # Plot learning curve
        costs = np.squeeze(d['costs'])
        plt.plot(costs)
        plt.ylabel('cost')
        plt.xlabel('iterations (per hundreds)')
        plt.title(f"Learning rate = {d['learning_rate']}")
        plt.show()
    except Exception as e:
        print(f"An error occurred: {e}")
Model 2: A Neural Network with One Hidden Layer, Implemented in PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
import matplotlib.pyplot as plt
# Set random seeds for reproducibility
torch.manual_seed(1)
np.random.seed(1)
def load_data():
    """Load and preprocess the web traffic data"""
    # Load normal training data
    with open('normalTrafficTraining.txt', 'r') as f:
        normal_train = f.read().split('\n\n')
    # Load normal test data
    with open('normalTrafficTest.txt', 'r') as f:
        normal_test = f.read().split('\n\n')
    # Load anomalous data
    with open('anomalousTrafficTest.txt', 'r') as f:
        anomalous = f.read().split('\n\n')
    # Create labels (0 = normal, 1 = anomalous)
    X = normal_train + normal_test + anomalous
    y = np.array([0]*len(normal_train) + [0]*len(normal_test) + [1]*len(anomalous))
    return X, y
def preprocess_data(X, y):
    """Convert text data to numerical features and create PyTorch datasets"""
    # Convert text to TF-IDF features
    vectorizer = TfidfVectorizer(max_features=1000)
    X_vec = vectorizer.fit_transform(X).toarray()
    # Convert to PyTorch tensors
    X_tensor = torch.FloatTensor(X_vec)
    y_tensor = torch.FloatTensor(y).unsqueeze(1)
    return X_tensor, y_tensor
class TrafficClassifier(nn.Module):
    """Neural network for traffic classification"""
    def __init__(self, input_size, hidden_size, output_size):
        super(TrafficClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.sigmoid(x)
        return x
def train_model(model, train_loader, criterion, optimizer, epochs):
    """Train the neural network"""
    model.train()
    losses = []
    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        epoch_loss = running_loss / len(train_loader)
        losses.append(epoch_loss)
        if (epoch + 1) % 100 == 0:
            print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}')
    # Plot training loss
    plt.plot(losses)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss')
    plt.show()
    return model
def evaluate_model(model, data_loader, dataset_name):
    """Evaluate model performance"""
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            outputs = model(inputs)
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    print(f'{dataset_name} Accuracy: {accuracy:.2f}%')
    return accuracy
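Anomalous requests are typically the minority class in this mix, so accuracy alone can look good while still missing attacks. A sketch that also reports precision and recall for the anomalous class (evaluate_precision_recall is an addition, not part of the original listing):

def evaluate_precision_recall(model, data_loader):
    """Precision and recall for the anomalous class (label 1)."""
    model.eval()
    tp = fp = fn = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            predicted = (model(inputs) > 0.5).float()
            tp += ((predicted == 1) & (labels == 1)).sum().item()
            fp += ((predicted == 1) & (labels == 0)).sum().item()
            fn += ((predicted == 0) & (labels == 1)).sum().item()
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    print(f'Precision: {precision:.3f}, Recall: {recall:.3f}')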
def main():
    # Load and preprocess data
    X, y = load_data()
    X_tensor, y_tensor = preprocess_data(X, y)
    # Create dataset and split into train and test
    dataset = TensorDataset(X_tensor, y_tensor)
    train_size = int(0.8 * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32)
    # Initialize model
    input_size = X_tensor.shape[1]
    hidden_size = 64
    output_size = 1
    model = TrafficClassifier(input_size, hidden_size, output_size)
    # Define loss and optimizer
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    # Train model
    print("Starting training...")
    model = train_model(model, train_loader, criterion, optimizer, epochs=1000)
    # Evaluate model
    print("\nModel evaluation:")
    evaluate_model(model, train_loader, "Training set")
    evaluate_model(model, test_loader, "Test set")

if __name__ == "__main__":
    main()