计算图是主流深度学习框架普遍采用的, 如Tensorflow, Caffe和Mxnet等. 事实上, Spark这样的大数据处理工具也是用计算图来调度任务的. 为了更好地支持深度学习算法, Angel也支持了计算图框架. 与Tensorflow等相比, Angel的计算图更轻量, 主要表现在:
节点
是层(layer), 而不是操作
(operator). Tensorflow等使用操作
作为图中的结点, 十分灵活, 适合二次开发(封装), 但也给机器学习算发开发者带来更陡的学习曲线与更大的工作量, 因此老版本的Tensorflow也一直被诟病"API太底层,开发效率低", 后来的Tensorflow版本才提供基于层(layer)的高级API. 鉴于这一点, Angel只提供粗粒度的计算图.需要指出的是, Angel目前不支持CNN, RNN等, 只关注推荐领域的常用算法.
要了解计算图是怎样构建的, 先要了解其组成元素Layer的结构(关于层更详细的信息, 请参考Angel中的层), 如下:
abstract class Layer(val name: String, val outputDim: Int)(implicit val graph: AngelGraph)
extends Serializable {
var status: STATUS.Value = STATUS.Null
val input = new ListBuffer[Layer]()
val consumer = new ListBuffer[Layer]()
def addInput(layer: Layer): Unit = {
input.append(layer)
}
def addConsumer(layer: Layer): Unit = {
consumer.append(layer)
}
def calOutput(): Matrix = ???
def gatherGrad(): Matrix = ???
}
这个抽象类已将层的大部分功能描述清楚, 具体如下:
事实上, 构建图的具体操作在inputlayer/linearlayer/joinlayer的基类中已完成, 用户自定义layer不必关心, 如下:
abstract class InputLayer(name: String, outputDim: Int)(implicit graph: AngelGraph)
extends Layer(name, outputDim)(graph) {
graph.addInput(this)
def calBackward(): Matrix
}
abstract class JoinLayer(name: String, outputDim: Int, val inputLayers: Array[Layer])(implicit graph: AngelGraph)
extends Layer(name, outputDim)(graph) {
inputLayers.foreach { layer =>
layer.addConsumer(this)
this.addInput(layer)
}
def calGradOutput(idx: Int): Matrix
}
abstract class LinearLayer(name: String, outputDim: Int, val inputLayer: Layer)(implicit graph: AngelGraph)
extends Layer(name, outputDim)(graph) {
inputLayer.addConsumer(this)
this.addInput(inputLayer)
def calGradOutput(): Matrix
}
注: LossLayer是一种特殊的LinearLayer, 所以这里没有给出.
通过input/consumer构建起了一个复杂的图, 虽然可以从图中的任意节点对图进行遍历, 但是为了方便, 在AngelGraph中还是存储verge节点, 便于对图的操作, 如下:
class AngelGraph(val placeHolder: PlaceHolder, val conf: SharedConf) extends Serializable {
def this(placeHolder: PlaceHolder) = this(placeHolder, SharedConf.get())
private val inputLayers = new ListBuffer[InputLayer]()
private var lossLayer: LossLayer = _
private val trainableLayer = new ListBuffer[Trainable]()
def addInput(layer: InputLayer): Unit = {
inputLayers.append(layer)
}
def setOutput(layer: LossLayer): Unit = {
lossLayer = layer
}
def getOutputLayer: LossLayer = {
lossLayer
}
def addTrainable(layer: Trainable): Unit = {
trainableLayer.append(layer)
}
def getTrainable: ListBuffer[Trainable] = {
trainableLayer
}
verge有两大类:
calBackward
. 为了加入inputLayer, Angel要求所有的inputLayer中都调用AngelGraph的addInput方法将自已加入AngelGraph中. 事实上, 在InputLayer的基类中已完成这一操作, 用户新增inputLayer不必关心这一点predict
或calOutput
即可. 由于losslayer是linearlayer的子类, 所以用户自定义lossLayer可手动调用setOutput(layer: LossLayer)
, 但用户新增losslayer的机会不多, 更多的是增加lossfunc.有了inputLayers, lossLayer后, 从AngelGraph中遍历图十分方便, 正向计算只要调用losslayer的predict
方法, 反向计算只要调用inputlayer的calBackward
. 但是梯度计算, 参数更新不方便, 为了方便参数更新, AngelGraph中增加了一个trainableLayer的变量, 用以保存带参数的层.
通过layer的input/consumer构建起了图的边(节点的关系), 在AngelGraph中保存特殊节点(inputlayer/losslayer/trainablelayer)方便前向与后向计算与参数更新. 最后数据是怎样输入的呢? -- 通过PlaceHolder
Angel中的PlaceHolder在构建AngelGraph中传给Graph, 而Graph又作为隐式参数传给Layer, 所以在所有的Layer中都可以访问placeholder(即数据).
目前, Angel中只允许有一个PlaceHolder, 以后会去除这一限制, 允许多种数据输入. PlaceHolder只存放一个mini-batch的数据, 主要方法如下:
class PlaceHolder(val conf: SharedConf) extends Serializable {
def feedData(data: Array[LabeledData]): Unit
def getFeats: Matrix
def getLabel: Matrix
def getBatchSize: Int
def getFeatDim: Long
def getIndices: Vector
}
通过feedData
, 将Array[LabeledData]类型的数据给placeholder后, 便可以从其中获得:
上一节中构建起了计算图的拓朴结构, 这一节要讲述它是怎样运行的
Angel的状态机有如下几个状态:
这些状态是依次进行的, 如下图所示:
状态机的引入主要是保证运算的顺序进行, 减少重复计算. 例如有多个层消费同一层的输出, 在计算时, 可以根所据状态进行判断, 只要计算一次. 状态机在代码中的体现为:
def feedData(data: Array[LabeledData]): Unit = {
deepFirstDown(lossLayer.asInstanceOf[Layer])(
(lay: Layer) => lay.status != STATUS.Null,
(lay: Layer) => lay.status = STATUS.Null
)
placeHolder.feedData(data)
}
override def calOutput(): Matrix = {
status match {
case STATUS.Null =>
// do come forward calculation
status = STATUS.Forward
case _ =>
}
output
}
override def calBackward(): Matrix = {
status match {
case STATUS.Forward =>
val gradTemp = gatherGrad()
// do backward calculation
status = STATUS.Backward
case _ =>
}
backward
}
override def pushGradient(): Unit = {
status match {
case STATUS.Backward =>
// calculate gradient and push to PS
status = STATUS.Gradient
case _ =>
}
}
override def update(epoch: Int = 0): Unit = {
status match {
case STATUS.Gradient =>
optimizer.update(weightId, 1, epoch)
status = STATUS.Update
case _ =>
throw new AngelException("STATUS Error, please calculate Gradient frist!")
}
}
具体的代码在GraphLearner
, 这理给出框架,
def trainOneEpoch(epoch: Int, iter: Iterator[Array[LabeledData]], numBatch: Int): Double = {
var batchCount: Int = 0
var loss: Double = 0.0
while (iter.hasNext) {
graph.feedData(iter.next())
graph.pullParams()
loss = graph.calLoss() // forward
graph.calBackward() // backward
graph.pushGradient() // pushgrad
PSAgentContext.get().barrier(ctx.getTaskId.getIndex)
if (ctx.getTaskId.getIndex == 0) {
graph.update(epoch * numBatch + batchCount) // update parameters on PS
}
PSAgentContext.get().barrier(ctx.getTaskId.getIndex)
batchCount += 1
LOG.info(s"epoch $epoch batch $batchCount is finished!")
}
loss
}
步骤如下:
calOutput
方法, 依次计算output, 计算完后将它的状态设为forward
. 对于状态已是forward
的情况, 则直接返回上一次计算的结果, 这样避免重复计算CalGradOutput
方法, 完成后向计算. 计算完后将它的状态设为backward
. 对于状态已是backward
的情况, 则直接返回上一次计算的结果, 这样避免重复计算backward
只计算了网络结点的梯度, 并没有计算参数的梯度. 这一步计算参数的梯度, 只需调用trainable
的pushGradient
即可. 这个方法会先计算梯度, 然后再将梯度推送到PS上, 最后将状态设为gradient
update
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。