Go语言管道Channel通信教程

发布于:2025-07-26 ⋅ 阅读:(19) ⋅ 点赞:(0)

Go语言管道Channel通信教程

目录

  1. Channel基础概念
  2. Channel类型与创建
  3. Channel操作详解
  4. Select语句
  5. Channel通信模式
  6. 高级Channel技巧
  7. 实战案例

Channel基础概念

什么是Channel?

Channel是Go语言中用于goroutine之间通信的管道。它体现了Go的并发哲学:“不要通过共享内存来通信,而要通过通信来共享内存”

Channel的特性

  • 类型安全:每个channel只能传输特定类型的数据
  • 同步机制:提供goroutine之间的同步
  • 方向性:可以限制channel的读写方向
  • 缓冲控制:支持无缓冲和有缓冲两种模式

Channel类型与创建

无缓冲Channel

package main

import (
    "fmt"
    "time"
)

func main() {
    // 创建无缓冲channel
    ch := make(chan string)
    
    // 启动发送者goroutine
    go func() {
        time.Sleep(1 * time.Second)
        ch <- "Hello, Channel!"
        fmt.Println("Message sent")
    }()
    
    // 主goroutine接收消息
    fmt.Println("Waiting for message...")
    message := <-ch
    fmt.Println("Received:", message)
}

有缓冲Channel

package main

import (
    "fmt"
    "time"
)

func main() {
    // 创建缓冲大小为3的channel
    ch := make(chan int, 3)
    
    // 发送数据(不会阻塞,因为有缓冲)
    ch <- 1
    ch <- 2
    ch <- 3
    
    fmt.Printf("Channel length: %d, capacity: %d\n", len(ch), cap(ch))
    
    // 接收数据
    for i := 0; i < 3; i++ {
        value := <-ch
        fmt.Printf("Received: %d\n", value)
    }
}

方向性Channel

package main

import "fmt"

// 只能发送的channel
func sender(ch chan<- string) {
    ch <- "Hello from sender"
    close(ch)
}

// 只能接收的channel
func receiver(ch <-chan string) {
    for message := range ch {
        fmt.Println("Received:", message)
    }
}

func main() {
    ch := make(chan string)
    
    go sender(ch)
    receiver(ch)
}

Channel操作详解

发送和接收

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 2)
    
    // 发送操作
    ch <- 42
    ch <- 100
    
    // 接收操作
    value1 := <-ch
    value2 := <-ch
    
    fmt.Printf("Received: %d, %d\n", value1, value2)
    
    // 带ok的接收操作
    ch <- 200
    close(ch)
    
    value3, ok := <-ch
    fmt.Printf("Received: %d, ok: %t\n", value3, ok)
    
    value4, ok := <-ch
    fmt.Printf("Received: %d, ok: %t\n", value4, ok) // ok为false,channel已关闭
}

关闭Channel

package main

import "fmt"

func producer(ch chan<- int) {
    for i := 1; i <= 5; i++ {
        ch <- i
        fmt.Printf("Sent: %d\n", i)
    }
    close(ch) // 关闭channel表示不再发送数据
}

func consumer(ch <-chan int) {
    // 使用range遍历channel,直到channel关闭
    for value := range ch {
        fmt.Printf("Received: %d\n", value)
    }
    fmt.Println("Channel closed, consumer finished")
}

func main() {
    ch := make(chan int, 2)
    
    go producer(ch)
    consumer(ch)
}

Select语句

基本Select用法

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "Message from ch1"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "Message from ch2"
    }()
    
    // select语句等待多个channel操作
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("Received from ch1:", msg1)
        case msg2 := <-ch2:
            fmt.Println("Received from ch2:", msg2)
        }
    }
}

带超时的Select

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    
    go func() {
        time.Sleep(3 * time.Second)
        ch <- "Delayed message"
    }()
    
    select {
    case msg := <-ch:
        fmt.Println("Received:", msg)
    case <-time.After(2 * time.Second):
        fmt.Println("Timeout: no message received within 2 seconds")
    }
}

非阻塞Select

package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    
    // 非阻塞发送
    select {
    case ch <- 42:
        fmt.Println("Sent 42")
    default:
        fmt.Println("Channel is full, cannot send")
    }
    
    // 非阻塞接收
    select {
    case value := <-ch:
        fmt.Printf("Received: %d\n", value)
    default:
        fmt.Println("No value available")
    }
    
    // 再次尝试非阻塞接收
    select {
    case value := <-ch:
        fmt.Printf("Received: %d\n", value)
    default:
        fmt.Println("No value available")
    }
}

Channel通信模式

生产者-消费者模式

package main

import (
    "fmt"
    "sync"
    "time"
)

type Task struct {
    ID   int
    Data string
}

