Send to zulip

This commit is contained in:
Koper
2023-11-20 21:39:33 +07:00
parent 82f50817f8
commit ba40d28152
3609 changed files with 2311843 additions and 7 deletions

View File

@@ -0,0 +1,21 @@
var eventMessageChunker = require('../event-stream/event-message-chunker').eventMessageChunker;
var parseEvent = require('./parse-event').parseEvent;
function createEventStream(body, parser, model) {
var eventMessages = eventMessageChunker(body);
var events = [];
for (var i = 0; i < eventMessages.length; i++) {
events.push(parseEvent(parser, eventMessages[i], model));
}
return events;
}
/**
* @api private
*/
module.exports = {
createEventStream: createEventStream
};

View File

@@ -0,0 +1,174 @@
var util = require('../core').util;
var crypto = util.crypto;
var Int64 = require('./int64').Int64;
var toBuffer = util.buffer.toBuffer;
var allocBuffer = util.buffer.alloc;
var Buffer = util.Buffer;
/**
* @api private
*/
function buildMessage(message) {
var formattedHeaders = buildHeaders(message.headers);
var headerLengthBytes = allocBuffer(4);
headerLengthBytes.writeUInt32BE(formattedHeaders.length, 0);
var totalLengthBytes = allocBuffer(4);
totalLengthBytes.writeUInt32BE(
totalLengthBytes.length + // size of this buffer
headerLengthBytes.length + // size of header length buffer
4 + // prelude crc32
formattedHeaders.length + // total size of headers
message.body.length + // total size of payload
4, // size of crc32 of the total message
0
);
var prelude = Buffer.concat([
totalLengthBytes,
headerLengthBytes
], totalLengthBytes.length + headerLengthBytes.length);
var preludeCrc32 = crc32(prelude);
var totalSansCrc32 = Buffer.concat([
prelude, preludeCrc32, formattedHeaders, message.body
], prelude.length + preludeCrc32.length + formattedHeaders.length + message.body.length);
var totalCrc32 = crc32(totalSansCrc32);
return Buffer.concat([totalSansCrc32, totalCrc32]);
}
function buildHeaders(headers) {
/** @type {Buffer[]} */
var chunks = [];
var totalSize = 0;
var headerNames = Object.keys(headers);
for (var i = 0; i < headerNames.length; i++) {
var headerName = headerNames[i];
var bytes = toBuffer(headerName);
var headerValue = buildHeaderValue(headers[headerName]);
var nameLength = allocBuffer(1);
nameLength[0] = headerName.length;
chunks.push(
nameLength,
bytes,
headerValue
);
totalSize += nameLength.length + bytes.length + headerValue.length;
}
var out = allocBuffer(totalSize);
var position = 0;
for (var j = 0; j < chunks.length; j++) {
var chunk = chunks[j];
for (var k = 0; k < chunk.length; k++) {
out[position] = chunk[k];
position++;
}
}
return out;
}
/**
* @param {object} header
* @param {'boolean'|'byte'|'short'|'integer'|'long'|'binary'|'string'|'timestamp'|'uuid'} header.type
* @param {*} header.value
* @returns {Buffer}
*/
function buildHeaderValue(header) {
switch (header.type) {
case 'binary':
var binBytes = allocBuffer(3);
binBytes.writeUInt8(HEADER_VALUE_TYPE.byteArray, 0);
binBytes.writeUInt16BE(header.value.length, 1);
return Buffer.concat([
binBytes, header.value
], binBytes.length + header.value.length);
case 'boolean':
var boolByte = allocBuffer(1);
boolByte[0] = header.value ? HEADER_VALUE_TYPE.boolTrue : HEADER_VALUE_TYPE.boolFalse;
return boolByte;
case 'byte':
var singleByte = allocBuffer(2);
singleByte[0] = HEADER_VALUE_TYPE.byte;
singleByte[1] = header.value;
return singleByte;
case 'integer':
var intBytes = allocBuffer(5);
intBytes.writeUInt8(HEADER_VALUE_TYPE.integer, 0);
intBytes.writeInt32BE(header.value, 1);
return intBytes;
case 'long':
var longBytes = allocBuffer(1);
longBytes[0] = HEADER_VALUE_TYPE.long;
return Buffer.concat([
longBytes, header.value.bytes
], 9);
case 'short':
var shortBytes = allocBuffer(3);
shortBytes.writeUInt8(HEADER_VALUE_TYPE.short, 0);
shortBytes.writeInt16BE(header.value, 1);
return shortBytes;
case 'string':
var utf8Bytes = toBuffer(header.value);
var strBytes = allocBuffer(3);
strBytes.writeUInt8(HEADER_VALUE_TYPE.string, 0);
strBytes.writeUInt16BE(utf8Bytes.length, 1);
return Buffer.concat([
strBytes, utf8Bytes
], strBytes.length + utf8Bytes.length);
case 'timestamp':
var tsBytes = allocBuffer(1);
tsBytes[0] = HEADER_VALUE_TYPE.timestamp;
return Buffer.concat([
tsBytes, Int64.fromNumber(header.value.valueOf()).bytes
], 9);
case 'uuid':
if (!UUID_PATTERN.test(header.value)) {
throw new Error('Invalid UUID received: ' + header.value);
}
var uuidBytes = allocBuffer(1);
uuidBytes[0] = HEADER_VALUE_TYPE.uuid;
return Buffer.concat([
uuidBytes, toBuffer(header.value.replace(/\-/g, ''), 'hex')
], 17);
}
}
function crc32(buffer) {
var crc32 = crypto.crc32(buffer);
var crc32Buffer = allocBuffer(4);
crc32Buffer.writeUInt32BE(crc32, 0);
return crc32Buffer;
}
/**
* @api private
*/
var HEADER_VALUE_TYPE = {
boolTrue: 0,
boolFalse: 1,
byte: 2,
short: 3,
integer: 4,
long: 5,
byteArray: 6,
string: 7,
timestamp: 8,
uuid: 9,
};
var UUID_PATTERN = /^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$/;
/**
* @api private
*/
module.exports = {
buildMessage: buildMessage
};

