1 Star 0 Fork 0

妥協/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
couchdb.go 37.27 KB
一键复制 编辑 原始数据 按行查看 历史
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339
/*
Copyright IBM Corp. 2016, 2017 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package couchdb
import (
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"mime"
"mime/multipart"
"net/http"
"net/http/httputil"
"net/textproto"
"net/url"
"regexp"
"strconv"
"strings"
"time"
"unicode/utf8"
"github.com/hyperledger/fabric/common/flogging"
logging "github.com/op/go-logging"
)
var logger = flogging.MustGetLogger("couchdb")
//time between retry attempts in milliseconds
const retryWaitTime = 125
// DBOperationResponse is body for successful database calls.
type DBOperationResponse struct {
Ok bool
id string
rev string
}
// DBInfo is body for database information.
type DBInfo struct {
DbName string `json:"db_name"`
UpdateSeq string `json:"update_seq"`
Sizes struct {
File int `json:"file"`
External int `json:"external"`
Active int `json:"active"`
} `json:"sizes"`
PurgeSeq int `json:"purge_seq"`
Other struct {
DataSize int `json:"data_size"`
} `json:"other"`
DocDelCount int `json:"doc_del_count"`
DocCount int `json:"doc_count"`
DiskSize int `json:"disk_size"`
DiskFormatVersion int `json:"disk_format_version"`
DataSize int `json:"data_size"`
CompactRunning bool `json:"compact_running"`
InstanceStartTime string `json:"instance_start_time"`
}
//ConnectionInfo is a structure for capturing the database info and version
type ConnectionInfo struct {
Couchdb string `json:"couchdb"`
Version string `json:"version"`
Vendor struct {
Name string `json:"name"`
} `json:"vendor"`
}
//RangeQueryResponse is used for processing REST range query responses from CouchDB
type RangeQueryResponse struct {
TotalRows int `json:"total_rows"`
Offset int `json:"offset"`
Rows []struct {
ID string `json:"id"`
Key string `json:"key"`
Value struct {
Rev string `json:"rev"`
} `json:"value"`
Doc json.RawMessage `json:"doc"`
} `json:"rows"`
}
//QueryResponse is used for processing REST query responses from CouchDB
type QueryResponse struct {
Warning string `json:"warning"`
Docs []json.RawMessage `json:"docs"`
}
//Doc is used for capturing if attachments are return in the query from CouchDB
type Doc struct {
ID string `json:"_id"`
Rev string `json:"_rev"`
Attachments json.RawMessage `json:"_attachments"`
}
//DocID is a minimal structure for capturing the ID from a query result
type DocID struct {
ID string `json:"_id"`
}
//QueryResult is used for returning query results from CouchDB
type QueryResult struct {
ID string
Value []byte
Attachments []*Attachment
}
//CouchConnectionDef contains parameters
type CouchConnectionDef struct {
URL string
Username string
Password string
MaxRetries int
MaxRetriesOnStartup int
RequestTimeout time.Duration
}
//CouchInstance represents a CouchDB instance
type CouchInstance struct {
conf CouchConnectionDef //connection configuration
client *http.Client // a client to connect to this instance
}
//CouchDatabase represents a database within a CouchDB instance
type CouchDatabase struct {
CouchInstance CouchInstance //connection configuration
DBName string
}
//DBReturn contains an error reported by CouchDB
type DBReturn struct {
StatusCode int `json:"status_code"`
Error string `json:"error"`
Reason string `json:"reason"`
}
//Attachment contains the definition for an attached file for couchdb
type Attachment struct {
Name string
ContentType string
Length uint64
AttachmentBytes []byte
}
//DocMetadata returns the ID, version and revision for a couchdb document
type DocMetadata struct {
ID string
Rev string
Version string
}
//FileDetails defines the structure needed to send an attachment to couchdb
type FileDetails struct {
Follows bool `json:"follows"`
ContentType string `json:"content_type"`
Length int `json:"length"`
}
//CouchDoc defines the structure for a JSON document value
type CouchDoc struct {
JSONValue []byte
Attachments []*Attachment
}
//BatchRetrieveDocMedatadataResponse is used for processing REST batch responses from CouchDB
type BatchRetrieveDocMedatadataResponse struct {
Rows []struct {
ID string `json:"id"`
Doc struct {
ID string `json:"_id"`
Rev string `json:"_rev"`
Version string `json:"version"`
} `json:"doc"`
} `json:"rows"`
}
//BatchUpdateResponse defines a structure for batch update response
type BatchUpdateResponse struct {
ID string `json:"id"`
Error string `json:"error"`
Reason string `json:"reason"`
Ok bool `json:"ok"`
Rev string `json:"rev"`
}
//Base64Attachment contains the definition for an attached file for couchdb
type Base64Attachment struct {
ContentType string `json:"content_type"`
AttachmentData string `json:"data"`
}
// closeResponseBody discards the body and then closes it to enable returning it to
// connection pool
func closeResponseBody(resp *http.Response) {
io.Copy(ioutil.Discard, resp.Body) // discard whatever is remaining of body
resp.Body.Close()
}
//CreateConnectionDefinition for a new client connection
func CreateConnectionDefinition(couchDBAddress, username, password string, maxRetries,
maxRetriesOnStartup int, requestTimeout time.Duration) (*CouchConnectionDef, error) {
logger.Debugf("Entering CreateConnectionDefinition()")
connectURL := &url.URL{
Host: couchDBAddress,
Scheme: "http",
}
//parse the constructed URL to verify no errors
finalURL, err := url.Parse(connectURL.String())
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, err
}
logger.Debugf("Created database configuration URL=[%s]", finalURL.String())
logger.Debugf("Exiting CreateConnectionDefinition()")
//return an object containing the connection information
return &CouchConnectionDef{finalURL.String(), username, password, maxRetries,
maxRetriesOnStartup, requestTimeout}, nil
}
//CreateDatabaseIfNotExist method provides function to create database
func (dbclient *CouchDatabase) CreateDatabaseIfNotExist() (*DBOperationResponse, error) {
logger.Debugf("Entering CreateDatabaseIfNotExist()")
dbInfo, couchDBReturn, err := dbclient.GetDatabaseInfo()
if err != nil {
if couchDBReturn == nil || couchDBReturn.StatusCode != 404 {
return nil, err
}
}
if dbInfo == nil && couchDBReturn.StatusCode == 404 {
logger.Debugf("Database %s does not exist.", dbclient.DBName)
connectURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, err
}
connectURL.Path = dbclient.DBName
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
//process the URL with a PUT, creates the database
resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPut, connectURL.String(), nil, "", "", maxRetries)
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
//Get the response from the create REST call
dbResponse := &DBOperationResponse{}
json.NewDecoder(resp.Body).Decode(&dbResponse)
if dbResponse.Ok == true {
logger.Debugf("Created database %s ", dbclient.DBName)
}
logger.Debugf("Exiting CreateDatabaseIfNotExist()")
return dbResponse, nil
}
logger.Debugf("Database %s already exists", dbclient.DBName)
logger.Debugf("Exiting CreateDatabaseIfNotExist()")
return nil, nil
}
//GetDatabaseInfo method provides function to retrieve database information
func (dbclient *CouchDatabase) GetDatabaseInfo() (*DBInfo, *DBReturn, error) {
connectURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, nil, err
}
connectURL.Path = dbclient.DBName
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, couchDBReturn, err := dbclient.CouchInstance.handleRequest(http.MethodGet, connectURL.String(), nil, "", "", maxRetries)
if err != nil {
return nil, couchDBReturn, err
}
defer closeResponseBody(resp)
dbResponse := &DBInfo{}
json.NewDecoder(resp.Body).Decode(&dbResponse)
// trace the database info response
if logger.IsEnabledFor(logging.DEBUG) {
dbResponseJSON, err := json.Marshal(dbResponse)
if err == nil {
logger.Debugf("GetDatabaseInfo() dbResponseJSON: %s", dbResponseJSON)
}
}
return dbResponse, couchDBReturn, nil
}
//VerifyCouchConfig method provides function to verify the connection information
func (couchInstance *CouchInstance) VerifyCouchConfig() (*ConnectionInfo, *DBReturn, error) {
logger.Debugf("Entering VerifyCouchConfig()")
defer logger.Debugf("Exiting VerifyCouchConfig()")
connectURL, err := url.Parse(couchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, nil, err
}
connectURL.Path = "/"
//get the number of retries for startup
maxRetriesOnStartup := couchInstance.conf.MaxRetriesOnStartup
resp, couchDBReturn, err := couchInstance.handleRequest(http.MethodGet, connectURL.String(), nil,
couchInstance.conf.Username, couchInstance.conf.Password, maxRetriesOnStartup)
if err != nil {
return nil, couchDBReturn, fmt.Errorf("Unable to connect to CouchDB, check the hostname and port: %s", err.Error())
}
defer closeResponseBody(resp)
dbResponse := &ConnectionInfo{}
errJSON := json.NewDecoder(resp.Body).Decode(&dbResponse)
if errJSON != nil {
return nil, nil, fmt.Errorf("Unable to connect to CouchDB, check the hostname and port: %s", errJSON.Error())
}
// trace the database info response
if logger.IsEnabledFor(logging.DEBUG) {
dbResponseJSON, err := json.Marshal(dbResponse)
if err == nil {
logger.Debugf("VerifyConnection() dbResponseJSON: %s", dbResponseJSON)
}
}
//check to see if the system databases exist
//Verifying the existence of the system database accomplishes two steps
//1. Ensures the system databases are created
//2. Verifies the username password provided in the CouchDB config are valid for system admin
err = CreateSystemDatabasesIfNotExist(*couchInstance)
if err != nil {
logger.Errorf("Unable to connect to CouchDB, error: %s Check the admin username and password.\n", err.Error())
return nil, nil, fmt.Errorf("Unable to connect to CouchDB, error: %s Check the admin username and password.\n", err.Error())
}
return dbResponse, couchDBReturn, nil
}
//DropDatabase provides method to drop an existing database
func (dbclient *CouchDatabase) DropDatabase() (*DBOperationResponse, error) {
logger.Debugf("Entering DropDatabase()")
connectURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, err
}
connectURL.Path = dbclient.DBName
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodDelete, connectURL.String(), nil, "", "", maxRetries)
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
dbResponse := &DBOperationResponse{}
json.NewDecoder(resp.Body).Decode(&dbResponse)
if dbResponse.Ok == true {
logger.Debugf("Dropped database %s ", dbclient.DBName)
}
logger.Debugf("Exiting DropDatabase()")
if dbResponse.Ok == true {
return dbResponse, nil
}
return dbResponse, fmt.Errorf("Error dropping database")
}
// EnsureFullCommit calls _ensure_full_commit for explicit fsync
func (dbclient *CouchDatabase) EnsureFullCommit() (*DBOperationResponse, error) {
logger.Debugf("Entering EnsureFullCommit()")
connectURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, err
}
connectURL.Path = dbclient.DBName + "/_ensure_full_commit"
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPost, connectURL.String(), nil, "", "", maxRetries)
if err != nil {
logger.Errorf("Failed to invoke _ensure_full_commit Error: %s\n", err.Error())
return nil, err
}
defer closeResponseBody(resp)
dbResponse := &DBOperationResponse{}
json.NewDecoder(resp.Body).Decode(&dbResponse)
if dbResponse.Ok == true {
logger.Debugf("_ensure_full_commit database %s ", dbclient.DBName)
}
logger.Debugf("Exiting EnsureFullCommit()")
if dbResponse.Ok == true {
return dbResponse, nil
}
return dbResponse, fmt.Errorf("Error syncing database")
}
//SaveDoc method provides a function to save a document, id and byte array
func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc) (string, error) {
logger.Debugf("Entering SaveDoc() id=[%s]", id)
if !utf8.ValidString(id) {
return "", fmt.Errorf("doc id [%x] not a valid utf8 string", id)
}
saveURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return "", err
}
saveURL.Path = dbclient.DBName
// id can contain a '/', so encode separately
saveURL = &url.URL{Opaque: saveURL.String() + "/" + encodePathElement(id)}
if rev == "" {
//See if the document already exists, we need the rev for save
_, revdoc, err2 := dbclient.ReadDoc(id)
if err2 != nil {
//set the revision to indicate that the document was not found
rev = ""
} else {
//set the revision to the rev returned from the document read
rev = revdoc
}
}
logger.Debugf(" rev=%s", rev)
//Set up a buffer for the data to be pushed to couchdb
data := []byte{}
//Set up a default boundary for use by multipart if sending attachments
defaultBoundary := ""
//check to see if attachments is nil, if so, then this is a JSON only
if couchDoc.Attachments == nil {
//Test to see if this is a valid JSON
if IsJSON(string(couchDoc.JSONValue)) != true {
return "", fmt.Errorf("JSON format is not valid")
}
// if there are no attachments, then use the bytes passed in as the JSON
data = couchDoc.JSONValue
} else { // there are attachments
//attachments are included, create the multipart definition
multipartData, multipartBoundary, err3 := createAttachmentPart(couchDoc, defaultBoundary)
if err3 != nil {
return "", err3
}
//Set the data buffer to the data from the create multi-part data
data = multipartData.Bytes()
//Set the default boundary to the value generated in the multipart creation
defaultBoundary = multipartBoundary
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
//handle the request for saving the JSON or attachments
resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPut, saveURL.String(), data, rev, defaultBoundary, maxRetries)
if err != nil {
return "", err
}
defer closeResponseBody(resp)
//get the revision and return
revision, err := getRevisionHeader(resp)
if err != nil {
return "", err
}
logger.Debugf("Exiting SaveDoc()")
return revision, nil
}
func createAttachmentPart(couchDoc *CouchDoc, defaultBoundary string) (bytes.Buffer, string, error) {
//Create a buffer for writing the result
writeBuffer := new(bytes.Buffer)
// read the attachment and save as an attachment
writer := multipart.NewWriter(writeBuffer)
//retrieve the boundary for the multipart
defaultBoundary = writer.Boundary()
fileAttachments := map[string]FileDetails{}
for _, attachment := range couchDoc.Attachments {
fileAttachments[attachment.Name] = FileDetails{true, attachment.ContentType, len(attachment.AttachmentBytes)}
}
attachmentJSONMap := map[string]interface{}{
"_attachments": fileAttachments}
//Add any data uploaded with the files
if couchDoc.JSONValue != nil {
//create a generic map
genericMap := make(map[string]interface{})
//unmarshal the data into the generic map
decoder := json.NewDecoder(bytes.NewBuffer(couchDoc.JSONValue))
decoder.UseNumber()
decoder.Decode(&genericMap)
//add all key/values to the attachmentJSONMap
for jsonKey, jsonValue := range genericMap {
attachmentJSONMap[jsonKey] = jsonValue
}
}
filesForUpload, _ := json.Marshal(attachmentJSONMap)
logger.Debugf(string(filesForUpload))
//create the header for the JSON
header := make(textproto.MIMEHeader)
header.Set("Content-Type", "application/json")
part, err := writer.CreatePart(header)
if err != nil {
return *writeBuffer, defaultBoundary, err
}
part.Write(filesForUpload)
for _, attachment := range couchDoc.Attachments {
header := make(textproto.MIMEHeader)
part, err2 := writer.CreatePart(header)
if err2 != nil {
return *writeBuffer, defaultBoundary, err2
}
part.Write(attachment.AttachmentBytes)
}
err = writer.Close()
if err != nil {
return *writeBuffer, defaultBoundary, err
}
return *writeBuffer, defaultBoundary, nil
}
func getRevisionHeader(resp *http.Response) (string, error) {
revision := resp.Header.Get("Etag")
if revision == "" {
return "", fmt.Errorf("No revision tag detected")
}
reg := regexp.MustCompile(`"([^"]*)"`)
revisionNoQuotes := reg.ReplaceAllString(revision, "${1}")
return revisionNoQuotes, nil
}
//ReadDoc method provides function to retrieve a document from the database by id
func (dbclient *CouchDatabase) ReadDoc(id string) (*CouchDoc, string, error) {
var couchDoc CouchDoc
attachments := []*Attachment{}
logger.Debugf("Entering ReadDoc() id=[%s]", id)
if !utf8.ValidString(id) {
return nil, "", fmt.Errorf("doc id [%x] not a valid utf8 string", id)
}
readURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, "", err
}
readURL.Path = dbclient.DBName
// id can contain a '/', so encode separately
readURL = &url.URL{Opaque: readURL.String() + "/" + encodePathElement(id)}
query := readURL.Query()
query.Add("attachments", "true")
readURL.RawQuery = query.Encode()
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, couchDBReturn, err := dbclient.CouchInstance.handleRequest(http.MethodGet, readURL.String(), nil, "", "", maxRetries)
if err != nil {
if couchDBReturn != nil && couchDBReturn.StatusCode == 404 {
logger.Debug("Document not found (404), returning nil value instead of 404 error")
// non-existent document should return nil value instead of a 404 error
// for details see https://github.com/hyperledger-archives/fabric/issues/936
return nil, "", nil
}
logger.Debugf("couchDBReturn=%v\n", couchDBReturn)
return nil, "", err
}
defer closeResponseBody(resp)
//Get the media type from the Content-Type header
mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
log.Fatal(err)
}
//Get the revision from header
revision, err := getRevisionHeader(resp)
if err != nil {
return nil, "", err
}
//check to see if the is multipart, handle as attachment if multipart is detected
if strings.HasPrefix(mediaType, "multipart/") {
//Set up the multipart reader based on the boundary
multipartReader := multipart.NewReader(resp.Body, params["boundary"])
for {
p, err := multipartReader.NextPart()
if err == io.EOF {
break // processed all parts
}
if err != nil {
return nil, "", err
}
defer p.Close()
logger.Debugf("part header=%s", p.Header)
switch p.Header.Get("Content-Type") {
case "application/json":
partdata, err := ioutil.ReadAll(p)
if err != nil {
return nil, "", err
}
couchDoc.JSONValue = partdata
default:
//Create an attachment structure and load it
attachment := &Attachment{}
attachment.ContentType = p.Header.Get("Content-Type")
contentDispositionParts := strings.Split(p.Header.Get("Content-Disposition"), ";")
if strings.TrimSpace(contentDispositionParts[0]) == "attachment" {
switch p.Header.Get("Content-Encoding") {
case "gzip": //See if the part is gzip encoded
var respBody []byte
gr, err := gzip.NewReader(p)
if err != nil {
return nil, "", err
}
respBody, err = ioutil.ReadAll(gr)
if err != nil {
return nil, "", err
}
logger.Debugf("Retrieved attachment data")
attachment.AttachmentBytes = respBody
attachment.Name = p.FileName()
attachments = append(attachments, attachment)
default:
//retrieve the data, this is not gzip
partdata, err := ioutil.ReadAll(p)
if err != nil {
return nil, "", err
}
logger.Debugf("Retrieved attachment data")
attachment.AttachmentBytes = partdata
attachment.Name = p.FileName()
attachments = append(attachments, attachment)
} // end content-encoding switch
} // end if attachment
} // end content-type switch
} // for all multiparts
couchDoc.Attachments = attachments
return &couchDoc, revision, nil
}
//handle as JSON document
couchDoc.JSONValue, err = ioutil.ReadAll(resp.Body)
if err != nil {
return nil, "", err
}
logger.Debugf("Exiting ReadDoc()")
return &couchDoc, revision, nil
}
//ReadDocRange method provides function to a range of documents based on the start and end keys
//startKey and endKey can also be empty strings. If startKey and endKey are empty, all documents are returned
//This function provides a limit option to specify the max number of entries and is supplied by config.
//Skip is reserved for possible future future use.
func (dbclient *CouchDatabase) ReadDocRange(startKey, endKey string, limit, skip int) (*[]QueryResult, error) {
logger.Debugf("Entering ReadDocRange() startKey=%s, endKey=%s", startKey, endKey)
var results []QueryResult
rangeURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, err
}
rangeURL.Path = dbclient.DBName + "/_all_docs"
queryParms := rangeURL.Query()
queryParms.Set("limit", strconv.Itoa(limit))
queryParms.Add("skip", strconv.Itoa(skip))
queryParms.Add("include_docs", "true")
queryParms.Add("inclusive_end", "false") // endkey should be exclusive to be consistent with goleveldb
//Append the startKey if provided
if startKey != "" {
var err error
if startKey, err = encodeForJSON(startKey); err != nil {
return nil, err
}
queryParms.Add("startkey", "\""+startKey+"\"")
}
//Append the endKey if provided
if endKey != "" {
var err error
if endKey, err = encodeForJSON(endKey); err != nil {
return nil, err
}
queryParms.Add("endkey", "\""+endKey+"\"")
}
rangeURL.RawQuery = queryParms.Encode()
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodGet, rangeURL.String(), nil, "", "", maxRetries)
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
if logger.IsEnabledFor(logging.DEBUG) {
dump, err2 := httputil.DumpResponse(resp, true)
if err2 != nil {
log.Fatal(err2)
}
logger.Debugf("%s", dump)
}
//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var jsonResponse = &RangeQueryResponse{}
err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse)
if err2 != nil {
return nil, err2
}
logger.Debugf("Total Rows: %d", jsonResponse.TotalRows)
for _, row := range jsonResponse.Rows {
var jsonDoc = &Doc{}
err3 := json.Unmarshal(row.Doc, &jsonDoc)
if err3 != nil {
return nil, err3
}
if jsonDoc.Attachments != nil {
logger.Debugf("Adding JSON document and attachments for id: %s", jsonDoc.ID)
couchDoc, _, err := dbclient.ReadDoc(jsonDoc.ID)
if err != nil {
return nil, err
}
var addDocument = &QueryResult{jsonDoc.ID, couchDoc.JSONValue, couchDoc.Attachments}
results = append(results, *addDocument)
} else {
logger.Debugf("Adding json docment for id: %s", jsonDoc.ID)
var addDocument = &QueryResult{jsonDoc.ID, row.Doc, nil}
results = append(results, *addDocument)
}
}
logger.Debugf("Exiting ReadDocRange()")
return &results, nil
}
//DeleteDoc method provides function to delete a document from the database by id
func (dbclient *CouchDatabase) DeleteDoc(id, rev string) error {
logger.Debugf("Entering DeleteDoc() id=%s", id)
deleteURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return err
}
deleteURL.Path = dbclient.DBName
// id can contain a '/', so encode separately
deleteURL = &url.URL{Opaque: deleteURL.String() + "/" + encodePathElement(id)}
if rev == "" {
//See if the document already exists, we need the rev for delete
_, revdoc, err2 := dbclient.ReadDoc(id)
if err2 != nil {
//set the revision to indicate that the document was not found
rev = ""
} else {
//set the revision to the rev returned from the document read
rev = revdoc
}
}
logger.Debugf(" rev=%s", rev)
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, couchDBReturn, err := dbclient.CouchInstance.handleRequest(http.MethodDelete, deleteURL.String(), nil, rev, "", maxRetries)
if err != nil {
fmt.Printf("couchDBReturn=%v", couchDBReturn)
if couchDBReturn != nil && couchDBReturn.StatusCode == 404 {
logger.Debug("Document not found (404), returning nil value instead of 404 error")
// non-existent document should return nil value instead of a 404 error
// for details see https://github.com/hyperledger-archives/fabric/issues/936
return nil
}
return err
}
defer closeResponseBody(resp)
logger.Debugf("Exiting DeleteDoc()")
return nil
}
//QueryDocuments method provides function for processing a query
func (dbclient *CouchDatabase) QueryDocuments(query string) (*[]QueryResult, error) {
logger.Debugf("Entering QueryDocuments() query=%s", query)
var results []QueryResult
queryURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, err
}
queryURL.Path = dbclient.DBName + "/_find"
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPost, queryURL.String(), []byte(query), "", "", maxRetries)
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
if logger.IsEnabledFor(logging.DEBUG) {
dump, err2 := httputil.DumpResponse(resp, true)
if err2 != nil {
log.Fatal(err2)
}
logger.Debugf("%s", dump)
}
//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var jsonResponse = &QueryResponse{}
err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse)
if err2 != nil {
return nil, err2
}
for _, row := range jsonResponse.Docs {
var jsonDoc = &Doc{}
err3 := json.Unmarshal(row, &jsonDoc)
if err3 != nil {
return nil, err3
}
if jsonDoc.Attachments != nil {
logger.Debugf("Adding JSON docment and attachments for id: %s", jsonDoc.ID)
couchDoc, _, err := dbclient.ReadDoc(jsonDoc.ID)
if err != nil {
return nil, err
}
var addDocument = &QueryResult{ID: jsonDoc.ID, Value: couchDoc.JSONValue, Attachments: couchDoc.Attachments}
results = append(results, *addDocument)
} else {
logger.Debugf("Adding json docment for id: %s", jsonDoc.ID)
var addDocument = &QueryResult{ID: jsonDoc.ID, Value: row, Attachments: nil}
results = append(results, *addDocument)
}
}
logger.Debugf("Exiting QueryDocuments()")
return &results, nil
}
//BatchRetrieveIDRevision - batch method to retrieve IDs and revisions
func (dbclient *CouchDatabase) BatchRetrieveIDRevision(keys []string) ([]*DocMetadata, error) {
batchURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, err
}
batchURL.Path = dbclient.DBName + "/_all_docs"
queryParms := batchURL.Query()
queryParms.Add("include_docs", "true")
batchURL.RawQuery = queryParms.Encode()
keymap := make(map[string]interface{})
keymap["keys"] = keys
jsonKeys, err := json.Marshal(keymap)
if err != nil {
return nil, err
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPost, batchURL.String(), jsonKeys, "", "", maxRetries)
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
if logger.IsEnabledFor(logging.DEBUG) {
dump, _ := httputil.DumpResponse(resp, false)
// compact debug log by replacing carriage return / line feed with dashes to separate http headers
logger.Debugf("HTTP Response: %s", bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1))
}
//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var jsonResponse = &BatchRetrieveDocMedatadataResponse{}
err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse)
if err2 != nil {
return nil, err2
}
revisionDocs := []*DocMetadata{}
for _, row := range jsonResponse.Rows {
revisionDoc := &DocMetadata{ID: row.ID, Rev: row.Doc.Rev, Version: row.Doc.Version}
revisionDocs = append(revisionDocs, revisionDoc)
}
return revisionDocs, nil
}
//BatchUpdateDocuments - batch method to batch update documents
func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*BatchUpdateResponse, error) {
logger.Debugf("Entering BatchUpdateDocuments() documents=%v", documents)
batchURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
if err != nil {
logger.Errorf("URL parse error: %s", err.Error())
return nil, err
}
batchURL.Path = dbclient.DBName + "/_bulk_docs"
documentMap := make(map[string]interface{})
var jsonDocumentMap []interface{}
for _, jsonDocument := range documents {
//create a document map
var document = make(map[string]interface{})
//unmarshal the JSON component of the CouchDoc into the document
json.Unmarshal(jsonDocument.JSONValue, &document)
//iterate through any attachments
if len(jsonDocument.Attachments) > 0 {
//create a file attachment map
fileAttachment := make(map[string]interface{})
//for each attachment, create a Base64Attachment, name the attachment,
//add the content type and base64 encode the attachment
for _, attachment := range jsonDocument.Attachments {
fileAttachment[attachment.Name] = Base64Attachment{attachment.ContentType,
base64.StdEncoding.EncodeToString(attachment.AttachmentBytes)}
}
//add attachments to the document
document["_attachments"] = fileAttachment
}
//Append the document to the map of documents
jsonDocumentMap = append(jsonDocumentMap, document)
}
//Add the documents to the "docs" item
documentMap["docs"] = jsonDocumentMap
jsonKeys, err := json.Marshal(documentMap)
if err != nil {
return nil, err
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPost, batchURL.String(), jsonKeys, "", "", maxRetries)
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
if logger.IsEnabledFor(logging.DEBUG) {
dump, _ := httputil.DumpResponse(resp, false)
// compact debug log by replacing carriage return / line feed with dashes to separate http headers
logger.Debugf("HTTP Response: %s", bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1))
}
//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var jsonResponse = []*BatchUpdateResponse{}
err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse)
if err2 != nil {
return nil, err2
}
logger.Debugf("Exiting BatchUpdateDocuments()")
return jsonResponse, nil
}
//handleRequest method is a generic http request handler.
// if it returns an error, it ensures that the response body is closed, else it is the
// callee's responsibility to close response correctly
func (couchInstance *CouchInstance) handleRequest(method, connectURL string, data []byte, rev string,
multipartBoundary string, maxRetries int) (*http.Response, *DBReturn, error) {
logger.Debugf("Entering handleRequest() method=%s url=%v", method, connectURL)
//create the return objects for couchDB
var resp *http.Response
var errResp error
couchDBReturn := &DBReturn{}
//set initial wait duration for retries
waitDuration := retryWaitTime * time.Millisecond
//attempt the http request for the max number of retries
for attempts := 0; attempts < maxRetries; attempts++ {
//Set up a buffer for the payload data
payloadData := new(bytes.Buffer)
payloadData.ReadFrom(bytes.NewReader(data))
//Create request based on URL for couchdb operation
req, err := http.NewRequest(method, connectURL, payloadData)
if err != nil {
return nil, nil, err
}
//add content header for PUT
if method == http.MethodPut || method == http.MethodPost || method == http.MethodDelete {
//If the multipartBoundary is not set, then this is a JSON and content-type should be set
//to application/json. Else, this is contains an attachment and needs to be multipart
if multipartBoundary == "" {
req.Header.Set("Content-Type", "application/json")
} else {
req.Header.Set("Content-Type", "multipart/related;boundary=\""+multipartBoundary+"\"")
}
//check to see if the revision is set, if so, pass as a header
if rev != "" {
req.Header.Set("If-Match", rev)
}
}
//add content header for PUT
if method == http.MethodPut || method == http.MethodPost {
req.Header.Set("Accept", "application/json")
}
//add content header for GET
if method == http.MethodGet {
req.Header.Set("Accept", "multipart/related")
}
//If username and password are set the use basic auth
if couchInstance.conf.Username != "" && couchInstance.conf.Password != "" {
req.SetBasicAuth(couchInstance.conf.Username, couchInstance.conf.Password)
}
if logger.IsEnabledFor(logging.DEBUG) {
dump, _ := httputil.DumpRequestOut(req, false)
// compact debug log by replacing carriage return / line feed with dashes to separate http headers
logger.Debugf("HTTP Request: %s", bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1))
}
//Execute http request
resp, errResp = couchInstance.client.Do(req)
//if an error is not detected then drop out of the retry
if errResp == nil && resp != nil && resp.StatusCode < 500 {
break
}
//if this is an error, record the retry error, else this is a 500 error
if errResp != nil {
//Log the error with the retry count and continue
logger.Warningf("Retrying couchdb request in %s. Attempt:%v Error:%v",
waitDuration.String(), attempts+1, errResp.Error())
} else {
//Read the response body and close it for next attempt
jsonError, err := ioutil.ReadAll(resp.Body)
closeResponseBody(resp)
if err != nil {
return nil, nil, err
}
errorBytes := []byte(jsonError)
//Unmarshal the response
json.Unmarshal(errorBytes, &couchDBReturn)
//Log the 500 error with the retry count and continue
logger.Warningf("Retrying couchdb request in %s. Attempt:%v Couch DB Error:%s, Status Code:%v Reason:%v",
waitDuration.String(), attempts+1, couchDBReturn.Error, resp.Status, couchDBReturn.Reason)
}
//sleep for specified sleep time, then retry
time.Sleep(waitDuration)
//backoff, doubling the retry time for next attempt
waitDuration *= 2
}
//if the error present, return the error
if errResp != nil {
return nil, nil, errResp
}
//set the return code for the couchDB request
couchDBReturn.StatusCode = resp.StatusCode
//check to see if the status code is 400 or higher
//response codes 4XX and 500 will be treated as errors
if resp.StatusCode >= 400 {
// close the response before returning error
defer closeResponseBody(resp)
//Read the response body
jsonError, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}
errorBytes := []byte(jsonError)
//marshal the response
json.Unmarshal(errorBytes, &couchDBReturn)
logger.Debugf("Couch DB Error:%s, Status Code:%v, Reason:%s",
couchDBReturn.Error, resp.StatusCode, couchDBReturn.Reason)
return nil, couchDBReturn, fmt.Errorf("Couch DB Error:%s, Status Code:%v, Reason:%s",
couchDBReturn.Error, resp.StatusCode, couchDBReturn.Reason)
}
logger.Debugf("Exiting handleRequest()")
//If no errors, then return the results
return resp, couchDBReturn, nil
}
//IsJSON tests a string to determine if a valid JSON
func IsJSON(s string) bool {
var js map[string]interface{}
return json.Unmarshal([]byte(s), &js) == nil
}
// encodePathElement uses Golang for encoding and in addition, replaces a '/' by %2F.
// Otherwise, in the regular encoding, a '/' is treated as a path separator in the url
func encodePathElement(str string) string {
u := &url.URL{}
u.Path = str
encodedStr := u.String()
encodedStr = strings.Replace(encodedStr, "/", "%2F", -1)
return encodedStr
}
func encodeForJSON(str string) (string, error) {
buf := &bytes.Buffer{}
encoder := json.NewEncoder(buf)
if err := encoder.Encode(str); err != nil {
return "", err
}
// Encode adds double quotes to string and terminates with \n - stripping them as bytes as they are all ascii(0-127)
buffer := buf.Bytes()
return string(buffer[1 : len(buffer)-2]), nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/liurenhao/fabric.git
git@gitee.com:liurenhao/fabric.git
liurenhao
fabric
fabric
v1.0.0-alpha2

搜索帮助