audio/internal/readerdriver: Create a dedicated loop to read the source (js)

Closes #1637
This commit is contained in:
Hajime Hoshi 2021-05-10 00:47:03 +09:00
parent df40217427
commit 2c8d3826fa

View File

@ -19,6 +19,7 @@ import (
"io" "io"
"reflect" "reflect"
"runtime" "runtime"
"sync"
"syscall/js" "syscall/js"
"unsafe" "unsafe"
@ -102,6 +103,8 @@ type player struct {
nextPos float64 nextPos float64
bufferSourceNodes []js.Value bufferSourceNodes []js.Value
appendBufferFunc js.Func appendBufferFunc js.Func
cond *sync.Cond
} }
func (c *context) NewPlayer(src io.Reader) Player { func (c *context) NewPlayer(src io.Reader) Player {
@ -109,6 +112,7 @@ func (c *context) NewPlayer(src io.Reader) Player {
context: c, context: c,
src: src, src: src,
gain: c.audioContext.Call("createGain"), gain: c.audioContext.Call("createGain"),
cond: sync.NewCond(&sync.Mutex{}),
} }
p.appendBufferFunc = js.FuncOf(p.appendBuffer) p.appendBufferFunc = js.FuncOf(p.appendBuffer)
p.gain.Call("connect", c.audioContext.Get("destination")) p.gain.Call("connect", c.audioContext.Get("destination"))
@ -127,18 +131,44 @@ func (c *context) Resume() error {
} }
func (p *player) Play() { func (p *player) Play() {
p.cond.L.Lock()
defer p.cond.L.Unlock()
if p.err != nil { if p.err != nil {
return return
} }
if p.state != playerPaused { if p.state != playerPaused {
return return
} }
buf := make([]byte, p.context.maxBufferSize())
for len(p.buf) < p.context.maxBufferSize() {
n, err := p.src.Read(buf)
if err != nil && err != io.EOF {
p.setErrorImpl(err)
return
}
p.buf = append(p.buf, buf[:n]...)
if err == io.EOF {
p.eof = true
break
}
}
p.state = playerPlay p.state = playerPlay
p.appendBuffer(js.Undefined(), nil) p.appendBufferImpl(js.Undefined())
p.appendBuffer(js.Undefined(), nil) p.appendBufferImpl(js.Undefined())
go p.loop()
} }
func (p *player) Pause() { func (p *player) Pause() {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.pauseImpl()
}
func (p *player) pauseImpl() {
if p.err != nil { if p.err != nil {
return return
} }
@ -161,27 +191,36 @@ func (p *player) Pause() {
p.state = playerPaused p.state = playerPaused
p.bufferSourceNodes = p.bufferSourceNodes[:0] p.bufferSourceNodes = p.bufferSourceNodes[:0]
p.nextPos = 0 p.nextPos = 0
p.cond.Signal()
} }
func (p *player) appendBuffer(this js.Value, args []js.Value) interface{} { func (p *player) appendBuffer(this js.Value, args []js.Value) interface{} {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.appendBufferImpl(this)
return nil
}
func (p *player) appendBufferImpl(audioBuffer js.Value) {
// appendBuffer is called as the 'ended' callback of a buffer. // appendBuffer is called as the 'ended' callback of a buffer.
// 'this' is an AudioBufferSourceNode that already finishes its playing. // 'this' is an AudioBufferSourceNode that already finishes its playing.
for i, n := range p.bufferSourceNodes { for i, n := range p.bufferSourceNodes {
if jsutil.Equal(n, this) { if jsutil.Equal(n, audioBuffer) {
p.bufferSourceNodes = append(p.bufferSourceNodes[:i], p.bufferSourceNodes[i+1:]...) p.bufferSourceNodes = append(p.bufferSourceNodes[:i], p.bufferSourceNodes[i+1:]...)
break break
} }
} }
if p.state != playerPlay { if p.state != playerPlay {
return nil return
} }
if p.eof { if p.eof && len(p.buf) == 0 {
if len(p.bufferSourceNodes) == 0 { if len(p.bufferSourceNodes) == 0 {
p.Pause() p.pauseImpl()
} }
return nil return
} }
c := p.context.audioContext.Get("currentTime").Float() c := p.context.audioContext.Get("currentTime").Float()
@ -190,37 +229,16 @@ func (p *player) appendBuffer(this js.Value, args []js.Value) interface{} {
p.nextPos = c + 1.0/60.0 p.nextPos = c + 1.0/60.0
} }
tmp := make([]byte, 4096) bs := make([]byte, p.context.oneBufferSize())
bs := make([]byte, 0, p.context.oneBufferSize()) n := copy(bs, p.buf)
for cap(bs)-len(bs) > 0 { p.buf = p.buf[n:]
if len(p.buf) > 0 { if len(p.buf) < p.context.maxBufferSize() {
n := len(p.buf) p.cond.Signal()
if need := cap(bs) - len(bs); n > need {
n = need
}
bs = append(bs, p.buf[:n]...)
p.buf = p.buf[n:]
continue
}
n, err := p.src.Read(tmp)
if err != nil && err != io.EOF {
p.err = err
p.Pause()
return nil
}
if need := cap(bs) - len(bs); n > need {
p.buf = append(p.buf, tmp[need:]...)
n = need
}
bs = append(bs, tmp[:n]...)
if err == io.EOF {
p.eof = true
break
}
} }
if len(bs) == 0 { if len(bs) == 0 {
return nil // createBuffer fails with 0 bytes. Add some zeros instead.
bs = make([]byte, 4096)
} }
l, r := toLR(bs) l, r := toLR(bs)
@ -244,7 +262,7 @@ func (p *player) appendBuffer(this js.Value, args []js.Value) interface{} {
p.nextPos += buf.Get("duration").Float() p.nextPos += buf.Get("duration").Float()
p.bufferSourceNodes = append(p.bufferSourceNodes, s) p.bufferSourceNodes = append(p.bufferSourceNodes, s)
return nil return
} }
func (p *player) IsPlaying() bool { func (p *player) IsPlaying() bool {
@ -252,6 +270,9 @@ func (p *player) IsPlaying() bool {
} }
func (p *player) Reset() { func (p *player) Reset() {
p.cond.L.Lock()
defer p.cond.L.Unlock()
if p.err != nil { if p.err != nil {
return return
} }
@ -259,7 +280,7 @@ func (p *player) Reset() {
return return
} }
p.Pause() p.pauseImpl()
p.eof = false p.eof = false
p.buf = p.buf[:0] p.buf = p.buf[:0]
} }
@ -273,6 +294,9 @@ func (p *player) SetVolume(volume float64) {
} }
func (p *player) UnplayedBufferSize() int64 { func (p *player) UnplayedBufferSize() int64 {
p.cond.L.Lock()
defer p.cond.L.Unlock()
// This is not an accurate buffer size as part of the buffers might already be consumed. // This is not an accurate buffer size as part of the buffers might already be consumed.
var sec float64 var sec float64
for _, n := range p.bufferSourceNodes { for _, n := range p.bufferSourceNodes {
@ -282,15 +306,79 @@ func (p *player) UnplayedBufferSize() int64 {
} }
func (p *player) Err() error { func (p *player) Err() error {
p.cond.L.Lock()
defer p.cond.L.Unlock()
return p.err return p.err
} }
func (p *player) Close() error { func (p *player) Close() error {
p.cond.L.Lock()
defer p.cond.L.Unlock()
return p.closeImpl()
}
func (p *player) closeImpl() error {
runtime.SetFinalizer(p, nil) runtime.SetFinalizer(p, nil)
p.Reset() p.Reset()
p.state = playerClosed p.state = playerClosed
p.appendBufferFunc.Release() p.appendBufferFunc.Release()
return nil p.cond.Signal()
return p.err
}
func (p *player) setErrorImpl(err error) {
p.err = err
p.closeImpl()
}
func (p *player) shouldWait() bool {
switch p.state {
case playerPaused:
// Even when the player is paused, the loop immediately ends.
// WebAudio doesn't have a notion of pause.
return false
case playerPlay:
return len(p.buf) >= p.context.maxBufferSize() || p.eof
case playerClosed:
return false
default:
panic("not reached")
}
}
func (p *player) wait() bool {
p.cond.L.Lock()
defer p.cond.L.Unlock()
for p.shouldWait() {
p.cond.Wait()
}
return p.state == playerPlay && !p.eof
}
func (p *player) loop() {
buf := make([]byte, 4096)
for {
if !p.wait() {
return
}
p.cond.L.Lock()
n, err := p.src.Read(buf)
if err != nil && err != io.EOF {
p.setErrorImpl(err)
p.cond.L.Unlock()
return
}
p.buf = append(p.buf, buf[:n]...)
if err == io.EOF {
p.eof = true
p.cond.L.Unlock()
return
}
p.cond.L.Unlock()
}
} }
type go2cppDriverWrapper struct { type go2cppDriverWrapper struct {