C# 实现一个CSV文件加载器,支持并行处理和多种配置

发布于:2025-04-13 ⋅ 阅读:(19) ⋅ 点赞:(0)

概述

实现一个CSV文件加载器,支持并行处理和多种配置选项。主要特点包括:

  1. 支持多种编码自动检测
  2. 可配置的分隔符和数据处理选项
  3. 串行和并行两种处理模式
  4. 顺序保持选项
  5. 健壮的错误处理
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

/// <summary>
/// Loads the rows of a delimited text file into <c>CsvData</c> records,
/// either sequentially or in parallel, with optional encoding detection.
/// </summary>
public static class CsvDataLoader
{
    // ParseLine maps columns 0..6 onto CsvData properties, so at least this many columns are needed.
    private const int MappedColumnCount = 7;

    /// <summary>Settings that control how a file is read and parsed.</summary>
    public class Options
    {
        /// <summary>Text encoding of the file; <c>null</c> (default) lets the loader detect it.</summary>
        public Encoding Encoding { get; set; } = null;

        /// <summary>Character that separates the fields of a line.</summary>
        public char Delimiter { get; set; } = ',';

        /// <summary>When <c>true</c>, the first line of the file is skipped as a header.</summary>
        public bool HasHeader { get; set; } = true;

        /// <summary>Minimum number of columns a line must have; shorter lines are dropped.</summary>
        public int ExpectedColumnCount { get; set; } = 7;

        /// <summary>When <c>true</c>, leading and trailing whitespace is removed from every value.</summary>
        public bool TrimValues { get; set; } = true;

        /// <summary>
        /// Degree of parallelism. <c>-1</c> (default) places no limit on concurrency, a value greater
        /// than 1 caps the number of concurrent operations, and any other value selects sequential processing.
        /// </summary>
        public int DegreeOfParallelism { get; set; } = -1;

        /// <summary>When <c>true</c>, parallel processing returns rows in their original file order.</summary>
        public bool PreserveOrder { get; set; } = true;
    }

    /// <summary>
    /// Reads <paramref name="filePath"/> and returns one <c>CsvData</c> per parsable, non-blank line.
    /// </summary>
    /// <param name="filePath">Path of the file to load.</param>
    /// <param name="options">Loader settings; <c>null</c> uses the defaults of <see cref="Options"/>.</param>
    /// <returns>A read-only list of the parsed rows.</returns>
    /// <exception cref="FileNotFoundException">The file does not exist.</exception>
    /// <exception cref="InvalidOperationException">Any other failure while reading or processing the file.</exception>
    public static IReadOnlyList<CsvData> LoadFromFile(string filePath, Options options = null)
    {
        options ??= new Options();

        if (!File.Exists(filePath))
            throw new FileNotFoundException($"File not found: {filePath}", filePath);

        // Needed so that Encoding.GetEncoding("GBK") resolves on runtimes without built-in code pages.
        Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);

