| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- // package kbus_base -- базовая часть шины данных.
- package kbus_base
- import (
- "fmt"
- "sync"
- mKa "gitp78su.ipnodns.ru/svi/kern/v4/lev0/kalias"
- mKt "gitp78su.ipnodns.ru/svi/kern/v4/lev0/ktypes"
- mL1 "gitp78su.ipnodns.ru/svi/kern/v4/lev1"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_serve"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kbus/dict_topic_sub"
- "gitp78su.ipnodns.ru/svi/kern/v4/lev2/kctx"
- )
- const (
- strBusBaseStream = "bus_base"
- )
- // KBusBase -- базовая часть шины данных.
- type KBusBase struct {
- Ctx_ mKt.IKernelCtx
- IsWork_ mKt.ISafeBool
- ctx mKt.ILocalCtx
- log mKt.ILogBuf
- dictSub mKt.IDictTopicSub
- dictServe mKt.IDictTopicServe
- }
- var (
- Bus_ *KBusBase
- block sync.Mutex
- )
- // GetKernelBusBase -- возвращает базовую шину сообщений.
- func GetKernelBusBase() mKt.IResult[*KBusBase] {
- block.Lock()
- defer block.Unlock()
- if Bus_ != nil {
- return mL1.NewRes(Bus_)
- }
- resKernCtx := kctx.GetKernelCtx()
- if resKernCtx.IsErr() {
- err := fmt.Errorf("GetKernelBusBase(): in get from kernCtx, err=\n\t%w", resKernCtx.Err())
- return mL1.NewErr[*KBusBase](err)
- }
- kCtx := resKernCtx.Val()
- resLocCtx := mL1.NewLocalCtx(kCtx.Ctx())
- if resLocCtx.IsErr() {
- err := fmt.Errorf("GetKernelBusBase(): in get from localCtx, err=\n\t%w", resLocCtx.Err())
- return mL1.NewErr[*KBusBase](err)
- }
- resDictServe := dict_topic_serve.NewDictServe()
- if resDictServe.IsErr() {
- err := fmt.Errorf("GetKernelBusBase(): in get from dictServe, err=\n\t%w", resDictServe.Err())
- return mL1.NewErr[*KBusBase](err)
- }
- resDictTopicSu := dict_topic_sub.NewDictTopicSub()
- if resDictTopicSu.IsErr() {
- err := fmt.Errorf("GetKernelBusBase(): in get from dictTopicSu, err=\n\t%w", resDictTopicSu.Err())
- return mL1.NewErr[*KBusBase](err)
- }
- Bus_ = &KBusBase{
- Ctx_: resKernCtx.Val(),
- IsWork_: mL1.NewSafeBool(),
- dictSub: resDictTopicSu.Val(),
- dictServe: mKt.IDictTopicServe(resDictServe.Val()),
- ctx: resLocCtx.Val(),
- }
- Bus_.log = Bus_.ctx.Log()
- go Bus_.close()
- go Bus_.run()
- Bus_.IsWork_.Set()
- res := Bus_.Ctx_.Wg().Add(strBusBaseStream)
- res.Hassert("GetKernelBusBase(): in add name stream(%v)", strBusBaseStream)
- resSet := Bus_.Ctx_.Set("kernBusBase", Bus_, "base of data bus")
- if resSet.IsErr() {
- err := fmt.Errorf("GetKernelBusBase(): in set kernel bus base to kernel ctx, err=\n\t%w", resSet.Err())
- return mL1.NewErr[*KBusBase](err)
- }
- _ = mKt.IKernelBus(Bus_)
- return mL1.NewRes(Bus_)
- }
- // Log -- возвращает лог шины.
- func (sf *KBusBase) Log() mKt.ILogBuf {
- return sf.log
- }
- func (sf *KBusBase) run() {
- sf.log.Debug("KBusBase.run()")
- for {
- break
- }
- }
- // Unsubscribe -- отписывает обработчик от топика.
- func (sf *KBusBase) Unsubscribe(handler mKt.IBusHandlerSubscribe) {
- sf.log.Debug("KBusBase.Unsubscribe(): handler='%v'", handler.Name())
- sf.dictSub.Unsubscribe(handler)
- }
- // Subscribe -- подписывает обработчик на топик.
- func (sf *KBusBase) Subscribe(handler mKt.IBusHandlerSubscribe) mKt.IResult[bool] {
- sf.log.Debug("KBusBase.Subscribe(): handler='%v'", handler.Name())
- if !sf.IsWork_.Get() {
- err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', bus already closed", handler.Name())
- sf.log.Err(err.Error())
- return mL1.NewErr[bool](err)
- }
- res := sf.dictSub.Subscribe(handler)
- if res.IsErr() {
- err := fmt.Errorf("KBusBase.Subscribe(): handler='%v', err=\n\t%w", handler.Name(), res.Err())
- sf.log.Err(err.Error())
- return mL1.NewErr[bool](err)
- }
- return mL1.NewRes(true)
- }
- // SendRequest -- отправляет запрос в шину данных.
- func (sf *KBusBase) SendRequest(topic mKa.ATopic, binReq []byte) mKt.IResult[[]byte] {
- sf.log.Debug("KBusBase.SendRequest(): topic='%v'", topic)
- if !sf.IsWork_.Get() {
- err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', bus already closed", topic)
- sf.log.Err(err.Error())
- return mL1.NewErr[[]byte](err)
- }
- res := sf.dictServe.SendRequest(topic, binReq)
- if res.IsErr() {
- err := fmt.Errorf("KBusBase.SendRequest(): topic='%v', err=\n\t%w", topic, res.Err())
- sf.log.Err(err.Error())
- return mL1.NewErr[[]byte](err)
- }
- return res
- }
- // RegisterServe -- регистрирует обработчики входящих запросов.
- func (sf *KBusBase) RegisterServe(handler mKt.IBusHandlerServe) mKt.IResult[bool] {
- if handler == nil {
- return mL1.NewErr[bool](fmt.Errorf("KBusBase.RegisterServe(): IBusHandlerServe==nil"))
- }
- sf.log.Debug("KBusBase.RegisterServe(): handler='%v'", handler.Name())
- res := sf.dictServe.Register(handler)
- if res.IsErr() {
- err := fmt.Errorf("KBusBase.RegisterServe(): handler='%v', err=\n\t%w", handler.Name(), res.Err())
- sf.log.Err(err.Error())
- return mL1.NewErr[bool](err)
- }
- return mL1.NewRes(true)
- }
- // Publish -- публикует сообщение в шину.
- func (sf *KBusBase) Publish(topic mKa.ATopic, binMsg []byte) mKt.IResult[bool] {
- sf.log.Debug("KBusBase.Publish(): topic='%v'", topic)
- if !sf.IsWork_.Get() {
- err := fmt.Errorf("KBusBase.Publish(): topic='%v',bus already closed", topic)
- sf.log.Err(err.Error())
- return mL1.NewErr[bool](err)
- }
- // Асинхронный запуск чтения
- go sf.dictSub.Read(topic, binMsg)
- return mL1.NewRes(true)
- }
- // IsWork -- возвращает признак работы шины.
- func (sf *KBusBase) IsWork() bool {
- return sf.IsWork_.Get()
- }
- // Ожидает закрытия шины в отдельном потоке.
- func (sf *KBusBase) close() {
- sf.Ctx_.Wait()
- sf.Ctx_.Lock()
- defer sf.Ctx_.Unlock()
- if !sf.IsWork_.Get() {
- return
- }
- sf.IsWork_.Reset()
- sf.Ctx_.Wg().Done(strBusBaseStream)
- sf.log.Debug("KBusBase.close(): done")
- }
|