SENDING SENSOR DATA TO ORACLE CLOUD WITH LORAWAN RAK WIRELESS AND THE THINGS NETWORK | PART FOUR


Part four of the series; the previous parts are one, two and three. Today we are using an MQTT integration to publish data from TTN to Oracle Cloud.

Go to TTN Applications and grab the MQTT connection information for the application you created:

Click on [Generate new API key] to create the password for the MQTT connection and grab the value for later use:

OCI SETUP

Get an OCI user with sufficient privileges to create stream topics and use API key credentials:

Create an OCI streaming topic (public) and put the following information in a shell file called m2os.sh:

# m2os.sh - exports the configuration read by m2os.go and then starts it.

# MQTT side: broker endpoint and topic filter ("#" is the MQTT multi-level
# wildcard, i.e. every topic the credentials can see).
export broker="mqtt://eu1.cloud.thethings.network:1883"
export mqtttopic="#"

# OCI side: target stream and the identity used to sign API requests.
export stream="ocid1.stream.oc1.eu-frankfurt-1.amaaa...bogq"
export tenancy="ocid1.tenancy.oc1..aaaaaaaallx2fzcc...ga"
export user="ocid1.user.oc1..aaaaaaaaodkl6fruq2z...xa"
export region="eu-frankfurt-1"
export fingerprint="f0:75:...3:1a"

# Placeholders: replace with your own values. They are quoted so that the
# angle brackets are not parsed as shell redirections.
export mqttuser="<your mqtt-user>"
export mqttpasswd="<mqtt api key>"

# ppkfile carries the CONTENTS of the private key, not its path. The command
# substitution is quoted so the multi-line PEM is never word-split.
export ppkfile="$(cat ./myppk)"
# NOTE(review): "-" looks like a dummy passphrase for an unencrypted key - confirm.
export ppkpasswd="-"

go run m2os.go 

Put the contents of your private key in a file named myppk:

-----BEGIN RSA PRIVATE KEY-----
MIIEpAIB...KBwBOa30/g==
-----END RSA PRIVATE KEY-----

MQTT TO OCI STREAMING CONVERTER

The following Go program reads sensor data from MQTT and pushes it to the OCI streaming topic:

/*
* jmu april 2021
*/
package main

import (
	"fmt"
	"os"
	"log"
	"context"
	"github.com/google/uuid"
	"github.com/oracle/oci-go-sdk/common"
	"github.com/oracle/oci-go-sdk/streaming"
	MQTT "github.com/eclipse/paho.mqtt.golang"
)

