1.需求:
计算滑动时间下的1小时、3小时、6小时、12小时、24小时降水数据,统计这个时间下的分钟级降水数据
2.分析
第一版本:降水分钟级数据保存时间不长,保存太多意义不大,以更新的形式来保存这些统计数据效果会比较好,使用管道先进先出的原则每5分钟取一次数据,每次只需要取近5分钟的数据即可,设置管道长度分别为12组、36组、72组,144组和288组降水数据,取够数据之后将之前第一组的数据自动去除即可
版本分析:
优点:基本能达到在一次过程中的数据统计功能,而且占用内存很小,数据整体运行比较顺畅,每次取的数据量比较小
缺点:数据统计不准确,虽然用了定时器的定数回调方法来处理数据,仍然存在数据统计不准的情况,这个问题初步判定为天擎数据的后期更新,但是管道内的数据没有更新导致数据存在差异的问题,数据的准确性不能保证,弃用
第二个版本:保留每5分钟更新一次数据的频率,将近1小时、3小时、6小时、12小时、24小时降水数据直接进行统计,统计后的数据保存下来即可
版本分析:
优点:数据的准确性能保证,数据没有差异
缺点:数据的体量比较大,内存占用比较大,在统计过程中存在内存占用很高的情况
版本更新:
在每次定时回调统计数据的方法中将每次调用后的法中对已经弃用的一些变量和占用内存较高的变量进行GC处理
更新分析:
优点:数据统计的内存占比在每次统计完成后会清理掉,占用的内存会比较小
缺点:每次都要统计很多数据,有些站点的数据在处理的时候会在时间段内漏报,在统计时会出现站点报错,导致站点数据统计不上的情况,在统计的时候加上判断
最终效果:
using System.Data;
using System.Data.SqlClient;
using System.Text;
using CMISS_DATA_Helper;
using Model;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using NPOI.SS.Formula.Functions;
class Program
{
private static readonly Dictionary<string, decimal> hourlyAccumulator_1 = new Dictionary<string, decimal>();
private static readonly Dictionary<string, decimal> hourlyAccumulator_3 = new Dictionary<string, decimal>();
private static readonly Dictionary<string, decimal> hourlyAccumulator_6 = new Dictionary<string, decimal>();
private static readonly Dictionary<string, decimal> hourlyAccumulator_12 = new Dictionary<string, decimal>();
private static readonly Dictionary<string, decimal> hourlyAccumulator_24 = new Dictionary<string, decimal>();
private static readonly object lockObj = new object();
private static Timer timer;
private static string DbConnectionString = "Data Source=.;Initial Catalog=Meteorological_Bz;User Id=sa;Password=sa123...;MultipleActiveResultSets=True";
// 配置参数
private static int IntervalMinutes = 5;
private static int WindowSize_1 = (60 / IntervalMinutes) * 1; // 1小时 = 12 * 5分钟
private static int WindowSize_3 = (60 / IntervalMinutes) * 3;
private static int WindowSize_6 = (60 / IntervalMinutes) * 6;
private static int WindowSize_12 = (60 / IntervalMinutes) * 12;
private static int WindowSize_24 = (60 / IntervalMinutes) * 24;
private static string columns = "Station_Id_C,UPDATE_TIME,Datetime,PRE";
private static string url = "";//天擎接口
private static string AdminCode = "110000";//区划码
private static string[] stationIds = getStations();//获取站点数据
static JObject valueconfig;
static async Task Main(string[] args)
{
// 初始化站点数据(假设已知400个站点ID)
System.Text.Encoding.RegisterProvider(System.Text.CodePagesEncodingProvider.Instance);
using (System.IO.StreamReader file = new System.IO.StreamReader(AppDomain.CurrentDomain.BaseDirectory + "dataconfig.json", Encoding.UTF8))
{
valueconfig = JsonConvert.DeserializeObject<JObject>(file.ReadToEnd());
file.Close();
}
DbConnectionString = valueconfig["conn"].ToString();
if (valueconfig["stationIds"].ToString() != string.Empty) stationIds = valueconfig["stationIds"].ToString().Split(",");
columns = valueconfig["columns"].ToString();
url = valueconfig["url"].ToString();
AdminCode = valueconfig["AdminCode"].ToString();
IntervalMinutes = int.Parse(valueconfig["IntervalMinutes"].ToString());
InitializeAccumulator(stationIds);
// 初始化定时器(立即启动,每5分钟执行)
timer = new Timer(ProcessWeatherData, null, 0, IntervalMinutes * 60 * 1000);
Console.WriteLine("服务已启动,按任意键退出...");
Console.ReadKey();
timer.Dispose();
}
// 初始化累加器(所有站点归零)
private static void InitializeAccumulator(string[] stationIds)
{
lock (lockObj)
{
Console.WriteLine("初始化站点!");
foreach (var id in stationIds)
{
hourlyAccumulator_1[id] = 0;
hourlyAccumulator_3[id] = 0;
hourlyAccumulator_6[id] = 0;
hourlyAccumulator_12[id] = 0;
hourlyAccumulator_24[id] = 0;
}
}
}
// 定时器回调方法
private static async void ProcessWeatherData(object state)
{
try
{
InitializeAccumulator(stationIds);
// 1. 获取API数据
var currentData_1 = await FetchWeatherDataAsync(60 * 1);
var currentData_3 = await FetchWeatherDataAsync(60 * 3);
var currentData_6 = await FetchWeatherDataAsync(60 * 6);
var currentData_12 = await FetchWeatherDataAsync(60 * 12);
var currentData_24 = await FetchWeatherDataAsync(60 * 24);
lock (lockObj)
{
try
{
foreach (var kvp in currentData_1)
{
if (hourlyAccumulator_1.ContainsKey(kvp.Key))
{
hourlyAccumulator_1[kvp.Key] += kvp.Value;
}
else
{
hourlyAccumulator_1.Add(kvp.Key, kvp.Value);
}
}
//Console.WriteLine($"{DateTime.Now}: 1小时降水已统计");
}
catch (Exception ex)
{
Console.WriteLine($"1小时降水已统计异常{ex.Message}");
}
try
{
foreach (var kvp in currentData_3)
{
if (hourlyAccumulator_3.ContainsKey(kvp.Key))
{
hourlyAccumulator_3[kvp.Key] += kvp.Value;
}
else
{
hourlyAccumulator_3.Add(kvp.Key, kvp.Value);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"3小时降水已统计异常{ex.Message}");
}
try
{
foreach (var kvp in currentData_6)
{
if (hourlyAccumulator_6.ContainsKey(kvp.Key))
{
hourlyAccumulator_6[kvp.Key] += kvp.Value;
}
else
{
hourlyAccumulator_6.Add(kvp.Key, kvp.Value);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"6小时降水已统计异常{ex.Message}");
}
try
{
foreach (var kvp in currentData_12)
{
if (hourlyAccumulator_12.ContainsKey(kvp.Key))
hourlyAccumulator_12[kvp.Key] += kvp.Value;
else
{
hourlyAccumulator_12.Add(kvp.Key, kvp.Value);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"12小时降水已统计异常{ex.Message}");
}
try
{
foreach (var kvp in currentData_24)
{
if (hourlyAccumulator_24.ContainsKey(kvp.Key))
hourlyAccumulator_24[kvp.Key] += kvp.Value;
else
{
hourlyAccumulator_24.Add(kvp.Key, kvp.Value);
}
}
}
catch (Exception ex)
{
Console.WriteLine($"24小时降水已统计异常{ex.Message}");
}
// 4. 更新数据库
UpdateDatabase(hourlyAccumulator_1, hourlyAccumulator_3, hourlyAccumulator_6, hourlyAccumulator_12, hourlyAccumulator_24);
Console.WriteLine($"{DateTime.Now}:近小时数据已更新");
//5.释放内存
try
{
Console.WriteLine($"{DateTime.Now}:释放内存");
currentData_1.Clear();
currentData_1.TrimExcess();
currentData_1 = null;
currentData_3.Clear();
currentData_3.TrimExcess();
currentData_3 = null;
currentData_6.Clear();
currentData_6.TrimExcess();
currentData_6 = null;
currentData_12.Clear();
currentData_12.TrimExcess();
currentData_12 = null;
currentData_24.Clear();
currentData_24.TrimExcess();
currentData_24 = null;
hourlyAccumulator_1.Clear();
hourlyAccumulator_1.TrimExcess();
hourlyAccumulator_3.Clear();
hourlyAccumulator_3.TrimExcess();
hourlyAccumulator_6.Clear();
hourlyAccumulator_6.TrimExcess();
hourlyAccumulator_12.Clear();
hourlyAccumulator_12.TrimExcess();
hourlyAccumulator_24.Clear();
hourlyAccumulator_24.TrimExcess();
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
}
catch (Exception ex)
{
Console.WriteLine($"处理失败: {ex.Message}");
}
}
private static async Task<Dictionary<string, decimal>> FetchWeatherDataAsync(int minute){
//获取天擎数据,并异步返回Dictionary数据
return data;
}
private static void UpdateDatabase(Dictionary<string, decimal> data_1, Dictionary<string, decimal> data_3, Dictionary<string, decimal> data_6, Dictionary<string, decimal> data_12, Dictionary<string, decimal> data_24)
{
//更新数据库
}
}