TsFile-Spark-Connector User Guide

1. About TsFile-Spark-Connector

TsFile-Spark-Connector implements Spark support for external data sources of the TsFile type, enabling users to read, write, and query TsFiles with Spark.

With this connector, you can

- load a single TsFile, from either the local file system or HDFS, into Spark
- load all files in a specific directory, from either the local file system or HDFS, into Spark
- write data from Spark into TsFile

2. System Requirements

| Spark Version | Scala Version | Java Version | TsFile Version |
|---------------|---------------|--------------|----------------|
| >= 2.2        | 2.11          | 1.8          | 0.10.0         |

Note: For more information about how to download and use TsFile, please see the following link: https://github.com/apache/incubator-iotdb/tree/master/tsfile.

3. Quick Start

Local Mode

Start Spark with TsFile-Spark-Connector in local mode:

```shell
./<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-0.10.0-jar-with-dependencies.jar
```

Note: Replace `<spark-shell-path>` with the actual path of your spark-shell, and adjust the jar paths to where the TsFile-Spark-Connector and TsFile jars are located.

Distributed Mode

Start Spark with TsFile-Spark-Connector in distributed mode (that is, connect to a Spark cluster from spark-shell):

```shell
./<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-0.10.0-jar-with-dependencies.jar --master spark://ip:7077
```

Note: Replace `<spark-shell-path>` with the actual path of your spark-shell, and `spark://ip:7077` with the address of your Spark master.

4. Data Type Correspondence

| TsFile data type | SparkSQL data type |
|------------------|--------------------|
| BOOLEAN          | BooleanType        |
| INT32            | IntegerType        |
| INT64            | LongType           |
| FLOAT            | FloatType          |
| DOUBLE           | DoubleType         |
| TEXT             | StringType         |
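
To see this mapping in practice, the inferred Spark schema can be printed after loading a file. The snippet below is a minimal sketch, assuming a local `test.tsfile` containing the three measurements described in Section 5:

```scala
import org.apache.iotdb.tsfile._

// Hypothetical file: test.tsfile with status (BOOLEAN), temperature (FLOAT),
// and hardware (TEXT) measurements, as in the Section 5 example.
val df = spark.read.tsfile("test.tsfile")

// The time column is LongType (INT64); each measurement column carries the
// SparkSQL type from the table above, e.g. FLOAT -> FloatType.
df.printSchema()
```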

5. Schema Inference

How a TsFile is displayed depends on its schema. Take the following TsFile structure as an example: there are three measurements in the TsFile schema: status, temperature, and hardware. Their basic information is as follows:

| name        | type    | encode |
|-------------|---------|--------|
| status      | Boolean | PLAIN  |
| temperature | Float   | RLE    |
| hardware    | Text    | PLAIN  |

The existing data in the TsFile is as follows:

| device            | measurement | (time, value) pairs                |
|-------------------|-------------|------------------------------------|
| root.ln.wf01.wt01 | status      | (1, True), (3, True), (5, False)   |
| root.ln.wf01.wt01 | temperature | (1, 2.2), (2, 2.2), (3, 2.1)       |
| root.ln.wf02.wt02 | hardware    | (2, "aaa"), (4, "bbb"), (6, "ccc") |
| root.ln.wf02.wt02 | status      | (1, True), (2, False), (4, True)   |

The corresponding SparkSQL table is as follows:

| time | root.ln.wf02.wt02.temperature | root.ln.wf02.wt02.status | root.ln.wf02.wt02.hardware | root.ln.wf01.wt01.temperature | root.ln.wf01.wt01.status | root.ln.wf01.wt01.hardware |
|------|-------------------------------|--------------------------|----------------------------|-------------------------------|--------------------------|----------------------------|
| 1    | null                          | true                     | null                       | 2.2                           | true                     | null                       |
| 2    | null                          | false                    | aaa                        | 2.2                           | null                     | null                       |
| 3    | null                          | null                     | null                       | 2.1                           | true                     | null                       |
| 4    | null                          | true                     | bbb                        | null                          | null                     | null                       |
| 5    | null                          | null                     | null                       | null                          | false                    | null                       |
| 6    | null                          | null                     | ccc                        | null                          | null                     | null                       |

You can also use the narrow table form, shown below (see Section 6 for how to use the narrow form):

| time | device_name       | status | hardware | temperature |
|------|-------------------|--------|----------|-------------|
| 1    | root.ln.wf01.wt01 | true   | null     | 2.2         |
| 1    | root.ln.wf02.wt02 | true   | null     | null        |
| 2    | root.ln.wf01.wt01 | null   | null     | 2.2         |
| 2    | root.ln.wf02.wt02 | false  | aaa      | null        |
| 3    | root.ln.wf01.wt01 | true   | null     | 2.1         |
| 4    | root.ln.wf02.wt02 | true   | bbb      | null        |
| 5    | root.ln.wf01.wt01 | false  | null     | null        |
| 6    | root.ln.wf02.wt02 | null   | ccc      | null        |

6. Scala API

NOTE: Remember to assign necessary read and write permissions in advance.

Example 1: read from the local file system

```scala
import org.apache.iotdb.tsfile._

// Wide form: one column per (device, measurement) pair.
val wide_df = spark.read.tsfile("test.tsfile")
wide_df.show

// Narrow form: pass true to get a device_name column plus one column per measurement.
val narrow_df = spark.read.tsfile("test.tsfile", true)
narrow_df.show
```

Example 2: read from the hadoop file system

```scala
import org.apache.iotdb.tsfile._

val wide_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
wide_df.show

val narrow_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
narrow_df.show
```

Example 3: read from a specific directory

```scala
import org.apache.iotdb.tsfile._

// Load every TsFile under the given directory into one DataFrame.
val df = spark.read.tsfile("hdfs://localhost:9000/usr/hadoop")
df.show
```

Note 1: Global time ordering across all TsFiles in a directory is not currently supported.

Note 2: Measurements of the same name should have the same schema.

Example 4: query in wide form

```scala
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
df.createOrReplaceTempView("tsfile_table")
// Wide-form column names contain dots, so they must be quoted with backticks.
val newDf = spark.sql("select * from tsfile_table where `device_1.sensor_1`>0 and `device_1.sensor_2` < 22")
newDf.show
```

```scala
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select count(*) from tsfile_table")
newDf.show
```

Example 5: query in narrow form

```scala
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
df.createOrReplaceTempView("tsfile_table")
// In narrow form, devices are rows, so filtering on device_name is plain SQL.
val newDf = spark.sql("select * from tsfile_table where device_name = 'root.ln.wf02.wt02' and temperature > 5")
newDf.show
```

```scala
import org.apache.iotdb.tsfile._
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
df.createOrReplaceTempView("tsfile_table")
val newDf = spark.sql("select count(*) from tsfile_table")
newDf.show
```

Example 6: write in wide form

```scala
import org.apache.iotdb.tsfile._

// Write back in wide form (the default).
val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
df.show
df.write.tsfile("hdfs://localhost:9000/output")

// Read the written output back to verify.
val newDf = spark.read.tsfile("hdfs://localhost:9000/output")
newDf.show
```

Example 7: write in narrow form

```scala
import org.apache.iotdb.tsfile._

val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
df.show
// Pass true to write the narrow-form DataFrame back to TsFile.
df.write.tsfile("hdfs://localhost:9000/output", true)

// Read the written output back to verify.
val newDf = spark.read.tsfile("hdfs://localhost:9000/output", true)
newDf.show
```

Appendix A: Old Design of Schema Inference

How a TsFile is displayed is related to the TsFile schema. Take the following TsFile structure as an example: there are three measurements in the TsFile schema: status, temperature, and hardware. Their basic information is as follows:

| Name        | Type    | Encode |
|-------------|---------|--------|
| status      | Boolean | PLAIN  |
| temperature | Float   | RLE    |
| hardware    | Text    | PLAIN  |

Basic info of measurements

The existing data in the file is as follows:

| delta_object        | measurement | (time, value) pairs                            |
|---------------------|-------------|------------------------------------------------|
| root.ln.wf01.wt01   | status      | (1, True), (3, True), (5, False), (7, True)    |
| root.ln.wf01.wt01   | temperature | (1, 2.2), (2, 2.2), (3, 2.1), (4, 2.0)         |
| root.ln.wf02.wt02   | hardware    | (2, "aaa"), (4, "bbb"), (6, "ccc"), (8, "ddd") |
| root.ln.wf02.wt02   | status      | (1, True), (2, False), (4, True), (5, False)   |
| root.sgcc.wf03.wt01 | status      | (2, True), (3, True), (4, True), (5, True)     |
| root.sgcc.wf03.wt01 | temperature | (3, 3.3), (6, 6.6), (8, 8.8), (9, 9.9)         |

A set of time-series data

There are two ways to display it:

the default way

First, two columns are created: time and delta_object, where delta_object stores the full path of the device.

- time: timestamp, LongType
- delta_object: delta_object ID (full device path), StringType

Next, a column is created for each Measurement to store the specific data. The SparkSQL table structure is as follows:

| time (LongType) | delta_object (StringType) | status (BooleanType) | temperature (FloatType) | hardware (StringType) |
|-----------------|---------------------------|----------------------|-------------------------|-----------------------|
| 1 | root.ln.wf01.wt01   | True  | 2.2  | null  |
| 1 | root.ln.wf02.wt02   | True  | null | null  |
| 2 | root.ln.wf01.wt01   | null  | 2.2  | null  |
| 2 | root.ln.wf02.wt02   | False | null | "aaa" |
| 2 | root.sgcc.wf03.wt01 | True  | null | null  |
| 3 | root.ln.wf01.wt01   | True  | 2.1  | null  |
| 3 | root.sgcc.wf03.wt01 | True  | 3.3  | null  |
| 4 | root.ln.wf01.wt01   | null  | 2.0  | null  |
| 4 | root.ln.wf02.wt02   | True  | null | "bbb" |
| 4 | root.sgcc.wf03.wt01 | True  | null | null  |
| 5 | root.ln.wf01.wt01   | False | null | null  |
| 5 | root.ln.wf02.wt02   | False | null | null  |
| 5 | root.sgcc.wf03.wt01 | True  | null | null  |
| 6 | root.ln.wf02.wt02   | null  | null | "ccc" |
| 6 | root.sgcc.wf03.wt01 | null  | 6.6  | null  |
| 7 | root.ln.wf01.wt01   | True  | null | null  |
| 8 | root.ln.wf02.wt02   | null  | null | "ddd" |
| 8 | root.sgcc.wf03.wt01 | null  | 8.8  | null  |
| 9 | root.sgcc.wf03.wt01 | null  | 9.9  | null  |

unfolding delta_object column

The delta_object column is expanded by "." into multiple columns, ignoring the root level "root"; this is convenient for richer aggregation operations. To use this display mode, set the parameter "delta_object_name" in the table creation statement (refer to Example 5 in Section 5.1 of this manual; see also the sketch after the table below). In this example the parameter is set to "root.group.field.device". The number of levels in the parameter must correspond one-to-one with the levels of the device path. One column is then created for each level of the device path except the "root" level; the column name is the corresponding name in the parameter, and its value is the name of the corresponding level of the device path. Next, one column is created for each measurement to store the data.

The SparkSQL table structure is then as follows:

| time (LongType) | group (StringType) | field (StringType) | device (StringType) | status (BooleanType) | temperature (FloatType) | hardware (StringType) |
|-----------------|--------------------|--------------------|---------------------|----------------------|-------------------------|-----------------------|
| 1 | ln   | wf01 | wt01 | True  | 2.2  | null  |
| 1 | ln   | wf02 | wt02 | True  | null | null  |
| 2 | ln   | wf01 | wt01 | null  | 2.2  | null  |
| 2 | ln   | wf02 | wt02 | False | null | "aaa" |
| 2 | sgcc | wf03 | wt01 | True  | null | null  |
| 3 | ln   | wf01 | wt01 | True  | 2.1  | null  |
| 3 | sgcc | wf03 | wt01 | True  | 3.3  | null  |
| 4 | ln   | wf01 | wt01 | null  | 2.0  | null  |
| 4 | ln   | wf02 | wt02 | True  | null | "bbb" |
| 4 | sgcc | wf03 | wt01 | True  | null | null  |
| 5 | ln   | wf01 | wt01 | False | null | null  |
| 5 | ln   | wf02 | wt02 | False | null | null  |
| 5 | sgcc | wf03 | wt01 | True  | null | null  |
| 6 | ln   | wf02 | wt02 | null  | null | "ccc" |
| 6 | sgcc | wf03 | wt01 | null  | 6.6  | null  |
| 7 | ln   | wf01 | wt01 | True  | null | null  |
| 8 | ln   | wf02 | wt02 | null  | null | "ddd" |
| 8 | sgcc | wf03 | wt01 | null  | 8.8  | null  |
| 9 | sgcc | wf03 | wt01 | null  | 9.9  | null  |
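
For illustration, a minimal sketch of the table-creation statement referenced above is shown below. Only the delta_object_name parameter itself appears in this appendix; the data-source name and the path option are assumptions of this sketch, not a confirmed API:

```scala
// Hedged sketch of the old-design table creation.
// ASSUMPTION: the data source is addressable as "org.apache.iotdb.tsfile"
// and accepts a "path" option; only "delta_object_name" is documented above.
spark.sql("""
  CREATE TEMPORARY VIEW tsfile_table
  USING org.apache.iotdb.tsfile
  OPTIONS (path "hdfs://localhost:9000/test.tsfile",
           delta_object_name "root.group.field.device")
""")
```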

TsFile-Spark-Connector can display one or more TsFiles as a table in SparkSQL. It also allows users to specify a single directory or use wildcards to match multiple directories (see the sketch below). If there are multiple TsFiles, the union of the measurements in all TsFiles is retained in the table, and measurements with the same name are assumed to have the same data type by default. Note that if measurements share a name but have different data types, TsFile-Spark-Connector does not guarantee the correctness of the results.
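
A minimal sketch of the wildcard usage mentioned above; the HDFS layout, the path, and whether the reader honors globs are assumptions of this illustration:

```scala
import org.apache.iotdb.tsfile._

// Hypothetical layout: date-partitioned directories of TsFiles under /data.
// The glob is intended to match multiple directories at once.
val df = spark.read.tsfile("hdfs://localhost:9000/data/2020-*")
df.show
```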

The writing process writes a DataFrame as one or more TsFiles. By default, two columns must be included: time and delta_object; the remaining columns are treated as measurements. To write the second (unfolded) table structure back to TsFile, set the "delta_object_name" parameter (refer to Section 5.1 of this manual).

Appendix B: Old Note

NOTE: Check the jar packages in the root directory of your Spark and replace libthrift-0.9.2.jar and libfb303-0.9.2.jar with libthrift-0.9.1.jar and libfb303-0.9.1.jar respectively.
