serv_subscribe.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. // package serv_subcsribe -- сервер подписки
  2. package serv_subcsribe
  3. import (
  4. "fmt"
  5. "log"
  6. "p78git.ddns.net/svi/gobus/api/netapi"
  7. "p78git.ddns.net/svi/gobus/pkg/net/client_proxy"
  8. "p78git.ddns.net/svi/gobus/pkg/types"
  9. )
  10. // ServSubscribe -- сервер подписки по шаблону
  11. type ServSubscribe struct {
  12. serv types.IService
  13. dictTopic types.IDictTopic
  14. }
  15. // NewServSubscribe -- возвращает новый сервис подписки
  16. func NewServSubscribe(serv types.IService) (*ServSubscribe, error) {
  17. log.Printf("NewServSubscribe()\n")
  18. if serv == nil {
  19. return nil, fmt.Errorf("NewServSubscribe(): IService==nil")
  20. }
  21. sf := &ServSubscribe{
  22. serv: serv,
  23. dictTopic: serv.DictTopic(),
  24. }
  25. return sf, nil
  26. }
  27. // Subscribe -- подписывает на топики по шаблону
  28. func (sf *ServSubscribe) Subscribe(req *netapi.SubscribeRequest, serv netapi.GoBus_SubscribeServer) error {
  29. clientProxy, err := client_proxy.NewClientProxy(req)
  30. if err != nil {
  31. return fmt.Errorf("ServSubscribe.Subscribe(): in create ClientProxy, err=\n\t%w", err)
  32. }
  33. sf.dictTopic.Subscribe(clientProxy)
  34. fnEndWork := func() {
  35. clientProxy.ResetWork()
  36. sf.dictTopic.Unsubscribe(clientProxy)
  37. }
  38. for {
  39. msg := clientProxy.Read()
  40. if msg == nil {
  41. fnEndWork()
  42. return fmt.Errorf("ServSubscribe.Subscribe(): in read ClientProxy, msg==nil")
  43. }
  44. err := serv.Send(&netapi.SyncResponse{
  45. Msg: msg.BinMsg,
  46. })
  47. if err != nil {
  48. fnEndWork()
  49. return fmt.Errorf("ServSubscribe.Subscribe(): in send msg, err=\n\t%w", err)
  50. }
  51. }
  52. }