topic.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. // package topic -- потокобезопасный топик шины данных
  2. package topic
  3. import (
  4. "fmt"
  5. "sync"
  6. "p78git.ddns.net/svi/gobus/api/netapi"
  7. "p78git.ddns.net/svi/gobus/pkg/alias"
  8. "p78git.ddns.net/svi/gobus/pkg/types"
  9. )
  10. // Topic -- потокобезопасный топик шины данных
  11. type Topic struct {
  12. name alias.TopicName
  13. dictProxy map[alias.ClientName]types.IClientProxy // Словарь подписок клиентов на топик
  14. msg *netapi.TopicMsg // Хранимое сообщение топика
  15. block sync.RWMutex
  16. }
  17. // Newtopic -- возвращает новый топик
  18. func NewTopic(msg *netapi.TopicMsg) (*Topic, error) {
  19. if msg == nil {
  20. return nil, fmt.Errorf("NewTopic(): name is empty")
  21. }
  22. sf := &Topic{
  23. name: alias.TopicName(msg.Topic),
  24. msg: msg,
  25. dictProxy: make(map[alias.ClientName]types.IClientProxy),
  26. }
  27. _ = types.ITopic(sf)
  28. return sf, nil
  29. }
  30. // Unsubscribe -- отписывает клиента от себя
  31. func (sf *Topic) Unsubscribe(clientName alias.ClientName) {
  32. sf.block.Lock()
  33. defer sf.block.Unlock()
  34. delete(sf.dictProxy, clientName)
  35. }
  36. // Subscribe -- подписывает клиента на себя
  37. func (sf *Topic) Subscribe(clientProxy types.IClientProxy) error {
  38. sf.block.Lock()
  39. defer sf.block.Unlock()
  40. if clientProxy == nil {
  41. return fmt.Errorf("Topic.Subscribe(): ICLientProxy==nil")
  42. }
  43. sf.dictProxy[clientProxy.ClientName()] = clientProxy
  44. return nil
  45. }
  46. // Name -- возвращает имя топика
  47. func (sf *Topic) Name() alias.TopicName {
  48. return sf.name
  49. }
  50. // Get -- возвращает содержимое топика
  51. func (sf *Topic) Get() *netapi.TopicMsg {
  52. sf.block.RLock()
  53. defer sf.block.RUnlock()
  54. return sf.msg
  55. }
  56. // Set -- устанавливает значение топика
  57. func (sf *Topic) Set(msg *netapi.TopicMsg) error {
  58. sf.block.Lock()
  59. defer sf.block.Unlock()
  60. if msg == nil {
  61. return fmt.Errorf("Topic.Set(): ERROR msg==nil\n")
  62. }
  63. if sf.name != alias.TopicName(msg.Topic) {
  64. return fmt.Errorf("Topic.Set(): topic name(%q)!=msg.Topic(%v)\n", sf.name, msg.Topic)
  65. }
  66. sf.msg = msg
  67. for _, proxy := range sf.dictProxy {
  68. if !proxy.IsWork() {
  69. delete(sf.dictProxy, proxy.ClientName())
  70. continue
  71. }
  72. proxy.Write(msg)
  73. }
  74. return nil
  75. }
  76. // Size -- возвращает размер топика
  77. func (sf *Topic) Size() int {
  78. sf.block.RLock()
  79. defer sf.block.RUnlock()
  80. // *2 потому что имя хранится два раза
  81. return len(sf.name)*2 + len(sf.msg.Msg)
  82. }