-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.ts
149 lines (124 loc) · 6.1 KB
/
client.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import { Socket } from "net";
import { AMQPChannelMethod, AMQPClassesId, AMQPContentBodyFrame, AMQPContentHeaderFrame, ProtocolFrame } from "./lib/base-frames";
import EventEmitter, { once } from "events";
import { ConnectionOpen, ConnectionStartOk, ConnectionTuneOk } from "./lib/connection-frames";
import { LongString, Table } from "./lib/amqp-data-types";
import { ChannelClose, ChannelOpen } from "./lib/channel-frame";
import { BasicPublish } from "./lib/basic-frames";
import { QueueDeclare } from "./lib/queue-frames";
export async function createConnection(uri: string, options: any = {}): Promise<AMQPClient> {
const sock = new Socket();
return new Promise((resolve, reject) => {
sock.on('error', reject);
sock.on('close', reject);
sock.connect(5672, 'localhost', () => {
console.log(`Connected`);
const client = new AMQPClient(sock);
return client.connect().then(resolve);
})
})
}
class AMQPClient {
private readonly __channels = [];
private readonly __framesEmitter = new EventEmitter();
constructor(private readonly _socket: Socket){
this._recvFrames();
}
async connect(){
const protocolFrame = new ProtocolFrame();
this._socket.write(protocolFrame.getBuffer());
await once(this.__framesEmitter, 'Connection#Start');
this._socket.write(this._createConnectionStartOkFrame());
const [{ frameSizeMax, channelMax, heartbeat }] = await once(this.__framesEmitter, 'Connection#Tune');
this._socket.write(new ConnectionTuneOk(channelMax, frameSizeMax, heartbeat).getBuffer());
this._socket.write(new ConnectionOpen("/").getBuffer());
await once(this.__framesEmitter, 'Connection#OpenOk');
return this;
}
_createConnectionStartOkFrame(){
const clientProperties = {
product: new LongString('MyApp'),
version: new LongString('1.0.0'),
};
const mechanism = 'PLAIN';
const response = `\u0000guest\u0000guest`;
const locale = 'en_US';
return new ConnectionStartOk(new Table(clientProperties), mechanism, response, locale).getBuffer();
}
_recvFrames(){
this._socket.on('data', (data) => {
let frameOffset = 0;
const view = new DataView(data.buffer);
const type = view.getUint8(frameOffset);
const channelId = view.getUint16(frameOffset += 1);
const frameSize = view.getUint32(frameOffset += 2);
const frameEnd = view.getUint8((frameOffset += 4) + frameSize);
console.log(JSON.stringify({type, channelId, frameSize, frameEnd}));
if(frameEnd !== 206){
throw new Error(`Frame end invalid ${frameEnd}`);
}
if(type === 8){
return this._socket.write(Buffer.from(new Uint8Array([8, 0, 0, 0, 0, 0, 0, 206])));
}
if(type === 1){
console.log(`Received Method`);
const classId = view.getUint16(frameOffset);
const methodId = view.getUint16(frameOffset += 2);
console.log(JSON.stringify({ classId, methodId }));
if(classId === 10 && methodId === 10){
console.log(`Received Connection#Start`);
this.__framesEmitter.emit('Connection#Start');
}
if(classId === 10 && methodId === 30){
console.log(`Received Connection#Tune`);
const channelMax = view.getInt16(frameOffset += 2);
const frameSizeMax = view.getInt32(frameOffset += 2);
const heartBeat = view.getInt16(frameOffset += 4);
console.log('server tunning parameters', JSON.stringify({ channelMax, frameSizeMax, heartBeat }));
this.__framesEmitter.emit('Connection#Tune', { channelMax, frameSizeMax, heartBeat });
}
if(classId === 10 && methodId === 41){
console.log(`Received Connection#OpenOk`);
this.__framesEmitter.emit('Connection#OpenOk');
}
if(classId === 20 && methodId == 41){
console.log(`Received Channel#CloseOk`);
this.__framesEmitter.emit(`Channel#CloseOk:${channelId}`);
}
if(classId === 20 && methodId === 11){
this.__framesEmitter.emit(`Channel#OpenOk:${channelId}`);
}
}
});
}
async createChannel(){
const channelId = this.__channels.length + 1;
this._socket.write(new ChannelOpen(channelId).getBuffer());
await once(this.__framesEmitter, `Channel#OpenOk:${channelId}`);
return new AMQPChannel(channelId, this._socket, this.__framesEmitter);
}
}
class AMQPChannel {
constructor(
private readonly _channelId: number,
private readonly _sock: Socket,
private readonly __frameEmitter: EventEmitter,
){}
async publish(data: Buffer){
const publish = new BasicPublish(this._channelId, '', 'flow-controller', false, false).getBuffer();
const contentHeader = new AMQPContentHeaderFrame(this._channelId, data.byteLength, 0, new Table({})).getBuffer();
const payload = new AMQPContentBodyFrame(this._channelId, data).getBuffer();
return new Promise((res) => this._sock.write(Buffer.concat([publish, contentHeader, payload]), res));
}
async close(){
this._sock.write(new ChannelClose(this._channelId, 200, "Client closed connection", AMQPClassesId.CHANNEL, AMQPChannelMethod.CLOSE).getBuffer());
await once(this.__frameEmitter, `Channel#CloseOk:${this._channelId}`);
}
async declareQueue(queueName: string){
return new Promise((res) => this._sock.write(new QueueDeclare(this._channelId, queueName, false, true, false, true, {}).getBuffer(), res));
}
}
const client = await createConnection('amqp://localhost:5672');
const channel = await client.createChannel();
channel.declareQueue('some-queue-another');
//channel.publish(Buffer.from('o'));