serv_subscribe.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. // package serv_subcsribe -- сервер подписки
  2. package serv_subcsribe
  3. import (
  4. "fmt"
  5. "log"
  6. "p78git.ddns.net/svi/gobus/pkg/alias"
  7. "p78git.ddns.net/svi/gobus/pkg/net/cleint_proxy"
  8. "p78git.ddns.net/svi/gobus/pkg/net/netapi"
  9. "p78git.ddns.net/svi/gobus/pkg/types"
  10. )
  11. // ServSubscribe -- сервер подписки по шаблону
  12. type ServSubscribe struct {
  13. serv types.IService
  14. dictTopic types.IDictTopic
  15. }
  16. // NewServSubscribe -- возвращает новый сервис подписки
  17. func NewServSubscribe(serv types.IService) (*ServSubscribe, error) {
  18. log.Printf("NewServSubscribe()\n")
  19. if serv == nil {
  20. return nil, fmt.Errorf("NewServSubscribe(): IService==nil")
  21. }
  22. sf := &ServSubscribe{
  23. serv: serv,
  24. dictTopic: serv.DictTopic(),
  25. }
  26. return sf, nil
  27. }
  28. // Subscribe -- подписывает на топики по шаблону
  29. func (sf *ServSubscribe) Subscribe(req *netapi.SubscribeRequest, serv netapi.GoBus_SubscribeServer) error {
  30. clientProxy, err := cleint_proxy.NewClientProxy(alias.ClientName(req.ClientName), alias.TopicName(req.Sample))
  31. if err != nil {
  32. return fmt.Errorf("ServSubscribe.Subscribe(): in create ClientProxy, err=\n\t%w", err)
  33. }
  34. sf.dictTopic.Subscribe(clientProxy)
  35. fnEndWork := func() {
  36. clientProxy.ResetWork()
  37. sf.dictTopic.Unsubscribe(clientProxy)
  38. }
  39. for {
  40. msg := clientProxy.Read()
  41. if msg == nil {
  42. fnEndWork()
  43. return fmt.Errorf("ServSubscribe.Subscribe(): in read ClientProxy, msg==nil")
  44. }
  45. err := serv.Send(&netapi.SyncResponse{
  46. Msg: msg.Msg,
  47. })
  48. if err != nil {
  49. fnEndWork()
  50. return fmt.Errorf("ServSubscribe.Subscribe(): in send msg, err=\n\t%w", err)
  51. }
  52. }
  53. }