source: genappalpha/languages/html5/util/msg-tcpserver.go @ 1488

Last change on this file since 1488 was 1488, checked in by ehb, 4 years ago

fixes for install with tcp messaging

File size: 13.2 KB
Line 
1package main
2
3import (
4        "github.com/fredli74/lockfile"
5        zmq "github.com/pebbe/zmq__zmqversion__"
6        "encoding/json"
7        "net"
8        "io"
9        "os"
10        "bufio"
11        "fmt"
12        "io/ioutil"
13        "time"
14        "sync"
15        "strconv"
16        __~debug:tcp{"reflect"}
17)
18
19var appconfig = "__appconfig__"
20
21var timeout time.Duration = 60 * time.Second // in seconds, maybe set override in request
22
23var messaging map[string]interface{}
24var zmqlisten string
25var tcplisten string
26var tcprlisten string
27var rchanmap map[uint64] chan []byte
28var mutex = &sync.Mutex{} // synchronize access to rchanmap
29var lockdir string = "/var/run/genapp"
30var tcplockfile string
31
32func main() {
33
34        if len(os.Args) > 1 {
35                appconfig = os.Args[1]
36        } else {
37                fmt.Println( "appconfig used:" + appconfig )
38        }
39
40        getappconfig() // make sure we have a good appconfig
41
42        if lock, err := lockfile.Lock(tcplockfile); err != nil {
43                panic(err)
44        } else {
45                defer lock.Unlock()
46        }
47
48        fmt.Printf( "default timeout:%v\n", timeout )
49
50        rchanmap = make(map[uint64] chan []byte)
51
52        // listen
53        server, err := net.Listen("tcp", tcplisten )
54        if server == nil {
55                panic("couldn't start listening: " + tcplisten + ":" + err.Error())
56        }
57        fmt.Println( "msg-tcpserver started listening on " + tcplisten );
58
59        rserver, rerr := net.Listen("tcp", tcprlisten )
60        if rserver == nil {
61                panic("couldn't start listening: " + tcprlisten + ":" + rerr.Error())
62        }
63        fmt.Println( "msg-tcpserver started response listening on " + tcprlisten );
64
65        conns, connsid := clientConns(server) // probably need 2 channels, one for net.Conn, 2nd for int64 - id
66        rconns, rconnsid := clientConns(rserver)
67
68        for {
69                select {
70                case msg1 := <-conns:
71                        id := <-connsid
72                        go handleConn( msg1, id )
73                case msg2 := <-rconns:
74                        _ = <-rconnsid // ignored
75                        go handleRconn( msg2 ) // receives response, will check rchanmap for valid id received in response
76                }
77        }
78}
79
80func getappconfig() {
81        dat, err := ioutil.ReadFile( appconfig );
82
83        if err != nil {
84                panic( "error: reading "  + appconfig + " : " + err.Error())
85        }
86
87        var jdata map[string]interface{}
88        err = json.Unmarshal(dat, &jdata)
89
90        if _, ok := jdata["messaging"]; ok {
91                // v := jdata["messaging"]
92                // to uncomment these debugging statements, import "reflect" above
93                // fmt.Printf("  value:%v  kind:%s  type:%s\n", v, reflect.TypeOf(v).Kind(), reflect.TypeOf(v))
94                messaging = jdata["messaging"].(map[string]interface{})
95                if _, ok := messaging["tcpport"]; !ok {
96                        panic( "error: in "  + appconfig + " : no messaging:tcpport defined" );
97                }
98                // fmt.Printf( "tcpport: %v\n", messaging["tcpport"] )
99                if _, ok := messaging["tcphostip"]; !ok {
100                        panic( "error: in "  + appconfig + " : no messaging:tcphostip defined" );
101                }
102                if _, ok := messaging["zmqhostip"]; !ok {
103                        panic( "error: in "  + appconfig + " : no messaging:zmqhostip defined" );
104                }
105                if _, ok := messaging["zmqport"]; !ok {
106                        panic( "error: in "  + appconfig + " : no messaging:zmqport defined" );
107                }
108                if _, ok := messaging["tcprport"]; !ok {
109                        panic( "error: in "  + appconfig + " : no messaging:tcprport defined" );
110                }
111                if _, ok := messaging["tcptimeout"]; ok {
112                        if _, ok := messaging["tcptimeout"].(float64); ok {
113                                timeout = time.Duration(messaging["tcptimeout"].(float64)) * time.Second
114                        }
115                }
116        } else {
117                panic( "error: in "  + appconfig + " : no messaging defined" );
118        }
119        if _, ok := jdata["lockdir"]; ok {
120                lockdir = jdata["lockdir"].(string)
121        }
122        tcplockfile = lockdir + "/msg-tcp-" + fmt.Sprint( messaging["zmqport"] ) + ".lock"
123        tcplisten = fmt.Sprint( messaging["tcphostip"] ) + ":" + fmt.Sprint( messaging["tcpport"] )
124        tcprlisten = fmt.Sprint( messaging["tcphostip"] ) + ":" + fmt.Sprint( messaging["tcprport"] )
125        zmqlisten = "tcp://" + fmt.Sprint( messaging["zmqhostip"] ) + ":" + fmt.Sprint( messaging["zmqport"] )
126}
127
128func clientConns(listener net.Listener) ( chan net.Conn, chan uint64 ) {
129        ch := make(chan net.Conn)
130        chi := make(chan uint64)
131        var i uint64 = 0
132        go func() {
133                for {
134                        client, err := listener.Accept()
135                        if client == nil {
136                                fmt.Printf("couldn't accept: " + err.Error())
137                                continue
138                        }
139                        i++
140                        __~debug:tcp{fmt.Printf("%d: %v <-> %v\n", i, client.LocalAddr(), client.RemoteAddr())}
141                        ch <- client
142                        chi <- i
143                }
144        }()
145        return ch, chi
146}
147
148func handleConn(client net.Conn, id uint64) {
149        __~debug:tcp{fmt.Println( "handleConn" )}
150        b := bufio.NewReader(client)
151
152        defer client.Close();
153
154        // open zmqclient to forward message
155        zmqclient, err := zmq.NewSocket(zmq.PUSH)
156        if err != nil { // EOF, or worse
157                rmap := map[string]string{"error":"zmq socket open error:" + err.Error()}
158                rmapj, _ := json.Marshal( rmap )
159                client.Write( rmapj )
160                return
161        }
162
163        defer zmqclient.Close()
164
165        zmqclient.Connect( zmqlisten )
166
167        line, err := b.ReadBytes('\n')
168        is_closed := false
169        if err != nil { // EOF, or worse
170                if err == io.EOF {
171                        __~debug:tcp{fmt.Printf("eof: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
172                        is_closed = true
173                } else {
174                        __~debug:tcp{fmt.Printf("read client error: %v <-> %v %v\n", client.LocalAddr(), client.RemoteAddr(), err.Error())}
175                }
176                if len(line) < 2 {
177                        return
178                }
179        }
180        // do we have good json?
181
182        var jdata map[string]interface{}
183        err = json.Unmarshal(line, &jdata)
184        if err != nil {
185                // send error json back and exit
186                rmap := map[string]string{"error":"invalid json : " + err.Error()}
187                rmapj, _ := json.Marshal( rmap )
188                client.Write( rmapj )
189                __~debug:tcp{fmt.Printf("invalid json: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
190                __~debug:tcp{fmt.Printf("closed: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
191                return;
192        }
193
194        if _, ok := jdata["_uuid"]; !ok {
195                // send error json back and exit
196                rmap := map[string]string{"error":"json missing _uuid"}
197                rmapj, _ := json.Marshal( rmap )
198                client.Write( rmapj )
199                __~debug:tcp{fmt.Printf("no uuid in json: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
200                __~debug:tcp{fmt.Printf("closed: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
201                return;
202        }
203
204        if _, ok := jdata["_question"]; !ok {
205                // not a question, so just pass it along
206                zmqclient.SendMessage( line );
207                rmap := map[string]string{"ok":"message forwarded to client"}
208                rmapj, _ := json.Marshal( rmap )
209                client.Write( rmapj )
210                __~debug:tcp{fmt.Printf("response len %d: %v <-> %v\n", len(line), client.LocalAddr(), client.RemoteAddr())}
211                __~debug:tcp{fmt.Printf("closed: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
212                return;
213        }
214
215        // add _msgid to json data for channel retrieval
216        jdata["_msgid"] = id;
217        if line, err = json.Marshal( jdata ); err != nil {
218                rmap := map[string]string{"error":"internal error : " + err.Error()}
219                rmapj, _ := json.Marshal( rmap )
220                client.Write( rmapj )
221                __~debug:tcp{fmt.Println("json marhsal error:" + err.Error() )}
222                __~debug:tcp{fmt.Printf("closed: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
223                return;
224        }
225
226        if is_closed {
227                __~debug:tcp{fmt.Printf("closed: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
228                return
229        }
230
231        use_timeout := timeout;
232
233        if _, ok := jdata["timeout"]; ok {
234                if _, ok := jdata["timeout"].(float64); ok {
235                        use_timeout = time.Duration(jdata["timeout"].(float64)) * time.Second
236                } else if _, ok := jdata["timeout"].(string); ok {
237                        msgidret, err := strconv.ParseUint(jdata["timeout"].(string), 10, 64 );
238                        if err != nil {
239                                rmap := map[string]string{"error":"_question:timeout specified but could not be converted to a time: " + err.Error()}
240                                rmapj, _ := json.Marshal( rmap )
241                                client.Write( rmapj )
242                                fmt.Println("json marhsal error:" + err.Error() )
243                                fmt.Printf("closed: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())
244                                return;
245                        }
246                        use_timeout = time.Duration(msgidret) * time.Second
247                }
248        }
249
250        // all ok, forward question
251
252        rchanmap[id] = make(chan []byte)
253        zmqclient.SendMessage(line)
254
255        // wait for response
256
257        select {
258        case res := <-rchanmap[id]:
259                delete(rchanmap, id)
260                client.Write(res)
261                ackmap := map[string]string{"_uuid":jdata["_uuid"].(string),"_msgid":strconv.FormatUint( id, 10 ),"_question_answered":""}
262                ackmapj, _ := json.Marshal( ackmap )
263                zmqclient.SendMessage( ackmapj )
264                __~debug:tcp{fmt.Printf("response len %d: %v <-> %v\n", len(res), client.LocalAddr(), client.RemoteAddr())}
265        case <-time.After(use_timeout): // maybe set override in request
266                mutex.Lock() // handle possible race condition: timeout happens in race with response received
267                if len( rchanmap[ id ] ) > 0 {
268                        mutex.Unlock()
269                        res := <-rchanmap[id]
270                        delete(rchanmap, id)
271                        client.Write(res)
272                        ackmap := map[string]string{"_uuid":jdata["_uuid"].(string),"_msgid":strconv.FormatUint( id, 10 ),"_question_answered":""}
273                        ackmapj, _ := json.Marshal( ackmap )
274                        zmqclient.SendMessage( ackmapj )
275                        __~debug:tcp{fmt.Printf("response len %d: %v <-> %v\n", len(res), client.LocalAddr(), client.RemoteAddr())}
276                        return
277                }
278                delete(rchanmap, id)
279                mutex.Unlock()
280                rmap := map[string]string{"error":"timeout"}
281                rmapj, _ := json.Marshal( rmap )
282                client.Write( rmapj )
283                ackmap := map[string]string{"_uuid":jdata["_uuid"].(string),"_msgid":strconv.FormatUint( id, 10 ),"_question_timeout":""}
284                ackmapj, _ := json.Marshal( ackmap )
285                zmqclient.SendMessage( ackmapj )
286        }
287
288        __~debug:tcp{fmt.Printf("closed: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
289}
290
291func handleRconn(client net.Conn) {
292        __~debug:tcp{fmt.Println( "handleRconn" )}
293        b := bufio.NewReader(client)
294
295        defer client.Close();
296
297        // just once
298        // for {
299
300        line, err := b.ReadBytes('\n')
301        if err != nil { // EOF, or worse
302                if err == io.EOF {
303                        __~debug:tcp{fmt.Printf("eof on tcpr: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
304                } else {
305                        __~debug:tcp{fmt.Printf("read client error on tcpr: %v <-> %v %v\n", client.LocalAddr(), client.RemoteAddr(), err.Error())}
306                }
307                if len(line) < 2 {
308                        return
309                }
310        }
311        __~debug:tcp{fmt.Println("tcprserver received" + string(line))}
312        // check message, if ok, channel response to rchanmap[_msgid] for tcp response
313
314        var jdata map[string]interface{}
315        err = json.Unmarshal(line, &jdata)
316        if err != nil {
317                // send error json back and exit
318                rmap := map[string]string{"error":"invalid json : " + err.Error()}
319                rmapj, _ := json.Marshal( rmap )
320                client.Write( rmapj )
321                __~debug:tcp{fmt.Printf("tcpr: invalid json: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
322                __~debug:tcp{fmt.Printf("closed tcpr: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
323                return;
324        }
325
326        if _, ok := jdata["_uuid"]; !ok {
327                // send error json back and exit
328                rmap := map[string]string{"error":"json missing _uuid"}
329                rmapj, _ := json.Marshal( rmap )
330                client.Write( rmapj )
331                __~debug:tcp{fmt.Printf("tcpr: no _uuid in json: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
332                __~debug:tcp{fmt.Printf("closed tcpr: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
333                return;
334        }
335       
336        if _, ok := jdata["_response"]; !ok {
337                // send error json back and exit
338                rmap := map[string]string{"error":"json missing _response"}
339                rmapj, _ := json.Marshal( rmap )
340                client.Write( rmapj )
341                __~debug:tcp{fmt.Printf("tcpr: no _response in json: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
342                __~debug:tcp{fmt.Printf("closed tcpr: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
343                return;
344        }
345       
346        if _, ok := jdata["_msgid"]; !ok {
347                // send error json back and exit
348                rmap := map[string]string{"error":"json missing _msgid"}
349                rmapj, _ := json.Marshal( rmap )
350                client.Write( rmapj )
351                __~debug:tcp{fmt.Printf("tcpr: no _msgid in json: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
352                __~debug:tcp{fmt.Printf("closed tcpr: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
353                return;
354        }
355
356        var msgid uint64;
357
358        if _, ok := jdata["_msgid"].(float64); ok {
359                msgid = uint64(jdata["_msgid"].(float64));
360        } else if _, ok := jdata["_msgid"].(string); ok {
361                msgidret, err := strconv.ParseUint(jdata["_msgid"].(string), 10, 64 );
362                if err != nil {
363                        // send error json back and exit
364                        rmap := map[string]string{"error":"json _msgid not numeric, error converting string"}
365                        rmapj, _ := json.Marshal( rmap )
366                        client.Write( rmapj )
367                        __~debug:tcp{fmt.Println(err.Error())}
368                        __~debug:tcp{fmt.Printf("tcpr: _msgid not numeric 1: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
369                        __~debug:tcp{fmt.Printf("closed tcpr: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
370                        return;
371                }
372                msgid = msgidret
373        } else {
374                __~debug:tcp{fmt.Printf(" type of _msgid  kind:%s  type:%s\n", reflect.TypeOf(jdata["_msgid"]).Kind(), reflect.TypeOf( jdata["_msgid"]))}
375                // send error json back and exit
376                rmap := map[string]string{"error":"json _msgid not numeric"}
377                rmapj, _ := json.Marshal( rmap )
378                client.Write( rmapj )
379                __~debug:tcp{fmt.Printf("tcpr: _msgid not numeric 2: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
380                __~debug:tcp{fmt.Printf("closed tcpr: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
381                return;
382        }
383
384        mutex.Lock() // handle possible race condition: timeout happens in race with response received
385        if _, ok := rchanmap[ msgid ]; !ok {
386                mutex.Unlock();
387                // send error json back and exit
388                rmap := map[string]string{"error":"tcp receiver closed, likely due to timeout"}
389                rmapj, _ := json.Marshal( rmap )
390                client.Write( rmapj )
391                __~debug:tcp{fmt.Printf("tcpr: no uuid in json: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
392                __~debug:tcp{fmt.Printf("closed tcpr: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
393                return;
394        }
395               
396        rchanmap[ msgid ] <- line
397        mutex.Unlock();
398
399        __~debug:tcp{fmt.Printf("closed tcpr: %v <-> %v\n", client.LocalAddr(), client.RemoteAddr())}
400}
Note: See TracBrowser for help on using the repository browser.