topic.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. // package topic -- потокобезопасный топик шины данных
  2. package topic
  3. import (
  4. "fmt"
  5. "sync"
  6. "p78git.ddns.net/svi/gobus/api/netapi"
  7. "p78git.ddns.net/svi/gobus/internal/dict_topic/topic/topic_stat"
  8. "p78git.ddns.net/svi/gobus/pkg/alias"
  9. "p78git.ddns.net/svi/gobus/pkg/types"
  10. )
  11. // Topic -- потокобезопасный топик шины данных
  12. type Topic struct {
  13. dictProxy map[alias.ClientName]types.IClientProxy // Словарь подписок клиентов на топик
  14. msg *netapi.TopicMsg // Хранимое сообщение топика
  15. stat *topic_stat.TopicStat // Статистика топика
  16. block sync.RWMutex
  17. }
  18. // Newtopic -- возвращает новый топик
  19. func NewTopic(msg *netapi.TopicMsg) (*Topic, error) {
  20. stat, err := topic_stat.NewTopicStat(msg)
  21. if err != nil {
  22. return nil, fmt.Errorf("NewTopic(): in create TopicStat, err=\n\t%w", err)
  23. }
  24. sf := &Topic{
  25. stat: stat,
  26. msg: msg,
  27. dictProxy: make(map[alias.ClientName]types.IClientProxy),
  28. }
  29. _ = types.ITopic(sf)
  30. return sf, nil
  31. }
  32. // Unsubscribe -- отписывает клиента от себя
  33. func (sf *Topic) Unsubscribe(clientName alias.ClientName) {
  34. sf.block.Lock()
  35. defer sf.block.Unlock()
  36. delete(sf.dictProxy, clientName)
  37. }
  38. // Subscribe -- подписывает клиента на себя
  39. func (sf *Topic) Subscribe(clientProxy types.IClientProxy) error {
  40. sf.block.Lock()
  41. defer sf.block.Unlock()
  42. if clientProxy == nil {
  43. return fmt.Errorf("Topic.Subscribe(): ICLientProxy==nil")
  44. }
  45. sf.dictProxy[clientProxy.ClientName()] = clientProxy
  46. return nil
  47. }
  48. // Name -- возвращает имя топика
  49. func (sf *Topic) Name() alias.TopicName {
  50. return sf.stat.Name()
  51. }
  52. // Get -- возвращает содержимое топика
  53. func (sf *Topic) Get() *netapi.TopicMsg {
  54. sf.block.RLock()
  55. defer sf.block.RUnlock()
  56. return sf.msg
  57. }
  58. // Set -- устанавливает значение топика
  59. func (sf *Topic) Set(msg *netapi.TopicMsg) error {
  60. sf.block.Lock()
  61. defer sf.block.Unlock()
  62. if err := sf.stat.Update(msg); err != nil {
  63. return fmt.Errorf("Topic.Set(): ERROR in update self stat, topic(%q), err==\n\t%w", sf.stat.Name(), err)
  64. }
  65. sf.msg = msg
  66. for _, proxy := range sf.dictProxy {
  67. if !proxy.IsWork() {
  68. delete(sf.dictProxy, proxy.ClientName())
  69. continue
  70. }
  71. proxy.Write(msg)
  72. }
  73. return nil
  74. }
  75. // Stat -- возвращает статистику топика
  76. func (sf *Topic) Stat() types.ITopicStat {
  77. return sf.stat
  78. }