【请关注】关于VC++实现使用Redis不同方法,有效达到 Redis 性能优化、防击穿

发布于:2025-05-30 ⋅ 阅读:(14) ⋅ 点赞:(0)

麻烦先关注方便了解最新分享。

 

关于VC++实现使用Redis不同方法,有效达到 Redis 性能优化、防击穿。

 

一、性能优化核心方法

 

1. 使用连接池复用连接

 

// 连接池结构体

struct RedisPool {

    hiredisContext** connections;

    int size;

    sem_t sem; // 信号量控制并发

};

 

// 初始化连接池

RedisPool* create_redis_pool(const char* host, int port, int pool_size) {

    RedisPool* pool = (RedisPool*)malloc(sizeof(RedisPool));

    pool->size = pool_size;

    pool->connections = (hiredisContext**)malloc(pool_size * sizeof**iredisContext*));

    sem_init(&pool->sem, 0, pool_size); // 初始化信号量

 

    for (int i = 0; i < pool_size; i++) {

        pool->connections[i] = redisConnect(host, port);

        if (pool->connections[i]->err) {

            // 处理连接失败

            redisFree(pool->connections[i]);

            exit(1);

        }

    }

    return pool;

}

 

// 从连接池获取连接

hiredisContext* get_connection(RedisPool* pool) {

    sem_wait(&pool->sem); // 等待可用连接

    for (int i = 0; i < pool->size; i++) {

        if (pool->connections[i]->err == 0) { // 检查连接健康状态

            return pool->connections[i];

        }

    }

    return NULL; // 理论上信号量控制下不会出现

}

 

 

原理:避免频繁创建/销毁连接(单次连接耗时约 1-3ms),连接池大小建议为  CPU核心数*2+1 。

 

2. Pipeline 批量操作

 

void pipeline_example(RedisPool* pool) {

    hiredisContext* conn = get_connection(pool);

    redisBuffer* buf = redisBufferCreate(); // 创建 Pipeline 缓冲区

 

    // 批量添加命令

    redisAppendCommand(buf, "SET key1 %s", "value1");

    redisAppendCommand(buf, "SET key2 %s", "value2");

    redisAppendCommand(buf, "GET key1");

 

    // 一次性发送所有命令

    size_t len = redisBufferLength(buf);

    if (redisWrite(conn, buf->data, len) != REDIS_OK) {

        // 处理写入失败

        redisFree(conn);

        return;

    }

 

    // 读取所有结果

    redisReply* reply;

    int count = 3; // 命令数量

    for (int i = 0; i < count; i++) {

        if (redisGetReply(conn, (void**)&reply) == REDIS_OK) {

            // 处理结果

            if (reply->type == REDIS_REPLY_STRING) {

                printf("GET result: %s\n", reply->str);

            }

            freeReplyObject(reply);

        }

    }

    redisBufferFree(buf);

    sem_post(&pool->sem); // 归还连接

}

 

 

效果:1000 次单命令耗时约 80ms,Pipeline 批量处理仅需 2ms(减少网络往返开销)。

 

3. 使用异步 API(libevent 异步库)

 

#include <event2/event.h>

#include <hiredis/adapters/libevent.h>

 

void async_callback(redisAsyncContext* ctx, void* reply, void* privdata) {

    redisReply* r = (redisReply*)reply;

    if (r->type == REDIS_REPLY_STRING) {

        printf("Async GET: %s\n", r->str);

    }

}

 

void async_example() {

    struct event_base* base = event_base_new();

    redisAsyncContext* ctx = redisAsyncConnect("127.0.0.1", 6379);

    

    if (ctx->err) {

        // 处理连接失败

        return;

    }

    

    // 绑定异步事件

    redisLibeventAttach(ctx, base);

    redisAsyncCommand(ctx, async_callback, NULL, "GET key1");

    

    event_base_dispatch(base); // 进入事件循环

    event_base_free(base);

    redisAsyncFree(ctx);

}

 

 

场景:适用于非阻塞业务(如日志采集、消息队列消费),单线程可处理上万并发请求。

 

二、防击穿/雪崩核心方案

 

4. 布隆过滤器拦截无效请求

 

// 布隆过滤器(基于 Redis BitMap)

