ソースを参照

SVI Добавление кода буферизованным клиентам

SVI 2 年 前
コミット
8d9b8de983

+ 51 - 0
internal/dict_client_proxy_buffer/dict_client_proxy_buffer.go

@@ -0,0 +1,51 @@
+// package dict_client_proxy_buffer -- словарь буферизованных клиентов
+//
+//	Способен переживать отключения клиентов и накапливать информацию по необходимости
+package dict_client_proxy_buffer
+
+import (
+	"fmt"
+	"sync"
+
+	"p78git.ddns.net/svi/gobus/pkg/alias"
+	"p78git.ddns.net/svi/gobus/pkg/net/client_proxy_buffer"
+	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
+	"p78git.ddns.net/svi/gobus/pkg/types"
+)
+
+// DictClientProxyBuffer -- словарь буферизованных клиентов
+//
+//	Способен переживать отключения клиентов и накапливать информацию по необходимости
+type DictClientProxyBuffer struct {
+	serv      types.IService
+	dictTopic types.IDictTopic
+	dict      map[alias.ClientName]types.IClientProxyBuffer
+	block     sync.RWMutex
+}
+
+// NewDictClientProxyBuffer -- возвращает новый словарь буферизованных клиентов
+func NewDictClientProxyBuffer(serv types.IService) (*DictClientProxyBuffer, error) {
+	if serv == nil {
+		return nil, fmt.Errorf("NewDictClientProxyBuffer(): IService==nil")
+	}
+	sf := &DictClientProxyBuffer{
+		serv:      serv,
+		dictTopic: serv.DictTopic(),
+		dict:      make(map[alias.ClientName]types.IClientProxyBuffer),
+	}
+	return sf, nil
+}
+
+// Add -- добавляет нового буферизованного клиента в словарь
+func (sf *DictClientProxyBuffer) Add(req *netapi.BuffRequest) (types.IClientProxyBuffer, error) {
+	sf.block.Lock()
+	defer sf.block.Unlock()
+	clientProxyBuff, err := client_proxy_buffer.NewClientProxyBuffer(req)
+	if err != nil {
+		return nil, fmt.Errorf("DictClientProxyBuffer.Add(): in create ClientProxyBuffer(): err=\n\t%w", err)
+	}
+	sf.dict[clientProxyBuff.ClientName()] = clientProxyBuff
+	// Отправить в словарь топиков на подписку
+	sf.dictTopic.Subscribe(clientProxyBuff)
+	return clientProxyBuff, nil
+}

+ 13 - 2
internal/dict_topic/dict_topic.go

