C#带多组标签的Snowflake SQL查询批量数据导出程序

发布于:2025-03-14 ⋅ 阅读:(18) ⋅ 点赞:(0)

设计一个基于多个带标签Snowflake SQL语句作为json配置文件的C#代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来提高程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记录数量的日志。

步骤1:创建配置文件类

// AppConfig.cs
public class AppConfig
{
    public SnowflakeConnectionConfig SnowflakeConnection { get; set; }
    public List<QueryConfig> Queries { get; set; }
}

public class SnowflakeConnectionConfig
{
    public string Account { get; set; }
    public string User { get; set; }
    public string Password { get; set; }
    public string Warehouse { get; set; }
    public string Database { get; set; }
    public string Schema { get; set; }
    public string Role { get; set; }
}

public class QueryConfig
{
    public string Label { get; set; }
    public string Sql { get; set; }
}

步骤2:实现日志记录器

// Logger.cs
public class Logger
{
    private readonly string _logPath;
    private readonly object _lock = new object();

    public Logger(string logPath)
    {
        _logPath = logPath;
        InitializeLogFile();
    }

    private void InitializeLogFile()
    {
        lock (_lock)
        {
            File.AppendAllText(_logPath, $"{"Timestamp",-25}|{"Status",-8}|{"Label",-20}|{"StartTime",-20}|{"Duration(s)",-12}|{"Rows",-10}|{"Error"}\n");
        }
    }

    public void LogSuccess(string label, DateTime startTime, long rowCount, TimeSpan duration)
    {
        var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"SUCCESS",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{rowCount,-10}|{"-"}\n";
        WriteLogEntry(entry);
    }

    public void LogError(string label, DateTime startTime, string error, TimeSpan duration)
    {
        var entry = $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}|{"ERROR",-8}|{label,-20}|{startTime:HH:mm:ss}|{duration.TotalSeconds,-12:N2}|{"-",-10}|{error}\n";
        WriteLogEntry(entry);
    }

    private void WriteLogEntry(string entry)
    {
        lock (_lock)
        {
            File.AppendAllText(_logPath, entry);
        }
        Console.Write(entry);
    }
}

步骤3:实现数据导出处理器

// ExportProcessor.cs
using CsvHelper;
using Snowflake.Data.Client;
using System.Globalization;

public class ExportProcessor
{
    private readonly string _connectionString;
    private readonly Logger _logger;

    public ExportProcessor(string connectionString, Logger logger)
    {
        _connectionString = connectionString;
        _logger = logger;
    }

    public async Task ExportQueryAsync(QueryConfig query, string outputDir, CancellationToken cancellationToken = default)
    {
        var startTime = DateTime.UtcNow;
        var sw = Stopwatch.StartNew();
        long rowCount = 0;
        string filePath = Path.Combine(outputDir, $"{query.Label}.csv");

        try
        {
            using (var conn = new SnowflakeDbConnection())
            {
                conn.ConnectionString = _connectionString;
                await conn.OpenAsync(cancellationToken);

                using (var cmd = conn.CreateCommand())
                {
                    cmd.CommandText = query.Sql;

                    using (var reader = await cmd.ExecuteReaderAsync(cancellationToken))
                    using (var writer = new StreamWriter(filePath, append: false))
                    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
                    {
                        // Write header
                        for (int i = 0; i < reader.FieldCount; i++)
                        {
                            csv.WriteField(reader.GetName(i));
                        }
                        await csv.NextRecordAsync();

                        // Write rows
                        while (await reader.ReadAsync(cancellationToken))
                        {
                            for (int i = 0; i < reader.FieldCount; i++)
                            {
                                csv.WriteField(reader.GetValue(i));
                            }
                            await csv.NextRecordAsync();
                            rowCount++;
                        }
                    }
                }
            }

            sw.Stop();
            _logger.LogSuccess(query.Label, startTime, rowCount, sw.Elapsed);
        }
        catch (Exception ex)
        {
            sw.Stop();
            _logger.LogError(query.Label, startTime, ex.Message, sw.Elapsed);
            SafeDeleteFile(filePath);
            throw; // Re-throw if using retry logic
        }
    }

    private void SafeDeleteFile(string path)
    {
        try { File.Delete(path); }
        catch { /* Ignore deletion errors */ }
    }
}

