topic.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. // package topic -- потокобезопасный топик шины данных
  2. package topic
  3. import (
  4. "fmt"
  5. "sync"
  6. "p78git.ddns.net/svi/gobus/api/netapi"
  7. "p78git.ddns.net/svi/gobus/pkg/alias"
  8. "p78git.ddns.net/svi/gobus/pkg/types"
  9. )
  10. // Topic -- потокобезопасный топик шины данных
  11. type Topic struct {
  12. name alias.TopicName
  13. dictProxy map[alias.ClientName]types.IClientProxy // Словарь подписок клиентов на топик
  14. srcNum int32 // Номер источника
  15. val []byte // Содержимое топика
  16. block sync.RWMutex
  17. }
  18. // Newtopic -- возвращает новый топик
  19. func NewTopic(name alias.TopicName) (*Topic, error) {
  20. if name == "" {
  21. return nil, fmt.Errorf("NewTopic(): name is empty")
  22. }
  23. sf := &Topic{
  24. name: name,
  25. val: []byte{},
  26. dictProxy: make(map[alias.ClientName]types.IClientProxy),
  27. }
  28. _ = types.ITopic(sf)
  29. return sf, nil
  30. }
  31. // Unsubscribe -- отписывает клиента от себя
  32. func (sf *Topic) Unsubscribe(clientName alias.ClientName) {
  33. sf.block.Lock()
  34. defer sf.block.Unlock()
  35. delete(sf.dictProxy, clientName)
  36. }
  37. // Subscribe -- попдисывает клиента на себя
  38. func (sf *Topic) Subscribe(clientProxy types.IClientProxy) {
  39. sf.block.Lock()
  40. defer sf.block.Unlock()
  41. sf.dictProxy[clientProxy.ClientName()] = clientProxy
  42. }
  43. // Name -- возвращает имя топика
  44. func (sf *Topic) Name() alias.TopicName {
  45. return sf.name
  46. }
  47. // Get -- возвращает содержимое топика
  48. func (sf *Topic) Get() *netapi.TopicMsg {
  49. sf.block.RLock()
  50. defer sf.block.RUnlock()
  51. msg := &netapi.TopicMsg{
  52. Source: sf.srcNum,
  53. Topic: string(sf.name),
  54. Msg: sf.val,
  55. }
  56. return msg
  57. }
  58. // Set -- устанавливает значение топика
  59. func (sf *Topic) Set(msg *netapi.TopicMsg) {
  60. sf.block.Lock()
  61. defer sf.block.Unlock()
  62. sf.srcNum = msg.Source
  63. sf.val = msg.Msg
  64. for _, proxy := range sf.dictProxy {
  65. if !proxy.IsWork() {
  66. delete(sf.dictProxy, proxy.ClientName())
  67. continue
  68. }
  69. proxy.Write(msg)
  70. }
  71. }
  72. // Size -- возвращает размер топика
  73. func (sf *Topic) Size() int {
  74. sf.block.RLock()
  75. defer sf.block.RUnlock()
  76. return len(sf.name) + len(sf.val)
  77. }