68 lines
1.2 KiB
Go
68 lines
1.2 KiB
Go
|
package observable
|
||
|
|
||
|
// Ref: github.com/Dreamacro/clash/common/observable
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
type Observable struct {
|
||
|
iterable Iterable
|
||
|
listener map[Subscription]*Subscriber
|
||
|
mux sync.Mutex
|
||
|
done bool
|
||
|
}
|
||
|
|
||
|
func (o *Observable) process() {
|
||
|
for item := range o.iterable {
|
||
|
o.mux.Lock()
|
||
|
for _, sub := range o.listener {
|
||
|
sub.Emit(item)
|
||
|
}
|
||
|
o.mux.Unlock()
|
||
|
}
|
||
|
o.close()
|
||
|
}
|
||
|
|
||
|
func (o *Observable) close() {
|
||
|
o.mux.Lock()
|
||
|
defer o.mux.Unlock()
|
||
|
|
||
|
o.done = true
|
||
|
for _, sub := range o.listener {
|
||
|
sub.Close()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *Observable) Subscribe() (Subscription, error) {
|
||
|
o.mux.Lock()
|
||
|
defer o.mux.Unlock()
|
||
|
if o.done {
|
||
|
return nil, errors.New("observable is closed")
|
||
|
}
|
||
|
subscriber := newSubscriber()
|
||
|
o.listener[subscriber.Out()] = subscriber
|
||
|
return subscriber.Out(), nil
|
||
|
}
|
||
|
|
||
|
func (o *Observable) UnSubscribe(sub Subscription) {
|
||
|
o.mux.Lock()
|
||
|
defer o.mux.Unlock()
|
||
|
subscriber, exist := o.listener[sub]
|
||
|
if !exist {
|
||
|
return
|
||
|
}
|
||
|
delete(o.listener, sub)
|
||
|
subscriber.Close()
|
||
|
}
|
||
|
|
||
|
func NewObservable(any Iterable) *Observable {
|
||
|
observable := &Observable{
|
||
|
iterable: any,
|
||
|
listener: map[Subscription]*Subscriber{},
|
||
|
}
|
||
|
go observable.process()
|
||
|
return observable
|
||
|
}
|