Pluggable flow aggregation function functionality added #22

arashkav wants to merge 5 commits into visgl:main from
Conversation
```typescript
    result.push(aggregateFlow);
    aggFlowsByKey.set(key, aggregateFlow);
  } else {
    aggregateFlow.values.push(flowCountsMapReduce.map(flow));
```
Why do you add values to flow here? It will likely significantly increase the memory use for the resulting flows data structure.
Let me explain why I picked this approach. Previously, you applied a map-reduce approach, which means summing up every new count added to a cluster in each iteration. That works fine when the aggregation function is 'sum'.
The same cannot be achieved if we want 'average'. For example, say we have three edges with values of 10, 5 and 8. If we apply the same approach and just average them every time the 'reduce' function is called, we get a different number than the real average:
(((10 + 5) / 2) + 8) / 2 != (10 + 5 + 8) / 3
This approach, despite your concern, gives the developer full flexibility over which aggregation function is used (e.g. weighted sum, logarithmic, exponential, etc.).
To avoid any performance loss with the normal 'sum' aggregation, I changed the code to use the previous 'reduce' function (and stop pushing values into the array) when getFlowAggFunc is not defined.
I am also considering replacing the arrays with a better-performing data structure.
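The non-associativity of averaging can be checked with a quick standalone snippet (plain TypeScript, independent of the library code):

```typescript
// Applying 'average' as a pairwise reduce step gives a different
// result than the true average over all values.
const values = [10, 5, 8];

// Pairwise averaging at each reduce step: ((10 + 5) / 2 + 8) / 2
const pairwise = values.reduce((acc, v) => (acc + v) / 2);

// True average over all values: (10 + 5 + 8) / 3
const trueAvg = values.reduce((a, b) => a + b, 0) / values.length;

console.log(pairwise, trueAvg); // 7.75 vs ~7.667
```

This is exactly why a running reduce works for 'sum' but not for 'average': summation is associative, averaging is not.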
Yes, I get that. I was more concerned with the memory footprint of the resulting data structure. In line 209 we add the new values property to the flow object, so the resulting data structure will keep more data than necessary. It appears to me that values is only used as a temporary accumulator for the flow counts, so we don't need to keep it forever.
Another issue with your proposed approach is that the reduce function is called every time a new value is added. This might slow the calculations down unnecessarily, especially if the aggregation function is costly.
Maybe we can instead accumulate the values in a separate temporary map similar to aggFlowsByKey, e.g.:
```typescript
const aggFlowCountsByKey = new Map<string, number[]>();
```
After iterating over all flows we can call flowCountsMapReduce.reduce once for each of them and save the results to the flow counts. Then, we can leave aggFlowCountsByKey behind to be garbage collected.
Or alternatively, we can use your accumulation approach, but give the resulting array another pass at the end in which we calculate the averages from the values (calling the agg function only once per flow), add them as counts to the flows and delete the values property from the results. Maybe that's simpler.
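A minimal sketch of the temporary-map idea (the `Flow` type and `aggregateFlows` helper here are illustrative assumptions, not the library's actual API):

```typescript
// Hypothetical two-pass aggregation: accumulate raw values per key,
// then call the aggregation function exactly once per aggregate flow.
type Flow = {origin: string; dest: string; count: number};

function aggregateFlows(
  flows: Flow[],
  getKey: (f: Flow) => string,
  aggFunc: (values: number[]) => number,
): Map<string, number> {
  // First pass: collect raw counts in a temporary map.
  const aggFlowCountsByKey = new Map<string, number[]>();
  for (const flow of flows) {
    const key = getKey(flow);
    const acc = aggFlowCountsByKey.get(key);
    if (acc) acc.push(flow.count);
    else aggFlowCountsByKey.set(key, [flow.count]);
  }
  // Second pass: one aggFunc call per key; the temporary map can
  // then be garbage collected, keeping no `values` on the flows.
  const result = new Map<string, number>();
  for (const [key, vals] of aggFlowCountsByKey) {
    result.set(key, aggFunc(vals));
  }
  return result;
}
```

This keeps the aggregation function pluggable while calling it only once per flow and leaving nothing extra on the resulting data structure.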
I made the changes following the proposed approach, and added the node aggregation feature as well. Please let me know if it works better now.
To allow weighted aggregation, I added another accessor that determines the weight attribute. For example, say we have a performance metric and we want to compute its weighted sum using the flow volume (count) as the weight:
```typescript
getFlowMagnitude: (flow) => flow.metric_1,
getFlowAggWeight: (flow) => flow.count,
getFlowAggFunc: (values) =>
  values.reduce((acc, cur: any) => acc + cur.aggvalue * cur.aggweight, 0) /
  values.reduce((acc, cur: any) => acc + cur.aggweight, 0),
```
The example app seems quite responsive now, and the memory footprint has dropped dramatically.
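The weighted-average formula, sum(value × weight) / sum(weight), can be verified standalone (the aggvalue/aggweight field names mirror the accessor snippet above; plain TypeScript):

```typescript
// Weighted average: sum(value * weight) / sum(weight).
type WeightedValue = {aggvalue: number; aggweight: number};

const weightedAvg = (values: WeightedValue[]): number =>
  values.reduce((acc, cur) => acc + cur.aggvalue * cur.aggweight, 0) /
  values.reduce((acc, cur) => acc + cur.aggweight, 0);

// Example: metric 2 over 10 flows, metric 4 over 30 flows.
console.log(weightedAvg([
  {aggvalue: 2, aggweight: 10},
  {aggvalue: 4, aggweight: 30},
])); // 3.5
```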
I think we also need to apply the aggregation function to the location totals, at least here:
```typescript
    result.push(aggregateFlow);
    aggFlowsByKey.set(key, aggregateFlow);
  } else {
    aggregateFlow.values.push(flowCountsMapReduce.map(flow));
```
In the latest commit, I made the required changes to apply the plugged-in aggregation function to the node clustering and the volume function.
```typescript
getFlowTime?: FlowAccessor<F, Date>; // TODO: use number instead of Date
// getFlowColor?: FlowAccessor<string | undefined>;
getFlowAggFunc: FlowAggregatorFunc<number[], number>;
getFlowAggWeight: FlowAccessor<F, number>;
```
Here is the new weight accessor which, like the magnitude accessor, is fed into the flowmap. getFlowAggWeight provides the weight for each edge volume/count supplied via getFlowMagnitude.
As discussed, this will allow users to plug their own function into the flowmap layer via getFlowAggFunc.
Below is an example that averages rather than sums:
```typescript
new FlowmapLayer<LocationDatum, FlowDatum>({
  id: 'my-flowmap-layer',
  data,
  opacity: config.opacity,
  pickable: true,
  darkMode: config.darkMode,
  colorScheme: config.colorScheme,
  fadeAmount: config.fadeAmount,
  fadeEnabled: config.fadeEnabled,
  fadeOpacityEnabled: config.fadeOpacityEnabled,
  locationTotalsEnabled: config.locationTotalsEnabled,
  locationLabelsEnabled: config.locationLabelsEnabled,
  animationEnabled: config.animationEnabled,
  clusteringEnabled: config.clusteringEnabled,
  clusteringAuto: config.clusteringAuto,
  clusteringLevel: config.clusteringLevel,
  adaptiveScalesEnabled: config.adaptiveScalesEnabled,
  highlightColor: config.highlightColor,
  maxTopFlowsDisplayNum: config.maxTopFlowsDisplayNum,
  getLocationId: (loc) => loc.id,
  getLocationName: (loc) => loc.name,
  getLocationLat: (loc) => loc.lat,
  getLocationLon: (loc) => loc.lon,
  getFlowOriginId: (flow) => flow.origin,
  getFlowDestId: (flow) => flow.dest,
  getFlowMagnitude: (flow) => flow.count,
  getFlowAggFunc: (values) => values.reduce((a, b) => a + b, 0) / values.length,
  onHover: (info) => setTooltip(getTooltipState(info)),
  onClick: (info) =>
    console.log('clicked', info.object?.type, info.object, info),
});
```
Also, it defaults to sum when the function is not provided!
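A sketch of what that default amounts to (the `FlowAggregatorFunc<number[], number>` shape is from the diff above; the default being a plain sum is stated in this comment, the implementation below is an assumption):

```typescript
// Assumed default aggregator when getFlowAggFunc is omitted:
// a plain sum, matching the FlowAggregatorFunc<number[], number> shape.
type FlowAggregatorFunc<In, Out> = (values: In) => Out;

const defaultFlowAggFunc: FlowAggregatorFunc<number[], number> = (values) =>
  values.reduce((a, b) => a + b, 0);

console.log(defaultFlowAggFunc([10, 5, 8])); // 23
```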