Переглянути джерело

SVI Добавление лайф-тайма топика; 6.0%

SVI 2 роки тому
батько
коміт
81a3357b57

+ 12 - 13
api/gobus.proto

@@ -50,31 +50,30 @@ message SubscribeRequest{
 
 // SyncRequest -- синхронный запрос в шину данных
 message SyncRequest{
-    int32 Source = 1; // Источник данных (0 -- клиент, другое -- реплика)
-    string Topic = 2; // Где опубликовать запрос
-    bytes Msg    = 3; // Байтовое представление сообщения
-}
-
-// TopicMsg -- сообщение топика (внутри шины данных)
-message TopicMsg{
-    int32 Source = 1; // Источник данных (0 -- клиент, другое -- реплика)
-    string Topic = 2; // Где опубликовать запрос
-    bytes Msg    = 3; // Байтовое представление сообщения
+    TopicMsg topicMsg = 1; // Вложенное сообщение
 }
 
 // SyncResponse -- ответ на синхронный запрос
 message SyncResponse{
     bytes Msg = 1; // Содержимое ответа на синхроныый запрос
 }
+
+
 // PublicRequest -- запрос на публикацию сообщения
 message PublicRequest {
-    int32 Source = 1; // Источник данных (0 -- клиент, другое -- для реплики)
-    string Topic = 2; // Куда опубликовать сообщение
+    TopicMsg topicMsg = 1;
+    repeated int32 RepliesList = 2; // Список реплик, на которых сообщение уже опубликовано
+}
+
+// TopicMsg -- сообщение топика (внутри шины данных)
+message TopicMsg{
+    int32 Source = 1; // Источник данных (0 -- клиент, другое -- реплика)
+    string Topic = 2; // Где опубликовать запрос
     bytes Msg    = 3; // Байтовое представление сообщения
     string Uuid  = 4; // Уникальная метка сообщения
-    repeated int32 RepliesList = 5; // Список реплик, на которых сообщение уже опубликовано
 }
 
+
 // DefaultResponse -- возвращаемое значение по умолчанию (ничего не содержит)
 message DefaultResponse {
 }

+ 116 - 148
api/netapi/gobus.pb.go

@@ -262,9 +262,7 @@ type SyncRequest struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Source int32  `protobuf:"varint,1,opt,name=Source,proto3" json:"Source,omitempty"` // Источник данных (0 -- клиент, другое -- реплика)
-	Topic  string `protobuf:"bytes,2,opt,name=Topic,proto3" json:"Topic,omitempty"`    // Где опубликовать запрос
-	Msg    []byte `protobuf:"bytes,3,opt,name=Msg,proto3" json:"Msg,omitempty"`        // Байтовое представление сообщения
+	TopicMsg *TopicMsg `protobuf:"bytes,1,opt,name=topicMsg,proto3" json:"topicMsg,omitempty"` // Вложенное сообщение
 }
 
 func (x *SyncRequest) Reset() {
@@ -299,40 +297,24 @@ func (*SyncRequest) Descriptor() ([]byte, []int) {
 	return file_api_gobus_proto_rawDescGZIP(), []int{4}
 }
 
-func (x *SyncRequest) GetSource() int32 {
-	if x != nil {
-		return x.Source
-	}
-	return 0
-}
-
-func (x *SyncRequest) GetTopic() string {
-	if x != nil {
-		return x.Topic
-	}
-	return ""
-}
-
-func (x *SyncRequest) GetMsg() []byte {
+func (x *SyncRequest) GetTopicMsg() *TopicMsg {
 	if x != nil {
-		return x.Msg
+		return x.TopicMsg
 	}
 	return nil
 }
 
-// TopicMsg -- сообщение топика (внутри шины данных)
-type TopicMsg struct {
+// SyncResponse -- ответ на синхронный запрос
+type SyncResponse struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Source int32  `protobuf:"varint,1,opt,name=Source,proto3" json:"Source,omitempty"` // Источник данных (0 -- клиент, другое -- реплика)
-	Topic  string `protobuf:"bytes,2,opt,name=Topic,proto3" json:"Topic,omitempty"`    // Где опубликовать запрос
-	Msg    []byte `protobuf:"bytes,3,opt,name=Msg,proto3" json:"Msg,omitempty"`        // Байтовое представление сообщения
+	Msg []byte `protobuf:"bytes,1,opt,name=Msg,proto3" json:"Msg,omitempty"` // Содержимое ответа на синхроныый запрос
 }
 
-func (x *TopicMsg) Reset() {
-	*x = TopicMsg{}
+func (x *SyncResponse) Reset() {
+	*x = SyncResponse{}
 	if protoimpl.UnsafeEnabled {
 		mi := &file_api_gobus_proto_msgTypes[5]
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -340,13 +322,13 @@ func (x *TopicMsg) Reset() {
 	}
 }
 
-func (x *TopicMsg) String() string {
+func (x *SyncResponse) String() string {
 	return protoimpl.X.MessageStringOf(x)
 }
 
-func (*TopicMsg) ProtoMessage() {}
+func (*SyncResponse) ProtoMessage() {}
 
-func (x *TopicMsg) ProtoReflect() protoreflect.Message {
+func (x *SyncResponse) ProtoReflect() protoreflect.Message {
 	mi := &file_api_gobus_proto_msgTypes[5]
 	if protoimpl.UnsafeEnabled && x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -358,43 +340,30 @@ func (x *TopicMsg) ProtoReflect() protoreflect.Message {
 	return mi.MessageOf(x)
 }
 
-// Deprecated: Use TopicMsg.ProtoReflect.Descriptor instead.
-func (*TopicMsg) Descriptor() ([]byte, []int) {
+// Deprecated: Use SyncResponse.ProtoReflect.Descriptor instead.
+func (*SyncResponse) Descriptor() ([]byte, []int) {
 	return file_api_gobus_proto_rawDescGZIP(), []int{5}
 }
 
-func (x *TopicMsg) GetSource() int32 {
-	if x != nil {
-		return x.Source
-	}
-	return 0
-}
-
-func (x *TopicMsg) GetTopic() string {
-	if x != nil {
-		return x.Topic
-	}
-	return ""
-}
-
-func (x *TopicMsg) GetMsg() []byte {
+func (x *SyncResponse) GetMsg() []byte {
 	if x != nil {
 		return x.Msg
 	}
 	return nil
 }
 
-// SyncResponse -- ответ на синхронный запрос
-type SyncResponse struct {
+// PublicRequest -- запрос на публикацию сообщения
+type PublicRequest struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Msg []byte `protobuf:"bytes,1,opt,name=Msg,proto3" json:"Msg,omitempty"` // Содержимое ответа на синхроныый запрос
+	TopicMsg    *TopicMsg `protobuf:"bytes,1,opt,name=topicMsg,proto3" json:"topicMsg,omitempty"`
+	RepliesList []int32   `protobuf:"varint,2,rep,packed,name=RepliesList,proto3" json:"RepliesList,omitempty"` // Список реплик, на которых сообщение уже опубликовано
 }
 
-func (x *SyncResponse) Reset() {
-	*x = SyncResponse{}
+func (x *PublicRequest) Reset() {
+	*x = PublicRequest{}
 	if protoimpl.UnsafeEnabled {
 		mi := &file_api_gobus_proto_msgTypes[6]
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -402,13 +371,13 @@ func (x *SyncResponse) Reset() {
 	}
 }
 
-func (x *SyncResponse) String() string {
+func (x *PublicRequest) String() string {
 	return protoimpl.X.MessageStringOf(x)
 }
 
-func (*SyncResponse) ProtoMessage() {}
+func (*PublicRequest) ProtoMessage() {}
 
-func (x *SyncResponse) ProtoReflect() protoreflect.Message {
+func (x *PublicRequest) ProtoReflect() protoreflect.Message {
 	mi := &file_api_gobus_proto_msgTypes[6]
 	if protoimpl.UnsafeEnabled && x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -420,33 +389,39 @@ func (x *SyncResponse) ProtoReflect() protoreflect.Message {
 	return mi.MessageOf(x)
 }
 
-// Deprecated: Use SyncResponse.ProtoReflect.Descriptor instead.
-func (*SyncResponse) Descriptor() ([]byte, []int) {
+// Deprecated: Use PublicRequest.ProtoReflect.Descriptor instead.
+func (*PublicRequest) Descriptor() ([]byte, []int) {
 	return file_api_gobus_proto_rawDescGZIP(), []int{6}
 }
 
-func (x *SyncResponse) GetMsg() []byte {
+func (x *PublicRequest) GetTopicMsg() *TopicMsg {
 	if x != nil {
-		return x.Msg
+		return x.TopicMsg
 	}
 	return nil
 }
 
-// PublicRequest -- запрос на публикацию сообщения
-type PublicRequest struct {
+func (x *PublicRequest) GetRepliesList() []int32 {
+	if x != nil {
+		return x.RepliesList
+	}
+	return nil
+}
+
+// TopicMsg -- сообщение топика (внутри шины данных)
+type TopicMsg struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Source      int32   `protobuf:"varint,1,opt,name=Source,proto3" json:"Source,omitempty"`                  // Источник данных (0 -- клиент, другое -- для реплики)
-	Topic       string  `protobuf:"bytes,2,opt,name=Topic,proto3" json:"Topic,omitempty"`                     // Куда опубликовать сообщение
-	Msg         []byte  `protobuf:"bytes,3,opt,name=Msg,proto3" json:"Msg,omitempty"`                         // Байтовое представление сообщения
-	Uuid        string  `protobuf:"bytes,4,opt,name=Uuid,proto3" json:"Uuid,omitempty"`                       // Уникальная метка сообщения
-	RepliesList []int32 `protobuf:"varint,5,rep,packed,name=RepliesList,proto3" json:"RepliesList,omitempty"` // Список реплик, на которых сообщение уже опубликовано
+	Source int32  `protobuf:"varint,1,opt,name=Source,proto3" json:"Source,omitempty"` // Источник данных (0 -- клиент, другое -- реплика)
+	Topic  string `protobuf:"bytes,2,opt,name=Topic,proto3" json:"Topic,omitempty"`    // Где опубликовать запрос
+	Msg    []byte `protobuf:"bytes,3,opt,name=Msg,proto3" json:"Msg,omitempty"`        // Байтовое представление сообщения
+	Uuid   string `protobuf:"bytes,4,opt,name=Uuid,proto3" json:"Uuid,omitempty"`      // Уникальная метка сообщения
 }
 
-func (x *PublicRequest) Reset() {
-	*x = PublicRequest{}
+func (x *TopicMsg) Reset() {
+	*x = TopicMsg{}
 	if protoimpl.UnsafeEnabled {
 		mi := &file_api_gobus_proto_msgTypes[7]
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -454,13 +429,13 @@ func (x *PublicRequest) Reset() {
 	}
 }
 
-func (x *PublicRequest) String() string {
+func (x *TopicMsg) String() string {
 	return protoimpl.X.MessageStringOf(x)
 }
 
-func (*PublicRequest) ProtoMessage() {}
+func (*TopicMsg) ProtoMessage() {}
 
-func (x *PublicRequest) ProtoReflect() protoreflect.Message {
+func (x *TopicMsg) ProtoReflect() protoreflect.Message {
 	mi := &file_api_gobus_proto_msgTypes[7]
 	if protoimpl.UnsafeEnabled && x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -472,46 +447,39 @@ func (x *PublicRequest) ProtoReflect() protoreflect.Message {
 	return mi.MessageOf(x)
 }
 
-// Deprecated: Use PublicRequest.ProtoReflect.Descriptor instead.
-func (*PublicRequest) Descriptor() ([]byte, []int) {
+// Deprecated: Use TopicMsg.ProtoReflect.Descriptor instead.
+func (*TopicMsg) Descriptor() ([]byte, []int) {
 	return file_api_gobus_proto_rawDescGZIP(), []int{7}
 }
 
-func (x *PublicRequest) GetSource() int32 {
+func (x *TopicMsg) GetSource() int32 {
 	if x != nil {
 		return x.Source
 	}
 	return 0
 }
 
-func (x *PublicRequest) GetTopic() string {
+func (x *TopicMsg) GetTopic() string {
 	if x != nil {
 		return x.Topic
 	}
 	return ""
 }
 
-func (x *PublicRequest) GetMsg() []byte {
+func (x *TopicMsg) GetMsg() []byte {
 	if x != nil {
 		return x.Msg
 	}
 	return nil
 }
 
-func (x *PublicRequest) GetUuid() string {
+func (x *TopicMsg) GetUuid() string {
 	if x != nil {
 		return x.Uuid
 	}
 	return ""
 }
 
-func (x *PublicRequest) GetRepliesList() []int32 {
-	if x != nil {
-		return x.RepliesList
-	}
-	return nil
-}
-
 // DefaultResponse -- возвращаемое значение по умолчанию (ничего не содержит)
 type DefaultResponse struct {
 	state         protoimpl.MessageState
@@ -576,50 +544,48 @@ var file_api_gobus_proto_rawDesc = []byte{
 	0x61, 0x6d, 0x70, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, 0x61, 0x6d,
 	0x70, 0x6c, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x4e, 0x61, 0x6d,
 	0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x4e,
-	0x61, 0x6d, 0x65, 0x22, 0x4d, 0x0a, 0x0b, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65,
-	0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01,
-	0x28, 0x05, 0x52, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x54, 0x6f,
-	0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63,
-	0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x4d,
-	0x73, 0x67, 0x22, 0x4a, 0x0a, 0x08, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x4d, 0x73, 0x67, 0x12, 0x16,
-	0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06,
-	0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18,
-	0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x10, 0x0a, 0x03,
-	0x4d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x4d, 0x73, 0x67, 0x22, 0x20,
-	0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x10,
-	0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x4d, 0x73, 0x67,
-	0x22, 0x85, 0x01, 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65,
-	0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01,
-	0x28, 0x05, 0x52, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x54, 0x6f,
-	0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63,
-	0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x4d,
-	0x73, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x55, 0x75, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
-	0x52, 0x04, 0x55, 0x75, 0x69, 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x65,
-	0x73, 0x4c, 0x69, 0x73, 0x74, 0x18, 0x05, 0x20, 0x03, 0x28, 0x05, 0x52, 0x0b, 0x52, 0x65, 0x70,
-	0x6c, 0x69, 0x65, 0x73, 0x4c, 0x69, 0x73, 0x74, 0x22, 0x11, 0x0a, 0x0f, 0x44, 0x65, 0x66, 0x61,
-	0x75, 0x6c, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xcc, 0x02, 0x0a, 0x05,
-	0x47, 0x6f, 0x42, 0x75, 0x73, 0x12, 0x3e, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x12,
-	0x17, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69,
-	0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65,
-	0x72, 0x69, 0x6e, 0x2e, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f,
-	0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x79, 0x6e,
-	0x63, 0x12, 0x15, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e,
-	0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65,
-	0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
-	0x22, 0x00, 0x12, 0x43, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12,
-	0x1a, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63,
-	0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61,
-	0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f,
-	0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x44, 0x0a, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63,
-	0x72, 0x69, 0x62, 0x65, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x12, 0x15, 0x2e, 0x70, 0x61, 0x72,
-	0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x42, 0x75, 0x66, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
-	0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x42, 0x75, 0x66,
-	0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3b, 0x0a,
-	0x03, 0x47, 0x65, 0x74, 0x12, 0x1a, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e,
-	0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
-	0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63,
-	0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f,
-	0x6e, 0x65, 0x74, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x61, 0x6d, 0x65, 0x22, 0x3d, 0x0a, 0x0b, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65,
+	0x73, 0x74, 0x12, 0x2e, 0x0a, 0x08, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x4d, 0x73, 0x67, 0x18, 0x01,
+	0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e,
+	0x54, 0x6f, 0x70, 0x69, 0x63, 0x4d, 0x73, 0x67, 0x52, 0x08, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x4d,
+	0x73, 0x67, 0x22, 0x20, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
+	0x73, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52,
+	0x03, 0x4d, 0x73, 0x67, 0x22, 0x61, 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x52, 0x65,
+	0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2e, 0x0a, 0x08, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x4d, 0x73,
+	0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72,
+	0x69, 0x6e, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x4d, 0x73, 0x67, 0x52, 0x08, 0x74, 0x6f, 0x70,
+	0x69, 0x63, 0x4d, 0x73, 0x67, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x65, 0x73,
+	0x4c, 0x69, 0x73, 0x74, 0x18, 0x02, 0x20, 0x03, 0x28, 0x05, 0x52, 0x0b, 0x52, 0x65, 0x70, 0x6c,
+	0x69, 0x65, 0x73, 0x4c, 0x69, 0x73, 0x74, 0x22, 0x5e, 0x0a, 0x08, 0x54, 0x6f, 0x70, 0x69, 0x63,
+	0x4d, 0x73, 0x67, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20,
+	0x01, 0x28, 0x05, 0x52, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x54,
+	0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x54, 0x6f, 0x70, 0x69,
+	0x63, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03,
+	0x4d, 0x73, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x55, 0x75, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28,
+	0x09, 0x52, 0x04, 0x55, 0x75, 0x69, 0x64, 0x22, 0x11, 0x0a, 0x0f, 0x44, 0x65, 0x66, 0x61, 0x75,
+	0x6c, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xcc, 0x02, 0x0a, 0x05, 0x47,
+	0x6f, 0x42, 0x75, 0x73, 0x12, 0x3e, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x12, 0x17,
+	0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63,
+	0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72,
+	0x69, 0x6e, 0x2e, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
+	0x73, 0x65, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x79, 0x6e, 0x63,
+	0x12, 0x15, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63,
+	0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72,
+	0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
+	0x00, 0x12, 0x43, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1a,
+	0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
+	0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72,
+	0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
+	0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x44, 0x0a, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
+	0x69, 0x62, 0x65, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x12, 0x15, 0x2e, 0x70, 0x61, 0x72, 0x73,
+	0x65, 0x72, 0x69, 0x6e, 0x2e, 0x42, 0x75, 0x66, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+	0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x42, 0x75, 0x66, 0x66,
+	0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3b, 0x0a, 0x03,
+	0x47, 0x65, 0x74, 0x12, 0x1a, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53,
+	0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
+	0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52,
+	0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x6e,
+	0x65, 0x74, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
@@ -641,27 +607,29 @@ var file_api_gobus_proto_goTypes = []interface{}{
 	(*GetRequest)(nil),       // 2: parserin.GetRequest
 	(*SubscribeRequest)(nil), // 3: parserin.SubscribeRequest
 	(*SyncRequest)(nil),      // 4: parserin.SyncRequest
-	(*TopicMsg)(nil),         // 5: parserin.TopicMsg
-	(*SyncResponse)(nil),     // 6: parserin.SyncResponse
-	(*PublicRequest)(nil),    // 7: parserin.PublicRequest
+	(*SyncResponse)(nil),     // 5: parserin.SyncResponse
+	(*PublicRequest)(nil),    // 6: parserin.PublicRequest
+	(*TopicMsg)(nil),         // 7: parserin.TopicMsg
 	(*DefaultResponse)(nil),  // 8: parserin.DefaultResponse
 }
 var file_api_gobus_proto_depIdxs = []int32{
-	7, // 0: parserin.GoBus.Public:input_type -> parserin.PublicRequest
-	4, // 1: parserin.GoBus.SendSync:input_type -> parserin.SyncRequest
-	3, // 2: parserin.GoBus.Subscribe:input_type -> parserin.SubscribeRequest
-	0, // 3: parserin.GoBus.SubscribeBuffer:input_type -> parserin.BuffRequest
-	3, // 4: parserin.GoBus.Get:input_type -> parserin.SubscribeRequest
-	8, // 5: parserin.GoBus.Public:output_type -> parserin.DefaultResponse
-	6, // 6: parserin.GoBus.SendSync:output_type -> parserin.SyncResponse
-	6, // 7: parserin.GoBus.Subscribe:output_type -> parserin.SyncResponse
-	1, // 8: parserin.GoBus.SubscribeBuffer:output_type -> parserin.BuffResponse
-	6, // 9: parserin.GoBus.Get:output_type -> parserin.SyncResponse
-	5, // [5:10] is the sub-list for method output_type
-	0, // [0:5] is the sub-list for method input_type
-	0, // [0:0] is the sub-list for extension type_name
-	0, // [0:0] is the sub-list for extension extendee
-	0, // [0:0] is the sub-list for field type_name
+	7, // 0: parserin.SyncRequest.topicMsg:type_name -> parserin.TopicMsg
+	7, // 1: parserin.PublicRequest.topicMsg:type_name -> parserin.TopicMsg
+	6, // 2: parserin.GoBus.Public:input_type -> parserin.PublicRequest
+	4, // 3: parserin.GoBus.SendSync:input_type -> parserin.SyncRequest
+	3, // 4: parserin.GoBus.Subscribe:input_type -> parserin.SubscribeRequest
+	0, // 5: parserin.GoBus.SubscribeBuffer:input_type -> parserin.BuffRequest
+	3, // 6: parserin.GoBus.Get:input_type -> parserin.SubscribeRequest
+	8, // 7: parserin.GoBus.Public:output_type -> parserin.DefaultResponse
+	5, // 8: parserin.GoBus.SendSync:output_type -> parserin.SyncResponse
+	5, // 9: parserin.GoBus.Subscribe:output_type -> parserin.SyncResponse
+	1, // 10: parserin.GoBus.SubscribeBuffer:output_type -> parserin.BuffResponse
+	5, // 11: parserin.GoBus.Get:output_type -> parserin.SyncResponse
+	7, // [7:12] is the sub-list for method output_type
+	2, // [2:7] is the sub-list for method input_type
+	2, // [2:2] is the sub-list for extension type_name
+	2, // [2:2] is the sub-list for extension extendee
+	0, // [0:2] is the sub-list for field type_name
 }
 
 func init() { file_api_gobus_proto_init() }
@@ -731,7 +699,7 @@ func file_api_gobus_proto_init() {
 			}
 		}
 		file_api_gobus_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
-			switch v := v.(*TopicMsg); i {
+			switch v := v.(*SyncResponse); i {
 			case 0:
 				return &v.state
 			case 1:
@@ -743,7 +711,7 @@ func file_api_gobus_proto_init() {
 			}
 		}
 		file_api_gobus_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
-			switch v := v.(*SyncResponse); i {
+			switch v := v.(*PublicRequest); i {
 			case 0:
 				return &v.state
 			case 1:
@@ -755,7 +723,7 @@ func file_api_gobus_proto_init() {
 			}
 		}
 		file_api_gobus_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
-			switch v := v.(*PublicRequest); i {
+			switch v := v.(*TopicMsg); i {
 			case 0:
 				return &v.state
 			case 1:

+ 1 - 1
cmd/gobus/main_test.go

@@ -10,7 +10,7 @@ type tester struct {
 	t *testing.T
 }
 
-func TestAny(t *testing.T) {
+func TestMain(t *testing.T) {
 	sf := &tester{
 		t: t,
 	}

+ 30 - 13
internal/dict_topic/dict_topic.go

@@ -2,10 +2,11 @@
 package dict_topic
 
 import (
-	"log"
+	"fmt"
 	"strings"
 	"sync"
 
+	"p78git.ddns.net/svi/gobus/api/netapi"
 	"p78git.ddns.net/svi/gobus/internal/dict_topic/topic"
 	"p78git.ddns.net/svi/gobus/pkg/alias"
 	"p78git.ddns.net/svi/gobus/pkg/types"
@@ -29,10 +30,14 @@ func NewDictTopic() (*DictTopic, error) {
 }
 
 // Unsubscribe -- отписывает клиента от топиков
-func (sf *DictTopic) Unsubscribe(clientProxy types.IClientProxy) {
+func (sf *DictTopic) Unsubscribe(clientProxy types.IClientProxy) error {
 	sf.block.RLock()
 	defer sf.block.RUnlock()
+	if clientProxy == nil {
+		return fmt.Errorf("DictTopic.Unsubscribe(): IClientProxy==nil")
+	}
 	sf.unsubscribe(clientProxy)
+	return nil
 }
 
 // Отписывает под блокировкой клиента от топиков
@@ -47,10 +52,14 @@ func (sf *DictTopic) unsubscribe(clientProxy types.IClientProxy) {
 }
 
 // Subscribe -- подписывает клиента на топики
-func (sf *DictTopic) Subscribe(clientProxy types.IClientProxy) {
+func (sf *DictTopic) Subscribe(clientProxy types.IClientProxy) error {
 	sf.block.RLock()
 	defer sf.block.RUnlock()
+	if clientProxy == nil {
+		return fmt.Errorf("DictTopic.Subscribe(): IClientProxy==nil")
+	}
 	sf.subscribe(clientProxy)
+	return nil
 }
 
 // Подписывает по образцу топика клиента на события (внутри блокировки)
@@ -71,21 +80,29 @@ func (sf *DictTopic) subscribe(clientProxy types.IClientProxy) {
 	sf.dictProxy[clientProxy.ClientName()] = clientProxy
 }
 
-// Add -- добавляет новый топик
-func (sf *DictTopic) Add(name alias.TopicName) {
+// Public -- публикует в новый топик (при необходимости создаёт)
+func (sf *DictTopic) Public(msg *netapi.TopicMsg) error {
 	sf.block.Lock()
 	defer sf.block.Unlock()
-	topic, err := topic.NewTopic(name)
-	if err != nil {
-		log.Printf("DictTopic.Add(): in create ITopic, err=\n\t%v\n", err)
-		return
+	if msg == nil {
+		return fmt.Errorf("DictTopic.Add(): msg==nil")
 	}
-	sf.dict[name] = topic
-	{ // Теперь пройтись по все прокси-клиентам и подписать их на новый топик
-		for _, clientProxy := range sf.dictProxy {
-			topic.Subscribe(clientProxy)
+	top_, isOk := sf.dict[alias.TopicName(msg.Topic)]
+	if !isOk {
+		var err error
+		top_, err = topic.NewTopic(msg)
+		if err != nil {
+			return fmt.Errorf("DictTopic.Add(): in create ITopic, err=\n\t%v\n", err)
+		}
+		sf.dict[top_.Name()] = top_
+		{ // Теперь пройтись по все прокси-клиентам и подписать их на новый топик
+			for _, clientProxy := range sf.dictProxy {
+				top_.Subscribe(clientProxy)
+			}
 		}
 	}
+	top_.Set(msg) // Оповещение подписчиков (на самом деле уже сообщение присвоено)
+	return nil
 }
 
 // Get -- возвращает топик по имени

+ 23 - 18
internal/dict_topic/topic/topic.go

@@ -14,19 +14,18 @@ import (
 type Topic struct {
 	name      alias.TopicName
 	dictProxy map[alias.ClientName]types.IClientProxy // Словарь подписок клиентов на топик
-	srcNum    int32                                   // Номер источника
-	val       []byte                                  // Содержимое топика
+	msg       *netapi.TopicMsg                        // Хранимое сообщение топика
 	block     sync.RWMutex
 }
 
 // Newtopic -- возвращает новый топик
-func NewTopic(name alias.TopicName) (*Topic, error) {
-	if name == "" {
+func NewTopic(msg *netapi.TopicMsg) (*Topic, error) {
+	if msg == nil {
 		return nil, fmt.Errorf("NewTopic(): name is empty")
 	}
 	sf := &Topic{
-		name:      name,
-		val:       []byte{},
+		name:      alias.TopicName(msg.Topic),
+		msg:       msg,
 		dictProxy: make(map[alias.ClientName]types.IClientProxy),
 	}
 	_ = types.ITopic(sf)
@@ -40,11 +39,15 @@ func (sf *Topic) Unsubscribe(clientName alias.ClientName) {
 	delete(sf.dictProxy, clientName)
 }
 
-// Subscribe -- попдисывает клиента на себя
-func (sf *Topic) Subscribe(clientProxy types.IClientProxy) {
+// Subscribe -- подписывает клиента на себя
+func (sf *Topic) Subscribe(clientProxy types.IClientProxy) error {
 	sf.block.Lock()
 	defer sf.block.Unlock()
+	if clientProxy == nil {
+		return fmt.Errorf("Topic.Subscribe(): ICLientProxy==nil")
+	}
 	sf.dictProxy[clientProxy.ClientName()] = clientProxy
+	return nil
 }
 
 // Name -- возвращает имя топика
@@ -56,20 +59,20 @@ func (sf *Topic) Name() alias.TopicName {
 func (sf *Topic) Get() *netapi.TopicMsg {
 	sf.block.RLock()
 	defer sf.block.RUnlock()
-	msg := &netapi.TopicMsg{
-		Source: sf.srcNum,
-		Topic:  string(sf.name),
-		Msg:    sf.val,
-	}
-	return msg
+	return sf.msg
 }
 
 // Set -- устанавливает значение топика
-func (sf *Topic) Set(msg *netapi.TopicMsg) {
+func (sf *Topic) Set(msg *netapi.TopicMsg) error {
 	sf.block.Lock()
 	defer sf.block.Unlock()
-	sf.srcNum = msg.Source
-	sf.val = msg.Msg
+	if msg == nil {
+		return fmt.Errorf("Topic.Set(): ERROR msg==nil\n")
+	}
+	if sf.name != alias.TopicName(msg.Topic) {
+		return fmt.Errorf("Topic.Set(): topic name(%q)!=msg.Topic(%v)\n", sf.name, msg.Topic)
+	}
+	sf.msg = msg
 	for _, proxy := range sf.dictProxy {
 		if !proxy.IsWork() {
 			delete(sf.dictProxy, proxy.ClientName())
@@ -77,11 +80,13 @@ func (sf *Topic) Set(msg *netapi.TopicMsg) {
 		}
 		proxy.Write(msg)
 	}
+	return nil
 }
 
 // Size -- возвращает размер топика
 func (sf *Topic) Size() int {
 	sf.block.RLock()
 	defer sf.block.RUnlock()
-	return len(sf.name) + len(sf.val)
+	// *2 потому что имя хранится два раза
+	return len(sf.name)*2 + len(sf.msg.Msg)
 }

+ 43 - 0
internal/dict_topic/topic/topic_live_time/topic_live_time.go

@@ -0,0 +1,43 @@
+// package topic_live_time -- время жизни топика
+//
+//	После истечения времени -- топик будет уничтожен
+package topic_live_time
+
+import (
+	"fmt"
+	"sync"
+
+	"p78git.ddns.net/svi/gobus/pkg/alias"
+)
+
+// TopicLiveTime --  время жизни топика
+type TopicLiveTime struct {
+	val   alias.TopicLiveTime // Фактическое оставшееся время топика в секундах
+	block sync.RWMutex
+}
+
+// NewTopicLiveTime -- возвращает новое время жизни топика
+func NewTopicLiveTime(val alias.TopicLiveTime) (*TopicLiveTime, error) {
+	if val < 10 {
+		return nil, fmt.Errorf("NewTopicLiveTime(): val(%v)<10", val)
+	}
+	sf := &TopicLiveTime{
+		val: val,
+	}
+	return sf, nil
+}
+
+// Get -- возвращает хранимое значение
+func (sf *TopicLiveTime) Get() alias.TopicLiveTime {
+	sf.block.RLock()
+	defer sf.block.RUnlock()
+	return sf.val
+}
+
+// Down -- уменьшает значение времени лайф-тайма на заданное число секунд
+func (sf *TopicLiveTime) Down(val alias.TopicLiveTime) bool {
+	sf.block.Lock()
+	defer sf.block.Unlock()
+	sf.val -= val
+	return sf.val <= 0
+}

+ 71 - 0
internal/dict_topic/topic/topic_live_time/topic_live_time_test.go

@@ -0,0 +1,71 @@
+package topic_live_time
+
+import "testing"
+
+/*
+	Тест для времени жизни топика
+*/
+
+type tester struct {
+	t    *testing.T
+	live *TopicLiveTime
+}
+
+func TestTopicLiveTime(t *testing.T) {
+	sf := &tester{
+		t: t,
+	}
+	sf.create()
+	sf.down()
+}
+
+// Уменьшение счётика лайф-тайма
+func (sf *tester) down() {
+	sf.t.Log("down")
+	sf.down1()
+}
+
+// Недостаточное обнуление счётчика лайф-тайма
+func (sf *tester) down1() {
+	sf.t.Log("down1")
+	if sf.live.Down(5) {
+		sf.t.Fatalf("down1(): limit==true")
+	}
+	if !sf.live.Down(5) {
+		sf.t.Fatalf("down1(): limit==false")
+	}
+}
+
+// Создание топика
+func (sf *tester) create() {
+	sf.t.Logf("create")
+	sf.createBad1()
+	sf.createGood1()
+}
+
+func (sf *tester) createGood1() {
+	sf.t.Log("createGood1")
+	var err error
+	sf.live, err = NewTopicLiveTime(10)
+	if err != nil {
+		sf.t.Fatalf("createGood1(): err=%v", err)
+	}
+	if sf.live == nil {
+		sf.t.Fatalf("createGood1(): live==nil")
+	}
+	if val := sf.live.Get(); val != 10 {
+		sf.t.Fatalf("createGood1(): val(%v)!=10", val)
+	}
+}
+
+// Слишком маленькое время жизни
+func (sf *tester) createBad1() {
+	sf.t.Log("createBad1")
+	live, err := NewTopicLiveTime(9)
+	if err == nil {
+		sf.t.Fatalf("createBad1(): err==nil")
+	}
+	if live != nil {
+		sf.t.Fatalf("createBad1(): live!=nil")
+	}
+}

+ 24 - 2
internal/dict_topic/topic/topic_test.go

@@ -1,6 +1,10 @@
 package topic
 
-import "testing"
+import (
+	"testing"
+
+	"p78git.ddns.net/svi/gobus/api/netapi"
+)
 
 /*
 	Тест для топика шины данных
@@ -21,12 +25,30 @@ func TestTopic(t *testing.T) {
 func (sf *tester) create() {
 	sf.t.Logf("create")
 	sf.createBad1()
+	sf.createGood1()
+}
+
+func (sf *tester) createGood1() {
+	sf.t.Logf("createGood1")
+	msg := &netapi.TopicMsg{
+		Source: 0,
+		Topic:  "test_topic",
+		Msg:    []byte{},
+		Uuid:   "",
+	}
+	topic, err := NewTopic(msg)
+	if err != nil {
+		sf.t.Fatalf("createGood1(): err=%v", err)
+	}
+	if topic == nil {
+		sf.t.Fatalf("createGood1(): topic==nil")
+	}
 }
 
 // Нет имени топика
 func (sf *tester) createBad1() {
 	sf.t.Logf("createBad1")
-	topic, err := NewTopic("")
+	topic, err := NewTopic(nil)
 	if err == nil {
 		sf.t.Fatalf("createBad1(): err==nil")
 	}

+ 3 - 11
internal/serv_grpc/serv_public/serv_public.go

@@ -7,7 +7,6 @@ import (
 	"log"
 
 	"p78git.ddns.net/svi/gobus/api/netapi"
-	"p78git.ddns.net/svi/gobus/pkg/alias"
 	"p78git.ddns.net/svi/gobus/pkg/types"
 )
 
@@ -32,16 +31,9 @@ func NewServPublic(serv types.IService) (*ServPublic, error) {
 
 // Public -- публикация сообщения в топик
 func (sf *ServPublic) Public(ctx context.Context, req *netapi.PublicRequest) (*netapi.DefaultResponse, error) {
-	topic := sf.dictTopic.Get(alias.TopicName(req.Topic))
-	if topic == nil {
-		sf.dictTopic.Add(alias.TopicName(req.Topic))
-		topic = sf.dictTopic.Get(alias.TopicName(req.Topic))
+	err := sf.dictTopic.Public(req.TopicMsg)
+	if err != nil {
+		return nil, fmt.Errorf("ServPublic.Public(): in set topic, err=\n\t%w", err)
 	}
-	msg := &netapi.TopicMsg{
-		Source: req.Source,
-		Topic:  req.Topic,
-		Msg:    req.Msg,
-	}
-	topic.Set(msg)
 	return &netapi.DefaultResponse{}, nil
 }

+ 3 - 0
pkg/alias/alias.go

@@ -6,3 +6,6 @@ type TopicName string
 
 // ClientName -- униальное имя клиента
 type ClientName string
+
+// TopicLiveTime -- время жизни топика в секундах
+type TopicLiveTime int

+ 8 - 5
pkg/types/idict_topic.go

@@ -1,11 +1,14 @@
 package types
 
-import "p78git.ddns.net/svi/gobus/pkg/alias"
+import (
+	"p78git.ddns.net/svi/gobus/api/netapi"
+	"p78git.ddns.net/svi/gobus/pkg/alias"
+)
 
 // IDictTopic -- потокобезопасный словарь топиков
 type IDictTopic interface {
-	// Add -- добавляет топик в словрь
-	Add(alias.TopicName)
+	// Public -- публикует топик в словрь
+	Public(*netapi.TopicMsg) error
 	// Get -- возвращает топик по имени
 	Get(alias.TopicName) ITopic
 	// Len -- возвращает количество топиков
@@ -13,7 +16,7 @@ type IDictTopic interface {
 	// Size -- возвращает размер всех топиков
 	Size() int
 	// Subscribe -- подписывает клиента на топики по шаблону
-	Subscribe(IClientProxy)
+	Subscribe(IClientProxy) error
 	// Unsubscribe -- отписывает клиента от топиков
-	Unsubscribe(IClientProxy)
+	Unsubscribe(IClientProxy) error
 }

+ 2 - 2
pkg/types/itopic.go

@@ -10,13 +10,13 @@ type ITopic interface {
 	// Name -- возвращает имя топика
 	Name() alias.TopicName
 	// Set -- устанавливает содержимое топика
-	Set(*netapi.TopicMsg)
+	Set(*netapi.TopicMsg) error
 	// Get -- возвращает содержимое топика
 	Get() *netapi.TopicMsg
 	// Size -- возвращает размер топика
 	Size() int
 	// Subscribe -- подписывает клиента
-	Subscribe(IClientProxy)
+	Subscribe(IClientProxy) error
 	// Unsubscribe -- отписывает клиента
 	Unsubscribe(alias.ClientName)
 }