Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
Expand All @@ -34,6 +36,9 @@ public class JmhSqlJoinBenchmark extends JmhSqlAbstractBenchmark {
/**
 * Count of entries in EMP table.
 * Also used, divided by {@code BATCH_SIZE}, as the exclusive upper bound of the random key
 * picked in {@link #colocatedDistributedJoin}.
 */
protected static final int EMP_CNT = 10_000;

/**
 * Counter for forcing query replanning in cold benchmark.
 * Each call of {@link #leftJoinDistinctRegressionCold} increments it and embeds the new value
 * into a leading SQL comment, so every invocation submits a different query text.
 */
private final AtomicInteger planRnd = new AtomicInteger();

/**
* Initiate new tables.
*/
Expand All @@ -59,14 +64,67 @@ public class JmhSqlJoinBenchmark extends JmhSqlAbstractBenchmark {
* Colocated distributed join.
*/
@Benchmark
// NOTE(review): two method headers follow. The body references `bh`, so the header taking a
// Blackhole looks like the current one and the no-arg header like a leftover pre-change line — confirm.
public void colocatedDistributedJoin() {
public void colocatedDistributedJoin(Blackhole bh) {
// Random key in [0, EMP_CNT / BATCH_SIZE); it is bound to the emp.salary parameter below.
int key = ThreadLocalRandom.current().nextInt(EMP_CNT / BATCH_SIZE);

List<List<?>> res = executeSql("SELECT emp.name, dept.name FROM emp JOIN dept ON emp.deptid = dept.deptid " +
"WHERE emp.salary = ?", key);

// Sanity check: the query must return exactly BATCH_SIZE rows, otherwise the benchmark fails.
if (res.size() != BATCH_SIZE)
throw new AssertionError("Unexpected result size: " + res.size());
// Hand the result to the JMH Blackhole passed into the benchmark.
bh.consume(res);
}

/**
 * Regression benchmark for the query splitter optimization: LEFT JOIN with a DISTINCT subquery.
 * The left side is {@code SELECT DISTINCT * FROM dept}, it is LEFT JOINed with {@code emp},
 * and the WHERE clause filters on a column of the right-hand table ({@code e.name}).
 *
 * @param bh Blackhole that receives the query result.
 */
@Benchmark
public void leftJoinDistinctRegression(Blackhole bh) {
    String qry =
        "SELECT d.deptid, d.name, e.empid " +
        "FROM (SELECT DISTINCT * FROM dept) d " +
        "LEFT JOIN emp e ON d.deptid = e.deptid " +
        "WHERE e.name = ?";

    bh.consume(executeSql(qry, "Employee 5"));
}

/**
 * Baseline for comparison: LEFT JOIN without a DISTINCT subquery.
 * Here {@code emp} is the left side, {@code dept} is the right side, and the WHERE clause
 * filters on the right-hand table ({@code d.name}).
 *
 * @param bh Blackhole that receives the query result.
 */
@Benchmark
public void leftJoinNoDistinctBaseline(Blackhole bh) {
    String qry =
        "SELECT e.empid, e.name, d.deptid " +
        "FROM emp e " +
        "LEFT JOIN dept d ON e.deptid = d.deptid " +
        "WHERE d.name = ?";

    bh.consume(executeSql(qry, "Department 5"));
}

/**
 * Cold-path variant of the LEFT JOIN with DISTINCT benchmark.
 * A fresh counter value is placed into a leading SQL comment on every invocation, so the query
 * text differs each time; the intent is to bypass the plan cache and measure optimization and
 * planning on every run.
 *
 * @param bh Blackhole that receives the query result.
 */
@Benchmark
public void leftJoinDistinctRegressionCold(Blackhole bh) {
    // Unique prefix per invocation; the rest of the statement never changes.
    String uniquePrefix = "/*rnd=" + planRnd.incrementAndGet() + "*/ ";

    String qry = uniquePrefix +
        "SELECT d.deptid, d.name, e.empid " +
        "FROM (SELECT DISTINCT * FROM dept) d " +
        "LEFT JOIN emp e ON d.deptid = e.deptid " +
        "WHERE e.name = ?";

    bh.consume(executeSql(qry, "Employee 5"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ private void doPushDownQueryModelRange(SplitterQueryModel model, int begin, int
pushDownSelectColumns(tblAliases, cols, wrapAlias, select);

// Move all the related WHERE conditions to wrap query.
pushDownWhereConditions(tblAliases, cols, wrapAlias, select);
pushDownWhereConditions(tblAliases, cols, wrapAlias, select, model, begin, end);

// Push down to a subquery all the JOIN elements and process ON conditions.
pushDownJoins(tblAliases, cols, model, begin, end, wrapAlias);
Expand Down Expand Up @@ -1048,6 +1048,46 @@ private static String uniquePushDownColumnAlias(String uniqueTblAlias, String co
return uniqueTblAlias + "__" + colName;
}

/**
 * Selects the table aliases whose WHERE conditions may be moved into the wrap query.
 * <p>
 * Joins with indexes {@code 1..end} are scanned for the first LEFT OUTER JOIN. If there is none,
 * the given {@code tblAliases} set is returned unchanged. Otherwise only the unique aliases of
 * child models located before that join (its left branch) and not before {@code begin} are
 * returned; the result is empty when no such child exists.
 *
 * @param model Query model.
 * @param begin The first child model index of the push down range.
 * @param end The last child model index of the push down range.
 * @param tblAliases Table aliases for push down.
 * @return Table aliases whose WHERE conditions are safe to push down.
 */
private static Set<GridSqlAlias> tblAliasesToPushdownConditions(
    SplitterQueryModel model,
    int begin,
    int end,
    Set<GridSqlAlias> tblAliases
) {
    int firstLeftOuterIdx = -1;

    // Stop scanning as soon as the first LEFT OUTER JOIN is found.
    for (int joinIdx = 1; joinIdx <= end && firstLeftOuterIdx < 0; joinIdx++) {
        if (model.findJoin(joinIdx).isLeftOuter())
            firstLeftOuterIdx = joinIdx;
    }

    // No LEFT OUTER JOIN up to the end of the range: nothing has to be excluded.
    if (firstLeftOuterIdx < 0)
        return tblAliases;

    Set<GridSqlAlias> safeAliases = U.newIdentityHashSet();

    // The last child that still belongs to the left branch of the found join.
    int lastSafeIdx = Math.min(end, firstLeftOuterIdx - 1);

    for (int childIdx = begin; childIdx <= lastSafeIdx; childIdx++) {
        GridSqlAlias alias = model.childModel(childIdx).uniqueAlias();

        assert alias != null : model.ast().getSQL();

        safeAliases.add(alias);
    }

    return safeAliases;
}

/**
* @param tblAliases Table aliases for push down.
* @param cols Columns with generated aliases.
Expand All @@ -1058,11 +1098,17 @@ private void pushDownWhereConditions(
Set<GridSqlAlias> tblAliases,
Map<String, GridSqlAlias> cols,
GridSqlAlias wrapAlias,
GridSqlSelect select
GridSqlSelect select,
SplitterQueryModel model,
int begin,
int end
) {
if (select.where() == null)
return;

Set<GridSqlAlias> tblAliasesToPushdownConditions =
tblAliasesToPushdownConditions(model, begin, end, tblAliases);

GridSqlSelect wrapSelect = GridSqlAlias.<GridSqlSubquery>unwrap(wrapAlias).subquery();

List<SplitterAndCondition> andConditions = new ArrayList<>();
Expand All @@ -1073,7 +1119,7 @@ private void pushDownWhereConditions(
SplitterAndCondition c = andConditions.get(i);
GridSqlAst condition = c.ast();

if (isAllRelatedToTables(tblAliases, U.newIdentityHashSet(), condition)) {
if (isAllRelatedToTables(tblAliasesToPushdownConditions, U.newIdentityHashSet(), condition)) {
if (!SplitterUtils.isTrue(condition)) {
// Replace the original condition with `true` and move it to the wrap query.
c.parent().child(c.childIndex(), TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2047,6 +2047,66 @@ public void testAvgVariousDataTypes() throws Exception {
}
}

/**
 * Checks a LEFT JOIN whose left side is a {@code SELECT DISTINCT} subquery and whose WHERE clause
 * filters on a column of the right-hand table.
 * The only department row is named {@code 'TX'} while the query filters by {@code 'SQL'},
 * so the result is expected to be empty.
 *
 * @throws Exception If failed.
 */
@Test
public void testLeftJoinWithSubquery() throws Exception {
    String personTbl = "PERSON_LEFT_JOIN_SUBQUERY";
    String depTbl = "DEPARTMENT_LEFT_JOIN_SUBQUERY";

    IgniteCache<?, ?> cache = ignite(0).getOrCreateCache(
        cacheConfig("psLeftJoinSubquery", true, Integer.class, Person.class)
    );

    try {
        // Schema and data: one person referencing department 1, one department with id 2.
        String[] setupStmts = {
            "CREATE TABLE " + personTbl + " (id INT PRIMARY KEY, depId INT, name VARCHAR)",
            "CREATE TABLE " + depTbl + " (id INT PRIMARY KEY, name VARCHAR)",
            "INSERT INTO " + personTbl + "(id, depId, name) VALUES (1, 1, 'Emma')",
            "INSERT INTO " + depTbl + "(id, name) VALUES (2, 'TX')"
        };

        for (String stmt : setupStmts)
            cache.query(new SqlFieldsQuery(stmt)).getAll();

        String selectQry =
            "SELECT p.id AS person_id, p.name AS person_name, o.id AS department_id " +
            "FROM (SELECT DISTINCT * FROM " + personTbl + ") p " +
            "LEFT JOIN " + depTbl + " o ON p.depId = o.id " +
            "WHERE o.name = 'SQL'";

        List<List<?>> rows = cache.query(new SqlFieldsQuery(selectQry)).getAll();

        assertTrue(rows.isEmpty());
    }
    finally {
        try {
            // Person table is dropped first, then the department table.
            for (String tbl : new String[] {personTbl, depTbl})
                cache.query(new SqlFieldsQuery("DROP TABLE IF EXISTS " + tbl)).getAll();
        }
        finally {
            // The cache is destroyed even if dropping the tables fails.
            cache.destroy();
        }
    }
}

/**
* Check avg() with various data types.
*
Expand Down
Loading