扫码阅读
手机扫码阅读

数据库|从源码分析TiUP如何判断TiDB集群状态

316 2023-09-23

本期摘要

由于之前遇到过一个问题,当目标组件服务器的firewalld开启时,用tiup cluster获取该组件状态时,发现该组件状态为Down,但该组件正常运行。这让笔者好奇tiup cluster是通过何种方式判断组件状态。

于是有了本篇文章,为大家分享一下我的想法。

作者

匿名 |数据库运维工程师

神州数码云基地钛合金团队的一位神秘的DBA~

01

前言

在运维TiDB集群时,经常需要使用tiup cluster display命令去查看组件是否处于运行状态。笔者之前一直认为tiup是通过探测组件端口的方式来判断组件是否存活,类似于telnet ip port的方式。

但是,笔者曾遇到一个问题:当目标组件服务器的firewalld开启时,使用tiup cluster获取该组件的状态时,发现该组件状态为Down,但是该组件确实是正常运行的,使用telnet探测也可以得到返回信息。这让笔者感到好奇,tiup cluster到底是通过什么方式去判断一个组件是Up还是Down呢?

因此,本着好奇的态度,笔者查阅了tiup cluster关于display部分的代码,本文将详细解读在执行tiup cluster display时,tiup是如何去判断TiDB组件的状态。

02

前期检查

display 的执行代码入口在 src/tiup-1.11.0/components/cluster/command/display.go 里面,在正式执行display的功能之前,会做一些前期检查和准备

1.exist, err := tidbSpec.Exist(dopt.ClusterName)
去判断集群名称是否存在,判断集群名称是否存在的方法是通过判断对应的集群拓扑文件是否存在,
例如我的集群名是tidb-test,那就去判断/home/tidb/.tiup/storage/cluster/clusters/tidb-test/meta.yaml是否存在,如果不存在,则判断集群名是不存在的 2.获取集群的元数据信息,集群的元数据信息例如版本等信息赋值给变量metadata
metadata, err := spec.ClusterMetadata(dopt.ClusterName)
这段代码的主要逻辑,其实就是把集群拓扑文件的信息通过yaml.Unmarshal函数进行解析返回给metadata变量,例如集群拓扑yaml文件主要是分为了3个大属性(或者是叫做3个大类),user,tidb_version,topology,通过这段代码就得到集群的这些信息 3.如果指定了--version,只会打印集群的版本信息,通过 metadata.Version 变量就得到了集群的版本信息 if showVersionOnly {
  fmt.Println(metadata.Version) return nil } 3.如果指定了--dashboard参数 ,只会打印dashboard地址 if showDashboardOnly {
  tlsCfg, err := metadata.Topology.TLSConfig(tidbSpec.Path(dopt.ClusterName, spec.TLSCertKeyDir)) if err != nil { return err
  } return cm.DisplayDashboardInfo(dopt.ClusterName, time.Second*time.Duration(gOpt.APITimeout), tlsCfg)
} 4.如果指定了--labels ,只会打印集群的label信息 if showTiKVLabels { return cm.DisplayTiKVLabels(dopt, gOpt)
} 5. 最后面才是执行查询集群状态信息的方法
例如执行tiup cluster display tidb-test return cm.Display(dopt, gOpt)

03

Display执行流程

cm.Display的代码在 src/tiup-1.11.0/pkg/cluster/manager/display.go 里面,而在这段代码中获取集群组件状态信息是通过GetClusterTopology 方法去获得的 (src/tiup-1.11.0/pkg/cluster/manager/display.go)

clusterInstInfos, err := m.GetClusterTopology(dopt, opt)

GetClusterTopology 方法的执行流程


