# (Hosting-page residue, not part of the script) "Code pull complete; the page will refresh automatically."
# coding=utf-8
import time
import multiprocessing as mp
import tal_data
import tal_audio_issue as tai
def mp_trunc_worker(tag, *args):
    """Pool worker for the truncation detector.

    Forwards ``args`` unchanged to ``tai.is_audio_truncated`` and returns
    ``(tag, flag)``, where ``flag`` is the second element of the detector's
    two-element result (the first element is discarded). ``tag`` is passed
    through so the caller can match the result to its sample.
    """
    outcome = tai.is_audio_truncated(*args)
    _ignored, truncated = outcome
    return tag, truncated
def mp_volume_low_worker(tag, *args):
    """Pool worker for the low-volume detector.

    Forwards ``args`` unchanged to ``tai.is_audio_low_volume`` and returns
    ``(tag, flag)``, where ``flag`` is the second element of the detector's
    two-element result (the first element is discarded).
    """
    outcome = tai.is_audio_low_volume(*args)
    _ignored, low_volume = outcome
    return tag, low_volume
def mp_readless_worker(tag, *args):
    """Pool worker for the read-less detector.

    Forwards ``args`` unchanged to ``tai.is_audio_readless`` and returns
    ``(tag, flag)``, where ``flag`` is the second element of the detector's
    two-element result (the first element is discarded).
    """
    # Truncation screening is currently switched off. To re-enable it, take
    # this flag from the second element of tai.is_audio_truncated(*args).
    truncated = False

    outcome = tai.is_audio_readless(*args)
    _ignored, readless = outcome

    # A truncated recording must not be counted as read-less.
    if truncated:
        readless = False
    return tag, readless
def mp_readmore_worker(tag, *args):
    """Worker for the read-more detector.

    Forwards ``args`` unchanged to ``tai.is_audio_readmore`` and returns
    ``(tag, flag)``, where ``flag`` is the second element of the detector's
    two-element result (the first element is discarded).
    """
    outcome = tai.is_audio_readmore(*args)
    _ignored, readmore = outcome
    return tag, readmore
def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator``, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def calc_acc_pr(p_label_result, n_label_result):
    """Print confusion-matrix counts and accuracy / precision / recall / F1.

    Args:
        p_label_result: mapping of stem -> detector flag for the samples
            labelled as positive; a truthy flag counts as a true positive.
        n_label_result: mapping of stem -> detector flag for the samples
            labelled as negative; a falsy flag counts as a true negative.

    Returns:
        None. All results are written to stdout.

    A metric whose denominator is zero (empty input, no predicted positives,
    no labelled positives, or precision + recall == 0) is reported as 0
    instead of raising ``ZeroDivisionError``.
    """
    # NOTE(review): the per-entry prints are assumed to run for every entry
    # (not only for the counted ones) -- confirm against the intended output.
    print('p标注', len(p_label_result))
    tp = 0
    for stem, val in p_label_result.items():
        if val:
            tp += 1
        print(stem, val, sep='\t')

    print('n标注', len(n_label_result))
    tn = 0
    for stem, val in n_label_result.items():
        if not val:
            tn += 1
        print(stem, val, sep='\t')

    # Whatever was not detected correctly in each group is the matching error.
    fn = len(p_label_result) - tp
    fp = len(n_label_result) - tn
    print(f'tp={tp}', f'fn={fn}')
    print(f'tn={tn}', f'fp={fp}')

    al = tp + tn + fp + fn
    acc = _safe_ratio(tp + tn, al)
    p = _safe_ratio(tp, tp + fp)
    r = _safe_ratio(tp, tp + fn)
    f1 = _safe_ratio(2 * p * r, p + r)
    print(f'all={al}', f'acc={acc * 100:.2f}%', f'p={p * 100:.2f}%', f'r={r * 100:.2f}%', f'f1={f1 * 100:.2f}%')
def test_det_readless():
    """Evaluate the read-less ('少读') detector on the 'L1' data set.

    Every stem in the label dict is submitted to ``mp_readless_worker``
    through a process pool. The returned flags are split by whether the
    stem's label contains '少读', and both groups go to ``calc_acc_pr``.
    """
    labels = tal_data.get_stem_label_dict('L1')
    audio_paths = tal_data.get_stem_audiopath_dict('L1')  # stem -> audio file path
    json_paths = tal_data.get_stem_jsonpath_dict('L1')  # stem -> evaluation-result JSON path

    stems = list(labels)
    print(f'valid stem cnt {len(stems)}')

    labelled_readless = {}
    labelled_other = {}
    with mp.Pool() as pool:
        # The label is passed as a fourth argument (named label_debug_ in the
        # earlier revision) -- presumably a debugging aid for the detector.
        pending = [
            pool.apply_async(
                mp_readless_worker,
                args=(stem, audio_paths[stem], json_paths[stem], labels[stem]),
            )
            for stem in stems
        ]

        # Collect while the pool is still alive: leaving the ``with`` block
        # terminates the pool.
        for idx, task in enumerate(pending):
            stem, readless_flag = task.get()
            label = labels[stem]
            print(idx, stem, label, readless_flag, sep='\t')
            bucket = labelled_readless if '少读' in label else labelled_other
            bucket[stem] = readless_flag

    calc_acc_pr(labelled_readless, labelled_other)
def test_det_readmore():
    """Evaluate the read-more ('多读') detector on the 'L1' data set.

    Runs ``mp_readmore_worker`` serially, with ``None`` in the audio-path
    position. The returned flags are split by whether the stem's label
    contains '多读', and both groups go to ``calc_acc_pr``.
    """
    json_paths = tal_data.get_stem_jsonpath_dict('L1')
    labels = tal_data.get_stem_label_dict('L1')

    stems = list(labels)
    print(f'valid stem cnt {len(stems)}')

    labelled_readmore = {}
    labelled_other = {}
    for stem in stems:
        label = labels[stem]
        _tag, readmore_flag = mp_readmore_worker(stem, None, json_paths[stem])
        bucket = labelled_readmore if '多读' in label else labelled_other
        bucket[stem] = readmore_flag

    calc_acc_pr(labelled_readmore, labelled_other)
def test_det_trunc():
    """Evaluate the truncation ('音频截断') detector on the 'L1' data set.

    Every stem in the label dict is submitted to ``mp_trunc_worker`` through
    a process pool. The returned flags are split by whether the stem's label
    contains '音频截断', and both groups go to ``calc_acc_pr``.
    """
    labels = tal_data.get_stem_label_dict('L1')
    audio_paths = tal_data.get_stem_audiopath_dict('L1')  # stem -> audio file path
    json_paths = tal_data.get_stem_jsonpath_dict('L1')  # stem -> evaluation-result JSON path

    stems = list(labels)
    print(f'valid stem cnt {len(stems)}')

    labelled_truncated = {}
    labelled_other = {}
    with mp.Pool() as pool:
        pending = [
            pool.apply_async(
                mp_trunc_worker,
                args=(stem, audio_paths[stem], json_paths[stem]),
            )
            for stem in stems
        ]

        # Collect while the pool is still alive: leaving the ``with`` block
        # terminates the pool.
        for idx, task in enumerate(pending):
            stem, truncated_flag = task.get()
            label = labels[stem]
            print(idx, stem, label, truncated_flag, sep='\t')
            bucket = labelled_truncated if '音频截断' in label else labelled_other
            bucket[stem] = truncated_flag

    calc_acc_pr(labelled_truncated, labelled_other)
def test_det_low_volume():
    """Evaluate the low-volume / no-voice detector on the 'L2' data set.

    Every stem in the label dict is submitted to ``mp_volume_low_worker``
    through a process pool. A stem is a positive sample when its label
    contains '无主讲人声音' or '无声音'. Both groups go to ``calc_acc_pr``.
    """
    level = 'L2'
    labels = tal_data.get_stem_label_dict(level)
    audio_paths = tal_data.get_stem_audiopath_dict(level)
    json_paths = tal_data.get_stem_jsonpath_dict(level)

    stems = list(labels)
    print(f'valid stem cnt {len(stems)}')

    positive_keywords = ('无主讲人声音', '无声音')
    labelled_silent = {}
    labelled_other = {}
    with mp.Pool() as pool:
        # The label is passed as a fourth argument (named label_debug_ in the
        # earlier revision) -- presumably a debugging aid for the detector.
        pending = [
            pool.apply_async(
                mp_volume_low_worker,
                args=(stem, audio_paths[stem], json_paths[stem], labels[stem]),
            )
            for stem in stems
        ]

        # Collect while the pool is still alive: leaving the ``with`` block
        # terminates the pool.
        for idx, task in enumerate(pending):
            stem, no_voice = task.get()
            label = labels[stem]
            print(idx, stem, label, no_voice, sep='\t')
            is_positive = any(keyword in label for keyword in positive_keywords)
            bucket = labelled_silent if is_positive else labelled_other
            bucket[stem] = no_voice

    calc_acc_pr(labelled_silent, labelled_other)
def test_show_version():
    """Print the version details exposed by the ``tai`` module.

    Output is one line: ``tai.version``, ``tai.short_version`` and the
    MAJOR / MINOR / PATCH attributes of ``tai.__version__``.
    """
    full_version = tai.version
    short_version = tai.short_version
    parts = tai.__version__
    print(full_version, short_version, parts.MAJOR, parts.MINOR, parts.PATCH)
if __name__ == '__main__':
    # Time the whole run, version banner included.
    started = time.time()

    test_show_version()

    # Only one detector evaluation is enabled at a time. The others are
    # test_det_trunc(), test_det_readless() and test_det_readmore().
    test_det_low_volume()

    elapsed = time.time() - started
    print(f'elapsed: {elapsed}s')
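# (Hosting-page residue, not part of the script) "This section may contain content unsuitable for display, so the page does not show it. You can review and modify it with the relevant editing features."
# (Hosting-page residue, not part of the script) "If you confirm the content involves no inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or anything violating national laws and regulations, you can click submit to appeal and we will handle it as soon as possible."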