Golang实现分布式Masscan任务调度系统

发布于:2025-06-13 ⋅ 阅读:(12) ⋅ 点赞:(0)

1. 系统架构概述

1.1 核心组件

1.任务生成器(Task Generator)

  • 功能:生成 Masscan 扫描任务,如指定目标 IP 范围、端口、扫描参数等。
  • 输出:将任务发送到 Kafka 的任务队列(Topic)。

2.任务调度器(Task Scheduler)

  • 功能:从 Kafka 任务队列中消费任务,并将任务分配给可用的 Worker 节点。
  • 实现:可以使用 Kafka Consumer 消费任务,并使用 ZooKeeper 或其他服务发现机制来管理 Worker 节点。

3.Worker 节点(Worker Nodes)

  • 功能:执行 Masscan 扫描任务。
  • 实现:每个 Worker 节点作为一个独立的进程或服务,从 Kafka 接收任务,执行 Masscan 扫描,并将结果发送回 Kafka 的结果队列。

4.结果处理器(Result Processor)

  • 功能:从 Kafka 的结果队列中消费扫描结果,进行处理、分析或存储。
  • 实现:可以使用 Kafka Consumer 消费结果,并将其存储到数据库或进行实时分析。

5.数据库(Database)

  • 功能:存储扫描任务和结果。
  • 选择:如 PostgreSQL、MongoDB、Elasticsearch 等。

