1. Learning Goals
I have recently been learning CUDA, with the goal of adding CUDA module support to my taskBus SDR so that an ordinary PC can squeeze every last drop out of a knock-off B210's 56 MHz of bandwidth.
Being on the older side, I did not feel like working through a textbook, so I went straight to DeepSeek, and the learning efficiency turned out to be excellent!
2. Interim Results
On my museum-piece graphics card, an NVIDIA GeForce GTX 750 Ti (Compute Capability 5.0) with only 2 GB of VRAM and a roughly 1 GHz core clock, a 512-tap time-domain FIR convolution chews through 100M float samples (a 411 MB file) in just 0.65 seconds.
The key technique is splitting the work across three threads, so that reading and writing are decoupled from the computation.
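As a back-of-the-envelope sanity check on that figure (my own arithmetic, not a profiler reading): 100M samples x 512 taps x 2 FLOPs (multiply + add) is about 102.4 GFLOP, and 102.4 GFLOP / 0.65 s is roughly 158 GFLOP/s. That is well under the GTX 750 Ti's single-precision peak of about 1.3 TFLOPS, so there should still be headroom.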
2.1 Profiling with NVVP
In NVVP you can see that the GPU's duty cycle is quite tightly packed.
Most of the time goes to fir_kernel, which is exactly what we want. Some time is also spent on memCpy on the GPU side, which is a worthwhile direction for further optimization.
Because disk I/O is completely independent of the CPU-side computation, there is almost no idle gap between consecutive batches of computation.
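For reference, one way to capture such a timeline on a pre-Turing card like this one is nvprof's export option (a sketch of my workflow; the output file name fir.nvvp is arbitrary):
nvprof --export-profile fir.nvvp ./cuda_filter
The resulting file can then be loaded into nvvp for inspection.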
2.2 Build and Test Environment
Manjaro 20250321, CUDA 12
Intel Core i7 6500K @ 4.0GHz
Build command:
nvcc -arch=sm_50 cuda_filter.cu -o cuda_filter
Note that -arch=sm_50 is required to select the older compute-capability target; otherwise the program reports a CUDA error as soon as it starts.
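If you are unsure which sm_XX value your card needs, the compute capability can be queried at run time. This is my own minimal sketch using the standard cudaGetDeviceProperties API:
#include <stdio.h>
#include <cuda_runtime.h>
int main() {
    cudaDeviceProp prop;
    // Query device 0; prop.major / prop.minor form the compute capability
    cudaGetDeviceProperties(&prop, 0);
    printf("%s: compute capability %d.%d -> use -arch=sm_%d%d\n",
           prop.name, prop.major, prop.minor, prop.major, prop.minor);
    return 0;
}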
2.3 Test Case
The test sample is an audio recording made with Audacity: 411 MB in size, i.e. roughly 100M float32 samples (mono).
The filter applies a 512-point moving average to the data (track 1); at the 192 kHz sample rate, that window is only about 2.7 ms long.
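To validate out.pcm, a trivial CPU reference of the same 512-point average can be diffed against the GPU output. This is my own sketch (the output name ref.pcm is made up); it is slow, and since floating-point summation order differs from the GPU version, compare with a small tolerance rather than byte-for-byte:
#include <stdio.h>
#include <string.h>
#define M 512
int main() {
    FILE *fin = fopen("test.pcm", "rb");
    FILE *fout = fopen("ref.pcm", "wb");
    float hist[M] = {0}; // sliding window, starts as all zeros
    float x;
    while (fread(&x, sizeof(float), 1, fin) == 1) {
        // shift the window left and append the new sample
        memmove(hist, hist + 1, (M - 1) * sizeof(float));
        hist[M - 1] = x;
        float y = 0.0f;
        for (int j = 0; j < M; j++) y += hist[j] * (1.0f / M);
        fwrite(&y, sizeof(float), 1, fout);
    }
    fclose(fin);
    fclose(fout);
    return 0;
}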
3. Learning Process
3.1 Asking DeepSeek
Prompt:
Implement a continuous FIR filtering algorithm in C with CUDA. The user will use the algorithm as follows:
- Open the file test.pcm.
- In a loop, read a fixed number (N) of float values at a time, until the end of the file.
- For each batch of floats, call push_and_filter(). This is the method you need to focus on designing and implementing; it must continue filtering from the state left by the previous call. The filter is a predefined array of M float elements; a reasonable range for M is currently 16-2048.
- Append each batch of filter output to out.pcm, written continuously.
- The filter buffer starts as all zeros, and at the end the tail is also flushed to zero before finishing.
Based on the characteristics of CUDA, analyze and choose N (the number of values read per iteration) so that the computation runs as fast as possible, and give the complete code.
Answer:
#include <stdio.h>
#include <stdlib.h>
#include <cuda_runtime.h>
#define MAX_FILTER_LENGTH 2048
#define OPTIMAL_N 4096 // Read block size tuned for the GPU architecture
#define BLOCK_SIZE 256 // CUDA thread block size
typedef struct {
float* buffer;
int size;
} CircularBuffer;
__constant__ float d_filter[MAX_FILTER_LENGTH]; // Filter coefficients live in constant memory
// CUDA kernel, optimized with shared memory
__global__ void fir_kernel(const float* input, float* output, int N, int M) {
extern __shared__ float s_window[];
const int tid = threadIdx.x;
const int bid = blockIdx.x;
const int gid = bid * blockDim.x + tid;
// Each block loads the input window it needs
const int window_size = blockDim.x + MAX_FILTER_LENGTH - 1;
const int input_offset = bid * blockDim.x;
// Cooperatively load data into shared memory
for(int i = tid; i < window_size; i += blockDim.x) {
const int load_pos = input_offset + i;
s_window[i] = (load_pos < N + M - 1) ? input[load_pos] : 0.0f;
}
__syncthreads();
if(gid < N) { // make sure we stay in bounds
float sum = 0.0f;
for(int j = 0; j < M; j++) {
sum += s_window[tid + j] * d_filter[j];
}
output[gid] = sum;
}
}
void push_and_filter(CircularBuffer* state, const float* input, int input_len,
float** output, int* output_len, int M, cudaStream_t stream) {
// Prepare the extended input buffer (history data + new input)
const int total_len = state->size + input_len;
float* extended_input = (float*)malloc(total_len * sizeof(float));
// Concatenate the history with the current input
memcpy(extended_input, state->buffer, state->size * sizeof(float));
memcpy(extended_input + state->size, input, input_len * sizeof(float));
// Prepare device memory
float *d_input, *d_output;
const int output_size = input_len;
cudaMalloc(&d_input, total_len * sizeof(float));
cudaMalloc(&d_output, output_size * sizeof(float));
// Asynchronous memory copy and compute
cudaMemcpyAsync(d_input, extended_input, total_len * sizeof(float),
cudaMemcpyHostToDevice, stream);
const dim3 grid_dim((output_size + BLOCK_SIZE - 1) / BLOCK_SIZE);
const dim3 block_dim(BLOCK_SIZE);
const size_t shared_mem = (BLOCK_SIZE + MAX_FILTER_LENGTH - 1) * sizeof(float);
fir_kernel<<<grid_dim, block_dim, shared_mem, stream>>>(
d_input, d_output, output_size, M
);
// Carry the history forward (keep the last M-1 samples)
free(state->buffer);
state->size = M - 1;
state->buffer = (float*)malloc(state->size * sizeof(float));
memcpy(state->buffer, extended_input + input_len, state->size * sizeof(float));
// Fetch the result asynchronously
*output = (float*)malloc(output_size * sizeof(float));
cudaMemcpyAsync(*output, d_output, output_size * sizeof(float),
cudaMemcpyDeviceToHost, stream);
cudaStreamSynchronize(stream);
*output_len = output_size;
free(extended_input);
cudaFree(d_input);
cudaFree(d_output);
}
int main() {
// Initialize the filter coefficients (example values)
const int M = 512;
float h_filter[M];
for(int i = 0; i < M; i++) h_filter[i] = 1.0f/M;
cudaMemcpyToSymbol(d_filter, h_filter, M * sizeof(float));
// Create the CUDA stream and the state buffer
cudaStream_t stream;
cudaStreamCreate(&stream);
CircularBuffer state;
state.size = M - 1; // history starts as M-1 zeros, as the prompt requires; otherwise the first call reads past the end of extended_input
state.buffer = (float*)calloc(state.size, sizeof(float));
FILE *fin = fopen("test.pcm", "rb");
FILE *fout = fopen("out.pcm", "wb");
float *input_buf = (float*)malloc(OPTIMAL_N * sizeof(float));
while(1) {
const size_t read_cnt = fread(input_buf, sizeof(float), OPTIMAL_N, fin);
if(read_cnt == 0) break;
float* output_buf;
int output_len;
push_and_filter(&state, input_buf, read_cnt, &output_buf, &output_len, M, stream);
fwrite(output_buf, sizeof(float), output_len, fout);
free(output_buf);
}
// Clean up
free(input_buf);
free(state.buffer);
fclose(fin);
fclose(fout);
cudaStreamDestroy(stream);
return 0;
}
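The core design here is a classic overlap-save arrangement: every call to push_and_filter() prepends the last M-1 input samples from the previous call, so each output sample y[n] = sum_{j=0..M-1} filter[j] * x[n-(M-1)+j] comes out exactly as it would from one long uninterrupted pass, with the taps in constant memory and each block staging its input window in shared memory. Strictly speaking the kernel computes a correlation (taps applied in forward order), but for the symmetric moving-average taps used in this test that is identical to a convolution.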
Then, after several more rounds of interaction to introduce multi-threading, I arrived at the final code:
3.2 Final Code
#include <stdio.h>
#include <stdlib.h>
#include <cuda_runtime.h>
#include <atomic>
#include <chrono>
#include <thread>
#define MAX_FILTER_LENGTH 2048
#define OPTIMAL_N (1024*1024) // Optimal read block size based on GPU architecture
#define BLOCK_SIZE 256 // CUDA thread block size
#define NUM_STREAMS 4
#define CHECK_CUDA(call) { \
cudaError_t err = call; \
if (err != cudaSuccess) { \
fprintf(stderr, "CUDA error at %s:%d: %s\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
}
typedef struct {
float* buffer;
int size;
int max_size;
} CircularBuffer;
__constant__ float d_filter[MAX_FILTER_LENGTH]; // Filter coefficients in constant memory
// CUDA kernel with shared memory optimization
__global__ void fir_kernel(const float* input, float* output, int N, int M) {
extern __shared__ float s_window[];
const int tid = threadIdx.x;
const int bid = blockIdx.x;
const int gid = bid * blockDim.x + tid;
// Each block loads its needed input window
const int window_size = blockDim.x + MAX_FILTER_LENGTH - 1;
const int input_offset = bid * blockDim.x;
// Cooperative data loading into shared memory
for(int i = tid; i < window_size; i += blockDim.x) {
const int load_pos = input_offset + i;
s_window[i] = (load_pos < N + M - 1) ? input[load_pos] : 0.0f;
}
__syncthreads();
if(gid < N) { // Ensure we don't go out of bounds
float sum = 0.0f;
for(int j = 0; j < M; j++) {
sum += s_window[tid + j] * d_filter[j];
}
output[gid] = sum;
}
}
// Input staging buffer used to prepend the carried-over state
int mem_ext_input_size = 0;
float* mem_ext_input = 0;
// Device-side input/output buffers
float *d_input = 0, *d_output = 0;
int d_inputlen = 0, d_outputlen = 0;
std::atomic<bool> read_finished(false), deal_finished(false);
std::atomic<long long> read_pos(0);
std::atomic<long long> deal_pos(0);
std::atomic<long long> write_pos(0);
// Continuous filtering function
void push_and_filter(CircularBuffer* state, const float* input, int input_len,
float* output, int * outlen, int M, cudaStream_t stream) {
// Prepare extended input buffer (history data + new input)
int ext_input_len = state->size + input_len;
if (mem_ext_input_size < ext_input_len) {
if (mem_ext_input)
free(mem_ext_input);
mem_ext_input = (float*)malloc(ext_input_len * sizeof(float));
mem_ext_input_size = ext_input_len;
}
// Combine history data with current input
memcpy(mem_ext_input, state->buffer, state->size * sizeof(float));
memcpy(mem_ext_input + state->size, input, input_len * sizeof(float));
// Prepare device memory
const int output_size = input_len;
if (d_inputlen < ext_input_len) {
if (d_input)
cudaFree(d_input);
CHECK_CUDA(cudaMalloc(&d_input, ext_input_len * sizeof(float)));
d_inputlen = ext_input_len;
}
if (d_outputlen < output_size) {
if (d_output)
cudaFree(d_output);
CHECK_CUDA(cudaMalloc(&d_output, output_size * sizeof(float)));
d_outputlen = output_size;
}
// Synchronous memory copy
//CHECK_CUDA(cudaMemcpy(d_input, mem_ext_input, ext_input_len * sizeof(float), cudaMemcpyHostToDevice));
// Async memory operations with stream
CHECK_CUDA(cudaMemcpyAsync(d_input, mem_ext_input, ext_input_len * sizeof(float),
cudaMemcpyHostToDevice, stream));
const dim3 grid_dim((output_size + BLOCK_SIZE - 1) / BLOCK_SIZE);
const dim3 block_dim(BLOCK_SIZE);
const size_t shared_mem = (BLOCK_SIZE + MAX_FILTER_LENGTH - 1) * sizeof(float);
// Kernel launch on the per-call stream
fir_kernel<<<grid_dim, block_dim, shared_mem ,stream>>>(
d_input, d_output, output_size, M
);
// Recycle history data (keep last M-1 samples)
state->size = M - 1;
if (state->max_size < state->size)
{
if (state->buffer )
free (state->buffer );
state->buffer = (float*)malloc(state->size * sizeof(float));
state->max_size = state->size;
}
memcpy(state->buffer, mem_ext_input + input_len, state->size * sizeof(float));
//CHECK_CUDA(cudaMemcpy(output, d_output, output_size * sizeof(float), cudaMemcpyDeviceToHost));
CHECK_CUDA(cudaMemcpyAsync(output, d_output, output_size * sizeof(float),
cudaMemcpyDeviceToHost, stream));
*outlen = output_size;
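// Full device-wide sync: this waits on all streams and serializes successive calls, so it remains an optimization target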
CHECK_CUDA(cudaDeviceSynchronize());
return;
}
int main() {
// Initialize filter coefficients (example values)
const int M = 512;
float h_filter[M];
for(int i = 0; i < M; i++) h_filter[i] = 1.0f/M;
CHECK_CUDA(cudaMemcpyToSymbol(d_filter, h_filter, M * sizeof(float)));
// Initialize state buffer
// History starts as M-1 zeros, matching the spec "filter buffer starts as all zeros"
CircularBuffer state = {(float*)calloc(M - 1, sizeof(float)), M - 1, M - 1};
const int buf_stage = 8;
float (*input_buf)[OPTIMAL_N] = new float [buf_stage][OPTIMAL_N];
float (*output_buf)[OPTIMAL_N] = new float [buf_stage][OPTIMAL_N];
size_t (*input_len) = new size_t [buf_stage];
int (*output_len) = new int [buf_stage];
//Read thread
std::thread read_thread([&]()->void{
FILE *fin = fopen("test.pcm", "rb");
while(!read_finished) {
if (deal_pos + buf_stage - 2 <= read_pos)
{
//Sleep
std::chrono::milliseconds dura(1);
std::this_thread::sleep_for(dura);
continue;
}
const long long pos = read_pos % buf_stage;
//Read
input_len[pos] = fread(input_buf[pos], sizeof(float), OPTIMAL_N, fin);
if(input_len[pos] < OPTIMAL_N)
read_finished = true;
if(++read_pos % 10 ==0)
fprintf(stderr,"ReadPos = %lld\n",(long long)read_pos);
}
fclose(fin);
});
//write thread
std::thread write_thread([&]()->void{
FILE *fout = fopen("out.pcm", "wb");
while((!deal_finished) || write_pos < deal_pos) {
if (write_pos >= deal_pos)
{
//Sleep
std::chrono::milliseconds dura(1);
std::this_thread::sleep_for(dura);
continue;
}
const long long pos = write_pos % buf_stage;
if (output_len[pos])
fwrite(output_buf[pos], sizeof(float), output_len[pos], fout);
if (++write_pos % 10 ==0)
fprintf(stderr,"WritePos = %lld\n",(long long)write_pos);
}
fclose(fout);
});
cudaStream_t streams[NUM_STREAMS];
for (int i = 0; i < NUM_STREAMS; i++) {
CHECK_CUDA(cudaStreamCreate(&streams[i]));
}
//Deal in main Thread
while ((!read_finished) || deal_pos < read_pos )
{
if (deal_pos >= read_pos)
{
//Sleep
std::chrono::milliseconds dura(1);
std::this_thread::sleep_for(dura);
continue;
}
const long long pos = deal_pos % buf_stage;
cudaStream_t stream = streams[pos % NUM_STREAMS];
push_and_filter(&state, input_buf[pos], input_len[pos], output_buf[pos], &(output_len[pos]), M,stream);
if (++deal_pos % 10==0)
fprintf(stderr,"DealPos = %lld\n",(long long)deal_pos);
}
deal_finished = true;
read_thread.join();
write_thread.join();
fprintf(stderr,"Deal Finished.\n");
if (state.buffer) free(state.buffer);
if (mem_ext_input) free(mem_ext_input);
if (d_input) CHECK_CUDA(cudaFree(d_input));
if (d_output) CHECK_CUDA(cudaFree(d_output));
delete []input_buf;
delete []output_buf;
delete []input_len;
delete []output_len;
for (int i = 0; i < NUM_STREAMS; i++) {
CHECK_CUDA(cudaStreamDestroy(streams[i]));
}
return 0;
}
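One build note: the final version uses std::thread, so depending on the host compiler the command from section 2.2 may additionally need the pthread flag, e.g. (my guess at a portable invocation; adjust to your toolchain):
nvcc -arch=sm_50 -O2 cuda_filter.cu -o cuda_filter -Xcompiler -pthread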
4. Reflections
Going from not knowing CUDA at all to a working, if performance-wise still unremarkable, filter took four hours. Along the way, by asking DeepSeek how to make sense of stream processing and how to follow the Nvidia Visual Profiler's optimization guidance, I also picked up a lot of peripheral CUDA knowledge. Clearly, AI now lets even a 50-year-old like me learn new things, never mind the young. These days the only limit is what you can think of, not what you can do.