◆ 外部サーバーから受け取るデータを 1 つの stream としてブラウザに送る
◆ ブラウザでは全体を受け取らずチャンクごとに処理する
◆ stream をファイルにパイプしてダウンロードにも使える

前の記事など 最近 stream 関係のことを書いてましたが 今回の内容をやろうとした結果 調べることになったものでした
そのやりたかったことですが まず なにかのデータを取得できる外部のサーバーがあります
そこから条件に一致する全件を受け取って何らかの処理をブラウザで行いたいです
ブラウザで直接アクセスできないので 自サーバーを経由しての取得になります
処理の都合上 自サーバーで全件受け取って何らかの処理をして結果をブラウザに返すのではなく 処理自体をブラウザで済ませたいです

全件はかなりのデータ量になり 外部サーバーでも全件の一括出力はできず 1 回の最大取得量が決まっています
ページャ風にリクエストを順番に送っていく感じです

全件取得するまでブラウザからリクエストを何度も送るでもいいのですが ムダも多いですし やることが決まってるのですから外部サーバーにリクエストを複数送る部分は自サーバーでやって 自サーバーからブラウザへは stream 形式でデータを渡そうと思ってます

外部サーバー

テスト用に外部サーバー相当のものを作ります
中身のデータは簡単なのでいいので { id: 1 } みたいなオブジェクトで id が増えていくものにしてます
JSON 形式で上記のオブジェクトが配列で入ってます

取得数と全体件数は最初の変数で切り替えます

const http = require("http")

const max_per_req = 10
const end = 185

http.createServer((req, res) => {
const url = new URL(req.url, `http://${req.headers.host}`)

if (url.pathname !== "/") {
res.writeHead(404)
res.end("not found")
return
}

const start = ~~url.searchParams.get("n") + 1

const data = Array.from(
Array(max_per_req),
(_, i) => {
const id = start + i
return id <= end ? { id } : null
}
).filter(x => x)

const next = data.length === 0
? null
: data.at(-1).id < end
? `${url.origin}?n=${data.at(-1).id}`
: null

const result = { data, next }
res.writeHead(200, {
"Content-Type": "application/json",
})
res.end(JSON.stringify(result))
}).listen(4000)

レスポンスはこんな感じの JSON です
next に続き取得用の URL が入ってます

{
"data": [ { "id": 1 }, { "id": 2 }, ... ],
"next": "http://localhost:4000?n=10"
}

自サーバー

自サーバーでは 上のサーバーからデータを取得し next がある限り追加で取得し続けてブラウザに送ります
stream 形式にするわけですが stream って JSON と相性が悪いですよね
JSON は最後まで読み取ってからパースすることが基本ですし
XML の SAX みたいに Stream 対応のパーサーもあるらしいのですが 滅多に聞かないですし一般的なパーサーの標準機能でもないですし
最近は 1 行に 1 JSON というフォーマットのほうがよく見る気がします
これなら作るのも読み取るのも簡単です
ということでこれにしました
ところでこのフォーマットって mime type は決まってるのでしょうか?

const http = require("http")
const stream = require("stream")
const fs = require("fs")

http.createServer(async (req, res) => {
const url = new URL(req.url, `http://${req.headers.host}`)

if (url.pathname === "/") {
fs.createReadStream(__dirname + "/page.html").pipe(res)
return
}
if (url.pathname === "/data") {
const readable = new stream.Readable({ read() {} })
readable.pipe(res)

for (let next = "http://localhost:4000"; next;) {
const result = await fetch(next).then(x => x.json())
const text = result.data.map(x => JSON.stringify(x) + "\n").join("")
readable.push(text)
next = result.next
}
readable.push(null)
return
}

res.writeHead(404)
res.end("not found")
}).listen(5000)

ブラウザ

ブラウザ側の処理です
実際の複雑な処理はしなくても stream で受け取れてることを確認できればいいです
とりあえずチャンクごとの最後の id で last_id 変数を更新していくことにしました
件数がかなり多いときは console.log に出してると重たくなるので console.log はなくして last_id をみて進捗を把握します

let last_id = 0

document.querySelector(".start").onclick = async () => {
const res = await fetch("/data")
const reader = res.body.pipeThrough(new TextDecoderStream()).getReader()
let rem = ""

const onItems = (items) => {
console.log(items)
last_id = items.at(-1).id
}

while (true) {
const { done, value } = await reader.read()
if (done) {
if (rem) {
const item = JSON.parse(rem)
onItems([item])
}
break
}
const lines = (rem + value).split("\n")
rem = lines.pop()
if (lines.length) {
const items = lines.map(x => JSON.parse(x))
onItems(items)
}
}
}

問題なく処理できてます
外部サーバー部分の一回の取得数を 1000 にしたり 合計数を 1000 万くらいにしたりしても大丈夫でした

ダウンロード

ブラウザ側では受け取ったデータを画面内で使うのではなくダウンロードしたいケースもあります
ダウンロードなら a タグの download 属性を使ったり サーバーが返すレスポンスのヘッダーで Content-Disposition: attachment をつけたりもできます
ですが これらだとエラーの場合の制御とかができないのですよね
ページ遷移や新しいタブで開かれたページにエラーが出ることになります
画面内でエラーを通知できないです
stream をそのままファイルに書き込む方法にすればエラーを制御できます

document.querySelector(".download").onclick = async () => {
const handle = await window.showSaveFilePicker()
const res = await fetch("/data")

if (!res.ok) {
alert("error")
console.error(res)
return
}

const writable = await handle.createWritable()
await res.body.pipeTo(writable)
console.log("Done")
}

ここでは alert ですが もっとリッチな toast とかにもできます

受信データをそのまま保存するようにしていますが 変換する stream を通して変換することもできます
文字列を大文字化する場合はこういう感じです

document.querySelector(".download").onclick = async () => {
const handle = await window.showSaveFilePicker()
const res = await fetch("/data")

if (!res.ok) {
alert("error")
console.log(res)
return
}

const writable = await handle.createWritable()
await res.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase())
},
}))
.pipeTo(writable)

console.log("Done")
}