// package serv_subcsribe -- сервер подписки package serv_subcsribe import ( "fmt" "log" "p78git.ddns.net/svi/gobus/api/netapi" "p78git.ddns.net/svi/gobus/pkg/alias" "p78git.ddns.net/svi/gobus/pkg/net/cleint_proxy" "p78git.ddns.net/svi/gobus/pkg/types" ) // ServSubscribe -- сервер подписки по шаблону type ServSubscribe struct { serv types.IService dictTopic types.IDictTopic } // NewServSubscribe -- возвращает новый сервис подписки func NewServSubscribe(serv types.IService) (*ServSubscribe, error) { log.Printf("NewServSubscribe()\n") if serv == nil { return nil, fmt.Errorf("NewServSubscribe(): IService==nil") } sf := &ServSubscribe{ serv: serv, dictTopic: serv.DictTopic(), } return sf, nil } // Subscribe -- подписывает на топики по шаблону func (sf *ServSubscribe) Subscribe(req *netapi.SubscribeRequest, serv netapi.GoBus_SubscribeServer) error { clientProxy, err := cleint_proxy.NewClientProxy(alias.ClientName(req.ClientName), alias.TopicName(req.Sample)) if err != nil { return fmt.Errorf("ServSubscribe.Subscribe(): in create ClientProxy, err=\n\t%w", err) } sf.dictTopic.Subscribe(clientProxy) fnEndWork := func() { clientProxy.ResetWork() sf.dictTopic.Unsubscribe(clientProxy) } for { msg := clientProxy.Read() if msg == nil { fnEndWork() return fmt.Errorf("ServSubscribe.Subscribe(): in read ClientProxy, msg==nil") } err := serv.Send(&netapi.SyncResponse{ Msg: msg.Msg, }) if err != nil { fnEndWork() return fmt.Errorf("ServSubscribe.Subscribe(): in send msg, err=\n\t%w", err) } } }