/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "memcached.h"
#ifdef EXTSTORE
#include "storage.h"
#include "extstore.h"
#include <stdlib.h>
#include <stdio.h>
#include <stddef.h>
#include <string.h>
#include <limits.h>
#include <ctype.h>
#define PAGE_BUCKET_DEFAULT 0
#define PAGE_BUCKET_COMPACT 1
#define PAGE_BUCKET_CHUNKED 2
#define PAGE_BUCKET_LOWTTL 3
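// Page buckets group writes by expected lifetime: fresh writes go to
// DEFAULT, compaction rescues to COMPACT (see storage_compact_readback()),
// chunked items to CHUNKED and soon-to-expire items to LOWTTL (see
// storage_write()), so pages in a bucket tend to free up together.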
/*
* API functions
*/
// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
typedef struct _io_pending_storage_t {
io_queue_t *q;
conn *c;
mc_resp *resp; /* original struct ends here */
item *hdr_it; /* original header item. */
obj_io io_ctx; /* embedded extstore IO header */
unsigned int iovec_data; /* specific index of data iovec */
bool noreply; /* whether the response had noreply set */
bool miss; /* signal a miss to unlink hdr_it */
bool badcrc; /* signal a crc failure */
bool active; /* tells if IO was dispatched or not */
} io_pending_storage_t;
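// Illustrative guard (not in upstream): because this struct is re-cast
// from io_pending_t, its leading members must stay byte-compatible with
// that struct's prefix. Assuming io_pending_t also begins with q/c/resp,
// a compile-time check could look like:
/*
_Static_assert(offsetof(io_pending_storage_t, resp) ==
offsetof(io_pending_t, resp), "io_pending prefix mismatch");
*/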
// Only call this if item has ITEM_HDR
bool storage_validate_item(void *e, item *it) {
item_hdr *hdr = (item_hdr *)ITEM_data(it);
if (extstore_check(e, hdr->page_id, hdr->page_version) != 0) {
return false;
} else {
return true;
}
}
void storage_delete(void *e, item *it) {
if (it->it_flags & ITEM_HDR) {
item_hdr *hdr = (item_hdr *)ITEM_data(it);
extstore_delete(e, hdr->page_id, hdr->page_version,
1, ITEM_ntotal(it));
}
}
// Function for the extra stats called from a protocol.
// NOTE: This either needs a name change or a wrapper, perhaps?
// it's defined here to reduce exposure of extstore.h to the rest of memcached
// but feels a little off being defined here.
// At very least maybe "process_storage_stats" in line with making this more
// of a generic wrapper module.
void process_extstore_stats(ADD_STAT add_stats, conn *c) {
int i;
char key_str[STAT_KEY_LEN];
char val_str[STAT_VAL_LEN];
int klen = 0, vlen = 0;
struct extstore_stats st;
assert(add_stats);
void *storage = c->thread->storage;
extstore_get_stats(storage, &st);
st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
if (st.page_data == NULL)
return;
extstore_get_page_data(storage, &st);
for (i = 0; i < st.page_count; i++) {
APPEND_NUM_STAT(i, "version", "%llu",
(unsigned long long) st.page_data[i].version);
APPEND_NUM_STAT(i, "bytes", "%llu",
(unsigned long long) st.page_data[i].bytes_used);
APPEND_NUM_STAT(i, "bucket", "%u",
st.page_data[i].bucket);
APPEND_NUM_STAT(i, "free_bucket", "%u",
st.page_data[i].free_bucket);
}
free(st.page_data);
}
// Additional storage stats for the main stats output.
void storage_stats(ADD_STAT add_stats, conn *c) {
struct extstore_stats st;
if (c->thread->storage) {
STATS_LOCK();
APPEND_STAT("extstore_compact_lost", "%llu", (unsigned long long)stats.extstore_compact_lost);
APPEND_STAT("extstore_compact_rescues", "%llu", (unsigned long long)stats.extstore_compact_rescues);
APPEND_STAT("extstore_compact_skipped", "%llu", (unsigned long long)stats.extstore_compact_skipped);
STATS_UNLOCK();
extstore_get_stats(c->thread->storage, &st);
APPEND_STAT("extstore_page_allocs", "%llu", (unsigned long long)st.page_allocs);
APPEND_STAT("extstore_page_evictions", "%llu", (unsigned long long)st.page_evictions);
APPEND_STAT("extstore_page_reclaims", "%llu", (unsigned long long)st.page_reclaims);
APPEND_STAT("extstore_pages_free", "%llu", (unsigned long long)st.pages_free);
APPEND_STAT("extstore_pages_used", "%llu", (unsigned long long)st.pages_used);
APPEND_STAT("extstore_objects_evicted", "%llu", (unsigned long long)st.objects_evicted);
APPEND_STAT("extstore_objects_read", "%llu", (unsigned long long)st.objects_read);
APPEND_STAT("extstore_objects_written", "%llu", (unsigned long long)st.objects_written);
APPEND_STAT("extstore_objects_used", "%llu", (unsigned long long)st.objects_used);
APPEND_STAT("extstore_bytes_evicted", "%llu", (unsigned long long)st.bytes_evicted);
APPEND_STAT("extstore_bytes_written", "%llu", (unsigned long long)st.bytes_written);
APPEND_STAT("extstore_bytes_read", "%llu", (unsigned long long)st.bytes_read);
APPEND_STAT("extstore_bytes_used", "%llu", (unsigned long long)st.bytes_used);
APPEND_STAT("extstore_bytes_fragmented", "%llu", (unsigned long long)st.bytes_fragmented);
APPEND_STAT("extstore_limit_maxbytes", "%llu", (unsigned long long)(st.page_count * st.page_size));
APPEND_STAT("extstore_io_queue", "%llu", (unsigned long long)(st.io_queue));
}
}
// FIXME: This runs in the IO thread. For better IO performance this should
// simply mark the io wrapper with the return value and decrement wrapleft;
// if that reaches zero, redispatch. Still a bit of work being done in the
// side thread but minimized at least.
// TODO: wrap -> p?
static void _storage_get_item_cb(void *e, obj_io *io, int ret) {
// FIXME: assumes success
io_pending_storage_t *p = (io_pending_storage_t *)io->data;
mc_resp *resp = p->resp;
conn *c = p->c;
assert(p->active == true);
item *read_it = (item *)io->buf;
bool miss = false;
// TODO: How to do counters for hit/misses?
if (ret < 1) {
miss = true;
} else {
uint32_t crc2;
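// storage_write() stashed a crc32c of the payload in the flash copy's
// exptime field (the real exptime lives in the RAM header item), so pull
// it back out to compare against a fresh crc of what we just read.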
uint32_t crc = (uint32_t) read_it->exptime;
int x;
// item is chunked, crc the iov's
if (io->iov != NULL) {
// first iov is the header, which we don't use beyond crc
crc2 = crc32c(0, (char *)io->iov[0].iov_base+STORE_OFFSET, io->iov[0].iov_len-STORE_OFFSET);
// make sure it's not sent. hack :(
io->iov[0].iov_len = 0;
for (x = 1; x < io->iovcnt; x++) {
crc2 = crc32c(crc2, (char *)io->iov[x].iov_base, io->iov[x].iov_len);
}
} else {
crc2 = crc32c(0, (char *)read_it+STORE_OFFSET, io->len-STORE_OFFSET);
}
if (crc != crc2) {
miss = true;
p->badcrc = true;
}
}
if (miss) {
if (p->noreply) {
// In all GET cases, noreply means we send nothing back.
resp->skip = true;
} else {
// TODO: This should be movable to the worker thread.
// Convert the binprot response into a miss response.
// The header requires knowing a bunch of stateful crap, so rather
// than simply writing out a "new" miss response we mangle what's
// already there.
if (c->protocol == binary_prot) {
protocol_binary_response_header *header =
(protocol_binary_response_header *)resp->wbuf;
// cut the extra nbytes off of the body_len
uint32_t body_len = ntohl(header->response.bodylen);
uint8_t hdr_len = header->response.extlen;
body_len -= resp->iov[p->iovec_data].iov_len + hdr_len;
resp->tosend -= resp->iov[p->iovec_data].iov_len + hdr_len;
header->response.extlen = 0;
header->response.status = (uint16_t)htons(PROTOCOL_BINARY_RESPONSE_KEY_ENOENT);
header->response.bodylen = htonl(body_len);
// truncate the data response.
resp->iov[p->iovec_data].iov_len = 0;
// wipe the extlen iov... wish it was just a flat buffer.
resp->iov[p->iovec_data-1].iov_len = 0;
resp->chunked_data_iov = 0;
} else {
int i;
// Meta commands have EN status lines for miss, rather than
// END as a trailer as per normal ascii.
if (resp->iov[0].iov_len >= 3
&& memcmp(resp->iov[0].iov_base, "VA ", 3) == 0) {
// TODO: These miss translators should use specific callback
// functions attached to the io wrap. This is weird :(
resp->iovcnt = 1;
resp->iov[0].iov_len = 4;
resp->iov[0].iov_base = "EN\r\n";
resp->tosend = 4;
} else {
// Wipe the iovecs up through our data injection.
// Allows trailers to be returned (END)
for (i = 0; i <= p->iovec_data; i++) {
resp->tosend -= resp->iov[i].iov_len;
resp->iov[i].iov_len = 0;
resp->iov[i].iov_base = NULL;
}
}
resp->chunked_total = 0;
resp->chunked_data_iov = 0;
}
}
p->miss = true;
} else {
assert(read_it->slabs_clsid != 0);
// TODO: should always use it instead of ITEM_data to kill more
// chunked special casing.
if ((read_it->it_flags & ITEM_CHUNKED) == 0) {
resp->iov[p->iovec_data].iov_base = ITEM_data(read_it);
}
p->miss = false;
}
p->q->count--;
p->active = false;
//assert(c->io_wrapleft >= 0);
// All IOs have returned; let's re-attach this connection to our original
// thread.
if (p->q->count == 0) {
redispatch_conn(c);
}
}
int storage_get_item(conn *c, item *it, mc_resp *resp) {
#ifdef NEED_ALIGN
item_hdr hdr;
memcpy(&hdr, ITEM_data(it), sizeof(hdr));
#else
item_hdr *hdr = (item_hdr *)ITEM_data(it);
#endif
io_queue_t *q = conn_io_queue_get(c, IO_QUEUE_EXTSTORE);
size_t ntotal = ITEM_ntotal(it);
unsigned int clsid = slabs_clsid(ntotal);
item *new_it;
bool chunked = false;
if (ntotal > settings.slab_chunk_size_max) {
// Pull a chunked item header.
uint32_t flags;
FLAGS_CONV(it, flags);
new_it = item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, it->nbytes);
assert(new_it == NULL || (new_it->it_flags & ITEM_CHUNKED));
chunked = true;
} else {
new_it = do_item_alloc_pull(ntotal, clsid);
}
if (new_it == NULL)
return -1;
// so we can free the chunk on a miss
new_it->slabs_clsid = clsid;
io_pending_storage_t *p = do_cache_alloc(c->thread->io_cache);
// this is a re-cast structure, so assert that we never outsize it.
assert(sizeof(io_pending_t) >= sizeof(io_pending_storage_t));
memset(p, 0, sizeof(io_pending_storage_t));
p->active = true;
p->miss = false;
p->badcrc = false;
p->noreply = c->noreply;
// io_pending owns the reference for this object now.
p->hdr_it = it;
p->resp = resp;
p->q = q; // quicker access to the queue structure.
obj_io *eio = &p->io_ctx;
// FIXME: error handling.
if (chunked) {
unsigned int ciovcnt = 0;
size_t remain = new_it->nbytes;
item_chunk *chunk = (item_chunk *) ITEM_schunk(new_it);
// TODO: This might make sense as a _global_ cache vs a per-thread.
// but we still can't load objects requiring > IOV_MAX iovs.
// In the meantime, these objects are rare/slow enough that
// malloc/freeing a statically sized object won't cause us much pain.
eio->iov = malloc(sizeof(struct iovec) * IOV_MAX);
if (eio->iov == NULL) {
item_remove(new_it);
do_cache_free(c->thread->io_cache, p);
return -1;
}
// fill the header so we can get the full data + crc back.
eio->iov[0].iov_base = new_it;
eio->iov[0].iov_len = ITEM_ntotal(new_it) - new_it->nbytes;
ciovcnt++;
while (remain > 0) {
chunk = do_item_alloc_chunk(chunk, remain);
// FIXME: _pure evil_, silently erroring if item is too large.
if (chunk == NULL || ciovcnt > IOV_MAX-1) {
item_remove(new_it);
free(eio->iov);
// TODO: wrapper function for freeing up an io wrap?
eio->iov = NULL;
do_cache_free(c->thread->io_cache, p);
return -1;
}
eio->iov[ciovcnt].iov_base = chunk->data;
eio->iov[ciovcnt].iov_len = (remain < chunk->size) ? remain : chunk->size;
chunk->used = (remain < chunk->size) ? remain : chunk->size;
remain -= chunk->size;
ciovcnt++;
}
eio->iovcnt = ciovcnt;
}
// Chunked or not, we reserve a response iov here.
p->iovec_data = resp->iovcnt;
int iovtotal = (c->protocol == binary_prot) ? it->nbytes - 2 : it->nbytes;
if (chunked) {
resp_add_chunked_iov(resp, new_it, iovtotal);
} else {
resp_add_iov(resp, "", iovtotal);
}
// We can't bail out anymore, so mc_resp owns the IO from here.
resp->io_pending = (io_pending_t *)p;
eio->buf = (void *)new_it;
p->c = c;
// We need to stack the sub-struct IO's together for submission.
eio->next = q->stack_ctx;
q->stack_ctx = eio;
// No need to stack the io_pending's together as they live on mc_resp's.
assert(q->count >= 0);
q->count++;
// reference ourselves for the callback.
eio->data = (void *)p;
// Now, fill in io->io based on what was in our header.
#ifdef NEED_ALIGN
eio->page_version = hdr.page_version;
eio->page_id = hdr.page_id;
eio->offset = hdr.offset;
#else
eio->page_version = hdr->page_version;
eio->page_id = hdr->page_id;
eio->offset = hdr->offset;
#endif
eio->len = ntotal;
eio->mode = OBJ_IO_READ;
eio->cb = _storage_get_item_cb;
// FIXME: This stat needs to move to reflect # of flash hits vs misses
// for now it's a good gauge on how often we request out to flash at
// least.
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.get_extstore++;
pthread_mutex_unlock(&c->thread->stats.mutex);
return 0;
}
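// Read path summary (illustrative): storage_get_item() allocates a
// destination item and an io_pending_storage_t, reserves a response iov,
// and stacks the obj_io on the connection's extstore queue. An IO thread
// later runs _storage_get_item_cb(), which crc-checks the buffer, patches
// the response (or rewrites it as a miss) and redispatches the conn once
// q->count reaches zero; storage_finalize_cb() recaches or frees the
// buffers after the response has been transmitted.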
void storage_submit_cb(void *ctx, void *stack) {
// Don't need to do anything special for extstore.
extstore_submit(ctx, stack);
}
static void recache_or_free(io_pending_t *pending) {
// re-cast to our specific struct.
io_pending_storage_t *p = (io_pending_storage_t *)pending;
conn *c = p->c;
obj_io *io = &p->io_ctx;
item *it = (item *)io->buf;
assert(c != NULL);
assert(io != NULL);
bool do_free = true;
if (p->active) {
// If request never dispatched, free the read buffer but leave the
// item header alone.
do_free = false;
size_t ntotal = ITEM_ntotal(p->hdr_it);
slabs_free(it, ntotal, slabs_clsid(ntotal));
p->q->count--;
assert(p->q->count >= 0);
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.get_aborted_extstore++;
pthread_mutex_unlock(&c->thread->stats.mutex);
} else if (p->miss) {
// If request was ultimately a miss, unlink the header.
do_free = false;
size_t ntotal = ITEM_ntotal(p->hdr_it);
item_unlink(p->hdr_it);
slabs_free(it, ntotal, slabs_clsid(ntotal));
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.miss_from_extstore++;
if (p->badcrc)
c->thread->stats.badcrc_from_extstore++;
pthread_mutex_unlock(&c->thread->stats.mutex);
} else if (settings.ext_recache_rate) {
// hashvalue is cuddled during store
uint32_t hv = (uint32_t)it->time;
// opt to throw away rather than wait on a lock.
void *hold_lock = item_trylock(hv);
if (hold_lock != NULL) {
item *h_it = p->hdr_it;
uint8_t flags = ITEM_LINKED|ITEM_FETCHED|ITEM_ACTIVE;
// Item must be recently hit at least twice to recache.
if (((h_it->it_flags & flags) == flags) &&
h_it->time > current_time - ITEM_UPDATE_INTERVAL &&
c->recache_counter++ % settings.ext_recache_rate == 0) {
do_free = false;
// In case it's been updated.
it->exptime = h_it->exptime;
it->it_flags &= ~ITEM_LINKED;
it->refcount = 0;
it->h_next = NULL; // might not be necessary.
STORAGE_delete(c->thread->storage, h_it);
item_replace(h_it, it, hv);
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.recache_from_extstore++;
pthread_mutex_unlock(&c->thread->stats.mutex);
}
}
if (hold_lock)
item_trylock_unlock(hold_lock);
}
if (do_free)
slabs_free(it, ITEM_ntotal(it), ITEM_clsid(it));
p->io_ctx.buf = NULL;
p->io_ctx.next = NULL;
p->active = false;
// TODO: reuse lock and/or hv.
item_remove(p->hdr_it);
}
// Called after the IO is processed but before the response is transmitted.
// TODO: stubbed with a reminder: should be able to move most of the extstore
// callback code into this code instead, executing on worker thread instead of
// IO thread.
void storage_complete_cb(void *ctx, void *stack_ctx) {
return;
}
// Called after responses have been transmitted. Need to free up related data.
void storage_finalize_cb(io_pending_t *pending) {
recache_or_free(pending);
io_pending_storage_t *p = (io_pending_storage_t *)pending;
obj_io *io = &p->io_ctx;
// malloc'ed iovec list used for chunked extstore fetches.
if (io->iov) {
free(io->iov);
io->iov = NULL;
}
// don't need to free the main context, since it's embedded.
}
/*
* WRITE FLUSH THREAD
*/
static int storage_write(void *storage, const int clsid, const int item_age) {
int did_moves = 0;
struct lru_pull_tail_return it_info;
it_info.it = NULL;
lru_pull_tail(clsid, COLD_LRU, 0, LRU_PULL_RETURN_ITEM, 0, &it_info);
/* Item is locked, and we have a reference to it. */
if (it_info.it == NULL) {
return did_moves;
}
obj_io io;
item *it = it_info.it;
/* First, storage for the header object */
size_t orig_ntotal = ITEM_ntotal(it);
uint32_t flags;
if ((it->it_flags & ITEM_HDR) == 0 &&
(item_age == 0 || current_time - it->time > item_age)) {
FLAGS_CONV(it, flags);
item *hdr_it = do_item_alloc(ITEM_key(it), it->nkey, flags, it->exptime, sizeof(item_hdr));
/* Run the storage write understanding the start of the item is dirty.
* We will fill it (time/exptime/etc) from the header item on read.
*/
if (hdr_it != NULL) {
int bucket = (it->it_flags & ITEM_CHUNKED) ?
PAGE_BUCKET_CHUNKED : PAGE_BUCKET_DEFAULT;
// Group soon-to-expire items into similar pages.
if (it->exptime - current_time < settings.ext_low_ttl) {
bucket = PAGE_BUCKET_LOWTTL;
}
hdr_it->it_flags |= ITEM_HDR;
io.len = orig_ntotal;
io.mode = OBJ_IO_WRITE;
// NOTE: when the item is read back in, the slab mover
// may see it. Important to have refcount>=2 or ~ITEM_LINKED
assert(it->refcount >= 2);
// NOTE: write bucket vs free page bucket will disambiguate once
// lowttl feature is better understood.
if (extstore_write_request(storage, bucket, bucket, &io) == 0) {
// cuddle the hash value into the time field so we don't have
// to recalculate it.
item *buf_it = (item *) io.buf;
buf_it->time = it_info.hv;
// copy from past the headers + time headers.
// TODO: should be in items.c
if (it->it_flags & ITEM_CHUNKED) {
// Need to loop through the item and copy
item_chunk *sch = (item_chunk *) ITEM_schunk(it);
int remain = orig_ntotal;
int copied = 0;
// copy original header
int hdrtotal = ITEM_ntotal(it) - it->nbytes;
memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, hdrtotal - STORE_OFFSET);
copied = hdrtotal;
// copy data in as if it were one large object.
while (sch && remain) {
assert(remain >= sch->used);
memcpy((char *)io.buf+copied, sch->data, sch->used);
// FIXME: use one variable?
remain -= sch->used;
copied += sch->used;
sch = sch->next;
}
} else {
memcpy((char *)io.buf+STORE_OFFSET, (char *)it+STORE_OFFSET, io.len-STORE_OFFSET);
}
// crc what we copied so we can do it sequentially.
buf_it->it_flags &= ~ITEM_LINKED;
buf_it->exptime = crc32c(0, (char*)io.buf+STORE_OFFSET, orig_ntotal-STORE_OFFSET);
extstore_write(storage, &io);
item_hdr *hdr = (item_hdr *) ITEM_data(hdr_it);
hdr->page_version = io.page_version;
hdr->page_id = io.page_id;
hdr->offset = io.offset;
// overload nbytes for the header it
hdr_it->nbytes = it->nbytes;
/* success! Now we need to fill relevant data into the new
* header and replace. Most of this requires the item lock
*/
/* CAS gets set while linking. Copy post-replace */
item_replace(it, hdr_it, it_info.hv);
ITEM_set_cas(hdr_it, ITEM_get_cas(it));
do_item_remove(hdr_it);
did_moves = 1;
LOGGER_LOG(NULL, LOG_EVICTIONS, LOGGER_EXTSTORE_WRITE, it, bucket);
} else {
/* Failed to write for some reason, can't continue. */
slabs_free(hdr_it, ITEM_ntotal(hdr_it), ITEM_clsid(hdr_it));
}
}
}
do_item_remove(it);
item_unlock(it_info.hv);
return did_moves;
}
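// On-flash layout written above (illustrative): a verbatim copy of the
// item with two fields repurposed -- buf_it->time holds the hash value
// and buf_it->exptime holds the crc32c of everything past STORE_OFFSET.
// The RAM-resident header item keeps only an item_hdr payload:
// { page_id, page_version, offset }, plus the original nbytes.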
static pthread_t storage_write_tid;
static pthread_mutex_t storage_write_plock;
#define WRITE_SLEEP_MAX 1000000
#define WRITE_SLEEP_MIN 500
static void *storage_write_thread(void *arg) {
void *storage = arg;
// NOTE: ignoring overflow since that would take years of uptime in a
// specific load pattern of never going to sleep.
unsigned int backoff[MAX_NUMBER_OF_SLAB_CLASSES] = {0};
unsigned int counter = 0;
useconds_t to_sleep = WRITE_SLEEP_MIN;
logger *l = logger_create();
if (l == NULL) {
fprintf(stderr, "Failed to allocate logger for storage compaction thread\n");
abort();
}
pthread_mutex_lock(&storage_write_plock);
while (1) {
// cache per-loop to avoid calls to the slabs_clsid() search loop
int min_class = slabs_clsid(settings.ext_item_size);
bool do_sleep = true;
counter++;
if (to_sleep > WRITE_SLEEP_MAX)
to_sleep = WRITE_SLEEP_MAX;
for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
bool did_move = false;
bool mem_limit_reached = false;
unsigned int chunks_free;
int item_age;
int target = settings.ext_free_memchunks[x];
if (min_class > x || (backoff[x] && (counter % backoff[x] != 0))) {
// Long sleeps mean we should retry classes sooner.
if (to_sleep > WRITE_SLEEP_MIN * 10)
backoff[x] /= 2;
continue;
}
// Avoid extra slab lock calls during heavy writing.
chunks_free = slabs_available_chunks(x, &mem_limit_reached,
NULL);
// storage_write() will fail and cut the loop after filling the write buffer.
while (1) {
// if we are low on chunks and no spare, push out early.
if (chunks_free < target && mem_limit_reached) {
item_age = 0;
} else {
item_age = settings.ext_item_age;
}
if (storage_write(storage, x, item_age)) {
chunks_free++; // Allow stopping if we've done enough this loop
did_move = true;
do_sleep = false;
if (to_sleep > WRITE_SLEEP_MIN)
to_sleep /= 2;
} else {
break;
}
}
if (!did_move) {
backoff[x]++;
} else if (backoff[x]) {
backoff[x] /= 2;
}
}
// flip lock so we can be paused or stopped
pthread_mutex_unlock(&storage_write_plock);
if (do_sleep) {
usleep(to_sleep);
to_sleep *= 2;
}
pthread_mutex_lock(&storage_write_plock);
}
return NULL;
}
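// Pacing sketch (illustrative): starting at WRITE_SLEEP_MIN (500us), each
// fully idle pass doubles to_sleep, hitting the WRITE_SLEEP_MAX cap of 1s
// after ~11 idle passes; any successful flush halves it again, so a busy
// instance converges on near-continuous writing while an idle one backs
// off to one scan per second.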
// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_write_thread(void) {
int ret;
pthread_mutex_lock(&lru_maintainer_lock);
do_run_lru_maintainer_thread = 0;
pthread_mutex_unlock(&lru_maintainer_lock);
// WAKEUP SIGNAL
if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
return -1;
}
settings.lru_maintainer_thread = false;
return 0;
}*/
void storage_write_pause(void) {
pthread_mutex_lock(&storage_write_plock);
}
void storage_write_resume(void) {
pthread_mutex_unlock(&storage_write_plock);
}
int start_storage_write_thread(void *arg) {
int ret;
pthread_mutex_init(&storage_write_plock, NULL);
if ((ret = pthread_create(&storage_write_tid, NULL,
storage_write_thread, arg)) != 0) {
fprintf(stderr, "Can't create storage_write thread: %s\n",
strerror(ret));
return -1;
}
return 0;
}
/*** COMPACTOR ***/
/* Fetch stats from the external storage system and decide to compact.
* If we're more than half full, start skewing how aggressively to run
* compaction, up to a desired target when all pages are full.
*/
static int storage_compact_check(void *storage, logger *l,
uint32_t *page_id, uint64_t *page_version,
uint64_t *page_size, bool *drop_unread) {
struct extstore_stats st;
int x;
double rate;
uint64_t frag_limit;
uint64_t low_version = ULLONG_MAX;
uint64_t lowest_version = ULLONG_MAX;
unsigned int low_page = 0;
unsigned int lowest_page = 0;
extstore_get_stats(storage, &st);
if (st.pages_used == 0)
return 0;
// let's pick a target "wasted" value and slew.
if (st.pages_free > settings.ext_compact_under)
return 0;
*drop_unread = false;
// the number of free pages reduces the configured frag limit
// this allows us to defrag early if pages are very empty.
rate = 1.0 - ((double)st.pages_free / st.page_count);
rate *= settings.ext_max_frag;
frag_limit = st.page_size * rate;
LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_FRAGINFO,
NULL, rate, frag_limit);
st.page_data = calloc(st.page_count, sizeof(struct extstore_page_data));
extstore_get_page_data(storage, &st);
// find oldest page by version that violates the constraint
for (x = 0; x < st.page_count; x++) {
if (st.page_data[x].version == 0 ||
st.page_data[x].bucket == PAGE_BUCKET_LOWTTL)
continue;
if (st.page_data[x].version < lowest_version) {
lowest_page = x;
lowest_version = st.page_data[x].version;
}
if (st.page_data[x].bytes_used < frag_limit) {
if (st.page_data[x].version < low_version) {
low_page = x;
low_version = st.page_data[x].version;
}
}
}
*page_size = st.page_size;
free(st.page_data);
// we have a page + version to attempt to reclaim.
if (low_version != ULLONG_MAX) {
*page_id = low_page;
*page_version = low_version;
return 1;
} else if (lowest_version != ULLONG_MAX && settings.ext_drop_unread
&& st.pages_free <= settings.ext_drop_under) {
// nothing matched the frag rate barrier, so pick the absolute oldest
// version if we're configured to drop items.
*page_id = lowest_page;
*page_version = lowest_version;
*drop_unread = true;
return 1;
}
return 0;
}
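// Worked example (illustrative, using the defaults from
// storage_init_config()): with ext_max_frag = 0.8, 64MB pages and 10% of
// pages free, rate = (1.0 - 0.1) * 0.8 = 0.72, so any page holding fewer
// than ~46MB of live bytes becomes a compaction candidate.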
static pthread_t storage_compact_tid;
static pthread_mutex_t storage_compact_plock;
#define MIN_STORAGE_COMPACT_SLEEP 10000
#define MAX_STORAGE_COMPACT_SLEEP 2000000
struct storage_compact_wrap {
obj_io io;
pthread_mutex_t lock; // gates the bools.
bool done;
bool submitted;
bool miss; // version flipped out from under us
};
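// Submission state machine (illustrative), driven by storage_compact_thread()
// below: !submitted && !done -> issue a read for the next wbuf-sized slice;
// submitted && done -> run readback (or abort if the page version flipped
// and miss is set). All transitions happen under wrap.lock because
// _storage_compact_cb() fires on an extstore IO thread.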
static void storage_compact_readback(void *storage, logger *l,
bool drop_unread, char *readback_buf,
uint32_t page_id, uint64_t page_version, uint64_t read_size) {
uint64_t offset = 0;
unsigned int rescues = 0;
unsigned int lost = 0;
unsigned int skipped = 0;
while (offset < read_size) {
item *hdr_it = NULL;
item_hdr *hdr = NULL;
item *it = (item *)(readback_buf+offset);
unsigned int ntotal;
// probably zeroed out junk at the end of the wbuf
if (it->nkey == 0) {
break;
}
ntotal = ITEM_ntotal(it);
uint32_t hv = (uint32_t)it->time;
item_lock(hv);
// We don't have a conn and don't need to do most of do_item_get
hdr_it = assoc_find(ITEM_key(it), it->nkey, hv);
if (hdr_it != NULL) {
bool do_write = false;
refcount_incr(hdr_it);
// Check validity but don't bother removing it.
if ((hdr_it->it_flags & ITEM_HDR) && !item_is_flushed(hdr_it) &&
(hdr_it->exptime == 0 || hdr_it->exptime > current_time)) {
hdr = (item_hdr *)ITEM_data(hdr_it);
if (hdr->page_id == page_id && hdr->page_version == page_version) {
// Item header is still completely valid.
extstore_delete(storage, page_id, page_version, 1, ntotal);
// drop inactive items.
if (drop_unread && GET_LRU(hdr_it->slabs_clsid) == COLD_LRU) {
do_write = false;
skipped++;
} else {
do_write = true;
}
}
}
if (do_write) {
bool do_update = false;
int tries;
obj_io io;
io.len = ntotal;
io.mode = OBJ_IO_WRITE;
for (tries = 10; tries > 0; tries--) {
if (extstore_write_request(storage, PAGE_BUCKET_COMPACT, PAGE_BUCKET_COMPACT, &io) == 0) {
memcpy(io.buf, it, io.len);
extstore_write(storage, &io);
do_update = true;
break;
} else {
usleep(1000);
}
}
if (do_update) {
if (it->refcount == 2) {
hdr->page_version = io.page_version;
hdr->page_id = io.page_id;
hdr->offset = io.offset;
rescues++;
} else {
lost++;
// TODO: re-alloc and replace header.
}
} else {
lost++;
}
}
do_item_remove(hdr_it);
}
item_unlock(hv);
offset += ntotal;
if (read_size - offset < sizeof(struct _stritem))
break;
}
STATS_LOCK();
stats.extstore_compact_lost += lost;
stats.extstore_compact_rescues += rescues;
stats.extstore_compact_skipped += skipped;
STATS_UNLOCK();
LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_END,
NULL, page_id, offset, rescues, lost, skipped);
}
static void _storage_compact_cb(void *e, obj_io *io, int ret) {
struct storage_compact_wrap *wrap = (struct storage_compact_wrap *)io->data;
assert(wrap->submitted == true);
pthread_mutex_lock(&wrap->lock);
if (ret < 1) {
wrap->miss = true;
}
wrap->done = true;
pthread_mutex_unlock(&wrap->lock);
}
// TODO: hoist the storage bits from lru_maintainer_thread in here.
// would be nice if they could avoid hammering the same locks though?
// I guess it's only COLD. that's probably fine.
static void *storage_compact_thread(void *arg) {
void *storage = arg;
useconds_t to_sleep = MAX_STORAGE_COMPACT_SLEEP;
bool compacting = false;
uint64_t page_version = 0;
uint64_t page_size = 0;
uint64_t page_offset = 0;
uint32_t page_id = 0;
bool drop_unread = false;
char *readback_buf = NULL;
struct storage_compact_wrap wrap;
logger *l = logger_create();
if (l == NULL) {
fprintf(stderr, "Failed to allocate logger for storage compaction thread\n");
abort();
}
readback_buf = malloc(settings.ext_wbuf_size);
if (readback_buf == NULL) {
fprintf(stderr, "Failed to allocate readback buffer for storage compaction thread\n");
abort();
}
pthread_mutex_init(&wrap.lock, NULL);
wrap.done = false;
wrap.submitted = false;
wrap.io.data = &wrap;
wrap.io.iov = NULL;
wrap.io.buf = (void *)readback_buf;
wrap.io.len = settings.ext_wbuf_size;
wrap.io.mode = OBJ_IO_READ;
wrap.io.cb = _storage_compact_cb;
pthread_mutex_lock(&storage_compact_plock);
while (1) {
pthread_mutex_unlock(&storage_compact_plock);
if (to_sleep) {
extstore_run_maint(storage);
usleep(to_sleep);
}
pthread_mutex_lock(&storage_compact_plock);
if (!compacting && storage_compact_check(storage, l,
&page_id, &page_version, &page_size, &drop_unread)) {
page_offset = 0;
compacting = true;
LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_START,
NULL, page_id, page_version);
}
if (compacting) {
pthread_mutex_lock(&wrap.lock);
if (page_offset < page_size && !wrap.done && !wrap.submitted) {
wrap.io.page_version = page_version;
wrap.io.page_id = page_id;
wrap.io.offset = page_offset;
// FIXME: should be smarter about io->next (unlink at use?)
wrap.io.next = NULL;
wrap.submitted = true;
wrap.miss = false;
extstore_submit(storage, &wrap.io);
} else if (wrap.miss) {
LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_ABORT,
NULL, page_id);
wrap.done = false;
wrap.submitted = false;
compacting = false;
} else if (wrap.submitted && wrap.done) {
LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_READ_START,
NULL, page_id, page_offset);
storage_compact_readback(storage, l, drop_unread,
readback_buf, page_id, page_version, settings.ext_wbuf_size);
page_offset += settings.ext_wbuf_size;
wrap.done = false;
wrap.submitted = false;
} else if (page_offset >= page_size) {
compacting = false;
wrap.done = false;
wrap.submitted = false;
extstore_close_page(storage, page_id, page_version);
LOGGER_LOG(l, LOG_SYSEVENTS, LOGGER_COMPACT_END,
NULL, page_id);
}
pthread_mutex_unlock(&wrap.lock);
if (to_sleep > MIN_STORAGE_COMPACT_SLEEP)
to_sleep /= 2;
} else {
if (to_sleep < MAX_STORAGE_COMPACT_SLEEP)
to_sleep += MIN_STORAGE_COMPACT_SLEEP;
}
}
free(readback_buf);
return NULL;
}
// TODO
// logger needs logger_destroy() to exist/work before this is safe.
/*int stop_storage_compact_thread(void) {
int ret;
pthread_mutex_lock(&lru_maintainer_lock);
do_run_lru_maintainer_thread = 0;
pthread_mutex_unlock(&lru_maintainer_lock);
if ((ret = pthread_join(lru_maintainer_tid, NULL)) != 0) {
fprintf(stderr, "Failed to stop LRU maintainer thread: %s\n", strerror(ret));
return -1;
}
settings.lru_maintainer_thread = false;
return 0;
}*/
void storage_compact_pause(void) {
pthread_mutex_lock(&storage_compact_plock);
}
void storage_compact_resume(void) {
pthread_mutex_unlock(&storage_compact_plock);
}
int start_storage_compact_thread(void *arg) {
int ret;
pthread_mutex_init(&storage_compact_plock, NULL);
if ((ret = pthread_create(&storage_compact_tid, NULL,
storage_compact_thread, arg)) != 0) {
fprintf(stderr, "Can't create storage_compact thread: %s\n",
strerror(ret));
return -1;
}
return 0;
}
/*** UTILITY ***/
// /path/to/file:100G:bucket1
// FIXME: Modifies argument. copy instead?
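// Example (illustrative): "/mnt/extstore/data:64g" with a 64MB page size
// yields cf->file = "/mnt/extstore/data" and cf->page_count = 1024. An
// optional trailing bucket name ("compact", "lowttl", "chunked" or
// "default") selects the free bucket, though anything but "default" is
// currently rejected at the end of this function.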
struct extstore_conf_file *storage_conf_parse(char *arg, unsigned int page_size) {
struct extstore_conf_file *cf = NULL;
char *b = NULL;
char *p = strtok_r(arg, ":", &b);
char unit = 0;
uint64_t multiplier = 0;
int base_size = 0;
if (p == NULL)
goto error;
// First arg is the filepath.
cf = calloc(1, sizeof(struct extstore_conf_file));
cf->file = strdup(p);
p = strtok_r(NULL, ":", &b);
if (p == NULL) {
fprintf(stderr, "must supply size to ext_path, ie: ext_path=/f/e:64m (M|G|T|P supported)\n");
goto error;
}
unit = tolower(p[strlen(p)-1]);
p[strlen(p)-1] = '\0';
// sigh.
switch (unit) {
case 'm':
multiplier = 1024 * 1024;
break;
case 'g':
multiplier = 1024 * 1024 * 1024;
break;
case 't':
multiplier = 1024 * 1024;
multiplier *= 1024 * 1024;
break;
case 'p':
multiplier = 1024 * 1024;
multiplier *= 1024 * 1024 * 1024;
break;
}
base_size = atoi(p);
multiplier *= base_size;
// page_count is the largest whole-page count that fits within the requested size
cf->page_count = multiplier / page_size;
assert(page_size * cf->page_count <= multiplier);
// final token would be a default free bucket
p = strtok_r(NULL, ",", &b);
// TODO: We reuse the original DEFINES for now,
// but if lowttl gets split up this needs to be its own set.
if (p != NULL) {
if (strcmp(p, "compact") == 0) {
cf->free_bucket = PAGE_BUCKET_COMPACT;
} else if (strcmp(p, "lowttl") == 0) {
cf->free_bucket = PAGE_BUCKET_LOWTTL;
} else if (strcmp(p, "chunked") == 0) {
cf->free_bucket = PAGE_BUCKET_CHUNKED;
} else if (strcmp(p, "default") == 0) {
cf->free_bucket = PAGE_BUCKET_DEFAULT;
} else {
fprintf(stderr, "Unknown extstore bucket: %s\n", p);
goto error;
}
} else {
// TODO: is this necessary?
cf->free_bucket = PAGE_BUCKET_DEFAULT;
}
// TODO: disabling until compact algorithm is improved.
if (cf->free_bucket != PAGE_BUCKET_DEFAULT) {
fprintf(stderr, "ext_path only presently supports the default bucket\n");
goto error;
}
return cf;
error:
if (cf) {
if (cf->file)
free(cf->file);
free(cf);
}
return NULL;
}
struct storage_settings {
struct extstore_conf_file *storage_file;
struct extstore_conf ext_cf;
};
void *storage_init_config(struct settings *s) {
struct storage_settings *cf = calloc(1, sizeof(struct storage_settings));
s->ext_item_size = 512;
s->ext_item_age = UINT_MAX;
s->ext_low_ttl = 0;
s->ext_recache_rate = 2000;
s->ext_max_frag = 0.8;
s->ext_drop_unread = false;
s->ext_wbuf_size = 1024 * 1024 * 4;
s->ext_compact_under = 0;
s->ext_drop_under = 0;
s->slab_automove_freeratio = 0.01;
s->ext_page_size = 1024 * 1024 * 64;
s->ext_io_threadcount = 1;
cf->ext_cf.page_size = settings.ext_page_size;
cf->ext_cf.wbuf_size = settings.ext_wbuf_size;
cf->ext_cf.io_threadcount = settings.ext_io_threadcount;
cf->ext_cf.io_depth = 1;
cf->ext_cf.page_buckets = 4;
cf->ext_cf.wbuf_count = cf->ext_cf.page_buckets;
return cf;
}
// TODO: pass settings struct?
int storage_read_config(void *conf, char **subopt) {
struct storage_settings *cf = conf;
struct extstore_conf *ext_cf = &cf->ext_cf;
char *subopts_value;
enum {
EXT_PAGE_SIZE,
EXT_WBUF_SIZE,
EXT_THREADS,
EXT_IO_DEPTH,
EXT_PATH,
EXT_ITEM_SIZE,
EXT_ITEM_AGE,
EXT_LOW_TTL,
EXT_RECACHE_RATE,
EXT_COMPACT_UNDER,
EXT_DROP_UNDER,
EXT_MAX_FRAG,
EXT_DROP_UNREAD,
SLAB_AUTOMOVE_FREERATIO, // FIXME: move this back?
};
char *const subopts_tokens[] = {
[EXT_PAGE_SIZE] = "ext_page_size",
[EXT_WBUF_SIZE] = "ext_wbuf_size",
[EXT_THREADS] = "ext_threads",
[EXT_IO_DEPTH] = "ext_io_depth",
[EXT_PATH] = "ext_path",
[EXT_ITEM_SIZE] = "ext_item_size",
[EXT_ITEM_AGE] = "ext_item_age",
[EXT_LOW_TTL] = "ext_low_ttl",
[EXT_RECACHE_RATE] = "ext_recache_rate",
[EXT_COMPACT_UNDER] = "ext_compact_under",
[EXT_DROP_UNDER] = "ext_drop_under",
[EXT_MAX_FRAG] = "ext_max_frag",
[EXT_DROP_UNREAD] = "ext_drop_unread",
[SLAB_AUTOMOVE_FREERATIO] = "slab_automove_freeratio",
NULL
};
switch (getsubopt(subopt, subopts_tokens, &subopts_value)) {
case EXT_PAGE_SIZE:
if (cf->storage_file) {
fprintf(stderr, "Must specify ext_page_size before any ext_path arguments\n");
return 1;
}
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_page_size argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &ext_cf->page_size)) {
fprintf(stderr, "could not parse argument to ext_page_size\n");
return 1;
}
ext_cf->page_size *= 1024 * 1024; /* megabytes */
break;
case EXT_WBUF_SIZE:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_wbuf_size argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &ext_cf->wbuf_size)) {
fprintf(stderr, "could not parse argument to ext_wbuf_size\n");
return 1;
}
ext_cf->wbuf_size *= 1024 * 1024; /* megabytes */
settings.ext_wbuf_size = ext_cf->wbuf_size;
break;
case EXT_THREADS:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_threads argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &ext_cf->io_threadcount)) {
fprintf(stderr, "could not parse argument to ext_threads\n");
return 1;
}
break;
case EXT_IO_DEPTH:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_io_depth argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &ext_cf->io_depth)) {
fprintf(stderr, "could not parse argument to ext_io_depth\n");
return 1;
}
break;
case EXT_ITEM_SIZE:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_item_size argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_item_size)) {
fprintf(stderr, "could not parse argument to ext_item_size\n");
return 1;
}
break;
case EXT_ITEM_AGE:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_item_age argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_item_age)) {
fprintf(stderr, "could not parse argument to ext_item_age\n");
return 1;
}
break;
case EXT_LOW_TTL:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_low_ttl argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_low_ttl)) {
fprintf(stderr, "could not parse argument to ext_low_ttl\n");
return 1;
}
break;
case EXT_RECACHE_RATE:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_recache_rate argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_recache_rate)) {
fprintf(stderr, "could not parse argument to ext_recache_rate\n");
return 1;
}
break;
case EXT_COMPACT_UNDER:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_compact_under argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_compact_under)) {
fprintf(stderr, "could not parse argument to ext_compact_under\n");
return 1;
}
break;
case EXT_DROP_UNDER:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_drop_under argument\n");
return 1;
}
if (!safe_strtoul(subopts_value, &settings.ext_drop_under)) {
fprintf(stderr, "could not parse argument to ext_drop_under\n");
return 1;
}
break;
case EXT_MAX_FRAG:
if (subopts_value == NULL) {
fprintf(stderr, "Missing ext_max_frag argument\n");
return 1;
}
if (!safe_strtod(subopts_value, &settings.ext_max_frag)) {
fprintf(stderr, "could not parse argument to ext_max_frag\n");
return 1;
}
break;
case SLAB_AUTOMOVE_FREERATIO:
if (subopts_value == NULL) {
fprintf(stderr, "Missing slab_automove_freeratio argument\n");
return 1;
}
if (!safe_strtod(subopts_value, &settings.slab_automove_freeratio)) {
fprintf(stderr, "could not parse argument to slab_automove_freeratio\n");
return 1;
}
break;
case EXT_DROP_UNREAD:
settings.ext_drop_unread = true;
break;
case EXT_PATH:
if (subopts_value) {
struct extstore_conf_file *tmp = storage_conf_parse(subopts_value, ext_cf->page_size);
if (tmp == NULL) {
fprintf(stderr, "failed to parse ext_path argument\n");
return 1;
}
if (cf->storage_file != NULL) {
tmp->next = cf->storage_file;
}
cf->storage_file = tmp;
} else {
fprintf(stderr, "missing argument to ext_path, ie: ext_path=/d/file:5G\n");
return 1;
}
break;
default:
fprintf(stderr, "Illegal suboption \"%s\"\n", subopts_value);
return 1;
}
return 0;
}
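// Example invocation (illustrative): these suboptions are consumed from
// memcached's -o option list, e.g.:
//   memcached -o ext_path=/mnt/extstore/data:64g,ext_threads=4,ext_item_age=3600
// Return codes: 0 on success, 1 on a fatal parse/config error.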
int storage_check_config(void *conf) {
struct storage_settings *cf = conf;
struct extstore_conf *ext_cf = &cf->ext_cf;
if (cf->storage_file) {
if (settings.item_size_max > ext_cf->wbuf_size) {
fprintf(stderr, "-I (item_size_max: %d) cannot be larger than ext_wbuf_size: %d\n",
settings.item_size_max, ext_cf->wbuf_size);
return 1;
}
if (settings.udpport) {
fprintf(stderr, "Cannot use UDP with extstore enabled (-U 0 to disable)\n");
return 1;
}
return 0;
}
return 2;
}
void *storage_init(void *conf) {
struct storage_settings *cf = conf;
struct extstore_conf *ext_cf = &cf->ext_cf;
enum extstore_res eres;
void *storage = NULL;
if (settings.ext_compact_under == 0) {
// If changing the default fraction (4), change the help text as well.
settings.ext_compact_under = cf->storage_file->page_count / 4;
/* Only rescues non-COLD items if below this threshold */
settings.ext_drop_under = cf->storage_file->page_count / 4;
}
crc32c_init();
/* Init free chunks to zero. */
for (int x = 0; x < MAX_NUMBER_OF_SLAB_CLASSES; x++) {
settings.ext_free_memchunks[x] = 0;
}
storage = extstore_init(cf->storage_file, ext_cf, &eres);
if (storage == NULL) {
fprintf(stderr, "Failed to initialize external storage: %s\n",
extstore_err(eres));
if (eres == EXTSTORE_INIT_OPEN_FAIL) {
perror("extstore open");
}
return NULL;
}
return storage;
}
#endif