-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
197 lines (167 loc) · 4.38 KB
/
main.go
File metadata and controls
197 lines (167 loc) · 4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Package pubsub provides a light weight in-memory way of doing subject based publish-subscribe.
package pubsub
import (
"errors"
"strings"
"sync"
"unicode"
)
// A PubSub represents the client that mangages subscribing, publishing and routing of messages to subject handlers.
type PubSub struct {
mu *sync.RWMutex
subscriptions map[int]*Subscription
ssid int
}
// A Subscription is a subscription to a specific subject.
type Subscription struct {
mu *sync.Mutex
sid int
subject string
ps *PubSub
mh MsgHandler
mch chan *Msg //message channel
uch chan bool //unsubscribe channel
}
// A Msg is a message that is to be handled by subscribers when data is published to a subject.
type Msg struct {
subject string
// The data that the message is meant for a handler to use.
Data interface{}
}
// A MsgHandler is a function that subject subscribers must pass to handle messages.
type MsgHandler func(m *Msg)
var (
//ErrInvalidSubject is returned when a subject name is invalid
ErrInvalidSubject = errors.New("Subject name was invalid")
)
// NewPubSub instantiates a new PubSub client.
func NewPubSub() *PubSub {
subs := make(map[int]*Subscription)
ps := PubSub{mu: &sync.RWMutex{}, subscriptions: subs, ssid: 0}
return &ps
}
// Subscribe subscribes to a subject.
// subject - The subject you want to subscribe to.
// mh - The message handler.
// args - Number of allowed concurrent go routines. Default is not to throttle.
// Returns ErrInvalidSubject if the subject is invalid
func (ps *PubSub) Subscribe(subject string, mh MsgHandler) (*Subscription, error) {
err := validateSubject(subject)
if err != nil {
return nil, err
}
ps.mu.Lock()
s := newSubscription(ps.ssid, subject, ps, mh)
ps.subscriptions[ps.ssid] = s
ps.ssid++
go ps.subListen(s)
ps.mu.Unlock()
return s, nil
}
// Unsubscribe unsubscribes from a subject.
func (s *Subscription) Unsubscribe() {
s.ps.mu.Lock()
s.uch <- true
delete(s.ps.subscriptions, s.sid)
s.ps.mu.Unlock()
}
// Publish publishes data to a subject.
// subject - the subject you want to pass the data to
// data - the data you want to pass
// Returns ErrInvalidSubject if the subject is invalid
func (ps *PubSub) Publish(subject string, data interface{}) error {
err := validateSubject(subject)
if err != nil {
return err
}
msg := newMessage(subject, data)
ps.mu.RLock()
for _, s := range ps.subscriptions {
if subjectMatches(s.subject, msg.subject) {
s.mch <- msg
}
}
ps.mu.RUnlock()
return nil
}
func (ps *PubSub) subListen(s *Subscription) {
L:
for {
select {
case msg := <-s.mch:
s.mh(msg)
case <-s.uch:
break L
}
}
}
func newMessage(subject string, data interface{}) *Msg {
return &Msg{subject: subject, Data: data}
}
func newSubscription(sid int, subject string, ps *PubSub, mh MsgHandler) *Subscription {
nmh := newMessageHandlerWrapper(mh)
return &Subscription{mu: &sync.Mutex{}, sid: sid, subject: subject, ps: ps, mh: nmh, mch: make(chan *Msg), uch: make(chan bool)}
}
func newMessageHandlerWrapper(mh MsgHandler) MsgHandler {
nmh := func(m *Msg) {
go mh(m)
}
return nmh
}
func subjectMatches(subscribeSubject, msgSubject string) bool {
if subscribeSubject == msgSubject {
return true
}
subjectTokens := strings.Split(subscribeSubject, ".")
msgSubjectTokens := strings.Split(msgSubject, ".")
subTokensLen := len(subjectTokens)
if subTokensLen > len(msgSubjectTokens) {
return false
}
for i, token := range msgSubjectTokens {
if (i+1 == subTokensLen || i+1 > subTokensLen) && subjectTokens[subTokensLen-1] == ">" {
return true
}
if i+1 > subTokensLen {
return false
}
subjectToken := subjectTokens[i]
if subjectToken == "*" {
continue
}
if token != subjectToken {
return false
}
}
return true
}
func validateSubject(subject string) error {
if subject == "" {
return ErrInvalidSubject
}
tokens := strings.Split(subject, ".")
lenTokens := len(tokens)
for i, token := range tokens {
if token == ">" && i+1 != lenTokens {
return ErrInvalidSubject
}
if token == "*" || token == ">" {
continue
}
if !isASCII(token) {
return ErrInvalidSubject
}
if strings.Contains(token, "*") || strings.Contains(token, ">") || strings.Contains(token, " ") {
return ErrInvalidSubject
}
}
return nil
}
func isASCII(s string) bool {
for i := 0; i < len(s); i++ {
if s[i] > unicode.MaxASCII {
return false
}
}
return true
}