        try
        {
            var encoding = options.Encoding ?? DetectFileEncoding(filePath);

            // Materialize all lines up front so the parallel path can index into them.
            var lines = File.ReadLines(filePath, encoding).ToList();

            // -1 means "no limit" for ParallelOptions.MaxDegreeOfParallelism, so it must
            // take the parallel path as well; the previous "> 1" test silently ran it sequentially.
            return options.DegreeOfParallelism is -1 or > 1
                ? ProcessLinesInParallel(lines, options)
                : ProcessLinesSequentially(lines, options);
        }
        catch (Exception ex) when (ex is not InvalidOperationException)
        {
            throw new InvalidOperationException($"Error loading CSV file: {ex.Message}", ex);
        }
    }

    /// <summary>Parses the lines one after another on the calling thread.</summary>
    private static IReadOnlyList<CsvData> ProcessLinesSequentially(List<string> lines, Options options)
    {
        var result = new List<CsvData>();

        foreach (var line in lines.Skip(options.HasHeader ? 1 : 0))
        {
            if (string.IsNullOrWhiteSpace(line)) continue;

            var item = ParseLine(line, options);
            if (item != null)
            {
                item.ProcessData();
                result.Add(item);
            }
        }

        return result.AsReadOnly();
    }

    /// <summary>
    /// Parses the lines with <c>Parallel.For</c> (order preserved) or
    /// <c>Parallel.ForEach</c> (order not preserved), depending on <see cref="Options.PreserveOrder"/>.
    /// </summary>
    private static IReadOnlyList<CsvData> ProcessLinesInParallel(List<string> lines, Options options)
    {
        var linesToProcess = lines
            .Skip(options.HasHeader ? 1 : 0)
            .Where(line => !string.IsNullOrWhiteSpace(line))
            .ToList();

        var parallelOptions = new ParallelOptions
        {
            MaxDegreeOfParallelism = options.DegreeOfParallelism
        };

        if (options.PreserveOrder)
        {
            // Each iteration writes only to its own slot, so the array keeps the file order.
            var results = new CsvData[linesToProcess.Count];

            Parallel.For(0, linesToProcess.Count, parallelOptions, i =>
            {
                var item = ParseLine(linesToProcess[i], options);
                if (item != null)
                {
                    item.ProcessData();
                    results[i] = item;
                }
            });

            // Slots of lines that failed to parse stay null and are filtered out here.
            return results.Where(item => item != null).ToList().AsReadOnly();
        }
        else
        {
            // Order is not needed: collect into an unordered concurrent bag.
            var results = new ConcurrentBag<CsvData>();

            Parallel.ForEach(linesToProcess, parallelOptions, line =>
            {
                var item = ParseLine(line, options);
                if (item != null)
                {
                    item.ProcessData();
                    results.Add(item);
                }
            });

            return results.ToList().AsReadOnly();
        }
    }

    /// <summary>
    /// Converts one line into a <c>CsvData</c>. Returns <c>null</c> when the line has too few
    /// columns or when parsing throws; the error is written to the debug output.
    /// </summary>
    private static CsvData ParseLine(string line, Options options)
    {
        try
        {
            var columns = SplitCsvLine(line, options.Delimiter);

            // Seven columns are always read below, so never accept fewer than that even if
            // ExpectedColumnCount was configured lower (this used to surface as a swallowed
            // IndexOutOfRangeException).
            var requiredColumns = Math.Max(options.ExpectedColumnCount, MappedColumnCount);
            if (columns.Length < requiredColumns)
                return null;

            string Field(int index) => options.TrimValues ? columns[index].Trim() : columns[index];

            return new CsvData
            {
                Group = Field(0),
                Class = Field(1),
                Comm = Field(2),
                Name = Field(3),
                Type = Field(4),
                Units = Field(5),
                Addr = Field(6)
            };
        }
        catch (Exception ex)
        {
            System.Diagnostics.Debug.WriteLine($"Error parsing line: {line}. Error: {ex.Message}");
            return null;
        }
    }

    /// <summary>
    /// Splits a line into fields. A field that starts with a double quote may contain the
    /// delimiter, and a doubled quote inside it stands for one literal quote. Lines without
    /// quotes, and lines with an unterminated quote, are split on the delimiter only.
    /// </summary>
    private static string[] SplitCsvLine(string line, char delimiter)
    {
        // Fast path: without any quote character a plain split is exact.
        if (line.IndexOf('"') < 0)
            return line.Split(new[] { delimiter }, StringSplitOptions.None);

        var fields = new List<string>();
        var current = new StringBuilder();
        var inQuotes = false;
        var atFieldStart = true;

        for (var i = 0; i < line.Length; i++)
        {
            var c = line[i];

            if (inQuotes)
            {
                if (c != '"')
                {
                    current.Append(c);
                }
                else if (i + 1 < line.Length && line[i + 1] == '"')
                {
                    // Doubled quote inside a quoted field is an escaped literal quote.
                    current.Append('"');
                    i++;
                }
                else
                {
                    inQuotes = false;
                }
            }
            else if (c == delimiter)
            {
                fields.Add(current.ToString());
                current.Clear();
                atFieldStart = true;
            }
            else if (c == '"' && atFieldStart)
            {
                // A quote only opens a quoted field when it is the field's first character;
                // quotes in the middle of a value are kept as ordinary text.
                inQuotes = true;
                atFieldStart = false;
            }
            else
            {
                current.Append(c);
                atFieldStart = false;
            }
        }

        // Malformed line (quote never closed): fall back to the plain split instead of
        // letting the open quote swallow the remaining delimiters.
        if (inQuotes)
            return line.Split(new[] { delimiter }, StringSplitOptions.None);

        fields.Add(current.ToString());
        return fields.ToArray();
    }

    /// <summary>
    /// Picks an encoding for the file: by byte-order mark when one is present, otherwise
    /// UTF-8 if the whole file is valid UTF-8, otherwise GBK.
    /// </summary>
    private static Encoding DetectFileEncoding(string filePath)
    {
        var bom = new byte[4];
        var read = 0;
        using (var file = new FileStream(filePath, FileMode.Open, FileAccess.Read))
        {
            // Read may return fewer bytes than requested; track how many are really available
            // so zero padding in the buffer is never mistaken for BOM bytes.
            int n;
            while (read < bom.Length && (n = file.Read(bom, read, bom.Length - read)) > 0)
                read += n;
        }

        if (read >= 3 && bom[0] == 0xef && bom[1] == 0xbb && bom[2] == 0xbf) return Encoding.UTF8;

        // The UTF-32 LE mark starts with the UTF-16 LE mark, so it has to be tested first.
        if (read >= 4 && bom[0] == 0xff && bom[1] == 0xfe && bom[2] == 0 && bom[3] == 0) return Encoding.UTF32;

        // 00 00 FE FF is the big-endian UTF-32 mark; Encoding.UTF32 is little-endian.
        if (read >= 4 && bom[0] == 0 && bom[1] == 0 && bom[2] == 0xfe && bom[3] == 0xff)
            return new UTF32Encoding(bigEndian: true, byteOrderMark: true);

        if (read >= 2 && bom[0] == 0xff && bom[1] == 0xfe) return Encoding.Unicode;
        if (read >= 2 && bom[0] == 0xfe && bom[1] == 0xff) return Encoding.BigEndianUnicode;

        try
        {
            // No BOM: accept UTF-8 only if every byte sequence in the file is valid UTF-8.
            // A strict decoder throws on invalid input, unlike Encoding.UTF8 which substitutes U+FFFD.
            var strictUtf8 = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false, throwOnInvalidBytes: true);
            strictUtf8.GetString(File.ReadAllBytes(filePath));
            return Encoding.UTF8;
        }
        catch (DecoderFallbackException)
        {
            // Not valid UTF-8; fall through to the legacy code page below.
        }

        return Encoding.GetEncoding("GBK");
    }
}

