# aliyun-emapreduce-datasources
**Repository Path**: aliyun/aliyun-emapreduce-datasources
## Basic Information
- **Project Name**: aliyun-emapreduce-datasources
- **Description**: Extended datasource support for Spark/Hadoop on Aliyun E-MapReduce.
- **Primary Language**: Unknown
- **License**: Artistic-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-05-08
- **Last Updated**: 2025-05-30
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Spark on Aliyun
## Requirements
- Spark 1.3+
## Introduction
- This SDK supports interaction with Aliyun's basic services, e.g. OSS, ODPS, LogService and ONS, in the Spark runtime environment.
## Build and Install
```
git clone https://github.com/aliyun/aliyun-spark-sdk.git
cd aliyun-spark-sdk
mvn clean package -DskipTests
```
#### Use SDK in Eclipse project directly
- copy sdk jar to your project
- right click Eclipse project -> Properties -> Java Build Path -> Add JARs
- choose and import the sdk
- you can use the sdk in your Eclipse project
#### Maven
```
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-maxcompute_2.10</artifactId>
    <version>1.4.2-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-logservice_2.10</artifactId>
    <version>1.4.2-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-tablestore</artifactId>
    <version>1.4.2-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-ons_2.10</artifactId>
    <version>1.4.2-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-mns_2.10</artifactId>
    <version>1.4.2-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-core</artifactId>
    <version>1.4.2-SNAPSHOT</version>
</dependency>
```
## OSS Support
In this section, we demonstrate how to manipulate Aliyun OSS data in Spark.
### OSS Extension - Native OSS FileSystem
A native way to read and write regular files on Aliyun OSS. The advantage of this approach is that you can access OSS files produced by other Aliyun services or tools. Note that a single file in Aliyun OSS is limited to 48.8 TB.
### OSS URI
- **oss**://[accesskeyId:accessKeySecret@]bucket[.endpoint]/object/path
The OSS "AccessKeyId/AccessKeySecret" and "endpoint" can be set directly in the OSS URI.
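For example, a fully-qualified path might look like the following (the bucket name, endpoint, and credential values here are hypothetical placeholders):
```
// hypothetical bucket "my-bucket" on the public "oss-cn-hangzhou.aliyuncs.com" endpoint
val path = "oss://accessKeyId:accessKeySecret@my-bucket.oss-cn-hangzhou.aliyuncs.com/path/to/object"
```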
### OSS usage
Now we provide a transparent way to support Aliyun OSS, with no code changes and only a few configurations. All you need to do is provide the following configuration in your project:
```
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
```
Then, you can load OSS data through `SparkContext.textFile(...)`, like:
```
val conf = new SparkConf()
conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
val sc = new SparkContext(conf)
val path = "oss://accesskeyId:accessKeySecret@bucket.endpoint/input"
val rdd = sc.textFile(path)
```
Similarly, you can upload data through `RDD.saveAsTextFile(...)`, like:
```
val data = sc.parallelize(1 to 10)
data.saveAsTextFile("oss://accesskeyId:accessKeySecret@bucket.endpoint/output")
```
## ODPS Support
In this section, we demonstrate how to manipulate Aliyun ODPS data in Spark.
### Step-1. Initialize an OdpsOps
Before reading or writing ODPS data, we need to initialize an `OdpsOps` instance, like:
```
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkContext, SparkConf}
object Sample {
  def main(args: Array[String]): Unit = {
    // == Step-1 ==
    val accessKeyId = ""
    val accessKeySecret = ""
    // intranet endpoints for example
    val urls = Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com")
    val conf = new SparkConf().setAppName("Spark Odps Sample")
    val sc = new SparkContext(conf)
    val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, urls(0), urls(1))
    // == Step-2 ==
    ...
    // == Step-3 ==
    ...
  }
  // == Step-2 ==
  // function definition
  // == Step-3 ==
  // function definition
}
```
In the above code, the variables `accessKeyId` and `accessKeySecret` are the AccessKey pair assigned to your Aliyun account; they are used for user identification and signature authentication when accessing Aliyun services such as ODPS. See [Aliyun AccessKeys](https://ak-console.aliyun.com/#/accesskey) for more information.
### Step-2. Load ODPS Data into Spark
```
// == Step-2 ==
val project = ""   // your ODPS project name
val table = ""     // your ODPS table name
val numPartitions = 2
val inputData = odpsOps.readTable(project, table, read, numPartitions)
inputData.top(10).foreach(println)
// == Step-3 ==
...
```
In the above code, we need to define a `read` function to preprocess ODPS data:
```
def read(record: Record, schema: TableSchema): String = {
record.getString(0)
}
```
This loads the first column of the ODPS table into Spark.
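The `read` function can also extract several columns at once. Below is a minimal sketch, assuming the table's first two columns are a string and a bigint; the column indices and types are only illustrative assumptions:
```
// hedged sketch: column indices and types are assumptions for illustration
def readTwoColumns(record: Record, schema: TableSchema): (String, Long) = {
  (record.getString(0), record.getBigint(1))
}
val pairData = odpsOps.readTable(project, table, readTwoColumns, numPartitions)
```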
### Step-3. Save results into Aliyun ODPS
```
val resultData = inputData.map(e => s"$e has been processed.")
odpsOps.saveToTable(project, table, resultData, write)
```
In the above code, we need to define a `write` function to preprocess the result data before writing it to the ODPS table:
```
def write(s: String, emptyRecord: Record, schema: TableSchema): Unit = {
  val r = emptyRecord
  r.set(0, s)
}
```
This writes each line of the result RDD into the first column of the ODPS table.
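Symmetrically, a `write` function can fill several columns of each record. Here is a minimal sketch, reusing `pairData` from the read sketch above and assuming the target table's first two columns are a string and a bigint (again, purely illustrative):
```
// hedged sketch: the (String, Long) layout of the target table is an assumption
def writePair(p: (String, Long), emptyRecord: Record, schema: TableSchema): Unit = {
  emptyRecord.set(0, p._1)
  emptyRecord.set(1, p._2)
}
odpsOps.saveToTable(project, table, pairData, writePair)
```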
## ONS Support
In this section, we demonstrate how to consume ONS messages in Spark Streaming.
```
// cId: Aliyun ONS ConsumerID
// topic: Message Topic
// subExpression: Message Tag
val Array(cId, topic, subExpression, parallelism, interval) = args
val accessKeyId = "accessKeyId"
val accessKeySecret = "accessKeySecret"
val numStreams = parallelism.toInt
val batchInterval = Milliseconds(interval.toInt)
val conf = new SparkConf().setAppName("Spark ONS Sample")
val ssc = new StreamingContext(conf, batchInterval)
// define `func` to preprocess each message
def func: Message => Array[Byte] = msg => msg.getBody
val onsStreams = (0 until numStreams).map { i =>
  println(s"starting stream $i")
  OnsUtils.createStream(
    ssc,
    cId,
    topic,
    subExpression,
    accessKeyId,
    accessKeySecret,
    StorageLevel.MEMORY_AND_DISK_2,
    func)
}
val unionStreams = ssc.union(onsStreams)
unionStreams.foreachRDD { rdd =>
  rdd.map(bytes => new String(bytes))
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
    .collect()
    .foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
}
ssc.start()
ssc.awaitTermination()
```
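The ONS snippet above is not self-contained; it assumes imports roughly like the following. The exact packages of `OnsUtils` and `Message` depend on the SDK version you build against, so treat them as assumptions and verify them in your build:
```
// assumed import paths; verify against your SDK version
import com.aliyun.openservices.ons.api.Message
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.aliyun.ons.OnsUtils
```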
## LogService Support
In this section, we demonstrate how to consume Loghub data in Spark Streaming.
```
if (args.length < 8) {
  System.err.println(
    """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
      |         <access key id> <access key secret> <receiver number> <batch interval seconds>
    """.stripMargin)
  System.exit(1)
}
val loghubProject = args(0)    // The project name in your LogService.
val logStoreName = args(1)     // The name of the logstore.
val loghubGroupName = args(2)  // Processes with the same loghubGroupName consume the logstore's data together.
val loghubEndpoint = args(3)   // API endpoint of LogService.
val accessKeyId = args(4)      // AccessKeyId
val accessKeySecret = args(5)  // AccessKeySecret
val numReceivers = args(6).toInt
val batchInterval = Milliseconds(args(7).toInt * 1000)
val conf = new SparkConf().setAppName("Test Loghub")
val ssc = new StreamingContext(conf, batchInterval)
val loghubStream = LoghubUtils.createStream(
  ssc,
  loghubProject,
  logStoreName,
  loghubGroupName,
  loghubEndpoint,
  numReceivers,
  accessKeyId,
  accessKeySecret,
  StorageLevel.MEMORY_AND_DISK)
loghubStream.foreachRDD(rdd => println(rdd.count()))
ssc.start()
ssc.awaitTermination()
```
## TableStore Support
* [HadoopMR on TableStore](docs/HadoopMR-on-TableStore.md)
* [Spark on TableStore](docs/Spark-on-TableStore.md)
* [Hive/SparkSQL on TableStore](docs/Hive-SparkSQL-on-TableStore.md)
## Future Work
- Support more Aliyun basic services
- Make code migration more friendly
## License
Licensed under the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0.html)