# https://www.mp3-tech.org/programmer/frame_header.html
# http://gabriel.mp3-tech.org/mp3infotag.html
__all__ = [
'LAMEEncodingFlags',
'LAMEHeader',
'LAMEReplayGain',
'MP3',
'MP3StreamInfo',
'MPEGFrameHeader',
'VBRIHeader',
'VBRIToC',
'XingHeader',
'XingToC',
]
import os
import re
import struct
from functools import lru_cache
import more_itertools
from attr import (
attrib,
attrs,
)
from tbm_utils import (
AttrMapping,
LabelList,
datareader,
humanize_filesize,
)
from .id3v1 import ID3v1
from .id3v2 import (
ID3v2,
ID3v2Frames,
)
from .tables import (
LAMEBitrateMode,
LAMEChannelMode,
LAMEPreset,
LAMEReplayGainOrigin,
LAMEReplayGainType,
LAMESurroundInfo,
MP3BitrateMode,
MP3Bitrates,
MP3ChannelMode,
MP3SampleRates,
MP3SamplesPerFrame,
)
from ..exceptions import FormatError
from ..models import (
Format,
StreamInfo,
)
from ..utils import (
humanize_bitrate,
humanize_sample_rate,
)
try:
import bitstruct.c as bitstruct
bitstruct.Error = (TypeError, ValueError)
except ImportError:
import bitstruct
bitstruct.Error = (bitstruct.Error,)
@attrs(repr=False, kw_only=True)
class LAMEReplayGain(AttrMapping):
    """Replay gain values decoded from eight bytes of binary data.

    ``peak`` is ``None`` when the stored peak value is zero.  Each
    adjustment is the stored 9-bit magnitude divided by ten, negated when
    the accompanying sign bit is set.
    """

    peak = attrib()
    track_type = attrib(converter=LAMEReplayGainType)
    track_origin = attrib(converter=LAMEReplayGainOrigin)
    track_adjustment = attrib()
    album_type = attrib(converter=LAMEReplayGainType)
    album_origin = attrib(converter=LAMEReplayGainOrigin)
    album_adjustment = attrib()

    @datareader
    @classmethod
    def parse(cls, data):
        """Build an instance from the next eight bytes of ``data``.

        Reads a big-endian unsigned 32-bit peak value followed by two
        16-bit gain fields: the track gain first, then the album gain.
        """

        def read_gain():
            # One 16-bit field: 3-bit type, 3-bit origin, sign bit, 9-bit magnitude.
            kind, origin, negative, magnitude = bitstruct.unpack(
                'u3 u3 b1 u9',
                data.read(2),
            )
            gain_type = LAMEReplayGainType(kind)
            gain_origin = LAMEReplayGainOrigin(origin)

            adjustment = magnitude / 10.0
            if negative:
                adjustment *= -1

            return gain_type, gain_origin, adjustment

        (raw_peak,) = struct.unpack('>I', data.read(4))
        # A stored peak of zero is reported as "no peak" rather than 0.0.
        peak = raw_peak / 2 ** 23 if raw_peak != 0 else None

        track_type, track_origin, track_adjustment = read_gain()
        album_type, album_origin, album_adjustment = read_gain()

        return cls(
            peak=peak,
            track_type=track_type,
            track_origin=track_origin,
            track_adjustment=track_adjustment,
            album_type=album_type,
            album_origin=album_origin,
            album_adjustment=album_adjustment,
        )
@attrs(repr=False, kw_only=True)
class LAMEEncodingFlags(AttrMapping):
    """Four keyword-only encoding flags, each coerced to ``bool`` on assignment."""

    nogap_continuation = attrib(converter=bool)
    nogap_continued = attrib(converter=bool)
    nssafejoint = attrib(converter=bool)
    nspsytune = attrib(converter=bool)
class XingToC(LabelList):
    """``LabelList`` of Xing table-of-contents items, labelled 'entry'/'entries'."""

    item_label = ('entry', 'entries')
