Revert "squish sync data events before sending them out" (#3331)

Reverts tldraw/tldraw#3118
pull/3332/head
David Sheldrick 2024-04-03 10:31:28 +01:00 zatwierdzone przez GitHub
rodzic 0e912fe0f2
commit 5557f6be5b
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
7 zmienionych plików z 32 dodań i 825 usunięć

Wyświetl plik

@ -37,7 +37,6 @@
"lint": "yarn run -T tsx ../../scripts/lint.ts"
},
"devDependencies": {
"fast-check": "^3.16.0",
"tldraw": "workspace:*",
"typescript": "^5.3.3",
"uuid-by-string": "^4.0.0",

Wyświetl plik

@ -49,43 +49,6 @@ export type TLPersistentClientSocket<R extends UnknownRecord = UnknownRecord> =
const PING_INTERVAL = 5000
const MAX_TIME_TO_WAIT_FOR_SERVER_INTERACTION_BEFORE_RESETTING_CONNECTION = PING_INTERVAL * 2
export function _applyNetworkDiffToStore<R extends UnknownRecord, S extends Store<R> = Store<R>>(
diff: NetworkDiff<R>,
store: S
): RecordsDiff<R> | null {
const changes: RecordsDiff<R> = { added: {} as any, updated: {} as any, removed: {} as any }
type k = keyof typeof changes.updated
let hasChanges = false
for (const [id, op] of objectMapEntries(diff)) {
if (op[0] === RecordOpType.Put) {
const existing = store.get(id as RecordId<any>)
if (existing && !isEqual(existing, op[1])) {
hasChanges = true
changes.updated[id as k] = [existing, op[1]]
} else {
hasChanges = true
changes.added[id as k] = op[1]
}
} else if (op[0] === RecordOpType.Patch) {
const record = store.get(id as RecordId<any>)
if (!record) {
// the record was removed upstream
continue
}
const patched = applyObjectDiff(record, op[1])
hasChanges = true
changes.updated[id as k] = [record, patched]
} else if (op[0] === RecordOpType.Remove) {
if (store.has(id as RecordId<any>)) {
hasChanges = true
changes.removed[id as k] = store.get(id as RecordId<any>)
}
}
}
return hasChanges ? changes : null
}
// Should connect support chunking the response to allow for large payloads?
/**
@ -532,8 +495,36 @@ export class TLSyncClient<R extends UnknownRecord, S extends Store<R> = Store<R>
*/
private applyNetworkDiff(diff: NetworkDiff<R>, runCallbacks: boolean) {
this.debug('applyNetworkDiff', diff)
const changes = _applyNetworkDiffToStore(diff, this.store)
if (changes !== null) {
const changes: RecordsDiff<R> = { added: {} as any, updated: {} as any, removed: {} as any }
type k = keyof typeof changes.updated
let hasChanges = false
for (const [id, op] of objectMapEntries(diff)) {
if (op[0] === RecordOpType.Put) {
const existing = this.store.get(id as RecordId<any>)
if (existing && !isEqual(existing, op[1])) {
hasChanges = true
changes.updated[id as k] = [existing, op[1]]
} else {
hasChanges = true
changes.added[id as k] = op[1]
}
} else if (op[0] === RecordOpType.Patch) {
const record = this.store.get(id as RecordId<any>)
if (!record) {
// the record was removed upstream
continue
}
const patched = applyObjectDiff(record, op[1])
hasChanges = true
changes.updated[id as k] = [record, patched]
} else if (op[0] === RecordOpType.Remove) {
if (this.store.has(id as RecordId<any>)) {
hasChanges = true
changes.removed[id as k] = this.store.get(id as RecordId<any>)
}
}
}
if (hasChanges) {
this.store.applyDiff(changes, runCallbacks)
}
}

Wyświetl plik

@ -48,7 +48,6 @@ import {
TLSocketServerSentDataEvent,
TLSocketServerSentEvent,
} from './protocol'
import { squishDataEvents } from './squish'
/** @public */
export type TLRoomSocket<R extends UnknownRecord> = {
@ -457,10 +456,7 @@ export class TLSyncRoom<R extends UnknownRecord> {
session.socket.sendMessage(message)
}
} else {
session.socket.sendMessage({
type: 'data',
data: squishDataEvents(session.outstandingDataMessages),
})
session.socket.sendMessage({ type: 'data', data: session.outstandingDataMessages })
}
session.outstandingDataMessages.length = 0
}

Wyświetl plik

@ -236,11 +236,7 @@ export function applyObjectDiff<T extends object>(object: T, objectDiff: ObjectD
break
}
case ValueOpType.Patch: {
if (
object[key as keyof T] &&
typeof object[key as keyof T] === 'object' &&
!Array.isArray(object[key as keyof T])
) {
if (object[key as keyof T] && typeof object[key as keyof T] === 'object') {
const diff = op[1]
const patched = applyObjectDiff(object[key as keyof T] as object, diff)
if (patched !== object[key as keyof T]) {

Wyświetl plik

@ -1,191 +0,0 @@
import { UnknownRecord } from '@tldraw/store'
import { exhaustiveSwitchError, objectMapEntries, structuredClone } from '@tldraw/utils'
import {
NetworkDiff,
ObjectDiff,
RecordOp,
RecordOpType,
ValueOpType,
applyObjectDiff,
} from './diff'
import { TLSocketServerSentDataEvent } from './protocol'
interface State<R extends UnknownRecord> {
lastPatch: (TLSocketServerSentDataEvent<R> & { type: 'patch' }) | null
squished: TLSocketServerSentDataEvent<R>[]
}
type Bailed = boolean
function patchThePatch(lastPatch: ObjectDiff, newPatch: ObjectDiff): Bailed {
for (const [newKey, newOp] of Object.entries(newPatch)) {
switch (newOp[0]) {
case ValueOpType.Put:
lastPatch[newKey] = newOp
break
case ValueOpType.Append:
if (lastPatch[newKey] === undefined) {
lastPatch[newKey] = newOp
} else {
const lastOp = lastPatch[newKey]
switch (lastOp[0]) {
case ValueOpType.Put: {
const lastValues = lastOp[1]
if (Array.isArray(lastValues)) {
const newValues = newOp[1]
lastValues.push(...newValues)
} else {
// we're trying to append to something that was put previously, but
// is not an array; bail out
return true
}
break
}
case ValueOpType.Append: {
const lastValues = lastOp[1]
const lastOffset = lastOp[2]
const newValues = newOp[1]
const newOffset = newOp[2]
if (newOffset === lastOffset + lastValues.length) {
lastValues.push(...newValues)
} else {
// something weird is going on, bail out
return true
}
break
}
default:
// trying to append to either a deletion or a patch, bail out
return true
}
}
break
case ValueOpType.Patch:
if (lastPatch[newKey] === undefined) {
lastPatch[newKey] = newOp
} else {
// bail out, recursive patching is too hard
return true
}
break
case ValueOpType.Delete:
// overwrite whatever was there previously, no point if it's going to be removed
// todo: check if it was freshly put and don't add if it wasn't?
lastPatch[newKey] = newOp
break
default:
exhaustiveSwitchError(newOp[0])
}
}
return false
}
function patchTheOp<R extends UnknownRecord>(
lastRecordOp: RecordOp<R>,
newPatch: ObjectDiff
): Bailed {
switch (lastRecordOp[0]) {
case RecordOpType.Put:
// patching a freshly added value is easy, just patch as normal
lastRecordOp[1] = applyObjectDiff(lastRecordOp[1], newPatch)
break
case RecordOpType.Patch: {
// both are patches, merge them
const bailed = patchThePatch(lastRecordOp[1], newPatch)
if (bailed) {
return true
}
break
}
case RecordOpType.Remove:
// we're trying to patch an object that was removed, just disregard the update
break
default:
exhaustiveSwitchError(lastRecordOp[0])
}
return false
}
function squishInto<R extends UnknownRecord>(
lastDiff: NetworkDiff<R>,
newDiff: NetworkDiff<R>
): Bailed {
for (const [newId, newOp] of objectMapEntries(newDiff)) {
switch (newOp[0]) {
case RecordOpType.Put:
// we Put the same record several times, just overwrite whatever came previously
lastDiff[newId] = newOp
break
case RecordOpType.Patch:
if (lastDiff[newId] === undefined) {
// this is the patch now
lastDiff[newId] = newOp
} else {
// patch the previous RecordOp!
const bailed = patchTheOp(lastDiff[newId], newOp[1])
if (bailed) {
return true
}
}
break
case RecordOpType.Remove:
// overwrite whatever was there previously
// todo: check if it was freshly put and don't add if it wasn't?
lastDiff[newId] = newOp
break
default:
exhaustiveSwitchError(newOp[0])
}
}
return false
}
export function squishDataEvents<R extends UnknownRecord>(
dataEvents: TLSocketServerSentDataEvent<R>[]
): TLSocketServerSentDataEvent<R>[] {
if (dataEvents.length < 2) {
// most common case
return dataEvents
}
const state: State<R> = { lastPatch: null, squished: [] }
for (const e of dataEvents) {
switch (e.type) {
case 'push_result':
if (state.lastPatch !== null) {
state.squished.push(state.lastPatch)
state.lastPatch = null
}
state.squished.push(e)
break
case 'patch':
if (state.lastPatch !== null) {
// this structuredClone is necessary to avoid modifying the original list of events
// (otherwise objects can get reused on put and then modified on patch)
const bailed = squishInto(state.lastPatch.diff, structuredClone(e.diff))
if (bailed) {
// this is unfortunate, but some patches were too hard to patch, give up
// and return the original list
return dataEvents
}
state.lastPatch.serverClock = e.serverClock
} else {
state.lastPatch = structuredClone(e)
}
break
default:
exhaustiveSwitchError(e, 'type')
}
}
if (state.lastPatch !== null) {
state.squished.push(state.lastPatch)
}
return state.squished
}

Wyświetl plik

@ -1,574 +0,0 @@
import { createRecordType, IdOf, RecordId, Store, StoreSchema, UnknownRecord } from '@tldraw/store'
import { assert, structuredClone } from '@tldraw/utils'
import fc, { Arbitrary } from 'fast-check'
import { NetworkDiff, ObjectDiff, RecordOpType, ValueOpType } from '../lib/diff'
import { TLSocketServerSentDataEvent } from '../lib/protocol'
import { squishDataEvents } from '../lib/squish'
import { _applyNetworkDiffToStore } from '../lib/TLSyncClient'
test('basic squishing', () => {
const capture = [
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 929.58203125,
h: 500.14453125,
},
],
},
],
},
serverClock: 9237,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679590],
cursor: [
'put',
{
x: 1526.07421875,
y: 565.66796875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9238,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 916.046875,
h: 494.20703125,
},
],
},
],
},
serverClock: 9239,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679599],
cursor: [
'put',
{
x: 1519.26171875,
y: 563.71875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9240,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 909.234375,
h: 492.2578125,
},
],
},
],
},
serverClock: 9241,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679608],
cursor: [
'put',
{
x: 1512.41015625,
y: 562.23046875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9242,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 902.3828125,
h: 490.76953125,
},
],
},
],
},
serverClock: 9243,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679617],
cursor: [
'put',
{
x: 1506.71484375,
y: 561.29296875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9244,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 896.6875,
h: 489.83203125,
},
],
},
],
},
serverClock: 9245,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679625],
cursor: [
'put',
{
x: 1501.734375,
y: 560.88671875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9246,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 891.70703125,
h: 489.42578125,
},
],
},
],
},
serverClock: 9247,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
lastActivityTimestamp: ['put', 1710188679633],
cursor: [
'put',
{
x: 1497.22265625,
y: 560.6875,
rotation: 0,
type: 'default',
},
],
},
],
},
serverClock: 9248,
},
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
RecordOpType.Patch,
{
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 887.1953125,
h: 489.2265625,
},
],
},
],
},
serverClock: 9249,
},
] as const satisfies TLSocketServerSentDataEvent<UnknownRecord>[]
const squished = squishDataEvents(capture)
const manuallySquished = [
{
type: 'patch',
diff: {
'instance_presence:nlyxdltolNVL0VONRr9Bz': [
'patch',
{
lastActivityTimestamp: ['put', 1710188679633],
cursor: [
'put',
{
x: 1497.22265625,
y: 560.6875,
rotation: 0,
type: 'default',
},
],
brush: [
'put',
{
x: 610.02734375,
y: 71.4609375,
w: 887.1953125,
h: 489.2265625,
},
],
},
],
},
serverClock: 9249,
},
]
// see https://github.com/jestjs/jest/issues/14011 for why the second clone is needed
expect(squished).toStrictEqual(structuredClone(manuallySquished))
})
const TEST_RECORD_TYPENAME = 'testRecord' as const
interface TestRecord extends UnknownRecord {
fieldA?: TestRecordValue
fieldB?: TestRecordValue
fieldC?: TestRecordValue
}
type TestRecordValue =
| string
| number[]
| { fieldA?: TestRecordValue; fieldB?: TestRecordValue; fieldC?: TestRecordValue }
const TestRecord = createRecordType<TestRecord>(TEST_RECORD_TYPENAME, {
validator: {
validate(value) {
return value as TestRecord
},
},
scope: 'document',
})
class Model {
diffs: NetworkDiff<TestRecord>[] = []
idMap: IdOf<TestRecord>[]
private readonly initialStoreData: Record<IdOf<TestRecord>, TestRecord>
constructor(public initialStoreContent: TestRecord[]) {
this.idMap = initialStoreContent.map((r) => r.id)
this.initialStoreData = Object.fromEntries(initialStoreContent.map((r) => [r.id, r]))
}
trueIdx(idx: number) {
return idx % this.idMap.length
}
getId(idx: number) {
return this.idMap[this.trueIdx(idx)]
}
private getFreshStore(): Store<TestRecord> {
return new Store({
initialData: this.initialStoreData,
schema: StoreSchema.create<TestRecord>({ testRecord: TestRecord }),
props: {},
})
}
private getStoreWithDiffs(diffs: NetworkDiff<TestRecord>[]) {
const store = this.getFreshStore()
for (const diff of diffs) {
const changes = _applyNetworkDiffToStore(diff, store)
if (changes !== null) {
store.applyDiff(changes, false)
}
}
return store
}
runTest() {
const dataEvents = this.diffs.map((diff, idx) => ({
type: 'patch' as const,
diff,
serverClock: idx,
}))
const squishedDiffs = squishDataEvents(dataEvents).map((e) => {
assert(e.type === 'patch')
return e.diff
})
const baseStore = this.getStoreWithDiffs(this.diffs)
const squishedStore = this.getStoreWithDiffs(squishedDiffs)
// see https://github.com/jestjs/jest/issues/14011 for the explanation for that structuredClone
expect(squishedStore.serialize()).toEqual(structuredClone(baseStore.serialize()))
}
// offsets are a MAJOR pain because they depend on the entire history of diffs so far, and
// the store silently discards append patches if their offsets don't match, so they need
// to be correct to exercise the squisher
// NOTE: modifies the diff
fixOffsets(recordId: IdOf<TestRecord>, fullDiff: ObjectDiff) {
const fixed = structuredClone(fullDiff)
const store = this.getStoreWithDiffs(this.diffs)
const record = store.get(recordId)
if (record === undefined) {
return fixed
}
const fixer = (obj: any, diff: ObjectDiff) => {
for (const [k, v] of Object.entries(diff)) {
if (v[0] === ValueOpType.Append && Array.isArray(obj[k])) {
v[2] = obj[k].length
} else if (v[0] === ValueOpType.Patch && typeof obj[k] === 'object') {
fixer(obj[k], v[1])
}
}
}
fixer(record, fixed)
return fixed
}
}
type Real = 'whatever'
class RecordPut implements fc.Command<Model, Real> {
constructor(readonly record: TestRecord) {}
check(_m: Readonly<Model>) {
return true
}
run(m: Model): void {
m.diffs.push({ [this.record.id]: [RecordOpType.Put, this.record] })
m.idMap.push(this.record.id)
m.runTest()
}
toString = () => `Put(${JSON.stringify(this.record)})`
}
class RecordRemove implements fc.Command<Model, Real> {
constructor(readonly idx: number) {}
check(m: Readonly<Model>) {
return m.idMap.length > 0
}
run(m: Model) {
m.diffs.push({ [m.getId(this.idx)]: [RecordOpType.Remove] })
m.idMap.splice(m.trueIdx(this.idx), 1)
m.runTest()
}
toString = () => `Remove(#${this.idx})`
}
class RecordPatch implements fc.Command<Model, Real> {
constructor(
readonly idx: number,
readonly patch: ObjectDiff
) {}
check(m: Readonly<Model>) {
return m.idMap.length > 0
}
run(m: Model) {
const fixedPatch = m.fixOffsets(m.getId(this.idx), this.patch)
m.diffs.push({ [m.getId(this.idx)]: [RecordOpType.Patch, fixedPatch] })
m.runTest()
}
toString = () => `Patch(#${this.idx}, ${JSON.stringify(this.patch)})`
}
const { TestRecordValueArb }: { TestRecordValueArb: Arbitrary<TestRecordValue> } = fc.letrec(
(tie) => ({
TestRecordValueArb: fc.oneof(
fc.string(),
fc.array(fc.integer()),
fc.record(
{
fieldA: tie('TestRecordValueArb'),
fieldB: tie('TestRecordValueArb'),
fieldC: tie('TestRecordValueArb'),
},
{ requiredKeys: ['fieldA'] }
)
),
})
)
const TestRecordKeyArb = fc.oneof(
fc.constant('fieldA' as const),
fc.constant('fieldB' as const),
fc.constant('fieldC' as const)
)
const TestRecordArb = fc.record(
{
id: fc.oneof(fc.constant('idA'), fc.constant('idB'), fc.constant('idC')) as Arbitrary<
RecordId<TestRecord>
>,
typeName: fc.constant(TEST_RECORD_TYPENAME),
fieldA: TestRecordValueArb,
fieldB: TestRecordValueArb,
fieldC: TestRecordValueArb,
},
{ requiredKeys: ['id', 'typeName'] }
)
const { ObjectDiffArb }: { ObjectDiffArb: Arbitrary<ObjectDiff> } = fc.letrec((tie) => ({
ObjectDiffArb: fc.dictionary(
TestRecordKeyArb,
fc.oneof(
fc.tuple(fc.constant(ValueOpType.Put), TestRecordValueArb),
// The offset is -1 because it depends on the length of the array *in the current state*,
// so it can't be generated here. Instead, it's patched up in the command
fc.tuple(fc.constant(ValueOpType.Append), fc.array(fc.integer()), fc.constant(-1)),
fc.tuple(fc.constant(ValueOpType.Patch), tie('ObjectDiffArb')),
fc.tuple(fc.constant(ValueOpType.Delete))
),
{ minKeys: 1, maxKeys: 3 }
),
}))
const allCommands = [
TestRecordArb.map((r) => new RecordPut(r)),
fc.nat(10).map((idx) => new RecordRemove(idx)),
fc.tuple(fc.nat(), ObjectDiffArb).map(([idx, diff]) => new RecordPatch(idx, diff)),
]
const initialStoreContentArb: Arbitrary<TestRecord[]> = fc.uniqueArray(TestRecordArb, {
selector: (r) => r.id,
maxLength: 3,
})
test('fast-checking squish', () => {
// If you see this test failing, to reproduce you need both seed and path in fc.assert,
// and replayPath in fc.commands. See the next test for an examples
fc.assert(
fc.property(
initialStoreContentArb,
fc.commands(allCommands, {}),
(initialStoreContent, cmds) => {
fc.modelRun(
() => ({
model: new Model(initialStoreContent),
real: 'whatever',
}),
cmds
)
}
),
{
verbose: 1,
numRuns: 1_000,
}
)
})
test('problem: applying a patch to an array', () => {
fc.assert(
fc.property(
initialStoreContentArb,
fc.commands(allCommands, {
replayPath: 'CDJ:F',
}),
(initialStoreContent, cmds) => {
fc.modelRun(
() => ({
model: new Model(initialStoreContent),
real: 'whatever',
}),
cmds
)
}
),
{ seed: -1883357795, path: '7653:1:2:2:4:3:3:3:3', endOnFailure: true }
)
})

Wyświetl plik

@ -7662,7 +7662,6 @@ __metadata:
"@tldraw/store": "workspace:*"
"@tldraw/tlschema": "workspace:*"
"@tldraw/utils": "workspace:*"
fast-check: "npm:^3.16.0"
lodash.isequal: "npm:^4.5.0"
nanoevents: "npm:^7.0.1"
nanoid: "npm:4.0.2"
@ -13815,15 +13814,6 @@ __metadata:
languageName: node
linkType: hard
"fast-check@npm:^3.16.0":
version: 3.16.0
resolution: "fast-check@npm:3.16.0"
dependencies:
pure-rand: "npm:^6.0.0"
checksum: 4a14945b885ef2d75c3252a067a4cfa2440a2c0da18341d514be3803fafb616b0ec68806071f29e1267a85c7a9e4a5e192ae5e592727d8d2e66389f946be472c
languageName: node
linkType: hard
"fast-deep-equal@npm:^3.1.1, fast-deep-equal@npm:^3.1.3":
version: 3.1.3
resolution: "fast-deep-equal@npm:3.1.3"