| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- // package serv_subcsribe -- сервер подписки
- package serv_subcsribe
- import (
- "fmt"
- "log"
- "p78git.ddns.net/svi/gobus/pkg/alias"
- "p78git.ddns.net/svi/gobus/pkg/net/cleint_proxy"
- "p78git.ddns.net/svi/gobus/pkg/net/netapi"
- "p78git.ddns.net/svi/gobus/pkg/types"
- )
- // ServSubscribe -- сервер подписки по шаблону
- type ServSubscribe struct {
- serv types.IService
- dictTopic types.IDictTopic
- }
- // NewServSubscribe -- возвращает новый сервис подписки
- func NewServSubscribe(serv types.IService) (*ServSubscribe, error) {
- log.Printf("NewServSubscribe()\n")
- if serv == nil {
- return nil, fmt.Errorf("NewServSubscribe(): IService==nil")
- }
- sf := &ServSubscribe{
- serv: serv,
- dictTopic: serv.DictTopic(),
- }
- return sf, nil
- }
- // Subscribe -- подписывает на топики по шаблону
- func (sf *ServSubscribe) Subscribe(req *netapi.SubscribeRequest, serv netapi.GoBus_SubscribeServer) error {
- clientProxy, err := cleint_proxy.NewClientProxy(alias.ClientName(req.ClientName), alias.TopicName(req.Sample))
- if err != nil {
- return fmt.Errorf("ServSubscribe.Subscribe(): in create ClientProxy, err=\n\t%w", err)
- }
- sf.dictTopic.Subscribe(clientProxy)
- fnEndWork := func() {
- clientProxy.ResetWork()
- sf.dictTopic.Unsubscribe(clientProxy)
- }
- for {
- msg := clientProxy.Read()
- if msg == nil {
- fnEndWork()
- return fmt.Errorf("ServSubscribe.Subscribe(): in read ClientProxy, msg==nil")
- }
- err := serv.Send(&netapi.SyncResponse{
- Msg: msg.Msg,
- })
- if err != nil {
- fnEndWork()
- return fmt.Errorf("ServSubscribe.Subscribe(): in send msg, err=\n\t%w", err)
- }
- }
- }
|