Ai
1 Star 0 Fork 1

Owen/Python-causalml

forked from 连享会/Python-causalml 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
test_meta_learners.py 31.01 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from xgboost import XGBRegressor
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestRegressor
from causalml.dataset import synthetic_data
from causalml.inference.meta import BaseSLearner, BaseSRegressor, BaseSClassifier, LRSRegressor
from causalml.inference.meta import BaseTLearner, BaseTRegressor, BaseTClassifier, XGBTRegressor, MLPTRegressor
from causalml.inference.meta import BaseXLearner, BaseXClassifier, BaseXRegressor
from causalml.inference.meta import BaseRLearner, BaseRClassifier, BaseRRegressor, XGBRRegressor
from causalml.inference.meta import TMLELearner
from causalml.inference.meta import BaseDRLearner
from causalml.metrics import ape, get_cumgain
from .const import RANDOM_SEED, N_SAMPLE, ERROR_THRESHOLD, CONTROL_NAME, CONVERSION
def test_synthetic_data():
    """Check that every synthetic_data mode returns arrays of equal length.

    Modes 1 through 4 are each generated with ``n=N_SAMPLE``, ``p=8`` and
    ``sigma=.1``; the six returned arrays must agree on their first dimension.
    """
    for mode in (1, 2, 3, 4):
        y, X, treatment, tau, b, e = synthetic_data(mode=mode, n=N_SAMPLE, p=8, sigma=.1)

        n_rows = y.shape[0]
        assert all(arr.shape[0] == n_rows for arr in (X, treatment, tau, b, e))
def test_BaseSLearner(generate_regression_data):
    """Check the ATE estimated by a BaseSLearner wrapping LinearRegression.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()
    true_ate = tau.mean()

    s_learner = BaseSLearner(learner=LinearRegression())
    estimate, lower, upper = s_learner.estimate_ate(X=X, treatment=treatment, y=y, return_ci=True)

    # the point estimate must sit inside its own confidence interval ...
    assert (lower <= estimate) and (estimate <= upper)
    # ... and be close to the mean of the true effects
    assert ape(true_ate, estimate) < ERROR_THRESHOLD
def test_BaseSRegressor(generate_regression_data):
    """Check ATE accuracy and CATE ranking of a BaseSRegressor wrapping XGBRegressor.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseSRegressor(learner=XGBRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y, return_ci=True, n_bootstraps=10)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()
