| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- // package serv_subcsribe -- сервер подписки
- package serv_subcsribe
- import (
- "fmt"
- "log"
- "p78git.ddns.net/svi/gobus/api/netapi"
- "p78git.ddns.net/svi/gobus/pkg/net/client_proxy"
- "p78git.ddns.net/svi/gobus/pkg/types"
- )
- // ServSubscribe -- сервер подписки по шаблону
- type ServSubscribe struct {
- serv types.IService
- dictTopic types.IDictTopic
- }
- // NewServSubscribe -- возвращает новый сервис подписки
- func NewServSubscribe(serv types.IService) (*ServSubscribe, error) {
- log.Printf("NewServSubscribe()\n")
- if serv == nil {
- return nil, fmt.Errorf("NewServSubscribe(): IService==nil")
- }
- sf := &ServSubscribe{
- serv: serv,
- dictTopic: serv.DictTopic(),
- }
- return sf, nil
- }
- // Subscribe -- подписывает на топики по шаблону
- func (sf *ServSubscribe) Subscribe(req *netapi.SubscribeRequest, serv netapi.GoBus_SubscribeServer) error {
- clientProxy, err := client_proxy.NewClientProxy(req)
- if err != nil {
- return fmt.Errorf("ServSubscribe.Subscribe(): in create ClientProxy, err=\n\t%w", err)
- }
- sf.dictTopic.Subscribe(clientProxy)
- fnEndWork := func() {
- clientProxy.ResetWork()
- sf.dictTopic.Unsubscribe(clientProxy)
- }
- for {
- msg := clientProxy.Read()
- if msg == nil {
- fnEndWork()
- return fmt.Errorf("ServSubscribe.Subscribe(): in read ClientProxy, msg==nil")
- }
- err := serv.Send(&netapi.SyncResponse{
- Msg: msg.BinMsg,
- })
- if err != nil {
- fnEndWork()
- return fmt.Errorf("ServSubscribe.Subscribe(): in send msg, err=\n\t%w", err)
- }
- }
- }
|