# springbatch **Repository Path**: beyond-prototype/springbatch ## Basic Information - **Project Name**: springbatch - **Description**: springbatch with mongodb - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2023-10-21 - **Last Updated**: 2023-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 1. MongoDB ``` springbatch % docker pull mongo springbatch % mkdir ~/docker/mongo springbatch % docker run -itd --name mongo -v ~/docker/mongo:/data/db -p 27017:27017 mongo --auth springbatch % docker stop mongo ``` ### Login MongoDB ``` springbatch % docker start mongo springbatch % docker exec -it mongo mongosh admin Current Mongosh Log ID: 6533b9b0caa60065d0142269 Connecting to: mongodb://127.0.0.1:27017/admin?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.1 Using MongoDB: 7.0.2 Using Mongosh: 2.0.1 For mongosh info see: https://docs.mongodb.com/mongodb-shell/ To help improve our products, anonymous usage data is collected and sent to MongoDB periodically (https://www.mongodb.com/legal/privacy-policy). You can opt-out by running the disableTelemetry() command. admin> db.createUser({user:"root",pwd:"12345",roles:[{role:"userAdminAnyDatabase", db:"admin"}, 'readWriteAnyDatabase']}) { ok: 1 } admin> db.auth("root","12345") { ok: 1 } admin> show dbs admin 132.00 KiB config 12.00 KiB local 40.00 KiB admin> use demo switched to db demo demo> db.createCollection("CustomerExtention") { ok: 1 } demo> show collections CustomerExtention demo> db.CustomerExtention.insertOne({"globalIdentifer":"G00000001","localIdentifer":"L000001", "market":"HK", "premierMarkers":""}) { acknowledged: true, insertedId: ObjectId("6533bce7caa60065d014226a") } demo> db.CustomerExtention.find() [ { _id: ObjectId("6533bce7caa60065d014226a"), globalIdentifer: 'G00000001', localIdentifer: 'L000001', market: 'HK', premierMarkers: '' } ] demo> db.CustomerExtention.find({"globalIdentifer":"G00000001"}) [ { _id: ObjectId("6533bce7caa60065d014226a"), globalIdentifer: 'G00000001', localIdentifer: 'L000001', market: 'HK', premierMarkers: '' } ] demo> db.CustomerExtention.drop() ``` https://blog.51cto.com/u_16213633/7383567 https://blog.csdn.net/m0_62264368/article/details/129379688 ### Import CSV file to MongoDB Create a customer-linkage-data.csv file in the mongo docker container and use `mongoimport` to import the file into the `CustomerLinkage` collection. ``` springbatch % docker exec -it mongo bash root@7f756fb6e50a:/# touch customer-linkage-data.csv root@7f756fb6e50a:/# echo "L000001,HK,G000001,TRB|MGT" > customer-linkage-data.csv root@7f756fb6e50a:/# echo "L000002,SG,G000002,TRB|MGT" >> customer-linkage-data.csv root@7f756fb6e50a:/# echo "L000003,UK,G000003,TRB" >> customer-linkage-data.csv root@7f756fb6e50a:/# echo "L000004,CN,G000004,TRB|MGT" >> customer-linkage-data.csv root@7f756fb6e50a:/# cat customer-linkage-data.csv L000001,HK,G000001,TRB|MGT L000002,SG,G000002,TRB|MGT L000003,UK,G000003,TRB L000004,CN,G000004,TRB|MGT root@7f756fb6e50a:/# mongoimport -u root -p 12345 --authenticationDatabase=admin -d demo -c CustomerLinkage --type csv --columnsHaveTypes --fields "_id.localIdentifer.string(),_id.market.string(),globalIdentifer.string(),premierMarkers.string()" --file ./customer-linkage-data.csv 2023-10-21T14:08:40.786+0000 connected to: mongodb://localhost/ 2023-10-21T14:08:40.789+0000 4 document(s) imported successfully. 0 document(s) failed to import. root@7f756fb6e50a:/# mongoimport -u root -p 12345 --authenticationDatabase=admin -d demo -c CustomerLinkage2 --type csv --columnsHaveTypes --fields "localIdentifer.string(),market.string(),globalIdentifer.string(),premierMarkers.string()" --file ./customer-linkage-data.csv 2023-10-21T23:22:45.187+0000 connected to: mongodb://localhost/ 2023-10-21T23:22:45.202+0000 4 document(s) imported successfully. 0 document(s) failed to import. ``` Logon MongoDB and check the data in `CustomerLinkage` collection. ``` xwp@weipaoxus-MacBook-Pro cngit % docker exec -it mongo mongosh admin Current Mongosh Log ID: 6533dc808714c131e01939b3 Connecting to: mongodb://127.0.0.1:27017/admin?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.1 Using MongoDB: 7.0.2 Using Mongosh: 2.0.1 For mongosh info see: https://docs.mongodb.com/mongodb-shell/ admin> db.auth("root","12345") { ok: 1 } admin> use demo switched to db demo demo> db.CustomerLinkage.find() [ { _id: { localIdentifer: 'L000003', market: 'UK' }, globalIdentifer: 'G000003', premierMarkers: 'TRB' }, { _id: { localIdentifer: 'L000001', market: 'HK' }, globalIdentifer: 'G000001', premierMarkers: 'TRB|MGT' }, { _id: { localIdentifer: 'L000004', market: 'CN' }, globalIdentifer: 'G000004', premierMarkers: 'TRB|MGT' }, { _id: { localIdentifer: 'L000002', market: 'SG' }, globalIdentifer: 'G000002', premierMarkers: 'TRB|MGT' } ] demo> db.CustomerLinkage2.find() [ { _id: ObjectId("65345d45f427c3f316ccf425"), localIdentifer: 'L000001', market: 'HK', globalIdentifer: 'G000001', premierMarkers: 'TRB|MGT' }, { _id: ObjectId("65345d45f427c3f316ccf426"), localIdentifer: 'L000003', market: 'UK', globalIdentifer: 'G000003', premierMarkers: 'TRB' }, { _id: ObjectId("65345d45f427c3f316ccf427"), localIdentifer: 'L000002', market: 'SG', globalIdentifer: 'G000002', premierMarkers: 'TRB|MGT' }, { _id: ObjectId("65345d45f427c3f316ccf428"), localIdentifer: 'L000004', market: 'CN', globalIdentifer: 'G000004', premierMarkers: 'TRB|MGT' } ] ``` Update the `CustomerLinkage` collection with duplcated data: ``` root@7f756fb6e50a:/# touch customer-linkage-data-update.csv root@7f756fb6e50a:/# echo "L000001,HK,G000001,MGT" > customer-linkage-data-update.csv root@7f756fb6e50a:/# cat customer-linkage-data-update.csv L000001,HK,G000001,MGT root@7f756fb6e50a:/# mongoimport -u root -p 12345 --authenticationDatabase=admin -d demo -c CustomerLinkage --type csv --columnsHaveTypes --fields "_id.localIdentifer.string(),_id.market.string(),globalIdentifer.string(),premierMarkers.string()" --file ./customer-linkage-data-update.csv 2023-10-21T22:56:27.956+0000 continuing through error: E11000 duplicate key error collection: demo.CustomerLinkage index: _id_ dup key: { _id: { localIdentifer: "L000001", market: "HK" } } 2023-10-21T22:56:27.956+0000 0 document(s) imported successfully. 1 document(s) failed to import. root@7f756fb6e50a:/# mongoimport -u root -p 12345 --authenticationDatabase=admin -d demo -c CustomerLinkage2 --type csv --columnsHaveTypes --fields "localIdentifer.string(),market.string(),globalIdentifer.string(),premierMarkers.string()" --file ./customer-linkage-data-update.csv 2023-10-21T23:25:42.921+0000 connected to: mongodb://localhost/ 2023-10-21T23:25:42.925+0000 1 document(s) imported successfully. 0 document(s) failed to import. demo> db.CustomerLinkage2.find({localIdentifer: 'L000001'}) [ { _id: ObjectId("65345d45f427c3f316ccf425"), localIdentifer: 'L000001', market: 'HK', globalIdentifer: 'G000001', premierMarkers: 'TRB|MGT' }, { _id: ObjectId("65345df6ec027e06b121d249"), localIdentifer: 'L000001', market: 'HK', globalIdentifer: 'G000001', premierMarkers: 'MGT' } ] ``` ### MongoDB SQL to batch process data ```sql demo> db.CustomerLinkage.find({_id: {localIdentifer: 'L000001', market: 'HK'}}) demo> db.CustomerLinkage.find({globalIdentifer:'G000001'}) [ { _id: { localIdentifer: 'L000001', market: 'HK' }, globalIdentifer: 'G000001', premierMarkers: 'TRB|MGT' } ] demo> db.CustomerLinkage.update({_id: {localIdentifer: 'L000001', market: 'HK'}}, {$set: {premierMarkers: 'TRB|MGT'}}) DeprecationWarning: Collection.update() is deprecated. Use updateOne, updateMany, or bulkWrite. { acknowledged: true, insertedId: null, matchedCount: 1, modifiedCount: 1, upsertedCount: 0 } demo> db.CustomerLinkage.find({_id: {localIdentifer: 'L000001', market: 'HK'}}) [ { _id: { localIdentifer: 'L000001', market: 'HK' }, globalIdentifer: 'G000001', premierMarkers: 'ABC' } ] demo> db.CustomerLinkage.find().forEach(function(doc){db.CustomerLinkage.update({_id: {localIdentifer: doc.localIdentifer, market: doc.market}}, {$set: {premierMarkers: doc.premierMarkers}})}); demo> db.CustomerLinkage.find().forEach(function(item){print(item._id.localIdentifer + ',' + item._id.market + ',' + item.premierMarkers)}); L000003,UK,TRB L000001,HK,TRB|MGT L000004,CN,TRB|MGT L000002,SG,TRB|MGT demo> db.getCollection('CustomerLinkage').find() ``` Reference: https://www.cnblogs.com/cccnqin/p/15843156.html ### 3. Create SpringBoot Application ```java package com.example.demo; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import org.bson.Document; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; ``` MongoDB提供了一个非常有用的模块,叫作MongoDB的MongoClient。可以用来连接到MongoDB数据库,还提供一个非常强大的方法叫做insertMany,可以帮助我们将数据插入MongoDB数据库: ## 2. Create SpringBoot Project Update pom.xml ```xml org.springframework.boot spring-boot-starter-batch org.hsqldb hsqldb runtime ``` mvn spring-boot:run ``` . ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.7.5) 2023-10-22 09:17:33.304 INFO 5619 --- [ main] c.b.springbatch.SpringbatchApplication : Starting SpringbatchApplication using Java 17.0.1 on weipaoxus-MacBook-Pro.local with PID 5619 (/Users/xwp/Desktop/cngit/springbatch/target/classes started by xwp in /Users/xwp/Desktop/cngit/springbatch) 2023-10-22 09:17:33.306 INFO 5619 --- [ main] c.b.springbatch.SpringbatchApplication : No active profile set, falling back to 1 default profile: "default" 2023-10-22 09:17:34.074 INFO 5619 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting... 2023-10-22 09:17:34.257 INFO 5619 --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Driver does not support get/set network timeout for connections. (feature not supported) 2023-10-22 09:17:34.258 INFO 5619 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed. 2023-10-22 09:17:34.412 INFO 5619 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: HSQL 2023-10-22 09:17:34.523 INFO 5619 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor. 2023-10-22 09:17:34.673 INFO 5619 --- [ main] c.b.springbatch.SpringbatchApplication : Started SpringbatchApplication in 1.874 seconds (JVM running for 2.23) 2023-10-22 09:17:34.676 INFO 5619 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: [] 2023-10-22 09:17:34.727 INFO 5619 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=batchJob]] launched with the following parameters: [{}] 2023-10-22 09:17:34.750 INFO 5619 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadGcduDataStep] 2023-10-22 09:17:34.768 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000020 HK G000002 TRB|MGT null 2023-10-22 09:17:34.769 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000021 UK G000002 TRB null 2023-10-22 09:17:34.769 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000022 SG G000002 MGT null 2023-10-22 09:17:34.769 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000030 SG G000003 TRB null 2023-10-22 09:17:34.769 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000040 US G000004 MGT null Writing CustomerMarker: 2023-10-20 L0000020 HK G000002 TRB|MGT Processing Writing CustomerMarker: 2023-10-20 L0000021 UK G000002 TRB Processing Writing CustomerMarker: 2023-10-20 L0000022 SG G000002 MGT Processing Writing CustomerMarker: 2023-10-20 L0000030 SG G000003 TRB Processing Writing CustomerMarker: 2023-10-20 L0000040 US G000004 MGT Processing 2023-10-22 09:17:34.774 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000050 JP G000005 MGT null 2023-10-22 09:17:34.774 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000060 CN G000006 MGT null 2023-10-22 09:17:34.774 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000070 KR G000007 MGT null 2023-10-22 09:17:34.774 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000080 TH G000008 TRB null Writing CustomerMarker: 2023-10-20 L0000050 JP G000005 MGT Processing Writing CustomerMarker: 2023-10-20 L0000060 CN G000006 MGT Processing Writing CustomerMarker: 2023-10-20 L0000070 KR G000007 MGT Processing Writing CustomerMarker: 2023-10-20 L0000080 TH G000008 TRB Processing 2023-10-22 09:17:34.778 INFO 5619 --- [ main] o.s.batch.core.step.AbstractStep : Step: [loadGcduDataStep] executed in 27ms 2023-10-22 09:17:34.789 INFO 5619 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [ihubhkUpdateStep] 2023-10-22 09:17:34.796 INFO 5619 --- [ main] c.b.s.IhubDataBatchConfiguration : Processing CustomerMarker: 2023-10-21 L0000020 HK TRB null Updating CustomerMarker: 2023-10-21 L0000020 HK TRB Updated 2023-10-22 09:17:34.801 INFO 5619 --- [ main] o.s.batch.core.step.AbstractStep : Step: [ihubhkUpdateStep] executed in 12ms 2023-10-22 09:17:34.807 INFO 5619 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=batchJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 66ms 2023-10-22 09:17:34.814 INFO 5619 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated... 2023-10-22 09:17:34.818 INFO 5619 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed. ``` ## 3. MongoDB Integration Update pom.xml ```xml org.springframework.boot spring-boot-starter-data-mongodb ``` Update application.yml ```yaml spring: data: mongodb: host: localhost port: 27017 database: demo authentication-database: admin username: developer password: 12345 ``` Create developer role ``` demo> db.createUser({user:"developer",pwd:"12345",roles:['readWrite']}) ``` mvn spring-boot:run ``` 2023-10-22 10:13:19.528 INFO 7641 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data MongoDB repositories in DEFAULT mode. 2023-10-22 10:13:19.539 INFO 7641 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 8 ms. Found 0 MongoDB repository interfaces. 2023-10-22 10:13:19.849 INFO 7641 --- [ main] org.mongodb.driver.client : MongoClient with metadata {"driver": {"name": "mongo-java-driver|sync|spring-boot", "version": "4.6.1"}, "os": {"type": "Darwin", "name": "Mac OS X", "architecture": "x86_64", "version": "14.0"}, "platform": "Java/Oracle Corporation/17.0.1+12-39"} created with settings MongoClientSettings{readPreference=primary, writeConcern=WriteConcern{w=null, wTimeout=null ms, journal=null}, retryWrites=true, retryReads=true, readConcern=ReadConcern{level=null}, credential=MongoCredential{mechanism=null, userName='developer', source='admin', password=, mechanismProperties=}, streamFactoryFactory=null, commandListeners=[], codecRegistry=ProvidersCodecRegistry{codecProviders=[ValueCodecProvider{}, BsonValueCodecProvider{}, DBRefCodecProvider{}, DBObjectCodecProvider{}, DocumentCodecProvider{}, IterableCodecProvider{}, MapCodecProvider{}, GeoJsonCodecProvider{}, GridFSFileCodecProvider{}, Jsr310CodecProvider{}, JsonObjectCodecProvider{}, BsonCodecProvider{}, EnumCodecProvider{}, com.mongodb.Jep395RecordCodecProvider@8ed9cf]}, clusterSettings={hosts=[localhost:27017], srvServiceName=mongodb, mode=SINGLE, requiredClusterType=UNKNOWN, requiredReplicaSetName='null', serverSelector='null', clusterListeners='[]', serverSelectionTimeout='30000 ms', localThreshold='30000 ms'}, socketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=0, receiveBufferSize=0, sendBufferSize=0}, heartbeatSocketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=10000, receiveBufferSize=0, sendBufferSize=0}, connectionPoolSettings=ConnectionPoolSettings{maxSize=100, minSize=0, maxWaitTimeMS=120000, maxConnectionLifeTimeMS=0, maxConnectionIdleTimeMS=0, maintenanceInitialDelayMS=0, maintenanceFrequencyMS=60000, connectionPoolListeners=[], maxConnecting=2}, serverSettings=ServerSettings{heartbeatFrequencyMS=10000, minHeartbeatFrequencyMS=500, serverListeners='[]', serverMonitorListeners='[]'}, sslSettings=SslSettings{enabled=false, invalidHostNameAllowed=false, context=null}, applicationName='null', compressorList=[], uuidRepresentation=JAVA_LEGACY, serverApi=null, autoEncryptionSettings=null, contextProvider=null} 2023-10-22 10:13:19.879 INFO 7641 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:2, serverValue:51}] to localhost:27017 2023-10-22 10:13:19.879 INFO 7641 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1, serverValue:50}] to localhost:27017 2023-10-22 10:13:19.879 INFO 7641 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=21, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=17702708} ``` demo> db.createCollection('CustomerMarker'); { ok: 1 } Update CustomerMarker.java ```java import org.springframework.data.mongodb.core.mapping.Document; @Document("CustomerMarker") public class CustomerMarker { } ``` Create CustomerMarkerService.java ```java @Service public class CustomerMarkerService { @Autowired private MongoTemplate mongoTemplate; public int create(List markers) { log.info("Record Count: " + markers.size()); BulkWriteResult result = mongoTemplate.bulkOps(BulkMode.UNORDERED, CustomerMarker.class).insert(markers).execute(); log.info("Inserted Count: " + result.getInsertedCount()); return result.getInsertedCount(); } } ``` Update GcduDataBatchConfiguration.java ```java public class GcduDataBatchConfiguration { @Autowired private CustomerMarkerService customerMarkerService; @Bean @Qualifier("gcduItemWriter") public ItemWriter gcduItemWriter() { return new ItemWriter() { @Override public void write(List items) throws Exception { for (CustomerMarker item : items) { System.out.println("Writing "+ item.toString()); } customerMarkerService.create(items); } }; } } ``` springbatch % mvn spring-boot:run ``` 2023-10-22 11:18:54.940 INFO 9572 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=batchJob]] launched with the following parameters: [{}] 2023-10-22 11:18:54.959 INFO 9572 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadGcduDataStep] 2023-10-22 11:18:54.979 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000010 HK G000001 TRB|MGT null 2023-10-22 11:18:54.979 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000012 SG G000001 MGT null 2023-10-22 11:18:54.979 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000011 UK G000001 TRB null 2023-10-22 11:18:54.979 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000020 CN G000002 MGT null 2023-10-22 11:18:54.979 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000010 HK G000001 TRB|MGT Processing 2023-10-22 11:18:54.980 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000012 SG G000001 MGT Processing 2023-10-22 11:18:54.980 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000011 UK G000001 TRB Processing 2023-10-22 11:18:54.980 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000020 CN G000002 MGT Processing 2023-10-22 11:18:54.981 INFO 9572 --- [ main] c.b.batch.service.CustomerMarkerService : Record Count: 4 2023-10-22 11:18:55.105 INFO 9572 --- [ main] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:77}] to localhost:27017 2023-10-22 11:18:55.132 INFO 9572 --- [ main] c.b.batch.service.CustomerMarkerService : Inserted Count: 4 ``` Check the database ``` demo> db.CustomerMarker.find() [ { _id: ObjectId("6534949f0eef72760b45ebef"), globalIdentifer: 'G000001', localIdentifer: 'L0000010', market: 'HK', markers: 'TRB|MGT', asOfDate: '2023-10-20', status: 'Processing', _class: 'com.beyondprototype.batch.model.CustomerMarker' }, { _id: ObjectId("6534949f0eef72760b45ebf0"), globalIdentifer: 'G000001', localIdentifer: 'L0000012', market: 'SG', markers: 'MGT', asOfDate: '2023-10-20', status: 'Processing', _class: 'com.beyondprototype.batch.model.CustomerMarker' }, { _id: ObjectId("6534949f0eef72760b45ebf1"), globalIdentifer: 'G000001', localIdentifer: 'L0000011', market: 'UK', markers: 'TRB', asOfDate: '2023-10-20', status: 'Processing', _class: 'com.beyondprototype.batch.model.CustomerMarker' }, { _id: ObjectId("6534949f0eef72760b45ebf2"), globalIdentifer: 'G000002', localIdentifer: 'L0000020', market: 'CN', markers: 'MGT', asOfDate: '2023-10-20', status: 'Processing', _class: 'com.beyondprototype.batch.model.CustomerMarker' } ] ``` Update CustomerMarkerService.java ```java public int update(List markers) { log.info("Record Count: " + markers.size()); List> updateList = new ArrayList<>(markers.size()); markers.forEach( marker -> { Query query = new Query(new Criteria("localIdentifer").is(marker.getLocalIdentifer())); Update update = new Update(); update.set("markers", marker.getMarkers()); update.set("status", "Updated"); updateList.add(Pair.of(query, update)); }); BulkOperations bulkOps = mongoTemplate.bulkOps(BulkMode.UNORDERED, CustomerMarker.class); BulkWriteResult result = bulkOps.upsert(updateList).execute(); log.info("Modified Count: " + result.getModifiedCount()); return result.getModifiedCount(); } ``` Update IhubDataBatchConfiguration.java ```java @Qualifier("ihubhkItemWriter") public ItemWriter ihubhkItemWriter() { return new ItemWriter() { @Override public void write(List items) throws Exception { for (CustomerMarker item : items) { log.info("Updating "+ item.toString()); } customerMarkerService.update(items); } }; } ``` Check data in mongodb ``` demo> db.CustomerMarker.find({localIdentifer: 'L0000010'}) [ { _id: ObjectId("6534949f0eef72760b45ebef"), globalIdentifer: 'G000001', localIdentifer: 'L0000010', market: 'HK', markers: 'TRB|MGT', asOfDate: '2023-10-20', status: 'Processing', _class: 'com.beyondprototype.batch.model.CustomerMarker' } ] demo> db.CustomerMarker.deleteMany({}) { acknowledged: true, deletedCount: 4 } ``` Prepare sample data ihubhk-data.txt to update ``` 2023-10-21,L0000010,HK,,SAL ``` mvn spring-boot:run ``` 2023-10-22 12:31:43.903 INFO 10827 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=batchJob]] launched with the following parameters: [{}] 2023-10-22 12:31:43.924 INFO 10827 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadGcduDataStep] 2023-10-22 12:31:43.944 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000010 HK G000001 TRB|MGT null 2023-10-22 12:31:43.944 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000012 SG G000001 MGT null 2023-10-22 12:31:43.944 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000011 UK G000001 TRB null 2023-10-22 12:31:43.945 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000020 CN G000002 MGT null 2023-10-22 12:31:43.945 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000010 HK G000001 TRB|MGT Processing 2023-10-22 12:31:43.945 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000012 SG G000001 MGT Processing 2023-10-22 12:31:43.945 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000011 UK G000001 TRB Processing 2023-10-22 12:31:43.947 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000020 CN G000002 MGT Processing 2023-10-22 12:31:43.948 INFO 10827 --- [ main] c.b.batch.service.CustomerMarkerService : Record Count: 4 2023-10-22 12:31:44.072 INFO 10827 --- [ main] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:86}] to localhost:27017 2023-10-22 12:31:44.087 INFO 10827 --- [ main] c.b.batch.service.CustomerMarkerService : Inserted Count: 4 2023-10-22 12:31:44.092 INFO 10827 --- [ main] o.s.batch.core.step.AbstractStep : Step: [loadGcduDataStep] executed in 168ms 2023-10-22 12:31:44.103 INFO 10827 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [ihubhkUpdateStep] 2023-10-22 12:31:44.108 INFO 10827 --- [ main] c.b.b.config.IhubDataBatchConfiguration : Processing CustomerMarker: 2023-10-21 L0000010 HK SAL null 2023-10-22 12:31:44.108 INFO 10827 --- [ main] c.b.b.config.IhubDataBatchConfiguration : Updating CustomerMarker: 2023-10-21 L0000010 HK SAL Updated 2023-10-22 12:31:44.109 INFO 10827 --- [ main] c.b.batch.service.CustomerMarkerService : Record Count: 1 2023-10-22 12:31:44.123 INFO 10827 --- [ main] c.b.batch.service.CustomerMarkerService : Modified Count: 1 2023-10-22 12:31:44.125 INFO 10827 --- [ main] o.s.batch.core.step.AbstractStep : Step: [ihubhkUpdateStep] executed in 22ms 2023-10-22 12:31:44.132 INFO 10827 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=batchJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 217ms ``` Check database, the `L0000010` record is updated with `markers: 'SAL' and `status: 'Updated'` ``` demo> db.CustomerMarker.find({}) [ { _id: ObjectId("6534a5af5e98330cb6b09341"), globalIdentifer: 'G000001', localIdentifer: 'L0000010', market: 'HK', markers: 'SAL', asOfDate: '2023-10-20', status: 'Updated', _class: 'com.beyondprototype.batch.model.CustomerMarker' }, { _id: ObjectId("6534a5af5e98330cb6b09342"), globalIdentifer: 'G000001', localIdentifer: 'L0000012', market: 'SG', markers: 'MGT', asOfDate: '2023-10-20', status: 'Processing', _class: 'com.beyondprototype.batch.model.CustomerMarker' }, { _id: ObjectId("6534a5af5e98330cb6b09343"), globalIdentifer: 'G000001', localIdentifer: 'L0000011', market: 'UK', markers: 'TRB', asOfDate: '2023-10-20', status: 'Processing', _class: 'com.beyondprototype.batch.model.CustomerMarker' }, { _id: ObjectId("6534a5af5e98330cb6b09344"), globalIdentifer: 'G000002', localIdentifer: 'L0000020', market: 'CN', markers: 'MGT', asOfDate: '2023-10-20', status: 'Processing', _class: 'com.beyondprototype.batch.model.CustomerMarker' } ] ``` ### Load Json to MongoDB demo> db.createCollection('PremierMarker'); { ok: 1 } demo> db.PremierMarker.find({}); Create `gcdu-premier-marker-data.json` ```json [ { "globalIdentifier": "G000001", "asOfDate": "2023-10-20", "markers": [ { "market": "HK", "localIdentifer": "L0000010", "markers": "TRB|MGT" }, { "market": "SG", "localIdentifer": "L0000012", "markers": "MGT" }, { "market": "UK", "localIdentifer": "L0000011", "markers": "TRB" } ] }, { "globalIdentifier": "G000002", "asOfDate": "2023-10-20", "markers": [ { "market": "CN", "localIdentifer": "L0000020", "markers": "TRB|MGT" } ] } ] ``` mvn spring-boot:run ``` 2023-10-22 15:34:15.256 INFO 13807 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadGcduJsonDataStep] 2023-10-22 15:34:15.285 INFO 13807 --- [ main] c.b.b.c.GcduJasonDataBatchConfiguration : Processing PremierMarker: 2023-10-20 G000001 null 2023-10-22 15:34:15.285 INFO 13807 --- [ main] c.b.b.c.GcduJasonDataBatchConfiguration : Processing PremierMarker: 2023-10-20 G000002 null 2023-10-22 15:34:15.285 INFO 13807 --- [ main] c.b.b.c.GcduJasonDataBatchConfiguration : Writing PremierMarker: 2023-10-20 G000001 Processing 2023-10-22 15:34:15.285 INFO 13807 --- [ main] c.b.b.c.GcduJasonDataBatchConfiguration : Writing PremierMarker: 2023-10-20 G000002 Processing 2023-10-22 15:34:15.285 INFO 13807 --- [ main] c.b.batch.service.PremierMarkerService : Record Count: 2 2023-10-22 15:34:15.419 INFO 13807 --- [ main] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:103}] to localhost:27017 2023-10-22 15:34:15.436 INFO 13807 --- [ main] c.b.batch.service.PremierMarkerService : Inserted Count: 2 2023-10-22 15:34:15.440 INFO 13807 --- [ main] o.s.batch.core.step.AbstractStep : Step: [loadGcduJsonDataStep] executed in 183ms ``` Check the database: ``` demo> db.PremierMarker.find({'markers':{"$elemMatch": {"localIdentifer": "L0000010"}}}) [ { _id: ObjectId("6539172cbf02d62753cbae18"), globalIdentifier: 'G000001', markers: [ { localIdentifer: 'L0000010', market: 'HK', markers: 'TRB|MGT' }, { localIdentifer: 'L0000012', market: 'SG', markers: 'MGT' }, { localIdentifer: 'L0000011', market: 'UK', markers: 'TRB' } ], asOfDate: '2023-10-20', status: 'Processing', _class: 'com.beyondprototype.batch.model.PremierMarker' } ] demo> demo> db.PremierMarker.find({}); [ { _id: ObjectId("6539172cbf02d62753cbae18"), globalIdentifier: 'G000001', markers: [ { localIdentifer: 'L0000010', market: 'HK', markers: 'TRB|MGT' }, { localIdentifer: 'L0000012', market: 'SG', markers: 'MGT' }, { localIdentifer: 'L0000011', market: 'UK', markers: 'TRB' } ], asOfDate: '2023-10-20', status: 'Processing', _class: 'com.beyondprototype.batch.model.PremierMarker' }, { _id: ObjectId("6539172cbf02d62753cbae19"), globalIdentifier: 'G000002', markers: [ { localIdentifer: 'L0000020', market: 'CN', markers: 'TRB|MGT' } ], asOfDate: '2023-10-20', status: 'Processing', _class: 'com.beyondprototype.batch.model.PremierMarker' } ] ``` ### Postgres ``` docker pull postgres mkdir ~/docker/postgres docker run --name postgres -e POSTGRES_PASSWORD=Abc1234% -p 5432:5432 -v ~/docker/postgres:/var/lib/postgresql/data -d postgres docker exec -it postgres /bin/bash root@0e22ef2e8194:/# su postgres postgres@0e22ef2e8194:/$ psql psql (16.0 (Debian 16.0-1.pgdg120+1)) Type "help" for help. postgres=# \l List of databases Name | Owner | Encoding | Locale Provider | Collate | Ctype | ICU Locale | ICU Rules | Access privileges -----------+----------+----------+-----------------+------------+------------+------------+-----------+----------------------- postgres | postgres | UTF8 | libc | en_US.utf8 | en_US.utf8 | | | template0 | postgres | UTF8 | libc | en_US.utf8 | en_US.utf8 | | | =c/postgres + | | | | | | | | postgres=CTc/postgres template1 | postgres | UTF8 | libc | en_US.utf8 | en_US.utf8 | | | =c/postgres + | | | | | | | | postgres=CTc/postgres (3 rows) postgres=# CREATE TABLE CUSTOMER_MARKER (LOCAL_ID VARCHAR(20), MARKET CHAR(2), MARKERS VARCHAR(30), STATUS CHAR(2), PRIMARY KEY (MARKET, LOCAL_ID)) ; postgres=# CREATE TABLE CUSTOMER_LINKAGE (LOCAL_ID VARCHAR(20), MARKET CHAR(2), GLOBAL_ID VARCHAR(20), MARKERS VARCHAR(30), STATUS CHAR(2), PRIMARY KEY (MARKET, LOCAL_ID)) ; postgres=# INSERT INTO CUSTOMER_MARKER VALUES ('L1234567890', 'US', 'TRB|MGT', 'A'); postgres=# INSERT INTO CUSTOMER_MARKER VALUES ('L1234567891', 'UK', 'MGT', 'A'); postgres=# INSERT INTO CUSTOMER_LINKAGE VALUES ('L1234567890', 'US', 'G1234567890', 'XXX', 'A'); postgres=# INSERT INTO CUSTOMER_LINKAGE VALUES ('L1234567891', 'UK', 'G1234567890', 'XXX', 'A'); postgres=# INSERT INTO CUSTOMER_LINKAGE VALUES ('L1234567892', 'CN', 'G1234567891', 'XXX', 'A'); postgres=# SELECT * FROM CUSTOMER_LINKAGE l LEFT OUTER JOIN CUSTOMER_MARKER c ON l.MARKET = c.MARKET and l.LOCAL_ID = c.LOCAL_ID; local_id | market | global_id | markers | status | markers | status -------------+--------+-------------+---------+--------+---------+-------- L1234567890 | US | G1234567890 | XXX | A | TRB|MGT | A L1234567891 | UK | G1234567890 | XXX | A | MGT | A L1234567892 | CN | G1234567891 | XXX | A | | (3 rows) postgres=# SELECT l.local_id, l.market , l.global_id,l. status, case when c.markers is null then l.markers else c.markers end as markers FROM CUSTOMER_LINKAGE l LEFT OUTER JOIN CUSTOMER_MARKER c ON l.MARKET = c.MARKET and l.LOCAL_ID = c.LOCAL_ID; local_id | market | global_id | status | markers -------------+--------+-------------+--------+--------- L1234567890 | US | G1234567890 | A | TRB|MGT L1234567891 | UK | G1234567890 | A | MGT L1234567892 | CN | G1234567891 | A | XXX (3 rows) ```