步骤4:主程序实现

// Program.cs
using System.CommandLine;

class Program
{
    static async Task Main(string[] args)
    {
        var configOption = new Option<FileInfo>(
            name: "--config",
            description: "Path to configuration file");
        
        var outputOption = new Option<DirectoryInfo>(
            name: "--output",
            description: "Output directory for CSV files");

        var rootCommand = new RootCommand
        {
            configOption,
            outputOption
        };

        rootCommand.Description = "Snowflake Data Exporter";
        
        rootCommand.SetHandler(async (config, outputDir) => 
        {
            await RunExport(config, outputDir);
        }, configOption, outputOption);

        await rootCommand.InvokeAsync(args);
    }

    static async Task RunExport(FileInfo configFile, DirectoryInfo outputDir)
    {
        // Read configuration
        var config = JsonSerializer.Deserialize<AppConfig>(
            File.ReadAllText(configFile.FullName),
            new JsonSerializerOptions { PropertyNameCaseInsensitive = true });

        // Create output directory
        Directory.CreateDirectory(outputDir.FullName);

        // Initialize logger
        var logger = new Logger(Path.Combine(outputDir.FullName, "export.log"));

        // Build connection string
        var connBuilder = new SnowflakeDbConnectionStringBuilder
        {
            Account = config.SnowflakeConnection.Account,
            User = config.SnowflakeConnection.User,
            Password = config.SnowflakeConnection.Password,
            Warehouse = config.SnowflakeConnection.Warehouse,
            Db = config.SnowflakeConnection.Database,
            Schema = config.SnowflakeConnection.Schema,
            Role = config.SnowflakeConnection.Role
        };

        // Initialize processor
        var processor = new ExportProcessor(
            connBuilder.ToString(),
            logger);

        // Parallel execution with throttling
        var parallelOptions = new ParallelOptions 
        { 
            MaxDegreeOfParallelism = Environment.ProcessorCount 
        };

        await Parallel.ForEachAsync(
            config.Queries,
            parallelOptions,
            async (query, cancellationToken) =>
            {
                await processor.ExportQueryAsync(
                    query,
                    outputDir.FullName,
                    cancellationToken);
            });
    }
}

步骤5:配置文件示例(config.json)

{
  "snowflakeConnection": {
    "account": "your_account",
    "user": "your_user",
    "password": "your_password",
    "warehouse": "COMPUTE_WH",
    "database": "PROD_DB",
    "schema": "PUBLIC",
    "role": "SYSADMIN"
  },
  "queries": [
    {
      "label": "customers",
      "sql": "SELECT * FROM CUSTOMERS"
    },
    {
      "label": "orders",
      "sql": "SELECT * FROM ORDERS"
    }
  ]
}

实现说明

  1. 并行处理

    • 使用Parallel.ForEachAsync进行并行查询处理
    • 默认并行度设置为处理器核心数
    • 每个查询独立使用自己的数据库连接
  2. 大文件处理

    • 使用CsvHelper进行流式写入
    • 采用异步I/O操作(ReadAsync/WriteAsync
    • 逐行处理避免内存爆炸
  3. 错误处理

    • 自动删除不完整文件
    • 详细错误日志记录
    • 异常传播与隔离设计
  4. 日志功能

    • 结构化日志格式
    • 线程安全写入
    • 包含关键性能指标
  5. 性能优化

    • 异步数据库操作
    • 并行查询执行
    • 流式结果集处理

使用说明

  1. 安装依赖:

    dotnet add package Snowflake.Data
    dotnet add package CsvHelper
    dotnet add package System.CommandLine
    
  2. 编译运行:

    dotnet run -- --config ./config.json --output ./exports
    
  3. 输出结构:

    exports/
      customers.csv
      orders.csv
      export.log
    

日志示例

2023-09-20 14:30:45|SUCCESS |customers           |14:30:30|15.23      |1000000    |- 
2023-09-20 14:31:02|ERROR   |orders             |14:30:45|17.12      |-          |Timeout expired

此实现提供了:

  • 线程安全的并行处理
  • 完整错误处理机制
  • 详细的执行日志
  • 高效的大数据处理能力
  • 可配置的Snowflake连接参数
  • 清晰的命令行界面

网站公告

今日签到

点亮在社区的每一天
去签到