Core Operations
1. 🏝️ In the optimize function
(To run via CLI, type: python tagifai/main.py optimize)
1. Load the args json file into a Python dictionary, unpack it as keyword arguments, and pass it to the Namespace constructor (from the argparse library), which lets you access the elements of the dictionary using dot notation.
2. Set the optuna pruner to MedianPruner, which stops a trial if the trial's best intermediate result is worse than the median of intermediate results of previous trials at the same step.
3. Initiate an optuna study using the default study_name of "optimization".
4. Initiate an instance of MLflowCallback (from optuna.integration.mlflow) to track the relevant Optuna information.
5. Start optimization using study.optimize(), which calls train.objective num_trials times with trial (of type optuna.trial._trial.Trial) and the current args as input.
6. (in train.objective) Set the parameters to tune using, e.g., trial.suggest_loguniform("learning_rate", 1e-2, 1e0) to suggest values for this continuous parameter (see the sketch after this list).
7. Pass df, the new args, and trial to train.train().
8. (in train.train()) Use the new args to train; in each training epoch (from 1 to args.num_epochs = 100) use trial.report() to report val_loss and epoch, which are used to determine whether this trial should be pruned. Return the artifacts (args, label_encoder, vectorizer, model, and performance).
9. (in train.objective()) Use, e.g., trial.set_user_attr("precision", artifacts["performance"]["overall"]["precision"]) to set user attributes on the trial. Return f1.
10. (in main.optimize()) study.optimize(), via the MLflowCallback, creates (if it does not already exist) a new MLflow experiment with the name "optimization".
11. Go to Step 6 and continue to the next trial.
12. After num_trials is completed, merge the dictionary of study.best_trial.params into the starting args.__dict__ using args = {**args.__dict__, **study.best_trial.params}. The best trial's values override the starting args where keys share the same names.
13. Save the optimized args to the config/args.json file location.
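The train module itself is not reproduced in this section, so the following is only a minimal sketch, assuming the objective/report/prune flow described in Steps 6-9; the Optuna calls (suggest_loguniform, report, should_prune, set_user_attr) are real APIs, but the trainer internals are placeholders rather than the project's actual train.py.

# A minimal sketch (not the actual tagifai/train.py) of the Optuna hooks
# described in Steps 6-9; the trainer internals are placeholders.
from argparse import Namespace

import optuna
import pandas as pd


def objective(args: Namespace, df: pd.DataFrame, trial: optuna.trial.Trial) -> float:
    # Step 6: suggest values for the hyperparameters being tuned
    args.learning_rate = trial.suggest_loguniform("learning_rate", 1e-2, 1e0)

    # Step 7: pass df, the updated args, and the trial to the trainer
    artifacts = train(df=df, args=args, trial=trial)

    # Step 9: attach extra metrics as user attributes; return the value to maximize
    overall = artifacts["performance"]["overall"]
    trial.set_user_attr("precision", overall["precision"])
    trial.set_user_attr("f1", overall["f1"])
    return overall["f1"]


def train(df: pd.DataFrame, args: Namespace, trial: optuna.trial.Trial = None) -> dict:
    # Step 8: report val_loss each epoch so the MedianPruner can stop weak trials
    for epoch in range(1, args.num_epochs + 1):
        val_loss = 1.0 / epoch  # placeholder for the real validation loss
        if trial is not None:
            trial.report(val_loss, epoch)
            if trial.should_prune():
                raise optuna.TrialPruned()
    overall = {"precision": 0.0, "recall": 0.0, "f1": 1.0 - val_loss}  # placeholder metrics
    return {"args": args, "performance": {"overall": overall}}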
2. ⛺ In the LabelEncoder class
1. fit the raw labels to get a class instance with the attributes class_to_index and index_to_class (in the train.train module).
2. Then encode the raw labels for further training (in the train.train module).
3. After training is completed, save class_to_index to the MLflow folder stores/model/1/5457....0020/artifacts/label_encoder.json (in the main.train_model module). A minimal sketch of this fit/encode/save interface follows below.
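The data module is not listed in this section, so this is only a minimal sketch, assuming the fit / encode / save interface described above (plus a decode helper used at prediction time); it is not the project's actual data.LabelEncoder.

# A minimal sketch of data.LabelEncoder, assuming the fit/encode/save
# interface described above; not the project's actual implementation.
import json
from pathlib import Path
from typing import List


class LabelEncoder:
    def __init__(self, class_to_index: dict = None):
        self.class_to_index = class_to_index or {}
        self.index_to_class = {i: c for c, i in self.class_to_index.items()}

    def fit(self, labels: List[str]) -> "LabelEncoder":
        # Build class_to_index / index_to_class from the raw labels
        for label in sorted(set(labels)):
            self.class_to_index.setdefault(label, len(self.class_to_index))
        self.index_to_class = {i: c for c, i in self.class_to_index.items()}
        return self

    def encode(self, labels: List[str]) -> List[int]:
        # Map raw labels to integer indices for training
        return [self.class_to_index[label] for label in labels]

    def decode(self, indices: List[int]) -> List[str]:
        # Map integer indices back to raw labels
        return [self.index_to_class[i] for i in indices]

    def save(self, fp: Path) -> None:
        # Persist class_to_index as JSON (e.g., .../artifacts/label_encoder.json)
        with open(fp, "w") as f:
            json.dump({"class_to_index": self.class_to_index}, f, indent=2)

    @classmethod
    def load(cls, fp: Path) -> "LabelEncoder":
        # Rebuild an encoder from a saved class_to_index mapping
        with open(fp) as f:
            contents = json.load(f)
        return cls(class_to_index=contents["class_to_index"])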
3. ⛵ In the train_model function
(To run via CLI, type: python tagifai/main.py train-model)
1. Load the same raw data df.
2. Load the optimized args.json file as a Namespace object.
3. Set the MLflow experiment name to "baselines" using mlflow.set_experiment(). (The tracking URI has already been set by mlflow.set_tracking_uri() in config/config.py.)
4. Start an MLflow run with the run name = run_name (= "sgd").
5. Get run_id using mlflow.active_run().info.run_id.
6. (in train.train) Pass df and args to train(), where the train_loss and val_loss of each epoch are logged using mlflow.log_metrics(metrics: Dict[str, float], step: Optional[int]).
7. (in main.train_model) Get the artifacts dictionary back from train.train(), then log the additional metrics of overall precision, recall, and f1 using mlflow.log_metrics().
8. Save the parameters using mlflow.log_params(Dict[str, Any]) after converting the Namespace args to a dictionary with vars() (which returns the __dict__ attribute of the Namespace).
9. Use mlflow.log_artifacts(local_dir: str) to log all the contents of a local directory as artifacts of this run. Here we use with tempfile.TemporaryDirectory() as dp: to create a temporary directory in which to save the artifacts:
   - Use json.dump() to save the dictionary of args to a json file using the serialization encoder class NumpyEncoder (a sketch of what NumpyEncoder and the save_dict helper might look like appears after this list).
   - Use the save() method of a LabelEncoder class instance to save the class_to_index (key) dictionary to a json file.
   - Use joblib.dump() on the vectorizer (a TfidfVectorizer instance) and the model (an SGDClassifier instance) to save them as .pkl files.
   - Use json.dump() to save the performance dictionary to a json file.
   - Finally, use mlflow.log_artifacts(dp) to log all the contents of the temporary directory dp.
10. If not a test run:
   - Use Path(config.CONFIG_DIR, "run_id.txt") to create a platform-independent file path and open it in write mode, then use the write() method of open(Path(...), "w") to write run_id to run_id.txt.
   - Use json.dump() to save the performance dictionary to performance.json.
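utils.py is not reproduced here, so the following is only a guess, assuming NumpyEncoder is a json.JSONEncoder subclass that converts NumPy types and save_dict/load_dict are thin json wrappers; treat the exact signatures as assumptions rather than the project's actual code.

# A hedged sketch (assumed, not the project's actual utils module) of the
# NumpyEncoder / save_dict helpers referenced above.
import json
from pathlib import Path
from typing import Dict

import numpy as np


class NumpyEncoder(json.JSONEncoder):
    """Serialize NumPy scalars/arrays that json.dump cannot handle natively."""

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)


def save_dict(d: Dict, filepath: Path, cls=None) -> None:
    """Save a dictionary as JSON, optionally with a custom encoder class."""
    with open(filepath, "w") as f:
        json.dump(d, f, indent=2, cls=cls)


def load_dict(filepath: Path) -> Dict:
    """Load a JSON file back into a dictionary."""
    with open(filepath) as f:
        return json.load(f)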
elt_data()
Extract, load and transform our data assets.
Source code in tagifai/main.py
@app.command()
def elt_data():
    """Extract, load and transform our data assets."""
    # Extract + Load
    projects = pd.read_csv(config.PROJECTS_URL)
    tags = pd.read_csv(config.TAGS_URL)
    projects.to_csv(Path(config.DATA_DIR, "projects.csv"), index=False)
    tags.to_csv(Path(config.DATA_DIR, "tags.csv"), index=False)

    # Transform
    df = pd.merge(projects, tags, on="id")
    df = df[df.tag.notnull()]  # drop rows w/ no tag
    df.to_csv(Path(config.DATA_DIR, "labeled_projects.csv"), index=False)
    # logger.info("✅ Saved data!")
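Like optimize and train-model above, this command should be reachable from the CLI as python tagifai/main.py elt-data (Typer converts underscores in command names to dashes), although that exact invocation is inferred rather than shown in this section.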
load_artifacts(run_id=None)
Load artifacts for a given run_id.
Source code in tagifai/main.py
def load_artifacts(run_id: str = None) -> Dict:
    """Load artifacts for a given run_id.
    Args:
        run_id (str): id of run to load artifacts from.
    Returns:
        Dict: run's artifacts.
    """
    if not run_id:
        run_id = open(Path(config.CONFIG_DIR, "run_id.txt")).read()

    # Locate specific artifacts directory
    experiment_id = mlflow.get_run(run_id=run_id).info.experiment_id
    artifacts_dir = Path(config.MODEL_REGISTRY, experiment_id, run_id, "artifacts")

    # Load objects from run
    args = Namespace(**utils.load_dict(filepath=Path(artifacts_dir, "args.json")))
    vectorizer = joblib.load(Path(artifacts_dir, "vectorizer.pkl"))
    label_encoder = data.LabelEncoder.load(fp=Path(artifacts_dir, "label_encoder.json"))
    model = joblib.load(Path(artifacts_dir, "model.pkl"))
    performance = utils.load_dict(filepath=Path(artifacts_dir, "performance.json"))

    return {
        "args": args,
        "label_encoder": label_encoder,
        "vectorizer": vectorizer,
        "model": model,
        "performance": performance,
    }
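A quick usage sketch for load_artifacts, mirroring what predict_tag does below; the import paths (config.config, tagifai.main) are assumed from the project layout rather than shown in this section.

# Usage sketch (import paths assumed): load the latest run's artifacts.
from pathlib import Path

from config import config
from tagifai import main

run_id = open(Path(config.CONFIG_DIR, "run_id.txt")).read()
artifacts = main.load_artifacts(run_id=run_id)
print(artifacts["performance"]["overall"])  # overall precision/recall/f1
print(artifacts["args"])                    # Namespace of training args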
optimize(args_fp='config/args.json', study_name='optimization', num_trials=20)
Optimize hyperparameters.
Source code in tagifai/main.py
@app.command()
def optimize(
    args_fp: str = "config/args.json", study_name: str = "optimization", num_trials: int = 20
) -> None:
    """Optimize hyperparameters.
    Args:
        args_fp (str): location of args.
        study_name (str): name of optimization study.
        num_trials (int): number of trials to run in study.
    """
    # Load labeled data
    df = pd.read_csv(Path(config.DATA_DIR, "labeled_projects.csv"))

    # Optimize
    args = Namespace(**utils.load_dict(filepath=args_fp))
    pruner = optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=5)
    study = optuna.create_study(study_name=study_name, direction="maximize", pruner=pruner)
    mlflow_callback = MLflowCallback(tracking_uri=mlflow.get_tracking_uri(), metric_name="f1")
    logger.info(f"\nTracking_uri: {mlflow.get_tracking_uri()}")
    study.optimize(
        lambda trial: train.objective(args, df, trial),
        n_trials=num_trials,
        callbacks=[mlflow_callback],
    )

    # Best trial
    trials_df = study.trials_dataframe()
    trials_df = trials_df.sort_values(["user_attrs_f1"], ascending=False)
    logger.info(f"\nargs.__dict__: {args.__dict__}")
    logger.info(f"study.best_trial.params: {study.best_trial.params}")
    args = {**args.__dict__, **study.best_trial.params}
    logger.info(f"arg: {args}")
    utils.save_dict(d=args, filepath=args_fp, cls=NumpyEncoder)
    logger.info(f"\nBest value (f1): {study.best_trial.value}")
    logger.info(f"Best hyperparameters: {json.dumps(study.best_trial.params, indent=2)}")
predict_tag(text='', run_id=None)
Predict tag for text.
Source code in tagifai/main.py
@app.command()
def predict_tag(text: str = "", run_id: str = None) -> None:
    """Predict tag for text.
    Args:
        text (str): input text to predict label for.
        run_id (str, optional): run id to load artifacts for prediction. Defaults to None.
    """
    if not run_id:
        run_id = open(Path(config.CONFIG_DIR, "run_id.txt")).read()
    artifacts = load_artifacts(run_id=run_id)
    prediction = predict.predict(texts=[text], artifacts=artifacts)
    logger.info(json.dumps(prediction, indent=2))
    return prediction
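predict.py is not listed in this section, so this is only a rough sketch, assuming predict.predict vectorizes the texts, predicts with the model, and decodes the indices with the label encoder; the output keys are placeholders, not the project's actual ones.

# A hedged sketch (not the actual tagifai/predict.py) of predict.predict.
from typing import Dict, List


def predict(texts: List[str], artifacts: Dict) -> List[Dict]:
    """Vectorize texts, predict with the trained model, decode label indices."""
    x = artifacts["vectorizer"].transform(texts)      # TfidfVectorizer
    y_pred = artifacts["model"].predict(x)            # SGDClassifier
    tags = artifacts["label_encoder"].decode(y_pred)  # index -> class name
    return [
        {"input_text": text, "predicted_tag": tag}
        for text, tag in zip(texts, tags)
    ]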
train_model(args_fp='config/args.json', experiment_name='baselines', run_name='sgd', test_run=False)
Train a model given arguments.
Source code in tagifai/main.py
@app.command()
def train_model(
    args_fp: str = "config/args.json",
    experiment_name: str = "baselines",
    run_name: str = "sgd",
    test_run: bool = False,
) -> None:
    """Train a model given arguments.
    Args:
        args_fp (str): location of args.
        experiment_name (str): name of experiment.
        run_name (str): name of specific run in experiment.
        test_run (bool, optional): If True, artifacts will not be saved. Defaults to False.
    """
    logger.info(f"\nargs_fp: {args_fp}")
    logger.info(f"experiment_name: {experiment_name}")
    logger.info(f"run_name: {run_name}")
    logger.info(f"test_run: {test_run}")

    # Load labeled data
    df = pd.read_csv(Path(config.DATA_DIR, "labeled_projects.csv"))

    # Train
    args = Namespace(**utils.load_dict(filepath=args_fp))
    mlflow.set_experiment(experiment_name=experiment_name)
    with mlflow.start_run(run_name=run_name):
        run_id = mlflow.active_run().info.run_id
        logger.info(f"artifact_uri: {mlflow.get_artifact_uri()}")
        logger.info(f"tracking_uri: {mlflow.get_tracking_uri()}")
        logger.info(f"Run ID: {run_id}")
        artifacts = train.train(df=df, args=args)
        performance = artifacts["performance"]
        logger.info(json.dumps(performance, indent=2))

        # Log metrics and parameters
        # performance = artifacts["performance"]
        mlflow.log_metrics({"precision": performance["overall"]["precision"]})
        mlflow.log_metrics({"recall": performance["overall"]["recall"]})
        mlflow.log_metrics({"f1": performance["overall"]["f1"]})
        mlflow.log_params(vars(artifacts["args"]))

        # Log artifacts
        with tempfile.TemporaryDirectory() as dp:
            utils.save_dict(vars(artifacts["args"]), Path(dp, "args.json"), cls=NumpyEncoder)
            artifacts["label_encoder"].save(Path(dp, "label_encoder.json"))
            joblib.dump(artifacts["vectorizer"], Path(dp, "vectorizer.pkl"))
            joblib.dump(artifacts["model"], Path(dp, "model.pkl"))
            utils.save_dict(performance, Path(dp, "performance.json"))
            logger.info(f"artifact_uri: {mlflow.get_artifact_uri()}")
            logger.info(f"tracking_uri: {mlflow.get_tracking_uri()}")
            mlflow.log_artifacts(dp)

    # Save to config
    if not test_run:  # pragma: no cover, actual run
        open(Path(config.CONFIG_DIR, "run_id.txt"), "w").write(run_id)
        utils.save_dict(performance, Path(config.CONFIG_DIR, "performance.json"))
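For reference, Typer turns the keyword arguments above into CLI options, so a run can be started with something like python tagifai/main.py train-model --args-fp config/args.json --experiment-name baselines --run-name sgd (the option spellings are inferred from the parameter names, not shown in this section).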