client_proxy_buffer.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. // package client_proxy_buffer -- буферизованный прокси для клиента подписки
  2. //
  3. // Позволяет переживать отключения клиента
  4. package client_proxy_buffer
  5. import (
  6. "fmt"
  7. "log"
  8. "sync"
  9. "time"
  10. "p78git.ddns.net/svi/gobus/api/netapi"
  11. "p78git.ddns.net/svi/gobus/pkg/alias"
  12. "p78git.ddns.net/svi/gobus/pkg/types"
  13. )
  14. // ClientProxyBuffer -- буферизованный прокси для клиента подписки
  15. type ClientProxyBuffer struct {
  16. name alias.ClientName // Имя клиента
  17. sample alias.TopicName // Образец подписи
  18. isWork bool // Признак, что получатель живой
  19. isLost bool // Признак, что потеряны данные
  20. lstMsg []*netapi.TopicMsg // Список очереди ожидания сообщений клиента
  21. sizeByteLimit int // Предельный размер буфера в байтах
  22. sizeByteCurrent int // Текущий размер буфера в байтах
  23. chMsg chan *netapi.TopicMsg // Канал для обратной связи из топиков
  24. chBuff chan *netapi.TopicMsg // Канал временного буфера
  25. block sync.RWMutex
  26. }
  27. // NewClientProxyBuffer -- возвращает новый буферизованный прокси клиента
  28. func NewClientProxyBuffer(req *netapi.BuffRequest) (*ClientProxyBuffer, error) {
  29. { // Предусловия
  30. if req == nil {
  31. return nil, fmt.Errorf("NewClientProxyBuffer(): SubscribeRequest==nil")
  32. }
  33. if req.ClientName == "" {
  34. return nil, fmt.Errorf("NewClientProxyBuffer(): name is empty")
  35. }
  36. }
  37. log.Printf("NewClientProxyBuffer(): name=%q, sample=%q\n", req.ClientName, req.Sample)
  38. sf := &ClientProxyBuffer{
  39. name: alias.ClientName(req.ClientName),
  40. sample: alias.TopicName(req.Sample),
  41. chMsg: make(chan *netapi.TopicMsg, 10),
  42. chBuff: make(chan *netapi.TopicMsg, 10),
  43. lstMsg: make([]*netapi.TopicMsg, 0),
  44. sizeByteLimit: int(req.MsgSumSizeLimit),
  45. isWork: true,
  46. }
  47. go sf.workBuff()
  48. _ = types.IClientProxyBuffer(sf)
  49. return sf, nil
  50. }
  51. // ResetWork -- сбрасывает признак работы
  52. func (sf *ClientProxyBuffer) ResetWork() {
  53. sf.block.Lock()
  54. defer sf.block.Unlock()
  55. sf.isWork = false
  56. close(sf.chMsg)
  57. }
  58. // IsWork -- возвращает признак работы прокси
  59. func (sf *ClientProxyBuffer) IsWork() bool {
  60. sf.block.RLock()
  61. defer sf.block.RUnlock()
  62. return sf.isWork
  63. }
  64. // Read -- читает канал сообщений от топиков
  65. func (sf *ClientProxyBuffer) Read() *netapi.TopicMsg {
  66. msg := <-sf.chMsg
  67. return msg
  68. }
  69. // Write -- записывает в себя сообщение топика
  70. func (sf *ClientProxyBuffer) Write(msg *netapi.TopicMsg) {
  71. sf.block.Lock()
  72. defer sf.block.Unlock()
  73. if !sf.isWork {
  74. return
  75. }
  76. if msg == nil {
  77. log.Printf("ClientProxy.Write(): clientName=%q, msg==nil\n", sf.name)
  78. return
  79. }
  80. // Проверка буфера
  81. if len(sf.chMsg) < 10 {
  82. sf.chMsg <- msg
  83. return
  84. }
  85. // Буфер заполнен. Добавить в очередь
  86. sf.chBuff <- msg
  87. }
  88. // SampleTopic -- возвращает образец шаблона подписки на топики
  89. func (sf *ClientProxyBuffer) SampleTopic() alias.TopicName {
  90. return sf.sample
  91. }
  92. // ClientName -- возвращает имя клиента
  93. func (sf *ClientProxyBuffer) ClientName() alias.ClientName {
  94. return sf.name
  95. }
  96. // Работает в отдельном потоке, добавляет в буфер собщения ,если канал отдачи клиенту забит.
  97. func (sf *ClientProxyBuffer) workBuff() {
  98. fnSend := func() string { // Проверка с блокировкой доступности канала отправки
  99. sf.block.Lock()
  100. defer sf.block.Unlock()
  101. if len(sf.chMsg) >= 10 {
  102. return "full"
  103. }
  104. if len(sf.lstMsg) == 0 {
  105. sf.sizeByteCurrent = 0
  106. return "empty"
  107. }
  108. msg := sf.lstMsg[0]
  109. sf.sizeByteCurrent -= len(msg.BinMsg) + len(msg.Topic)
  110. sf.chMsg <- msg
  111. sf.lstMsg = sf.lstMsg[:1]
  112. return "send"
  113. }
  114. for sf.IsWork() { // Цикл добавления сообщений в буфер и вычитки из него
  115. select {
  116. case msg := <-sf.chBuff: // Сообщение переполнения
  117. if sf.sizeByteCurrent >= sf.sizeByteLimit {
  118. sf.isLost = true
  119. continue
  120. }
  121. sf.sizeByteCurrent += len(msg.BinMsg) + len(msg.Topic)
  122. sf.lstMsg = append(sf.lstMsg, msg)
  123. continue
  124. default:
  125. switch res := fnSend(); res {
  126. case "full": // чередь чтения забита, немного подождать
  127. time.Sleep(time.Millisecond * 1000)
  128. case "empty": // Очередь пустая долго спим
  129. time.Sleep(time.Millisecond * 5000)
  130. case "send": // Что-то отправили, сразу проверить повторную отправку
  131. continue
  132. }
  133. }
  134. }
  135. }