ABP VNext + Apache Flink 实时流计算:打造高可用“交易风控”系统 🌐
📚 目录
一、背景🚀
在金融 💰、电商 🛒、IoT 🌐 等高频交互系统中,越来越多的场景需要“实时发现问题并响应”。
二、系统整体架构 🏗️
💡 图示展示了各组件之间的数据流向,实现消息解耦和高可用。
三、实战展示 🛠️:交易行为告警系统
3.1 ABP 采集交易事件 📝
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Distributed;
public class TransactionCreatedDomainEvent : DomainEvent
{
public Guid UserId { get; set; }
public decimal Amount { get; set; }
public string Location { get; set; }
}
public class TransactionCreatedHandler : IDistributedEventHandler<TransactionCreatedDomainEvent>
{
private readonly IDistributedEventBus _eventBus;
private readonly ILogger<TransactionCreatedHandler> _logger;
public TransactionCreatedHandler(IDistributedEventBus eventBus,
ILogger<TransactionCreatedHandler> logger)
{
_eventBus = eventBus;
_logger = logger;
}
public async Task HandleEventAsync(TransactionCreatedDomainEvent eventData)
{
var eto = new TransactionCreatedEto
{
UserId = eventData.UserId,
Amount = eventData.Amount,
Location = eventData.Location,
OccurredAt = Clock.Now
};
try
{
await _eventBus.PublishAsync(eto);
}
catch (Exception ex)
{
_logger.LogError(ex, "发布交易事件失败:{UserId}", eventData.UserId);
throw;
}
}
}
CAP + Outbox 配置示例 💼
// appsettings.json
"Cap": {
"UseEntityFramework": true,
"UseDashboard": true,
"Producer": {
"Kafka": { "Servers": "localhost:9092" }
},
"Outbox": { "TableName": "CapOutboxMessages" }
}
3.2 Flink CEP 模式与 Exactly-Once ⚡
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.eventtime._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import java.time.Duration
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.setStateBackend(new RocksDBStateBackend("file:///flink-checkpoints"))
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness[Transaction](Duration.ofSeconds(5))
.withTimestampAssigner((event, _) => event.timestamp.toEpochMilli)
val stream = env
.addSource(new FlinkKafkaConsumer[Transaction]("transactions", deserializer, props))
.assignTimestampsAndWatermarks(watermarkStrategy)
val pattern = Pattern.begin[Transaction]("first")
.where(_.amount > 10000)
.next("second")
.where(new IterativeCondition[Transaction] {
override def filter(event: Transaction, ctx: IterativeCondition.Context[Transaction]) = {
val first = ctx.getEventsForPattern("first").iterator().next()
event.location != first.location
}
})
.within(Time.minutes(5))
pattern.handleTimeout(new PatternTimeoutFunction[Transaction, Unit] {
override def timeout(map: java.util.Map[String, java.util.List[Transaction]], timestamp: Long, out: Collector[Unit]): Unit = {
// 超时清理逻辑
}
}, Time.minutes(5))
💡 建议全链路使用 Schema Registry 管理消息格式,防止兼容性问题。
3.3 Redis Stream + SignalR 实时推送 🔔
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;
public class RiskAlertWorker : BackgroundService
{
private readonly IConnectionMultiplexer _redis;
private readonly IHubContext<RiskAlertHub> _hubContext;
private readonly ILogger<RiskAlertWorker> _logger;
public RiskAlertWorker(IConnectionMultiplexer redis,
IHubContext<RiskAlertHub> hubContext,
ILogger<RiskAlertWorker> logger)
{
_redis = redis;
_hubContext = hubContext;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var db = _redis.GetDatabase();
try { await db.StreamCreateConsumerGroupAsync("risk-alerts", "alert-group", "$", true); }
catch { /* 忽略 BUSYGROUP */ }
int backoff = 1000;
while (!stoppingToken.IsCancellationRequested)
{
try
{
var entries = await db.StreamReadGroupAsync(
"risk-alerts", "alert-group", "consumer-1",
count: 10, flags: CommandFlags.Block(5000));
foreach (var entry in entries)
{
var alert = JsonSerializer.Deserialize<RiskEventDto>(entry["data"]!);
await _hubContext.Clients.Group(alert.UserId.ToString())
.SendAsync("ReceiveAlert", alert, stoppingToken);
await db.StreamAcknowledgeAsync("risk-alerts", "alert-group", entry.Id);
}
backoff = 1000;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理 Redis 告警失败");
await Task.Delay(backoff, stoppingToken);
backoff = Math.Min(backoff * 2, 16000);
}
}
}
}
[Authorize]
public class RiskAlertHub : Hub { }
四、生产级部署和监控 📈
组件 | 推荐配置 |
---|---|
ABP 后端 | Pod 存活/就绪探针 ✅ + HTTPS 🔒 + Serilog→Elasticsearch Sink 📝 + CAP Outbox |
Kafka | enable.idempotence=true 🔁, acks=all ✅, TLS/SASL 🔐 |
Flink | RocksDBStateBackend ⚙️ + EXACTLY_ONCE ⚡ + State TTL 🕒 + HA 🌟 |
Redis | Redis Cluster 🔄 + AOF 📝 + ACL 🔑 + 阻塞消费 ⏳ |
PostgreSQL | 主从流复制 🛠️ + WAL 日志 📜 + TimescaleDB 插件 📊 |
SignalR | Azure SignalR ☁️ / Redis Backplane 🔄 + JWT 鉴权 🔏 |
# Flink YAML 示例
state.backend: rocksdb
checkpointing:
interval: 10s
mode: EXACTLY_ONCE
externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# Flink Prometheus Reporter
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250
📊 在 Grafana 中可视化:Kafka TPS、Flink 延迟分位、Redis 消费速率、ABP 请求成功率/错误率。
五、自动化测试 🧪
// Testcontainers 启动依赖
var kafka = new KafkaContainer().StartAsync().GetAwaiter().GetResult();
var redis = new RedisContainer().StartAsync().GetAwaiter().GetResult();
var postgres = new PostgreSqlContainer().StartAsync().GetAwaiter().GetResult();
// 注入到 ABP 测试模块
context.Services.Configure<CapOptions>(opts => {
opts.ProducerConnectionString = kafka.GetBootstrapAddress();
opts.OutboxTableName = "CapOutboxMessages";
});
// Flink MiniCluster
var flinkCluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder().Build());
flinkCluster.Start();