14 Star 69 Fork 28

Gitee 极速下载/Alink

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/alibaba/Alink
克隆/下载
pyalink-dataframe.md 3.32 KB
一键复制 编辑 原始数据 按行查看 历史

与 Dataframe 互操作

PyAlink 提供了与 pandas DataFrame 的互转操作,能够方便地使用 Python 生态中已有的强大工具。 pandas 的 DataFrame 是 Python 已有生态中表示、存储二维表的十分理想的选择,DataFrame 自身提供了一定的数据处理与可视化能力,同时又可以方便的转换为 Python 中的其他数据结构。

BatchOperator 转 Dataframe

对于各个 BatchOperator,提供了 collectToDataframe() 的成员方法,将 BatchOperator 内的 DataSet 转换为 DataFrame;直接使用 print() 方法也可以直接以 DataFrame 的形式进行打印。

source = CsvSourceBatchOp()\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")\
    .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv")
res = source.select(["sepal_length", "sepal_width"])
df = res.collectToDataframe()
# Operations with df
res.print()

BatchOperator 中也提供了静态方法 collectoToDataframes 可以对多个 BatchOperator 同时进行转换。 每一次单个 BatchOperator 调用 collectoToDataframe 都会导致作业图进行一次计算,多次调用时将有效率问题,更重要的是,有些算法在多次调用时可能结果不一致,例如采样操作等。 此时可以使用这个静态方法来解决这些问题。

source = CsvSourceBatchOp() \
    .setSchemaStr(
    "sepal_length double, sepal_width double, petal_length double, petal_width double, category string") \
    .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv")

split = SplitBatchOp().setFraction(0.1).linkFrom(source)
[d0, d1, d2] = collectToDataframes(source, split, split.getSideOutput(0))
print(d0, d1, d2)

Dataframe 转 BatchOperator/StreamOperator

对于用户拥有的 Dataframe 数据,PyAlink 也提供了向 BatchOperator/StreamOperator 转换的方法。 具体说来,BatchOperator/StreamOperator 中都具有静态方法 fromDataframe(df, schemaStr),其中 df 是 DataFrame 数据, schemaStr 是由数据列名和类型构成的 schema 字符串。

schema = "f_string string,f_long long,f_int int,f_double double,f_boolean boolean"
op = BatchOperator.fromDataframe(df, schema)
op.print()

op = StreamOperator.fromDataframe(df, schema)
op.print(key="op")
StreamOperator.execute()

同时,PyAlink 也提供了静态方法来进行转换:dataframeToOperator(df, schemaStr, opType),这里 dfschemaStr 参数与上文相同,opType 取值为 "batch""stream"

使用注意

  • 从 BatchOperator 向 Dataframe 转换时:如果 BatchOperator 所存储的数据中有空值 null,那么转换后的 Dataframe 将无法保证完全转换为 BatchOperator 的列类型。此时,对应的列需要手动进行处理。
  • 从 Dataframe 向 BatchOperator 转换时:如果 Dataframe 中有空值时,需要预先替换为 None,这样才能正确地进行之后的处理。
  • 当数据量大时,由于数据转换需要进行大量的 Python 与 Java 数据之间的交换,效率将会受到影响。
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/mirrors/Alink.git
git@gitee.com:mirrors/Alink.git
mirrors
Alink
Alink
master

搜索帮助

Cb406eda 1850385 E526c682 1850385