【kafka】Golang实现分布式Masscan任务调度系统

发布于:2025-06-12 ⋅ 阅读:(21) ⋅ 点赞:(0)

要求:

        输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。

        命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。

服务端程序:

  1. 从kafka消费者接收扫描任务信息
  2. 通过调用masscan启动探测任务,获取进度和结果信息,进度写入Redis,结果信息写入Kafka。
  3. 要求对启动任务、kafka、整理流程进行封装。
  4. 要求启动2个server端,通过命令行程序下发2个不同网段,可以均匀的分配到2个server上面执行完成。

测试要求:

  1. 启动两个server端程序。
  2. 通过命令行程序下发两个任务,IP不一样。
  3. 看server端程序日志,是否均匀的扫描了两个任务。

     

前置准备:

        安装docker

思路:

1. 系统架构设计

采用生产者-消费者模式:

  • 命令行客户端作为生产者,将扫描任务发布到Kafka
  • 两个服务端实例作为消费者,从Kafka获取任务并执行

2. 关键组件设计

  1. 任务表示

    • 使用JSON格式表示扫描任务,包含:
      • IP范围(单个IP或CIDR格式)
      • 端口范围
      • 扫描带宽限制
      • 任务状态
      • 进度信息
  2. Kafka设计

    • 创建一个主题(如scan-tasks
    • 使用单个分区确保任务顺序性(或根据需求设计分区策略)
    • 考虑使用消费者组实现两个服务端的负载均衡
  3. Redis设计

    • 存储任务进度信息
    • 使用Hash结构存储每个任务的进度百分比
    • 设置适当的TTL防止数据无限增长
  4. 服务端负载均衡

    • 两个服务端加入同一个Kafka消费者组
    • Kafka会自动将任务均匀分配给两个消费者

3. 执行流程

  1. 客户端流程

    • 解析命令行参数(IP范围、端口、带宽)
    • 验证输入格式
    • 创建Kafka生产者
    • 将任务发布到Kafka主题
  2. 服务端流程

    • 初始化Kafka消费者(加入消费者组)
    • 初始化Redis连接
    • 循环消费任务:
      a. 从Kafka获取任务
      b. 更新Redis中任务状态为"running"
      c. 调用masscan执行扫描:
      • 构造masscan命令行参数
      • 启动masscan进程
      • 监控进程输出和退出状态
        d. 实时解析masscan输出,更新Redis中的进度
        e. 扫描完成后:
      • 更新Redis中任务状态为"completed"
      • 将完整结果发布到另一个Kafka主题(如scan-result

4. 关键技术点

  1. Masscan集成

    • 使用exec.Command启动masscan进程
    • 实时解析masscan的标准输出和错误输出
    • 根据输出计算扫描进度
  2. 错误处理

    • 处理无效IP格式
    • 处理masscan执行失败
    • 处理Kafka/Redis连接问题
  3. 日志记录

    • 记录服务端操作日志
    • 记录任务执行状态变化
    • 记录错误信息

5. 测试验证思路

  1. 启动两个服务端实例
  2. 使用客户端提交两个不同网段的任务
  3. 观察:
    • 两个服务端的日志输出
    • 任务是否被均匀分配(一个服务端处理一个任务)
    • 扫描进度是否正确更新
    • 最终结果是否正确输出

6. 扩展考虑

  1. 任务优先级

    • 可以在任务中添加优先级字段
    • 服务端根据优先级处理任务
  2. 任务超时

    • 添加任务超时机制
    • 超时后重新分配任务
  3. 结果存储

    • 可以考虑将结果存入数据库而不仅是Kafka
  4. 水平扩展

    • 设计支持更多服务端实例的扩展方案

这个设计实现了基本的分布式扫描任务调度系统,核心是利用Kafka的消息队列特性实现任务分发,通过消费者组机制实现负载均衡,使用Redis作为共享状态存储。

实现:

        项目结构:
        

         kafka:
         consumer   
        
package kafka

import (
	"context"
	"errors"
	"fmt"
	"github.com/IBM/sarama"
	"log"
	"sync"
)

type MessageHandler func([]byte) error

type SaramaConsumer struct {
	client    sarama.ConsumerGroup
	handlers  map[string]MessageHandler
	ready     chan bool
	ctx       context.Context
	cancel    context.CancelFunc
	consuming sync.WaitGroup
	memberId  string
	groupId   string
}

func NewKafkaConsumer(brokers []string, groupId string, topic []string) (*SaramaConsumer, error) {
	config := sarama.NewConfig()
	config.Version = sarama.V2_5_0_0                      // 使用适当的 Kafka 版本
	config.Consumer.Offse