C#将Box企业网盘里的文件批量上载到S3,并导入Redshift

发布于:2025-03-05 ⋅ 阅读:(16) ⋅ 点赞:(0)

C# .NET8将Box企业网盘里一个目录下的所有文件全部上载到S3的一个目录下,这些文件假设全是gzip压缩文件,然后全部导入Amazon Redshift数据库,要实现异步处理,异常处理和输出运行状态日志,所有参数信息来自ini配置文件。

将Box企业网盘里的文件上传到Amazon S3,你需要分别使用Box API和Amazon S3 API。在C#.NET 8中,你可以使用相应的SDK来简化这个过程。以下是一个大致的步骤指南:

  1. 设置Box API和Amazon S3的访问权限:

    • 获取Box API的Client ID、Client Secret和Access Token(或使用OAuth流程获取)。

    • 配置Amazon S3的Access Key、Secret Key、Bucket名称等。

  2. 下载Box企业网盘里的文件:

    • 使用Box API SDK下载文件。你可能需要先列出文件,找到你想要下载的那个,然后获取其下载URL或使用API直接下载。

  3. 将文件上传到Amazon S3:

    • 使用Amazon S3 API SDK将下载的文件上传到指定的Bucket中。

  4. 错误处理和日志记录:

    • 在整个过程中添加适当的错误处理和日志记录,以便在出现问题时能够追踪和调试。

以下是一个简化C#代码示例,展示了如何使用Box API SDK下载文件并使用Amazon S3 API SDK上传文件(注意:你需要安装Box.V2和AWSSDK.S3的NuGet包):

 using Box.V2;
using Box.V2.Auth;
using Box.V2.Models;
using Amazon;
using Amazon.S3;
using Amazon.S3.Model;
using Amazon.S3.Transfer;
using System;
using System.IO;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        // Box API 配置
        string boxClientId = "your-box-client-id";
        string boxClientSecret = "your-box-client-secret";
        string boxAccessToken = "your-box-access-token"; // 或者使用OAuth流程获取

        // Amazon S3 配置
        string awsAccessKey = "your-aws-access-key";
        string awsSecretKey = "your-aws-secret-key";
        string bucketName = "your-s3-bucket-name";
        string s3Region = RegionEndpoint.USEast1.SystemName; // 根据你的Bucket区域设置

        // Box API 初始化
        var boxConfig = new BoxConfig(boxClientId, boxClientSecret, new Uri("https://api.box.com/2.0/"));
        var boxSession = new OAuthSession(boxConfig, new Uri("https://account.box.com/api/oauth2/authorize"), boxAccessToken);
        var boxClient = new BoxClient(boxConfig, boxSession);

        // 查找并下载Box文件(这里假设你已经知道文件ID)
        string fileId = "your-box-file-id";
        var fileRequest = new BoxFilesRequest(fileId);
        var fileInfo = await boxClient.FilesManager.GetInformationAsync(fileRequest);

        // 下载文件到本地临时路径
        string tempFilePath = Path.GetTempFileName();
        using (var client = new System.Net.Http.HttpClient())
        {
            var downloadResponse = await client.GetAsync(fileInfo.DownloadUrl);
            downloadResponse.EnsureSuccessStatusCode();
            using (var fs = new FileStream(tempFilePath, FileMode.Create, FileAccess.Write))
            {
                await downloadResponse.Content.CopyToAsync(fs);
            }
        }

        // Amazon S3 初始化
        var s3Client = new AmazonS3Client(awsAccessKey, awsSecretKey, RegionEndpoint.GetBySystemName(s3Region));

        // 上传文件到S3
        string s3Key = "your-s3-key"; // 文件在S3中的名称
        TransferUtility transferUtility = new TransferUtility(s3Client);
        transferUtility.Upload(tempFilePath, bucketName, s3Key);

        // 清理临时文件
        File.Delete(tempFilePath);

        Console.WriteLine("文件已成功从Box上传到S3!");
    }
}

注意:

• 上面的代码示例是为了说明流程而简化的,它没有处理所有可能的错误情况。

• 在实际部署中,你应该避免在代码中硬编码敏感信息(如API密钥和访问令牌),而是使用环境变量或安全的密钥管理服务。

• 你可能需要根据Box API和Amazon S3 API的最新文档来调整代码。

