ABP vNext 与 HDFS 数据湖存储集成 🚀
📚 目录
🧠 背景与目标
随着企业多源数据(图像、日志、文档)激增,构建一个具备海量存储能力、统一管理视图与数据湖分析的文件平台已成必需。
基于 ABP vNext BlobStoring 与 HDFS HA,打造一个可视化、可控、可拓展的现代数据湖文件平台。
🛠️ 依赖与安装
# ABP Blob 存储框架
dotnet add package Volo.Abp.BlobStoring
dotnet add package Volo.Abp.BlobStoring.UI
# WebHDFS 客户端
dotnet add package WebHdfs.NET
# 重试与断路器
dotnet add package Polly
# Shell 调用
dotnet add package CliWrap
# 可选:应用监控
dotnet add package Microsoft.ApplicationInsights.AspNetCore --version 2.21.0
🧱 系统架构设计
- IBlobContainer:ABP 中访问存储容器的统一接口
- HdfsBlobProvider:继承 BlobProviderBase,支持重试、日志、监控
- HDFS HA:通过 ZooKeeper 主备切换;支持 HTTPS/TLS 🔐 和 Kerberos 安全认证🛡️
⚙️ 核心实现
1️⃣ 配置 appsettings.json
{
"Hdfs": {
"NameNodeUri": "https://mycluster:50070", // 支持 HTTP/HTTPS 🌐(注意:50070 是 Hadoop 2 的 HTTP 端口,HTTPS 应使用 50470;Hadoop 3 为 9870/9871)
"User": "hdfs",
"UseKerberos": true, // 是否开启 Kerberos 🔒
"KeytabPath": "/etc/security/keytabs/hdfs.headless.keytab"
}
}
// HdfsOptions 定义
/// <summary>
/// Settings for the WebHDFS connection, bound from the "Hdfs" section
/// of appsettings.json.
/// </summary>
public class HdfsOptions
{
    /// <summary>WebHDFS endpoint of the NameNode, e.g. "https://mycluster:50070".</summary>
    public string NameNodeUri { get; set; } = null!;

    /// <summary>HDFS user the client acts as.</summary>
    public string User { get; set; } = null!;

    /// <summary>When true, a Kerberos ticket is obtained (kinit) before connecting.</summary>
    public bool UseKerberos { get; set; }

    /// <summary>Path to the keytab file used for Kerberos authentication.</summary>
    public string KeytabPath { get; set; } = null!;
}
// 注册配置
context.Services.Configure<HdfsOptions>(
context.Configuration.GetSection("Hdfs"));
2️⃣ 自定义 HdfsBlobProvider
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using Microsoft.ApplicationInsights;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Polly;
using Volo.Abp.BlobStoring;
using Volo.Abp.DependencyInjection;
using WebHdfs.Core;
/// <summary>
/// ABP BLOB provider backed by HDFS (WebHDFS). Saves, reads and deletes blobs
/// with retry (exponential back-off on IOException), structured logging and
/// Application Insights metrics. Registered as a singleton.
/// </summary>
public class HdfsBlobProvider : BlobProviderBase, ISingletonDependency
{
    private readonly WebHdfsClient _client;
    private readonly ILogger<HdfsBlobProvider> _logger;
    private readonly TelemetryClient _telemetry;
    private readonly AsyncPolicy _retryPolicy;

    public HdfsBlobProvider(
        IOptions<HdfsOptions> options,
        ILogger<HdfsBlobProvider> logger,
        TelemetryClient telemetry)
    {
        var opts = options.Value;
        _logger = logger;
        _telemetry = telemetry;

        // Kerberos initialization (the keytab is mounted into the container).
        if (opts.UseKerberos)
        {
            // FIX: Process.Start alone is fire-and-forget — the ticket may not
            // exist yet when the first HDFS request is issued, and a kinit
            // failure would go unnoticed. Wait for completion and fail fast.
            using var kinit = Process.Start("kinit", $"-kt {opts.KeytabPath} {opts.User}")
                ?? throw new InvalidOperationException("Failed to start kinit.");
            kinit.WaitForExit();
            if (kinit.ExitCode != 0)
            {
                throw new InvalidOperationException(
                    $"kinit exited with code {kinit.ExitCode}; Kerberos ticket not acquired.");
            }
        }

        _client = new WebHdfsClient(new Uri(opts.NameNodeUri), opts.User);

        // 3 retries with exponential back-off (2s, 4s, 8s). 🔄
        _retryPolicy = Policy
            .Handle<IOException>()
            .WaitAndRetryAsync(3, i => TimeSpan.FromSeconds(Math.Pow(2, i)));
    }

