| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- // package topic -- потокобезопасный топик шины данных
- package topic
- import (
- "fmt"
- "sync"
- "p78git.ddns.net/svi/gobus/api/netapi"
- "p78git.ddns.net/svi/gobus/pkg/alias"
- "p78git.ddns.net/svi/gobus/pkg/types"
- )
- // Topic -- потокобезопасный топик шины данных
- type Topic struct {
- name alias.TopicName
- dictProxy map[alias.ClientName]types.IClientProxy // Словарь подписок клиентов на топик
- msg *netapi.TopicMsg // Хранимое сообщение топика
- block sync.RWMutex
- }
- // Newtopic -- возвращает новый топик
- func NewTopic(msg *netapi.TopicMsg) (*Topic, error) {
- if msg == nil {
- return nil, fmt.Errorf("NewTopic(): name is empty")
- }
- sf := &Topic{
- name: alias.TopicName(msg.Topic),
- msg: msg,
- dictProxy: make(map[alias.ClientName]types.IClientProxy),
- }
- _ = types.ITopic(sf)
- return sf, nil
- }
- // Unsubscribe -- отписывает клиента от себя
- func (sf *Topic) Unsubscribe(clientName alias.ClientName) {
- sf.block.Lock()
- defer sf.block.Unlock()
- delete(sf.dictProxy, clientName)
- }
- // Subscribe -- подписывает клиента на себя
- func (sf *Topic) Subscribe(clientProxy types.IClientProxy) error {
- sf.block.Lock()
- defer sf.block.Unlock()
- if clientProxy == nil {
- return fmt.Errorf("Topic.Subscribe(): ICLientProxy==nil")
- }
- sf.dictProxy[clientProxy.ClientName()] = clientProxy
- return nil
- }
- // Name -- возвращает имя топика
- func (sf *Topic) Name() alias.TopicName {
- return sf.name
- }
- // Get -- возвращает содержимое топика
- func (sf *Topic) Get() *netapi.TopicMsg {
- sf.block.RLock()
- defer sf.block.RUnlock()
- return sf.msg
- }
- // Set -- устанавливает значение топика
- func (sf *Topic) Set(msg *netapi.TopicMsg) error {
- sf.block.Lock()
- defer sf.block.Unlock()
- if msg == nil {
- return fmt.Errorf("Topic.Set(): ERROR msg==nil\n")
- }
- if sf.name != alias.TopicName(msg.Topic) {
- return fmt.Errorf("Topic.Set(): topic name(%q)!=msg.Topic(%v)\n", sf.name, msg.Topic)
- }
- sf.msg = msg
- for _, proxy := range sf.dictProxy {
- if !proxy.IsWork() {
- delete(sf.dictProxy, proxy.ClientName())
- continue
- }
- proxy.Write(msg)
- }
- return nil
- }
- // Size -- возвращает размер топика
- func (sf *Topic) Size() int {
- sf.block.RLock()
- defer sf.block.RUnlock()
- // *2 потому что имя хранится два раза
- return len(sf.name)*2 + len(sf.msg.Msg)
- }
|