net8.0一键创建支持(Kafka)

发布于:2025-07-28 ⋅ 阅读:(17) ⋅ 点赞:(0)

Necore项目生成器 - 在线创建Necore模板项目 | 一键下载

 KafkaController.cs

using Confluent.Kafka;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Tasks;
using UnT.Template.Application.Responses;
using UnT.Template.Domain;

namespace UnT.Template.Controllers
{
    /// <summary>
    /// Demo endpoints that publish a message to the Kafka topic <c>unt_queue</c> and
    /// start a background consumer for the same topic.
    /// </summary>
    [Route("api/kafkas")]
    [ApiController]
    public class KafkaController : ControllerBase
    {
        // Configuration key holding the Kafka bootstrap server list.
        private const string BootstrapServersKey = "KafkaConnectionString";

        // Topic shared by the publish and consume endpoints.
        private const string TopicName = "unt_queue";

        // 1 while a background consumer loop is alive, 0 otherwise. Static because
        // controllers are created per request while the consumer outlives the request.
        private static int _consumerRunning;

        private readonly IConfiguration _configuration;

        public KafkaController(IConfiguration configuration)
        {
            _configuration = configuration;
        }

        /// <summary>
        /// Publishes one JSON-serialized <c>Pro_Product</c> to the topic and waits for the
        /// broker acknowledgement.
        /// </summary>
        /// <returns>
        /// HTTP 200 with an <c>ApiResponse&lt;bool&gt;</c>: <c>Success = true</c> once delivery is
        /// confirmed; <c>Success = false</c> with the error text when delivery or setup fails.
        /// </returns>
        [HttpPost("publish")]
        [Produces("application/json")]
        [ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]
        public async Task<IActionResult> Insert()
        {
            try
            {
                var producerConfig = new ProducerConfig
                {
                    BootstrapServers = GetBootstrapServers(),
                    ClientId = "UnT.Template",
                    Acks = Acks.All,
                    MessageSendMaxRetries = 3,
                    RetryBackoffMs = 1000,
                    LingerMs = 5,
                    // Bound the delivery attempt to 10 seconds (the wait the previous
                    // Flush call used) so an unreachable broker cannot hold the request
                    // for the library's much longer default message timeout.
                    MessageTimeoutMs = 10000
                };

                // NOTE(review): a producer is still built per request; registering one
                // shared instance in DI would be cheaper but needs changes outside this file.
                using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build())
                {
                    var message = Newtonsoft.Json.JsonConvert.SerializeObject(new Pro_Product { Name = DateTime.Now.ToFileTime().ToString() });

                    // Awaiting the delivery result means a failed send surfaces as an
                    // exception instead of being logged and then reported as success.
                    var deliveryResult = await producer.ProduceAsync(TopicName, new Message<Null, string> { Value = message });
                    Console.WriteLine($"消息发送到: {deliveryResult.TopicPartitionOffset}");
                }

                return Ok(new ApiResponse<bool> { Success = true, Data = true });
            }
            catch (ProduceException<Null, string> ex)
            {
                Console.WriteLine($"消息发送失败: {ex.Error.Reason}");
                return Ok(new ApiResponse<bool> { Success = false, Message = ex.Error.Reason, Data = false });
            }
            catch (Exception ex)
            {
                return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });
            }
        }

        /// <summary>
        /// Starts the background consumer loop for the topic if it is not already running,
        /// then waits up to five seconds so that start-up failures can be reported.
        /// </summary>
        /// <returns>
        /// HTTP 200 with an <c>ApiResponse&lt;bool&gt;</c>: <c>Success = true</c> when the consumer
        /// is running (newly started or already active); <c>Success = false</c> with the error
        /// text when the consumer failed within the start-up window.
        /// </returns>
        [HttpPost("consume")]
        [Produces("application/json")]
        [ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]
        public async Task<IActionResult> Consume()
        {
            try
            {
                var bootstrapServers = GetBootstrapServers();

                // Only one loop per process: without this guard every request would add
                // another never-ending consumer, thread and Ctrl+C handler.
                if (Interlocked.CompareExchange(ref _consumerRunning, 1, 0) != 0)
                {
                    return Ok(new ApiResponse<bool> { Success = true, Message = "Kafka 消费者已在运行", Data = true });
                }

                Task consumerTask;
                try
                {
                    // LongRunning: the loop blocks indefinitely, so keep it off the
                    // regular thread-pool workers.
                    consumerTask = Task.Factory.StartNew(
                        () => RunConsumerLoop(bootstrapServers),
                        CancellationToken.None,
                        TaskCreationOptions.LongRunning,
                        TaskScheduler.Default);
                }
                catch
                {
                    // The loop never started, so it cannot reset the flag itself.
                    Interlocked.Exchange(ref _consumerRunning, 0);
                    throw;
                }

                // Give the consumer a short window to start; if it ends inside that
                // window, awaiting it rethrows the failure into the catch below.
                var finished = await Task.WhenAny(consumerTask, Task.Delay(TimeSpan.FromSeconds(5)));
                if (finished == consumerTask)
                {
                    await consumerTask;
                }

                return Ok(new ApiResponse<bool> { Success = true, Data = true });
            }
            catch (Exception ex)
            {
                return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });
            }
        }

        /// <summary>
        /// Reads the bootstrap server list from configuration.
        /// </summary>
        /// <exception cref="InvalidOperationException">The setting is missing or blank.</exception>
        private string GetBootstrapServers()
        {
            var bootstrapServers = _configuration.GetValue<string>(BootstrapServersKey);
            if (string.IsNullOrWhiteSpace(bootstrapServers))
            {
                throw new InvalidOperationException($"未配置 {BootstrapServersKey}");
            }

            return bootstrapServers;
        }

        /// <summary>
        /// Blocking consume loop: subscribes to the topic, logs each message and commits its
        /// offset manually. Ends when Ctrl+C is pressed or an unrecoverable error is thrown.
        /// </summary>
        /// <param name="bootstrapServers">Kafka bootstrap server list.</param>
        private static void RunConsumerLoop(string bootstrapServers)
        {
            try
            {
                var consumerConfig = new ConsumerConfig
                {
                    BootstrapServers = bootstrapServers,
                    GroupId = "UnT.Template.Consumer.Group",
                    EnableAutoCommit = false,
                    AutoOffsetReset = AutoOffsetReset.Latest,
                    EnablePartitionEof = true,
                    StatisticsIntervalMs = 5000
                };

                using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();

                // Cancellation source used to leave the loop gracefully on Ctrl+C.
                using var cts = new CancellationTokenSource();
                ConsoleCancelEventHandler onCancelKeyPress = (_, e) =>
                {
                    e.Cancel = true;
                    cts.Cancel();
                };
                Console.CancelKeyPress += onCancelKeyPress;

                try
                {
                    // Subscribe to the topic.
                    consumer.Subscribe(TopicName);

                    while (true)
                    {
                        try
                        {
                            // Blocks until a message, a partition EOF event or cancellation.
                            var cr = consumer.Consume(cts.Token);
                            if (cr.IsPartitionEOF)
                            {
                                Console.WriteLine($"分区 {cr.Partition} 已到达末尾,偏移量: {cr.Offset}");
                                continue;
                            }

                            // Skip results that carry no message.
                            if (cr.Message is null)
                            {
                                Console.WriteLine("收到空消息");
                                continue;
                            }

                            // Handle the message.
                            Console.WriteLine($"收到消息: {cr.Message.Value} [分区: {cr.Partition}, 偏移量: {cr.Offset}]");

                            // Auto-commit is disabled, so commit the offset explicitly.
                            consumer.Commit(cr);
                        }
                        catch (ConsumeException e) when (!e.Error.IsFatal)
                        {
                            // Recoverable: log and keep polling. Fatal errors fall through
                            // to the outer handler instead of spinning in this loop.
                            Console.WriteLine($"消费错误: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ctrl+C requested shutdown; cleanup happens in finally.
                }
                finally
                {
                    // Detach before cts is disposed so a late Ctrl+C cannot touch it.
                    Console.CancelKeyPress -= onCancelKeyPress;

                    // Leave the consumer group cleanly on every exit path.
                    consumer.Close();
                }
            }
            catch (Exception ex)
            {
                // Nothing awaits this task after the start-up window, so log here
                // before propagating.
                Console.WriteLine($"消费者异常退出: {ex.Message}");
                throw;
            }
            finally
            {
                // Allow a later request to start a fresh consumer.
                Interlocked.Exchange(ref _consumerRunning, 0);
            }
        }
    }
}


网站公告

今日签到

点亮在社区的每一天
去签到