    /// <summary>Uploads the blob stream to HDFS, overwriting any existing file.</summary>
    public override async Task SaveAsync(BlobProviderSaveArgs args)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            // FIX: a retried attempt must re-send the data from the beginning.
            // Previously the stream (wrapped in a BufferedStream that also
            // disposed the caller-owned args.Stream) was never rewound, so a
            // retry after a partial write uploaded whatever was left — possibly
            // nothing. Remember the start position and rewind on every attempt.
            var start = args.Stream.CanSeek ? args.Stream.Position : 0L;
            await _retryPolicy.ExecuteAsync(() =>
            {
                if (args.Stream.CanSeek)
                {
                    args.Stream.Position = start;
                }
                return _client.CreateFileAsync(args.BlobName, args.Stream, overwrite: true);
            });
            _telemetry.TrackMetric("HDFS_Save_Duration", sw.ElapsedMilliseconds);
            _logger.LogInformation("✔️ 文件 {Name} 保存成功", args.BlobName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "❌ 保存至 HDFS 失败:{Name}", args.BlobName);
            throw;
        }
    }

    /// <summary>
    /// Opens a read stream for the blob, or returns null when the file does not
    /// exist. NOTE(review): assumes the client surfaces a missing file as
    /// FileNotFoundException — confirm against the WebHDFS client in use.
    /// </summary>
    public override async Task<Stream?> GetOrNullAsync(BlobProviderGetArgs args)
    {
        var sw = Stopwatch.StartNew();
        try
        {
            var stream = await _retryPolicy.ExecuteAsync(() =>
                _client.OpenReadAsync(args.BlobName));
            _telemetry.TrackMetric("HDFS_Get_Duration", sw.ElapsedMilliseconds);
            _logger.LogInformation("📥 文件 {Name} 获取成功", args.BlobName);
            return stream;
        }
        catch (FileNotFoundException)
        {
            _logger.LogWarning("⚠️ 未找到文件:{Name}", args.BlobName);
            return null;
        }
    }

    /// <summary>
    /// Deletes the blob. Returns the client's result, or false on any error
    /// (deliberately best-effort: delete failures are logged, not thrown).
    /// </summary>
    public override async Task<bool> DeleteAsync(BlobProviderDeleteArgs args)
    {
        try
        {
            var result = await _retryPolicy.ExecuteAsync(() =>
                _client.DeleteAsync(args.BlobName));
            _logger.LogInformation("🗑️ 文件 {Name} 删除{Status}", args.BlobName, result);
            return result;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "❌ 删除 HDFS 文件失败:{Name}", args.BlobName);
            return false;
        }
    }
}
3️⃣ 注册 HdfsBlobProvider
// Wire HdfsBlobProvider into ABP's BLOB storing system as the provider for the
// default container: any IBlobContainer resolved without an explicit container
// name will store its blobs in HDFS. Named containers configured elsewhere are
// unaffected unless they opt in separately.
Configure<AbpBlobStoringOptions>(options =>
{
    options.Containers.ConfigureDefault(container =>
    {
        container.ProviderType = typeof(HdfsBlobProvider);
    });
});
4️⃣ 应用服务示例
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Volo.Abp;
using Volo.Abp.Application.Services;
using Volo.Abp.BlobStoring;
using Volo.Abp.Exceptions;
/// <summary>
/// Application service exposing upload/download of files stored in the default
/// BLOB container (backed by HDFS via HdfsBlobProvider).
/// </summary>
public class FileAppService : ApplicationService
{
    private readonly IBlobContainer _blobContainer;

    public FileAppService(IBlobContainer blobContainer)
    {
        _blobContainer = blobContainer;
    }

    /// <summary>Saves <paramref name="content"/> under <paramref name="name"/>.</summary>
    /// <exception cref="ArgumentException">When <paramref name="name"/> is null or whitespace.</exception>
    /// <exception cref="ArgumentNullException">When <paramref name="content"/> is null.</exception>
    public async Task UploadAsync(string name, Stream content)
    {
        // FIX: validate arguments up front instead of letting a null name or
        // stream fail deep inside the storage provider with an opaque error.
        if (string.IsNullOrWhiteSpace(name))
        {
            throw new ArgumentException("Blob name must not be null or whitespace.", nameof(name));
        }
        if (content == null)
        {
            throw new ArgumentNullException(nameof(content));
        }

        await _blobContainer.SaveAsync(name, content);
        Logger.LogInformation("✅ 上传 {Name} 完成", name);
    }

