audio: Non-blocking reading buffers

This commit is contained in:
Hajime Hoshi 2017-06-04 16:46:02 +09:00
parent 10bd1c1786
commit 8957b1b857

View File

@ -39,8 +39,7 @@ import (
)
type players struct {
players map[*Player]struct{}
seekings map[*Player]struct{}
players map[*Player]struct{}
sync.RWMutex
}
@ -65,9 +64,6 @@ func (p *players) Read(b []uint8) (int, error) {
players := []*Player{}
for player := range p.players {
if _, ok := p.seekings[player]; ok {
continue
}
players = append(players, player)
}
if len(players) == 0 {
@ -79,12 +75,13 @@ func (p *players) Read(b []uint8) (int, error) {
closed := []*Player{}
l := len(b)
for _, player := range players {
if err := player.readToBuffer(l); err == io.EOF {
n, err := player.readToBuffer(l)
if err == io.EOF {
closed = append(closed, player)
} else if err != nil {
return 0, err
}
l = min(player.bufferLength(), l)
l = min(n, l)
}
l &= mask
b16s := [][]int16{}
@ -126,18 +123,6 @@ func (p *players) removePlayer(player *Player) {
p.Unlock()
}
func (p *players) addSeeking(player *Player) {
p.Lock()
p.seekings[player] = struct{}{}
p.Unlock()
}
func (p *players) removeSeeking(player *Player) {
p.Lock()
delete(p.seekings, player)
p.Unlock()
}
func (p *players) hasPlayer(player *Player) bool {
p.RLock()
_, ok := p.players[player]
@ -213,8 +198,7 @@ func NewContext(sampleRate int) (*Context, error) {
}
theContext = c
c.players = &players{
players: map[*Player]struct{}{},
seekings: map[*Player]struct{}{},
players: map[*Player]struct{}{},
}
return c, nil
@ -306,6 +290,7 @@ type Player struct {
players *players
src ReadSeekCloser
sampleRate int
readingCh chan error
buf []uint8
pos int64
@ -380,21 +365,41 @@ func (p *Player) Close() error {
return err
}
func (p *Player) readToBuffer(length int) error {
bb := make([]uint8, length)
p.srcM.Lock()
n, err := p.src.Read(bb)
p.srcM.Unlock()
if 0 < n {
p.m.Lock()
p.buf = append(p.buf, bb[:n]...)
p.m.Unlock()
func (p *Player) readToBuffer(length int) (int, error) {
if p.readingCh == nil {
p.readingCh = make(chan error)
go func() {
defer close(p.readingCh)
bb := make([]uint8, length)
p.srcM.Lock()
n, err := p.src.Read(bb)
p.srcM.Unlock()
if err != nil {
p.readingCh <- err
return
}
if 0 < n {
p.m.Lock()
p.buf = append(p.buf, bb[:n]...)
p.m.Unlock()
}
}()
}
select {
case err := <-p.readingCh:
p.readingCh = nil
return len(p.buf), err
case <-time.After(15 * time.Millisecond):
return length, nil
}
return err
}
func (p *Player) bufferToInt16(lengthInBytes int) []int16 {
r := make([]int16, lengthInBytes/2)
// This function must be called on the same goruotine of readToBuffer.
if p.readingCh != nil {
return r
}
p.m.RLock()
for i := 0; i < lengthInBytes/2; i++ {
r[i] = int16(p.buf[2*i]) | (int16(p.buf[2*i+1]) << 8)
@ -405,19 +410,16 @@ func (p *Player) bufferToInt16(lengthInBytes int) []int16 {
}
func (p *Player) proceed(length int) {
// This function must be called on the same goruotine of readToBuffer.
if p.readingCh != nil {
return
}
p.m.Lock()
p.buf = p.buf[length:]
p.pos += int64(length)
p.m.Unlock()
}
func (p *Player) bufferLength() int {
p.m.RLock()
l := len(p.buf)
p.m.RUnlock()
return l
}
// Play plays the stream.
//
// Play always returns nil.
@ -450,12 +452,9 @@ func (p *Player) Rewind() error {
//
// Seek returns error when seeking the source returns error.
func (p *Player) Seek(offset time.Duration) error {
p.players.addSeeking(p)
defer p.players.removeSeeking(p)
o := int64(offset) * bytesPerSample * channelNum * int64(p.sampleRate) / int64(time.Second)
o &= mask
p.srcM.Lock()
// TODO: Seek finishes soon but after that, Read takes long time and players.Read can block.
pos, err := p.src.Seek(o, io.SeekStart)
p.srcM.Unlock()
if err != nil {