audio: Reimplement audio by using multiple oto.Player

This commit is contained in:
Hajime Hoshi 2019-04-29 02:27:12 +09:00
parent 339e76afec
commit c112c31e7e
6 changed files with 139 additions and 379 deletions

View File

@ -33,13 +33,17 @@ package audio
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"io" "io"
"runtime" "runtime"
"sync" "sync"
"time" "time"
)
"github.com/hajimehoshi/ebiten/internal/web" const (
channelNum = 2
bytesPerSample = 2 * channelNum
) )
// A Context represents a current state of audio. // A Context represents a current state of audio.
@ -51,13 +55,14 @@ import (
type Context struct { type Context struct {
c context c context
mux *mux
sampleRate int sampleRate int
err error err error
inited bool inited bool
suspended bool suspended bool
ready bool ready bool
players map[*playerImpl]struct{}
m sync.Mutex m sync.Mutex
} }
@ -88,9 +93,9 @@ func NewContext(sampleRate int) (*Context, error) {
c := &Context{ c := &Context{
sampleRate: sampleRate, sampleRate: sampleRate,
c: newContext(sampleRate), c: newContext(sampleRate),
players: map[*playerImpl]struct{}{},
} }
theContext = c theContext = c
c.mux = newMux()
h := getHook() h := getHook()
h.OnSuspendAudio(func() { h.OnSuspendAudio(func() {
@ -105,6 +110,7 @@ func NewContext(sampleRate int) (*Context, error) {
}) })
h.AppendHookOnBeforeUpdate(func() error { h.AppendHookOnBeforeUpdate(func() error {
// On Android, audio loop cannot be started unless JVM is accessible. After updating one frame, JVM should exist.
c.m.Lock() c.m.Lock()
c.inited = true c.inited = true
c.m.Unlock() c.m.Unlock()
@ -120,8 +126,6 @@ func NewContext(sampleRate int) (*Context, error) {
return err return err
}) })
go c.loop()
return c, nil return c, nil
} }
@ -141,6 +145,13 @@ func (c *Context) playable() bool {
return i && !s return i && !s
} }
func (c *Context) hasError() bool {
c.m.Lock()
r := c.err != nil
c.m.Unlock()
return r
}
func (c *Context) setError(err error) { func (c *Context) setError(err error) {
// TODO: What if c.err already exists? // TODO: What if c.err already exists?
c.m.Lock() c.m.Lock()
@ -154,25 +165,28 @@ func (c *Context) setReady() {
c.m.Unlock() c.m.Unlock()
} }
func (c *Context) loop() { func (c *Context) addPlayer(p *playerImpl) {
defer c.c.Close() c.m.Lock()
defer c.m.Unlock()
c.players[p] = struct{}{}
p := c.c.NewPlayer() // Check the source duplication
defer p.Close() srcs := map[io.ReadCloser]struct{}{}
for p := range c.players {
for { if _, ok := srcs[p.src]; ok {
if !c.playable() { c.err = errors.New("audio: a same source is used by multiple Player")
runtime.Gosched()
continue
}
if _, err := io.CopyN(p, c.mux, 2048); err != nil {
c.setError(err)
return return
} }
c.setReady() srcs[p.src] = struct{}{}
} }
} }
func (c *Context) removePlayer(p *playerImpl) {
c.m.Lock()
delete(c.players, p)
c.m.Unlock()
}
// IsReady returns a boolean value indicating whether the audio is ready or not. // IsReady returns a boolean value indicating whether the audio is ready or not.
// //
// On some browsers, user interaction like click or pressing keys is required to start audio. // On some browsers, user interaction like click or pressing keys is required to start audio.
@ -236,9 +250,8 @@ type Player struct {
} }
type playerImpl struct { type playerImpl struct {
mux *mux context *Context
src io.ReadCloser src io.ReadCloser
srcEOF bool
sampleRate int sampleRate int
playing bool playing bool
closedExplicitly bool closedExplicitly bool
@ -248,22 +261,9 @@ type playerImpl struct {
pos int64 pos int64
volume float64 volume float64
closeCh chan struct{}
closedCh chan struct{}
seekCh chan struct{}
seekedCh chan struct{}
proceedCh chan []int16
proceededCh chan proceededValues
syncCh chan func()
m sync.Mutex m sync.Mutex
} }
type proceededValues struct {
buf []int16
err error
}
// NewPlayer creates a new player with the given stream. // NewPlayer creates a new player with the given stream.
// //
// src's format must be linear PCM (16bits little endian, 2 channel stereo) // src's format must be linear PCM (16bits little endian, 2 channel stereo)
@ -282,17 +282,10 @@ type proceededValues struct {
func NewPlayer(context *Context, src io.ReadCloser) (*Player, error) { func NewPlayer(context *Context, src io.ReadCloser) (*Player, error) {
p := &Player{ p := &Player{
&playerImpl{ &playerImpl{
mux: context.mux, context: context,
src: src, src: src,
sampleRate: context.sampleRate, sampleRate: context.sampleRate,
buf: nil, volume: 1,
volume: 1,
closeCh: make(chan struct{}),
closedCh: make(chan struct{}),
seekCh: make(chan struct{}),
seekedCh: make(chan struct{}),
proceedCh: make(chan []int16),
proceededCh: make(chan proceededValues),
}, },
} }
if seeker, ok := p.p.src.(io.Seeker); ok { if seeker, ok := p.p.src.(io.Seeker); ok {
@ -348,62 +341,27 @@ func (p *Player) Close() error {
func (p *playerImpl) Close() error { func (p *playerImpl) Close() error {
p.m.Lock() p.m.Lock()
defer p.m.Unlock()
p.playing = false p.playing = false
p.mux.removePlayer(p)
if p.closedExplicitly { if p.closedExplicitly {
p.m.Unlock()
return fmt.Errorf("audio: the player is already closed") return fmt.Errorf("audio: the player is already closed")
} }
p.closedExplicitly = true p.closedExplicitly = true
// src.Close is called only when Player's Close is called. // src.Close is called only when Player's Close is called.
if err := p.src.Close(); err != nil { if err := p.src.Close(); err != nil {
p.m.Unlock()
return err return err
} }
p.m.Unlock()
return p.closeImpl()
}
func (p *playerImpl) ensureReadLoop() error { if !p.runningReadLoop {
p.m.Lock()
defer p.m.Unlock()
if p.closedExplicitly {
return fmt.Errorf("audio: the player is already closed")
}
if p.runningReadLoop {
return nil return nil
} }
// Set runningReadLoop true here, not in the loop, or this causes deadlock with channels in Seek. // p.runningReadLoop is set to false in the loop.
// While the for loop doesn't start, Seeks tries to send something to the channels.
// The for loop must start without any locking.
p.runningReadLoop = true
go p.readLoop()
return nil return nil
} }
func (p *playerImpl) closeImpl() error {
p.m.Lock()
r := p.runningReadLoop
p.m.Unlock()
if !r {
return nil
}
p.closeCh <- struct{}{}
<-p.closedCh
return nil
}
func (p *playerImpl) bufferToInt16(lengthInBytes int) ([]int16, error) {
if err := p.ensureReadLoop(); err != nil {
return nil, err
}
p.proceedCh <- make([]int16, lengthInBytes/2)
r := <-p.proceededCh
return r.buf, r.err
}
// Play plays the stream. // Play plays the stream.
// //
// Play always returns nil. // Play always returns nil.
@ -414,163 +372,115 @@ func (p *Player) Play() error {
func (p *playerImpl) Play() { func (p *playerImpl) Play() {
p.m.Lock() p.m.Lock()
defer p.m.Unlock()
if p.closedExplicitly {
p.context.setError(fmt.Errorf("audio: the player is already closed"))
return
}
p.playing = true p.playing = true
p.mux.addPlayer(p) if p.runningReadLoop {
p.m.Unlock() return
}
// Set p.runningReadLoop to true here, not in the loop. This prevents duplicated active loops.
p.runningReadLoop = true
p.context.addPlayer(p)
go p.loop()
return
} }
func (p *playerImpl) readLoop() { func (p *playerImpl) loop() {
w := p.context.c.NewPlayer()
wclosed := make(chan struct{})
defer func() {
<-wclosed
w.Close()
}()
defer func() { defer func() {
p.m.Lock() p.m.Lock()
p.playing = false p.playing = false
p.context.removePlayer(p)
p.runningReadLoop = false p.runningReadLoop = false
p.m.Unlock() p.m.Unlock()
}() }()
timer := time.NewTimer(0) ch := make(chan []byte)
timerCh := timer.C defer close(ch)
var readErr error
for {
select {
case <-p.closeCh:
p.closedCh <- struct{}{}
return
case <-p.seekCh: go func() {
p.seekedCh <- struct{}{} for buf := range ch {
if timer != nil { if _, err := w.Write(buf); err != nil {
timer.Stop() p.context.setError(err)
}
timer = time.NewTimer(time.Millisecond)
timerCh = timer.C
case <-timerCh:
// If the buffer has 1 second, that's enough.
p.m.Lock()
if len(p.buf) >= p.sampleRate*bytesPerSample {
if timer != nil {
timer.Stop()
}
timer = time.NewTimer(100 * time.Millisecond)
timerCh = timer.C
p.m.Unlock()
break break
} }
// Try to read the buffer for 1/60[s].
s := 60
if web.IsAndroidChrome() {
s = 10
} else if web.IsBrowser() {
s = 20
}
l := p.sampleRate * bytesPerSample / s
l &= mask
buf := make([]byte, l)
n, err := p.src.Read(buf)
p.buf = append(p.buf, buf[:n]...)
if err == io.EOF {
p.srcEOF = true
}
if p.srcEOF && len(p.buf) == 0 {
if timer != nil {
timer.Stop()
}
timer = nil
timerCh = nil
p.m.Unlock()
break
}
p.m.Unlock()
if err != nil && err != io.EOF {
readErr = err
if timer != nil {
timer.Stop()
}
timer = nil
timerCh = nil
break
}
if timer != nil {
timer.Stop()
}
if web.IsBrowser() {
timer = time.NewTimer(10 * time.Millisecond)
} else {
timer = time.NewTimer(time.Millisecond)
}
timerCh = timer.C
case buf := <-p.proceedCh:
if readErr != nil {
p.proceededCh <- proceededValues{buf, readErr}
return
}
p.m.Lock()
if p.shouldSkipImpl() {
// Return zero values.
p.proceededCh <- proceededValues{buf, nil}
p.m.Unlock()
break
}
lengthInBytes := len(buf) * 2
l := lengthInBytes
if l > len(p.buf) {
l = len(p.buf)
}
for i := 0; i < l/2; i++ {
buf[i] = int16(p.buf[2*i]) | (int16(p.buf[2*i+1]) << 8)
buf[i] = int16(float64(buf[i]) * p.volume)
}
p.pos += int64(l)
p.buf = p.buf[l:]
p.m.Unlock()
p.proceededCh <- proceededValues{buf[:l/2], nil}
} }
close(wclosed)
}()
for {
buf, ok := p.read()
if !ok {
return
}
ch <- buf
} }
} }
func (p *playerImpl) shouldSkip() bool { func (p *playerImpl) read() ([]byte, bool) {
p.m.Lock() p.m.Lock()
r := p.shouldSkipImpl() defer p.m.Unlock()
p.m.Unlock()
return r
}
func (p *playerImpl) shouldSkipImpl() bool { if p.context.hasError() {
// When p.buf is nil, the player just starts playing or seeking. return nil, false
// Note that this is different from len(p.buf) == 0 && p.buf != nil.
if p.buf == nil {
return true
} }
if p.eofImpl() {
return true if p.closedExplicitly {
return nil, false
} }
return false
}
func (p *playerImpl) bufferSizeInBytes() int { const bufSize = 2048
p.m.Lock()
s := len(p.buf)
p.m.Unlock()
return s
}
func (p *playerImpl) eof() bool { var buf []byte
p.m.Lock() var err error
r := p.eofImpl() var proceed int64
p.m.Unlock() if p.playing && p.context.playable() {
return r newBuf := make([]byte, bufSize-len(p.buf))
} n := 0
n, err = p.src.Read(newBuf)
buf = append(p.buf, newBuf[:n]...)
func (p *playerImpl) eofImpl() bool { n2 := len(buf) - len(buf)%bytesPerSample
return p.srcEOF && len(p.buf) == 0 buf, p.buf = buf[:n2], buf[n2:]
proceed = int64(len(buf))
} else {
// Fill zero values, or the driver can block forever as trying to proceed.
buf = make([]byte, bufSize)
}
if err == io.EOF && len(buf) == 0 {
return nil, false
}
if err != nil && err != io.EOF {
p.context.setError(err)
return nil, false
}
for i := 0; i < len(buf)/2; i++ {
v16 := int16(buf[2*i]) | (int16(buf[2*i+1]) << 8)
v16 = int16(float64(v16) * p.volume)
buf[2*i] = byte(v16)
buf[2*i+1] = byte(v16 >> 8)
}
p.pos += proceed
p.context.setReady()
return buf, true
} }
// IsPlaying returns boolean indicating whether the player is playing. // IsPlaying returns boolean indicating whether the player is playing.
@ -614,13 +524,12 @@ func (p *playerImpl) Seek(offset time.Duration) error {
if _, ok := p.src.(io.Seeker); !ok { if _, ok := p.src.(io.Seeker); !ok {
panic("audio: player to be sought must be io.Seeker") panic("audio: player to be sought must be io.Seeker")
} }
if err := p.ensureReadLoop(); err != nil {
return err
}
p.m.Lock() p.m.Lock()
defer p.m.Unlock()
o := int64(offset) * bytesPerSample * int64(p.sampleRate) / int64(time.Second) o := int64(offset) * bytesPerSample * int64(p.sampleRate) / int64(time.Second)
o &= mask o = o - (o % bytesPerSample)
seeker, ok := p.src.(io.Seeker) seeker, ok := p.src.(io.Seeker)
if !ok { if !ok {
@ -628,17 +537,11 @@ func (p *playerImpl) Seek(offset time.Duration) error {
} }
pos, err := seeker.Seek(o, io.SeekStart) pos, err := seeker.Seek(o, io.SeekStart)
if err != nil { if err != nil {
p.m.Unlock()
return err return err
} }
p.buf = nil p.buf = nil
p.pos = pos p.pos = pos
p.srcEOF = false
p.m.Unlock()
p.seekCh <- struct{}{}
<-p.seekedCh
return nil return nil
} }
@ -653,7 +556,6 @@ func (p *Player) Pause() error {
func (p *playerImpl) Pause() { func (p *playerImpl) Pause() {
p.m.Lock() p.m.Lock()
p.playing = false p.playing = false
p.mux.removePlayer(p)
p.m.Unlock() p.m.Unlock()
} }

View File

@ -41,6 +41,9 @@ func TestGC(t *testing.T) {
} }
p.Play() p.Play()
// 200[ms] should be enough all the bytes are consumed.
// TODO: This is a darty hack. Would it be possible to use virtual time?
time.Sleep(200 * time.Millisecond)
got = PlayersNumForTesting() got = PlayersNumForTesting()
if want := 1; got != want { if want := 1; got != want {
t.Errorf("PlayersNum() after Play: got: %d, want: %d", got, want) t.Errorf("PlayersNum() after Play: got: %d, want: %d", got, want)

View File

@ -1,145 +0,0 @@
// Copyright 2019 The Ebiten Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package audio
import (
"errors"
"io"
"runtime"
"sync"
)
type mux struct {
ps map[*playerImpl]struct{}
m sync.RWMutex
}
const (
channelNum = 2
bytesPerSample = 2 * channelNum
// TODO: This assumes that bytesPerSample is a power of 2.
mask = ^(bytesPerSample - 1)
)
func newMux() *mux {
return &mux{
ps: map[*playerImpl]struct{}{},
}
}
func (m *mux) Read(b []byte) (int, error) {
m.m.Lock()
defer m.m.Unlock()
// Check the source duplication
srcs := map[io.ReadCloser]struct{}{}
for p := range m.ps {
if _, ok := srcs[p.src]; ok {
return 0, errors.New("audio: a same source is used by multiple Player")
}
srcs[p.src] = struct{}{}
}
if len(m.ps) == 0 {
l := len(b)
l &= mask
copy(b, make([]byte, l))
return l, nil
}
l := len(b)
l &= mask
allSkipped := true
// TODO: Now a player is not locked. Should we lock it?
for p := range m.ps {
if p.shouldSkip() {
continue
}
allSkipped = false
s := p.bufferSizeInBytes()
if l > s {
l = s
l &= mask
}
}
if allSkipped {
l = 0
}
if l == 0 {
// If l is 0, all the ps might reach EOF at the next update.
// However, this Read might block forever and never causes context switch
// on single-thread environment (e.g. browser).
// Call Gosched to cause context switch on purpose.
runtime.Gosched()
}
b16s := [][]int16{}
for p := range m.ps {
buf, err := p.bufferToInt16(l)
if err != nil {
return 0, err
}
if l > len(buf)*2 {
l = len(buf) * 2
}
b16s = append(b16s, buf)
}
for i := 0; i < l/2; i++ {
x := 0
for _, b16 := range b16s {
x += int(b16[i])
}
if x > (1<<15)-1 {
x = (1 << 15) - 1
}
if x < -(1 << 15) {
x = -(1 << 15)
}
b[2*i] = byte(x)
b[2*i+1] = byte(x >> 8)
}
closed := []*playerImpl{}
for p := range m.ps {
if p.eof() {
closed = append(closed, p)
}
}
for _, p := range closed {
p.closeImpl()
delete(m.ps, p)
}
return l, nil
}
func (m *mux) addPlayer(player *playerImpl) {
m.m.Lock()
m.ps[player] = struct{}{}
m.m.Unlock()
}
func (m *mux) removePlayer(player *playerImpl) {
m.m.Lock()
delete(m.ps, player)
m.m.Unlock()
}

View File

@ -16,8 +16,8 @@ package audio
func PlayersNumForTesting() int { func PlayersNumForTesting() int {
c := CurrentContext() c := CurrentContext()
c.mux.m.Lock() c.m.Lock()
n := len(c.mux.ps) n := len(c.players)
c.mux.m.Unlock() c.m.Unlock()
return n return n
} }

2
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/gopherjs/gopherwasm v1.1.0 github.com/gopherjs/gopherwasm v1.1.0
github.com/hajimehoshi/bitmapfont v1.1.1 github.com/hajimehoshi/bitmapfont v1.1.1
github.com/hajimehoshi/go-mp3 v0.2.0 github.com/hajimehoshi/go-mp3 v0.2.0
github.com/hajimehoshi/oto v0.3.3 github.com/hajimehoshi/oto v0.3.4-0.20190429120749-cf91156c8120
github.com/jakecoffman/cp v0.1.0 github.com/jakecoffman/cp v0.1.0
github.com/jfreymuth/oggvorbis v1.0.0 github.com/jfreymuth/oggvorbis v1.0.0
github.com/jfreymuth/vorbis v1.0.0 // indirect github.com/jfreymuth/vorbis v1.0.0 // indirect

4
go.sum
View File

@ -18,8 +18,8 @@ github.com/hajimehoshi/go-mp3 v0.1.2/go.mod h1:4i+c5pDNKDrxl1iu9iG90/+fhP37lio6g
github.com/hajimehoshi/go-mp3 v0.2.0 h1:isy34iDg+96PsNuFbTdRRXzKr6a1gc2nhsPuFSfXacY= github.com/hajimehoshi/go-mp3 v0.2.0 h1:isy34iDg+96PsNuFbTdRRXzKr6a1gc2nhsPuFSfXacY=
github.com/hajimehoshi/go-mp3 v0.2.0/go.mod h1:4i+c5pDNKDrxl1iu9iG90/+fhP37lio6gNhjCx9WBJw= github.com/hajimehoshi/go-mp3 v0.2.0/go.mod h1:4i+c5pDNKDrxl1iu9iG90/+fhP37lio6gNhjCx9WBJw=
github.com/hajimehoshi/oto v0.1.1/go.mod h1:hUiLWeBQnbDu4pZsAhOnGqMI1ZGibS6e2qhQdfpwz04= github.com/hajimehoshi/oto v0.1.1/go.mod h1:hUiLWeBQnbDu4pZsAhOnGqMI1ZGibS6e2qhQdfpwz04=
github.com/hajimehoshi/oto v0.3.3 h1:Wi7VVtxe9sF2rbDBIJtVXnpFWhRfK57hw0JY7tR2qXM= github.com/hajimehoshi/oto v0.3.4-0.20190429120749-cf91156c8120 h1:A2KJGAMyVidi7XxtrgfjFwq6KS2U3hYKdf+46tIycwM=
github.com/hajimehoshi/oto v0.3.3/go.mod h1:e9eTLBB9iZto045HLbzfHJIc+jP3xaKrjZTghvb6fdM= github.com/hajimehoshi/oto v0.3.4-0.20190429120749-cf91156c8120/go.mod h1:e9eTLBB9iZto045HLbzfHJIc+jP3xaKrjZTghvb6fdM=
github.com/jakecoffman/cp v0.1.0 h1:sgSYEGUgfwiT447fRjloa2c5b6UyYP+7muR3gQK+Ep0= github.com/jakecoffman/cp v0.1.0 h1:sgSYEGUgfwiT447fRjloa2c5b6UyYP+7muR3gQK+Ep0=
github.com/jakecoffman/cp v0.1.0/go.mod h1:a3xPx9N8RyFAACD644t2dj/nK4SuLg1v+jL61m2yVo4= github.com/jakecoffman/cp v0.1.0/go.mod h1:a3xPx9N8RyFAACD644t2dj/nK4SuLg1v+jL61m2yVo4=
github.com/jfreymuth/oggvorbis v1.0.0 h1:aOpiihGrFLXpsh2osOlEvTcg5/aluzGQeC7m3uYWOZ0= github.com/jfreymuth/oggvorbis v1.0.0 h1:aOpiihGrFLXpsh2osOlEvTcg5/aluzGQeC7m3uYWOZ0=