1.获取ClusterMeta 结构体指针
metadata, err := m.meta(name)
通过m.meta 方法,metadata变量本质是一个*spec.ClusterMeta ,是ClusterMeta 这个结构体的对应的指针 type ClusterMeta struct {
  User string `yaml:"user"` // the user to run and manage cluster on remote Version string `yaml:"tidb_version"` // the version of TiDB cluster // EnableFirewall bool `yaml:"firewall"` OpsVer string `yaml:"last_ops_ver,omitempty"` // the version of ourself that updated the meta last time Topology *Specification `yaml:"topology"` }
ClusterMeta 结构体所定义的字段与集群拓扑yaml文件的格式是相对应的,比如集群拓扑yaml文件主要是分为了3个大属性(或者是叫做3个大类),user,tidb_version,topology,其它的配置项都是从这3个大的属性下面去获得的
user: tidb
tidb_version: v6.1.2 topology:
  global:
    user: tidb
    ssh_port: 22 ssh_type: builtin
    deploy_dir: /tidb-deploy
    data_dir: /tidb-data
    os: linux
   .........
ClusterMeta 这个结构体有很多方法,例如在src/tiup-1.11.0/pkg/cluster/spec/util.go 下面定义了这个结构体的方法 func (m *ClusterMeta) GetTopology() Topology { return m.Topology
} // SetTopology implement Metadata interface. func (m *ClusterMeta) SetTopology(topo Topology) {
  tidbTopo, ok := topo.(*Specification) //tidbTopo, ok := topo.S if !ok { panic(fmt.Sprintln("wrong type: ", reflect.TypeOf(topo)))
  }
  m.Topology = tidbTopo
} // GetBaseMeta implements Metadata interface. func (m *ClusterMeta) GetBaseMeta() *BaseMeta { return &BaseMeta{
    Version: m.Version,
    User: m.User,
    OpsVer: &m.OpsVer,
  }
}
在m.meta 方法里面继而又调用了 m.specManager.Metadata这个方法,m.specManager.Metadata 里面的主要逻辑是调用yaml.Unmarshal 函数去解析集群的拓扑文件,得到集群的拓扑文件信息,这一点是非常重要的 2.接下来通过metadata 变量,就可以调用结构体ClusterMeta相应的方法,从而具体得到集群拓扑文件相应的信息
topo := metadata.GetTopology() 
通过调用ClusterMeta结构体的GetTopology方法,这个方法的返回值正好是ClusterMeta结构体里面的Topology字段,这个字段对应的就是集群拓扑文件里面的topology属性信息,当然ClusterMeta结构体里面的Topology字段是一个嵌套结构体
里面嵌套了Specification 这个结构体, Specification 这个结构体 对应的就是集群拓扑文件里面的topology属性下面的配置项
同理base := metadata.GetBaseMeta()通过调用ClusterMeta结构体的GetBaseMeta方法,这个方法通过构造函数的方式,返回的是集群拓扑文件里面的user,tidb_version和last_ops_ver,omitempty 这些信息(不过笔者对last_ops_ver,omitempty 这两项配置信息不是很了解,就不去深究了)
其实,代码写到了这里,我们大抵明白了整个display的逻辑,首先通过yaml.Unmarshal去解析整个集群的拓扑文件,从而得到集群拓扑文件里面的信息
statusTimeout := time.Duration(opt.APITimeout) * time.Second
这里定义了获取组件状态的超时时间,默认是10s,通过 --status-timeout 进行传递
在src/tiup-1.11.0/components/cluster/command/display.go里面的
cmd.Flags().Uint64Var(&statusTimeout, "status-timeout", 40, "Timeout in seconds when getting node status")
这行代码通过传递进来的参数对这个变量赋值 3.获取pd地址
masterList := topo.BaseTopo().MasterList
通过topo.BaseTopo() 这个方法通过构造函数将BaseTopo结构体进行赋值,然后通过BaseTopo结构体的MasterList字段获取到了整个集群pd 的地址,pd的地址以切片的形式返回 4.获取需要查看具体组件或者具体节点的信息
filterRoles := set.NewStringSet(opt.Roles...)
filterNodes := set.NewStringSet(opt.Nodes...)
如果这个时候diplay 的时候有-R或者-N指定了需要判断状态的具体组件或者是节点的ip:port,那么上述代码就会将组件或者节点信息存储到filterRoles或者是filterNodes中
tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir))
获取tsl的信息,由于我们一般不使用tls进行通讯,这里不做这个讨论 5.查询pd组件状态
下面的这段代码,通过topo.IterInstance 方法通过函数回调获得了pd组件的状态
这段代码的核心逻辑如下 1.通过ComponentsByStartOrder(src/tiup-1.11.0/pkg/cluster/spec/spec.go) 方法获得了将tidb目前所有的组件的信息放到了1个切片返回,这个切片的类型其实是1个Component接口类型(src/tiup-1.11.0/pkg/cluster/spec/instance.go),也就是说这个切片里面的元素可以调用这个接口所定义的函数 2.构建循环,去轮询每个组件,每个组件的Instances()方法返回的也是1个接口类型的切片,最终调用接口里面的ComponentName方法去判断是否是pd组件 3.查询pd组件的状态,查询pd组件的状态最终是通过每个组件的Status方法去完成的,具体的逻辑,会在文章的最后面说明
当然这段代码还涉及到并发线程去查询组件的信息,并发线程数是 --concurrency 参数指定的 var mu sync.Mutex
  topo.IterInstance(func(ins spec.Instance) { if ins.ComponentName() != spec.ComponentPD && ins.ComponentName() != spec.ComponentDMMaster { return }
    status := ins.Status(ctx, statusTimeout, tlsCfg, masterList...)
    mu.Lock() if strings.HasPrefix(status, "Up") || strings.HasPrefix(status, "Healthy") {
      instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort())
      masterActive = append(masterActive, instAddr)
    }
    masterStatus[ins.ID()] = status
    fmt.Println("masterStatus", masterStatus)
    mu.Unlock()
  }, opt.Concurrency) 6.获取除pd组件以外的其它组件的状态