• Box文件的下载URL可能需要额外的处理,特别是如果它是通过OAuth流程获取的预签名URL。

• 上面的代码使用了TransferUtility来简化S3上传过程,但你也可以使用PutObjectRequest来更细粒度地控制上传过程。

完整解决方案包含异步处理、异常处理、日志记录及INI配置文件读取:

using Box.V2;
using Box.V2.Auth;
using Box.V2.Config;
using Box.V2.Models;
using Amazon.S3;
using Amazon.S3.Transfer;
using Nett;

// 配置文件结构
public class AppConfig
{
    public BoxSettings Box { get; set; }
    public S3Settings S3 { get; set; }
    public RedshiftSettings Redshift { get; set; }
}

public class BoxSettings
{
    public string ClientId { get; set; }
    public string ClientSecret { get; set; }
    public string AccessToken { get; set; }
    public string FolderId { get; set; }
}

public class S3Settings
{
    public string AccessKey { get; set; }
    public string SecretKey { get; set; }
    public string BucketName { get; set; }
    public string Region { get; set; }
    public string TargetPath { get; set; }
}

public class RedshiftSettings
{
    public string ConnectionString { get; set; }
    public string TableName { get; set; }
    public string IamRoleArn { get; set; }
}

public static class Logger
{
    private static readonly object _lock = new();
    
    public static void Log(string message, string level = "INFO")
    {
        var logEntry = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} [{level}] {message}";
        lock (_lock)
        {
            File.AppendAllText("transfer.log", logEntry + Environment.NewLine);
        }
    }
}

public class BoxS3RedshiftService
{
    private readonly AppConfig _config;
    private readonly BoxClient _boxClient;
    private readonly IAmazonS3 _s3Client;
    private readonly SemaphoreSlim _semaphore = new(5);

    public BoxS3RedshiftService(AppConfig config)
    {
        _config = config;
        _boxClient = InitializeBoxClient(config.Box);
        _s3Client = InitializeS3Client(config.S3);
    }

    private BoxClient InitializeBoxClient(BoxSettings settings)
    {
        var auth = new OAuthSession(settings.AccessToken, "N/A", 3600, "bearer");
        var boxConfig = new BoxConfigBuilder(settings.ClientId, settings.ClientSecret, new Uri("http://localhost")).Build();
        return new BoxClient(boxConfig, auth);
    }

    private IAmazonS3 InitializeS3Client(S3Settings settings)
    {
        var region = Amazon.RegionEndpoint.GetBySystemName(settings.Region);
        return new AmazonS3Client(settings.AccessKey, settings.SecretKey, region);
    }

    public async Task ProcessFilesAsync()
    {
        try
        {
            Logger.Log("开始获取Box文件列表");
            var files = await GetBoxFilesRecursiveAsync(_config.Box.FolderId);
            Logger.Log($"找到{files.Count}个待处理文件");

            var tasks = files.Select(async file =>
            {
                await _semaphore.WaitAsync();
                try
                {
                    await ProcessSingleFileAsync(file);
                }
                finally
                {
                    _semaphore.Release();
                }
            });

            await Task.WhenAll(tasks);
            Logger.Log("所有文件处理完成");
        }
        catch (Exception ex)
        {
            Logger.Log($"处理过程中发生全局异常: {ex}", "ERROR");
        }
    }

    private async Task<List<BoxItem>> GetBoxFilesRecursiveAsync(string folderId)
    {
        var result = new List<BoxItem>();
        var items = await _boxClient.FoldersManager.GetFolderItemsAsync(folderId, 1000);
        
        foreach (var item in items.Entries)
        {
            if (item.Type == "file" && item.Name.EndsWith(".gz"))
            {
                result.Add(item);
            }
            else if (item.Type == "folder")
            {
                result.AddRange(await GetBoxFilesRecursiveAsync(item.Id));
            }
        }
        return result;
    }