void bloom_filter_add(RedisPool* pool, const char* key) {

    hiredisContext* conn = get_connection(pool);

    // 使用 SHA-1 生成 2 个哈希值

    unsigned char hash1[20], hash2[20];

    SHA1((unsigned char*)key, strlen(key), hash1);

    SHA1((unsigned char*)key, strlen(key), hash2);

 

    // 计算两个 bit 位置(假设布隆长度 1e6,哈希数 2)

    long bit1 = ((((uint64_t)hash1[0]) << 56) | ((uint64_t)hash1[1] << 48) | ...) % 1000000;

    long bit2 = ((((uint64_t)hash2[0]) << 56) | ...) % 1000000;

 

    // 设置 bit 位

    redisCommand(conn, "SETBIT bloom_filter %ld 1", bit1);

    redisCommand(conn, "SETBIT bloom_filter %ld 1", bit2);

    sem_post(&pool->sem);

}

 

bool bloom_filter_check(RedisPool* pool, const char* key) {

    // 类似 add 逻辑计算 bit1/bit2

    // 检查两个 bit 位是否都为 1

    long res1 = (long)redisCommand(conn, "GETBIT bloom_filter %ld", bit1);

    long res2 = (long)redisCommand(conn, "GETBIT bloom_filter %ld", bit2);

    return res1 == 1 && res2 == 1;

}

 

 

参数:误判率建议设为 0.1%,布隆长度 =  (条目数 * -ln(误判率)) / (ln2)^2 。

 

5. 互斥锁(RedLock)防止缓存击穿

 

// RedLock 实现(简化版)

#define LOCK_EXPIRE 5000 // 锁过期时间(ms)

#define LOCK_RETRY 500 // 重试间隔(ms)

 

bool redlock_acquire(RedisPool* pool, const char* lock_key, const char* client_id) {

    time_t start = time(NULL);

    while (time(NULL) - start < LOCK_EXPIRE) {

        // 尝试获取锁(NX+PX 保证原子性)

        hiredisContext* conn = get_connection(pool);

        redisReply* reply = (redisReply*)redisCommand(conn, 

            "SET %s %s NX PX %d", lock_key, client_id, LOCK_EXPIRE);

        sem_post(&pool->sem);

        

        if (reply && reply->type == REDIS_REPLY_STRING && strcmp(reply->str, "OK") == 0) {

            freeReplyObject(reply);

            return true;

        }

        freeReplyObject(reply);

        usleep(LOCK_RETRY * 1000); // 等待重试

    }

    return false;

}

 

void redlock_release(RedisPool* pool, const char* lock_key, const char* client_id) {

    hiredisContext* conn = get_connection(pool);

    // 验证客户端 ID 再释放(避免误删其他锁)

    redisCommand(conn, "if redis.call('GET', KEYS[1]) == ARGV[1] then return redis.call('DEL', KEYS[1]) else return 0 end",

        lock_key, client_id);

    sem_post(&pool->sem);

}

 

 

注意:RedLock 需至少 3 个 Redis 实例保证分布式锁可靠性,获取锁需超过半数节点成功。

 

6. 热点数据本地缓存(如 LRU 机制)

 

// 基于 std::unordered_map 的 LRU 缓存

struct LRUItem {

    std::string value;

    time_t timestamp;

};

 

class LRUCache {

private:

    std::unordered_map<std::string, LRUItem> cache;

    size_t max_size;

    time_t ttl; // 过期时间(秒)

 

public:

    LRUCache(size_t max_size, time_t ttl) : max_size(max_size), ttl(ttl) {}

 

    bool get(const std::string& key, std::string& value) {

        auto it = cache.find(key);

        if (it == cache.end()) return false;

        if (time(NULL) - it->second.timestamp > ttl) {

            cache.erase(it); // 过期淘汰

            return false;

        }

        // 更新访问时间

        it->second.timestamp = time(NULL);

        value = it->second.value;

        return true;

    }

 

    void set(const std::string& key, const std::string& value) {

        if (cache.size() >= max_size) {

            // 淘汰最久未使用项(简化实现,实际可用 list+map)

            auto oldest = std::min_element(cache.begin(), cache.end(),

                [](const auto& a, const auto& b) {

                    return a.second.timestamp < b.second.timestamp;

                });

            cache.erase(oldest);

        }

        cache[key] = {value, time(NULL)};

    }

};

 

// 使用示例

LRUCache local_cache(1000, 300); // 最多存 1000 条,5 分钟过期

std::string value;

