# Recommend-System
**Repository Path**: kk_helloo/Recommend-System
## Basic Information
- **Project Name**: Recommend-System
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 0
- **Created**: 2022-04-25
- **Last Updated**: 2025-06-08
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Movie Recommendation System Based on a Big Data Platform
```bash
# First, place the installation packages of the matching versions under ./hs_docker/tools
# Then run the following commands
# Clone and enter the project directory
git clone https://gitee.com/kk_helloo/Recommend-System.git
cd Recommend-System
# Make the script executable
chmod -R 777 ./pre.sh
# pre.sh uses Docker Compose to deploy the Hadoop + Spark cluster and the log collection system
./pre.sh
# Start the Hadoop + Spark cluster
./start-hs.sh
# Visit http://ip:8088, http://ip:50070 and http://ip:8080
# Save u.data and u.item to HDFS
./savefile.sh
# Train the model
./train.sh
# Test the model
docker exec -it hadoop-node1 spark-submit --master spark://hadoop-node1:7077 /usr/local/predict.py --U 123
# Start the log collection services
# docker exec -it log-system bash
# zkServer.sh start
# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# flume-ng agent -c $FLUME_HOME/conf/ -f $FLUME_HOME/conf/log-kafka.properties -n pro -Dflume.root.logger=INFO,console
```
# Architecture

- The ml-100k dataset is stored on HDFS, and Spark reads the files from HDFS
- A recommendation model trained with ALS on Spark Standalone is saved to HDFS
- The model is loaded from HDFS to produce recommendations
- Flume monitors a log file and pushes new log entries to Kafka
- Spark Streaming consumes from Kafka
# Environment Setup
Each image is built from a Dockerfile containing the instructions and notes needed to build it.
All cluster nodes are based on CentOS:
```docker
FROM centos:7
```
## Hadoop ➕ Spark Cluster
One primary machine, the master, acts as the NameNode in HDFS and the ResourceManager in MapReduce2 (YARN).
The other machines act as DataNodes in HDFS and NodeManagers in MapReduce2 (YARN).

### Install the JDK
Hadoop is written in Java, so a Java environment must be installed first.
```docker
ADD ./tools/jdk-8u212-linux-x64.tar.gz /usr/local/
ENV JAVA_HOME /usr/local/jdk1.8.0_212/
ENV CLASSPATH $JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
ENV PATH $JAVA_HOME/bin:$PATH
```
### Configure Passwordless SSH Login
The cluster nodes must connect to the local machine and to each other over SSH. To let the system run without manually entering passwords, SSH is configured for passwordless login, i.e. authentication with pre-exchanged SSH keys.
```docker
# Install required packages
RUN yum -y install net-tools
RUN yum -y install which
RUN yum -y install openssh-server openssh-clients
RUN yum -y install sudo
RUN yum clean all
# Configure passwordless SSH login
RUN ssh-keygen -q -t rsa -b 2048 -f /etc/ssh/ssh_host_rsa_key -N ''
RUN ssh-keygen -q -t ecdsa -f /etc/ssh/ssh_host_ecdsa_key -N ''
RUN ssh-keygen -q -t ed25519 -f /etc/ssh/ssh_host_ed25519_key -N ''
RUN ssh-keygen -f /root/.ssh/id_rsa -N ''
RUN touch /root/.ssh/authorized_keys
RUN cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys
RUN echo "root:ss123456" | chpasswd
COPY ./configs/ssh_config /etc/ssh/ssh_config
```
### Install Hadoop and Set Environment Variables
Install Hadoop under /usr/local:
```docker
# Add Hadoop and set environment variables
ADD ./tools/hadoop-2.8.5.tar.gz /usr/local
ENV HADOOP_HOME /usr/local/hadoop-2.8.5
ENV PATH $HADOOP_HOME/bin:$PATH
```
### Modify the Hadoop Configuration Files
- Edit hadoop-env.sh, the Hadoop environment file, where the Java installation path must be set
```bash
export JAVA_HOME=/usr/local/jdk1.8.0_212
```
- Edit core-site.xml to set the default HDFS name and the data file storage location
```xml
<configuration>
  <property><name>fs.defaultFS</name><value>hdfs://hadoop-node1:9000/</value></property>
  <property><name>hadoop.tmp.dir</name><value>file:/data/hadoop/tmp</value></property>
</configuration>
```
- Edit yarn-site.xml, which holds the MapReduce2 (YARN) related settings
```xml
<configuration>
  <property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
</configuration>
```
- Edit mapred-site.xml to configure how Map and Reduce jobs are tracked (JobTracker task assignment and TaskTracker task execution); Hadoop ships a template that can be copied and modified
```xml
<configuration>
  <property><name>mapreduce.framework.name</name><value>yarn</value></property>
</configuration>
```
- Edit hdfs-site.xml to configure the HDFS distributed file system
```xml
<configuration>
  <property><name>dfs.replication</name><value>2</value></property>
  <property><name>dfs.permissions.enabled</name><value>false</value></property>
  <property><name>dfs.namenode.name.dir</name><value>file:/data/hadoop/dfs/name</value></property>
  <property><name>dfs.datanode.data.dir</name><value>file:/data/hadoop/dfs/data</value></property>
  <property><name>dfs.webhdfs.enabled</name><value>true</value></property>
</configuration>
```
- Edit the masters file to tell the system which server is the NameNode
```docker
hadoop-node1
```
- Edit the slaves file to tell the system which servers are DataNodes
```docker
hadoop-node2
hadoop-node3
```
- Copy the relevant configuration files into the image
```docker
COPY ./configs/hadoop-env.sh $HADOOP_HOME/etc/hadoop/hadoop-env.sh
COPY ./configs/hdfs-site.xml $HADOOP_HOME/etc/hadoop/hdfs-site.xml
COPY ./configs/core-site.xml $HADOOP_HOME/etc/hadoop/core-site.xml
COPY ./configs/yarn-site.xml $HADOOP_HOME/etc/hadoop/yarn-site.xml
COPY ./configs/mapred-site.xml $HADOOP_HOME/etc/hadoop/mapred-site.xml
COPY ./configs/master $HADOOP_HOME/etc/hadoop/master
COPY ./configs/slaves $HADOOP_HOME/etc/hadoop/slaves
COPY ./script/start-hadoop.sh $HADOOP_HOME/start-hadoop.sh
COPY ./script/restart-hadoop.sh $HADOOP_HOME/restart-hadoop.sh
```
### Create and Format the HDFS Directories
```docker
RUN mkdir -p /data/hadoop/dfs/data && \
mkdir -p /data/hadoop/dfs/name && \
mkdir -p /data/hadoop/tmp
```
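These RUN instructions only create the local directories that back the NameNode and DataNode storage. Before the first start, the NameNode itself still has to be formatted; the README does not show this step, so the command below is an assumption (presumably handled inside start-hadoop.sh):

```bash
# Format the NameNode once, before the first HDFS start (assumed step, run on hadoop-node1)
hdfs namenode -format
```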
### Install Scala
Spark is written in Scala, so Scala must be installed first.
```docker
# Add Scala and set environment variables
ADD ./tools/scala-2.12.12.tgz /usr/local
ENV SCALA_HOME /usr/local/scala-2.12.12
ENV PATH $SCALA_HOME/bin:$PATH
```
### Install Spark and Set Environment Variables
Install Spark under /usr/local:
```docker
# Add Spark and set environment variables
ADD ./tools/spark-3.0.1-bin-hadoop2.7.tgz /usr/local
ENV SPARK_HOME /usr/local/spark-3.0.1-bin-hadoop2.7
ENV PATH $SPARK_HOME/bin:$PATH
```
### Modify the Spark Configuration Files
- Edit spark-env.sh, the Spark environment file, where the Java installation path must be set
```bash
export JAVA_HOME=/usr/local/jdk1.8.0_212/
```
- Edit the slaves file to list the servers in the Spark Standalone cluster
```docker
hadoop-node2
hadoop-node3
```
- Copy the relevant configuration files into the image
```docker
COPY ./configs/spark-env.sh $SPARK_HOME/conf/spark-env.sh
COPY ./configs/slaves $SPARK_HOME/conf/slaves
```
### Install numpy
The recommendation code is developed with PySpark and needs the numpy package.
```docker
# Add pip and install numpy
ADD ./tools/setuptools-12.0.3.tar.gz /usr/local/
RUN python /usr/local/setuptools-12.0.3/setup.py install
ADD ./tools/pip-20.2.4.tar.gz /usr/local/
WORKDIR /usr/local/pip-20.2.4/
RUN sudo python setup.py install
RUN pip install numpy -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com
```
---
## Deploying the Cluster with Docker Compose
```yaml
version: "3.8"
services:
  hadoop-node1:
    restart: always
    build: ./hs_docker/
    container_name: hadoop-node1
    hostname: hadoop-node1
    privileged: true
    ports:
      - 8080:8080
      - 7077:7077
      - 8088:8088
      - 8032:8032
      - 18080:18080
      - 19888:19888
      - 50070:50070
      - 8888:8888
      - 9000:9000
      - 1100:11000
      - 50030:50030
      - 8050:8050
      - 8900:8900
  hadoop-node2:
    restart: always
    build: ./hs_docker/
    container_name: hadoop-node2
    hostname: hadoop-node2
    privileged: true
    ports:
      - 8042:8042
      - 50010:50010
      - 50200:50020
    depends_on:
      - hadoop-node1
  hadoop-node3:
    restart: always
    build: ./hs_docker/
    container_name: hadoop-node3
    hostname: hadoop-node3
    privileged: true
    ports:
      - 18042:8042
      - 50011:50011
      - 50021:50021
    depends_on:
      - hadoop-node1
```
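With the Dockerfile and this compose file in place, pre.sh presumably builds and starts the three containers with something along these lines (an assumption; the script itself is not shown in the README):

```bash
# Build the images and start the cluster in the background
docker compose up -d --build    # or: docker-compose up -d --build
```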
### Start Hadoop
Enter the master container (hadoop-node1) and run the following from the Hadoop installation directory.
- Start HDFS
```bash
./sbin/start-dfs.sh
```
- Start YARN
```bash
./sbin/start-yarn.sh
```
- Start HDFS and YARN together
```bash
./sbin/start-all.sh
```
### Open the Hadoop Web UI
http://ip:50070

### Start Spark
Enter the master container (hadoop-node1) and run the following from the Spark installation directory:
```bash
./sbin/start-all.sh
```
The equivalent individual commands are:
```bash
./sbin/start-master.sh
./sbin/start-slaves.sh spark://hadoop-node1:7077
```
### Open the Spark Web UI
http://ip:8080

### Connectivity Test
- Upload a file to HDFS
```bash
docker exec -it hadoop-node1 /bin/bash
hdfs dfs -mkdir /input
hdfs dfs -put /usr/local/ml-100k/u.data /input
```

- Read the file with spark-shell
```bash
spark-shell
# spark-shell --master spark://hadoop-node1:7077
# For reasons I could not identify or solve (probably a resource-allocation problem), the cluster mode would not start, so this was only tested locally
scala> val data = sc.textFile("hdfs://hadoop-node1:9000/input/u.data")
scala> data.first()
```

## Flume ➕ Kafka Log Collection System
A separate server (container) hosts the log collection system. The instructions and notes in its Dockerfile are as follows:
```docker
FROM centos:7
```
### Install the JDK
```docker
ADD ./tools/jdk-8u212-linux-x64.tar.gz /usr/local/
ENV JAVA_HOME /usr/local/jdk1.8.0_212/
ENV CLASSPATH $JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
ENV PATH $JAVA_HOME/bin:$PATH
```
### Install ZooKeeper and Set Environment Variables
Early versions of Kafka relied on ZooKeeper to store metadata such as consumer state, group management, and offsets, so running Kafka requires ZooKeeper; newer Kafka versions have reduced this dependency.
```docker
ADD ./tools/apache-zookeeper-3.6.3-bin.tar.gz /usr/local
ENV ZOOKEEPER_HOME /usr/local/apache-zookeeper-3.6.3-bin
ENV PATH $ZOOKEEPER_HOME/bin:$PATH
```
### Modify the ZooKeeper Configuration File
- Copy the conf/zoo_sample.cfg template to zoo.cfg and change the dataDir path
```properties
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/local/apache-zookeeper-3.6.3-bin/tmp
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
```
- Copy the configuration file into the image
```docker
COPY ./configs/zoo.cfg $ZOOKEEPER_HOME/conf/zoo.cfg
```
### Install Kafka and Set Environment Variables
```docker
# Add Kafka and set environment variables
ADD ./tools/kafka_2.13-3.1.0.tgz /usr/local
ENV KAFKA_HOME /usr/local/kafka_2.13-3.1.0
ENV PATH $KAFKA_HOME/bin:$PATH
```
### Modify the Kafka Configuration File
- Modify the following entries in server.properties
```properties
listeners=PLAINTEXT://:9092
# log-system is this machine's IP/hostname
advertised.listeners=PLAINTEXT://log-system:9092
zookeeper.connect=localhost:2181
```
- Copy the configuration file into the image
```docker
COPY ./configs/server.properties $KAFKA_HOME/config/server.properties
```
### Install Flume and Set Environment Variables
```docker
# Add Flume and set environment variables
ADD ./tools/apache-flume-1.9.0-bin.tar.gz /usr/local
ENV FLUME_HOME /usr/local/apache-flume-1.9.0-bin
ENV PATH $FLUME_HOME/bin:$PATH
```
### Modify the Flume Configuration File
```properties
pro.sources = s1
pro.channels = c1
pro.sinks = k1
# Use the output of a command as the source:
# continuously monitor the stdout.log file
pro.sources.s1.type = exec
pro.sources.s1.command = tail -F /data/flume/log/stdout.log
pro.channels.c1.type = memory
pro.channels.c1.capacity = 1000
pro.channels.c1.transactionCapacity = 100
pro.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka topic the events are sent to
pro.sinks.k1.kafka.topic = quickstart-events
# log-system is the hostname
pro.sinks.k1.kafka.bootstrap.servers = log-system:9092
pro.sinks.k1.kafka.flumeBatchSize = 20
pro.sinks.k1.kafka.producer.acks = 1
pro.sinks.k1.kafka.producer.linger.ms = 1
pro.sinks.k1.kafka.producer.compression.type = snappy
pro.sources.s1.channels = c1
pro.sinks.k1.channel = c1
```
- Copy the configuration file into the image
```docker
COPY ./configs/log-kafka.properties $FLUME_HOME/conf/log-kafka.properties
```
## Docker Compose Deployment
Add the following service to the docker-compose file above:
```yaml
  log-system:
    restart: always
    build: ./log_docker/
    container_name: log-system
    hostname: log-system
    privileged: true
    ports:
      - 2181:2181
      - 9092:9092
```
### Start ZooKeeper
```bash
zkServer.sh start
```

### Start Kafka
```bash
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
```

### Test Kafka
- Create a topic
- Write to the topic
- Read from the topic (commands sketched below)
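The README leaves these steps without commands; the sketch below uses the standard Kafka CLI scripts shipped with kafka_2.13-3.1.0 and the quickstart-events topic configured for Flume above:

```bash
# Create the topic (run inside the log-system container)
kafka-topics.sh --create --topic quickstart-events --bootstrap-server log-system:9092
# Write a few test messages to the topic (Ctrl-C to stop)
kafka-console-producer.sh --topic quickstart-events --bootstrap-server log-system:9092
# Read the messages back from the beginning of the topic
kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server log-system:9092
```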

### Start Flume
```bash
flume-ng agent -c $FLUME_HOME/conf/ -f $FLUME_HOME/conf/log-kafka.properties -n pro -Dflume.root.logger=INFO,console
```

### Test Flume
- Write content to the monitored file
- Consume it from Kafka (sketched below)
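A sketch of this test, assuming the file path and topic from log-kafka.properties above:

```bash
# Append a line to the file that Flume tails (inside the log-system container)
echo "test log line $(date)" >> /data/flume/log/stdout.log
# In another shell, verify that the event reached Kafka
kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server log-system:9092
```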

### Spark Streaming Consuming Kafka
```scala
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(3))
val kafkaPara: Map[String, Object] = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "log-system:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "test1",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Set("quickstart-events"), kafkaPara)
)
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
valueDStream.print()
ssc.start()
ssc.awaitTermination()
```
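The snippet above is meant for the Scala spark-shell. The Kafka 0.10 integration is not bundled with the Spark distribution, so the shell presumably has to be started with the matching package (an assumption; the README does not show this step):

```bash
# Pull the Spark Streaming Kafka integration matching Spark 3.0.1 / Scala 2.12
spark-shell --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1
```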

# Data Collection
## ml-100k Recommendation Data
MovieLens is a recommender-system and virtual-community website whose main feature is recommending movies to its members with collaborative filtering. The site offers MovieLens datasets of several sizes:

http://grouplens.org/datasets/movielens/
After downloading and extracting the ml-100k dataset, two data files are mainly used:
- u.data: user rating data with four fields: user id, item id, rating, timestamp
- u.item: movie data, e.g. movie id and movie title

Store both data files in HDFS (a sketch follows below).
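savefile.sh is not listed in the README; a minimal sketch of what it presumably does, reusing the /input directory from the connectivity test:

```bash
# Copy the two ml-100k files from the master container into HDFS (assumed contents of savefile.sh)
docker exec -it hadoop-node1 hdfs dfs -mkdir -p /input
docker exec -it hadoop-node1 hdfs dfs -put /usr/local/ml-100k/u.data /input
docker exec -it hadoop-node1 hdfs dfs -put /usr/local/ml-100k/u.item /input
```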
## Log Collection (Not Automated)
Flume monitors files under a specified directory and pushes whatever it detects to the Kafka queue.
# Algorithm Development (Offline Recommendation)

Training uses `ALS.train`.
- Import the required libraries
```python
from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS, Rating
```
- Main program code
- Convert the raw data into the `RDD[Rating]` format that `ALS` expects
```python
def prepare_data(spark_context):
    # ------------ read data -------------
    # raw_user_data = spark_context.textFile("file:/usr/local/ml-100k/u.data")
    raw_user_data = spark_context.textFile("hdfs://hadoop-node1:9000/input/u.data")
    raw_ratings = raw_user_data.map(lambda line: line.split("\t")[:3])
    ratings_rdd = raw_ratings.map(lambda x: Rating(int(x[0]), int(x[1]), float(x[2])))
    return ratings_rdd
```
- Train with `ALS.train`
- After training, save the resulting `MatrixFactorizationModel` recommendation model
```python
def save_mode(spark_context, model):
    try:
        model.save(spark_context, "usr/local/datas/als-model")
    except Exception as e:
        print("Error")
        print(str(e))
```
---
```python
if __name__ == "__main__":
    sc = create_spark_context()
    print("================== preparing data ===================")
    rating_rdd = prepare_data(sc)
    print("================== model training ===================")
    # ALS: rank=5, iterations=5, lambda=0.1
    als_model = ALS.train(rating_rdd, 5, iterations=5, lambda_=0.1)
    print("================== model saving ===================")
    save_mode(sc, als_model)
    sc.stop()
```
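Both als.py and predict.py call a `create_spark_context()` helper that the README does not show; a minimal sketch of it, assuming nothing beyond the standard PySpark API:

```python
def create_spark_context():
    # Assumed helper: build a SparkContext for the scripts above
    conf = SparkConf().setAppName("Recommend-System").set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")
    return sc
```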
## Recommending with the Model
- Main program code
- Build a dictionary mapping movie IDs to titles
```python
def prepare_data(spark_context):
    # item_rdd = spark_context.textFile("file:/usr/local/ml-100k/u.item")
    item_rdd = spark_context.textFile("hdfs://hadoop-node1:9000/input/u.item")
    movie_title = item_rdd.map(lambda line: line.split("|")) \
        .map(lambda a: (float(a[0]), a[1]))
    movie_title_dict = movie_title.collectAsMap()
    return movie_title_dict
```
- Load the saved recommendation model
```python
def load_model(spark_context):
    try:
        model = MatrixFactorizationModel.load(spark_context, 'hdfs://hadoop-node1:9000/datas/als-model')
        print(model)
        return model
    except Exception:
        print("Error Loading")
```
- Use the model to make recommendations
```python
def recommend_movies(als, movies, user_id):
    rmd_movies = als.recommendProducts(user_id, 10)
    print('recommend movies:{}'.format(rmd_movies))
    for rmd in rmd_movies:
        print("for user {} recommend movie:{}".format(rmd[0], movies[rmd[1]]))
    return rmd_movies


def recommend_users(als, movies, movie_id):
    rmd_users = als.recommendUsers(movie_id, 10)
    # print('for ID:{0},movie:{1},user:'.format(movie_id, movies[movie_id]))
    print('for ID:{},movie:{},user:'.format(movie_id, movies[movie_id]))
    for rmd in rmd_users:
        print("ID:{},rating:{}".format(rmd[0], rmd[2]))


def recommend(als_model, movie_dic):
    if sys.argv[1] == '--U':
        recommend_movies(als_model, movie_dic, int(sys.argv[2]))
    if sys.argv[1] == '--M':
        recommend_users(als_model, movie_dic, int(sys.argv[2]))
```
---
```python
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("please input parameters: --U user_id or --M movie_id")
        exit(-1)
    sc = create_spark_context()
    print('============= preparing data =============')
    movie_title_dic = prepare_data(sc)
    print('============= loading model =============')
    als_load_model = load_model(sc)
    print('============= recommend =============')
    recommend(als_load_model, movie_title_dic)
```
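predict.py also needs its imports, which the README omits; a likely set given the functions above (an assumption):

```python
import sys

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import MatrixFactorizationModel
```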
## Running the Code on Spark Standalone
Run the following in any of the containers:
```bash
spark-submit --master spark://hadoop-node1:7077 /usr/local/als.py
# The training script als.py is stored under /usr/local
```



```bash
spark-submit --master spark://hadoop-node1:7077 /usr/local/predict.py --U 197
# The prediction script predict.py is stored under /usr/local
```


