var m = &promMetadata{
	promURL: conf.prometheus.url,

	// Start out with the full time range. The shipper will constrain it later.
	// TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled.
	mint: conf.limitMinTime.PrometheusTimestamp(),
	maxt: math.MaxInt64,

	limitMinTime: conf.limitMinTime,

	client: promclient.NewWithTracingClient(logger, "thanos-sidecar"),
}
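
For orientation, here is a rough sketch of the promMetadata struct these fields belong to, reconstructed from the constructor above (the real definition lives in cmd/thanos/sidecar.go and may carry more fields):

// promMetadata caches metadata that the sidecar periodically refreshes from Prometheus.
type promMetadata struct {
	promURL *url.URL
	client  *promclient.Client

	mtx          sync.Mutex
	mint         int64         // Advertised min time; starts at the configured limit.
	maxt         int64         // Advertised max time; starts at math.MaxInt64.
	labels       labels.Labels // External labels last fetched from Prometheus.
	limitMinTime thanosmodel.TimeOrDurationValue
}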
Whether uploads are enabled depends on whether object storage is configured.
The object storage configuration comes from the command-line flag --objstore.config-file.
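
For reference, the file that --objstore.config-file points to is a small YAML document selecting and configuring a backend. An illustrative S3 example (bucket name and endpoint are placeholders; see the Thanos object storage docs for all supported backends):

type: S3
config:
  bucket: "my-thanos-blocks"
  endpoint: "s3.us-east-1.amazonaws.com"

The sidecar then reads the file's content to decide whether to upload: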
confContentYaml, err := conf.objStore.Content()
if err != nil {
	return errors.Wrap(err, "getting object store config")
}

var uploads = true
if len(confContentYaml) == 0 {
	level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
	uploads = false
}
func validatePrometheus(ctx context.Context, client *promclient.Client, logger log.Logger, ignoreBlockSize bool, m *promMetadata) error {
	var (
		flagErr error
		flags   promclient.Flags
	)

	if err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
		if flags, flagErr = client.ConfiguredFlags(ctx, m.promURL); flagErr != nil && flagErr != promclient.ErrFlagEndpointNotFound {
			level.Warn(logger).Log("msg", "failed to get Prometheus flags. Is Prometheus running? Retrying", "err", flagErr)
			return errors.Wrapf(flagErr, "fetch Prometheus flags")
		}
		return nil
	}); err != nil {
		return errors.Wrapf(err, "fetch Prometheus flags")
	}

	if flagErr != nil {
		level.Warn(logger).Log("msg", "failed to check Prometheus flags, due to potentially older Prometheus. No extra validation is done.", "err", flagErr)
		return nil
	}

	// Check if compaction is disabled.
	if flags.TSDBMinTime != flags.TSDBMaxTime {
		if !ignoreBlockSize {
			return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+
				"Compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration)", flags.TSDBMaxTime, flags.TSDBMinTime)
		}
		level.Warn(logger).Log("msg", "flag to ignore Prometheus min/max block duration flags differing is being used. If the upload of a 2h block fails and a Prometheus compaction happens that block may be missing from your Thanos bucket storage.")
	}

	// Check if block time is 2h.
	if flags.TSDBMinTime != model.Duration(2*time.Hour) {
		level.Warn(logger).Log("msg", "found that TSDB block time is not 2h. Only 2h block time is recommended.", "block-time", flags.TSDBMinTime)
	}

	return nil
}
Next, fetch the Prometheus version information by calling the Prometheus endpoint /api/v1/status/buildinfo.
// We retry infinitely until we reach and fetch BuildVersion from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
	if err := m.BuildVersion(ctx); err != nil {
		level.Warn(logger).Log("msg", "failed to fetch prometheus version. Is Prometheus running? Retrying", "err", err)
		return err
	}
	level.Info(logger).Log("msg", "successfully loaded prometheus version")
	return nil
})
if err != nil {
	return errors.Wrap(err, "failed to get prometheus version")
}
Then fetch the external labels configured on Prometheus by calling the /api/v1/status/config endpoint.
// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err = runutil.Retry(2*time.Second, ctx.Done(), func() error {
	if err := m.UpdateLabels(ctx); err != nil {
		level.Warn(logger).Log("msg", "failed to fetch initial external labels. Is Prometheus running? Retrying", "err", err)
		promUp.Set(0)
		statusProber.NotReady(err)
		return err
	}

	level.Info(logger).Log("msg", "successfully loaded prometheus external labels", "external_labels", m.Labels().String())
	promUp.Set(1)
	statusProber.Ready()
	lastHeartbeat.SetToCurrentTime()
	return nil
})
if err != nil {
	return errors.Wrap(err, "initial external labels query")
}
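
UpdateLabels works the same way against /api/v1/status/config: the endpoint returns the full Prometheus configuration as a single YAML string, from which the external_labels section is parsed. A rough sketch under that assumption, using e.g. gopkg.in/yaml.v2 (the real parsing lives in pkg/promclient):

resp, err := http.Get("http://localhost:9090/api/v1/status/config")
if err != nil {
	return err
}
defer resp.Body.Close()

var body struct {
	Data struct {
		YAML string `json:"yaml"` // The whole Prometheus config file as text.
	} `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
	return err
}

var cfg struct {
	Global struct {
		ExternalLabels map[string]string `yaml:"external_labels"`
	} `yaml:"global"`
}
if err := yaml.Unmarshal([]byte(body.Data.YAML), &cfg); err != nil {
	return err
}
// cfg.Global.ExternalLabels now holds e.g. {cluster: "eu-1", replica: "0"}.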
The sidecar requires the scraped Prometheus instance to have external labels configured:
if len(m.Labels()) == 0 {
	return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
}
// Periodically query the Prometheus config. We use this as a heartbeat as well as for updating
// the external labels we apply.
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
	iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer iterCancel()

	if err := m.UpdateLabels(iterCtx); err != nil {
		level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
		promUp.Set(0)
	} else {
		promUp.Set(1)
		lastHeartbeat.SetToCurrentTime()
	}
	return nil
})
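
runutil.Repeat itself is tiny: it runs the callback on a fixed interval until the done channel closes or the callback returns an error. A simplified re-implementation to show the semantics (not the exact upstream code):

func Repeat(interval time.Duration, done <-chan struct{}, f func() error) error {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-done:
			return nil
		case <-tick.C:
			if err := f(); err != nil {
				return err
			}
		}
	}
}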
// NewPrometheusStore returns a new PrometheusStore that uses the given HTTP client
// to talk to Prometheus.
// It attaches the provided external labels to all results. Provided external labels have to be sorted.
func NewPrometheusStore(
	logger log.Logger,
	reg prometheus.Registerer,
	client *promclient.Client,
	baseURL *url.URL,
	component component.StoreAPI,
	externalLabelsFn func() labels.Labels,
	timestamps func() (mint int64, maxt int64),
	promVersion func() string,
) (*PrometheusStore, error) {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	p := &PrometheusStore{
		logger:           logger,
		base:             baseURL,
		client:           client,
		component:        component,
		externalLabelsFn: externalLabelsFn,
		timestamps:       timestamps,
		promVersion:      promVersion,
		remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES},
		buffers: sync.Pool{New: func() interface{} {
			b := make([]byte, 0, initialBufSize)
			return &b
		}},
		framesRead: promauto.With(reg).NewHistogram(
			prometheus.HistogramOpts{
				Name:    "prometheus_store_received_frames",
				Help:    "Number of frames received per streamed response.",
				Buckets: prometheus.ExponentialBuckets(10, 10, 5),
			},
		),
	}
	return p, nil
}
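
In the sidecar this constructor is wired up with the promMetadata accessors, roughly like so (argument list assumed from the fields above; check cmd/thanos/sidecar.go for the exact call):

promStore, err := store.NewPrometheusStore(logger, reg, client, conf.prometheus.url,
	component.Sidecar, m.Labels, m.Timestamps, m.Version)
if err != nil {
	return errors.Wrap(err, "create Prometheus store")
}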
// Exemplars returns all specified exemplars from Prometheus.
func (p *Prometheus) Exemplars(r *exemplarspb.ExemplarsRequest, s exemplarspb.Exemplars_ExemplarsServer) error {
	exemplars, err := p.client.ExemplarsInGRPC(s.Context(), p.base, r.Query, r.Start, r.End)
	if err != nil {
		return err
	}

	// Prometheus does not add external labels, so we need to add on our own.
	extLset := p.extLabels()
	for _, e := range exemplars {
		// Make sure the returned series labels are sorted.
		e.SetSeriesLabels(labelpb.ExtendSortedLabels(e.SeriesLabels.PromLabels(), extLset))

		var err error
		tracing.DoInSpan(s.Context(), "send_exemplars_response", func(_ context.Context) {
			err = s.Send(&exemplarspb.ExemplarsResponse{Result: &exemplarspb.ExemplarsResponse_Data{Data: e}})
		})
		if err != nil {
			return err
		}
	}
	return nil
}
promReadyTimeout := conf.prometheus.readyTimeout
extLabelsCtx, cancel := context.WithTimeout(ctx, promReadyTimeout)
defer cancel()

if err := runutil.Retry(2*time.Second, extLabelsCtx.Done(), func() error {
	if len(m.Labels()) == 0 {
		return errors.New("not uploading as no external labels are configured yet - is Prometheus healthy/reachable?")
	}
	return nil
}); err != nil {
	return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout)
}
Next, create the shipper from the bucket client (bkt). As the code comment below explains, the shipper continuously scans the data directory and uploads new blocks.
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc))
The shipper reads thanos.shipper.json under tsdb.path to learn which blocks have already been uploaded; this file records the IDs of every block the sidecar has shipped so far.
meta, err := ReadMetaFile(s.dir)
if err != nil {
	// If we encounter any error, proceed with an empty meta file and overwrite it later.
	// The meta file is only used to avoid unnecessary bucket.Exists calls,
	// which are properly handled by the system if they occur anyway.
	if !os.IsNotExist(err) {
		level.Warn(s.logger).Log("msg", "reading meta file failed, will override it", "err", err)
	}
	meta = &Meta{Version: MetaVersion1}
}

// Build a map of blocks we already uploaded.
hasUploaded := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
for _, id := range meta.Uploaded {
	hasUploaded[id] = struct{}{}
}

// Reset the uploaded slice so we can rebuild it only with blocks that still exist locally.
meta.Uploaded = nil
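
On disk, thanos.shipper.json is just a version plus the list of uploaded block ULIDs, along these lines (the ULIDs below are made up for illustration):

{
	"version": 1,
	"uploaded": [
		"01ABCDEF0123456789ABCDEFGH",
		"01ABCDEF0123456789ABCDEFGJ"
	]
}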
if err := s.upload(ctx, m); err != nil {
	if !s.allowOutOfOrderUploads {
		return 0, errors.Wrapf(err, "upload %v", m.ULID)
	}

	// No error returned, just log line. This is because we want other blocks to be uploaded even
	// though this one failed. It will be retried on second Sync iteration.
	level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err)
	uploadErrs++
	continue
}
The upload function creates a thanos/upload directory under the TSDB data directory, then a subdirectory named after the block, and hard-links the block's files into it so the data cannot be deleted or otherwise disturbed by concurrent TSDB operations while the upload is in progress.
level.Info(s.logger).Log("msg","upload new block","id", meta.ULID)// We hard-link the files into a temporary upload directory so we are not affected// by other operations happening against the TSDB directory.
updir := filepath.Join(s.dir,"thanos","upload", meta.ULID.String())// Remove updir just in case.if err := os.RemoveAll(updir); err !=nil{return errors.Wrap(err,"clean upload directory")}if err := os.MkdirAll(updir,0750); err !=nil{return errors.Wrap(err,"create upload dir")}deferfunc(){if err := os.RemoveAll(updir); err !=nil{
level.Error(s.logger).Log("msg","failed to clean upload directory","err", err)}}()
dir := filepath.Join(s.dir, meta.ULID.String())if err :=hardlinkBlock(dir, updir); err !=nil{return errors.Wrap(err,"hard link block")}// Attach current labels and write a new meta file with Thanos extensions.if lset := s.labels(); lset !=nil{
meta.Thanos.Labels = lset.Map()}
meta.Thanos.Source = s.source
meta.Thanos.SegmentFiles = block.GetSegmentFiles(updir)if err := meta.WriteToDir(s.logger, updir); err !=nil{return errors.Wrap(err,"write meta file")}return block.Upload(ctx, s.logger, s.bucket, updir, s.hashFunc)
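
hardlinkBlock is not shown above; it mirrors the block directory into updir via hard links. A simplified sketch of the idea (the real implementation also enumerates chunks, index and meta.json explicitly):

func hardlinkBlock(src, dst string) error {
	if err := os.MkdirAll(filepath.Join(dst, "chunks"), 0750); err != nil {
		return errors.Wrap(err, "create chunks dir")
	}

	files, err := os.ReadDir(filepath.Join(src, "chunks"))
	if err != nil {
		return errors.Wrap(err, "read chunks dir")
	}

	paths := []string{"meta.json", "index"}
	for _, f := range files {
		paths = append(paths, filepath.Join("chunks", f.Name()))
	}

	// os.Link creates hard links, so the files stay readable from updir even
	// if the TSDB deletes its own copies mid-upload.
	for _, p := range paths {
		if err := os.Link(filepath.Join(src, p), filepath.Join(dst, p)); err != nil {
			return errors.Wrapf(err, "hard link file %s", p)
		}
	}
	return nil
}

The final call, block.Upload, then pushes the hard-linked directory to object storage: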
if err := bkt.Upload(ctx, path.Join(DebugMetas, fmt.Sprintf("%s.json", id)), strings.NewReader(metaEncoded.String())); err != nil {
	return cleanUp(logger, bkt, id, errors.Wrap(err, "upload debug meta file"))
}
if err := objstore.UploadDir(ctx, logger, bkt, path.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil {
	return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks"))
}
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexFilename), path.Join(id.String(), IndexFilename)); err != nil {
	return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index"))
}

// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file to be pending uploads.
if err := bkt.Upload(ctx, path.Join(id.String(), MetaFilename), strings.NewReader(metaEncoded.String())); err != nil {
	// Don't call cleanUp here. Despite getting error, meta.json may have been uploaded in certain cases,