# java-pubsublite-spark
**Repository Path**: mirrors_googleapis/java-pubsublite-spark
## Basic Information
- **Project Name**: java-pubsublite-spark
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Apache-2.0
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-01-15
- **Last Updated**: 2026-02-14
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Google Pub/Sub Lite Spark Connector Client for Java
Java idiomatic client for [Pub/Sub Lite Spark Connector][product-docs].
[![Maven][maven-version-image]][maven-version-link]
![Stability][stability-image]
- [Product Documentation][product-docs]
- [Client Library Documentation][javadocs]
## Quickstart
If you are using Maven, add this to your pom.xml file:
```xml
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-spark-sql-streaming</artifactId>
  <version>1.0.0</version>
</dependency>
```
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
implementation 'com.google.cloud:pubsublite-spark-sql-streaming:1.0.0'
```
If you are using SBT, add this to your dependencies:
```Scala
libraryDependencies += "com.google.cloud" % "pubsublite-spark-sql-streaming" % "1.0.0"
```
## Authentication
See the [Authentication][authentication] section in the base directory's README.
## Authorization
The client application making API calls must be granted [authorization scopes][auth-scopes] required for the desired Pub/Sub Lite Spark Connector APIs, and the authenticated principal must have the [IAM role(s)][predefined-iam-roles] required to access GCP resources using the Pub/Sub Lite Spark Connector API calls.
## Getting Started
### Prerequisites
You will need a [Google Cloud Platform Console][developer-console] project with the Pub/Sub Lite Spark Connector [API enabled][enable-api].
You will need to [enable billing][enable-billing] to use Google Pub/Sub Lite Spark Connector.
[Follow these instructions][create-project] to get your project set up. You will also need to set up the local development environment by
[installing the Google Cloud SDK][cloud-sdk] and running the following commands on the command line:
`gcloud auth login` and `gcloud config set project [YOUR PROJECT ID]`.
### Installation and setup
You'll need to obtain the `pubsublite-spark-sql-streaming` library. See the [Quickstart](#quickstart) section
to add `pubsublite-spark-sql-streaming` as a dependency in your code.
## About Pub/Sub Lite Spark Connector
[Google Cloud Pub/Sub Lite][product-docs] is a zonal, real-time messaging
service that lets you send and receive messages between independent
applications. You can manually configure the throughput and storage capacity
for Pub/Sub Lite systems.
The Pub/Sub Lite Spark connector supports Pub/Sub Lite as an input source to
Apache Spark Structured Streaming in both the default micro-batch processing
mode and the _experimental_ continuous processing mode. The connector works in
all Apache Spark distributions, including [Google Cloud Dataproc](https://cloud.google.com/dataproc/docs/)
and manual Spark installations.
## Requirements
### Creating a new subscription or using an existing subscription
Follow [these instructions](https://cloud.google.com/pubsub/lite/docs/quickstart#create_a_lite_subscription) to create a new subscription, or use an existing one. If you use an existing subscription, the connector reads from the oldest unacknowledged message in the subscription.
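Subscriptions can also be created programmatically. Here is a minimal sketch using the Pub/Sub Lite admin client (from the underlying `google-cloud-pubsublite` library); the project number, location, topic ID, and subscription ID below are placeholders:
```java
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig;
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;

public class CreateSubscriptionSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder resource paths; substitute your own values.
    String topicPath = "projects/123456789/locations/us-central1-a/topics/my-topic";
    String subscriptionPath =
        "projects/123456789/locations/us-central1-a/subscriptions/my-subscription";

    Subscription subscription =
        Subscription.newBuilder()
            .setDeliveryConfig(
                // Deliver messages to subscribers as soon as they are received.
                DeliveryConfig.newBuilder()
                    .setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY))
            .setName(subscriptionPath)
            .setTopic(topicPath)
            .build();

    AdminClientSettings settings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of("us-central1")).build();
    try (AdminClient adminClient = AdminClient.create(settings)) {
      // Blocks until the subscription has been created.
      System.out.println("Created: " + adminClient.createSubscription(subscription).get());
    }
  }
}
```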
### Creating a Google Cloud Dataproc cluster (Optional)
If you do not have an Apache Spark environment, you can create a [Cloud Dataproc](https://cloud.google.com/dataproc/docs) cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use `spark-submit` on any cluster.
```sh
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
```
## Downloading and Using the Connector
The latest version of the connector is publicly available from the [Maven Central repository](https://search.maven.org/artifact/com.google.cloud/pubsublite-spark-sql-streaming). You can download the JAR and pass it to the `spark-submit` command with the `--jars` option.
## Compatibility
| Connector version | Spark version |
| --- | --- |
| ≤0.3.4 | 2.4.X |
| Current | 3.X.X |
## Usage
### Samples
There are three Java samples (word count, simple write, simple read) under [samples](https://github.com/googleapis/java-pubsublite-spark/tree/main/samples) that show how to use the connector inside Dataproc.
### Reading data from Pub/Sub Lite
Here is an example in Python:
```python
df = spark.readStream \
.format("pubsublite") \
.option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
  .load()
```
Here is an example in Java:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark
.readStream()
.format("pubsublite")
.option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
.load();
```
Note that the connector supports both the default micro-batch processing mode and the [Continuous Processing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing) mode.
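For example, switching the read above from the default micro-batches to continuous processing only changes the trigger on the write side; a minimal sketch in Java (the subscription path and checkpoint location are placeholders):
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.Trigger;

// Read from Pub/Sub Lite and echo to the console, using the experimental
// continuous-processing mode instead of the default micro-batch mode.
Dataset<Row> df = spark
    .readStream()
    .format("pubsublite")
    .option("pubsublite.subscription",
        "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
    .load();

df.writeStream()
    .format("console")
    .option("checkpointLocation", "path/to/HDFS/dir")
    .trigger(Trigger.Continuous("1 second"))  // checkpoint every second
    .start();
```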
### Writing data to Pub/Sub Lite
Here is an example in Python:
```python
df.writeStream \
.format("pubsublite") \
.option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
.option("checkpointLocation", "path/to/HDFS/dir")
.outputMode("complete") \
.trigger(processingTime="2 seconds") \
.start()
```
Here is an example in Java:
```java
import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;

df.writeStream()
.format("pubsublite")
.option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID")
.option("checkpointLocation", "path/to/HDFS/dir")
.outputMode(OutputMode.Complete())
.trigger(Trigger.ProcessingTime(2, TimeUnit.SECONDS))
.start();
```
### Properties
When reading from Pub/Sub Lite, the connector supports a number of configuration options:
| Option | Type | Required | Default Value | Meaning |
| ------ | ---- | -------- | ------------- | ------- |
| pubsublite.subscription | String | Y | | Full subscription path that the connector will read from. |
| pubsublite.flowcontrol.byteoutstandingperpartition | Long | N | 50_000_000 | Max number of bytes per partition that will be cached in workers before Spark processes the messages. |
| pubsublite.flowcontrol.messageoutstandingperpartition | Long | N | Long.MAX | Max number of messages per partition that will be cached in workers before Spark processes the messages. |
| pubsublite.flowcontrol.maxmessagesperbatch | Long | N | Long.MAX | Max number of messages in a micro-batch. |
| gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
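For example, the flow-control options can be tuned like this sketch (the values are illustrative only, not recommendations):
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Illustrative limits: cache at most ~100 MB and 1,000,000 messages per
// partition in the workers before Spark processes the messages.
Dataset<Row> df = spark
    .readStream()
    .format("pubsublite")
    .option("pubsublite.subscription",
        "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
    .option("pubsublite.flowcontrol.byteoutstandingperpartition", 100_000_000L)
    .option("pubsublite.flowcontrol.messageoutstandingperpartition", 1_000_000L)
    .load();
```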
When writing to Pub/Sub Lite, the connector supports a number of configuration options:
| Option | Type | Required | Default Value | Meaning |
| ------ | ---- | -------- | ------------- | ------- |
| pubsublite.topic | String | Y | | Full topic path that the connector will write to. |
| gcp.credentials.key | String | N | [Application Default Credentials](https://cloud.google.com/docs/authentication/production#automatically) | Service account JSON in base64. |
### Data Schema
When reading from Pub/Sub Lite, the connector has a fixed data schema as follows:
| Data Field | Spark Data Type | Notes |
| ---------- | --------------- | ----- |
| subscription | StringType | Full subscription path |
| partition | LongType | |
| offset | LongType | |
| key | BinaryType | |
| data | BinaryType | |
| attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | |
| publish_timestamp | TimestampType | |
| event_timestamp | TimestampType | Nullable |
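Since `key` and `data` are `BinaryType`, text payloads need an explicit cast before use; a minimal sketch:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Decode the binary payload as a UTF-8 string alongside its position.
Dataset<Row> decoded = df.selectExpr(
    "partition",
    "offset",
    "CAST(data AS STRING) AS data_str",
    "publish_timestamp");
```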
When writing to Pub/Sub Lite, the connector expects the following data fields and data types:
| Data Field | Spark Data Type | Required |
| ---------- | --------------- | ----- |
| key | BinaryType | N |
| data | BinaryType | N |
| attributes | MapType\[StringType, ArrayType\[BinaryType\]\] | N |
| event_timestamp | TimestampType | N |
Note that when a data field is present in the table but its data type does not match, the connector throws an `IllegalArgumentException` that terminates the query.
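For example, a DataFrame with hypothetical `word` (string) and `count` (long) columns could be shaped to match the write schema with casts, as in this sketch:
```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// "counts" is a hypothetical DataFrame with columns word (string) and
// count (long); cast both to binary so they match the write schema above.
Dataset<Row> out = counts
    .withColumn("key", col("word").cast("BINARY"))
    .withColumn("data", col("count").cast("STRING").cast("BINARY"))
    .select("key", "data");
```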
## Building the Connector
The connector is built using Maven. The following command creates a JAR file with shaded dependencies:
```sh
mvn package
```
## FAQ
### What is the cost of Pub/Sub Lite?
See the [Pub/Sub Lite pricing documentation](https://cloud.google.com/pubsub/lite/pricing).
### Can I configure the number of Spark partitions?
No, the number of Spark partitions is set to be the number of Pub/Sub Lite partitions of the topic that the subscription is attached to.
### How do I authenticate outside Cloud Compute Engine / Cloud Dataproc?
Use a service account JSON key and `GOOGLE_APPLICATION_CREDENTIALS` as described [here](https://cloud.google.com/docs/authentication/getting-started).
Alternatively, credentials can be provided with the `gcp.credentials.key` option as a base64-encoded string.
Example:
```java
spark.readStream().format("pubsublite").option("gcp.credentials.key", "<BASE64-ENCODED KEY>")
```
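A minimal sketch (the key file path is a placeholder) of producing that base64-encoded string:
```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

// Read the downloaded service account JSON key and base64-encode it.
String credentialsKey =
    Base64.getEncoder()
        .encodeToString(Files.readAllBytes(Paths.get("/path/to/service-account.json")));
```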
## Samples
Samples are in the [`samples/`](https://github.com/googleapis/java-pubsublite-spark/tree/main/samples) directory.
| Sample | Source Code | Try it |
| --------------------------- | --------------------------------- | ------ |
| Admin Utils | [source code](https://github.com/googleapis/java-pubsublite-spark/blob/main/samples/snippets/src/main/java/pubsublite/spark/AdminUtils.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-spark&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/spark/AdminUtils.java) |
| Common Utils | [source code](https://github.com/googleapis/java-pubsublite-spark/blob/main/samples/snippets/src/main/java/pubsublite/spark/CommonUtils.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-spark&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/spark/CommonUtils.java) |
| Publish Words | [source code](https://github.com/googleapis/java-pubsublite-spark/blob/main/samples/snippets/src/main/java/pubsublite/spark/PublishWords.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-spark&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/spark/PublishWords.java) |
| Read Results | [source code](https://github.com/googleapis/java-pubsublite-spark/blob/main/samples/snippets/src/main/java/pubsublite/spark/ReadResults.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-spark&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/spark/ReadResults.java) |
| Simple Read | [source code](https://github.com/googleapis/java-pubsublite-spark/blob/main/samples/snippets/src/main/java/pubsublite/spark/SimpleRead.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-spark&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/spark/SimpleRead.java) |
| Simple Write | [source code](https://github.com/googleapis/java-pubsublite-spark/blob/main/samples/snippets/src/main/java/pubsublite/spark/SimpleWrite.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-spark&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/spark/SimpleWrite.java) |
| Word Count | [source code](https://github.com/googleapis/java-pubsublite-spark/blob/main/samples/snippets/src/main/java/pubsublite/spark/WordCount.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-spark&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/spark/WordCount.java) |
## Troubleshooting
To get help, follow the instructions in the [shared Troubleshooting document][troubleshooting].
## Transport
Pub/Sub Lite Spark Connector uses gRPC for the transport layer.
## Supported Java Versions
Java 8 or above is required for using this client.
Google's Java client libraries,
[Google Cloud Client Libraries][cloudlibs]
and
[Google Cloud API Libraries][apilibs],
follow the
[Oracle Java SE support roadmap][oracle]
(see the Oracle Java SE Product Releases section).
### For new development
In general, new feature development occurs with support for the lowest Java
LTS version covered by Oracle's Premier Support (which typically lasts 5 years
from initial General Availability). If the minimum required JVM for a given
library is changed, it is accompanied by a [semver][semver] major release.
Java 11 and (in September 2021) Java 17 are the best choices for new
development.
### Keeping production systems current
Google tests its client libraries with all current LTS versions covered by
Oracle's Extended Support (which typically lasts 8 years from initial
General Availability).
#### Legacy support
Google's client libraries support legacy versions of Java runtimes with long-term
stable libraries that don't receive feature updates, on a best-efforts basis, since
it may not be possible to backport all patches.
Google provides updates on a best-efforts basis to apps that continue to use
Java 7, though apps might need to upgrade to current versions of the library
that support their JVM.
#### Where to find specific information
The latest versions and the supported Java versions are identified on
the individual GitHub repository `github.com/googleapis/java-SERVICENAME`
and on [google-cloud-java][g-c-j].
## Versioning
This library follows [Semantic Versioning](http://semver.org/).
## Contributing
Contributions to this library are always welcome and highly encouraged.
See [CONTRIBUTING][contributing] for more information how to get started.
Please note that this project is released with a Contributor Code of Conduct. By participating in
this project you agree to abide by its terms. See [Code of Conduct][code-of-conduct] for more
information.
## License
Apache 2.0 - See [LICENSE][license] for more information.
## CI Status
Java Version | Status
------------ | ------
Java 8 | [![Kokoro CI][kokoro-badge-image-2]][kokoro-badge-link-2]
Java 8 OSX | [![Kokoro CI][kokoro-badge-image-3]][kokoro-badge-link-3]
Java 8 Windows | [![Kokoro CI][kokoro-badge-image-4]][kokoro-badge-link-4]
Java 11 | [![Kokoro CI][kokoro-badge-image-5]][kokoro-badge-link-5]
Java is a registered trademark of Oracle and/or its affiliates.
[product-docs]: https://cloud.google.com/pubsub/lite/docs
[javadocs]: https://cloud.google.com/java/docs/reference/pubsublite-spark-sql-streaming/latest/history
[kokoro-badge-image-1]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java7.svg
[kokoro-badge-link-1]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java7.html
[kokoro-badge-image-2]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java8.svg
[kokoro-badge-link-2]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java8.html
[kokoro-badge-image-3]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java8-osx.svg
[kokoro-badge-link-3]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java8-osx.html
[kokoro-badge-image-4]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java8-win.svg
[kokoro-badge-link-4]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java8-win.html
[kokoro-badge-image-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java11.svg
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-pubsublite-spark/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/pubsublite-spark-sql-streaming.svg
[maven-version-link]: https://search.maven.org/search?q=g:com.google.cloud%20AND%20a:pubsublite-spark-sql-streaming&core=gav
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
[iam-policy]: https://cloud.google.com/iam/docs/overview#cloud-iam-policy
[developer-console]: https://console.developers.google.com/
[create-project]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
[cloud-sdk]: https://cloud.google.com/sdk/
[troubleshooting]: https://github.com/googleapis/google-cloud-common/blob/main/troubleshooting/readme.md#troubleshooting
[contributing]: https://github.com/googleapis/java-pubsublite-spark/blob/main/CONTRIBUTING.md
[code-of-conduct]: https://github.com/googleapis/java-pubsublite-spark/blob/main/CODE_OF_CONDUCT.md#contributor-code-of-conduct
[license]: https://github.com/googleapis/java-pubsublite-spark/blob/main/LICENSE
[enable-billing]: https://cloud.google.com/apis/docs/getting-started#enabling_billing
[enable-api]: https://console.cloud.google.com/flows/enableapi?apiid=pubsublite.googleapis.com
[libraries-bom]: https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google-Cloud-Platform-Libraries-BOM
[shell_img]: https://gstatic.com/cloudssh/images/open-btn.png
[semver]: https://semver.org/
[cloudlibs]: https://cloud.google.com/apis/docs/client-libraries-explained
[apilibs]: https://cloud.google.com/apis/docs/client-libraries-explained#google_api_client_libraries
[oracle]: https://www.oracle.com/java/technologies/java-se-support-roadmap.html
[g-c-j]: http://github.com/googleapis/google-cloud-java