| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- // package topic -- потокобезопасный топик шины данных
- package topic
- import (
- "fmt"
- "sync"
- "p78git.ddns.net/svi/gobus/api/netapi"
- "p78git.ddns.net/svi/gobus/internal/dict_topic/topic/topic_stat"
- "p78git.ddns.net/svi/gobus/pkg/alias"
- "p78git.ddns.net/svi/gobus/pkg/types"
- )
- // Topic -- потокобезопасный топик шины данных
- type Topic struct {
- dictProxy map[alias.ClientName]types.IClientProxy // Словарь подписок клиентов на топик
- msg *netapi.TopicMsg // Хранимое сообщение топика
- stat *topic_stat.TopicStat // Статистика топика
- block sync.RWMutex
- }
- // Newtopic -- возвращает новый топик
- func NewTopic(msg *netapi.TopicMsg) (*Topic, error) {
- stat, err := topic_stat.NewTopicStat(msg)
- if err != nil {
- return nil, fmt.Errorf("NewTopic(): in create TopicStat, err=\n\t%w", err)
- }
- sf := &Topic{
- stat: stat,
- msg: msg,
- dictProxy: make(map[alias.ClientName]types.IClientProxy),
- }
- _ = types.ITopic(sf)
- return sf, nil
- }
- // Unsubscribe -- отписывает клиента от себя
- func (sf *Topic) Unsubscribe(clientName alias.ClientName) {
- sf.block.Lock()
- defer sf.block.Unlock()
- delete(sf.dictProxy, clientName)
- }
- // Subscribe -- подписывает клиента на себя
- func (sf *Topic) Subscribe(clientProxy types.IClientProxy) error {
- sf.block.Lock()
- defer sf.block.Unlock()
- if clientProxy == nil {
- return fmt.Errorf("Topic.Subscribe(): ICLientProxy==nil")
- }
- sf.dictProxy[clientProxy.ClientName()] = clientProxy
- return nil
- }
- // Name -- возвращает имя топика
- func (sf *Topic) Name() alias.TopicName {
- return sf.stat.Name()
- }
- // Get -- возвращает содержимое топика
- func (sf *Topic) Get() *netapi.TopicMsg {
- sf.block.RLock()
- defer sf.block.RUnlock()
- return sf.msg
- }
- // Set -- устанавливает значение топика
- func (sf *Topic) Set(msg *netapi.TopicMsg) error {
- sf.block.Lock()
- defer sf.block.Unlock()
- if err := sf.stat.Update(msg); err != nil {
- return fmt.Errorf("Topic.Set(): ERROR in update self stat, topic(%q), err==\n\t%w", sf.stat.Name(), err)
- }
- sf.msg = msg
- for _, proxy := range sf.dictProxy {
- if !proxy.IsWork() {
- delete(sf.dictProxy, proxy.ClientName())
- continue
- }
- proxy.Write(msg)
- }
- return nil
- }
- // Stat -- возвращает статистику топика
- func (sf *Topic) Stat() types.ITopicStat {
- return sf.stat
- }
|