# flink-ai-extended

**Repository Path**: looen/flink-ai-extended

## Basic Information

- **Project Name**: flink-ai-extended
- **Description**: flink-ai-extended
- **Primary Language**: Python
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 2
- **Forks**: 0
- **Created**: 2019-12-18
- **Last Updated**: 2021-12-30

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

[Build status](https://travis-ci.org/alibaba/flink-ai-extended)

# flink-ai-extended

This project extends deep learning frameworks to run on the Flink project. It currently supports running TensorFlow on Flink.

**contents**

- [TensorFlow support](#tensorflow-support)
  * [Support Version](#support-version)
  * [Quick Start](#quick-start)
    + [Setup](#setup)
    + [Build From Source](#build-from-source)
    + [Build Source in virtual environment](#build-source-in-virtual-environment)
    + [Example](#example)
  * [Distributed Running](#distributed-running)
    + [Deployment](#deployment)
    + [Running Distributed Programs](#running-distributed-programs)
  * [Distributed Running Example](#distributed-running-example)
    + [Setup & Build](#setup---build)
    + [Start Service](#start-service)
    + [Prepare data & code](#prepare-data---code)
    + [Submit train job](#submit-train-job)
    + [Visit Flink Cluster](#visit-flink-cluster)
    + [Stop all docker containers](#stop-all-docker-containers)
    + [Summary](#summary)
  * [Optional Tools](#optional-tools)
    + [Build framework and tensorflow python package Independently](#build-framework-and-tensorflow-python-package-independently)
    + [Build custom virtual environment package](#build-custom-virtual-environment-package)
- [Structure](#structure)
- [For More Information](#for-more-information)
- [License](#license)

# TensorFlow support

TensorFlow is an open-source deep learning system developed by Google that is widely used in the field of deep learning.
Native TensorFlow has many inconveniences in distributed use and resource management, and it does not integrate with the existing, widely used big data processing frameworks. Flink is a data processing framework that is widely used for data extraction, feature preprocessing, and data cleaning. This project combines TensorFlow with Flink and provides users with more convenient and useful tools.

**Currently, the Flink job code is written in Java and the algorithm code in Python.**

## Support Version

TensorFlow: 1.13.1

Flink: 1.10.0

## Quick Start

### Setup

**Requirements**

1. python: 2.7 (future support for python 3)
2. pip
3. cmake >= 3.6
4. java 1.8
5. maven >= 3.3.0

**Install python2**

macOS

```shell
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
export PATH="/usr/local/bin:/usr/local/sbin:$PATH"
brew install python@2
```

Ubuntu

```shell
sudo apt install python-dev
```

**Install pip**

```shell
curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
python get-pip.py
```

On Ubuntu you can install it with:

```shell
sudo apt install python-pip
```

**Install pip dependencies**

Install the pip package dependencies (if using a virtual environment, omit the --user argument):

```shell
pip install -U --user pip six numpy wheel mock grpcio grpcio-tools
```

**Install cmake**

The cmake version must be >= 3.6: [cmake download page](https://cmake.org/download/)

**Install java 8**

[java download page](http://www.oracle.com/technetwork/java/javase/downloads/index.html)

**Install maven**

The maven version must be >= 3.3.0: [maven download page](http://maven.apache.org/download.cgi)

```shell
tar -xvf apache-maven-3.6.1-bin.tar.gz
mv apache-maven-3.6.1 /usr/local/
```

Configure the environment variables:

```shell
MAVEN_HOME=/usr/local/apache-maven-3.6.1
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
```

### Build From Source

**Compiling the source code depends on TensorFlow 1.13.1. The compile commands will automatically install TensorFlow 1.13.1.**

```shell
mvn -DskipTests=true clean install
```

**If you run all tests, this step may take a long time (about 20 minutes), so wait patiently. You can skip the tests with the command shown above: `mvn -DskipTests=true clean install`.**

**Optional Commands**

```shell
# run all tests
mvn clean install

# skip unit tests
mvn -DskipUTs=true clean install

# skip integration tests
mvn -DskipITs=true clean install
```

If the above command executes successfully, congratulations: you have deployed flink-ai-extended. Now you can write algorithm programs.

### Build Source in virtual environment

* change the project [pom.xml](pom.xml) item pip.install.option from --user to -U
* create a virtual environment:

```shell
virtualenv tfenv
```

* enter the virtual environment:

```shell
source tfenv/bin/activate
```

* install pip dependencies:

```shell
pip install -U pip six numpy wheel mock grpcio grpcio-tools
```

* build the source:

```shell
mvn clean install
```

* exit the virtual environment:

```shell
deactivate
```

### Example

1. tensorflow add example

**python code:**

```python
import tensorflow as tf
import time
import sys
from flink_ml_tensorflow.tensorflow_context import TFContext


def build_graph():
    # Tiny graph: c = mean(a) + v, with the result assigned back to v.
    global a
    i = 1
    a = tf.placeholder(tf.float32, shape=None, name="a")
    b = tf.reduce_mean(a, name="b")
    r_list = []
    v = tf.Variable(dtype=tf.float32, initial_value=tf.constant(1.0), name="v_" + str(i))
    c = tf.add(b, v, name="c_" + str(i))
    add = tf.assign(v, c, name="assign_" + str(i))
    sum = tf.summary.scalar(name="sum_" + str(i), tensor=c)
    r_list.append(add)
    global_step = tf.contrib.framework.get_or_create_global_step()
    global_step_inc = tf.assign_add(global_step, 1)
    r_list.append(global_step_inc)
    return r_list


def map_func(context):
    # Read the role (ps/worker), task index and cluster layout from Flink.
    tf_context = TFContext(context)
    job_name = tf_context.get_role_name()
    index = tf_context.get_index()
    cluster_json = tf_context.get_tf_cluster()
    cluster = tf.train.ClusterSpec(cluster=cluster_json)
    server = tf.train.Server(cluster, job_name=job_name, task_index=index)
    sess_config = tf.ConfigProto(allow_soft_placement=True,
                                 log_device_placement=False,
                                 device_filters=["/job:ps", "/job:worker/task:%d" % index])
    t = time.time()
    if 'ps' == job_name:
        # Parameter servers only host variables; keep the process alive.
        from time import sleep
        while True:
            sleep(1)
    else:
        with tf.device(tf.train.replica_device_setter(
                worker_device='/job:worker/task:' + str(index), cluster=cluster)):
            train_ops = build_graph()
            hooks = [tf.train.StopAtStepHook(last_step=2)]
            with tf.train.MonitoredTrainingSession(master=server.target,
                                                   config=sess_config,
                                                   checkpoint_dir="./target/tmp/s1/" + str(t),
                                                   hooks=hooks) as mon_sess:
                while not mon_sess.should_stop():
                    print(mon_sess.run(train_ops, feed_dict={a: [1.0, 2.0, 3.0]}))
                    sys.stdout.flush()
```
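The cluster specification that `TFContext.get_tf_cluster()` hands to `tf.train.ClusterSpec` in the example above is a plain mapping from job names to host lists. A minimal sketch of its shape (the `host:port` values below are placeholders for illustration, not addresses the framework actually assigns):

```python
# Illustrative shape of the cluster JSON consumed by tf.train.ClusterSpec;
# the host:port entries are placeholders, not real framework output.
cluster_json = {
    "ps": ["localhost:2222"],
    "worker": ["localhost:2223", "localhost:2224"],
}

# Each (job_name, index) pair reported by TFContext selects one entry above,
# and determines the device string used with tf.train.replica_device_setter.
job_name = "worker"
index = 1
worker_device = "/job:%s/task:%d" % (job_name, index)

print(worker_device)  # /job:worker/task:1
```

Flink starts one Python process per `ps` and `worker` role instance, so each process sees the same cluster mapping but a different `(job_name, index)` pair.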
**java code:**

add maven dependencies

```xml