// package kbus_base -- базовая часть шины данных package kbus_base import ( "fmt" "sync" . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias" . "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes" . "gitp78su.ipnodns.ru/svi/kern/v4/lev1" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_serve" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_sub" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" ) const ( strBusBaseStream = "bus_base" ) // KBusBase -- базовая часть шины данных type KBusBase struct { Ctx_ IKernelCtx IsWork_ ISafeBool ctx ILocalCtx log ILogBuf dictSub IDictTopicSub dictServe IDictTopicServe } var ( Bus_ *KBusBase block sync.Mutex ) // GetKernelBusBase -- возвращает базовую шину сообщений func GetKernelBusBase() IResult[*KBusBase] { block.Lock() defer block.Unlock() if Bus_ != nil { return NewRes(Bus_) } resKernCtx := kctx.GetKernelCtx() if resKernCtx.IsErr() { err := fmt.Errorf("GetKernelBusBase(): in get from kernCtx, err=\n\t%w", resKernCtx.Err()) return NewErr[*KBusBase](err) } kCtx := resKernCtx.Val() resLocCtx := NewLocalCtx(kCtx.Ctx()) if resLocCtx.IsErr() { err := fmt.Errorf("GetKernelBusBase(): in get from localCtx, err=\n\t%w", resLocCtx.Err()) return NewErr[*KBusBase](err) } resDictServe := dict_topic_serve.NewDictServe() if resDictServe.IsErr() { err := fmt.Errorf("GetKernelBusBase(): in get from dictServe, err=\n\t%w", resDictServe.Err()) return NewErr[*KBusBase](err) } resDictTopicSu := dict_topic_sub.NewDictTopicSub() if resDictTopicSu.IsErr() { err := fmt.Errorf("GetKernelBusBase(): in get from dictTopicSu, err=\n\t%w", resDictTopicSu.Err()) return NewErr[*KBusBase](err) } Bus_ = &KBusBase{ Ctx_: resKernCtx.Val(), IsWork_: NewSafeBool(), dictSub: resDictTopicSu.Val(), dictServe: IDictTopicServe(resDictServe.Val()), ctx: resLocCtx.Val(), } Bus_.log = Bus_.ctx.Log() go Bus_.close() go Bus_.run() Bus_.IsWork_.Set() res := Bus_.Ctx_.Wg().Add(strBusBaseStream) res.Hassert("GetKernelBusBase(): in add name stream(%v)", strBusBaseStream) resSet := Bus_.Ctx_.Set("kernBusBase", Bus_, "base of data bus") if resSet.IsErr() { err := fmt.Errorf("GetKernelBusBase(): in set kernel bus base to kernel ctx, err=\n\t%w", resSet.Err()) return NewErr[*KBusBase](err) } _ = IKernelBus(Bus_) return NewRes(Bus_) } // Log -- возвращает лог шины func (sf *KBusBase) Log() ILogBuf { return sf.log } func (sf *KBusBase) run() { sf.log.Debug("KBusBase.run()") for { break } } // Unsubscribe -- отписывает обработчик от топика func (sf *KBusBase) Unsubscribe(handler IBusHandlerSubscribe) { sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name()) sf.dictSub.Unsubscribe(handler) } // Subscribe -- подписывает обработчик на топик func (sf *KBusBase) Subscribe(handler IBusHandlerSubscribe) IResult[bool] { sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name()) if !sf.IsWork_.Get() { err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name()) sf.log.Err(err.Error()) return NewErr[bool](err) } res := sf.dictSub.Subscribe(handler) if res.IsErr() { err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', err=\n\t%w", handler.Name(), res.Err()) sf.log.Err(err.Error()) return NewErr[bool](err) } return NewRes(true) } // SendRequest -- отправляет запрос в шину данных func (sf *KBusBase) SendRequest(topic ATopic, binReq []byte) IResult[[]byte] { sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic) if !sf.IsWork_.Get() { err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic) sf.log.Err(err.Error()) return NewErr[[]byte](err) } res := sf.dictServe.SendRequest(topic, binReq) if res.IsErr() { err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, res.Err()) sf.log.Err(err.Error()) return NewErr[[]byte](err) } return res } // RegisterServe -- регистрирует обработчики входящих запросов func (sf *KBusBase) RegisterServe(handler IBusHandlerServe) IResult[bool] { if handler == nil { return NewErr[bool](fmt.Errorf("KBusBase.RegisterServe(): IBusHandlerServe==nil")) } sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name()) res := sf.dictServe.Register(handler) if res.IsErr() { err := fmt.Errorf("KBusBase.RegisterServe(): handler='%v', err=\n\t%w", handler.Name(), res.Err()) sf.log.Err(err.Error()) return NewErr[bool](err) } return NewRes(true) } // Publish -- публикует сообщение в шину func (sf *KBusBase) Publish(topic ATopic, binMsg []byte) IResult[bool] { sf.log.Debug("KBusBase.Publish(): topic='%v'", topic) if !sf.IsWork_.Get() { err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic) sf.log.Err(err.Error()) return NewErr[bool](err) } // Асинхронный запуск чтения go sf.dictSub.Read(topic, binMsg) return NewRes(true) } // IsWork -- возвращает признак работы шины func (sf *KBusBase) IsWork() bool { return sf.IsWork_.Get() } // Ожидает закрытия шины в отдельном потоке func (sf *KBusBase) close() { sf.Ctx_.Wait() sf.Ctx_.Lock() defer sf.Ctx_.Unlock() if !sf.IsWork_.Get() { return } sf.IsWork_.Reset() sf.Ctx_.Wg().Done(strBusBaseStream) sf.log.Debug("KBusBase.close(): done") }