概述
实现一个CSV文件加载器,支持并行处理和多种配置选项。主要特点包括:
- 支持编码自动检测(优先依据 BOM 识别 UTF-8/UTF-16/UTF-32;无 BOM 时在 UTF-8 与 GBK 之间选择)
- 可配置的分隔符和数据处理选项
- 串行和并行两种处理模式
- 顺序保持选项
- 健壮的错误处理
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
/// <summary>
/// Loads a delimited text file into a read-only list of <see cref="CsvData"/> records.
/// Supports BOM/heuristic encoding detection, a configurable delimiter, and either
/// sequential or parallel row processing (optionally preserving file order).
/// </summary>
public static class CsvDataLoader
{
    /// <summary>
    /// Number of leading columns that <see cref="ParseLine"/> maps onto a record
    /// (Group, Class, Comm, Name, Type, Units, Addr).
    /// </summary>
    private const int MappedColumnCount = 7;

    /// <summary>
    /// UTF-8 decoder that throws on malformed byte sequences, used to validate
    /// BOM-less files instead of silently substituting replacement characters.
    /// </summary>
    private static readonly UTF8Encoding StrictUtf8 =
        new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true);

    /// <summary>Settings that control how a file is read and parsed.</summary>
    public class Options
    {
        /// <summary>Encoding used to read the file; <c>null</c> means detect it from the file.</summary>
        public Encoding Encoding { get; set; } = null;

        /// <summary>Character that separates fields within a line.</summary>
        public char Delimiter { get; set; } = ',';

        /// <summary>When <c>true</c>, the first line of the file is skipped.</summary>
        public bool HasHeader { get; set; } = true;

        /// <summary>Minimum number of columns a line must have; shorter lines are dropped.</summary>
        public int ExpectedColumnCount { get; set; } = 7;

        /// <summary>When <c>true</c>, each field is trimmed of surrounding whitespace.</summary>
        public bool TrimValues { get; set; } = true;

        /// <summary>
        /// Maximum number of concurrent workers. <c>-1</c> places no limit (the
        /// <see cref="ParallelOptions.MaxDegreeOfParallelism"/> convention); values greater
        /// than 1 cap the worker count; any other value selects sequential processing.
        /// </summary>
        public int DegreeOfParallelism { get; set; } = -1;

        /// <summary>When <c>true</c>, parallel processing returns records in file order.</summary>
        public bool PreserveOrder { get; set; } = true;
    }

    /// <summary>
    /// Reads <paramref name="filePath"/> and converts every non-blank data line into a record.
    /// </summary>
    /// <param name="filePath">Path of the file to load.</param>
    /// <param name="options">Loader settings; defaults are used when <c>null</c>.</param>
    /// <returns>The parsed records; lines that cannot be parsed are omitted.</returns>
    /// <exception cref="FileNotFoundException">The file does not exist.</exception>
    /// <exception cref="InvalidOperationException">
    /// Any other failure while reading or processing; the original exception is the inner exception.
    /// </exception>
    public static IReadOnlyList<CsvData> LoadFromFile(string filePath, Options options = null)
    {
        options ??= new Options();

        if (!File.Exists(filePath))
        {
            throw new FileNotFoundException($"File not found: {filePath}", filePath);
        }

        // Required so that Encoding.GetEncoding("GBK") resolves on runtimes that do not
        // ship code-page encodings by default.
        Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);

        try
        {
            var encoding = options.Encoding ?? DetectFileEncoding(filePath);

            // Materialize the lines up front so the parallel path can index into them.
            var lines = File.ReadLines(filePath, encoding).ToList();

            return ShouldRunInParallel(options.DegreeOfParallelism)
                ? ProcessLinesInParallel(lines, options)
                : ProcessLinesSequentially(lines, options);
        }
        catch (Exception ex) when (ex is not InvalidOperationException)
        {
            throw new InvalidOperationException($"Error loading CSV file: {ex.Message}", ex);
        }
    }

    /// <summary>
    /// Decides whether the parallel path applies. -1 ("no limit") and any value above 1 are
    /// parallel; 0, 1 and values below -1 fall back to sequential processing because
    /// <see cref="ParallelOptions"/> would reject or gain nothing from them.
    /// </summary>
    private static bool ShouldRunInParallel(int degreeOfParallelism)
    {
        return degreeOfParallelism == -1 || degreeOfParallelism > 1;
    }

    /// <summary>Parses the lines one by one on the calling thread, in file order.</summary>
    private static IReadOnlyList<CsvData> ProcessLinesSequentially(List<string> lines, Options options)
    {
        var result = new List<CsvData>();

        foreach (var line in lines.Skip(options.HasHeader ? 1 : 0))
        {
            if (string.IsNullOrWhiteSpace(line))
            {
                continue;
            }

            var item = ParseLine(line, options);
            if (item != null)
            {
                item.ProcessData();
                result.Add(item);
            }
        }

        return result.AsReadOnly();
    }

    /// <summary>
    /// Parses the lines on multiple workers. With <see cref="Options.PreserveOrder"/> the
    /// output keeps file order; otherwise the order of the returned records is unspecified.
    /// </summary>
    private static IReadOnlyList<CsvData> ProcessLinesInParallel(List<string> lines, Options options)
    {
        var linesToProcess = lines
            .Skip(options.HasHeader ? 1 : 0)
            .Where(line => !string.IsNullOrWhiteSpace(line))
            .ToList();

        var parallelOptions = new ParallelOptions
        {
            MaxDegreeOfParallelism = options.DegreeOfParallelism
        };

        // NOTE(review): ProcessData is defined outside this file; it is invoked concurrently
        // on distinct instances here, so it must not rely on unsynchronized shared state — confirm.
        if (options.PreserveOrder)
        {
            // Each worker writes only to its own slot, so no synchronization is needed and
            // the slot index preserves the original order.
            var slots = new CsvData[linesToProcess.Count];

            Parallel.For(0, linesToProcess.Count, parallelOptions, i =>
            {
                var item = ParseLine(linesToProcess[i], options);
                if (item != null)
                {
                    item.ProcessData();
                    slots[i] = item;
                }
            });

            // Slots left null belong to lines that failed to parse.
            return slots.Where(item => item != null).ToList().AsReadOnly();
        }

        var bag = new ConcurrentBag<CsvData>();

        Parallel.ForEach(linesToProcess, parallelOptions, line =>
        {
            var item = ParseLine(line, options);
            if (item != null)
            {
                item.ProcessData();
                bag.Add(item);
            }
        });

        return bag.ToList().AsReadOnly();
    }

    /// <summary>
    /// Converts one line into a record, or returns <c>null</c> when the line has too few
    /// columns or parsing fails. Never throws.
    /// </summary>
    private static CsvData ParseLine(string line, Options options)
    {
        try
        {
            var columns = SplitCsvLine(line, options.Delimiter);

            // Seven columns are always read below, so require at least that many even when
            // ExpectedColumnCount is configured lower; this rejects short rows directly
            // instead of relying on a caught IndexOutOfRangeException.
            if (columns.Length < Math.Max(options.ExpectedColumnCount, MappedColumnCount))
            {
                return null;
            }

            string Field(int index) => options.TrimValues ? columns[index].Trim() : columns[index];

            return new CsvData
            {
                Group = Field(0),
                Class = Field(1),
                Comm = Field(2),
                Name = Field(3),
                Type = Field(4),
                Units = Field(5),
                Addr = Field(6)
            };
        }
        catch (Exception ex)
        {
            // Best effort: a bad row is reported (DEBUG builds only) and skipped so that
            // one malformed line does not abort the whole load.
            System.Diagnostics.Debug.WriteLine($"Error parsing line: {line}. Error: {ex.Message}");
            return null;
        }
    }

    /// <summary>
    /// Splits a line on <paramref name="delimiter"/>. Simple implementation: quoted fields
    /// that contain the delimiter are not supported.
    /// </summary>
    private static string[] SplitCsvLine(string line, char delimiter)
    {
        return line.Split(new[] { delimiter }, StringSplitOptions.None);
    }

    /// <summary>
    /// Chooses an encoding for the file: a byte-order mark wins; otherwise content that is
    /// valid UTF-8 is read as UTF-8, and everything else falls back to GBK.
    /// </summary>
    private static Encoding DetectFileEncoding(string filePath)
    {
        var bom = new byte[4];
        int filled = 0;

        using (var file = new FileStream(filePath, FileMode.Open, FileAccess.Read))
        {
            // Read may return fewer bytes than requested; loop until the buffer is full or
            // the file ends so that short files are not matched against zero padding.
            int count;
            while (filled < bom.Length && (count = file.Read(bom, filled, bom.Length - filled)) > 0)
            {
                filled += count;
            }
        }

        if (filled >= 3 && bom[0] == 0xef && bom[1] == 0xbb && bom[2] == 0xbf)
        {
            return Encoding.UTF8;
        }

        // The UTF-32 LE mark (FF FE 00 00) starts with the UTF-16 LE mark (FF FE), so it
        // has to be tested first.
        if (filled >= 4 && bom[0] == 0xff && bom[1] == 0xfe && bom[2] == 0 && bom[3] == 0)
        {
            return Encoding.UTF32;
        }

        // 00 00 FE FF is the big-endian UTF-32 mark; Encoding.UTF32 is little-endian.
        if (filled >= 4 && bom[0] == 0 && bom[1] == 0 && bom[2] == 0xfe && bom[3] == 0xff)
        {
            return new UTF32Encoding(bigEndian: true, byteOrderMark: true);
        }

        if (filled >= 2 && bom[0] == 0xff && bom[1] == 0xfe)
        {
            return Encoding.Unicode;
        }

        if (filled >= 2 && bom[0] == 0xfe && bom[1] == 0xff)
        {
            return Encoding.BigEndianUnicode;
        }

        // No BOM. I/O failures here propagate to LoadFromFile, which wraps them.
        var content = File.ReadAllBytes(filePath);

        try
        {
            // Throws on any malformed sequence; pure ASCII passes and decodes identically
            // under UTF-8 and GBK.
            StrictUtf8.GetString(content);
            return Encoding.UTF8;
        }
        catch (DecoderFallbackException)
        {
            // Not well-formed UTF-8: fall through to the marker heuristic.
        }

        // Legacy heuristic kept for backward compatibility: a temperature unit that decodes
        // as UTF-8 still selects UTF-8 even when other bytes in the file are malformed.
        var lenientText = Encoding.UTF8.GetString(content);
        if (lenientText.Contains("℃") || lenientText.Contains("°C"))
        {
            return Encoding.UTF8;
        }

        return Encoding.GetEncoding("GBK");
    }
}
代码结构分析
1. Options 配置类
// Excerpt of the nested Options class: every configurable setting of the loader with its default.
public class Options
{
public Encoding Encoding { get; set; } = null; // null => encoding is auto-detected
public char Delimiter { get; set; } = ','; // field delimiter
public bool HasHeader { get; set; } = true; // whether the first line is a header row
public int ExpectedColumnCount { get; set; } = 7; // expected number of columns
public bool TrimValues { get; set; } = true; // whether field values are trimmed
public int DegreeOfParallelism { get; set; } = -1; // degree of parallelism
public bool PreserveOrder { get; set; } = true; // preserve the original line order
}
要点:
• 使用配置类封装所有可配置选项,避免方法参数过多
• 提供合理的默认值简化调用
• 并行度-1表示使用所有可用核心
2. 主加载方法 LoadFromFile
// Abridged excerpt of the main entry point: picks the encoding, loads all lines,
// then dispatches to the parallel or the sequential processor.
public static IReadOnlyList<CsvData> LoadFromFile(string filePath, Options options = null)
{
// Argument checks and initialization (omitted in this excerpt)
// An explicitly configured encoding wins; otherwise it is detected from the file.
var encoding = options.Encoding ?? DetectFileEncoding(filePath);
// ToList() reads every line into memory before any processing starts.
var lines = File.ReadLines(filePath, encoding).ToList();
return options.DegreeOfParallelism > 1
? ProcessLinesInParallel(lines, options)
: ProcessLinesSequentially(lines, options);
}
要点:
• 使用空值合并运算符(??)提供默认值
• 立即将行加载到内存(.ToList())确保并行处理安全
• 根据并行度自动选择处理方式
3. 串行处理 ProcessLinesSequentially
// Parses the lines one after another in file order and returns the successfully
// parsed records as a read-only list.
private static IReadOnlyList<CsvData> ProcessLinesSequentially(List<string> lines, Options options)
{
var result = new List<CsvData>();
// Skip the first line when the file is configured as having a header row.
foreach (var line in lines.Skip(options.HasHeader ? 1 : 0))
{
// Blank lines carry no record.
if (string.IsNullOrWhiteSpace(line)) continue;
var item = ParseLine(line, options);
// ParseLine returns null for lines it rejects; those are left out of the result.
if (item != null)
{
item.ProcessData();
result.Add(item);
}
}
return result.AsReadOnly();
}
要点:
• 显式循环比LINQ更易调试
• 跳过标题行和空行的处理
• 返回只读集合确保安全性
4. 并行处理 ProcessLinesInParallel
// Abridged excerpt of the parallel processor. parallelOptions is not declared in this
// excerpt; the full listing builds it from options.DegreeOfParallelism.
private static IReadOnlyList<CsvData> ProcessLinesInParallel(List<string> lines, Options options)
{
// Drop the header (when configured) and blank lines before distributing the work.
var linesToProcess = lines.Skip(options.HasHeader ? 1 : 0)
.Where(line => !string.IsNullOrWhiteSpace(line))
.ToList();
if (options.PreserveOrder)
{
// Order-preserving parallel processing: each index writes only to its own array slot.
var results = new CsvData[linesToProcess.Count];
Parallel.For(0, linesToProcess.Count, parallelOptions, i =>
{
var item = ParseLine(linesToProcess[i], options);
if (item != null)
{
item.ProcessData();
results[i] = item;
}
});
// Slots that stayed null belong to lines ParseLine rejected.
return results.Where(item => item != null).ToList().AsReadOnly();
}
else
{
// Processing without order preservation: results are collected in a ConcurrentBag.
var results = new ConcurrentBag<CsvData>();
Parallel.ForEach(linesToProcess, parallelOptions, line =>
{
var item = ParseLine(line, options);
if (item != null)
{
item.ProcessData();
results.Add(item);
}
});
return results.ToList().AsReadOnly();
}
}
要点:
• 两种并行策略:保持顺序(Parallel.For)和不保持顺序(Parallel.ForEach)
• 预分配数组保持顺序,ConcurrentBag不保持顺序
• 并行选项控制最大并发数
5. 行解析 ParseLine
// Abridged excerpt of the per-line parser: returns a record, or null when the line has
// too few columns or an exception occurs while parsing.
private static CsvData ParseLine(string line, Options options)
{
try
{
var columns = SplitCsvLine(line, options.Delimiter);
// Lines with fewer columns than expected are rejected.
if (columns.Length < options.ExpectedColumnCount) return null;
return new CsvData
{
// Each field is assigned here, trimmed when options.TrimValues is set (assignments omitted in this excerpt)
};
}
catch (Exception ex)
{
// The failure is written to the debug output and the line is reported as unparsable.
System.Diagnostics.Debug.WriteLine($"Error parsing line: {line}. Error: {ex.Message}");
return null;
}
}
要点:
• 防御性编程:检查列数、异常处理
• 解析错误通过 Debug.WriteLine 记录(仅在 DEBUG 构建中输出),出错的行被跳过,不中断整体处理
• 统一应用TrimValues配置