Source code for SciAssist.utils.data_utils

from typing import List, Dict

import nltk
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer
from transformers import DataCollatorForSeq2Seq

from SciAssist import BASE_CACHE_DIR
from SciAssist.datamodules.components.cora_label import label2id as cora_label2id
from SciAssist.models.components.bart_summarization import BartForSummarization
from SciAssist.models.components.flant5_summarization import FlanT5ForSummarization


class MyDatasetExtraction(Dataset):
    def __init__(self, tokenizer, dataset, token_pad_idx = 0, tag_pad_idx = -1):
        self.batch_size = 32
        self.max_len = 128
        self.token_pad_idx = token_pad_idx
        self.tag_pad_idx = tag_pad_idx
        self.tag2idx = {'B-DATA': 0, 'I-DATA': 1, 'O': 2}
        self.idx2tag = {0: 'B-DATA', 1: 'I-DATA', 2: 'O'}
        self.tokenizer = tokenizer
        self.dataset = self.preprocess(dataset)

     
    def __len__(self):
        """get dataset size"""
        return self.dataset['size']
 

    def __getitem__(self, idx):
        """sample data to get batch"""
        sentences = self.dataset['data'][idx]

        if 'labels' in self.dataset.keys():
            labels = self.dataset['labels'][idx]
            tags = self.dataset['tags'][idx]
            return [sentences, labels, tags]
        else:
            return [sentences]


    def preprocess(self, dataset):
        """Loads sentences and tags from their corresponding files. 
            Maps tokens and tags to their indices and stores them in the provided dict d.
        """
        processed_dataset = {}
        sentences = []
        if 'labels' in dataset.keys():
            labels = []
            tags = []

        for line in dataset['data']['text']:
            # replace each token by its index
            tokens = line.strip().split(' ')
            # print(tokens)
            subwords = list(map(self.tokenizer.tokenize, tokens)) # 每个词切分成子词
            # print(subwords)
            subword_lengths = list(map(len, subwords)) # 记录子词的长度,用于对齐tag
            # print(subword_lengths)
            subwords = ['[CLS]'] + [item for indices in subwords for item in indices]
            # subwords = ['<s>'] + [item for indices in subwords for item in indices] # 组成输入 token
            # print(subwords)
            token_start_idxs = 1 + np.cumsum([0] + subword_lengths[:-1]) # 记录每个token开始的位置
            # print(token_start_idxs)
            # print(self.tokenizer.convert_tokens_to_ids(subwords), token_start_idxs)
            sentences.append((self.tokenizer.convert_tokens_to_ids(subwords), token_start_idxs))

        if 'labels' in dataset.keys():
            for line in dataset['labels']['text']:
                labels.append(int(line.strip()))
            assert len(sentences) == len(labels)
            processed_dataset['labels'] = labels

            for line in dataset['tags']['text']:
                # replace each tag by its index
                tag_seq = [self.tag2idx.get(tag) for tag in line.strip().split(' ')]
                tags.append(tag_seq)
            # checks to ensure there is a tag for each token
            assert len(sentences) == len(tags)
            for i in range(len(sentences)):
                assert len(tags[i]) == len(sentences[i][-1])
            processed_dataset['tags'] = tags

        processed_dataset['data'] = sentences
        processed_dataset['size'] = len(sentences)

        return processed_dataset


    def collate_fn(self, batch):
        sentences = [x[0] for x in batch]
        processed_batch = {}
        if len(batch[0]) == 3:
            labels = [x[1] for x in batch]
            tags = [x[2] for x in batch]

        # batch length
        batch_len = len(sentences)  # batch size
        batch_max_subwords_len = max([len(s[0]) for s in sentences])
        max_subword_len = min(batch_max_subwords_len, self.max_len)
        max_token_len = 0
 
        # padding data 初始化
        batch_data = self.token_pad_idx * np.ones((batch_len, max_subword_len))
        batch_token_starts = []
 
        # padding and aligning
        for j in range(batch_len):
            cur_subwords_len = len(sentences[j][0])  # word_id list
            if cur_subwords_len <= max_subword_len:
                batch_data[j][:cur_subwords_len] = sentences[j][0]
            else:
                batch_data[j] = sentences[j][0][:max_subword_len]
            token_start_ids = sentences[j][-1]
            token_starts = np.zeros(max_subword_len)
            token_starts[[idx for idx in token_start_ids if idx < max_subword_len]] = 1
            batch_token_starts.append(token_starts)
            max_token_len = max(int(sum(token_starts)), max_token_len)

        processed_batch['input_subwords'] = torch.tensor(batch_data, dtype = torch.long)
        processed_batch['input_token_start_indexs'] = torch.tensor(np.array(batch_token_starts), dtype = torch.long)
        # processed_batch['attention_mask'] = (processed_batch['input_subwords'] != 1)
        processed_batch['attention_mask'] = processed_batch['input_subwords'].gt(0)

        if len(batch[0]) == 3:
            batch_tags = self.tag_pad_idx * np.ones((batch_len, max_token_len))
            batch_labels = np.ones((batch_len, ))
            for j in range(batch_len):
                batch_labels[j] = labels[j]
                cur_tags_len = len(tags[j])
                if cur_tags_len <= max_token_len:
                    batch_tags[j][:cur_tags_len] = tags[j]
                else:
                    batch_tags[j] = tags[j][:max_token_len]
            processed_batch['ner_tags'] = torch.tensor(np.array(batch_tags), dtype = torch.long)
            processed_batch['cls_labels'] = torch.tensor(np.array(batch_labels), dtype = torch.long)

        return processed_batch


