mirrors_watson/apm-nodejs-http-client

index.js 21.78 KB
Andrew Wilkins committed on 2019-12-06 17:33 +08:00 · feat: add serverCaCert option (#102)
'use strict'
const util = require('util')
const os = require('os')
const { URL } = require('url')
const zlib = require('zlib')
const querystring = require('querystring')
const Writable = require('readable-stream').Writable
const getContainerInfo = require('container-info')
const pump = require('pump')
const eos = require('end-of-stream')
const streamToBuffer = require('fast-stream-to-buffer')
const StreamChopper = require('stream-chopper')
const ndjson = require('./lib/ndjson')
const truncate = require('./lib/truncate')
const pkg = require('./package')
module.exports = Client
const flush = Symbol('flush')
const hostname = os.hostname()
const requiredOpts = [
'agentName',
'agentVersion',
'serviceName',
'userAgent'
]
const containerInfo = getContainerInfo.sync()
const node8 = process.version.indexOf('v8.') === 0
// All sockets on the agent are unreffed when they are created. This means that
// when those are the only handles left, the `beforeExit` event will be
// emitted. By listening for this we can make sure to end the requests properly
// before exiting. This way we don't keep the process running until the `time`
// timeout happens.
const clients = []
process.once('beforeExit', function () {
clients.forEach(function (client) {
if (!client) return // clients remove themselves from the array when they end
client.end()
})
})
util.inherits(Client, Writable)
Client.encoding = Object.freeze({
METADATA: Symbol('metadata'),
TRANSACTION: Symbol('transaction'),
SPAN: Symbol('span'),
ERROR: Symbol('error'),
METRICSET: Symbol('metricset')
})
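// Illustrative usage sketch: the option names, the default serverUrl, and the
// methods below come from this file, but the values are made-up examples.
//
//   const Client = require('./index')
//   const client = new Client({
//     agentName: 'my-agent',              // required
//     agentVersion: '1.0.0',              // required
//     serviceName: 'my-service',          // required
//     userAgent: 'my-agent/1.0.0 example',// required
//     serverUrl: 'http://localhost:8200'  // default when omitted
//   })
//   client.sendTransaction({ name: 'GET /users', type: 'request', duration: 42 })
//   client.flush()
//   client.end()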
function Client (opts) {
if (!(this instanceof Client)) return new Client(opts)
this.config(opts)
Writable.call(this, this._conf)
const errorproxy = (err) => {
if (this.destroyed === false) this.emit('request-error', err)
}
const fail = () => {
if (this._writableState.ending === false) this.destroy()
}
this._corkTimer = null
this._received = 0 // number of events given to the client for reporting
this.sent = 0 // number of events written to the socket
this._active = false
this._onflushed = null
this._transport = null
this._configTimer = null
this._encodedMetadata = null
switch (this._conf.serverUrl.protocol.slice(0, -1)) { // 'http:' => 'http'
case 'http': {
this._transport = require('http')
break
}
case 'https': {
this._transport = require('https')
break
}
default: {
throw new Error('Unknown protocol ' + this._conf.serverUrl.protocol.slice(0, -1))
}
}
this._agent = new this._transport.Agent(this._conf)
this._chopper = new StreamChopper({
size: this._conf.size,
time: this._conf.time,
type: StreamChopper.overflow,
transform () {
return zlib.createGzip()
}
}).on('stream', onStream(this, errorproxy))
eos(this._chopper, fail)
this._index = clients.length
clients.push(this)
if (this._conf.centralConfig) this._pollConfig()
}
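// Merge the given options into the client's configuration: validate that the
// required options are present, fill in default values, and (re)compute the
// derived HTTP request options and the metadata object that is sent at the
// start of every intake request.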
Client.prototype.config = function (opts) {
this._conf = Object.assign(this._conf || {}, opts, { objectMode: true })
this._conf.globalLabels = normalizeGlobalLabels(this._conf.globalLabels)
const missing = requiredOpts.filter(name => !this._conf[name])
if (missing.length > 0) throw new Error('Missing required option(s): ' + missing.join(', '))
// default values
if (!this._conf.size && this._conf.size !== 0) this._conf.size = 750 * 1024
if (!this._conf.time && this._conf.time !== 0) this._conf.time = 10000
if (!this._conf.serverTimeout && this._conf.serverTimeout !== 0) this._conf.serverTimeout = 15000
if (!this._conf.serverUrl) this._conf.serverUrl = 'http://localhost:8200'
if (!this._conf.hostname) this._conf.hostname = hostname
if (!this._conf.environment) this._conf.environment = process.env.NODE_ENV || 'development'
if (!this._conf.truncateKeywordsAt) this._conf.truncateKeywordsAt = 1024
if (!this._conf.truncateErrorMessagesAt) this._conf.truncateErrorMessagesAt = 2048
if (!this._conf.truncateStringsAt) this._conf.truncateStringsAt = 1024
if (!this._conf.truncateCustomKeysAt) this._conf.truncateCustomKeysAt = 1024
if (!this._conf.truncateQueriesAt) this._conf.truncateQueriesAt = 10000
if (!this._conf.bufferWindowTime) this._conf.bufferWindowTime = 20
if (!this._conf.bufferWindowSize) this._conf.bufferWindowSize = 50
this._conf.keepAlive = this._conf.keepAlive !== false
this._conf.centralConfig = this._conf.centralConfig || false
// process
this._conf.serverUrl = new URL(this._conf.serverUrl)
if (containerInfo) {
if (!this._conf.containerId && containerInfo.containerId) {
this._conf.containerId = containerInfo.containerId
}
if (!this._conf.kubernetesPodUID && containerInfo.podId) {
this._conf.kubernetesPodUID = containerInfo.podId
}
if (!this._conf.kubernetesPodName && containerInfo.podId) {
this._conf.kubernetesPodName = hostname
}
}
// http request options
this._conf.requestIntake = getIntakeRequestOptions(this._conf, this._agent)
this._conf.requestConfig = getConfigRequestOptions(this._conf, this._agent)
this._conf.metadata = getMetadata(this._conf)
}
Client.prototype._pollConfig = function () {
const opts = this._conf.requestConfig
if (this._conf.lastConfigEtag) {
opts.headers['If-None-Match'] = this._conf.lastConfigEtag
}
const req = this._transport.get(opts, res => {
res.on('error', err => {
// Not sure this event can ever be emitted, but just in case
res.destroy(err)
})
this._scheduleNextConfigPoll(getMaxAge(res))
if (
res.statusCode === 304 || // No new config since last time
res.statusCode === 403 || // Central config not enabled in APM Server
res.statusCode === 404 // Old APM Server that doesn't support central config
) {
res.resume()
return
}
streamToBuffer(res, (err, buf) => {
if (err) return res.destroy(err)
if (res.statusCode === 200) {
// 200: New config available (or no config for the given service.name / service.environment)
const etag = res.headers.etag
if (etag) this._conf.lastConfigEtag = etag
try {
this.emit('config', JSON.parse(buf))
} catch (e) {
res.destroy(e)
}
} else {
res.destroy(processConfigErrorResponse(res, buf))
}
})
})
req.on('error', err => {
this._scheduleNextConfigPoll()
this.emit('request-error', err)
})
}
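// Schedule the next central-config poll. The delay comes from the server's
// Cache-Control max-age (in seconds) when available and falls back to 300
// seconds; the timer is unreffed so it doesn't keep the process alive.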
Client.prototype._scheduleNextConfigPoll = function (seconds) {
if (this._configTimer !== null) return
seconds = seconds || 300
this._configTimer = setTimeout(() => {
this._configTimer = null
this._pollConfig()
}, seconds * 1000)
this._configTimer.unref()
}
// re-ref the open socket handles
Client.prototype._ref = function () {
Object.keys(this._agent.sockets).forEach(remote => {
this._agent.sockets[remote].forEach(function (socket) {
socket.ref()
})
})
}
Client.prototype._write = function (obj, enc, cb) {
if (obj === flush) {
this._writeFlush(cb)
} else {
this._received++
this._chopper.write(this._encode(obj, enc), cb)
}
}
Client.prototype._writev = function (objs, cb) {
let offset = 0
const processBatch = () => {
let index = -1
for (let i = offset; i < objs.length; i++) {
if (objs[i].chunk === flush) {
index = i
break
}
}
if (offset === 0 && index === -1) {
// normally there's no flush object queued, so here's a shortcut that just
// skips all the complicated splitting logic
this._writevCleaned(objs, cb)
} else if (index === -1) {
// no more flush elements in the queue, just write the rest
this._writevCleaned(objs.slice(offset), cb)
} else if (index > offset) {
// there are a few items in the queue before we need to flush, so write those first
this._writevCleaned(objs.slice(offset, index), processBatch)
offset = index
} else if (index === objs.length - 1) {
// the last item in the queue is a flush
this._writeFlush(cb)
} else {
// the next item in the queue is a flush
this._writeFlush(processBatch)
offset++
}
}
processBatch()
}
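// Worked example (illustration only): with a queued batch of
// [span, transaction, flush, error], processBatch first writes the span and
// the transaction as a single chunk, then performs the flush, and finally
// writes the error before invoking the batch's callback.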
function encodeObject (obj) {
return this._encode(obj.chunk, obj.encoding)
}
Client.prototype._writevCleaned = function (objs, cb) {
const chunk = objs.map(encodeObject.bind(this)).join('')
this._received += objs.length
this._chopper.write(chunk, cb)
}
Client.prototype._writeFlush = function (cb) {
if (this._active) {
this._onflushed = cb
this._chopper.chop()
} else {
this._chopper.chop(cb)
}
}
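// Batch small writes by corking the stream for up to `bufferWindowTime`
// milliseconds (default 20), or until `bufferWindowSize` writes (default 50)
// are queued, so that many events can be handed to `_writev` in one go.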
Client.prototype._maybeCork = function () {
if (!this._writableState.corked && this._conf.bufferWindowTime !== -1) {
this.cork()
if (this._corkTimer && this._corkTimer.refresh) {
// the refresh function was added in Node 10.2.0
this._corkTimer.refresh()
} else {
this._corkTimer = setTimeout(() => {
this.uncork()
}, this._conf.bufferWindowTime)
}
} else if (this._writableState.length >= this._conf.bufferWindowSize) {
this._maybeUncork()
}
}
Client.prototype._maybeUncork = function () {
if (this._writableState.corked) {
// Wait till next tick, so that the current write that triggered the call
// to `_maybeUncork` has time to be added to the queue. If we didn't do
// this, that last write would trigger a single call to `_write`.
process.nextTick(() => {
if (this.destroyed === false) this.uncork()
})
if (this._corkTimer) {
clearTimeout(this._corkTimer)
this._corkTimer = null
}
}
}
Client.prototype._encode = function (obj, enc) {
const out = {}
switch (enc) {
case Client.encoding.SPAN:
out.span = truncate.span(obj.span, this._conf)
break
case Client.encoding.TRANSACTION:
out.transaction = truncate.transaction(obj.transaction, this._conf)
break
case Client.encoding.METADATA:
out.metadata = truncate.metadata(obj.metadata, this._conf)
break
case Client.encoding.ERROR:
out.error = truncate.error(obj.error, this._conf)
break
case Client.encoding.METRICSET:
out.metricset = truncate.metricset(obj.metricset, this._conf)
break
}
return ndjson.serialize(out)
}
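// For illustration, encoding a span this way yields a single newline-delimited
// JSON line, roughly of the form:
//
//   {"span":{"name":"SELECT FROM users","duration":2.86}}\n
//
// The exact fields depend on the object passed in and on the configured
// truncation limits.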
Client.prototype.sendSpan = function (span, cb) {
this._maybeCork()
return this.write({ span }, Client.encoding.SPAN, cb)
}
Client.prototype.sendTransaction = function (transaction, cb) {
this._maybeCork()
return this.write({ transaction }, Client.encoding.TRANSACTION, cb)
}
Client.prototype.sendError = function (error, cb) {
this._maybeCork()
return this.write({ error }, Client.encoding.ERROR, cb)
}
Client.prototype.sendMetricSet = function (metricset, cb) {
this._maybeCork()
return this.write({ metricset }, Client.encoding.METRICSET, cb)
}
Client.prototype.flush = function (cb) {
this._maybeUncork()
// Write the special "flush" signal. We do this so that the order of writes
// and flushes is kept. If we were to just flush the client right here, the
// internal Writable buffer might still contain data that hasn't yet been
// given to the _write function.
return this.write(flush, cb)
}
Client.prototype._final = function (cb) {
if (this._configTimer) {
clearTimeout(this._configTimer)
this._configTimer = null
}
clients[this._index] = null // remove global reference to ease garbage collection
this._ref()
this._chopper.end()
cb()
}
Client.prototype._destroy = function (err, cb) {
if (this._configTimer) {
clearTimeout(this._configTimer)
this._configTimer = null
}
if (this._corkTimer) {
clearTimeout(this._corkTimer)
this._corkTimer = null
}
clients[this._index] = null // remove global reference to ease garbage collection
this._chopper.destroy()
this._agent.destroy()
cb(err)
}
function onStream (client, onerror) {
return function (stream, next) {
const onerrorproxy = (err) => {
stream.removeListener('error', onerrorproxy)
req.removeListener('error', onerrorproxy)
destroyStream(stream)
onerror(err)
}
client._active = true
const req = client._transport.request(client._conf.requestIntake, onResult(onerror))
// Abort the current request if the server responds prior to the request
// being finished
req.on('response', function (res) {
if (!req.finished) {
// In Node.js 8, the zlib stream will emit a 'zlib binding closed'
// error when destroyed. Furthermore, the HTTP response will not emit
// any data events after the request has been destroyed, so it becomes
// impossible to see the error returned by the server if we abort the
// request. So for Node.js 8, we'll work around this by closing the
// stream gracefully.
//
// This results in the gzip buffer being flushed and a little more data
// being sent to the APM Server, but it's better than not getting the
// error body.
if (node8) {
stream.end()
} else {
destroyStream(stream)
}
}
})
// Monitor the streams for errors so that we can make sure to destroy the
// output stream as soon as an error occurs
stream.on('error', onerrorproxy)
req.on('error', onerrorproxy)
req.on('socket', function (socket) {
// Sockets will automatically be unreffed by the HTTP agent when they are
// not in use by an HTTP request, but as we're keeping the HTTP request
// open, we need to unref the socket manually
socket.unref()
})
if (Number.isFinite(client._conf.serverTimeout)) {
req.setTimeout(client._conf.serverTimeout, function () {
req.abort()
})
}
pump(stream, req, function () {
// This function is technically called with an error, but because we
// manually attach error listeners on all the streams in the pipeline
// above, we can safely ignore it.
//
// We do this for two reasons:
//
// 1) This callback might be called a few ticks too late, in which case a
// race condition could occur where the user would write to the output
// stream before the rest of the system discovered that it was
// unwritable
//
// 2) The error might occur after the end of the stream. In that case we
// would not get it here as the internal error listener would have
// been removed and the stream would throw the error instead
client.sent = client._received
client._active = false
if (client._onflushed) {
client._onflushed()
client._onflushed = null
}
next()
})
// Only intended for local debugging
if (client._conf.payloadLogFile) {
if (!client._payloadLogFile) {
client._payloadLogFile = require('fs').createWriteStream(client._conf.payloadLogFile, { flags: 'a' })
}
// Manually write to the file instead of using pipe/pump so that the file
// handle isn't closed when the stream ends
stream.pipe(zlib.createGunzip()).on('data', function (chunk) {
client._payloadLogFile.write(chunk)
})
}
// All requests to the APM Server must start with a metadata object
if (!client._encodedMetadata) {
client._encodedMetadata = client._encode({ metadata: client._conf.metadata }, Client.encoding.METADATA)
}
stream.write(client._encodedMetadata)
}
}
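// Handle the APM Server's response to an intake request: buffer the response
// body and, for non-2xx status codes, turn it into an error that is reported
// through the client's 'request-error' event.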
function onResult (onerror) {
return streamToBuffer.onStream(function (err, buf, res) {
if (err) return onerror(err)
if (res.statusCode < 200 || res.statusCode > 299) {
onerror(processIntakeErrorResponse(res, buf))
}
})
}
function getIntakeRequestOptions (opts, agent) {
const headers = getHeaders(opts)
headers['Content-Type'] = 'application/x-ndjson'
headers['Content-Encoding'] = 'gzip'
return getBasicRequestOptions('POST', '/intake/v2/events', headers, opts, agent)
}
function getConfigRequestOptions (opts, agent) {
const path = '/config/v1/agents?' + querystring.stringify({
'service.name': opts.serviceName,
'service.environment': opts.environment
})
const headers = getHeaders(opts)
return getBasicRequestOptions('GET', path, headers, opts, agent)
}
function getBasicRequestOptions (method, defaultPath, headers, opts, agent) {
return {
agent: agent,
rejectUnauthorized: opts.rejectUnauthorized !== false,
ca: opts.serverCaCert,
hostname: opts.serverUrl.hostname,
port: opts.serverUrl.port,
method,
path: opts.serverUrl.pathname === '/' ? defaultPath : opts.serverUrl.pathname + defaultPath,
headers
}
}
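// For illustration: with the default serverUrl of http://localhost:8200 the
// intake request options come out roughly as
//
//   { method: 'POST', hostname: 'localhost', port: '8200',
//     path: '/intake/v2/events',
//     headers: { 'Content-Type': 'application/x-ndjson', 'Content-Encoding': 'gzip', ... } }
//
// plus the shared HTTP(S) agent and the TLS-related options above.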
function getHeaders (opts) {
const headers = {}
if (opts.secretToken) headers.Authorization = 'Bearer ' + opts.secretToken
headers.Accept = 'application/json'
headers['User-Agent'] = `${opts.userAgent} ${pkg.name}/${pkg.version} ${process.release.name}/${process.versions.node}`
return Object.assign(headers, opts.headers)
}
function getMetadata (opts) {
var payload = {
service: {
name: opts.serviceName,
environment: opts.environment,
runtime: {
name: process.release.name,
version: process.versions.node
},
language: {
name: 'javascript'
},
agent: {
name: opts.agentName,
version: opts.agentVersion
},
framework: undefined,
version: undefined,
node: undefined
},
process: {
pid: process.pid,
ppid: process.ppid,
title: process.title,
argv: process.argv
},
system: {
hostname: opts.hostname,
architecture: process.arch,
platform: process.platform,
container: undefined,
kubernetes: undefined
},
labels: opts.globalLabels
}
if (opts.serviceNodeName) {
payload.service.node = {
configured_name: opts.serviceNodeName
}
}
if (opts.serviceVersion) payload.service.version = opts.serviceVersion
if (opts.frameworkName || opts.frameworkVersion) {
payload.service.framework = {
name: opts.frameworkName,
version: opts.frameworkVersion
}
}
if (opts.containerId) {
payload.system.container = {
id: opts.containerId
}
}
if (opts.kubernetesNodeName || opts.kubernetesNamespace || opts.kubernetesPodName || opts.kubernetesPodUID) {
payload.system.kubernetes = {
namespace: opts.kubernetesNamespace,
node: opts.kubernetesNodeName
? { name: opts.kubernetesNodeName }
: undefined,
pod: (opts.kubernetesPodName || opts.kubernetesPodUID)
? { name: opts.kubernetesPodName, uid: opts.kubernetesPodUID }
: undefined
}
}
return payload
}
function destroyStream (stream) {
if (stream instanceof zlib.Gzip ||
stream instanceof zlib.Gunzip ||
stream instanceof zlib.Deflate ||
stream instanceof zlib.DeflateRaw ||
stream instanceof zlib.Inflate ||
stream instanceof zlib.InflateRaw ||
stream instanceof zlib.Unzip) {
// Zlib streams don't have a destroy function in Node.js 6. On top of
// that, simply calling destroy on a zlib stream in Node.js 8+ will result
// in a memory leak as the handle isn't closed (an operation normally done
// by calling close). So until that is fixed, we need to manually close the
// handle after destroying the stream.
//
// PR: https://github.com/nodejs/node/pull/23734
if (typeof stream.destroy === 'function') {
// Manually close the handle instead of calling `close()`, as that would
// have emitted 'close' again when calling `destroy()`
if (stream._handle && typeof stream._handle.close === 'function') {
stream._handle.close()
stream._handle = null
}
stream.destroy()
} else if (typeof stream.close === 'function') {
stream.close()
}
} else {
// For other streams we assume calling destroy is enough
if (typeof stream.destroy === 'function') stream.destroy()
// Or if there's no destroy (which Node.js 6 will not have on regular
// streams), emit `close` as that should trigger almost the same effect
else if (typeof stream.emit === 'function') stream.emit('close')
}
}
function oneOf (value, list) {
return list.indexOf(value) >= 0
}
function normalizeGlobalLabels (labels) {
if (!labels) return
const result = {}
for (const key of Object.keys(labels)) {
const value = labels[key]
result[key] = oneOf(typeof value, ['string', 'number', 'boolean'])
? value
: value.toString()
}
return result
}
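// For illustration: normalizeGlobalLabels({ region: 'eu', n: 1, ok: true, meta: { a: 1 } })
// keeps string, number and boolean values as-is and stringifies everything
// else, so `meta` becomes '[object Object]'.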
function getMaxAge (res) {
const header = res.headers['cache-control']
const match = header && header.match(/max-age=(\d+)/)
return parseInt(match && match[1], 10)
}
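// For illustration: a 'Cache-Control: max-age=300' response header yields 300,
// while a missing header or missing max-age directive yields NaN, in which
// case the caller falls back to its default poll interval.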
function processIntakeErrorResponse (res, buf) {
const err = new Error('Unexpected APM Server response')
err.code = res.statusCode
if (buf.length > 0) {
const body = buf.toString('utf8')
const contentType = res.headers['content-type']
if (contentType && contentType.startsWith('application/json')) {
try {
const data = JSON.parse(body)
err.accepted = data.accepted
err.errors = data.errors
if (!err.errors) err.response = body
} catch (e) {
err.response = body
}
} else {
err.response = body
}
}
return err
}
function processConfigErrorResponse (res, buf) {
const err = new Error('Unexpected APM Server response when polling config')
err.code = res.statusCode
if (buf.length > 0) {
const body = buf.toString('utf8')
const contentType = res.headers['content-type']
if (contentType && contentType.startsWith('application/json')) {
try {
const response = JSON.parse(body)
if (typeof response === 'string') {
err.response = response
} else if (typeof response === 'object' && response !== null && typeof response.error === 'string') {
err.response = response.error
} else {
err.response = body
}
} catch (e) {
err.response = body
}
} else {
err.response = body
}
}
return err
}