Part four of the series, previous prts are one, two and three . Today we are using an MQTT integration to publish data from TTN to Oracle cloud.
Goto TTN Applications and grab the MQTT information relative to the application created:

Click on [Generate new API key] to create the passwordd forr the MQTT connection and grab the value for later use:

OCI SETUP
Get an OCI user with priveileges enough to create stream topics and use API key credentials:
Create an OCI streaming topic (public) and grab the following information in a shell file called m2os.sh:
export broker="mqtt://eu1.cloud.thethings.network:1883" export mqtttopic="#" export stream="ocid1.stream.oc1.eu-frankfurt-1.amaaa...bogq" export tenancy="ocid1.tenancy.oc1..aaaaaaaallx2fzcc...ga" export user="ocid1.user.oc1..aaaaaaaaodkl6fruq2z...xa" export region="eu-frankfurt-1" export fingerprint="f0:75:...3:1a" export mqttuser=<your mqtt-user> export mqttpasswd=<mqtt api key> export ppkfile=$(cat ./myppk) export ppkpasswd="-" go run m2os.go
Put the contents of your private key in a file named myppk:
-----BEGIN RSA PRIVATE KEY----- MIIEpAIB...KBwBOa30/g== -----END RSA PRIVATE KEY-----
MQTT TO OCI STREAMING CONVERTER
The following go program will be utilised to read sensor data from MQTT and be pushed in OCI streaming topic:
/* * jmu april 2021 */ package main import ( "fmt" "os" "log" "context" "github.com/google/uuid" "github.com/oracle/oci-go-sdk/common" "github.com/oracle/oci-go-sdk/streaming" MQTT "github.com/eclipse/paho.mqtt.golang" ) // reads from mosquito and writes in oci streaming func main() { intro() broker := os.Getenv("broker") fmt.Printf("mqtt endpoint: %s\n", broker) mqtttopic := os.Getenv("mqtttopic") fmt.Printf("mqtt topic name: %s\n", mqtttopic) stream := os.Getenv("stream") fmt.Printf("oci stream ocid: %s\n", stream) tenancy := os.Getenv("tenancy") fmt.Printf("oci tenancy ocid: %s\n", tenancy) user := os.Getenv("user") fmt.Printf("oci user ocid: %s\n", user) region := os.Getenv("region") fmt.Printf("oci region: %s\n", region) fingerprint := os.Getenv("fingerprint") fmt.Printf("fingerprint: %s\n", fingerprint) ppkcontent := os.Getenv("ppkfile") ppkpasswd := os.Getenv("ppkpassword") fmt.Printf("ppk passwd: %s\n", "****") mqttuser := os.Getenv("mqttuser") fmt.Printf("mqttuser: %s\n", mqttuser) mqttpasswd := os.Getenv("mqttpasswd") fmt.Printf("mqtt passwd: %s\n", "****") opts := MQTT.NewClientOptions() opts.AddBroker(broker) opts.SetClientID(uuid.New().String()) opts.SetUsername(mqttuser) opts.SetPassword(mqttpasswd) qos := 0 log.Print(opts) nrcp := common.NewRawConfigurationProvider(tenancy, user, region, fingerprint, string(ppkcontent), &ppkpasswd) // https://cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com var endpoint = "https://cell-1.streaming." + region + ".oci.oraclecloud.com" fmt.Printf("Streaming endpoint: %s\n", endpoint) sclient, err := streaming.NewStreamClientWithConfigurationProvider(nrcp, endpoint) if err != nil { fmt.Println("Error:", err) return } receiveCount := 0 choke := make(chan [2]string) opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) { choke <- [2]string{msg.Topic(), string(msg.Payload())} }) fmt.Printf("Connecting to mqtt: %s\n", opts) client := MQTT.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } fmt.Printf("Subscribing to mqtt topic: %s\n", mqtttopic) if token := client.Subscribe(mqtttopic, byte(qos), nil); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } for receiveCount >=0 { incoming := <-choke fmt.Printf("Recieved from mqtt topic %s the message <- %s\t", incoming[0], incoming[1]) receiveCount++ putMessage(sclient, stream, region, uuid.New().String(), fmt.Sprintf(incoming[1])) } client.Disconnect(250) fmt.Println("Sample Subscriber Disconnected") } /* * put a meessage in a topic */ func putMessage(client streaming.StreamClient, stream string, region string, key string, value string) int{ var req streaming.PutMessagesRequest req.StreamId = &stream var entry streaming.PutMessagesDetailsEntry entry.Key = []byte(key) entry.Value = []byte(value) var entryarray [] streaming.PutMessagesDetailsEntry entryarray = append(entryarray, entry) var det streaming.PutMessagesDetails det.Messages = entryarray req.PutMessagesDetails = det //client.SetRegion(region) _, err := client.PutMessages(context.Background(), req) if err != nil { fmt.Println("Error: ", err) return -1 } fmt.Println("Sent to OCI streaming -> \n", value) return 0 } /* * */ func intro(){ fmt.Fprintf(os.Stderr, "\n (c) jmu 2021 | m2os, get messages from mqtt topic and publishes to oci streaming\n") fmt.Printf("-----------------------------------------------------------------------------------------------\n") } /* * */ func check(e error) { if e != nil { panic(e) } }
Put all the stuff in a directory:

Execute m2os.sh and see what happens:

CONTAINERIZE AND RUN IN K8S
Follow instructions here to containerize and run the mqtt to oci streaming converter in K8s if you will (recommended for industrial production purposes -some small improvement to the code must be added to allow parallel processing using streaming topics partitions and replication factor in K8s).
That’s all for the 4th part of the series. In next post we’ll work with Orracle Cloud to implement data engineering with the sensor data produced and transferred fron the device to the cloud.
One Comment