class DataUtilsForDatasetExtraction():
    """

    Args:
        tokenizer (`PretrainedTokenizer`, default to None):
            The tokenizer for tokenization.
        checkpoint (`str`):
            The checkpoint from which the tokenizer is loaded.
        model_max_length (`int`, *optional*): The max sequence length the model accepts.
        max_source_length (`int`, *optional*): The max length of the input text.
        max_target_length (`int`, *optional*): The max length of the generated summary.
    """
    def __init__(self, tokenizer = None,
                 checkpoint = "allenai/scibert_scivocab_uncased",
                 model_max_length = 128
                 ):

        self.checkpoint = checkpoint
        self.model_max_length = model_max_length

        if tokenizer is None:
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.checkpoint,
                model_max_length = self.model_max_length,
                cache_dir=BASE_CACHE_DIR,
                use_fast=True
            )
        else:
            self.tokenizer = tokenizer

        self.tag2idx = {'B-DATA': 0, 'I-DATA': 1, 'O': 2}
        self.idx2tag = {0: 'B-DATA', 1: 'I-DATA', 2: 'O'}


    def tokenize_and_align_labels(self, dataset):

        """

        Process the dataset for model input, for example, do tokenization and prepare label_ids.

        Args:
            dataset (`Dataset`): { "text": [s1, s2, ...], "summary": [l1, l2, ...]}
            inputs (`str`): The name of input column
            labels (`str`): The name of target column

        Returns:
            `Dict`: {"input_ids": input_ids, "attention_mask": attention_mask, "labels": label_ids }

        """
        
        """Loads sentences and tags from their corresponding files.
            Maps tokens and tags to their indices and stores them in the provided dict d.
        """
        processed_dataset = MyDatasetExtraction(self.tokenizer, dataset)

        return processed_dataset


    def postprocess(self, ner_output, cls_output, batch_tags, batch_labels):

        """
        Process model's outputs and get the final results rather than simple ids.

        Args:
            preds (Tensor): Prediction labels, the output of the model.
            labels (Tensor): True labels

        Returns:
            `(LongTensor, LongTensor)`: decoded_preds, decoded_labels

        """
        pred_tags = []
        true_tags = []
        pred_labels = []
        true_labels = []

        ner_output = ner_output.detach().cpu().numpy()
        cls_output = cls_output.detach().cpu().numpy()
        batch_tags = batch_tags.to('cpu').numpy()
        batch_labels = batch_labels.to('cpu').numpy()

        pred_tags.extend([[self.idx2tag.get(idx) for idx in indices] for indices in np.argmax(ner_output, axis=2)])
        true_tags.extend([[self.idx2tag.get(idx) if idx != -1 else 'O' for idx in indices] for indices in batch_tags])
        true_labels.extend(batch_labels)

        pred_labels.extend(cls_output)
        pred_labels = np.argmax(pred_labels, axis=1)

        assert len(pred_tags) == len(true_tags)

        return pred_tags, true_tags, pred_labels, true_labels


    def get_dataloader(self, dataset):

        """
        Generate DataLoader for a dataset.

        Args:
            dataset (`Dataset`): The raw dataset.
            inputs_column (`str`): Column name of the inputs.
            labels_column (`str`): Column name of the labels.

        Returns:
            `DataLoader`: A dataloader for the dataset. Will be used for inference.
        """
        tokenized_dataset = self.tokenize_and_align_labels(dataset)

        dataloader = DataLoader(
            dataset=tokenized_dataset,
            batch_size=32,
            collate_fn=tokenized_dataset.collate_fn,
        )

        return dataloader