class VBRIToC(LabelList):
    """``LabelList`` of VBRI table-of-contents items, labelled 'entry'/'entries'."""

    item_label = ('entry', 'entries')
@attrs(
    repr=False,
    kw_only=True,
)
class MP3StreamInfo(StreamInfo):
    """Stream information computed from the MPEG frames of MP3 data.

    Instances are normally built by :meth:`parse`, which locates the first
    MPEG frames, estimates where trailing tags begin, and derives the
    bitrate, bitrate mode and duration from a Xing/LAME or VBRI header when
    the first frame carries one, or from the frame and audio sizes otherwise.
    """

    # Offset of the first MPEG frame, estimated end of the audio data, and
    # the difference between the two (all computed in ``parse``).
    _start = attrib()
    _end = attrib()
    _size = attrib()
    # VBRI and Xing headers taken from the first MPEG frame found.
    _vbri = attrib()
    _xing = attrib()
    version = attrib()
    layer = attrib()
    protected = attrib(converter=bool)
    bitrate = attrib()
    bitrate_mode = attrib(converter=MP3BitrateMode)
    channel_mode = attrib(converter=MP3ChannelMode)
    channels = attrib()
    duration = attrib()
    sample_rate = attrib()

    @datareader
    @staticmethod
    def count_mpeg_frames(data):
        """Count the MPEG frame headers that parse from the current position.

        Scans forward in 128-byte windows until fewer than 128 bytes can be
        peeked.  Every ``0xFF`` byte is tried as the start of a frame; after
        a successful parse the scan resumes at the end of that frame.

        Returns:
            int: The number of frames parsed successfully.
        """
        num_frames = 0

        buffer_size = 128
        buffer = data.peek(buffer_size)

        while len(buffer) >= buffer_size:
            sync_start = buffer.find(b'\xFF')

            if sync_start >= 0:  # pragma: nobranch
                data.seek(sync_start, os.SEEK_CUR)

                try:
                    frame = MPEGFrameHeader.parse(data)
                    num_frames += 1
                    data.seek(frame._start + frame._size, os.SEEK_SET)
                except (FormatError, *bitstruct.Error):  # pragma: nocover
                    data.seek(1, os.SEEK_CUR)
            else:
                # No candidate sync byte in this window; skip all of it.
                data.seek(buffer_size, os.SEEK_CUR)

            buffer = data.peek(buffer_size)

        return num_frames

    # NOTE(review): the cache is keyed on the ``data`` object itself, so it
    # presumably only hits when the very same reader is passed again --
    # confirm this is intended.
    @datareader
    @staticmethod
    @lru_cache()
    def find_mpeg_frames(data):
        """Locate the first run of MPEG frames in ``data``.

        Scanning stops as soon as four consecutive frames parse or the first
        frame of a run carries a Xing header.  The first run of at least two
        frames seen along the way is remembered as a fallback.

        Returns:
            list: The parsed ``MPEGFrameHeader`` objects.

        Raises:
            FormatError: If no acceptable run of frames is found.
        """
        frames = []
        cached_frames = None

        buffer_size = 128
        buffer = data.peek(buffer_size)

        while len(buffer) >= buffer_size:
            sync_start = buffer.find(b'\xFF')

            if sync_start >= 0:
                data.seek(sync_start, os.SEEK_CUR)

                # The 0xFF may be the last byte available; unpacking eleven
                # bits from a single byte would raise, so check the length.
                sync_bytes = data.peek(2)

                if len(sync_bytes) >= 2 and bitstruct.unpack('u11', sync_bytes)[0] == 2047:
                    for _ in range(4):
                        try:
                            frame = MPEGFrameHeader.parse(data)
                            frames.append(frame)
                            if frame._xing and frame._xing.num_frames:
                                break
                            data.seek(frame._start + frame._size, os.SEEK_SET)
                        except (FormatError, *bitstruct.Error):
                            data.seek(1, os.SEEK_CUR)
                            break
                else:
                    # The position is already on the 0xFF byte, so step past
                    # just that byte.  Advancing by ``sync_start + 1`` here
                    # would skip bytes that were never examined.
                    data.seek(1, os.SEEK_CUR)

                if frames and (len(frames) >= 4 or frames[0]._xing):
                    break

                if len(frames) >= 2 and cached_frames is None:
                    cached_frames = frames.copy()

                del frames[:]
            else:
                data.seek(buffer_size, os.SEEK_CUR)

            buffer = data.peek(buffer_size)

        # I actually found a PNG file that had multiple consecutive MPEG frames parsed.
        # The all_equal check combats this false positive by
        # making sure certain attributes don't change between frames.
        if not frames:
            if (
                cached_frames
                and more_itertools.all_equal(
                    [
                        frame.channel_mode,
                        frame.channels,
                        frame.layer,
                        frame.sample_rate,
                        frame.version,
                    ]
                    for frame in cached_frames
                )
            ):
                frames = cached_frames
            else:
                raise FormatError("No XING header and insufficient MPEG frames.")

        return frames

    @datareader
    @classmethod
    def parse(cls, data):
        """Build stream information from ``data``.

        Finds the first MPEG frames, looks for trailing tag markers in the
        last 64 KiB to estimate where the audio ends, then computes the
        sample count, bitrate mode, bitrate and duration.

        Raises:
            FormatError: Propagated from :meth:`find_mpeg_frames` when no
                usable MPEG frames are found.
        """
        frames = cls.find_mpeg_frames(data)

        samples_per_frame, _ = MP3SamplesPerFrame[(frames[0].version, frames[0].layer)]

        data.seek(0, os.SEEK_END)
        end_pos = data.tell()

        # This is an arbitrary amount that should hopefully encompass all end tags.
        # Starting low so as not to add unnecessary processing time.
        chunk_size = 64 * 1024
        if end_pos > chunk_size:
            data.seek(-chunk_size, os.SEEK_END)
        else:
            data.seek(0, os.SEEK_SET)

        end_buffer = data.read()

        # Distance from the end of the data to the earliest tag marker found.
        end_tag_offset = 0
        for tag_type in [b'APETAGEX', b'LYRICSBEGIN', b'TAG']:
            tag_offset = end_buffer.rfind(tag_type)

            # NOTE(review): a marker at index 0 of the buffer is ignored by
            # this test -- confirm whether that is intended.
            if tag_offset > 0:
                tag_offset = len(end_buffer) - tag_offset

                if tag_offset > end_tag_offset:
                    end_tag_offset = tag_offset

        audio_start = frames[0]._start
        audio_end = end_pos - end_tag_offset
        audio_size = audio_end - audio_start

        bitrate_mode = MP3BitrateMode.UNKNOWN

        vbri_header = frames[0]._vbri
        xing_header = frames[0]._xing

        if xing_header:
            if xing_header.num_frames:
                num_samples = samples_per_frame * xing_header.num_frames
            else:
                # Some XING headers have num_frames==0.
                # Manually count all the MPEG frames for bitrate/duration calculations.
                # Remove XING frame from frame count for bitrate calculation accuracy.
                data.seek(frames[0]._start, os.SEEK_SET)
                num_samples = samples_per_frame * (cls.count_mpeg_frames(data) - 1)

            # I prefer to include the Xing/LAME header as part of the audio.
            # Google Music seems to do so for calculating client ID.
            # Haven't tested in too many other scenarios.
            # But, there should be enough low-level info for people to calculate this if desired.
            if xing_header._lame:
                # Old versions of LAME wrote invalid delay/padding
                # for short MP3s with low bitrate.
                # Subtract them only if there would be samples left.
                lame_padding = xing_header._lame.delay + xing_header._lame.padding
                if lame_padding < num_samples:
                    num_samples -= lame_padding

                if xing_header._lame.bitrate_mode in [1, 8]:
                    bitrate_mode = MP3BitrateMode.CBR
                elif xing_header._lame.bitrate_mode in [2, 9]:
                    bitrate_mode = MP3BitrateMode.ABR
                elif xing_header._lame.bitrate_mode in [3, 4, 5, 6]:
                    bitrate_mode = MP3BitrateMode.VBR
        elif vbri_header:
            num_samples = samples_per_frame * vbri_header.num_frames
            bitrate_mode = MP3BitrateMode.VBR
        else:
            # No header to consult: estimate the frame count from the sizes.
            num_samples = samples_per_frame * (audio_size / frames[0]._size)

        if bitrate_mode is MP3BitrateMode.UNKNOWN:
            # Treat the stream as CBR when every frame found shares a bitrate.
            if more_itertools.all_equal([frame['bitrate'] for frame in frames]):  # pragma: nobranch
                bitrate_mode = MP3BitrateMode.CBR

        if bitrate_mode is MP3BitrateMode.CBR:
            bitrate = frames[0].bitrate
        else:
            # Subtract Xing/LAME frame size from audio_size for bitrate calculation accuracy.
            if xing_header:
                bitrate = ((audio_size - frames[0]._size) * 8 * frames[0].sample_rate) / num_samples
            else:
                bitrate = (audio_size * 8 * frames[0].sample_rate) / num_samples

        duration = (audio_size * 8) / bitrate

        version = frames[0].version
        layer = frames[0].layer
        protected = frames[0].protected
        sample_rate = frames[0].sample_rate
        channel_mode = frames[0].channel_mode
        channels = frames[0].channels

        return cls(
            start=audio_start,
            end=audio_end,
            size=audio_size,
            vbri=vbri_header,
            xing=xing_header,
            version=version,
            layer=layer,
            protected=protected,
            bitrate=bitrate,
            bitrate_mode=bitrate_mode,
            channel_mode=channel_mode,
            channels=channels,
            duration=duration,
            sample_rate=sample_rate,
        )
