go-zero中使用elasticsearch的示例

发布于:2025-03-05 ⋅ 阅读:(16) ⋅ 点赞:(0)

安装ES+kibana

关于安装ES的方法,我之前的文章已经详细写过了,可以查看这篇文章:《Elasticsearch搜索引擎系统入门_elasticsearch 系统-CSDN博客

本次测试,我使用MacOS M1,安装的ES和kibana版本为:7.17.27

通过如下的链接直达下载地址,下载后解压:

Elasticsearch 7.17.27 | Elastic

Kibana 7.17.27 | Elastic

配置ES

设置ES密码:vim ./elasticsearch-7.17.27/config/elasticsearch.yml,添加如下内容:

http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true

先启动ES,启动命令:./elasticsearch-7.17.27/bin/elasticsearch -d #后台启动ES

等待ES启动后,然后执行如下命令:./elasticsearch-7.17.27/bin/elasticsearch-setup-passwords interactive

需要设置以下六种账户的密码elastic、apm_system、kibana、logstash_system、beats_system、remote_monitoring_user

输入y开始设置,由于是测试,我这里全部输入123456

image-20250222200938374

六种密码设置完成后,需要再次重启ES。然后浏览器访问 http://localhost:9200/ 打开ES,输入账号elastic和刚才设置的密码(我这里是123456)后:

image-20250222193001746

配置kibana

设置kibana的密码,vim ./kibana-7.17.27-darwin-aarch64/config/kibana.yml,在最后面添加如下内容:

elasticsearch.username: "elastic"
elasticsearch.password: "123456" #你在es中设置的密码

启动kibana:./kibana-7.17.27-darwin-aarch64/bin/kibana ,稍等片刻,然后在浏览器访问http://localhost:5601/app/kibana 打开kibana,输入上面的账号和密码(elastic/123456

image-20250222195231817

在go-zero中使用ES

go-zero中ES配置项

在go-zero项目的etc/mscrm-api-dev.yaml 文件中,增加如下配置项:

Elasticsearch:
  Hosts:
    - "http://localhost:9200"
  Username: "elastic"
  Password: "123456"

gozero/internal/config/config.go 中添加ES的配置:

type Config struct {
	rest.RestConf
	Mysql struct {
		DataSource string
	}
	Cache         cache.CacheConf
	Elasticsearch struct {
		Hosts    []string
		Username string
		Password string
	}
}

新建 gozero/internal/svc/es/es_config.go ,用来加载ES的配置项:

package es

import (
	"go-demo-2025/gozero/internal/config"
)

// ESConfig 定义 Elasticsearch 的配置结构体
type ESConfig struct {
	Hosts    []string
	Username string
	Password string
}

// LoadESConfig 加载 Elasticsearch 配置
func LoadESConfig(c config.Config) ESConfig {
	return ESConfig{
		Hosts:    c.Elasticsearch.Hosts,
		Username: c.Elasticsearch.Username,
		Password: c.Elasticsearch.Password,
	}
}

go-zero中ES单例客户端

安装扩展:go get -u github.com/olivere/elastic/v7

新建 gozero/internal/svc/es/es_client.go 作为ES的客户端:

package es

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
	"github.com/zeromicro/go-zero/core/logx"
	"net/http"
	"sync"
)

var (
	esClient *elastic.Client
	esOnce   sync.Once
	esErr    error
)

// GetESClient 返回 Elasticsearch 客户端单例实例
func GetESClient(ctx context.Context, cfg ESConfig) (*elastic.Client, error) {
	esOnce.Do(func() {
		// 创建一个不使用代理的 HTTP 客户端
		httpClient := &http.Client{
			Transport: &http.Transport{
				Proxy: http.ProxyURL(nil), // 禁用代理
			},
		}
		esClient, esErr = elastic.NewClient(
			elastic.SetURL(cfg.Hosts...),
			elastic.SetBasicAuth(cfg.Username, cfg.Password),
			elastic.SetSniff(false),
			elastic.SetHealthcheck(false),
			elastic.SetHttpClient(httpClient), // 使用自定义的 HTTP 客户端
		)
		if esErr != nil {
			logx.Error(fmt.Sprintf("failed to connect to Elasticsearch: %+v", esErr))
		}
	})
	// 初始化索引和文档结构
	indexName := "system_log"
	indexStruct := `
        {
            "mappings": {
                "properties": {
                    "menu_id": {"type": "long"},
                    "url": {"type": "keyword"},
                    "content": {"type": "keyword"},
                    "user_id": {"type": "long"},
                    "create_time": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
                }
            }
        }
        `
	if err := initIndex(ctx, esClient, indexName, indexStruct); err != nil {
		return nil, err
	}
	return esClient, esErr
}

