Skip to content

Commit

Permalink
Merge branch 'master' into support_pyroscope
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Feb 8, 2025
2 parents 3aa96f1 + 3af31b2 commit 4ee9551
Show file tree
Hide file tree
Showing 210 changed files with 3,567 additions and 2,317 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pd-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ jobs:
- worker_id: 8
name: 'TSO Integration Test'
- worker_id: 9
name: 'MicroService Integration(!TSO)'
name: 'Microservice Integration(!TSO)'
- worker_id: 10
name: 'MicroService Integration(TSO)'
name: 'Microservice Integration(TSO)'
outputs:
job-total: 10
steps:
Expand Down
19 changes: 19 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ linters:
- reassign
- intrange
- gci
- goheader
linters-settings:
gocritic:
# Which checks should be disabled; can't be combined with 'enabled-checks'; default is empty
Expand Down Expand Up @@ -241,6 +242,24 @@ linters-settings:
- prefix(github.com/pingcap)
- prefix(github.com/tikv/pd)
- blank
goheader:
values:
regexp:
COPYRIGHT-HEADER: Copyright \d{4} TiKV Project Authors.
template: |-
{{ COPYRIGHT-HEADER }}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
issues:
exclude-rules:
- path: (_test\.go|pkg/mock/.*\.go|tests/.*\.go)
Expand Down
100 changes: 43 additions & 57 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -42,7 +40,6 @@ import (
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
cb "github.com/tikv/pd/client/pkg/circuitbreaker"
"github.com/tikv/pd/client/pkg/utils/tlsutil"
sd "github.com/tikv/pd/client/servicediscovery"
)
Expand Down Expand Up @@ -146,12 +143,11 @@ var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching.
type serviceModeKeeper struct {
// RMutex here is for the future usage that there might be multiple goroutines
// triggering service mode switching concurrently.
sync.RWMutex
serviceMode pdpb.ServiceMode
tsoClient *tso.Cli
tsoSvcDiscovery sd.ServiceDiscovery
routerClient *router.Cli
}

