audio: Bug fix: Remove depenency on finalizer

This change fixes playerImpl leak by removing dependency on
finalizer usages. Now readLoop can (or should) be called multiple
times even after closing. Only when (*Player).Close is called
explicitly, the read loop cannot be started again.

Fixes #852
This commit is contained in:
Hajime Hoshi 2019-04-28 00:13:51 +09:00
parent 4f831de008
commit e4f0a0aa04
2 changed files with 67 additions and 68 deletions

View File

@ -222,24 +222,24 @@ type Player struct {
}
type playerImpl struct {
mux *mux
src io.ReadCloser
srcEOF bool
sampleRate int
mux *mux
src io.ReadCloser
srcEOF bool
sampleRate int
closedExplicitly bool
runningReadLoop bool
buf []byte
pos int64
volume float64
closeCh chan struct{}
closedCh chan struct{}
readLoopEndedCh chan struct{}
seekCh chan seekArgs
seekedCh chan error
proceedCh chan []int16
proceededCh chan proceededValues
finalized bool
closeCh chan struct{}
closedCh chan struct{}
seekCh chan seekArgs
seekedCh chan error
proceedCh chan []int16
proceededCh chan proceededValues
syncCh chan func()
m sync.Mutex
}
@ -275,18 +275,17 @@ func NewPlayer(context *Context, src io.ReadCloser) (*Player, error) {
}
p := &Player{
&playerImpl{
mux: context.mux,
src: src,
sampleRate: context.sampleRate,
buf: nil,
volume: 1,
closeCh: make(chan struct{}),
closedCh: make(chan struct{}),
readLoopEndedCh: make(chan struct{}),
seekCh: make(chan seekArgs),
seekedCh: make(chan error),
proceedCh: make(chan []int16),
proceededCh: make(chan proceededValues),
mux: context.mux,
src: src,
sampleRate: context.sampleRate,
buf: nil,
volume: 1,
closeCh: make(chan struct{}),
closedCh: make(chan struct{}),
seekCh: make(chan seekArgs),
seekedCh: make(chan error),
proceedCh: make(chan []int16),
proceededCh: make(chan proceededValues),
},
}
if seeker, ok := p.p.src.(io.Seeker); ok {
@ -299,9 +298,6 @@ func NewPlayer(context *Context, src io.ReadCloser) (*Player, error) {
}
runtime.SetFinalizer(p, (*Player).finalize)
go func() {
p.p.readLoop()
}()
return p, nil
}
@ -325,7 +321,6 @@ func NewPlayerFromBytes(context *Context, src []byte) (*Player, error) {
func (p *Player) finalize() {
runtime.SetFinalizer(p, nil)
p.p.setFinalized(true)
// TODO: It is really hard to say concurrent safety.
// Refactor this package to reduce goroutines.
if !p.IsPlaying() {
@ -333,19 +328,6 @@ func (p *Player) finalize() {
}
}
func (p *playerImpl) setFinalized(finalized bool) {
p.m.Lock()
p.finalized = finalized
p.m.Unlock()
}
func (p *playerImpl) isFinalized() bool {
p.m.Lock()
b := p.finalized
p.m.Unlock()
return b
}
// Close closes the stream.
//
// When closing, the stream owned by the player will also be closed by calling its Close.
@ -359,27 +341,45 @@ func (p *Player) Close() error {
func (p *playerImpl) Close() error {
p.mux.removePlayer(p)
p.m.Lock()
p.closedExplicitly = true
p.m.Unlock()
// src.Close is called only when Player's Close is called.
if err := p.src.Close(); err != nil {
return err
}
return p.closeImpl()
}
func (p *playerImpl) closeImpl() error {
select {
case p.closeCh <- struct{}{}:
<-p.closedCh
return nil
case <-p.readLoopEndedCh:
func (p *playerImpl) ensureReadLoop() error {
p.m.Lock()
defer p.m.Unlock()
if p.closedExplicitly {
return fmt.Errorf("audio: the player is already closed")
}
if p.runningReadLoop {
return nil
}
go p.readLoop()
return nil
}
func (p *playerImpl) closeImpl() error {
if err := p.ensureReadLoop(); err != nil {
return err
}
p.closeCh <- struct{}{}
<-p.closedCh
return nil
}
func (p *playerImpl) bufferToInt16(lengthInBytes int) ([]int16, error) {
select {
case p.proceedCh <- make([]int16, lengthInBytes/2):
r := <-p.proceededCh
return r.buf, r.err
case <-p.readLoopEndedCh:
return nil, fmt.Errorf("audio: the player is already closed")
if err := p.ensureReadLoop(); err != nil {
return nil, err
}
p.proceedCh <- make([]int16, lengthInBytes/2)
r := <-p.proceededCh
return r.buf, r.err
}
// Play plays the stream.
@ -395,12 +395,13 @@ func (p *playerImpl) Play() {
}
func (p *playerImpl) readLoop() {
p.m.Lock()
p.runningReadLoop = true
p.m.Unlock()
defer func() {
// Note: the error is ignored
p.src.Close()
// Receiving from a closed channel returns quickly
// i.e. `case <-p.readLoopEndedCh:` can check if this loops is ended.
close(p.readLoopEndedCh)
p.m.Lock()
p.runningReadLoop = false
p.m.Unlock()
}()
timer := time.NewTimer(0)
@ -599,14 +600,14 @@ func (p *playerImpl) Seek(offset time.Duration) error {
if _, ok := p.src.(io.Seeker); !ok {
panic("audio: player to be sought must be io.Seeker")
}
if err := p.ensureReadLoop(); err != nil {
return err
}
o := int64(offset) * bytesPerSample * int64(p.sampleRate) / int64(time.Second)
o &= mask
select {
case p.seekCh <- seekArgs{o, io.SeekStart}:
return <-p.seekedCh
case <-p.readLoopEndedCh:
return fmt.Errorf("audio: the player is already closed")
}
p.seekCh <- seekArgs{o, io.SeekStart}
return <-p.seekedCh
}
// Pause pauses the playing.

View File

@ -112,9 +112,7 @@ func (m *mux) Read(b []byte) (int, error) {
}
}
for _, p := range closed {
if p.isFinalized() {
p.closeImpl()
}
p.closeImpl()
delete(m.ps, p)
}