1 Star 1 Fork 2

pdudo / golearn

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
main.py 2.21 KB
一键复制 编辑 原始数据 按行查看 历史
import functools
import pickle
import threading
import time
def deferFunc(func):
    """Decorate a method so it runs under the instance lock, then maybe flushes.

    The object the decorated method is bound to must provide:

    * ``lock`` -- a lock usable as a context manager,
    * ``cacheKeysNum`` -- an integer counter of calls since the last flush,
    * ``writeToDisk()`` -- a method that decides whether to persist state.

    After the wrapped method returns successfully, ``cacheKeysNum`` is
    incremented by one and ``writeToDisk()`` is invoked, all while the lock
    is still held.  If the wrapped method raises, the counter is left
    untouched, no flush is attempted, and the lock is released.

    Args:
        func: The method to wrap; its first positional argument is ``self``.

    Returns:
        A wrapper with the same name/docstring as ``func`` that forwards all
        positional and keyword arguments and returns ``func``'s result.
    """
    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped method
    def wrapper(self, *args, **kwargs):
        # ``with`` releases the lock on every exit path, including exceptions.
        with self.lock:
            result = func(self, *args, **kwargs)
            # Every successful wrapped call counts towards the flush
            # threshold; the wrapper cannot tell reads from writes.
            self.cacheKeysNum += 1
            self.writeToDisk()
            return result
    return wrapper
class dbCache():
    """In-memory key/value store that is periodically pickled to a file.

    ``get``, ``set``, ``update`` and ``delete`` are wrapped by ``deferFunc``,
    which runs them under ``self.lock``, bumps ``cacheKeysNum`` and then calls
    ``writeToDisk()``.  ``writeToDisk()`` only actually writes once enough
    calls have accumulated or enough time has passed (see ``cacheSetting``).
    """

    def __init__(self, loadFiles):
        """Create the cache and try to restore it from ``loadFiles``.

        Args:
            loadFiles: Path of the pickle file used for both loading and
                saving the cache contents.
        """
        self.cacheFile = loadFiles
        self.lastTimes = time.time()   # time of the last flush (or creation)
        self.cacheKeysNum = 0          # wrapped calls since the last flush
        self.lock = threading.Lock()
        self.cacheSetting()
        try:
            # Load from the same path writeToDisk() saves to, so a cache
            # created with a custom path can find its own data again.
            # NOTE(security): pickle.load can execute arbitrary code; only
            # point this at files written by a trusted process.
            with open(self.cacheFile, "rb") as f:
                self.caches = pickle.load(f)
        except Exception as e:
            # Best effort: a missing or unreadable file means an empty cache.
            print("open file errors" , e)
            self.caches = {}

    def cacheSetting(self, queueMaxKeys = 3, ageSec = 10):
        """Configure the flush thresholds.

        Args:
            queueMaxKeys: Flush once this many wrapped calls have accumulated.
            ageSec: Flush once this many seconds have passed since the last
                flush.
        """
        self.queueMaxKeys = queueMaxKeys
        self.ageSec = ageSec

    def writeToDisk(self):
        """Pickle the cache to ``cacheFile`` if either threshold is reached.

        Resets the flush timestamp and the call counter after writing.
        """
        tooManyCalls = self.cacheKeysNum >= self.queueMaxKeys
        tooOld = (time.time() - self.lastTimes) >= self.ageSec
        if tooManyCalls or tooOld:
            with open(self.cacheFile, "wb") as f:
                pickle.dump(self.caches, f)
            self.lastTimes = time.time()
            self.cacheKeysNum = 0

    @deferFunc
    def get(self, key):
        """Return the value stored under ``key``, or ``None`` if absent."""
        return self.caches.get(key)

    @deferFunc
    def set(self, key, value):
        """Store ``value`` under ``key``, overwriting any existing entry.

        Raises:
            ValueError: If ``key`` is the empty string.
        """
        if key == "":
            # Raising a plain string is itself a TypeError in Python 3;
            # raise a real exception that carries the intended message.
            raise ValueError("key cannot be empty")
        self.caches[key] = value

    def isExists(self, key):
        """Return ``True`` if ``key`` is present in the cache.

        Not wrapped by ``deferFunc``: ``update`` and ``delete`` call this
        while already holding the non-reentrant ``threading.Lock``.
        """
        return key in self.caches

    @deferFunc
    def update(self, key, value):
        """Replace the value of an existing ``key``.

        Returns:
            ``True`` if the key existed and was updated, ``False`` otherwise.
        """
        if not self.isExists(key):
            return False
        self.caches[key] = value
        return True

    @deferFunc
    def delete(self, key):
        """Remove ``key`` from the cache.

        Returns:
            ``True`` if the key existed and was removed, ``False`` otherwise.
        """
        if not self.isExists(key):
            return False
        del self.caches[key]
        return True
def main() -> None:
    """Run a short demo of dbCache against the file "db.cache"."""
    cache = dbCache("db.cache")
    cache.cacheSetting(queueMaxKeys=3, ageSec=3)

    cache.set("name", "pdudo")
    # Sleep longer than ageSec so the age threshold has passed by the time
    # of the next call.
    time.sleep(5)
    cache.set("site", "juejin")

    print(cache.get("name"))

    cache.delete("name")
    cache.update("site", "pdudo")


if __name__ == "__main__":
    main()
Go
1
https://gitee.com/pdudo/golearn.git
git@gitee.com:pdudo/golearn.git
pdudo
golearn
golearn
master

搜索帮助