// package client_proxy_buffer -- буферизованный прокси для клиента подписки // // Позволяет переживать отключения клиента package client_proxy_buffer import ( "fmt" "log" "sync" "time" "p78git.ddns.net/svi/gobus/pkg/alias" "p78git.ddns.net/svi/gobus/pkg/net/netapi" "p78git.ddns.net/svi/gobus/pkg/types" ) // ClientProxyBuffer -- буферизованный прокси для клиента подписки type ClientProxyBuffer struct { name alias.ClientName // Имя клиента sample alias.TopicName // Образец подписи isWork bool // Признак, что получатель живой lstMsg []*netapi.TopicMsg // Список очереди ожидания сообщений клиента sizeByteLimit int // Предельный размер буфера в байтах sizeByteCurrent int // Текущий размер буфера в байтах chMsg chan *netapi.TopicMsg // Канал для обратной связи из топиков block sync.RWMutex } // NewClientProxyBuffer -- возвращает новый буферизованный прокси клиента func NewClientProxyBuffer(req *netapi.BuffRequest) (*ClientProxyBuffer, error) { if req == nil { return nil, fmt.Errorf("NewClientProxyBuffer(): BuffRequest==nil") } log.Printf("NewClientProxy(): name=%q, sample=%q\n", req.ClientName, req.Sample) suffix := "_" + alias.ClientName(fmt.Sprint(time.Now().UnixMilli())) sf := &ClientProxyBuffer{ name: alias.ClientName(req.ClientName) + suffix, sample: alias.TopicName(req.Sample), chMsg: make(chan *netapi.TopicMsg, 10), lstMsg: make([]*netapi.TopicMsg, 0), sizeByteLimit: int(req.MsgSumSizeLimit), isWork: true, } _ = types.IClientProxy(sf) return sf, nil } // ResetWork -- сбрасывает признак работы func (sf *ClientProxyBuffer) ResetWork() { sf.block.Lock() defer sf.block.Unlock() sf.isWork = false close(sf.chMsg) } // IsWork -- возвращает признак работы прокси func (sf *ClientProxyBuffer) IsWork() bool { sf.block.RLock() defer sf.block.RUnlock() return sf.isWork } // Read -- читает канал сообщений от топиков func (sf *ClientProxyBuffer) Read() *netapi.TopicMsg { msg := <-sf.chMsg return msg } // Write -- записывает в себя сообщение топика func (sf *ClientProxyBuffer) Write(msg *netapi.TopicMsg) { sf.block.Lock() defer sf.block.Unlock() if !sf.isWork { return } sf.chMsg <- msg } // SampleTopic -- возвращает образец шаблона подписки на топики func (sf *ClientProxyBuffer) SampleTopic() alias.TopicName { return sf.sample } // ClientName -- возвращает имя клиента func (sf *ClientProxyBuffer) ClientName() alias.ClientName { return sf.name }