// package topic -- потокобезопасный топик шины данных package topic import ( "fmt" "sync" "p78git.ddns.net/svi/gobus/pkg/alias" "p78git.ddns.net/svi/gobus/pkg/net/netapi" "p78git.ddns.net/svi/gobus/pkg/types" ) // Topic -- потокобезопасный топик шины данных type Topic struct { name alias.TopicName dictProxy map[alias.ClientName]types.IClientProxy // Словарь подписок клиентов на топик srcNum int32 // Номер источника val []byte // Содержимое топика block sync.RWMutex } // Newtopic -- возвращает новый топик func NewTopic(name alias.TopicName) (*Topic, error) { if name == "" { return nil, fmt.Errorf("NewTopic(): name is empty") } sf := &Topic{ name: name, val: []byte{}, dictProxy: make(map[alias.ClientName]types.IClientProxy), } _ = types.ITopic(sf) return sf, nil } // Unsubscribe -- отписывает клиента от себя func (sf *Topic) Unsubscribe(clientName alias.ClientName) { sf.block.Lock() defer sf.block.Unlock() delete(sf.dictProxy, clientName) } // Subscribe -- попдисывает клиента на себя func (sf *Topic) Subscribe(clientProxy types.IClientProxy) { sf.block.Lock() defer sf.block.Unlock() sf.dictProxy[clientProxy.ClientName()] = clientProxy } // Name -- возвращает имя топика func (sf *Topic) Name() alias.TopicName { return sf.name } // Get -- возвращает содержимое топика func (sf *Topic) Get() *netapi.TopicMsg { sf.block.RLock() defer sf.block.RUnlock() msg := &netapi.TopicMsg{ Source: sf.srcNum, Topic: string(sf.name), Msg: sf.val, } return msg } // Set -- устанавливает значение топика func (sf *Topic) Set(msg *netapi.TopicMsg) { sf.block.Lock() defer sf.block.Unlock() sf.srcNum = msg.Source sf.val = msg.Msg for _, proxy := range sf.dictProxy { if !proxy.IsWork() { delete(sf.dictProxy, proxy.ClientName()) continue } proxy.Write(msg) } } // Size -- возвращает размер топика func (sf *Topic) Size() int { sf.block.RLock() defer sf.block.RUnlock() return len(sf.name) + len(sf.val) }