    /// <summary>Opens a read stream for the named blob.</summary>
    /// <exception cref="ArgumentException">When <paramref name="name"/> is null or whitespace.</exception>
    /// <exception cref="UserFriendlyException">When the blob does not exist.</exception>
    public async Task<Stream> DownloadAsync(string name)
    {
        if (string.IsNullOrWhiteSpace(name))
        {
            throw new ArgumentException("Blob name must not be null or whitespace.", nameof(name));
        }

        var stream = await _blobContainer.GetOrNullAsync(name)
            ?? throw new UserFriendlyException("文件不存在");
        Logger.LogInformation("📤 下载 {Name} 完成", name);
        return stream;
    }
}
🔐 HDFS HA & 安全配置
<configuration>
<property><name>fs.defaultFS</name><value>hdfs://mycluster</value></property>
<property><name>dfs.nameservices</name><value>mycluster</value></property>
<property><name>dfs.ha.namenodes.mycluster</name><value>nn1,nn2</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn1</name><value>node1:8020</value></property>
<property><name>dfs.namenode.rpc-address.mycluster.nn2</name><value>node2:8020</value></property>
<property><name>dfs.client.failover.proxy.provider.mycluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property>
<property><name>dfs.replication</name><value>3</value></property>
<!-- Kerberos -->
<property><name>hadoop.security.authentication</name><value>kerberos</value></property>
</configuration>
- 部署要求:3×JournalNode + 3×ZooKeeper + Kerberos KDC 🛡️
- Keytab 挂载:挂载到容器的 /etc/security/keytabs 目录,并设置 chmod 400
- HTTPS/TLS:通过 HttpClientHandler.ServerCertificateCustomValidationCallback 校验证书(开发环境可临时放宽,生产环境切勿跳过证书校验)🔐
💾 分片上传与合并
使用 HDFS Append 保持二进制完整,避免文本命令限制。🔗
🐶 单元测试示例
using System.IO;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Moq;
using Polly;
using Volo.Abp.Testing;
using Xunit;
using WebHdfs.Core;
// Intended to verify that SaveAsync retries on IOException (2 failures, then
// success => 3 calls in total).
public class HdfsBlobProvider_Tests : AbpIntegratedTestBase
{
    [Fact]
    public async Task SaveAsync_RetriesOnIOException()
    {
        // NOTE(review): this mock is never handed to the provider.
        // HdfsBlobProvider constructs its own WebHdfsClient in its constructor,
        // so the SetupSequence below is never consumed and the Verify at the
        // end cannot observe any call — as written, this test cannot pass.
        // Fixing it requires the provider to accept a WebHdfsClient via its
        // constructor (or a factory) so the mock can be injected.
        // Also note: mocking a concrete class requires CreateFileAsync to be
        // virtual — confirm against the WebHdfs client library.
        var clientMock = new Mock<WebHdfsClient>(MockBehavior.Strict,
            new Uri("http://x"), "u");
        clientMock.SetupSequence(c => c.CreateFileAsync(
            It.IsAny<string>(), It.IsAny<Stream>(), true))
            .ThrowsAsync(new IOException())
            .ThrowsAsync(new IOException())
            .Returns(Task.CompletedTask);
        var options = Options.Create(new HdfsOptions { NameNodeUri = "http://x", User = "u" });
        // NOTE(review): the parameterless TelemetryClient() constructor is
        // obsolete in recent Application Insights SDKs; prefer
        // new TelemetryClient(new TelemetryConfiguration()).
        var telemetry = new TelemetryClient();
        // NOTE(review): NullLogger<T> needs
        // Microsoft.Extensions.Logging.Abstractions, which is not among the
        // usings shown above.
        var provider = new HdfsBlobProvider(options,
            NullLogger<HdfsBlobProvider>.Instance,
            telemetry);
        await provider.SaveAsync(new BlobProviderSaveArgs("test", new MemoryStream()));
        clientMock.Verify(c => c.CreateFileAsync("test",
            It.IsAny<Stream>(), true), Times.Exactly(3));
    }
}
📦 Docker Compose 快速部署
version: '3'
services:
zk:
image: zookeeper:3.6
ports: ["2181:2181"]
journalnode:
image: bde2020/hadoop-journalnode:2.0.0-hadoop3.2.1-java8
depends_on: [zk]
namenode:
image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
depends_on: [zk, journalnode]
environment:
- CLUSTER_NAME=mycluster
datanode:
image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
depends_on: [namenode]
🔍 监控与运维
- Prometheus/AI 埋点:使用 TelemetryClient 或 ABP ICounter 记录操作耗时
- 日志链路:加入 CorrelationId、BlobName、NodeAddress 等上下文信息
- 健康检查:配置 ABP HealthChecks,监测 HDFS 端点 ✅