@@ -55,6 +55,12 @@ func (sf *DictTopic) Subscribe(clientProxy types.IClientProxy) {
 
 // Подписывает по образцу топика клиента на события (внутри блокировки)
 func (sf *DictTopic) subscribe(clientProxy types.IClientProxy) {
+	{ // Сначала проверить наличие такого клиента и отписать его
+		_, isOk := sf.dictProxy[clientProxy.ClientName()]
+		if isOk {
+			sf.unsubscribe(clientProxy)
+		}
+	}
 	sample := clientProxy.SampleTopic()
 	for name, topic := range sf.dict {
 		if !strings.Contains(string(name), string(sample)) {
@@ -75,9 +81,14 @@ func (sf *DictTopic) Add(name alias.TopicName) {
 		return
 	}
 	sf.dict[name] = topic
+	{ // Теперь пройтись по все прокси-клиентам и подписать их на новый топик
+		for _, clientProxy := range sf.dictProxy {
+			topic.Subscribe(clientProxy)
+		}
+	}
 }
 
-// Get -- возвращае ттопик по имени
+// Get -- возвращает топик по имени
 func (sf *DictTopic) Get(name alias.TopicName) types.ITopic {
 	sf.block.RLock()
 	defer sf.block.RUnlock()
@@ -92,7 +103,7 @@ func (sf *DictTopic) Len() int {
 	return len(sf.dict)
 }
 
-// Size -- возвращает размер всех тпоиков
+// Size -- возвращает размер всех топиков
 func (sf *DictTopic) Size() int {
 	sf.block.RLock()
 	defer sf.block.RUnlock()

+ 4 - 3
internal/serv_grpc/serv_subscribe_buffer/serv_subscribe_buffer.go

@@ -5,8 +5,8 @@ import (
 	"fmt"
 
 	"p78git.ddns.net/svi/gobus/pkg/alias"
+	"p78git.ddns.net/svi/gobus/pkg/net/client_proxy_buffer"
 	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
-	"p78git.ddns.net/svi/gobus/pkg/net/cleint_proxy"
 	"p78git.ddns.net/svi/gobus/pkg/types"
 )
 
@@ -14,6 +14,7 @@ import (
 type ServSubscribeBuffer struct {
 	serv      types.IService
 	dictTopic types.IDictTopic
+	dictProxyBuffer types.IDictClientProxyBuffer
 }
 
 // NewServSubscribeBuffer -- возвращает новый сервер буферизованной подписки
@@ -30,11 +31,11 @@ func NewServSubscribeBuffer(serv types.IService) (*ServSubscribeBuffer, error) {
 
 // SubscribeBuffer -- обслуживает буферизованные попдиски
 func (sf *ServSubscribeBuffer) SubscribeBuffer(req *netapi.BuffRequest, serv netapi.GoBus_SubscribeBufferServer) error {
-	clientProxy, err := cleint_proxy.NewClientProxy(alias.ClientName(req.ClientName), alias.TopicName(req.Sample))
+	clientProxy, err := client_proxy_buffer.NewClientProxyBuffer(req)
 	if err != nil {
 		return fmt.Errorf("ServSubscribe.Subscribe(): in create ClientProxy, err=\n\t%w", err)
 	}
 	sf.dictTopic.Subscribe(clientProxy)
-	
+
 	return nil
 }

+ 2 - 0
pkg/net/gobus.proto

@@ -71,6 +71,8 @@ message PublicRequest {
     int32 Source = 1; // Источник данных (0 -- клиент, другое -- для реплики)
     string Topic = 2; // Куда опубликовать сообщение
     bytes Msg    = 3; // Байтовое представление сообщения
+    string Uuid  = 4; // Уникальная метка сообщения
+    repeated int32 RepliesList = 5; // Список реплик, на которых сообщение уже опубликовано
 }
 
 // DefaultResponse -- возвращаемое значение по умолчанию (ничего не содержит)

+ 52 - 32
pkg/net/netapi/gobus.pb.go

@@ -1,6 +1,6 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
-// 	protoc-gen-go v1.30.0
+// 	protoc-gen-go v1.27.1
 // 	protoc        v3.19.4
 // source: pkg/net/gobus.proto
 
@@ -438,9 +438,11 @@ type PublicRequest struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Source int32  `protobuf:"varint,1,opt,name=Source,proto3" json:"Source,omitempty"` // Источник данных (0 -- клиент, другое -- для реплики)
-	Topic  string `protobuf:"bytes,2,opt,name=Topic,proto3" json:"Topic,omitempty"`    // Куда опубликовать сообщение
-	Msg    []byte `protobuf:"bytes,3,opt,name=Msg,proto3" json:"Msg,omitempty"`        // Байтовое представление сообщения
+	Source      int32   `protobuf:"varint,1,opt,name=Source,proto3" json:"Source,omitempty"`                  // Источник данных (0 -- клиент, другое -- для реплики)
+	Topic       string  `protobuf:"bytes,2,opt,name=Topic,proto3" json:"Topic,omitempty"`                     // Куда опубликовать сообщение
+	Msg         []byte  `protobuf:"bytes,3,opt,name=Msg,proto3" json:"Msg,omitempty"`                         // Байтовое представление сообщения
+	Time        string  `protobuf:"bytes,4,opt,name=Time,proto3" json:"Time,omitempty"`                       // Исходное время сообщения
+	RepliesList []int32 `protobuf:"varint,5,rep,packed,name=RepliesList,proto3" json:"RepliesList,omitempty"` // Список реплик, на которых сообщение уже опубликовано
 }
 
 func (x *PublicRequest) Reset() {
@@ -496,6 +498,20 @@ func (x *PublicRequest) GetMsg() []byte {
 	return nil
 }
 
+func (x *PublicRequest) GetTime() string {
+	if x != nil {
+		return x.Time
+	}
+	return ""
+}
+
+func (x *PublicRequest) GetRepliesList() []int32 {
+	if x != nil {
+		return x.RepliesList
+	}
+	return nil
+}
+
 // DefaultResponse -- возвращаемое значение по умолчанию (ничего не содержит)
 type DefaultResponse struct {
 	state         protoimpl.MessageState
@@ -572,35 +588,39 @@ var file_pkg_net_gobus_proto_rawDesc = []byte{
 	0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x4d,
 	0x73, 0x67, 0x22, 0x20, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
 	0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52,
-	0x03, 0x4d, 0x73, 0x67, 0x22, 0x4f, 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x52, 0x65,
-	0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18,
-	0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x14, 0x0a,
-	0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x54, 0x6f,
-	0x70, 0x69, 0x63, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c,
-	0x52, 0x03, 0x4d, 0x73, 0x67, 0x22, 0x11, 0x0a, 0x0f, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74,
-	0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xcc, 0x02, 0x0a, 0x05, 0x47, 0x6f, 0x42,
-	0x75, 0x73, 0x12, 0x3e, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x12, 0x17, 0x2e, 0x70,
-	0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x52, 0x65,
-	0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e,
-	0x2e, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
-	0x22, 0x00, 0x12, 0x3b, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x15,
-	0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65,
-	0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e,
-	0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
-	0x43, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1a, 0x2e, 0x70,
-	0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62,
-	0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65,
-	0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
-	0x22, 0x00, 0x30, 0x01, 0x12, 0x44, 0x0a, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62,
-	0x65, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x12, 0x15, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72,
-	0x69, 0x6e, 0x2e, 0x42, 0x75, 0x66, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16,
+	0x03, 0x4d, 0x73, 0x67, 0x22, 0x85, 0x01, 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x52,
+	0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65,
+	0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x14,
+	0x0a, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x54,
+	0x6f, 0x70, 0x69, 0x63, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28,
+	0x0c, 0x52, 0x03, 0x4d, 0x73, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x04,
+	0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x65,
+	0x70, 0x6c, 0x69, 0x65, 0x73, 0x4c, 0x69, 0x73, 0x74, 0x18, 0x05, 0x20, 0x03, 0x28, 0x05, 0x52,
+	0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x65, 0x73, 0x4c, 0x69, 0x73, 0x74, 0x22, 0x11, 0x0a, 0x0f,
+	0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32,
+	0xcc, 0x02, 0x0a, 0x05, 0x47, 0x6f, 0x42, 0x75, 0x73, 0x12, 0x3e, 0x0a, 0x06, 0x50, 0x75, 0x62,
+	0x6c, 0x69, 0x63, 0x12, 0x17, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x50,
+	0x75, 0x62, 0x6c, 0x69, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70,
+	0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x52,
+	0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x08, 0x53, 0x65, 0x6e,
+	0x64, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x15, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e,
+	0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70,
+	0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70,
+	0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x43, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
+	0x69, 0x62, 0x65, 0x12, 0x1a, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53,
+	0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
+	0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52,
+	0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x44, 0x0a, 0x0f, 0x53,
+	0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x12, 0x15,
 	0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x42, 0x75, 0x66, 0x66, 0x52, 0x65,
-	0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3b, 0x0a, 0x03, 0x47, 0x65,
-	0x74, 0x12, 0x1a, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x75, 0x62,
-	0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e,
-	0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73,
-	0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x6e, 0x65, 0x74,
-	0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e,
+	0x2e, 0x42, 0x75, 0x66, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30,
+	0x01, 0x12, 0x3b, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x1a, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65,
+	0x72, 0x69, 0x6e, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71,
+	0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e,
+	0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0a,
+	0x5a, 0x08, 0x2e, 0x2f, 0x6e, 0x65, 0x74, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
+	0x6f, 0x33,
 }
 
 var (

+ 9 - 21
pkg/net/netapi/gobus_grpc.pb.go

@@ -1,13 +1,9 @@
 // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
 // versions:
-// - protoc-gen-go-grpc v1.3.0
+// - protoc-gen-go-grpc v1.2.0
 // - protoc             v3.19.4
 // source: pkg/net/gobus.proto
 
-//
-//Файл содержит описание сервиса шины данных с необходимыми
-//структурами.
-
 package netapi
 
 import (
@@ -22,14 +18,6 @@ import (
 // Requires gRPC-Go v1.32.0 or later.
 const _ = grpc.SupportPackageIsVersion7
 
-const (
-	GoBus_Public_FullMethodName          = "/parserin.GoBus/Public"
-	GoBus_SendSync_FullMethodName        = "/parserin.GoBus/SendSync"
-	GoBus_Subscribe_FullMethodName       = "/parserin.GoBus/Subscribe"
-	GoBus_SubscribeBuffer_FullMethodName = "/parserin.GoBus/SubscribeBuffer"
-	GoBus_Get_FullMethodName             = "/parserin.GoBus/Get"
-)
-
 // GoBusClient is the client API for GoBus service.
 //
 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
@@ -56,7 +44,7 @@ func NewGoBusClient(cc grpc.ClientConnInterface) GoBusClient {
 
 func (c *goBusClient) Public(ctx context.Context, in *PublicRequest, opts ...grpc.CallOption) (*DefaultResponse, error) {
 	out := new(DefaultResponse)
-	err := c.cc.Invoke(ctx, GoBus_Public_FullMethodName, in, out, opts...)
+	err := c.cc.Invoke(ctx, "/parserin.GoBus/Public", in, out, opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -65,7 +53,7 @@ func (c *goBusClient) Public(ctx context.Context, in *PublicRequest, opts ...grp
 
 func (c *goBusClient) SendSync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*SyncResponse, error) {
 	out := new(SyncResponse)
-	err := c.cc.Invoke(ctx, GoBus_SendSync_FullMethodName, in, out, opts...)
+	err := c.cc.Invoke(ctx, "/parserin.GoBus/SendSync", in, out, opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -73,7 +61,7 @@ func (c *goBusClient) SendSync(ctx context.Context, in *SyncRequest, opts ...grp
 }
 
 func (c *goBusClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (GoBus_SubscribeClient, error) {
-	stream, err := c.cc.NewStream(ctx, &GoBus_ServiceDesc.Streams[0], GoBus_Subscribe_FullMethodName, opts...)
+	stream, err := c.cc.NewStream(ctx, &GoBus_ServiceDesc.Streams[0], "/parserin.GoBus/Subscribe", opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -105,7 +93,7 @@ func (x *goBusSubscribeClient) Recv() (*SyncResponse, error) {
 }
 
 func (c *goBusClient) SubscribeBuffer(ctx context.Context, in *BuffRequest, opts ...grpc.CallOption) (GoBus_SubscribeBufferClient, error) {
-	stream, err := c.cc.NewStream(ctx, &GoBus_ServiceDesc.Streams[1], GoBus_SubscribeBuffer_FullMethodName, opts...)
+	stream, err := c.cc.NewStream(ctx, &GoBus_ServiceDesc.Streams[1], "/parserin.GoBus/SubscribeBuffer", opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -138,7 +126,7 @@ func (x *goBusSubscribeBufferClient) Recv() (*BuffResponse, error) {
 
 func (c *goBusClient) Get(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (*SyncResponse, error) {
 	out := new(SyncResponse)
-	err := c.cc.Invoke(ctx, GoBus_Get_FullMethodName, in, out, opts...)
+	err := c.cc.Invoke(ctx, "/parserin.GoBus/Get", in, out, opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -204,7 +192,7 @@ func _GoBus_Public_Handler(srv interface{}, ctx context.Context, dec func(interf
 	}
 	info := &grpc.UnaryServerInfo{
 		Server:     srv,
-		FullMethod: GoBus_Public_FullMethodName,
+		FullMethod: "/parserin.GoBus/Public",
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
 		return srv.(GoBusServer).Public(ctx, req.(*PublicRequest))
@@ -222,7 +210,7 @@ func _GoBus_SendSync_Handler(srv interface{}, ctx context.Context, dec func(inte
 	}
 	info := &grpc.UnaryServerInfo{
 		Server:     srv,
-		FullMethod: GoBus_SendSync_FullMethodName,
+		FullMethod: "/parserin.GoBus/SendSync",
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
 		return srv.(GoBusServer).SendSync(ctx, req.(*SyncRequest))
@@ -282,7 +270,7 @@ func _GoBus_Get_Handler(srv interface{}, ctx context.Context, dec func(interface
 	}
 	info := &grpc.UnaryServerInfo{
 		Server:     srv,
-		FullMethod: GoBus_Get_FullMethodName,
+		FullMethod: "/parserin.GoBus/Get",
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
 		return srv.(GoBusServer).Get(ctx, req.(*SubscribeRequest))

+ 2 - 2
pkg/types/iclent_proxy_buffer.go

@@ -1,6 +1,6 @@
 package types
 
 // IClientProxyBuffer -- прокси клиента с буферизацией топиков
-type IClientProxyBuffer interface{
+type IClientProxyBuffer interface {
 	IClientProxy
-}
+}

+ 9 - 0
pkg/types/idict_client_proxy_buffer.go

@@ -0,0 +1,9 @@
+package types
+
+import "p78git.ddns.net/svi/gobus/pkg/net/netapi"
+
+// IDictClientProxyBuffer -- потокобезопасный словарь проксей буферизованных клиентов
+type IDictClientProxyBuffer interface{
+	// Add -- добавляет новый буферизованный прокси для клиента
+	Add(*netapi.BuffRequest)(IClientProxyBuffer, error)
+}