grpc connection to filer add sw-client-id header

This commit is contained in:
chrislu 2023-01-20 01:48:12 -08:00
parent b048659749
commit 81fdf3651b
36 changed files with 74 additions and 61 deletions

View file

@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"strconv"
@ -52,7 +53,7 @@ func main() {
}
func startGenerateMetadata() {
pb.WithFilerClient(false, pb.ServerAddress(*tailFiler), grpc.WithTransportCredentials(insecure.NewCredentials()), func(client filer_pb.SeaweedFilerClient) error {
pb.WithFilerClient(false, util.RandomInt32(), pb.ServerAddress(*tailFiler), grpc.WithTransportCredentials(insecure.NewCredentials()), func(client filer_pb.SeaweedFilerClient) error {
for i := 0; i < *n; i++ {
name := fmt.Sprintf("file%d", i)

View file

@ -96,7 +96,7 @@ func runFilerCat(cmd *Command, args []string) bool {
writer = f
}
pb.WithFilerClient(false, filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pb.WithFilerClient(false, util.RandomInt32(), filerCat.filerAddress, filerCat.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,

View file

@ -159,6 +159,7 @@ func runCopy(cmd *Command, args []string) bool {
worker := FileCopyWorker{
options: &copy,
filerAddress: filerAddress,
signature: util.RandomInt32(),
}
if err := worker.copyFiles(fileCopyTaskChan); err != nil {
fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
@ -172,7 +173,7 @@ func runCopy(cmd *Command, args []string) bool {
}
func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) {
err = pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithGrpcFilerClient(false, 0, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
@ -225,6 +226,7 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
type FileCopyWorker struct {
options *CopyOptions
filerAddress pb.ServerAddress
signature int32
}
func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error {
@ -302,7 +304,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi
return
}
err = pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: task.destinationUrlPath,
@ -368,7 +370,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0, time.Now().UnixNano()))
}
if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err := pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@ -479,7 +481,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return fmt.Errorf("create manifest: %v", manifestErr)
}
if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err := pb.WithGrpcFilerClient(false, worker.signature, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@ -569,7 +571,7 @@ var _ = filer_pb.FilerClient(&FileCopyWorker{})
func (worker *FileCopyWorker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
filerGrpcAddress := worker.filerAddress.ToGrpcAddress()
err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
err = pb.WithGrpcClient(streamingMode, worker.signature, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, false, worker.options.grpcDialOption)

View file

@ -225,7 +225,7 @@ var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
func (metaBackup *FilerMetaBackupOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(streamingMode, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithFilerClient(streamingMode, metaBackup.clientId, pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})

View file

@ -35,7 +35,7 @@ type RemoteGatewayOptions struct {
var _ = filer_pb.FilerClient(&RemoteGatewayOptions{})
func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithFilerClient(streamingMode, option.clientId, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}

View file

@ -26,7 +26,7 @@ type RemoteSyncOptions struct {
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithFilerClient(streamingMode, option.clientId, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}

View file

@ -304,7 +304,7 @@ func getSignaturePrefixByPath(path string) string {
func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
readErr = pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
@ -330,7 +330,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature
}
func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))

View file

@ -50,7 +50,7 @@ func (iamopt *IamOptions) startIamServer() bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for {
err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)

View file

@ -163,7 +163,7 @@ func (s3opt *S3Options) startS3Server() bool {
var metricsIntervalSec int
for {
err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)

View file

@ -87,7 +87,7 @@ func (wo *WebDavOption) startWebDav() bool {
var cipher bool
// connect to filer
for {
err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)

View file

@ -32,7 +32,7 @@ type FilerConf struct {
func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error) {
var buf bytes.Buffer
if err := pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err := pb.WithGrpcFilerClient(false, 0, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if masterClient != nil {
return ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf)
} else {

View file

@ -192,7 +192,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
}
glog.V(0).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFilerId)
err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithFilerClient(true, 0, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
atomic.AddInt32(&ma.filer.UniqueFilerEpoch, 1)
@ -228,7 +228,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
}
func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {
err = pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithFilerClient(false, 0, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err

View file

@ -11,7 +11,7 @@ import (
func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) {
var oldContent []byte
if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if readErr = pb.WithFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
return readErr
}); readErr != nil {

View file

@ -133,7 +133,7 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.Remo
func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) {
var oldContent []byte
if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if readErr = pb.WithFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
return readErr
}); readErr != nil {

View file

@ -77,7 +77,7 @@ func (iama *IamApiServer) registerRouter(router *mux.Router) {
func (iam IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
var buf bytes.Buffer
err = pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirectory, filer.IamIdentityFile, &buf); err != nil {
return err
}
@ -99,7 +99,7 @@ func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat
if err := filer.ProtoToText(&buf, s3cfg); err != nil {
return fmt.Errorf("ProtoToText: %s", err)
}
return pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = util.Retry("saveIamIdentity", func() error {
return filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile, buf.Bytes())
})
@ -112,7 +112,7 @@ func (iam IamS3ApiConfigure) PutS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfigurat
func (iam IamS3ApiConfigure) GetPolicies(policies *Policies) (err error) {
var buf bytes.Buffer
err = pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err = filer.ReadEntry(iam.masterClient, client, filer.IamConfigDirectory, filer.IamPoliciesFile, &buf); err != nil {
return err
}
@ -136,7 +136,7 @@ func (iam IamS3ApiConfigure) PutPolicies(policies *Policies) (err error) {
if b, err = json.Marshal(policies); err != nil {
return err
}
return pb.WithGrpcFilerClient(false, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithGrpcFilerClient(false, 0, iam.option.Filer, iam.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if err := filer.SaveInsideFiler(client, filer.IamConfigDirectory, filer.IamPoliciesFile, b); err != nil {
return err
}

View file

@ -22,7 +22,7 @@ func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFile
for x := 0; x < n; x++ {
filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress()
err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
err = pb.WithGrpcClient(streamingMode, wfs.signature, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress, false, wfs.option.GrpcDialOption)

View file

@ -51,7 +51,7 @@ func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info
return
}
err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithFilerClient(false, 0, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
// read filer conf first
data, err := filer.ReadInsideFiler(client, dir, name)
if err != nil {
@ -76,7 +76,7 @@ func (broker *MessageQueueBroker) saveSegmentToFiler(segment *mq.Segment, info *
var buf bytes.Buffer
filer.ProtoToText(&buf, info)
err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithFilerClient(false, 0, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
// read filer conf first
err := filer.SaveInsideFiler(client, dir, name, buf.Bytes())
if err != nil {

View file

@ -83,7 +83,7 @@ func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress {
func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(streamingMode, broker.GetFiler(), broker.grpcDialOption, fn)
return pb.WithFilerClient(streamingMode, 0, broker.GetFiler(), broker.grpcDialOption, fn)
}

View file

@ -10,7 +10,7 @@ import (
func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
return pb.WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
}, volumeServer.ToGrpcAddress(), false, grpcDialOption)
@ -19,7 +19,7 @@ func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, g
func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
return pb.WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, masterServer.ToGrpcAddress(), false, grpcDialOption)

View file

@ -25,7 +25,7 @@ func FollowMetadata(filerAddress ServerAddress, grpcDialOption grpc.DialOption,
pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, untilTsNs int64, selfSignature int32,
processEventFn ProcessMetadataFunc, eventErrorType EventErrorType) error {
err := WithFilerClient(true, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(clientName, clientId, clientEpoch, pathPrefix, additionalPathPrefixes, nil, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType))
err := WithFilerClient(true, clientId, filerAddress, grpcDialOption, makeSubscribeMetadataFunc(clientName, clientId, clientEpoch, pathPrefix, additionalPathPrefixes, nil, &lastTsNs, untilTsNs, selfSignature, processEventFn, eventErrorType))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %v", err)
}

View file

@ -3,6 +3,7 @@ package pb
import (
"context"
"fmt"
"google.golang.org/grpc/metadata"
"math/rand"
"net/http"
"strconv"
@ -118,7 +119,7 @@ func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialO
}
// WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection.
func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error {
func WithGrpcClient(streamingMode bool, signature int32, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error {
if !streamingMode {
vgc, err := getOrCreateConnection(address, waitForReady, opts...)
@ -141,7 +142,12 @@ func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address
}
return executionErr
} else {
grpcConnection, err := GrpcDial(context.Background(), address, waitForReady, opts...)
ctx := context.Background()
if signature != 0 {
md := metadata.New(map[string]string{"sw-client-id": fmt.Sprintf("%d", signature)})
ctx = metadata.NewOutgoingContext(ctx, md)
}
grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...)
if err != nil {
return fmt.Errorf("fail to dial %s: %v", address, err)
}
@ -204,7 +210,7 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) {
}
func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, waitForReady bool, fn func(client master_pb.SeaweedClient) error) error {
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, master.ToGrpcAddress(), waitForReady, grpcDialOption)
@ -212,7 +218,7 @@ func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption g
}
func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpcDialOption grpc.DialOption, fn func(client volume_server_pb.VolumeServerClient) error) error {
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
}, volumeServer.ToGrpcAddress(), false, grpcDialOption)
@ -220,7 +226,7 @@ func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpc
}
func WithBrokerClient(streamingMode bool, broker ServerAddress, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error {
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
client := mq_pb.NewSeaweedMessagingClient(grpcConnection)
return fn(client)
}, broker.ToGrpcAddress(), false, grpcDialOption)
@ -230,7 +236,7 @@ func WithBrokerClient(streamingMode bool, broker ServerAddress, grpcDialOption g
func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[string]ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) {
for _, masterGrpcAddress := range masterGrpcAddresses {
err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
err = WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, masterGrpcAddress.ToGrpcAddress(), false, grpcDialOption)
@ -244,22 +250,22 @@ func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[stri
func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error {
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
client := mq_pb.NewSeaweedMessagingClient(grpcConnection)
return fn(client)
}, brokerGrpcAddress, false, grpcDialOption)
}
func WithFilerClient(streamingMode bool, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
func WithFilerClient(streamingMode bool, signature int32, filer ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
return WithGrpcFilerClient(streamingMode, filer, grpcDialOption, fn)
return WithGrpcFilerClient(streamingMode, signature, filer, grpcDialOption, fn)
}
func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
func WithGrpcFilerClient(streamingMode bool, signature int32, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error {
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
return WithGrpcClient(streamingMode, signature, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerGrpcAddress.ToGrpcAddress(), false, grpcDialOption)
@ -269,7 +275,7 @@ func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grp
func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) (err error) {
for _, filerAddress := range filerAddresses {
err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
err = WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, filerAddress.ToGrpcAddress(), false, grpcDialOption)

View file

@ -17,7 +17,7 @@ func GetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir s
dirHash := uint32(util.HashStringToLong(dir))
readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
readErr = pb.WithFilerClient(false, 0, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(SyncKeyPrefix + "____")
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash)
@ -46,7 +46,7 @@ func SetSyncOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, dir s
dirHash := uint32(util.HashStringToLong(dir))
return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return pb.WithFilerClient(false, 0, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(SyncKeyPrefix + "____")
util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], dirHash)

View file

@ -92,7 +92,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
}
func ReadFilerSignature(grpcDialOption grpc.DialOption, filer pb.ServerAddress) (filerSignature int32, readErr error) {
if readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if readErr = pb.WithFilerClient(false, 0, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}); err != nil {
return fmt.Errorf("GetFilerConfiguration %s: %v", filer, err)
} else {

View file

@ -135,7 +135,7 @@ var _ = filer_pb.FilerClient(&FilerSink{})
func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
return pb.WithGrpcClient(streamingMode, fs.signature, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.grpcAddress, false, fs.grpcDialOption)

View file

@ -33,6 +33,7 @@ type FilerSink struct {
writeChunkByFiler bool
isIncremental bool
executor *util.LimitedConcurrentExecutor
signature int32
}
func init() {
@ -54,6 +55,7 @@ func (fs *FilerSink) IsIncremental() bool {
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
fs.isIncremental = configuration.GetBool(prefix + "is_incremental")
fs.dataCenter = configuration.GetString(prefix + "dataCenter")
fs.signature = util.RandomInt32()
return fs.DoInitialize(
"",
configuration.GetString(prefix+"grpcAddress"),

View file

@ -28,10 +28,12 @@ type FilerSource struct {
address string
proxyByFiler bool
dataCenter string
signature int32
}
func (fs *FilerSource) Initialize(configuration util.Configuration, prefix string) error {
fs.dataCenter = configuration.GetString(prefix + "dataCenter")
fs.signature = util.RandomInt32()
return fs.DoInitialize(
"",
configuration.GetString(prefix+"grpcAddress"),
@ -128,7 +130,7 @@ var _ = filer_pb.FilerClient(&FilerSource{})
func (fs *FilerSource) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
return pb.WithGrpcClient(streamingMode, fs.signature, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.grpcAddress, false, fs.grpcDialOption)

View file

@ -92,7 +92,7 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) {
var content []byte
err = pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err = pb.WithFilerClient(false, 0, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
content, err = filer.ReadInsideFiler(client, filer.IamConfigDirectory, filer.IamIdentityFile)
return err
})

View file

@ -28,7 +28,7 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker {
limitations: make(map[string]int64),
}
err := pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err := pb.WithFilerClient(false, 0, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile)
if err != nil {
return fmt.Errorf("read S3 circuit breaker config: %v", err)

View file

@ -16,7 +16,7 @@ var _ = filer_pb.FilerClient(&S3ApiServer{})
func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, s3a.option.Filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption)

View file

@ -49,7 +49,7 @@ func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (res
StartTimeNs: time.Now().UnixNano(),
}
if req.TargetType == cluster.FilerType {
pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
if pingResp != nil {
resp.RemoteTimeNs = pingResp.StartTimeNs

View file

@ -161,7 +161,7 @@ func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (r
StartTimeNs: time.Now().UnixNano(),
}
if req.TargetType == cluster.FilerType {
pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
if pingResp != nil {
resp.RemoteTimeNs = pingResp.StartTimeNs

View file

@ -317,7 +317,7 @@ func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequ
StartTimeNs: time.Now().UnixNano(),
}
if req.TargetType == cluster.FilerType {
pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
if pingResp != nil {
resp.RemoteTimeNs = pingResp.StartTimeNs

View file

@ -133,7 +133,7 @@ var _ = filer_pb.FilerClient(&WebDavFileSystem{})
func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
return pb.WithGrpcClient(streamingMode, fs.signature, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption)

View file

@ -161,7 +161,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
for _, filer := range filers {
for _, master := range masters {
fmt.Fprintf(writer, "checking filer %s to master %s ... ", string(filer), string(master))
err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err := pb.WithFilerClient(false, 0, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
Target: string(master),
TargetType: cluster.MasterType,
@ -181,7 +181,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
for _, filer := range filers {
for _, volumeServer := range volumeServers {
fmt.Fprintf(writer, "checking filer %s to volume server %s ... ", string(filer), string(volumeServer))
err := pb.WithFilerClient(false, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err := pb.WithFilerClient(false, 0, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
Target: string(volumeServer),
TargetType: cluster.VolumeServerType,
@ -224,7 +224,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
for _, sourceFiler := range filers {
for _, targetFiler := range filers {
fmt.Fprintf(writer, "checking filer %s to %s ... ", string(sourceFiler), string(targetFiler))
err := pb.WithFilerClient(false, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
err := pb.WithFilerClient(false, 0, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
Target: string(targetFiler),
TargetType: cluster.FilerType,

View file

@ -105,7 +105,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W
if node.Rack != "" {
fmt.Fprintf(writer, " Rack: %v\n", node.Rack)
}
pb.WithFilerClient(false, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pb.WithFilerClient(false, 0, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err == nil {
if resp.FilerGroup != "" {
@ -120,7 +120,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W
})
}
for _, node := range filerNodes {
pb.WithFilerClient(false, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
pb.WithFilerClient(false, 0, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
fmt.Fprintf(writer, "* filer %s metadata sync time\n", node.Address)
selfSignature := filerSignatures[node]
for peer, peerSignature := range filerSignatures {

View file

@ -111,7 +111,7 @@ var _ = filer_pb.FilerClient(&CommandEnv{})
func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithGrpcFilerClient(streamingMode, ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
return pb.WithGrpcFilerClient(streamingMode, 0, ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
}