audio: Reduce locks by using channels

This commit is contained in:
Hajime Hoshi 2017-12-23 17:24:51 +09:00
parent ba3de063ac
commit db77658935

View File

@ -73,16 +73,15 @@ func (p *players) Read(b []byte) (int, error) {
} }
l := len(b) l := len(b)
for player := range p.players {
if err := player.getReadErr(); err != nil {
return 0, err
}
}
l &= mask l &= mask
b16s := [][]int16{} b16s := [][]int16{}
for player := range p.players { for player := range p.players {
b16s = append(b16s, player.bufferToInt16(l)) buf, err := player.bufferToInt16(l)
if err != nil {
return 0, err
}
b16s = append(b16s, buf)
} }
for i := 0; i < l/2; i++ { for i := 0; i < l/2; i++ {
x := 0 x := 0
@ -326,19 +325,31 @@ type Player struct {
src ReadSeekCloser src ReadSeekCloser
srcEOF bool srcEOF bool
sampleRate int sampleRate int
reading bool
readErr error
buf []byte buf []byte
pos int64 pos int64
volume float64 volume float64
closeCh chan struct{} closeCh chan struct{}
closedCh chan struct{} closedCh chan error
seekCh chan seekArgs
seekedCh chan error
proceedCh chan []int16
proceededCh chan proceededValues
m sync.RWMutex m sync.RWMutex
} }
type seekArgs struct {
offset int64
whence int
}
type proceededValues struct {
buf []int16
err error
}
// NewPlayer creates a new player with the given stream. // NewPlayer creates a new player with the given stream.
// //
// src's format must be linear PCM (16bits little endian, 2 channel stereo) // src's format must be linear PCM (16bits little endian, 2 channel stereo)
@ -359,6 +370,12 @@ func NewPlayer(context *Context, src ReadSeekCloser) (*Player, error) {
sampleRate: context.sampleRate, sampleRate: context.sampleRate,
buf: []byte{}, buf: []byte{},
volume: 1, volume: 1,
closeCh: make(chan struct{}),
closedCh: make(chan error),
seekCh: make(chan seekArgs),
seekedCh: make(chan error),
proceedCh: make(chan []int16),
proceededCh: make(chan proceededValues),
} }
// Get the current position of the source. // Get the current position of the source.
pos, err := p.src.Seek(0, io.SeekCurrent) pos, err := p.src.Seek(0, io.SeekCurrent)
@ -367,6 +384,10 @@ func NewPlayer(context *Context, src ReadSeekCloser) (*Player, error) {
} }
p.pos = pos p.pos = pos
runtime.SetFinalizer(p, (*Player).Close) runtime.SetFinalizer(p, (*Player).Close)
go func() {
p.readLoop()
}()
return p, nil return p, nil
} }
@ -398,40 +419,14 @@ func (p *Player) Close() error {
runtime.SetFinalizer(p, nil) runtime.SetFinalizer(p, nil)
p.players.removePlayer(p) p.players.removePlayer(p)
p.m.Lock() p.closeCh <- struct{}{}
err := p.src.Close() return <-p.closedCh
p.m.Unlock()
close(p.closeCh)
<-p.closedCh
return err
} }
func (p *Player) bufferToInt16(lengthInBytes int) []int16 { func (p *Player) bufferToInt16(lengthInBytes int) ([]int16, error) {
r := make([]int16, lengthInBytes/2) p.proceedCh <- make([]int16, lengthInBytes/2)
r := <-p.proceededCh
p.m.Lock() return r.buf, r.err
l := lengthInBytes
// Buffer size needs to be much more than the actual required length
// so that noise due to empty buffer can be avoided.
if len(p.buf) < lengthInBytes*4 && !p.srcEOF {
p.m.Unlock()
return r
}
if l > len(p.buf) {
l = len(p.buf)
}
for i := 0; i < l/2; i++ {
r[i] = int16(p.buf[2*i]) | (int16(p.buf[2*i+1]) << 8)
r[i] = int16(float64(r[i]) * p.volume)
}
p.pos += int64(l)
p.buf = p.buf[l:]
p.m.Unlock()
return r
} }
// Play plays the stream. // Play plays the stream.
@ -439,35 +434,31 @@ func (p *Player) bufferToInt16(lengthInBytes int) []int16 {
// Play always returns nil. // Play always returns nil.
func (p *Player) Play() error { func (p *Player) Play() error {
p.players.addPlayer(p) p.players.addPlayer(p)
p.startRead()
return nil return nil
} }
func (p *Player) startRead() {
p.m.Lock()
if !p.reading && p.readErr == nil {
p.closeCh = make(chan struct{})
p.closedCh = make(chan struct{})
p.reading = true
p.srcEOF = false
go func() {
p.readLoop()
p.m.Lock()
p.reading = false
p.m.Unlock()
// TODO: How about rewinding?
close(p.closedCh)
}()
}
p.m.Unlock()
}
func (p *Player) readLoop() { func (p *Player) readLoop() {
t := time.After(0) t := time.After(0)
var readErr error
for { for {
select { select {
case <-p.closeCh: case <-p.closeCh:
p.closedCh <- p.src.Close()
return return
case s := <-p.seekCh:
pos, err := p.src.Seek(s.offset, s.whence)
p.m.Lock()
p.buf = nil
p.pos = pos
p.srcEOF = false
p.m.Unlock()
p.seekedCh <- err
t = time.After(time.Millisecond)
break
case <-t: case <-t:
p.m.Lock() p.m.Lock()
if len(p.buf) >= 4096*16 { if len(p.buf) >= 4096*16 {
@ -475,8 +466,12 @@ func (p *Player) readLoop() {
p.m.Unlock() p.m.Unlock()
break break
} }
p.m.Unlock()
buf := make([]byte, 4096) buf := make([]byte, 4096)
n, err := p.src.Read(buf) n, err := p.src.Read(buf)
p.m.Lock()
p.buf = append(p.buf, buf[:n]...) p.buf = append(p.buf, buf[:n]...)
if err == io.EOF { if err == io.EOF {
p.srcEOF = true p.srcEOF = true
@ -484,16 +479,47 @@ func (p *Player) readLoop() {
if p.srcEOF && len(p.buf) == 0 { if p.srcEOF && len(p.buf) == 0 {
t = nil t = nil
p.m.Unlock() p.m.Unlock()
return break
} }
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
p.readErr = err readErr = err
t = nil t = nil
p.m.Unlock() p.m.Unlock()
return break
} }
t = time.After(time.Millisecond) t = time.After(time.Millisecond)
p.m.Unlock() p.m.Unlock()
case buf := <-p.proceedCh:
if readErr != nil {
p.proceededCh <- proceededValues{buf, readErr}
return
}
lengthInBytes := len(buf) * 2
l := lengthInBytes
p.m.Lock()
// Buffer size needs to be much more than the actual required length
// so that noise caused by empty buffer can be avoided.
if len(p.buf) < lengthInBytes*4 && !p.srcEOF {
p.m.Unlock()
p.proceededCh <- proceededValues{buf, nil}
break
}
if l > len(p.buf) {
l = len(p.buf)
}
for i := 0; i < l/2; i++ {
buf[i] = int16(p.buf[2*i]) | (int16(p.buf[2*i+1]) << 8)
buf[i] = int16(float64(buf[i]) * p.volume)
}
p.pos += int64(l)
p.buf = p.buf[l:]
p.m.Unlock()
p.proceededCh <- proceededValues{buf, nil}
} }
} }
} }
@ -505,13 +531,6 @@ func (p *Player) eof() bool {
return r return r
} }
func (p *Player) getReadErr() error {
p.m.RLock()
e := p.readErr
p.m.RUnlock()
return e
}
// IsPlaying returns boolean indicating whether the player is playing. // IsPlaying returns boolean indicating whether the player is playing.
func (p *Player) IsPlaying() bool { func (p *Player) IsPlaying() bool {
return p.players.hasPlayer(p) return p.players.hasPlayer(p)
@ -530,19 +549,8 @@ func (p *Player) Rewind() error {
func (p *Player) Seek(offset time.Duration) error { func (p *Player) Seek(offset time.Duration) error {
o := int64(offset) * bytesPerSample * channelNum * int64(p.sampleRate) / int64(time.Second) o := int64(offset) * bytesPerSample * channelNum * int64(p.sampleRate) / int64(time.Second)
o &= mask o &= mask
p.seekCh <- seekArgs{o, io.SeekStart}
p.m.Lock() return <-p.seekedCh
pos, err := p.src.Seek(o, io.SeekStart)
if err != nil {
p.m.Unlock()
return err
}
p.buf = nil
p.pos = pos
p.srcEOF = false
p.m.Unlock()
return nil
} }
// Pause pauses the playing. // Pause pauses the playing.