// main bridges an MQTT broker to OCI Streaming.
//
// It reads its configuration from environment variables, subscribes to the
// configured MQTT topic filter and forwards every received payload, unchanged,
// to the OCI stream using a freshly generated UUID as the message key.
//
// Environment variables read:
//
//	broker, mqtttopic, mqttuser, mqttpasswd   MQTT connection
//	stream, tenancy, user, region, fingerprint OCI identity and target stream
//	ppkfile                                    private key contents (not a path)
//	ppkpasswd                                  private key passphrase
//	                                           (legacy name ppkpassword is honoured)
//
// Any failure to create the OCI client, connect to the broker or subscribe is
// fatal and terminates the process with a non-zero exit status.
func main() {

	intro()

	broker := os.Getenv("broker")
	fmt.Printf("mqtt endpoint: %s\n", broker)

	mqtttopic := os.Getenv("mqtttopic")
	fmt.Printf("mqtt topic name: %s\n", mqtttopic)

	stream := os.Getenv("stream")
	fmt.Printf("oci stream ocid: %s\n", stream)

	tenancy := os.Getenv("tenancy")
	fmt.Printf("oci tenancy ocid: %s\n", tenancy)

	user := os.Getenv("user")
	fmt.Printf("oci user ocid: %s\n", user)

	region := os.Getenv("region")
	fmt.Printf("oci region: %s\n", region)

	fingerprint := os.Getenv("fingerprint")
	fmt.Printf("fingerprint: %s\n", fingerprint)

	ppkcontent := os.Getenv("ppkfile")

	// The launcher script exports "ppkpasswd"; the previously used name
	// "ppkpassword" is kept as a fallback so existing setups keep working.
	ppkpasswd := os.Getenv("ppkpasswd")
	if ppkpasswd == "" {
		ppkpasswd = os.Getenv("ppkpassword")
	}
	fmt.Printf("ppk passwd: %s\n", "****")

	mqttuser := os.Getenv("mqttuser")
	fmt.Printf("mqttuser: %s\n", mqttuser)

	mqttpasswd := os.Getenv("mqttpasswd")
	fmt.Printf("mqtt passwd: %s\n", "****")

	opts := MQTT.NewClientOptions()
	opts.AddBroker(broker)
	opts.SetClientID(uuid.New().String())
	opts.SetUsername(mqttuser)
	opts.SetPassword(mqttpasswd)
	const qos byte = 0
	// Log only non-secret settings: printing opts itself would dump the whole
	// options struct, including the password masked above.
	log.Printf("mqtt client options prepared for broker %s (user %s)", broker, mqttuser)

	nrcp := common.NewRawConfigurationProvider(tenancy, user, region, fingerprint, ppkcontent, &ppkpasswd)

	// e.g. https://cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com
	endpoint := "https://cell-1.streaming." + region + ".oci.oraclecloud.com"
	fmt.Printf("Streaming endpoint: %s\n", endpoint)
	sclient, err := streaming.NewStreamClientWithConfigurationProvider(nrcp, endpoint)
	if err != nil {
		log.Fatalf("creating OCI stream client: %v", err)
	}

	// choke hands each received (topic, payload) pair from the MQTT callback
	// to the forwarding loop below. It is unbuffered, so the callback blocks
	// until the previous message has been picked up.
	choke := make(chan [2]string)

	opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
		choke <- [2]string{msg.Topic(), string(msg.Payload())}
	})

	fmt.Printf("Connecting to mqtt: %s\n", broker)
	client := MQTT.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Fatalf("connecting to mqtt broker %s: %v", broker, token.Error())
	}

	fmt.Printf("Subscribing to mqtt topic: %s\n", mqtttopic)
	if token := client.Subscribe(mqtttopic, qos, nil); token.Wait() && token.Error() != nil {
		log.Fatalf("subscribing to mqtt topic %s: %v", mqtttopic, token.Error())
	}

	// choke is never closed, so this loop runs until the process is killed.
	for incoming := range choke {
		fmt.Printf("Recieved from mqtt topic %s the message <- %s\t", incoming[0], incoming[1])
		// Forward the payload verbatim. It must never be used as a format
		// string, otherwise any '%' in the sensor data would be mangled.
		putMessage(sclient, stream, region, uuid.New().String(), incoming[1])
	}

	client.Disconnect(250)
	fmt.Println("Sample Subscriber Disconnected")
}

// putMessage publishes one key/value record to the OCI stream whose OCID is
// given in stream, using the supplied streaming client.
//
// The region parameter is accepted for call-site compatibility but is not
// used: the client is expected to be bound to its endpoint already.
//
// It returns 0 when the service accepted the record and -1 when the request
// failed or the service reported the record as not appended. Errors are
// printed to standard output rather than returned.
func putMessage(client streaming.StreamClient, stream string, region string, key string, value string) int {
	req := streaming.PutMessagesRequest{
		StreamId: &stream,
		PutMessagesDetails: streaming.PutMessagesDetails{
			Messages: []streaming.PutMessagesDetailsEntry{
				{Key: []byte(key), Value: []byte(value)},
			},
		},
	}

	resp, err := client.PutMessages(context.Background(), req)
	if err != nil {
		fmt.Println("Error: ", err)
		return -1
	}

	// A successful HTTP call can still carry per-entry failures; without this
	// check a rejected record would be reported as sent.
	if resp.Failures != nil && *resp.Failures > 0 {
		for _, entry := range resp.Entries {
			if entry.ErrorMessage != nil {
				fmt.Println("Error: ", *entry.ErrorMessage)
			}
		}
		fmt.Println("Error: ", *resp.Failures, "message(s) rejected by OCI streaming")
		return -1
	}

	fmt.Println("Sent to OCI streaming -> \n", value)
	return 0
}
// intro prints the program banner: the copyright/description line goes to
// standard error, the separator rule goes to standard output.
func intro() {
	fmt.Fprintln(os.Stderr, "\n (c) jmu 2021 | m2os, get messages from mqtt topic and publishes to oci streaming")
	fmt.Println("-----------------------------------------------------------------------------------------------")
}
// check panics with e when e is non-nil and does nothing otherwise.
func check(e error) {
	if e == nil {
		return
	}
	panic(e)
}

Put all the stuff in a directory:

Execute m2os.sh and see what happens:

CONTAINERIZE AND RUN IN K8S

Follow the instructions here to containerize and run the MQTT to OCI streaming converter in K8s if you wish. This is recommended for industrial production purposes, although some small improvements to the code are needed to allow parallel processing using streaming topic partitions and the replication factor in K8s.

That’s all for the 4th part of the series. In the next post we’ll work with Oracle Cloud to implement data engineering with the sensor data produced and transferred from the device to the cloud.

One Comment

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.