6.监控与日志(Monitoring & Logging)

  • 功能:监控系统的运行状态,记录日志以便故障排查。
  • 实现:使用 Prometheus、Grafana、ELK(Elasticsearch, Logstash, Kibana)等工具。

    1.2 工作流程

    1.任务生成:任务生成器生成 Masscan 扫描任务,并将其发送到 Kafka 的任务队列。

    2.任务调度:任务调度器从 Kafka 任务队列中消费任务,并将任务分配给可用的 Worker 节点。

    3.任务执行:Worker 节点接收任务,执行 Masscan 扫描。

    4.结果处理:Worker 节点将扫描结果发送回 Kafka 的结果队列。

    5.结果存储与分析:结果处理器从 Kafka 结果队列中消费结果,并将其存储到数据库或进行实时分析。

      2. 关键组件实现

      2.1 任务生成器

      任务生成器负责生成 Masscan 扫描任务,并将其发送到 Kafka 的任务队列。

      go
      
      package main
      
      import (
          "context"
          "fmt"
          "github.com/segmentio/kafka-go"
          "time"
      )
      
      func main() {
          // 配置 Kafka 连接
          writer := kafka.NewWriter(kafka.WriterConfig{
              Brokers:  []string{"kafka-broker1:9092", "kafka-broker2:9092"},
              Topic:    "masscan_tasks",
             Balancer: &kafka.LeastBytes{},
          })
      
          // 生成任务
          tasks := generateTasks()
      
          // 发送任务到 Kafka
          for _, task := range tasks {
              msg := kafka.Message{
                  Value: []byte(task),
              }
              err := writer.WriteMessages(context.Background(), msg)
              if err != nil {
                  fmt.Println("Error sending message:", err)
              }
          }
      
          // 关闭连接
          writer.Close()
      }
      
      func generateTasks() []string {
          // 示例:生成简单的 Masscan 命令
          tasks := []string{
              `{"command": "masscan 192.168.1.0/24 -p80,443 --rate=1000"}`,
              `{"command": "masscan 10.0.0.0/16 -p22,8080 --rate=500"}`,
          }
          return tasks
      }
      

      2.2 任务调度器

      任务调度器从 Kafka 任务队列中消费任务,并将任务分配给 Worker 节点。

      go
      
      package main
      
      import (
          "context"
          "fmt"
          "github.com/segmentio/kafka-go"
          "time"
      )
      
      func main() {
          // 配置 Kafka 连接
          reader := kafka.NewReader(kafka.ReaderConfig{
              Brokers:  []string{"kafka-broker1:9092", "kafka-broker2:9092"},
              Topic:    "masscan_tasks",
              GroupID:  "masscan_scheduler",
              MinBytes: 1,    // 1B
              MaxBytes: 10e6, // 10MB
          })
      
          // 消费任务
          for {
              msg, err := reader.ReadMessage(context.Background())
              if err != nil {
                  fmt.Println("Error reading message:", err)
                  continue
              }
      
              // 解析任务
              var task Task
              err = json.Unmarshal(msg.Value, &task)
              if err != nil {
                  fmt.Println("Error parsing message:", err)
                  continue
              }
      
              // 分配任务给 Worker(这里简单打印,实际应发送到 Worker 的 Kafka 队列)
              fmt.Printf("Received task: %s\n", task.Command)
              // TODO: 实现任务分配逻辑
          }
      
          reader.Close()
      }
      
      type Task struct {
          Command string `json:"command"`
      }
      

      2.3 Worker 节点

      Worker 节点从 Kafka 接收任务,执行 Masscan 扫描,并将结果发送回 Kafka。

      go
      
      package main
      
      import (
          "context"
          "encoding/json"
          "fmt"
          "github.com/segmentio/kafka-go"
          "os/exec"
          "time"
      )
      
      type Task struct {
          Command string `json:"command"`
      }
      
      type Result struct {
          TaskID     string `json:"task_id"`
          Output     string `json:"output"`
          Error      string `json:"error"`
          Timestamp  time.Time `json:"timestamp"`
      }
      
      func main() {
          // 配置 Kafka 连接
          reader := kafka.NewReader(kafka.ReaderConfig{
              Brokers:  []string{"kafka-broker1:9092", "kafka-broker2:9092"},
              Topic:    "masscan_tasks",
              GroupID:  "masscan_worker",
              MinBytes: 1,    // 1B
              MaxBytes: 10e6, // 10MB
          })
      
          writer := kafka.NewWriter(kafka.WriterConfig{
              Brokers:  []string{"kafka-broker1:9092", "kafka-broker2:9092"},
              Topic:    "masscan_results",
             Balancer: &kafka.LeastBytes{},
          })
      
          for {
              msg, err := reader.ReadMessage(context.Background())
              if err != nil {
                  fmt.Println("Error reading message:", err)
                  continue
              }
      
              var task Task
              err = json.Unmarshal(msg.Value, &task)
              if err != nil {
                  fmt.Println("Error parsing message:", err)
                  continue
              }
      
              // 执行 Masscan 命令
              cmd := exec.Command("sh", "-c", task.Command)
              output, err := cmd.CombinedOutput()
              result := Result{
                  TaskID:    fmt.Sprintf("%d", time.Now().UnixNano()),
                  Output:    string(output),
                  Error:     "",
                  Timestamp: time.Now(),
              }
              if err != nil {
                  result.Error = err.Error()
              }
      
              // 发送结果到 Kafka
              resultBytes, _ := json.Marshal(result)
              err = writer.WriteMessages(context.Background(), kafka.Message{
                  Value: resultBytes,
              })
              if err != nil {
                  fmt.Println("Error sending result:", err)
              }
          }
      
          reader.Close()
          writer.Close()
      }
      

      2.4 结果处理器

      结果处理器从 Kafka 结果队列中消费结果,并将其存储到数据库或进行实时分析。

      go
      
      package main
      
      import (
          "context"
          "encoding/json"
          "fmt"
          "github.com/segmentio/kafka-go"
          "time"
      )
      
      type Result struct {
          TaskID     string    `json:"task_id"`
          Output     string    `json:"output"`
          Error      string    `json:"error"`
          Timestamp  time.Time `json:"timestamp"`
      }
      
      func main() {
          // 配置 Kafka 连接
          reader := kafka.NewReader(kafka.ReaderConfig{
              Brokers:  []string{"kafka-broker1:9092", "kafka-broker2:9092"},
              Topic:    "masscan_results",
              GroupID:  "masscan_result_processor",
              MinBytes: 1,    // 1B
              MaxBytes: 10e6, // 10MB
          })
      
          // 处理结果
          for {
              msg, err := reader.ReadMessage(context.Background())
              if err != nil {
                  fmt.Println("Error reading message:", err)
                  continue
              }
      
              var result Result
              err = json.Unmarshal(msg.Value, &result)
              if err != nil {
                  fmt.Println("Error parsing message:", err)
                  continue
              }
      
              // 处理结果,例如存储到数据库
              storeResult(result)
          }
      
          reader.Close()
      }
      
      func storeResult(result Result) {
          // 示例:打印结果,实际应存储到数据库
          fmt.Printf("Result: %+v\n", result)
      }
      

      3. 最佳实践

      3.1 使用 Kafka 消费者组

      利用 Kafka 的消费者组机制,实现任务的负载均衡和故障恢复。每个 Worker 节点作为一个消费者组成员,Kafka 会自动分配任务给各个成员。

      3.2 错误处理与重试

      • 错误处理:在 Worker 节点中实现错误处理机制,记录失败的任务,并采取相应的措施,如重试或报警。
      • 重试策略:实现合理的重试策略,避免无限重试导致资源浪费。

      3.3 监控与日志

      • 监控:使用 Prometheus、Grafana 等工具监控 Kafka 集群、Worker 节点和任务处理情况。
      • 日志:集中管理日志,使用 ELK 堆栈或其他日志管理工具,方便故障排查。

      3.4 安全性

      • 认证与授权:配置 Kafka 的认证和授权机制,确保通信安全。
      • 数据加密:使用 TLS 加密 Kafka 通信,防止数据泄露。
      • 访问控制:限制对 Kafka 主题的访问权限,防止未授权访问。

      3.5 性能优化

      • 批量处理:在发送和接收 Kafka 消息时,使用批量处理,提高吞吐量。
      • 压缩:配置 Kafka 的压缩机制,减少网络带宽消耗。
      • 分区管理:合理配置 Kafka 分区,确保负载均衡和高效的消息传递。

      3.6 可扩展性

      • 水平扩展:通过增加 Worker 节点的数量,实现系统的水平扩展。
      • 弹性伸缩:使用容器编排工具(如 Kubernetes)实现 Worker 节点的弹性伸缩,根据负载自动调整资源。

      4. 总结

      通过结合 Golang 和 Apache Kafka,可以构建一个高效、可扩展且可靠的分布式 Masscan 任务调度系统。

      Kafka 提供了强大的消息传递能力,而 Golang 则以其高性能和并发处理能力,成为实现 Worker 节点和任务调度器的理想选择。

      关键点

      • 任务调度:利用 Kafka 的发布/订阅机制,实现任务的动态分配和负载均衡。
      • Worker 节点:实现独立的 Worker 节点,处理 Masscan 扫描任务,并将结果发送回 Kafka。
      • 结果处理:通过 Kafka 结果队列,集中处理和存储扫描结果。
      • 监控与安全:实施全面的监控和安全保障措施,确保系统的稳定性和安全性。

      联系方式:https://t.me/XMOhost26

      交流技术群:https://t.me/owolai008