代码结构分析

1. Options 配置类

/// <summary>Settings that control how the loader reads and parses a file.</summary>
public class Options
{
    public Encoding Encoding { get; set; } = null; // null: detect the encoding automatically
    public char Delimiter { get; set; } = ','; // field delimiter
    public bool HasHeader { get; set; } = true; // whether the file has a header row
    public int ExpectedColumnCount { get; set; } = 7; // expected number of columns
    public bool TrimValues { get; set; } = true; // whether to trim values
    public int DegreeOfParallelism { get; set; } = -1; // degree of parallelism
    public bool PreserveOrder { get; set; } = true; // preserve the original order
}

要点
• 使用配置类封装所有可配置选项,避免方法参数过多
• 提供合理的默认值简化调用
• 并行度-1表示使用所有可用核心

2. 主加载方法 LoadFromFile

/// <summary>
/// Reads the file with the configured or detected encoding and dispatches to the
/// parallel or sequential processing path.
/// </summary>
/// <param name="filePath">Path of the file to load.</param>
/// <param name="options">Loader settings; <c>null</c> uses the defaults.</param>
/// <returns>A read-only list of the parsed rows.</returns>
public static IReadOnlyList<CsvData> LoadFromFile(string filePath, Options options = null)
{
    // Argument checks and initialization
    options ??= new Options();

    var encoding = options.Encoding ?? DetectFileEncoding(filePath);
    var lines = File.ReadLines(filePath, encoding).ToList();

    // -1 means "no limit" for ParallelOptions.MaxDegreeOfParallelism, so it takes the
    // parallel path too; a plain "> 1" test would run the default configuration sequentially.
    return options.DegreeOfParallelism is -1 or > 1
        ? ProcessLinesInParallel(lines, options)
        : ProcessLinesSequentially(lines, options);
}