class MP3(Format):
    """MP3 file format object.

    Extends `Format`.

    Attributes:
        pictures (list): A list of :class:`ID3v2Picture` objects.
        streaminfo (MP3StreamInfo): The audio stream information.
        tags (ID3v2Frames): The ID3v2 tag frames, if present.
    """

    tags_type = ID3v2Frames

    @classmethod
    def parse(cls, data):
        """Parse ``data`` as MP3 and return the populated format object.

        An ID3v2 tag is tried first; when it cannot be parsed, the data
        after the audio is searched for an ID3v1 tag instead.  The
        underlying data object is closed before returning, and also when
        parsing raises.

        Raises:
            FormatError: Propagated from ``MP3StreamInfo.parse`` when no
                usable MPEG frames are found.
        """
        self = super()._load(data)

        try:
            try:
                self._id3 = ID3v2.parse(self._obj)
                self.pictures = self._id3.pictures
                self.tags = self._id3.tags
            except FormatError:
                # No ID3v2 tag: rewind so the frame search starts at the beginning.
                self._obj.seek(0, os.SEEK_SET)

            self.streaminfo = MP3StreamInfo.parse(self._obj)

            # Use ID3v1 if present and ID3v2 is not.
            if '_id3' not in self:
                self._obj.seek(self.streaminfo._start + self.streaminfo._size, os.SEEK_SET)
                end_buffer = self._obj.read()

                # An APEv2 tag can carry the ``APETAGEX`` preamble twice
                # (header and footer), and that preamble itself contains
                # ``TAG``.  Skip past the last preamble so it is never
                # mistaken for the start of an ID3v1 tag.
                apev2_index = end_buffer.rfind(b'APETAGEX')
                if apev2_index != -1:
                    end_buffer = end_buffer[apev2_index + 8:]

                id3v1_index = end_buffer.find(b'TAG')
                if id3v1_index != -1:
                    id3v1 = ID3v1.parse(end_buffer[id3v1_index : id3v1_index + 128])
                    self._id3 = id3v1
                    self.tags = self._id3.tags
        finally:
            # Release the data object even when parsing fails.
            self._obj.close()

        return self