// package dict_topic -- потокобезопасный словарь топиков package dict_topic import ( "fmt" "strings" "sync" "p78git.ddns.net/svi/gobus/api/netapi" "p78git.ddns.net/svi/gobus/internal/dict_topic/topic" "p78git.ddns.net/svi/gobus/pkg/alias" "p78git.ddns.net/svi/gobus/pkg/types" ) // DictTopic -- потокобезопасный словарь топиков type DictTopic struct { dict map[alias.TopicName]types.ITopic // Словарь тпоиков dictProxy map[alias.ClientName]types.IClientProxy // Словарь прокси для клиентов block sync.RWMutex } // NewDictTopic -- возвращает новый потокобезопасный словарь топиков func NewDictTopic() (*DictTopic, error) { sf := &DictTopic{ dict: make(map[alias.TopicName]types.ITopic), dictProxy: make(map[alias.ClientName]types.IClientProxy), } _ = types.IDictTopic(sf) return sf, nil } // Unsubscribe -- отписывает клиента от топиков func (sf *DictTopic) Unsubscribe(clientProxy types.IClientProxy) error { sf.block.RLock() defer sf.block.RUnlock() if clientProxy == nil { return fmt.Errorf("DictTopic.Unsubscribe(): IClientProxy==nil") } sf.unsubscribe(clientProxy) return nil } // Отписывает под блокировкой клиента от топиков func (sf *DictTopic) unsubscribe(clientProxy types.IClientProxy) { sample := clientProxy.SampleTopic() for name, topic := range sf.dict { if !strings.Contains(string(name), string(sample)) { continue } topic.Unsubscribe(clientProxy.ClientName()) } } // Subscribe -- подписывает клиента на топики func (sf *DictTopic) Subscribe(clientProxy types.IClientProxy) error { sf.block.RLock() defer sf.block.RUnlock() if clientProxy == nil { return fmt.Errorf("DictTopic.Subscribe(): IClientProxy==nil") } sf.subscribe(clientProxy) return nil } // Подписывает по образцу топика клиента на события (внутри блокировки) func (sf *DictTopic) subscribe(clientProxy types.IClientProxy) { { // Сначала проверить наличие такого клиента и отписать его _, isOk := sf.dictProxy[clientProxy.ClientName()] if isOk { sf.unsubscribe(clientProxy) } } sample := clientProxy.SampleTopic() for name, topic := range sf.dict { if !strings.Contains(string(name), string(sample)) { continue } topic.Subscribe(clientProxy) } sf.dictProxy[clientProxy.ClientName()] = clientProxy } // Public -- публикует в новый топик (при необходимости создаёт) func (sf *DictTopic) Public(msg *netapi.TopicMsg) error { sf.block.Lock() defer sf.block.Unlock() if msg == nil { return fmt.Errorf("DictTopic.Add(): msg==nil") } top_, isOk := sf.dict[alias.TopicName(msg.Topic)] if !isOk { var err error top_, err = topic.NewTopic(msg) if err != nil { return fmt.Errorf("DictTopic.Add(): in create ITopic, err=\n\t%w", err) } sf.dict[top_.Name()] = top_ { // Теперь пройтись по все прокси-клиентам и подписать их на новый топик for _, clientProxy := range sf.dictProxy { top_.Subscribe(clientProxy) } } } top_.Set(msg) // Оповещение подписчиков (на самом деле уже сообщение присвоено) return nil } // Get -- возвращает топик по имени func (sf *DictTopic) Get(name alias.TopicName) types.ITopic { sf.block.RLock() defer sf.block.RUnlock() topic := sf.dict[name] return topic } // Len -- возвращает число всех топиков func (sf *DictTopic) Len() int { sf.block.RLock() defer sf.block.RUnlock() return len(sf.dict) } // Size -- возвращает размер всех топиков func (sf *DictTopic) Size() int { sf.block.RLock() defer sf.block.RUnlock() res := 0 for _, topic := range sf.dict { res += topic.Stat().Size() } return res }