# springbatch
**Repository Path**: liuxi01/springbatch
## Basic Information
- **Project Name**: springbatch
- **Description**: springbatch with mongodb
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 2
- **Created**: 2023-12-19
- **Last Updated**: 2023-12-19
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## 1. MongoDB
```
springbatch % docker pull mongo
springbatch % mkdir ~/docker/mongo
springbatch % docker run -itd --name mongo -v ~/docker/mongo:/data/db -p 27017:27017 mongo --auth
springbatch % docker stop mongo
```
### Login MongoDB
```
springbatch % docker start mongo
springbatch % docker exec -it mongo mongosh admin
Current Mongosh Log ID: 6533b9b0caa60065d0142269
Connecting to: mongodb://127.0.0.1:27017/admin?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.1
Using MongoDB: 7.0.2
Using Mongosh: 2.0.1
For mongosh info see: https://docs.mongodb.com/mongodb-shell/
To help improve our products, anonymous usage data is collected and sent to MongoDB periodically (https://www.mongodb.com/legal/privacy-policy).
You can opt-out by running the disableTelemetry() command.
admin> db.createUser({user:"root",pwd:"12345",roles:[{role:"userAdminAnyDatabase", db:"admin"}, 'readWriteAnyDatabase']})
{ ok: 1 }
admin> db.auth("root","12345")
{ ok: 1 }
admin> show dbs
admin 132.00 KiB
config 12.00 KiB
local 40.00 KiB
admin> use demo
switched to db demo
demo> db.createCollection("CustomerExtention")
{ ok: 1 }
demo> show collections
CustomerExtention
demo> db.CustomerExtention.insertOne({"globalIdentifer":"G00000001","localIdentifer":"L000001", "market":"HK", "premierMarkers":""})
{
acknowledged: true,
insertedId: ObjectId("6533bce7caa60065d014226a")
}
demo> db.CustomerExtention.find()
[
{
_id: ObjectId("6533bce7caa60065d014226a"),
globalIdentifer: 'G00000001',
localIdentifer: 'L000001',
market: 'HK',
premierMarkers: ''
}
]
demo> db.CustomerExtention.find({"globalIdentifer":"G00000001"})
[
{
_id: ObjectId("6533bce7caa60065d014226a"),
globalIdentifer: 'G00000001',
localIdentifer: 'L000001',
market: 'HK',
premierMarkers: ''
}
]
demo> db.CustomerExtention.drop()
```
https://blog.51cto.com/u_16213633/7383567
https://blog.csdn.net/m0_62264368/article/details/129379688
### Import CSV file to MongoDB
Create a customer-linkage-data.csv file in the mongo docker container and use `mongoimport` to import the file into the `CustomerLinkage` collection.
```
springbatch % docker exec -it mongo bash
root@7f756fb6e50a:/# touch customer-linkage-data.csv
root@7f756fb6e50a:/# echo "L000001,HK,G000001,TRB|MGT" > customer-linkage-data.csv
root@7f756fb6e50a:/# echo "L000002,SG,G000002,TRB|MGT" >> customer-linkage-data.csv
root@7f756fb6e50a:/# echo "L000003,UK,G000003,TRB" >> customer-linkage-data.csv
root@7f756fb6e50a:/# echo "L000004,CN,G000004,TRB|MGT" >> customer-linkage-data.csv
root@7f756fb6e50a:/# cat customer-linkage-data.csv
L000001,HK,G000001,TRB|MGT
L000002,SG,G000002,TRB|MGT
L000003,UK,G000003,TRB
L000004,CN,G000004,TRB|MGT
root@7f756fb6e50a:/# mongoimport -u root -p 12345 --authenticationDatabase=admin -d demo -c CustomerLinkage --type csv --columnsHaveTypes --fields "_id.localIdentifer.string(),_id.market.string(),globalIdentifer.string(),premierMarkers.string()" --file ./customer-linkage-data.csv
2023-10-21T14:08:40.786+0000 connected to: mongodb://localhost/
2023-10-21T14:08:40.789+0000 4 document(s) imported successfully. 0 document(s) failed to import.
root@7f756fb6e50a:/# mongoimport -u root -p 12345 --authenticationDatabase=admin -d demo -c CustomerLinkage2 --type csv --columnsHaveTypes --fields "localIdentifer.string(),market.string(),globalIdentifer.string(),premierMarkers.string()" --file ./customer-linkage-data.csv
2023-10-21T23:22:45.187+0000 connected to: mongodb://localhost/
2023-10-21T23:22:45.202+0000 4 document(s) imported successfully. 0 document(s) failed to import.
```
Log in to MongoDB and check the data in the `CustomerLinkage` collection.
```
xwp@weipaoxus-MacBook-Pro cngit % docker exec -it mongo mongosh admin
Current Mongosh Log ID: 6533dc808714c131e01939b3
Connecting to: mongodb://127.0.0.1:27017/admin?directConnection=true&serverSelectionTimeoutMS=2000&appName=mongosh+2.0.1
Using MongoDB: 7.0.2
Using Mongosh: 2.0.1
For mongosh info see: https://docs.mongodb.com/mongodb-shell/
admin> db.auth("root","12345")
{ ok: 1 }
admin> use demo
switched to db demo
demo> db.CustomerLinkage.find()
[
{
_id: { localIdentifer: 'L000003', market: 'UK' },
globalIdentifer: 'G000003',
premierMarkers: 'TRB'
},
{
_id: { localIdentifer: 'L000001', market: 'HK' },
globalIdentifer: 'G000001',
premierMarkers: 'TRB|MGT'
},
{
_id: { localIdentifer: 'L000004', market: 'CN' },
globalIdentifer: 'G000004',
premierMarkers: 'TRB|MGT'
},
{
_id: { localIdentifer: 'L000002', market: 'SG' },
globalIdentifer: 'G000002',
premierMarkers: 'TRB|MGT'
}
]
demo> db.CustomerLinkage2.find()
[
{
_id: ObjectId("65345d45f427c3f316ccf425"),
localIdentifer: 'L000001',
market: 'HK',
globalIdentifer: 'G000001',
premierMarkers: 'TRB|MGT'
},
{
_id: ObjectId("65345d45f427c3f316ccf426"),
localIdentifer: 'L000003',
market: 'UK',
globalIdentifer: 'G000003',
premierMarkers: 'TRB'
},
{
_id: ObjectId("65345d45f427c3f316ccf427"),
localIdentifer: 'L000002',
market: 'SG',
globalIdentifer: 'G000002',
premierMarkers: 'TRB|MGT'
},
{
_id: ObjectId("65345d45f427c3f316ccf428"),
localIdentifer: 'L000004',
market: 'CN',
globalIdentifer: 'G000004',
premierMarkers: 'TRB|MGT'
}
]
```
Update the `CustomerLinkage` collection with duplicated data:
```
root@7f756fb6e50a:/# touch customer-linkage-data-update.csv
root@7f756fb6e50a:/# echo "L000001,HK,G000001,MGT" > customer-linkage-data-update.csv
root@7f756fb6e50a:/# cat customer-linkage-data-update.csv
L000001,HK,G000001,MGT
root@7f756fb6e50a:/# mongoimport -u root -p 12345 --authenticationDatabase=admin -d demo -c CustomerLinkage --type csv --columnsHaveTypes --fields "_id.localIdentifer.string(),_id.market.string(),globalIdentifer.string(),premierMarkers.string()" --file ./customer-linkage-data-update.csv
2023-10-21T22:56:27.956+0000 continuing through error: E11000 duplicate key error collection: demo.CustomerLinkage index: _id_ dup key: { _id: { localIdentifer: "L000001", market: "HK" } }
2023-10-21T22:56:27.956+0000 0 document(s) imported successfully. 1 document(s) failed to import.
root@7f756fb6e50a:/# mongoimport -u root -p 12345 --authenticationDatabase=admin -d demo -c CustomerLinkage2 --type csv --columnsHaveTypes --fields "localIdentifer.string(),market.string(),globalIdentifer.string(),premierMarkers.string()" --file ./customer-linkage-data-update.csv
2023-10-21T23:25:42.921+0000 connected to: mongodb://localhost/
2023-10-21T23:25:42.925+0000 1 document(s) imported successfully. 0 document(s) failed to import.
demo> db.CustomerLinkage2.find({localIdentifer: 'L000001'})
[
{
_id: ObjectId("65345d45f427c3f316ccf425"),
localIdentifer: 'L000001',
market: 'HK',
globalIdentifer: 'G000001',
premierMarkers: 'TRB|MGT'
},
{
_id: ObjectId("65345df6ec027e06b121d249"),
localIdentifer: 'L000001',
market: 'HK',
globalIdentifer: 'G000001',
premierMarkers: 'MGT'
}
]
```
### MongoDB SQL to batch process data
```sql
demo> db.CustomerLinkage.find({_id: {localIdentifer: 'L000001', market: 'HK'}})
demo> db.CustomerLinkage.find({globalIdentifer:'G000001'})
[
{
_id: { localIdentifer: 'L000001', market: 'HK' },
globalIdentifer: 'G000001',
premierMarkers: 'TRB|MGT'
}
]
demo> db.CustomerLinkage.update({_id: {localIdentifer: 'L000001', market: 'HK'}}, {$set: {premierMarkers: 'ABC'}})
DeprecationWarning: Collection.update() is deprecated. Use updateOne, updateMany, or bulkWrite.
{
acknowledged: true,
insertedId: null,
matchedCount: 1,
modifiedCount: 1,
upsertedCount: 0
}
demo> db.CustomerLinkage.find({_id: {localIdentifer: 'L000001', market: 'HK'}})
[
{
_id: { localIdentifer: 'L000001', market: 'HK' },
globalIdentifer: 'G000001',
premierMarkers: 'ABC'
}
]
demo> db.CustomerLinkage.find().forEach(function(doc){db.CustomerLinkage.update({_id: {localIdentifer: doc._id.localIdentifer, market: doc._id.market}}, {$set: {premierMarkers: doc.premierMarkers}})});
demo> db.CustomerLinkage.find().forEach(function(item){print(item._id.localIdentifer + ',' + item._id.market + ',' + item.premierMarkers)});
L000003,UK,TRB
L000001,HK,TRB|MGT
L000004,CN,TRB|MGT
L000002,SG,TRB|MGT
demo> db.getCollection('CustomerLinkage').find()
```
Reference:
https://www.cnblogs.com/cccnqin/p/15843156.html
### 3. Create SpringBoot Application
```java
package com.example.demo;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
```
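MongoDB provides a very useful module called MongoClient, which can be used to connect to a MongoDB database. It also offers a powerful method called `insertMany`, which helps us insert data into a MongoDB database: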
## 2. Create SpringBoot Project
Update pom.xml
```xml
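<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.hsqldb</groupId>
    <artifactId>hsqldb</artifactId>
    <scope>runtime</scope>
</dependency>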
```
mvn spring-boot:run
```
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.7.5)
2023-10-22 09:17:33.304 INFO 5619 --- [ main] c.b.springbatch.SpringbatchApplication : Starting SpringbatchApplication using Java 17.0.1 on weipaoxus-MacBook-Pro.local with PID 5619 (/Users/xwp/Desktop/cngit/springbatch/target/classes started by xwp in /Users/xwp/Desktop/cngit/springbatch)
2023-10-22 09:17:33.306 INFO 5619 --- [ main] c.b.springbatch.SpringbatchApplication : No active profile set, falling back to 1 default profile: "default"
2023-10-22 09:17:34.074 INFO 5619 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2023-10-22 09:17:34.257 INFO 5619 --- [ main] com.zaxxer.hikari.pool.PoolBase : HikariPool-1 - Driver does not support get/set network timeout for connections. (feature not supported)
2023-10-22 09:17:34.258 INFO 5619 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2023-10-22 09:17:34.412 INFO 5619 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: HSQL
2023-10-22 09:17:34.523 INFO 5619 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2023-10-22 09:17:34.673 INFO 5619 --- [ main] c.b.springbatch.SpringbatchApplication : Started SpringbatchApplication in 1.874 seconds (JVM running for 2.23)
2023-10-22 09:17:34.676 INFO 5619 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: []
2023-10-22 09:17:34.727 INFO 5619 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=batchJob]] launched with the following parameters: [{}]
2023-10-22 09:17:34.750 INFO 5619 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadGcduDataStep]
2023-10-22 09:17:34.768 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000020 HK G000002 TRB|MGT null
2023-10-22 09:17:34.769 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000021 UK G000002 TRB null
2023-10-22 09:17:34.769 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000022 SG G000002 MGT null
2023-10-22 09:17:34.769 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000030 SG G000003 TRB null
2023-10-22 09:17:34.769 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000040 US G000004 MGT null
Writing CustomerMarker: 2023-10-20 L0000020 HK G000002 TRB|MGT Processing
Writing CustomerMarker: 2023-10-20 L0000021 UK G000002 TRB Processing
Writing CustomerMarker: 2023-10-20 L0000022 SG G000002 MGT Processing
Writing CustomerMarker: 2023-10-20 L0000030 SG G000003 TRB Processing
Writing CustomerMarker: 2023-10-20 L0000040 US G000004 MGT Processing
2023-10-22 09:17:34.774 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000050 JP G000005 MGT null
2023-10-22 09:17:34.774 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000060 CN G000006 MGT null
2023-10-22 09:17:34.774 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000070 KR G000007 MGT null
2023-10-22 09:17:34.774 INFO 5619 --- [ main] c.b.s.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000080 TH G000008 TRB null
Writing CustomerMarker: 2023-10-20 L0000050 JP G000005 MGT Processing
Writing CustomerMarker: 2023-10-20 L0000060 CN G000006 MGT Processing
Writing CustomerMarker: 2023-10-20 L0000070 KR G000007 MGT Processing
Writing CustomerMarker: 2023-10-20 L0000080 TH G000008 TRB Processing
2023-10-22 09:17:34.778 INFO 5619 --- [ main] o.s.batch.core.step.AbstractStep : Step: [loadGcduDataStep] executed in 27ms
2023-10-22 09:17:34.789 INFO 5619 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [ihubhkUpdateStep]
2023-10-22 09:17:34.796 INFO 5619 --- [ main] c.b.s.IhubDataBatchConfiguration : Processing CustomerMarker: 2023-10-21 L0000020 HK TRB null
Updating CustomerMarker: 2023-10-21 L0000020 HK TRB Updated
2023-10-22 09:17:34.801 INFO 5619 --- [ main] o.s.batch.core.step.AbstractStep : Step: [ihubhkUpdateStep] executed in 12ms
2023-10-22 09:17:34.807 INFO 5619 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=batchJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 66ms
2023-10-22 09:17:34.814 INFO 5619 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2023-10-22 09:17:34.818 INFO 5619 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
```
## 3. MongoDB Integration
Update pom.xml
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
```
Update application.yml
```yaml
spring:
  data:
    mongodb:
      host: localhost
      port: 27017
      database: demo
      authentication-database: admin
      username: developer
      password: 12345
```
Create the `developer` user
```
demo> db.createUser({user:"developer",pwd:"12345",roles:['readWrite']})
```
mvn spring-boot:run
```
2023-10-22 10:13:19.528 INFO 7641 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data MongoDB repositories in DEFAULT mode.
2023-10-22 10:13:19.539 INFO 7641 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 8 ms. Found 0 MongoDB repository interfaces.
2023-10-22 10:13:19.849 INFO 7641 --- [ main] org.mongodb.driver.client : MongoClient with metadata {"driver": {"name": "mongo-java-driver|sync|spring-boot", "version": "4.6.1"}, "os": {"type": "Darwin", "name": "Mac OS X", "architecture": "x86_64", "version": "14.0"}, "platform": "Java/Oracle Corporation/17.0.1+12-39"} created with settings MongoClientSettings{readPreference=primary, writeConcern=WriteConcern{w=null, wTimeout=null ms, journal=null}, retryWrites=true, retryReads=true, readConcern=ReadConcern{level=null}, credential=MongoCredential{mechanism=null, userName='developer', source='admin', password=, mechanismProperties=}, streamFactoryFactory=null, commandListeners=[], codecRegistry=ProvidersCodecRegistry{codecProviders=[ValueCodecProvider{}, BsonValueCodecProvider{}, DBRefCodecProvider{}, DBObjectCodecProvider{}, DocumentCodecProvider{}, IterableCodecProvider{}, MapCodecProvider{}, GeoJsonCodecProvider{}, GridFSFileCodecProvider{}, Jsr310CodecProvider{}, JsonObjectCodecProvider{}, BsonCodecProvider{}, EnumCodecProvider{}, com.mongodb.Jep395RecordCodecProvider@8ed9cf]}, clusterSettings={hosts=[localhost:27017], srvServiceName=mongodb, mode=SINGLE, requiredClusterType=UNKNOWN, requiredReplicaSetName='null', serverSelector='null', clusterListeners='[]', serverSelectionTimeout='30000 ms', localThreshold='30000 ms'}, socketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=0, receiveBufferSize=0, sendBufferSize=0}, heartbeatSocketSettings=SocketSettings{connectTimeoutMS=10000, readTimeoutMS=10000, receiveBufferSize=0, sendBufferSize=0}, connectionPoolSettings=ConnectionPoolSettings{maxSize=100, minSize=0, maxWaitTimeMS=120000, maxConnectionLifeTimeMS=0, maxConnectionIdleTimeMS=0, maintenanceInitialDelayMS=0, maintenanceFrequencyMS=60000, connectionPoolListeners=[], maxConnecting=2}, serverSettings=ServerSettings{heartbeatFrequencyMS=10000, minHeartbeatFrequencyMS=500, serverListeners='[]', serverMonitorListeners='[]'}, 
sslSettings=SslSettings{enabled=false, invalidHostNameAllowed=false, context=null}, applicationName='null', compressorList=[], uuidRepresentation=JAVA_LEGACY, serverApi=null, autoEncryptionSettings=null, contextProvider=null}
2023-10-22 10:13:19.879 INFO 7641 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:2, serverValue:51}] to localhost:27017
2023-10-22 10:13:19.879 INFO 7641 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1, serverValue:50}] to localhost:27017
2023-10-22 10:13:19.879 INFO 7641 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=21, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=17702708}
```
```
demo> db.createCollection('CustomerMarker');
{ ok: 1 }
```
Update CustomerMarker.java
```java
import org.springframework.data.mongodb.core.mapping.Document;
@Document("CustomerMarker")
public class CustomerMarker {
}
```
Create CustomerMarkerService.java
```java
@Service
public class CustomerMarkerService {
@Autowired
private MongoTemplate mongoTemplate;
public int create(List<? extends CustomerMarker> markers) {
log.info("Record Count: " + markers.size());
BulkWriteResult result = mongoTemplate.bulkOps(BulkMode.UNORDERED, CustomerMarker.class).insert(markers).execute();
log.info("Inserted Count: " + result.getInsertedCount());
return result.getInsertedCount();
}
}
```
Update GcduDataBatchConfiguration.java
```java
public class GcduDataBatchConfiguration {
@Autowired
private CustomerMarkerService customerMarkerService;
@Bean
@Qualifier("gcduItemWriter")
public ItemWriter<CustomerMarker> gcduItemWriter() {
return new ItemWriter<CustomerMarker>() {
@Override
public void write(List<? extends CustomerMarker> items) throws Exception {
for (CustomerMarker item : items) {
System.out.println("Writing "+ item.toString());
}
customerMarkerService.create(items);
}
};
}
}
```
springbatch % mvn spring-boot:run
```
2023-10-22 11:18:54.940 INFO 9572 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=batchJob]] launched with the following parameters: [{}]
2023-10-22 11:18:54.959 INFO 9572 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadGcduDataStep]
2023-10-22 11:18:54.979 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000010 HK G000001 TRB|MGT null
2023-10-22 11:18:54.979 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000012 SG G000001 MGT null
2023-10-22 11:18:54.979 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000011 UK G000001 TRB null
2023-10-22 11:18:54.979 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000020 CN G000002 MGT null
2023-10-22 11:18:54.979 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000010 HK G000001 TRB|MGT Processing
2023-10-22 11:18:54.980 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000012 SG G000001 MGT Processing
2023-10-22 11:18:54.980 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000011 UK G000001 TRB Processing
2023-10-22 11:18:54.980 INFO 9572 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000020 CN G000002 MGT Processing
2023-10-22 11:18:54.981 INFO 9572 --- [ main] c.b.batch.service.CustomerMarkerService : Record Count: 4
2023-10-22 11:18:55.105 INFO 9572 --- [ main] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:77}] to localhost:27017
2023-10-22 11:18:55.132 INFO 9572 --- [ main] c.b.batch.service.CustomerMarkerService : Inserted Count: 4
```
Check the database
```
demo> db.CustomerMarker.find()
[
{
_id: ObjectId("6534949f0eef72760b45ebef"),
globalIdentifer: 'G000001',
localIdentifer: 'L0000010',
market: 'HK',
markers: 'TRB|MGT',
asOfDate: '2023-10-20',
status: 'Processing',
_class: 'com.beyondprototype.batch.model.CustomerMarker'
},
{
_id: ObjectId("6534949f0eef72760b45ebf0"),
globalIdentifer: 'G000001',
localIdentifer: 'L0000012',
market: 'SG',
markers: 'MGT',
asOfDate: '2023-10-20',
status: 'Processing',
_class: 'com.beyondprototype.batch.model.CustomerMarker'
},
{
_id: ObjectId("6534949f0eef72760b45ebf1"),
globalIdentifer: 'G000001',
localIdentifer: 'L0000011',
market: 'UK',
markers: 'TRB',
asOfDate: '2023-10-20',
status: 'Processing',
_class: 'com.beyondprototype.batch.model.CustomerMarker'
},
{
_id: ObjectId("6534949f0eef72760b45ebf2"),
globalIdentifer: 'G000002',
localIdentifer: 'L0000020',
market: 'CN',
markers: 'MGT',
asOfDate: '2023-10-20',
status: 'Processing',
_class: 'com.beyondprototype.batch.model.CustomerMarker'
}
]
```
Update CustomerMarkerService.java
```java
public int update(List<? extends CustomerMarker> markers) {
log.info("Record Count: " + markers.size());
List<Pair<Query, Update>> updateList = new ArrayList<>(markers.size());
markers.forEach( marker -> {
Query query = new Query(new Criteria("localIdentifer").is(marker.getLocalIdentifer()));
Update update = new Update();
update.set("markers", marker.getMarkers());
update.set("status", "Updated");
updateList.add(Pair.of(query, update));
});
BulkOperations bulkOps = mongoTemplate.bulkOps(BulkMode.UNORDERED, CustomerMarker.class);
BulkWriteResult result = bulkOps.upsert(updateList).execute();
log.info("Modified Count: " + result.getModifiedCount());
return result.getModifiedCount();
}
```
Update IhubDataBatchConfiguration.java
```java
@Qualifier("ihubhkItemWriter")
public ItemWriter<CustomerMarker> ihubhkItemWriter() {
return new ItemWriter<CustomerMarker>() {
@Override
public void write(List<? extends CustomerMarker> items) throws Exception {
for (CustomerMarker item : items) {
log.info("Updating "+ item.toString());
}
customerMarkerService.update(items);
}
};
}
```
Check data in mongodb
```
demo> db.CustomerMarker.find({localIdentifer: 'L0000010'})
[
{
_id: ObjectId("6534949f0eef72760b45ebef"),
globalIdentifer: 'G000001',
localIdentifer: 'L0000010',
market: 'HK',
markers: 'TRB|MGT',
asOfDate: '2023-10-20',
status: 'Processing',
_class: 'com.beyondprototype.batch.model.CustomerMarker'
}
]
demo> db.CustomerMarker.deleteMany({})
{ acknowledged: true, deletedCount: 4 }
```
Prepare sample data ihubhk-data.txt to update
```
2023-10-21,L0000010,HK,,SAL
```
mvn spring-boot:run
```
2023-10-22 12:31:43.903 INFO 10827 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=batchJob]] launched with the following parameters: [{}]
2023-10-22 12:31:43.924 INFO 10827 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadGcduDataStep]
2023-10-22 12:31:43.944 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000010 HK G000001 TRB|MGT null
2023-10-22 12:31:43.944 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000012 SG G000001 MGT null
2023-10-22 12:31:43.944 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000011 UK G000001 TRB null
2023-10-22 12:31:43.945 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Processing CustomerMarker: 2023-10-20 L0000020 CN G000002 MGT null
2023-10-22 12:31:43.945 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000010 HK G000001 TRB|MGT Processing
2023-10-22 12:31:43.945 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000012 SG G000001 MGT Processing
2023-10-22 12:31:43.945 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000011 UK G000001 TRB Processing
2023-10-22 12:31:43.947 INFO 10827 --- [ main] c.b.b.config.GcduDataBatchConfiguration : Writing CustomerMarker: 2023-10-20 L0000020 CN G000002 MGT Processing
2023-10-22 12:31:43.948 INFO 10827 --- [ main] c.b.batch.service.CustomerMarkerService : Record Count: 4
2023-10-22 12:31:44.072 INFO 10827 --- [ main] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:86}] to localhost:27017
2023-10-22 12:31:44.087 INFO 10827 --- [ main] c.b.batch.service.CustomerMarkerService : Inserted Count: 4
2023-10-22 12:31:44.092 INFO 10827 --- [ main] o.s.batch.core.step.AbstractStep : Step: [loadGcduDataStep] executed in 168ms
2023-10-22 12:31:44.103 INFO 10827 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [ihubhkUpdateStep]
2023-10-22 12:31:44.108 INFO 10827 --- [ main] c.b.b.config.IhubDataBatchConfiguration : Processing CustomerMarker: 2023-10-21 L0000010 HK SAL null
2023-10-22 12:31:44.108 INFO 10827 --- [ main] c.b.b.config.IhubDataBatchConfiguration : Updating CustomerMarker: 2023-10-21 L0000010 HK SAL Updated
2023-10-22 12:31:44.109 INFO 10827 --- [ main] c.b.batch.service.CustomerMarkerService : Record Count: 1
2023-10-22 12:31:44.123 INFO 10827 --- [ main] c.b.batch.service.CustomerMarkerService : Modified Count: 1
2023-10-22 12:31:44.125 INFO 10827 --- [ main] o.s.batch.core.step.AbstractStep : Step: [ihubhkUpdateStep] executed in 22ms
2023-10-22 12:31:44.132 INFO 10827 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=batchJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 217ms
```
Check the database: the `L0000010` record is updated with `markers: 'SAL'` and `status: 'Updated'`.
```
demo> db.CustomerMarker.find({})
[
{
_id: ObjectId("6534a5af5e98330cb6b09341"),
globalIdentifer: 'G000001',
localIdentifer: 'L0000010',
market: 'HK',
markers: 'SAL',
asOfDate: '2023-10-20',
status: 'Updated',
_class: 'com.beyondprototype.batch.model.CustomerMarker'
},
{
_id: ObjectId("6534a5af5e98330cb6b09342"),
globalIdentifer: 'G000001',
localIdentifer: 'L0000012',
market: 'SG',
markers: 'MGT',
asOfDate: '2023-10-20',
status: 'Processing',
_class: 'com.beyondprototype.batch.model.CustomerMarker'
},
{
_id: ObjectId("6534a5af5e98330cb6b09343"),
globalIdentifer: 'G000001',
localIdentifer: 'L0000011',
market: 'UK',
markers: 'TRB',
asOfDate: '2023-10-20',
status: 'Processing',
_class: 'com.beyondprototype.batch.model.CustomerMarker'
},
{
_id: ObjectId("6534a5af5e98330cb6b09344"),
globalIdentifer: 'G000002',
localIdentifer: 'L0000020',
market: 'CN',
markers: 'MGT',
asOfDate: '2023-10-20',
status: 'Processing',
_class: 'com.beyondprototype.batch.model.CustomerMarker'
}
]
```
### Load Json to MongoDB
```
demo> db.createCollection('PremierMarker');
{ ok: 1 }
demo> db.PremierMarker.find({});
```
Create `gcdu-premier-marker-data.json`
```json
[
{
"globalIdentifier": "G000001",
"asOfDate": "2023-10-20",
"markers": [
{
"market": "HK",
"localIdentifer": "L0000010",
"markers": "TRB|MGT"
},
{
"market": "SG",
"localIdentifer": "L0000012",
"markers": "MGT"
},
{
"market": "UK",
"localIdentifer": "L0000011",
"markers": "TRB"
}
]
},
{
"globalIdentifier": "G000002",
"asOfDate": "2023-10-20",
"markers": [
{
"market": "CN",
"localIdentifer": "L0000020",
"markers": "TRB|MGT"
}
]
}
]
```
mvn spring-boot:run
```
2023-10-22 15:34:15.256 INFO 13807 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [loadGcduJsonDataStep]
2023-10-22 15:34:15.285 INFO 13807 --- [ main] c.b.b.c.GcduJasonDataBatchConfiguration : Processing PremierMarker: 2023-10-20 G000001 null
2023-10-22 15:34:15.285 INFO 13807 --- [ main] c.b.b.c.GcduJasonDataBatchConfiguration : Processing PremierMarker: 2023-10-20 G000002 null
2023-10-22 15:34:15.285 INFO 13807 --- [ main] c.b.b.c.GcduJasonDataBatchConfiguration : Writing PremierMarker: 2023-10-20 G000001 Processing
2023-10-22 15:34:15.285 INFO 13807 --- [ main] c.b.b.c.GcduJasonDataBatchConfiguration : Writing PremierMarker: 2023-10-20 G000002 Processing
2023-10-22 15:34:15.285 INFO 13807 --- [ main] c.b.batch.service.PremierMarkerService : Record Count: 2
2023-10-22 15:34:15.419 INFO 13807 --- [ main] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:103}] to localhost:27017
2023-10-22 15:34:15.436 INFO 13807 --- [ main] c.b.batch.service.PremierMarkerService : Inserted Count: 2
2023-10-22 15:34:15.440 INFO 13807 --- [ main] o.s.batch.core.step.AbstractStep : Step: [loadGcduJsonDataStep] executed in 183ms
```
Check the database:
```
demo> db.PremierMarker.find({'markers':{"$elemMatch": {"localIdentifer": "L0000010"}}})
[
{
_id: ObjectId("6539172cbf02d62753cbae18"),
globalIdentifier: 'G000001',
markers: [
{ localIdentifer: 'L0000010', market: 'HK', markers: 'TRB|MGT' },
{ localIdentifer: 'L0000012', market: 'SG', markers: 'MGT' },
{ localIdentifer: 'L0000011', market: 'UK', markers: 'TRB' }
],
asOfDate: '2023-10-20',
status: 'Processing',
_class: 'com.beyondprototype.batch.model.PremierMarker'
}
]
demo>
demo> db.PremierMarker.find({});
[
{
_id: ObjectId("6539172cbf02d62753cbae18"),
globalIdentifier: 'G000001',
markers: [
{ localIdentifer: 'L0000010', market: 'HK', markers: 'TRB|MGT' },
{ localIdentifer: 'L0000012', market: 'SG', markers: 'MGT' },
{ localIdentifer: 'L0000011', market: 'UK', markers: 'TRB' }
],
asOfDate: '2023-10-20',
status: 'Processing',
_class: 'com.beyondprototype.batch.model.PremierMarker'
},
{
_id: ObjectId("6539172cbf02d62753cbae19"),
globalIdentifier: 'G000002',
markers: [
{ localIdentifer: 'L0000020', market: 'CN', markers: 'TRB|MGT' }
],
asOfDate: '2023-10-20',
status: 'Processing',
_class: 'com.beyondprototype.batch.model.PremierMarker'
}
]
```
### Postgres
```
docker pull postgres
mkdir ~/docker/postgres
docker run --name postgres -e POSTGRES_PASSWORD=Abc1234% -p 5432:5432 -v ~/docker/postgres:/var/lib/postgresql/data -d postgres
docker exec -it postgres /bin/bash
root@0e22ef2e8194:/# su postgres
postgres@0e22ef2e8194:/$ psql
psql (16.0 (Debian 16.0-1.pgdg120+1))
Type "help" for help.
postgres=# \l
List of databases
Name | Owner | Encoding | Locale Provider | Collate | Ctype | ICU Locale | ICU Rules | Access privileges
-----------+----------+----------+-----------------+------------+------------+------------+-----------+-----------------------
postgres | postgres | UTF8 | libc | en_US.utf8 | en_US.utf8 | | |
template0 | postgres | UTF8 | libc | en_US.utf8 | en_US.utf8 | | | =c/postgres +
| | | | | | | | postgres=CTc/postgres
template1 | postgres | UTF8 | libc | en_US.utf8 | en_US.utf8 | | | =c/postgres +
| | | | | | | | postgres=CTc/postgres
(3 rows)
postgres=# CREATE TABLE CUSTOMER_MARKER (LOCAL_ID VARCHAR(20), MARKET CHAR(2), MARKERS VARCHAR(30), STATUS CHAR(2), PRIMARY KEY (MARKET, LOCAL_ID)) ;
postgres=# CREATE TABLE CUSTOMER_LINKAGE (LOCAL_ID VARCHAR(20), MARKET CHAR(2), GLOBAL_ID VARCHAR(20), MARKERS VARCHAR(30), STATUS CHAR(2), PRIMARY KEY (MARKET, LOCAL_ID)) ;
postgres=# INSERT INTO CUSTOMER_MARKER VALUES ('L1234567890', 'US', 'TRB|MGT', 'A');
postgres=# INSERT INTO CUSTOMER_MARKER VALUES ('L1234567891', 'UK', 'MGT', 'A');
postgres=# INSERT INTO CUSTOMER_LINKAGE VALUES ('L1234567890', 'US', 'G1234567890', 'XXX', 'A');
postgres=# INSERT INTO CUSTOMER_LINKAGE VALUES ('L1234567891', 'UK', 'G1234567890', 'XXX', 'A');
postgres=# INSERT INTO CUSTOMER_LINKAGE VALUES ('L1234567892', 'CN', 'G1234567891', 'XXX', 'A');
postgres=# SELECT * FROM CUSTOMER_LINKAGE l LEFT OUTER JOIN CUSTOMER_MARKER c ON l.MARKET = c.MARKET and l.LOCAL_ID = c.LOCAL_ID;
local_id | market | global_id | markers | status | markers | status
-------------+--------+-------------+---------+--------+---------+--------
L1234567890 | US | G1234567890 | XXX | A | TRB|MGT | A
L1234567891 | UK | G1234567890 | XXX | A | MGT | A
L1234567892 | CN | G1234567891 | XXX | A | |
(3 rows)
postgres=# SELECT l.local_id, l.market , l.global_id,l. status, case when c.markers is null then l.markers else c.markers end as markers FROM CUSTOMER_LINKAGE l LEFT OUTER JOIN CUSTOMER_MARKER c ON l.MARKET = c.MARKET and l.LOCAL_ID = c.LOCAL_ID;
local_id | market | global_id | status | markers
-------------+--------+-------------+--------+---------
L1234567890 | US | G1234567890 | A | TRB|MGT
L1234567891 | UK | G1234567890 | A | MGT
L1234567892 | CN | G1234567891 | A | XXX
(3 rows)
```