// package dict_topic -- потокобезопасный словарь топиков package dict_topic import ( "log" "strings" "sync" "p78git.ddns.net/svi/gobus/internal/dict_topic/topic" "p78git.ddns.net/svi/gobus/pkg/alias" "p78git.ddns.net/svi/gobus/pkg/types" ) // DictTopic -- потокобезопасный словарь топиков type DictTopic struct { dict map[alias.TopicName]types.ITopic // Словарь тпоиков dictProxy map[alias.ClientName]types.IClientProxy // Словарь прокси для клиентов block sync.RWMutex } // NewDictTopic -- возвращает новый потокобезопасный словарь топиков func NewDictTopic() (*DictTopic, error) { sf := &DictTopic{ dict: make(map[alias.TopicName]types.ITopic), dictProxy: make(map[alias.ClientName]types.IClientProxy), } _ = types.IDictTopic(sf) return sf, nil } // Unsubscribe -- отписывает клиента от топиков func (sf *DictTopic) Unsubscribe(clientProxy types.IClientProxy) { sf.block.RLock() defer sf.block.RUnlock() sf.unsubscribe(clientProxy) } // Отписывает под блокировкой клиента от топиков func (sf *DictTopic) unsubscribe(clientProxy types.IClientProxy) { sample := clientProxy.SampleTopic() for name, topic := range sf.dict { if !strings.Contains(string(name), string(sample)) { continue } topic.Unsubscribe(clientProxy.ClientName()) } } // Subscribe -- подписывает клиента на топики func (sf *DictTopic) Subscribe(clientProxy types.IClientProxy) { sf.block.RLock() defer sf.block.RUnlock() sf.subscribe(clientProxy) } // Подписывает по образцу топика клиента на события (внутри блокировки) func (sf *DictTopic) subscribe(clientProxy types.IClientProxy) { { // Сначала проверить наличие такого клиента и отписать его _, isOk := sf.dictProxy[clientProxy.ClientName()] if isOk { sf.unsubscribe(clientProxy) } } sample := clientProxy.SampleTopic() for name, topic := range sf.dict { if !strings.Contains(string(name), string(sample)) { continue } topic.Subscribe(clientProxy) } sf.dictProxy[clientProxy.ClientName()] = clientProxy } // Add -- добавляет новый топик func (sf *DictTopic) Add(name alias.TopicName) { sf.block.Lock() defer sf.block.Unlock() topic, err := topic.NewTopic(name) if err != nil { log.Printf("DictTopic.Add(): in create ITopic, err=\n\t%v\n", err) return } sf.dict[name] = topic { // Теперь пройтись по все прокси-клиентам и подписать их на новый топик for _, clientProxy := range sf.dictProxy { topic.Subscribe(clientProxy) } } } // Get -- возвращает топик по имени func (sf *DictTopic) Get(name alias.TopicName) types.ITopic { sf.block.RLock() defer sf.block.RUnlock() topic := sf.dict[name] return topic } // Len -- возвращает число всех топиков func (sf *DictTopic) Len() int { sf.block.RLock() defer sf.block.RUnlock() return len(sf.dict) } // Size -- возвращает размер всех топиков func (sf *DictTopic) Size() int { sf.block.RLock() defer sf.block.RUnlock() res := 0 for _, topic := range sf.dict { res += topic.Size() } return res }