Przeglądaj źródła

SVI Добавление кода GRPC-сервера

SVI 2 lat temu
rodzic
commit
501d591ac1

+ 15 - 0
internal/serv_grpc/serv_get/serv_get.go

@@ -0,0 +1,15 @@
+// package serv_get -- реализация GET-запроса по шаблону
+package serv_get
+
+import (
+	"context"
+
+	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
+)
+
+type ServGet struct{}
+
+// Get -- возвращает содержимое топиков по шаблону
+func (sf *ServGet) Get(ctx context.Context, req *netapi.SubscribeRequest) (*netapi.SyncResponse, error) {
+	return &netapi.SyncResponse{}, nil
+}

+ 55 - 3
internal/serv_grpc/serv_grpc.go

@@ -3,25 +3,77 @@ package serv_grpc
 
 import (
 	"fmt"
+	"io"
 	"log"
+	"net"
 	"os"
+
+	"google.golang.org/grpc"
+
+	"p78git.ddns.net/svi/gobus/internal/serv_grpc/serv_get"
+	"p78git.ddns.net/svi/gobus/internal/serv_grpc/serv_public"
+	"p78git.ddns.net/svi/gobus/internal/serv_grpc/serv_subcsribe"
+	"p78git.ddns.net/svi/gobus/internal/serv_grpc/serv_subscribe_buffer"
+	"p78git.ddns.net/svi/gobus/internal/serv_grpc/serv_sync"
+	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
+	"p78git.ddns.net/svi/gobus/pkg/types"
 )
 
 // ServGrpc -- сервер GRPC-запросов шины данных
-type ServGrpc struct{}
+type ServGrpc struct {
+	netapi.UnsafeGoBusServer
+	srv  net.Listener
+	port string
+	*serv_get.ServGet
+	*serv_public.ServPublic
+	*serv_sync.ServSync
+	*serv_subcsribe.ServSubscribe
+	*serv_subscribe_buffer.ServSubscribeBuffer
+}
 
 // NewServGrpc -- возвращает новый GRPC-сервер шины данных
