This commit is contained in:
chrislu 2023-12-22 11:33:50 -08:00
parent ee1c9bc314
commit e0727071c8
7 changed files with 49 additions and 48 deletions

View file

@ -31,21 +31,21 @@ var (
)
type CopyOptions struct {
include *string
replication *string
collection *string
ttl *string
diskType *string
maxMB *int
masterClient *wdclient.MasterClient
concurrentFiles *int
concurrentChunks *int
grpcDialOption grpc.DialOption
masters []string
cipher bool
ttlSec int32
checkSize *bool
verbose *bool
include *string
replication *string
collection *string
ttl *string
diskType *string
maxMB *int
masterClient *wdclient.MasterClient
concurrentFiles *int
concurrentChunks *int
grpcDialOption grpc.DialOption
masters []string
cipher bool
ttlSec int32
checkSize *bool
verbose *bool
volumeServerAccess *string
}

View file

@ -80,7 +80,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
}
func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
func (option *RemoteSyncOptions) makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
client, err := remote_storage.GetRemoteStorage(remoteStorage)
if err != nil {
return nil, err

View file

@ -37,24 +37,24 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
// process ack messages
go func() {
for {
_, err := stream.Recv()
if err != nil {
glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
}
select {
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
// Client disconnected
return
for {
_, err := stream.Recv()
if err != nil {
glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
}
select {
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
// Client disconnected
return
}
return
default:
// Continue processing the request
}
return
default:
// Continue processing the request
}
}
}()
// send commands to subscriber
@ -68,7 +68,7 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
}
glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
return err
case message := <- cgi.ResponseChan:
case message := <-cgi.ResponseChan:
if err := stream.Send(message); err != nil {
glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
}

View file

@ -32,9 +32,10 @@ type Balancer struct {
// Collected from all brokers when they connect to the broker leader
TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
}
func NewBalancer() *Balancer {
return &Balancer{
Brokers: cmap.New[*BrokerStats](),
Brokers: cmap.New[*BrokerStats](),
TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](),
}
}

View file

@ -6,8 +6,8 @@ import (
)
type PartitionSlotToBroker struct {
RangeStart int32
RangeStop int32
RangeStart int32
RangeStop int32
AssignedBroker string
}
@ -36,12 +36,12 @@ func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broke
}
}
ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
AssignedBroker: broker,
})
}
func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) {
func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) {
for _, partitionSlot := range ps.PartitionSlots {
if partitionSlot.AssignedBroker == broker {
partitionSlot.AssignedBroker = ""

View file

@ -6,28 +6,29 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
type ConsumerGroupInstance struct {
InstanceId string
// the consumer group instance may not have an active partition
Partitions []*topic.Partition
ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
Partitions []*topic.Partition
ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
}
type ConsumerGroup struct {
// map a consumer group instance id to a consumer group instance
ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
mapping *PartitionConsumerMapping
mapping *PartitionConsumerMapping
}
func NewConsumerGroup() *ConsumerGroup {
return &ConsumerGroup{
ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount),
mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount),
}
}
func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
return &ConsumerGroupInstance{
InstanceId: instanceId,
InstanceId: instanceId,
ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
}
}

View file

@ -6,7 +6,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
type TopicConsumerGroups struct {
// map a consumer group name to a consumer group
ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
@ -19,13 +18,13 @@ type TopicConsumerGroups struct {
type Coordinator struct {
// map topic name to consumer groups
TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
balancer *pub_balancer.Balancer
balancer *pub_balancer.Balancer
}
func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
return &Coordinator{
TopicSubscribers: cmap.New[*TopicConsumerGroups](),
balancer: balancer,
balancer: balancer,
}
}
@ -50,7 +49,7 @@ func toTopicName(topic *mq_pb.Topic) string {
return topicName
}
func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance{
func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance {
tcg := c.GetTopicConsumerGroups(topic)
cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
if cg == nil {