
Learning Golang (some rough notes) - S02E09 - Processing chunked responses before EOF is reached

Published Jul 23, 2020 in Go, Golang, Chunked Response, KsqlDB at https://rmoff.net/2020/07/23/learning-golang-some-rough-notes-s02e09-processing-chunked-responses-before-eof-is-reached/

The server sends data with Transfer-Encoding: chunked, and you want to work with that data as you receive it, instead of waiting for the server to finish, the EOF to fire, and only then processing the data?

Here’s an example curl session of the kind I’m talking about:

➜ curl --verbose --location 'http://localhost:8088/query' \
--header 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
--data-raw '{
    "ksql": "SELECT NAME, TS, CAPACITY, EMPTY_PLACES FROM CARPARK_EVENTS  WHERE  EMPTY_PLACES > 100 emit changes;"
}'
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8088 (#0)
> POST /query HTTP/1.1
> Host: localhost:8088
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Type: application/vnd.ksql.v1+json; charset=utf-8
> Content-Length: 118
>
* upload completely sent off: 118 out of 118 bytes
< HTTP/1.1 200 OK
< content-type: application/json
< Transfer-Encoding: chunked
<

The API that I’m working with sends a complete JSON message, but spread over chunks. It starts with a header:

[{"header":{"queryId":"none","schema":"`NAME` STRING, `TS` BIGINT, `CAPACITY` INTEGER, `EMPTY_PLACES` INTEGER"}},

and then at some point (perhaps straight away, perhaps after a few seconds) you get some data:

{"row":{"columns":["Westgate",1595372100000,116,116]}},
{"row":{"columns":["Burnett St",1595372100000,122,117]}},

and then some empty rows

and then maybe some more data

{"row":{"columns":["Crown Court",1595372100000,142,130]}},
{"row":{"columns":["Leisure Exchange",1595372100000,996,976]}},

This is from a streaming database, and the idea is that the client can use the data as it’s continually sent. Contrast this to the standard request-response pattern of data consumption in which the request is fully satisfied before the client will process the response.

From my Googling I came across two standard patterns for consuming JSON from a REST call:

  • NewDecoder

    json.NewDecoder(resp.Body).Decode(&m)
  • Unmarshal

    body, _ := ioutil.ReadAll(resp.Body) // Unmarshal needs the whole body as a []byte first
    json.Unmarshal(body, &m)
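
Assembled into runnable form, that standard pattern looks something like this. This is just a sketch: the endpoint here is illustrative (the real ksqlDB call is a POST, as shown further down), and it assumes the usual net/http, encoding/json, and log imports:

// Standard pattern: issue the request, then decode the body in one go
resp, err := http.Get("http://localhost:8088/query") // illustrative; the real call is a POST
if err != nil {
    log.Fatal(err)
}
defer resp.Body.Close()

// The response is one big JSON array, so Decode reads the entire
// top-level value - i.e. the whole response - before it returns
var m []interface{}
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
    log.Fatal(err)
}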

But I found that both of these blocked until the entire response had been received - which is not what I wanted. Courtesy of chourobin I found this solution. First up, create the client and request:

// Prepare the request
url := "http://localhost:8088/query"
method := "POST"
k := "SELECT NAME, TS, CAPACITY, EMPTY_PLACES FROM CARPARK_EVENTS  WHERE  EMPTY_PLACES > " + strconv.Itoa(c) + "  EMIT CHANGES;"
payload := strings.NewReader("{\"ksql\":\"" + k + "\"}")

// Create the client
client := &http.Client{}
req, err := http.NewRequest(method, url, payload)
if err != nil {
    return err
}
req.Header.Add("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")

// Make the request
res, err := client.Do(req)
if err != nil {
    return err
}
defer res.Body.Close()

Now create a NewReader to consume the response:

reader := bufio.NewReader(res.Body)

And then run a loop which consumes the response a line at a time:

for {
    // Read the next chunk of the stream, up to and including the \n delimiter
    lb, err := reader.ReadBytes('\n')
    if err != nil {
        // Got an error back (e.g. EOF), so exit the loop
        break
    }
    // Do stuff with the response here
    fmt.Printf("\nGot some data:\n\t%v", string(lb))
}

What about the JSON?

As you may have noticed in the example response shown above, the chunks are not self-contained JSON.

  • The header chunk opens an array:

    [{"header":{"queryId":"none","schema":"`NAME` STRING, `TS` BIGINT, `CAPACITY` INTEGER, `EMPTY_PLACES` INTEGER"}},
  • Each row chunk is an array entry with a trailing comma:

    {"row":{"columns":["Westgate",1595372100000,116,116]}},

The inbound stream of bytes is split into lines using reader.ReadBytes('\n'). This function takes a single byte as the token by which to split, and returns everything up to and including that delimiter. But splitting on \n (ASCII 10) alone isn’t quite enough, because each row also has a trailing comma (ASCII 44) before the newline, and that has to be removed before the line will parse as JSON.

Now, I think the proper option here is to use a Scanner (there’s a sketch of that below), but for a quick win I instead did a dirty thing and just truncated the slice by two bytes 🤢 to chop off the \n (which ReadBytes includes in what it returns) and the trailing comma before it:

if len(lb) > 2 {
    lb = lb[:len(lb)-2]
}
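
For the record, here’s roughly what the Scanner approach could look like: a custom SplitFunc that splits the stream on \n and trims the trailing comma (and any \r) from each token. This is my sketch rather than anything official; it assumes bufio and bytes are imported and that res is the *http.Response from earlier:

scanner := bufio.NewScanner(res.Body)
scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
    if i := bytes.IndexByte(data, '\n'); i >= 0 {
        // Found a complete line: consume it, returning the token
        // minus the trailing comma (and \r, if present)
        return i + 1, bytes.TrimRight(data[:i], ",\r"), nil
    }
    if atEOF && len(data) > 0 {
        // Final line with no trailing newline
        return len(data), bytes.TrimRight(data, ",\r"), nil
    }
    // Request more data before emitting a token
    return 0, nil, nil
})
for scanner.Scan() {
    fmt.Printf("\nGot some data:\n\t%v", scanner.Text())
}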

You can then take the slice of bytes and unmarshal the JSON into a Go variable. You need to declare this first, using a custom type. Defining the type is easy using JSON-to-Go (https://mholt.github.io/json-to-go/), a handy little tool into which you paste some sample JSON and it spits out the Go type definition.


So taking this Go code:

type ksqlDBMessageRow struct {
	Row struct {
		Columns []interface{} `json:"columns"`
	} `json:"row"`
}
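
As an aside, the header chunk could be handled in just the same way with its own type. Here’s a sketch based on the sample header shown earlier, with field names taken from its queryId and schema keys:

// Sketch of a matching type for the header chunk shown above
type ksqlDBMessageHeader struct {
	Header struct {
		QueryID string `json:"queryId"`
		Schema  string `json:"schema"`
	} `json:"header"`
}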

You then declare the variable into which you’ll store the row that’s been read:

var r ksqlDBMessageRow

// …

if strings.Contains(string(lb), "row") {
    // Looks like a Row, let's process it!
    err = json.Unmarshal(lb, &r)
    if err != nil {
        fmt.Printf("Error decoding JSON %v (%v)\n", string(lb), err)
    }
}

From that you can then access the actual values in the payload itself:

if r.Row.Columns != nil {
    // Column order follows the SELECT: NAME, TS, CAPACITY, EMPTY_PLACES
    CARPARK = r.Row.Columns[0].(string)
    DATA_TS = r.Row.Columns[1].(float64)
    CAPACITY = r.Row.Columns[2].(float64)
    CURRENT_EMPTY_PLACES = r.Row.Columns[3].(float64)
    // Handle the timestamp (epoch milliseconds)
    t := int64(DATA_TS)
    ts := time.Unix(t/1000, 0)
    fmt.Printf("Carpark %v at %v has %v spaces available (capacity %v)\n", CARPARK, ts, CURRENT_EMPTY_PLACES, CAPACITY)
}
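
One caveat: those bare type assertions will panic if a column ever arrives with an unexpected type. A more defensive variation (my addition, not from the original code) uses the comma-ok form:

// The comma-ok form avoids a panic if a column isn't the expected type
name, ok := r.Row.Columns[0].(string)
if !ok {
    fmt.Printf("unexpected type %T for NAME\n", r.Row.Columns[0])
} else {
    CARPARK = name
}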

📺 More Episodes…

  • Kafka and Go

    • S02E00 - Kafka and Go

    • S02E01 - My First Kafka Go Producer

    • S02E02 - Adding error handling to the Producer

    • S02E03 - Kafka Go Consumer (Channel-based)

    • S02E04 - Kafka Go Consumer (Function-based)

    • S02E05 - Kafka Go AdminClient

    • S02E06 - Putting the Producer in a function and handling errors in a Go routine

    • S02E07 - Splitting Go code into separate source files and building a binary executable

    • S02E08 - Checking Kafka advertised.listeners with Go

    • S02E09 - Processing chunked responses before EOF is reached

  • Learning Go

    • S01E00 - Background

    • S01E01 - Pointers

    • S01E02 - Slices

    • S01E03 - Maps

    • S01E04 - Function Closures

    • S01E05 - Interfaces

    • S01E06 - Errors

    • S01E07 - Readers

    • S01E08 - Images

    • S01E09 - Concurrency (Channels, Goroutines)

    • S01E10 - Concurrency (Web Crawler)


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