func producer(tasks chan<- Task, wg *sync.WaitGroup) {
    defer wg.Done()
    defer close(tasks)
    
    for i := 1; i <= 10; i++ {
        task := Task{
            ID:   i,
            Data: fmt.Sprintf("Task-%d", i),
        }
        tasks <- task
        fmt.Printf("Produced: %s\n", task.Data)
        time.Sleep(100 * time.Millisecond)
    }
}

func consumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for task := range tasks {
        fmt.Printf("Consumer %d processing: %s\n", id, task.Data)
        time.Sleep(200 * time.Millisecond) // 模拟处理时间
        fmt.Printf("Consumer %d finished: %s\n", id, task.Data)
    }
}

func main() {
    tasks := make(chan Task, 5) // 缓冲channel
    var wg sync.WaitGroup
    
    // 启动生产者
    wg.Add(1)
    go producer(tasks, &wg)
    
    // 启动多个消费者
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go consumer(i, tasks, &wg)
    }
    
    wg.Wait()
    fmt.Println("All tasks completed")
}

管道模式

package main

import "fmt"

// 第一阶段:生成数字
func generate(nums chan<- int) {
    for i := 1; i <= 10; i++ {
        nums <- i
    }
    close(nums)
}

// 第二阶段:计算平方
func square(nums <-chan int, squares chan<- int) {
    for num := range nums {
        squares <- num * num
    }
    close(squares)
}

// 第三阶段:过滤偶数
func filter(squares <-chan int, evens chan<- int) {
    for square := range squares {
        if square%2 == 0 {
            evens <- square
        }
    }
    close(evens)
}

func main() {
    nums := make(chan int)
    squares := make(chan int)
    evens := make(chan int)
    
    // 启动管道的各个阶段
    go generate(nums)
    go square(nums, squares)
    go filter(squares, evens)
    
    // 输出最终结果
    fmt.Println("Even squares:")
    for even := range evens {
        fmt.Println(even)
    }
}

扇入模式

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, output chan<- string) {
    for i := 1; i <= 3; i++ {
        message := fmt.Sprintf("Worker %d - Message %d", id, i)
        output <- message
        time.Sleep(time.Second)
    }
    close(output)
}

func fanIn(inputs ...<-chan string) <-chan string {
    output := make(chan string)
    var wg sync.WaitGroup
    
    // 为每个输入channel启动一个goroutine
    for _, input := range inputs {
        wg.Add(1)
        go func(ch <-chan string) {
            defer wg.Done()
            for message := range ch {
                output <- message
            }
        }(input)
    }
    
    // 等待所有输入完成后关闭输出channel
    go func() {
        wg.Wait()
        close(output)
    }()
    
    return output
}

func main() {
    // 创建多个worker的输出channel
    ch1 := make(chan string)
    ch2 := make(chan string)
    ch3 := make(chan string)
    
    // 启动workers
    go worker(1, ch1)
    go worker(2, ch2)
    go worker(3, ch3)
    
    // 扇入所有worker的输出
    merged := fanIn(ch1, ch2, ch3)
    
    // 接收合并后的消息
    for message := range merged {
        fmt.Println("Received:", message)
    }
}

高级Channel技巧

Channel的Channel

package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan chan string) {
    for job := range jobs {
        result := fmt.Sprintf("Worker %d processed job", id)
        job <- result
        close(job)
    }
}

func main() {
    jobs := make(chan chan string, 3)
    
    // 启动workers
    for i := 1; i <= 2; i++ {
        go worker(i, jobs)
    }
    
    // 发送任务
    for i := 1; i <= 5; i++ {
        resultCh := make(chan string, 1)
        jobs <- resultCh
        
        // 等待结果
        result := <-resultCh
        fmt.Printf("Job %d result: %s\n", i, result)
    }
    
    close(jobs)
}

信号量模式

package main

import (
    "fmt"
    "sync"
    "time"
)

type Semaphore chan struct{}

func NewSemaphore(capacity int) Semaphore {
    return make(Semaphore, capacity)
}

func (s Semaphore) Acquire() {
    s <- struct{}{}
}

func (s Semaphore) Release() {
    <-s
}

func worker(id int, sem Semaphore, wg *sync.WaitGroup) {
    defer wg.Done()
    
    sem.Acquire() // 获取信号量
    defer sem.Release() // 释放信号量
    
    fmt.Printf("Worker %d started\n", id)
    time.Sleep(2 * time.Second) // 模拟工作
    fmt.Printf("Worker %d finished\n", id)
}

func main() {
    const maxConcurrent = 3
    sem := NewSemaphore(maxConcurrent)
    var wg sync.WaitGroup
    
    // 启动10个worker,但最多只有3个同时运行
    for i := 1; i <= 10; i++ {
        wg.Add(1)
        go worker(i, sem, &wg)
    }
    
    wg.Wait()
    fmt.Println("All workers completed")
}

