代码拉取完成,页面将自动刷新
import pickle
import time
import threading
def deferFunc(func):
def wrapper(*args):
self = args[0]
self.lock.acquire()
try:
result = func(*args)
self.cacheKeysNum = self.cacheKeysNum + 1
self.writeToDisk()
return result
finally:
self.lock.release()
return wrapper
class dbCache():
def __init__(self,loadFiles):
self.cacheFile = loadFiles
self.lastTimes = time.time()
self.cacheKeysNum = 0
self.lock = threading.Lock()
self.cacheSetting()
try:
with open("db.cache","rb") as f:
self.caches = pickle.load(f)
except Exception as e:
print("open file errors" , e)
self.caches = {}
def cacheSetting(self,queueMaxKeys = 3,ageSec = 10):
self.queueMaxKeys = queueMaxKeys
self.ageSec = ageSec
def writeToDisk(self):
if self.cacheKeysNum >= self.queueMaxKeys or (time.time() - self.lastTimes) >= self.ageSec:
with open(self.cacheFile, "wb") as f:
pickle.dump(self.caches, f)
self.lastTimes = time.time()
self.cacheKeysNum = 0
@deferFunc
def get(self,key):
if key in self.caches:
return self.caches[key]
else:
return None
@deferFunc
def set(self,key,value):
if key != "":
self.caches[key] = value
else:
raise "key cannot be empty"
def isExists(self,key):
if key in self.caches:
return True
else:
return False
@deferFunc
def update(self,key,value):
if self.isExists(key):
self.caches[key] = value
return True
else:
return False
@deferFunc
def delete(self,key):
if self.isExists(key):
del self.caches[key]
return True
else:
return False
def main() -> None:
c = dbCache("db.cache")
c.cacheSetting(queueMaxKeys=3,ageSec=3)
c.set("name","pdudo")
time.sleep(5)
c.set("site","juejin")
print(c.get("name"))
c.delete("name")
c.update("site","pdudo")
if __name__ == '__main__':
main()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。