1. 引言

什么是Self-attention、Multi-head attention和Transformer

2. 数据预处理

IMDB影评数据集的介绍与下载
下载后执行下面预处理代码,把每个词都转化为索引。

import os as os
import numpy as np
save_dir = './data'
import tensorflow.keras as keras

def _read_reviews(directory, filenames):
    """Return the UTF-8 text of every file in *filenames* under *directory*."""
    texts = []
    for name in filenames:
        with open(os.path.join(directory, name), encoding='utf-8') as f:
            texts.append(f.read())
    return texts


def get_data(datapath):
    """Load the raw reviews and their sentiment labels found under *datapath*.

    The directory must contain two sub-directories, ``pos`` and ``neg``, each
    holding one UTF-8 text file per review.

    Args:
        datapath: Directory that contains the ``pos`` and ``neg`` folders.

    Returns:
        A tuple ``(X_orig, Y_orig)`` of NumPy arrays: ``X_orig`` holds every
        positive review followed by every negative review, and ``Y_orig``
        holds the matching labels (1 for positive, 0 for negative).

    Raises:
        OSError: If a folder or a review file cannot be read.
    """
    pos_dir = os.path.join(datapath, 'pos')
    neg_dir = os.path.join(datapath, 'neg')
    pos_files = os.listdir(pos_dir)
    neg_files = os.listdir(neg_dir)
    print(len(pos_files))
    print(len(neg_files))

    # Read the two classes independently: pairing them with zip() would
    # silently drop reviews whenever the folders hold different file counts.
    pos_all = _read_reviews(pos_dir, pos_files)
    neg_all = _read_reviews(neg_dir, neg_files)

    X_orig = np.array(pos_all + neg_all)
    Y_orig = np.array([1] * len(pos_all) + [0] * len(neg_all))
    print("X_orig:", X_orig.shape)
    print("Y_orig:", Y_orig.shape)

    return X_orig, Y_orig

# Passed to the Tokenizer as num_words; only word indices below this value
# are emitted when texts are converted to sequences.
vocab_size = 30000


def generate_train_vector():
    """Tokenize the IMDB train and test reviews and save them as index vectors.

    Reads both splits with ``get_data``, fits a Keras ``Tokenizer`` on all of
    the reviews, converts each review to a sequence of word indices padded to
    200 entries, and writes two files into ``save_dir``:

    * ``train_vector_Data.npz`` with arrays ``x`` (padded sequences) and ``y``
      (labels);
    * ``small_word_index.npy`` with the word -> index dict restricted to the
      indices the Tokenizer can actually emit.

    The saved samples keep the order train-pos, train-neg, test-pos, test-neg;
    nothing is shuffled here.
    """
    # os.path.join keeps the paths valid on both Windows and POSIX systems.
    X_orig, Y_orig = get_data(os.path.join('.', 'aclImdb', 'train'))
    X_orig_test, Y_orig_test = get_data(os.path.join('.', 'aclImdb', 'test'))
    X_orig = np.concatenate([X_orig, X_orig_test])
    Y_orig = np.concatenate([Y_orig, Y_orig_test])

    maxlen = 200
    print("Start fitting the corpus......")
    # num_words makes texts_to_sequences drop the low-frequency words.
    t = keras.preprocessing.text.Tokenizer(vocab_size)
    t.fit_on_texts(X_orig)  # gather word statistics over every review
    word_index = t.word_index  # full vocabulary, not limited by vocab_size
    print(X_orig)
    print('all_vocab_size', len(word_index), type(word_index))
    print(word_index)
    print("Start vectorizing the sentences.......")
    v_X = t.texts_to_sequences(X_orig)  # limited by vocab_size
    print("Start padding......")
    print(v_X)
    # padding='post' appends the zeros; truncating is left at the Keras default.
    pad_X = keras.preprocessing.sequence.pad_sequences(v_X, maxlen=maxlen, padding='post')
    print(pad_X.shape)
    print('padx',pad_X[0:2])
    print("Finished!")

    # Make sure the output directory exists before writing into it.
    os.makedirs(save_dir, exist_ok=True)
    np.savez(save_dir+'/train_vector_Data', x=pad_X, y=Y_orig)

    print("Removing less freq words from word-index dict...")
    # word_index is ranked by frequency (index 1 = most frequent) and the
    # Tokenizer only emits indices < vocab_size, so keep exactly those words.
    # Building a new dict leaves the Tokenizer's own word_index untouched.
    small_word_index = {
        word: index for word, index in word_index.items() if index < vocab_size
    }
    print("Finished!")
    print(len(small_word_index))
    print(len(word_index))
    # NOTE: a dict is stored as a pickled object array; loading it back
    # requires np.load(..., allow_pickle=True).
    np.save(save_dir+'/small_word_index', small_word_index)


