Răsfoiți Sursa

SVI Релизация запросов подписки и запроса публикации

SVI 2 ani în urmă
părinte
comite
57ac889019

+ 43 - 3
internal/dict_topic/dict_topic.go

@@ -3,6 +3,7 @@ package dict_topic
 
 import (
 	"log"
+	"strings"
 	"sync"
 
 	"p78git.ddns.net/svi/gobus/internal/dict_topic/topic"
@@ -12,19 +13,58 @@ import (
 
 // DictTopic -- потокобезопасный словарь топиков
 type DictTopic struct {
-	dict  map[alias.TopicName]types.ITopic
-	block sync.RWMutex
+	dict      map[alias.TopicName]types.ITopic        // Словарь тпоиков
+	dictProxy map[alias.ClientName]types.IClientProxy // Словарь прокси для клиентов
+	block     sync.RWMutex
 }
 
 // NewDictTopic -- возвращает новый потокобезопасный словарь топиков
 func NewDictTopic() (*DictTopic, error) {
 	sf := &DictTopic{
-		dict: make(map[alias.TopicName]types.ITopic),
+		dict:      make(map[alias.TopicName]types.ITopic),
+		dictProxy: make(map[alias.ClientName]types.IClientProxy),
 	}
 	_ = types.IDictTopic(sf)
 	return sf, nil
 }
 
+// Unsubscribe -- отписывает клиента от топиков
+func (sf *DictTopic) Unsubscribe(clientProxy types.IClientProxy) {
+	sf.block.RLock()
+	defer sf.block.RUnlock()
+	sf.unsubscribe(clientProxy)
+}
+
+// Отписывает под блокировкой клиента от топиков
+func (sf *DictTopic) unsubscribe(clientProxy types.IClientProxy) {
+	sample := clientProxy.SampleTopic()
+	for name, topic := range sf.dict {
+		if !strings.Contains(string(name), string(sample)) {
+			continue
+		}
+		topic.Unsubscribe(clientProxy.ClientName())
+	}
+}
+
+// Subscribe -- подписывает клиента на топики
+func (sf *DictTopic) Subscribe(clientProxy types.IClientProxy) {
+	sf.block.RLock()
+	defer sf.block.RUnlock()
+	sf.subscribe(clientProxy)
+}
+
+// Подписывает по образцу топика клиента на события (внутри блокировки)
+func (sf *DictTopic) subscribe(clientProxy types.IClientProxy) {
+	sample := clientProxy.SampleTopic()
+	for name, topic := range sf.dict {
+		if !strings.Contains(string(name), string(sample)) {
+			continue
+		}
+		topic.Subscribe(clientProxy)
+	}
+	sf.dictProxy[clientProxy.ClientName()] = clientProxy
+}
+
 // Add -- добавляет новый топик
 func (sf *DictTopic) Add(name alias.TopicName) {
 	sf.block.Lock()

+ 40 - 11
internal/dict_topic/topic/topic.go

@@ -6,15 +6,17 @@ import (
 	"sync"
 
 	"p78git.ddns.net/svi/gobus/pkg/alias"
+	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
 	"p78git.ddns.net/svi/gobus/pkg/types"
 )
 
 // Topic -- потокобезопасный топик шины данных
 type Topic struct {
-	name   alias.TopicName
-	srcNum int    // Номер источника
-	val    []byte // Содержимое топика
-	block  sync.RWMutex
+	name      alias.TopicName
+	dictProxy map[alias.ClientName]types.IClientProxy // Словарь подписок клиентов на топик
+	srcNum    int32                                   // Номер источника
+	val       []byte                                  // Содержимое топика
+	block     sync.RWMutex
 }
 
 // Newtopic -- возвращает новый топик
@@ -23,31 +25,58 @@ func NewTopic(name alias.TopicName) (*Topic, error) {
 		return nil, fmt.Errorf("NewTopic(): name is empty")
 	}
 	sf := &Topic{
-		name: name,
-		val:  []byte{},
+		name:      name,
+		val:       []byte{},
+		dictProxy: make(map[alias.ClientName]types.IClientProxy),
 	}
 	_ = types.ITopic(sf)
 	return sf, nil
 }
 
+// Unsubscribe -- отписывает клиента от себя
+func (sf *Topic) Unsubscribe(clientName alias.ClientName) {
+	sf.block.Lock()
+	defer sf.block.Unlock()
+	delete(sf.dictProxy, clientName)
+}
+
+// Subscribe -- попдисывает клиента на себя
+func (sf *Topic) Subscribe(clientProxy types.IClientProxy) {
+	sf.block.Lock()
+	defer sf.block.Unlock()
+	sf.dictProxy[clientProxy.ClientName()] = clientProxy
+}
+
 // Name -- возвращает имя топика
 func (sf *Topic) Name() alias.TopicName {
 	return sf.name
 }
 
 // Get -- возвращает содержимое топика
-func (sf *Topic) Get() []byte {
+func (sf *Topic) Get() *netapi.TopicMsg {
 	sf.block.RLock()
 	defer sf.block.RUnlock()
-	return sf.val
+	msg := &netapi.TopicMsg{
+		Source: sf.srcNum,
+		Topic:  string(sf.name),
+		Msg:    sf.val,
+	}
+	return msg
 }
 
 // Set -- устанавливает значение топика
-func (sf *Topic) Set(srcNum int, val []byte) {
+func (sf *Topic) Set(msg *netapi.TopicMsg) {
 	sf.block.Lock()
 	defer sf.block.Unlock()
-	sf.srcNum = srcNum
-	sf.val = val
+	sf.srcNum = msg.Source
+	sf.val = msg.Msg
+	for _, proxy := range sf.dictProxy {
+		if !proxy.IsWork() {
+			delete(sf.dictProxy, proxy.ClientName())
+			continue
+		}
+		proxy.Write(msg)
+	}
 }
 
 // Size -- возвращает размер топика

+ 4 - 0
internal/serv_grpc/serv_grpc.go

@@ -49,6 +49,10 @@ func NewServGrpc(serv types.IService) (*ServGrpc, error) {
 	if err != nil {
 		return nil, fmt.Errorf("NewServGrpc(): in create ServPublic, err=\n\t%w", err)
 	}
+	sf.ServSubscribe, err = serv_subcsribe.NewServSubscribe(serv)
+	if err != nil {
+		return nil, fmt.Errorf("NewServGrpc(): in create ServSubscribe, err=\n\t%w", err)
+	}
 	_ = netapi.GoBusServer(sf)
 	return sf, nil
 }

+ 16 - 2
internal/serv_grpc/serv_public/serv_public.go

@@ -4,24 +4,27 @@ package serv_public
 import (
 	"context"
 	"fmt"
+	"log"
 
+	"p78git.ddns.net/svi/gobus/pkg/alias"
 	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
 	"p78git.ddns.net/svi/gobus/pkg/types"
 )
 
 // ServPublic -- отвечает на запросы публикации
 type ServPublic struct {
-	serv types.IService
+	serv      types.IService
 	dictTopic types.IDictTopic
 }
 
 // NewServPublic -- возвращает новый публикатор
 func NewServPublic(serv types.IService) (*ServPublic, error) {
+	log.Printf("NewServPublic()\n")
 	if serv == nil {
 		return nil, fmt.Errorf("NewServPublic(): IService==nil")
 	}
 	sf := &ServPublic{
-		serv: serv,
+		serv:      serv,
 		dictTopic: serv.DictTopic(),
 	}
 	return sf, nil
@@ -29,5 +32,16 @@ func NewServPublic(serv types.IService) (*ServPublic, error) {
 
 // Public -- публикация сообщения в топик
 func (sf *ServPublic) Public(ctx context.Context, req *netapi.PublicRequest) (*netapi.DefaultResponse, error) {
+	topic := sf.dictTopic.Get(alias.TopicName(req.Topic))
+	if topic == nil {
+		sf.dictTopic.Add(alias.TopicName(req.Topic))
+		topic = sf.dictTopic.Get(alias.TopicName(req.Topic))
+	}
+	msg := &netapi.TopicMsg{
+		Source: req.Source,
+		Topic:  req.Topic,
+		Msg:    req.Msg,
+	}
+	topic.Set(msg)
 	return &netapi.DefaultResponse{}, nil
 }

+ 48 - 2
internal/serv_grpc/serv_subcsribe/serv_subscribe.go

@@ -2,11 +2,57 @@
 package serv_subcsribe
 
 import (
+	"fmt"
+	"log"
+
+	"p78git.ddns.net/svi/gobus/pkg/alias"
+	"p78git.ddns.net/svi/gobus/pkg/net/cleint_proxy"
 	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
+	"p78git.ddns.net/svi/gobus/pkg/types"
 )
 
-type ServSubscribe struct{}
+// ServSubscribe -- сервер подписки по шаблону
+type ServSubscribe struct {
+	serv      types.IService
+	dictTopic types.IDictTopic
+}
+
+// NewServSubscribe -- возвращает новый сервис подписки
+func NewServSubscribe(serv types.IService) (*ServSubscribe, error) {
+	log.Printf("NewServSubscribe()\n")
+	if serv == nil {
+		return nil, fmt.Errorf("NewServSubscribe(): IService==nil")
+	}
+	sf := &ServSubscribe{
+		serv:      serv,
+		dictTopic: serv.DictTopic(),
+	}
+	return sf, nil
+}
 
+// Subscribe -- подписывает на топики по шаблону
 func (sf *ServSubscribe) Subscribe(req *netapi.SubscribeRequest, serv netapi.GoBus_SubscribeServer) error {
-	return nil
+	clientProxy, err := cleint_proxy.NewClientProxy(alias.ClientName(req.ClientName), alias.TopicName(req.Sample))
+	if err != nil {
+		return fmt.Errorf("ServSubscribe.Subscribe(): in create ClientProxy, err=\n\t%w", err)
+	}
+	sf.dictTopic.Subscribe(clientProxy)
+	fnEndWork := func() {
+		clientProxy.ResetWork()
+		sf.dictTopic.Unsubscribe(clientProxy)
+	}
+	for {
+		msg := clientProxy.Read()
+		if msg == nil {
+			fnEndWork()
+			return fmt.Errorf("ServSubscribe.Subscribe(): in read ClientProxy, msg==nil")
+		}
+		err := serv.Send(&netapi.SyncResponse{
+			Msg: msg.Msg,
+		})
+		if err != nil {
+			fnEndWork()
+			return fmt.Errorf("ServSubscribe.Subscribe(): in send msg, err=\n\t%w", err)
+		}
+	}
 }

+ 5 - 0
internal/service/service.go

@@ -56,3 +56,8 @@ func (sf Service) Ctx() context.Context {
 func (sf *Service) CancelApp() {
 	sf.fnCancel()
 }
+
+// DictTopic -- возвращает потокобезопасный словарь топиков
+func (sf *Service) DictTopic() types.IDictTopic {
+	return sf.dictTopic
+}

+ 3 - 0
pkg/alias/alias.go

@@ -3,3 +3,6 @@ package alias
 
 // TopicName -- имя топика
 type TopicName string
+
+// ClientName -- униальное имя клиента
+type ClientName string

+ 80 - 0
pkg/net/cleint_proxy/client_proxy.go

@@ -0,0 +1,80 @@
+// package cleint_proxy -- прокси подписывания клиента на топики по шаблону
+package cleint_proxy
+
+import (
+	"fmt"
+	"log"
+	"sync"
+	"time"
+
+	"p78git.ddns.net/svi/gobus/pkg/alias"
+	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
+	"p78git.ddns.net/svi/gobus/pkg/types"
+)
+
+// ClientProxy -- параметры подписывания клиента на топик
+type ClientProxy struct {
+	name   alias.ClientName      // Имя клиента
+	sample alias.TopicName       // Образец подписи
+	isWork bool                  // Признак, что получатель живой
+	chMsg  chan *netapi.TopicMsg // Канал для обратной связи из топиков
+	block  sync.RWMutex
+}
+
+// NewClientProxy -- возвращает новый прокси подписывания клиента на топик
+func NewClientProxy(name alias.ClientName, sample alias.TopicName) (*ClientProxy, error) {
+	log.Printf("NewClientProxy(): name=%q, sample=%q\n", name, sample)
+	if name == "" {
+		return nil, fmt.Errorf("NewClientProxy(): name is empty")
+	}
+	suffix := "_" + alias.ClientName(fmt.Sprint(time.Now().UnixMilli()))
+	sf := &ClientProxy{
+		name:   name + suffix,
+		sample: sample,
+		chMsg:  make(chan *netapi.TopicMsg, 10),
+		isWork: true,
+	}
+	_ = types.IClientProxy(sf)
+	return sf, nil
+}
+
+// ResetWork -- сбрасывает признак работы
+func (sf *ClientProxy) ResetWork() {
+	sf.block.Lock()
+	defer sf.block.Unlock()
+	sf.isWork = false
+	close(sf.chMsg)
+}
+
+// IsWork -- возвращает признак работы прокси
+func (sf *ClientProxy) IsWork() bool {
+	sf.block.RLock()
+	defer sf.block.RUnlock()
+	return sf.isWork
+}
+
+// Read -- читает канал сообщений от топиков
+func (sf *ClientProxy) Read() *netapi.TopicMsg {
+	msg := <-sf.chMsg
+	return msg
+}
+
+// Write -- записывает в себя сообщение топика
+func (sf *ClientProxy) Write(msg *netapi.TopicMsg) {
+	sf.block.Lock()
+	defer sf.block.Unlock()
+	if !sf.isWork {
+		return
+	}
+	sf.chMsg <- msg
+}
+
+// SampleTopic -- возвращает образец шаблона подписки на топики
+func (sf *ClientProxy) SampleTopic() alias.TopicName {
+	return sf.sample
+}
+
+// ClientName -- возвращает имя клиента
+func (sf *ClientProxy) ClientName() alias.ClientName {
+	return sf.name
+}

+ 9 - 1
pkg/net/gobus.proto

@@ -43,7 +43,8 @@ message GetRequest{
 
 // SubscribeRequest -- запрос на подписку топиков по шаблону
 message SubscribeRequest{
-    string Sample = 1; // Шаблон топиков на подписку
+    string Sample     = 1; // Шаблон топиков на подписку
+    string ClientName = 2; // Уникальное имя клиента
 }
 
 // SyncRequest -- синхронный запрос в шину данных
@@ -53,6 +54,13 @@ message SyncRequest{
     bytes Msg    = 3; // Байтовое представление сообщения
 }
 
+// TopicMsg -- сообщение топика (внутри шины данных)
+message TopicMsg{
+    int32 Source = 1; // Источник данных (0 -- клиент, другое -- реплика)
+    string Topic = 2; // Где опубликовать запрос
+    bytes Msg    = 3; // Байтовое представление сообщения
+}
+
 // SyncResponse -- ответ на синхронный запрос
 message SyncResponse{
     bytes Msg = 1; // Содержимое ответа на синхроныый запрос

+ 148 - 56
pkg/net/netapi/gobus.pb.go

@@ -198,7 +198,8 @@ type SubscribeRequest struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Sample string `protobuf:"bytes,1,opt,name=Sample,proto3" json:"Sample,omitempty"` // Шаблон топиков на подписку
+	Sample     string `protobuf:"bytes,1,opt,name=Sample,proto3" json:"Sample,omitempty"`         // Шаблон топиков на подписку
+	ClientName string `protobuf:"bytes,2,opt,name=ClientName,proto3" json:"ClientName,omitempty"` // Уникальное имя клиента
 }
 
 func (x *SubscribeRequest) Reset() {
@@ -240,6 +241,13 @@ func (x *SubscribeRequest) GetSample() string {
 	return ""
 }
 
+func (x *SubscribeRequest) GetClientName() string {
+	if x != nil {
+		return x.ClientName
+	}
+	return ""
+}
+
 // SyncRequest -- синхронный запрос в шину данных
 type SyncRequest struct {
 	state         protoimpl.MessageState
@@ -304,6 +312,70 @@ func (x *SyncRequest) GetMsg() []byte {
 	return nil
 }
 
+// TopicMsg -- сообщение топика (внутри шины данных)
+type TopicMsg struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Source int32  `protobuf:"varint,1,opt,name=Source,proto3" json:"Source,omitempty"` // Источник данных (0 -- клиент, другое -- реплика)
+	Topic  string `protobuf:"bytes,2,opt,name=Topic,proto3" json:"Topic,omitempty"`    // Где опубликовать запрос
+	Msg    []byte `protobuf:"bytes,3,opt,name=Msg,proto3" json:"Msg,omitempty"`        // Байтовое представление сообщения
+}
+
+func (x *TopicMsg) Reset() {
+	*x = TopicMsg{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_pkg_net_gobus_proto_msgTypes[5]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *TopicMsg) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*TopicMsg) ProtoMessage() {}
+
+func (x *TopicMsg) ProtoReflect() protoreflect.Message {
+	mi := &file_pkg_net_gobus_proto_msgTypes[5]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use TopicMsg.ProtoReflect.Descriptor instead.
+func (*TopicMsg) Descriptor() ([]byte, []int) {
+	return file_pkg_net_gobus_proto_rawDescGZIP(), []int{5}
+}
+
+func (x *TopicMsg) GetSource() int32 {
+	if x != nil {
+		return x.Source
+	}
+	return 0
+}
+
+func (x *TopicMsg) GetTopic() string {
+	if x != nil {
+		return x.Topic
+	}
+	return ""
+}
+
+func (x *TopicMsg) GetMsg() []byte {
+	if x != nil {
+		return x.Msg
+	}
+	return nil
+}
+
 // SyncResponse -- ответ на синхронный запрос
 type SyncResponse struct {
 	state         protoimpl.MessageState
@@ -316,7 +388,7 @@ type SyncResponse struct {
 func (x *SyncResponse) Reset() {
 	*x = SyncResponse{}
 	if protoimpl.UnsafeEnabled {
-		mi := &file_pkg_net_gobus_proto_msgTypes[5]
+		mi := &file_pkg_net_gobus_proto_msgTypes[6]
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		ms.StoreMessageInfo(mi)
 	}
@@ -329,7 +401,7 @@ func (x *SyncResponse) String() string {
 func (*SyncResponse) ProtoMessage() {}
 
 func (x *SyncResponse) ProtoReflect() protoreflect.Message {
-	mi := &file_pkg_net_gobus_proto_msgTypes[5]
+	mi := &file_pkg_net_gobus_proto_msgTypes[6]
 	if protoimpl.UnsafeEnabled && x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -342,7 +414,7 @@ func (x *SyncResponse) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use SyncResponse.ProtoReflect.Descriptor instead.
 func (*SyncResponse) Descriptor() ([]byte, []int) {
-	return file_pkg_net_gobus_proto_rawDescGZIP(), []int{5}
+	return file_pkg_net_gobus_proto_rawDescGZIP(), []int{6}
 }
 
 func (x *SyncResponse) GetMsg() []byte {
@@ -358,7 +430,7 @@ type PublicRequest struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Source int32  `protobuf:"varint,1,opt,name=Source,proto3" json:"Source,omitempty"` // Источник данных (0 -- клиент, другое -- реплика)
+	Source int32  `protobuf:"varint,1,opt,name=Source,proto3" json:"Source,omitempty"` // Источник данных (0 -- клиент, другое -- для реплики)
 	Topic  string `protobuf:"bytes,2,opt,name=Topic,proto3" json:"Topic,omitempty"`    // Куда опубликовать сообщение
 	Msg    []byte `protobuf:"bytes,3,opt,name=Msg,proto3" json:"Msg,omitempty"`        // Байтовое представление сообщения
 }
@@ -366,7 +438,7 @@ type PublicRequest struct {
 func (x *PublicRequest) Reset() {
 	*x = PublicRequest{}
 	if protoimpl.UnsafeEnabled {
-		mi := &file_pkg_net_gobus_proto_msgTypes[6]
+		mi := &file_pkg_net_gobus_proto_msgTypes[7]
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		ms.StoreMessageInfo(mi)
 	}
@@ -379,7 +451,7 @@ func (x *PublicRequest) String() string {
 func (*PublicRequest) ProtoMessage() {}
 
 func (x *PublicRequest) ProtoReflect() protoreflect.Message {
-	mi := &file_pkg_net_gobus_proto_msgTypes[6]
+	mi := &file_pkg_net_gobus_proto_msgTypes[7]
 	if protoimpl.UnsafeEnabled && x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -392,7 +464,7 @@ func (x *PublicRequest) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use PublicRequest.ProtoReflect.Descriptor instead.
 func (*PublicRequest) Descriptor() ([]byte, []int) {
-	return file_pkg_net_gobus_proto_rawDescGZIP(), []int{6}
+	return file_pkg_net_gobus_proto_rawDescGZIP(), []int{7}
 }
 
 func (x *PublicRequest) GetSource() int32 {
@@ -426,7 +498,7 @@ type DefaultResponse struct {
 func (x *DefaultResponse) Reset() {
 	*x = DefaultResponse{}
 	if protoimpl.UnsafeEnabled {
-		mi := &file_pkg_net_gobus_proto_msgTypes[7]
+		mi := &file_pkg_net_gobus_proto_msgTypes[8]
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		ms.StoreMessageInfo(mi)
 	}
@@ -439,7 +511,7 @@ func (x *DefaultResponse) String() string {
 func (*DefaultResponse) ProtoMessage() {}
 
 func (x *DefaultResponse) ProtoReflect() protoreflect.Message {
-	mi := &file_pkg_net_gobus_proto_msgTypes[7]
+	mi := &file_pkg_net_gobus_proto_msgTypes[8]
 	if protoimpl.UnsafeEnabled && x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -452,7 +524,7 @@ func (x *DefaultResponse) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use DefaultResponse.ProtoReflect.Descriptor instead.
 func (*DefaultResponse) Descriptor() ([]byte, []int) {
-	return file_pkg_net_gobus_proto_rawDescGZIP(), []int{7}
+	return file_pkg_net_gobus_proto_rawDescGZIP(), []int{8}
 }
 
 var File_pkg_net_gobus_proto protoreflect.FileDescriptor
@@ -473,45 +545,52 @@ var file_pkg_net_gobus_proto_rawDesc = []byte{
 	0x52, 0x06, 0x49, 0x73, 0x4c, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18,
 	0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x4d, 0x73, 0x67, 0x22, 0x1e, 0x0a, 0x0a, 0x47, 0x65,
 	0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18,
-	0x01, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x03, 0x4d, 0x73, 0x67, 0x22, 0x2a, 0x0a, 0x10, 0x53, 0x75,
+	0x01, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x03, 0x4d, 0x73, 0x67, 0x22, 0x4a, 0x0a, 0x10, 0x53, 0x75,
 	0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16,
 	0x0a, 0x06, 0x53, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06,
-	0x53, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x22, 0x4d, 0x0a, 0x0b, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65,
+	0x53, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74,
+	0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x43, 0x6c, 0x69, 0x65,
+	0x6e, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x4d, 0x0a, 0x0b, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65,
 	0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18,
 	0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x14, 0x0a,
 	0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x54, 0x6f,
 	0x70, 0x69, 0x63, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c,
-	0x52, 0x03, 0x4d, 0x73, 0x67, 0x22, 0x20, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73,
-	0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01,
-	0x28, 0x0c, 0x52, 0x03, 0x4d, 0x73, 0x67, 0x22, 0x4f, 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69,
-	0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72,
-	0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65,
-	0x12, 0x14, 0x0a, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
-	0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20,
-	0x01, 0x28, 0x0c, 0x52, 0x03, 0x4d, 0x73, 0x67, 0x22, 0x11, 0x0a, 0x0f, 0x44, 0x65, 0x66, 0x61,
-	0x75, 0x6c, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xcc, 0x02, 0x0a, 0x05,
-	0x47, 0x6f, 0x42, 0x75, 0x73, 0x12, 0x3e, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x12,
-	0x17, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69,
-	0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65,
-	0x72, 0x69, 0x6e, 0x2e, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f,
-	0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x79, 0x6e,
-	0x63, 0x12, 0x15, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e,
-	0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65,
-	0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
-	0x22, 0x00, 0x12, 0x43, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12,
-	0x1a, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63,
-	0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61,
-	0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f,
-	0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x44, 0x0a, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63,
-	0x72, 0x69, 0x62, 0x65, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x12, 0x15, 0x2e, 0x70, 0x61, 0x72,
-	0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x42, 0x75, 0x66, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
-	0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x42, 0x75, 0x66,
-	0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3b, 0x0a,
-	0x03, 0x47, 0x65, 0x74, 0x12, 0x1a, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e,
-	0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
-	0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63,
-	0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f,
-	0x6e, 0x65, 0x74, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x52, 0x03, 0x4d, 0x73, 0x67, 0x22, 0x4a, 0x0a, 0x08, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x4d, 0x73,
+	0x67, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
+	0x05, 0x52, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x54, 0x6f, 0x70,
+	0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12,
+	0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x4d, 0x73,
+	0x67, 0x22, 0x20, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+	0x65, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03,
+	0x4d, 0x73, 0x67, 0x22, 0x4f, 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x52, 0x65, 0x71,
+	0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01,
+	0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05,
+	0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x54, 0x6f, 0x70,
+	0x69, 0x63, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52,
+	0x03, 0x4d, 0x73, 0x67, 0x22, 0x11, 0x0a, 0x0f, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x52,
+	0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xcc, 0x02, 0x0a, 0x05, 0x47, 0x6f, 0x42, 0x75,
+	0x73, 0x12, 0x3e, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x12, 0x17, 0x2e, 0x70, 0x61,
+	0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x52, 0x65, 0x71,
+	0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e,
+	0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
+	0x00, 0x12, 0x3b, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x15, 0x2e,
+	0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71,
+	0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e,
+	0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x43,
+	0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1a, 0x2e, 0x70, 0x61,
+	0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
+	0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72,
+	0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
+	0x00, 0x30, 0x01, 0x12, 0x44, 0x0a, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
+	0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x12, 0x15, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69,
+	0x6e, 0x2e, 0x42, 0x75, 0x66, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e,
+	0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x42, 0x75, 0x66, 0x66, 0x52, 0x65, 0x73,
+	0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3b, 0x0a, 0x03, 0x47, 0x65, 0x74,
+	0x12, 0x1a, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x75, 0x62, 0x73,
+	0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70,
+	0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70,
+	0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x6e, 0x65, 0x74, 0x61,
+	0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
@@ -526,28 +605,29 @@ func file_pkg_net_gobus_proto_rawDescGZIP() []byte {
 	return file_pkg_net_gobus_proto_rawDescData
 }
 
-var file_pkg_net_gobus_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
+var file_pkg_net_gobus_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
 var file_pkg_net_gobus_proto_goTypes = []interface{}{
 	(*BuffRequest)(nil),      // 0: parserin.BuffRequest
 	(*BuffResponse)(nil),     // 1: parserin.BuffResponse
 	(*GetRequest)(nil),       // 2: parserin.GetRequest
 	(*SubscribeRequest)(nil), // 3: parserin.SubscribeRequest
 	(*SyncRequest)(nil),      // 4: parserin.SyncRequest
-	(*SyncResponse)(nil),     // 5: parserin.SyncResponse
-	(*PublicRequest)(nil),    // 6: parserin.PublicRequest
-	(*DefaultResponse)(nil),  // 7: parserin.DefaultResponse
+	(*TopicMsg)(nil),         // 5: parserin.TopicMsg
+	(*SyncResponse)(nil),     // 6: parserin.SyncResponse
+	(*PublicRequest)(nil),    // 7: parserin.PublicRequest
+	(*DefaultResponse)(nil),  // 8: parserin.DefaultResponse
 }
 var file_pkg_net_gobus_proto_depIdxs = []int32{
-	6, // 0: parserin.GoBus.Public:input_type -> parserin.PublicRequest
+	7, // 0: parserin.GoBus.Public:input_type -> parserin.PublicRequest
 	4, // 1: parserin.GoBus.SendSync:input_type -> parserin.SyncRequest
 	3, // 2: parserin.GoBus.Subscribe:input_type -> parserin.SubscribeRequest
 	0, // 3: parserin.GoBus.SubscribeBuffer:input_type -> parserin.BuffRequest
 	3, // 4: parserin.GoBus.Get:input_type -> parserin.SubscribeRequest
-	7, // 5: parserin.GoBus.Public:output_type -> parserin.DefaultResponse
-	5, // 6: parserin.GoBus.SendSync:output_type -> parserin.SyncResponse
-	5, // 7: parserin.GoBus.Subscribe:output_type -> parserin.SyncResponse
+	8, // 5: parserin.GoBus.Public:output_type -> parserin.DefaultResponse
+	6, // 6: parserin.GoBus.SendSync:output_type -> parserin.SyncResponse
+	6, // 7: parserin.GoBus.Subscribe:output_type -> parserin.SyncResponse
 	1, // 8: parserin.GoBus.SubscribeBuffer:output_type -> parserin.BuffResponse
-	5, // 9: parserin.GoBus.Get:output_type -> parserin.SyncResponse
+	6, // 9: parserin.GoBus.Get:output_type -> parserin.SyncResponse
 	5, // [5:10] is the sub-list for method output_type
 	0, // [0:5] is the sub-list for method input_type
 	0, // [0:0] is the sub-list for extension type_name
@@ -622,7 +702,7 @@ func file_pkg_net_gobus_proto_init() {
 			}
 		}
 		file_pkg_net_gobus_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
-			switch v := v.(*SyncResponse); i {
+			switch v := v.(*TopicMsg); i {
 			case 0:
 				return &v.state
 			case 1:
@@ -634,7 +714,7 @@ func file_pkg_net_gobus_proto_init() {
 			}
 		}
 		file_pkg_net_gobus_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
-			switch v := v.(*PublicRequest); i {
+			switch v := v.(*SyncResponse); i {
 			case 0:
 				return &v.state
 			case 1:
@@ -646,6 +726,18 @@ func file_pkg_net_gobus_proto_init() {
 			}
 		}
 		file_pkg_net_gobus_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*PublicRequest); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_pkg_net_gobus_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
 			switch v := v.(*DefaultResponse); i {
 			case 0:
 				return &v.state
@@ -664,7 +756,7 @@ func file_pkg_net_gobus_proto_init() {
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: file_pkg_net_gobus_proto_rawDesc,
 			NumEnums:      0,
-			NumMessages:   8,
+			NumMessages:   9,
 			NumExtensions: 0,
 			NumServices:   1,
 		},

+ 18 - 0
pkg/types/iclient_proxy.go

@@ -0,0 +1,18 @@
+package types
+
+import (
+	"p78git.ddns.net/svi/gobus/pkg/alias"
+	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
+)
+
+// IClientProxy -- прокси-клиента на подписку по шаблону на топики
+type IClientProxy interface {
+	// ClientName -- возвращает уникальное имя клиента
+	ClientName() alias.ClientName
+	// SampleTopic -- возвращает шаблок подписки на тпоики
+	SampleTopic() alias.TopicName
+	// Write -- пишет в прокси клиента сообщение топика
+	Write(*netapi.TopicMsg)
+	// IsWork -- возвращает признак работы прокси клиента
+	IsWork() bool
+}

+ 4 - 0
pkg/types/idict_topic.go

@@ -12,4 +12,8 @@ type IDictTopic interface {
 	Len() int
 	// Size -- возвращает размер всех топиков
 	Size() int
+	// Subscribe -- подписывает клиента на топики по шаблону
+	Subscribe(IClientProxy)
+	// Unsubscribe -- отписывает клиента от топиков
+	Unsubscribe(IClientProxy)
 }

+ 1 - 1
pkg/types/iservice.go

@@ -10,5 +10,5 @@ type IService interface {
 	// CancelApp -- отменяет контекст приложения
 	CancelApp()
 	// DictTopic -- возвращает потокобезопасный словарь топиков
-	DictTopic()IDictTopic
+	DictTopic() IDictTopic
 }

+ 10 - 3
pkg/types/itopic.go

@@ -1,15 +1,22 @@
 package types
 
-import "p78git.ddns.net/svi/gobus/pkg/alias"
+import (
+	"p78git.ddns.net/svi/gobus/pkg/alias"
+	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
+)
 
 // ITopic -- интерфейс топика шины данных
 type ITopic interface {
 	// Name -- возвращает имя топика
 	Name() alias.TopicName
 	// Set -- устанавливает содержимое топика
-	Set(srcNum int, val []byte)
+	Set(*netapi.TopicMsg)
 	// Get -- возвращает содержимое топика
-	Get() []byte
+	Get() *netapi.TopicMsg
 	// Size -- возвращает размер топика
 	Size() int
+	// Subscribe -- подписывает клиента
+	Subscribe(IClientProxy)
+	// Unsubscribe -- отписывает клиента
+	Unsubscribe(alias.ClientName)
 }