From 79985535c3936575c8dc49b26d6837c8893655fc Mon Sep 17 00:00:00 2001 From: Hajime Hoshi Date: Tue, 5 Apr 2016 01:04:14 +0900 Subject: [PATCH] audio: Refactoring: mixingStream has the global lock --- exp/audio/audio.go | 161 ++++++++++++++++++++++++++++----------------- 1 file changed, 102 insertions(+), 59 deletions(-) diff --git a/exp/audio/audio.go b/exp/audio/audio.go index acc8a2cd9..c7f3c9afc 100644 --- a/exp/audio/audio.go +++ b/exp/audio/audio.go @@ -24,8 +24,11 @@ import ( ) type mixingStream struct { - context *Context + sampleRate int writtenBytes int + frames int + players map[*Player]struct{} + sync.Mutex } func min(a, b int) int { @@ -45,16 +48,16 @@ const ( ) func (s *mixingStream) Read(b []byte) (int, error) { - s.context.Lock() - defer s.context.Unlock() + s.Lock() + defer s.Unlock() - bytesPerFrame := s.context.sampleRate * bytesPerSample * channelNum / ebiten.FPS - x := s.context.frames*bytesPerFrame + len(b) + bytesPerFrame := s.sampleRate * bytesPerSample * channelNum / ebiten.FPS + x := s.frames*bytesPerFrame + len(b) if x <= s.writtenBytes { return 0, nil } - if len(s.context.players) == 0 { + if len(s.players) == 0 { l := min(len(b), x-s.writtenBytes) l &= mask copy(b, make([]byte, l)) @@ -63,7 +66,7 @@ func (s *mixingStream) Read(b []byte) (int, error) { } closed := []*Player{} l := len(b) - for p := range s.context.players { + for p := range s.players { _, err := p.readToBuffer(l) if err == io.EOF { closed = append(closed, p) @@ -74,7 +77,7 @@ func (s *mixingStream) Read(b []byte) (int, error) { } l &= mask b16s := [][]int16{} - for p := range s.context.players { + for p := range s.players { b16s = append(b16s, p.bufferToInt16(l)) } for i := 0; i < l/2; i++ { @@ -91,37 +94,101 @@ func (s *mixingStream) Read(b []byte) (int, error) { b[2*i] = byte(x) b[2*i+1] = byte(x >> 8) } - for p := range s.context.players { + for p := range s.players { p.proceed(l) } for _, p := range closed { - delete(s.context.players, p) + delete(s.players, p) } s.writtenBytes += l return l, nil } +func (s *mixingStream) update() { + s.Lock() + defer s.Unlock() + s.frames++ +} + +func (s *mixingStream) newPlayer(src ReadSeekCloser) (*Player, error) { + s.Lock() + defer s.Unlock() + p := &Player{ + stream: s, + src: src, + buf: []byte{}, + volume: 1, + } + // Get the current position of the source. + pos, err := p.src.Seek(0, 1) + if err != nil { + return nil, err + } + p.pos = pos + runtime.SetFinalizer(p, (*Player).Close) + return p, nil +} + +func (s *mixingStream) closePlayer(player *Player) error { + s.Lock() + defer s.Unlock() + runtime.SetFinalizer(player, nil) + return player.src.Close() +} + +func (s *mixingStream) addPlayer(player *Player) { + s.Lock() + defer s.Unlock() + s.players[player] = struct{}{} +} + +func (s *mixingStream) removePlayer(player *Player) { + s.Lock() + defer s.Unlock() + delete(s.players, player) +} + +func (s *mixingStream) hasPlayer(player *Player) bool { + s.Lock() + defer s.Unlock() + _, ok := s.players[player] + return ok +} + +func (s *mixingStream) seekPlayer(player *Player, offset time.Duration) error { + s.Lock() + defer s.Unlock() + o := int64(offset) * bytesPerSample * channelNum * int64(s.sampleRate) / int64(time.Second) + o &= mask + return player.seek(o) +} + +func (s *mixingStream) playerCurrent(player *Player) time.Duration { + s.Lock() + defer s.Unlock() + sample := player.pos / bytesPerSample / channelNum + return time.Duration(sample) * time.Second / time.Duration(s.sampleRate) +} + // TODO: Enable to specify the format like Mono8? type Context struct { sampleRate int - players map[*Player]struct{} - frames int + stream *mixingStream errorCh chan error - sync.Mutex } func NewContext(sampleRate int) (*Context, error) { // TODO: Panic if one context exists. c := &Context{ sampleRate: sampleRate, - players: map[*Player]struct{}{}, errorCh: make(chan error), } - s := &mixingStream{ - context: c, + c.stream = &mixingStream{ + sampleRate: sampleRate, + players: map[*Player]struct{}{}, } - p, err := newPlayer(s, c.sampleRate) + p, err := newPlayer(c.stream, c.sampleRate) if err != nil { return nil, err } @@ -150,21 +217,18 @@ func NewContext(sampleRate int) (*Context, error) { // you will find audio stops when the game stops e.g. when the window is deactivated. // In unsync mode, the audio never stops even when the game stops. func (c *Context) Update() error { - c.Lock() - defer c.Unlock() select { case err := <-c.errorCh: return err default: } - c.frames++ + c.stream.update() return nil } // SampleRate returns the sample rate. +// All audio source must have the same sample rate. func (c *Context) SampleRate() int { - c.Lock() - defer c.Unlock() return c.sampleRate } @@ -174,11 +238,11 @@ type ReadSeekCloser interface { } type Player struct { - context *Context - src ReadSeekCloser - buf []byte - pos int64 - volume float64 + stream *mixingStream + src ReadSeekCloser + buf []byte + pos int64 + volume float64 } // NewPlayer creates a new player with the given data to the given channel. @@ -188,27 +252,11 @@ type Player struct { // src's format must be linear PCM (16bits, 2 channel stereo, little endian) // without a header (e.g. RIFF header). func (c *Context) NewPlayer(src ReadSeekCloser) (*Player, error) { - c.Lock() - defer c.Unlock() - p := &Player{ - context: c, - src: src, - buf: []byte{}, - volume: 1, - } - // Get the current position of the source. - pos, err := p.src.Seek(0, 1) - if err != nil { - return nil, err - } - p.pos = pos - runtime.SetFinalizer(p, (*Player).Close) - return p, nil + return c.stream.newPlayer(src) } func (p *Player) Close() error { - runtime.SetFinalizer(p, nil) - return p.src.Close() + return p.stream.closePlayer(p) } func (p *Player) readToBuffer(length int) (int, error) { @@ -239,16 +287,12 @@ func (p *Player) bufferLength() int { } func (p *Player) Play() error { - p.context.Lock() - defer p.context.Unlock() - - p.context.players[p] = struct{}{} + p.stream.addPlayer(p) return nil } func (p *Player) IsPlaying() bool { - _, ok := p.context.players[p] - return ok + return p.stream.hasPlayer(p) } func (p *Player) Rewind() error { @@ -256,10 +300,12 @@ func (p *Player) Rewind() error { } func (p *Player) Seek(offset time.Duration) error { + return p.stream.seekPlayer(p, offset) +} + +func (p *Player) seek(offset int64) error { p.buf = []byte{} - o := int64(offset) * bytesPerSample * channelNum * int64(p.context.sampleRate) / int64(time.Second) - o &= mask - pos, err := p.src.Seek(o, 0) + pos, err := p.src.Seek(offset, 0) if err != nil { return err } @@ -268,15 +314,12 @@ func (p *Player) Seek(offset time.Duration) error { } func (p *Player) Pause() error { - p.context.Lock() - defer p.context.Unlock() - - delete(p.context.players, p) + p.stream.removePlayer(p) return nil } func (p *Player) Current() time.Duration { - return time.Duration(p.pos/bytesPerSample/channelNum) * time.Second / time.Duration(p.context.sampleRate) + return p.stream.playerCurrent(p) } func (p *Player) Volume() float64 {