diff --git a/dbrepo-data-service/services/src/main/java/at/tuwien/mapper/MariaDbMapper.java b/dbrepo-data-service/services/src/main/java/at/tuwien/mapper/MariaDbMapper.java
index 8be9ef68e3dff7246a061664d10273eee95d24f0..9713baf0e9552deeac0b936327cf14d9dd946baa 100644
--- a/dbrepo-data-service/services/src/main/java/at/tuwien/mapper/MariaDbMapper.java
+++ b/dbrepo-data-service/services/src/main/java/at/tuwien/mapper/MariaDbMapper.java
@@ -7,6 +7,7 @@ import at.tuwien.api.database.table.TupleUpdateDto;
 import at.tuwien.api.database.table.columns.ColumnDto;
 import at.tuwien.api.database.table.columns.ColumnTypeDto;
 import at.tuwien.api.database.table.columns.CreateTableColumnDto;
+import at.tuwien.api.database.table.constraints.primary.PrimaryKeyDto;
 import at.tuwien.exception.QueryMalformedException;
 import at.tuwien.exception.TableMalformedException;
 import at.tuwien.utils.MariaDbUtil;
@@ -15,10 +16,7 @@ import org.mapstruct.Named;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Types;
+import java.sql.*;
 import java.text.Normalizer;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -400,6 +398,38 @@ public interface MariaDbMapper {
         return stringBuilder.toString();
     }
 