[docs]class DataUtilsForSeq2Seq(): """ Args: tokenizer (`PretrainedTokenizer`, default to None): The tokenizer for tokenization. checkpoint (`str`): The checkpoint from which the tokenizer is loaded. model_max_length (`int`, *optional*): The max sequence length the model accepts. max_source_length (`int`, *optional*): The max length of the input text. max_target_length (`int`, *optional*): The max length of the generated summary. """ def __init__(self, tokenizer = None, model_class = BartForSummarization, checkpoint = "facebook/bart-large-cnn", model_max_length = 1024, max_source_length = 1024, max_target_length = 128, ): self.checkpoint = checkpoint self.model_max_length = model_max_length self.max_source_length = max_source_length self.max_target_length = max_target_length self.model_class = model_class if tokenizer is None: self.tokenizer = AutoTokenizer.from_pretrained( self.checkpoint, model_max_length = self.model_max_length, cache_dir=BASE_CACHE_DIR, use_fast=True ) else: self.tokenizer = tokenizer
[docs] def tokenize_and_align_labels(self, examples, inputs_column="text", labels_column="summary"): """ Process the dataset for model input, for example, do tokenization and prepare label_ids. Args: examples (`Dataset`): { "text": [s1, s2, ...], "summary": [l1, l2, ...]} inputs (`str`): The name of input column labels (`str`): The name of target column Returns: `Dict`: {"input_ids": input_ids, "attention_mask": attention_mask, "labels": label_ids } """ # Select input column inputs = examples[inputs_column] # Setup the tokenizer for inputs model_inputs = self.tokenizer(inputs, max_length=self.max_source_length, padding="max_length", truncation=True) # Select target column if labels_column in examples.keys(): labels = examples[labels_column] # Setup the tokenizer for targets with self.tokenizer.as_target_tokenizer(): labels = self.tokenizer(labels, max_length=self.max_target_length, padding="max_length", truncation=True) # Ignore padding in the loss labels["input_ids"] = [ [(l if l != self.tokenizer.pad_token_id else -100) for l in label] for label in labels["input_ids"] ] model_inputs["labels"] = labels["input_ids"] return model_inputs
[docs] def collator(self): """ The collating function. Returns: `function`: A collating function. For example, **DataCollatorForSeq2Seq(...)**. You can also custom a collating function, but remember that `collator()` needs to return a **function**. """ return DataCollatorForSeq2Seq(self.tokenizer, model=self.model_class, pad_to_multiple_of=8)
[docs] def postprocess(self, preds, labels): """ Process model's outputs and get the final results rather than simple ids. Args: preds (Tensor): Prediction labels, the output of the model. labels (Tensor): True labels Returns: `(LongTensor, LongTensor)`: decoded_preds, decoded_labels """ decoded_preds = self.tokenizer.batch_decode(preds, skip_special_tokens=True) labels = np.array(labels.to("cpu")) # Replace -100 in the labels as we can't decode them. labels = np.where(labels != -100, labels, self.tokenizer.pad_token_id) decoded_labels = self.tokenizer.batch_decode(labels, skip_special_tokens=True) decoded_preds = [pred.strip() for pred in decoded_preds] decoded_labels = [label.strip() for label in decoded_labels] # rougeLSum expects newline after each sentence decoded_preds = ["\n".join(nltk.sent_tokenize(pred)) for pred in decoded_preds] decoded_labels = ["\n".join(nltk.sent_tokenize(label)) for label in decoded_labels] return decoded_preds, decoded_labels
[docs] def get_dataloader(self, dataset, inputs_column="text", labels_column="summary"): """ Generate DataLoader for a dataset. Args: dataset (`Dataset`): The raw dataset. inputs_column (`str`): Column name of the inputs. labels_column (`str`): Column name of the labels. Returns: `DataLoader`: A dataloader for the dataset. Will be used for inference. """ tokenized_example = dataset.map( lambda x: self.tokenize_and_align_labels(x, inputs_column=inputs_column, labels_column=labels_column), batched=True, remove_columns=dataset.column_names ) dataloader = DataLoader( dataset=tokenized_example, batch_size=8, collate_fn=self.collator(), ) return dataloader
class DataUtilsForTokenClassification(): def __init__(self, tokenizer = None, checkpoint ="allenai/scibert_scivocab_uncased", model_max_length = 512, ): self.checkpoint = checkpoint self.model_max_length = model_max_length if tokenizer == None: self.tokenizer = AutoTokenizer.from_pretrained( self.checkpoint, model_max_length=self.model_max_length, cache_dir=BASE_CACHE_DIR ) else: self.tokenizer = tokenizer def tokenize_and_align_labels(self, examples, label2id=None): ''' Prepare the dataset for input. For token-level task, construct token_mapping to obtain token based BERT representation from subtoken based one. Args: examples: Dataset, {"tokens":[[s1],[s2]..],"labels":[[l1],[l2]..]} label2id: Map label to label_id Returns: Dict{ "input_ids":, "token_type_ids":, "attention_mask":, "token_mapping":, "labels":, } ''' # Get input_ids, token_type_ids, attention_mask tokenized_inputs = self.tokenizer( examples["tokens"], truncation=True, is_split_into_words=True ) if "labels" in examples.keys(): raw_labels = examples["labels"] # label2id labels = [[int(label2id[l]) for l in label] for label in raw_labels] tokenized_inputs["labels"] = labels # Map sub-token to token tokenized_inputs["word_ids"] = [] for i in range(len(examples["tokens"])): tokenized_inputs["word_ids"].append(tokenized_inputs.word_ids(i)) # Prepare token_mapping for obtaining token based BERT representeation # Construct a subtoken to token mapping matrix token_mapping mapping [bsize, max_tok_len, max_subtok_len]. # For example, in sent i, token j include subtokens[s:t), then mapping[i, j, s:t] = 1 / (t - s) # after obtaining subtoken based BERT representation `subtoken_context`[bsize, max_subtok_len, 768], use torch.matmul() # to obtain token based BERT representation # token_context = torch.matmul(token_mapping, subtoken_context) token_mappings = [] for tokens, word_ids in zip(examples["tokens"], tokenized_inputs["word_ids"]): current_tok = 0 # len(subtok_count) == the length of tokens for input, maybe smaller than origin ones # calculate the number of subtokens of a token subtok_count = [0] for tok_id in word_ids: if tok_id == None: continue if tok_id == current_tok: subtok_count[current_tok] += 1 else: current_tok += 1 subtok_count.append(1) # construct token_mapping token_mapping = [] for i in range(len(subtok_count)): token_mapping.append([]) for j in range(len(word_ids)): token_mapping[i].append(0) for subtok_id, tok_id in enumerate(word_ids): if tok_id == None: continue token_mapping[tok_id][subtok_id] = 1 / subtok_count[tok_id] token_mappings.append(token_mapping) tokenized_inputs["token_mapping"] = token_mappings return tokenized_inputs def convert_to_list(self, batch): res = [] for i in batch: input_ids = i["input_ids"] token_type_ids = i["token_type_ids"] attn_mask = i["attention_mask"] token_mapping = i["token_mapping"] if "labels" in i.keys(): labels = i["labels"] res.append([input_ids, token_type_ids, attn_mask, token_mapping, labels]) else: res.append([input_ids, token_type_ids, attn_mask, token_mapping]) return res def pad(self, batch: List[Dict]): # Pads to the longest sample batch = self.convert_to_list(batch) get_element = lambda x: [sample[x] for sample in batch] # subtoken length subtok_len = [len(tokens) for tokens in get_element(0)] max_subtok_len = np.array(subtok_len).max() # origin token length tok_len = [len(tokens) for tokens in get_element(3)] max_tok_len = np.array(tok_len).max() do_pad = lambda x, seqlen: [sample[x] + [0] * (seqlen - len(sample[x])) for sample in batch] # 0: <pad> do_labels_pad = lambda x, seqlen: [sample[x] + [-100] * (seqlen - len(sample[x])) for sample in batch] # pad for origin tokens do_map_pad1 = lambda x, seqlen: [sample[x] + [[0]] * (seqlen - len(sample[x])) for sample in batch] # pad for subtokens do_map_pad2 = lambda batch, seqlen: [[subtoks + [0] * (seqlen - len(subtoks)) for subtoks in sample] for sample in batch] input_ids = do_pad(0, max_subtok_len) token_type_ids = do_pad(1, max_subtok_len) attn_mask = do_pad(2, max_subtok_len) token_mapping = do_map_pad1(3, max_tok_len) token_mapping = do_map_pad2(token_mapping, max_subtok_len) # [batch_size, max_tok_len, max_subtok_len] LT = torch.LongTensor input_ids = LT(input_ids) attn_mask = LT(attn_mask) token_type_ids = LT(token_type_ids) token_mapping = torch.Tensor(token_mapping) if len(batch[0]) == 5: labels = do_labels_pad(4, max_tok_len) labels = LT(labels) else: return { "input_ids": input_ids, "token_type_ids": token_type_ids, "attention_mask": attn_mask, "token_mapping": token_mapping } return { "input_ids": input_ids, "token_type_ids": token_type_ids, "attention_mask": attn_mask, "labels": labels, "token_mapping": token_mapping } def collator(self): return self.pad def postprocess(self, preds, labels, label_names): ''' Remove `-100` label and mask the padded labels with len(label_names). Args: preds (Tensor): Prediction labels labels (Tensor): True labels label_names (List): Label types Returns: (LongTensor, LongTensor): ''' preds = preds.tolist() labels = labels.tolist() do_pad = lambda x, seqlen: [x + [len(label_names)] * (seqlen - len(x))] true_preds, true_labels = [], [] for pred, label in zip(preds, labels): true_len = 0 for l in label: if l == -100: break else: true_len += 1 true_preds.append(do_pad(pred[:true_len], len(label))) true_labels.append(do_pad(label[:true_len], len(label))) true_labels = torch.LongTensor(true_labels) true_preds = torch.LongTensor(true_preds) return true_preds, true_labels def get_dataloader(self, dataset, label2id = cora_label2id): tokenized_example = dataset.map( lambda x: self.tokenize_and_align_labels(x, label2id), batched=True, remove_columns=dataset.column_names ) dataloader = DataLoader( dataset=tokenized_example, batch_size=8, collate_fn=self.collator(), ) return dataloader class DataUtilsForT5(): """ Args: tokenizer (`PretrainedTokenizer`, default to None): The tokenizer for tokenization. checkpoint (`str`): The checkpoint from which the tokenizer is loaded. model_max_length (`int`, *optional*): The max sequence length the model accepts. max_source_length (`int`, *optional*): The max length of the input text. max_target_length (`int`, *optional*): The max length of the generated summary. """ def __init__(self, tokenizer = None, model_class = FlanT5ForSummarization, checkpoint = "facebook/bart-large-cnn", prompt = "Please give a summary of the following text: ", model_max_length = 1024, max_source_length = 1024, max_target_length = 128, ): self.checkpoint = checkpoint self.model_max_length = model_max_length self.max_source_length = max_source_length self.max_target_length = max_target_length self.model_class = model_class self.prompt = prompt if tokenizer is None: self.tokenizer = AutoTokenizer.from_pretrained( self.checkpoint, model_max_length = self.model_max_length, cache_dir=BASE_CACHE_DIR, use_fast=True ) else: self.tokenizer = tokenizer def tokenize_and_align_labels(self, examples, inputs_column="text", labels_column="summary"): """ Process the dataset for model input, for example, do tokenization and prepare label_ids. Args: examples (`Dataset`): { "text": [s1, s2, ...], "summary": [l1, l2, ...]} inputs (`str`): The name of input column labels (`str`): The name of target column Returns: `Dict`: {"input_ids": input_ids, "attention_mask": attention_mask, "labels": label_ids } """ # Select input column inputs = examples[inputs_column] inputs = [self.prompt + raw_text for raw_text in inputs] # Setup the tokenizer for inputs model_inputs = self.tokenizer(inputs, max_length=self.max_source_length, padding="max_length", truncation=True) # Select target column if labels_column in examples.keys(): labels = examples[labels_column] # Setup the tokenizer for targets with self.tokenizer.as_target_tokenizer(): labels = self.tokenizer(labels, max_length=self.max_target_length, padding="max_length", truncation=True) # Ignore padding in the loss labels["input_ids"] = [ [(l if l != self.tokenizer.pad_token_id else -100) for l in label] for label in labels["input_ids"] ] model_inputs["labels"] = labels["input_ids"] return model_inputs def collator(self): """ The collating function. Returns: `function`: A collating function. For example, **DataCollatorForSeq2Seq(...)**. You can also custom a collating function, but remember that `collator()` needs to return a **function**. """ return DataCollatorForSeq2Seq(self.tokenizer, model=self.model_class, pad_to_multiple_of=8) def postprocess(self, preds, labels): """ Process model's outputs and get the final results rather than simple ids. Args: preds (Tensor): Prediction labels, the output of the model. labels (Tensor): True labels Returns: `(LongTensor, LongTensor)`: decoded_preds, decoded_labels """ decoded_preds = self.tokenizer.batch_decode(preds, skip_special_tokens=True) labels = np.array(labels.to("cpu")) # Replace -100 in the labels as we can't decode them. labels = np.where(labels != -100, labels, self.tokenizer.pad_token_id) decoded_labels = self.tokenizer.batch_decode(labels, skip_special_tokens=True) decoded_preds = [pred.strip() for pred in decoded_preds] decoded_labels = [label.strip() for label in decoded_labels] # rougeLSum expects newline after each sentence decoded_preds = ["\n".join(nltk.sent_tokenize(pred)) for pred in decoded_preds] decoded_labels = ["\n".join(nltk.sent_tokenize(label)) for label in decoded_labels] return decoded_preds, decoded_labels def get_dataloader(self, dataset, inputs_column="text", labels_column="summary"): """ Generate DataLoader for a dataset. Args: dataset (`Dataset`): The raw dataset. inputs_column (`str`): Column name of the inputs. labels_column (`str`): Column name of the labels. Returns: `DataLoader`: A dataloader for the dataset. Will be used for inference. """ tokenized_example = dataset.map( lambda x: self.tokenize_and_align_labels(x, inputs_column=inputs_column, labels_column=labels_column), batched=True, remove_columns=dataset.column_names ) dataloader = DataLoader( dataset=tokenized_example, batch_size=8, collate_fn=self.collator(), ) return dataloader class DataUtilsForFlanT5(): """ Args: tokenizer (`PretrainedTokenizer`, default to None): The tokenizer for tokenization. checkpoint (`str`): The checkpoint from which the tokenizer is loaded. model_max_length (`int`, *optional*): The max sequence length the model accepts. max_source_length (`int`, *optional*): The max length of the input text. max_target_length (`int`, *optional*): The max length of the generated summary. """ def __init__(self, tokenizer = None, model_class = FlanT5ForSummarization, checkpoint = "google/flan-t5-base", prompt = "Please give a summary of the following text ", model_max_length = 1024, max_source_length = 1024, max_target_length = 500, ): self.checkpoint = checkpoint self.model_max_length = model_max_length self.max_source_length = max_source_length self.max_target_length = max_target_length self.model_class = model_class self.prompt = prompt if tokenizer is None: self.tokenizer = AutoTokenizer.from_pretrained( self.checkpoint, model_max_length = self.model_max_length, cache_dir=BASE_CACHE_DIR, use_fast=True ) else: self.tokenizer = tokenizer def tokenize_and_align_labels(self, examples, inputs_column="text", labels_column="summary"): """ Process the dataset for model input, for example, do tokenization and prepare label_ids. Args: examples (`Dataset`): { "text": [s1, s2, ...], "summary": [l1, l2, ...]} inputs (`str`): The name of input column labels (`str`): The name of target column Returns: `Dict`: {"input_ids": input_ids, "attention_mask": attention_mask, "labels": label_ids } """ inputs = examples[inputs_column] prompts = [ self.prompt for i in inputs ] # kw_instructions = ["{}"] def kw(prompt, keyword): # i = randint(1,10) if keyword is not None and keyword != [""]: return "Keywords: [ " + str(", ".join(keyword)) + " ]. " + prompt + "based on these keywords " if keyword is not None else prompt return prompt def leng(prompt, length): if length is not None: return prompt + ", which has less than " + str(length) + " words " if length is not None else prompt return prompt if "keywords" in examples.keys(): keywords = examples["keywords"] if keywords is not None: prompts = [ kw(prompt,keyword) for (prompt,keyword) in zip(prompts,keywords) ] if "length" in examples.keys(): if examples["length"] is not None: lengths = examples["length"] prompts = [ leng(prompt,length) for (prompt,length) in zip(prompts,lengths)] inputs = [ prompt + ": " + raw_text for (prompt,raw_text) in zip(prompts, inputs) ] # Setup the tokenizer for inputs model_inputs = self.tokenizer(inputs, max_length=self.max_source_length, padding="max_length", truncation=True) # Select target column if labels_column in examples.keys(): labels = examples[labels_column] # Setup the tokenizer for targets with self.tokenizer.as_target_tokenizer(): labels = self.tokenizer(labels, max_length=self.max_target_length, padding="max_length", truncation=True) # Ignore padding in the loss labels["input_ids"] = [ [(l if l != self.tokenizer.pad_token_id else -100) for l in label] for label in labels["input_ids"] ] model_inputs["labels"] = labels["input_ids"] #when test # if "id" in examples.keys(): # model_inputs["id"] = examples["id"] # if "length" in examples.keys() and examples["length"] is not None: # if examples["length"][0] is not None: # model_inputs["length"] = examples["length"] return model_inputs def collator(self): """ The collating function. Returns: `function`: A collating function. For example, **DataCollatorForSeq2Seq(...)**. You can also custom a collating function, but remember that `collator()` needs to return a **function**. """ return DataCollatorForSeq2Seq(self.tokenizer, model=self.model_class, pad_to_multiple_of=8) def postprocess(self, preds, labels): """ Process model's outputs and get the final results rather than simple ids. Args: preds (Tensor): Prediction labels, the output of the model. labels (Tensor): True labels Returns: `(LongTensor, LongTensor)`: decoded_preds, decoded_labels """ decoded_preds = self.tokenizer.batch_decode(preds, skip_special_tokens=True) labels = np.array(labels.to("cpu")) # Replace -100 in the labels as we can't decode them. labels = np.where(labels != -100, labels, self.tokenizer.pad_token_id) decoded_labels = self.tokenizer.batch_decode(labels, skip_special_tokens=True) decoded_preds = [pred.strip() for pred in decoded_preds] decoded_labels = [label.strip() for label in decoded_labels] # rougeLSum expects newline after each sentence decoded_preds = ["\n".join(nltk.sent_tokenize(pred)) for pred in decoded_preds] decoded_labels = ["\n".join(nltk.sent_tokenize(label)) for label in decoded_labels] return decoded_preds, decoded_labels def get_dataloader(self, dataset, inputs_column="text", labels_column="summary"): """ Generate DataLoader for a dataset. Args: dataset (`Dataset`): The raw dataset. inputs_column (`str`): Column name of the inputs. labels_column (`str`): Column name of the labels. Returns: `DataLoader`: A dataloader for the dataset. Will be used for inference. """ tokenized_example = dataset.map( lambda x: self.tokenize_and_align_labels(x, inputs_column=inputs_column, labels_column=labels_column), batched=True, remove_columns=dataset.column_names ) dataloader = DataLoader( dataset=tokenized_example, batch_size=8, collate_fn=self.collator(), ) return dataloader # class DataUtilsForFiD(): # """ # # Args: # tokenizer (`PretrainedTokenizer`, default to None): # The tokenizer for tokenization. # checkpoint (`str`): # The checkpoint from which the tokenizer is loaded. # model_max_length (`int`, *optional*): The max sequence length the model accepts. # max_source_length (`int`, *optional*): The max length of the input text. # max_target_length (`int`, *optional*): The max length of the generated summary. # """ # # def __init__(self, tokenizer = None, model_class = FiDT5, # checkpoint = "google/flan-t5-large", # model_max_length = 64, # max_source_length = 64, # max_target_length = 128, # ): # # self.checkpoint = checkpoint # self.model_max_length = model_max_length # self.max_source_length = max_source_length # self.max_target_length = max_target_length # self.model_class = model_class # # if tokenizer is None: # self.tokenizer = AutoTokenizer.from_pretrained( # self.checkpoint, # model_max_length = self.model_max_length, # cache_dir=BASE_CACHE_DIR, # ) # else: # self.tokenizer = tokenizer # # # def tokenize_and_align_labels(self, examples, inputs_column="text", labels_column="summary", token_per_paragraph=50): # # """ # # Process the dataset for model input, for example, do tokenization and prepare label_ids. # # Args: # examples (`Dataset`): { "text": [s1, s2, ...], "summary": [l1, l2, ...]} # inputs (`str`): The name of input column # labels (`str`): The name of target column # # Returns: # `Dict`: {"input_ids": input_ids, "attention_mask": attention_mask, "labels": label_ids } # # """ # # # Select input column # inputs = examples[inputs_column] # dataset = {"paragraphs": []} # # for input in inputs: # texts = ["Please give a summary of the following text: "] # tokens = input.split(" ") # index = 0 # while index+token_per_paragraph < min(len(tokens),120*token_per_paragraph): # p = " ".join(tokens[index:index+token_per_paragraph]) # texts.append(p) # index += token_per_paragraph # texts.append(" ".join(tokens[index:index+token_per_paragraph])) # dataset["paragraphs"].append(texts) # # # # Select target column # if labels_column in examples.keys(): # labels = examples[labels_column] # dataset["labels"] = labels # # return dataset # # def collator(self): # # """ # # The collating function. # # Returns: # `function`: A collating function. # # For example, **DataCollatorForSeq2Seq(...)**. # # You can also custom a collating function, but remember that `collator()` needs to return a **function**. # """ # # from SciAssist.utils.collators.CollatorForFid import DataCollatorForFid # # return DataCollatorForFid(self.max_source_length, self.tokenizer, self.max_target_length) # # def postprocess(self, preds, labels): # # """ # Process model's outputs and get the final results rather than simple ids. # # Args: # preds (Tensor): Prediction labels, the output of the model. # labels (Tensor): True labels # # Returns: # `(LongTensor, LongTensor)`: decoded_preds, decoded_labels # # """ # # decoded_preds = self.tokenizer.batch_decode(preds, skip_special_tokens=True) # # labels = np.array(labels.to("cpu")) # # Replace -100 in the labels as we can't decode them. # labels = np.where(labels != -100, labels, self.tokenizer.pad_token_id) # # decoded_labels = self.tokenizer.batch_decode(labels, skip_special_tokens=True) # # decoded_preds = [pred.strip() for pred in decoded_preds] # decoded_labels = [label.strip() for label in decoded_labels] # # # rougeLSum expects newline after each sentence # decoded_preds = ["\n".join(nltk.sent_tokenize(pred)) for pred in decoded_preds] # decoded_labels = ["\n".join(nltk.sent_tokenize(label)) for label in decoded_labels] # # return decoded_preds, decoded_labels # # def get_dataloader(self, dataset, inputs_column="text", labels_column="summary"): # # """ # Generate DataLoader for a dataset. # # Args: # dataset (`Dataset`): The raw dataset. # inputs_column (`str`): Column name of the inputs. # labels_column (`str`): Column name of the labels. # # Returns: # `DataLoader`: A dataloader for the dataset. Will be used for inference. # """ # # tokenized_example = dataset.map( # lambda x: self.tokenize_and_align_labels(x, inputs_column=inputs_column, labels_column=labels_column), # batched=True, # remove_columns=dataset.column_names # ) # dataloader = DataLoader( # dataset=tokenized_example, # batch_size=8, # collate_fn=self.collator(), # ) # # return dataloader # # # # class DataUtilsForFrost(): # """ # # Args: # tokenizer (`PretrainedTokenizer`, default to None): # The tokenizer for tokenization. # checkpoint (`str`): # The checkpoint from which the tokenizer is loaded. # model_max_length (`int`, *optional*): The max sequence length the model accepts. # max_source_length (`int`, *optional*): The max length of the input text. # max_target_length (`int`, *optional*): The max length of the generated summary. # """ # # # def __init__(self, tokenizer = None, model_class = FrostForSummarization, # checkpoint = "pegasus/frost", # model_max_length = 1024, # max_source_length = 1024, # max_target_length = 128, # ): # # self.checkpoint = checkpoint # self.model_max_length = model_max_length # self.max_source_length = max_source_length # self.max_target_length = max_target_length # self.model_class = model_class # # if tokenizer is None: # self.tokenizer = PegasusTokenizer.from_pretrained( # self.checkpoint, # cache_dir=BASE_CACHE_DIR, # model_max_length=self.model_max_length, # ) # else: # self.tokenizer = tokenizer # # # FROST Constants # self.ENTITYCHAIN_START_TOKEN = "[CONTENT]" # self.SUMMARY_START_TOKEN = "[SUMMARY]" # self.ENTITY_SEPARATOR = " | " # self.ENTITY_SENTENCE_SEPARATOR = " ||| " # # # Prepare Spacy processor # self.SPACY_MODEL_OR_PATH = "en_core_web_sm" # self.SPACY_PROCESSOR = spacy.load(self.SPACY_MODEL_OR_PATH) # # def get_frost_labels(self, text): # """Gets Spacy Frost processor.""" # entity_plans = [] # for text_sent in self.SPACY_PROCESSOR(text.replace("\n", " ")).sents: # entity_plans.append( # self.ENTITY_SEPARATOR.join( # [entity.text for entity in self.SPACY_PROCESSOR(text_sent.text).ents])) # text_with_entityplans = ( # self.ENTITYCHAIN_START_TOKEN + " " + # self.ENTITY_SENTENCE_SEPARATOR.join(entity_plans) + " " + # self.SUMMARY_START_TOKEN + " " + text) # return text_with_entityplans # # # def tokenize_and_align_labels(self, examples, inputs_column="text", labels_column="summary"): # # """ # # Process the dataset for model input, for example, do tokenization and prepare label_ids. # # Args: # examples (`Dataset`): { "text": [s1, s2, ...], "summary": [l1, l2, ...]} # inputs (`str`): The name of input column # labels (`str`): The name of target column # # Returns: # `Dict`: {"input_ids": input_ids, "attention_mask": attention_mask, "labels": label_ids } # # """ # # # Select input column # inputs = examples[inputs_column] # # # Setup the tokenizer for inputs # model_inputs = self.tokenizer(inputs, max_length=self.max_target_length, padding="max_length", truncation=True) # # # Select target column # if labels_column in examples.keys(): # labels = examples[labels_column] # labels = [self.get_frost_labels(label) for label in labels] # # # Setup the tokenizer for targets # with self.tokenizer.as_target_tokenizer(): # labels = self.tokenizer(labels, max_length=self.max_target_length, padding="max_length", truncation=True) # # Ignore padding in the loss # labels["input_ids"] = [ # [(l if l != self.tokenizer.pad_token_id else -100) for l in label] for label in labels["input_ids"] # ] # # model_inputs["labels"] = labels["input_ids"] # # return model_inputs # # def collator(self): # # """ # # The collating function. # # Returns: # `function`: A collating function. # # For example, **DataCollatorForSeq2Seq(...)**. # # You can also custom a collating function, but remember that `collator()` needs to return a **function**. # """ # # # return DataCollatorForSeq2Seq(self.tokenizer, model=self.model_class, pad_to_multiple_of=8) # # def postprocess(self, preds, labels): # # """ # Process model's outputs and get the final results rather than simple ids. # # Args: # preds (Tensor): Prediction labels, the output of the model. # labels (Tensor): True labels # # Returns: # `(LongTensor, LongTensor)`: decoded_preds, decoded_labels # # """ # # decoded_preds = self.tokenizer.batch_decode(preds, skip_special_tokens=True) # # labels = np.array(labels.to("cpu")) # # Replace -100 in the labels as we can't decode them. # labels = np.where(labels != -100, labels, self.tokenizer.pad_token_id) # # decoded_labels = self.tokenizer.batch_decode(labels, skip_special_tokens=False) # # decoded_preds = [pred.strip() for pred in decoded_preds] # decoded_labels = [label.strip() for label in decoded_labels] # # # rougeLSum expects newline after each sentence # decoded_preds = ["\n".join(nltk.sent_tokenize(pred)) for pred in decoded_preds] # decoded_labels = ["\n".join(nltk.sent_tokenize(label)) for label in decoded_labels] # # return decoded_preds, decoded_labels # # def get_dataloader(self, dataset, inputs_column="text", labels_column="summary"): # # """ # Generate DataLoader for a dataset. # # Args: # dataset (`Dataset`): The raw dataset. # inputs_column (`str`): Column name of the inputs. # labels_column (`str`): Column name of the labels. # # Returns: # `DataLoader`: A dataloader for the dataset. Will be used for inference. # """ # # tokenized_example = dataset.map( # lambda x: self.tokenize_and_align_labels(x, inputs_column=inputs_column, labels_column=labels_column), # batched=True, # remove_columns=dataset.column_names # ) # dataloader = DataLoader( # dataset=tokenized_example, # batch_size=8, # collate_fn=self.collator(), # ) # # return dataloader class DataUtilsForT5(): """ Args: tokenizer (`PretrainedTokenizer`, default to None): The tokenizer for tokenization. checkpoint (`str`): The checkpoint from which the tokenizer is loaded. model_max_length (`int`, *optional*): The max sequence length the model accepts. max_source_length (`int`, *optional*): The max length of the input text. max_target_length (`int`, *optional*): The max length of the generated summary. """ def __init__(self, tokenizer = None, model_class = BartForSummarization, checkpoint = "facebook/bart-large-cnn", prompt = "Please give a summary of the following text: ", model_max_length = 1024, max_source_length = 1024, max_target_length = 128, ): self.checkpoint = checkpoint self.model_max_length = model_max_length self.max_source_length = max_source_length self.max_target_length = max_target_length self.model_class = model_class self.prompt = prompt if tokenizer is None: self.tokenizer = AutoTokenizer.from_pretrained( self.checkpoint, model_max_length = self.model_max_length, cache_dir=BASE_CACHE_DIR, use_fast=True ) else: self.tokenizer = tokenizer def tokenize_and_align_labels(self, examples, inputs_column="text", labels_column="summary"): """ Process the dataset for model input, for example, do tokenization and prepare label_ids. Args: examples (`Dataset`): { "text": [s1, s2, ...], "summary": [l1, l2, ...]} inputs (`str`): The name of input column labels (`str`): The name of target column Returns: `Dict`: {"input_ids": input_ids, "attention_mask": attention_mask, "labels": label_ids } """ # Select input column inputs = examples[inputs_column] inputs = [self.prompt + raw_text for raw_text in inputs] # Setup the tokenizer for inputs model_inputs = self.tokenizer(inputs, max_length=self.max_source_length, padding="max_length", truncation=True) # Select target column if labels_column in examples.keys(): labels = examples[labels_column] # Setup the tokenizer for targets with self.tokenizer.as_target_tokenizer(): labels = self.tokenizer(labels, max_length=self.max_target_length, padding="max_length", truncation=True) # Ignore padding in the loss labels["input_ids"] = [ [(l if l != self.tokenizer.pad_token_id else -100) for l in label] for label in labels["input_ids"] ] model_inputs["labels"] = labels["input_ids"] return model_inputs def collator(self): """ The collating function. Returns: `function`: A collating function. For example, **DataCollatorForSeq2Seq(...)**. You can also custom a collating function, but remember that `collator()` needs to return a **function**. """ return DataCollatorForSeq2Seq(self.tokenizer, model=self.model_class, pad_to_multiple_of=8) def postprocess(self, preds, labels): """ Process model's outputs and get the final results rather than simple ids. Args: preds (Tensor): Prediction labels, the output of the model. labels (Tensor): True labels Returns: `(LongTensor, LongTensor)`: decoded_preds, decoded_labels """ decoded_preds = self.tokenizer.batch_decode(preds, skip_special_tokens=True) labels = np.array(labels.to("cpu")) # Replace -100 in the labels as we can't decode them. labels = np.where(labels != -100, labels, self.tokenizer.pad_token_id) decoded_labels = self.tokenizer.batch_decode(labels, skip_special_tokens=True) decoded_preds = [pred.strip() for pred in decoded_preds] decoded_labels = [label.strip() for label in decoded_labels] # rougeLSum expects newline after each sentence decoded_preds = ["\n".join(nltk.sent_tokenize(pred)) for pred in decoded_preds] decoded_labels = ["\n".join(nltk.sent_tokenize(label)) for label in decoded_labels] return decoded_preds, decoded_labels def get_dataloader(self, dataset, inputs_column="text", labels_column="summary"): """ Generate DataLoader for a dataset. Args: dataset (`Dataset`): The raw dataset. inputs_column (`str`): Column name of the inputs. labels_column (`str`): Column name of the labels. Returns: `DataLoader`: A dataloader for the dataset. Will be used for inference. """ tokenized_example = dataset.map( lambda x: self.tokenize_and_align_labels(x, inputs_column=inputs_column, labels_column=labels_column), batched=True, remove_columns=dataset.column_names ) dataloader = DataLoader( dataset=tokenized_example, batch_size=8, collate_fn=self.collator(), ) return dataloader