diff --git a/build-logic/src/main/kotlin/kotlinx/io/conventions/kotlinx-io-multiplatform.gradle.kts b/build-logic/src/main/kotlin/kotlinx/io/conventions/kotlinx-io-multiplatform.gradle.kts index dd76066e2..b50e1f769 100644 --- a/build-logic/src/main/kotlin/kotlinx/io/conventions/kotlinx-io-multiplatform.gradle.kts +++ b/build-logic/src/main/kotlin/kotlinx/io/conventions/kotlinx-io-multiplatform.gradle.kts @@ -52,9 +52,12 @@ kotlin { configureNativePlatforms() val nativeTargets = nativeTargets() + val appleTargets = appleTargets() sourceSets { - createSourceSet("nativeMain", parent = commonMain.get(), children = nativeTargets) - createSourceSet("nativeTest", parent = commonTest.get(), children = nativeTargets) + val nativeMain = createSourceSet("nativeMain", parent = commonMain.get(), children = nativeTargets) + val nativeTest = createSourceSet("nativeTest", parent = commonTest.get(), children = nativeTargets) + createSourceSet("appleMain", parent = nativeMain, children = appleTargets) + createSourceSet("appleTest", parent = nativeTest, children = appleTargets) } } @@ -126,7 +129,7 @@ fun KotlinMultiplatformExtension.configureNativePlatforms() { } fun nativeTargets(): List { - return appleTargets() + linuxTargets() + mingwTargets() + androidTargets() + return linuxTargets() + mingwTargets() + androidTargets() } fun appleTargets() = listOf( diff --git a/core/apple/src/-Util.kt b/core/apple/src/-Util.kt new file mode 100644 index 000000000..6871df21e --- /dev/null +++ b/core/apple/src/-Util.kt @@ -0,0 +1,21 @@ +/* + * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io + +import kotlinx.cinterop.UnsafeNumber +import platform.Foundation.NSError +import platform.Foundation.NSLocalizedDescriptionKey +import platform.Foundation.NSUnderlyingErrorKey + +@OptIn(UnsafeNumber::class) +internal fun Exception.toNSError() = NSError( + domain = "Kotlin", + code = 0, + userInfo = mapOf( + NSLocalizedDescriptionKey to message, + NSUnderlyingErrorKey to this + ) +) diff --git a/core/apple/src/AppleCore.kt b/core/apple/src/AppleCore.kt new file mode 100644 index 000000000..3c12a18fa --- /dev/null +++ b/core/apple/src/AppleCore.kt @@ -0,0 +1,117 @@ +/* + * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +@file:OptIn(UnsafeNumber::class) + +package kotlinx.io + +import kotlinx.cinterop.* +import platform.Foundation.NSInputStream +import platform.Foundation.NSOutputStream +import platform.Foundation.NSStreamStatusClosed +import platform.Foundation.NSStreamStatusNotOpen +import platform.posix.uint8_tVar + +/** + * Returns [RawSink] that writes to an output stream. + * + * Use [RawSink.buffered] to create a buffered sink from it. + * + * @sample kotlinx.io.samples.KotlinxIoSamplesApple.outputStreamAsSink + */ +public fun NSOutputStream.asSink(): RawSink = OutputStreamSink(this) + +private open class OutputStreamSink( + private val out: NSOutputStream, +) : RawSink { + + init { + if (out.streamStatus == NSStreamStatusNotOpen) out.open() + } + + override fun write(source: Buffer, byteCount: Long) { + if (out.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed") + + checkOffsetAndCount(source.size, 0, byteCount) + var remaining = byteCount + while (remaining > 0) { + val head = source.head!! + val toCopy = minOf(remaining, head.limit - head.pos).toInt() + val bytesWritten = head.data.usePinned { + val bytes = it.addressOf(head.pos).reinterpret() + out.write(bytes, toCopy.convert()).toLong() + } + + if (bytesWritten < 0L) throw IOException(out.streamError?.localizedDescription ?: "Unknown error") + if (bytesWritten == 0L) throw IOException("NSOutputStream reached capacity") + + head.pos += bytesWritten.toInt() + remaining -= bytesWritten + source.size -= bytesWritten + + if (head.pos == head.limit) { + source.head = head.pop() + SegmentPool.recycle(head) + } + } + } + + override fun flush() { + // no-op + } + + override fun close() = out.close() + + override fun toString() = "RawSink($out)" +} + +/** + * Returns [RawSource] that reads from an input stream. + * + * Use [RawSource.buffered] to create a buffered source from it. + * + * @sample kotlinx.io.samples.KotlinxIoSamplesApple.inputStreamAsSource + */ +public fun NSInputStream.asSource(): RawSource = NSInputStreamSource(this) + +private open class NSInputStreamSource( + private val input: NSInputStream, +) : RawSource { + + init { + if (input.streamStatus == NSStreamStatusNotOpen) input.open() + } + + override fun readAtMostTo(sink: Buffer, byteCount: Long): Long { + if (input.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed") + + if (byteCount == 0L) return 0L + checkByteCount(byteCount) + + val tail = sink.writableSegment(1) + val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit) + val bytesRead = tail.data.usePinned { + val bytes = it.addressOf(tail.limit).reinterpret() + input.read(bytes, maxToCopy.convert()).toLong() + } + + if (bytesRead < 0L) throw IOException(input.streamError?.localizedDescription ?: "Unknown error") + if (bytesRead == 0L) { + if (tail.pos == tail.limit) { + // We allocated a tail segment, but didn't end up needing it. Recycle! + sink.head = tail.pop() + SegmentPool.recycle(tail) + } + return -1 + } + tail.limit += bytesRead.toInt() + sink.size += bytesRead + return bytesRead + } + + override fun close() = input.close() + + override fun toString() = "RawSource($input)" +} diff --git a/core/apple/src/BuffersApple.kt b/core/apple/src/BuffersApple.kt new file mode 100644 index 000000000..ca5d67de3 --- /dev/null +++ b/core/apple/src/BuffersApple.kt @@ -0,0 +1,74 @@ +/* + * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +@file:OptIn(UnsafeNumber::class) + +package kotlinx.io + +import kotlinx.cinterop.* +import platform.Foundation.* +import platform.darwin.ByteVar +import platform.darwin.NSUIntegerMax +import platform.posix.* + +internal fun Buffer.write(source: CPointer, maxLength: Int) { + require(maxLength >= 0) { "maxLength ($maxLength) must not be negative" } + + var currentOffset = 0 + while (currentOffset < maxLength) { + val tail = writableSegment(1) + + val toCopy = minOf(maxLength - currentOffset, Segment.SIZE - tail.limit) + tail.data.usePinned { + memcpy(it.addressOf(tail.pos), source + currentOffset, toCopy.convert()) + } + + currentOffset += toCopy + tail.limit += toCopy + } + size += maxLength +} + +internal fun Buffer.readAtMostTo(sink: CPointer, maxLength: Int): Int { + require(maxLength >= 0) { "maxLength ($maxLength) must not be negative" } + + val s = head ?: return 0 + val toCopy = minOf(maxLength, s.limit - s.pos) + s.data.usePinned { + memcpy(sink, it.addressOf(s.pos), toCopy.convert()) + } + + s.pos += toCopy + size -= toCopy.toLong() + + if (s.pos == s.limit) { + head = s.pop() + SegmentPool.recycle(s) + } + + return toCopy +} + +internal fun Buffer.snapshotAsNSData(): NSData { + if (size == 0L) return NSData.data() + + check(size.toULong() <= NSUIntegerMax) { "Buffer is too long ($size) to be converted into NSData." } + + val bytes = malloc(size.convert())?.reinterpret() + ?: throw Error("malloc failed: ${strerror(errno)?.toKString()}") + var curr = head + var index = 0 + do { + check(curr != null) { "Current segment is null" } + val pos = curr.pos + val length = curr.limit - pos + curr.data.usePinned { + memcpy(bytes + index, it.addressOf(pos), length.convert()) + } + curr = curr.next + index += length + } while (curr !== head) + return NSData.create(bytesNoCopy = bytes, length = size.convert()) +} diff --git a/core/apple/src/SinksApple.kt b/core/apple/src/SinksApple.kt new file mode 100644 index 000000000..f4fbe9d6d --- /dev/null +++ b/core/apple/src/SinksApple.kt @@ -0,0 +1,157 @@ +/* + * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io + +import kotlinx.cinterop.* +import platform.Foundation.* +import platform.darwin.NSInteger +import platform.darwin.NSUInteger +import platform.posix.uint8_tVar +import kotlin.native.ref.WeakReference + +/** + * Returns an output stream that writes to this sink. Closing the stream will also close this sink. + * + * The stream supports both polling and run-loop scheduling, please check + * [Apple's documentation](https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Streams/Articles/PollingVersusRunloop.html) + * for information about stream events handling. + * + * The stream does not implement initializers + * ([NSOutputStream.initToBuffer](https://developer.apple.com/documentation/foundation/nsoutputstream/1410805-inittobuffer), + * [NSOutputStream.initToMemory](https://developer.apple.com/documentation/foundation/nsoutputstream/1409909-inittomemory), + * [NSOutputStream.initWithURL](https://developer.apple.com/documentation/foundation/nsoutputstream/1414446-initwithurl), + * [NSOutputStream.initToFileAtPath](https://developer.apple.com/documentation/foundation/nsoutputstream/1416367-inittofileatpath)), + * their use will result in a runtime error. + * + * @sample kotlinx.io.samples.KotlinxIoSamplesApple.asStream + */ +public fun Sink.asNSOutputStream(): NSOutputStream = SinkNSOutputStream(this) + +@OptIn(UnsafeNumber::class) +private class SinkNSOutputStream( + private val sink: Sink +) : NSOutputStream(toMemory = Unit), NSStreamDelegateProtocol { + + private val isClosed: () -> Boolean = when (sink) { + is RealSink -> sink::closed + is Buffer -> { + { false } + } + } + + private var status = NSStreamStatusNotOpen + private var error: NSError? = null + set(value) { + status = NSStreamStatusError + field = value + postEvent(NSStreamEventErrorOccurred) + sink.close() + } + + override fun streamStatus() = if (status != NSStreamStatusError && isClosed()) NSStreamStatusClosed else status + + override fun streamError() = error + + override fun open() { + if (status == NSStreamStatusNotOpen) { + status = NSStreamStatusOpening + status = NSStreamStatusOpen + postEvent(NSStreamEventOpenCompleted) + postEvent(NSStreamEventHasSpaceAvailable) + } + } + + override fun close() { + if (status == NSStreamStatusError || status == NSStreamStatusNotOpen) return + status = NSStreamStatusClosed + runLoop = null + runLoopModes = listOf() + sink.close() + } + + @OptIn(DelicateIoApi::class) + override fun write(buffer: CPointer?, maxLength: NSUInteger): NSInteger { + if (streamStatus != NSStreamStatusOpen || buffer == null) return -1 + status = NSStreamStatusWriting + val toWrite = minOf(maxLength, Int.MAX_VALUE.convert()).toInt() + return try { + sink.writeToInternalBuffer { + it.write(buffer, toWrite) + } + status = NSStreamStatusOpen + toWrite.convert() + } catch (e: Exception) { + error = e.toNSError() + -1 + } + } + + override fun hasSpaceAvailable() = !isFinished + + private val isFinished + get() = when (streamStatus) { + NSStreamStatusClosed, NSStreamStatusError -> true + else -> false + } + + @OptIn(InternalIoApi::class) + override fun propertyForKey(key: NSStreamPropertyKey): Any? = when (key) { + NSStreamDataWrittenToMemoryStreamKey -> sink.buffer.snapshotAsNSData() + else -> null + } + + override fun setProperty(property: Any?, forKey: NSStreamPropertyKey) = false + + // WeakReference as delegate should not be retained + // https://developer.apple.com/documentation/foundation/nsstream/1418423-delegate + private var _delegate: WeakReference? = null + private var runLoop: NSRunLoop? = null + private var runLoopModes = listOf() + + private fun postEvent(event: NSStreamEvent) { + val runLoop = runLoop ?: return + runLoop.performInModes(runLoopModes) { + if (runLoop == this.runLoop) { + delegateOrSelf.stream(this, event) + } + } + } + + override fun delegate() = _delegate?.value + + private val delegateOrSelf get() = delegate ?: this + + override fun setDelegate(delegate: NSStreamDelegateProtocol?) { + _delegate = delegate?.let { WeakReference(it) } + } + + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + // no-op + } + + override fun scheduleInRunLoop(aRunLoop: NSRunLoop, forMode: NSRunLoopMode) { + if (runLoop == null) { + runLoop = aRunLoop + } + if (runLoop == aRunLoop) { + runLoopModes += forMode + } + if (status == NSStreamStatusOpen) { + postEvent(NSStreamEventHasSpaceAvailable) + } + } + + override fun removeFromRunLoop(aRunLoop: NSRunLoop, forMode: NSRunLoopMode) { + if (aRunLoop == runLoop) { + runLoopModes -= forMode + if (runLoopModes.isEmpty()) { + runLoop = null + } + } + } + + override fun description() = "$sink.asNSOutputStream()" +} diff --git a/core/apple/src/SourcesApple.kt b/core/apple/src/SourcesApple.kt new file mode 100644 index 000000000..f03a7af7a --- /dev/null +++ b/core/apple/src/SourcesApple.kt @@ -0,0 +1,176 @@ +/* + * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io + +import kotlinx.cinterop.* +import platform.Foundation.* +import platform.darwin.NSInteger +import platform.darwin.NSUInteger +import platform.darwin.NSUIntegerVar +import platform.posix.uint8_tVar +import kotlin.native.ref.WeakReference + +/** + * Returns an input stream that reads from this source. Closing the stream will also close this source. + * + * The stream supports both polling and run-loop scheduling, please check + * [Apple's documentation](https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Streams/Articles/PollingVersusRunloop.html) + * for information about stream events handling. + * + * The stream does not implement initializers + * ([NSInputStream.initWithURL](https://developer.apple.com/documentation/foundation/nsinputstream/1417891-initwithurl), + * [NSInputStream.initWithData](https://developer.apple.com/documentation/foundation/nsinputstream/1412470-initwithdata), + * [NSInputStream.initWithFileAtPath](https://developer.apple.com/documentation/foundation/nsinputstream/1408976-initwithfileatpath)), + * their use will result in a runtime error. + * + * @sample kotlinx.io.samples.KotlinxIoSamplesApple.asStream + */ +public fun Source.asNSInputStream(): NSInputStream = SourceNSInputStream(this) + +@OptIn(InternalIoApi::class, UnsafeNumber::class) +private class SourceNSInputStream( + private val source: Source +) : NSInputStream(NSData()), NSStreamDelegateProtocol { + + private val isClosed: () -> Boolean = when (source) { + is RealSource -> source::closed + is Buffer -> { + { false } + } + } + + private var status = NSStreamStatusNotOpen + private var error: NSError? = null + set(value) { + status = NSStreamStatusError + field = value + source.close() + } + + override fun streamStatus() = if (status != NSStreamStatusError && isClosed()) NSStreamStatusClosed else status + + override fun streamError() = error + + override fun open() { + if (status == NSStreamStatusNotOpen) { + status = NSStreamStatusOpening + status = NSStreamStatusOpen + postEvent(NSStreamEventOpenCompleted) + checkBytes() + } + } + + override fun close() { + if (status == NSStreamStatusError || status == NSStreamStatusNotOpen) return + status = NSStreamStatusClosed + runLoop = null + runLoopModes = listOf() + source.close() + } + + override fun read(buffer: CPointer?, maxLength: NSUInteger): NSInteger { + if (streamStatus != NSStreamStatusOpen && streamStatus != NSStreamStatusAtEnd || buffer == null) return -1 + status = NSStreamStatusReading + try { + if (source.exhausted()) { + status = NSStreamStatusAtEnd + return 0 + } + val toRead = minOf(maxLength.toLong(), source.buffer.size, Int.MAX_VALUE.toLong()).toInt() + val read = source.buffer.readAtMostTo(buffer, toRead).convert() + status = NSStreamStatusOpen + checkBytes() + return read + } catch (e: Exception) { + error = e.toNSError() + postEvent(NSStreamEventErrorOccurred) + return -1 + } + } + + override fun getBuffer(buffer: CPointer>?, length: CPointer?) = false + + override fun hasBytesAvailable() = !isFinished + + private val isFinished + get() = when (streamStatus) { + NSStreamStatusClosed, NSStreamStatusError -> true + else -> false + } + + override fun propertyForKey(key: NSStreamPropertyKey): Any? = null + + override fun setProperty(property: Any?, forKey: NSStreamPropertyKey) = false + + // WeakReference as delegate should not be retained + // https://developer.apple.com/documentation/foundation/nsstream/1418423-delegate + private var _delegate: WeakReference? = null + private var runLoop: NSRunLoop? = null + private var runLoopModes = listOf() + + private fun postEvent(event: NSStreamEvent) { + val runLoop = runLoop ?: return + runLoop.performInModes(runLoopModes) { + if (runLoop == this.runLoop) { + delegateOrSelf.stream(this, event) + } + } + } + + private fun checkBytes() { + val runLoop = runLoop ?: return + runLoop.performInModes(runLoopModes) { + if (runLoop != this.runLoop || isFinished) return@performInModes + val event = try { + if (source.exhausted()) { + status = NSStreamStatusAtEnd + NSStreamEventEndEncountered + } else { + NSStreamEventHasBytesAvailable + } + } catch (e: Exception) { + error = e.toNSError() + NSStreamEventErrorOccurred + } + delegateOrSelf.stream(this, event) + } + } + + override fun delegate() = _delegate?.value + + private val delegateOrSelf get() = delegate ?: this + + override fun setDelegate(delegate: NSStreamDelegateProtocol?) { + _delegate = delegate?.let { WeakReference(it) } + } + + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + // no-op + } + + override fun scheduleInRunLoop(aRunLoop: NSRunLoop, forMode: NSRunLoopMode) { + if (runLoop == null) { + runLoop = aRunLoop + } + if (runLoop == aRunLoop) { + runLoopModes += forMode + } + if (status == NSStreamStatusOpen) { + checkBytes() + } + } + + override fun removeFromRunLoop(aRunLoop: NSRunLoop, forMode: NSRunLoopMode) { + if (aRunLoop == runLoop) { + runLoopModes -= forMode + if (runLoopModes.isEmpty()) { + runLoop = null + } + } + } + + override fun description() = "$source.asNSInputStream()" +} diff --git a/core/apple/test/NSInputStreamSourceTest.kt b/core/apple/test/NSInputStreamSourceTest.kt new file mode 100644 index 000000000..2a65584e4 --- /dev/null +++ b/core/apple/test/NSInputStreamSourceTest.kt @@ -0,0 +1,90 @@ +/* + * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io + +import kotlinx.io.files.Path +import kotlinx.io.files.sink +import platform.Foundation.NSInputStream +import platform.Foundation.NSTemporaryDirectory +import platform.Foundation.NSURL +import platform.Foundation.NSUUID +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class NSInputStreamSourceTest { + @Test + fun nsInputStreamSource() { + val input = NSInputStream(data = byteArrayOf(0x61).toNSData()) + val source = input.asSource() + val buffer = Buffer() + assertEquals(1, source.readAtMostTo(buffer, 1L)) + assertEquals("a", buffer.readString()) + } + + @OptIn(ExperimentalStdlibApi::class) + @Test + fun nsInputStreamSourceFromFile() { + // can be replaced with createTempFile() when #183 is fixed + // https://github.com/Kotlin/kotlinx-io/issues/183 + val file = "${NSTemporaryDirectory()}${NSUUID().UUIDString()}" + try { + Path(file).sink().use { + it.writeString("example") + } + + val input = NSInputStream(uRL = NSURL.fileURLWithPath(file)) + val source = input.asSource() + val buffer = Buffer() + assertEquals(7, source.readAtMostTo(buffer, 10)) + assertEquals("example", buffer.readString()) + } finally { + deleteFile(file) + } + } + + @Test + fun sourceFromInputStream() { + val input = NSInputStream(data = ("a" + "b".repeat(Segment.SIZE * 2) + "c").encodeToByteArray().toNSData()) + + // Source: ab...bc + val source: RawSource = input.asSource() + val sink = Buffer() + + // Source: b...bc. Sink: abb. + assertEquals(3, source.readAtMostTo(sink, 3)) + assertEquals("abb", sink.readString(3)) + + // Source: b...bc. Sink: b...b. + assertEquals(Segment.SIZE.toLong(), source.readAtMostTo(sink, 20000)) + assertEquals("b".repeat(Segment.SIZE), sink.readString()) + + // Source: b...bc. Sink: b...bc. + assertEquals((Segment.SIZE - 1).toLong(), source.readAtMostTo(sink, 20000)) + assertEquals("b".repeat(Segment.SIZE - 2) + "c", sink.readString()) + + // Source and sink are empty. + assertEquals(-1, source.readAtMostTo(sink, 1)) + } + + @Test + fun sourceFromInputStreamWithSegmentSize() { + val input = NSInputStream(data = ByteArray(Segment.SIZE).toNSData()) + val source = input.asSource() + val sink = Buffer() + + assertEquals(Segment.SIZE.toLong(), source.readAtMostTo(sink, Segment.SIZE.toLong())) + assertEquals(-1, source.readAtMostTo(sink, Segment.SIZE.toLong())) + + assertNoEmptySegments(sink) + } + + @Test + fun sourceFromInputStreamBounds() { + val source = NSInputStream(data = ByteArray(100).toNSData()).asSource() + assertFailsWith { source.readAtMostTo(Buffer(), -1) } + } +} diff --git a/core/apple/test/NSOutputStreamSinkTest.kt b/core/apple/test/NSOutputStreamSinkTest.kt new file mode 100644 index 000000000..14e24c6d1 --- /dev/null +++ b/core/apple/test/NSOutputStreamSinkTest.kt @@ -0,0 +1,52 @@ +/* + * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io + +import kotlinx.cinterop.ByteVar +import kotlinx.cinterop.UnsafeNumber +import kotlinx.cinterop.get +import kotlinx.cinterop.reinterpret +import platform.Foundation.* +import kotlin.test.Test +import kotlin.test.assertEquals + +class NSOutputStreamSinkTest { + @Test + @OptIn(UnsafeNumber::class) + fun nsOutputStreamSink() { + val out = NSOutputStream.outputStreamToMemory() + val sink = out.asSink() + val buffer = Buffer().apply { + writeString("a") + } + sink.write(buffer, 1L) + val data = out.propertyForKey(NSStreamDataWrittenToMemoryStreamKey) as NSData + assertEquals(1U, data.length) + val bytes = data.bytes!!.reinterpret() + assertEquals(0x61, bytes[0]) + } + + @Test + fun sinkFromOutputStream() { + val data = Buffer().apply { + writeString("a") + writeString("b".repeat(9998)) + writeString("c") + } + val out = NSOutputStream.outputStreamToMemory() + val sink = out.asSink() + + sink.write(data, 3) + val outData = out.propertyForKey(NSStreamDataWrittenToMemoryStreamKey) as NSData + val outString = outData.toByteArray().decodeToString() + assertEquals("abb", outString) + + sink.write(data, data.size) + val outData2 = out.propertyForKey(NSStreamDataWrittenToMemoryStreamKey) as NSData + val outString2 = outData2.toByteArray().decodeToString() + assertEquals("a" + "b".repeat(9998) + "c", outString2) + } +} diff --git a/core/apple/test/SinkNSOutputStreamTest.kt b/core/apple/test/SinkNSOutputStreamTest.kt new file mode 100644 index 000000000..235298a2f --- /dev/null +++ b/core/apple/test/SinkNSOutputStreamTest.kt @@ -0,0 +1,158 @@ +/* + * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io + +import kotlinx.atomicfu.atomic +import kotlinx.cinterop.* +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import platform.CoreFoundation.CFRunLoopStop +import platform.Foundation.* +import platform.darwin.NSObject +import platform.darwin.NSUInteger +import platform.posix.uint8_tVar +import kotlin.test.* + +@OptIn(UnsafeNumber::class) +class SinkNSOutputStreamTest { + @Test + fun bufferOutputStream() { + testOutputStream(Buffer(), "abc") + testOutputStream(Buffer(), "a" + "b".repeat(Segment.SIZE * 2) + "c") + } + + @Test + fun realSinkOutputStream() { + testOutputStream(RealSink(Buffer()), "abc") + testOutputStream(RealSink(Buffer()), "a" + "b".repeat(Segment.SIZE * 2) + "c") + } + + @OptIn(InternalIoApi::class) + private fun testOutputStream(sink: Sink, input: String) { + val out = sink.asNSOutputStream() + val byteArray = input.encodeToByteArray() + val size: NSUInteger = input.length.convert() + byteArray.usePinned { + val cPtr = it.addressOf(0).reinterpret() + + assertEquals(NSStreamStatusNotOpen, out.streamStatus) + assertEquals(-1, out.write(cPtr, size)) + out.open() + assertEquals(NSStreamStatusOpen, out.streamStatus) + + assertEquals(size.convert(), out.write(cPtr, size)) + sink.flush() + when (sink) { + is Buffer -> { + val data = out.propertyForKey(NSStreamDataWrittenToMemoryStreamKey) as NSData + assertContentEquals(byteArray, data.toByteArray()) + assertContentEquals(byteArray, sink.buffer.readByteArray()) + } + is RealSink -> assertContentEquals(byteArray, (sink.sink as Buffer).readByteArray()) + } + } + } + + @Test + @OptIn(DelicateIoApi::class) + fun nsOutputStreamClose() { + val buffer = Buffer() + val sink = RealSink(buffer) + assertFalse(sink.closed) + + val out = sink.asNSOutputStream() + out.open() + out.close() + assertTrue(sink.closed) + assertEquals(NSStreamStatusClosed, out.streamStatus) + + val byteArray = ByteArray(4) + byteArray.usePinned { + val cPtr = it.addressOf(0).reinterpret() + + assertEquals(-1, out.write(cPtr, 4U)) + assertNull(out.streamError) + assertTrue(sink.buffer.readByteArray().isEmpty()) + } + } + + @Test + fun delegateTest() { + val runLoop = startRunLoop() + + fun produceWithDelegate(out: NSOutputStream, data: String) { + val opened = Mutex(true) + val written = atomic(0) + val completed = Mutex(true) + + out.delegate = object : NSObject(), NSStreamDelegateProtocol { + val source = data.encodeToByteArray() + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + assertEquals("run-loop", NSThread.currentThread.name) + when (handleEvent) { + NSStreamEventOpenCompleted -> opened.unlock() + NSStreamEventHasSpaceAvailable -> { + if (source.isNotEmpty()) { + source.usePinned { + assertEquals( + data.length.convert(), + out.write(it.addressOf(written.value).reinterpret(), data.length.convert()) + ) + written.value += data.length + } + } + val writtenData = out.propertyForKey(NSStreamDataWrittenToMemoryStreamKey) as NSData + assertEquals(data, writtenData.toByteArray().decodeToString()) + out.close() + completed.unlock() + } + else -> fail("unexpected event ${handleEvent.asString()}") + } + } + } + out.scheduleInRunLoop(runLoop, NSDefaultRunLoopMode) + out.open() + runBlocking { + opened.lockWithTimeout() + completed.lockWithTimeout() + } + assertEquals(data.length, written.value) + } + + produceWithDelegate(Buffer().asNSOutputStream(), "custom") + produceWithDelegate(Buffer().asNSOutputStream(), "") + CFRunLoopStop(runLoop.getCFRunLoop()) + } + + @Test + fun testSubscribeAfterOpen() { + val runLoop = startRunLoop() + + fun subscribeAfterOpen(out: NSOutputStream) { + val available = Mutex(true) + + out.delegate = object : NSObject(), NSStreamDelegateProtocol { + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + assertEquals("run-loop", NSThread.currentThread.name) + when (handleEvent) { + NSStreamEventOpenCompleted -> fail("opened before subscribe") + NSStreamEventHasSpaceAvailable -> available.unlock() + else -> fail("unexpected event ${handleEvent.asString()}") + } + } + } + out.open() + out.scheduleInRunLoop(runLoop, NSDefaultRunLoopMode) + runBlocking { + available.lockWithTimeout() + } + out.close() + } + + subscribeAfterOpen(Buffer().asNSOutputStream()) + CFRunLoopStop(runLoop.getCFRunLoop()) + } +} diff --git a/core/apple/test/SourceNSInputStreamTest.kt b/core/apple/test/SourceNSInputStreamTest.kt new file mode 100644 index 000000000..c09a67bf9 --- /dev/null +++ b/core/apple/test/SourceNSInputStreamTest.kt @@ -0,0 +1,269 @@ +/* + * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io + +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.locks.reentrantLock +import kotlinx.atomicfu.locks.withLock +import kotlinx.cinterop.* +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import platform.CoreFoundation.CFRunLoopStop +import platform.Foundation.* +import platform.darwin.NSObject +import platform.posix.uint8_tVar +import kotlin.test.* + +@OptIn(UnsafeNumber::class) +class SourceNSInputStreamTest { + @Test + fun bufferInputStream() { + val source = Buffer() + source.writeString("abc") + testInputStream(source.asNSInputStream()) + } + + @Test + fun realSourceInputStream() { + val source = Buffer() + source.writeString("abc") + testInputStream(RealSource(source).asNSInputStream()) + } + + private fun testInputStream(input: NSInputStream) { + val byteArray = ByteArray(4) + byteArray.usePinned { + val cPtr = it.addressOf(0).reinterpret() + + assertEquals(NSStreamStatusNotOpen, input.streamStatus) + assertEquals(-1, input.read(cPtr, 4U)) + input.open() + assertEquals(NSStreamStatusOpen, input.streamStatus) + + byteArray.fill(-5) + assertEquals(3, input.read(cPtr, 4U)) + assertEquals("[97, 98, 99, -5]", byteArray.contentToString()) + + byteArray.fill(-7) + assertEquals(0, input.read(cPtr, 4U)) + assertEquals("[-7, -7, -7, -7]", byteArray.contentToString()) + } + } + + @Test + fun bufferInputStreamLongData() { + val source = Buffer() + source.writeString("a" + "b".repeat(Segment.SIZE * 2) + "c") + testInputStreamLongData(source.asNSInputStream()) + } + + @Test + fun realSourceInputStreamLongData() { + val source = Buffer() + source.writeString("a" + "b".repeat(Segment.SIZE * 2) + "c") + testInputStreamLongData(RealSource(source).asNSInputStream()) + } + + private fun testInputStreamLongData(input: NSInputStream) { + val lengthPlusOne = Segment.SIZE * 2 + 3 + val byteArray = ByteArray(lengthPlusOne) + byteArray.usePinned { + val cPtr = it.addressOf(0).reinterpret() + + assertEquals(NSStreamStatusNotOpen, input.streamStatus) + assertEquals(-1, input.read(cPtr, lengthPlusOne.convert())) + input.open() + assertEquals(NSStreamStatusOpen, input.streamStatus) + + byteArray.fill(-5) + assertEquals(Segment.SIZE.convert(), input.read(cPtr, lengthPlusOne.convert())) + assertEquals("[97${", 98".repeat(Segment.SIZE - 1)}${", -5".repeat(Segment.SIZE + 3)}]", byteArray.contentToString()) + + byteArray.fill(-6) + assertEquals(Segment.SIZE.convert(), input.read(cPtr, lengthPlusOne.convert())) + assertEquals("[98${", 98".repeat(Segment.SIZE - 1)}${", -6".repeat(Segment.SIZE + 3)}]", byteArray.contentToString()) + + byteArray.fill(-7) + assertEquals(2, input.read(cPtr, lengthPlusOne.convert())) + assertEquals("[98, 99${", -7".repeat(Segment.SIZE * 2 + 1)}]", byteArray.contentToString()) + + byteArray.fill(-8) + assertEquals(0, input.read(cPtr, lengthPlusOne.convert())) + assertEquals("[-8${", -8".repeat(lengthPlusOne - 1)}]", byteArray.contentToString()) + } + } + + @Test + fun nsInputStreamClose() { + val buffer = Buffer() + buffer.writeString("abc") + val source = RealSource(buffer) + assertFalse(source.closed) + + val input = source.asNSInputStream() + input.open() + input.close() + assertTrue(source.closed) + assertEquals(NSStreamStatusClosed, input.streamStatus) + + val byteArray = ByteArray(4) + byteArray.usePinned { + val cPtr = it.addressOf(0).reinterpret() + + byteArray.fill(-5) + assertEquals(-1, input.read(cPtr, 4U)) + assertNull(input.streamError) + assertEquals("[-5, -5, -5, -5]", byteArray.contentToString()) + } + } + + @Test + fun delegateTest() { + val runLoop = startRunLoop() + + fun consumeWithDelegate(input: NSInputStream, data: String) { + val opened = Mutex(true) + val read = atomic(0) + val completed = Mutex(true) + + input.delegate = object : NSObject(), NSStreamDelegateProtocol { + val sink = ByteArray(data.length) + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + assertEquals("run-loop", NSThread.currentThread.name) + when (handleEvent) { + NSStreamEventOpenCompleted -> opened.unlock() + NSStreamEventHasBytesAvailable -> { + sink.usePinned { + assertEquals(1, input.read(it.addressOf(read.value).reinterpret(), 1U)) + read.value++ + } + } + NSStreamEventEndEncountered -> { + assertEquals(data, sink.decodeToString()) + input.close() + completed.unlock() + } + else -> fail("unexpected event ${handleEvent.asString()}") + } + } + } + input.scheduleInRunLoop(runLoop, NSDefaultRunLoopMode) + input.open() + runBlocking { + opened.lockWithTimeout() + completed.lockWithTimeout() + } + assertEquals(data.length, read.value) + } + + consumeWithDelegate(Buffer().apply { writeString("custom") }.asNSInputStream(), "custom") + consumeWithDelegate(Buffer().asNSInputStream(), "") + CFRunLoopStop(runLoop.getCFRunLoop()) + } + + @Test + fun testRunLoopSwitch() { + val runLoop1 = startRunLoop("run-loop-1") + val runLoop2 = startRunLoop("run-loop-2") + + fun consumeSwitching(input: NSInputStream, data: String) { + val opened = Mutex(true) + val readLock = reentrantLock() + var read = 0 + val completed = Mutex(true) + + input.delegate = object : NSObject(), NSStreamDelegateProtocol { + val sink = ByteArray(data.length) + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + // Ensure thread safe access to `read` between scheduled run loops + readLock.withLock { + if (read == 0) { + // until first read + assertEquals("run-loop-1", NSThread.currentThread.name) + } else { + // after first read + assertEquals("run-loop-2", NSThread.currentThread.name) + } + when (handleEvent) { + NSStreamEventOpenCompleted -> opened.unlock() + NSStreamEventHasBytesAvailable -> { + if (read == 0) { + // switch to other run loop before first read + input.removeFromRunLoop(runLoop1, NSDefaultRunLoopMode) + input.scheduleInRunLoop(runLoop2, NSDefaultRunLoopMode) + } else if (read >= data.length - 3) { + // unsubscribe before last read + input.removeFromRunLoop(runLoop2, NSDefaultRunLoopMode) + } + sink.usePinned { + val readBytes = input.read(it.addressOf(read).reinterpret(), 3U) + assertNotEquals(0, readBytes) + read += readBytes.toInt() + } + if (read == data.length) { + assertEquals(data, sink.decodeToString()) + completed.unlock() + } + } + NSStreamEventEndEncountered -> fail("$data shouldn't be subscribed") + else -> fail("unexpected event ${handleEvent.asString()}") + } + } + } + } + input.scheduleInRunLoop(runLoop1, NSDefaultRunLoopMode) + input.open() + runBlocking { + opened.lockWithTimeout() + completed.lockWithTimeout() + // wait a bit to be sure delegate is no longer called + delay(200) + } + input.close() + } + + consumeSwitching(Buffer().apply { writeString("custom") }.asNSInputStream(), "custom") + CFRunLoopStop(runLoop1.getCFRunLoop()) + CFRunLoopStop(runLoop2.getCFRunLoop()) + } + + @Test + fun testSubscribeAfterOpen() { + val runLoop = startRunLoop() + + fun subscribeAfterOpen(input: NSInputStream, data: String) { + val available = Mutex(true) + + input.delegate = object : NSObject(), NSStreamDelegateProtocol { + override fun stream(aStream: NSStream, handleEvent: NSStreamEvent) { + assertEquals("run-loop", NSThread.currentThread.name) + when (handleEvent) { + NSStreamEventOpenCompleted -> fail("opened before subscribe") + NSStreamEventHasBytesAvailable -> { + val sink = ByteArray(data.length) + sink.usePinned { + assertEquals(data.length.convert(), input.read(it.addressOf(0).reinterpret(), data.length.convert())) + } + assertEquals(data, sink.decodeToString()) + input.close() + available.unlock() + } + else -> fail("unexpected event ${handleEvent.asString()}") + } + } + } + input.open() + input.scheduleInRunLoop(runLoop, NSDefaultRunLoopMode) + runBlocking { + available.lockWithTimeout() + } + } + + subscribeAfterOpen(Buffer().apply { writeString("custom") }.asNSInputStream(), "custom") + CFRunLoopStop(runLoop.getCFRunLoop()) + } +} diff --git a/core/apple/test/samples/samplesApple.kt b/core/apple/test/samples/samplesApple.kt new file mode 100644 index 000000000..7c8d5e142 --- /dev/null +++ b/core/apple/test/samples/samplesApple.kt @@ -0,0 +1,56 @@ +/* + * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io.samples + +import kotlinx.cinterop.UnsafeNumber +import kotlinx.cinterop.convert +import kotlinx.cinterop.reinterpret +import kotlinx.io.* +import platform.Foundation.* +import kotlin.test.Test +import kotlin.test.assertContentEquals + +class KotlinxIoSamplesApple { + @Test + fun inputStreamAsSource() { + val data = ByteArray(100) { it.toByte() } + val inputStream = NSInputStream(data = data.toNSData()) + + val receivedData = inputStream.asSource().buffered().readByteArray() + assertContentEquals(data, receivedData) + } + + @Test + fun outputStreamAsSink() { + val data = ByteArray(100) { it.toByte() } + val outputStream = NSOutputStream.outputStreamToMemory() + + val sink = outputStream.asSink().buffered() + sink.write(data) + sink.flush() + + val writtenData = outputStream.propertyForKey(NSStreamDataWrittenToMemoryStreamKey) as NSData + assertContentEquals(data, writtenData.toByteArray()) + } + + @Test + @OptIn(UnsafeNumber::class) + fun asStream() { + val buffer = Buffer() + val data = ByteArray(100) { it.toByte() }.toNSData() + + val outputStream = buffer.asNSOutputStream() + outputStream.open() + outputStream.write(data.bytes?.reinterpret(), data.length) + + val inputStream = buffer.asNSInputStream() + inputStream.open() + val readData = NSMutableData.create(length = 100.convert())!! + inputStream.read(readData.bytes?.reinterpret(), 100.convert()) + + assertContentEquals(data.toByteArray(), readData.toByteArray()) + } +} diff --git a/core/apple/test/utilApple.kt b/core/apple/test/utilApple.kt new file mode 100644 index 000000000..d67be91cd --- /dev/null +++ b/core/apple/test/utilApple.kt @@ -0,0 +1,72 @@ +/* + * Copyright 2017-2023 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +@file:OptIn(UnsafeNumber::class) + +package kotlinx.io + +import kotlinx.cinterop.* +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.withTimeout +import platform.Foundation.* +import platform.posix.memcpy +import kotlin.test.fail +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +internal fun ByteArray.toNSData() = if (isNotEmpty()) { + usePinned { + NSData.create(bytes = it.addressOf(0), length = size.convert()) + } +} else { + NSData.data() +} + +fun NSData.toByteArray() = ByteArray(length.toInt()).apply { + if (isNotEmpty()) { + memcpy(refTo(0), bytes, length) + } +} + +fun startRunLoop(name: String = "run-loop"): NSRunLoop { + val created = Mutex(true) + lateinit var runLoop: NSRunLoop + val thread = NSThread { + runLoop = NSRunLoop.currentRunLoop + runLoop.addPort(NSMachPort.port(), NSDefaultRunLoopMode) + created.unlock() + runLoop.run() + } + thread.name = name + thread.start() + runBlocking { + created.lockWithTimeout() + } + return runLoop +} + +suspend fun Mutex.lockWithTimeout(timeout: Duration = 5.seconds) { + class MutexSource : Throwable() + val source = MutexSource() + try { + withTimeout(timeout) { lock() } + } catch (e: TimeoutCancellationException) { + fail("Mutex never unlocked", source) + } +} + +fun NSStreamEvent.asString(): String { + return when (this) { + NSStreamEventNone -> "NSStreamEventNone" + NSStreamEventOpenCompleted -> "NSStreamEventOpenCompleted" + NSStreamEventHasBytesAvailable -> "NSStreamEventHasBytesAvailable" + NSStreamEventHasSpaceAvailable -> "NSStreamEventHasSpaceAvailable" + NSStreamEventErrorOccurred -> "NSStreamEventErrorOccurred" + NSStreamEventEndEncountered -> "NSStreamEventEndEncountered" + else -> "Unknown event $this" + } +} diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 1d5c556e8..200369de1 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -36,6 +36,11 @@ kotlin { api(project(":kotlinx-io-bytestring")) } } + appleTest { + dependencies { + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.2") + } + } } } @@ -49,7 +54,8 @@ tasks.withType().configureEach { "common/test/samples/moduleDescriptionSample.kt", "common/test/samples/samples.kt", "common/test/samples/byteStringSample.kt", - "jvm/test/samples/samplesJvm.kt" + "jvm/test/samples/samplesJvm.kt", + "apple/test/samples/samplesApple.kt" ) } } diff --git a/core/common/src/Sink.kt b/core/common/src/Sink.kt index 4d887de1d..677b587fe 100644 --- a/core/common/src/Sink.kt +++ b/core/common/src/Sink.kt @@ -41,7 +41,7 @@ package kotlinx.io * * ### Write methods' behavior and naming conventions * - * Methods writing a value of some type are usually name `write`, like [writeByte] or [writeInt], except methods + * Methods writing a value of some type are usually named `write`, like [writeByte] or [writeInt], except methods * writing data from a some collection of bytes, like `write(ByteArray, Int, Int)` or * `write(source: RawSource, byteCount: Long)`. * In the latter case, if a collection is consumable (i.e., once data was read from it will no longer be available for diff --git a/core/common/test/util.kt b/core/common/test/util.kt index c621b3c17..9a98e2ed0 100644 --- a/core/common/test/util.kt +++ b/core/common/test/util.kt @@ -21,6 +21,7 @@ package kotlinx.io import kotlin.test.assertEquals +import kotlin.test.assertTrue fun segmentSizes(buffer: Buffer): List { var segment = buffer.head ?: return emptyList() @@ -34,6 +35,10 @@ fun segmentSizes(buffer: Buffer): List { return sizes } +fun assertNoEmptySegments(buffer: Buffer) { + assertTrue(segmentSizes(buffer).all { it != 0 }, "Expected all segments to be non-empty") +} + expect fun createTempFile(): String expect fun deleteFile(path: String) diff --git a/core/jvm/src/JvmCore.kt b/core/jvm/src/JvmCore.kt index 1506ab1ba..fd2a108bf 100644 --- a/core/jvm/src/JvmCore.kt +++ b/core/jvm/src/JvmCore.kt @@ -62,7 +62,7 @@ private open class OutputStreamSink( override fun close() = out.close() - override fun toString() = "sink($out)" + override fun toString() = "RawSink($out)" } /** @@ -104,7 +104,7 @@ private open class InputStreamSource( override fun close() = input.close() - override fun toString() = "source($input)" + override fun toString() = "RawSource($input)" } /** diff --git a/core/jvm/src/SinksJvm.kt b/core/jvm/src/SinksJvm.kt index 8c5500349..3b70cd100 100644 --- a/core/jvm/src/SinksJvm.kt +++ b/core/jvm/src/SinksJvm.kt @@ -64,12 +64,12 @@ public fun Sink.asOutputStream(): OutputStream { return object : OutputStream() { override fun write(byte: Int) { - if (isClosed()) throw IOException("Underlying sink is closed") + if (isClosed()) throw IOException("Underlying sink is closed.") writeToInternalBuffer { it.writeByte(byte.toByte()) } } override fun write(data: ByteArray, offset: Int, byteCount: Int) { - if (isClosed()) throw IOException("Underlying sink is closed") + if (isClosed()) throw IOException("Underlying sink is closed.") writeToInternalBuffer { it.write(data, offset, offset + byteCount) } } @@ -82,7 +82,7 @@ public fun Sink.asOutputStream(): OutputStream { override fun close() = this@asOutputStream.close() - override fun toString() = "${this@asOutputStream}.outputStream()" + override fun toString() = "${this@asOutputStream}.asOutputStream()" } } diff --git a/core/jvm/src/SourcesJvm.kt b/core/jvm/src/SourcesJvm.kt index 7c940cdb1..ed7f20fc1 100644 --- a/core/jvm/src/SourcesJvm.kt +++ b/core/jvm/src/SourcesJvm.kt @@ -126,7 +126,7 @@ public fun Source.asInputStream(): InputStream { override fun close() = this@asInputStream.close() - override fun toString() = "${this@asInputStream}.inputStream()" + override fun toString() = "${this@asInputStream}.asInputStream()" } } diff --git a/core/jvm/test/utilJVM.kt b/core/jvm/test/utilJVM.kt index 6aae75a50..369d1aad8 100644 --- a/core/jvm/test/utilJVM.kt +++ b/core/jvm/test/utilJVM.kt @@ -7,7 +7,6 @@ package kotlinx.io import java.nio.file.Files import java.nio.file.Paths import kotlin.test.assertEquals -import kotlin.test.assertTrue actual fun createTempFile(): String = Files.createTempFile(null, null).toString() @@ -18,10 +17,6 @@ actual fun deleteFile(path: String) { Files.delete(Paths.get(path)) } -fun assertNoEmptySegments(buffer: Buffer) { - assertTrue(segmentSizes(buffer).all { it != 0 }, "Expected all segments to be non-empty") -} - fun assertByteArrayEquals(expectedUtf8: String, b: ByteArray) { assertEquals(expectedUtf8, b.toString(Charsets.UTF_8)) }