ใน TypeScript/JavaScript ข้อมูลแบบ Stream หรือ Asynchronous Sequence คือชุดของค่าที่จะถูกส่งออกมาเมื่อเวลาผ่านไป (Events over time) คล้ายกับการดูวิดีโอ Live หรือการรอรับข้อมูลจากเครือข่าย
1. ประเภทและแนวคิดของ Stream
แนวคิดของ Stream ใน TS/JS ถูกแบ่งออกตามลักษณะการทำงาน โดยมีแนวคิดเทียบเคียงกับ Dart ดังนี้:
| Dart Stream | TypeScript/RxJS Equivalent | ลักษณะการทำงาน |
| Single Subscription | Cold Observable | ข้อมูลจะถูกส่งออกมาใหม่ทุกครั้งที่มีผู้เริ่มฟัง (Listener) มักใช้ในการอ่านไฟล์หรือทำ HTTP Request เพียงครั้งเดียว |
| Broadcast Stream | Hot Observable/Subject | ข้อมูลจะถูกส่งออกมาอย่างต่อเนื่อง โดยผู้ฟัง (Listener) คนใดเข้ามาฟัง ณ เวลาใด ก็จะได้รับข้อมูลที่ส่งออกมา ณ เวลานั้น ๆ (เช่น Mouse Events, Live Data) |
2. การสร้าง Stream ด้วย Async Generator
Async Generator เป็นคุณสมบัติของภาษา TypeScript/JavaScript ที่เป็นรูปแบบที่ใกล้เคียงกับ async* และ yield ใน Dart มากที่สุด ใช้สำหรับสร้าง Stream แบบ Async Iterable ที่ผู้ใช้งานสามารถดึงข้อมูลออกมาได้ทีละส่วน (pull stream)
A. การกำหนด Async Generator
ใช้คีย์เวิร์ด async function* และ yield ในการส่งค่าแต่ละชุดออกมา:
// ฟังก์ชันสร้าง Stream ของข้อความ 5 บรรทัด
async function* generateLines(to: number): AsyncIterable<string> {
for (let i = 1; i <= to; i++) {
// yield จะส่งค่าออกมาทีละชุด แล้วหยุดรอการเรียกครั้งถัดไป
yield `This is Line ${i}`;
}
}
B. การติดตาม Stream ด้วย for await...of
ใช้คำสั่ง for await...of เพื่อวนลูปและรอรับค่าจาก Async Iterable ทีละค่า จนกว่า Stream จะสิ้นสุด (Complete)
async function processStream() {
const stream = generateLines(3);
// for await...of จะหยุดรอจนกว่า yield จะส่งค่าออกมา
for await (const line of stream) {
console.log(line);
}
console.log('Stream completed.');
}
processStream();
// Output:
// This is Line 1
// This is Line 2
// This is Line 3
// Stream completed.
3. การสร้างและจัดการ Stream ด้วย Observables (RxJS)
สำหรับงานที่ซับซ้อน เช่น การจัดการเหตุการณ์ (Events), การรวม Stream, และการควบคุม Flow ของข้อมูล ไลบรารี RxJS ที่ใช้ Observable คือเครื่องมือหลักในโลก TypeScript
A. การสร้าง Observable (Source)
แทนที่จะใช้ StreamController ใน Dart เราใช้ Observable หรือ Subject ใน RxJS เพื่อสร้างแหล่งกำเนิดข้อมูล:
import { Observable, Subject } from 'rxjs'; // ต้องมีการติดตั้ง RxJS
// 1. สร้าง Subject (Hot Observable/Broadcast Stream)
const mySubject = new Subject<number>();
// 2. ส่งข้อมูล (Pushing Events)
mySubject.next(1); // ข้อมูลถูกส่งออกมาทันที
// 3. Subscription (การฟัง)
mySubject.subscribe(value => {
console.log(`Listener A: ${value}`);
});
mySubject.next(2); // ผู้ฟัง A จะได้รับค่า 2
mySubject.next(3); // ผู้ฟัง A จะได้รับค่า 3
mySubject.subscribe(value => {
console.log(`Listener B: ${value}`);
});
mySubject.next(4);
// Listener A: 4
// Listener B: 4
B. การติดตาม Stream ด้วย .subscribe()
เทียบเท่ากับ .listen() ใน Dart โดย subscribe() จะรับ Callback Functions 3 ตัว:
-
onNext(onData): สำหรับรับข้อมูลแต่ละชุด -
onError(onError): สำหรับจัดการข้อผิดพลาด -
onComplete(onDone): สำหรับทำงานเมื่อ Stream เสร็จสิ้น
// สมมติ myObservable คือ Observable<string> ที่ถูกสร้างไว้
const subscription = mySubject.subscribe({
next: (data) => console.log('Data:', data), // เมื่อมีข้อมูลมา
error: (err) => console.error('Error:', err), // เมื่อเกิดข้อผิดพลาด
complete: () => console.log('Done.'), // เมื่อ Stream เสร็จสิ้น
});
// การยกเลิก Stream (คล้าย StreamSubscription)
subscription.unsubscribe();
C. การแปลงข้อมูลด้วย Operators (transform Equivalent)
ใน RxJS เราใช้ Operators (เช่น map, filter, scan) ผ่าน Method .pipe() เพื่อแปลงและจัดการข้อมูล Stream อย่างมีประสิทธิภาพ
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5) // สร้าง Observable จากค่า [1, 2, 3, 4, 5]
.pipe(
// 1. Filter: กรองเฉพาะเลขคู่เท่านั้น
filter(value => value % 2 === 0),
// 2. Map: แปลงค่าเป็นข้อความ
map(value => `Number is ${value}`)
)
.subscribe(result => console.log(result));
// Output:
// Number is 2
// Number is 4