// initIndex 是一个辅助函数,用于初始化索引
func initIndex(ctx context.Context, es *elastic.Client, indexName, indexStruct string) error {
	// 检查索引是否存在
	exists, err := es.IndexExists(indexName).Do(ctx)
	if err != nil {
		logx.Errorf("Failed to check index %s existence: %v", indexName, err)
		return err
	}

	if !exists {
		// 创建索引
		createIndex, err := es.CreateIndex(indexName).BodyString(indexStruct).Do(ctx)
		if err != nil {
			logx.Errorf("Failed to create index %s: %v", indexName, err)
			return err
		}
		if !createIndex.Acknowledged {
			logx.Errorf("Index %s creation not acknowledged", indexName)
			return fmt.Errorf("index %s creation not acknowledged", indexName)
		}
		logx.Infof("Index %s created", indexName)
	} else {
		logx.Infof("Index %s already exists", indexName)
	}

	return nil
}

GetESClient 加入到 ServiceContext 中:

func NewServiceContext(c config.Config) *ServiceContext {
	initCtx := context.Background()

	// 初始化 Elasticsearch 客户端单例实例
	esCfg := es.LoadESConfig(c)
	esClient, err := es.GetESClient(initCtx, esCfg)
	if err != nil {
		logx.Errorf("failed to get Elasticsearch client: %v", err)
	}

	return &ServiceContext{
		//....
		ES:     esClient,
	}
}

go-zero中ES方法实现

接下来,模拟一个常见的业务,就是写入系统日志的功能,在go-zero中将系统日志写入到ES中。

新建 gozero/internal/repo/es/system_log/system_log.go 文件,这个文件中主要实现ES索引的定义和写入数据、以及查询数据的逻辑。核心代码如下:

// 定义ES索引名称
const indexName = "es_system_log_data"

// ES存储的数据结构
type SystemLogEsData struct {
	MenuId     int64  `json:"menu_id"`
	Url        string `json:"url"`
	Content    string `json:"content"`
	UserId     int64  `json:"user_id"`
	CreateTime string `json:"create_time"`
}

// 查询ES的请求参数
type SystemLogEsSearchParams struct {
	SystemLogEsData
	CreateTimeStart string `json:"create_time_start"`
	CreateTimeEnd   string `json:"create_time_end"`
	Page            int    `json:"page"`
	PageSize        int    `json:"page_size"`
}

// 添加ES数据
func (r *SystemLogESRepository) InsertEsData(doc *SystemLogEsData) (*elastic.IndexResponse, error) {
	resp, err := r.esClient.Index().
		Index(indexName).
		BodyJson(doc).
		Do(r.ctx)
	if err != nil {
		logx.WithContext(r.ctx).Errorf("insert error:%+v", err.Error())
		return nil, err
	}
	fmt.Println("resp:", resp)
	return resp, nil
}