-func NewServGrpc() (*ServGrpc, error) {
+func NewServGrpc(serv types.IService) (*ServGrpc, error) {
 	log.Printf("NewServGrpc()\n")
+	if serv == nil {
+		return nil, fmt.Errorf("NewServGrpc(): IService==nil")
+	}
 	port := os.Getenv("SERV_GRPC_PORT")
 	if port == "" {
 		return nil, fmt.Errorf("NewServGrpc(): env SERV_GRPC_PORT not set")
 	}
-	sf := &ServGrpc{}
+	sf := &ServGrpc{
+		port: port,
+	}
+	var err error
+	sf.ServPublic, err = serv_public.NewServPublic(serv)
+	if err != nil {
+		return nil, fmt.Errorf("NewServGrpc(): in create ServPublic, err=\n\t%w", err)
+	}
+	_ = netapi.GoBusServer(sf)
 	return sf, nil
 }
 
 // Run -- запускает GRPC-сервер в работу
 func (sf *ServGrpc) Run() {
 	log.Printf("ServGrpc.Run()\n")
+	go sf.listen()
+}
+
+// Слушает порт
+func (sf *ServGrpc) listen() {
+	var err error
+	sf.srv, err = net.Listen("tcp", ":"+sf.port)
+	if err != nil {
+		log.Printf("ServGrpc.listen(): failed to listen, err=\n\t%v\n", err)
+		return
+	}
+	opts := []grpc.ServerOption{}
+	grpcServer := grpc.NewServer(opts...)
+	netapi.RegisterGoBusServer(grpcServer, sf)
+	err = grpcServer.Serve(sf.srv)
+	if err != nil && err != io.EOF {
+		strErr := fmt.Sprintf("ServGrpc.listen(): failed to listen, err=\n\t%v\n", err)
+		log.Printf("ServGrpc.listen(): ошибка при работе GRPC-сервера, err=\n\t%v\n", strErr)
+	}
+	log.Printf("ServGrpc.listen(): работа сервера завершена")
 }

+ 33 - 0
internal/serv_grpc/serv_public/serv_public.go

@@ -0,0 +1,33 @@
+// package serv_public -- сервер запросов на публикацию
+package serv_public
+
+import (
+	"context"
+	"fmt"
+
+	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
+	"p78git.ddns.net/svi/gobus/pkg/types"
+)
+
+// ServPublic -- отвечает на запросы публикации
+type ServPublic struct {
+	serv types.IService
+	dictTopic types.IDictTopic
+}
+
+// NewServPublic -- возвращает новый публикатор
+func NewServPublic(serv types.IService) (*ServPublic, error) {
+	if serv == nil {
+		return nil, fmt.Errorf("NewServPublic(): IService==nil")
+	}
+	sf := &ServPublic{
+		serv: serv,
+		dictTopic: serv.DictTopic(),
+	}
+	return sf, nil
+}
+
+// Public -- публикация сообщения в топик
+func (sf *ServPublic) Public(ctx context.Context, req *netapi.PublicRequest) (*netapi.DefaultResponse, error) {
+	return &netapi.DefaultResponse{}, nil
+}

+ 12 - 0
internal/serv_grpc/serv_subcsribe/serv_subscribe.go

@@ -0,0 +1,12 @@
+// package serv_subcsribe -- сервер подписки
+package serv_subcsribe
+
+import (
+	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
+)
+
+type ServSubscribe struct{}
+
+func (sf *ServSubscribe) Subscribe(req *netapi.SubscribeRequest, serv netapi.GoBus_SubscribeServer) error {
+	return nil
+}

+ 10 - 0
internal/serv_grpc/serv_subscribe_buffer/serv_subscribe_buffer.go

@@ -0,0 +1,10 @@
+// package serv_subscribe_buffer -- сервер подписки с буфером хранения
+package serv_subscribe_buffer
+
+import "p78git.ddns.net/svi/gobus/pkg/net/netapi"
+
+type ServSubscribeBuffer struct{}
+
+func (sf *ServSubscribeBuffer) SubscribeBuffer(req *netapi.BuffRequest, serv netapi.GoBus_SubscribeBufferServer) error {
+	return nil
+}

+ 14 - 0
internal/serv_grpc/serv_sync/serv_sync.go

@@ -0,0 +1,14 @@
+// package serv_sync -- отправляет синхронный запрос в шину
+package serv_sync
+
+import (
+	"context"
+
+	"p78git.ddns.net/svi/gobus/pkg/net/netapi"
+)
+
+type ServSync struct{}
+
+func (sf *ServSync) SendSync(ctx context.Context, req *netapi.SyncRequest) (*netapi.SyncResponse, error) {
+	return &netapi.SyncResponse{}, nil
+}

+ 30 - 4
internal/service/service.go

@@ -2,25 +2,39 @@
 package service
 
 import (
+	"context"
 	"fmt"
 	"log"
 
+	"p78git.ddns.net/svi/gobus/internal/dict_topic"
 	"p78git.ddns.net/svi/gobus/internal/serv_grpc"
+	"p78git.ddns.net/svi/gobus/pkg/types"
 )
 
 // Service -- главный тип сервиса шины данных
 type Service struct {
-	servGrpc *serv_grpc.ServGrpc
+	ctxBg     context.Context  // Неотменяемый контекст приложения
+	ctx       context.Context  // Отменяемый контекст приложения
+	fnCancel  func()           // Функция отмены контекста приложения
+	dictTopic types.IDictTopic // Потокобезопасный словарь топиков
+	servGrpc  *serv_grpc.ServGrpc
 }
 
 // NewSevice -- взвращае тновый сервис
 func NewSevice() (*Service, error) {
 	log.Printf("NewService()\n")
-	sf := &Service{}
+	sf := &Service{
+		ctxBg: context.Background(),
+	}
+	sf.ctx, sf.fnCancel = context.WithCancel(sf.ctxBg)
 	var err error
-	sf.servGrpc, err = serv_grpc.NewServGrpc()
+	sf.dictTopic, err = dict_topic.NewDictTopic()
+	if err != nil {
+		return nil, fmt.Errorf("NewService(): in create DictTopic, err=\n\t%w", err)
+	}
+	sf.servGrpc, err = serv_grpc.NewServGrpc(sf)
 	if err != nil {
-		return nil, fmt.Errorf("NewService(): in create ServGrpc, err=\n\t%v\n", err)
+		return nil, fmt.Errorf("NewService(): in create ServGrpc, err=\n\t%w", err)
 	}
 	return sf, nil
 }
@@ -28,5 +42,17 @@ func NewSevice() (*Service, error) {
 // Run -- запускает сервис в работу
 func (sf *Service) Run() error {
 	log.Printf("Service.Run()\n")
+	sf.servGrpc.Run()
+	<-sf.ctx.Done()
 	return nil
 }
+
+// Ctx -- возвращает контекст приложения
+func (sf Service) Ctx() context.Context {
+	return sf.ctx
+}
+
+// CancelApp -- отменяет контекст приложения
+func (sf *Service) CancelApp() {
+	sf.fnCancel()
+}

+ 5 - 0
pkg/alias/alias.go

@@ -0,0 +1,5 @@
+// package alias -- алиасы сервиса
+package alias
+
+// TopicName -- имя топика
+type TopicName string

+ 2 - 2
pkg/net/gobus.proto

@@ -20,7 +20,7 @@ service GoBus {
     // SubscribeBuffer -- подписка по шаблону с буфером на выход
     rpc SubscribeBuffer (BuffRequest) returns (stream BuffResponse){}
     // Get -- возвращает значения топиков по шаблону
-    rpc Get(SubscribeRequest) returns (GetRequest){}
+    rpc Get(SubscribeRequest) returns (SyncResponse){}
 }
 
 // BuffRequest -- запрос на буферизованную подписку
@@ -59,7 +59,7 @@ message SyncResponse{
 }
 // PublicRequest -- запрос на публикацию сообщения
 message PublicRequest {
-    int32 Source = 1; // Источник данных (0 -- клиент, другое -- реплика)
+    int32 Source = 1; // Источник данных (0 -- клиент, другое -- для реплики)
     string Topic = 2; // Куда опубликовать сообщение
     bytes Msg    = 3; // Байтовое представление сообщения
 }

+ 6 - 6
pkg/net/netapi/gobus.pb.go

@@ -489,7 +489,7 @@ var file_pkg_net_gobus_proto_rawDesc = []byte{
 	0x12, 0x14, 0x0a, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
 	0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x10, 0x0a, 0x03, 0x4d, 0x73, 0x67, 0x18, 0x03, 0x20,
 	0x01, 0x28, 0x0c, 0x52, 0x03, 0x4d, 0x73, 0x67, 0x22, 0x11, 0x0a, 0x0f, 0x44, 0x65, 0x66, 0x61,
-	0x75, 0x6c, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xca, 0x02, 0x0a, 0x05,
+	0x75, 0x6c, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xcc, 0x02, 0x0a, 0x05,
 	0x47, 0x6f, 0x42, 0x75, 0x73, 0x12, 0x3e, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x12,
 	0x17, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69,
 	0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65,
@@ -506,12 +506,12 @@ var file_pkg_net_gobus_proto_rawDesc = []byte{
 	0x72, 0x69, 0x62, 0x65, 0x42, 0x75, 0x66, 0x66, 0x65, 0x72, 0x12, 0x15, 0x2e, 0x70, 0x61, 0x72,
 	0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x42, 0x75, 0x66, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
 	0x74, 0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x42, 0x75, 0x66,
-	0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x39, 0x0a,
+	0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3b, 0x0a,
 	0x03, 0x47, 0x65, 0x74, 0x12, 0x1a, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e,
 	0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
-	0x1a, 0x14, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x52,
-	0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x6e, 0x65,
-	0x74, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x1a, 0x16, 0x2e, 0x70, 0x61, 0x72, 0x73, 0x65, 0x72, 0x69, 0x6e, 0x2e, 0x53, 0x79, 0x6e, 0x63,
+	0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f,
+	0x6e, 0x65, 0x74, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
@@ -547,7 +547,7 @@ var file_pkg_net_gobus_proto_depIdxs = []int32{
 	5, // 6: parserin.GoBus.SendSync:output_type -> parserin.SyncResponse
 	5, // 7: parserin.GoBus.Subscribe:output_type -> parserin.SyncResponse
 	1, // 8: parserin.GoBus.SubscribeBuffer:output_type -> parserin.BuffResponse
-	2, // 9: parserin.GoBus.Get:output_type -> parserin.GetRequest
+	5, // 9: parserin.GoBus.Get:output_type -> parserin.SyncResponse
 	5, // [5:10] is the sub-list for method output_type
 	0, // [0:5] is the sub-list for method input_type
 	0, // [0:0] is the sub-list for extension type_name

+ 5 - 5
pkg/net/netapi/gobus_grpc.pb.go

@@ -43,7 +43,7 @@ type GoBusClient interface {
 	// SubscribeBuffer -- подписка по шаблону с буфером на выход
 	SubscribeBuffer(ctx context.Context, in *BuffRequest, opts ...grpc.CallOption) (GoBus_SubscribeBufferClient, error)
 	// Get -- возвращает значения топиков по шаблону
-	Get(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (*GetRequest, error)
+	Get(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (*SyncResponse, error)
 }
 
 type goBusClient struct {
@@ -136,8 +136,8 @@ func (x *goBusSubscribeBufferClient) Recv() (*BuffResponse, error) {
 	return m, nil
 }
 
-func (c *goBusClient) Get(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (*GetRequest, error) {
-	out := new(GetRequest)
+func (c *goBusClient) Get(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (*SyncResponse, error) {
+	out := new(SyncResponse)
 	err := c.cc.Invoke(ctx, GoBus_Get_FullMethodName, in, out, opts...)
 	if err != nil {
 		return nil, err
@@ -158,7 +158,7 @@ type GoBusServer interface {
 	// SubscribeBuffer -- подписка по шаблону с буфером на выход
 	SubscribeBuffer(*BuffRequest, GoBus_SubscribeBufferServer) error
 	// Get -- возвращает значения топиков по шаблону
-	Get(context.Context, *SubscribeRequest) (*GetRequest, error)
+	Get(context.Context, *SubscribeRequest) (*SyncResponse, error)
 	mustEmbedUnimplementedGoBusServer()
 }
 
@@ -178,7 +178,7 @@ func (UnimplementedGoBusServer) Subscribe(*SubscribeRequest, GoBus_SubscribeServ
 func (UnimplementedGoBusServer) SubscribeBuffer(*BuffRequest, GoBus_SubscribeBufferServer) error {
 	return status.Errorf(codes.Unimplemented, "method SubscribeBuffer not implemented")
 }
-func (UnimplementedGoBusServer) Get(context.Context, *SubscribeRequest) (*GetRequest, error) {
+func (UnimplementedGoBusServer) Get(context.Context, *SubscribeRequest) (*SyncResponse, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method Get not implemented")
 }
 func (UnimplementedGoBusServer) mustEmbedUnimplementedGoBusServer() {}

+ 15 - 0
pkg/types/idict_topic.go

@@ -0,0 +1,15 @@
+package types
+
+import "p78git.ddns.net/svi/gobus/pkg/alias"
+
+// IDictTopic -- потокобезопасный словарь топиков
+type IDictTopic interface {
+	// Add -- добавляет топик в словрь
+	Add(alias.TopicName)
+	// Get -- возвращает топик по имени
+	Get(alias.TopicName) ITopic
+	// Len -- возвращает количество топиков
+	Len() int
+	// Size -- возвращает размер всех топиков
+	Size() int
+}

+ 14 - 0
pkg/types/iservice.go

@@ -0,0 +1,14 @@
+// package types -- типизация дял сервиса
+package types
+
+import "context"
+
+// IService -- сервис шины данных
+type IService interface {
+	// Ctx -- возвращает контекст приложения
+	Ctx() context.Context
+	// CancelApp -- отменяет контекст приложения
+	CancelApp()
+	// DictTopic -- возвращает потокобезопасный словарь топиков
+	DictTopic()IDictTopic
+}

+ 15 - 0
pkg/types/itopic.go

@@ -0,0 +1,15 @@
+package types
+
+import "p78git.ddns.net/svi/gobus/pkg/alias"
+
+// ITopic -- интерфейс топика шины данных
+type ITopic interface {
+	// Name -- возвращает имя топика
+	Name() alias.TopicName
+	// Set -- устанавливает содержимое топика
+	Set(srcNum int, val []byte)
+	// Get -- возвращает содержимое топика
+	Get() []byte
+	// Size -- возвращает размер топика
+	Size() int
+}