下面的这段代码,是整个display 过程中去真正查询组件状态信息的代码
整体的执行逻辑如下 1.如果指定-R或者-N参数去判断某个组件或者是某个实例的地址去查询状态的时候,如果这个组件或者实例的地址不存在的时候就会直接报错退出执行 2.如果是pd组件,不需要再次查询pd的状态了,直接就可以得到pd的状态了,如果pd的地址是dashboard的地址,status变量会加上"|UI"字符串 3.查询除pd以外组件的其它组件的状态
topo.IterInstance(func(ins spec.Instance) { // apply role filter if len(filterRoles) > 0 && !filterRoles.Exist(ins.Role()) {
      fmt.Println("role not exists", ins.Role()) return } // apply node filter if len(filterNodes) > 0 && !filterNodes.Exist(ins.ID()) {
      fmt.Println("node not exists", ins.ID()) return }
    dataDir := "-" insDirs := ins.UsedDirs()
    deployDir := insDirs[0] if len(insDirs) > 1 {
      dataDir = insDirs[1]
    } var status, memory string switch ins.ComponentName() { case spec.ComponentPD:
      status = masterStatus[ins.ID()]
      instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort()) if dashboardAddr == instAddr {
        status += "|UI" } case spec.ComponentDMMaster:
      status = masterStatus[ins.ID()] default:
      status = ins.Status(ctx, statusTimeout, tlsCfg, masterActive...)
    }
......
查询组件的状态,是通过status = ins.Status(ctx, statusTimeout, tlsCfg, masterActive...) 这行代码完成的

04

查询组件状态的总体执行流程


