| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- // package client_proxy_buffer -- буферизованный прокси для клиента подписки
- //
- // Позволяет переживать отключения клиента
- package client_proxy_buffer
- import (
- "fmt"
- "log"
- "sync"
- "time"
- "p78git.ddns.net/svi/gobus/pkg/alias"
- "p78git.ddns.net/svi/gobus/pkg/net/netapi"
- "p78git.ddns.net/svi/gobus/pkg/types"
- )
- // ClientProxyBuffer -- буферизованный прокси для клиента подписки
- type ClientProxyBuffer struct {
- name alias.ClientName // Имя клиента
- sample alias.TopicName // Образец подписи
- isWork bool // Признак, что получатель живой
- lstMsg []*netapi.TopicMsg // Список очереди ожидания сообщений клиента
- sizeByteLimit int // Предельный размер буфера в байтах
- sizeByteCurrent int // Текущий размер буфера в байтах
- chMsg chan *netapi.TopicMsg // Канал для обратной связи из топиков
- block sync.RWMutex
- }
- // NewClientProxyBuffer -- возвращает новый буферизованный прокси клиента
- func NewClientProxyBuffer(req *netapi.BuffRequest) (*ClientProxyBuffer, error) {
- if req == nil {
- return nil, fmt.Errorf("NewClientProxyBuffer(): BuffRequest==nil")
- }
- log.Printf("NewClientProxy(): name=%q, sample=%q\n", req.ClientName, req.Sample)
- suffix := "_" + alias.ClientName(fmt.Sprint(time.Now().UnixMilli()))
- sf := &ClientProxyBuffer{
- name: alias.ClientName(req.ClientName) + suffix,
- sample: alias.TopicName(req.Sample),
- chMsg: make(chan *netapi.TopicMsg, 10),
- lstMsg: make([]*netapi.TopicMsg, 0),
- sizeByteLimit: int(req.MsgSumSizeLimit),
- isWork: true,
- }
- _ = types.IClientProxy(sf)
- return sf, nil
- }
- // ResetWork -- сбрасывает признак работы
- func (sf *ClientProxyBuffer) ResetWork() {
- sf.block.Lock()
- defer sf.block.Unlock()
- sf.isWork = false
- close(sf.chMsg)
- }
- // IsWork -- возвращает признак работы прокси
- func (sf *ClientProxyBuffer) IsWork() bool {
- sf.block.RLock()
- defer sf.block.RUnlock()
- return sf.isWork
- }
- // Read -- читает канал сообщений от топиков
- func (sf *ClientProxyBuffer) Read() *netapi.TopicMsg {
- msg := <-sf.chMsg
- return msg
- }
- // Write -- записывает в себя сообщение топика
- func (sf *ClientProxyBuffer) Write(msg *netapi.TopicMsg) {
- sf.block.Lock()
- defer sf.block.Unlock()
- if !sf.isWork {
- return
- }
- sf.chMsg <- msg
- }
- // SampleTopic -- возвращает образец шаблона подписки на топики
- func (sf *ClientProxyBuffer) SampleTopic() alias.TopicName {
- return sf.sample
- }
- // ClientName -- возвращает имя клиента
- func (sf *ClientProxyBuffer) ClientName() alias.ClientName {
- return sf.name
- }
|