132 lines
2.6 KiB
Go
132 lines
2.6 KiB
Go
package runner
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
func find(s map[int32]*Subscriber, id int32) bool {
|
|
for _, sub := range s {
|
|
if sub.id == id {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func TestProxyStream(t *testing.T) {
|
|
stream := NewProxyStream()
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
wg.Add(1)
|
|
go func(_ int) {
|
|
defer wg.Done()
|
|
_ = stream.AddSubscriber()
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
|
|
if got, exp := len(stream.subscribers), 10; got != exp {
|
|
t.Errorf("expect subscribers count to be %d, got %d", exp, got)
|
|
}
|
|
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
stream.Reload()
|
|
doneCh <- struct{}{}
|
|
}()
|
|
|
|
var reloadCount atomic.Int32
|
|
for _, sub := range stream.subscribers {
|
|
wg.Add(1)
|
|
go func(sub *Subscriber) {
|
|
defer wg.Done()
|
|
<-sub.msgCh
|
|
reloadCount.Add(1)
|
|
}(sub)
|
|
}
|
|
wg.Wait()
|
|
<-doneCh
|
|
|
|
if got, exp := reloadCount.Load(), int32(10); got != exp {
|
|
t.Errorf("expect reloadCount %d, got %d", exp, got)
|
|
}
|
|
|
|
stream.RemoveSubscriber(2)
|
|
if find(stream.subscribers, 2) {
|
|
t.Errorf("expected subscriber 2 not to be found")
|
|
}
|
|
|
|
stream.AddSubscriber()
|
|
if !find(stream.subscribers, 11) {
|
|
t.Errorf("expected subscriber 11 to be found")
|
|
}
|
|
|
|
stream.Stop()
|
|
if got, exp := len(stream.subscribers), 0; got != exp {
|
|
t.Errorf("expected subscribers count to be %d, got %d", exp, got)
|
|
}
|
|
}
|
|
|
|
func TestBuildFailureMessage(t *testing.T) {
|
|
stream := NewProxyStream()
|
|
sub := stream.AddSubscriber()
|
|
|
|
msg := BuildFailedMsg{
|
|
Error: "build failed",
|
|
Command: "go build",
|
|
Output: "error output",
|
|
}
|
|
|
|
go stream.BuildFailed(msg)
|
|
|
|
received := <-sub.msgCh
|
|
assert.Equal(t, StreamMessageBuildFailed, received.Type)
|
|
assert.Equal(t, msg, received.Data)
|
|
}
|
|
|
|
func TestStreamMessageAsSSE(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
msg := StreamMessage{Type: StreamMessageReload, Data: nil}
|
|
assert.Equal(t, "event: reload\ndata: null\n\n", msg.AsSSE())
|
|
}
|
|
|
|
func TestStringifyMarshalError(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
result := stringify(make(chan int))
|
|
assert.Contains(t, result, "Failed to marshal message")
|
|
}
|
|
|
|
func TestProxyStreamRemoveUnknownSubscriber(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
stream := NewProxyStream()
|
|
_ = stream.AddSubscriber()
|
|
before := len(stream.subscribers)
|
|
|
|
assert.NotPanics(t, func() {
|
|
stream.RemoveSubscriber(999)
|
|
})
|
|
assert.Len(t, stream.subscribers, before)
|
|
}
|
|
|
|
func TestProxyStreamReloadAndBuildFailedNoSubscribers(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
stream := NewProxyStream()
|
|
|
|
assert.NotPanics(t, func() {
|
|
stream.Reload()
|
|
})
|
|
|
|
assert.NotPanics(t, func() {
|
|
stream.BuildFailed(BuildFailedMsg{Error: "err", Command: "go build", Output: "out"})
|
|
})
|
|
}
|