1 Star 0 Fork 0

妥協/fabric

Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
Clone or Download
couchdb.go 63.29 KB
Copy Edit Raw Blame History
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package couchdb
import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"mime"
"mime/multipart"
"net/http"
"net/http/httputil"
"net/textproto"
"net/url"
"regexp"
"strconv"
"strings"
"time"
"unicode/utf8"
"github.com/hyperledger/fabric/common/flogging"
"github.com/pkg/errors"
"go.uber.org/zap/zapcore"
)
var logger = flogging.MustGetLogger("couchdb")
//time between retry attempts in milliseconds
const retryWaitTime = 125
// DBOperationResponse is body for successful database calls.
type DBOperationResponse struct {
Ok bool
}
// DBInfo is body for database information.
type DBInfo struct {
DbName string `json:"db_name"`
Sizes struct {
File int `json:"file"`
External int `json:"external"`
Active int `json:"active"`
} `json:"sizes"`
Other struct {
DataSize int `json:"data_size"`
} `json:"other"`
DocDelCount int `json:"doc_del_count"`
DocCount int `json:"doc_count"`
DiskSize int `json:"disk_size"`
DiskFormatVersion int `json:"disk_format_version"`
DataSize int `json:"data_size"`
CompactRunning bool `json:"compact_running"`
InstanceStartTime string `json:"instance_start_time"`
}
//ConnectionInfo is a structure for capturing the database info and version
type ConnectionInfo struct {
Couchdb string `json:"couchdb"`
Version string `json:"version"`
Vendor struct {
Name string `json:"name"`
} `json:"vendor"`
}
//RangeQueryResponse is used for processing REST range query responses from CouchDB
type RangeQueryResponse struct {
TotalRows int32 `json:"total_rows"`
Offset int32 `json:"offset"`
Rows []struct {
ID string `json:"id"`
Key string `json:"key"`
Value struct {
Rev string `json:"rev"`
} `json:"value"`
Doc json.RawMessage `json:"doc"`
} `json:"rows"`
}
//QueryResponse is used for processing REST query responses from CouchDB
type QueryResponse struct {
Warning string `json:"warning"`
Docs []json.RawMessage `json:"docs"`
Bookmark string `json:"bookmark"`
}
// DocMetadata is used for capturing CouchDB document header info,
// used to capture id, version, rev and attachments returned in the query from CouchDB
type DocMetadata struct {
ID string `json:"_id"`
Rev string `json:"_rev"`
Version string `json:"~version"`
AttachmentsInfo map[string]*AttachmentInfo `json:"_attachments"`
}
//DocID is a minimal structure for capturing the ID from a query result
type DocID struct {
ID string `json:"_id"`
}
//QueryResult is used for returning query results from CouchDB
type QueryResult struct {
ID string
Value []byte
Attachments []*AttachmentInfo
}
// Config is a structure used to configure a CouchInstance.
type Config struct {
// Address is the hostname:port of the CouchDB database instance.
Address string
// Username is the username used to authenticate with CouchDB. This username
// must have read and write access permissions.
Username string
// Password is the password for Username.
Password string
// MaxRetries is the maximum number of times to retry CouchDB operations on
// failure.
MaxRetries int
// MaxRetriesOnStartup is the maximum number of times to retry CouchDB operations on
// failure when initializing the ledger.
MaxRetriesOnStartup int
// RequestTimeout is the timeout used for CouchDB operations.
RequestTimeout time.Duration
// InternalQueryLimit is the maximum number of records to return internally
// when querying CouchDB.
InternalQueryLimit int
// MaxBatchUpdateSize is the maximum number of records to included in CouchDB
// bulk update operations.
MaxBatchUpdateSize int
// WarmIndexesAfterNBlocks is the number of blocks after which to warm any
// CouchDB indexes.
WarmIndexesAfterNBlocks int
// CreateGlobalChangesDB determines whether or not to create the "_global_changes"
// system database.
CreateGlobalChangesDB bool
// RedoLogPath is the directory where the CouchDB redo log files are stored.
RedoLogPath string
// UserCacheSizeMBs denotes the user specified maximum mega bytes (MB) to be allocated
// for the user state cache (i.e., all chaincodes deployed by the user). Note that
// UserCacheSizeMBs needs to be a multiple of 32 MB. If it is not a multiple of 32 MB,
// the peer would round the size to the next multiple of 32 MB.
UserCacheSizeMBs int
}
//CouchInstance represents a CouchDB instance
type CouchInstance struct {
conf *Config
client *http.Client // a client to connect to this instance
stats *stats
}
//CouchDatabase represents a database within a CouchDB instance
type CouchDatabase struct {
CouchInstance *CouchInstance //connection configuration
DBName string
IndexWarmCounter int
}
//DBReturn contains an error reported by CouchDB
type DBReturn struct {
StatusCode int `json:"status_code"`
Error string `json:"error"`
Reason string `json:"reason"`
}
//CreateIndexResponse contains an the index creation response from CouchDB
type CreateIndexResponse struct {
Result string `json:"result"`
ID string `json:"id"`
Name string `json:"name"`
}
//AttachmentInfo contains the definition for an attached file for couchdb
type AttachmentInfo struct {
Name string
ContentType string `json:"content_type"`
Length uint64
AttachmentBytes []byte `json:"data"`
}
//FileDetails defines the structure needed to send an attachment to couchdb
type FileDetails struct {
Follows bool `json:"follows"`
ContentType string `json:"content_type"`
Length int `json:"length"`
}
//CouchDoc defines the structure for a JSON document value
type CouchDoc struct {
JSONValue []byte
Attachments []*AttachmentInfo
}
//BatchRetrieveDocMetadataResponse is used for processing REST batch responses from CouchDB
type BatchRetrieveDocMetadataResponse struct {
Rows []struct {
ID string `json:"id"`
DocMetadata struct {
ID string `json:"_id"`
Rev string `json:"_rev"`
Version string `json:"~version"`
} `json:"doc"`
} `json:"rows"`
}
//BatchUpdateResponse defines a structure for batch update response
type BatchUpdateResponse struct {
ID string `json:"id"`
Error string `json:"error"`
Reason string `json:"reason"`
Ok bool `json:"ok"`
Rev string `json:"rev"`
}
//Base64Attachment contains the definition for an attached file for couchdb
type Base64Attachment struct {
ContentType string `json:"content_type"`
AttachmentData string `json:"data"`
}
//IndexResult contains the definition for a couchdb index
type IndexResult struct {
DesignDocument string `json:"designdoc"`
Name string `json:"name"`
Definition string `json:"definition"`
}
//DatabaseSecurity contains the definition for CouchDB database security
type DatabaseSecurity struct {
Admins struct {
Names []string `json:"names"`
Roles []string `json:"roles"`
} `json:"admins"`
Members struct {
Names []string `json:"names"`
Roles []string `json:"roles"`
} `json:"members"`
}
// closeResponseBody discards the body and then closes it to enable returning it to
// connection pool
func closeResponseBody(resp *http.Response) {
if resp != nil {
io.Copy(ioutil.Discard, resp.Body) // discard whatever is remaining of body
resp.Body.Close()
}
}
//CreateDatabaseIfNotExist method provides function to create database
func (dbclient *CouchDatabase) CreateDatabaseIfNotExist() error {
logger.Debugf("[%s] Entering CreateDatabaseIfNotExist()", dbclient.DBName)
dbInfo, couchDBReturn, err := dbclient.GetDatabaseInfo()
if err != nil {
if couchDBReturn == nil || couchDBReturn.StatusCode != 404 {
return err
}
}
//If the dbInfo returns populated and status code is 200, then the database exists
if dbInfo != nil && couchDBReturn.StatusCode == 200 {
//Apply database security if needed
errSecurity := dbclient.applyDatabasePermissions()
if errSecurity != nil {
return errSecurity
}
logger.Debugf("[%s] Database already exists", dbclient.DBName)
logger.Debugf("[%s] Exiting CreateDatabaseIfNotExist()", dbclient.DBName)
return nil
}
logger.Debugf("[%s] Database does not exist.", dbclient.DBName)
connectURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
//process the URL with a PUT, creates the database
resp, _, err := dbclient.handleRequest(http.MethodPut, "CreateDatabaseIfNotExist", connectURL, nil, "", "", maxRetries, true, nil)
if err != nil {
// Check to see if the database exists
// Even though handleRequest() returned an error, the
// database may have been created and a false error
// returned due to a timeout or race condition.
// Do a final check to see if the database really got created.
dbInfo, couchDBReturn, errDbInfo := dbclient.GetDatabaseInfo()
//If there is no error, then the database exists, return without an error
if errDbInfo == nil && dbInfo != nil && couchDBReturn.StatusCode == 200 {
errSecurity := dbclient.applyDatabasePermissions()
if errSecurity != nil {
return errSecurity
}
logger.Infof("[%s] Created state database", dbclient.DBName)
logger.Debugf("[%s] Exiting CreateDatabaseIfNotExist()", dbclient.DBName)
return nil
}
return err
}
defer closeResponseBody(resp)
errSecurity := dbclient.applyDatabasePermissions()
if errSecurity != nil {
return errSecurity
}
logger.Infof("Created state database %s", dbclient.DBName)
logger.Debugf("[%s] Exiting CreateDatabaseIfNotExist()", dbclient.DBName)
return nil
}
//applyDatabaseSecurity
func (dbclient *CouchDatabase) applyDatabasePermissions() error {
//If the username and password are not set, then skip applying permissions
if dbclient.CouchInstance.conf.Username == "" && dbclient.CouchInstance.conf.Password == "" {
return nil
}
securityPermissions := &DatabaseSecurity{}
securityPermissions.Admins.Names = append(securityPermissions.Admins.Names, dbclient.CouchInstance.conf.Username)
securityPermissions.Members.Names = append(securityPermissions.Members.Names, dbclient.CouchInstance.conf.Username)
err := dbclient.ApplyDatabaseSecurity(securityPermissions)
if err != nil {
return err
}
return nil
}
//GetDatabaseInfo method provides function to retrieve database information
func (dbclient *CouchDatabase) GetDatabaseInfo() (*DBInfo, *DBReturn, error) {
connectURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, nil, errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, couchDBReturn, err := dbclient.handleRequest(http.MethodGet, "GetDatabaseInfo", connectURL, nil, "", "", maxRetries, true, nil)
if err != nil {
return nil, couchDBReturn, err
}
defer closeResponseBody(resp)
dbResponse := &DBInfo{}
decodeErr := json.NewDecoder(resp.Body).Decode(&dbResponse)
if decodeErr != nil {
return nil, nil, errors.Wrap(decodeErr, "error decoding response body")
}
// trace the database info response
logger.Debugw("GetDatabaseInfo()", "dbResponseJSON", dbResponse)
return dbResponse, couchDBReturn, nil
}
//VerifyCouchConfig method provides function to verify the connection information
func (couchInstance *CouchInstance) VerifyCouchConfig() (*ConnectionInfo, *DBReturn, error) {
logger.Debugf("Entering VerifyCouchConfig()")
defer logger.Debugf("Exiting VerifyCouchConfig()")
connectURL, err := url.Parse(couchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, nil, errors.Wrapf(err, "error parsing couch instance URL: %s", couchInstance.URL())
}
connectURL.Path = "/"
//get the number of retries for startup
maxRetriesOnStartup := couchInstance.conf.MaxRetriesOnStartup
resp, couchDBReturn, err := couchInstance.handleRequest(context.Background(), http.MethodGet, "", "VerifyCouchConfig", connectURL, nil,
"", "", maxRetriesOnStartup, true, nil)
if err != nil {
return nil, couchDBReturn, errors.WithMessage(err, "unable to connect to CouchDB, check the hostname and port")
}
defer closeResponseBody(resp)
dbResponse := &ConnectionInfo{}
decodeErr := json.NewDecoder(resp.Body).Decode(&dbResponse)
if decodeErr != nil {
return nil, nil, errors.Wrap(decodeErr, "error decoding response body")
}
// trace the database info response
logger.Debugw("VerifyConnection()", "dbResponseJSON", dbResponse)
//check to see if the system databases exist
//Verifying the existence of the system database accomplishes two steps
//1. Ensures the system databases are created
//2. Verifies the username password provided in the CouchDB config are valid for system admin
err = CreateSystemDatabasesIfNotExist(couchInstance)
if err != nil {
logger.Errorf("Unable to connect to CouchDB, error: %s. Check the admin username and password.", err)
return nil, nil, errors.WithMessage(err, "unable to connect to CouchDB. Check the admin username and password")
}
return dbResponse, couchDBReturn, nil
}
// IsEmpty returns false if couchInstance contains any databases
// (except couchdb system databases and any database name supplied in the parameter 'databasesToIgnore')
func (couchInstance *CouchInstance) IsEmpty(databasesToIgnore []string) (bool, error) {
toIgnore := map[string]bool{}
for _, s := range databasesToIgnore {
toIgnore[s] = true
}
applicationDBNames, err := couchInstance.RetrieveApplicationDBNames()
if err != nil {
return false, err
}
for _, dbName := range applicationDBNames {
if !toIgnore[dbName] {
return false, nil
}
}
return true, nil
}
// RetrieveApplicationDBNames returns all the applicaiton database names in the couch instance
func (couchInstance *CouchInstance) RetrieveApplicationDBNames() ([]string, error) {
connectURL, err := url.Parse(couchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, errors.Wrapf(err, "error parsing couch instance URL: %s", couchInstance.URL())
}
connectURL.Path = "/_all_dbs"
maxRetries := couchInstance.conf.MaxRetries
resp, _, err := couchInstance.handleRequest(
context.Background(),
http.MethodGet,
"",
"IsEmpty",
connectURL,
nil,
"",
"",
maxRetries,
true,
nil,
)
if err != nil {
return nil, errors.WithMessage(err, "unable to connect to CouchDB, check the hostname and port")
}
var dbNames []string
defer closeResponseBody(resp)
if err := json.NewDecoder(resp.Body).Decode(&dbNames); err != nil {
return nil, errors.Wrap(err, "error decoding response body")
}
logger.Debugf("dbNames = %s", dbNames)
applicationsDBNames := []string{}
for _, d := range dbNames {
if !isCouchSystemDBName(d) {
applicationsDBNames = append(applicationsDBNames, d)
}
}
return applicationsDBNames, nil
}
func isCouchSystemDBName(name string) bool {
return strings.HasPrefix(name, "_")
}
// HealthCheck checks if the peer is able to communicate with CouchDB
func (couchInstance *CouchInstance) HealthCheck(ctx context.Context) error {
connectURL, err := url.Parse(couchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return errors.Wrapf(err, "error parsing CouchDB URL: %s", couchInstance.URL())
}
_, _, err = couchInstance.handleRequest(ctx, http.MethodHead, "", "HealthCheck", connectURL, nil, "", "", 0, true, nil)
if err != nil {
return fmt.Errorf("failed to connect to couch db [%s]", err)
}
return nil
}
// InternalQueryLimit returns the maximum number of records to return internally
// when querying CouchDB.
func (couchInstance *CouchInstance) InternalQueryLimit() int32 {
return int32(couchInstance.conf.InternalQueryLimit)
}
// MaxBatchUpdateSize returns the maximum number of records to include in a
// bulk update operation.
func (couchInstance *CouchInstance) MaxBatchUpdateSize() int {
return couchInstance.conf.MaxBatchUpdateSize
}
// URL returns the URL for the CouchDB instance.
func (couchInstance *CouchInstance) URL() string {
URL := &url.URL{
Host: couchInstance.conf.Address,
Scheme: "http",
}
return URL.String()
}
//DropDatabase provides method to drop an existing database
func (dbclient *CouchDatabase) DropDatabase() (*DBOperationResponse, error) {
dbName := dbclient.DBName
logger.Debugf("[%s] Entering DropDatabase()", dbName)
connectURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.handleRequest(http.MethodDelete, "DropDatabase", connectURL, nil, "", "", maxRetries, true, nil)
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
dbResponse := &DBOperationResponse{}
decodeErr := json.NewDecoder(resp.Body).Decode(&dbResponse)
if decodeErr != nil {
return nil, errors.Wrap(decodeErr, "error decoding response body")
}
if dbResponse.Ok == true {
logger.Debugf("[%s] Dropped database", dbclient.DBName)
}
logger.Debugf("[%s] Exiting DropDatabase()", dbclient.DBName)
if dbResponse.Ok == true {
return dbResponse, nil
}
return dbResponse, errors.New("error dropping database")
}
// EnsureFullCommit calls _ensure_full_commit for explicit fsync
func (dbclient *CouchDatabase) EnsureFullCommit() (*DBOperationResponse, error) {
dbName := dbclient.DBName
logger.Debugf("[%s] Entering EnsureFullCommit()", dbName)
connectURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.handleRequest(http.MethodPost, "EnsureFullCommit", connectURL, nil, "", "", maxRetries, true, nil, "_ensure_full_commit")
if err != nil {
logger.Errorf("Failed to invoke couchdb _ensure_full_commit. Error: %+v", err)
return nil, err
}
defer closeResponseBody(resp)
dbResponse := &DBOperationResponse{}
decodeErr := json.NewDecoder(resp.Body).Decode(&dbResponse)
if decodeErr != nil {
return nil, errors.Wrap(decodeErr, "error decoding response body")
}
// check if we should warm indexes
if dbclient.CouchInstance.conf.WarmIndexesAfterNBlocks > 0 {
// check to see if the number of blocks committed exceeds the threshold for index warming
if dbclient.IndexWarmCounter >= dbclient.CouchInstance.conf.WarmIndexesAfterNBlocks {
// use a go routine to launch WarmIndexAllIndexes()
go dbclient.runWarmIndexAllIndexes()
dbclient.IndexWarmCounter = 0
}
dbclient.IndexWarmCounter++
}
logger.Debugf("[%s] Exiting EnsureFullCommit()", dbclient.DBName)
if dbResponse.Ok == true {
return dbResponse, nil
}
return dbResponse, errors.New("error syncing database")
}
//SaveDoc method provides a function to save a document, id and byte array
func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc) (string, error) {
dbName := dbclient.DBName
logger.Debugf("[%s] Entering SaveDoc() id=[%s]", dbName, id)
if !utf8.ValidString(id) {
return "", errors.Errorf("doc id [%x] not a valid utf8 string", id)
}
saveURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return "", errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//Set up a buffer for the data to be pushed to couchdb
data := []byte{}
//Set up a default boundary for use by multipart if sending attachments
defaultBoundary := ""
//Create a flag for shared connections. This is set to false for zero length attachments
keepConnectionOpen := true
//check to see if attachments is nil, if so, then this is a JSON only
if couchDoc.Attachments == nil {
//Test to see if this is a valid JSON
if IsJSON(string(couchDoc.JSONValue)) != true {
return "", errors.New("JSON format is not valid")
}
// if there are no attachments, then use the bytes passed in as the JSON
data = couchDoc.JSONValue
} else { // there are attachments
//attachments are included, create the multipart definition
multipartData, multipartBoundary, err3 := createAttachmentPart(couchDoc, defaultBoundary)
if err3 != nil {
return "", err3
}
//If there is a zero length attachment, do not keep the connection open
for _, attach := range couchDoc.Attachments {
if attach.Length < 1 {
keepConnectionOpen = false
}
}
//Set the data buffer to the data from the create multi-part data
data = multipartData.Bytes()
//Set the default boundary to the value generated in the multipart creation
defaultBoundary = multipartBoundary
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
//handle the request for saving document with a retry if there is a revision conflict
resp, _, err := dbclient.handleRequestWithRevisionRetry(id, http.MethodPut, dbName, "SaveDoc", saveURL, data, rev, defaultBoundary, maxRetries, keepConnectionOpen, nil)
if err != nil {
return "", err
}
defer closeResponseBody(resp)
//get the revision and return
revision, err := getRevisionHeader(resp)
if err != nil {
return "", err
}
logger.Debugf("[%s] Exiting SaveDoc()", dbclient.DBName)
return revision, nil
}
//getDocumentRevision will return the revision if the document exists, otherwise it will return ""
func (dbclient *CouchDatabase) getDocumentRevision(id string) string {
var rev = ""
//See if the document already exists, we need the rev for saves and deletes
_, revdoc, err := dbclient.ReadDoc(id)
if err == nil {
//set the revision to the rev returned from the document read
rev = revdoc
}
return rev
}
func createAttachmentPart(couchDoc *CouchDoc, defaultBoundary string) (bytes.Buffer, string, error) {
//Create a buffer for writing the result
writeBuffer := new(bytes.Buffer)
// read the attachment and save as an attachment
writer := multipart.NewWriter(writeBuffer)
//retrieve the boundary for the multipart
defaultBoundary = writer.Boundary()
fileAttachments := map[string]FileDetails{}
for _, attachment := range couchDoc.Attachments {
fileAttachments[attachment.Name] = FileDetails{true, attachment.ContentType, len(attachment.AttachmentBytes)}
}
attachmentJSONMap := map[string]interface{}{
"_attachments": fileAttachments}
//Add any data uploaded with the files
if couchDoc.JSONValue != nil {
//create a generic map
genericMap := make(map[string]interface{})
//unmarshal the data into the generic map
decoder := json.NewDecoder(bytes.NewBuffer(couchDoc.JSONValue))
decoder.UseNumber()
decodeErr := decoder.Decode(&genericMap)
if decodeErr != nil {
return *writeBuffer, "", errors.Wrap(decodeErr, "error decoding json data")
}
//add all key/values to the attachmentJSONMap
for jsonKey, jsonValue := range genericMap {
attachmentJSONMap[jsonKey] = jsonValue
}
}
filesForUpload, err := json.Marshal(attachmentJSONMap)
if err != nil {
return *writeBuffer, "", errors.Wrap(err, "error marshalling json data")
}
logger.Debugf(string(filesForUpload))
//create the header for the JSON
header := make(textproto.MIMEHeader)
header.Set("Content-Type", "application/json")
part, err := writer.CreatePart(header)
if err != nil {
return *writeBuffer, defaultBoundary, errors.Wrap(err, "error creating multipart")
}
part.Write(filesForUpload)
for _, attachment := range couchDoc.Attachments {
header := make(textproto.MIMEHeader)
part, err2 := writer.CreatePart(header)
if err2 != nil {
return *writeBuffer, defaultBoundary, errors.Wrap(err2, "error creating multipart")
}
part.Write(attachment.AttachmentBytes)
}
err = writer.Close()
if err != nil {
return *writeBuffer, defaultBoundary, errors.Wrap(err, "error closing multipart writer")
}
return *writeBuffer, defaultBoundary, nil
}
func getRevisionHeader(resp *http.Response) (string, error) {
if resp == nil {
return "", errors.New("no response received from CouchDB")
}
revision := resp.Header.Get("Etag")
if revision == "" {
return "", errors.New("no revision tag detected")
}
reg := regexp.MustCompile(`"([^"]*)"`)
revisionNoQuotes := reg.ReplaceAllString(revision, "${1}")
return revisionNoQuotes, nil
}
//ReadDoc method provides function to retrieve a document and its revision
//from the database by id
func (dbclient *CouchDatabase) ReadDoc(id string) (*CouchDoc, string, error) {
var couchDoc CouchDoc
attachments := []*AttachmentInfo{}
dbName := dbclient.DBName
logger.Debugf("[%s] Entering ReadDoc() id=[%s]", dbName, id)
if !utf8.ValidString(id) {
return nil, "", errors.Errorf("doc id [%x] not a valid utf8 string", id)
}
readURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, "", errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
query := readURL.Query()
query.Add("attachments", "true")
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, couchDBReturn, err := dbclient.handleRequest(http.MethodGet, "ReadDoc", readURL, nil, "", "", maxRetries, true, &query, id)
if err != nil {
if couchDBReturn != nil && couchDBReturn.StatusCode == 404 {
logger.Debugf("[%s] Document not found (404), returning nil value instead of 404 error", dbclient.DBName)
// non-existent document should return nil value instead of a 404 error
// for details see https://github.com/hyperledger-archives/fabric/issues/936
return nil, "", nil
}
logger.Debugf("[%s] couchDBReturn=%v\n", dbclient.DBName, couchDBReturn)
return nil, "", err
}
defer closeResponseBody(resp)
//Get the media type from the Content-Type header
mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
log.Fatal(err)
}
//Get the revision from header
revision, err := getRevisionHeader(resp)
if err != nil {
return nil, "", err
}
//check to see if the is multipart, handle as attachment if multipart is detected
if strings.HasPrefix(mediaType, "multipart/") {
//Set up the multipart reader based on the boundary
multipartReader := multipart.NewReader(resp.Body, params["boundary"])
for {
p, err := multipartReader.NextPart()
if err == io.EOF {
break // processed all parts
}
if err != nil {
return nil, "", errors.Wrap(err, "error reading next multipart")
}
defer p.Close()
logger.Debugf("[%s] part header=%s", dbclient.DBName, p.Header)
switch p.Header.Get("Content-Type") {
case "application/json":
partdata, err := ioutil.ReadAll(p)
if err != nil {
return nil, "", errors.Wrap(err, "error reading multipart data")
}
couchDoc.JSONValue = partdata
default:
//Create an attachment structure and load it
attachment := &AttachmentInfo{}
attachment.ContentType = p.Header.Get("Content-Type")
contentDispositionParts := strings.Split(p.Header.Get("Content-Disposition"), ";")
if strings.TrimSpace(contentDispositionParts[0]) == "attachment" {
switch p.Header.Get("Content-Encoding") {
case "gzip": //See if the part is gzip encoded
var respBody []byte
gr, err := gzip.NewReader(p)
if err != nil {
return nil, "", errors.Wrap(err, "error creating gzip reader")
}
respBody, err = ioutil.ReadAll(gr)
if err != nil {
return nil, "", errors.Wrap(err, "error reading gzip data")
}
logger.Debugf("[%s] Retrieved attachment data", dbclient.DBName)
attachment.AttachmentBytes = respBody
attachment.Length = uint64(len(attachment.AttachmentBytes))
attachment.Name = p.FileName()
attachments = append(attachments, attachment)
default:
//retrieve the data, this is not gzip
partdata, err := ioutil.ReadAll(p)
if err != nil {
return nil, "", errors.Wrap(err, "error reading multipart data")
}
logger.Debugf("[%s] Retrieved attachment data", dbclient.DBName)
attachment.AttachmentBytes = partdata
attachment.Length = uint64(len(attachment.AttachmentBytes))
attachment.Name = p.FileName()
attachments = append(attachments, attachment)
} // end content-encoding switch
} // end if attachment
} // end content-type switch
} // for all multiparts
couchDoc.Attachments = attachments
return &couchDoc, revision, nil
}
//handle as JSON document
couchDoc.JSONValue, err = ioutil.ReadAll(resp.Body)
if err != nil {
return nil, "", errors.Wrap(err, "error reading response body")
}
logger.Debugf("[%s] Exiting ReadDoc()", dbclient.DBName)
return &couchDoc, revision, nil
}
//ReadDocRange method provides function to a range of documents based on the start and end keys
//startKey and endKey can also be empty strings. If startKey and endKey are empty, all documents are returned
//This function provides a limit option to specify the max number of entries and is supplied by config.
//Skip is reserved for possible future future use.
func (dbclient *CouchDatabase) ReadDocRange(startKey, endKey string, limit int32) ([]*QueryResult, string, error) {
dbName := dbclient.DBName
logger.Debugf("[%s] Entering ReadDocRange() startKey=%s, endKey=%s", dbName, startKey, endKey)
var results []*QueryResult
rangeURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, "", errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
queryParms := rangeURL.Query()
//Increment the limit by 1 to see if there are more qualifying records
queryParms.Set("limit", strconv.FormatInt(int64(limit+1), 10))
queryParms.Add("include_docs", "true")
queryParms.Add("inclusive_end", "false") // endkey should be exclusive to be consistent with goleveldb
queryParms.Add("attachments", "true") // get the attachments as well
//Append the startKey if provided
if startKey != "" {
if startKey, err = encodeForJSON(startKey); err != nil {
return nil, "", err
}
queryParms.Add("startkey", "\""+startKey+"\"")
}
//Append the endKey if provided
if endKey != "" {
var err error
if endKey, err = encodeForJSON(endKey); err != nil {
return nil, "", err
}
queryParms.Add("endkey", "\""+endKey+"\"")
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.handleRequest(http.MethodGet, "RangeDocRange", rangeURL, nil, "", "", maxRetries, true, &queryParms, "_all_docs")
if err != nil {
return nil, "", err
}
defer closeResponseBody(resp)
if logger.IsEnabledFor(zapcore.DebugLevel) {
dump, err2 := httputil.DumpResponse(resp, true)
if err2 != nil {
log.Fatal(err2)
}
logger.Debugf("[%s] %s", dbclient.DBName, dump)
}
//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, "", errors.Wrap(err, "error reading response body")
}
var jsonResponse = &RangeQueryResponse{}
err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse)
if err2 != nil {
return nil, "", errors.Wrap(err2, "error unmarshalling json data")
}
//if an additional record is found, then reduce the count by 1
//and populate the nextStartKey
if jsonResponse.TotalRows > limit {
jsonResponse.TotalRows = limit
}
logger.Debugf("[%s] Total Rows: %d", dbclient.DBName, jsonResponse.TotalRows)
//Use the next endKey as the starting default for the nextStartKey
nextStartKey := endKey
for index, row := range jsonResponse.Rows {
var docMetadata = &DocMetadata{}
err3 := json.Unmarshal(row.Doc, &docMetadata)
if err3 != nil {
return nil, "", errors.Wrap(err3, "error unmarshalling json data")
}
//if there is an extra row for the nextStartKey, then do not add the row to the result set
//and populate the nextStartKey variable
if int32(index) >= jsonResponse.TotalRows {
nextStartKey = docMetadata.ID
continue
}
if docMetadata.AttachmentsInfo != nil {
logger.Debugf("[%s] Adding JSON document and attachments for id: %s", dbclient.DBName, docMetadata.ID)
attachments := []*AttachmentInfo{}
for attachmentName, attachment := range docMetadata.AttachmentsInfo {
attachment.Name = attachmentName
attachments = append(attachments, attachment)
}
var addDocument = &QueryResult{docMetadata.ID, row.Doc, attachments}
results = append(results, addDocument)
} else {
logger.Debugf("[%s] Adding json docment for id: %s", dbclient.DBName, docMetadata.ID)
var addDocument = &QueryResult{docMetadata.ID, row.Doc, nil}
results = append(results, addDocument)
}
}
logger.Debugf("[%s] Exiting ReadDocRange()", dbclient.DBName)
return results, nextStartKey, nil
}
//DeleteDoc method provides function to delete a document from the database by id
func (dbclient *CouchDatabase) DeleteDoc(id, rev string) error {
dbName := dbclient.DBName
logger.Debugf("[%s] Entering DeleteDoc() id=%s", dbName, id)
deleteURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
//handle the request for saving document with a retry if there is a revision conflict
resp, couchDBReturn, err := dbclient.handleRequestWithRevisionRetry(id, http.MethodDelete, dbName, "DeleteDoc",
deleteURL, nil, "", "", maxRetries, true, nil)
if err != nil {
if couchDBReturn != nil && couchDBReturn.StatusCode == 404 {
logger.Debugf("[%s] Document not found (404), returning nil value instead of 404 error", dbclient.DBName)
// non-existent document should return nil value instead of a 404 error
// for details see https://github.com/hyperledger-archives/fabric/issues/936
return nil
}
return err
}
defer closeResponseBody(resp)
logger.Debugf("[%s] Exiting DeleteDoc()", dbclient.DBName)
return nil
}
//QueryDocuments method provides function for processing a query
func (dbclient *CouchDatabase) QueryDocuments(query string) ([]*QueryResult, string, error) {
dbName := dbclient.DBName
logger.Debugf("[%s] Entering QueryDocuments() query=%s", dbName, query)
var results []*QueryResult
queryURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, "", errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.handleRequest(http.MethodPost, "QueryDocuments", queryURL, []byte(query), "", "", maxRetries, true, nil, "_find")
if err != nil {
return nil, "", err
}
defer closeResponseBody(resp)
if logger.IsEnabledFor(zapcore.DebugLevel) {
dump, err2 := httputil.DumpResponse(resp, true)
if err2 != nil {
log.Fatal(err2)
}
logger.Debugf("[%s] %s", dbclient.DBName, dump)
}
//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, "", errors.Wrap(err, "error reading response body")
}
var jsonResponse = &QueryResponse{}
err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse)
if err2 != nil {
return nil, "", errors.Wrap(err2, "error unmarshalling json data")
}
if jsonResponse.Warning != "" {
logger.Warnf("The query [%s] caused the following warning: [%s]", query, jsonResponse.Warning)
}
for _, row := range jsonResponse.Docs {
var docMetadata = &DocMetadata{}
err3 := json.Unmarshal(row, &docMetadata)
if err3 != nil {
return nil, "", errors.Wrap(err3, "error unmarshalling json data")
}
// JSON Query results never have attachments
// The If block below will never be executed
if docMetadata.AttachmentsInfo != nil {
logger.Debugf("[%s] Adding JSON docment and attachments for id: %s", dbclient.DBName, docMetadata.ID)
couchDoc, _, err := dbclient.ReadDoc(docMetadata.ID)
if err != nil {
return nil, "", err
}
var addDocument = &QueryResult{ID: docMetadata.ID, Value: couchDoc.JSONValue, Attachments: couchDoc.Attachments}
results = append(results, addDocument)
} else {
logger.Debugf("[%s] Adding json docment for id: %s", dbclient.DBName, docMetadata.ID)
var addDocument = &QueryResult{ID: docMetadata.ID, Value: row, Attachments: nil}
results = append(results, addDocument)
}
}
logger.Debugf("[%s] Exiting QueryDocuments()", dbclient.DBName)
return results, jsonResponse.Bookmark, nil
}
// ListIndex method lists the defined indexes for a database
func (dbclient *CouchDatabase) ListIndex() ([]*IndexResult, error) {
//IndexDefinition contains the definition for a couchdb index
type indexDefinition struct {
DesignDocument string `json:"ddoc"`
Name string `json:"name"`
Type string `json:"type"`
Definition json.RawMessage `json:"def"`
}
//ListIndexResponse contains the definition for listing couchdb indexes
type listIndexResponse struct {
TotalRows int `json:"total_rows"`
Indexes []indexDefinition `json:"indexes"`
}
dbName := dbclient.DBName
logger.Debug("[%s] Entering ListIndex()", dbName)
indexURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.handleRequest(http.MethodGet, "ListIndex", indexURL, nil, "", "", maxRetries, true, nil, "_index")
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "error reading response body")
}
var jsonResponse = &listIndexResponse{}
err2 := json.Unmarshal(jsonResponseRaw, jsonResponse)
if err2 != nil {
return nil, errors.Wrap(err2, "error unmarshalling json data")
}
var results []*IndexResult
for _, row := range jsonResponse.Indexes {
//if the DesignDocument does not begin with "_design/", then this is a system
//level index and is not meaningful and cannot be edited or deleted
designDoc := row.DesignDocument
s := strings.SplitAfterN(designDoc, "_design/", 2)
if len(s) > 1 {
designDoc = s[1]
//Add the index definition to the results
var addIndexResult = &IndexResult{DesignDocument: designDoc, Name: row.Name, Definition: fmt.Sprintf("%s", row.Definition)}
results = append(results, addIndexResult)
}
}
logger.Debugf("[%s] Exiting ListIndex()", dbclient.DBName)
return results, nil
}
// CreateIndex method provides a function creating an index
func (dbclient *CouchDatabase) CreateIndex(indexdefinition string) (*CreateIndexResponse, error) {
dbName := dbclient.DBName
logger.Debugf("[%s] Entering CreateIndex() indexdefinition=%s", dbName, indexdefinition)
//Test to see if this is a valid JSON
if IsJSON(indexdefinition) != true {
return nil, errors.New("JSON format is not valid")
}
indexURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.handleRequest(http.MethodPost, "CreateIndex", indexURL, []byte(indexdefinition), "", "", maxRetries, true, nil, "_index")
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
if resp == nil {
return nil, errors.New("invalid response received from CouchDB")
}
//Read the response body
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "error reading response body")
}
couchDBReturn := &CreateIndexResponse{}
jsonBytes := []byte(respBody)
//unmarshal the response
err = json.Unmarshal(jsonBytes, &couchDBReturn)
if err != nil {
return nil, errors.Wrap(err, "error unmarshalling json data")
}
if couchDBReturn.Result == "created" {
logger.Infof("Created CouchDB index [%s] in state database [%s] using design document [%s]", couchDBReturn.Name, dbclient.DBName, couchDBReturn.ID)
return couchDBReturn, nil
}
logger.Infof("Updated CouchDB index [%s] in state database [%s] using design document [%s]", couchDBReturn.Name, dbclient.DBName, couchDBReturn.ID)
return couchDBReturn, nil
}
// DeleteIndex method provides a function deleting an index
func (dbclient *CouchDatabase) DeleteIndex(designdoc, indexname string) error {
dbName := dbclient.DBName
logger.Debugf("[%s] Entering DeleteIndex() designdoc=%s indexname=%s", dbName, designdoc, indexname)
indexURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.handleRequest(http.MethodDelete, "DeleteIndex", indexURL, nil, "", "", maxRetries, true, nil, "_index", designdoc, "json", indexname)
if err != nil {
return err
}
defer closeResponseBody(resp)
return nil
}
//WarmIndex method provides a function for warming a single index
func (dbclient *CouchDatabase) WarmIndex(designdoc, indexname string) error {
dbName := dbclient.DBName
logger.Debugf("[%s] Entering WarmIndex() designdoc=%s indexname=%s", dbName, designdoc, indexname)
indexURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
queryParms := indexURL.Query()
//Query parameter that allows the execution of the URL to return immediately
//The update_after will cause the index update to run after the URL returns
queryParms.Add("stale", "update_after")
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.handleRequest(http.MethodGet, "WarmIndex", indexURL, nil, "", "", maxRetries, true, &queryParms, "_design", designdoc, "_view", indexname)
if err != nil {
return err
}
defer closeResponseBody(resp)
return nil
}
//runWarmIndexAllIndexes is a wrapper for WarmIndexAllIndexes to catch and report any errors
func (dbclient *CouchDatabase) runWarmIndexAllIndexes() {
err := dbclient.WarmIndexAllIndexes()
if err != nil {
logger.Errorf("Error detected during WarmIndexAllIndexes(): %+v", err)
}
}
//WarmIndexAllIndexes method provides a function for warming all indexes for a database
func (dbclient *CouchDatabase) WarmIndexAllIndexes() error {
logger.Debugf("[%s] Entering WarmIndexAllIndexes()", dbclient.DBName)
//Retrieve all indexes
listResult, err := dbclient.ListIndex()
if err != nil {
return err
}
//For each index definition, execute an index refresh
for _, elem := range listResult {
err := dbclient.WarmIndex(elem.DesignDocument, elem.Name)
if err != nil {
return err
}
}
logger.Debugf("[%s] Exiting WarmIndexAllIndexes()", dbclient.DBName)
return nil
}
//GetDatabaseSecurity method provides function to retrieve the security config for a database
func (dbclient *CouchDatabase) GetDatabaseSecurity() (*DatabaseSecurity, error) {
dbName := dbclient.DBName
logger.Debugf("[%s] Entering GetDatabaseSecurity()", dbName)
securityURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.handleRequest(http.MethodGet, "GetDatabaseSecurity", securityURL, nil, "", "", maxRetries, true, nil, "_security")
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "error reading response body")
}
var jsonResponse = &DatabaseSecurity{}
err2 := json.Unmarshal(jsonResponseRaw, jsonResponse)
if err2 != nil {
return nil, errors.Wrap(err2, "error unmarshalling json data")
}
logger.Debugf("[%s] Exiting GetDatabaseSecurity()", dbclient.DBName)
return jsonResponse, nil
}
//ApplyDatabaseSecurity method provides function to update the security config for a database
func (dbclient *CouchDatabase) ApplyDatabaseSecurity(databaseSecurity *DatabaseSecurity) error {
dbName := dbclient.DBName
logger.Debugf("[%s] Entering ApplyDatabaseSecurity()", dbName)
securityURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
//Ensure all of the arrays are initialized to empty arrays instead of nil
if databaseSecurity.Admins.Names == nil {
databaseSecurity.Admins.Names = make([]string, 0)
}
if databaseSecurity.Admins.Roles == nil {
databaseSecurity.Admins.Roles = make([]string, 0)
}
if databaseSecurity.Members.Names == nil {
databaseSecurity.Members.Names = make([]string, 0)
}
if databaseSecurity.Members.Roles == nil {
databaseSecurity.Members.Roles = make([]string, 0)
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
databaseSecurityJSON, err := json.Marshal(databaseSecurity)
if err != nil {
return errors.Wrap(err, "error unmarshalling json data")
}
logger.Debugf("[%s] Applying security to database: %s", dbclient.DBName, string(databaseSecurityJSON))
resp, _, err := dbclient.handleRequest(http.MethodPut, "ApplyDatabaseSecurity", securityURL, databaseSecurityJSON, "", "", maxRetries, true, nil, "_security")
if err != nil {
return err
}
defer closeResponseBody(resp)
logger.Debugf("[%s] Exiting ApplyDatabaseSecurity()", dbclient.DBName)
return nil
}
//BatchRetrieveDocumentMetadata - batch method to retrieve document metadata for a set of keys,
// including ID, couchdb revision number, and ledger version
func (dbclient *CouchDatabase) BatchRetrieveDocumentMetadata(keys []string) ([]*DocMetadata, error) {
logger.Debugf("[%s] Entering BatchRetrieveDocumentMetadata() keys=%s", dbclient.DBName, keys)
batchRetrieveURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
queryParms := batchRetrieveURL.Query()
// While BatchRetrieveDocumentMetadata() does not return the entire document,
// for reads/writes, we do need to get document so that we can get the ledger version of the key.
// TODO For blind writes we do not need to get the version, therefore when we bulk get
// the revision numbers for the write keys that were not represented in read set
// (the second time BatchRetrieveDocumentMetadata is called during block processing),
// we could set include_docs to false to optimize the response.
queryParms.Add("include_docs", "true")
keymap := make(map[string]interface{})
keymap["keys"] = keys
jsonKeys, err := json.Marshal(keymap)
if err != nil {
return nil, errors.Wrap(err, "error marshalling json data")
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.handleRequest(http.MethodPost, "BatchRetrieveDocumentMetadata", batchRetrieveURL, jsonKeys, "", "", maxRetries, true, &queryParms, "_all_docs")
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
if logger.IsEnabledFor(zapcore.DebugLevel) {
dump, _ := httputil.DumpResponse(resp, false)
// compact debug log by replacing carriage return / line feed with dashes to separate http headers
logger.Debugf("[%s] HTTP Response: %s", dbclient.DBName, bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1))
}
//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "error reading response body")
}
var jsonResponse = &BatchRetrieveDocMetadataResponse{}
err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse)
if err2 != nil {
return nil, errors.Wrap(err2, "error unmarshalling json data")
}
docMetadataArray := []*DocMetadata{}
for _, row := range jsonResponse.Rows {
docMetadata := &DocMetadata{ID: row.ID, Rev: row.DocMetadata.Rev, Version: row.DocMetadata.Version}
docMetadataArray = append(docMetadataArray, docMetadata)
}
logger.Debugf("[%s] Exiting BatchRetrieveDocumentMetadata()", dbclient.DBName)
return docMetadataArray, nil
}
//BatchUpdateDocuments - batch method to batch update documents
func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*BatchUpdateResponse, error) {
dbName := dbclient.DBName
if logger.IsEnabledFor(zapcore.DebugLevel) {
documentIdsString, err := printDocumentIds(documents)
if err == nil {
logger.Debugf("[%s] Entering BatchUpdateDocuments() document ids=[%s]", dbName, documentIdsString)
} else {
logger.Debugf("[%s] Entering BatchUpdateDocuments() Could not print document ids due to error: %+v", dbName, err)
}
}
batchUpdateURL, err := url.Parse(dbclient.CouchInstance.URL())
if err != nil {
logger.Errorf("URL parse error: %s", err)
return nil, errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.URL())
}
documentMap := make(map[string]interface{})
var jsonDocumentMap []interface{}
for _, jsonDocument := range documents {
//create a document map
var document = make(map[string]interface{})
//unmarshal the JSON component of the CouchDoc into the document
err = json.Unmarshal(jsonDocument.JSONValue, &document)
if err != nil {
return nil, errors.Wrap(err, "error unmarshalling json data")
}
//iterate through any attachments
if len(jsonDocument.Attachments) > 0 {
//create a file attachment map
fileAttachment := make(map[string]interface{})
//for each attachment, create a Base64Attachment, name the attachment,
//add the content type and base64 encode the attachment
for _, attachment := range jsonDocument.Attachments {
fileAttachment[attachment.Name] = Base64Attachment{attachment.ContentType,
base64.StdEncoding.EncodeToString(attachment.AttachmentBytes)}
}
//add attachments to the document
document["_attachments"] = fileAttachment
}
//Append the document to the map of documents
jsonDocumentMap = append(jsonDocumentMap, document)
}
//Add the documents to the "docs" item
documentMap["docs"] = jsonDocumentMap
bulkDocsJSON, err := json.Marshal(documentMap)
if err != nil {
return nil, errors.Wrap(err, "error marshalling json data")
}
//get the number of retries
maxRetries := dbclient.CouchInstance.conf.MaxRetries
resp, _, err := dbclient.handleRequest(http.MethodPost, "BatchUpdateDocuments", batchUpdateURL, bulkDocsJSON, "", "", maxRetries, true, nil, "_bulk_docs")
if err != nil {
return nil, err
}
defer closeResponseBody(resp)
if logger.IsEnabledFor(zapcore.DebugLevel) {
dump, _ := httputil.DumpResponse(resp, false)
// compact debug log by replacing carriage return / line feed with dashes to separate http headers
logger.Debugf("[%s] HTTP Response: %s", dbclient.DBName, bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1))
}
//handle as JSON document
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "error reading response body")
}
var jsonResponse = []*BatchUpdateResponse{}
err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse)
if err2 != nil {
return nil, errors.Wrap(err2, "error unmarshalling json data")
}
logger.Debugf("[%s] Exiting BatchUpdateDocuments() _bulk_docs response=[%s]", dbclient.DBName, string(jsonResponseRaw))
return jsonResponse, nil
}
//handleRequestWithRevisionRetry method is a generic http request handler with
//a retry for document revision conflict errors,
//which may be detected during saves or deletes that timed out from client http perspective,
//but which eventually succeeded in couchdb
func (dbclient *CouchDatabase) handleRequestWithRevisionRetry(id, method, dbName, functionName string, connectURL *url.URL, data []byte, rev string,
multipartBoundary string, maxRetries int, keepConnectionOpen bool, queryParms *url.Values) (*http.Response, *DBReturn, error) {
//Initialize a flag for the revision conflict
revisionConflictDetected := false
var resp *http.Response
var couchDBReturn *DBReturn
var errResp error
//attempt the http request for the max number of retries
//In this case, the retry is to catch problems where a client timeout may miss a
//successful CouchDB update and cause a document revision conflict on a retry in handleRequest
for attempts := 0; attempts <= maxRetries; attempts++ {
//if the revision was not passed in, or if a revision conflict is detected on prior attempt,
//query CouchDB for the document revision
if rev == "" || revisionConflictDetected {
rev = dbclient.getDocumentRevision(id)
}
//handle the request for saving/deleting the couchdb data
resp, couchDBReturn, errResp = dbclient.CouchInstance.handleRequest(context.Background(), method, dbName, functionName, connectURL,
data, rev, multipartBoundary, maxRetries, keepConnectionOpen, queryParms, id)
//If there was a 409 conflict error during the save/delete, log it and retry it.
//Otherwise, break out of the retry loop
if couchDBReturn != nil && couchDBReturn.StatusCode == 409 {
logger.Warningf("CouchDB document revision conflict detected, retrying. Attempt:%v", attempts+1)
revisionConflictDetected = true
} else {
break
}
}
// return the handleRequest results
return resp, couchDBReturn, errResp
}
func (dbclient *CouchDatabase) handleRequest(method, functionName string, connectURL *url.URL, data []byte, rev, multipartBoundary string,
maxRetries int, keepConnectionOpen bool, queryParms *url.Values, pathElements ...string) (*http.Response, *DBReturn, error) {
return dbclient.CouchInstance.handleRequest(context.Background(),
method, dbclient.DBName, functionName, connectURL, data, rev, multipartBoundary,
maxRetries, keepConnectionOpen, queryParms, pathElements...,
)
}
//handleRequest method is a generic http request handler.
// If it returns an error, it ensures that the response body is closed, else it is the
// callee's responsibility to close response correctly.
// Any http error or CouchDB error (4XX or 500) will result in a golang error getting returned
func (couchInstance *CouchInstance) handleRequest(ctx context.Context, method, dbName, functionName string, connectURL *url.URL, data []byte, rev string,
multipartBoundary string, maxRetries int, keepConnectionOpen bool, queryParms *url.Values, pathElements ...string) (*http.Response, *DBReturn, error) {
logger.Debugf("Entering handleRequest() method=%s url=%v dbName=%s", method, connectURL, dbName)
//create the return objects for couchDB
var resp *http.Response
var errResp error
couchDBReturn := &DBReturn{}
defer couchInstance.recordMetric(time.Now(), dbName, functionName, couchDBReturn)
//set initial wait duration for retries
waitDuration := retryWaitTime * time.Millisecond
if maxRetries < 0 {
return nil, nil, errors.New("number of retries must be zero or greater")
}
requestURL := constructCouchDBUrl(connectURL, dbName, pathElements...)
if queryParms != nil {
requestURL.RawQuery = queryParms.Encode()
}
logger.Debugf("Request URL: %s", requestURL)
//attempt the http request for the max number of retries
// if maxRetries is 0, the database creation will be attempted once and will
// return an error if unsuccessful
// if maxRetries is 3 (default), a maximum of 4 attempts (one attempt with 3 retries)
// will be made with warning entries for unsuccessful attempts
for attempts := 0; attempts <= maxRetries; attempts++ {
//Set up a buffer for the payload data
payloadData := new(bytes.Buffer)
payloadData.ReadFrom(bytes.NewReader(data))
//Create request based on URL for couchdb operation
req, err := http.NewRequest(method, requestURL.String(), payloadData)
if err != nil {
return nil, nil, errors.Wrap(err, "error creating http request")
}
req.WithContext(ctx)
//set the request to close on completion if shared connections are not allowSharedConnection
//Current CouchDB has a problem with zero length attachments, do not allow the connection to be reused.
//Apache JIRA item for CouchDB https://issues.apache.org/jira/browse/COUCHDB-3394
if !keepConnectionOpen {
req.Close = true
}
//add content header for PUT
if method == http.MethodPut || method == http.MethodPost || method == http.MethodDelete {
//If the multipartBoundary is not set, then this is a JSON and content-type should be set
//to application/json. Else, this is contains an attachment and needs to be multipart
if multipartBoundary == "" {
req.Header.Set("Content-Type", "application/json")
} else {
req.Header.Set("Content-Type", "multipart/related;boundary=\""+multipartBoundary+"\"")
}
//check to see if the revision is set, if so, pass as a header
if rev != "" {
req.Header.Set("If-Match", rev)
}
}
//add content header for PUT
if method == http.MethodPut || method == http.MethodPost {
req.Header.Set("Accept", "application/json")
}
//add content header for GET
if method == http.MethodGet {
req.Header.Set("Accept", "multipart/related")
}
//If username and password are set the use basic auth
if couchInstance.conf.Username != "" && couchInstance.conf.Password != "" {
//req.Header.Set("Authorization", "Basic YWRtaW46YWRtaW5w")
req.SetBasicAuth(couchInstance.conf.Username, couchInstance.conf.Password)
}
//Execute http request
resp, errResp = couchInstance.client.Do(req)
//check to see if the return from CouchDB is valid
if invalidCouchDBReturn(resp, errResp) {
continue
}
//if there is no golang http error and no CouchDB 500 error, then drop out of the retry
if errResp == nil && resp != nil && resp.StatusCode < 500 {
// if this is an error, then populate the couchDBReturn
if resp.StatusCode >= 400 {
//Read the response body and close it for next attempt
jsonError, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, errors.Wrap(err, "error reading response body")
}
defer closeResponseBody(resp)
errorBytes := []byte(jsonError)
//Unmarshal the response
err = json.Unmarshal(errorBytes, &couchDBReturn)
if err != nil {
return nil, nil, errors.Wrap(err, "error unmarshalling json data")
}
}
break
}
// If the maxRetries is greater than 0, then log the retry info
if maxRetries > 0 {
//if this is an unexpected golang http error, log the error and retry
if errResp != nil {
//Log the error with the retry count and continue
logger.Warningf("Retrying couchdb request in %s. Attempt:%v Error:%v",
waitDuration.String(), attempts+1, errResp.Error())
//otherwise this is an unexpected 500 error from CouchDB. Log the error and retry.
} else {
//Read the response body and close it for next attempt
jsonError, err := ioutil.ReadAll(resp.Body)
defer closeResponseBody(resp)
if err != nil {
return nil, nil, errors.Wrap(err, "error reading response body")
}
errorBytes := []byte(jsonError)
//Unmarshal the response
err = json.Unmarshal(errorBytes, &couchDBReturn)
if err != nil {
return nil, nil, errors.Wrap(err, "error unmarshalling json data")
}
//Log the 500 error with the retry count and continue
logger.Warningf("Retrying couchdb request in %s. Attempt:%v Couch DB Error:%s, Status Code:%v Reason:%v",
waitDuration.String(), attempts+1, couchDBReturn.Error, resp.Status, couchDBReturn.Reason)
}
//sleep for specified sleep time, then retry
time.Sleep(waitDuration)
//backoff, doubling the retry time for next attempt
waitDuration *= 2
}
} // end retry loop
//if a golang http error is still present after retries are exhausted, return the error
if errResp != nil {
return nil, couchDBReturn, errors.Wrap(errResp, "http error calling couchdb")
}
//This situation should not occur according to the golang spec.
//if this error returned (errResp) from an http call, then the resp should be not nil,
//this is a structure and StatusCode is an int
//This is meant to provide a more graceful error if this should occur
if invalidCouchDBReturn(resp, errResp) {
return nil, nil, errors.New("unable to connect to CouchDB, check the hostname and port.")
}
//set the return code for the couchDB request
couchDBReturn.StatusCode = resp.StatusCode
// check to see if the status code from couchdb is 400 or higher
// response codes 4XX and 500 will be treated as errors -
// golang error will be created from the couchDBReturn contents and both will be returned
if resp.StatusCode >= 400 {
// if the status code is 400 or greater, log and return an error
logger.Debugf("Error handling CouchDB request. Error:%s, Status Code:%v, Reason:%s",
couchDBReturn.Error, resp.StatusCode, couchDBReturn.Reason)
return nil, couchDBReturn, errors.Errorf("error handling CouchDB request. Error:%s, Status Code:%v, Reason:%s",
couchDBReturn.Error, resp.StatusCode, couchDBReturn.Reason)
}
logger.Debugf("Exiting handleRequest()")
//If no errors, then return the http response and the couchdb return object
return resp, couchDBReturn, nil
}
func (ci *CouchInstance) recordMetric(startTime time.Time, dbName, api string, couchDBReturn *DBReturn) {
ci.stats.observeProcessingTime(startTime, dbName, api, strconv.Itoa(couchDBReturn.StatusCode))
}
//invalidCouchDBResponse checks to make sure either a valid response or error is returned
func invalidCouchDBReturn(resp *http.Response, errResp error) bool {
if resp == nil && errResp == nil {
return true
}
return false
}
//IsJSON tests a string to determine if a valid JSON
func IsJSON(s string) bool {
var js map[string]interface{}
return json.Unmarshal([]byte(s), &js) == nil
}
// encodePathElement uses Golang for url path encoding, additionally:
// '/' is replaced by %2F, otherwise path encoding will treat as path separator and ignore it
// '+' is replaced by %2B, otherwise path encoding will ignore it, while CouchDB will unencode the plus as a space
// Note that all other URL special characters have been tested successfully without need for special handling
func encodePathElement(str string) string {
u := &url.URL{}
u.Path = str
encodedStr := u.EscapedPath() // url encode using golang url path encoding rules
encodedStr = strings.Replace(encodedStr, "/", "%2F", -1)
encodedStr = strings.Replace(encodedStr, "+", "%2B", -1)
return encodedStr
}
func encodeForJSON(str string) (string, error) {
buf := &bytes.Buffer{}
encoder := json.NewEncoder(buf)
if err := encoder.Encode(str); err != nil {
return "", errors.Wrap(err, "error encoding json data")
}
// Encode adds double quotes to string and terminates with \n - stripping them as bytes as they are all ascii(0-127)
buffer := buf.Bytes()
return string(buffer[1 : len(buffer)-2]), nil
}
// printDocumentIds is a convenience method to print readable log entries for arrays of pointers
// to couch document IDs
func printDocumentIds(documentPointers []*CouchDoc) (string, error) {
documentIds := []string{}
for _, documentPointer := range documentPointers {
docMetadata := &DocMetadata{}
err := json.Unmarshal(documentPointer.JSONValue, &docMetadata)
if err != nil {
return "", errors.Wrap(err, "error unmarshalling json data")
}
documentIds = append(documentIds, docMetadata.ID)
}
return strings.Join(documentIds, ","), nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/liurenhao/fabric.git
git@gitee.com:liurenhao/fabric.git
liurenhao
fabric
fabric
v2.1.1

Search