import autocuda
import sklearn
import torch
from pyabsa.framework.checkpoint_class.checkpoint_template import CheckpointManager
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm
import numpy as np
from transformers import (
DataCollatorForSeq2Seq,
AutoTokenizer,
AutoModelForSeq2SeqLM,
T5ForConditionalGeneration,
Seq2SeqTrainingArguments,
Trainer,
Seq2SeqTrainer,
)
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from .instruction import (
ATEInstruction,
APCInstruction,
OpinionInstruction,
CategoryInstruction,
)
class T5Generator:
    """Seq2seq wrapper for instruction-based aspect/opinion/sentiment/category extraction.

    The class loads a tokenizer and an ``AutoModelForSeq2SeqLM`` from a
    checkpoint and offers helpers to fine-tune the model, run a four-step
    inference pipeline (aspect, sentiment, opinion, category) and score
    predictions against ground-truth strings.
    """

    def __init__(self, checkpoint):
        """Load tokenizer, model and data collator, then move the model to a device.

        Args:
            checkpoint: Checkpoint identifier. It is first handed to
                ``CheckpointManager().parse_checkpoint(checkpoint, "ACOS")``;
                if that call raises, the error is printed and the original
                value is passed unchanged to ``from_pretrained``.
        """
        try:
            checkpoint = CheckpointManager().parse_checkpoint(checkpoint, "ACOS")
        except Exception as e:
            # Deliberate best-effort: fall back to using the raw argument
            # (e.g. a local path or hub name) when it cannot be resolved.
            print(e)

        self.tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
        # Presumably the default cap on generated sequence length -- confirm
        # against the transformers version in use.
        self.model.config.max_length = 128
        self.data_collator = DataCollatorForSeq2Seq(self.tokenizer)
        self.device = autocuda.auto_cuda()
        self.model.to(self.device)

    def train(self, tokenized_datasets, **kwargs):
        """Fine-tune the model with a ``Seq2SeqTrainer`` and save it.

        Args:
            tokenized_datasets: Mapping with a ``"train"`` split and an
                optional ``"test"`` split that is used for evaluation when
                present and not ``None``.
            **kwargs: Forwarded verbatim to ``Seq2SeqTrainingArguments``.

        Returns:
            The ``Seq2SeqTrainer`` instance after training and saving.
        """
        args = Seq2SeqTrainingArguments(**kwargs)
        trainer = Seq2SeqTrainer(
            self.model,
            args,
            train_dataset=tokenized_datasets["train"],
            # ``.get`` yields None when the split is missing or set to None.
            eval_dataset=tokenized_datasets.get("test"),
            tokenizer=self.tokenizer,
            data_collator=self.data_collator,
        )
        print("Trainer device:", trainer.args.device)

        torch.cuda.empty_cache()
        print("\nModel training started ....")
        trainer.train()

        trainer.save_model()
        return trainer

    @staticmethod
    def _split_items(generated):
        """Split a ``|``-separated model output into stripped items."""
        return [item.strip() for item in generated.split("|")]

    def _generate_text(self, prompt, **kwargs):
        """Tokenize ``prompt``, run ``generate`` and return the first decoded sequence."""
        inputs = self.tokenizer(prompt, truncation=True, return_tensors="pt").to(
            self.device
        )
        output_ids = self.model.generate(**inputs, **kwargs)
        return self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)[0]

    def predict(self, text, **kwargs):
        """Run the four-step inference pipeline on a single text.

        Aspects are generated first; the raw aspect output is then fed into
        the sentiment, opinion and category prompts.

        Args:
            text: Input sentence.
            **kwargs: Forwarded to every ``model.generate`` call.

        Returns:
            A dict with ``"text"`` and ``"Quadruples"``. Each quadruple has the
            keys ``"Aspect"``, ``"Sentiment"``, ``"Opinion"`` and
            ``"Category"``; the last three hold the text after the first
            ``":"`` of the corresponding generated item. The four item lists
            are zipped, so the result is as long as the shortest of them.
        """
        ate_outputs = self._generate_text(
            ATEInstruction().prepare_input(text), **kwargs
        )
        result = {"text": text, "aspect": self._split_items(ate_outputs)}

        # The remaining steps share one shape: prompt built from the text and
        # the raw aspect output, then generate and split on "|".
        followups = (
            ("sentiment", APCInstruction()),
            ("opinion", OpinionInstruction()),
            ("category", CategoryInstruction()),
        )
        for key, instructor in followups:
            generated = self._generate_text(
                instructor.prepare_input(text, ate_outputs), **kwargs
            )
            result[key] = self._split_items(generated)

        quadruples = [
            {
                "Aspect": asp,
                "Sentiment": sent.partition(":")[2],
                "Opinion": op.partition(":")[2],
                "Category": cat.partition(":")[2],
            }
            for asp, sent, op, cat in zip(
                result["aspect"],
                result["sentiment"],
                result["opinion"],
                result["category"],
            )
        ]
        return {"text": text, "Quadruples": quadruples}

    def get_labels(
        self,
        tokenized_dataset,
        trained_model_path=None,
        predictor=None,
        batch_size=4,
        sample_set="train",
    ):
        """Generate decoded predictions for one split of a tokenized dataset.

        Args:
            tokenized_dataset: Mapping from split name to a dataset whose
                examples expose ``"input_ids"``.
            trained_model_path: Unused; kept so existing callers keep working.
            predictor: Optional object with a ``predict(test_dataset=...)``
                method whose result has a ``predictions`` attribute. When
                falsy, the wrapped model generates directly.
            batch_size: Batch size for direct generation.
            sample_set: Name of the split to predict on.

        Returns:
            A list of decoded output strings.
        """
        if predictor:
            print("Prediction from trainer")
            output_ids = predictor.predict(
                test_dataset=tokenized_dataset[sample_set]
            ).predictions
            return self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)

        print("Prediction from checkpoint")

        def collate_fn(batch):
            # Right-pad every example in the batch to a common length.
            sequences = [torch.tensor(example["input_ids"]) for example in batch]
            return pad_sequence(
                sequences,
                batch_first=True,
                padding_value=self.tokenizer.pad_token_id,
            )

        dataloader = DataLoader(
            tokenized_dataset[sample_set],
            batch_size=batch_size,
            collate_fn=collate_fn,
        )
        predicted_output = []
        self.model.to(self.device)
        print("Model loaded to: ", self.device)
        for batch in tqdm(dataloader):
            output_ids = self.model.generate(batch.to(self.device))
            predicted_output.extend(
                self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)
            )
        return predicted_output

    def get_aspect_metrics(self, true_aspects, pred_aspects):
        """Return macro-averaged ``(precision, recall, f1)`` for two label lists.

        Empty input yields ``(0.0, 0.0, 0.0)`` instead of being handed to
        scikit-learn, so callers get a defined score when nothing was paired.
        """
        if not true_aspects:
            return 0.0, 0.0, 0.0
        aspect_p = precision_score(true_aspects, pred_aspects, average="macro")
        aspect_r = recall_score(true_aspects, pred_aspects, average="macro")
        aspect_f1 = f1_score(true_aspects, pred_aspects, average="macro")
        return aspect_p, aspect_r, aspect_f1

    def get_classic_metrics(self, y_true, y_pred):
        """Score ``key:label`` items of predictions against ground truth.

        Both strings of each pair are split on ``"|"``; items are stripped and
        lower-cased and divided at the first ``":"`` into key and label.
        Ground-truth items are visited from last to first. For each one, the
        first predicted item whose key is a prefix of the ground-truth key is
        selected; if the ground-truth label is non-empty the two labels are
        recorded (a missing predicted label is recorded as ``""``).
        Ground-truth items without any matching prediction are not recorded.

        A classification report for the recorded labels is printed.

        Returns:
            Dict with ``"accuracy"`` and macro-averaged ``"precision"``,
            ``"recall"`` and ``"f1"``.
        """
        valid_gts = []
        valid_preds = []
        for gt, pred in zip(y_true, y_pred):
            pred_items = [item.strip().lower() for item in pred.split("|")]
            for gt_item in reversed(gt.split("|")):
                gt_key, _, gt_label = gt_item.strip().lower().partition(":")
                for pred_item in pred_items:
                    pred_key, _, pred_label = pred_item.partition(":")
                    if not gt_key.startswith(pred_key):
                        continue
                    if gt_label:
                        valid_gts.append(gt_label)
                        # ``pred_label`` is "" when the prediction has no label.
                        valid_preds.append(pred_label)
                    # Only the first matching prediction is considered.
                    break

        report = sklearn.metrics.classification_report(valid_gts, valid_preds)
        print(report)
        accuracy = sklearn.metrics.accuracy_score(valid_gts, valid_preds)
        precision = precision_score(valid_gts, valid_preds, average="macro")
        recall = recall_score(valid_gts, valid_preds, average="macro")
        f1 = f1_score(valid_gts, valid_preds, average="macro")
        return {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1": f1,
        }

    @staticmethod
    def _collect_field_pairs(y_true, y_pred, n_fields=4):
        """Pair position ``i`` of aligned ground-truth/predicted items.

        Each string is cut at ``"<EndOfAnswer>"`` and split on ``","``; items
        are aligned with ``zip``. For every position ``i < n_fields`` that the
        ground-truth item has, its element is recorded together with the
        predicted element at the same position, or ``"NULL"`` when the
        predicted item is too short. Positions missing from the ground truth
        are skipped so both lists always have the same length.

        Returns:
            ``(true_fields, pred_fields)``: two lists of ``n_fields`` lists.
        """
        true_fields = [[] for _ in range(n_fields)]
        pred_fields = [[] for _ in range(n_fields)]
        for gt, pred in zip(y_true, y_pred):
            gt_items = gt.partition("<EndOfAnswer>")[0].split(",")
            pred_items = pred.partition("<EndOfAnswer>")[0].split(",")
            for gt_val, pred_val in zip(gt_items, pred_items):
                for idx in range(n_fields):
                    if idx >= len(gt_val):
                        # Nothing to score against at this position.
                        continue
                    true_fields[idx].append(gt_val[idx])
                    pred_fields[idx].append(
                        pred_val[idx] if idx < len(pred_val) else "NULL"
                    )
        return true_fields, pred_fields

    def get_metrics(self, y_true, y_pred):
        """Compute macro precision/recall/F1 per field plus their averages.

        NOTE(review): items are plain strings after ``split(",")``, so index
        0..3 selects the *characters* at those positions of each item, not
        four separate fields -- confirm that this is the intended format.

        Args:
            y_true: Ground-truth strings.
            y_pred: Predicted strings, aligned with ``y_true``.

        Returns:
            Dict with ``avg_precision``, ``avg_recall``, ``avg_f1`` and
            ``<field>_p`` / ``<field>_r`` / ``<field>_f1`` for the fields
            ``aspect``, ``opinion``, ``sentiment`` and ``category``.
        """
        # Position 0 -> aspect, 1 -> opinion, 2 -> sentiment, 3 -> category.
        field_names = ("aspect", "opinion", "sentiment", "category")
        true_fields, pred_fields = self._collect_field_pairs(
            y_true, y_pred, len(field_names)
        )
        scores = {
            name: self.get_aspect_metrics(true_vals, pred_vals)
            for name, true_vals, pred_vals in zip(
                field_names, true_fields, pred_fields
            )
        }

        metrics = {
            "avg_precision": sum(scores[name][0] for name in field_names) / 4,
            "avg_recall": sum(scores[name][1] for name in field_names) / 4,
            "avg_f1": sum(scores[name][2] for name in field_names) / 4,
        }
        for name in field_names:
            precision, recall, f1 = scores[name]
            metrics[name + "_p"] = precision
            metrics[name + "_r"] = recall
            metrics[name + "_f1"] = f1
        return metrics
