client_proxy_buffer.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. // package client_proxy_buffer -- буферизованный прокси для клиента подписки
  2. //
  3. // Позволяет переживать отключения клиента
  4. package client_proxy_buffer
  5. import (
  6. "fmt"
  7. "log"
  8. "sync"
  9. "time"
  10. "p78git.ddns.net/svi/gobus/pkg/alias"
  11. "p78git.ddns.net/svi/gobus/pkg/net/netapi"
  12. "p78git.ddns.net/svi/gobus/pkg/types"
  13. )
  14. // ClientProxyBuffer -- буферизованный прокси для клиента подписки
  15. type ClientProxyBuffer struct {
  16. name alias.ClientName // Имя клиента
  17. sample alias.TopicName // Образец подписи
  18. isWork bool // Признак, что получатель живой
  19. lstMsg []*netapi.TopicMsg // Список очереди ожидания сообщений клиента
  20. sizeByteLimit int // Предельный размер буфера в байтах
  21. sizeByteCurrent int // Текущий размер буфера в байтах
  22. chMsg chan *netapi.TopicMsg // Канал для обратной связи из топиков
  23. block sync.RWMutex
  24. }
  25. // NewClientProxyBuffer -- возвращает новый буферизованный прокси клиента
  26. func NewClientProxyBuffer(req *netapi.BuffRequest) (*ClientProxyBuffer, error) {
  27. if req == nil {
  28. return nil, fmt.Errorf("NewClientProxyBuffer(): BuffRequest==nil")
  29. }
  30. log.Printf("NewClientProxy(): name=%q, sample=%q\n", req.ClientName, req.Sample)
  31. suffix := "_" + alias.ClientName(fmt.Sprint(time.Now().UnixMilli()))
  32. sf := &ClientProxyBuffer{
  33. name: alias.ClientName(req.ClientName) + suffix,
  34. sample: alias.TopicName(req.Sample),
  35. chMsg: make(chan *netapi.TopicMsg, 10),
  36. lstMsg: make([]*netapi.TopicMsg, 0),
  37. sizeByteLimit: int(req.MsgSumSizeLimit),
  38. isWork: true,
  39. }
  40. _ = types.IClientProxy(sf)
  41. return sf, nil
  42. }
  43. // ResetWork -- сбрасывает признак работы
  44. func (sf *ClientProxyBuffer) ResetWork() {
  45. sf.block.Lock()
  46. defer sf.block.Unlock()
  47. sf.isWork = false
  48. close(sf.chMsg)
  49. }
  50. // IsWork -- возвращает признак работы прокси
  51. func (sf *ClientProxyBuffer) IsWork() bool {
  52. sf.block.RLock()
  53. defer sf.block.RUnlock()
  54. return sf.isWork
  55. }
  56. // Read -- читает канал сообщений от топиков
  57. func (sf *ClientProxyBuffer) Read() *netapi.TopicMsg {
  58. msg := <-sf.chMsg
  59. return msg
  60. }
  61. // Write -- записывает в себя сообщение топика
  62. func (sf *ClientProxyBuffer) Write(msg *netapi.TopicMsg) {
  63. sf.block.Lock()
  64. defer sf.block.Unlock()
  65. if !sf.isWork {
  66. return
  67. }
  68. sf.chMsg <- msg
  69. }
  70. // SampleTopic -- возвращает образец шаблона подписки на топики
  71. func (sf *ClientProxyBuffer) SampleTopic() alias.TopicName {
  72. return sf.sample
  73. }
  74. // ClientName -- возвращает имя клиента
  75. func (sf *ClientProxyBuffer) ClientName() alias.ClientName {
  76. return sf.name
  77. }