View File

@@ -0,0 +1,121 @@
var util = require('../core').util;
var Transform = require('stream').Transform;
var allocBuffer = util.buffer.alloc;
/** @type {Transform} */
function EventMessageChunkerStream(options) {
Transform.call(this, options);
this.currentMessageTotalLength = 0;
this.currentMessagePendingLength = 0;
/** @type {Buffer} */
this.currentMessage = null;
/** @type {Buffer} */
this.messageLengthBuffer = null;
}
EventMessageChunkerStream.prototype = Object.create(Transform.prototype);
/**
*
* @param {Buffer} chunk
* @param {string} encoding
* @param {*} callback
*/
EventMessageChunkerStream.prototype._transform = function(chunk, encoding, callback) {
var chunkLength = chunk.length;
var currentOffset = 0;
while (currentOffset < chunkLength) {
// create new message if necessary
if (!this.currentMessage) {
// working on a new message, determine total length
var bytesRemaining = chunkLength - currentOffset;
// prevent edge case where total length spans 2 chunks
if (!this.messageLengthBuffer) {
this.messageLengthBuffer = allocBuffer(4);
}
var numBytesForTotal = Math.min(
4 - this.currentMessagePendingLength, // remaining bytes to fill the messageLengthBuffer
bytesRemaining // bytes left in chunk
);
chunk.copy(
this.messageLengthBuffer,
this.currentMessagePendingLength,
currentOffset,
currentOffset + numBytesForTotal
);
this.currentMessagePendingLength += numBytesForTotal;
currentOffset += numBytesForTotal;
if (this.currentMessagePendingLength < 4) {
// not enough information to create the current message
break;
}
this.allocateMessage(this.messageLengthBuffer.readUInt32BE(0));
this.messageLengthBuffer = null;
}
// write data into current message
var numBytesToWrite = Math.min(
this.currentMessageTotalLength - this.currentMessagePendingLength, // number of bytes left to complete message
chunkLength - currentOffset // number of bytes left in the original chunk
);
chunk.copy(
this.currentMessage, // target buffer
this.currentMessagePendingLength, // target offset
currentOffset, // chunk offset
currentOffset + numBytesToWrite // chunk end to write
);
this.currentMessagePendingLength += numBytesToWrite;
currentOffset += numBytesToWrite;
// check if a message is ready to be pushed
if (this.currentMessageTotalLength && this.currentMessageTotalLength === this.currentMessagePendingLength) {
// push out the message
this.push(this.currentMessage);
// cleanup
this.currentMessage = null;
this.currentMessageTotalLength = 0;
this.currentMessagePendingLength = 0;
}
}
callback();
};
EventMessageChunkerStream.prototype._flush = function(callback) {
if (this.currentMessageTotalLength) {
if (this.currentMessageTotalLength === this.currentMessagePendingLength) {
callback(null, this.currentMessage);
} else {
callback(new Error('Truncated event message received.'));
}
} else {
callback();
}
};
/**
* @param {number} size Size of the message to be allocated.
* @api private
*/
EventMessageChunkerStream.prototype.allocateMessage = function(size) {
if (typeof size !== 'number') {
throw new Error('Attempted to allocate an event message where size was not a number: ' + size);
}
this.currentMessageTotalLength = size;
this.currentMessagePendingLength = 4;
this.currentMessage = allocBuffer(size);
this.currentMessage.writeUInt32BE(size, 0);
};
/**
* @api private
*/
module.exports = {
EventMessageChunkerStream: EventMessageChunkerStream
};

