Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
eab6692
Initial commit
arnautov-anton Dec 13, 2024
2b6b412
LiveLocationManager updates
arnautov-anton Dec 13, 2024
1fa5f14
Adjust LLM methods and subscriptions
arnautov-anton Dec 17, 2024
bb8afdc
Updates to LLM and channel
arnautov-anton Dec 19, 2024
c2a4185
LLM adjustments
arnautov-anton Jan 7, 2025
36d56e1
Validate messages instead (check deleted)
arnautov-anton Jan 9, 2025
dc135c4
Merge branch 'master' into feat/live-location-manager
MartinCupela Jul 1, 2025
6722dd6
feat: add LocationComposer
MartinCupela Jul 8, 2025
e6ea6c7
Merge branch 'master' into feat/live-location-manager
MartinCupela Jul 8, 2025
9a751c3
refactor: remove channel.sendStaticLocation and channel.startLiveLoca…
MartinCupela Jul 8, 2025
0e9064b
refactor: rename StaticLocationComposerLocation to StaticLocationPrev…
MartinCupela Jul 8, 2025
3e7aa2b
feat: add isSharedLocationResponse identity function
MartinCupela Jul 8, 2025
6287851
feat: add types SharedStaticLocationResponse & SharedLiveLocationResp…
MartinCupela Jul 10, 2025
c3e305f
feat: adapt LiveLocationManager to the new back-end
MartinCupela Jul 10, 2025
941d94a
feat: add location related validation to message composer
MartinCupela Jul 10, 2025
2ca820f
fix: add missing UpdateLocationLocationPayload properties
MartinCupela Jul 14, 2025
20c6cfa
Merge branch 'master' into feat/live-location-manager
MartinCupela Jul 14, 2025
617e200
refactor: remove duplicate concurrency.ts module
MartinCupela Jul 14, 2025
435bc1c
test: fix composition validation test
MartinCupela Jul 14, 2025
6ca98aa
test: add channel location sharing tests
MartinCupela Jul 14, 2025
2308990
test: add LiveLocationManager tests
MartinCupela Jul 16, 2025
c2ff1f7
test: add LocationComposer tests
MartinCupela Jul 16, 2025
81d96c7
test: add sharedLocation middleware
MartinCupela Jul 17, 2025
573ea49
test: add MessageComposer tests middleware
MartinCupela Jul 17, 2025
7462109
fix: send location updates with the original creator device id
MartinCupela Jul 18, 2025
942b030
fix: make channel property optional in ReminderResponse
MartinCupela Jul 18, 2025
4fa7a14
fix: stop sharing live location once expired
MartinCupela Jul 18, 2025
06eb498
test: stop sharing live location once expired
MartinCupela Jul 18, 2025
da8aaa9
feat: export WatchLocation type
MartinCupela Jul 18, 2025
5d4ee7c
refactor: make created_by_device_id optional
MartinCupela Jul 22, 2025
1e01078
fix: make sure the expired live locations are unregistered
MartinCupela Jul 22, 2025
31c4ef8
fix: fix StaticLocationPreview type declaration
MartinCupela Jul 22, 2025
9b126e7
fix: avoid storing shared_location in drafts
MartinCupela Jul 22, 2025
b24aca8
docs: clarify the current location sharing rules
MartinCupela Jul 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
297 changes: 297 additions & 0 deletions src/LiveLocationManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
/**
* RULES:
*
* 1. one loc-sharing message per channel per user
* 2. live location is intended to be per device
* but created_by_device_id has currently no checks,
* and user can update the location from another device
* thus making location sharing based on user and channel
*/

import { withCancellation } from './utils/concurrency';
import { StateStore } from './store';
import { WithSubscriptions } from './utils/WithSubscriptions';
import type { StreamChat } from './client';
import type { Unsubscribe } from './store';
import type {
EventTypes,
MessageResponse,
SharedLiveLocationResponse,
SharedLocationResponse,
} from './types';
import type { Coords } from './messageComposer';

export type WatchLocationHandler = (value: Coords) => void;
export type WatchLocation = (handler: WatchLocationHandler) => Unsubscribe;
type DeviceIdGenerator = () => string;
type MessageId = string;

export type ScheduledLiveLocationSharing = SharedLiveLocationResponse & {
stopSharingTimeout: ReturnType<typeof setTimeout> | null;
};

export type LiveLocationManagerState = {
ready: boolean;
messages: Map<MessageId, ScheduledLiveLocationSharing>;
};

const isExpiredLocation = (location: SharedLiveLocationResponse) => {
const endTimeTimestamp = new Date(location.end_at).getTime();

return endTimeTimestamp < Date.now();
};