实战案例

并发Web爬虫

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

type CrawlResult struct {
    URL        string
    StatusCode int
    Error      error
    Duration   time.Duration
}

type Crawler struct {
    maxConcurrent int
    semaphore     chan struct{}
}

func NewCrawler(maxConcurrent int) *Crawler {
    return &Crawler{
        maxConcurrent: maxConcurrent,
        semaphore:     make(chan struct{}, maxConcurrent),
    }
}

func (c *Crawler) crawlURL(url string, results chan<- CrawlResult, wg *sync.WaitGroup) {
    defer wg.Done()
    
    // 获取信号量
    c.semaphore <- struct{}{}
    defer func() { <-c.semaphore }()
    
    start := time.Now()
    resp, err := http.Get(url)
    duration := time.Since(start)
    
    result := CrawlResult{
        URL:      url,
        Duration: duration,
        Error:    err,
    }
    
    if err == nil {
        result.StatusCode = resp.StatusCode
        resp.Body.Close()
    }
    
    results <- result
}

func (c *Crawler) Crawl(urls []string) <-chan CrawlResult {
    results := make(chan CrawlResult, len(urls))
    var wg sync.WaitGroup
    
    for _, url := range urls {
        wg.Add(1)
        go c.crawlURL(url, results, &wg)
    }
    
    go func() {
        wg.Wait()
        close(results)
    }()
    
    return results
}

func main() {
    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.stackoverflow.com",
        "https://www.golang.org",
        "https://www.reddit.com",
    }
    
    crawler := NewCrawler(3) // 最多3个并发请求
    results := crawler.Crawl(urls)
    
    fmt.Println("Crawling results:")
    for result := range results {
        if result.Error != nil {
            fmt.Printf("❌ %s: %v\n", result.URL, result.Error)
        } else {
            fmt.Printf("✅ %s: %d (%v)\n", result.URL, result.StatusCode, result.Duration)
        }
    }
}

实时数据处理管道

package main

import (
    "fmt"
    "math/rand"
    "time"
)

type DataPoint struct {
    ID        int
    Value     float64
    Timestamp time.Time
}

type ProcessedData struct {
    DataPoint
    Processed bool
    Result    float64
}

// 数据生成器
func dataGenerator(output chan<- DataPoint) {
    defer close(output)
    
    for i := 1; i <= 20; i++ {
        data := DataPoint{
            ID:        i,
            Value:     rand.Float64() * 100,
            Timestamp: time.Now(),
        }
        output <- data
        time.Sleep(100 * time.Millisecond)
    }
}

// 数据处理器
func dataProcessor(input <-chan DataPoint, output chan<- ProcessedData) {
    defer close(output)
    
    for data := range input {
        // 模拟数据处理
        time.Sleep(50 * time.Millisecond)
        
        processed := ProcessedData{
            DataPoint: data,
            Processed: true,
            Result:    data.Value * 2, // 简单的处理逻辑
        }
        output <- processed
    }
}

// 数据过滤器
func dataFilter(input <-chan ProcessedData, output chan<- ProcessedData) {
    defer close(output)
    
    for data := range input {
        // 只传递结果大于100的数据
        if data.Result > 100 {
            output <- data
        }
    }
}

func main() {
    // 创建管道
    rawData := make(chan DataPoint, 5)
    processedData := make(chan ProcessedData, 5)
    filteredData := make(chan ProcessedData, 5)
    
    // 启动管道各阶段
    go dataGenerator(rawData)
    go dataProcessor(rawData, processedData)
    go dataFilter(processedData, filteredData)
    
    // 输出最终结果
    fmt.Println("Filtered results (Result > 100):")
    for data := range filteredData {
        fmt.Printf("ID: %d, Original: %.2f, Result: %.2f, Time: %s\n",
            data.ID, data.Value, data.Result, data.Timestamp.Format("15:04:05"))
    }
}

总结

Channel是Go语言并发编程的核心工具,提供了优雅的goroutine间通信方式:

关键概念

  • 无缓冲vs有缓冲:控制同步行为
  • 方向性:限制channel的使用方式
  • Select语句:处理多个channel操作
  • 关闭channel:信号传递机制

常用模式

  • 生产者-消费者:解耦数据生产和消费
  • 管道:数据流式处理
  • 扇入扇出:并发处理和结果聚合
  • 信号量:控制并发数量

最佳实践

  1. 发送者负责关闭channel
  2. 使用range遍历channel
  3. 利用select实现超时和非阻塞操作
  4. 合理设置缓冲大小
  5. 避免channel泄漏

掌握Channel的使用是成为Go并发编程专家的必经之路。记住:通过通信来共享内存,而不是通过共享内存来通信

参考资源


网站公告

今日签到

点亮在社区的每一天
去签到