diff --git a/audio/audio.go b/audio/audio.go index 2a211dff9..93fe11feb 100644 --- a/audio/audio.go +++ b/audio/audio.go @@ -222,24 +222,24 @@ type Player struct { } type playerImpl struct { - mux *mux - src io.ReadCloser - srcEOF bool - sampleRate int + mux *mux + src io.ReadCloser + srcEOF bool + sampleRate int + closedExplicitly bool + runningReadLoop bool buf []byte pos int64 volume float64 - closeCh chan struct{} - closedCh chan struct{} - readLoopEndedCh chan struct{} - seekCh chan seekArgs - seekedCh chan error - proceedCh chan []int16 - proceededCh chan proceededValues - - finalized bool + closeCh chan struct{} + closedCh chan struct{} + seekCh chan seekArgs + seekedCh chan error + proceedCh chan []int16 + proceededCh chan proceededValues + syncCh chan func() m sync.Mutex } @@ -275,18 +275,17 @@ func NewPlayer(context *Context, src io.ReadCloser) (*Player, error) { } p := &Player{ &playerImpl{ - mux: context.mux, - src: src, - sampleRate: context.sampleRate, - buf: nil, - volume: 1, - closeCh: make(chan struct{}), - closedCh: make(chan struct{}), - readLoopEndedCh: make(chan struct{}), - seekCh: make(chan seekArgs), - seekedCh: make(chan error), - proceedCh: make(chan []int16), - proceededCh: make(chan proceededValues), + mux: context.mux, + src: src, + sampleRate: context.sampleRate, + buf: nil, + volume: 1, + closeCh: make(chan struct{}), + closedCh: make(chan struct{}), + seekCh: make(chan seekArgs), + seekedCh: make(chan error), + proceedCh: make(chan []int16), + proceededCh: make(chan proceededValues), }, } if seeker, ok := p.p.src.(io.Seeker); ok { @@ -299,9 +298,6 @@ func NewPlayer(context *Context, src io.ReadCloser) (*Player, error) { } runtime.SetFinalizer(p, (*Player).finalize) - go func() { - p.p.readLoop() - }() return p, nil } @@ -325,7 +321,6 @@ func NewPlayerFromBytes(context *Context, src []byte) (*Player, error) { func (p *Player) finalize() { runtime.SetFinalizer(p, nil) - p.p.setFinalized(true) // TODO: It is really hard to say concurrent safety. // Refactor this package to reduce goroutines. if !p.IsPlaying() { @@ -333,19 +328,6 @@ func (p *Player) finalize() { } } -func (p *playerImpl) setFinalized(finalized bool) { - p.m.Lock() - p.finalized = finalized - p.m.Unlock() -} - -func (p *playerImpl) isFinalized() bool { - p.m.Lock() - b := p.finalized - p.m.Unlock() - return b -} - // Close closes the stream. // // When closing, the stream owned by the player will also be closed by calling its Close. @@ -359,27 +341,45 @@ func (p *Player) Close() error { func (p *playerImpl) Close() error { p.mux.removePlayer(p) + p.m.Lock() + p.closedExplicitly = true + p.m.Unlock() + // src.Close is called only when Player's Close is called. + if err := p.src.Close(); err != nil { + return err + } return p.closeImpl() } -func (p *playerImpl) closeImpl() error { - select { - case p.closeCh <- struct{}{}: - <-p.closedCh - return nil - case <-p.readLoopEndedCh: +func (p *playerImpl) ensureReadLoop() error { + p.m.Lock() + defer p.m.Unlock() + if p.closedExplicitly { return fmt.Errorf("audio: the player is already closed") } + if p.runningReadLoop { + return nil + } + go p.readLoop() + return nil +} + +func (p *playerImpl) closeImpl() error { + if err := p.ensureReadLoop(); err != nil { + return err + } + p.closeCh <- struct{}{} + <-p.closedCh + return nil } func (p *playerImpl) bufferToInt16(lengthInBytes int) ([]int16, error) { - select { - case p.proceedCh <- make([]int16, lengthInBytes/2): - r := <-p.proceededCh - return r.buf, r.err - case <-p.readLoopEndedCh: - return nil, fmt.Errorf("audio: the player is already closed") + if err := p.ensureReadLoop(); err != nil { + return nil, err } + p.proceedCh <- make([]int16, lengthInBytes/2) + r := <-p.proceededCh + return r.buf, r.err } // Play plays the stream. @@ -395,12 +395,13 @@ func (p *playerImpl) Play() { } func (p *playerImpl) readLoop() { + p.m.Lock() + p.runningReadLoop = true + p.m.Unlock() defer func() { - // Note: the error is ignored - p.src.Close() - // Receiving from a closed channel returns quickly - // i.e. `case <-p.readLoopEndedCh:` can check if this loops is ended. - close(p.readLoopEndedCh) + p.m.Lock() + p.runningReadLoop = false + p.m.Unlock() }() timer := time.NewTimer(0) @@ -599,14 +600,14 @@ func (p *playerImpl) Seek(offset time.Duration) error { if _, ok := p.src.(io.Seeker); !ok { panic("audio: player to be sought must be io.Seeker") } + if err := p.ensureReadLoop(); err != nil { + return err + } + o := int64(offset) * bytesPerSample * int64(p.sampleRate) / int64(time.Second) o &= mask - select { - case p.seekCh <- seekArgs{o, io.SeekStart}: - return <-p.seekedCh - case <-p.readLoopEndedCh: - return fmt.Errorf("audio: the player is already closed") - } + p.seekCh <- seekArgs{o, io.SeekStart} + return <-p.seekedCh } // Pause pauses the playing. diff --git a/audio/mux.go b/audio/mux.go index 6cff9f2d2..b81521d46 100644 --- a/audio/mux.go +++ b/audio/mux.go @@ -112,9 +112,7 @@ func (m *mux) Read(b []byte) (int, error) { } } for _, p := range closed { - if p.isFinalized() { - p.closeImpl() - } + p.closeImpl() delete(m.ps, p) }