// package kbus_base -- базовая часть шины данных. package kbus_base import ( "fmt" "sync" mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0" mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/alias" mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes" mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_serve" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_sub" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" ) var ( busBaseStreamName = mKa.NewAStreamName("bus_base") ) // KBusBase -- базовая часть шины данных. type KBusBase struct { KCtx_ mKt.IKernelCtx IsWork_ mKt.ISafeBool lCtx mKt.ILocalCtx log mKt.ILogBuf dictSub mKt.IDictTopicSub dictServe mKt.IDictTopicServe } var ( Bus_ *KBusBase block sync.Mutex ) // GetKernelBusBase -- возвращает базовую шину сообщений. func GetKernelBusBase() *KBusBase { block.Lock() defer block.Unlock() if Bus_ != nil { return Bus_ } kCtx := kctx.GetKernelCtx() lCtx := mL1.NewLocalCtx(kCtx.Ctx()) Bus_ = &KBusBase{ KCtx_: kCtx, IsWork_: mL1.NewSafeBool(), dictSub: dict_topic_sub.NewDictTopicSub(), dictServe: dict_topic_serve.NewDictServe(), lCtx: lCtx, } Bus_.log = Bus_.lCtx.Log() go Bus_.close() go Bus_.run() Bus_.IsWork_.Set() Bus_.KCtx_.Wg().Add(busBaseStreamName) Bus_.KCtx_.Set("kernBusBase", Bus_, "base of data bus") _ = mKt.IKernelBus(Bus_) return Bus_ } // Log -- возвращает лог шины. func (sf *KBusBase) Log() mKt.ILogBuf { return sf.log } func (sf *KBusBase) run() { sf.log.Debug("KBusBase.run()") for { break } } // Unsubscribe -- отписывает обработчик от топика. func (sf *KBusBase) Unsubscribe(handler mKt.IBusHandlerSubscribe) { sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name()) sf.dictSub.Unsubscribe(handler) } // Subscribe -- подписывает обработчик на топик. func (sf *KBusBase) Subscribe(handler mKt.IBusHandlerSubscribe) mKt.IResult[bool] { sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name()) mL0.Hassert(!sf.IsWork_.Get(), "KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name()) sf.dictSub.Subscribe(handler) return mL0.NewRes(true) } // SendRequest -- отправляет запрос в шину данных. func (sf *KBusBase) SendRequest(topic *mKa.ATopic, binReq []byte) mL0.IResult[[]byte] { sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic) if !sf.IsWork_.Get() { err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic) sf.log.Err(err.Error()) return mL0.NewErr[[]byte](err) } res := sf.dictServe.SendRequest(topic, binReq) if res.IsErr() { err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, res.Err()) sf.log.Err(err.Error()) return mL0.NewErr[[]byte](err) } return res } // RegisterServe -- регистрирует обработчики входящих запросов. func (sf *KBusBase) RegisterServe(handler mKt.IBusHandlerServe) mL0.IResult[bool] { if handler == nil { return mL0.NewErr[bool](fmt.Errorf("KBusBase.RegisterServe(): IBusHandlerServe==nil")) } sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name()) res := sf.dictServe.Register(handler) if res.IsErr() { err := fmt.Errorf("KBusBase.RegisterServe(): handler='%v', err=\n\t%w", handler.Name(), res.Err()) sf.log.Err(err.Error()) return mL0.NewErr[bool](err) } return mL0.NewRes(true) } // Publish -- публикует сообщение в шину. func (sf *KBusBase) Publish(topic *mKa.ATopic, binMsg []byte) mL0.IResult[bool] { sf.log.Debug("KBusBase.Publish(): topic='%v'", topic) if !sf.IsWork_.Get() { err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic) sf.log.Err(err.Error()) return mL0.NewErr[bool](err) } // Асинхронный запуск чтения go sf.dictSub.Read(topic, binMsg) return mL0.NewRes(true) } // IsWork -- возвращает признак работы шины. func (sf *KBusBase) IsWork() bool { return sf.IsWork_.Get() } // Ожидает закрытия шины в отдельном потоке. func (sf *KBusBase) close() { sf.KCtx_.Wait() sf.KCtx_.Lock() defer sf.KCtx_.Unlock() if !sf.IsWork_.Get() { return } sf.IsWork_.Reset() sf.KCtx_.Wg().Done(busBaseStreamName) sf.log.Debug("KBusBase.close(): done") }