要点
• 使用空值合并运算符(??)提供默认值
• 立即将行加载到内存(.ToList())确保并行处理安全
• 根据并行度自动选择处理方式

3. 串行处理 ProcessLinesSequentially

/// <summary>
/// Walks the lines in file order on the calling thread, skipping the header (if any)
/// and blank lines, and collects every line that parses successfully.
/// </summary>
private static IReadOnlyList<CsvData> ProcessLinesSequentially(List<string> lines, Options options)
{
    var parsedRows = new List<CsvData>();
    var firstDataIndex = options.HasHeader ? 1 : 0;

    for (var index = firstDataIndex; index < lines.Count; index++)
    {
        var text = lines[index];

        if (!string.IsNullOrWhiteSpace(text))
        {
            var row = ParseLine(text, options);
            if (row != null)
            {
                row.ProcessData();
                parsedRows.Add(row);
            }
        }
    }

    return parsedRows.AsReadOnly();
}

要点
• 显式循环比LINQ更易调试
• 跳过标题行和空行的处理
• 返回只读集合确保安全性

4. 并行处理 ProcessLinesInParallel

/// <summary>
/// Parses the non-blank data lines in parallel. With <c>PreserveOrder</c> the rows come back
/// in file order; otherwise they are collected in an unordered bag.
/// </summary>
private static IReadOnlyList<CsvData> ProcessLinesInParallel(List<string> lines, Options options)
{
    var linesToProcess = lines.Skip(options.HasHeader ? 1 : 0)
                            .Where(line => !string.IsNullOrWhiteSpace(line))
                            .ToList();

    // Shared by both branches; this declaration was missing from the excerpt.
    var parallelOptions = new ParallelOptions
    {
        MaxDegreeOfParallelism = options.DegreeOfParallelism
    };

    if (options.PreserveOrder)
    {
        // Order-preserving parallel processing: each iteration writes only to its own slot.
        var results = new CsvData[linesToProcess.Count];
        Parallel.For(0, linesToProcess.Count, parallelOptions, i =>
        {
            var item = ParseLine(linesToProcess[i], options);
            if (item != null)
            {
                item.ProcessData();
                results[i] = item;
            }
        });

        // Slots of lines that failed to parse stay null and are dropped here.
        return results.Where(item => item != null).ToList().AsReadOnly();
    }
    else
    {
        // Faster processing that does not preserve order.
        var results = new ConcurrentBag<CsvData>();
        Parallel.ForEach(linesToProcess, parallelOptions, line =>
        {
            var item = ParseLine(line, options);
            if (item != null)
            {
                item.ProcessData();
                results.Add(item);
            }
        });
        return results.ToList().AsReadOnly();
    }
}

要点
• 两种并行策略:保持顺序(Parallel.For)和不保持顺序(Parallel.ForEach)
• 预分配数组保持顺序,ConcurrentBag不保持顺序
• 并行选项控制最大并发数

5. 行解析 ParseLine

/// <summary>
/// Converts one line into a <c>CsvData</c>. Returns <c>null</c> when the line has fewer
/// columns than <c>ExpectedColumnCount</c> or when parsing throws; in the latter case the
/// error is written to the debug output and processing continues.
/// </summary>
private static CsvData ParseLine(string line, Options options)
{
    try
    {
        var columns = SplitCsvLine(line, options.Delimiter);
        if (columns.Length < options.ExpectedColumnCount) return null;
        
        return new CsvData
        {
            // Each field is assigned here, applying the configured TrimValues option
        };
    }
    catch (Exception ex)
    {
        System.Diagnostics.Debug.WriteLine($"Error parsing line: {line}. Error: {ex.Message}");
        return null;
    }
}

要点
• 防御性编程:检查列数、异常处理
• 记录解析错误但不中断处理
• 统一应用TrimValues配置


网站公告

今日签到

点亮在社区的每一天
去签到