// package serv_subcsribe -- сервер подписки package serv_subcsribe import ( "fmt" "log" "p78git.ddns.net/svi/gobus/api/netapi" "p78git.ddns.net/svi/gobus/pkg/net/client_proxy" "p78git.ddns.net/svi/gobus/pkg/types" ) // ServSubscribe -- сервер подписки по шаблону type ServSubscribe struct { serv types.IService dictTopic types.IDictTopic } // NewServSubscribe -- возвращает новый сервис подписки func NewServSubscribe(serv types.IService) (*ServSubscribe, error) { log.Printf("NewServSubscribe()\n") if serv == nil { return nil, fmt.Errorf("NewServSubscribe(): IService==nil") } sf := &ServSubscribe{ serv: serv, dictTopic: serv.DictTopic(), } return sf, nil } // Subscribe -- подписывает на топики по шаблону func (sf *ServSubscribe) Subscribe(req *netapi.SubscribeRequest, serv netapi.GoBus_SubscribeServer) error { clientProxy, err := client_proxy.NewClientProxy(req) if err != nil { return fmt.Errorf("ServSubscribe.Subscribe(): in create ClientProxy, err=\n\t%w", err) } sf.dictTopic.Subscribe(clientProxy) fnEndWork := func() { clientProxy.ResetWork() sf.dictTopic.Unsubscribe(clientProxy) } for { msg := clientProxy.Read() if msg == nil { fnEndWork() return fmt.Errorf("ServSubscribe.Subscribe(): in read ClientProxy, msg==nil") } err := serv.Send(&netapi.SyncResponse{ Msg: msg.BinMsg, }) if err != nil { fnEndWork() return fmt.Errorf("ServSubscribe.Subscribe(): in send msg, err=\n\t%w", err) } } }