// package client_proxy_buffer -- буферизованный прокси для клиента подписки // // Позволяет переживать отключения клиента package client_proxy_buffer import ( "fmt" "log" "sync" "time" "p78git.ddns.net/svi/gobus/api/netapi" "p78git.ddns.net/svi/gobus/pkg/alias" "p78git.ddns.net/svi/gobus/pkg/types" ) // ClientProxyBuffer -- буферизованный прокси для клиента подписки type ClientProxyBuffer struct { name alias.ClientName // Имя клиента sample alias.TopicName // Образец подписи isWork bool // Признак, что получатель живой isLost bool // Признак, что потеряны данные lstMsg []*netapi.TopicMsg // Список очереди ожидания сообщений клиента sizeByteLimit int // Предельный размер буфера в байтах sizeByteCurrent int // Текущий размер буфера в байтах chMsg chan *netapi.TopicMsg // Канал для обратной связи из топиков chBuff chan *netapi.TopicMsg // Канал временного буфера block sync.RWMutex } // NewClientProxyBuffer -- возвращает новый буферизованный прокси клиента func NewClientProxyBuffer(req *netapi.BuffRequest) (*ClientProxyBuffer, error) { { // Предусловия if req == nil { return nil, fmt.Errorf("NewClientProxyBuffer(): SubscribeRequest==nil") } if req.ClientName == "" { return nil, fmt.Errorf("NewClientProxyBuffer(): name is empty") } } log.Printf("NewClientProxyBuffer(): name=%q, sample=%q\n", req.ClientName, req.Sample) sf := &ClientProxyBuffer{ name: alias.ClientName(req.ClientName), sample: alias.TopicName(req.Sample), chMsg: make(chan *netapi.TopicMsg, 10), chBuff: make(chan *netapi.TopicMsg, 10), lstMsg: make([]*netapi.TopicMsg, 0), sizeByteLimit: int(req.MsgSumSizeLimit), isWork: true, } go sf.workBuff() _ = types.IClientProxyBuffer(sf) return sf, nil } // ResetWork -- сбрасывает признак работы func (sf *ClientProxyBuffer) ResetWork() { sf.block.Lock() defer sf.block.Unlock() sf.isWork = false close(sf.chMsg) } // IsWork -- возвращает признак работы прокси func (sf *ClientProxyBuffer) IsWork() bool { sf.block.RLock() defer sf.block.RUnlock() return sf.isWork } // Read -- читает канал сообщений от топиков func (sf *ClientProxyBuffer) Read() *netapi.TopicMsg { msg := <-sf.chMsg return msg } // Write -- записывает в себя сообщение топика func (sf *ClientProxyBuffer) Write(msg *netapi.TopicMsg) { sf.block.Lock() defer sf.block.Unlock() if !sf.isWork { return } if msg == nil { log.Printf("ClientProxy.Write(): clientName=%q, msg==nil\n", sf.name) return } // Проверка буфера if len(sf.chMsg) < 10 { sf.chMsg <- msg return } // Буфер заполнен. Добавить в очередь sf.chBuff <- msg } // SampleTopic -- возвращает образец шаблона подписки на топики func (sf *ClientProxyBuffer) SampleTopic() alias.TopicName { return sf.sample } // ClientName -- возвращает имя клиента func (sf *ClientProxyBuffer) ClientName() alias.ClientName { return sf.name } // Работает в отдельном потоке, добавляет в буфер собщения ,если канал отдачи клиенту забит. func (sf *ClientProxyBuffer) workBuff() { fnSend := func() string { // Проверка с блокировкой доступности канала отправки sf.block.Lock() defer sf.block.Unlock() if len(sf.chMsg) >= 10 { return "full" } if len(sf.lstMsg) == 0 { sf.sizeByteCurrent = 0 return "empty" } msg := sf.lstMsg[0] sf.sizeByteCurrent -= len(msg.BinMsg) + len(msg.Topic) sf.chMsg <- msg sf.lstMsg = sf.lstMsg[:1] return "send" } for sf.IsWork() { // Цикл добавления сообщений в буфер и вычитки из него select { case msg := <-sf.chBuff: // Сообщение переполнения if sf.sizeByteCurrent >= sf.sizeByteLimit { sf.isLost = true continue } sf.sizeByteCurrent += len(msg.BinMsg) + len(msg.Topic) sf.lstMsg = append(sf.lstMsg, msg) continue default: switch res := fnSend(); res { case "full": // чередь чтения забита, немного подождать time.Sleep(time.Millisecond * 1000) case "empty": // Очередь пустая долго спим time.Sleep(time.Millisecond * 5000) case "send": // Что-то отправили, сразу проверить повторную отправку continue } } } }