diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index a86a07c8f866..d1c6be7b7bac 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -19,62 +19,60 @@ package org.apache.druid.client; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy; -import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; -import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.data.input.ResourceInputSource; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.HttpResponseHandler; -import org.apache.druid.java.util.http.client.response.StatusResponseHolder; -import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.Druids; -import org.apache.druid.query.Query; +import org.apache.druid.query.NestedDataTestUtils; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryInterruptedException; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.QueryTimeoutException; +import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.Result; -import org.apache.druid.query.timeboundary.TimeBoundaryQuery; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.TestIndex; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; -import org.apache.druid.server.coordination.TestCoordinatorClient; +import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; +import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.NoneShardSpec; -import org.easymock.Capture; -import org.easymock.EasyMock; +import org.apache.druid.timeline.SegmentId; import org.jboss.netty.handler.codec.http.HttpMethod; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.timeout.ReadTimeoutException; -import org.joda.time.Duration; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.net.MalformedURLException; import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.concurrent.ScheduledExecutorService; +import java.util.Map; +import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; public class DirectDruidClientTest @@ -82,66 +80,32 @@ public class DirectDruidClientTest @ClassRule public static QueryStackTests.Junit4ConglomerateRule conglomerateRule = new QueryStackTests.Junit4ConglomerateRule(); + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private final String hostName = "localhost:8080"; + private final ObjectMapper objectMapper = new DefaultObjectMapper(); + private final ResponseContext responseContext = ResponseContext.createEmpty(); - private final DataSegment dataSegment = new DataSegment( - "test", - Intervals.of("2013-01-01/2013-01-02"), - DateTimes.of("2013-01-01").toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - 0, - 0L - ); - private ServerSelector serverSelector; - - private HttpClient httpClient; - private DirectDruidClient client; - private QueryableDruidServer queryableDruidServer; - private ScheduledExecutorService queryCancellationExecutor; + private WrappingScheduledExecutorService queryCancellationExecutor; + private BlockingExecutorService blockingExecutorService; @Before public void setup() { - final BrokerViewOfCoordinatorConfig filter = new BrokerViewOfCoordinatorConfig(new TestCoordinatorClient()); - filter.start(); - httpClient = EasyMock.createMock(HttpClient.class); - serverSelector = new ServerSelector( - dataSegment, - new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()), - filter + responseContext.initialize(); + blockingExecutorService = new BlockingExecutorService("test-druid-client-cancel-executor"); + queryCancellationExecutor = new WrappingScheduledExecutorService( + "DirectDruidClientTest-%s", + blockingExecutorService, + false ); - queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor"); - client = new DirectDruidClient( - conglomerateRule.getConglomerate(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - new DefaultObjectMapper(), - httpClient, - "http", - hostName, - new NoopServiceEmitter(), - queryCancellationExecutor - ); - queryableDruidServer = new QueryableDruidServer( - new DruidServer( - "test1", - "localhost", - null, - 0, - ServerType.HISTORICAL, - DruidServer.DEFAULT_TIER, - 0 - ), - client - ); - serverSelector.addServerAndUpdateSegment(queryableDruidServer, serverSelector.getSegment()); } @After public void teardown() throws InterruptedException { + blockingExecutorService.shutdownNow(); queryCancellationExecutor.shutdown(); queryCancellationExecutor.awaitTermination(1, TimeUnit.SECONDS); } @@ -151,86 +115,39 @@ public void testRun() throws Exception { final URL url = new URL(StringUtils.format("http://%s/druid/v2/", hostName)); - SettableFuture futureResult = SettableFuture.create(); - Capture capturedRequest = EasyMock.newCapture(); - EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(futureResult) - .times(1); - - SettableFuture futureException = SettableFuture.create(); - EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(futureException) - .times(1); - - EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(SettableFuture.create()) - .atLeastOnce(); + QueuedTestHttpClient queuedHttpClient = new QueuedTestHttpClient(); + DirectDruidClient client1 = makeDirectDruidClient(queuedHttpClient); - EasyMock.replay(httpClient); + DirectDruidClient client2 = makeDirectDruidClient(queuedHttpClient); - DirectDruidClient client2 = new DirectDruidClient( - conglomerateRule.getConglomerate(), - QueryRunnerTestHelper.NOOP_QUERYWATCHER, - new DefaultObjectMapper(), - httpClient, - "http", - "foo2", - new NoopServiceEmitter(), - queryCancellationExecutor - ); - - QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer( - new DruidServer( - "test1", - "localhost", - null, - 0, - ServerType.HISTORICAL, - DruidServer.DEFAULT_TIER, - 0 - ), - client2 - ); - serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); - - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); - Sequence s1 = client.run(QueryPlus.wrap(query)); - Assert.assertTrue(capturedRequest.hasCaptured()); - Assert.assertEquals(url, capturedRequest.getValue().getUrl()); - Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod()); - Assert.assertEquals(1, client.getNumOpenConnections()); - - // simulate read timeout - client.run(QueryPlus.wrap(query)); - Assert.assertEquals(2, client.getNumOpenConnections()); + // Queue first call: pending until we provide a result + SettableFuture futureResult = SettableFuture.create(); + queuedHttpClient.enqueue(futureResult); + // Queue second call: will fail with ReadTimeoutException + SettableFuture futureException = SettableFuture.create(); + queuedHttpClient.enqueue(futureException); + // Subsequent calls: no enqueue → default pending futures created in client + + QueryPlus queryPlus = getQueryPlus(); + + Sequence s1 = client1.run(queryPlus, responseContext); + List requests = queuedHttpClient.getRequests(); + Assert.assertFalse(requests.isEmpty()); + Assert.assertEquals(url, requests.get(0).getUrl()); + Assert.assertEquals(HttpMethod.POST, requests.get(0).getMethod()); + Assert.assertEquals(1, client1.getNumOpenConnections()); + + // simulate read timeout on second request + client1.run(queryPlus, responseContext); + Assert.assertEquals(2, client1.getNumOpenConnections()); futureException.setException(new ReadTimeoutException()); - Assert.assertEquals(1, client.getNumOpenConnections()); + Assert.assertEquals(1, client1.getNumOpenConnections()); - // subsequent connections should work - client.run(QueryPlus.wrap(query)); - client.run(QueryPlus.wrap(query)); - client.run(QueryPlus.wrap(query)); - - Assert.assertTrue(client.getNumOpenConnections() == 4); + // subsequent connections should work (and remain open) + client1.run(queryPlus, responseContext); + client1.run(queryPlus, responseContext); + client1.run(queryPlus, responseContext); + Assert.assertEquals(4, client1.getNumOpenConnections()); // produce result for first connection futureResult.set( @@ -241,244 +158,267 @@ public void testRun() throws Exception List results = s1.toList(); Assert.assertEquals(1, results.size()); Assert.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"), results.get(0).getTimestamp()); - Assert.assertEquals(3, client.getNumOpenConnections()); - - client2.run(QueryPlus.wrap(query)); - client2.run(QueryPlus.wrap(query)); + Assert.assertEquals(3, client1.getNumOpenConnections()); + client2.run(queryPlus, responseContext); + client2.run(queryPlus, responseContext); Assert.assertEquals(2, client2.getNumOpenConnections()); - - Assert.assertEquals(serverSelector.pick(null, CloneQueryMode.EXCLUDECLONES), queryableDruidServer2); - - EasyMock.verify(httpClient); } @Test - public void testCancel() + public void testCancel() throws MalformedURLException { - Capture capturedRequest = EasyMock.newCapture(); - ListenableFuture cancelledFuture = Futures.immediateCancelledFuture(); - SettableFuture cancellationFuture = SettableFuture.create(); - - EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(cancelledFuture) - .once(); - - EasyMock.expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(cancellationFuture) - .anyTimes(); - - EasyMock.replay(httpClient); + QueryPlus queryPlus = getQueryPlus(); + TestHttpClient testHttpClient = new TestHttpClient(objectMapper, Futures.immediateCancelledFuture()); + // add a generic server and a cancel query URL + QueryableIndex index = makeQueryableIndex(); + TestHttpClient.SimpleServerManager simpleServerManager = new TestHttpClient.SimpleServerManager( + conglomerateRule.getConglomerate(), DataSegment.builder(SegmentId.dummy("test")).build(), index, false + ); + testHttpClient.addServerAndRunner( + new DruidServer("test1", hostName, null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0), + simpleServerManager + ); + testHttpClient.addUrlAndRunner( + new URL(StringUtils.format("http://%s/druid/v2/%s", hostName, queryPlus.getQuery().getId())), + simpleServerManager + ); + DirectDruidClient client = makeDirectDruidClient(testHttpClient); + Sequence results = client.run(queryPlus, responseContext); - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); - cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled"))); - Sequence results = client.run(QueryPlus.wrap(query)); - Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod()); Assert.assertEquals(0, client.getNumOpenConnections()); + QueryInterruptedException actualException = + Assert.assertThrows(QueryInterruptedException.class, () -> results.toList()); + Assert.assertEquals(hostName, actualException.getHost()); + Assert.assertEquals("Query cancelled", actualException.getErrorCode()); + Assert.assertEquals("Task was cancelled.", actualException.getCause().getMessage()); + Assert.assertTrue(blockingExecutorService.hasPendingTasks()); + blockingExecutorService.finishNextPendingTask(); + Assert.assertTrue(blockingExecutorService.hasPendingTasks()); + ISE observedException = Assert.assertThrows(ISE.class, () -> blockingExecutorService.finishNextPendingTask()); + Assert.assertTrue(observedException.getCause() instanceof CancellationException); - QueryInterruptedException exception = null; - try { - results.toList(); - } - catch (QueryInterruptedException e) { - exception = e; - } - Assert.assertNotNull(exception); - - EasyMock.verify(httpClient); } @Test public void testQueryInterruptionExceptionLogMessage() { SettableFuture interruptionFuture = SettableFuture.create(); - Capture capturedRequest = EasyMock.newCapture(); - final String hostName = "localhost:8080"; - EasyMock - .expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(interruptionFuture) - .anyTimes(); - - EasyMock.replay(httpClient); - - // test error - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); interruptionFuture.set( new ByteArrayInputStream( StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}") ) ); - Sequence results = client.run(QueryPlus.wrap(query)); + final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient(interruptionFuture)); - QueryInterruptedException actualException = null; - try { - results.toList(); - } - catch (QueryInterruptedException e) { - actualException = e; - } - Assert.assertNotNull(actualException); + interruptionFuture.set( + new ByteArrayInputStream(StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}")) + ); + Sequence results = client.run(getQueryPlus(), responseContext); + + QueryInterruptedException actualException = + Assert.assertThrows(QueryInterruptedException.class, () -> results.toList()); Assert.assertEquals("testing1", actualException.getErrorCode()); Assert.assertEquals("testing2", actualException.getMessage()); Assert.assertEquals(hostName, actualException.getHost()); - EasyMock.verify(httpClient); } @Test - public void testQueryTimeoutBeforeFuture() throws IOException, InterruptedException + public void testQueryTimeoutBeforeFuture() throws IOException { SettableFuture timeoutFuture = SettableFuture.create(); - Capture capturedRequest = EasyMock.newCapture(); - final String queryId = "timeout-before-future"; - - EasyMock - .expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(timeoutFuture) - .anyTimes(); - - EasyMock.replay(httpClient); - - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext( - ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 250, "queryId", queryId) - ); + final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient(timeoutFuture)); - Sequence results = client.run(QueryPlus.wrap(query)); + QueryPlus queryPlus = getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 250)); + Sequence results = client.run(queryPlus, responseContext); - // incomplete result set + // Incomplete result set delivered via a pipe to simulate slow stream PipedInputStream in = new PipedInputStream(); final PipedOutputStream out = new PipedOutputStream(in); - timeoutFuture.set( - in + timeoutFuture.set(in); + + QueryTimeoutException actualException = Assert.assertThrows( + QueryTimeoutException.class, + () -> { + out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}")); + Thread.sleep(250); + out.write(StringUtils.toUtf8("]")); + out.close(); + results.toList(); + } ); - - QueryTimeoutException actualException = null; - try { - out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}")); - Thread.sleep(250); - out.write(StringUtils.toUtf8("]")); - out.close(); - results.toList(); - } - catch (QueryTimeoutException e) { - actualException = e; - } - Assert.assertNotNull(actualException); Assert.assertEquals("Query timeout", actualException.getErrorCode()); - Assert.assertEquals("url[http://localhost:8080/druid/v2/] timed out", actualException.getMessage()); + Assert.assertEquals(StringUtils.format("url[http://%s/druid/v2/] timed out", hostName), actualException.getMessage()); Assert.assertEquals(hostName, actualException.getHost()); - EasyMock.verify(httpClient); } @Test public void testQueryTimeoutFromFuture() { - SettableFuture noFuture = SettableFuture.create(); - Capture capturedRequest = EasyMock.newCapture(); - final String queryId = "never-ending-future"; - - EasyMock - .expect( - httpClient.go( - EasyMock.capture(capturedRequest), - EasyMock.anyObject(), - EasyMock.anyObject(Duration.class) - ) - ) - .andReturn(noFuture) - .anyTimes(); + final SettableFuture timeoutFuture = SettableFuture.create(); + final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient(timeoutFuture)); - EasyMock.replay(httpClient); + QueryPlus query = getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500)); + Sequence results = client.run(query, responseContext); + QueryTimeoutException actualException = Assert.assertThrows(QueryTimeoutException.class, results::toList); + Assert.assertEquals("Query timeout", actualException.getErrorCode()); + Assert.assertEquals(StringUtils.format("Query [%s] timed out!", query.getQuery().getId()), actualException.getMessage()); + Assert.assertEquals(hostName, actualException.getHost()); + } + + @Test + public void testQueryTimeoutDuringRunThrowsExceptionImmediately() + { + SettableFuture timeoutFuture = SettableFuture.create(); + final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient(timeoutFuture)); - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext( - ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500, "queryId", queryId) + QueryPlus queryPlus = getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis())); + QueryTimeoutException actualException = Assert.assertThrows( + QueryTimeoutException.class, + () -> client.run(queryPlus, responseContext) + ); + Assert.assertEquals("Query timeout", actualException.getErrorCode()); + Assert.assertEquals( + StringUtils.format( + "Query[%s] url[http://%s/druid/v2/] timed out.", + queryPlus.getQuery().getId(), + hostName + ), actualException.getMessage() ); + } + + @Test + public void testQueryTimeoutDuringResponseHandling() + { + final TestHttpClient testHttpClient = new TestHttpClient(objectMapper, 110); + final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient(testHttpClient, false)); - Sequence results = client.run(QueryPlus.wrap(query)); + final QueryPlus queryPlus = getQueryPlus(Map.of( + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 100, + DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 100 + )); - QueryTimeoutException actualException = null; - try { - results.toList(); - } - catch (QueryTimeoutException e) { - actualException = e; - } - Assert.assertNotNull(actualException); + QueryTimeoutException actualException = Assert.assertThrows( + QueryTimeoutException.class, + () -> client.run(queryPlus, responseContext) + ); Assert.assertEquals("Query timeout", actualException.getErrorCode()); - Assert.assertEquals(StringUtils.format("Query [%s] timed out!", queryId), actualException.getMessage()); - Assert.assertEquals(hostName, actualException.getHost()); - EasyMock.verify(httpClient); + Assert.assertEquals( + StringUtils.format("Query[%s] url[http://%s/druid/v2/] timed out.", + queryPlus.getQuery().getId(), + hostName + ), actualException.getMessage() + ); } @Test - public void testConnectionCountAfterException() throws JsonProcessingException + public void testConnectionCountAfterException() { - ObjectMapper mockObjectMapper = EasyMock.createMock(ObjectMapper.class); - EasyMock.expect(mockObjectMapper.writeValueAsBytes(Query.class)) - .andThrow(new JsonProcessingException("Error") - { - }); + final DirectDruidClient client = makeDirectDruidClient(initHttpClientFromExistingClient()); + + Assert.assertThrows(RuntimeException.class, () -> client.run(getQueryPlus(), responseContext)); + Assert.assertEquals(0, client.getNumOpenConnections()); + } + + @Test + public void testResourceLimitExceededException() + { + final DirectDruidClient client = makeDirectDruidClient(initHttpClientWithSuccessfulQuery()); + + final QueryPlus queryPlus = getQueryPlus(Map.of( + QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 100, + DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE + )); - DirectDruidClient client2 = new DirectDruidClient( + ResourceLimitExceededException actualException = Assert.assertThrows( + ResourceLimitExceededException.class, + () -> client.run(queryPlus, responseContext) + ); + + Assert.assertEquals( + StringUtils.format( + "Query[%s] url[http://localhost:8080/druid/v2/] total bytes gathered[127] exceeds maxScatterGatherBytes[100]", + queryPlus.getQuery().getId() + ), + actualException.getMessage()); + } + + private DirectDruidClient makeDirectDruidClient(HttpClient httpClient) + { + return new DirectDruidClient( conglomerateRule.getConglomerate(), QueryRunnerTestHelper.NOOP_QUERYWATCHER, - mockObjectMapper, + objectMapper, httpClient, "http", hostName, new NoopServiceEmitter(), queryCancellationExecutor ); + } - QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer( - new DruidServer( - "test1", - "localhost", - null, - 0, - ServerType.HISTORICAL, - DruidServer.DEFAULT_TIER, - 0 - ), - client2 + private HttpClient initHttpClientFromExistingClient() + { + return initHttpClientFromExistingClient(new TestHttpClient(objectMapper), true); + } + + private HttpClient initHttpClientWithSuccessfulQuery() + { + return initHttpClientFromExistingClient(new TestHttpClient(objectMapper), false); + } + + private HttpClient initHttpClientFromExistingClient(ListenableFuture future) + { + return initHttpClientFromExistingClient(new TestHttpClient(objectMapper, future), false); + } + + private HttpClient initHttpClientFromExistingClient(TestHttpClient httpClient, boolean throwQueryError) + { + final QueryableIndex index = makeQueryableIndex(); + httpClient.addServerAndRunner( + new DruidServer("test1", hostName, null, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0), + new TestHttpClient.SimpleServerManager( + conglomerateRule.getConglomerate(), DataSegment.builder(SegmentId.dummy("test")).build(), index, throwQueryError + ) ); + return httpClient; + } - serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment()); + private QueryableIndex makeQueryableIndex() + { + try { + return IndexBuilder.create() + .tmpDir(temporaryFolder.newFolder()) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec()) + .build() + ) + .inputSource( + ResourceInputSource.of( + NestedDataTestUtils.class.getClassLoader(), + NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE + ) + ) + .inputFormat(TestIndex.DEFAULT_JSON_INPUT_FORMAT) + .inputTmpDir(temporaryFolder.newFolder()) + .buildMMappedIndex(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } - TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); - query = query.withOverriddenContext(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); + private static QueryPlus getQueryPlus() + { + return getQueryPlus(Map.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE)); + } - TimeBoundaryQuery finalQuery = query; - Assert.assertThrows(RuntimeException.class, () -> client2.run(QueryPlus.wrap(finalQuery))); - Assert.assertEquals(0, client2.getNumOpenConnections()); + private static QueryPlus getQueryPlus(Map context) + { + return QueryPlus.wrap(Druids.newTimeBoundaryQueryBuilder().dataSource("test").context(context).randomQueryId().build()); } } diff --git a/server/src/test/java/org/apache/druid/client/QueuedTestHttpClient.java b/server/src/test/java/org/apache/druid/client/QueuedTestHttpClient.java new file mode 100644 index 000000000000..67f392bd4d93 --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/QueuedTestHttpClient.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.joda.time.Duration; + +import java.io.InputStream; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; + +/** + * A test {@link HttpClient} that captures {@link Request}s and serves preloaded responses sequentially. If no future + * has been {@link #enqueue(ListenableFuture) enqueued} when {@link #go(Request, HttpResponseHandler, Duration)} is + * invoked, the client returns a pending future that never completes. + * + *

This is useful for tests that need deterministic control over request ordering and response timing.

+ */ +public class QueuedTestHttpClient implements HttpClient +{ + private final List captured = new ArrayList<>(); + private final Deque> queue = new ArrayDeque<>(); + + public void enqueue(ListenableFuture f) + { + queue.addLast(f); + } + + public List getRequests() + { + return captured; + } + + @Override + @SuppressWarnings("unchecked") + public ListenableFuture go( + Request request, + HttpResponseHandler handler, + Duration readTimeout + ) + { + captured.add(request); + ListenableFuture f = queue.pollFirst(); + if (f == null) { + f = SettableFuture.create(); // pending forever + } + return (ListenableFuture) f; + } + + @Override + public ListenableFuture go(Request request, HttpResponseHandler handler) + { + throw new UnsupportedOperationException("Use go(Request, HttpResponseHandler, Duration) instead)"); + } +} + diff --git a/server/src/test/java/org/apache/druid/client/TestHttpClient.java b/server/src/test/java/org/apache/druid/client/TestHttpClient.java index b979f39799a4..ef4da264aadb 100644 --- a/server/src/test/java/org/apache/druid/client/TestHttpClient.java +++ b/server/src/test/java/org/apache/druid/client/TestHttpClient.java @@ -67,10 +67,29 @@ public class TestHttpClient implements HttpClient private final Map servers = new HashMap<>(); private final ObjectMapper objectMapper; + @Nullable + private final ListenableFuture future; + private final long responseDelayMillis; public TestHttpClient(ObjectMapper objectMapper) { this.objectMapper = objectMapper; + this.future = null; + this.responseDelayMillis = -1; + } + + public TestHttpClient(ObjectMapper objectMapper, ListenableFuture future) + { + this.objectMapper = objectMapper; + this.future = future; + this.responseDelayMillis = -1; + } + + public TestHttpClient(ObjectMapper objectMapper, long responseDelayMillis) + { + this.objectMapper = objectMapper; + this.future = null; + this.responseDelayMillis = responseDelayMillis; } public void addServerAndRunner(DruidServer server, SimpleServerManager serverManager) @@ -78,6 +97,11 @@ public void addServerAndRunner(DruidServer server, SimpleServerManager serverMan servers.put(computeUrl(server), serverManager); } + public void addUrlAndRunner(URL queryId, SimpleServerManager serverManager) + { + servers.put(queryId, serverManager); + } + @Nullable public SimpleServerManager getServerManager(DruidServer server) { @@ -137,13 +161,23 @@ public ListenableFuture go( response.setContent( HeapChannelBufferFactory.getInstance().getBuffer(serializedContent, 0, serializedContent.length) ); + if (responseDelayMillis > 0) { + Thread.sleep(responseDelayMillis); + } final ClientResponse intermClientResponse = handler.handleResponse(response, NOOP_TRAFFIC_COP); final ClientResponse finalClientResponse = handler.done(intermClientResponse); - return Futures.immediateFuture(finalClientResponse.getObj()); + if (future != null) { + return future; + } else { + return Futures.immediateFuture(finalClientResponse.getObj()); + } } catch (IOException e) { throw new RuntimeException(e); } + catch (InterruptedException e) { + throw new RuntimeException(e); + } } /** diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java index 89fa07c41550..65fd37be2cc7 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java @@ -248,7 +248,7 @@ private void executeNow() future.complete(result); } catch (Exception e) { - throw new ISE("Error while executing task", e); + throw new ISE(e, "Error[%s] while executing task", e.getMessage()); } } }