View File

@@ -0,0 +1,30 @@
/**
* Takes in a buffer of event messages and splits them into individual messages.
* @param {Buffer} buffer
* @api private
*/
function eventMessageChunker(buffer) {
/** @type Buffer[] */
var messages = [];
var offset = 0;
while (offset < buffer.length) {
var totalLength = buffer.readInt32BE(offset);
// create new buffer for individual message (shares memory with original)
var message = buffer.slice(offset, totalLength + offset);
// increment offset to it starts at the next message
offset += totalLength;
messages.push(message);
}
return messages;
}
/**
* @api private
*/
module.exports = {
eventMessageChunker: eventMessageChunker
};

View File

@@ -0,0 +1,39 @@
var Transform = require('stream').Transform;
var parseEvent = require('./parse-event').parseEvent;
/** @type {Transform} */
function EventUnmarshallerStream(options) {
options = options || {};
// set output to object mode
options.readableObjectMode = true;
Transform.call(this, options);
this._readableState.objectMode = true;
this.parser = options.parser;
this.eventStreamModel = options.eventStreamModel;
}
EventUnmarshallerStream.prototype = Object.create(Transform.prototype);
/**
*
* @param {Buffer} chunk
* @param {string} encoding
* @param {*} callback
*/
EventUnmarshallerStream.prototype._transform = function(chunk, encoding, callback) {
try {
var event = parseEvent(this.parser, chunk, this.eventStreamModel);
this.push(event);
return callback();
} catch (err) {
callback(err);
}
};
/**
* @api private
*/
module.exports = {
EventUnmarshallerStream: EventUnmarshallerStream
};

View File

@@ -0,0 +1,6 @@
export interface StreamingEventStream<Events> extends NodeJS.ReadableStream {
on(event: "data", listener: (event: Events) => void): this;
on(event: string, listener: Function): this;
}
export type EventStream<Events> = StreamingEventStream<Events> | Events[];

View File

@@ -0,0 +1,93 @@
var util = require('../core').util;
var toBuffer = util.buffer.toBuffer;
/**
* A lossless representation of a signed, 64-bit integer. Instances of this
* class may be used in arithmetic expressions as if they were numeric
* primitives, but the binary representation will be preserved unchanged as the
* `bytes` property of the object. The bytes should be encoded as big-endian,
* two's complement integers.
* @param {Buffer} bytes
*
* @api private
*/
function Int64(bytes) {
if (bytes.length !== 8) {
throw new Error('Int64 buffers must be exactly 8 bytes');
}
if (!util.Buffer.isBuffer(bytes)) bytes = toBuffer(bytes);
this.bytes = bytes;
}
/**
* @param {number} number
* @returns {Int64}
*
* @api private
*/
Int64.fromNumber = function(number) {
if (number > 9223372036854775807 || number < -9223372036854775808) {
throw new Error(
number + ' is too large (or, if negative, too small) to represent as an Int64'
);
}
var bytes = new Uint8Array(8);
for (
var i = 7, remaining = Math.abs(Math.round(number));
i > -1 && remaining > 0;
i--, remaining /= 256
) {
bytes[i] = remaining;
}
if (number < 0) {
negate(bytes);
}
return new Int64(bytes);
};
/**
* @returns {number}
*
* @api private
*/
Int64.prototype.valueOf = function() {
var bytes = this.bytes.slice(0);
var negative = bytes[0] & 128;
if (negative) {
negate(bytes);
}
return parseInt(bytes.toString('hex'), 16) * (negative ? -1 : 1);
};
Int64.prototype.toString = function() {
return String(this.valueOf());
};
/**
* @param {Buffer} bytes
*
* @api private
*/
function negate(bytes) {
for (var i = 0; i < 8; i++) {
bytes[i] ^= 0xFF;
}
for (var i = 7; i > -1; i--) {
bytes[i]++;
if (bytes[i] !== 0) {
break;
}
}
}
/**
* @api private
*/
module.exports = {
Int64: Int64
};

