dict_topic.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // package dict_topic -- потокобезопасный словарь топиков
  2. package dict_topic
  3. import (
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "p78git.ddns.net/svi/gobus/api/netapi"
  8. "p78git.ddns.net/svi/gobus/internal/dict_topic/topic"
  9. "p78git.ddns.net/svi/gobus/pkg/alias"
  10. "p78git.ddns.net/svi/gobus/pkg/types"
  11. )
  12. // DictTopic -- потокобезопасный словарь топиков
  13. type DictTopic struct {
  14. dict map[alias.TopicName]types.ITopic // Словарь тпоиков
  15. dictProxy map[alias.ClientName]types.IClientProxy // Словарь прокси для клиентов
  16. block sync.RWMutex
  17. }
  18. // NewDictTopic -- возвращает новый потокобезопасный словарь топиков
  19. func NewDictTopic() (*DictTopic, error) {
  20. sf := &DictTopic{
  21. dict: make(map[alias.TopicName]types.ITopic),
  22. dictProxy: make(map[alias.ClientName]types.IClientProxy),
  23. }
  24. _ = types.IDictTopic(sf)
  25. return sf, nil
  26. }
  27. // Unsubscribe -- отписывает клиента от топиков
  28. func (sf *DictTopic) Unsubscribe(clientProxy types.IClientProxy) error {
  29. sf.block.RLock()
  30. defer sf.block.RUnlock()
  31. if clientProxy == nil {
  32. return fmt.Errorf("DictTopic.Unsubscribe(): IClientProxy==nil")
  33. }
  34. sf.unsubscribe(clientProxy)
  35. return nil
  36. }
  37. // Отписывает под блокировкой клиента от топиков
  38. func (sf *DictTopic) unsubscribe(clientProxy types.IClientProxy) {
  39. sample := clientProxy.SampleTopic()
  40. for name, topic := range sf.dict {
  41. if !strings.Contains(string(name), string(sample)) {
  42. continue
  43. }
  44. topic.Unsubscribe(clientProxy.ClientName())
  45. }
  46. }
  47. // Subscribe -- подписывает клиента на топики
  48. func (sf *DictTopic) Subscribe(clientProxy types.IClientProxy) error {
  49. sf.block.RLock()
  50. defer sf.block.RUnlock()
  51. if clientProxy == nil {
  52. return fmt.Errorf("DictTopic.Subscribe(): IClientProxy==nil")
  53. }
  54. sf.subscribe(clientProxy)
  55. return nil
  56. }
  57. // Подписывает по образцу топика клиента на события (внутри блокировки)
  58. func (sf *DictTopic) subscribe(clientProxy types.IClientProxy) {
  59. { // Сначала проверить наличие такого клиента и отписать его
  60. _, isOk := sf.dictProxy[clientProxy.ClientName()]
  61. if isOk {
  62. sf.unsubscribe(clientProxy)
  63. }
  64. }
  65. sample := clientProxy.SampleTopic()
  66. for name, topic := range sf.dict {
  67. if !strings.Contains(string(name), string(sample)) {
  68. continue
  69. }
  70. topic.Subscribe(clientProxy)
  71. }
  72. sf.dictProxy[clientProxy.ClientName()] = clientProxy
  73. }
  74. // Public -- публикует в новый топик (при необходимости создаёт)
  75. func (sf *DictTopic) Public(msg *netapi.TopicMsg) error {
  76. sf.block.Lock()
  77. defer sf.block.Unlock()
  78. if msg == nil {
  79. return fmt.Errorf("DictTopic.Add(): msg==nil")
  80. }
  81. top_, isOk := sf.dict[alias.TopicName(msg.Topic)]
  82. if !isOk {
  83. var err error
  84. top_, err = topic.NewTopic(msg)
  85. if err != nil {
  86. return fmt.Errorf("DictTopic.Add(): in create ITopic, err=\n\t%w", err)
  87. }
  88. sf.dict[top_.Name()] = top_
  89. { // Теперь пройтись по все прокси-клиентам и подписать их на новый топик
  90. for _, clientProxy := range sf.dictProxy {
  91. top_.Subscribe(clientProxy)
  92. }
  93. }
  94. }
  95. top_.Set(msg) // Оповещение подписчиков (на самом деле уже сообщение присвоено)
  96. return nil
  97. }
  98. // Get -- возвращает топик по имени
  99. func (sf *DictTopic) Get(name alias.TopicName) types.ITopic {
  100. sf.block.RLock()
  101. defer sf.block.RUnlock()
  102. topic := sf.dict[name]
  103. return topic
  104. }
  105. // Len -- возвращает число всех топиков
  106. func (sf *DictTopic) Len() int {
  107. sf.block.RLock()
  108. defer sf.block.RUnlock()
  109. return len(sf.dict)
  110. }
  111. // Size -- возвращает размер всех топиков
  112. func (sf *DictTopic) Size() int {
  113. sf.block.RLock()
  114. defer sf.block.RUnlock()
  115. res := 0
  116. for _, topic := range sf.dict {
  117. res += topic.Stat().Size()
  118. }
  119. return res
  120. }