import logging

import torch
from torch import nn

from transformers import PreTrainedModel, PretrainedConfig

from .simple_tokenizer import SimpleTokenizer as _Tokenizer
from .viclip_vision import clip_joint_l14, clip_joint_b16
from .viclip_text import clip_text_l14, clip_text_b16
from .configuration_viclip import Config

logger = logging.getLogger(__name__)


class ViCLIP(PreTrainedModel):
    _auto_class = "AutoModel"
    config_class = Config

    def __init__(self, config=PretrainedConfig()):
        super().__init__(config)
        self.config = config
        config_dict = config.to_dict()
        if 'size' not in config_dict:
            raise ValueError("`config` must define `size` ('l' or 'b')")
        size = config_dict['size']
        pretrain = None
        tokenizer_path = config_dict.get('tokenizer_path')
        tokenizer = None
        freeze_text = True

        if tokenizer:
            self.tokenizer = tokenizer
        elif tokenizer_path:
            self.tokenizer = _Tokenizer(tokenizer_path)
        else:
            self.tokenizer = _Tokenizer()
        self.max_txt_l = 32

        if size.lower() == 'l':
            self.vision_encoder_name = 'vit_l14'
        elif size.lower() == 'b':
            self.vision_encoder_name = 'vit_b16'
        else:
            raise NotImplementedError(f"Size {size} not implemented")

        self.vision_encoder_pretrained = False
        self.inputs_image_res = 224
        self.vision_encoder_kernel_size = 1
        self.vision_encoder_center = True
        self.video_input_num_frames = 8
        self.vision_encoder_drop_path_rate = 0.1
        self.vision_encoder_checkpoint_num = 24
        self.is_pretrain = pretrain
        self.vision_width = 1024
        self.text_width = 768
        self.embed_dim = 768
        # Fraction of patch tokens randomly dropped in encode_vision() during training.
        self.masking_prob = 0.9

        if size.lower() == 'l':
            self.text_encoder_name = 'vit_l14'
        elif size.lower() == 'b':
            self.text_encoder_name = 'vit_b16'
        else:
            raise NotImplementedError(f"Size {size} not implemented")

        self.text_encoder_pretrained = False
        self.text_encoder_d_model = 768

        self.text_encoder_vocab_size = 49408

        self.vision_encoder = self.build_vision_encoder()
        self.text_encoder = self.build_text_encoder()

        # Learnable contrastive temperature, initialized to 0.01 (logit scale 100)
        # and clamped from below in clip_contrastive_temperature().
        self.temp = nn.Parameter(torch.ones([]) / 100.0)
        self.temp_min = 1 / 100.0

        if pretrain:
            logger.info(f"Load pretrained weights from {pretrain}")
            state_dict = torch.load(pretrain, map_location='cpu')['model']
            self.load_state_dict(state_dict)

        if freeze_text:
            self.freeze_text()

    def freeze_text(self):
        """freeze text encoder"""
        for p in self.text_encoder.parameters():
            p.requires_grad = False

    def no_weight_decay(self):
        ret = {"temp"}
        ret.update(
            {"vision_encoder." + k for k in self.vision_encoder.no_weight_decay()}
        )
        ret.update(
            {"text_encoder." + k for k in self.text_encoder.no_weight_decay()}
        )

        return ret
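
    # Sketch (not part of the original file): how `no_weight_decay()` is typically
    # consumed when building optimizer parameter groups; the weight-decay value
    # below is illustrative, not a prescribed hyperparameter.
    #
    #   skip = model.no_weight_decay()
    #   decay = [p for n, p in model.named_parameters() if p.requires_grad and n not in skip]
    #   no_decay = [p for n, p in model.named_parameters() if p.requires_grad and n in skip]
    #   optimizer = torch.optim.AdamW([
    #       {"params": decay, "weight_decay": 0.02},
    #       {"params": no_decay, "weight_decay": 0.0},
    #   ])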

    def forward(self, image, text, raw_text, idx, log_generation=None, return_sims=False):
        """forward and calculate loss.

        Args:
            image (torch.Tensor): The input images/videos. Shape: [B,T,C,H,W].
            text (dict): Tokenized text. Unused here; kept for interface compatibility.
            raw_text (list[str]): Raw caption strings, tokenized internally.
            idx (torch.Tensor): Sample indices used to match positives in the contrastive loss.

        Returns: dict with key `loss_vtc`, or the [B,B] similarity matrix if `return_sims`.

        """
        self.clip_contrastive_temperature()

        vision_embeds = self.encode_vision(image)
        text_embeds = self.encode_text(raw_text)
        if return_sims:
            sims = torch.nn.functional.normalize(vision_embeds, dim=-1) @ \
                   torch.nn.functional.normalize(text_embeds, dim=-1).transpose(0, 1)
            return sims

        # NOTE: `self.clip_loss` is never built in __init__; it must be attached
        # externally before calling forward() without `return_sims`.
        loss_vtc = self.clip_loss.vtc_loss(
            vision_embeds, text_embeds, idx, self.temp, all_gather=True
        )

        return dict(
            loss_vtc=loss_vtc,
        )
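
    # Usage sketch (argument roles inferred from the code above, not from the
    # original file):
    #   sims = model(frames, None, captions, None, return_sims=True)  # [B,B] scores
    #   out = model(frames, None, captions, idx)  # training path; needs `clip_loss`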

    def encode_vision(self, image, test=False):
        """encode image / videos as features.

        Args:
            image (torch.Tensor): The input images/videos. Shape: [B,T,C,H,W] or [B,C,H,W].
            test (bool): Whether testing (disables token masking).

        Returns:
            torch.Tensor: Pooled vision features. Shape: [B,C].

        """
        if image.ndim == 5:
            # [B,T,C,H,W] -> [B,C,T,H,W], the layout the vision encoder expects.
            image = image.permute(0, 2, 1, 3, 4).contiguous()
        else:
            # Single image: insert a time dimension of length 1.
            image = image.unsqueeze(2)

        if not test and self.masking_prob > 0.0:
            # Randomly drop patch tokens during training (masked video modeling).
            return self.vision_encoder(
                image, masking_prob=self.masking_prob
            )

        return self.vision_encoder(image)

    def encode_text(self, text):
        """encode text.

        Args:
            text (str or list[str]): Raw text; tokenized internally with the text
                encoder's own tokenizer (not a pre-tokenized huggingface dict).

        Returns:
            torch.Tensor: Pooled text features. Shape: [B,C].

        """
        device = next(self.text_encoder.parameters()).device
        text = self.text_encoder.tokenize(
            text, context_length=self.max_txt_l
        ).to(device)
        text_embeds = self.text_encoder(text)
        return text_embeds

    @torch.no_grad()
    def clip_contrastive_temperature(self):
        """Clamp the temperature from below. Seems only used during pre-training."""
        self.temp.clamp_(min=self.temp_min)

    def build_vision_encoder(self):
        """build vision encoder
        Returns: nn.Module. The vision encoder.

        """
        encoder_name = self.vision_encoder_name
        if encoder_name == "vit_l14":
            vision_encoder = clip_joint_l14(
                pretrained=self.vision_encoder_pretrained,
                input_resolution=self.inputs_image_res,
                kernel_size=self.vision_encoder_kernel_size,
                center=self.vision_encoder_center,
                num_frames=self.video_input_num_frames,
                drop_path=self.vision_encoder_drop_path_rate,
                checkpoint_num=self.vision_encoder_checkpoint_num,
            )
        elif encoder_name == "vit_b16":
            vision_encoder = clip_joint_b16(
                pretrained=self.vision_encoder_pretrained,
                input_resolution=self.inputs_image_res,
                kernel_size=self.vision_encoder_kernel_size,
                center=self.vision_encoder_center,
                num_frames=self.video_input_num_frames,
                drop_path=self.vision_encoder_drop_path_rate,
                checkpoint_num=self.vision_encoder_checkpoint_num,
            )
        else:
            raise NotImplementedError(f"Not implemented: {encoder_name}")

        return vision_encoder

    def build_text_encoder(self):
        """build text_encoder and possibly video-to-text multimodal fusion encoder.
        Returns: nn.Module. The text encoder.

        """
        encoder_name = self.text_encoder_name
        tokenizer_path = self.config.to_dict().get('tokenizer_path')

        if encoder_name == "vit_l14":
            text_encoder = clip_text_l14(
                pretrained=self.text_encoder_pretrained,
                context_length=self.max_txt_l,
                vocab_size=self.text_encoder_vocab_size,
                checkpoint_num=0,
                tokenizer_path=tokenizer_path,
            )
        elif encoder_name == "vit_b16":
            text_encoder = clip_text_b16(
                pretrained=self.text_encoder_pretrained,
                context_length=self.max_txt_l,
                vocab_size=self.text_encoder_vocab_size,
                checkpoint_num=0,
                tokenizer_path=tokenizer_path,
            )
        else:
            raise NotImplementedError(f"Not implemented: {encoder_name}")

        return text_encoder

    def get_text_encoder(self):
        """get text encoder, used for text and cross-modal encoding"""
        encoder = self.text_encoder
        return encoder.bert if hasattr(encoder, "bert") else encoder

    def get_text_features(self, input_text, tokenizer=None, text_feature_dict=None):
        # Avoid a mutable default argument: the cache dict is created per call
        # unless the caller passes one in to share across calls.
        if text_feature_dict is None:
            text_feature_dict = {}
        if input_text in text_feature_dict:
            return text_feature_dict[input_text]
        with torch.no_grad():
            text_features = self.encode_text(input_text).float()
            text_features /= text_features.norm(dim=-1, keepdim=True)
            text_feature_dict[input_text] = text_features
        return text_features

    def get_vid_features(self, input_frames):
        with torch.no_grad():
            clip_feat = self.encode_vision(input_frames, test=True).float()
            clip_feat /= clip_feat.norm(dim=-1, keepdim=True)
        return clip_feat

    def get_predict_label(self, clip_feature, text_feats_tensor, top=5):
        # 100.0 is the fixed logit scale, i.e. the inverse of the initial temperature.
        label_probs = (100.0 * clip_feature @ text_feats_tensor.T).softmax(dim=-1)
        top_probs, top_labels = label_probs.cpu().topk(top, dim=-1)
        return top_probs, top_labels


if __name__ == "__main__":
    tokenizer = _Tokenizer()
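
    # Minimal smoke test (a sketch, not from the original file). The `Config`
    # field names (size, tokenizer_path) and the 8-frame 224x224 input shape
    # mirror the defaults hardcoded above; adjust them for a real checkpoint.
    model = ViCLIP(Config(size='b', tokenizer_path=None))
    model.eval()

    frames = torch.randn(1, 8, 3, 224, 224)  # [B,T,C,H,W] dummy video clip
    labels = ["a person dancing", "a dog running"]

    text_feats = torch.cat([model.get_text_features(t) for t in labels])
    vid_feat = model.get_vid_features(frames)
    top_probs, top_labels = model.get_predict_label(vid_feat, text_feats, top=2)
    print(top_probs, [labels[i] for i in top_labels[0]])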