audio/internal/readerdriver: Share one goroutine to read the source to the buffers (iOS/macOS)

This fix improves the latency of the audio.

Closes #1662
This commit is contained in:
Hajime Hoshi 2021-06-09 01:47:36 +09:00
parent 67e5fae9c0
commit 95c494f47e

View File

@ -197,35 +197,41 @@ type playerImpl struct {
state playerState
err error
eof bool
cond *sync.Cond
volume float64
m sync.Mutex
}
type players struct {
players map[C.AudioQueueRef]*playerImpl
toResume map[*playerImpl]struct{}
m sync.Mutex
cond *sync.Cond
}
func (p *players) add(player *playerImpl, audioQueue C.AudioQueueRef) {
p.m.Lock()
defer p.m.Unlock()
p.cond.L.Lock()
defer p.cond.L.Unlock()
if p.players == nil {
p.players = map[C.AudioQueueRef]*playerImpl{}
}
runLoop := len(p.players) == 0
p.players[audioQueue] = player
if runLoop {
// Use the only one loop for multiple players (#1662).
go p.loop()
}
}
func (p *players) get(audioQueue C.AudioQueueRef) *playerImpl {
p.m.Lock()
defer p.m.Unlock()
p.cond.L.Lock()
defer p.cond.L.Unlock()
return p.players[audioQueue]
}
func (p *players) remove(audioQueue C.AudioQueueRef) {
p.m.Lock()
defer p.m.Unlock()
p.cond.L.Lock()
defer p.cond.L.Unlock()
pl, ok := p.players[audioQueue]
if !ok {
@ -233,11 +239,56 @@ func (p *players) remove(audioQueue C.AudioQueueRef) {
}
delete(p.players, audioQueue)
delete(p.toResume, pl)
p.cond.Signal()
}
func (p *players) shouldWait() bool {
if len(p.players) == 0 {
return false
}
for _, pl := range p.players {
if pl.canReadSourceToBuffer() {
return false
}
}
return true
}
func (p *players) wait() bool {
p.cond.L.Lock()
defer p.cond.L.Unlock()
for p.shouldWait() {
p.cond.Wait()
}
return len(p.players) > 0
}
func (p *players) loop() {
var players []*playerImpl
for {
if !p.wait() {
return
}
p.cond.L.Lock()
players = players[:0]
for _, pl := range p.players {
players = append(players, pl)
}
p.cond.L.Unlock()
for _, pl := range players {
pl.readSourceToBuffer()
}
}
}
func (p *players) suspend() error {
p.m.Lock()
defer p.m.Unlock()
p.cond.L.Lock()
defer p.cond.L.Unlock()
for _, pl := range p.players {
if !pl.IsPlaying() {
@ -259,13 +310,13 @@ func (p *players) suspend() error {
func (p *players) resume() error {
// playerImpl's Play can touch p. Avoid the deadlock.
p.m.Lock()
p.cond.L.Lock()
players := map[*playerImpl]struct{}{}
for pl := range p.toResume {
players[pl] = struct{}{}
delete(p.toResume, pl)
}
p.m.Unlock()
p.cond.L.Unlock()
for pl := range players {
pl.Play()
@ -276,14 +327,15 @@ func (p *players) resume() error {
return nil
}
var thePlayers players
var thePlayers = &players{
cond: sync.NewCond(&sync.Mutex{}),
}
func (c *context) NewPlayer(src io.Reader) Player {
p := &player{
p: &playerImpl{
context: c,
src: src,
cond: sync.NewCond(&sync.Mutex{}),
volume: 1,
},
}
@ -296,8 +348,8 @@ func (p *player) Err() error {
}
func (p *playerImpl) Err() error {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
return p.err
}
@ -310,8 +362,8 @@ func (p *playerImpl) Play() {
// Call Play asynchronously since AudioQueuePrime and AudioQueuePlay might take long.
ch := make(chan struct{})
go func() {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
close(ch)
p.playImpl()
}()
@ -328,7 +380,6 @@ func (p *playerImpl) playImpl() {
return
}
var runLoop bool
if p.audioQueue == nil {
audioQueue, audioQueueBuffers, err := p.context.audioQueuePool.Get()
if err != nil {
@ -338,9 +389,10 @@ func (p *playerImpl) playImpl() {
p.audioQueue = audioQueue
p.unqueuedBufs = audioQueueBuffers
C.AudioQueueSetParameter(p.audioQueue, C.kAudioQueueParam_Volume, C.AudioQueueParameterValue(p.volume))
thePlayers.add(p, p.audioQueue)
runLoop = true
p.m.Unlock()
thePlayers.add(p, p.audioQueue)
p.m.Lock()
}
buf := make([]byte, p.context.maxBufferSize())
@ -395,11 +447,6 @@ func (p *playerImpl) playImpl() {
}
p.state = playerPlay
p.cond.Signal()
if runLoop {
go p.loop()
}
}
func (p *player) Pause() {
@ -407,8 +454,8 @@ func (p *player) Pause() {
}
func (p *playerImpl) Pause() {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
if p.err != nil {
return
@ -425,7 +472,6 @@ func (p *playerImpl) Pause() {
return
}
p.state = playerPaused
p.cond.Signal()
}
func (p *player) Reset() {
@ -433,8 +479,8 @@ func (p *player) Reset() {
}
func (p *playerImpl) Reset() {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
p.resetImpl()
}
@ -456,9 +502,9 @@ func (p *playerImpl) resetImpl() {
}
// AudioQueueReset invokes the callback directry.
q := p.audioQueue
p.cond.L.Unlock()
p.m.Unlock()
osstatus := C.AudioQueueReset(q)
p.cond.L.Lock()
p.m.Lock()
if osstatus != C.noErr && p.err == nil {
p.setErrorImpl(fmt.Errorf("readerdriver: AudioQueueReset failed: %d", osstatus))
return
@ -468,7 +514,7 @@ func (p *playerImpl) resetImpl() {
p.state = playerPaused
p.buf = p.buf[:0]
p.eof = false
p.cond.Signal()
thePlayers.cond.Signal()
}
func (p *player) IsPlaying() bool {
@ -476,8 +522,8 @@ func (p *player) IsPlaying() bool {
}
func (p *playerImpl) IsPlaying() bool {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
return p.state == playerPlay
}
@ -487,8 +533,8 @@ func (p *player) Volume() float64 {
}
func (p *playerImpl) Volume() float64 {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
return p.volume
}
@ -497,8 +543,8 @@ func (p *player) SetVolume(volume float64) {
}
func (p *playerImpl) SetVolume(volume float64) {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
p.volume = volume
if p.audioQueue == nil {
@ -512,8 +558,8 @@ func (p *player) UnplayedBufferSize() int {
}
func (p *playerImpl) UnplayedBufferSize() int {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
return len(p.buf)
}
@ -523,8 +569,8 @@ func (p *player) Close() error {
}
func (p *playerImpl) Close() error {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
return p.closeImpl()
}
@ -535,9 +581,9 @@ func (p *playerImpl) closeImpl() error {
// AudioQueueStop might invoke AudioQueueReset. Unlock the mutex here to avoid a deadlock.
q := p.audioQueue
p.cond.L.Unlock()
p.m.Unlock()
osstatus := C.AudioQueueStop(q, C.true)
p.cond.L.Lock()
p.m.Lock()
if osstatus != C.noErr && p.err == nil {
// setErrorImpl calls closeImpl. Do not call this.
@ -548,18 +594,24 @@ func (p *playerImpl) closeImpl() error {
if err := p.context.audioQueuePool.Put(p.audioQueue); err != nil && p.err == nil {
p.err = err
}
p.m.Unlock()
thePlayers.remove(p.audioQueue)
p.m.Lock()
p.audioQueue = nil
}
p.state = playerClosed
p.cond.Signal()
return p.err
}
//export ebiten_readerdriver_render
func ebiten_readerdriver_render(inUserData unsafe.Pointer, inAQ C.AudioQueueRef, inBuffer C.AudioQueueBufferRef) {
p := thePlayers.get(inAQ)
// The player might be already closed.
if p == nil {
return
}
queued, err := p.appendBuffer(inBuffer)
if err != nil {
p.setError(err)
@ -569,8 +621,8 @@ func ebiten_readerdriver_render(inUserData unsafe.Pointer, inAQ C.AudioQueueRef,
return
}
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
p.unqueuedBufs = append(p.unqueuedBufs, inBuffer)
if len(p.unqueuedBufs) == 2 && p.eof {
p.resetImpl()
@ -578,8 +630,8 @@ func ebiten_readerdriver_render(inUserData unsafe.Pointer, inAQ C.AudioQueueRef,
}
func (p *playerImpl) appendBuffer(inBuffer C.AudioQueueBufferRef) (bool, error) {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
return p.appendBufferImpl(inBuffer)
}
@ -594,7 +646,6 @@ func (p *playerImpl) appendBufferImpl(inBuffer C.AudioQueueBufferRef) (bool, err
n = len(p.buf)
}
buf := p.buf[:n]
signal := len(p.buf[n:]) < p.context.maxBufferSize()
for i, b := range buf {
*(*byte)(unsafe.Pointer(uintptr(inBuffer.mAudioData) + uintptr(i))) = b
@ -612,40 +663,13 @@ func (p *playerImpl) appendBufferImpl(inBuffer C.AudioQueueBufferRef) (bool, err
}
p.buf = p.buf[n:]
if signal {
p.cond.Signal()
}
thePlayers.cond.Signal()
return true, nil
}
func (p *playerImpl) shouldWait() bool {
switch p.state {
case playerPaused:
return true
case playerPlay:
// If the buffer has too much data, wait until the buffer data is consumed.
// If the source reaches EOF, wait until the state is reset.
return len(p.buf) >= p.context.maxBufferSize() || p.eof
case playerClosed:
return false
default:
panic("not reached")
}
}
func (p *playerImpl) wait() bool {
p.cond.L.Lock()
defer p.cond.L.Unlock()
for p.shouldWait() {
p.cond.Wait()
}
return p.state == playerPlay
}
func (p *playerImpl) setError(err error) {
p.cond.L.Lock()
defer p.cond.L.Unlock()
p.m.Lock()
defer p.m.Unlock()
p.setErrorImpl(err)
}
@ -654,25 +678,46 @@ func (p *playerImpl) setErrorImpl(err error) {
p.closeImpl()
}
func (p *playerImpl) loop() {
buf := make([]byte, 4096)
for {
if !p.wait() {
return
}
func (p *playerImpl) canReadSourceToBuffer() bool {
p.m.Lock()
defer p.m.Unlock()
n, err := p.src.Read(buf)
if err != nil && err != io.EOF {
p.setError(err)
return
}
if p.eof {
return false
}
return len(p.buf) < p.context.maxBufferSize()
}
p.cond.L.Lock()
p.buf = append(p.buf, buf[:n]...)
if err == io.EOF && len(p.buf) == 0 {
p.eof = true
}
p.cond.L.Unlock()
func (p *playerImpl) readSourceToBuffer() {
p.m.Lock()
defer p.m.Unlock()
if p.err != nil {
return
}
if p.state == playerClosed {
return
}
maxBufferSize := p.context.maxBufferSize()
if len(p.buf) >= maxBufferSize {
return
}
src := p.src
p.m.Unlock()
buf := make([]byte, maxBufferSize)
n, err := src.Read(buf)
p.m.Lock()
if err != nil && err != io.EOF {
p.setError(err)
return
}
p.buf = append(p.buf, buf[:n]...)
if err == io.EOF && len(p.buf) == 0 {
p.eof = true
}
}