    private async Task ProcessSingleFileAsync(BoxItem file)
    {
        string tempPath = null;
        try
        {
            // 下载文件
            tempPath = await DownloadBoxFileAsync(file.Id);
            Logger.Log($"下载完成: {file.Name}");

            // 上传S3
            var s3Key = $"{_config.S3.TargetPath}/{file.Name}";
            await UploadToS3Async(tempPath, s3Key);
            Logger.Log($"上传完成: {s3Key}");

            // 导入Redshift
            await ImportToRedshiftAsync(s3Key);
            Logger.Log($"Redshift导入完成: {s3Key}");
        }
        catch (Exception ex)
        {
            Logger.Log($"处理文件{file.Name}失败: {ex}", "ERROR");
        }
        finally
        {
            if (File.Exists(tempPath))
            {
                File.Delete(tempPath);
            }
        }
    }

    private async Task<string> DownloadBoxFileAsync(string fileId)
    {
        var tempPath = Path.GetTempFileName();
        using var stream = await _boxClient.FilesManager.DownloadAsync(fileId);
        using var fileStream = File.Create(tempPath);
        await stream.CopyToAsync(fileStream);
        return tempPath;
    }

    private async Task UploadToS3Async(string localPath, string s3Key)
    {
        var transferUtility = new TransferUtility(_s3Client);
        await transferUtility.UploadAsync(localPath, _config.S3.BucketName, s3Key);
    }

    private async Task ImportToRedshiftAsync(string s3Key)
    {
        using var conn = new Npgsql.NpgsqlConnection(_config.Redshift.ConnectionString);
        await conn.OpenAsync();
        
        var copyCommand = $@"COPY {_config.Redshift.TableName} 
                           FROM 's3://{_config.S3.BucketName}/{s3Key}' 
                           CREDENTIALS 'aws_iam_role={_config.Redshift.IamRoleArn}'
                           GZIP
                           COMPUPDATE OFF
                           STATUPDATE OFF";
        
        using var cmd = new Npgsql.NpgsqlCommand(copyCommand, conn);
        await cmd.ExecuteNonQueryAsync();
    }
}

class Program
{
    static async Task Main(string[] args)
    {
        // 读取配置文件
        var config = Toml.ReadFile<AppConfig>("config.toml");
        
        // 初始化并执行服务
        var service = new BoxS3RedshiftService(config);
        await service.ProcessFilesAsync();
    }
}

配套配置文件 (config.toml):

[Box]
ClientId = "your_box_client_id"
ClientSecret = "your_box_client_secret"
AccessToken = "your_box_access_token"
FolderId = "your_box_folder_id"

[S3]
AccessKey = "your_aws_access_key"
SecretKey = "your_aws_secret_key"
BucketName = "your_bucket_name"
Region = "us-east-1"
TargetPath = "imports/daily"

[Redshift]
ConnectionString = "Host=your-cluster.xxxxx.redshift.amazonaws.com;Database=your_db;User Id=user;Password=password;"
TableName = "sales_data"
IamRoleArn = "arn:aws:iam::123456789012:role/RedshiftImportRole"

实现说明:

  1. 异步处理:

    • 使用async/await进行异步编程
    • 通过SemaphoreSlim控制并发度(最大5个并行任务)
    • 使用Task.WhenAll等待所有文件处理完成
  2. 异常处理:

    • 每个文件处理过程有独立try-catch
    • 全局异常处理包裹主流程
    • 使用finally块确保资源清理
  3. 日志记录:

    • 线程安全的日志记录机制
    • 记录时间戳、日志级别和操作详情
    • 日志文件自动追加模式
  4. 配置管理:

    • 使用TOML格式配置文件(需安装Nett包)
    • 支持Box、S3和Redshift的配置分离
    • 敏感信息不硬编码在代码中
  5. 扩展功能:

    • 递归遍历Box目录
    • 自动清理临时文件
    • Redshift COPY命令参数配置
    • GZIP压缩文件自动识别

使用步骤:

  1. 安装依赖包:

    dotnet add package Box.V2
    dotnet add package AWSSDK.S3
    dotnet add package Npgsql
    dotnet add package Nett
    
  2. 创建TOML配置文件

  3. 运行程序:

    dotnet run
    

增强建议:

  1. 增加重试机制(使用Polly等重试库)
  2. 添加进度报告功能
  3. 实现Box Token自动刷新
  4. 添加文件校验(MD5校验和)
  5. 支持配置文件加密
  6. 添加邮件/短信通知功能
  7. 实现断点续传功能

网站公告

今日签到

点亮在社区的每一天
去签到