查询组件的状态的整体逻辑为:之前在上段代码中提过,去循环每个组件定义Instances()方法,得到Instance的接口,从而可以调用Instance的接口里面的所定义的Status方法去获得各个组件的运行状态
Instance的接口的定义(src/tiup-1.11.0/pkg/cluster/spec/instance.go) type Instance interface {
  InstanceSpec
  ID() string Ready(context.Context, ctxt.Executor, uint64, *tls.Config) error
  InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
  ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error
  PrepareStart(ctx context.Context, tlsCfg *tls.Config) error
  ComponentName() string InstanceName() string ServiceName() string ResourceControl() meta.ResourceControl
  GetHost() string GetPort() int GetSSHPort() int DeployDir() string UsedPorts() []int UsedDirs() []string Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string Uptime(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration
  DataDir() string LogDir() string OS() string // only linux supported now Arch() string IsPatched() bool SetPatched(bool)
  setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]interface{}, paths meta.DirPaths) (map[string]interface{}, error)
} func (i *BaseInstance) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { return i.StatusFn(ctx, timeout, tlsCfg, pdList...)
}

05

查询Drainer等组件状态的执行流程

在目前现有的tiup代码中,各个组件状态的查询实际上有一些差别。具体来说,查询TiDB、Drainer、prometheus等组件的状态的代码逻辑和查询pd、tikv组件的代码逻辑不同。

下面以查询Drainer组件状态为例,观察查询drainer等组件状态的大体逻辑。查询drainer、tidb、prometheus等组件状态是通过调用Status方法,进而调用statusByHost函数(源代码路径为 src/tiup-1.11.0/pkg/cluster/spec/drainer.go)来确定组件是up还是down。


func statusByHost(host string, port int, path string, timeout time.Duration, tlsCfg *tls.Config) string { if timeout < time.Second { timeout = statusQueryTimeout } client := utils.NewHTTPClient(timeout, tlsCfg) //是否对集群启用 TLS。启用之后,组件之间、客户端与组件之间都必须使用生成的 TLS 证书进行连接,默认值:false scheme := "http" if tlsCfg != nil {
    scheme = "https" } if path == "" {
    path = "/" }
  url := fmt.Sprintf("%s://%s:%d%s", scheme, host, port, path)
  instance_address := fmt.Sprintf("%s:%d", host, port) // body doesn't have any status section needed body, err := client.Get(context.TODO(), url) if err != nil || body == nil { return "Down" } return "Up" }

我将tiup关于statusByHost函数的执行代码单独提取出来,并放到下面的代码中。利用这部分代码,可以直接获取某个实例的运行状态,判断其是否为up或down状态。

