b5f3067a6a
Initial setup of project dependencies including axios, express, dotenv, and other supporting packages. Added configuration files and documentation for project setup and usage.
144 lines
3.6 KiB
JavaScript
144 lines
3.6 KiB
JavaScript
'use strict';
|
|
|
|
import stream from 'stream';
|
|
import utils from '../utils.js';
|
|
|
|
const kInternals = Symbol('internals');
|
|
|
|
class AxiosTransformStream extends stream.Transform{
|
|
constructor(options) {
|
|
options = utils.toFlatObject(options, {
|
|
maxRate: 0,
|
|
chunkSize: 64 * 1024,
|
|
minChunkSize: 100,
|
|
timeWindow: 500,
|
|
ticksRate: 2,
|
|
samplesCount: 15
|
|
}, null, (prop, source) => {
|
|
return !utils.isUndefined(source[prop]);
|
|
});
|
|
|
|
super({
|
|
readableHighWaterMark: options.chunkSize
|
|
});
|
|
|
|
const internals = this[kInternals] = {
|
|
timeWindow: options.timeWindow,
|
|
chunkSize: options.chunkSize,
|
|
maxRate: options.maxRate,
|
|
minChunkSize: options.minChunkSize,
|
|
bytesSeen: 0,
|
|
isCaptured: false,
|
|
notifiedBytesLoaded: 0,
|
|
ts: Date.now(),
|
|
bytes: 0,
|
|
onReadCallback: null
|
|
};
|
|
|
|
this.on('newListener', event => {
|
|
if (event === 'progress') {
|
|
if (!internals.isCaptured) {
|
|
internals.isCaptured = true;
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
_read(size) {
|
|
const internals = this[kInternals];
|
|
|
|
if (internals.onReadCallback) {
|
|
internals.onReadCallback();
|
|
}
|
|
|
|
return super._read(size);
|
|
}
|
|
|
|
_transform(chunk, encoding, callback) {
|
|
const internals = this[kInternals];
|
|
const maxRate = internals.maxRate;
|
|
|
|
const readableHighWaterMark = this.readableHighWaterMark;
|
|
|
|
const timeWindow = internals.timeWindow;
|
|
|
|
const divider = 1000 / timeWindow;
|
|
const bytesThreshold = (maxRate / divider);
|
|
const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;
|
|
|
|
const pushChunk = (_chunk, _callback) => {
|
|
const bytes = Buffer.byteLength(_chunk);
|
|
internals.bytesSeen += bytes;
|
|
internals.bytes += bytes;
|
|
|
|
internals.isCaptured && this.emit('progress', internals.bytesSeen);
|
|
|
|
if (this.push(_chunk)) {
|
|
process.nextTick(_callback);
|
|
} else {
|
|
internals.onReadCallback = () => {
|
|
internals.onReadCallback = null;
|
|
process.nextTick(_callback);
|
|
};
|
|
}
|
|
}
|
|
|
|
const transformChunk = (_chunk, _callback) => {
|
|
const chunkSize = Buffer.byteLength(_chunk);
|
|
let chunkRemainder = null;
|
|
let maxChunkSize = readableHighWaterMark;
|
|
let bytesLeft;
|
|
let passed = 0;
|
|
|
|
if (maxRate) {
|
|
const now = Date.now();
|
|
|
|
if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
|
|
internals.ts = now;
|
|
bytesLeft = bytesThreshold - internals.bytes;
|
|
internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
|
|
passed = 0;
|
|
}
|
|
|
|
bytesLeft = bytesThreshold - internals.bytes;
|
|
}
|
|
|
|
if (maxRate) {
|
|
if (bytesLeft <= 0) {
|
|
// next time window
|
|
return setTimeout(() => {
|
|
_callback(null, _chunk);
|
|
}, timeWindow - passed);
|
|
}
|
|
|
|
if (bytesLeft < maxChunkSize) {
|
|
maxChunkSize = bytesLeft;
|
|
}
|
|
}
|
|
|
|
if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
|
|
chunkRemainder = _chunk.subarray(maxChunkSize);
|
|
_chunk = _chunk.subarray(0, maxChunkSize);
|
|
}
|
|
|
|
pushChunk(_chunk, chunkRemainder ? () => {
|
|
process.nextTick(_callback, null, chunkRemainder);
|
|
} : _callback);
|
|
};
|
|
|
|
transformChunk(chunk, function transformNextChunk(err, _chunk) {
|
|
if (err) {
|
|
return callback(err);
|
|
}
|
|
|
|
if (_chunk) {
|
|
transformChunk(_chunk, transformNextChunk);
|
|
} else {
|
|
callback(null);
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
export default AxiosTransformStream;
|