func (k *serviceModeKeeper) close() {
Expand Down Expand Up @@ -363,8 +359,8 @@ func newClientWithKeyspaceName(
c := &client{
callerComponent: adjustCallerComponent(callerComponent),
inner: &innerClient{
// Create a PD service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the PD service discovery for the following interactions.
// Create a service discovery with null keyspace id, then query the real id with the keyspace name,
// finally update the keyspace id to the service discovery for the following interactions.
keyspaceID: constants.NullKeyspaceID,
updateTokenConnectionCh: make(chan struct{}, 1),
ctx: clientCtx,
Expand All @@ -387,7 +383,7 @@ func newClientWithKeyspaceName(
}
c.inner.keyspaceID = keyspaceMeta.GetId()
// c.keyspaceID is the source of truth for keyspace id.
c.inner.pdSvcDiscovery.SetKeyspaceID(c.inner.keyspaceID)
c.inner.serviceDiscovery.SetKeyspaceID(c.inner.keyspaceID)
return nil
}

Expand Down Expand Up @@ -415,17 +411,17 @@ func (c *client) ResetTSOClient() {

// GetClusterID returns the ClusterID.
func (c *client) GetClusterID(context.Context) uint64 {
return c.inner.pdSvcDiscovery.GetClusterID()
return c.inner.serviceDiscovery.GetClusterID()
}

// GetLeaderURL returns the leader URL.
func (c *client) GetLeaderURL() string {
return c.inner.pdSvcDiscovery.GetServingURL()
return c.inner.serviceDiscovery.GetServingURL()
}

// GetServiceDiscovery returns the client-side service discovery object
func (c *client) GetServiceDiscovery() sd.ServiceDiscovery {
return c.inner.pdSvcDiscovery
return c.inner.serviceDiscovery
}

// UpdateOption updates the client option.
Expand All @@ -441,7 +437,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
}
case opt.EnableTSOFollowerProxy:
if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE {
return errors.New("[pd] tso follower proxy is only supported in PD service mode")
return errors.New("[pd] tso follower proxy is only supported when PD provides TSO")
}
enable, ok := value.(bool)
if !ok {
Expand All @@ -460,12 +456,6 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error {
return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int")
}
c.inner.option.SetTSOClientRPCConcurrency(value)
case opt.RegionMetadataCircuitBreakerSettings:
applySettingsChange, ok := value.(func(config *cb.Settings))
if !ok {
return errors.New("[pd] invalid value type for RegionMetadataCircuitBreakerSettings option, it should be pd.Settings")
}
c.inner.regionMetaCircuitBreaker.ChangeSettings(applySettingsChange)
default:
return errors.New("[pd] unsupported client option")
}
Expand Down Expand Up @@ -494,7 +484,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
// getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns
// follower pd client and the context which holds forward information.
func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, context.Context) {
serviceClient := c.inner.pdSvcDiscovery.GetServiceClient()
serviceClient := c.inner.serviceDiscovery.GetServiceClient()
if serviceClient == nil || serviceClient.GetClientConn() == nil {
return nil, ctx
}
Expand Down Expand Up @@ -535,7 +525,7 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi

// GetMinTS implements the TSOClient interface.
func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
// Handle compatibility issue in case of PD/API server doesn't support GetMinTS API.
// Handle compatibility issue in case of PD doesn't support GetMinTS API.
serviceMode := c.inner.getServiceMode()
switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
Expand Down Expand Up @@ -579,21 +569,16 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
return minTS.Physical, minTS.Logical, nil
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *router.Region {
if res.Region == nil {
return nil
}
// EnableRouterClient enables the router client.
// This is only for test currently.
func (c *client) EnableRouterClient() {
c.inner.initRouterClient()
}

r := &router.Region{
Meta: res.Region,
Leader: res.Leader,
PendingPeers: res.PendingPeers,
Buckets: res.Buckets,
}
for _, s := range res.DownPeers {
r.DownPeers = append(r.DownPeers, s.Peer)
}
return r
func (c *client) getRouterClient() *router.Cli {
c.inner.RLock()
defer c.inner.RUnlock()
return c.inner.routerClient
}

// GetRegionFromMember implements the RPCClient interface.
Expand All @@ -607,7 +592,7 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs

var resp *pdpb.GetRegionResponse
for _, url := range memberURLs {
conn, err := c.inner.pdSvcDiscovery.GetOrCreateGRPCConn(url)
conn, err := c.inner.serviceDiscovery.GetOrCreateGRPCConn(url)
if err != nil {
log.Error("[pd] can't get grpc connection", zap.String("member-URL", url), errs.ZapError(err))
continue
Expand All @@ -628,11 +613,11 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs

if resp == nil {
metrics.CmdFailedDurationGetRegion.Observe(time.Since(start).Seconds())
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()
errorMsg := fmt.Sprintf("[pd] can't get region info from member URLs: %+v", memberURLs)
return nil, errors.WithStack(errors.New(errorMsg))
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// GetRegion implements the RPCClient interface.
Expand All @@ -646,6 +631,10 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

if routerClient := c.getRouterClient(); routerClient != nil {
return routerClient.GetRegion(ctx, key, opts...)
}

options := &opt.GetRegionOp{}
for _, opt := range opts {
opt(options)
Expand All @@ -660,13 +649,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
region, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
failpoint.Inject("triggerCircuitBreaker", func() {
err = status.Error(codes.ResourceExhausted, "resource exhausted")
})
return region, isOverloaded(err), err
})
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand All @@ -678,7 +661,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
if err = c.respForErr(metrics.CmdFailedDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// GetPrevRegion implements the RPCClient interface.
Expand All @@ -692,6 +675,10 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

if routerClient := c.getRouterClient(); routerClient != nil {
return routerClient.GetPrevRegion(ctx, key, opts...)
}

options := &opt.GetRegionOp{}
for _, opt := range opts {
opt(options)
Expand All @@ -706,10 +693,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
return resp, isOverloaded(err), err
})
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand All @@ -721,7 +705,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
if err = c.respForErr(metrics.CmdFailedDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// GetRegionByID implements the RPCClient interface.
Expand All @@ -735,6 +719,10 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout)
defer cancel()

if routerClient := c.getRouterClient(); routerClient != nil {
return routerClient.GetRegionByID(ctx, regionID, opts...)
}

options := &opt.GetRegionOp{}
for _, opt := range opts {
opt(options)
Expand All @@ -749,10 +737,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
return resp, isOverloaded(err), err
})

resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand All @@ -764,7 +750,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
if err = c.respForErr(metrics.CmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
return nil, err
}
return handleRegionResponse(resp), nil
return router.ConvertToRegion(resp), nil
}

// ScanRegions implements the RPCClient interface.
Expand Down Expand Up @@ -1170,7 +1156,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...o

func (c *client) requestHeader() *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: c.inner.pdSvcDiscovery.GetClusterID(),
ClusterId: c.inner.serviceDiscovery.GetClusterID(),
CallerId: string(caller.GetCallerID()),
CallerComponent: string(c.callerComponent),
}
Expand Down Expand Up @@ -1354,7 +1340,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
if err != nil || header.GetError() != nil {
observer.Observe(time.Since(start).Seconds())
if err != nil {
c.inner.pdSvcDiscovery.ScheduleCheckMemberChanged()
c.inner.serviceDiscovery.ScheduleCheckMemberChanged()
return errors.WithStack(err)
}
return errors.WithStack(errors.New(header.GetError().String()))
Expand Down
Loading

0 comments on commit 4ee9551

Please sign in to comment.