View File

@@ -0,0 +1,73 @@
var parseMessage = require('./parse-message').parseMessage;
/**
*
* @param {*} parser
* @param {Buffer} message
* @param {*} shape
* @api private
*/
function parseEvent(parser, message, shape) {
var parsedMessage = parseMessage(message);
// check if message is an event or error
var messageType = parsedMessage.headers[':message-type'];
if (messageType) {
if (messageType.value === 'error') {
throw parseError(parsedMessage);
} else if (messageType.value !== 'event') {
// not sure how to parse non-events/non-errors, ignore for now
return;
}
}
// determine event type
var eventType = parsedMessage.headers[':event-type'];
// check that the event type is modeled
var eventModel = shape.members[eventType.value];
if (!eventModel) {
return;
}
var result = {};
// check if an event payload exists
var eventPayloadMemberName = eventModel.eventPayloadMemberName;
if (eventPayloadMemberName) {
var payloadShape = eventModel.members[eventPayloadMemberName];
// if the shape is binary, return the byte array
if (payloadShape.type === 'binary') {
result[eventPayloadMemberName] = parsedMessage.body;
} else {
result[eventPayloadMemberName] = parser.parse(parsedMessage.body.toString(), payloadShape);
}
}
// read event headers
var eventHeaderNames = eventModel.eventHeaderMemberNames;
for (var i = 0; i < eventHeaderNames.length; i++) {
var name = eventHeaderNames[i];
if (parsedMessage.headers[name]) {
// parse the header!
result[name] = eventModel.members[name].toType(parsedMessage.headers[name].value);
}
}
var output = {};
output[eventType.value] = result;
return output;
}
function parseError(message) {
var errorCode = message.headers[':error-code'];
var errorMessage = message.headers[':error-message'];
var error = new Error(errorMessage.value || errorMessage);
error.code = error.name = errorCode.value || errorCode;
return error;
}
/**
* @api private
*/
module.exports = {
parseEvent: parseEvent
};

View File

@@ -0,0 +1,128 @@
var Int64 = require('./int64').Int64;
var splitMessage = require('./split-message').splitMessage;
var BOOLEAN_TAG = 'boolean';
var BYTE_TAG = 'byte';
var SHORT_TAG = 'short';
var INT_TAG = 'integer';
var LONG_TAG = 'long';
var BINARY_TAG = 'binary';
var STRING_TAG = 'string';
var TIMESTAMP_TAG = 'timestamp';
var UUID_TAG = 'uuid';
/**
* @api private
*
* @param {Buffer} headers
*/
function parseHeaders(headers) {
var out = {};
var position = 0;
while (position < headers.length) {
var nameLength = headers.readUInt8(position++);
var name = headers.slice(position, position + nameLength).toString();
position += nameLength;
switch (headers.readUInt8(position++)) {
case 0 /* boolTrue */:
out[name] = {
type: BOOLEAN_TAG,
value: true
};
break;
case 1 /* boolFalse */:
out[name] = {
type: BOOLEAN_TAG,
value: false
};
break;
case 2 /* byte */:
out[name] = {
type: BYTE_TAG,
value: headers.readInt8(position++)
};
break;
case 3 /* short */:
out[name] = {
type: SHORT_TAG,
value: headers.readInt16BE(position)
};
position += 2;
break;
case 4 /* integer */:
out[name] = {
type: INT_TAG,
value: headers.readInt32BE(position)
};
position += 4;
break;
case 5 /* long */:
out[name] = {
type: LONG_TAG,
value: new Int64(headers.slice(position, position + 8))
};
position += 8;
break;
case 6 /* byteArray */:
var binaryLength = headers.readUInt16BE(position);
position += 2;
out[name] = {
type: BINARY_TAG,
value: headers.slice(position, position + binaryLength)
};
position += binaryLength;
break;
case 7 /* string */:
var stringLength = headers.readUInt16BE(position);
position += 2;
out[name] = {
type: STRING_TAG,
value: headers.slice(
position,
position + stringLength
).toString()
};
position += stringLength;
break;
case 8 /* timestamp */:
out[name] = {
type: TIMESTAMP_TAG,
value: new Date(
new Int64(headers.slice(position, position + 8))
.valueOf()
)
};
position += 8;
break;
case 9 /* uuid */:
var uuidChars = headers.slice(position, position + 16)
.toString('hex');
position += 16;
out[name] = {
type: UUID_TAG,
value: uuidChars.substr(0, 8) + '-' +
uuidChars.substr(8, 4) + '-' +
uuidChars.substr(12, 4) + '-' +
uuidChars.substr(16, 4) + '-' +
uuidChars.substr(20)
};
break;
default:
throw new Error('Unrecognized header type tag');
}
}
return out;
}
function parseMessage(message) {
var parsed = splitMessage(message);
return { headers: parseHeaders(parsed.headers), body: parsed.body };
}
/**
* @api private
*/
module.exports = {
parseMessage: parseMessage
};

