Real-time model updating is one of the important technical indicators of a recommendation system, and online learning can effectively improve how quickly the recommendation model reflects new data.
Key differences between online learning and offline training (summarized here, since they motivate the design below):

- Data source: online learning consumes an unbounded stream of real-time data, while offline training uses a static dataset.
- Training mode: online learning updates the model incrementally as data arrives, while offline training iterates over the full dataset in epochs.
- Model delivery: online learning exports the model continuously (e.g., periodic checkpoints) so that serving can pick up fresh parameters, while offline training exports once after training completes.
The user's streaming training data is pushed to Kafka. MindSpore Pandas reads the data from Kafka, applies feature-engineering transformations, and writes the result to the feature storage engine. MindData then reads from the storage engine to supply training data, and MindSpore, running as a resident service, continuously receives data and trains on it. The overall process is shown in the following figure:
The example depends on the following package versions:

```
mindpandas v0.1.0
mindspore_rec v0.2.0
kafka-python v2.0.2
```
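The pipeline described above can be sketched end to end with an in-memory queue standing in for Kafka and a plain list standing in for the feature storage engine. Everything in this snippet (record fields, the `transform` function) is an illustrative assumption; the real system uses Kafka, MindSpore Pandas, and MindData:

```python
import queue

# Stand-in for Kafka: raw interaction records arrive on a stream.
stream = queue.Queue()
for raw in [{"user": "u1", "age": 23, "clicked": 1},
            {"user": "u2", "age": 31, "clicked": 0}]:
    stream.put(raw)

# Stand-in for the feature-engineering step done by MindSpore Pandas:
# turn a raw record into (features, label).
def transform(raw):
    return {"features": [hash(raw["user"]) % 1000, raw["age"] / 100.0],
            "label": raw["clicked"]}

# Stand-in for the feature storage engine that MindData reads from.
feature_store = []
while not stream.empty():
    feature_store.append(transform(stream.get()))

# The resident training service would now consume feature_store in batches.
print(len(feature_store))  # → 2
```

The point is the decoupling: the producer side only writes raw records, and the training side only reads transformed features, so either side can restart without losing the other.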
The following walks through online learning with Wide&Deep trained on the Criteo dataset. The sample code is located at Online Learning.
MindSpore Recommender provides a specialized model class RecModel for online learning. Combined with MindPandas and the real-time data source Kafka for data reading and feature processing, it implements a simple online learning process.
First, define a custom dataset for real-time data processing. The constructor parameter receiver is of type DataReceiver in MindPandas and receives real-time data; __getitem__ reads one sample at a time.
```python
import numpy as np

class StreamingDataset:
    def __init__(self, receiver):
        self.data_ = []
        self.receiver_ = receiver

    def __getitem__(self, item):
        # Block until a new batch of rows arrives from MindPandas.
        while not self.data_:
            data = self.receiver_.recv()
            if data is not None:
                self.data_ = data.tolist()
        # Consume one buffered row per call.
        last_row = self.data_.pop()
        return (np.array(last_row[0], dtype=np.int32),
                np.array(last_row[1], dtype=np.float32),
                np.array(last_row[2], dtype=np.float32))
```
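A quick way to see the buffering behavior is to drive the class with a mock receiver. MockReceiver below is a hypothetical stand-in for mindpandas.channel.DataReceiver, introduced only for illustration; the class under test is repeated so the snippet runs standalone:

```python
import numpy as np

# StreamingDataset as defined above, repeated here so the snippet is self-contained.
class StreamingDataset:
    def __init__(self, receiver):
        self.data_ = []
        self.receiver_ = receiver

    def __getitem__(self, item):
        while not self.data_:
            data = self.receiver_.recv()
            if data is not None:
                self.data_ = data.tolist()
        last_row = self.data_.pop()
        return (np.array(last_row[0], dtype=np.int32),
                np.array(last_row[1], dtype=np.float32),
                np.array(last_row[2], dtype=np.float32))

# Hypothetical stand-in for DataReceiver: each recv() returns a batch of
# rows, where a row is (ids, weights, label).
class MockReceiver:
    def __init__(self, batches):
        self.batches = list(batches)

    def recv(self):
        return np.array(self.batches.pop(0), dtype=object) if self.batches else None

batch = [([1, 2], [0.5, 0.5], [1.0]), ([3, 4], [0.1, 0.9], [0.0])]
ds = StreamingDataset(MockReceiver([batch]))
ids, weights, label = ds[0]   # pops the last buffered row first
print(ids.tolist(), label.tolist())  # → [3, 4] [0.0]
```

Note that `item` is ignored: samples are handed out in arrival order, which is the natural semantics for a stream rather than an indexed dataset.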
Then wrap the custom dataset above into the online dataset required by RecModel.
```python
from mindpandas.channel import DataReceiver
from mindspore_rec import RecModel as Model

receiver = DataReceiver(address=config.address, namespace=config.namespace,
                        dataset_name=config.dataset_name, shard_id=0)
stream_dataset = StreamingDataset(receiver)

dataset = ds.GeneratorDataset(stream_dataset, column_names=["id", "weight", "label"])
dataset = dataset.batch(config.batch_size)

train_net, _ = GetWideDeepNet(config)
train_net.set_train()

model = Model(train_net)
```
After configuring the export strategy for the model checkpoint, start the online training process.
```python
ckptconfig = CheckpointConfig(save_checkpoint_steps=100, keep_checkpoint_max=5)
ckpoint_cb = ModelCheckpoint(prefix='widedeep_train', directory="./ckpt", config=ckptconfig)

# 'callback' is the loss-monitoring callback defined elsewhere in the sample.
model.online_train(dataset, callbacks=[TimeMonitor(1), callback, ckpoint_cb], dataset_sink_mode=True)
```
The following describes the startup process for each module involved in online learning:
Download and extract Kafka:

```shell
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0
```
To install other versions, please refer to https://archive.apache.org/dist/kafka/.
Start the ZooKeeper service:

```shell
bin/zookeeper-server-start.sh config/zookeeper.properties
```
Open another command terminal and start the Kafka service:

```shell
bin/kafka-server-start.sh config/server.properties
```
Enter the online learning example directory of the recommender repo and start kafka_client. kafka_client needs to be started only once; it uses Kafka to set the number of partitions for the corresponding topic.

```shell
cd recommender/examples/online_learning
python kafka_client.py
```
Start the MindPandas distributed computing engine:

```shell
yrctl start --master --address $MASTER_HOST_IP

# Parameter description:
# --master: indicates that the current host is the master node; non-master nodes do not need the '--master' flag
# --address: IP of the master node
```
The producer simulates an online learning scenario by writing a local Criteo dataset to Kafka for the consumer to use. The current sample uses multiple processes to read two files and write their data to Kafka.

```shell
python producer.py --file1=$CRITEO_DATASET_FILE_PATH --file2=$CRITEO_DATASET_FILE_PATH

# Parameter description:
# --file1: local disk path of a Criteo dataset file
# --file2: local disk path of a Criteo dataset file
# Both files are original Criteo dataset text files. file1 and file2 are processed
# concurrently; they can be the same or different. If they are the same, each
# sample in the file is used twice.
```
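The producer's concurrency pattern can be sketched with threads and an in-memory queue standing in for the Kafka topic (producer.py itself uses multiple processes and a real Kafka broker; this is a simplified illustration):

```python
import queue
import threading

# Stand-in for the Kafka topic.
topic = queue.Queue()

def publish_file(lines):
    # Stand-in for reading one Criteo text file and publishing each sample.
    for line in lines:
        topic.put(line)

file1 = ["sample-a", "sample-b"]
file2 = ["sample-a", "sample-b"]  # same file: each sample ends up used twice

threads = [threading.Thread(target=publish_file, args=(f,)) for f in (file1, file2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(topic.qsize())  # → 4
```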
Start the consumer:

```shell
python consumer.py --num_shards=$DEVICE_NUM --address=$LOCAL_HOST_IP --dataset_name=$DATASET_NAME \
  --max_dict=$PATH_TO_VAL_MAX_DICT --min_dict=$PATH_TO_VAL_MIN_DICT --map_dict=$PATH_TO_CAT_TO_ID_DICT

# Parameter description:
# --num_shards: number of device cards on the training side; set to 1 for single-card training and 8 for 8-card training
# --address: address of the current sender
# --dataset_name: dataset name
# --namespace: channel name
# --max_dict: dictionary of per-column maxima for the dense feature columns
# --min_dict: dictionary of per-column minima for the dense feature columns
# --map_dict: dictionary mapping the sparse feature columns
```
The consumer needs three dataset-related files for feature engineering of the Criteo dataset: all_val_max_dict.pkl, all_val_min_dict.pkl and cat2id_dict.pkl. $PATH_TO_VAL_MAX_DICT, $PATH_TO_VAL_MIN_DICT and $PATH_TO_CAT_TO_ID_DICT are the absolute paths to these files in the environment. How these three .pkl files are produced can be found in process_data.py, which processes the original Criteo dataset to generate them.
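How such dictionaries are built and applied can be sketched on toy data. The column names and the min-max normalization below are illustrative assumptions; the actual logic lives in process_data.py and consumer.py:

```python
# Toy rows: two dense columns (I1, I2) and one sparse/categorical column (C1).
rows = [
    {"I1": 1.0, "I2": 10.0, "C1": "ad_a"},
    {"I1": 3.0, "I2": 30.0, "C1": "ad_b"},
    {"I1": 2.0, "I2": 20.0, "C1": "ad_a"},
]

# Build the three dictionaries (what the .pkl files would contain).
val_max = {c: max(r[c] for r in rows) for c in ("I1", "I2")}
val_min = {c: min(r[c] for r in rows) for c in ("I1", "I2")}
cat2id = {}
for r in rows:
    cat2id.setdefault(r["C1"], len(cat2id))

# Apply them to one incoming row: min-max normalize the dense features,
# map the categorical value to an integer id.
def transform(row):
    dense = [(row[c] - val_min[c]) / (val_max[c] - val_min[c]) for c in ("I1", "I2")]
    sparse = cat2id[row["C1"]]
    return dense, sparse

print(transform({"I1": 2.0, "I2": 20.0, "C1": "ad_b"}))  # → ([0.5, 0.5], 1)
```

Precomputing the dictionaries offline and only applying them online keeps the streaming transform cheap and deterministic across restarts.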
For the yaml used by config, please refer to default_config.yaml.
Single-card training:
```shell
python online_train.py --address=$LOCAL_HOST_IP --dataset_name=criteo

# Parameter description:
# --address: local host IP, required for receiving training data from MindSpore Pandas
# --dataset_name: dataset name, consistent with the consumer module
```
Start multi-card training in MPI mode:

```shell
bash mpirun_dist_online_train.sh [$RANK_SIZE] [$LOCAL_HOST_IP]

# Parameter description:
# RANK_SIZE: number of cards for multi-card training
# LOCAL_HOST_IP: local host IP for MindSpore Pandas to receive training data
```
Start multi-card training with the dynamic networking method:

```shell
bash run_dist_online_train.sh [$WORKER_NUM] [$SCHED_HOST] [$SCHED_PORT] [$LOCAL_HOST_IP]

# Parameter description:
# WORKER_NUM: number of cards for multi-card training
# SCHED_HOST: IP of the Scheduler role required by MindSpore dynamic networking
# SCHED_PORT: port of the Scheduler role required by MindSpore dynamic networking
# LOCAL_HOST_IP: local host IP, required for receiving training data from MindSpore Pandas
```
When training starts successfully, a log like the following is output. epoch and step give the epoch and step number of the current training step; wide_loss and deep_loss are the training loss values of the wide and deep parts of the Wide&Deep network.
```
epoch: 1, step: 1, wide_loss: 0.66100323, deep_loss: 0.72502613
epoch: 1, step: 2, wide_loss: 0.46781272, deep_loss: 0.5293098
epoch: 1, step: 3, wide_loss: 0.363207, deep_loss: 0.42204413
epoch: 1, step: 4, wide_loss: 0.3051032, deep_loss: 0.36126155
epoch: 1, step: 5, wide_loss: 0.24045062, deep_loss: 0.29395688
epoch: 1, step: 6, wide_loss: 0.24296054, deep_loss: 0.29386574
epoch: 1, step: 7, wide_loss: 0.20943595, deep_loss: 0.25780612
epoch: 1, step: 8, wide_loss: 0.19562452, deep_loss: 0.24153553
epoch: 1, step: 9, wide_loss: 0.16500896, deep_loss: 0.20854339
epoch: 1, step: 10, wide_loss: 0.2188702, deep_loss: 0.26011512
epoch: 1, step: 11, wide_loss: 0.14963374, deep_loss: 0.18867904
```