audio/internal/readerplayer: Move the buffer for resuming to the driver side

Closes #1633
This commit is contained in:
Hajime Hoshi 2021-05-05 21:46:19 +09:00
parent 4e0e5c6bbc
commit aa9f669ec3
4 changed files with 65 additions and 50 deletions

View File

@ -20,7 +20,6 @@ import (
type Context interface { type Context interface {
NewPlayer(io.Reader) Player NewPlayer(io.Reader) Player
MaxBufferSize() int
Suspend() error Suspend() error
Resume() error Resume() error
io.Closer io.Closer
@ -59,7 +58,7 @@ func (c *context) oneBufferSize() int {
// maxBufferSize returns the maximum size of the buffer for the audio source. // maxBufferSize returns the maximum size of the buffer for the audio source.
// This buffer is used when unreading on pausing the player. // This buffer is used when unreading on pausing the player.
func (c *context) MaxBufferSize() int { func (c *context) maxBufferSize() int {
// The number of underlying buffers should be 2. // The number of underlying buffers should be 2.
return c.oneBufferSize() * 2 return c.oneBufferSize() * 2
} }

View File

@ -80,8 +80,22 @@ type player struct {
} }
func (p *player) Pause() { func (p *player) Pause() {
// TODO: Implement the 'true' pause after #1633 is fixed. p.cond.L.Lock()
p.Reset() defer p.cond.L.Unlock()
if p.err != nil {
return
}
if p.closed {
return
}
if p.p == nil {
return
}
if err := p.p.Pause(); err != nil {
p.setErrorImpl(err)
return
}
} }
func (p *player) Play() { func (p *player) Play() {
@ -224,10 +238,10 @@ func (p *player) shouldWait() bool {
if p.p == nil { if p.p == nil {
return false return false
} }
if !p.p.IsPlaying() { if p.p.IsPlaying() {
return false return p.p.UnplayedBufferSize() >= int64(p.context.maxBufferSize())
} }
return p.p.UnplayedBufferSize() >= int64(p.context.MaxBufferSize()) return true
} }
func (p *player) wait() bool { func (p *player) wait() bool {

View File

@ -137,11 +137,17 @@ func (p *player) Pause() {
} }
// Change the state first. appendBuffer is called as an 'ended' callback. // Change the state first. appendBuffer is called as an 'ended' callback.
var data [2][]float32
for _, n := range p.bufferSourceNodes { for _, n := range p.bufferSourceNodes {
for ch := 0; ch < 2; ch++ {
t := n.Get("buffer").Call("getChannelData", ch)
data[ch] = append(data[ch], float32ArrayToFloat32Slice(t)...)
}
n.Set("onended", nil) n.Set("onended", nil)
n.Call("stop") n.Call("stop")
n.Call("disconnect") n.Call("disconnect")
} }
p.buf = append(fromLR(data[0], data[1]), p.buf...)
p.state = playerPaused p.state = playerPaused
p.bufferSourceNodes = p.bufferSourceNodes[:0] p.bufferSourceNodes = p.bufferSourceNodes[:0]
p.nextPos = 0 p.nextPos = 0
@ -264,7 +270,7 @@ func (p *player) UnplayedBufferSize() int64 {
for _, n := range p.bufferSourceNodes { for _, n := range p.bufferSourceNodes {
sec += n.Get("buffer").Get("duration").Float() sec += n.Get("buffer").Get("duration").Float()
} }
return int64(sec * float64(p.context.sampleRate*p.context.channelNum*p.context.bitDepthInBytes)) return int64(len(p.buf)) + int64(sec*float64(p.context.sampleRate*p.context.channelNum*p.context.bitDepthInBytes))
} }
func (p *player) Err() error { func (p *player) Err() error {
@ -287,10 +293,6 @@ func (w *go2cppDriverWrapper) NewPlayer(r io.Reader) Player {
return w.c.NewPlayer(r) return w.c.NewPlayer(r)
} }
func (w *go2cppDriverWrapper) MaxBufferSize() int {
return w.c.MaxBufferSize()
}
func (w *go2cppDriverWrapper) Suspend() error { func (w *go2cppDriverWrapper) Suspend() error {
// Do nothing so far. // Do nothing so far.
return nil return nil
@ -317,6 +319,24 @@ func toLR(data []byte) ([]float32, []float32) {
return l, r return l, r
} }
func fromLR(l, r []float32) []byte {
const max = 1 << 15
if len(l) != len(r) {
panic("readerdriver: len(l) must equal to len(r) at fromLR")
}
bs := make([]byte, len(l)*4)
for i := range l {
lv := int16(l[i] * max)
bs[4*i] = byte(lv)
bs[4*i+1] = byte(lv >> 8)
rv := int16(r[i] * max)
bs[4*i+2] = byte(rv)
bs[4*i+3] = byte(rv >> 8)
}
return bs
}
func float32SliceToTypedArray(s []float32) js.Value { func float32SliceToTypedArray(s []float32) js.Value {
h := (*reflect.SliceHeader)(unsafe.Pointer(&s)) h := (*reflect.SliceHeader)(unsafe.Pointer(&s))
h.Len *= 4 h.Len *= 4
@ -329,3 +349,16 @@ func float32SliceToTypedArray(s []float32) js.Value {
buf := a.Get("buffer") buf := a.Get("buffer")
return js.Global().Get("Float32Array").New(buf, a.Get("byteOffset"), a.Get("byteLength").Int()/4) return js.Global().Get("Float32Array").New(buf, a.Get("byteOffset"), a.Get("byteLength").Int()/4)
} }
func float32ArrayToFloat32Slice(v js.Value) []float32 {
bs := make([]byte, v.Get("byteLength").Int())
js.CopyBytesToGo(bs, js.Global().Get("Uint8Array").New(v.Get("buffer"), v.Get("byteOffset"), v.Get("byteLength")))
h := (*reflect.SliceHeader)(unsafe.Pointer(&bs))
h.Len /= 4
h.Cap /= 4
f32s := *(*[]float32)(unsafe.Pointer(h))
runtime.KeepAlive(bs)
return f32s
}

View File

@ -92,7 +92,7 @@ func (p *readerPlayer) ensurePlayer() error {
p.factory.context = c p.factory.context = c
} }
if p.stream == nil { if p.stream == nil {
s, err := newTimeStream(p.src, p.factory.sampleRate, p.factory.context.MaxBufferSize()) s, err := newTimeStream(p.src, p.factory.sampleRate)
if err != nil { if err != nil {
return err return err
} }
@ -130,9 +130,7 @@ func (p *readerPlayer) Pause() {
return return
} }
n := p.player.UnplayedBufferSize()
p.player.Pause() p.player.Pause()
p.stream.Unread(int(n))
p.context.removePlayer(p) p.context.removePlayer(p)
} }
@ -234,20 +232,16 @@ type timeStream struct {
r io.Reader r io.Reader
sampleRate int sampleRate int
pos int64 pos int64
buf []byte
unread int
maxBufferSize int
// m is a mutex for this stream. // m is a mutex for this stream.
// All the exported functions are protected by this mutex as Read can be read from a different goroutine than Seek. // All the exported functions are protected by this mutex as Read can be read from a different goroutine than Seek.
m sync.Mutex m sync.Mutex
} }
func newTimeStream(r io.Reader, sampleRate int, maxBufferSize int) (*timeStream, error) { func newTimeStream(r io.Reader, sampleRate int) (*timeStream, error) {
s := &timeStream{ s := &timeStream{
r: r, r: r,
sampleRate: sampleRate, sampleRate: sampleRate,
maxBufferSize: maxBufferSize,
} }
if seeker, ok := s.r.(io.Seeker); ok { if seeker, ok := s.r.(io.Seeker); ok {
// Get the current position of the source. // Get the current position of the source.
@ -260,35 +254,12 @@ func newTimeStream(r io.Reader, sampleRate int, maxBufferSize int) (*timeStream,
return s, nil return s, nil
} }
func (s *timeStream) Unread(n int) {
s.m.Lock()
defer s.m.Unlock()
if s.unread+n > len(s.buf) {
// This should not happen usually, but the player's UnplayedBufferSize can include some errors.
n = len(s.buf) - s.unread
}
s.unread += n
s.pos -= int64(n)
}
func (s *timeStream) Read(buf []byte) (int, error) { func (s *timeStream) Read(buf []byte) (int, error) {
s.m.Lock() s.m.Lock()
defer s.m.Unlock() defer s.m.Unlock()
if s.unread > 0 {
n := copy(buf, s.buf[len(s.buf)-s.unread:])
s.unread -= n
s.pos += int64(n)
return n, nil
}
n, err := s.r.Read(buf) n, err := s.r.Read(buf)
s.pos += int64(n) s.pos += int64(n)
s.buf = append(s.buf, buf[:n]...)
if m := s.maxBufferSize; len(s.buf) > m {
s.buf = s.buf[len(s.buf)-m:]
}
return n, err return n, err
} }
@ -312,8 +283,6 @@ func (s *timeStream) Seek(offset time.Duration) error {
} }
s.pos = pos s.pos = pos
s.buf = s.buf[:0]
s.unread = 0
return nil return nil
} }