I recently ran into a peculiarity of multicast in Java / Kotlin. I was using a MulticastSocket: [https://docs.oracle.com/javase/8/docs/api/java/net/MulticastSocket.html](https://docs.oracle.com/javase/8/docs/api/java/net/MulticastSocket.html) and trying to ensure that it winds up bound to either an Inet4Address or Inet6Address. It turns out that even if I did something like:
```
val multicastSocket = MulticastSocket(InetSocketAddress("0.0.0.0", MULTICAST_DEFAULT_PORT))
assert(multicastSocket.localAddress is Inet4Address)
```
The assertion could fail. Similarly if I did:
```
val multicastSocket = MulticastSocket(InetSocketAddress("::", MULTICAST_DEFAULT_PORT))
assert(multicastSocket.localAddress is Inet6Address)
```
The assertion could also fail.
The effect seems to be, the call to join an ipv4 multicast group on an ipv6 socket (and vice-versa) seems to succeed (the function returns void, so there is no feedback that it fails), but then if you try to send a request to the multicast group, it is never received.
The solution is to use a MulticastChannel instead: [https://stackoverflow.com/a/41860430](https://stackoverflow.com/a/41860430), where you can open the channel by specifying the StandardProtocolFamily explicitly, for example:
```
val datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET)
.setOption(StandardSocketOptions.SO_REUSEADDR, true)
.bind(InetSocketAddress("0.0.0.0", MULTICAST_DEFAULT_PORT))
.setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface)
```
Here is a complete example that illustrates the working and non-working versions:
```
package xyz.bumpapp.util
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.slf4j.LoggerFactory
import xyz.bumpapp.vpn.server.MulticastHandshakeServer.Companion.MULTICAST4_DEFAULT_GROUP
import xyz.bumpapp.vpn.server.MulticastHandshakeServer.Companion.MULTICAST6_DEFAULT_GROUP
import xyz.bumpapp.vpn.server.MulticastHandshakeServer.Companion.MULTICAST_DEFAULT_PORT
import java.net.Inet4Address
import java.net.Inet6Address
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.MulticastSocket
import java.net.NetworkInterface
import java.net.StandardProtocolFamily
import java.net.StandardSocketOptions
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousCloseException
import java.nio.channels.ClosedByInterruptException
import java.nio.channels.ClosedChannelException
import java.nio.channels.DatagramChannel
/**
* These tests are to make sure my assumptions about multicast:
* - binding and bind-reuse
* - joining
* - leaving
* - send
* - recv
* are correct.
*/
class MulticastTests {
private val logger = LoggerFactory.getLogger(javaClass)
private val threads = mutableListOf<Thread>()
private val serverChannels = mutableListOf<DatagramChannel>()
private val multicastSockets = mutableListOf<MulticastSocket>()
@Volatile private var receivedCount = 0
@Volatile private var ipv4RecvCount = 0
@Volatile private var ipv6RecvCount = 0
@BeforeEach fun setup() {
threads.clear()
serverChannels.clear()
multicastSockets.clear()
receivedCount = 0
ipv4RecvCount = 0
ipv6RecvCount = 0
}
// these types of multicast requests are not guarenteed to work on machines that support both
// ipv4 and ipv6: https://stackoverflow.com/a/41860430
@Disabled("Can't guarantee we will get an ipv4 multicast socket with this method")
@Test
fun ipv4MulticastSocket() {
for (networkInterface in NetworkInterface.getNetworkInterfaces()) {
if (networkInterface.isUp && networkInterface.supportsMulticast()) {
for (inetAddress in networkInterface.inetAddresses) {
if (inetAddress is Inet4Address) {
logger.debug("Joining multicast group on ${inetAddress.hostAddress} on interface ${networkInterface.displayName}")
// NB - according to netstat, even with this 0.0.0.0 business, its still binding to ipv6 address on my local machine
// so multicast socket can't be relied on to force the type of socket. Need to use DatagramChannel (see tests below)
val multicastSocket =
MulticastSocket(InetSocketAddress("0.0.0.0", MULTICAST_DEFAULT_PORT))
multicastSocket.reuseAddress = true
multicastSocket.joinGroup(
InetSocketAddress(
MULTICAST4_DEFAULT_GROUP,
MULTICAST_DEFAULT_PORT,
),
networkInterface,
)
multicastSockets.add(multicastSocket)
assert(multicastSocket.localAddress is Inet4Address) // this fails on my local machine.
}
}
} else {
logger.debug("Skipping ${networkInterface.displayName} because it is not up or does not support multicast")
}
}
setupReceive()
Thread.sleep(1000)
val clientChannel = DatagramChannel.open(StandardProtocolFamily.INET).bind(InetSocketAddress("0.0.0.0", 0))
val buffer = ByteBuffer.wrap("Hello World".toByteArray())
logger.debug("SENDING ${buffer.limit()} bytes to $MULTICAST4_DEFAULT_GROUP:$MULTICAST_DEFAULT_PORT")
clientChannel.send(buffer, InetSocketAddress(MULTICAST4_DEFAULT_GROUP, MULTICAST_DEFAULT_PORT))
Thread.sleep(1000)
closeAndWait()
assertTrue(receivedCount > 0)
}
// these types of multicast requests are not guarenteed to work on machines that support both
// ipv4 and ipv6: https://stackoverflow.com/a/41860430
@Disabled("Can't guarantee we will get an ipv6 multicast socket with this method")
@Test
fun ipv6MulticastSocket() {
for (networkInterface in NetworkInterface.getNetworkInterfaces()) {
if (networkInterface.isUp && networkInterface.supportsMulticast()) {
for (inetAddress in networkInterface.inetAddresses) {
if (inetAddress is Inet6Address) {
logger.debug("Joining multicast group on ${inetAddress.hostAddress} on interface ${networkInterface.displayName}")
val multicastSocket =
MulticastSocket(InetSocketAddress("::", MULTICAST_DEFAULT_PORT))
multicastSocket.reuseAddress = true
multicastSocket.joinGroup(
InetSocketAddress(
MULTICAST6_DEFAULT_GROUP,
MULTICAST_DEFAULT_PORT,
),
networkInterface,
)
multicastSockets.add(multicastSocket)
assert(multicastSocket.localAddress is Inet6Address)
}
}
} else {
logger.debug("Skipping ${networkInterface.displayName} because it is not up or does not support multicast")
}
}
setupReceive()
Thread.sleep(10000)
val clientChannel = DatagramChannel.open(StandardProtocolFamily.INET6).bind(InetSocketAddress("::", 0))
val buffer = ByteBuffer.wrap("Hello World".toByteArray())
logger.debug("SENDING ${buffer.limit()} bytes to $MULTICAST6_DEFAULT_GROUP:$MULTICAST_DEFAULT_PORT")
clientChannel.send(buffer, InetSocketAddress(MULTICAST6_DEFAULT_GROUP, MULTICAST_DEFAULT_PORT))
Thread.sleep(1000)
closeAndWait()
assertTrue(receivedCount > 0)
}
@Test
fun ipv4DatagramChannel() {
for (networkInterface in NetworkInterface.getNetworkInterfaces()) {
if (networkInterface.isUp && networkInterface.supportsMulticast()) {
for (inetAddress in networkInterface.inetAddresses) {
if (inetAddress is Inet4Address) {
logger.debug("Binding to ${inetAddress.hostAddress} on IF: ${networkInterface.displayName}")
val multicast4Group: InetAddress = InetAddress.getByName(MULTICAST4_DEFAULT_GROUP)
val datagramChannel =
DatagramChannel.open(StandardProtocolFamily.INET)
.setOption(StandardSocketOptions.SO_REUSEADDR, true)
.bind(InetSocketAddress("0.0.0.0", MULTICAST_DEFAULT_PORT))
.setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface)
datagramChannel.join(multicast4Group, networkInterface)
serverChannels.add(datagramChannel)
val localAddress = datagramChannel.localAddress as InetSocketAddress
assert(localAddress.address is Inet4Address)
}
}
} else {
logger.debug("Skipping ${networkInterface.displayName} because it is not up or does not support multicast")
}
}
setupReceive()
val clientChannel = DatagramChannel.open(StandardProtocolFamily.INET).bind(InetSocketAddress("0.0.0.0", 0))
val buffer = ByteBuffer.wrap("Hello World".toByteArray())
Thread.sleep(1000)
logger.debug("SENDING ${buffer.limit()} bytes to $MULTICAST4_DEFAULT_GROUP:$MULTICAST_DEFAULT_PORT")
clientChannel.send(buffer, InetSocketAddress(MULTICAST4_DEFAULT_GROUP, MULTICAST_DEFAULT_PORT))
closeAndWait()
assertTrue(receivedCount > 0)
assertTrue(ipv4RecvCount > 0)
}
@Test
fun ipv6DatagramChannel() {
for (networkInterface in NetworkInterface.getNetworkInterfaces()) {
if (networkInterface.isUp && networkInterface.supportsMulticast()) {
for (inetAddress in networkInterface.inetAddresses) {
if (inetAddress is Inet6Address) {
logger.debug("Binding to ${inetAddress.hostAddress} on IF: ${networkInterface.displayName}")
try {
val multicast6Group: InetAddress =
InetAddress.getByName(MULTICAST6_DEFAULT_GROUP)
val datagramChannel =
DatagramChannel.open(StandardProtocolFamily.INET6)
.setOption(StandardSocketOptions.SO_REUSEADDR, true)
.bind(InetSocketAddress("::", MULTICAST_DEFAULT_PORT))
.setOption(
StandardSocketOptions.IP_MULTICAST_IF,
networkInterface,
)
datagramChannel.join(multicast6Group, networkInterface)
serverChannels.add(datagramChannel)
val localAddress = datagramChannel.localAddress as InetSocketAddress
assert(localAddress.address is Inet6Address)
} catch (e: Exception) {
logger.warn("Failed to bind to ${inetAddress.hostAddress} on IF: ${networkInterface.displayName}")
}
}
}
} else {
logger.debug("Skipping ${networkInterface.displayName} because it is not up or does not support multicast")
}
}
setupReceive()
val clientChannel6 = DatagramChannel.open(StandardProtocolFamily.INET6).bind(InetSocketAddress("::", 0))
val buffer = ByteBuffer.wrap("Hello World".toByteArray())
Thread.sleep(1000)
logger.debug("SENDING ${buffer.limit()} bytes to $MULTICAST6_DEFAULT_GROUP:$MULTICAST_DEFAULT_PORT")
clientChannel6.send(buffer, InetSocketAddress(MULTICAST6_DEFAULT_GROUP, MULTICAST_DEFAULT_PORT))
closeAndWait()
assertTrue(receivedCount > 0)
assertTrue(ipv6RecvCount > 0)
}
@Test
fun ipv4and6Channel() {
for (networkInterface in NetworkInterface.getNetworkInterfaces()) {
if (networkInterface.isUp && networkInterface.supportsMulticast()) {
for (inetAddress in networkInterface.inetAddresses) {
if (inetAddress is Inet6Address) {
logger.debug("Binding to ${inetAddress.hostAddress} on IF: ${networkInterface.displayName}")
try {
val multicast6Group: InetAddress =
InetAddress.getByName(MULTICAST6_DEFAULT_GROUP)
val datagramChannel =
DatagramChannel.open(StandardProtocolFamily.INET6)
.setOption(StandardSocketOptions.SO_REUSEADDR, true)
.bind(InetSocketAddress("::", MULTICAST_DEFAULT_PORT))
.setOption(
StandardSocketOptions.IP_MULTICAST_IF,
networkInterface,
)
datagramChannel.join(multicast6Group, networkInterface)
serverChannels.add(datagramChannel)
val localAddress = datagramChannel.localAddress as InetSocketAddress
assert(localAddress.address is Inet6Address)
} catch (e: Exception) {
logger.warn("Failed to bind to ${inetAddress.hostAddress} on IF: ${networkInterface.displayName}")
}
} else if (inetAddress is Inet4Address) {
logger.debug("Binding to ${inetAddress.hostAddress} on IF: ${networkInterface.displayName}")
val multicast4Group: InetAddress =
InetAddress.getByName(MULTICAST4_DEFAULT_GROUP)
val datagramChannel =
DatagramChannel.open(StandardProtocolFamily.INET)
.setOption(StandardSocketOptions.SO_REUSEADDR, true)
.bind(InetSocketAddress("0.0.0.0", MULTICAST_DEFAULT_PORT))
.setOption(StandardSocketOptions.IP_MULTICAST_IF, networkInterface)
datagramChannel.join(multicast4Group, networkInterface)
serverChannels.add(datagramChannel)
val localAddress = datagramChannel.localAddress as InetSocketAddress
assert(localAddress.address is Inet4Address)
}
}
} else {
logger.debug("Skipping ${networkInterface.displayName} because it is not up or does not support multicast")
}
}
setupReceive()
val clientChannel6 = DatagramChannel.open(StandardProtocolFamily.INET6).bind(InetSocketAddress("::", 0))
val clientChannel = DatagramChannel.open(StandardProtocolFamily.INET).bind(InetSocketAddress("0.0.0.0", 0))
val buffer = ByteBuffer.wrap("Hello World".toByteArray())
Thread.sleep(1000)
logger.debug("SENDING ${buffer.limit()} bytes to $MULTICAST6_DEFAULT_GROUP:$MULTICAST_DEFAULT_PORT")
clientChannel6.send(buffer, InetSocketAddress(MULTICAST6_DEFAULT_GROUP, MULTICAST_DEFAULT_PORT))
buffer.rewind()
logger.debug("SENDING ${buffer.limit()} bytes to $MULTICAST4_DEFAULT_GROUP:$MULTICAST_DEFAULT_PORT")
clientChannel.send(buffer, InetSocketAddress(MULTICAST4_DEFAULT_GROUP, MULTICAST_DEFAULT_PORT))
Thread.sleep(1000)
closeAndWait()
assertTrue(receivedCount > 0)
assertTrue(ipv4RecvCount > 0)
assertTrue(ipv6RecvCount > 0)
}
private fun setupReceive() {
for (serverChannel in serverChannels) {
val thread =
Thread {
val buffer = ByteBuffer.allocate(1024)
while (!Thread.interrupted()) {
try {
serverChannel.receive(buffer)
buffer.flip()
val localAddress = serverChannel.localAddress as InetSocketAddress
if (buffer.limit() == 0) {
continue
}
logger.debug("GOT ${buffer.limit()} bytes at multicast server $localAddress")
receivedCount++
if (localAddress.address is Inet4Address) {
ipv4RecvCount++
} else {
ipv6RecvCount++
}
return@Thread
} catch (e: ClosedByInterruptException) {
break
} catch (e: AsynchronousCloseException) {
break
} catch (e: ClosedChannelException) {
break
}
}
}
thread.start()
threads.add(thread)
}
}
private fun closeAndWait() {
Thread.sleep(1000)
for (multicastSocket in multicastSockets) {
try {
multicastSocket.close()
} catch (e: Exception) {
//
}
}
for (serverChannel in serverChannels) {
try {
serverChannel.close()
} catch (e: Exception) {
//
}
}
for (thread in threads) {
thread.interrupt()
thread.join()
}
}
}
```