function isValidLiveLocationMessage(
message?: MessageResponse,
): message is MessageResponse & { shared_location: SharedLiveLocationResponse } {
if (!message || message.type === 'deleted' || !message.shared_location?.end_at)
return false;

return !isExpiredLocation(message.shared_location as SharedLiveLocationResponse);
}

export type LiveLocationManagerConstructorParameters = {
client: StreamChat;
getDeviceId: DeviceIdGenerator;
watchLocation: WatchLocation;
};

// Hard-coded minimal throttle timeout
export const UPDATE_LIVE_LOCATION_REQUEST_MIN_THROTTLE_TIMEOUT = 3000;

export class LiveLocationManager extends WithSubscriptions {
public state: StateStore<LiveLocationManagerState>;
private client: StreamChat;
private getDeviceId: DeviceIdGenerator;
private _deviceId: string;
private watchLocation: WatchLocation;

static symbol = Symbol(LiveLocationManager.name);

constructor({
client,
getDeviceId,
watchLocation,
}: LiveLocationManagerConstructorParameters) {
if (!client.userID) {
throw new Error('Live-location sharing is reserved for client-side use only');
}

super();

this.client = client;
this.state = new StateStore<LiveLocationManagerState>({
messages: new Map(),
ready: false,
});
this._deviceId = getDeviceId();
this.getDeviceId = getDeviceId;
this.watchLocation = watchLocation;
}

public async init() {
await this.assureStateInit();
this.registerSubscriptions();
}

public registerSubscriptions = () => {
this.incrementRefCount();
if (this.hasSubscriptions) return;

this.addUnsubscribeFunction(this.subscribeLiveLocationSharingUpdates());
this.addUnsubscribeFunction(this.subscribeTargetMessagesChange());
};

public unregisterSubscriptions = () => super.unregisterSubscriptions();

get messages() {
return this.state.getLatestValue().messages;
}

get stateIsReady() {
return this.state.getLatestValue().ready;
}

get deviceId() {
if (!this._deviceId) {
this._deviceId = this.getDeviceId();
}
return this._deviceId;
}

private async assureStateInit() {
if (this.stateIsReady) return;
const { active_live_locations } = await this.client.getSharedLocations();
this.state.next({
messages: new Map(
active_live_locations
.filter((location) => !isExpiredLocation(location))
.map((location) => [
location.message_id,
{
...location,
stopSharingTimeout: setTimeout(
() => {
this.unregisterMessages([location.message_id]);
},
new Date(location.end_at).getTime() - Date.now(),
),
},
]),
),
ready: true,
});
}

private subscribeTargetMessagesChange() {
let unsubscribeWatchLocation: null | (() => void) = null;

// Subscribe to location updates only if there are relevant messages to
// update, no need for the location watcher to be active/instantiated otherwise
const unsubscribe = this.state.subscribeWithSelector(
({ messages }) => ({ messages }),
({ messages }) => {
if (!messages.size) {
unsubscribeWatchLocation?.();
unsubscribeWatchLocation = null;
} else if (messages.size && !unsubscribeWatchLocation) {
unsubscribeWatchLocation = this.subscribeWatchLocation();
}
},
);

return () => {
unsubscribe();
unsubscribeWatchLocation?.();
};
}

private subscribeWatchLocation() {
let nextAllowedUpdateCallTimestamp = Date.now();

const unsubscribe = this.watchLocation(({ latitude, longitude }) => {
// Integrators can adjust the update interval by supplying custom watchLocation subscription,
// but the minimal timeout still has to be set as a failsafe (to prevent rate-limitting)
if (Date.now() < nextAllowedUpdateCallTimestamp) return;

nextAllowedUpdateCallTimestamp =
Date.now() + UPDATE_LIVE_LOCATION_REQUEST_MIN_THROTTLE_TIMEOUT;

withCancellation(LiveLocationManager.symbol, async () => {
const promises: Promise<SharedLocationResponse>[] = [];
await this.assureStateInit();
const expiredLocations: string[] = [];

for (const [messageId, location] of this.messages) {
if (isExpiredLocation(location)) {
expiredLocations.push(location.message_id);
continue;
}
if (location.latitude === latitude && location.longitude === longitude)
continue;
const promise = this.client.updateLocation({
created_by_device_id: location.created_by_device_id,
message_id: messageId,
latitude,
longitude,
});

promises.push(promise);
}
this.unregisterMessages(expiredLocations);
if (promises.length > 0) {
await Promise.allSettled(promises);
}
// TODO: handle values (remove failed - based on specific error code), keep re-trying others
});
});

return unsubscribe;
}

private subscribeLiveLocationSharingUpdates() {
/**
* Both message.updated & live_location_sharing.stopped get emitted when message gets an
* update, live_location_sharing.stopped gets emitted only locally and only if the update goes
* through, it's a failsafe for when channel is no longer being watched for whatever reason
*/
const subscriptions = [
...(
[
'live_location_sharing.started',
'message.updated',
'message.deleted',
] as EventTypes[]
).map((eventType) =>
this.client.on(eventType, (event) => {
if (!event.message) return;

if (event.type === 'live_location_sharing.started') {
this.registerMessage(event.message);
} else if (event.type === 'message.updated') {
const isRegistered = this.messages.has(event.message.id);
if (isRegistered && !isValidLiveLocationMessage(event.message)) {
this.unregisterMessages([event.message.id]);
}
this.registerMessage(event.message);
} else {
this.unregisterMessages([event.message.id]);
}
}),
),
this.client.on('live_location_sharing.stopped', (event) => {
if (!event.live_location) return;

this.unregisterMessages([event.live_location?.message_id]);
}),
];

return () => subscriptions.forEach((subscription) => subscription.unsubscribe());
}

private registerMessage(message: MessageResponse) {
if (
!this.client.userID ||
message?.user?.id !== this.client.userID ||
!isValidLiveLocationMessage(message)
)
return;

this.state.next((currentValue) => {
const messages = new Map(currentValue.messages);
messages.set(message.id, {
...message.shared_location,
stopSharingTimeout: setTimeout(
() => {
this.unregisterMessages([message.id]);
},
new Date(message.shared_location.end_at).getTime() - Date.now(),
),
});
return {
...currentValue,
messages,
};
});
}

private unregisterMessages(messageIds: string[]) {
const messages = this.messages;
const removedMessages = new Set(messageIds);
const newMessages = new Map(
Array.from(messages).filter(([messageId, location]) => {
if (removedMessages.has(messageId) && location.stopSharingTimeout) {
clearTimeout(location.stopSharingTimeout);
location.stopSharingTimeout = null;
}
return !removedMessages.has(messageId);
}),
);

if (newMessages.size === messages.size) return;

this.state.partialNext({
messages: newMessages,
});
}
}
34 changes: 34 additions & 0 deletions src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import type {
GetMultipleMessagesAPIResponse,
GetReactionsAPIResponse,
GetRepliesAPIResponse,
LiveLocationPayload,
LocalMessage,
MarkReadOptions,
MarkUnreadOptions,
Expand Down Expand Up @@ -63,10 +64,12 @@ import type {
SendMessageAPIResponse,
SendMessageOptions,
SendReactionOptions,
StaticLocationPayload,
TruncateChannelAPIResponse,
TruncateOptions,
UpdateChannelAPIResponse,
UpdateChannelOptions,
UpdateLocationPayload,
UserResponse,
} from './types';
import type { Role } from './permissions';
Expand Down Expand Up @@ -669,6 +672,37 @@ export class Channel {
return data;
}

public async sendSharedLocation(
location: StaticLocationPayload | LiveLocationPayload,
userId?: string,
) {
const result = await this.sendMessage({
id: location.message_id,
shared_location: location,
user: userId ? { id: userId } : undefined,
});

if ((location as LiveLocationPayload).end_at) {
this.getClient().dispatchEvent({
message: result.message,
type: 'live_location_sharing.started',
});
}

return result;
}

public async stopLiveLocationSharing(payload: UpdateLocationPayload) {
const location = await this.getClient().updateLocation({
...payload,
end_at: new Date().toISOString(),
});
this.getClient().dispatchEvent({
live_location: location,
type: 'live_location_sharing.stopped',
});
}

/**
* delete - Delete the channel. Messages are permanently removed.
*
Expand Down
8 changes: 4 additions & 4 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ import type {
SegmentTargetsResponse,
SegmentType,
SendFileAPIResponse,
SharedLocationRequest,
SharedLocationResponse,
SortParam,
StreamChatOptions,
Expand All @@ -209,6 +208,7 @@ import type {
UpdateChannelTypeResponse,
UpdateCommandOptions,
UpdateCommandResponse,
UpdateLocationPayload,
UpdateMessageAPIResponse,
UpdateMessageOptions,
UpdatePollAPIResponse,
Expand Down Expand Up @@ -4584,11 +4584,11 @@ export class StreamChat {
/**
* updateLocation - Updates a location
*
* @param location UserLocation the location data to update
* @param location SharedLocationRequest the location data to update
*
* @returns {Promise<APIResponse>} The server response
* @returns {Promise<SharedLocationResponse>} The server response
*/
async updateLocation(location: SharedLocationRequest) {
async updateLocation(location: UpdateLocationPayload) {
return await this.put<SharedLocationResponse>(
this.baseURL + `/users/live_locations`,
location,
Expand Down
2 changes: 2 additions & 0 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ export const EVENT_MAP = {
'connection.recovered': true,
'transport.changed': true,
'capabilities.changed': true,
'live_location_sharing.started': true,
'live_location_sharing.stopped': true,

// Reminder events
'reminder.created': true,
Expand Down
Loading