|
@@ -19,29 +19,37 @@ type ClientProxyBuffer struct {
|
|
|
name alias.ClientName // Имя клиента
|
|
name alias.ClientName // Имя клиента
|
|
|
sample alias.TopicName // Образец подписи
|
|
sample alias.TopicName // Образец подписи
|
|
|
isWork bool // Признак, что получатель живой
|
|
isWork bool // Признак, что получатель живой
|
|
|
|
|
+ isLost bool // Признак, что потеряны данные
|
|
|
lstMsg []*netapi.TopicMsg // Список очереди ожидания сообщений клиента
|
|
lstMsg []*netapi.TopicMsg // Список очереди ожидания сообщений клиента
|
|
|
sizeByteLimit int // Предельный размер буфера в байтах
|
|
sizeByteLimit int // Предельный размер буфера в байтах
|
|
|
sizeByteCurrent int // Текущий размер буфера в байтах
|
|
sizeByteCurrent int // Текущий размер буфера в байтах
|
|
|
chMsg chan *netapi.TopicMsg // Канал для обратной связи из топиков
|
|
chMsg chan *netapi.TopicMsg // Канал для обратной связи из топиков
|
|
|
|
|
+ chBuff chan *netapi.TopicMsg // Канал временного буфера
|
|
|
block sync.RWMutex
|
|
block sync.RWMutex
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// NewClientProxyBuffer -- возвращает новый буферизованный прокси клиента
|
|
// NewClientProxyBuffer -- возвращает новый буферизованный прокси клиента
|
|
|
func NewClientProxyBuffer(req *netapi.BuffRequest) (*ClientProxyBuffer, error) {
|
|
func NewClientProxyBuffer(req *netapi.BuffRequest) (*ClientProxyBuffer, error) {
|
|
|
- if req == nil {
|
|
|
|
|
- return nil, fmt.Errorf("NewClientProxyBuffer(): BuffRequest==nil")
|
|
|
|
|
|
|
+ { // Предусловия
|
|
|
|
|
+ if req == nil {
|
|
|
|
|
+ return nil, fmt.Errorf("NewClientProxyBuffer(): SubscribeRequest==nil")
|
|
|
|
|
+ }
|
|
|
|
|
+ if req.ClientName == "" {
|
|
|
|
|
+ return nil, fmt.Errorf("NewClientProxyBuffer(): name is empty")
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- log.Printf("NewClientProxy(): name=%q, sample=%q\n", req.ClientName, req.Sample)
|
|
|
|
|
- suffix := "_" + alias.ClientName(fmt.Sprint(time.Now().UnixMilli()))
|
|
|
|
|
|
|
+ log.Printf("NewClientProxyBuffer(): name=%q, sample=%q\n", req.ClientName, req.Sample)
|
|
|
sf := &ClientProxyBuffer{
|
|
sf := &ClientProxyBuffer{
|
|
|
- name: alias.ClientName(req.ClientName) + suffix,
|
|
|
|
|
|
|
+ name: alias.ClientName(req.ClientName),
|
|
|
sample: alias.TopicName(req.Sample),
|
|
sample: alias.TopicName(req.Sample),
|
|
|
chMsg: make(chan *netapi.TopicMsg, 10),
|
|
chMsg: make(chan *netapi.TopicMsg, 10),
|
|
|
|
|
+ chBuff: make(chan *netapi.TopicMsg, 10),
|
|
|
lstMsg: make([]*netapi.TopicMsg, 0),
|
|
lstMsg: make([]*netapi.TopicMsg, 0),
|
|
|
sizeByteLimit: int(req.MsgSumSizeLimit),
|
|
sizeByteLimit: int(req.MsgSumSizeLimit),
|
|
|
isWork: true,
|
|
isWork: true,
|
|
|
}
|
|
}
|
|
|
- _ = types.IClientProxy(sf)
|
|
|
|
|
|
|
+ go sf.workBuff()
|
|
|
|
|
+ _ = types.IClientProxyBuffer(sf)
|
|
|
return sf, nil
|
|
return sf, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -73,7 +81,17 @@ func (sf *ClientProxyBuffer) Write(msg *netapi.TopicMsg) {
|
|
|
if !sf.isWork {
|
|
if !sf.isWork {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
- sf.chMsg <- msg
|
|
|
|
|
|
|
+ if msg == nil {
|
|
|
|
|
+ log.Printf("ClientProxy.Write(): clientName=%q, msg==nil\n", sf.name)
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ // Проверка буфера
|
|
|
|
|
+ if len(sf.chMsg) < 10 {
|
|
|
|
|
+ sf.chMsg <- msg
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ // Буфер заполнен. Добавить в очередь
|
|
|
|
|
+ sf.chBuff <- msg
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// SampleTopic -- возвращает образец шаблона подписки на топики
|
|
// SampleTopic -- возвращает образец шаблона подписки на топики
|
|
@@ -85,3 +103,44 @@ func (sf *ClientProxyBuffer) SampleTopic() alias.TopicName {
|
|
|
func (sf *ClientProxyBuffer) ClientName() alias.ClientName {
|
|
func (sf *ClientProxyBuffer) ClientName() alias.ClientName {
|
|
|
return sf.name
|
|
return sf.name
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+// Работает в отдельном потоке, добавляет в буфер собщения ,если канал отдачи клиенту забит.
|
|
|
|
|
+func (sf *ClientProxyBuffer) workBuff() {
|
|
|
|
|
+ fnSend := func() string { // Проверка с блокировкой доступности канала отправки
|
|
|
|
|
+ sf.block.Lock()
|
|
|
|
|
+ defer sf.block.Unlock()
|
|
|
|
|
+ if len(sf.chMsg) >= 10 {
|
|
|
|
|
+ return "full"
|
|
|
|
|
+ }
|
|
|
|
|
+ if len(sf.lstMsg) == 0 {
|
|
|
|
|
+ sf.sizeByteCurrent = 0
|
|
|
|
|
+ return "empty"
|
|
|
|
|
+ }
|
|
|
|
|
+ msg := sf.lstMsg[0]
|
|
|
|
|
+ sf.sizeByteCurrent -= len(msg.BinMsg) + len(msg.Topic)
|
|
|
|
|
+ sf.chMsg <- msg
|
|
|
|
|
+ sf.lstMsg = sf.lstMsg[:1]
|
|
|
|
|
+ return "send"
|
|
|
|
|
+ }
|
|
|
|
|
+ for sf.IsWork() { // Цикл добавления сообщений в буфер и вычитки из него
|
|
|
|
|
+ select {
|
|
|
|
|
+ case msg := <-sf.chBuff: // Сообщение переполнения
|
|
|
|
|
+ if sf.sizeByteCurrent >= sf.sizeByteLimit {
|
|
|
|
|
+ sf.isLost = true
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ sf.sizeByteCurrent += len(msg.BinMsg) + len(msg.Topic)
|
|
|
|
|
+ sf.lstMsg = append(sf.lstMsg, msg)
|
|
|
|
|
+ continue
|
|
|
|
|
+ default:
|
|
|
|
|
+ switch res := fnSend(); res {
|
|
|
|
|
+ case "full": // чередь чтения забита, немного подождать
|
|
|
|
|
+ time.Sleep(time.Millisecond * 1000)
|
|
|
|
|
+ case "empty": // Очередь пустая долго спим
|
|
|
|
|
+ time.Sleep(time.Millisecond * 5000)
|
|
|
|
|
+ case "send": // Что-то отправили, сразу проверить повторную отправку
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|