// package kbus_base -- базовая часть шины данных. package kbus_base import ( "fmt" "sync" mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias" mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes" mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_serve" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_sub" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" ) const ( strBusBaseStream = "bus_base" ) // KBusBase -- базовая часть шины данных. type KBusBase struct { Ctx_ mKt.IKernelCtx IsWork_ mKt.ISafeBool ctx mKt.ILocalCtx log mKt.ILogBuf dictSub mKt.IDictTopicSub dictServe mKt.IDictTopicServe } var ( Bus_ *KBusBase block sync.Mutex ) // GetKernelBusBase -- возвращает базовую шину сообщений. func GetKernelBusBase() mKt.IResult[*KBusBase] { block.Lock() defer block.Unlock() if Bus_ != nil { return mL1.NewRes(Bus_) } resKernCtx := kctx.GetKernelCtx() if resKernCtx.IsErr() { err := fmt.Errorf("GetKernelBusBase(): in get from kernCtx, err=\n\t%w", resKernCtx.Err()) return mL1.NewErr[*KBusBase](err) } kCtx := resKernCtx.Val() resLocCtx := mL1.NewLocalCtx(kCtx.Ctx()) if resLocCtx.IsErr() { err := fmt.Errorf("GetKernelBusBase(): in get from localCtx, err=\n\t%w", resLocCtx.Err()) return mL1.NewErr[*KBusBase](err) } resDictServe := dict_topic_serve.NewDictServe() if resDictServe.IsErr() { err := fmt.Errorf("GetKernelBusBase(): in get from dictServe, err=\n\t%w", resDictServe.Err()) return mL1.NewErr[*KBusBase](err) } resDictTopicSu := dict_topic_sub.NewDictTopicSub() if resDictTopicSu.IsErr() { err := fmt.Errorf("GetKernelBusBase(): in get from dictTopicSu, err=\n\t%w", resDictTopicSu.Err()) return mL1.NewErr[*KBusBase](err) } Bus_ = &KBusBase{ Ctx_: resKernCtx.Val(), IsWork_: mL1.NewSafeBool(), dictSub: resDictTopicSu.Val(), dictServe: mKt.IDictTopicServe(resDictServe.Val()), ctx: resLocCtx.Val(), } Bus_.log = Bus_.ctx.Log() go Bus_.close() go Bus_.run() Bus_.IsWork_.Set() res := Bus_.Ctx_.Wg().Add(strBusBaseStream) res.Hassert("GetKernelBusBase(): in add name stream(%v)", strBusBaseStream) resSet := Bus_.Ctx_.Set("kernBusBase", Bus_, "base of data bus") if resSet.IsErr() { err := fmt.Errorf("GetKernelBusBase(): in set kernel bus base to kernel ctx, err=\n\t%w", resSet.Err()) return mL1.NewErr[*KBusBase](err) } _ = mKt.IKernelBus(Bus_) return mL1.NewRes(Bus_) } // Log -- возвращает лог шины. func (sf *KBusBase) Log() mKt.ILogBuf { return sf.log } func (sf *KBusBase) run() { sf.log.Debug("KBusBase.run()") for { break } } // Unsubscribe -- отписывает обработчик от топика. func (sf *KBusBase) Unsubscribe(handler mKt.IBusHandlerSubscribe) { sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name()) sf.dictSub.Unsubscribe(handler) } // Subscribe -- подписывает обработчик на топик. func (sf *KBusBase) Subscribe(handler mKt.IBusHandlerSubscribe) mKt.IResult[bool] { sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name()) if !sf.IsWork_.Get() { err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name()) sf.log.Err(err.Error()) return mL1.NewErr[bool](err) } res := sf.dictSub.Subscribe(handler) if res.IsErr() { err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', err=\n\t%w", handler.Name(), res.Err()) sf.log.Err(err.Error()) return mL1.NewErr[bool](err) } return mL1.NewRes(true) } // SendRequest -- отправляет запрос в шину данных. func (sf *KBusBase) SendRequest(topic mKa.ATopic, binReq []byte) mKt.IResult[[]byte] { sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic) if !sf.IsWork_.Get() { err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic) sf.log.Err(err.Error()) return mL1.NewErr[[]byte](err) } res := sf.dictServe.SendRequest(topic, binReq) if res.IsErr() { err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, res.Err()) sf.log.Err(err.Error()) return mL1.NewErr[[]byte](err) } return res } // RegisterServe -- регистрирует обработчики входящих запросов. func (sf *KBusBase) RegisterServe(handler mKt.IBusHandlerServe) mKt.IResult[bool] { if handler == nil { return mL1.NewErr[bool](fmt.Errorf("KBusBase.RegisterServe(): IBusHandlerServe==nil")) } sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name()) res := sf.dictServe.Register(handler) if res.IsErr() { err := fmt.Errorf("KBusBase.RegisterServe(): handler='%v', err=\n\t%w", handler.Name(), res.Err()) sf.log.Err(err.Error()) return mL1.NewErr[bool](err) } return mL1.NewRes(true) } // Publish -- публикует сообщение в шину. func (sf *KBusBase) Publish(topic mKa.ATopic, binMsg []byte) mKt.IResult[bool] { sf.log.Debug("KBusBase.Publish(): topic='%v'", topic) if !sf.IsWork_.Get() { err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic) sf.log.Err(err.Error()) return mL1.NewErr[bool](err) } // Асинхронный запуск чтения go sf.dictSub.Read(topic, binMsg) return mL1.NewRes(true) } // IsWork -- возвращает признак работы шины. func (sf *KBusBase) IsWork() bool { return sf.IsWork_.Get() } // Ожидает закрытия шины в отдельном потоке. func (sf *KBusBase) close() { sf.Ctx_.Wait() sf.Ctx_.Lock() defer sf.Ctx_.Unlock() if !sf.IsWork_.Get() { return } sf.IsWork_.Reset() sf.Ctx_.Wg().Done(strBusBaseStream) sf.log.Debug("KBusBase.close(): done") }