topic.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. // package topic -- потокобезопасный топик шины данных
  2. package topic
  3. import (
  4. "fmt"
  5. "sync"
  6. "p78git.ddns.net/svi/gobus/api/netapi"
  7. "p78git.ddns.net/svi/gobus/internal/dict_topic/topic/topic_stat"
  8. "p78git.ddns.net/svi/gobus/pkg/alias"
  9. "p78git.ddns.net/svi/gobus/pkg/types"
  10. )
  11. // Topic -- потокобезопасный топик шины данных
  12. type Topic struct {
  13. dictProxy map[alias.ClientName]types.IClientProxy // Словарь подписок клиентов на топик
  14. msg *netapi.TopicMsg // Хранимое сообщение топика
  15. stat *topic_stat.TopicStat // Статистика топика
  16. block sync.RWMutex
  17. }
  18. // Newtopic -- возвращает новый топик
  19. func NewTopic(msg *netapi.TopicMsg) (*Topic, error) {
  20. stat, err := topic_stat.NewTopicStat(msg)
  21. if err != nil {
  22. return nil, fmt.Errorf("NewTopic(): in create TopicStat, err=\n\t%w", err)
  23. }
  24. sf := &Topic{
  25. stat: stat,
  26. msg: msg,
  27. dictProxy: make(map[alias.ClientName]types.IClientProxy),
  28. }
  29. _ = types.ITopic(sf)
  30. return sf, nil
  31. }
  32. // Unsubscribe -- отписывает клиента от себя
  33. func (sf *Topic) Unsubscribe(clientName alias.ClientName) {
  34. sf.block.Lock()
  35. defer sf.block.Unlock()
  36. delete(sf.dictProxy, clientName)
  37. }
  38. // Subscribe -- подписывает клиента на себя
  39. func (sf *Topic) Subscribe(clientProxy types.IClientProxy) error {
  40. sf.block.Lock()
  41. defer sf.block.Unlock()
  42. if clientProxy == nil {
  43. return fmt.Errorf("Topic.Subscribe(): ICLientProxy==nil")
  44. }
  45. sf.dictProxy[clientProxy.ClientName()] = clientProxy
  46. clientProxy.Write(sf.msg)
  47. return nil
  48. }
  49. // Name -- возвращает имя топика
  50. func (sf *Topic) Name() alias.TopicName {
  51. return sf.stat.Name()
  52. }
  53. // Get -- возвращает содержимое топика
  54. func (sf *Topic) Get() *netapi.TopicMsg {
  55. sf.block.RLock()
  56. defer sf.block.RUnlock()
  57. return sf.msg
  58. }
  59. // Set -- устанавливает значение топика
  60. func (sf *Topic) Set(msg *netapi.TopicMsg) error {
  61. sf.block.Lock()
  62. defer sf.block.Unlock()
  63. if err := sf.stat.Update(msg); err != nil {
  64. return fmt.Errorf("Topic.Set(): ERROR in update self stat, topic(%q), err==\n\t%w", sf.stat.Name(), err)
  65. }
  66. sf.msg = msg
  67. for _, proxy := range sf.dictProxy {
  68. if !proxy.IsWork() {
  69. delete(sf.dictProxy, proxy.ClientName())
  70. continue
  71. }
  72. proxy.Write(msg)
  73. }
  74. return nil
  75. }
  76. // Stat -- возвращает статистику топика
  77. func (sf *Topic) Stat() types.ITopicStat {
  78. return sf.stat
  79. }