From 183352c796d0f81b8ff8072f18b74f615133c1fe Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Fri, 29 Dec 2023 00:54:37 +0500 Subject: [PATCH] shell meta load add concurrency (#4529) * fix: increase speed cmd fs meta load * fix: add wg --- weed/shell/command_fs_meta_load.go | 36 ++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index 0cbdddb49..a2ae9401d 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -6,6 +6,7 @@ import ( "io" "os" "strings" + "sync" "time" "google.golang.org/protobuf/proto" @@ -47,6 +48,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. metaLoadCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) c.dirPrefix = metaLoadCommand.String("dirPrefix", "", "load entries only with directories matching prefix") + concurrency := metaLoadCommand.Int("concurrency", 1, "number of parallel meta load to filer") verbose := metaLoadCommand.Bool("v", true, "verbose mode") if err = metaLoadCommand.Parse(args[0 : len(args)-1]); err != nil { return nil @@ -64,6 +66,9 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { sizeBuf := make([]byte, 4) + waitChan := make(chan struct{}, *concurrency) + defer close(waitChan) + var wg sync.WaitGroup for { if n, err := dst.Read(sizeBuf); n != 4 { @@ -105,21 +110,34 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. } fullEntry.Entry.Name = strings.ReplaceAll(fullEntry.Entry.Name, "/", "x") - if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ - Directory: fullEntry.Dir, - Entry: fullEntry.Entry, - }); err != nil { - return err - } - if fullEntry.Entry.IsDirectory { + wg.Wait() + if errEntry := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ + Directory: fullEntry.Dir, + Entry: fullEntry.Entry, + }); errEntry != nil { + return errEntry + } dirCount++ } else { + wg.Add(1) + waitChan <- struct{}{} + go func(entry *filer_pb.FullEntry) { + if errEntry := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ + Directory: entry.Dir, + Entry: entry.Entry, + }); errEntry != nil { + err = errEntry + } + defer wg.Done() + <-waitChan + }(fullEntry) + if err != nil { + return err + } fileCount++ } - } - }) if err == nil {