| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- // package topic -- потокобезопасный топик шины данных
- package topic
- import (
- "fmt"
- "sync"
- "p78git.ddns.net/svi/gobus/pkg/alias"
- "p78git.ddns.net/svi/gobus/pkg/net/netapi"
- "p78git.ddns.net/svi/gobus/pkg/types"
- )
- // Topic -- потокобезопасный топик шины данных
- type Topic struct {
- name alias.TopicName
- dictProxy map[alias.ClientName]types.IClientProxy // Словарь подписок клиентов на топик
- srcNum int32 // Номер источника
- val []byte // Содержимое топика
- block sync.RWMutex
- }
- // Newtopic -- возвращает новый топик
- func NewTopic(name alias.TopicName) (*Topic, error) {
- if name == "" {
- return nil, fmt.Errorf("NewTopic(): name is empty")
- }
- sf := &Topic{
- name: name,
- val: []byte{},
- dictProxy: make(map[alias.ClientName]types.IClientProxy),
- }
- _ = types.ITopic(sf)
- return sf, nil
- }
- // Unsubscribe -- отписывает клиента от себя
- func (sf *Topic) Unsubscribe(clientName alias.ClientName) {
- sf.block.Lock()
- defer sf.block.Unlock()
- delete(sf.dictProxy, clientName)
- }
- // Subscribe -- попдисывает клиента на себя
- func (sf *Topic) Subscribe(clientProxy types.IClientProxy) {
- sf.block.Lock()
- defer sf.block.Unlock()
- sf.dictProxy[clientProxy.ClientName()] = clientProxy
- }
- // Name -- возвращает имя топика
- func (sf *Topic) Name() alias.TopicName {
- return sf.name
- }
- // Get -- возвращает содержимое топика
- func (sf *Topic) Get() *netapi.TopicMsg {
- sf.block.RLock()
- defer sf.block.RUnlock()
- msg := &netapi.TopicMsg{
- Source: sf.srcNum,
- Topic: string(sf.name),
- Msg: sf.val,
- }
- return msg
- }
- // Set -- устанавливает значение топика
- func (sf *Topic) Set(msg *netapi.TopicMsg) {
- sf.block.Lock()
- defer sf.block.Unlock()
- sf.srcNum = msg.Source
- sf.val = msg.Msg
- for _, proxy := range sf.dictProxy {
- if !proxy.IsWork() {
- delete(sf.dictProxy, proxy.ClientName())
- continue
- }
- proxy.Write(msg)
- }
- }
- // Size -- возвращает размер топика
- func (sf *Topic) Size() int {
- sf.block.RLock()
- defer sf.block.RUnlock()
- return len(sf.name) + len(sf.val)
- }
|