IPv4 によるマルチユニキャスト

IPv4 によるマルチユニキャスト:


1. 概要

この記事では、主にクラウド対策として IPv4 による “マルチユニキャスト1” 実現のための Golang サンプルコードを投稿します。


2. はじめに

前回までの記事で、マルチキャストやブロードキャストについて投稿してきました。

が、クラウドのインフラ環境(AWS, GCP, Azure など)においては、マルチキャストやブロードキャストなどの通信は制限されているのが通例だと思います。

また、同じサブネット環境ならばマルチキャストやブロードキャストなどはシンプルに利用できて有用ですが、異なるサブネット環境や、グローバル環境においては、複雑化したり現実的ではないことが多くあります。

そこで、今回の記事では、これらの対策として、マルチユニキャスト1 の Golang サンプルコードを投稿します。

これは、マルチキャストの代換案の一つですが、実際には、マルチにユニキャストしているだけの大した内容ではありません(笑)。

が、このサンプルでは、不特定多数のノードを扱えるマルチキャストの利点に近づけるため、ノードをマルチユニキャストグループへ自動参加させるコードも併せて投稿しておきました。

これは、ノード数が多く、ユニキャストアドレスペアを各ノードの設定ファイルへ静的に追加するのが困難な場合にも有用かもしれません。


3. 環境

  • RHEL-7系
  • Go 1.9


4. 構成

  • node1

    • 192.0.2.1/24
  • node2

    • 192.0.2.2/24


5. 設計



multi-unicast.png



6. Golang サンプルコード


6-1. シンプルなマルチユニキャストのサンプル

$ sudo nvim ~/go/multi_unicast_only.go

~/go/multi_unicast_only.go
func SendMultiUnicast(addresses []string, message string) { 
    // Start multi-unicast 
    for _, address := range addresses { 
        go func(_address string) { 
            address_byte, err := net.ResolveUDPAddr("udp", _address) 
            multi_unicast, err := net.DialUDP("udp", nil, address_byte) 
            Error(err) 
 
            defer multi_unicast.Close() 
            fmt.Printf("Connected multi-unicast > %s\n", _address) 
 
            // Multi-unicast message 
            multi_unicast.Write([]byte(_message)) 
            fmt.Printf("Multi-unicast > %s as “%s”\n", _address, _message) 
        }(address) 
    } 
} 


6-2. マルチユニキャストグループ自動参加サンプル

@ node1, 2
$ sudo nvim ~/go/multi_unicast_auto.go

~/go/multi_unicast_auto.go
package main 
 
import ( 
    "flag" 
    "fmt" 
    "net" 
    "strings" 
) 
 
var DefaultIp string = "192.0.2.1" // node2 は 192.0.2.2 
var DefaultPort string = ":56789" 
var DefaultAddr string = DefaultIp + DefaultPort 
var ReceptorPort string = ":56788" 
var ReceptorAddr string = DefaultIp + ReceptorPort 
var ApplyingToIp string = "" 
var ApplyingToAddr string = "" 
var ApplyingMessage string = "applying" 
var EntryPrefix string = "entry:" 
var MultiUnicastAddresses []string 
var BufferByte int = 64 
 