# Run the preprocessing only when this file is executed as a script.
if __name__ == '__main__':
    generate_train_vector()

Transformer训练IMDB代码

利用预处理产生的train_vector_Data.npz数据训练

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
from sklearn.model_selection import train_test_split

"""
## Implement multi head self attention as a Keras layer
"""

class MultiHeadSelfAttention(layers.Layer):
    """Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values by three dense layers,
    split into ``num_heads`` heads of width ``embed_dim // num_heads``,
    attended per head, merged back together and passed through a final dense
    layer.

    Args:
        embed_dim: Width of the projections and of the layer output.
        num_heads: Number of attention heads; must divide ``embed_dim``.

    Raises:
        ValueError: If ``embed_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, embed_dim, num_heads=8):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads:
            raise ValueError(f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}")
        # Width handled by each individual head.
        self.projection_dim = embed_dim // num_heads
        self.query_dense = layers.Dense(embed_dim)
        self.key_dense = layers.Dense(embed_dim)
        self.value_dense = layers.Dense(embed_dim)
        self.combine_heads = layers.Dense(embed_dim)

    def attention(self, query, key, value):
        """Return ``(output, weights)`` of scaled dot-product attention.

        The scores ``query @ key^T`` are divided by the square root of the
        size of ``key``'s last axis and soft-maxed over the last axis;
        ``output`` is ``weights @ value``.
        """
        depth = tf.cast(tf.shape(key)[-1], tf.float32)
        logits = tf.matmul(query, key, transpose_b=True) / tf.math.sqrt(depth)
        weights = tf.nn.softmax(logits, axis=-1)
        return tf.matmul(weights, value), weights

    def separate_heads(self, x, batch_size):
        """Reshape ``x`` to (batch, num_heads, seq_len, projection_dim)."""
        per_head = tf.reshape(
            x, (batch_size, -1, self.num_heads, self.projection_dim)
        )
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Apply self-attention to ``inputs`` of shape (batch, seq_len, embed_dim)."""
        n_batch = tf.shape(inputs)[0]
        # Project, then split into heads: (batch, num_heads, seq_len, projection_dim).
        q_heads, k_heads, v_heads = [
            self.separate_heads(project(inputs), n_batch)
            for project in (self.query_dense, self.key_dense, self.value_dense)
        ]
        context, _ = self.attention(q_heads, k_heads, v_heads)
        # Back to (batch, seq_len, num_heads, projection_dim) before merging heads.
        context = tf.transpose(context, perm=[0, 2, 1, 3])
        merged = tf.reshape(context, (n_batch, -1, self.embed_dim))
        # Final mixing projection: (batch, seq_len, embed_dim).
        return self.combine_heads(merged)


"""
## Implement a Transformer block as a layer
"""


class TransformerBlock(layers.Layer):
    """One Transformer encoder block: self-attention plus a feed-forward net.

    Each of the two sub-layers is followed by dropout, a residual connection
    and layer normalization.

    Args:
        embed_dim: Width of the block's input and output features.
        num_heads: Number of heads of the self-attention sub-layer.
        ff_dim: Hidden width of the feed-forward sub-layer.
        rate: Dropout rate applied after each sub-layer.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.5):
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        # Position-wise feed-forward net: embed_dim -> ff_dim -> embed_dim.
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Run the block on ``inputs``.

        Args:
            inputs: Tensor expected to be (batch, seq_len, embed_dim); the
                residual additions require the sub-layer outputs to match it.
            training: Forwarded to the dropout layers. Defaults to ``None`` so
                the block can be called as ``block(x)`` even when the
                framework does not inject the argument.

        Returns:
            Tensor with the same shape as ``inputs``.
        """
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        # Residual connection around the attention sub-layer.
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        # Residual connection around the feed-forward sub-layer.
        return self.layernorm2(out1 + ffn_output)
        
