// package topic -- потокобезопасный топик шины данных package topic import ( "fmt" "sync" "p78git.ddns.net/svi/gobus/api/netapi" "p78git.ddns.net/svi/gobus/pkg/alias" "p78git.ddns.net/svi/gobus/pkg/types" ) // Topic -- потокобезопасный топик шины данных type Topic struct { name alias.TopicName dictProxy map[alias.ClientName]types.IClientProxy // Словарь подписок клиентов на топик msg *netapi.TopicMsg // Хранимое сообщение топика block sync.RWMutex } // Newtopic -- возвращает новый топик func NewTopic(msg *netapi.TopicMsg) (*Topic, error) { if msg == nil { return nil, fmt.Errorf("NewTopic(): name is empty") } sf := &Topic{ name: alias.TopicName(msg.Topic), msg: msg, dictProxy: make(map[alias.ClientName]types.IClientProxy), } _ = types.ITopic(sf) return sf, nil } // Unsubscribe -- отписывает клиента от себя func (sf *Topic) Unsubscribe(clientName alias.ClientName) { sf.block.Lock() defer sf.block.Unlock() delete(sf.dictProxy, clientName) } // Subscribe -- подписывает клиента на себя func (sf *Topic) Subscribe(clientProxy types.IClientProxy) error { sf.block.Lock() defer sf.block.Unlock() if clientProxy == nil { return fmt.Errorf("Topic.Subscribe(): ICLientProxy==nil") } sf.dictProxy[clientProxy.ClientName()] = clientProxy return nil } // Name -- возвращает имя топика func (sf *Topic) Name() alias.TopicName { return sf.name } // Get -- возвращает содержимое топика func (sf *Topic) Get() *netapi.TopicMsg { sf.block.RLock() defer sf.block.RUnlock() return sf.msg } // Set -- устанавливает значение топика func (sf *Topic) Set(msg *netapi.TopicMsg) error { sf.block.Lock() defer sf.block.Unlock() if msg == nil { return fmt.Errorf("Topic.Set(): ERROR msg==nil\n") } if sf.name != alias.TopicName(msg.Topic) { return fmt.Errorf("Topic.Set(): topic name(%q)!=msg.Topic(%v)\n", sf.name, msg.Topic) } sf.msg = msg for _, proxy := range sf.dictProxy { if !proxy.IsWork() { delete(sf.dictProxy, proxy.ClientName()) continue } proxy.Write(msg) } return nil } // Size -- возвращает размер топика func (sf *Topic) Size() int { sf.block.RLock() defer sf.block.RUnlock() // *2 потому что имя хранится два раза return len(sf.name)*2 + len(sf.msg.Msg) }