// package kbus_base -- базовая часть шины данных. package kbus_base import ( "fmt" "sync" mL0 "gitp78su.ipnodns.ru/svi/kern/v4/lev0" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs/stream_name" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/defs/topic" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/etypes/ebool" "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec" mKs "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kspec" mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_serve" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_sub" "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx" ) var ( busBaseStreamName = stream_name.NewAStreamName("bus_base") ) // KBusBase -- базовая часть шины данных. type KBusBase struct { KCtx_ mKs.IKernelCtx IsWork_ mKs.ISafeBool lCtx mKs.ILocalCtx log mKs.ILogBuf dictSub mKs.IDictTopicSub dictServe mKs.IDictTopicServe } var ( Bus_ *KBusBase block sync.Mutex ) // GetKernelBusBase -- возвращает базовую шину сообщений. func GetKernelBusBase() *KBusBase { block.Lock() defer block.Unlock() if Bus_ != nil { return Bus_ } kCtx := kctx.GetKernelCtx() lCtx := mL1.NewLocalCtx(kCtx.Ctx()) Bus_ = &KBusBase{ KCtx_: kCtx, IsWork_: mL1.NewSafeBool(false), dictSub: dict_topic_sub.NewDictTopicSub(), dictServe: dict_topic_serve.NewDictServe(), lCtx: lCtx, } Bus_.log = Bus_.lCtx.Log() go Bus_.close() go Bus_.run() Bus_.IsWork_.Set() Bus_.KCtx_.Wg().Add(busBaseStreamName) Bus_.KCtx_.Set("kernBusBase", Bus_, "base of data bus") _ = mKs.IKernelBus(Bus_) return Bus_ } // Log -- возвращает лог шины. func (sf *KBusBase) Log() mKs.ILogBuf { return sf.log } func (sf *KBusBase) run() { sf.log.Debug("KBusBase.run()") for { break } } // Unsubscribe -- отписывает обработчик от топика. func (sf *KBusBase) Unsubscribe(handler mKs.IBusHandlerSubscribe) { sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name()) sf.dictSub.Unsubscribe(handler) } // Subscribe -- подписывает обработчик на топик. func (sf *KBusBase) Subscribe(handler mKs.IBusHandlerSubscribe) mKs.IResult[kspec.EBool] { sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name()) mL0.Hassert(!sf.IsWork_.Get(), "KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name()) sf.dictSub.Subscribe(handler) return mL0.NewOk(ebool.NewEBool(true)) } // SendRequest -- отправляет запрос в шину данных. func (sf *KBusBase) SendRequest(topic *topic.ATopic, binReq []byte) mL0.IResult[[]byte] { sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic) if !sf.IsWork_.Get() { err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic) sf.log.Err(err.Error()) return mL0.NewErr[[]byte](err) } res := sf.dictServe.SendRequest(topic, binReq) if res.IsErr() { err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, res.Err()) sf.log.Err(err.Error()) return mL0.NewErr[[]byte](err) } return res } // RegisterServe -- регистрирует обработчики входящих запросов. func (sf *KBusBase) RegisterServe(handler mKs.IBusHandlerServe) mL0.IResult[kspec.EBool] { if handler == nil { return mL0.NewErr[kspec.EBool](fmt.Errorf("KBusBase.RegisterServe(): IBusHandlerServe==nil")) } sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name()) res := sf.dictServe.Register(handler) if res.IsErr() { err := fmt.Errorf("KBusBase.RegisterServe(): handler='%v', err=\n\t%w", handler.Name(), res.Err()) sf.log.Err(err.Error()) return mL0.NewErr[kspec.EBool](err) } return mL0.NewOk(ebool.NewEBool(true)) } // Publish -- публикует сообщение в шину. func (sf *KBusBase) Publish(topic *topic.ATopic, binMsg []byte) mL0.IResult[kspec.EBool] { sf.log.Debug("KBusBase.Publish(): topic='%v'", topic) if !sf.IsWork_.Get() { err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic) sf.log.Err(err.Error()) return mL0.NewErr[kspec.EBool](err) } // Асинхронный запуск чтения go sf.dictSub.Read(topic, binMsg) return mL0.NewOk(ebool.NewEBool(true)) } // IsWork -- возвращает признак работы шины. func (sf *KBusBase) IsWork() kspec.EBool { return ebool.NewEBool(sf.IsWork_.Get()) } // Ожидает закрытия шины в отдельном потоке. func (sf *KBusBase) close() { sf.KCtx_.Wait() sf.KCtx_.Lock() defer sf.KCtx_.Unlock() if !sf.IsWork_.Get() { return } sf.IsWork_.Reset() sf.KCtx_.Wg().Done(busBaseStreamName) sf.log.Debug("KBusBase.close(): done") }