1 Star 0 Fork 0

Kenvins/librdkafka

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
rdbuf.h 12.35 KB
一键复制 编辑 原始数据 按行查看 历史
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2017-2022, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _RDBUF_H
#define _RDBUF_H
#ifndef _WIN32
/* for struct iovec */
#include <sys/socket.h>
#include <sys/types.h>
#endif
#include "rdsysqueue.h"
/**
* @name Generic byte buffers
*
* @{
*
* A buffer is a list of segments, each segment having a memory pointer,
* write offset, and capacity.
*
* The main buffer and segment structure is tailored for append-writing
* or append-pushing foreign memory.
*
* Updates of previously written memory regions are possible through the
* use of write_update() that takes an absolute offset.
*
* The write position is part of the buffer and segment structures, while
* read is a separate object (rd_slice_t) that does not affect the buffer.
*/
/**
* @brief Buffer segment
*/
typedef struct rd_segment_s {
TAILQ_ENTRY(rd_segment_s) seg_link; /*<< rbuf_segments Link */
char *seg_p; /**< Backing-store memory */
size_t seg_of; /**< Current relative write-position
* (length of payload in this segment) */
size_t seg_size; /**< Allocated size of seg_p */
size_t seg_absof; /**< Absolute offset of this segment's
* beginning in the grand rd_buf_t */
void (*seg_free)(void *p); /**< Optional free function for seg_p */
int seg_flags; /**< Segment flags */
#define RD_SEGMENT_F_RDONLY 0x1 /**< Read-only segment */
#define RD_SEGMENT_F_FREE \
0x2 /**< Free segment on destroy, \
* e.g, not a fixed segment. */
} rd_segment_t;
TAILQ_HEAD(rd_segment_head, rd_segment_s);
/**
* @brief Buffer, containing a list of segments.
*/
typedef struct rd_buf_s {
struct rd_segment_head rbuf_segments; /**< TAILQ list of segments */
size_t rbuf_segment_cnt; /**< Number of segments */
rd_segment_t *rbuf_wpos; /**< Current write position seg */
size_t rbuf_len; /**< Current (written) length */
size_t rbuf_erased; /**< Total number of bytes
* erased from segments.
* This amount is taken into
* account when checking for
* writable space which is
* always at the end of the
* buffer and thus can't make
* use of the erased parts. */
size_t rbuf_size; /**< Total allocated size of
* all segments. */
char *rbuf_extra; /* Extra memory allocated for
* use by segment structs,
* buffer memory, etc. */
size_t rbuf_extra_len; /* Current extra memory used */
size_t rbuf_extra_size; /* Total size of extra memory */
} rd_buf_t;
/**
* @brief A read-only slice of a buffer.
*/
typedef struct rd_slice_s {
const rd_buf_t *buf; /**< Pointer to buffer */
const rd_segment_t *seg; /**< Current read position segment.
* Will point to NULL when end of
* slice is reached. */
size_t rof; /**< Relative read offset in segment */
size_t start; /**< Slice start offset in buffer */
size_t end; /**< Slice end offset in buffer+1 */
} rd_slice_t;
/**
* @returns the current write position (absolute offset)
*/
static RD_INLINE RD_UNUSED size_t rd_buf_write_pos(const rd_buf_t *rbuf) {
const rd_segment_t *seg = rbuf->rbuf_wpos;
if (unlikely(!seg)) {
#if ENABLE_DEVEL
rd_assert(rbuf->rbuf_len == 0);
#endif
return 0;
}
#if ENABLE_DEVEL
rd_assert(seg->seg_absof + seg->seg_of == rbuf->rbuf_len);
#endif
return seg->seg_absof + seg->seg_of;
}
/**
* @returns the number of bytes available for writing (before growing).
*/
static RD_INLINE RD_UNUSED size_t rd_buf_write_remains(const rd_buf_t *rbuf) {
return rbuf->rbuf_size - (rbuf->rbuf_len + rbuf->rbuf_erased);
}
/**
* @returns the number of bytes remaining to write to the given segment,
* and sets the \p *p pointer (unless NULL) to the start of
* the contiguous memory.
*/
static RD_INLINE RD_UNUSED size_t
rd_segment_write_remains(const rd_segment_t *seg, void **p) {
if (unlikely((seg->seg_flags & RD_SEGMENT_F_RDONLY)))
return 0;
if (p)
*p = (void *)(seg->seg_p + seg->seg_of);
return seg->seg_size - seg->seg_of;
}
/**
* @returns the last segment for the buffer.
*/
static RD_INLINE RD_UNUSED rd_segment_t *rd_buf_last(const rd_buf_t *rbuf) {
return TAILQ_LAST(&rbuf->rbuf_segments, rd_segment_head);
}
/**
* @returns the total written buffer length
*/
static RD_INLINE RD_UNUSED size_t rd_buf_len(const rd_buf_t *rbuf) {
return rbuf->rbuf_len;
}
int rd_buf_write_seek(rd_buf_t *rbuf, size_t absof);
size_t rd_buf_write(rd_buf_t *rbuf, const void *payload, size_t size);
size_t rd_buf_write_slice(rd_buf_t *rbuf, rd_slice_t *slice);
size_t rd_buf_write_update(rd_buf_t *rbuf,
size_t absof,
const void *payload,
size_t size);
void rd_buf_push0(rd_buf_t *rbuf,
const void *payload,
size_t size,
void (*free_cb)(void *),
rd_bool_t writable);
#define rd_buf_push(rbuf, payload, size, free_cb) \
rd_buf_push0(rbuf, payload, size, free_cb, rd_false /*not-writable*/)
#define rd_buf_push_writable(rbuf, payload, size, free_cb) \
rd_buf_push0(rbuf, payload, size, free_cb, rd_true /*writable*/)
size_t rd_buf_erase(rd_buf_t *rbuf, size_t absof, size_t size);
size_t rd_buf_get_writable(rd_buf_t *rbuf, void **p);
void rd_buf_write_ensure_contig(rd_buf_t *rbuf, size_t size);
void rd_buf_write_ensure(rd_buf_t *rbuf, size_t min_size, size_t max_size);
size_t rd_buf_get_write_iov(const rd_buf_t *rbuf,
struct iovec *iovs,
size_t *iovcntp,
size_t iov_max,
size_t size_max);
void rd_buf_init(rd_buf_t *rbuf, size_t fixed_seg_cnt, size_t buf_size);
rd_buf_t *rd_buf_new(size_t fixed_seg_cnt, size_t buf_size);
void rd_buf_destroy(rd_buf_t *rbuf);
void rd_buf_destroy_free(rd_buf_t *rbuf);
void rd_buf_dump(const rd_buf_t *rbuf, int do_hexdump);
int unittest_rdbuf(void);
/**@}*/
/**
* @name Buffer reads operate on slices of an rd_buf_t and does not
* modify the underlying rd_buf_t itself.
*
* @warning A slice will not be valid/safe after the buffer or
* segments have been modified by a buf write operation
* (write, update, write_seek, etc).
* @{
*/
/**
* @returns the remaining length in the slice
*/
#define rd_slice_remains(slice) ((slice)->end - rd_slice_abs_offset(slice))
/**
* @returns the total size of the slice, regardless of current position.
*/
#define rd_slice_size(slice) ((slice)->end - (slice)->start)
/**
* @returns the read position in the slice as a new slice.
*/
static RD_INLINE RD_UNUSED rd_slice_t rd_slice_pos(const rd_slice_t *slice) {
rd_slice_t newslice = *slice;
if (!slice->seg)
return newslice;
newslice.start = slice->seg->seg_absof + slice->rof;
return newslice;
}
/**
* @returns the read position as an absolute buffer byte offset.
* @remark this is the buffer offset, not the slice's local offset.
*/
static RD_INLINE RD_UNUSED size_t rd_slice_abs_offset(const rd_slice_t *slice) {
if (unlikely(!slice->seg)) /* reader has reached the end */
return slice->end;
return slice->seg->seg_absof + slice->rof;
}
/**
* @returns the read position as a byte offset.
* @remark this is the slice-local offset, not the backing buffer's offset.
*/
static RD_INLINE RD_UNUSED size_t rd_slice_offset(const rd_slice_t *slice) {
if (unlikely(!slice->seg)) /* reader has reached the end */
return rd_slice_size(slice);
return (slice->seg->seg_absof + slice->rof) - slice->start;
}
int rd_slice_init_seg(rd_slice_t *slice,
const rd_buf_t *rbuf,
const rd_segment_t *seg,
size_t rof,
size_t size);
int rd_slice_init(rd_slice_t *slice,
const rd_buf_t *rbuf,
size_t absof,
size_t size);
void rd_slice_init_full(rd_slice_t *slice, const rd_buf_t *rbuf);
size_t rd_slice_reader(rd_slice_t *slice, const void **p);
size_t rd_slice_peeker(const rd_slice_t *slice, const void **p);
size_t rd_slice_read(rd_slice_t *slice, void *dst, size_t size);
size_t
rd_slice_peek(const rd_slice_t *slice, size_t offset, void *dst, size_t size);
size_t rd_slice_read_uvarint(rd_slice_t *slice, uint64_t *nump);
/**
* @brief Read a zig-zag varint-encoded signed integer from \p slice,
* storing the decoded number in \p nump on success (return value > 0).
*
* @returns the number of bytes read on success or 0 in case of
* buffer underflow.
*/
static RD_UNUSED RD_INLINE size_t rd_slice_read_varint(rd_slice_t *slice,
int64_t *nump) {
size_t r;
uint64_t unum;
r = rd_slice_read_uvarint(slice, &unum);
if (likely(r > 0)) {
/* Zig-zag decoding */
*nump = (int64_t)((unum >> 1) ^ -(int64_t)(unum & 1));
}
return r;
}
const void *rd_slice_ensure_contig(rd_slice_t *slice, size_t size);
int rd_slice_seek(rd_slice_t *slice, size_t offset);
size_t rd_slice_get_iov(const rd_slice_t *slice,
struct iovec *iovs,
size_t *iovcntp,
size_t iov_max,
size_t size_max);
uint32_t rd_slice_crc32(rd_slice_t *slice);
uint32_t rd_slice_crc32c(rd_slice_t *slice);
int rd_slice_narrow(rd_slice_t *slice,
rd_slice_t *save_slice,
size_t size) RD_WARN_UNUSED_RESULT;
int rd_slice_narrow_relative(rd_slice_t *slice,
rd_slice_t *save_slice,
size_t relsize) RD_WARN_UNUSED_RESULT;
void rd_slice_widen(rd_slice_t *slice, const rd_slice_t *save_slice);
int rd_slice_narrow_copy(const rd_slice_t *orig,
rd_slice_t *new_slice,
size_t size) RD_WARN_UNUSED_RESULT;
int rd_slice_narrow_copy_relative(const rd_slice_t *orig,
rd_slice_t *new_slice,
size_t relsize) RD_WARN_UNUSED_RESULT;
void rd_slice_dump(const rd_slice_t *slice, int do_hexdump);
/**@}*/
#endif /* _RDBUF_H */
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/kenvins/librdkafka.git
git@gitee.com:kenvins/librdkafka.git
kenvins
librdkafka
librdkafka
master

搜索帮助