diff --git a/docker/compose/local-sync-mount-compose.yml b/docker/compose/local-sync-mount-compose.yml index fec866698..0ce1fdeda 100644 --- a/docker/compose/local-sync-mount-compose.yml +++ b/docker/compose/local-sync-mount-compose.yml @@ -3,19 +3,54 @@ services: node1: image: chrislusf/seaweedfs:local command: "server -master -volume -filer" + ports: + - 8888:8888 + - 18888:18888 + healthcheck: + test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ] + interval: 1s + start_period: 10s + timeout: 30s mount1: image: chrislusf/seaweedfs:local privileged: true command: "mount -filer=node1:8888 -dir=/mnt -dirAutoCreate" + healthcheck: + test: [ "CMD", "curl", "--fail", "-I", "http://node1:8888/" ] + interval: 1s + start_period: 10s + timeout: 30s + depends_on: + node1: + condition: service_healthy node2: image: chrislusf/seaweedfs:local ports: - 7888:8888 + - 17888:18888 command: "server -master -volume -filer" + healthcheck: + test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ] + interval: 1s + start_period: 10s + timeout: 30s mount2: image: chrislusf/seaweedfs:local privileged: true command: "mount -filer=node2:8888 -dir=/mnt -dirAutoCreate" + healthcheck: + test: [ "CMD", "curl", "--fail", "-I", "http://node2:8888/" ] + interval: 1s + start_period: 10s + timeout: 30s + depends_on: + node2: + condition: service_healthy sync: image: chrislusf/seaweedfs:local command: "-v=4 filer.sync -a=node1:8888 -b=node2:8888 -a.debug -b.debug" + depends_on: + mount1: + condition: service_healthy + mount2: + condition: service_healthy diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index 912607847..53abee8b7 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -30,24 +30,24 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo return err } - processor := NewMetadataProcessor(eachEntryFunc, 128) + lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) + processor := NewMetadataProcessor(eachEntryFunc, 128, lastOffsetTs.UnixNano()) var lastLogTsNs = time.Now().UnixNano() processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { processor.AddSyncJob(resp) return nil }, 3*time.Second, func(counter int64, lastTsNs int64) error { - if processor.processedTsWatermark == 0 { + offsetTsNs := processor.processedTsWatermark.Load() + if offsetTsNs == 0 { return nil } now := time.Now().UnixNano() - glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9)) + glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) lastLogTsNs = now - return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, processor.processedTsWatermark) + return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, offsetTsNs) }) - lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) - option.clientEpoch++ metadataFollowOption := &pb.MetadataFollowOption{ diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index d4305b666..00f6d7493 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -33,7 +33,8 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour return err } - processor := NewMetadataProcessor(eachEntryFunc, 128) + lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo) + processor := NewMetadataProcessor(eachEntryFunc, 128, lastOffsetTs.UnixNano()) var lastLogTsNs = time.Now().UnixNano() processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { @@ -50,18 +51,17 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour processor.AddSyncJob(resp) return nil }, 3*time.Second, func(counter int64, lastTsNs int64) error { - if processor.processedTsWatermark == 0 { + offsetTsNs := processor.processedTsWatermark.Load() + if offsetTsNs == 0 { return nil } // use processor.processedTsWatermark instead of the lastTsNs from the most recent job now := time.Now().UnixNano() - glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9)) + glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) lastLogTsNs = now - return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, processor.processedTsWatermark) + return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, offsetTsNs) }) - lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo) - option.clientEpoch++ metadataFollowOption := &pb.MetadataFollowOption{ diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index b1e32b65e..006f6794a 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -19,6 +19,7 @@ import ( "os" "regexp" "strings" + "sync/atomic" "time" ) @@ -50,7 +51,7 @@ type SyncOptions struct { aDoDeleteFiles *bool bDoDeleteFiles *bool clientId int32 - clientEpoch int32 + clientEpoch atomic.Int32 } const ( @@ -150,10 +151,10 @@ func runFilerSynchronize(cmd *Command, args []string) bool { os.Exit(2) } for { - syncOptions.clientEpoch++ + syncOptions.clientEpoch.Add(1) err := doSubscribeFilerMetaChanges( syncOptions.clientId, - syncOptions.clientEpoch, + syncOptions.clientEpoch.Load(), grpcDialOption, filerA, *syncOptions.aPath, @@ -188,10 +189,10 @@ func runFilerSynchronize(cmd *Command, args []string) bool { } go func() { for { - syncOptions.clientEpoch++ + syncOptions.clientEpoch.Add(1) err := doSubscribeFilerMetaChanges( syncOptions.clientId, - syncOptions.clientEpoch, + syncOptions.clientEpoch.Load(), grpcDialOption, filerB, *syncOptions.bPath, @@ -274,7 +275,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrencyLimit) concurrency = DefaultConcurrencyLimit } - processor := NewMetadataProcessor(processEventFn, concurrency) + processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs) var lastLogTsNs = time.Now().UnixNano() var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) @@ -282,16 +283,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti processor.AddSyncJob(resp) return nil }, 3*time.Second, func(counter int64, lastTsNs int64) error { - if processor.processedTsWatermark == 0 { + offsetTsNs := processor.processedTsWatermark.Load() + if offsetTsNs == 0 { return nil } // use processor.processedTsWatermark instead of the lastTsNs from the most recent job now := time.Now().UnixNano() - glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9)) + glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) lastLogTsNs = now // collect synchronous offset - statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(processor.processedTsWatermark)) - return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, processor.processedTsWatermark) + statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs)) + return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs) }) metadataFollowOption := &pb.MetadataFollowOption{ diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go index 9d2ba75d5..d49031b98 100644 --- a/weed/command/filer_sync_jobs.go +++ b/weed/command/filer_sync_jobs.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "sync" + "sync/atomic" ) type MetadataProcessor struct { @@ -14,15 +15,16 @@ type MetadataProcessor struct { activeJobsCond *sync.Cond concurrencyLimit int fn pb.ProcessMetadataFunc - processedTsWatermark int64 + processedTsWatermark atomic.Int64 } -func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int) *MetadataProcessor { +func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor { t := &MetadataProcessor{ fn: fn, activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse), concurrencyLimit: concurrency, } + t.processedTsWatermark.Store(offsetTsNs) t.activeJobsCond = sync.NewCond(&t.activeJobsLock) return t } @@ -61,7 +63,7 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) } } if isOldest { - t.processedTsWatermark = resp.TsNs + t.processedTsWatermark.Store(resp.TsNs) } t.activeJobsCond.Signal() }()