def test_LRSRegressor(generate_regression_data):
    """Check the ATE estimated by a default-constructed LRSRegressor.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()
    true_ate = tau.mean()

    estimate, lower, upper = LRSRegressor().estimate_ate(X=X, treatment=treatment, y=y)

    # the estimate must be bracketed by the returned bounds and close to the truth
    assert (lower <= estimate) and (estimate <= upper)
    assert ape(true_ate, estimate) < ERROR_THRESHOLD
def test_BaseTLearner(generate_regression_data):
    """Check BaseTLearner with a shared learner and with separate group learners.

    First an XGBRegressor-based learner is checked for ATE accuracy and CATE
    ranking; then a learner with explicit RandomForestRegressor control and
    treatment learners is checked for ATE accuracy.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseTLearner(learner=XGBRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()

    # test of using control_learner and treatment_learner
    learner = BaseTLearner(learner=XGBRegressor(),
                           control_learner=RandomForestRegressor(),
                           treatment_learner=RandomForestRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD
def test_BaseTRegressor(generate_regression_data):
    """Check ATE accuracy and CATE ranking of a BaseTRegressor wrapping XGBRegressor.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseTRegressor(learner=XGBRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()
def test_MLPTRegressor(generate_regression_data):
    """Check ATE accuracy and CATE ranking of a default-constructed MLPTRegressor.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = MLPTRegressor()

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()
def test_XGBTRegressor(generate_regression_data):
    """Check ATE accuracy and CATE ranking of a default-constructed XGBTRegressor.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = XGBTRegressor()

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()
def test_BaseXLearner(generate_regression_data):
    """Check BaseXLearner with a shared learner and with four explicit learners.

    The value ``e`` returned by the data generator is passed as ``p`` to the
    learner in every call. The XGBRegressor-based learner is checked for ATE
    accuracy and CATE ranking; the RandomForestRegressor configuration is
    checked for ATE accuracy only.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseXLearner(learner=XGBRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y, p=e)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, p=e, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()

    # basic test of using outcome_learner and effect_learner
    learner = BaseXLearner(learner=XGBRegressor(),
                           control_outcome_learner=RandomForestRegressor(),
                           treatment_outcome_learner=RandomForestRegressor(),
                           control_effect_learner=RandomForestRegressor(),
                           treatment_effect_learner=RandomForestRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y, p=e)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD
def test_BaseXRegressor(generate_regression_data):
    """Check ATE accuracy and CATE ranking of a BaseXRegressor given ``p=e``.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseXRegressor(learner=XGBRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y, p=e)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, p=e, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()
def test_BaseXLearner_without_p(generate_regression_data):
    """Check ATE accuracy and CATE ranking of a BaseXLearner when ``p`` is not passed.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseXLearner(learner=XGBRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()
def test_BaseXRegressor_without_p(generate_regression_data):
    """Check ATE accuracy and CATE ranking of a BaseXRegressor when ``p`` is not passed.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseXRegressor(learner=XGBRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()
def test_BaseRLearner(generate_regression_data):
    """Check BaseRLearner with a shared learner and with explicit outcome/effect learners.

    The value ``e`` returned by the data generator is passed as ``p`` in
    every call. The RandomForestRegressor configuration is held to a looser
    bound (five times ERROR_THRESHOLD).

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseRLearner(learner=XGBRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y, p=e)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, p=e, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()

    # basic test of using outcome_learner and effect_learner
    learner = BaseRLearner(learner=XGBRegressor(),
                           outcome_learner=RandomForestRegressor(),
                           effect_learner=RandomForestRegressor())
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y, p=e)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD * 5  # might need to look into higher ape
def test_BaseRRegressor(generate_regression_data):
    """Check ATE accuracy and CATE ranking of a BaseRRegressor given ``p=e``.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseRRegressor(learner=XGBRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y, p=e)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, p=e, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()
def test_BaseRLearner_without_p(generate_regression_data):
    """Check ATE accuracy and CATE ranking of a BaseRLearner when ``p`` is not passed.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseRLearner(learner=XGBRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()
def test_BaseRRegressor_without_p(generate_regression_data):
    """Check ATE accuracy and CATE ranking of a BaseRRegressor when ``p`` is not passed.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseRRegressor(learner=XGBRegressor())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()
def test_TMLELearner(generate_regression_data):
    """Check the ATE estimated by a TMLELearner wrapping XGBRegressor.

    The value ``e`` returned by the data generator is passed as ``p``.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()
    true_ate = tau.mean()

    tmle = TMLELearner(learner=XGBRegressor())
    estimate, lower, upper = tmle.estimate_ate(X=X, p=e, treatment=treatment, y=y)

    # the estimate must be bracketed by the returned bounds and close to the truth
    assert (lower <= estimate) and (estimate <= upper)
    assert ape(true_ate, estimate) < ERROR_THRESHOLD
def test_BaseSClassifier(generate_classification_data):
    """Check that a BaseSClassifier wrapping XGBClassifier beats random targeting.

    The model is fitted on an 80% split and scored on the held-out 20% by
    comparing summed cumulative gain against the 'Random' column.

    Args:
        generate_classification_data: zero-argument callable returning
            ``(df, x_names)`` (presumably a conftest fixture -- confirm).
    """
    np.random.seed(RANDOM_SEED)

    df, x_names = generate_classification_data()

    # recode the treatment column: control rows become 0, everything else 1
    df['treatment_group_key'] = np.where(df['treatment_group_key'] == CONTROL_NAME, 0, 1)

    df_train, df_test = train_test_split(df, test_size=0.2, random_state=RANDOM_SEED)

    s_classifier = BaseSClassifier(learner=XGBClassifier())
    s_classifier.fit(
        X=df_train[x_names].values,
        treatment=df_train['treatment_group_key'].values,
        y=df_train[CONVERSION].values,
    )

    test_treatment = df_test['treatment_group_key'].values
    tau_pred = s_classifier.predict(X=df_test[x_names].values, treatment=test_treatment)

    metrics_frame = pd.DataFrame({
        'tau_pred': tau_pred.flatten(),
        'W': df_test['treatment_group_key'].values,
        CONVERSION: df_test[CONVERSION].values,
        'treatment_effect_col': df_test['treatment_effect'].values,
    })
    cumgain = get_cumgain(metrics_frame,
                          outcome_col=CONVERSION,
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # targeting by the model's prediction must accumulate more gain than
    # random targeting does
    model_gain = cumgain['tau_pred'].sum()
    random_gain = cumgain['Random'].sum()
    assert model_gain > random_gain
def test_BaseTClassifier(generate_classification_data):
    """Check that a BaseTClassifier wrapping LogisticRegression beats random targeting.

    The model is fitted on an 80% split and scored on the held-out 20% by
    comparing summed cumulative gain against the 'Random' column.

    Args:
        generate_classification_data: zero-argument callable returning
            ``(df, x_names)`` (presumably a conftest fixture -- confirm).
    """
    np.random.seed(RANDOM_SEED)

    df, x_names = generate_classification_data()

    # recode the treatment column: control rows become 0, everything else 1
    df['treatment_group_key'] = np.where(df['treatment_group_key'] == CONTROL_NAME, 0, 1)

    df_train, df_test = train_test_split(df, test_size=0.2, random_state=RANDOM_SEED)

    t_classifier = BaseTClassifier(learner=LogisticRegression())
    t_classifier.fit(
        X=df_train[x_names].values,
        treatment=df_train['treatment_group_key'].values,
        y=df_train[CONVERSION].values,
    )

    test_treatment = df_test['treatment_group_key'].values
    tau_pred = t_classifier.predict(X=df_test[x_names].values, treatment=test_treatment)

    metrics_frame = pd.DataFrame({
        'tau_pred': tau_pred.flatten(),
        'W': df_test['treatment_group_key'].values,
        CONVERSION: df_test[CONVERSION].values,
        'treatment_effect_col': df_test['treatment_effect'].values,
    })
    cumgain = get_cumgain(metrics_frame,
                          outcome_col=CONVERSION,
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # targeting by the model's prediction must accumulate more gain than
    # random targeting does
    model_gain = cumgain['tau_pred'].sum()
    random_gain = cumgain['Random'].sum()
    assert model_gain > random_gain
def test_BaseXClassifier(generate_classification_data):
    """Check that BaseXClassifier beats random targeting in both learner configurations.

    Two ways of specifying the learners are exercised: all four learners
    given explicitly, and a shared outcome/effect learner pair. Each
    configuration is fitted, used for prediction and evaluated on its own, so
    neither prediction is discarded unchecked.

    A LogisticRegression propensity model is fitted on the full frame before
    the train/test split; its scores are passed as ``p`` at prediction time.

    Args:
        generate_classification_data: zero-argument callable returning
            ``(df, x_names)`` (presumably a conftest fixture -- confirm).
    """
    np.random.seed(RANDOM_SEED)

    df, x_names = generate_classification_data()

    df['treatment_group_key'] = np.where(df['treatment_group_key'] == CONTROL_NAME, 0, 1)

    propensity_model = LogisticRegression()
    propensity_model.fit(X=df[x_names].values, y=df['treatment_group_key'].values)
    df['propensity_score'] = propensity_model.predict_proba(df[x_names].values)[:, 1]

    df_train, df_test = train_test_split(df,
                                         test_size=0.2,
                                         random_state=RANDOM_SEED)

    uplift_models = (
        # specify all 4 learners
        BaseXClassifier(control_outcome_learner=XGBClassifier(),
                        control_effect_learner=XGBRegressor(),
                        treatment_outcome_learner=XGBClassifier(),
                        treatment_effect_learner=XGBRegressor()),
        # specify 2 learners
        BaseXClassifier(outcome_learner=XGBClassifier(),
                        effect_learner=XGBRegressor()),
    )

    for uplift_model in uplift_models:
        uplift_model.fit(X=df_train[x_names].values,
                         treatment=df_train['treatment_group_key'].values,
                         y=df_train[CONVERSION].values)

        tau_pred = uplift_model.predict(X=df_test[x_names].values,
                                        p=df_test['propensity_score'].values)

        # calculate metrics
        auuc_metrics = pd.DataFrame({'tau_pred': tau_pred.flatten(),
                                     'W': df_test['treatment_group_key'].values,
                                     CONVERSION: df_test[CONVERSION].values,
                                     'treatment_effect_col': df_test['treatment_effect'].values})

        cumgain = get_cumgain(auuc_metrics,
                              outcome_col=CONVERSION,
                              treatment_col='W',
                              treatment_effect_col='treatment_effect_col')

        # Check if the cumulative gain when using the model's prediction is
        # higher than it would be under random targeting
        assert cumgain['tau_pred'].sum() > cumgain['Random'].sum()
def test_BaseRClassifier(generate_classification_data):
    """Check that a BaseRClassifier (XGBoost outcome/effect learners) beats random targeting.

    A LogisticRegression propensity model is fitted on the full frame before
    the train/test split; its scores are passed as ``p`` when fitting.

    Args:
        generate_classification_data: zero-argument callable returning
            ``(df, x_names)`` (presumably a conftest fixture -- confirm).
    """
    np.random.seed(RANDOM_SEED)

    df, x_names = generate_classification_data()

    # recode the treatment column: control rows become 0, everything else 1
    df['treatment_group_key'] = np.where(df['treatment_group_key'] == CONTROL_NAME, 0, 1)

    # propensity = predicted probability of the positive (treated) class
    propensity_model = LogisticRegression()
    propensity_model.fit(X=df[x_names].values, y=df['treatment_group_key'].values)
    class_probabilities = propensity_model.predict_proba(df[x_names].values)
    df['propensity_score'] = class_probabilities[:, 1]

    df_train, df_test = train_test_split(df, test_size=0.2, random_state=RANDOM_SEED)

    r_classifier = BaseRClassifier(outcome_learner=XGBClassifier(),
                                   effect_learner=XGBRegressor())
    r_classifier.fit(
        X=df_train[x_names].values,
        p=df_train['propensity_score'].values,
        treatment=df_train['treatment_group_key'].values,
        y=df_train[CONVERSION].values,
    )

    tau_pred = r_classifier.predict(X=df_test[x_names].values)

    metrics_frame = pd.DataFrame({
        'tau_pred': tau_pred.flatten(),
        'W': df_test['treatment_group_key'].values,
        CONVERSION: df_test[CONVERSION].values,
        'treatment_effect_col': df_test['treatment_effect'].values,
    })
    cumgain = get_cumgain(metrics_frame,
                          outcome_col=CONVERSION,
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # targeting by the model's prediction must accumulate more gain than
    # random targeting does
    model_gain = cumgain['tau_pred'].sum()
    random_gain = cumgain['Random'].sum()
    assert model_gain > random_gain
def test_BaseRClassifier_with_sample_weights(generate_classification_data):
    """Check R-learners fitted with per-row sample weights.

    Integer weights are drawn with ``np.random.randint(low=1, high=3)``.
    A BaseRClassifier fitted with them must beat random targeting, and a
    default XGBRRegressor fitted with them must return one prediction per
    test row.

    Args:
        generate_classification_data: zero-argument callable returning
            ``(df, x_names)`` (presumably a conftest fixture -- confirm).
    """
    np.random.seed(RANDOM_SEED)

    df, x_names = generate_classification_data()

    # recode the treatment column: control rows become 0, everything else 1
    df['treatment_group_key'] = np.where(df['treatment_group_key'] == CONTROL_NAME, 0, 1)
    df['sample_weights'] = np.random.randint(low=1, high=3, size=df.shape[0])

    # propensity = predicted probability of the positive (treated) class
    propensity_model = LogisticRegression()
    propensity_model.fit(X=df[x_names].values, y=df['treatment_group_key'].values)
    class_probabilities = propensity_model.predict_proba(df[x_names].values)
    df['propensity_score'] = class_probabilities[:, 1]

    df_train, df_test = train_test_split(df, test_size=0.2, random_state=RANDOM_SEED)

    def _weighted_fit_inputs():
        # Build the keyword arguments afresh for every fit call so that each
        # model receives its own freshly extracted arrays.
        return {
            'X': df_train[x_names].values,
            'p': df_train['propensity_score'].values,
            'treatment': df_train['treatment_group_key'].values,
            'y': df_train[CONVERSION].values,
            'sample_weight': df_train['sample_weights'],
        }

    r_classifier = BaseRClassifier(outcome_learner=XGBClassifier(),
                                   effect_learner=XGBRegressor())
    r_classifier.fit(**_weighted_fit_inputs())

    tau_pred = r_classifier.predict(X=df_test[x_names].values)

    metrics_frame = pd.DataFrame({
        'tau_pred': tau_pred.flatten(),
        'W': df_test['treatment_group_key'].values,
        CONVERSION: df_test[CONVERSION].values,
        'treatment_effect_col': df_test['treatment_effect'].values,
    })
    cumgain = get_cumgain(metrics_frame,
                          outcome_col=CONVERSION,
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # targeting by the model's prediction must accumulate more gain than
    # random targeting does
    model_gain = cumgain['tau_pred'].sum()
    random_gain = cumgain['Random'].sum()
    assert model_gain > random_gain

    # XGBRRegressor must also produce a treatment effect estimate for every
    # test row when sample_weight is passed
    r_regressor = XGBRRegressor()
    r_regressor.fit(**_weighted_fit_inputs())
    tau_pred = r_regressor.predict(X=df_test[x_names].values)

    expected_rows = len(df_test['sample_weights'].values)
    assert len(tau_pred) == expected_rows
def test_pandas_input(generate_regression_data):
    """Check that the meta-learners accept pandas inputs in estimate_ate.

    ``y`` and ``treatment`` are wrapped in ``pd.Series`` and ``X`` in
    ``pd.DataFrame``; ``e`` is passed through unchanged as ``p`` where the
    learner takes it. An AttributeError raised while constructing the learner
    or estimating the ATE fails the test, naming the learner and chaining the
    original error so the cause is visible in the report.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).

    Raises:
        AssertionError: if any learner raises AttributeError on pandas input.
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    # convert to pandas types
    y = pd.Series(y)
    X = pd.DataFrame(X)
    treatment = pd.Series(treatment)

    # (learner class, extra keyword arguments for estimate_ate)
    cases = (
        (BaseSLearner, {'return_ci': True}),
        (BaseTLearner, {}),
        (BaseXLearner, {'p': e}),
        (BaseRLearner, {'p': e}),
        (TMLELearner, {'p': e}),
    )

    for learner_cls, extra_kwargs in cases:
        try:
            learner = learner_cls(learner=LinearRegression())
            learner.estimate_ate(X=X, treatment=treatment, y=y, **extra_kwargs)
        except AttributeError as err:
            # An explicit raise (rather than `assert False`) survives
            # `python -O` and keeps the original traceback attached.
            raise AssertionError(
                '{} raised AttributeError on pandas input'.format(learner_cls.__name__)
            ) from err
def test_BaseDRLearner(generate_regression_data):
    """Check ATE accuracy and CATE ranking of a BaseDRLearner.

    The learner uses XGBRegressor as ``learner`` and LinearRegression as
    ``treatment_effect_learner``; ``e`` from the data generator is passed as
    ``p``.

    Args:
        generate_regression_data: zero-argument callable returning
            ``(y, X, treatment, tau, b, e)`` (presumably a conftest
            fixture -- confirm).
    """
    y, X, treatment, tau, b, e = generate_regression_data()

    learner = BaseDRLearner(learner=XGBRegressor(), treatment_effect_learner=LinearRegression())

    # check the accuracy of the ATE estimation
    ate_p, lb, ub = learner.estimate_ate(X=X, treatment=treatment, y=y, p=e)
    assert (ate_p >= lb) and (ate_p <= ub)
    assert ape(tau.mean(), ate_p) < ERROR_THRESHOLD

    # check the accuracy of the CATE estimation with the bootstrap CI
    cate_p, _, _ = learner.fit_predict(X=X, treatment=treatment, y=y, p=e, return_ci=True, n_bootstraps=10)

    auuc_metrics = pd.DataFrame({'cate_p': cate_p.flatten(),
                                 'W': treatment,
                                 'y': y,
                                 'treatment_effect_col': tau})

    # get_cumgain is given the name of the column that actually holds the
    # true effect in auuc_metrics (same convention as the classifier tests).
    cumgain = get_cumgain(auuc_metrics,
                          outcome_col='y',
                          treatment_col='W',
                          treatment_effect_col='treatment_effect_col')

    # Check if the cumulative gain when using the model's prediction is
    # higher than it would be under random targeting
    assert cumgain['cate_p'].sum() > cumgain['Random'].sum()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/owen560/Python-causalml.git
git@gitee.com:owen560/Python-causalml.git
owen560
Python-causalml
Python-causalml
v0.12.0

搜索帮助