Skip to content

Commit e1afa05

Browse files
authored
[feat] PIP-454: Metadata Store Migration Framework (implementation) (#25219)
1 parent d992dc5 commit e1afa05

38 files changed

Lines changed: 2657 additions & 45 deletions

File tree

distribution/server/src/assemble/LICENSE.bin.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -498,8 +498,8 @@ The Apache Software License, Version 2.0
498498
* Prometheus
499499
- io.prometheus-simpleclient_httpserver-0.16.0.jar
500500
* Oxia
501-
- io.github.oxia-db-oxia-client-api-0.7.2.jar
502-
- io.github.oxia-db-oxia-client-0.7.2.jar
501+
- io.github.oxia-db-oxia-client-api-0.7.4.jar
502+
- io.github.oxia-db-oxia-client-0.7.4.jar
503503
* OpenHFT
504504
- net.openhft-zero-allocation-hashing-0.16.jar
505505
* Java JSON WebTokens

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ flexible messaging model and an intuitive client API.</description>
298298
<apache-http-client.version>4.5.13</apache-http-client.version>
299299
<apache-httpcomponents.version>4.4.15</apache-httpcomponents.version>
300300
<jetcd.version>0.7.7</jetcd.version>
301-
<oxia.version>0.7.2</oxia.version>
301+
<oxia.version>0.7.4</oxia.version>
302302
<snakeyaml.version>2.0</snakeyaml.version>
303303
<ant.version>1.10.12</ant.version>
304304
<seancfoley.ipaddress.version>5.5.0</seancfoley.ipaddress.version>

pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
5454
import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver;
5555
import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
56+
import org.apache.pulsar.metadata.impl.DualMetadataStore;
5657
import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
5758
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
5859
import org.slf4j.Logger;
@@ -315,7 +316,7 @@ private static void initializeCluster(Arguments arguments, int bundleNumberForDe
315316
}
316317
}
317318

