dict_topic.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. // package dict_topic -- потокобезопасный словарь топиков
  2. package dict_topic
  3. import (
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "p78git.ddns.net/svi/gobus/api/netapi"
  8. "p78git.ddns.net/svi/gobus/internal/dict_topic/topic"
  9. "p78git.ddns.net/svi/gobus/pkg/alias"
  10. "p78git.ddns.net/svi/gobus/pkg/types"
  11. )
  12. // DictTopic -- потокобезопасный словарь топиков
  13. type DictTopic struct {
  14. dict map[alias.TopicName]types.ITopic // Словарь тпоиков
  15. dictProxy map[alias.ClientName]types.IClientProxy // Словарь прокси для клиентов
  16. block sync.RWMutex
  17. }
  18. // NewDictTopic -- возвращает новый потокобезопасный словарь топиков
  19. func NewDictTopic() *DictTopic {
  20. sf := &DictTopic{
  21. dict: make(map[alias.TopicName]types.ITopic),
  22. dictProxy: make(map[alias.ClientName]types.IClientProxy),
  23. }
  24. _ = types.IDictTopic(sf)
  25. return sf
  26. }
  27. // Unsubscribe -- отписывает клиента от топиков
  28. func (sf *DictTopic) Unsubscribe(clientProxy types.IClientProxy) error {
  29. sf.block.RLock()
  30. defer sf.block.RUnlock()
  31. if clientProxy == nil {
  32. return fmt.Errorf("DictTopic.Unsubscribe(): IClientProxy==nil")
  33. }
  34. sf.unsubscribe(clientProxy)
  35. return nil
  36. }
  37. // Отписывает под блокировкой клиента от топиков
  38. func (sf *DictTopic) unsubscribe(clientProxy types.IClientProxy) {
  39. clientName := clientProxy.ClientName()
  40. sampleTopic := string(clientProxy.SampleTopic())
  41. for name, topic := range sf.dict {
  42. if !strings.Contains(string(name), sampleTopic) {
  43. continue
  44. }
  45. topic.Unsubscribe(clientName)
  46. }
  47. }
  48. // Subscribe -- подписывает клиента на топики
  49. func (sf *DictTopic) Subscribe(clientProxy types.IClientProxy) error {
  50. sf.block.RLock()
  51. defer sf.block.RUnlock()
  52. if clientProxy == nil {
  53. return fmt.Errorf("DictTopic.Subscribe(): IClientProxy==nil")
  54. }
  55. sf.subscribe(clientProxy)
  56. return nil
  57. }
  58. // Подписывает по образцу топика клиента на события (внутри блокировки)
  59. func (sf *DictTopic) subscribe(clientProxy types.IClientProxy) {
  60. { // Сначала проверить наличие такого клиента и отписать его
  61. _, isOk := sf.dictProxy[clientProxy.ClientName()]
  62. if isOk {
  63. sf.unsubscribe(clientProxy)
  64. }
  65. }
  66. sample := clientProxy.SampleTopic()
  67. for name, topic := range sf.dict {
  68. if !strings.Contains(string(name), string(sample)) {
  69. continue
  70. }
  71. topic.Subscribe(clientProxy)
  72. }
  73. sf.dictProxy[clientProxy.ClientName()] = clientProxy
  74. }
  75. // Public -- публикует в новый топик (при необходимости создаёт)
  76. func (sf *DictTopic) Public(msg *netapi.TopicMsg) error {
  77. sf.block.Lock()
  78. defer sf.block.Unlock()
  79. if msg == nil {
  80. return fmt.Errorf("DictTopic.Add(): msg==nil")
  81. }
  82. top_, isOk := sf.dict[alias.TopicName(msg.Topic)]
  83. if !isOk {
  84. var err error
  85. top_, err = topic.NewTopic(msg)
  86. if err != nil {
  87. return fmt.Errorf("DictTopic.Add(): in create ITopic, err=\n\t%w", err)
  88. }
  89. sf.dict[top_.Name()] = top_
  90. { // Теперь пройтись по все прокси-клиентам и подписать их на новый топик
  91. for _, clientProxy := range sf.dictProxy {
  92. top_.Subscribe(clientProxy)
  93. }
  94. }
  95. }
  96. top_.Set(msg) // Оповещение подписчиков (на самом деле уже сообщение присвоено)
  97. return nil
  98. }
  99. // Get -- возвращает топик по имени
  100. func (sf *DictTopic) Get(name alias.TopicName) types.ITopic {
  101. sf.block.RLock()
  102. defer sf.block.RUnlock()
  103. topic := sf.dict[name]
  104. return topic
  105. }
  106. // Len -- возвращает число всех топиков
  107. func (sf *DictTopic) Len() int {
  108. sf.block.RLock()
  109. defer sf.block.RUnlock()
  110. return len(sf.dict)
  111. }
  112. // Size -- возвращает размер всех топиков
  113. func (sf *DictTopic) Size() int {
  114. sf.block.RLock()
  115. defer sf.block.RUnlock()
  116. res := 0
  117. for _, topic := range sf.dict {
  118. res += topic.Stat().Size()
  119. }
  120. return res
  121. }