总体而言,statusByHost函数的执行逻辑是:

  1. 获取相关实例的url(api)(例如htt-p://172.16.1.1:8249/status)

  2. 调用http相关模块去请求该url

  3. 通过请求url返回的内容来判断实例的状态是up还是down


package main import ( "context" "crypto/tls" "fmt" "io" "net" "net/http" "net/url" "os" "time" ) type HTTPClient struct {
  client *http.Client
  header http.Header
} // NewHTTPClient returns a new HTTP client with timeout and HTTPS support func NewHTTPClient(timeout time.Duration, tlsConfig *tls.Config) *HTTPClient { if timeout < time.Second { timeout = 10 * time.Second // default timeout is 10s }
  tr := &http.Transport{
    TLSClientConfig: tlsConfig,
    Dial: (&net.Dialer{Timeout: 3 * time.Second}).Dial,
  } // prefer to use the inner http proxy httpProxy := os.Getenv("TIUP_INNER_HTTP_PROXY") if len(httpProxy) == 0 {
    httpProxy = os.Getenv("HTTP_PROXY")
  } if len(httpProxy) > 0 { if proxyURL, err := url.Parse(httpProxy); err == nil {
      tr.Proxy = http.ProxyURL(proxyURL)
    }
  } return &HTTPClient{
    client: &http.Client{
      Timeout: timeout,
      Transport: tr,
    },
  }
} // GetWithStatusCode fetch a URL with GET method and returns the response, also the status code. func (c *HTTPClient) GetWithStatusCode(ctx context.Context, url string) ([]byte, int, error) { var statusCode int req, err := http.NewRequest("GET", url, nil) ////发送GET请求 ////url:请求地址 //req 返回请求url的返回内容,是1个结构体类型 if err != nil { return nil, statusCode, err
  }
  req.Header = c.header if ctx != nil {
    req = req.WithContext(ctx)
  }
  res, err := c.client.Do(req) if err != nil {
    fmt.Printf("url:%v, statusCode is:%v,err is:%v \n", url, statusCode, err) return nil, statusCode, err //如果实例状态获取异常,比如实例关闭,那么就会将错误和statusCode 返回 } defer res.Body.Close()
  fmt.Println("the res.StatusCode is", res.StatusCode) //the res.StatusCode is 200 data, err := checkHTTPResponse(res) return data, res.StatusCode, err //如果实例状态正常,res.StatusCode 为200 } // checkHTTPResponse checks if an HTTP response is with normal status codes func checkHTTPResponse(res *http.Response) ([]byte, error) {
  body, err := io.ReadAll(res.Body) if err != nil { return nil, err
  } if res.StatusCode < 200 || res.StatusCode >= 400 { return body, fmt.Errorf("error requesting %s, response: %s, code %d",
      res.Request.URL, string(body), res.StatusCode)
  } return body, nil } func (c *HTTPClient) Get(ctx context.Context, url string) ([]byte, error) {
  data, _, err := c.GetWithStatusCode(ctx, url) return data, err
} func GetInstanceStatus(url string) string {
  client := NewHTTPClient(10, nil) //通过构造函数NewHTTPClient 将结构体HTTPClient 赋值给变量的client body, err := client.Get(context.TODO(), url) if err != nil || body == nil { return "Down" } return "Up" } //0 通过构造函数NewHTTPClient 将结构体HTTPClient 赋值给变量的client //1.通过client.get 函数(传递instace的http url 地址 )查看instance 的状态 //2.get 函数调用GetWithStatusCode函数 查看url 地址的状态 //3.GetWithStatusCode 函数又调用checkHTTPResponse 函数, //总结 如果GetWithStatusCode 返回错误信息或者http 请求返回的内容为空,则判断实例状态为Down func main() {
  url := "http://172.16.1.1:8249/status" status := GetInstanceStatus(url) //GetInstanceStatus(url) fmt.Println(status)
}

statusByHost函数的大概执行过程是:通过访问组件自身暴露出来的url(或称api),使用http协议获取该url的响应内容,然后通过该响应内容来判断组件的状态是Up还是Down。因此,判断一个组件的状态并不是通过探测端口的方式实现的,这一点以前我并没有想到。


//0 通过构造函数NewHTTPClient 将结构体HTTPClient 赋值给变量的client //1.通过client.get 函数(传递instace的http url 地址 )查看instance 的状态 //2.get 函数调用GetWithStatusCode函数 查看url 地址的状态 //3.GetWithStatusCode 函数又调用checkHTTPResponse 函数, //总结 如果GetWithStatusCode 返回错误或者http 请求返回的内容为空,则判断实例状态为Down

06

查询Pd组件状态的执行流程

查询Pd组件状态的执行流程查询pd组件状态的代码主要集中在 src/tiup-1.11.0/pkg/cluster/spec/pd.go里面的Statu-s方法