func main() { 
 
    // Start receptor 
    receptor_addr_byte, err := net.ResolveUDPAddr("udp", ReceptorAddr) 
    Error(err) 
 
    // Listen receptor 
    receptor, err := net.ListenUDP("udp", receptor_addr_byte) 
    Error(err) 
    defer receptor.Close() 
    fmt.Printf("Listened receptor *:* > %s\n", ReceptorAddr) 
 
    receptor_buffer := make([]byte, BufferByte) 
    go func() { 
        for { 
            // Receive applying message 
            length, applying_addr_byte, err := receptor.ReadFrom(receptor_buffer) 
            Error(err) 
            applying_message := string(receptor_buffer[:length]) 
            applying_addr := applying_addr_byte.(*net.UDPAddr).String() 
 
            applying_addr_parts := strings.Split(applying_addr, ":") 
            entry_ip := applying_addr_parts[0] 
            entry_addr := entry_ip + DefaultPort 
 
            if applying_message != ApplyingMessage { 
                continue 
            } 
            fmt.Printf("Reveived applying %s > %s as “%s”\n", applying_addr, ReceptorAddr, applying_message) 
 
            // Check duplicated addresses 
            entered_flag := false 
            for _, addr := range MultiUnicastAddresses { 
                if addr == entry_addr { 
                    entered_flag = true 
                    fmt.Printf("Duplicated entry address in multi-unicast addresses: %s\n", entry_addr) 
                    break 
                } 
            } 
 
            if !entered_flag { 
                // Append entry address 
                MultiUnicastAddresses = append(MultiUnicastAddresses, entry_addr) 
                fmt.Printf("Appended entry address into multi-unicast addresses: %s\n", entry_addr) 
 
                // Send entry message 
                entry_message := EntryPrefix + strings.Join(MultiUnicastAddresses, ",") 
                SendMultiUnicast(entry_message) 
            } 
        } 
    }() 
 
    // Apply own address to multi-unicast group 
    flag.Parse() 
    ApplyingToIp = flag.Arg(0) 
    ApplyingToAddr = ApplyingToIp + ReceptorPort 
    Apply() 
 
    // Start inbound 
    inbound_to_addr_byte, err := net.ResolveUDPAddr("udp", DefaultAddr) 
    Error(err) 
 
    // Listen inbound 
    inbound, err := net.ListenUDP("udp", inbound_to_addr_byte) 
    Error(err) 
    defer inbound.Close() 
    fmt.Printf("Listened inbound *:* > %s\n", DefaultAddr) 
 
    inbound_buffer := make([]byte, BufferByte) 
    for { 
        // Receive inbound message 
        length, inbound_from_addr, err := inbound.ReadFrom(inbound_buffer) 
        Error(err) 
        inbound_message := string(inbound_buffer[:length]) 
 
        // Receive entry message 
        if 0 == strings.Index(inbound_message, EntryPrefix) { 
            fmt.Printf("Reveived entry message %s > %s as “%s”\n", inbound_from_addr, DefaultAddr, inbound_message) 
 
            // Extract entry address 
            entry_messages := strings.Split(inbound_message, EntryPrefix) 
            entry_address := entry_messages[1] 
            entry_addresses := strings.Split(entry_address, ",") 
 
            for _, entry_addr := range entry_addresses { 
                entered_flag := false 
                for _, addr := range MultiUnicastAddresses { 
                    if addr == entry_addr { 
                        entered_flag = true 
                        fmt.Printf("Duplicated entry address in multi-unicast addresses: %s\n", entry_addr) 
                        break 
                    } 
                } 
 
                if !entered_flag { 
                    MultiUnicastAddresses = append(MultiUnicastAddresses, entry_addr) 
                    fmt.Printf("Appended entry address into multi-unicast addresses: %s\n", entry_addr) 
                } 
            } 
        } 
 
        // Receive others message 
        if 0 == strings.Index(inbound_message, "others:") { 
            // Process others 
        } 
    } 
} 
 
func Error(_err error) { 
    if _err != nil { 
        panic(_err) 
    } 
} 
 
func Apply() { 
    // Start applying 
    applying_to_addr, err := net.ResolveUDPAddr("udp", ApplyingToAddr) 
    applying, err := net.DialUDP("udp", nil, applying_to_addr) 
    Error(err) 
    defer applying.Close() 
    fmt.Printf("Connected applying > %s\n", ApplyingToAddr) 
 
    // Outbound applying message 
    applying.Write([]byte(ApplyingMessage)) 
    fmt.Printf("Outbound applying > %s as “%s”\n", ApplyingToAddr, ApplyingMessage) 
} 
 
func SendMultiUnicast(_message string) { 
    // Start multi-unicast 
    for _, to_addr := range MultiUnicastAddresses { 
 
        go func(_to_addr string) { 
            outbound_to_addr, err := net.ResolveUDPAddr("udp", _to_addr) 
            multi_unicast, err := net.DialUDP("udp", nil, outbound_to_addr) 
            Error(err) 
 
            defer multi_unicast.Close() 
            fmt.Printf("Connected multi-unicast > %s\n", outbound_to_addr) 
 
            // Multi-unicast message 
            multi_unicast.Write([]byte(_message)) 
            fmt.Printf("Multi-unicast > %v as “%s”\n", outbound_to_addr, _message) 
        }(to_addr) 
    } 
} 


7. Golang サンプル実行

@ node1

1 台目を起動して、一人マルチユニキャストグループを生成。

最下行の Duplicated は正しい挙動。(詳細は後述)
$ sudo go run ~/go/multi_unicast_auto.go 192.0.2.1