"""
## Implement embedding layer
Two separate embedding layers, one for tokens, one for token index (positions).
"""


class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        maxlen: Number of distinct positions the position table can embed.
        vocab_size: Number of distinct token ids the token table can embed.
        embed_dim: Width of both embedding tables.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed the token ids in ``x`` and add the embedding of each position."""
        # Positions 0 .. seq_len - 1, where seq_len is the size of x's last axis.
        seq_len = tf.shape(x)[-1]
        position_vectors = self.pos_emb(tf.range(start=0, limit=seq_len, delta=1))
        token_vectors = self.token_emb(x)
        # The position vectors are broadcast across the batch axis.
        return token_vectors + position_vectors


"""
## Download and prepare dataset
"""

vocab_size = 30000  # Size of the token embedding table; same value the preprocessing script passes to its Tokenizer
maxlen = 200  # Length every review was padded/truncated to by the preprocessing script
# Arrays written by the preprocessing script: 'x' holds the padded index
# sequences and 'y' the 0/1 sentiment labels.
trainDataNew = np.load('./data/train_vector_Data.npz')
x_train = trainDataNew['x']
y_train = trainDataNew['y']

"""
## Create classifier model using transformer layer
Transformer layer outputs one vector for each time step of our input sequence.
Here, we take the mean across all time steps and
use a feed forward network on top of it to classify text.
"""
embed_dim = 32  # Embedding size for each token
num_heads = 2  # Number of attention heads
ff_dim = 32  # Hidden layer size in feed forward network inside transformer

# Functional-API model: token ids -> embeddings -> one transformer block ->
# mean over the sequence axis -> small dense classifier.
inputs = layers.Input(shape=(maxlen,))
embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
x = embedding_layer(inputs)
transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
x = transformer_block(x)
# Collapse the per-token vectors into a single vector per review.
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.1)(x)
x = layers.Dense(20, activation="relu")(x)
x = layers.Dropout(0.1)(x)
# Two softmax outputs, one per class (labels are the integers 0 and 1).
outputs = layers.Dense(2, activation="softmax")(x)

model = keras.Model(inputs=inputs, outputs=outputs)


"""
## Train and Evaluate
"""

model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"])

# The preprocessing script stores the samples unshuffled, ordered train-pos,
# train-neg, test-pos, test-neg. Keras' validation_split takes the LAST
# fraction of the arrays before shuffling, which here would be negative
# reviews only. Hold out a shuffled, class-balanced 10% instead.
x_fit, x_val, y_fit, y_val = train_test_split(
    x_train, y_train, test_size=0.1, stratify=y_train, random_state=42
)
history = model.fit(
    x_fit, y_fit, batch_size=128, epochs=30, validation_data=(x_val, y_val)
)

print(history)

import matplotlib.pyplot as plt

# Training vs. validation accuracy per epoch.
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.legend(['accuracy', 'val_accuracy'], loc='upper left')
plt.show()

第一个Epoch, 训练集上准确率是82.88%, 验证集上90.96%

350/352 [============================>.] - ETA: 0s - loss: 0.3647 - accuracy: 0.8286
351/352 [============================>.] - ETA: 0s - loss: 0.3645 - accuracy: 0.8287
352/352 [==============================] - ETA: 0s - loss: 0.3644 - accuracy: 0.8288
352/352 [==============================] - 113s 320ms/step - loss: 0.3644 - accuracy: 0.8288 - val_loss: 0.2234 - val_accuracy: 0.9096
Epoch 2/30

参考资料
[1] https://keras.io/examples/nlp/text_classification_with_transformer/

    query = np.arange(0, 4, 1).astype(np.float32).repeat(8).reshape(2,4,4)
    #query = np.arange(0, 24, 1).reshape(2, 12)

    s = layers.Dense(4)
    print(query)
    print(s.get_weights())
    attention = s(query)
    print(s.get_weights())
Logo

一站式 AI 云服务平台

更多推荐