|
28 | 28 | import java.util.Set; |
29 | 29 | import java.util.Timer; |
30 | 30 | import java.util.concurrent.ConcurrentHashMap; |
| 31 | +import java.util.concurrent.CountDownLatch; |
31 | 32 | import java.util.concurrent.ExecutionException; |
32 | 33 | import java.util.concurrent.ExecutorService; |
33 | 34 | import java.util.concurrent.Executors; |
|
37 | 38 | import javax.mail.MessagingException; |
38 | 39 | import javax.naming.ConfigurationException; |
39 | 40 |
|
40 | | -import com.cloud.dc.DataCenter; |
41 | | -import com.cloud.dc.Pod; |
42 | | -import com.cloud.org.Cluster; |
43 | 41 | import org.apache.cloudstack.framework.config.ConfigDepot; |
44 | 42 | import org.apache.cloudstack.framework.config.ConfigKey; |
45 | 43 | import org.apache.cloudstack.framework.config.Configurable; |
46 | 44 | import org.apache.cloudstack.framework.config.dao.ConfigurationDao; |
| 45 | +import org.apache.cloudstack.managed.context.ManagedContextRunnable; |
47 | 46 | import org.apache.cloudstack.managed.context.ManagedContextTimerTask; |
48 | 47 | import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; |
49 | 48 | import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; |
|
52 | 51 | import org.apache.cloudstack.utils.mailing.SMTPMailSender; |
53 | 52 | import org.apache.commons.lang3.ArrayUtils; |
54 | 53 | import org.apache.commons.lang3.math.NumberUtils; |
55 | | -import org.apache.logging.log4j.Logger; |
56 | 54 | import org.apache.logging.log4j.LogManager; |
| 55 | +import org.apache.logging.log4j.Logger; |
| 56 | +import org.jetbrains.annotations.Nullable; |
57 | 57 |
|
58 | 58 | import com.cloud.alert.dao.AlertDao; |
59 | 59 | import com.cloud.api.ApiDBUtils; |
|
66 | 66 | import com.cloud.configuration.Config; |
67 | 67 | import com.cloud.configuration.ConfigurationManager; |
68 | 68 | import com.cloud.dc.ClusterVO; |
| 69 | +import com.cloud.dc.DataCenter; |
69 | 70 | import com.cloud.dc.DataCenter.NetworkType; |
70 | 71 | import com.cloud.dc.DataCenterVO; |
71 | 72 | import com.cloud.dc.HostPodVO; |
| 73 | +import com.cloud.dc.Pod; |
72 | 74 | import com.cloud.dc.Vlan.VlanType; |
73 | 75 | import com.cloud.dc.dao.ClusterDao; |
74 | 76 | import com.cloud.dc.dao.DataCenterDao; |
|
82 | 84 | import com.cloud.host.dao.HostDao; |
83 | 85 | import com.cloud.network.Ipv6Service; |
84 | 86 | import com.cloud.network.dao.IPAddressDao; |
| 87 | +import com.cloud.org.Cluster; |
85 | 88 | import com.cloud.org.Grouping.AllocationState; |
86 | 89 | import com.cloud.resource.ResourceManager; |
87 | 90 | import com.cloud.storage.StorageManager; |
|
93 | 96 | import com.cloud.utils.db.TransactionCallbackNoReturn; |
94 | 97 | import com.cloud.utils.db.TransactionStatus; |
95 | 98 |
|
96 | | -import org.jetbrains.annotations.Nullable; |
97 | | - |
98 | 99 | public class AlertManagerImpl extends ManagerBase implements AlertManager, Configurable { |
99 | 100 | protected Logger logger = LogManager.getLogger(AlertManagerImpl.class.getName()); |
100 | 101 |
|
@@ -289,30 +290,47 @@ protected void recalculateHostCapacities() { |
289 | 290 | if (hostIds.isEmpty()) { |
290 | 291 | return; |
291 | 292 | } |
292 | | - ConcurrentHashMap<Long, Future<Void>> futures = new ConcurrentHashMap<>(); |
293 | 293 | ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1, |
294 | 294 | Math.min(CapacityManager.CapacityCalculateWorkers.value(), hostIds.size()))); |
295 | | - for (Long hostId : hostIds) { |
296 | | - futures.put(hostId, executorService.submit(() -> { |
297 | | - Transaction.execute(new TransactionCallbackNoReturn() { |
| 295 | + final CountDownLatch latch = new CountDownLatch(hostIds.size()); |
| 296 | + final ConcurrentHashMap<Long, Throwable> failures = new ConcurrentHashMap<>(); |
| 297 | + |
| 298 | + try { |
| 299 | + for (final Long hostId : hostIds) { |
| 300 | + executorService.execute(new ManagedContextRunnable() { |
298 | 301 | @Override |
299 | | - public void doInTransactionWithoutResult(TransactionStatus status) { |
300 | | - final HostVO host = hostDao.findById(hostId); |
301 | | - _capacityMgr.updateCapacityForHost(host); |
| 302 | + protected void runInContext() { |
| 303 | + try { |
| 304 | + final HostVO host = hostDao.findById(hostId); |
| 305 | + if (host == null) { |
| 306 | + logger.error("Host with ID: {} no longer exists, skipping capacity calculation", hostId); |
| 307 | + return; |
| 308 | + } |
| 309 | + _capacityMgr.updateCapacityForHost(host); |
| 310 | + } catch (Throwable t) { |
| 311 | + failures.put(hostId, t); |
| 312 | + logger.error("Error during host capacity calculation for ID: {}", hostId, t); |
| 313 | + } finally { |
| 314 | + latch.countDown(); |
| 315 | + } |
302 | 316 | } |
303 | 317 | }); |
304 | | - return null; |
305 | | - })); |
306 | | - } |
307 | | - for (Map.Entry<Long, Future<Void>> entry: futures.entrySet()) { |
| 318 | + } |
| 319 | + |
308 | 320 | try { |
309 | | - entry.getValue().get(); |
310 | | - } catch (InterruptedException | ExecutionException e) { |
311 | | - logger.error(String.format("Error during capacity calculation for host: %d due to : %s", |
312 | | - entry.getKey(), e.getMessage()), e); |
| 321 | + latch.await(); |
| 322 | + } catch (InterruptedException ie) { |
| 323 | + Thread.currentThread().interrupt(); |
| 324 | + logger.warn("Interrupted while waiting for host capacity calculation tasks"); |
313 | 325 | } |
| 326 | + |
| 327 | + if (!failures.isEmpty()) { |
| 328 | + logger.warn("Host capacity calculation finished with {} failures out of {} hosts", |
| 329 | + failures.size(), hostIds.size()); |
| 330 | + } |
| 331 | + } finally { |
| 332 | + executorService.shutdown(); |
314 | 333 | } |
315 | | - executorService.shutdown(); |
316 | 334 | } |
317 | 335 |
|
318 | 336 | protected void recalculateStorageCapacities() { |
|
0 commit comments