318-
if (localStore instanceof ZKMetadataStore && configStore instanceof ZKMetadataStore) {
319+
if (localStore instanceof DualMetadataStore && configStore instanceof DualMetadataStore) {
319320
String uriStr;
320321
if (arguments.existingBkMetadataServiceUri != null) {
321322
uriStr = arguments.existingBkMetadataServiceUri;
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.admin.impl;
20+
21+
import io.swagger.annotations.ApiOperation;
22+
import io.swagger.annotations.ApiParam;
23+
import io.swagger.annotations.ApiResponse;
24+
import io.swagger.annotations.ApiResponses;
25+
import javax.ws.rs.GET;
26+
import javax.ws.rs.POST;
27+
import javax.ws.rs.Path;
28+
import javax.ws.rs.QueryParam;
29+
import javax.ws.rs.core.Response;
30+
import lombok.extern.slf4j.Slf4j;
31+
import org.apache.pulsar.broker.admin.AdminResource;
32+
import org.apache.pulsar.broker.web.RestException;
33+
import org.apache.pulsar.common.migration.MigrationState;
34+
import org.apache.pulsar.common.util.ObjectMapperFactory;
35+
import org.apache.pulsar.metadata.coordination.impl.MigrationCoordinator;
36+
import org.apache.pulsar.metadata.impl.DualMetadataStore;
37+
38+
/**
39+
* Admin resource for metadata store migration operations.
40+
*/
41+
@Slf4j
42+
public class MetadataMigrationBase extends AdminResource {
43+
44+
@GET
45+
@Path("/status")
46+
@ApiOperation(value = "Get current migration status", response = MigrationState.class)
47+
@ApiResponses(value = {
48+
@ApiResponse(code = 200, message = "Migration status retrieved successfully"),
49+
@ApiResponse(code = 500, message = "Internal server error")
50+
})
51+
public MigrationState getStatus() {
52+
validateSuperUserAccess();
53+
54+
try {
55+
var ogr = pulsar().getLocalMetadataStore().get(MigrationState.MIGRATION_FLAG_PATH).get();
56+
if (ogr.isPresent()) {
57+
return ObjectMapperFactory.getMapper().reader().readValue(ogr.get().getValue(), MigrationState.class);
58+
} else {
59+
return MigrationState.NOT_STARTED;
60+
}
61+
} catch (Exception e) {
62+
log.error("Failed to get migration status", e);
63+
throw new RestException(e);
64+
}
65+
}
66+
67+
@POST
68+
@Path("/start")
69+
@ApiOperation(value = "Start metadata store migration")
70+
@ApiResponses(value = {
71+
@ApiResponse(code = 204, message = "Migration started successfully"),
72+
@ApiResponse(code = 400, message = "Invalid target URL"),
73+
@ApiResponse(code = 409, message = "Migration already in progress"),
74+
@ApiResponse(code = 500, message = "Internal server error")
75+
})
76+
public void startMigration(
77+
@ApiParam(value = "Target metadata store URL", required = true)
78+
@QueryParam("target")
79+
String targetUrl) {
80+
validateSuperUserAccess();
81+
82+
if (targetUrl == null || targetUrl.trim().isEmpty()) {
83+
throw new RestException(Response.Status.BAD_REQUEST, "Target URL is required");
84+
}
85+
86+
try {
87+
// Check if metadata store is wrapped with DualMetadataStore
88+
if (!(pulsar().getLocalMetadataStore() instanceof DualMetadataStore)) {
89+
throw new RestException(Response.Status.BAD_REQUEST, "Metadata store is not configured for migration. "
90+
+ "Please ensure you're using a supported source metadata store (e.g., ZooKeeper).");
91+
}
92+
93+
// Create coordinator
94+
MigrationCoordinator coordinator = new MigrationCoordinator(pulsar().getLocalMetadataStore(), targetUrl);
95+
96+
// Start migration in background thread
97+
pulsar().getExecutor().submit(() -> {
98+
try {
99+
log.info("Starting metadata migration to: {}", targetUrl);
100+
coordinator.startMigration();
101+
log.info("Metadata migration completed successfully");
102+
} catch (Exception e) {
103+
log.error("Metadata migration failed", e);
104+
}
105+
});
106+
107+
log.info("Migration initiated to target: {}", targetUrl);
108+
109+
} catch (RestException e) {
110+
throw e;
111+
} catch (Exception e) {
112+
log.error("Failed to start migration", e);
113+
throw new RestException(e);
114+
}
115+
}
116+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.admin.v2;
20+
21+
import io.swagger.annotations.Api;
22+
import javax.ws.rs.Path;
23+
import javax.ws.rs.Produces;
24+
import javax.ws.rs.core.MediaType;
25+
import org.apache.pulsar.broker.admin.impl.MetadataMigrationBase;
26+
27+
/**
28+
* REST API for metadata store migration operations.
29+
*/
30+
@Path("/metadata/migration")
31+
@Api(value = "/metadata/migration", description = "Metadata store migration admin APIs", tags = "metadata-migration")
32+
@Produces(MediaType.APPLICATION_JSON)
33+
public class MetadataMigration extends MetadataMigrationBase {
34+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateMetadataStoreTableViewImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.pulsar.common.util.FutureUtil;
3737
import org.apache.pulsar.metadata.api.MetadataStoreException;
3838
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
39-
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
39+
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
4040
import org.apache.pulsar.metadata.tableview.impl.MetadataStoreTableViewImpl;
4141

4242
@Slf4j
@@ -65,7 +65,7 @@ public void start(PulsarService pulsar,
6565
init(pulsar);
6666
conflictResolver = new ServiceUnitStateDataConflictResolver();
6767
conflictResolver.setStorageType(MetadataStore);
68-
if (!(pulsar.getLocalMetadataStore() instanceof AbstractMetadataStore)
68+
if (!(pulsar.getLocalMetadataStore() instanceof MetadataStoreExtended)
6969
&& !MetadataSessionExpiredPolicy.shutdown.equals(pulsar.getConfig().getZookeeperSessionExpiredPolicy())) {
7070
String errorMsg = String.format("Your current metadata store [%s] does not support the registration of "
7171
+ "session event listeners. Please set \"zookeeperSessionExpiredPolicy\" to \"shutdown\";"

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceChaosTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
2929
import org.apache.pulsar.common.policies.data.TopicType;
3030
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
31+
import org.apache.pulsar.metadata.impl.DualMetadataStore;
3132
import org.apache.zookeeper.CreateMode;
3233
import org.apache.zookeeper.ZooDefs;
3334
import org.apache.zookeeper.ZooKeeper;
@@ -55,7 +56,9 @@ public void cleanup() throws Exception {
5556
@Test
5657
public void testFetchPartitionedTopicMetadataWithCacheRefresh() throws Exception {
5758
final String configMetadataStoreConnectString =
58-
WhiteboxImpl.getInternalState(pulsar.getConfigurationMetadataStore(), "zkConnectString");
59+
WhiteboxImpl.getInternalState(
60+
((DualMetadataStore) pulsar.getConfigurationMetadataStore()).getSourceStore(),
61+
"zkConnectString");
5962
@Cleanup
6063
final ZooKeeper anotherZKCli = new ZooKeeper(configMetadataStoreConnectString, 5000, null);
6164
// Set policy of auto create topic to PARTITIONED.

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import org.apache.pulsar.common.policies.data.ClusterData;
3636
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
3737
import org.apache.pulsar.common.policies.data.TopicType;
38+
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
3839
import org.apache.pulsar.metadata.api.extended.SessionEvent;
40+
import org.apache.pulsar.metadata.impl.DualMetadataStore;
3941
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
4042
import org.apache.pulsar.tests.TestRetrySupport;
4143
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -88,9 +90,13 @@ protected void startBrokers() throws Exception {
8890
pulsar = new PulsarService(config);
8991
pulsar.start();
9092
broker = pulsar.getBrokerService();
91-
ZKMetadataStore zkMetadataStore = (ZKMetadataStore) pulsar.getLocalMetadataStore();
92-
localZkOfBroker = zkMetadataStore.getZkClient();
93-
zkMetadataStore.registerSessionListener(n -> {
93+
MetadataStoreExtended store = pulsar.getLocalMetadataStore();
94+
if (store instanceof DualMetadataStore dms) {
95+
localZkOfBroker = ((ZKMetadataStore) dms.getSourceStore()).getZkClient();
96+
} else if (store instanceof ZKMetadataStore zkStore) {
97+
localZkOfBroker = zkStore.getZkClient();
98+
}
99+
store.registerSessionListener(n -> {
94100
log.info("Received session event: {}", n);
95101
sessionEvent = n;
96102
});

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
import org.apache.pulsar.common.schema.SchemaInfo;
117117
import org.apache.pulsar.common.schema.SchemaType;
118118
import org.apache.pulsar.common.util.FutureUtil;
119+
import org.apache.pulsar.metadata.impl.DualMetadataStore;
119120
import org.awaitility.Awaitility;
120121
import org.awaitility.reflect.WhiteboxImpl;
121122
import org.glassfish.jersey.client.JerseyClient;
@@ -1547,8 +1548,9 @@ public void testCloseTopicAfterStartReplicationFailed() throws Exception {
15471548
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
15481549

15491550
// We inject an error to make "start replicator" to fail.
1551+
DualMetadataStore dms = (DualMetadataStore) pulsar1.getConfigurationMetadataStore();
15501552
AsyncLoadingCache<String, Boolean> existsCache =
1551-
WhiteboxImpl.getInternalState(pulsar1.getConfigurationMetadataStore(), "existsCache");
1553+
WhiteboxImpl.getInternalState(dms.getSourceStore(), "existsCache");
15521554
String path = "/admin/partitioned-topics/" + TopicName.get(topicName).getPersistenceNamingEncoding();
15531555
existsCache.put(path, CompletableFuture.completedFuture(true));
15541556

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.admin;
20+
21+
import java.util.concurrent.CompletableFuture;
22+
import org.apache.pulsar.common.migration.MigrationState;
23+
24+
/**
25+
* Handle cluster metadata migrations.
26+
*/
27+
public interface MetadataMigration {
28+
29+
/**
30+
* Start metadata store migration.
31+
*
32+
* @return
33+
*/
34+
CompletableFuture<Void> start(String targetUrl);
35+
36+
/**
37+
* Get current migration status.
38+
*
39+
* @return
40+
*/
41+
CompletableFuture<MigrationState> status();
42+
}

0 commit comments

Comments
 (0)