| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- // package dict_topic -- потокобезопасный словарь топиков
- package dict_topic
- import (
- "fmt"
- "strings"
- "sync"
- "p78git.ddns.net/svi/gobus/api/netapi"
- "p78git.ddns.net/svi/gobus/internal/dict_topic/topic"
- "p78git.ddns.net/svi/gobus/pkg/alias"
- "p78git.ddns.net/svi/gobus/pkg/types"
- )
- // DictTopic -- потокобезопасный словарь топиков
- type DictTopic struct {
- dict map[alias.TopicName]types.ITopic // Словарь тпоиков
- dictProxy map[alias.ClientName]types.IClientProxy // Словарь прокси для клиентов
- block sync.RWMutex
- }
- // NewDictTopic -- возвращает новый потокобезопасный словарь топиков
- func NewDictTopic() *DictTopic {
- sf := &DictTopic{
- dict: make(map[alias.TopicName]types.ITopic),
- dictProxy: make(map[alias.ClientName]types.IClientProxy),
- }
- _ = types.IDictTopic(sf)
- return sf
- }
- // Unsubscribe -- отписывает клиента от топиков
- func (sf *DictTopic) Unsubscribe(clientProxy types.IClientProxy) error {
- sf.block.RLock()
- defer sf.block.RUnlock()
- if clientProxy == nil {
- return fmt.Errorf("DictTopic.Unsubscribe(): IClientProxy==nil")
- }
- sf.unsubscribe(clientProxy)
- return nil
- }
- // Отписывает под блокировкой клиента от топиков
- func (sf *DictTopic) unsubscribe(clientProxy types.IClientProxy) {
- clientName := clientProxy.ClientName()
- sampleTopic := string(clientProxy.SampleTopic())
- for name, topic := range sf.dict {
- if !strings.Contains(string(name), sampleTopic) {
- continue
- }
- topic.Unsubscribe(clientName)
- }
- }
- // Subscribe -- подписывает клиента на топики
- func (sf *DictTopic) Subscribe(clientProxy types.IClientProxy) error {
- sf.block.RLock()
- defer sf.block.RUnlock()
- if clientProxy == nil {
- return fmt.Errorf("DictTopic.Subscribe(): IClientProxy==nil")
- }
- sf.subscribe(clientProxy)
- return nil
- }
- // Подписывает по образцу топика клиента на события (внутри блокировки)
- func (sf *DictTopic) subscribe(clientProxy types.IClientProxy) {
- { // Сначала проверить наличие такого клиента и отписать его
- _, isOk := sf.dictProxy[clientProxy.ClientName()]
- if isOk {
- sf.unsubscribe(clientProxy)
- }
- }
- sample := clientProxy.SampleTopic()
- for name, topic := range sf.dict {
- if !strings.Contains(string(name), string(sample)) {
- continue
- }
- topic.Subscribe(clientProxy)
- }
- sf.dictProxy[clientProxy.ClientName()] = clientProxy
- }
- // Public -- публикует в новый топик (при необходимости создаёт)
- func (sf *DictTopic) Public(msg *netapi.TopicMsg) error {
- sf.block.Lock()
- defer sf.block.Unlock()
- if msg == nil {
- return fmt.Errorf("DictTopic.Add(): msg==nil")
- }
- top_, isOk := sf.dict[alias.TopicName(msg.Topic)]
- if !isOk {
- var err error
- top_, err = topic.NewTopic(msg)
- if err != nil {
- return fmt.Errorf("DictTopic.Add(): in create ITopic, err=\n\t%w", err)
- }
- sf.dict[top_.Name()] = top_
- { // Теперь пройтись по все прокси-клиентам и подписать их на новый топик
- for _, clientProxy := range sf.dictProxy {
- top_.Subscribe(clientProxy)
- }
- }
- }
- top_.Set(msg) // Оповещение подписчиков (на самом деле уже сообщение присвоено)
- return nil
- }
- // Get -- возвращает топик по имени
- func (sf *DictTopic) Get(name alias.TopicName) types.ITopic {
- sf.block.RLock()
- defer sf.block.RUnlock()
- topic := sf.dict[name]
- return topic
- }
- // Len -- возвращает число всех топиков
- func (sf *DictTopic) Len() int {
- sf.block.RLock()
- defer sf.block.RUnlock()
- return len(sf.dict)
- }
- // Size -- возвращает размер всех топиков
- func (sf *DictTopic) Size() int {
- sf.block.RLock()
- defer sf.block.RUnlock()
- res := 0
- for _, topic := range sf.dict {
- res += topic.Stat().Size()
- }
- return res
- }
|