# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Data operations, will be used in train.py and eval.py
"""
import os
import re
import math
import random
import codecs
from pathlib import Path
import numpy as np
import pandas as pd
import mindspore.dataset as ds
class Generator:
    """Wrap a list of (feature, label) pairs for ds.GeneratorDataset."""
    def __init__(self, input_list):
        self.input_list = input_list
    def __getitem__(self, item):
        return np.array(self.input_list[item][0], dtype=np.int32), np.array(self.input_list[item][1], dtype=np.int32)
    def __len__(self):
        return len(self.input_list)
class DataProcessor:
    """
    Base class for dataset preprocessing; subclasses are expected to set
    self.train, self.test, self.Vocab and self.doConvert.
    """
    def get_dict_len(self):
        """
        Get the number of distinct words in the whole dataset;
        returns -1 if the text has not been vectorized yet.
        """
        if self.doConvert:
            return len(self.Vocab)
        return -1
    def collect_weight(self, glove_path, embed_size):
        """Collect pretrained GloVe vectors for every word in the vocabulary."""
        vocab_size = self.get_dict_len()
        embedding_index = {}
        with open(glove_path, 'r', encoding='utf-8') as f:
            for line in f:
                values = line.split()
                word = values[0]
                vec = np.array(values[1:], dtype='float32')
                embedding_index[word] = vec
        weight_np = np.zeros((vocab_size, embed_size)).astype(np.float32)
        for word, vec in embedding_index.items():
            try:
                index = self.Vocab[word]
            except KeyError:
                # vocabulary words absent from GloVe keep an all-zero row
                continue
            weight_np[index, :] = vec
        return weight_np
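    # A worked illustration of collect_weight (hypothetical numbers, not read
    # from a real GloVe file): each line of the file is "<word> <v1> ... <vN>",
    # e.g. "the 0.418 0.24968 -0.41242 ...". If self.Vocab maps 'the' -> 7,
    # row 7 of weight_np becomes that vector, while vocabulary words missing
    # from GloVe keep an all-zero row. The matrix that create_train_dataset
    # dumps to ./weight.txt can be restored with np.loadtxt('./weight.txt'),
    # presumably to initialize the embedding table in train.py.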
    def create_train_dataset(self, epoch_size, batch_size, collect_weight=False, glove_path='', embed_size=50):
        """Create the training dataset; optionally dump GloVe weights to ./weight.txt."""
        # epoch_size is unused here; epoch iteration is driven by the caller
        if collect_weight:
            weight_np = self.collect_weight(glove_path, embed_size)
            np.savetxt('./weight.txt', weight_np)
        dataset = ds.GeneratorDataset(source=Generator(input_list=self.train),
                                      column_names=["data", "label"], shuffle=False)
        dataset = dataset.batch(batch_size=batch_size, drop_remainder=True)
        return dataset
def create_test_dataset(self, batch_size):
dataset = ds.GeneratorDataset(source=Generator(input_list=self.test),
column_names=["data", "label"], shuffle=False)
dataset = dataset.batch(batch_size=batch_size, drop_remainder=True)
return dataset
class MovieReview(DataProcessor):
"""
preprocess MovieReview dataset
"""
    def __init__(self, root_dir, maxlen, split):
        """
        input:
            root_dir: root directory of the MR dataset
            maxlen: maximum sentence length kept after vectorization
            split: ratio of the training set to the whole dataset
        """
self.path = root_dir
self.feelMap = {
'neg': 0,
'pos': 1
}
self.files = []
self.doConvert = False
mypath = Path(self.path)
        if not mypath.exists() or not mypath.is_dir():
            raise ValueError("root_dir {} does not exist or is not a directory".format(self.path))
        # collect the files in the top level of root_dir only
        for root, _, filenames in os.walk(self.path):
            for each in filenames:
                self.files.append(os.path.join(root, each))
            break
        # the MR dataset should contain exactly two files (pos / neg)
        if len(self.files) != 2:
            raise ValueError("expected 2 files in root_dir, found {}".format(len(self.files)))
# begin to read data
self.word_num = 0
        self.minlen = float("inf")
        self.maxlen = float("-inf")
self.Pos = []
self.Neg = []
        for filename in self.files:
            # re-encode the raw files from Latin-1 to utf-8 in place
            with codecs.open(filename, 'r', 'Latin1') as f:
                ff = f.read()
            with codecs.open(filename, 'w', 'utf-8') as file_object:
                file_object.write(ff)
            self.read_data(filename)
self.PosNeg = self.Pos + self.Neg
self.text2vec(maxlen=maxlen)
self.split_dataset(split=split)
    def read_data(self, filePath):
        """
        Read text into memory.
        input:
            filePath: path of the file to read
        """
        with open(filePath, 'r', encoding='utf-8') as f:
            for sentence in f.readlines():
                # collapse the original chain of str.replace calls: strip
                # quotes, punctuation, digits and the literal '<b>' tag
                sentence = re.sub(r"[\"'.,\[\]():\-\\0-9`=$/*;%\n]", '', sentence)
                sentence = sentence.replace('<b>', '')
                sentence = sentence.split(' ')
                sentence = list(filter(lambda x: x, sentence))
                if sentence:
                    self.word_num += len(sentence)
                    self.maxlen = max(self.maxlen, len(sentence))
                    self.minlen = min(self.minlen, len(sentence))
if 'pos' in filePath:
self.Pos.append([sentence, self.feelMap['pos']])
else:
self.Neg.append([sentence, self.feelMap['neg']])
    def text2vec(self, maxlen):
        """
        Convert each sentence into a fixed-length vector of word ids.
        input:
            maxlen: max length of the sentence
        """
        # Vocab = {word: index}; positions past the end of a short sentence
        # stay 0, which is also the id assigned to the first vocabulary word
        self.Vocab = dict()
        for SentenceLabel in self.Pos+self.Neg:
            vector = [0]*maxlen
            for index, word in enumerate(SentenceLabel[0]):
                if index >= maxlen:
                    break
                if word not in self.Vocab:
                    self.Vocab[word] = len(self.Vocab)
                vector[index] = self.Vocab[word]
            SentenceLabel[0] = vector
        self.doConvert = True
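    # A tiny worked example of text2vec (hypothetical sentences, maxlen=4):
    #   ['a', 'good', 'movie'] -> Vocab {'a': 0, 'good': 1, 'movie': 2}
    #                          -> vector [0, 1, 2, 0] (trailing 0 is padding)
    #   ['a', 'bad', 'movie']  -> Vocab gains {'bad': 3}
    #                          -> vector [0, 3, 2, 0]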
    def split_dataset(self, split):
        """
        Split the dataset into a training set and a test set.
        input:
            split: ratio of the training set to the whole dataset
        """
        trunk_pos_size = math.ceil((1-split)*len(self.Pos))
        trunk_neg_size = math.ceil((1-split)*len(self.Neg))
        trunk_num = int(1/(1-split))
        pos_temp = list()
        neg_temp = list()
        for index in range(trunk_num):
            pos_temp.append(self.Pos[index*trunk_pos_size:(index+1)*trunk_pos_size])
            neg_temp.append(self.Neg[index*trunk_neg_size:(index+1)*trunk_neg_size])
        # the trunk at index 2 is held out as the test set
        self.test = pos_temp.pop(2)+neg_temp.pop(2)
        self.train = [i for item in pos_temp+neg_temp for i in item]
        random.shuffle(self.train)
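    # Worked example with assumed sizes (not the real MR counts): with
    # split=0.9 and 1000 positive reviews, trunk_pos_size = ceil(0.1*1000) =
    # 100 and trunk_num = int(1/0.1) = 10, so self.Pos is cut into 10 trunks
    # of 100 sentences; the trunk at index 2 joins the test set and the other
    # nine are flattened and shuffled into the training set.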
class Subjectivity(DataProcessor):
"""
preprocess Subjectivity dataset
"""
    def __init__(self, root_dir, maxlen, split):
        """
        input:
            root_dir: root directory of the Subjectivity dataset
            maxlen: maximum sentence length kept after vectorization
            split: ratio of the training set to the whole dataset
        """
        self.path = root_dir
self.feelMap = {
'neg': 0,
'pos': 1
}
self.files = []
self.doConvert = False
mypath = Path(self.path)
        if not mypath.exists() or not mypath.is_dir():
            raise ValueError("root_dir {} does not exist or is not a directory".format(self.path))
        # collect the files in the top level of root_dir only
        for root, _, filenames in os.walk(self.path):
            for each in filenames:
                self.files.append(os.path.join(root, each))
            break
# begin to read data
self.word_num = 0
        self.minlen = float("inf")
        self.maxlen = float("-inf")
self.Pos = []
self.Neg = []
for filename in self.files:
self.read_data(filename)
self.PosNeg = self.Pos + self.Neg
self.text2vec(maxlen=maxlen)
self.split_dataset(split=split)
    def read_data(self, filePath):
        """
        Read text into memory.
        input:
            filePath: path of the file to read
        """
        with open(filePath, 'r', encoding="ISO-8859-1") as f:
            for sentence in f.readlines():
                # collapse the original chain of str.replace calls: strip
                # quotes, punctuation, digits and the literal '<b>' tag
                sentence = re.sub(r"[\"'.,\[\]():\-\\0-9`=$/*;%\n]", '', sentence)
                sentence = sentence.replace('<b>', '')
                sentence = sentence.split(' ')
                sentence = list(filter(lambda x: x, sentence))
                if sentence:
                    self.word_num += len(sentence)
                    self.maxlen = max(self.maxlen, len(sentence))
                    self.minlen = min(self.minlen, len(sentence))
if 'quote' in filePath:
self.Pos.append([sentence, self.feelMap['pos']])
elif 'plot' in filePath:
self.Neg.append([sentence, self.feelMap['neg']])
    def text2vec(self, maxlen):
        """
        Convert each sentence into a fixed-length vector of word ids.
        input:
            maxlen: max length of the sentence
        """
        # Vocab = {word: index}; positions past the end of a short sentence
        # stay 0, which is also the id assigned to the first vocabulary word
        self.Vocab = dict()
        for SentenceLabel in self.Pos+self.Neg:
            vector = [0]*maxlen
            for index, word in enumerate(SentenceLabel[0]):
                if index >= maxlen:
                    break
                if word not in self.Vocab:
                    self.Vocab[word] = len(self.Vocab)
                vector[index] = self.Vocab[word]
            SentenceLabel[0] = vector
        self.doConvert = True
    def split_dataset(self, split):
        """
        Split the dataset into a training set and a test set.
        input:
            split: ratio of the training set to the whole dataset
        """
        trunk_pos_size = math.ceil((1-split)*len(self.Pos))
        trunk_neg_size = math.ceil((1-split)*len(self.Neg))
        trunk_num = int(1/(1-split))
        pos_temp = list()
        neg_temp = list()
        for index in range(trunk_num):
            pos_temp.append(self.Pos[index*trunk_pos_size:(index+1)*trunk_pos_size])
            neg_temp.append(self.Neg[index*trunk_neg_size:(index+1)*trunk_neg_size])
        # the trunk at index 2 is held out as the test set
        self.test = pos_temp.pop(2)+neg_temp.pop(2)
        self.train = [i for item in pos_temp+neg_temp for i in item]
        random.shuffle(self.train)
class SST2(DataProcessor):
"""
preprocess SST2 dataset
"""
    def __init__(self, root_dir, maxlen, split):
        """
        input:
            root_dir: root directory of the SST2 dataset
            maxlen: maximum sentence length kept after vectorization
            split: unused; SST2 already comes split into train and dev files
        """
        self.path = root_dir
self.files = []
self.train = []
self.test = []
self.doConvert = False
mypath = Path(self.path)
        if not mypath.exists() or not mypath.is_dir():
            raise ValueError("root_dir {} does not exist or is not a directory".format(self.path))
        # collect the files in the top level of root_dir only
        for root, _, filenames in os.walk(self.path):
            for each in filenames:
                self.files.append(os.path.join(root, each))
            break
# begin to read data
self.word_num = 0
        self.minlen = float("inf")
        self.maxlen = float("-inf")
        for filename in self.files:
            if 'train' in filename or 'dev' in filename:
                # re-encode the train/dev files to utf-8 in place
                with codecs.open(filename, 'r') as f:
                    ff = f.read()
                with codecs.open(filename, 'w', 'utf-8') as file_object:
                    file_object.write(ff)
                self.read_data(filename)
self.text2vec(maxlen=maxlen)
self.split_dataset(split=split)
    def read_data(self, filePath):
        """
        Read text into memory.
        input:
            filePath: path of the file to read
        """
        df = pd.read_csv(filePath, delimiter='\t')
        for sentence, label in zip(df['sentence'], df['label']):
            # collapse the original chain of str.replace calls: strip
            # quotes, punctuation, digits and the literal '<b>' tag
            sentence = re.sub(r"[\"'.,\[\]():\-\\0-9`=$/*;%\n]", '', sentence)
            sentence = sentence.replace('<b>', '')
            sentence = sentence.split(' ')
            sentence = list(filter(lambda x: x, sentence))
            if sentence:
                self.word_num += len(sentence)
                self.maxlen = max(self.maxlen, len(sentence))
                self.minlen = min(self.minlen, len(sentence))
if 'train' in filePath:
self.train.append([sentence, label])
elif 'dev' in filePath:
self.test.append([sentence, label])
    def text2vec(self, maxlen):
        """
        Convert each sentence into a fixed-length vector of word ids.
        input:
            maxlen: max length of the sentence
        """
        # Vocab = {word: index}; positions past the end of a short sentence
        # stay 0, which is also the id assigned to the first vocabulary word
        self.Vocab = dict()
        for SentenceLabel in self.train+self.test:
            vector = [0]*maxlen
            for index, word in enumerate(SentenceLabel[0]):
                if index >= maxlen:
                    break
                if word not in self.Vocab:
                    self.Vocab[word] = len(self.Vocab)
                vector[index] = self.Vocab[word]
            SentenceLabel[0] = vector
        self.doConvert = True
    def split_dataset(self, split):
        """
        SST2 already comes split into train and dev files, so no extra split
        is performed here; the training set is only shuffled. The split
        argument is kept for interface compatibility.
        """
        random.shuffle(self.train)
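# A minimal usage sketch (paths and hyper-parameters are assumed, not taken
# from the accompanying train.py):
#
#   instance = MovieReview(root_dir='./data/rt-polaritydata', maxlen=51, split=0.9)
#   train_ds = instance.create_train_dataset(epoch_size=1, batch_size=64)
#   test_ds = instance.create_test_dataset(batch_size=64)
#   vocab_size = instance.get_dict_len()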