@@ -35,7 +35,7 @@ limitations under the License.

<properties>
<elasticsearch.version>8.12.1</elasticsearch.version>
<flink.connector.elasticsearch.version>3.0.1-1.17</flink.connector.elasticsearch.version>
<flink.connector.elasticsearch.version>3.1.0-1.20</flink.connector.elasticsearch.version>
<httpclient.version>4.5.13</httpclient.version>
<jakarta.json.version>2.0.2</jakarta.json.version>
</properties>
@@ -195,4 +195,13 @@ limitations under the License.
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>flink2</id>
<properties>
<flink.connector.elasticsearch.version>4.0.0-2.0</flink.connector.elasticsearch.version>
</properties>
</profile>
</profiles>
</project>
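The flink2 profile added above only overrides the flink.connector.elasticsearch.version property, switching to the 4.0.0-2.0 connector release that targets Flink 2.x; nothing else in the build changes. Presumably it is meant to be activated explicitly when compiling against Flink 2.0, e.g. via standard Maven profile activation such as mvn clean package -Pflink2 (the exact invocation used by the project's build scripts is not shown in this diff).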
@@ -17,7 +17,6 @@

package org.apache.flink.cdc.connectors.elasticsearch.serializer;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -251,9 +250,4 @@ private void checkIndex(int index, int size) {
throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size);
}
}

@Override
public void open(Sink.InitContext context) {
ElementConverter.super.open(context);
}
}
@@ -20,14 +20,16 @@
package org.apache.flink.cdc.connectors.elasticsearch.v2;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.api.connector.sink2.InitContextAdapter;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSinkWriterAdapter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.AsyncSinkBaseAdapter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

@@ -38,8 +40,7 @@
* @param <InputT> type of records that will be converted into {@link Operation}. See {@link
* Elasticsearch8AsyncSinkBuilder} on how to construct valid instances.
*/
public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBase<InputT, Operation> {
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncSink.class);
public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBaseAdapter<InputT, Operation> {

@VisibleForTesting protected final NetworkConfig networkConfig;

@@ -78,14 +79,13 @@ protected Elasticsearch8AsyncSink(
}

/**
* Creates a new {@link StatefulSinkWriter} for writing elements to Elasticsearch.
* Creates a new {@link SinkWriter} for writing elements to Elasticsearch.
*
* @param context the initialization context.
* @return a new instance of {@link Elasticsearch8AsyncWriter}.
*/
@Override
public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter(
InitContext context) {
public SinkWriter<InputT> createWriter(InitContext context) {
return new Elasticsearch8AsyncWriter<>(
getElementConverter(),
context,
@@ -100,18 +100,19 @@ public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter(
}

/**
* Restores a {@link StatefulSinkWriter} from a previously saved state.
* Restores a {@link StatefulSinkWriterAdapter} from a previously saved state.
*
* @param context the initialization context.
* @param recoveredState the recovered state.
* @return a restored instance of {@link Elasticsearch8AsyncWriter}.
*/
@Override
public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> restoreWriter(
InitContext context, Collection<BufferedRequestState<Operation>> recoveredState) {
public StatefulSinkWriterAdapter<InputT, BufferedRequestState<Operation>> restoreWriterAdapter(
WriterInitContext context, Collection<BufferedRequestState<Operation>> recoveredState)
throws IOException {
return new Elasticsearch8AsyncWriter<>(
getElementConverter(),
context,
new InitContextAdapter(context),
getMaxBatchSize(),
getMaxInFlightRequests(),
getMaxBufferedRequests(),
@@ -20,10 +20,12 @@
package org.apache.flink.cdc.connectors.elasticsearch.v2;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSinkWriterAdapter;
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterAdapter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -41,9 +43,7 @@
import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -53,7 +53,8 @@
*
* @param <InputT> type of Operations
*/
public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, Operation> {
public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriterAdapter<InputT, Operation>
implements StatefulSinkWriterAdapter<InputT, BufferedRequestState<Operation>> {
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncWriter.class);

private final ElasticsearchAsyncClient esClient;
@@ -118,7 +119,7 @@ public Elasticsearch8AsyncWriter(

@Override
protected void submitRequestEntries(
List<Operation> requestEntries, Consumer<List<Operation>> requestResult) {
List<Operation> requestEntries, ResultHandler<Operation> resultHandler) {
numRequestSubmittedCounter.inc();
LOG.debug("submitRequestEntries with {} items", requestEntries.size());

@@ -136,27 +137,27 @@ protected void submitRequestEntries(
LOG.debug(
"Skipping empty BulkRequest, all {} operation(s) have null BulkOperationVariant",
requestEntries.size());
requestResult.accept(Collections.emptyList());
resultHandler.complete();
return;
}

esClient.bulk(br.build())
.whenComplete(
(response, error) -> {
if (error != null) {
handleFailedRequest(requestEntries, requestResult, error);
handleFailedRequest(requestEntries, resultHandler, error);
} else if (response.errors()) {
handlePartiallyFailedRequest(
requestEntries, requestResult, response);
requestEntries, resultHandler, response);
} else {
handleSuccessfulRequest(requestResult, response);
handleSuccessfulRequest(resultHandler, response);
}
});
}

private void handleFailedRequest(
List<Operation> requestEntries,
Consumer<List<Operation>> requestResult,
ResultHandler<Operation> resultHandler,
Throwable error) {
LOG.warn(
"The BulkRequest of {} operation(s) has failed due to: {}",
@@ -165,14 +166,15 @@ private void handleFailedRequest(
LOG.debug("The BulkRequest has failed", error);
numRecordsOutErrorsCounter.inc(requestEntries.size());

if (isRetryable(error.getCause())) {
requestResult.accept(requestEntries);
Throwable retryableError = error.getCause() != null ? error.getCause() : error;
if (isRetryable(retryableError)) {
resultHandler.retryForEntries(requestEntries);
}
}

private void handlePartiallyFailedRequest(
List<Operation> requestEntries,
Consumer<List<Operation>> requestResult,
ResultHandler<Operation> resultHandler,
BulkResponse response) {
LOG.debug("The BulkRequest has failed partially. Response: {}", response);
ArrayList<Operation> failedItems = new ArrayList<>();
@@ -192,16 +194,16 @@ private void handlePartiallyFailedRequest(
requestEntries.size(),
failedItems.size(),
response.took());
requestResult.accept(failedItems);
resultHandler.retryForEntries(failedItems);
}

private void handleSuccessfulRequest(
Consumer<List<Operation>> requestResult, BulkResponse response) {
ResultHandler<Operation> resultHandler, BulkResponse response) {
LOG.debug(
"The BulkRequest of {} operation(s) completed successfully. It took {}ms",
response.items().size(),
response.took());
requestResult.accept(Collections.emptyList());
resultHandler.complete();
}

private boolean isRetryable(Throwable error) {
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.api.connector.sink2;

/**
* Compatibility adapter for Flink 1.20. This class is part of the multi-version compatibility layer
* that allows Flink CDC to work across different Flink versions.
*/
public interface StatefulSinkWriterAdapter<InputT, WriterStateT>
extends StatefulSink.StatefulSinkWriter<InputT, WriterStateT> {}
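The interface body above is deliberately empty: on a Flink 1.20 classpath it merely re-exposes StatefulSink.StatefulSinkWriter under a version-neutral name, so implementors such as Elasticsearch8AsyncWriter never reference the deprecated StatefulSink type directly. A Flink 2.x build would presumably supply a same-named interface from a separate source directory, bound to the newer writer-state contract. A minimal sketch of what that counterpart might look like, assuming Flink 2.x exposes a top-level StatefulSinkWriter interface in the same package (an assumption, not something shown in this PR):

package org.apache.flink.api.connector.sink2;

/**
 * Hypothetical Flink 2.x counterpart (sketch only). The top-level StatefulSinkWriter
 * super-interface is assumed here rather than taken from this PR.
 */
public interface StatefulSinkWriterAdapter<InputT, WriterStateT>
        extends StatefulSinkWriter<InputT, WriterStateT> {}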
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.api.connector.sink2;

import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.UserCodeClassLoader;

import java.util.OptionalLong;

/**
* Compatibility adapter for Flink 1.20. This class is part of the multi-version compatibility layer
* that allows Flink CDC to work across different Flink versions.
*/
public class WriterInitContextAdapter implements WriterInitContext {

private final Sink.InitContext context;

public WriterInitContextAdapter(Sink.InitContext context) {
this.context = context;
}

@Override
public UserCodeClassLoader getUserCodeClassLoader() {
return this.context.getUserCodeClassLoader();
}

@Override
public MailboxExecutor getMailboxExecutor() {
return this.context.getMailboxExecutor();
}

@Override
public ProcessingTimeService getProcessingTimeService() {
return this.context.getProcessingTimeService();
}

@Override
public SinkWriterMetricGroup metricGroup() {
return this.context.metricGroup();
}

@Override
public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
return this.context.asSerializationSchemaInitializationContext();
}

@Override
public boolean isObjectReuseEnabled() {
return this.context.isObjectReuseEnabled();
}

@Override
public <IN> TypeSerializer<IN> createInputSerializer() {
return this.context.createInputSerializer();
}

@Override
public OptionalLong getRestoredCheckpointId() {
return this.context.getRestoredCheckpointId();
}

@Override
public JobInfo getJobInfo() {
return this.context.getJobInfo();
}

@Override
public TaskInfo getTaskInfo() {
return this.context.getTaskInfo();
}
}