How to include image input in a transformer model?

Issue

I am using this transformer architecture: https://github.com/JanSchm/CapMarket/blob/master/bot_experiments/IBM_Transformer%2BTimeEmbedding.ipynb

to do binary classification. I am adding some pictures as input, but I was wondering: what is the right way to do this?

My modified architecture is:

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import (Input, Dense, LSTM, Conv1D, Conv2D,
                                     MaxPooling2D, Flatten, Concatenate, Reshape,
                                     Dropout, GlobalAveragePooling1D,
                                     LayerNormalization, Embedding, Layer)
from tensorflow.keras.models import Model, Sequential

'''Initialize time and transformer layers'''
time_embedding = Time2Vector(seq_len)
attn_layer1 = TransformerEncoder(d_k, d_v, n_heads, ff_dim)
attn_layer2 = TransformerEncoder(d_k, d_v, n_heads, ff_dim)
attn_layer3 = TransformerEncoder(d_k, d_v, n_heads, ff_dim)

'''Construct model'''
liq_seq = Input(shape=(seq_len, XN_train.shape[2]))
pic_seq = Input(name="input_images", shape=(500, 700, 3))

# Time-series branch: concatenate the Time2Vector features, then summarize with an LSTM
x_t = time_embedding(liq_seq)
x_liq = Concatenate(axis=-1)([liq_seq, x_t])
x_liq = LSTM(units=64, return_sequences=False)(x_liq)  # feed x_liq, not liq_seq, so the time embedding is actually used
x_liq = LayerNormalization()(x_liq)
x_liq = Dense(64)(x_liq)
x_liq = LayerNormalization()(x_liq)

# Image branch: one conv + pooling block, flattened and projected to the same width (64)
x_pic = Conv2D(64, (10, 10), name="first_conv", activation='relu')(pic_seq)
x_pic = MaxPooling2D((2, 2), name="first_pooling")(x_pic)
x_pic = Flatten(name="flatten")(x_pic)
x_pic = Dense(64, activation='tanh')(x_pic)
x_pic = LayerNormalization()(x_pic)

# Fuse both branches and reshape into a (seq_len, 2) sequence for the encoder stack
x_liq_pic = Concatenate(axis=-1)([x_liq, x_pic])
x_liq_pic = Dense(seq_len * 2, activation='tanh')(x_liq_pic)
x_liq_pic = Reshape((seq_len, 2))(x_liq_pic)

#x_liq_pic = Concatenate(axis=-1)([x_liq_pic, x_t])

x_liq_pic = attn_layer1((x_liq_pic, x_liq_pic, x_liq_pic))
x_liq_pic = attn_layer2((x_liq_pic, x_liq_pic, x_liq_pic))
x_liq_pic = attn_layer3((x_liq_pic, x_liq_pic, x_liq_pic))
x_liq_pic = GlobalAveragePooling1D(data_format='channels_first')(x_liq_pic)
x_liq_pic = Dropout(0.2)(x_liq_pic)
x_liq_pic = Dense(64, activation='tanh')(x_liq_pic)
x_liq_pic = Dropout(0.2)(x_liq_pic)
out = Dense(1, activation='sigmoid')(x_liq_pic)  # sigmoid for binary classification; a 1-unit softmax always outputs 1.0

model = Model(inputs=[pic_seq, liq_seq], outputs=out)

Here I concatenate the time embedding before the first LSTM (I'm not sure whether I should add this LSTM layer and concatenate here), then use a dense layer to bring it to a common shape. For the images, a 2D convolutional layer starts processing them, followed by a dense layer to get them to the desired shape.

Having these two outputs with the same shape, I concatenate them, pass the result through a dense layer, and then reshape it, in order to do the time embedding concatenation again before sending all of this up to the transformer layers.
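
To make the shape flow of that fusion step explicit, here is a minimal standalone sketch of just this part, assuming a hypothetical seq_len = 30 (the variable values are illustrative, not my real ones):

import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Concatenate, Reshape

seq_len = 30  # hypothetical value, only for this shape check

liq_vec = Input(shape=(64,))  # stand-in for the time-series branch output: (batch, 64)
pic_vec = Input(shape=(64,))  # stand-in for the image branch output: (batch, 64)

fused = Concatenate(axis=-1)([liq_vec, pic_vec])       # (batch, 128)
fused = Dense(seq_len * 2, activation='tanh')(fused)   # (batch, 60)
fused = Reshape((seq_len, 2))(fused)                   # (batch, 30, 2)

print(fused.shape)  # (None, 30, 2), the sequence fed to the transformer layers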

Here is the model's plot (image omitted).

I really feel like I'm doing this wrong, but I can't find much documentation on this topic. I am also using a TensorFlow dataset to feed the network.
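
For illustration, a minimal sketch of how such a dataset can be built for this two-input model, using dummy arrays (all sizes here are assumptions, not my real data):

import numpy as np
import tensorflow as tf

n_samples, seq_len, n_features = 8, 30, 5  # dummy sizes, for illustration only

pics = np.random.rand(n_samples, 500, 700, 3).astype('float32')
liqs = np.random.rand(n_samples, seq_len, n_features).astype('float32')
labels = np.random.randint(0, 2, size=(n_samples, 1)).astype('float32')

# Inputs go in a tuple matching the order of Model(inputs=[pic_seq, liq_seq])
dataset = tf.data.Dataset.from_tensor_slices(((pics, liqs), labels)).batch(4)
# model.fit(dataset, epochs=1)  # each batch is ((pic_batch, liq_batch), label_batch)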

Below are the Time2Vector, attention, multi-head, and transformer classes (almost identical to the GitHub code):

class Time2Vector(Layer):
  def __init__(self, seq_len, **kwargs):
    super(Time2Vector, self).__init__(**kwargs)  # forward kwargs so Keras options like name work
    self.seq_len = seq_len

  def build(self, input_shape):
    '''Initialize weights and biases with shape (seq_len,); they broadcast over the batch'''
    self.weights_linear = self.add_weight(name='weight_linear',
                                shape=(int(self.seq_len),),
                                initializer='uniform',
                                trainable=True)
    
    self.bias_linear = self.add_weight(name='bias_linear',
                                shape=(int(self.seq_len),),
                                initializer='uniform',
                                trainable=True)
    
    self.weights_periodic = self.add_weight(name='weight_periodic',
                                shape=(int(self.seq_len),),
                                initializer='uniform',
                                trainable=True)

    self.bias_periodic = self.add_weight(name='bias_periodic',
                                shape=(int(self.seq_len),),
                                initializer='uniform',
                                trainable=True)

  def call(self, x):
    '''Calculate linear and periodic time features'''
    x = tf.math.reduce_mean(x[:,:,:1], axis=-1)  # keep only the first feature as the time signal; shape (batch, seq_len)
    time_linear = self.weights_linear * x + self.bias_linear # Linear time feature
    time_linear = tf.expand_dims(time_linear, axis=-1) # Add dimension (batch, seq_len, 1)
    
    time_periodic = tf.math.sin(tf.multiply(x, self.weights_periodic) + self.bias_periodic)
    time_periodic = tf.expand_dims(time_periodic, axis=-1) # Add dimension (batch, seq_len, 1)
    return tf.concat([time_linear, time_periodic], axis=-1) # shape = (batch, seq_len, 2)
   
  def get_config(self): # Needed for saving and loading model with custom layer
    config = super().get_config().copy()
    config.update({'seq_len': self.seq_len})
    return config
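
As a quick sanity check (dummy sizes assumed), the layer maps a (batch, seq_len, features) input to (batch, seq_len, 2): one linear and one periodic time feature per timestep.

import tensorflow as tf

# Dummy input: batch of 4 sequences, seq_len=30, 5 features (illustrative sizes)
t2v = Time2Vector(seq_len=30)
print(t2v(tf.random.uniform((4, 30, 5))).shape)  # (4, 30, 2)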


class SingleAttention(Layer):
  def __init__(self, d_k, d_v):
    super(SingleAttention, self).__init__()
    self.d_k = d_k
    self.d_v = d_v

  def build(self, input_shape):
    self.query = Dense(self.d_k, 
                       input_shape=input_shape, 
                       kernel_initializer='glorot_uniform', 
                       bias_initializer='glorot_uniform')
    
    self.key = Dense(self.d_k, 
                     input_shape=input_shape, 
                     kernel_initializer='glorot_uniform', 
                     bias_initializer='glorot_uniform')
    
    self.value = Dense(self.d_v, 
                       input_shape=input_shape, 
                       kernel_initializer='glorot_uniform', 
                       bias_initializer='glorot_uniform')

  def call(self, inputs): # inputs = (in_seq, in_seq, in_seq)
    q = self.query(inputs[0])
    k = self.key(inputs[1])

    attn_weights = tf.matmul(q, k, transpose_b=True)
    attn_weights = attn_weights / np.sqrt(self.d_k)  # scaled dot-product attention
    attn_weights = tf.nn.softmax(attn_weights, axis=-1)
    
    v = self.value(inputs[2])
    attn_out = tf.matmul(attn_weights, v)
    return attn_out    
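
A quick check of the interface (dummy sizes assumed): the layer takes a (query, key, value) tuple, so for self-attention the same tensor is passed three times.

import tensorflow as tf

# Dummy self-attention over a (batch, seq_len, features) = (4, 30, 8) sequence
attn = SingleAttention(d_k=16, d_v=16)
seq = tf.random.uniform((4, 30, 8))
print(attn((seq, seq, seq)).shape)  # (4, 30, 16), i.e. (batch, seq_len, d_v)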

#############################################################################

class MultiAttention(Layer):
  def __init__(self, d_k, d_v, n_heads):
    super(MultiAttention, self).__init__()
    self.d_k = d_k
    self.d_v = d_v
    self.n_heads = n_heads
    self.attn_heads = list()

  def build(self, input_shape):
    for n in range(self.n_heads):
      self.attn_heads.append(SingleAttention(self.d_k, self.d_v))  
    
    # input_shape[0]=(batch, seq_len, 7), input_shape[0][-1]=7 
    self.linear = Dense(input_shape[0][-1], 
                        input_shape=input_shape, 
                        kernel_initializer='glorot_uniform', 
                        bias_initializer='glorot_uniform')

  def call(self, inputs):
    attn = [self.attn_heads[i](inputs) for i in range(self.n_heads)]
    concat_attn = tf.concat(attn, axis=-1)
    multi_linear = self.linear(concat_attn)
    return multi_linear   

#############################################################################

class TransformerEncoder(Layer):
  def __init__(self, d_k, d_v, n_heads, ff_dim, dropout=0.1, **kwargs):
    super(TransformerEncoder, self).__init__(**kwargs)  # forward kwargs so Keras options like name work
    self.d_k = d_k
    self.d_v = d_v
    self.n_heads = n_heads
    self.ff_dim = ff_dim
    self.attn_heads = list()
    self.dropout_rate = dropout

  def build(self, input_shape):
    self.attn_multi = MultiAttention(self.d_k, self.d_v, self.n_heads)
    self.attn_dropout = Dropout(self.dropout_rate)
    self.attn_normalize = LayerNormalization(input_shape=input_shape, epsilon=1e-6)
    self.ff_LSTM= LSTM(units=self.ff_dim,input_shape=input_shape,return_sequences=True)
    self.ff_conv1D_1 = Conv1D(filters=self.ff_dim, kernel_size=1, activation='sigmoid')
    # input_shape[0]=(batch, seq_len, 7), input_shape[0][-1] = 7 
    self.ff_conv1D_2 = Conv1D(filters=input_shape[0][-1], kernel_size=1) 
    self.ff_dropout = Dropout(self.dropout_rate)
    self.ff_normalize = LayerNormalization(input_shape=input_shape, epsilon=1e-6)    
  
  def call(self, inputs): # inputs = (in_seq, in_seq, in_seq)
    attn_layer = self.attn_multi(inputs)
    attn_layer = self.attn_dropout(attn_layer)
    attn_layer = self.attn_normalize(inputs[0] + attn_layer)
    ff_layer = self.ff_LSTM(attn_layer)
    ff_layer = self.ff_conv1D_1(ff_layer)
    ff_layer = self.ff_conv1D_2(ff_layer)
    ff_layer = self.ff_dropout(ff_layer)
    ff_layer = self.ff_normalize(inputs[0] + ff_layer)
    return ff_layer 

  def get_config(self): 
    config = super().get_config().copy()
    config.update({'d_k': self.d_k,
                   'd_v': self.d_v,
                   'n_heads': self.n_heads,
                   'ff_dim': self.ff_dim,
                   'dropout': self.dropout_rate})  # keys must match the __init__ arguments; sub-layers are rebuilt in build() and are not serializable
    return config      
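
The encoder uses the same (q, k, v) calling convention; thanks to the residual connections, the output keeps the input shape (dummy sizes assumed):

import tensorflow as tf

# Dummy self-attention input of shape (batch, seq_len, features) = (4, 30, 2)
enc = TransformerEncoder(d_k=16, d_v=16, n_heads=2, ff_dim=32)
seq = tf.random.uniform((4, 30, 2))
print(enc((seq, seq, seq)).shape)  # (4, 30, 2), unchanged by the encoder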

Solution

To include the positional encoding, we need to add it to the input sequence. Each timestep in the input sequence will get a two-element vector as the additional input. We will use a custom layer to create and add the positional encoding to the input sequence. Example:

class TransformerBlock(Layer):
  def __init__(self, embed_dim, n_heads, ff_dim, rate=0.1):
    super(TransformerBlock, self).__init__()
    self.att = MultiAttention(embed_dim, embed_dim, n_heads)
    self.ffn = Sequential(
        [Dense(ff_dim, activation="relu"), Dense(embed_dim),]
    )
    self.layernorm1 = LayerNormalization(epsilon=1e-6)
    self.layernorm2 = LayerNormalization(epsilon=1e-6)
    self.dropout1 = Dropout(rate)
    self.dropout2 = Dropout(rate)

  def call(self, inputs, training=None):
    # MultiAttention expects a (query, key, value) tuple; self-attention passes the same tensor three times
    attn_output = self.att((inputs, inputs, inputs))
    attn_output = self.dropout1(attn_output, training=training)
    out1 = self.layernorm1(inputs + attn_output)
    ffn_output = self.ffn(out1)
    ffn_output = self.dropout2(ffn_output, training=training)
    return self.layernorm2(out1 + ffn_output)

#############################################################################

class TokenAndPositionEmbedding(Layer):
  def __init__(self, maxlen, vocab_size, embed_dim):
    super(TokenAndPositionEmbedding, self).__init__()
    self.token_emb = Embedding(input_dim=vocab_size, output_dim=embed_dim)
    self.pos_emb = Embedding(input_dim=maxlen, output_dim=embed_dim)

  def call(self, x):
    maxlen = tf.shape(x)[-1]
    positions = tf.range(start=0, limit=maxlen, delta=1)
    positions = self.pos_emb(positions)
    x = self.token_emb(x)
    return x + positions
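
A quick shape check (dummy vocabulary and sizes assumed): the layer takes integer token IDs and returns the token embedding plus a learned position embedding.

import tensorflow as tf

# Dummy batch of token IDs: (batch, maxlen) = (4, 200), drawn from a 1000-token vocabulary
emb = TokenAndPositionEmbedding(maxlen=200, vocab_size=1000, embed_dim=32)
tokens = tf.random.uniform((4, 200), maxval=1000, dtype=tf.int32)
print(emb(tokens).shape)  # (4, 200, 32)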

#############################################################################

class TransformerClassifier(Model):
  def __init__(self, maxlen, vocab_size, embed_dim, n_heads, ff_dim, n_classes, rate=0.1):
    super(TransformerClassifier, self).__init__()
    self.embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
    self.transformer_block = TransformerBlock(embed_dim, n_heads, ff_dim)
    self.pooling = GlobalAveragePooling1D()  # collapse the token dimension for sequence-level classification
    self.dropout = Dropout(rate)
    self.classifier = Dense(n_classes, activation='softmax')

  def call(self, inputs):
    x = self.embedding_layer(inputs)
    x = self.transformer_block(x)
    x = self.pooling(x)  # (batch, maxlen, embed_dim) -> (batch, embed_dim); without this, Dense would predict per token
    x = self.dropout(x)
    return self.classifier(x)
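
And a minimal end-to-end sketch of the classifier on dummy data (all sizes here are illustrative assumptions):

import tensorflow as tf

# Dummy token sequences and integer labels, illustrative sizes only
maxlen, vocab_size, n_classes = 200, 1000, 2
clf = TransformerClassifier(maxlen, vocab_size, embed_dim=32, n_heads=1,
                            ff_dim=64, n_classes=n_classes)
x = tf.random.uniform((16, maxlen), maxval=vocab_size, dtype=tf.int32)
y = tf.random.uniform((16,), maxval=n_classes, dtype=tf.int32)

clf.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
clf.fit(x, y, epochs=1, batch_size=8)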

#############################################################################

# Create a Transformer block
transformer_block = TransformerBlock(32, 1, 64)

# Create a dummy input
x = tf.random.uniform((1, 200, 32))

# Run the input through the transformer block
y = transformer_block(x)

# Check the output shape: the block preserves (batch, seq_len, embed_dim)
print(y.shape)  # (1, 200, 32)

#############################################################################

Answered By – Tibic4

This answer was collected from Stack Overflow and is licensed under CC BY-SA 2.5, CC BY-SA 3.0, and CC BY-SA 4.0.
