Wyle.Gong-巩文昕 7a1aae1e2f ui
2025-04-23 11:21:08 +08:00

548 lines
14 KiB
JavaScript

/*
* Copyright 2018-2021 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
const test = require("ava");
const { delay, DataBuffer } = require("../lib/nats-base-client/internal_mod");
const { connect, Empty, consumerOpts, AckPolicy, headers, StringCodec } =
require(
"./index",
);
const { NatsServer, wsConfig } = require("./helpers/launcher");
const { jetstreamServerConf } = require("./helpers/jsutil");
const { setTimeout } = require("timers");
test("jetstream - jsm", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const jsm = await nc.jetstreamManager();
const ai = await jsm.getAccountInfo();
// we made one api call, so
t.is(ai.api.total, 1);
t.is(ai.type, "io.nats.jetstream.api.v1.account_info_response");
let streams = await jsm.streams.list().next();
t.is(streams.length, 0);
let si = await jsm.streams.add({ name: "stream", subjects: ["hello.>"] });
t.is(si.config.name, "stream");
t.is(si.config.subjects.length, 1);
t.is(si.config.subjects[0], "hello.>");
t.is(si.state.messages, 0);
streams = await jsm.streams.list().next();
t.is(streams.length, 1);
t.is(streams[0].config.name, "stream");
const h = headers();
h.set("xxx", "a");
nc.publish("hello.world", Empty, { headers: h });
nc.publish("hello.world", Empty, { headers: h });
await nc.flush();
si = await jsm.streams.info("stream");
t.is(si.state.messages, 2);
const conf = si.config;
conf.subjects.push("goodbye.>");
await jsm.streams.update(conf.name, conf);
const name = await jsm.streams.find("goodbye.>");
t.is(name, "stream");
let ci = await jsm.consumers.add("stream", {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
});
t.is(ci.name, "me");
t.is(ci.delivered.stream_seq, 0);
t.is(ci.num_pending, 2);
let consumers = await jsm.consumers.list("stream").next();
t.is(consumers.length, 1);
ci = await jsm.consumers.info("stream", "me");
t.is(ci.name, "me");
let ok = await jsm.consumers.delete("stream", "me");
t.is(ok, true);
await t.throwsAsync(async () => {
await jsm.consumers.info("stream", "me");
}, { message: "consumer not found" });
consumers = await jsm.consumers.list("stream").next();
t.is(consumers.length, 0);
const sm = await jsm.streams.getMessage("stream", { seq: 2 });
t.is(sm.seq, 2);
t.truthy(sm.header);
t.is(sm.header.get("xxx"), "a");
ok = await jsm.streams.deleteMessage("stream", 1);
t.is(ok, true);
si = await jsm.streams.info("stream");
t.is(si.state.messages, 1);
const pr = await jsm.streams.purge("stream");
t.is(pr.purged, 1);
t.is(pr.success, true);
ok = await jsm.streams.delete("stream");
t.is(ok, true);
await t.throwsAsync(async () => {
await jsm.streams.info("stream");
}, { message: "stream not found" });
streams = await jsm.streams.list().next();
t.is(streams.length, 0);
await nc.flush();
await nc.close();
await ns.stop();
});
test("jetstream - pull", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "stream", subjects: ["hello.>"] });
await jsm.consumers.add("stream", {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
});
const js = nc.jetstream();
await t.throwsAsync(async () => {
await js.pull("stream", "me");
}, { message: /no messages$/ });
let pa = await js.publish("hello.world", Empty, {
expect: { lastSequence: 0 },
});
t.is(pa.seq, 1);
pa = await js.publish("hello.world", Empty, { expect: { lastSequence: 1 } });
t.is(pa.seq, 2);
let m = await js.pull("stream", "me");
t.is(m.seq, 1);
m.ack();
m = await js.pull("stream", "me");
t.is(m.seq, 2);
m.ack();
await t.throwsAsync(async () => {
await js.pull("stream", "me");
}, { message: /no messages$/ });
await nc.flush();
await nc.close();
await ns.stop();
});
test("jetstream - fetch", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "stream", subjects: ["hello.>"] });
await jsm.consumers.add("stream", {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
});
const js = nc.jetstream();
let iter = await js.fetch("stream", "me", { no_wait: true });
await (async () => {
for await (const m of iter) {
// nothing
}
})();
t.is(iter.getProcessed(), 0);
let pa = await js.publish("hello.world", Empty, {
expect: { lastSequence: 0 },
});
t.is(pa.seq, 1);
pa = await js.publish("hello.world", Empty, { expect: { lastSequence: 1 } });
t.is(pa.seq, 2);
pa = await js.publish("hello.world", Empty, { expect: { lastSequence: 2 } });
t.is(pa.seq, 3);
iter = await js.fetch("stream", "me", { no_wait: true, batch: 2 });
await (async () => {
for await (const m of iter) {
m.ack();
}
})();
t.is(iter.getProcessed(), 2);
iter = await js.fetch("stream", "me", { no_wait: true, batch: 2 });
await (async () => {
for await (const m of iter) {
m.ack();
}
})();
t.is(iter.getProcessed(), 1);
iter = await js.fetch("stream", "me", { no_wait: true, batch: 3 });
await (async () => {
for await (const m of iter) {
m.ack();
}
})();
t.is(iter.getProcessed(), 0);
await nc.flush();
await nc.close();
await ns.stop();
});
test("jetstream - jetstream sub", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "stream", subjects: ["hello.>"] });
const js = nc.jetstream();
let pa = await js.publish("hello.world", Empty, {
expect: { lastSequence: 0 },
});
t.is(pa.seq, 1);
pa = await js.publish("hello.world", Empty, { expect: { lastSequence: 1 } });
t.is(pa.seq, 2);
pa = await js.publish("hello.world", Empty, { expect: { lastSequence: 2 } });
t.is(pa.seq, 3);
const cob = consumerOpts();
cob.durable("me");
cob.deliverTo("hi");
cob.ackExplicit();
cob.deliverAll();
cob.maxMessages(3);
let iter = await js.subscribe("hello.>", cob);
await (async () => {
for await (const m of iter) {
m.ack();
}
})();
t.is(iter.getProcessed(), 3);
await nc.close();
await ns.stop();
});
test("jetstream - jetstream pullsub", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
let nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "stream", subjects: ["hello.>"] });
let js = nc.jetstream();
let pa = await js.publish("hello.world", Empty);
t.is(pa.seq, 1);
pa = await js.publish("hello.world", Empty);
t.is(pa.seq, 2);
pa = await js.publish("hello.world", Empty);
t.is(pa.seq, 3);
const cob = consumerOpts();
cob.durable("me");
cob.ackExplicit();
cob.deliverAll();
cob.maxMessages(3);
const sub = await js.pullSubscribe("hello.>", cob);
const done = (async () => {
for await (const m of sub) {
m.ack();
}
})();
// pull one
sub.pull({ batch: 1, expires: 100 });
await delay(150);
t.is(sub.getProcessed(), 1);
// pull two more
sub.pull({ batch: 2, expires: 100 });
await delay(150);
t.is(sub.getProcessed(), 3);
await done;
t.is(sub.getProcessed(), 3);
await delay(1000);
await nc.close();
await ns.stop();
});
test("jetstream - kv basics", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const js = nc.jetstream();
const kv = await js.views.kv("test");
const sc = StringCodec();
await kv.put("a", sc.encode("hello"));
const v = await kv.get("a");
t.truthy(v);
t.is(v.bucket, "test");
t.is(v.key, "a");
t.is(sc.decode(v.value), "hello");
await nc.close();
await ns.stop();
});
function readableStreamFrom(data) {
return new ReadableStream({
pull(controller) {
controller.enqueue(data);
controller.close();
},
});
}
async function fromReadableStream(
rs,
) {
const buf = new DataBuffer();
const reader = rs.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
return buf.drain();
}
if (value && value.length) {
buf.fill(value);
}
}
}
test("jetstream - os basics", async (t) => {
if (process.version.startsWith("v14.")) {
t.log(
`node ${process.version} cannot run objectstore as webcrypto is not available`,
);
t.pass();
return;
}
if (typeof globalThis.crypto === "undefined") {
const c = require("crypto");
global.crypto = c.webcrypto;
}
if (typeof globalThis.ReadableStream === "undefined") {
const streams = require("web-streams-polyfill/ponyfill");
global.ReadableStream = streams.ReadableStream;
}
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const js = nc.jetstream();
const os = await js.views.os("test");
const sc = StringCodec();
await os.put({ name: "a" }, readableStreamFrom(sc.encode("hello")));
const v = await os.get("a");
t.truthy(v);
t.is(v.info.bucket, "test");
t.is(v.info.name, "a");
t.is(v.info.chunks, 1);
t.is(sc.decode(await fromReadableStream(v.data)), "hello");
await nc.close();
await ns.stop();
});
test("jetstream - consumer basics", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const js = nc.jetstream();
await t.throwsAsync(async () => {
await js.consumers.get("stream", "a");
}, { message: "stream not found" });
await t.throwsAsync(async () => {
await js.streams.get("stream");
}, { message: "stream not found" });
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "stream", subjects: ["hello.>"] });
const s = await js.streams.get("stream");
t.truthy(s);
t.is(s.name, "stream");
await t.throwsAsync(async () => {
await js.consumers.get("stream", "a");
}, { message: "consumer not found" });
await t.throwsAsync(async () => {
await s.getConsumer("a");
}, { message: "consumer not found" });
await jsm.consumers.add("stream", {
name: "a",
ack_policy: AckPolicy.ackExplicit,
});
let c = await js.consumers.get("stream", "a");
t.truthy(c);
c = await s.getConsumer("a");
t.truthy(c);
let ci = await c.info(true);
t.is(ci.name, "a");
t.is(ci.num_pending, 0);
let m = await c.next({ expires: 1000 });
t.is(m, null);
await js.publish("hello.a");
await js.publish("hello.b");
await js.publish("hello.c");
await js.publish("hello.d");
ci = await c.info();
t.is(ci.num_pending, 4);
m = await c.next();
m.ack();
t.is(m.subject, "hello.a");
let iter = await c.fetch({ max_messages: 2 });
const buf = [];
for await (let m of iter) {
buf.push(m);
m.ack();
}
t.is(iter.getProcessed(), 2);
t.is(buf.length, 2);
t.is(buf[0].subject, "hello.b");
t.is(buf[1].subject, "hello.c");
buf.length = 0;
iter = await c.consume();
const done = (async () => {
for await (const m of iter) {
buf.push(m);
m.ack();
}
})();
setTimeout(() => {
iter.stop();
}, 2000);
await done;
t.is(iter.getProcessed(), 1);
t.is(buf.length, 1);
t.is(buf[0].subject, "hello.d");
await nc.close();
await ns.stop();
});
test("jetstream - ordered consumer basics", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf(wsConfig()));
const nc = await connect({ servers: `ws://127.0.0.1:${ns.websocket}` });
const js = nc.jetstream();
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "stream", subjects: ["hello.>"] });
const s = await js.streams.get("stream");
try {
await js.consumers.get("stream");
} catch (err) {
if (err.message.indexOf("is only supported on") !== -1) {
t.log(err.message);
t.pass();
await nc.close();
await ns.stop();
return;
} else {
t.fail(err);
}
}
let c = await js.consumers.get("stream");
t.truthy(c);
c = await s.getConsumer();
t.truthy(c);
let ci = await c.info(true);
t.is(ci.num_pending, 0);
let m = await c.next({ expires: 1000 });
t.is(m, null);
await js.publish("hello.a");
await js.publish("hello.b");
await js.publish("hello.c");
await js.publish("hello.d");
ci = await c.info();
t.is(ci.num_pending, 4);
m = await c.next();
m.ack();
t.is(m.subject, "hello.a");
c = await js.consumers.get("stream");
let iter = await c.fetch({ max_messages: 4 });
const buf = [];
for await (let m of iter) {
buf.push(m);
m.ack();
}
t.is(iter.getProcessed(), 4);
t.is(buf.length, 4);
t.is(buf[0].subject, "hello.a");
t.is(buf[1].subject, "hello.b");
t.is(buf[2].subject, "hello.c");
t.is(buf[3].subject, "hello.d");
buf.length = 0;
c = await js.consumers.get("stream");
iter = await c.consume();
const done = (async () => {
for await (const m of iter) {
buf.push(m);
m.ack();
}
})();
setTimeout(() => {
iter.stop();
}, 2000);
await done;
t.is(iter.getProcessed(), 4);
t.is(buf.length, 4);
await nc.close();
await ns.stop();
});