# Author: Bohua Zhan
import io
import os
import json
from kernel import term
from kernel.term import Var
from kernel import theory
from kernel.theory import Theory, TheoryException
from kernel.thm import Thm
from kernel import extension
from server import items
import sys
# sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
Cache of parsed theories.
The dictionary is indexed by user, and then by theory name.
Each theory stores a 'timestamp' field, for the last modification
time of the corresponding file.
In the contents, instead of each item is the parsed item as
well as the corresponding extension.
theory_cache = dict()
Cache of item mapping.
The dictionary is indexed by user. For each user, it is a
mapping from (ty, name) to (theory_name, timestamp, index).
item_index = dict()
dirname = os.path.dirname(__file__)
def user_dir(username="master"):
"""Returns directory for the user."""
assert username, "user_dir: empty username."
if username == 'master':
return os.path.join(dirname, '../library/')
return os.path.join(dirname, '../users/' + username)
def user_file(filename, username="master"):
"""Return json file for the user and given filename."""
assert username, "user_file: empty username."
if username == 'master':
return os.path.join(dirname, '../library/' + filename + '.json')
return os.path.join(dirname, '../users/' + username + '/' + filename + '.json')
def load_json_data(filename, username="master"):
"""Load json data for the given theory name and user."""
with open(user_file(filename, username), encoding='utf-8') as f:
return json.load(f)
def load_metadata(username="master"):
"""For the given user, load metadata for all theory files."""
theory_cache[username] = dict()
item_index[username] = dict()
for f in os.listdir(user_dir(username)):
if f.endswith('.json'):
filename = f[:-5]
data = load_json_data(filename, username)
timestamp = os.path.getmtime(user_file(filename, username))
theory_cache[username][filename] = {
'imports': data['imports'],
'description': data['description']
# Immediately check for topological order.
def check_topological_sort(username="master"):
"""For the given user, check the import relations have no cycles."""
for name in theory_cache[username].keys():
theory_cache[username][name]['visited'] = False
count = 0
def dfs(name, path):
"""Perform depth-first search.
name - current theory name.
path - list of theory names on the current search path,
not including the current theory.
nonlocal count
if theory_cache[username][name]['visited']:
if name in path:
id = path.index(name)
cycle = path[id:] + (name,)
raise TheoryException("Cycle in imports: %s" % (', '.join(cycle)))
for import_name in theory_cache[username][name]['imports']:
dfs(import_name, path + (name,))
theory_cache[username][name]['order'] = count
theory_cache[username][name]['visited'] = True
count += 1
for name in sorted(theory_cache[username].keys()):
if not theory_cache[username][name]['visited']:
dfs(name, tuple())
def get_import_order(filenames, username="master"):
"""Obtain the order of loading theories for fulfilling
the imports in the theory given by the list of filenames.
if username not in theory_cache:
depend_list = []
def dfs(name):
if name in depend_list:
for import_name in theory_cache[username][name]['imports']:
for name in filenames:
return depend_list
def load_theory_cache(filename, username="master"):
"""Load the content of the given theory into cache.
Return the theory cache as a dictionary.
if username not in theory_cache:
cache = theory_cache[username][filename]
timestamp = os.path.getmtime(user_file(filename, username))
if 'timestamp' in cache and timestamp == cache['timestamp']:
# No need to update cache
return cache
# Load all required macros and methods for this file.
# Make table for this later.
if filename == 'logic':
from prover import z3wrapper
if filename == 'expr':
from data import expr
if filename == 'real':
from data import real
if filename == 'hoare':
from imperative import imp
# Load all imported theories
depend_list = get_import_order(cache['imports'], username)
with theory.fresh_theory():
for prev_name in depend_list:
prev_cache = load_theory_cache(prev_name, username)
for item in prev_cache['content']:
if item.error is None:
# Use this theory to parse the content of current theory
cache['timestamp'] = timestamp
data = load_json_data(filename, username)
cache['content'] = []
for index, item in enumerate(data['content']):
item = items.parse_item(item)
if item.error is None:
exts = item.get_extension()
for ext in exts:
if ext.is_constant():
name = ext.ref_name
name = ext.name
item_index[username][(ext.ty, name)] = (filename, timestamp, index)
return cache
def query_item_index(username, filename, ext_ty, name):
"""Query the item index."""
# Make sure the theory (and all its dependencies) are indexed
load_theory_cache(filename, username)
if (ext_ty, name) in item_index[username]:
filename, timestamp, index = item_index[username][(ext_ty, name)]
if timestamp == os.path.getmtime(user_file(filename, username)):
return filename, index
return None
return None
def load_theory(filename: str, *, limit=None, username="master"):
"""Load the theory with the given theory name.
Optional limit is a pair (ty, name) specifying the first item
that should not be loaded.
load_theory_cache(filename, username)
cache = theory_cache[username][filename]
# Load imported theories
depend_list = get_import_order(cache['imports'], username)
theory.thy = theory.EmptyTheory()
for prev_name in depend_list:
prev_cache = load_theory_cache(prev_name, username)
for item in prev_cache['content']:
if item.error is None:
if limit == 'start':
return None
# Take the portion of content up to (and not including) limit
content = cache['content']
found_limit = False
for item in content:
if limit and item.ty == limit[0] and item.name == limit[1]:
found_limit = True
if item.error is None:
if limit and not found_limit:
raise TheoryException("load_theory: limit %s not found" % str(limit))
return None