View File

@@ -0,0 +1,70 @@
var util = require('../core').util;
var toBuffer = util.buffer.toBuffer;
// All prelude components are unsigned, 32-bit integers
var PRELUDE_MEMBER_LENGTH = 4;
// The prelude consists of two components
var PRELUDE_LENGTH = PRELUDE_MEMBER_LENGTH * 2;
// Checksums are always CRC32 hashes.
var CHECKSUM_LENGTH = 4;
// Messages must include a full prelude, a prelude checksum, and a message checksum
var MINIMUM_MESSAGE_LENGTH = PRELUDE_LENGTH + CHECKSUM_LENGTH * 2;
/**
* @api private
*
* @param {Buffer} message
*/
function splitMessage(message) {
if (!util.Buffer.isBuffer(message)) message = toBuffer(message);
if (message.length < MINIMUM_MESSAGE_LENGTH) {
throw new Error('Provided message too short to accommodate event stream message overhead');
}
if (message.length !== message.readUInt32BE(0)) {
throw new Error('Reported message length does not match received message length');
}
var expectedPreludeChecksum = message.readUInt32BE(PRELUDE_LENGTH);
if (
expectedPreludeChecksum !== util.crypto.crc32(
message.slice(0, PRELUDE_LENGTH)
)
) {
throw new Error(
'The prelude checksum specified in the message (' +
expectedPreludeChecksum +
') does not match the calculated CRC32 checksum.'
);
}
var expectedMessageChecksum = message.readUInt32BE(message.length - CHECKSUM_LENGTH);
if (
expectedMessageChecksum !== util.crypto.crc32(
message.slice(0, message.length - CHECKSUM_LENGTH)
)
) {
throw new Error(
'The message checksum did not match the expected value of ' +
expectedMessageChecksum
);
}
var headersStart = PRELUDE_LENGTH + CHECKSUM_LENGTH;
var headersEnd = headersStart + message.readUInt32BE(PRELUDE_MEMBER_LENGTH);
return {
headers: message.slice(headersStart, headersEnd),
body: message.slice(headersEnd, message.length - CHECKSUM_LENGTH),
};
}
/**
* @api private
*/
module.exports = {
splitMessage: splitMessage
};

View File

@@ -0,0 +1,39 @@
/**
* What is necessary to create an event stream in node?
* - http response stream
* - parser
* - event stream model
*/
var EventMessageChunkerStream = require('../event-stream/event-message-chunker-stream').EventMessageChunkerStream;
var EventUnmarshallerStream = require('../event-stream/event-message-unmarshaller-stream').EventUnmarshallerStream;
function createEventStream(stream, parser, model) {
var eventStream = new EventUnmarshallerStream({
parser: parser,
eventStreamModel: model
});
var eventMessageChunker = new EventMessageChunkerStream();
stream.pipe(
eventMessageChunker
).pipe(eventStream);
stream.on('error', function(err) {
eventMessageChunker.emit('error', err);
});
eventMessageChunker.on('error', function(err) {
eventStream.emit('error', err);
});
return eventStream;
}
/**
* @api private
*/
module.exports = {
createEventStream: createEventStream
};