| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- // package client_proxy_buffer -- буферизованный прокси для клиента подписки
- //
- // Позволяет переживать отключения клиента
- package client_proxy_buffer
- import (
- "fmt"
- "log"
- "sync"
- "time"
- "p78git.ddns.net/svi/gobus/api/netapi"
- "p78git.ddns.net/svi/gobus/pkg/alias"
- "p78git.ddns.net/svi/gobus/pkg/types"
- )
- // ClientProxyBuffer -- буферизованный прокси для клиента подписки
- type ClientProxyBuffer struct {
- name alias.ClientName // Имя клиента
- sample alias.TopicName // Образец подписи
- isWork bool // Признак, что получатель живой
- isLost bool // Признак, что потеряны данные
- lstMsg []*netapi.TopicMsg // Список очереди ожидания сообщений клиента
- sizeByteLimit int // Предельный размер буфера в байтах
- sizeByteCurrent int // Текущий размер буфера в байтах
- chMsg chan *netapi.TopicMsg // Канал для обратной связи из топиков
- chBuff chan *netapi.TopicMsg // Канал временного буфера
- block sync.RWMutex
- }
- // NewClientProxyBuffer -- возвращает новый буферизованный прокси клиента
- func NewClientProxyBuffer(req *netapi.BuffRequest) (*ClientProxyBuffer, error) {
- { // Предусловия
- if req == nil {
- return nil, fmt.Errorf("NewClientProxyBuffer(): SubscribeRequest==nil")
- }
- if req.ClientName == "" {
- return nil, fmt.Errorf("NewClientProxyBuffer(): name is empty")
- }
- }
- log.Printf("NewClientProxyBuffer(): name=%q, sample=%q\n", req.ClientName, req.Sample)
- sf := &ClientProxyBuffer{
- name: alias.ClientName(req.ClientName),
- sample: alias.TopicName(req.Sample),
- chMsg: make(chan *netapi.TopicMsg, 10),
- chBuff: make(chan *netapi.TopicMsg, 10),
- lstMsg: make([]*netapi.TopicMsg, 0),
- sizeByteLimit: int(req.MsgSumSizeLimit),
- isWork: true,
- }
- go sf.workBuff()
- _ = types.IClientProxyBuffer(sf)
- return sf, nil
- }
- // ResetWork -- сбрасывает признак работы
- func (sf *ClientProxyBuffer) ResetWork() {
- sf.block.Lock()
- defer sf.block.Unlock()
- sf.isWork = false
- close(sf.chMsg)
- }
- // IsWork -- возвращает признак работы прокси
- func (sf *ClientProxyBuffer) IsWork() bool {
- sf.block.RLock()
- defer sf.block.RUnlock()
- return sf.isWork
- }
- // Read -- читает канал сообщений от топиков
- func (sf *ClientProxyBuffer) Read() *netapi.TopicMsg {
- msg := <-sf.chMsg
- return msg
- }
- // Write -- записывает в себя сообщение топика
- func (sf *ClientProxyBuffer) Write(msg *netapi.TopicMsg) {
- sf.block.Lock()
- defer sf.block.Unlock()
- if !sf.isWork {
- return
- }
- if msg == nil {
- log.Printf("ClientProxy.Write(): clientName=%q, msg==nil\n", sf.name)
- return
- }
- // Проверка буфера
- if len(sf.chMsg) < 10 {
- sf.chMsg <- msg
- return
- }
- // Буфер заполнен. Добавить в очередь
- sf.chBuff <- msg
- }
- // SampleTopic -- возвращает образец шаблона подписки на топики
- func (sf *ClientProxyBuffer) SampleTopic() alias.TopicName {
- return sf.sample
- }
- // ClientName -- возвращает имя клиента
- func (sf *ClientProxyBuffer) ClientName() alias.ClientName {
- return sf.name
- }
- // Работает в отдельном потоке, добавляет в буфер собщения ,если канал отдачи клиенту забит.
- func (sf *ClientProxyBuffer) workBuff() {
- fnSend := func() string { // Проверка с блокировкой доступности канала отправки
- sf.block.Lock()
- defer sf.block.Unlock()
- if len(sf.chMsg) >= 10 {
- return "full"
- }
- if len(sf.lstMsg) == 0 {
- sf.sizeByteCurrent = 0
- return "empty"
- }
- msg := sf.lstMsg[0]
- sf.sizeByteCurrent -= len(msg.BinMsg) + len(msg.Topic)
- sf.chMsg <- msg
- sf.lstMsg = sf.lstMsg[:1]
- return "send"
- }
- for sf.IsWork() { // Цикл добавления сообщений в буфер и вычитки из него
- select {
- case msg := <-sf.chBuff: // Сообщение переполнения
- if sf.sizeByteCurrent >= sf.sizeByteLimit {
- sf.isLost = true
- continue
- }
- sf.sizeByteCurrent += len(msg.BinMsg) + len(msg.Topic)
- sf.lstMsg = append(sf.lstMsg, msg)
- continue
- default:
- switch res := fnSend(); res {
- case "full": // чередь чтения забита, немного подождать
- time.Sleep(time.Millisecond * 1000)
- case "empty": // Очередь пустая долго спим
- time.Sleep(time.Millisecond * 5000)
- case "send": // Что-то отправили, сразу проверить повторную отправку
- continue
- }
- }
- }
- }
|