Как работают потоки в Node.JS?

Потоки Node.js — это мощная функция, позволяющая эффективно обрабатывать потоки данных без блокировки. Потоки в Node.js — это встроенные объекты, которые позволяют вам читать или записывать в источник постепенно, небольшими порциями, вместо того, чтобы сразу загружать все данные в память.

В Node.js существует четыре основных типа потоков:

1. Потоки для чтения. Эти потоки представляют собой источник, из которого можно использовать данные. Например, чтение данных из файла, получение данных из HTTP-запроса или создание данных из запроса к базе данных.
Доступные для чтения потоки генерируют событие `data` всякий раз, когда доступны новые данные, и потребитель может читать эти данные, используя различные методы.

2. Записываемые потоки. Эти потоки представляют место назначения, в которое могут быть записаны данные. Например, запись данных в файл, отправка данных через ответ HTTP или вставка данных в базу данных. Доступные для записи потоки предоставляют такие методы, как `write()` и `end()` для отправки данных в поток.

3. Дуплексные потоки. Эти потоки представляют потоки, доступные как для чтения, так и для записи. Это означает, что вы можете одновременно читать и писать в эти потоки. Дуплексные потоки являются двунаправленными и обычно используются для таких задач, как сетевое взаимодействие.

4. Потоки преобразования. Эти потоки представляют собой особый тип дуплексных потоков, которые манипулируют
или преобразовывают данные по мере их передачи от стороны, доступной для чтения, к стороне, доступной для записи. Потоки преобразования полезны для таких задач, как сжатие, шифрование или форматирование данных. Их можно использовать для изменения данных «на лету» во время их потоковой передачи.

Потоки в Node.js работают по принципу событий и обратного давления. Данные потребляются или создаются фрагментами, а такие события, как `data`, `end` и `error` генерируется для уведомления приложения о состоянии потока.

Противодавление – это механизм, используемый для управления потоком. данных, когда доступный для записи поток не может обрабатывать входящие данные так же быстро, как они создаются. Это позволяет читаемому потоку приостанавливать или замедлять производство данных до тех пор, пока записываемый поток не будет готов потреблять больше данных.

Для работы с потоками в Node.js вы обычно создаете экземпляры соответствующего типа потока, настраиваете обработчики событий и используете различные методы, предоставляемые API потоков, для взаимодействия с данными. Вы также можете соединять потоки вместе, что означает подключение вывода одного потока к вводу другого потока, чтобы создать конвейер, который автоматически обрабатывает поток данных между несколькими потоками.

В целом, потоки Node.js обеспечивают эффективный и гибкий способ обработки данных, особенно при работе с большими объемами данных или сценариями, когда данные необходимо обрабатывать поэтапно.

В приведенных ниже примерах демонстрируются различные сценарии использования потоков в Node.js, включая копирование файлов, сжатие данных и преобразование данных. В зависимости от вашего конкретного варианта использования вы можете адаптировать эти примеры или изучить документацию по потокам Node.js для получения более подробной информации и дополнительных операций, связанных с потоками.

const fs = require('fs');
const zlib = require('zlib');
const { Duplex } = require('stream');
const { Transform } = require('stream');

// Reading and writing data file to another:
export const readWriteStream = () => {
    const readableStream = fs.createReadStream('input.txt');
    const writableStream = fs.createWriteStream('output.txt');

    readableStream.pipe(writableStream);

    console.log('File is being copied...');
}

// Reading data file, generating zip and writing to another file:
export const zipGenerate = () => {
    const readableStream = fs.createReadStream('input.txt');
    const gzipStream = zlib.createGzip();
    const writableStream = fs.createWriteStream('output.txt.gz');

    readableStream.pipe(gzipStream).pipe(writableStream);

    console.log('File is being compressed...');
}

// Reading input data, convert to upper case and writing on output:
export const convertInputToUpperCase = () => {
    const uppercaseTransform = new Transform({
        transform(chunk, encoding, callback) {
            const uppercasedChunk = chunk.toString().toUpperCase();
            this.push(uppercasedChunk);
            callback();
        }
    });

    process.stdin.pipe(uppercaseTransform).pipe(process.stdout);
}

// Handle with Duplex streams:
/* 
When you run this code, it will write "Hello" to the duplex stream, which will 
be logged as "Writing: Hello". Then, the stream will generate and push characters 
starting from 'A' to the readable side. Each received chunk will be logged as 
"Received: <character>". The stream will continue pushing characters until it 
reaches 'Z', at which point it will end the stream by pushing null. 
*/
export const logChars = () => {
    const myDuplexStream = new Duplex({
        write(chunk, encoding, callback) {
            console.log(`Writing: ${chunk}`);
            callback();
        },
        read(size) {
            if (this.currentCharCode > 90) {
                this.push(null);
                return;
            }
            this.push(String.fromCharCode(this.currentCharCode++));
        }
    });

    myDuplexStream.currentCharCode = 65; // ASCII code for 'A'

    myDuplexStream.on('data', (chunk) => {
        console.log(`Received: ${chunk}`);
    });

    myDuplexStream.write('Hello');

    myDuplexStream.end();
}