背景
场景:有一批设备数据上报到系统,通过rabbitmq进行做消费队列,每15分钟一报,每次报2w条,
如果一条条接收,每接收一条耗时0.5秒,那2w条就要2.7小时,这也太久了......那如何优化呢,要求搜索时长,且mq的数据不要重复消费
原理:
dotcap原理:这里略过 ,可以自行去了解。这里引出 就是抛弃mq的失败ack机制
IOT上报数据--->IOT平台--->RabbitMQ----->消费MQ----->Redis---->消费Redis的数据
↑
定时任务去触发(不足100条时)
采用多任务批处理机制:
1)每次从Redis哈希表中取出100条数据;
2)分页并行处理(每页20条);
3)处理成功后删除缓存。同时设置定时任务(每20秒)处理不足100条的残留数据,确保数据及时性。
方案通过Redis分布式锁防止重复消费,实现了处理效率从小时级到秒级的提升,且保证数据不丢失不重复。
步骤
step1:dotcap:接收mq的消息
[NonAction]
[CapSubscribe("xxx.device", Group = "xxx.device")]
public void SubscribeMQAsync(object obj)
{
var msg = "";
var dt = DateTime.Now;
try
{
//将接收到的mq消息直接塞到redis的哈希里
RedisHelper.AddH(CommCacheConfig.MQ_ELEC_KEY, Guid.NewGuid().ToStr(), obj);
}
catch (Exception ex)
{
msg += $"异常,参数:{str}=>{ex.ToStr()}耗时:" + (DateTime.Now - dt).TotalSeconds;
throw ex;
}
}
step2:处理逻辑(核心逻辑)
public async Task Save(MQData<MQData> data = null, bool isApi = false)
{
var isStill = false;
var msg2=string.Empty;
try
{
msg2 += "锁前=> ";
var flag = RedisHelper.SetNX(CommCacheConfig.MQ_ELEC_KEY + "_Lock", DateTime.Now.ToStr(), 240);
if (flag)
{
msg2 += "锁中=> ";
var dic = RedisCacheHelper.HGetAllAsync<MQData<MQData>>(CommCacheConfig.MQ_ELEC_KEY).GetAwaiter().GetResult();
if (dic.Count() >= 100) dic = dic.Take(100).ToDictionary(x => x.Key, x => x.Value);
isStill=dic.Count() > 0;//如果超过100条代表还有 这里打个表计表示继续
if (data != null)
{
if (dic == null) dic = new Dictionary<string, MQData<MQData>>();
dic.Add(Guid.NewGuid().ToStr(), data);
}
if (dic.Count() == 0) return ResParameter.Fail("无数据");
var tasks = new List<Task>();
var pageSize = 20;
var totalPage = dic.Count() / pageSize + dic.Count() % pageSize > 0 ? 1 : 0;
if (isApi)
msg2 += "isApi=> ";
//这个改多任务处理
var dt=DateTime.Now;
var pages = new List<int>();
for (var page = 1; page <= totalPage; page++)
{
var dic2 = dic.Skip((page - 1) * pageSize).Take(pageSize).ToDictionary(x => x.Key, x => x.Value);
tasks.Add(Task.Run(() => Handle(dic2)));
}
//等待处理完
await Task.WhenAll(tasks);
msg2 += $"Task.WhenAll{(DateTime.Now-dt).TotalSeconds}秒=> ";
}
}
catch (Exception ex)
{
msg2 += $"ex:{ex.Message}=>";
}
finally
{
RedisHelper.Remove(CommCacheConfig.MQ_ELEC_KEY + "_Lock");
msg2 += $"finally--end";
LogHelper.Info("Elec", msg2);
// 如果当前缓存还有 ,就继续处理
if (isStill)await Save(null, isApi);
}
}
///保存数据库的操作 并删除掉缓存的key
private async Task<int> Handle(Dictionary<string, MQData<MQData>> dic)
{
var fields = new List<string>();
using (DataFilter.Disable<IMultiTenant>())
{
using (var uow = UnitOfWorkManager.Begin(requiresNew: true, isTransactional: true))
{
var service = IocManager.Instance.GetService<XXXManager>();
fields = await service.TaskAsync(dic);
await uow.CompleteAsync();
}
}
if (fields.Count > 0)
RedisHelper.DelH(CommCacheConfig.MQ_ELEC_KEY, fields.ToArray());
return fields.Count;
}
Step3:加定时任务:防止不足100条的时候没有处理机制
/// <summary>
/// MQ会推到 消费者那里 然后缓存到redis ,然后每1分钟把redis(不足100条的)的保存到数据库里
/// </summary>
[DisallowConcurrentExecution]
[TaskCustomAttribute("每次100条定时任务","xxx主题", "0/20 * * * * ?")]
public class DeviceMqJob : IJob
{
readonly IHttpClientFactory httpClientFactory;
public ElectMqJob(IHttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory;
}
/// <returns></returns>
public async Task Execute(IJobExecutionContext context)
{
try
{
using (var clinet = new HttpClient())
{
//触发机制 这里走的是http就触发刚刚的逻辑
var url = HostLocalUrl + "/api/Save";
var r = await httpClientFactory.HttpSendAsync(HttpMethod.Post, url);
}
}
catch (Exception ex)
{
LogHelper.Error(ex);
}
}
}