audio: Async reading sources (#443)

This commit is contained in:
Hajime Hoshi 2017-12-23 02:05:49 +09:00
parent 31350dc497
commit a5f8c1b1f8

View File

@ -61,7 +61,7 @@ const (
mask = ^(channelNum*bytesPerSample - 1) mask = ^(channelNum*bytesPerSample - 1)
) )
func (p *players) Read(b []uint8) (int, error) { func (p *players) Read(b []byte) (int, error) {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
@ -72,25 +72,18 @@ func (p *players) Read(b []uint8) (int, error) {
if len(players) == 0 { if len(players) == 0 {
l := len(b) l := len(b)
l &= mask l &= mask
copy(b, make([]uint8, l)) copy(b, make([]byte, l))
return l, nil return l, nil
} }
closed := []*Player{}
for _, player := range players {
player.resetBufferIfSeeking()
}
l := len(b) l := len(b)
for _, player := range players { for _, player := range players {
n, err := player.readToBuffer(l) select {
if err == io.EOF { case err := <-player.readCh:
closed = append(closed, player) if err != nil {
} else if err != nil {
return 0, err return 0, err
} }
if n < l { default:
l = n
} }
} }
l &= mask l &= mask
@ -115,12 +108,11 @@ func (p *players) Read(b []uint8) (int, error) {
} }
for _, player := range players { for _, player := range players {
player.proceed(l) if player.eof() {
delete(p.players, player)
}
} }
for _, pl := range closed {
delete(p.players, pl)
}
return l, nil return l, nil
} }
@ -278,7 +270,7 @@ func (c *Context) loop() {
l := (c.frames * int64(bytesPerFrame)) - c.writtenBytes l := (c.frames * int64(bytesPerFrame)) - c.writtenBytes
l &= mask l &= mask
c.writtenBytes += l c.writtenBytes += l
buf := make([]uint8, l) buf := make([]byte, l)
if _, err := io.ReadFull(c.players, buf); err != nil { if _, err := io.ReadFull(c.players, buf); err != nil {
audiobinding.SetError(err) audiobinding.SetError(err)
return return
@ -314,7 +306,7 @@ type bytesReadSeekCloser struct {
reader *bytes.Reader reader *bytes.Reader
} }
func (b *bytesReadSeekCloser) Read(buf []uint8) (int, error) { func (b *bytesReadSeekCloser) Read(buf []byte) (int, error) {
return b.reader.Read(buf) return b.reader.Read(buf)
} }
@ -328,12 +320,12 @@ func (b *bytesReadSeekCloser) Close() error {
} }
// BytesReadSeekCloser creates ReadSeekCloser from bytes. // BytesReadSeekCloser creates ReadSeekCloser from bytes.
func BytesReadSeekCloser(b []uint8) ReadSeekCloser { func BytesReadSeekCloser(b []byte) ReadSeekCloser {
return &bytesReadSeekCloser{reader: bytes.NewReader(b)} return &bytesReadSeekCloser{reader: bytes.NewReader(b)}
} }
type readingResult struct { type readingResult struct {
data []uint8 data []byte
err error err error
} }
@ -341,16 +333,17 @@ type readingResult struct {
type Player struct { type Player struct {
players *players players *players
src ReadSeekCloser src ReadSeekCloser
srcEOF bool
sampleRate int sampleRate int
buf []uint8 buf []byte
pos int64 pos int64
volume float64 volume float64
seeking bool readCh chan error
nextPos int64 closeCh chan struct{}
closedCh chan struct{}
srcM sync.Mutex
m sync.RWMutex m sync.RWMutex
} }
@ -372,7 +365,7 @@ func NewPlayer(context *Context, src ReadSeekCloser) (*Player, error) {
players: context.players, players: context.players,
src: src, src: src,
sampleRate: context.sampleRate, sampleRate: context.sampleRate,
buf: []uint8{}, buf: []byte{},
volume: 1, volume: 1,
} }
// Get the current position of the source. // Get the current position of the source.
@ -393,7 +386,7 @@ func NewPlayer(context *Context, src ReadSeekCloser) (*Player, error) {
// The format of src should be same as noted at NewPlayer. // The format of src should be same as noted at NewPlayer.
// //
// NewPlayerFromBytes's error is always nil as of 1.5.0-alpha. // NewPlayerFromBytes's error is always nil as of 1.5.0-alpha.
func NewPlayerFromBytes(context *Context, src []uint8) (*Player, error) { func NewPlayerFromBytes(context *Context, src []byte) (*Player, error) {
b := BytesReadSeekCloser(src) b := BytesReadSeekCloser(src)
p, err := NewPlayer(context, b) p, err := NewPlayer(context, b)
if err != nil { if err != nil {
@ -410,44 +403,40 @@ func NewPlayerFromBytes(context *Context, src []uint8) (*Player, error) {
// //
// Close returns error when closing the source returns error. // Close returns error when closing the source returns error.
func (p *Player) Close() error { func (p *Player) Close() error {
p.players.removePlayer(p)
runtime.SetFinalizer(p, nil) runtime.SetFinalizer(p, nil)
p.srcM.Lock() p.players.removePlayer(p)
err := p.src.Close()
p.srcM.Unlock()
return err
}
func (p *Player) readToBuffer(length int) (int, error) { p.m.Lock()
b := make([]uint8, length) err := p.src.Close()
p.srcM.Lock() p.m.Unlock()
n, err := p.src.Read(b)
p.srcM.Unlock() close(p.closeCh)
if err != nil { <-p.closedCh
return 0, err
} return err
p.buf = append(p.buf, b[:n]...)
return len(p.buf), nil
} }
func (p *Player) bufferToInt16(lengthInBytes int) []int16 { func (p *Player) bufferToInt16(lengthInBytes int) []int16 {
r := make([]int16, lengthInBytes/2) r := make([]int16, lengthInBytes/2)
// This function must be called on the same goruotine of readToBuffer.
p.m.RLock() p.m.Lock()
for i := 0; i < lengthInBytes/2; i++ { l := lengthInBytes
if len(p.buf) < lengthInBytes {
if !p.srcEOF {
p.m.Unlock()
return r
}
l = len(p.buf)
}
for i := 0; i < l/2; i++ {
r[i] = int16(p.buf[2*i]) | (int16(p.buf[2*i+1]) << 8) r[i] = int16(p.buf[2*i]) | (int16(p.buf[2*i+1]) << 8)
r[i] = int16(float64(r[i]) * p.volume) r[i] = int16(float64(r[i]) * p.volume)
} }
p.m.RUnlock()
return r
}
func (p *Player) proceed(length int) { p.pos += int64(l)
// This function must be called on the same goruotine of readToBuffer. p.buf = p.buf[l:]
p.m.Lock()
p.buf = p.buf[length:]
p.pos += int64(length)
p.m.Unlock() p.m.Unlock()
return r
} }
// Play plays the stream. // Play plays the stream.
@ -455,9 +444,67 @@ func (p *Player) proceed(length int) {
// Play always returns nil. // Play always returns nil.
func (p *Player) Play() error { func (p *Player) Play() error {
p.players.addPlayer(p) p.players.addPlayer(p)
p.startRead()
return nil return nil
} }
func (p *Player) startRead() {
p.m.Lock()
if p.readCh == nil {
p.readCh = make(chan error)
p.closeCh = make(chan struct{})
p.closedCh = make(chan struct{})
p.srcEOF = false
go func() {
if err := p.readLoop(); err != nil {
p.readCh <- err
}
p.m.Lock()
p.readCh = nil
p.m.Unlock()
close(p.closedCh)
}()
}
p.m.Unlock()
}
func (p *Player) readLoop() error {
t := time.Tick(1 * time.Millisecond)
for {
select {
case <-p.closeCh:
p.closeCh = nil
return nil
case <-t:
p.m.Lock()
if len(p.buf) < 4096*16 && !p.srcEOF {
buf := make([]byte, 4096)
n, err := p.src.Read(buf)
p.buf = append(p.buf, buf[:n]...)
if err == io.EOF {
p.srcEOF = true
}
if p.srcEOF && len(p.buf) == 0 {
p.m.Unlock()
return nil
}
if err != nil && err != io.EOF {
p.m.Unlock()
return err
}
}
p.m.Unlock()
}
}
}
func (p *Player) eof() bool {
p.m.Lock()
r := p.srcEOF && len(p.buf) == 0
p.m.Unlock()
return r
}
// IsPlaying returns boolean indicating whether the player is playing. // IsPlaying returns boolean indicating whether the player is playing.
func (p *Player) IsPlaying() bool { func (p *Player) IsPlaying() bool {
return p.players.hasPlayer(p) return p.players.hasPlayer(p)
@ -476,28 +523,19 @@ func (p *Player) Rewind() error {
func (p *Player) Seek(offset time.Duration) error { func (p *Player) Seek(offset time.Duration) error {
o := int64(offset) * bytesPerSample * channelNum * int64(p.sampleRate) / int64(time.Second) o := int64(offset) * bytesPerSample * channelNum * int64(p.sampleRate) / int64(time.Second)
o &= mask o &= mask
p.srcM.Lock()
p.m.Lock()
pos, err := p.src.Seek(o, io.SeekStart) pos, err := p.src.Seek(o, io.SeekStart)
p.srcM.Unlock()
if err != nil { if err != nil {
p.m.Unlock()
return err return err
} }
p.m.Lock() p.buf = nil
p.seeking = true p.pos = pos
p.nextPos = pos p.srcEOF = false
p.m.Unlock() p.m.Unlock()
return nil
}
func (p *Player) resetBufferIfSeeking() { return nil
// This function must be called on the same goruotine of readToBuffer.
p.m.Lock()
if p.seeking {
p.buf = []uint8{}
p.pos = p.nextPos
p.seeking = false
}
p.m.Unlock()
} }
// Pause pauses the playing. // Pause pauses the playing.