func (s *PDSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string { if timeout < time.Second { timeout = statusQueryTimeout } addr := fmt.Sprintf("%s:%d", s.Host, s.ClientPort)
  pc := api.NewPDClient(ctx, []string{addr}, timeout, tlsCfg) // check health err := pc.CheckHealth() if err != nil { return "Down" } // find leader node leader, err := pc.GetLeader() if err != nil { return "ERR" }
  res := "Up" if s.Name == leader.Name {
    res += "|L" } return res
}

这段代码的主要逻辑如下

  1. 生成1个pd状态的url(或者叫做api),例如 http://172.168.1.3:2379/pd/ping

  2. 执行 CheckHealth()方法调用http相关模块的get请求去访问这个pd状态的url,如果请求出错,判断这个pd实例的状态为down

  3. 生成pd的leader状态的url(或者叫做api),例如 http://172.168.1.3:2379/pd/api/v1/leader,执行 Get-Leader()方法调用http相关模块的get请求去访问这个url,通过访问这个url会返回pd 组件的leader的地址(比如pd leader是172.168.1.4:2379,但是通过访问 172.168.1.3:2379/pd/api/v1/leader 也可以将le-ader的地址也返回,比如可以执行linux命令curl http://172.168.1.3:2379/pd/api/v1/leader 看看返回的信息是怎样的)

  4. 如果本次的pd地址刚好和通过访问leader url返回信息中的leader地址是同1个,则说明本次的pd地址就是leader,则返回"Up|L",否则就返回"Up"

07

查询tkv组件状态的执行流程

https://docs.pingcap.com/zh/tidb/stable/tidb-scheduling#信息收集

在tidb所有组件中,tikv的组件状态是最复杂的,有up、Disconnect、Offline、Tombstone 等各种状态,那么这些状态的逻辑是怎么获得的?

tikv获取状态的逻辑主要集中在 src/tiup-1.11.0/pkg/cluster/spec/tikv.go里面的checkStoreStatus和Status方法


func checkStoreStatus(ctx context.Context, storeAddr string, tlsCfg *tls.Config, pdList ...string) string { if len(pdList) < 1 { return "N/A" }
  pdapi := api.NewPDClient(ctx, pdList, statusQueryTimeout, tlsCfg)
  store, err := pdapi.GetCurrentStore(storeAddr) if err != nil { if errors.Is(err, api.ErrNoStore) { return "N/A" } return "Down" } return store.Store.StateName
}

总体执行逻辑为:

  1. 通过http的模块去访问pd的url,例如 http://172.168.1.3:2379/pd/api/v1/stores?stat-e=0&state=1&state=2
  2. 去循环这个结构体里面的信息,去判断如果某个tikv实例的状态不是Tombstone状态,则终止循环,并且判断这个如果这个实例的状态值是不是Pending Offline状态,如果不是,则直接返回这个tikv实例状态值(如果tikv的状态值是Up、Disconnected、Down 走的就是这个逻辑)

  3. 如果通过循环storesInfo结构体得到某个tikv的实例是Tombstone状态,那么并不会马上结束循环,而是继续去循环,直到找到最大的storeid,然后才返回这个这个tikv store的状态,至于为什么这么涉及,通过查看代码注释,如果pd发生切换, store ID 可能存在重复现象,在这里,笔者就只解读代码的执行逻辑,如果对这个逻辑感兴趣可以查看这个issue(https://git-hub.co-m/tikv/pd/issues/3303 )和具体的代码逻辑 (src/tiup-1.11.0/pkg/clust-er/api/pdapi.go GetCurrentStore方法)

  4. 如果某个tikv在集群拓扑文件有offline为true的标识,并且返回的状态是offline,那么就判断这个tikv的状态是Pending Offline


            ssh_port: 22 port: 20161 status_port: 20181 deploy_dir: /tidb-deploy/tikv-20161 data_dir: /tidb-data/tikv-20161 log_dir: /tidb-deploy/tikv-20161/log
    offline: true



08

结论


通过学习tiup cluster display代码,我对如何判断组件状态有了一个较为清晰的了解。


总的来说,tiup是通过使用http请求访问各个组件暴露出来的接口url来获取组件的运行状态(tikv的状态信息是通过pd的api获取的)。


当然,如果操作系统的防火墙打开,也会导致http请求失败,从而判断该组件状态为Down。






Hello~

这里是神州数码云基地
编程大法,技术前沿,尽在其中
超多原创技术干货持续输出ing~

原文链接: http://mp.weixin.qq.com/s?__biz=Mzg5MzUyOTgwMQ==&mid=2247523446&idx=1&sn=cc75d0a37749d07c026f7437f2ec3478&chksm=c02fa7d0f7582ec657b69115a82e7d28c537738b43cfa6a5271098bc320a578b1eb5eca49dcd#rd