[user@node1 ~/go]$ go run multi_unicast_auto.go 192.0.2.1 
Listened receptor *:* > 192.0.2.1:56788 
Connected applying > 192.0.2.1:56788 
Outbound applying > 192.0.2.1:56788 as “applying” 
Listened inbound *:* > 192.0.2.1:56789 
Reveived applying 192.0.2.1:39312 > 192.0.2.1:56788 as “applying” 
Appended entry address into multi-unicast addresses: 192.0.2.1:56789 
Connected multi-unicast > 192.0.2.1:56789 
Multi-unicast > 192.0.2.1:56789 as “entry:192.0.2.1:56789” 
Reveived entry message 192.0.2.1:50219 > 192.0.2.1:56789 as “entry:192.0.2.1:56789” 
Duplicated entry address in multi-unicast addresses: 192.0.2.1:56789 
@ node2

2 台目のノードからコマンドを叩いて、マルチユニキャストグループへ参加申請。

※この際のコマンドライン引数の IP アドレスは、マルチユニキャストグループに参加済みのノードの IP アドレスならどれでも良い。(あえてマスターノードを指定する必要はない。)

申請ノードのアドレスはすぐに認識され、マルチユニキャストグループへエントリーされる。

エントリー後、すぐにグループ内でマルチユニキャストが可能になる。

$ sudo go run ~/go/multi_unicast_auto.go 192.0.2.1

[user@node2 ~/go]$ go run multi_unicast_auto.go 192.0.2.1 
Listened receptor *:* > 192.0.2.2:56788 
Connected applying > 192.0.2.1:56788 
Outbound applying > 192.0.2.1:56788 as “applying” 
Listened inbound *:* > 192.0.2.2:56789 
Reveived entry message 192.0.2.1:49989 > 192.0.2.2:56789 as “entry:192.0.2.1:56789,192.0.2.2:56789” 
Appended entry address into multi-unicast addresses: 192.0.2.1:56789 
Appended entry address into multi-unicast addresses: 192.0.2.2:56789 
@ node1

最下 2 行の Duplicated は、申請ノードから指定された受付ノードにおいては正しい挙動。

受付(applying)時に登録済みのため、参加(entry)時では重複となる。

[user@node1 ~/go]$ go run multi_unicast_auto.go 192.0.2.1 
...snip 
Reveived applying 192.0.2.2:33152 > 192.0.2.1:56788 as “applying” 
Appended entry address into multi-unicast addresses: 192.0.2.2:56789 
Connected multi-unicast > 192.0.2.2:56789 
Multi-unicast > 192.0.2.2:56789 as “entry:192.0.2.1:56789,192.0.2.2:56789” 
Connected multi-unicast > 192.0.2.1:56789 
Multi-unicast > 192.0.2.1:56789 as “entry:192.0.2.1:56789,192.0.2.2:56789” 
Reveived entry message 192.0.2.1:56758 > 192.0.2.1:56789 as “entry:192.0.2.1:56789,192.0.2.2:56789” 
Duplicated entry address in multi-unicast addresses: 192.0.2.1:56789 
Duplicated entry address in multi-unicast addresses: 192.0.2.2:56789 


8. まとめ

クラウドのインフラ環境(AWS, GCP, Azure など)においては、マルチキャストやブロードキャストなどの通信は制限されているのが通例だと思います。

また、同じサブネット環境ならばマルチキャストやブロードキャストなどはシンプルに利用できて有用ですが、異なるサブネット環境や、グローバル環境においては、複雑化したり現実的ではないことが多くあります。

そこで、今回の記事では、これらの対策として、IPv4 によるマルチユニキャスト1 の Golang サンプルコードを投稿しました。

これは、マルチキャストの代換案の一つですが、実際には、マルチにユニキャストしているだけの大した内容ではありません。

しかし、このサンプルでは、不特定多数のノードを扱えるマルチキャストの利点に近づけるため、ノードをマルチユニキャストグループへ自動参加させるコードも併せて投稿しておきました。

これは、ノード数が多く、ユニキャストアドレスペアを各ノードの設定ファイルへ静的に追加するのが困難な場合にも有用かもしれません。



  1. 造語です。直訳すればそのままです。 


コメント

このブログの人気の投稿

投稿時間:2021-06-17 05:05:34 RSSフィード2021-06-17 05:00 分まとめ(1274件)

投稿時間:2021-06-20 02:06:12 RSSフィード2021-06-20 02:00 分まとめ(3871件)

投稿時間:2020-12-01 09:41:49 RSSフィード2020-12-01 09:00 分まとめ(69件)