+    default String tableCreateDtoToCreateTimestampTableRawQuery(at.tuwien.api.database.table.internal.TableCreateDto data) {
+        final StringBuilder stringBuilder = new StringBuilder("CREATE TABLE `")
+                .append(nameToInternalName(data.getName())).append("_timestamps")
+                .append("` (");
+
+        stringBuilder.append("`")
+                .append(nameToInternalName("database_id"))
+                .append("` VARCHAR(255) NOT NULL, ");
+        stringBuilder.append("`")
+                .append(nameToInternalName("record_id"))
+                .append("` VARCHAR(255) NOT NULL, ");
+        stringBuilder.append("`")
+                .append(nameToInternalName("timestamp_add"))
+                .append("` TIMESTAMP NOT NULL, ");
+        stringBuilder.append("`")
+                .append(nameToInternalName("timestamp_delete"))
+                .append("` TIMESTAMP NOT NULL");
+
+        stringBuilder.append(", PRIMARY KEY (")
+                .append("`").append(nameToInternalName("database_id")).append("`, ")
+                .append("`").append(nameToInternalName("record_id")).append("`, ")
+                .append("`").append(nameToInternalName("timestamp_add")).append("`, ")
+                .append("`").append(nameToInternalName("timestamp_delete")).append("`")
+                .append(")");
+
+        stringBuilder.append(") WITH SYSTEM VERSIONING")
+                .append(";");
+        log.trace("mapped create timestamp table statement: {}", stringBuilder);
+        return stringBuilder.toString();
+    }
+
+
     /**
      * Selects the row count from a table/view.
      *
@@ -617,6 +647,65 @@ public interface MariaDbMapper {
         return statement.toString();
     }
 
+    default String tupleToRawSelectSystemTimestampsQuery(TableDto table, TupleDto data) throws TableMalformedException {
+        if (table.getConstraints() == null ||
+                table.getConstraints().getPrimaryKey() == null ||
+                table.getConstraints().getPrimaryKey().isEmpty()) {
+            throw new TableMalformedException("Primary key is not defined for table: " + table.getInternalName());
+        }
+
+        final StringBuilder statement = new StringBuilder("SELECT ");
+
+        boolean first = true;
+        // Append primary key columns to the SELECT clause.
+        for (PrimaryKeyDto pk : table.getConstraints().getPrimaryKey()) {
+            if (!first) {
+                statement.append(", ");
+            }
+            first = false;
+            String pkColumn = pk.getColumn().getInternalName();
+            statement.append("`").append(pkColumn).append("`");
+        }
+
+        // Append the system versioning timestamp columns.
+        statement.append(", `ROW_START`, `ROW_END` ");
+
+        // FROM clause with the table's internal name.
+        statement.append("FROM `").append(table.getInternalName()).append("` ");
+
+        // Include system versioning clause.
+        statement.append("FOR SYSTEM_TIME ALL ");
+
+        // Build the WHERE clause by embedding actual primary key values.
+        first = true;
+        statement.append("WHERE ");
+        for (PrimaryKeyDto pk : table.getConstraints().getPrimaryKey()) {
+            if (!first) {
+                statement.append(" AND ");
+            }
+            first = false;
+            String pkColumn = pk.getColumn().getInternalName();
+            Object pkValue = data.getData().get(pkColumn);
+            if (pkValue == null) {
+                throw new TableMalformedException("Primary key value for column " + pkColumn + " is missing");
+            }
+            // If the primary key value is a String or Timestamp, wrap it in single quotes.
+            String valueStr;
+            if (pkValue instanceof String || pkValue instanceof Timestamp) {
+                valueStr = "'" + pkValue.toString().replace("'", "''") + "'";
+            } else {
+                valueStr = pkValue.toString();
+            }
+            statement.append("`").append(pkColumn).append("` = ").append(valueStr);
+        }
+        statement.append(";");
+
+        log.trace("mapped select system timestamps query: {}", statement);
+        return statement.toString();
+    }
+
+
+
     default void prepareStatementWithColumnTypeObject(PreparedStatement statement, ColumnTypeDto columnType, int idx,
                                                       String columnName, Object value) throws SQLException {
         switch (columnType) {
diff --git a/dbrepo-data-service/services/src/main/java/at/tuwien/service/impl/DatabaseServiceMariaDbImpl.java b/dbrepo-data-service/services/src/main/java/at/tuwien/service/impl/DatabaseServiceMariaDbImpl.java
index 4d899c99782f99813c3618dd4c051a2a2a0f2da8..9615f995e690af1c82af6796917ebad0552c70cd 100644
--- a/dbrepo-data-service/services/src/main/java/at/tuwien/service/impl/DatabaseServiceMariaDbImpl.java
+++ b/dbrepo-data-service/services/src/main/java/at/tuwien/service/impl/DatabaseServiceMariaDbImpl.java
@@ -103,6 +103,12 @@ public class DatabaseServiceMariaDbImpl extends DataConnector implements Databas
             connection.prepareStatement(mariaDbMapper.tableCreateDtoToCreateTableRawQuery(data))
                     .execute();
             log.trace("executed statement in {} ms", System.currentTimeMillis() - start);
+
+            /* create timestamp table */
+            final long start2 = System.currentTimeMillis();
+            connection.prepareStatement(mariaDbMapper.tableCreateDtoToCreateTimestampTableRawQuery(data))
+                    .execute();
+            log.trace("executed statement in {} ms", System.currentTimeMillis() - start2);
             connection.commit();
         } catch (SQLException e) {
             connection.rollback();
diff --git a/dbrepo-data-service/services/src/main/java/at/tuwien/service/impl/TableServiceMariaDbImpl.java b/dbrepo-data-service/services/src/main/java/at/tuwien/service/impl/TableServiceMariaDbImpl.java
index 466f7539fd250666962d08a75ffffd5aa61480fa..ecc8ac78609153aedb3cadfe99979d763019126b 100644
--- a/dbrepo-data-service/services/src/main/java/at/tuwien/service/impl/TableServiceMariaDbImpl.java
+++ b/dbrepo-data-service/services/src/main/java/at/tuwien/service/impl/TableServiceMariaDbImpl.java
@@ -5,6 +5,7 @@ import at.tuwien.api.database.table.*;
 import at.tuwien.api.database.table.columns.ColumnDto;
 import at.tuwien.api.database.table.columns.ColumnStatisticDto;
 import at.tuwien.api.database.table.columns.ColumnTypeDto;
+import at.tuwien.api.database.table.constraints.primary.PrimaryKeyDto;
 import at.tuwien.exception.*;
 import at.tuwien.mapper.DataMapper;
 import at.tuwien.mapper.MariaDbMapper;
@@ -21,10 +22,7 @@ import org.apache.spark.sql.SaveMode;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
+import java.sql.*;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
@@ -309,6 +307,32 @@ public class TableServiceMariaDbImpl extends DataConnector implements TableServi
             statement.executeUpdate();
             log.trace("executed statement in {} ms", System.currentTimeMillis() - start);
             connection.commit();
+            log.info(table.toString());
+            log.info(data.toString());
+
+            // Now, fetch the system timestamps using the primary key values.
+            String selectQuery = mariaDbMapper.tupleToRawSelectSystemTimestampsQuery(table, data);
+            PreparedStatement selectStmt = connection.prepareStatement(selectQuery);
+
+            ResultSet rs = selectStmt.executeQuery();
+            if (rs.next()) {
+                // Build a log message with primary key columns and the two timestamps.
+                StringBuilder rowInfo = new StringBuilder();
+                for (PrimaryKeyDto pk : table.getConstraints().getPrimaryKey()) {
+                    String pkColumn = pk.getColumn().getInternalName();
+                    Object value = rs.getObject(pkColumn);
+                    rowInfo.append(pkColumn).append("=").append(value).append(" ");
+                }
+                Timestamp tsAdd = rs.getTimestamp("ROW_START");
+                Timestamp tsDelete = rs.getTimestamp("ROW_END");
+                rowInfo.append("timestampAdd=").append(tsAdd).append(" ");
+                rowInfo.append("timestampDelete=").append(tsDelete);
+                log.info("Fetched system timestamps for tuple: {}", rowInfo.toString().trim());
+            } else {
+                log.warn("No tuple found for the given primary key values");
+            }
+            rs.close();
+            selectStmt.close();
         } catch (SQLException e) {
             connection.rollback();
             log.error("Failed to create tuple: {}", e.getMessage());