class T5Classifier:
    """Wrapper around ``T5ForConditionalGeneration`` used as a text classifier.

    Loads tokenizer and model from a checkpoint and offers helpers to
    fine-tune the model, produce decoded predictions and compute exact-match
    accuracy.
    """

    def __init__(self, model_checkpoint):
        """Load tokenizer, model and data collator and pick a device.

        Args:
            model_checkpoint: Identifier passed to ``from_pretrained``. Both
                tokenizer and model are loaded with ``force_download=True``.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_checkpoint, force_download=True
        )
        self.model = T5ForConditionalGeneration.from_pretrained(
            model_checkpoint, force_download=True
        )
        self.data_collator = DataCollatorForSeq2Seq(self.tokenizer)
        # ``get_labels`` reads ``self.device``; it has to exist on every instance.
        self.device = autocuda.auto_cuda()

    def train(self, tokenized_datasets, **kwargs):
        """Fine-tune the model with a ``Trainer`` and save it.

        Args:
            tokenized_datasets: Mapping with a ``"train"`` split and an
                optional ``"test"`` split that is used for evaluation when
                present and not ``None``.
            **kwargs: Forwarded verbatim to ``Seq2SeqTrainingArguments``.

        Returns:
            The ``Trainer`` instance after training and saving.
        """
        args = Seq2SeqTrainingArguments(**kwargs)
        trainer = Trainer(
            self.model,
            args,
            train_dataset=tokenized_datasets["train"],
            # ``.get`` yields None when the split is missing or set to None.
            eval_dataset=tokenized_datasets.get("test"),
            tokenizer=self.tokenizer,
            data_collator=self.data_collator,
        )
        print("Trainer device:", trainer.args.device)

        torch.cuda.empty_cache()
        print("\nModel training started ....")
        trainer.train()

        trainer.save_model()
        return trainer

    def get_labels(
        self, tokenized_dataset, predictor=None, batch_size=4, sample_set="train"
    ):
        """Produce decoded predictions for one split of a tokenized dataset.

        Args:
            tokenized_dataset: Mapping from split name to a dataset whose
                examples expose ``"input_ids"``.
            predictor: Optional object with a ``predict(test_dataset=...)``
                method. When given, element 0 of its ``predictions`` is
                arg-maxed over axis 2 and decoded. When falsy, the wrapped
                model generates directly.
            batch_size: Batch size for direct generation.
            sample_set: Name of the split to predict on.

        Returns:
            A list of decoded output strings.
        """
        if predictor:
            print("Prediction from trainer")
            pred_proba = predictor.predict(
                test_dataset=tokenized_dataset[sample_set]
            ).predictions[0]
            output_ids = np.argmax(pred_proba, axis=2)
            return self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)

        print("Prediction from checkpoint")

        def collate_fn(batch):
            # Right-pad every example in the batch to a common length.
            sequences = [torch.tensor(example["input_ids"]) for example in batch]
            return pad_sequence(
                sequences,
                batch_first=True,
                padding_value=self.tokenizer.pad_token_id,
            )

        dataloader = DataLoader(
            tokenized_dataset[sample_set],
            batch_size=batch_size,
            collate_fn=collate_fn,
        )
        predicted_output = []
        self.model.to(self.device)
        print("Model loaded to: ", self.device)
        for batch in tqdm(dataloader):
            # Call ``generate`` on the model itself (not on the ``to`` method).
            output_ids = self.model.generate(batch.to(self.device))
            predicted_output.extend(
                self.tokenizer.batch_decode(output_ids, skip_special_tokens=True)
            )
        return predicted_output

    def get_metrics(self, y_true, y_pred):
        """Return exact-match accuracy of ``y_pred`` against ``y_true``.

        Labels are compared pairwise with ``==`` and the number of matches is
        divided by ``len(y_true)``. Empty ``y_true`` yields ``0.0``.
        """
        if not y_true:
            return 0.0
        # Pair the sequences element-wise; iterating ``y_true, y_pred``
        # directly would walk the 2-tuple of lists instead.
        matches = sum(1 for gt, pred in zip(y_true, y_pred) if gt == pred)
        return matches / len(y_true)
class ABSAGenerator(T5Generator):
    """Subclass of ``T5Generator`` that adds no attributes or methods of its own."""