alex/worker-latency: capture message event latency drift

pull/3281/head
alex 2024-03-27 13:39:08 +00:00
rodzic 408a269114
commit 4ed03e335f
7 zmienionych plików z 87 dodań i 4 usunięć

Wyświetl plik

@ -289,6 +289,21 @@ export class TLDrawDurableObject extends TLServer {
})
break
}
case 'message_delivery_drift': {
this.writeEvent(event.type, {
blobs: [event.roomId],
doubles: [
event.drift.p01,
event.drift.p05,
event.drift.p25,
event.drift.p50,
event.drift.p75,
event.drift.p95,
event.drift.p99,
],
})
break
}
default: {
exhaustiveSwitchError(event)
}

Wyświetl plik

@ -91,7 +91,7 @@ async function blockUnknownOrigins(request: Request) {
}
const origin = request.headers.get('origin')
if (env.IS_LOCAL !== 'true' && (!origin || !isAllowedOrigin(origin))) {
if (env.TLDRAW_ENV !== undefined && (!origin || !isAllowedOrigin(origin))) {
console.error('Attempting to connect from an invalid origin:', origin, env, request)
return new Response('Not allowed', { status: 403 })
}

Wyświetl plik

@ -17,6 +17,7 @@ export class ServerSocketAdapter<R extends UnknownRecord> implements TLRoomSocke
}
// see TLRoomSocket for details on why this accepts a union and not just arrays
sendMessage(msg: TLSocketServerSentEvent<R>) {
msg.ts = Date.now()
const message = JSON.stringify(msg)
this.opts.logSendMessage(msg.type, message.length)
this.opts.ws.send(message)

Wyświetl plik

@ -3,6 +3,7 @@ import * as WebSocket from 'ws'
import { ServerSocketAdapter } from './ServerSocketAdapter'
import { RoomSnapshot, TLSyncRoom } from './TLSyncRoom'
import { JsonChunkAssembler } from './chunk'
import { DriftHistogram } from './protocol'
import { schema } from './schema'
import { RoomState } from './server-types'
@ -45,6 +46,11 @@ export type TLServerEvent =
messageType: string
messageLength: number
}
| {
type: 'message_delivery_drift'
roomId: string
drift: DriftHistogram
}
/**
* This class manages rooms for a websocket server.
@ -109,6 +115,14 @@ export abstract class TLServer {
}
})
roomState.room.events.on('client_time_drift', (drift) => {
this.logEvent({
type: 'message_delivery_drift',
roomId: persistenceKey,
drift,
})
})
// persist on an interval...
this.setRoomState(persistenceKey, roomState)

Wyświetl plik

@ -62,6 +62,8 @@ export class TLSyncClient<R extends UnknownRecord, S extends Store<R> = Store<R>
/** The last clock time from the most recent server update */
private lastServerClock = 0
private lastServerInteractionTimestamp = Date.now()
private lastServerMessageTimestamp: number | null = null
private unsentServerTimestampDrift: number[] = []
/** The queue of in-flight push requests that have not yet been acknowledged by the server */
private pendingPushRequests: { request: TLPushRequest<R>; sent: boolean }[] = []
@ -201,7 +203,35 @@ export class TLSyncClient<R extends UnknownRecord, S extends Store<R> = Store<R>
this.debug('ping loop', { isConnectedToRoom: this.isConnectedToRoom })
if (!this.isConnectedToRoom) return
try {
this.socket.sendMessage({ type: 'ping' })
let drift = undefined
if (this.unsentServerTimestampDrift.length >= 100) {
this.unsentServerTimestampDrift.sort((a, b) => a - b)
drift = {
p01: this.unsentServerTimestampDrift[
Math.floor(this.unsentServerTimestampDrift.length * 0.01)
],
p05: this.unsentServerTimestampDrift[
Math.floor(this.unsentServerTimestampDrift.length * 0.05)
],
p25: this.unsentServerTimestampDrift[
Math.floor(this.unsentServerTimestampDrift.length * 0.25)
],
p50: this.unsentServerTimestampDrift[
Math.floor(this.unsentServerTimestampDrift.length * 0.5)
],
p75: this.unsentServerTimestampDrift[
Math.floor(this.unsentServerTimestampDrift.length * 0.75)
],
p95: this.unsentServerTimestampDrift[
Math.floor(this.unsentServerTimestampDrift.length * 0.95)
],
p99: this.unsentServerTimestampDrift[
Math.floor(this.unsentServerTimestampDrift.length * 0.99)
],
}
this.unsentServerTimestampDrift = []
}
this.socket.sendMessage({ type: 'ping', drift })
} catch (error) {
console.warn('ping failed, resetting', error)
this.resetConnection()
@ -356,7 +386,17 @@ export class TLSyncClient<R extends UnknownRecord, S extends Store<R> = Store<R>
/** Handle events received from the server */
private handleServerEvent = (event: TLSocketServerSentEvent<R>) => {
this.debug('received server event', event)
this.lastServerInteractionTimestamp = Date.now()
const now = Date.now()
if (event.ts && this.lastServerMessageTimestamp) {
const serverDelta = event.ts - this.lastServerMessageTimestamp
const localDelta = now - this.lastServerInteractionTimestamp
this.unsentServerTimestampDrift.push(serverDelta - localDelta)
}
this.lastServerInteractionTimestamp = now
this.lastServerMessageTimestamp = event.ts ?? null
// always update the lastServerClock when it is present
switch (event.type) {
case 'connect':

Wyświetl plik

@ -42,6 +42,7 @@ import {
} from './diff'
import { interval } from './interval'
import {
DriftHistogram,
TLIncompatibilityReason,
TLSYNC_PROTOCOL_VERSION,
TLSocketClientSentEvent,
@ -183,6 +184,7 @@ export class TLSyncRoom<R extends UnknownRecord> {
readonly events = createNanoEvents<{
room_became_empty: () => void
session_removed: (args: { sessionKey: string }) => void
client_time_drift: (drift: DriftHistogram) => void
}>()
// Values associated with each uid (must be serializable).
@ -648,6 +650,9 @@ export class TLSyncRoom<R extends UnknownRecord> {
if (session.state === RoomSessionState.Connected) {
session.lastInteractionTime = Date.now()
}
if (message.drift) {
this.events.emit('client_time_drift', message.drift)
}
return this.sendMessage(session.sessionKey, { type: 'pong' })
}
default: {

Wyświetl plik

@ -17,7 +17,10 @@ export type TLIncompatibilityReason =
(typeof TLIncompatibilityReason)[keyof typeof TLIncompatibilityReason]
/** @public */
export type TLSocketServerSentEvent<R extends UnknownRecord> =
export type TLSocketServerSentEvent<R extends UnknownRecord> = {
/** timestamp of when the server sent this event */
ts?: number
} & (
| {
type: 'connect'
hydrationType: 'wipe_all' | 'wipe_presence'
@ -40,6 +43,7 @@ export type TLSocketServerSentEvent<R extends UnknownRecord> =
}
| { type: 'data'; data: TLSocketServerSentDataEvent<R>[] }
| TLSocketServerSentDataEvent<R>
)
/** @public */
export type TLSocketServerSentDataEvent<R extends UnknownRecord> =
@ -77,9 +81,13 @@ export type TLConnectRequest = {
schema: SerializedSchema
}
export type DriftHistogram = Record<'p01' | 'p05' | 'p25' | 'p50' | 'p75' | 'p95' | 'p99', number>
/** @public */
export type TLPingRequest = {
type: 'ping'
/** Difference in time between the client receiving events and the server sending them */
drift?: DriftHistogram
}
/** @public */