AWS Big Data Blog

Amazon MSK IAM authentication now supports all programming languages

The AWS Identity and Access Management (IAM) authentication feature in Amazon Managed Streaming for Apache Kafka (Amazon MSK) now supports all programming languages. Administrators can simplify and standardize access control to Kafka resources using IAM. This support is based on SASL/OUATHBEARER, an open standard for authorization and authentication. Both Amazon MSK provisioned and serverless cluster types support the new Amazon MSK IAM expansion to all programming languages.

In this post, we show how you can connect your applications to MSK clusters with minimal code changes using the open-sourced client helper libraries and code samples for popular languages, including JavaPythonGoJavaScript, and .NET. You can also use standard IAM access controls such as temporary role-based credentials and precisely scoped permission policies more broadly with the multiple language support on Amazon MSK.

For clients that need to connect from other VPCs to an MSK cluster, whether in a same or a different AWS account, you can enable multi-VPC private connectivity and cluster policy support. IAM access control via cluster policy helps you manage all access to the cluster and topics in one place. For example, you can control which IAM principals have write access to certain topics, and which principals can only read from them. Users who are using IAM client authentication can also add permissions for required kafka-cluster actions in the cluster resource policy.

Solution overview

You can get started by using IAM principals as identities for your Apache Kafka clients and define identity policies to provide them precisely scoped access permissions. After IAM authentication is enabled for your cluster, you can configure client applications to use the IAM authentication with minimal code changes.

The code changes allow your clients to use SASL/OAUTHBEARER, a Kafka supported token-based access mechanism, to pass the credentials required for IAM authentication. In this post, we show how you can make these code changes by using the provided code libraries and examples.

With this launch, new code libraries for the following programming languages are available in the AWS GitHub repo:

The following diagram shows the conceptual process flow of using SASL/OAUTHBEARER with IAM access control for non-Java clients.

The workflow contains the following steps:

  1. The client generates an OAUTHBEARER token with the help of the provided library. The token contains a signed base64 encoded transformation of your IAM identity credentials.
  2. The client sends this to Amazon MSK using the IAM bootstrap broker addresses along with its request to access Apache Kafka resources.
  3. The MSK broker decodes the OATHBEARER token, validates the credentials, and checks if the client is authorized to perform the requested action according to the policy attached to the IAM identity.
  4. When the token expires, the client Kafka library automatically refreshes the token by making another call to the specified token provider.

Create IAM identities and policies

IAM access control for non-Java clients is supported for MSK clusters with Kafka version 2.7.1 and above. Before you start, you need to configure the IAM identities and policies that define the client’s permissions to access resources on the cluster. The following is an example authorization policy for a cluster named MyTestCluster. To understand the semantics of the action and resource elements, see Semantics of actions and resources.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:topic/MyTestCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:0123456789012:group/MyTestCluster/*"
            ]
        }
    ]
}

Set up the MSK cluster

You need to enable the IAM access control authentication scheme for your MSK provisioned cluster and wait until the cluster finishes updating and turns to the Active state. This is because SASL/OAUTHBEARER uses the same broker addresses for IAM authentication.

Configure the client

You should make code changes to your application that allow the clients to use SASL/OAUTHBEARER to pass the credentials required for IAM authentication. Next, update your application to use the bootstrap server addresses for IAM authentication. You also need to make sure the security group associated with your MSK cluster has an inbound rule allowing the traffic from the client applications in the same VPC as the cluster to the TCP port 9098.

You must use a Kafka client library that provides support for SASL with OAUTHBRARER authentication.

In this post, we use the JavaScript programming language. We also use https://github.com/tulios/kafkajs as our Kafka client library.

Amazon MSK provides you with a new code library per each language that generates the OAUTHBEARER token.

  1. To get started working with the Amazon MSK IAM SASL signer for JavaScript with your Kafka client library, run the following command:
    npm install https://github.com/aws/aws-msk-iam-sasl-signer-js
  2. You need to import the installed Amazon MSK IAM SASL signer library in your code:
    const { Kafka } = require('kafkajs')
    
    const { generateAuthToken } = require('aws-msk-iam-sasl-signer-js')
  3. Next, your application code needs to define a token provider that wraps the function that generates new tokens:
    async function oauthBearerTokenProvider(region) {
        // Uses AWS Default Credentials Provider Chain to fetch credentials
        const authTokenResponse = await generateAuthToken({ region });
        return {
            value: authTokenResponse.token
        }
    }
  4. Specify security_protocol as SASL_SSL and sasl_mechanism as oauthbearer in your JavaScript Kafka client properties, and pass the token provider in the configuration object:
    const run = async () => {
        const kafka = new Kafka({
            clientId: 'my-app',
            brokers: [bootstrap server addresses for IAM],
            ssl: true,
            sasl: {
                mechanism: 'oauthbearer',
                oauthBearerProvider: () => oauthBearerTokenProvider('us-east-1')
            }
        })
    
        const producer = kafka.producer()
        const consumer = kafka.consumer({ groupId: 'test-group' })
    
        // Producing
        await producer.connect()
        await producer.send({
            topic: 'test-topic',
            messages: [
                { value: 'Hello KafkaJS user!' },
            ],
        })
    
        // Consuming
        await consumer.connect()
        await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
    
        await consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                console.log({
                    partition,
                    offset: message.offset,
                    value: message.value.toString(),
                })
            },
        })
    }
    
    run().catch(console.error)

You are now finished with all the code changes. For more examples of generating auth tokens or for more troubleshooting tips, refer to the following GitHub repo.

Conclusion

IAM access control for Amazon MSK enables you to handle both authentication and authorization for your MSK cluster. This eliminates the need to use one mechanism for authentication and another for authorization. For example, when a client tries to write to your cluster, Amazon MSK uses IAM to check whether that client is an authenticated identity and also whether it is authorized to produce to your cluster.

With today’s launch, Amazon MSK IAM authentication now supports all programming languages. This means you can connect your applications in all languages without worrying about implementing separate authentication and authorization mechanisms. For workloads that require Amazon MSK multi-VPC private connectivity and cluster policy support, you can now simplify connectivity to your MSK brokers and manage all access to the cluster and topics in one place that is your cluster policy.

For further reading on Amazon MSK, visit the official product page.


About the author

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customer’s use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.