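This repository is a mirror intended to improve download speeds in mainland China and is synchronized once a day. Original repository: https://github.com/awslabs/amazon-kinesis-client-python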

Amazon Kinesis Client Library for Python


This package provides an interface to the Amazon Kinesis Client Library (KCL) MultiLangDaemon, which is part of the Amazon KCL for Java. Developers can use the Amazon KCL to build distributed applications that process streaming data reliably at scale. The Amazon KCL takes care of many of the complex tasks associated with distributed computing, such as load-balancing across multiple instances, responding to instance failures, checkpointing processed records, and reacting to changes in stream volume. This interface manages the interaction with the MultiLangDaemon so that developers can focus on implementing their record processor executable. A record processor executable typically looks something like:

    #!/usr/bin/env python
    from amazon_kclpy import kcl
    import json, base64

    class RecordProcessor(kcl.RecordProcessorBase):

        def initialize(self, initialization_input):
            pass

        def process_records(self, process_records_input):
            pass

        def lease_lost(self, lease_lost_input):
            pass

        def shard_ended(self, shard_ended_input):
            pass

        def shutdown_requested(self, shutdown_requested_input):
            pass

    if __name__ == "__main__":
        kclprocess = kcl.KCLProcess(RecordProcessor())
        kclprocess.run()
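As a rough illustration of what a process_records body often does, the sketch below walks a batch of records, remembers the largest position seen, and checkpoints once at the end. It deliberately does not import amazon_kclpy: the attribute names (binary_data, sequence_number, sub_sequence_number) and the checkpoint(sequence_number, sub_sequence_number) call are assumptions modeled on the sample application, and FakeRecord and FakeCheckpointer are hypothetical stand-ins that exist only so the snippet runs on its own.

```python
from collections import namedtuple

# Hypothetical stand-in for the record objects handed to process_records.
FakeRecord = namedtuple("FakeRecord", "binary_data sequence_number sub_sequence_number")


class FakeCheckpointer:
    """Hypothetical stand-in that only records the checkpoint calls it receives."""

    def __init__(self):
        self.calls = []

    def checkpoint(self, sequence_number=None, sub_sequence_number=None):
        self.calls.append((sequence_number, sub_sequence_number))


def process_batch(records, checkpointer, handle=print):
    """Handle each record, then checkpoint at the largest position seen."""
    largest = None
    for record in records:
        handle(record.binary_data.decode("utf-8"))
        # Compare positions as (sequence number, sub-sequence number) pairs.
        position = (int(record.sequence_number), record.sub_sequence_number)
        if largest is None or position > largest:
            largest = position
    if largest is not None:
        checkpointer.checkpoint(str(largest[0]), largest[1])
    return largest
```

In a real record processor, the same loop would live inside process_records and use the records and checkpointer supplied by the library rather than the stand-ins above.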

Before You Get Started

Before running the samples, you'll want to make sure that your environment is configured to allow the samples to use your AWS Security Credentials.

By default, the samples use the DefaultCredentialsProvider, so you'll want to make your credentials available to one of the credentials providers in that provider chain. There are several ways to do this, such as providing a ~/.aws/credentials file or, if you're running on EC2, associating an IAM role that has appropriate access with your instance.
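If you go the ~/.aws/credentials route, a quick sanity check can save a confusing startup failure. The sketch below only inspects the shared credentials file for a profile that carries both key fields; it does not exercise the rest of the provider chain (environment variables, IAM roles, and so on). The helper name has_profile is hypothetical.

```python
import configparser


def has_profile(credentials_path, profile="default"):
    """Return True if the INI-style credentials file defines both keys for the profile."""
    parser = configparser.ConfigParser()
    parser.read(credentials_path)  # a missing file is silently ignored
    return (
        parser.has_section(profile)
        and parser.has_option(profile, "aws_access_key_id")
        and parser.has_option(profile, "aws_secret_access_key")
    )
```

For example, has_profile(os.path.expanduser("~/.aws/credentials")) checks the default profile in the usual location (after importing os).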

For questions regarding the Amazon Kinesis service and the client libraries, please visit the Amazon Kinesis Forums.

Running the Sample

Using the amazon_kclpy package requires the MultiLangDaemon, which is provided by the Amazon KCL for Java. The required jars are downloaded automatically by the install command, but you can also download them explicitly with the download_jars command. From the root of this repo, run:

python setup.py download_jars
python setup.py install

If you'd like to override the default search location for the jars, you can set the KCL_MVN_REPO_SEARCH_URL environment variable to the location of the Maven repository you'd like to use.

export KCL_MVN_REPO_SEARCH_URL=https://path/to/maven/repo

The amazon_kclpy package, boto (used by the sample putter script), and the required jars should now be installed in your environment. To start the sample putter, run:

sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster

This will create an Amazon Kinesis stream called words and put the words specified by the -w options into the stream once each. Use -p SECONDS to indicate a period over which to repeatedly put these words.
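To make the "put each word once" behavior concrete, here is a minimal sketch of the putting loop. It is not the sample script itself: it assumes a client object with a boto3-style put_record(StreamName=..., Data=..., PartitionKey=...) method (the sample uses boto), it assumes each word doubles as its own partition key, and it does not create the stream. RecordingClient is a hypothetical stand-in so the snippet runs without AWS access.

```python
class RecordingClient:
    """Hypothetical stand-in for a Kinesis client; it only records the calls."""

    def __init__(self):
        self.records = []

    def put_record(self, StreamName, Data, PartitionKey):
        self.records.append((StreamName, Data, PartitionKey))


def put_words(client, stream_name, words):
    """Put each word into the stream once, using the word as its partition key."""
    for word in words:
        client.put_record(
            StreamName=stream_name,
            Data=word.encode("utf-8"),
            PartitionKey=word,
        )
```

With boto3 installed and credentials configured, passing boto3.client("kinesis") instead of RecordingClient() would send the records to a real stream.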

Next, we would like to run an Amazon KCL for Python application that reads records from the stream we just created. First, take a look in the samples directory, where you'll find a file called sample.properties. Print that file:

cat samples/sample.properties

You'll see several properties defined there: executableName indicates the executable for the MultiLangDaemon to run; streamName is the Kinesis stream to read from; appName is the Amazon KCL application name, which is also used as the name of the Amazon DynamoDB table that the Amazon KCL creates; and initialPositionInStream tells the Amazon KCL where to start reading from shards on a fresh startup. To run the sample application, you can use a helper script included in this package. Note that you must provide a path to Java (version 1.7 or greater) to run the Amazon KCL.

amazon_kclpy_helper.py --print_command \
    --java <path-to-java> --properties samples/sample.properties

This will print the command needed to run the sample, which you can copy and paste. Alternatively, surround the helper command with backticks to run it directly.

`amazon_kclpy_helper.py --print_command \
    --java <path-to-java> --properties samples/sample.properties`

Alternatively, if you don't have the source on hand but want to run the sample app, you can use the --sample argument to get the sample.properties file from the installation location.

amazon_kclpy_helper.py --print_command --java <path-to-java> --sample
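If you want to inspect a properties file such as samples/sample.properties from Python before launching the daemon, a small parser is enough for the simple key = value lines it contains. The sketch below is an assumption-laden simplification: it ignores Java properties features such as ":" separators and line continuations, and the values in EXAMPLE are illustrative rather than copied from the shipped file.

```python
def parse_properties(text):
    """Parse simple `key = value` lines, skipping blanks and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines without an "=" separator
            props[key.strip()] = value.strip()
    return props


# Illustrative values only; check your own sample.properties for the real ones.
EXAMPLE = """
# The script the MultiLangDaemon should launch
executableName = sample_kclpy_app.py
streamName = words
initialPositionInStream = TRIM_HORIZON
"""
```

Calling parse_properties(EXAMPLE)["streamName"] returns "words", which makes it easy to confirm that the stream name matches the one the putter wrote to.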

Running on EC2

Running on EC2 is simple. Assuming you are already logged into an EC2 instance running Amazon Linux, the following steps will prepare your environment for running the sample app. Note that the version of Java that ships with Amazon Linux can be found at /usr/bin/java and should be 1.7 or greater.

sudo yum install python-pip

sudo pip install virtualenv

virtualenv /tmp/kclpy-sample-env

source /tmp/kclpy-sample-env/bin/activate

pip install amazon_kclpy

Under the Hood - What You Should Know about Amazon KCL's MultiLangDaemon

Amazon KCL for Python uses Amazon KCL for Java internally. We have implemented a Java-based daemon, called the MultiLangDaemon, that does all the heavy lifting. The daemon spawns the user-defined record processor script/program as a sub-process. The MultiLangDaemon communicates with this sub-process over standard input/output using a simple protocol, so the record processor script/program can be written in any language.

At runtime, there will always be a one-to-one correspondence between a record processor, a child process, and an Amazon Kinesis Shard. The MultiLangDaemon will make sure of that, without any need for the developer to intervene.

In this release, we have abstracted these implementation details away and exposed an interface that enables you to focus on writing record processing logic in Python. This approach enables Amazon KCL to be language agnostic, while providing identical features and a similar parallel processing model across all languages.
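To give a feel for what "a simple protocol over standard input/output" can look like, the sketch below reads one JSON message per line, dispatches on an "action" field, and writes back a status acknowledgement. The message shapes shown here are assumptions for illustration and the real protocol has more message types and fields; amazon_kclpy handles all of this for you, so this is not code you need to write.

```python
import json


def serve(stdin, stdout, handlers):
    """Read one JSON message per line, dispatch on "action", and acknowledge it."""
    for line in stdin:
        line = line.strip()
        if not line:
            continue
        message = json.loads(line)
        action = message["action"]
        # Unknown actions are acknowledged without doing any work.
        handlers.get(action, lambda m: None)(message)
        reply = {"action": "status", "responseFor": action}
        stdout.write(json.dumps(reply) + "\n")
        stdout.flush()
```

Because the function takes file-like objects, it can be exercised with io.StringIO in tests and pointed at sys.stdin and sys.stdout in a child process.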

See Also

Release Notes

Release 3.0.5 (June 6, 2025)

  • #274 Add ability to set mvn repo search URL with a unique ENV Variable
  • #279 Create sample KCL build tests
  • #280 Add dependabot auto-merge and workflow dispatch
  • #283 Update AWS credentials used for running sample workflow
  • #287 Upgrade org.checkerframework:checker-qual from 2.5.2 to 3.49.4
  • #285 Upgrade netty.version from 4.1.118.Final to 4.2.1.Final
  • #284 Upgrade commons-beanutils:commons-beanutils from 1.9.4 to 1.11.0

Release 3.0.3 (March 25, 2025)

  • Downgrade logback from 1.5.16 to 1.3.15 to maintain JDK 8 compatibility

Release 3.0.2 (March 24, 2025)

[BREAKING CHANGES] - Release 3.0.2 contains a dependency version that is not compatible with JDK 8. Please upgrade to a later release (3.0.3 or newer) if your KCL application requires JDK 8.

  • KCL 3.0.2 Changelog: Upgrade KCL and KCL-Multilang dependencies from 3.0.0 to 3.0.2
  • #266 Upgrade netty.version from 4.1.108.Final to 4.1.118.Final
  • #265 Upgrade logback.version from 1.3.14 to 1.5.16

Release 3.0.1 (November 6, 2024)

  • New lease assignment / load balancing algorithm
    • KCL 3.x introduces a new lease assignment and load balancing algorithm. It assigns leases among workers based on worker utilization metrics and throughput on each lease, replacing the previous lease count-based lease assignment algorithm.
    • When KCL detects higher variance in CPU utilization among workers, it proactively reassigns leases from over-utilized workers to under-utilized workers for even load balancing. This ensures even CPU utilization across workers and removes the need to over-provision the stream processing compute hosts.
  • Optimized DynamoDB RCU usage
    • KCL 3.x optimizes DynamoDB read capacity unit (RCU) usage on the lease table by implementing a global secondary index with leaseOwner as the partition key. This index mirrors the leaseKey attribute from the base lease table, allowing workers to efficiently discover their assigned leases by querying the index instead of scanning the entire table.
    • This approach significantly reduces read operations compared to earlier KCL versions, where workers performed full table scans, resulting in higher RCU consumption.
  • Graceful lease handoff
    • KCL 3.x introduces a feature called "graceful lease handoff" to minimize data reprocessing during lease reassignments. Graceful lease handoff allows the current worker to complete checkpointing of processed records before transferring the lease to another worker. For graceful lease handoff, you should implement checkpointing logic within the existing shutdownRequested() method.
    • This feature is enabled by default in KCL 3.x, but you can turn off this feature by adjusting the configuration property isGracefulLeaseHandoffEnabled.
    • While this approach significantly reduces the probability of data reprocessing during lease transfers, it doesn't completely eliminate the possibility. To maintain data integrity and consistency, it's crucial to design your downstream consumer applications to be idempotent. This ensures that the application can handle potential duplicate record processing without adverse effects.
  • New DynamoDB metadata management artifacts
    • KCL 3.x introduces two new DynamoDB tables for improved lease management:
      • Worker metrics table: Records CPU utilization metrics from each worker. KCL uses these metrics for optimal lease assignments, balancing resource utilization across workers. If the CPU utilization metric is not available, KCL assigns leases to balance the total sum of shard throughput per worker instead.
      • Coordinator state table: Stores internal state information for workers. Used to coordinate in-place migration from KCL 2.x to KCL 3.x and leader election among workers.
    • Follow this documentation to add required IAM permissions for your KCL application.
  • Other improvements and changes
    • Dependency on the AWS SDK for Java 1.x has been fully removed.
      • The Glue Schema Registry integration functionality no longer depends on AWS SDK for Java 1.x. Previously, it required this as a transitive dependency.
      • MultiLangDaemon has been upgraded to use AWS SDK for Java 2.x. It no longer depends on AWS SDK for Java 1.x.
    • idleTimeBetweenReadsInMillis (PollingConfig) now has a minimum default value of 200.
      • This polling configuration property determines the publisher's wait time between GetRecords calls in both success and failure cases. Previously, setting this value below 200 caused unnecessary throttling. This is because Amazon Kinesis Data Streams supports up to five read transactions per second per shard for shared-throughput consumers.
    • Shard lifecycle management is improved to deal with edge cases around shard splits and merges to ensure records continue being processed as expected.
  • Migration
    • The programming interfaces of KCL 3.x remain identical to those of KCL 2.x for an easier migration. For detailed migration instructions, please refer to the Migrate consumers from KCL 2.x to KCL 3.x page in the Amazon Kinesis Data Streams developer guide.
  • Configuration properties
    • New configuration properties introduced in KCL 3.x are listed in this doc.
    • Deprecated configuration properties in KCL 3.x are listed in this doc. You need to keep the deprecated configuration properties during the migration from any previous KCL version to KCL 3.x.
  • Metrics
    • New CloudWatch metrics introduced in KCL 3.x are explained on the Monitor the Kinesis Client Library with Amazon CloudWatch page in the Amazon Kinesis Data Streams developer guide. The following operations are newly added in KCL 3.x:
      • LeaseAssignmentManager
      • WorkerMetricStatsReporter
      • LeaseDiscovery
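The graceful lease handoff notes above recommend idempotent downstream processing, since a record may still be delivered more than once. One common way to get there is to deduplicate on a stable record identity. The sketch below assumes that (shard id, sequence number) identifies a record, which would need a sub-sequence number as well for aggregated records, and it keeps the seen set in memory purely for illustration; a real application would use a durable store. IdempotentSink is a hypothetical name.

```python
class IdempotentSink:
    """Applies each record at most once, keyed by (shard_id, sequence_number)."""

    def __init__(self):
        self._seen = set()  # in-memory only; use durable storage in practice
        self.applied = []

    def apply(self, shard_id, sequence_number, payload):
        """Return True if the payload was applied, False if it was a duplicate."""
        key = (shard_id, sequence_number)
        if key in self._seen:
            return False
        self._seen.add(key)
        self.applied.append(payload)
        return True
```

Replaying the same record after a lease transfer then becomes a no-op instead of a double write.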

Release 3.0.0 (November 6, 2024)

Release 3.0.0 had an issue that caused a build failure. Please use release 3.0.1 to use KCL 3.0.


For 2.x and 1.x release notes, please see v2.x/README.md

License

This library is licensed under the Apache 2.0 License.

