Tensorflow: How to zip together multiple concatenated datasets?

Issue

I am attempting to zip together multiple tf.data.Dataset objects for a multi-input Keras model. Each tf.data.Dataset object is a concatenation of multiple dataframes, each with the same number of columns but not necessarily the same number of rows.
I am able to create the full dataset, but when I try to pass it to a Keras model I get an error:

TypeError: Inputs to a layer should be tensors. Got: <tensorflow.python.data.ops.dataset_ops._NestedVariant object

The problem is that I would really like to take advantage of the lazy structure of the tf.data.Dataset since I am using the window function, but I am having difficulty aggregating all the datasets together.

How do I zip together multiple datasets in a way that they can be passed into the model.fit() function?

Any help would be appreciated.

Here is a minimal, runnable example that reproduces my problem:


import pandas as pd
import numpy as np


import tensorflow as tf

# Two frames of standard-normal noise that share one schema (four feature
# columns plus a target column) but hold different numbers of rows.

column_names = ["feature1", "feature2", "feature3", "feature4", "target"]
dataframe1 = pd.DataFrame(data=np.random.randn(1000, 5), columns=column_names)
dataframe2 = pd.DataFrame(data=np.random.randn(800, 5), columns=column_names)


# Convert dataframes to datasets

def get_dataset(df: pd.DataFrame, features):
    """Return a dataset with one element per row of the selected columns.

    The first four rows are skipped. This presumably keeps the row count in
    step with the number of 5-row windows produced by get_dataset_windowed
    for the same frame.

    Args:
        df: Source dataframe.
        features: Column labels to keep.

    Returns:
        A tf.data.Dataset sliced row by row from the remaining values.
    """
    selected = df.loc[:, features]
    values = selected.iloc[4:].to_numpy()
    return tf.data.Dataset.from_tensor_slices(values)


def get_dataset_windowed(df: pd.DataFrame, features):
    """Return sliding windows of five consecutive rows over the selected columns.

    Args:
        df: Source dataframe.
        features: Column labels to keep.

    Returns:
        The tf.data.Dataset produced by window(5, shift=1, stride=1,
        drop_remainder=True) over the row-wise dataset.
    """
    values = df.loc[:, features].to_numpy()
    rows = tf.data.Dataset.from_tensor_slices(values)
    return rows.window(5, shift=1, stride=1, drop_remainder=True)


# For each model input (and for the target) build one dataset per dataframe,
# stack those into a dataset of datasets, and flatten it again with an
# identity interleave.

# Windowed time-series input: feature3 / feature4.
windowed_dataset = tf.data.Dataset.from_tensor_slices(
    [get_dataset_windowed(frame, ["feature3", "feature4"]) for frame in [dataframe1, dataframe2]]
).interleave(lambda ds: ds, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE)

# Static input: feature1 / feature2.
static_dataset = tf.data.Dataset.from_tensor_slices(
    [get_dataset(frame, ["feature1", "feature2"]) for frame in [dataframe1, dataframe2]]
).interleave(lambda ds: ds, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE)

# Regression target.
targets = tf.data.Dataset.from_tensor_slices(
    [get_dataset(frame, ["target"]) for frame in [dataframe1, dataframe2]]
).interleave(lambda ds: ds, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE)

# Zip datasets together

# Features and labels are each grouped in a dict keyed by name.
model_inputs = {
    "short_term_ts_input": windowed_dataset,
    "static_input": static_dataset,
}
model_outputs = {
    "output": targets,
}
full_dataset = tf.data.Dataset.zip((model_inputs, model_outputs))

# Shuffle first, then group into batches of 128.
full_dataset = full_dataset.shuffle(buffer_size=1024)
full_dataset = full_dataset.batch(128)


# Creating, compiling and fitting model

# The input names match the keys of the feature dict in full_dataset.
short_term_ts_input = tf.keras.Input(shape=(5, 2), name="short_term_ts_input")
# shape must be a tuple: (2) is just the integer 2, (2,) is a one-element tuple.
static_input = tf.keras.Input(shape=(2,), name="static_input")
targets = tf.keras.Input(shape=(1,), name="output")

# Time-series branch: LSTM summary of the window, then a small dense layer.
short_term_ts_features = tf.keras.layers.LSTM(32, return_sequences=False)(short_term_ts_input)
short_term_ts_features = tf.keras.layers.Dense(8)(short_term_ts_features)

# Static branch.
static_features = tf.keras.layers.Dense(16)(static_input)

# Merge both branches and regress a single value.
x_concat = tf.keras.layers.concatenate([short_term_ts_features, static_features])
x_concat = tf.keras.layers.Dense(32)(x_concat)

output = tf.keras.layers.Dense(1)(x_concat)

model = tf.keras.Model(inputs=[short_term_ts_input, static_input], outputs=[output])
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001))

tf.keras.utils.plot_model(model, "model_test.png", show_shapes=True)

model.fit(full_dataset)

Solution

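Three things change compared with the code in the question:

1. Dataset.window() produces a dataset whose elements are themselves datasets (hence the _NestedVariant in the error message). Each window is turned into a single tensor with flat_map(lambda z: z.batch(5)), where 5 is the window size.
2. The targets are zipped in as the second element of an (inputs, targets) pair instead of being wrapped in a dict keyed "output", and the extra targets = tf.keras.Input(...) line is dropped.
3. model.compile() is given a loss (loss='mse').

Maybe something like this: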

import pandas as pd
import numpy as np


import tensorflow as tf

# Random example data: both frames have the same five columns (four features
# and one target); the first has 1000 rows and the second 800.

columns = ["feature1", "feature2", "feature3", "feature4", "target"]
dataframe1, dataframe2 = (
    pd.DataFrame(np.random.randn(row_count, 5), columns=columns)
    for row_count in (1000, 800)
)


# Convert dataframes to datasets

def get_dataset(df: pd.DataFrame, features, skip_rows: int = 4):
    """Return a row-wise dataset of the selected columns, skipping leading rows.

    Args:
        df: Source dataframe.
        features: Column labels to keep.
        skip_rows: Number of leading rows to drop. The default of 4 leaves
            len(df) - 4 rows, the same count as the 5-row windows that
            get_dataset_windowed produces for the same frame. Use
            window_size - 1 if the window length is changed.

    Returns:
        A tf.data.Dataset with one element per remaining row.
    """
    values = df.loc[:, features].iloc[skip_rows:].to_numpy()
    return tf.data.Dataset.from_tensor_slices(values)


def get_dataset_windowed(df: pd.DataFrame, features, window_size: int = 5):
    """Return sliding windows of consecutive rows over the selected columns.

    Args:
        df: Source dataframe.
        features: Column labels to keep.
        window_size: Number of consecutive rows per window (default 5).

    Returns:
        The tf.data.Dataset produced by Dataset.window with shift=1, stride=1
        and drop_remainder=True. Each element is a nested dataset, so callers
        still need to batch every window (e.g. with flat_map) to get tensors.
    """
    values = df.loc[:, features].to_numpy()
    return tf.data.Dataset.from_tensor_slices(values).window(
        window_size, shift=1, stride=1, drop_remainder=True
    )


# Windowed time-series input (feature3 / feature4). After the per-frame
# datasets are flattened with interleave, every element is still a nested
# window dataset; flat_map + batch(5) collapses each 5-row window into one
# tensor.
per_frame_windows = [
    get_dataset_windowed(frame, ["feature3", "feature4"])
    for frame in [dataframe1, dataframe2]
]
windowed_dataset = tf.data.Dataset.from_tensor_slices(per_frame_windows)
windowed_dataset = windowed_dataset.interleave(
    lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE
)
windowed_dataset = windowed_dataset.flat_map(lambda z: z.batch(5))

# Static input (feature1 / feature2): one element per row.
per_frame_static = [
    get_dataset(frame, ["feature1", "feature2"])
    for frame in [dataframe1, dataframe2]
]
static_dataset = tf.data.Dataset.from_tensor_slices(per_frame_static)
static_dataset = static_dataset.interleave(
    lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE
)

# Regression target: one element per row.
per_frame_targets = [
    get_dataset(frame, ["target"])
    for frame in [dataframe1, dataframe2]
]
targets = tf.data.Dataset.from_tensor_slices(per_frame_targets)
targets = targets.interleave(
    lambda x: x, cycle_length=1, num_parallel_calls=tf.data.AUTOTUNE
)

# Zip datasets together

# Build (inputs, targets) pairs in a single zip. The inputs are a dict keyed
# by the names of the model's Input layers, so each tensor is matched to its
# input by name instead of relying on an extra level of tuple nesting.
full_dataset = tf.data.Dataset.zip(
    (
        {
            "short_term_ts_input": windowed_dataset,
            "static_input": static_dataset,
        },
        targets,
    )
)
full_dataset = full_dataset.shuffle(buffer_size=1024).batch(128)


# Creating, compiling and fitting model

# The input names match the keys of the feature dict in full_dataset.
short_term_ts_input = tf.keras.Input(shape=(5, 2), name="short_term_ts_input")
# shape must be a tuple: (2) is just the integer 2, (2,) is a one-element tuple.
static_input = tf.keras.Input(shape=(2,), name="static_input")

# Time-series branch: LSTM summary of the window, then a small dense layer.
short_term_ts_features = tf.keras.layers.LSTM(32, return_sequences=False)(short_term_ts_input)
short_term_ts_features = tf.keras.layers.Dense(8)(short_term_ts_features)

# Static branch.
static_features = tf.keras.layers.Dense(16)(static_input)

# Merge both branches and regress a single value.
x_concat = tf.keras.layers.concatenate([short_term_ts_features, static_features])
x_concat = tf.keras.layers.Dense(32)(x_concat)

output = tf.keras.layers.Dense(1)(x_concat)

model = tf.keras.Model(inputs=[short_term_ts_input, static_input], outputs=[output])
# A loss is required for fit(); mean squared error suits the single regression output.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mse')

tf.keras.utils.plot_model(model, "model_test.png", show_shapes=True)

model.fit(full_dataset)

Answered By – AloneTogether

This answer, collected from Stack Overflow, is licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.

Leave a Reply

(*) Required, Your email will not be published