@nsshunt/stsappframework
v3.3.8
Published
TODO
Downloads
1,052
Readme
stsappframework
TODO
Create working RAFT (sRAFT for STS RAFT) for cluster management
Create a REDIS STREAM service to request data from all REDIS cluster clients
Example use case: stsinstrumentmanager. Get all socket.io namespaces and socket details for all workers across all instances; get all Redis listeners for all workers across all instances; get all RAFT details for all workers across all instances.
Using the STSServiceController described below:
1. Query the STSServiceController for all stsinstrumentmanager worker instances.
2. Publish the query to the Redis pub/sub instance on the stsinstrumentmanager_admin channel.
3. All stsinstrumentmanager workers receive the query via the stsinstrumentmanager_admin channel.
   3.1 Get the information required.
   3.2 Publish the response, using a unique message id, on the stsinstrumentmanager_admin_response channel.
4. The initiating stsinstrumentmanager worker instance listens on the stsinstrumentmanager_admin_response channel.
5. Based on the worker list from step 1, wait for all responses or a timeout.
6. If all responses are received, send the responses with OK status.
7. If the timeout expires, send the responses with TIMEOUT status.
New Service: STSServiceController. Eventually, this will need to use RAFT for a cluster configuration.
Single instance in cluster mode
main thread to hold all STSServiceData
Uses Redis Pub/Sub for service messaging
All STS services and apps (rest, broker, instrumentmanager, uiterm) will publish to ping channel
workers will subscribe to ping channel
workers will send data via IPC to master thread (this will eventually become the RAFT leader)
master thread will keep an in-memory (or redis) copy of all connected service data
master thread will use keep-alive logic to automatically delete a service's details if no ping is received from it within the timeout period (2s)
services can query the STSServiceController to get service information for all sts services and apps
Influx DB Notes
|> histogram(bins: [0.0, 10.0, 20.0, 50.0, 100.0, 1000.0, 50000.0, math.mInf(sign: 1) ])
|> histogramQuantile(quantile: 0.99999)
option task = {name: "downsample-daily", every: 1d}
option task = {name: "downsample-daily", every: 1d}
from(bucket: "my-db/") |> range(start: -task.every) |> filter(fn: (r) => r._measurement == "example-measurement") |> filter(fn: (r) => r._field == "example-field") |> aggregateWindow(every: 1h, fn: mean) |> set(key: "_measurement", value: "average-example-measurement") |> to(org: "example-org", bucket: "my-db/example-rp")
|> set(key: "_measurement", value: "sts01")
|> to(org: "my-org", bucket: "TestBucket01")
from(bucket: "TestBucket01")
|> range(start: -5s)
|> last()
|> filter(fn: (r) => r["_measurement"] == "service")
|> filter(fn: (r) => r["_field"] != "memory")
|> group(columns: ["_field"])
|> sum()
|> map(
fn: (r) => ({r with _time: now(), _measurement: "sts01"}),
)
|> to(org: "my-org", bucket: "TestBucket01")
group(columns: ["serviceId", "serviceInstanceId", "_field"])
option task = {name: "task-sts-stats-sum", every: 1s} from(bucket: "TestBucket01") |> range(start: -5s) |> last() |> filter(fn: (r) => r["_measurement"] == "service" and r["_field"] != "memory") |> group(columns: ["_field"]) |> sum() |> toFloat() |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-sum"})) |> to(org: "my-org", bucket: "TestBucket01")
option task = {name: "task-sts-stats-sum-byservice", every: 1s} from(bucket: "TestBucket01") |> range(start: -5s) |> last() |> filter(fn: (r) => r["_measurement"] == "service" and r["_field"] != "memory") |> group(columns: ["serviceId", "_field"]) |> sum() |> toFloat() |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-sum-byservice"})) |> to(org: "my-org", bucket: "TestBucket01")
option task = {name: "task-sts-stats-sum-byserviceinstance", every: 1s} from(bucket: "TestBucket01") |> range(start: -5s) |> last() |> filter(fn: (r) => r["_measurement"] == "service" and r["_field"] != "memory") |> group(columns: ["serviceId", "serviceInstanceId", "_field"]) |> sum() |> toFloat() |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-sum-byserviceinstance"})) |> to(org: "my-org", bucket: "TestBucket01")
import "math" from(bucket: "TestBucket01") |> range(start: -10m) |> filter(fn: (r) => r["_measurement"] == "sts-stats-sum-byservice" and r["serviceId"] == "[email protected]" and r["_field"] == "cpu") |> histogram(bins: [0.0, 10.0, 20.0, 50.0, 100.0, 200.0, 250.0, 1000.0, 50000.0, math.mInf(sign: 1) ]) |> difference()
import "math" from(bucket: "TestBucket01") |> range(start: -10m) |> filter(fn: (r) => r["_measurement"] == "sts-stats-sum" and r["_field"] == "cpu") |> histogram(bins: [0.0, 10.0, 20.0, 50.0, 100.0, 200.0, 250.0, 1000.0, 50000.0, math.mInf(sign: 1) ]) |> difference()
/* More accurate */
data = from(bucket: "TestBucket01") |> range(start: -10m) |> filter(fn: (r) => r["_measurement"] == "sts-stats-sum-byservice" and r["serviceId"] == "[email protected]" and r["_field"] == "cpu") |> aggregateWindow(every: 5s, fn: max, createEmpty: false)
dostsquantileex = (q) =>
data
|> quantile(q: q, method: "estimate_tdigest", compression: 1000.0)
|> set(key: "quantile", value: string(v:q))
|> group(columns: ["quantile"])
union(tables: [
dostsquantileex(q: 0.5),
dostsquantileex(q: 0.8),
dostsquantileex(q: 0.9),
dostsquantileex(q: 0.95),
dostsquantileex(q: 0.99)
])
/* Less accurate */
import "math" data = from(bucket: "TestBucket01") |> range(start: -10m) |> filter(fn: (r) => r["_measurement"] == "sts-stats-sum-byservice" and r["serviceId"] == "[email protected]" and r["_field"] == "cpu") |> histogram(bins: [0.0, 10.0, 20.0, 50.0, 100.0, 200.0, 250.0, 1000.0, 50000.0, math.mInf(sign: 1) ])
dostsquantileex = (q) =>
data
|> histogramQuantile(quantile: q)
|> set(key: "quantile", value: string(v:q))
|> group(columns: ["quantile"])
union(tables: [ dostsquantileex(q: 0.5), dostsquantileex(q: 0.8), dostsquantileex(q: 0.9), dostsquantileex(q: 0.95), dostsquantileex(q: 0.99) ])
activeRequestCount 0 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 1 sts-stats-sum authenticationCount 0 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 2 sts-stats-sum connectionCount 40 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 3 sts-stats-sum connectionIdleCount 12 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 4 sts-stats-sum connectionPoolCount 12 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 5 sts-stats-sum connectionWaitingCount 0 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 6 sts-stats-sum coreCount
cpu 50.91 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 8 sts-stats-sum duration 3.6900000000000004 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 9 sts-stats-sum errorCount 0 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 10 sts-stats-sum latency 22.82 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 11 sts-stats-sum requestCount 4485612 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 12 sts-stats-sum retryCount 0 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 13 sts-stats-sum systemcpu
timer 72578365.22 2023-11-11T06:23:52.761Z 2023-11-11T06:33:52.761Z 2023-11-11T06:33:52.000Z 15 sts-stats-sum velocity
Service OLD
option task = { name: "sts-service-old", every: 1s, }
from(bucket: "TestBucket01") |> range(start: -5s) |> last() |> filter(fn: (r) => r["_measurement"] == "service" and r["_field"] != "memory") |> group(columns: ["serviceId", "serviceInstanceId", "serviceInstanceProcessId", "_field"]) |> sum() |> toFloat() |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-sum-byserviceInstanceprocessid"})) |> to(org: "my-org", bucket: "TestBucket01")
from(bucket: "TestBucket01") |> range(start: -5s) |> last() |> filter(fn: (r) => r["_measurement"] == "service" and r["_field"] != "memory") |> group(columns: ["serviceId", "serviceInstanceId", "_field"]) |> sum() |> toFloat() |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-sum-byserviceinstance"})) |> to(org: "my-org", bucket: "TestBucket01")
from(bucket: "TestBucket01") |> range(start: -5s) |> last() |> filter(fn: (r) => r["_measurement"] == "service" and r["_field"] != "memory") |> group(columns: ["serviceId", "_field"]) |> sum() |> toFloat() |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-sum-byservice"})) |> to(org: "my-org", bucket: "TestBucket01")
from(bucket: "TestBucket01") |> range(start: -5s) |> last() |> filter(fn: (r) => r["_measurement"] == "service" and r["_field"] != "memory") |> group(columns: ["_field"]) |> sum() |> toFloat() |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-sum"})) |> to(org: "my-org", bucket: "TestBucket01")
Service NEW
Note: The "duration and latency > 0.0" filter is a problem because no data is returned for detailed drill-in views
option task = {name: "task-sts-service-stats", every: 1s}
data = from(bucket: "TestBucket01") |> range(start: -5s) |> last() |> filter(fn: (r) => r["_measurement"] == "service" and r["_field"] != "memory")
r1 = data |> filter( fn: (r) => r["_field"] == "requestCount" or r["_field"] == "errorCount" or r["_field"] == "retryCount" or r["_field"] == "authenticationCount" or r["_field"] == "activeRequestCount" or r["_field"] == "connectionCount" or r["_field"] == "connectionPoolCount" or r["_field"] == "connectionIdleCount" or r["_field"] == "connectionWaitingCount" or r["_field"] == "coreCount" or r["_field"] == "cpu" or r["_field"] == "systemcpu" or r["_field"] == "velocity" or r["_field"] == "timer", )
r2 = data |> filter( fn: (r) => float(v: r["_value"]) > 0.0 and (r["_field"] == "duration" or r["_field"] == "latency"), )
serviceInstanceProcessSum = r1 |> group(columns: ["serviceId", "serviceInstanceId", "serviceInstanceProcessId", "_field"]) |> sum() |> toFloat()
serviceInstanceProcessMean = r2 |> group(columns: ["serviceId", "serviceInstanceId", "serviceInstanceProcessId", "_field"]) |> mean() |> toFloat()
union(tables: [serviceInstanceProcessSum, serviceInstanceProcessMean]) |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-by-serviceinstanceprocess"})) |> to(org: "my-org", bucket: "TestBucket01")
serviceInstanceSum = r1 |> group(columns: ["serviceId", "serviceInstanceId", "_field"]) |> sum() |> toFloat()
serviceInstanceMean = r2 |> group(columns: ["serviceId", "serviceInstanceId", "_field"]) |> mean() |> toFloat()
union(tables: [serviceInstanceSum, serviceInstanceMean]) |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-by-serviceinstance"})) |> to(org: "my-org", bucket: "TestBucket01")
serviceSum = r1 |> group(columns: ["serviceId", "_field"]) |> sum() |> toFloat()
serviceMean = r2 |> group(columns: ["serviceId", "_field"]) |> mean() |> toFloat()
union(tables: [serviceSum, serviceMean]) |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-by-service"})) |> to(org: "my-org", bucket: "TestBucket01")
globalServiceSum = r1 |> group(columns: ["_field"]) |> sum() |> toFloat()
globalServiceMean = r2 |> group(columns: ["_field"]) |> mean() |> toFloat()
union(tables: [globalServiceSum, globalServiceMean]) |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-globalservice"})) |> to(org: "my-org", bucket: "TestBucket01")
--- Agents
option task = {name: "task-sts-agent-stats", every: 1s}
data = from(bucket: "TestBucket01") |> range(start: -5s) |> last() |> filter(fn: (r) => r["_measurement"] == "agent")
byagentthreadasyncunner = data |> group(columns: ["agentName", "threadId", "asyncRunnerId", "_field"]) |> sum() |> toFloat() |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-by-agentthreadasyncunner"})) |> to(org: "my-org", bucket: "TestBucket01")
byagentthread = data |> group(columns: ["agentName", "threadId", "_field"]) |> sum() |> toFloat() |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-by-agentthread"})) |> to(org: "my-org", bucket: "TestBucket01")
byagent = data |> group(columns: ["agentName", "_field"]) |> sum() |> toFloat() |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-by-agent"})) |> to(org: "my-org", bucket: "TestBucket01")
globalagent = data |> group(columns: ["_field"]) |> sum() |> toFloat() |> map(fn: (r) => ({r with _time: now(), _measurement: "sts-stats-globalagent"})) |> to(org: "my-org", bucket: "TestBucket01")