// 查询ES数据
func (r *SystemLogESRepository) SelectEsData(req *SystemLogEsSearchParams) ([]SystemLogEsData, int64, error) {
	boolQuery := elastic.NewBoolQuery()

	// 精确匹配条件
	if req.MenuId > 0 {
		boolQuery = boolQuery.Must(elastic.NewTermQuery("menu_id", req.MenuId))
	}
	if req.UserId > 0 {
		boolQuery = boolQuery.Must(elastic.NewTermQuery("user_id", req.UserId))
	}

	// 模糊搜索条件
	if req.Url != "" {
		boolQuery = boolQuery.Must(elastic.NewMatchQuery("url", req.Url))
	}
	if req.Content != "" {
		boolQuery = boolQuery.Must(elastic.NewMatchQuery("content", req.Content))
	}

	// 时间范围搜索条件
	if req.CreateTimeStart != "" && req.CreateTimeEnd != "" {
		loc, err := time.LoadLocation("Asia/Shanghai")
		// 将字符串格式的时间转换为时间戳
		startTime, err := time.ParseInLocation("2006-01-02 15:04:05", req.CreateTimeStart, loc)
		if err != nil {
			logx.WithContext(r.ctx).Errorf("Failed to parse start time: %v", err)
			return nil, 0, err
		}
		endTime, err := time.ParseInLocation("2006-01-02 15:04:05", req.CreateTimeEnd, loc)
		if err != nil {
			logx.WithContext(r.ctx).Errorf("Failed to parse end time: %v", err)
			return nil, 0, err
		}
		// 使用时间戳进行范围查询
		boolQuery = boolQuery.Must(elastic.NewRangeQuery("create_time").Gte(startTime.Unix()).Lte(endTime.Unix()))
	}

	// 构建搜索请求
	searchResult, err := r.esClient.Search().
		Index(indexName).
		Query(boolQuery).
		From(int((req.Page-1)*req.PageSize)).
		Size(int(req.PageSize)).
		Sort("create_time", false). // true-正序,false-倒序
		//Sort("id", false). // 根据 id 倒序排序
		Do(r.ctx)
	if err != nil {
		logx.WithContext(r.ctx).Errorf("Failed to search Elasticsearch: %v", err)
		return nil, 0, err
	}

	esDataList := make([]SystemLogEsData, 0)
	for _, hit := range searchResult.Hits.Hits {
		var operateRecord SystemLogEsData
		// 使用 json.Unmarshal 手动解析 Elasticsearch 返回的 JSON 数据到 ChannelListDocument 结构体
		err := json.Unmarshal(hit.Source, &operateRecord)
		if err != nil {
			logx.WithContext(r.ctx).Errorf("Failed to unmarshal hit to Document: %v, Hit: %s", err, hit.Source)
			continue
		}
		esDataList = append(esDataList, operateRecord)
	}

	if len(esDataList) == 0 {
		logx.WithContext(r.ctx).Infof("No items converted to Document, total hits: %d", searchResult.TotalHits())
	}

	if len(esDataList) == 0 {
		logx.WithContext(r.ctx).Infof("No items converted to Document, total hits: %d", searchResult.TotalHits())
	}

	return esDataList, searchResult.TotalHits(), nil
}

go-zero中ES方法测试

接下来,写一个测试方法,调试一下以上逻辑是否正确。新建 gozero/internal/repo/es/system_log/system_log_test.go 文件,写入两个测试用例。

先来测试写入数据到ES的方法:

func TestInsertDataToEs(t *testing.T) {
	var c config.Config
	configFile := "../../../../etc/gozero-api-dev.yaml"
	conf.MustLoad(configFile, &c)
	svcCtx := svc.NewServiceContext(c)
	ctx := context.Background()
	now := time.Now().Format("2006-01-02 15:04:05")

	// 创建模拟的输入文档
	doc := &SystemLogEsData{
		MenuId:     10,
		Url:        "http://www.xxx.com/a/b/1",
		Content:    "test",
		UserId:     2,
		CreateTime: now,
	}
	r := NewSystemLogESRepository(ctx, svcCtx)
	resp, err := r.InsertEsData(doc)
	fmt.Println("resp.Id", resp.Id)
	assert.NoError(t, err)
}

测试结果:

image-20250224135017126

再来测试从ES中查询数据的方法,代码如下:

func TestSelectDataFromEs(t *testing.T) {
	var c config.Config
	configFile := "../../../../etc/gozero-api-dev.yaml"
	conf.MustLoad(configFile, &c)
	svcCtx := svc.NewServiceContext(c)
	ctx := context.Background()

	searchParams := &SystemLogEsSearchParams{
		SystemLogEsData: SystemLogEsData{
			//MenuId: 10,
			//Url:     "http://www.xxx.com/a/b/1",
			//Content: "test",
			UserId:  2,
		},
		//CreateTimeStart: "2023-07-01 00:00:00",
		//CreateTimeEnd:   "2023-08-01 00:00:00",
		Page:     1,
		PageSize: 50,
	}
	r := NewSystemLogESRepository(ctx, svcCtx)
	result, count, err := r.SelectEsData(searchParams)
	//fmt.Println("result", result)
	for _, item := range result {
		fmt.Println("result_item", utils.StructToMap(item))
	}
	fmt.Println("count", count)
	assert.NoError(t, err)
}

测试结果:

image-20250224135102126

在kibana中查看数据

在kiban面板中,依次进入:Management / Stack Management / Index Patterns,点击右上角 Create index pattern,添加新增的index:

image-20250222210043316

然后查看Discover 中的数据:

image-20250224135201242

https://gitee.com/rxbook/go-demo-2025


网站公告

今日签到

点亮在社区的每一天
去签到