if (!local_cache.get("hot_key", value)) {

    // 从 Redis 加载并写入本地缓存

    hiredisContext* conn = get_connection(pool);

    redisReply* reply = (redisReply*)redisCommand(conn, "GET hot_key");

    if (reply && reply->type == REDIS_REPLY_STRING) {

        value = reply->str;

        local_cache.set("hot_key", value);

    }

    freeReplyObject(reply);

    sem_post(&pool->sem);

}

 

 

场景:适合 QPS > 1w 的热点数据(如商品详情页),本地缓存命中率需维持在 80% 以上。

 

三、其他优化手段(附代码片段)

 

7. 避免大 Key(控制 Value 大小 < 100KB)

 

// 写入前校验 Value 大小

void check_big_value(const char* value, size_t max_size) {

    size_t len = strlen(value);

    if (len > max_size) {

        throw std::runtime_error("Value exceeds 100KB limit");

    }

}

 

 

8. 使用压缩编码(LZ4 压缩大 Value)

 

#include <lz4.h>

 

std::string compress_value(const std::string& raw) {

    char* compressed = (char*)malloc(LZ4_compressBound(raw.size()));

    int compressed_size = LZ4_compress_default(raw.data(), compressed, raw.size());

    std::string result(compressed, compressed_size);

    free(compressed);

    return result;

}

 

std::string decompress_value(const std::string& compressed, size_t original_size) {

    char* decompressed = (char*)malloc(original_size);

    int decompressed_size = LZ4_decompress_safe(compressed.data(), decompressed, compressed.size(), original_size);

    std::string result(decompressed, decompressed_size);

    free(decompressed);

    return result;

}

 

 

9. 异步淘汰过期 Key(定期删除 + 惰性删除)

 

// 定期删除任务(后台线程执行)

void async_expire_cleanup(RedisPool* pool) {

    while (true) {

        hiredisContext* conn = get_connection(pool);

        redisReply* reply = (redisReply*)redisCommand(conn, "SCAN 0 MATCH key:* EXIT 1000"); // 每次扫描 1000 个 Key

        if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 2) {

            redisReply* keys_reply = reply->element[1];

            for (int i = 0; i < keys_reply->elements; i++) {

                std::string key = keys_reply->element[i]->str;

                redisReply* ttl_reply = (redisReply*)redisCommand(conn, "TTL %s", key.c_str());

                if (ttl_reply->integer < 0) continue; // 无过期时间

                if (ttl_reply->integer < 60) { // 临近过期时主动淘汰

                    redisCommand(conn, "DEL %s", key.c_str());

                }

                freeReplyObject(ttl_reply);

            }

        }

        freeReplyObject(reply);

        sem_post(&pool->sem);

        sleep(60); // 每分钟执行一次

    }

}

 

 

四、完整优化配置示例(redis.conf 对应项)

 

# 性能优化配置

tcp-backlog 511 # 优化 TCP 三次握手队列

tcp-keepalive 300 # 保持连接活性(秒)

hz 100 # 提高事件循环频率(默认 10,高负载设为 100)

aof-rewrite-incremental-fsync yes # AOF 重写期间优化 fsync

 

# 防击穿配置

maxmemory 2gb # 限制内存使用

maxmemory-policy allkeys-lru # 内存不足时淘汰策略

notify-keyspace-events Ex # 开启 Key 过期通知

 

 

关键指标监控代码

 

// 实时监控 Redis 状态

void monitor_redis(RedisPool* pool) {

    hiredisContext* conn = get_connection(pool);

    redisReply* reply = (redisReply*)redisCommand(conn, "INFO stats");

    if (reply->type == REDIS_REPLY_STRING) {

        std::string info = reply->str;

        // 解析 QPS:

        size_t pos = info.find("instantaneous_ops_per_sec:");

        if (pos != std::string::npos) {

            int qps = std::stoi(info.substr(pos + 21));

            if (qps > 10000) { // 阈值告警

                send_alert("Redis QPS 过高: " + std::to_string(qps));

            }

        }

    }

    freeReplyObject(reply);

    sem_post(&pool->sem);

}

 

 

注意事项

 

1. 线程安全: hiredis  非线程安全,连接池需配合锁/信号量使用。

2. 序列化:复杂对象需先序列化为 JSON/Protocol Buffers 再存入 Redis。

3. 压测工具:用  redis-benchmark  验证优化效果(如单实例 QPS 从 5w 提升至 8w)。

 

如需某方案的深度优化或特定场景扩展(如 Redis Cluster 分布式锁)。


网站公告

今日签到

点亮在社区的每一天
去签到