In this article I want to walk through a simplified but working example of a decentralized blockchain based on the Proof of Work algorithm. You can think of it as a simplified version of Bitcoin.

What I will describe:
- PoW blockchain theory
- How to write a blockchain in Python from scratch
- How to write a full node and run the blockchain on decentralized nodes

You can find the code on my GitHub: https://github.com/creotiv/full_node_blockchain

What is the point of all this?

First, you should understand that blockchain and cryptocurrencies are not the same thing. A blockchain is a cryptographic way to store data in a decentralized manner when you can't trust anyone. Cryptocurrencies are only a small part of the projects that use blockchains.

Second, the idea of decentralized data storage and processing is to create conditions where the system can be controlled only by the majority of its participants and not by just a few of them. This prevents people, parties, and countries from enforcing their rules on the users of the system. Basically, it is a kind of free speech, but for internet services.

What you need to understand before proceeding

No off-chain data. As we can't trust anyone, we can't trust any data outside our blockchain database. That means we can't use things like time, API calls to external services, and so on, which makes things much harder. Some systems use on-chain services called oracles to access off-chain data, but that's another story.

Blockchain trilemma. Much like the CAP theorem, you can pick only two: there can't be a solution that is blazing fast, decentralized, and secure all at once.

PoW Blockchain

Proof of Work is one of the oldest consensus algorithms used in blockchains. Consensus is the way decentralized actors (nodes) agree on something that happens in the system, such as a data update.

So for us a blockchain is a decentralized database where transactions update its state and a consensus algorithm lets the nodes agree that an update is valid. That is what a blockchain is from a global perspective.

Transactions

Assume we have many nodes, each of which holds the current data state. We need some way to update that state and to preserve the order of our updates (in most cases, such as money transfers, the order matters).

We have 3 ways of describing a transaction:
- save the new data state in the transaction
- save the diff of states (+$100/-$100)
- save the state flow

With the first one we can't verify the chain of our modifications in case of an error or an attack. Even current centralized financial solutions don't store just your current balance; they store a log of all updates to your balance, from which the current balance is calculated. That's why most blockchain solutions (I didn't see them all :) ) use either the diff of state (Ethereum) or the state flow pattern (Bitcoin) to describe transactions.

Here is how the state flow looks:

Each transaction has Inputs and Outputs, where each Input points to a previous Output. Each Output can be used as an Input only once. Each Input is signed with the private key of the user's wallet to prove that the user owns that Output.

For example, you have one Output from an exchange saying that 10 coins were bought into your wallet. Now you transfer 5 coins to your friend's wallet. The other 5 coins from the previous Output are left unspent, and if you leave them like this the system will count them as a fee for processing your transaction. To avoid paying that fee, add an additional Output that moves the money back to your wallet.

That's the idea behind transactions in a Bitcoin-like system. But now we need to add some order to the Txs, as we have a decentralized system and data can get mixed up. We can't use time as other systems do, because it is not trusted data. That is where we get blocks, from which the name blockchain comes.
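The 10-coin example above can be sketched in a few lines of Python. This is only an illustration of the fee rule, not the project's code: the `Output` dataclass and the `implicit_fee` helper are hypothetical names, and signatures are left out entirely.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Output:
    address: str  # wallet that is allowed to spend this output
    amount: int   # number of coins


def implicit_fee(spent_outputs, new_outputs):
    """Fee = coins in the outputs being spent minus coins in the outputs created."""
    total_in = sum(o.amount for o in spent_outputs)
    total_out = sum(o.amount for o in new_outputs)
    if total_out > total_in:
        raise ValueError("outputs exceed inputs")
    return total_in - total_out


# You own one unspent output of 10 coins and send 5 to a friend.
mine = [Output("me", 10)]
without_change = [Output("friend", 5)]
with_change = [Output("friend", 5), Output("me", 5)]

fee_without_change = implicit_fee(mine, without_change)  # leftover coins become the fee
fee_with_change = implicit_fee(mine, with_change)        # change output returns them to you
```

Without the change output the 5 leftover coins are treated as the fee; with it, the fee is zero.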
Blocks

Blocks add an order to our transactions. All Txs that go into the same block are considered to run at the same time. Here is what they look like:

Each block consists of data (a list of transactions), the hash of the block (obtained from mining), a link to the previous block's hash, and some additional data such as time, nonce, and block index. The first transaction in a block is the COINBASE transaction, which doesn't have a previous hash and is used to pay the reward for mining.

Each block hash is built from the Tx hashes, the previous block hash, the nonce, and other data. Thus you can't simply replace a block inside the chain. The only way to change data inside the blockchain is to recompute the hashes of all the blocks that follow. And that's where the Proof of Work algorithm comes in.

Proof Of Work algorithm

Recomputing hashes is not a problem by itself, so we need some mechanism that makes it impractical. PoW is one such mechanism. Instead of accepting any hash for our block, PoW sets rules for which hashes are considered valid. These rules add a level of difficulty to calculating the hash, because it now takes many iterations to find a valid one, which costs time and resources.

From this we see that attackers can't modify a running blockchain without controlling at least 51% of all the resources used in it, because, don't forget, blocks are mined in a decentralized way by thousands of different nodes.

Mining

Mining is the process of finding a hash for the block that the system will consider valid. If you look at the block description you will find the nonce field. That field is used to change the hash during mining, since we can't change the block data itself. This works because even a small modification to the data makes the hash totally different.

For each mined block the miner receives a reward plus all the fees from its Txs. After a certain number of blocks the reward and the difficulty of the hash rules may change. That's how the total supply of coins is limited.
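The nonce search described above can be sketched as follows. The target rule (hash as an integer must not exceed 2 ** (256 - difficulty)) is the same check the project code uses; the `block_hash` and `mine` helpers and the sample block string are assumptions made for this illustration.

```python
from hashlib import sha256


def block_hash(data: str, nonce: int) -> str:
    """Double SHA-256 over the block data plus the nonce, as a hex string."""
    payload = f"{data}{nonce}".encode()
    return sha256(sha256(payload).hexdigest().encode()).hexdigest()


def mine(data: str, difficulty: int, max_nonce: int = 2 ** 32):
    """Return the first nonce whose hash is at or below the target, else None."""
    target = 2 ** (256 - difficulty)  # more difficulty bits -> smaller target
    for nonce in range(max_nonce):
        if int(block_hash(data, nonce), 16) <= target:
            return nonce
    return None


DIFFICULTY = 12  # roughly 2**12 attempts on average
nonce = mine("prev_hash|merkle_root|index", DIFFICULTY)
found = block_hash("prev_hash|merkle_root|index", nonce)
```

Each extra difficulty bit halves the target and so doubles the expected number of attempts, while checking a found nonce still takes a single hash.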
Merkle Tree

A Merkle tree is a way to hash data using a tree structure, which makes it possible to update the resulting hash without recomputing the whole tree when new data is added. It is used to hash the list of transactions in a block and saves resources during mining by removing the need to recompute all Tx hashes on every attempt. It also makes it possible to check whether some data is in the tree with less computation.

The system together

We have some number of nodes. Each of them holds a copy of all the blockchain data (such nodes are called full nodes). Nodes use the gossip communication principle: if we receive data that we haven't seen before, we broadcast it to the other nodes we know. This way Txs, blocks, new node addresses, and so on are shared across all nodes.

When some time or number of Txs has passed since the last block was added (mined), nodes start mining a new block concurrently. The first node to mine it adds it to its chain and shares it with the other nodes; they validate it and, if it is OK, add it too and broadcast it further.

Sometimes a split-brain situation may occur, when two nodes mine two different blocks at about the same time. In that case some nodes continue mining the next block on top of the first block, and others on top of the second. The chain that becomes longer first wins and is accepted as the blockchain.

Problems

Running mining concurrently with ever-increasing difficulty consumes a lot of resources that could be put to better use.

As the difficulty is too high for one person to mine alone, people gather in mining pools, which combine their resources to mine a block and split the reward among all participants.

As we see, if 3 pools merge they will have more than 51% of the mining resources, which will give them the ability to compromise the network. It also shows that Bitcoin is not as decentralized a solution as many people think.
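Going back to the Merkle tree described above, here is a minimal sketch of how a root hash is folded from a list of transaction hashes. It is written with plain hashlib rather than the library the project uses, and pairing an odd last node with itself is an assumption of this sketch (libraries differ on that detail), so the roots it produces are not expected to match the project's.

```python
from hashlib import sha256


def _h(data: bytes) -> bytes:
    return sha256(data).digest()


def merkle_root(tx_hashes):
    """Fold a list of hex transaction hashes pairwise into one root hash (hex)."""
    if not tx_hashes:
        raise ValueError("need at least one transaction hash")
    level = [bytes.fromhex(h) for h in tx_hashes]
    while len(level) > 1:
        if len(level) % 2:  # odd count: pair the last node with itself
            level.append(level[-1])
        level = [_h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0].hex()


txs = [sha256(name.encode()).hexdigest() for name in ("tx-a", "tx-b", "tx-c")]
root = merkle_root(txs)
# Changing any single transaction changes the root.
tampered = merkle_root([txs[0], txs[1], sha256(b"tx-x").hexdigest()])
```

During mining only the nonce changes, so the root can be computed once per block and reused for every hash attempt.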
Coding part

You can proceed straight to the code: https://github.com/creotiv/full_node_blockchain

I tried not to add complicated things that are not directly related to the blockchain, to keep the amount of code small so you don't get lost in it. Also, nodes don't save their state on shutdown.

What I covered in this demo:
- Blockchain based on the Proof of Work algorithm
- Transaction spend control: each Tx Input points to a previous Tx Output
- Inputs signed with the wallet's private key
- Merkle tree for faster block hash computation during mining
- Mining process
- Sync process between nodes
- Transaction and block verifiers
- Same reward and difficulty configuration for all blocks, thus no supply limit
- Gossip broadcast of blocks, Txs, and node addresses
- The blockchain split-brain situation, but only for one level
- OpenAPI schema + UI (generated by FastAPI)
- Some tests for the blockchain, because it is very easy to mess things up with all these hashes

What I didn't cover in this demo:
- Multi-level split brain
- Automatic node discovery in subnets through a service discovery protocol and ping
- Integration testing
- Byzantine testing
- Many things that a real blockchain solution has. If you are interested in them, you can open Bitcoin or Ethereum after reading this.
- Multiprocessing for mining
- Light client
- No limitation on block sizes or number of Txs. Block mining starts after the previous mining ends.
Wallet

    import rsa
    import binascii


    class Address:

        def __init__(self, addr):
            if isinstance(addr, rsa.PublicKey):
                self.addr = addr
            else:
                if isinstance(addr, str):
                    addr = addr.encode()
                # That's not clean, but I didn't find a simple crypto library for a 512 sha key
                # to keep the address/public_key short.
                self.addr = rsa.PublicKey.load_pkcs1(
                    b'-----BEGIN RSA PUBLIC KEY-----\n%b\n-----END RSA PUBLIC KEY-----\n' % addr)

        def __str__(self):
            # Strip the PEM header/footer and join the body lines into one string.
            return b''.join(self.addr.save_pkcs1().split(b'\n')[1:-2]).decode()

        @property
        def key(self):
            return self.addr


    class Wallet:
        '''For a real-case wallet use ECDSA cryptography'''

        __slots__ = '_pub', '_priv'

        def __init__(self, pub=None, priv=None):
            if pub:
                self._pub = Address(pub)
                self._priv = rsa.PrivateKey.load_pkcs1(priv)

        @classmethod
        def create(cls):
            inst = cls(b'', b'')
            _pub, _priv = rsa.newkeys(512)
            inst._pub = Address(_pub)
            inst._priv = _priv
            return inst

        @classmethod
        def verify(cls, data, signature, address):
            signature = binascii.unhexlify(signature.encode())
            if not isinstance(address, Address):
                address = Address(address)
            try:
                # rsa.verify returns the name of the hash method on success
                return rsa.verify(data, signature, address.key) == 'SHA-256'
            except Exception:
                return False

        @property
        def address(self):
            return str(self._pub)

        @property
        def priv(self):
            return self._priv.save_pkcs1()

        def sign(self, hash):
            return binascii.hexlify(rsa.sign(hash, self._priv, 'SHA-256')).decode()

We create a wrapper around the Python RSA library. A wallet consists of 2 keys: public and private. The public key is our blockchain address, and it is used to verify the signature on data, which we create with our private key (which is not shared with anyone).
In Bitcoin and other solutions, ECDSA cryptography is used instead of RSA.

Blocks

    import time
    from hashlib import sha256
    from merkletools import MerkleTools
    from .wallet import Address


    class Input:
        __slots__ = 'prev_tx_hash', 'output_index', 'signature', '_hash', 'address', 'index', 'amount'

        def __init__(self, prev_tx_hash, output_index, address, index=0):
            self.prev_tx_hash = prev_tx_hash
            self.output_index = output_index
            self.address = address
            self.index = index
            self._hash = None
            self.signature = None
            self.amount = None

        def sign(self, wallet):
            hash_string = '{}{}{}{}'.format(
                self.prev_tx_hash, self.output_index, self.address, self.index
            ).encode()
            self.signature = wallet.sign(hash_string)

        @property
        def hash(self):
            if self._hash:
                return self._hash
            if not self.signature and self.prev_tx_hash != 'COINBASE':
                raise Exception('Sign the input first')
            hash_string = '{}{}{}{}{}'.format(
                self.prev_tx_hash, self.output_index, self.address, self.signature, self.index
            )
            self._hash = sha256(sha256(hash_string.encode()).hexdigest().encode('utf8')).hexdigest()
            return self._hash

        @property
        def as_dict(self):
            return {
                "prev_tx_hash": self.prev_tx_hash,
                "output_index": self.output_index,
                "address": str(self.address),
                "index": self.index,
                "hash": self.hash,
                "signature": self.signature
            }

        @classmethod
        def from_dict(cls, data):
            inst = cls(
                data['prev_tx_hash'],
                data['output_index'],
                Address(data['address']),
                data['index'],
            )
            inst.signature = data['signature']
            inst._hash = None
            return inst


    class Output:
        __slots__ = '_hash', 'address', 'index', 'amount', 'input_hash'

        def __init__(self, address, amount, index=0):
            self.address = address
            self.index = index
            self.amount = int(amount)
            # I use the input hash here to make the output hash unique, especially for the COINBASE tx
            self.input_hash = None
            self._hash = None

        @property
        def hash(self):
            if self._hash:
                return self._hash
            hash_string = '{}{}{}{}'.format(
                self.amount, self.index, self.address, self.input_hash
            )
            self._hash = sha256(sha256(hash_string.encode()).hexdigest().encode('utf8')).hexdigest()
            return self._hash

        @property
        def as_dict(self):
            return {
                "amount": int(self.amount),
                "address": str(self.address),
                "index": self.index,
                "input_hash": self.input_hash,
                "hash": self.hash
            }

        @classmethod
        def from_dict(cls, data):
            inst = cls(
                Address(data['address']),
                data['amount'],
                data['index'],
            )
            inst.input_hash = data['input_hash']
            inst._hash = None
            return inst


    class Tx:
        __slots__ = 'inputs', 'outputs', 'timestamp', '_hash'

        def __init__(self, inputs, outputs, timestamp=None):
            self.inputs = inputs
            self.outputs = outputs
            self.timestamp = timestamp or int(time.time())
            self._hash = None

        @property
        def hash(self):
            if self._hash:
                return self._hash
            # calculating input_hash for outputs
            inp_hash = sha256((str([el.as_dict for el in self.inputs]) + str(self.timestamp)).encode()).hexdigest()
            for el in self.outputs:
                el.input_hash = inp_hash
            hash_string = '{}{}{}'.format(
                [el.as_dict for el in self.inputs], [el.as_dict for el in self.outputs], self.timestamp
            )
            self._hash = sha256(sha256(hash_string.encode()).hexdigest().encode('utf8')).hexdigest()
            return self._hash

        @property
        def as_dict(self):
            inp_hash = sha256((str([el.as_dict for el in self.inputs]) + str(self.timestamp)).encode()).hexdigest()
            for el in self.outputs:
                el.input_hash = inp_hash
            return {
                "inputs": [el.as_dict for el in self.inputs],
                "outputs": [el.as_dict for el in self.outputs],
                "timestamp": self.timestamp,
                "hash": self.hash
            }

        @classmethod
        def from_dict(cls, data):
            inps = [Input.from_dict(el) for el in data['inputs']]
            outs = [Output.from_dict(el) for el in data['outputs']]
            inp_hash = sha256((str([el.as_dict for el in inps]) + str(data['timestamp'])).encode()).hexdigest()
            for el in outs:
                el.input_hash = inp_hash
            inst = cls(
                inps,
                outs,
                data['timestamp'],
            )
            inst._hash = None
            return inst


    class Block:
        __slots__ = 'nonce', 'prev_hash', 'index', 'txs', 'timestamp', 'merkel_root'

        def __init__(self, txs, index, prev_hash, timestamp=None, nonce=0):
            self.txs = txs or []
            self.prev_hash = prev_hash
            self.index = index
            self.nonce = nonce
            self.timestamp = timestamp or int(time.time())
            self.merkel_root = None

        def build_merkel_tree(self):
            """
            The Merkle tree is used to hash all the transactions so that mining
            does not recompute the Tx hashes on every attempt, which makes things much faster.
            A tree is used because we can append new Txs and rebuild the root hash
            much faster while building the block before mining it.
            """
            if self.merkel_root:
                return self.merkel_root
            mt = MerkleTools(hash_type="SHA256")
            for el in self.txs:
                mt.add_leaf(el.hash)
            mt.make_tree()
            self.merkel_root = mt.get_merkle_root()
            return self.merkel_root

        def hash(self, nonce=None):
            if nonce is not None:
                self.nonce = nonce
            block_string = '{}{}{}{}{}'.format(
                self.build_merkel_tree(), self.prev_hash, self.index, self.nonce, self.timestamp
            )
            return sha256(sha256(block_string.encode()).hexdigest().encode('utf8')).hexdigest()

        @property
        def as_dict(self):
            return {
                "index": self.index,
                "timestamp": self.timestamp,
                "prev_hash": self.prev_hash,
                "hash": self.hash(),
                "txs": [el.as_dict for el in self.txs],
                "nonce": self.nonce,
                "merkel_root": self.merkel_root
            }

        @classmethod
        def from_dict(cls, data):
            return cls(
                [Tx.from_dict(el) for el in data['txs']],
                data['index'],
                data['prev_hash'],
                data['timestamp'],
                data['nonce']
            )

In blocks.py we describe the building blocks of our blockchain: Tx, Input, Output, and Block. Each class has hash, as_dict, and from_dict methods. We sign each Input with our wallet instance.
The Output class has an input_hash field that is used to create a unique hash for each output in a transaction; without it the output hashes would be identical in many cases.

As I said before, we use the Merkle tree algorithm to hash all transactions in a block to speed up mining.

Verifiers

    import rsa
    import binascii

    from .wallet import Address


    class TxVerifier:

        def __init__(self, db):
            self.db = db

        def verify(self, inputs, outputs):
            total_amount_in = 0
            total_amount_out = 0
            for i, inp in enumerate(inputs):
                if inp.prev_tx_hash == 'COINBASE' and i == 0:
                    total_amount_in = int(self.db.config['mining_reward'])
                    continue

                try:
                    out = self.db.transaction_by_hash[inp.prev_tx_hash]['outputs'][inp.output_index]
                except (KeyError, IndexError):
                    raise Exception('Transaction output not found.')

                total_amount_in += int(out['amount'])

                if (inp.prev_tx_hash, out['hash']) not in self.db.unspent_txs_by_user_hash.get(out['address'], set()):
                    raise Exception('Output of transaction already spent.')

                hash_string = '{}{}{}{}'.format(
                    inp.prev_tx_hash, inp.output_index, inp.address, inp.index
                )
                try:
                    # rsa.verify raises if the signature does not match
                    rsa.verify(hash_string.encode(), binascii.unhexlify(inp.signature.encode()), Address(out['address']).key) == 'SHA-256'
                except Exception:
                    raise Exception('Signature verification failed: %s' % inp.as_dict)

            for out in outputs:
                total_amount_out += int(out.amount)

            if total_amount_in < total_amount_out:
                raise Exception('Insufficient funds.')

            # what is left over is the fee
            return total_amount_in - total_amount_out


    class BlockOutOfChain(Exception):
        pass


    class BlockVerificationFailed(Exception):
        pass


    class BlockVerifier:

        def __init__(self, db):
            self.db = db
            self.tv = TxVerifier(db)

        def verify(self, head, block):
            total_block_reward = int(self.db.config['mining_reward'])

            # verifying block hash
            if int(block.hash(), 16) > (2 ** (256 - self.db.config['difficulty'])):
                raise BlockVerificationFailed('Block hash bigger than target difficulty')

            # verifying transactions in a block
            for tx in block.txs[1:]:
                fee = self.tv.verify(tx.inputs, tx.outputs)
                total_block_reward += fee

            total_reward_out = 0
            for out in block.txs[0].outputs:
                total_reward_out += out.amount

            # verifying block reward
            if total_block_reward != total_reward_out:
                raise BlockVerificationFailed('Wrong reward sum')

            # verifying some other things
            if head:
                if head.index >= block.index:
                    raise BlockOutOfChain('Block index number wrong')
                if head.hash() != block.prev_hash:
                    raise BlockOutOfChain('New block not pointed to the head')
                if head.timestamp > block.timestamp:
                    raise BlockOutOfChain('Block from the past')
            return True

This is one of the main parts of our system, as we need to be sure that the data we get from other nodes is valid. The previous Outputs that our Tx Inputs refer to are tracked in an internal DB, which is updated with each block so that we don't have to walk through the whole blockchain (27GB now) to find the data we need. Basically, this is how the blockchain is stored on the nodes in a network.

Blockchain

    from .blocks import Block, Tx, Input, Output
    from .verifiers import TxVerifier, BlockOutOfChain, BlockVerifier, BlockVerificationFailed

    import logging

    logger = logging.getLogger('Blockchain')


    class Blockchain:

        __slots__ = 'max_nonce', 'chain', 'unconfirmed_transactions', 'db', 'wallet', 'on_new_block', 'on_prev_block', 'current_block_transactions', 'fork_blocks'

        def __init__(self, db, wallet, on_new_block=None, on_prev_block=None):
            self.max_nonce = 2 ** 32
            self.db = db
            self.wallet = wallet
            self.on_new_block = on_new_block
            self.on_prev_block = on_prev_block
            self.unconfirmed_transactions = set()
            self.current_block_transactions = set()
            self.chain = []
            self.fork_blocks = {}

        def create_first_block(self):
            """
            Creating the first block in a chain. Only the COINBASE Tx.
            """
            tx = self.create_coinbase_tx()
            block = Block([tx], 0, 0x0)
            self.mine_block(block)

        def create_coinbase_tx(self, fee=0):
            inp = Input('COINBASE', 0, self.wallet.address, 0)
            inp.sign(self.wallet)
            out = Output(self.wallet.address, self.db.config['mining_reward'] + fee, 0)
            return Tx([inp], [out])

        def is_valid_block(self, block):
            bv = BlockVerifier(self.db)
            return bv.verify(self.head, block)

        def add_block(self, block):
            if self.head and block.hash() == self.head.hash():
                logger.error('Duplicate block')
                return False
            try:
                self.is_valid_block(block)
            except BlockOutOfChain:
                # Here we cover the split-brain case only for the next 2 levels of blocks.
                # With high difficulty it is a rare case, and more than 2 levels is much rarer.
                if block.prev_hash == self.head.prev_hash:
                    logger.error('Split Brain detected')
                    self.fork_blocks[block.hash()] = block
                    return False
                else:
                    for b_hash, b in self.fork_blocks.items():
                        if block.prev_hash == b_hash:
                            logger.error('Split Brain fixed. Longer chain chosen')
                            self.rollback_block()
                            self.chain.append(b)
                            self.chain.append(block)
                            self.fork_blocks = {}
                            return True
                    logger.error('Second Split Brain detected. Not programmed to fix this')
                    return False
            except BlockVerificationFailed as e:
                logger.error('Block verification failed: %s' % e)
                return False
            else:
                self.chain.append(block)
                self.fork_blocks = {}
                logger.info(' Block added')
                return True
            logger.error('Hard chain out of sync')

        def add_tx(self, tx):
            if self.db.transaction_by_hash.get(tx.hash):
                return False
            tv = TxVerifier(self.db)
            fee = tv.verify(tx.inputs, tx.outputs)
            self.db.transaction_by_hash[tx.hash] = tx.as_dict
            self.unconfirmed_transactions.add((fee, tx.hash))
            return True

        def force_block(self, check_stop=None):
            '''
            Forcing a block to be mined. Gathering all Txs up to some limit.
            Txs with a bigger fee are taken first.
            '''
            txs = sorted(self.unconfirmed_transactions, key=lambda x: -x[0])[:self.db.config['txs_per_block']]
            self.current_block_transactions = set(txs)
            fee = sum([v[0] for v in txs])
            txs = [Tx.from_dict(self.db.transaction_by_hash[v[1]]) for v in txs]
            block = Block(
                txs=[self.create_coinbase_tx(fee)] + txs,
                index=self.head.index + 1,
                prev_hash=self.head.hash(),
            )
            self.mine_block(block, check_stop)

        def rollover_block(self, block):
            '''
            As we use some sort of DB, we need a way to update it depending on
            whether we add or remove a block. So we have 2 methods, rollover and rollback.
            I also added callbacks in case some additional functionality
            should be added on top, for example a blockchain analytics DB.
            '''
            self.unconfirmed_transactions -= self.current_block_transactions
            self.db.block_index = block.index
            for tx in block.txs:
                self.db.transaction_by_hash[tx.hash] = tx.as_dict
                for out in tx.outputs:
                    self.db.unspent_txs_by_user_hash[str(out.address)].add((tx.hash, out.hash))
                    self.db.unspent_outputs_amount[str(out.address)][out.hash] = int(out.amount)
                for inp in tx.inputs:
                    if inp.prev_tx_hash == 'COINBASE':
                        continue
                    prev_out = self.db.transaction_by_hash[inp.prev_tx_hash]['outputs'][inp.output_index]
                    self.db.unspent_txs_by_user_hash[prev_out['address']].remove((inp.prev_tx_hash, prev_out['hash']))
                    del self.db.unspent_outputs_amount[prev_out['address']][prev_out['hash']]
            if self.on_new_block:
                self.on_new_block(block, self.db)
            self.current_block_transactions = set()

        def rollback_block(self):
            block = self.chain.pop()
            self.db.block_index -= 1
            total_amount_in = 0
            total_amount_out = 0
            for tx in block.txs:
                # removing new unspent outputs
                for out in tx.outputs:
                    self.db.unspent_txs_by_user_hash[str(out.address)].remove((tx.hash, out.hash))
                    del self.db.unspent_outputs_amount[str(out.address)][out.hash]
                    total_amount_out += out.amount
                # adding back previous unspent outputs
                for inp in tx.inputs:
                    if inp.prev_tx_hash == 'COINBASE':
                        continue
                    prev_out = self.db.transaction_by_hash[inp.prev_tx_hash]['outputs'][inp.output_index]
                    self.db.unspent_txs_by_user_hash[prev_out['address']].add((inp.prev_tx_hash, prev_out['hash']))
                    self.db.unspent_outputs_amount[prev_out['address']][prev_out['hash']] = prev_out['amount']
                    total_amount_in += int(prev_out['amount'])
                # adding the Tx back to the unprocessed stack
                fee = total_amount_in - total_amount_out
                self.unconfirmed_transactions.add((fee, tx.hash))
            if self.on_prev_block:
                self.on_prev_block(block, self.db)

        def mine_block(self, block, check_stop=None):
            '''
            Mine a block, with the ability to stop if the check callback returns True
            '''
            for n in range(self.max_nonce):
                if check_stop and check_stop():
                    logger.error('Mining interrupted.')
                    return
                if int(block.hash(nonce=n), 16) <= (2 ** (256 - self.db.config['difficulty'])):
                    self.add_block(block)
                    self.rollover_block(block)
                    logger.info(' Block mined at nonce: %s' % n)
                    break

        @property
        def head(self):
            if not self.chain:
                return None
            return self.chain[-1]

        @property
        def blockchain(self):
            return [el.as_dict for el in reversed(self.chain)]

The force_block method is used to start mining a new block: it gathers some number of Txs ordered by fee and adds the Coinbase Tx to them.

After a block is added to the chain we use rollover_block to update our DB with the new data.

When a new block (received from a different node) creates a split-brain issue, we use rollback_block to roll the chain back to the previous block and merge the new longest chain. The code does not support multi-level split brain, as it is almost impossible in the real world.

There are also some tests to verify that the blockchain code works correctly.

Now we need to make this run concurrently.

Creating a full node with FastAPI

I used FastAPI as it is fast and simple, uses asyncio, and can build an OpenAPI schema and a debug UI from the code (which is awesome).
fastapi FastAPI, BackgroundTasks, Request uvicorn requests asyncio logging sys models * blockchain.db DB blockchain.blockchain Blockchain blockchain.wallet Wallet blockchain.api API blockchain.blocks Input, Output, Tx super(ColorFormatter,self).__init__(fmt) red = nc = cyan = err_fmt = info_fmt = self.err = logging.Formatter(err_fmt) self.log = logging.Formatter(info_fmt) record.levelno == logging.ERROR: self.err.format(record) : self.log.format(record) logger = logging.getLogger( ) app = FastAPI() app.config = {} app.jobs = {} logger.info( ) bc = app.config[ ] head = bc.get_head() : sync_running = node app.config[ ]: node == ( % (app.config[ ],app.config[ ])): url = % node start = head[ ]+ head : logger.info(url, { :start, : }) res = requests.get(url, params={ :start, : }) res.status_code == : data = res.json() data: sync_running = block data: : bc.add_block(block) Exception e: logger.exception(e) : logger.info( ) start += head = bc.get_head() sync_running: app.config[ ] = logger.info( ) node list(app.config[ ])[:]: node == ( % (app.config[ ],app.config[ ])) fiter_host == node: url = % (node,path) logger.info( ) : params: requests.post(url, params=data, timeout= , headers={ : % (app.config[ ],app.config[ ])}) : requests.post(url, json=data, timeout= , headers={ : % (app.config[ ],app.config[ ])}) : logger.info( ) : : event.is_set() logger.info( ) app.config[ ].mine_block(check_stop) logger.info( ) broadcast( , app.config[ ].get_head()) event.is_set(): asyncio.CancelledError: logger.info( ) Exception e: logger.exception(e) app.jobs.get( ): app.jobs[ ].set() app.jobs[ ] = app.jobs.get( ): loop = asyncio.get_running_loop() app.jobs[ ] = asyncio.Event() loop.run_in_executor( , mine, app.jobs[ ]) app.config[ ] length = len(app.config[ ]) app.config[ ] |= set(nodes.nodes) length < len(app.config[ ]): broadcast( , { :list(app.config[ ])}, , request.headers.get( )) logger.info( ) { : } address_from = app.config[ ].address wallet = app.config[ ] bc = app.config[ ] 
unspent_txs = bc.get_user_unspent_txs(address_from) total = inputs = [] i = : total < amount: prev = unspent_txs[i] inp = Input(prev[ ],prev[ ],address_from,i) inp.sign(wallet) total += prev[ ] i += inputs.append(inp) Exception e: { : , :str(e)} outs = [Output(address_to, amount, )] total - amount > : outs.append(Output(address_from, total - amount, )) tx = Tx(inputs,outs) : res = bc.add_tx(tx.as_dict) Exception e: logger.exception(e) { : , :str(e)} : res: logger.info( ) background_tasks.add_task(broadcast, , tx.as_dict, ) { : } logger.info( ) { : , : } bc = app.config[ ] { : address, :bc.get_user_balance(address)} bc = app.config[ ] { : address, :bc.get_user_unspent_txs(address)} bc = app.config[ ] head = bc.get_head() head: { : } { :head[ ], :head[ ], :head[ ], :head[ ] } bc = app.config[ ] bc.get_chain(from_block, limit) logger.info( ) app.config[ ]: logger.error( ) { : , : } bc = app.config[ ] head = bc.get_head() (head[ ] + ) < block.index: app.config[ ] = background_tasks.add_task(sync_data) logger.error( ) { : , : } : res = bc.add_block(block.dict()) res: restart_miner() Exception e: logger.exception(e) { : , :str(e)} : res: logger.info( ) background_tasks.add_task(broadcast, , block.dict(), , request.headers.get( )) { : } logger.info( ) { : , : } logger.info( ) bc = app.config[ ] : res = bc.add_tx(tx.dict()) Exception e: logger.exception(e) { : , :str(e)} : res: logger.info( ) background_tasks.add_task(broadcast, , tx.dict(), , request.headers.get( )) { : } logger.info( ) { : , : } app.config[ ] = loop = asyncio.get_running_loop() loop.run_in_executor( , sync_data) loop.run_in_executor( , broadcast, , { :[ % (app.config[ ],app.config[ ])]}, ) app.config[ ]: app.jobs[ ] = asyncio.Event() loop.run_in_executor( , mine, app.jobs[ ]) app.jobs.get( ): app.jobs.get( ).set() app.jobs.get( ): loop = asyncio.get_running_loop() app.jobs[ ].set() app.jobs[ ] = asyncio.Event() loop.run_in_executor( , mine, app.jobs[ ]) __name__ == : logger.setLevel(logging.INFO) handler 
= logging.StreamHandler(sys.stdout) handler.setFormatter(ColorFormatter()) handler.setLevel(logging.INFO) logger.addHandler(handler) argparse parser = argparse.ArgumentParser(description= ) parser.add_argument( , type=str, help= ) parser.add_argument( , required= , type=int, help= ) parser.add_argument( , required= , type=bool, help= ) parser.add_argument( , required= , type=int, help= ) args = parser.parse_args() _DB = DB() _DB.config[ ] _W = Wallet.create() _BC = Blockchain(_DB, _W) _API = API(_BC) logger.info( %_W.address) app.config[ ] = _DB app.config[ ] = _W app.config[ ] = _BC app.config[ ] = _API app.config[ ] = args.port app.config[ ] = app.config[ ] = set([args.node]) args.node set([ % args.port]) app.config[ ] = app.config[ ] = args.mine args.node: _BC.create_first_block() uvicorn.run(app, host= , port=args.port, access_log= ) from import import import import import import from import from import from import from import from import from import # Custom formatter : class ColorFormatter (logging.Formatter) : def __init__ (self, fmt= ) "%(asctime)s - Blockchain - %(message)s" '\033[0;31m' '\033[0m' '\033[0;96m' f" %(asctime)s - Blockchain - %(message)s" {red} {nc} f" %(asctime)s - Blockchain - %(message)s" {cyan} {nc} : def format (self, record) if return else return "Blockchain" ### TASKS : def sync_data () '================== Sync started =================' 'api' while True False for in 'nodes' if '%s:%s' 'host' 'port' continue 'http://%s/chain/sync' 'index' 1 if else 0 while True "from_block" "limit" 20 "from_block" "limit" 20 if 200 if not break True for in try except as return else f"Block added: # " {block[ ]} 'index' 20 if not 'sync_running' False '================== Sync stopped =================' return : def broadcast (path, data, params=False, fiter_host=None) for in 'nodes' if '%s:%s' 'host' 'port' or continue 'http://%s%s' f'Sending broadcast except: ' {url} {fiter_host} try # header added here as we run all nodes on one domain and need somehow 
understand the sender node # to not create broadcast loop if 2 'node' '%s:%s' 'host' 'port' else 2 'node' '%s:%s' 'host' 'port' except pass : def mine (event) '>>>>>>>>>> Starting mining loop' # In real case you chould do like this, mining script should run in separate process while True try : def check_stop () return f'>> Starting new block mining' 'api' f'>> New block mined' '/chain/add_block' 'api' if return except '>>>>>>>>>> Mining loop stopped' return except as ### SERVER OPERATIONS @app.post("/chain/stop-mining") async : def stop_mining () if 'mining' 'mining' 'mining' None @app.post("/chain/start-mining") async : def start_minig () if not 'mining' 'mining' None 'mining' @app.get("/server/nodes") async : def get_nodes () return 'nodes' @app.post("/server/add_nodes") async : def add_nodes (nodes:NodesModel, request: Request) 'nodes' 'nodes' if 'nodes' '/server/add_nodes' 'nodes' 'nodes' False 'node' f'New nodes added: ' {nodes.nodes} return "success" True ### DEMO OPERATIONS @app.get("/demo/send_amount") async : def send_amount (address_to:str, amount:int, background_tasks: BackgroundTasks) '''Sending amount of coins from server wallet to some other wallet''' 'wallet' 'wallet' 'api' 0 0 try while 'tx' 'output_index' 'amount' 1 except as return "success" False "msg" 0 if 0 1 try except as return "success" False "msg" else if f'Tx added to the stack' '/chain/tx_create' False return "success" True 'Tx already in stack. Skipped.' 
return "success" False "msg" "Duplicate" ### ON CHAIN OPERATIONS @app.get("/chain/get_amount") async : def get_wallet (address) 'api' return "address" "amount" @app.get("/chain/get_unspent_tx") async : def get_unspent_tx (address) 'api' return "address" "tx" @app.get("/chain/status") async : def status () 'api' if not return 'empty_node' True return 'block_index' 'index' 'block_prev_hash' 'prev_hash' 'block_hash' 'hash' 'timestamp' 'timestamp' @app.get("/chain/sync") async : def sync (from_block:int, limit:int= ) 20 'api' return @app.post("/chain/add_block") async : def add_block (block:BlockModel, background_tasks: BackgroundTasks, request: Request) f"New block arived: # from " {block.index} {request.headers.get( )} 'node' if 'sync_running' f'################### Not added, cause sync is running' return "success" False "msg" 'Out of sync' 'api' if 'index' 1 'sync_running' True f'################### Not added, cause node out of sync.' return "success" False "msg" 'Out of sync' try if except as return "success" False "msg" else if 'Block added to the chain' '/chain/add_block' False 'node' return "success" True 'Old block. Skipped.' return "success" False "msg" "Duplicate" @app.post("/chain/tx_create") async : def add_tx (tx: TxModel, background_tasks: BackgroundTasks, request: Request) f'New Tx arived' 'api' try except as return "success" False "msg" else if f'Tx added to the stack' '/chain/tx_create' False 'node' return "success" True 'Tx already in stack. Skipped.' 
return "success" False "msg" "Duplicate" @app.on_event("startup") async : def on_startup () 'sync_running' True # sync data before run the node await None # add our node address to connected node to broadcast around network None '/server/add_nodes' 'nodes' '%s:%s' 'host' 'port' False if 'mine' 'mining' None 'mining' @app.on_event("shutdown") async : def on_shutdown () if 'mining' 'mining' #### Utils ########################### : def restart_miner () if 'mining' 'mining' 'mining' None 'mining' if "__main__" import 'Blockchain full node.' '--node' 'Address of node to connect. If not will init fist node.' '--port' True 'Port on which run the node.' '--mine' False 'Port on which run the node.' '--diff' False 'Difficulty' 'difficulty' ' ####### Server address: %s ########' 'db' 'wallet' 'bc' 'api' 'port' 'host' '127.0.0.1' 'nodes' if else '127.0.0.1:%s' 'sync_running' False 'mine' if not "127.0.0.1" True

To wrap some additional functionality around the blockchain code, I added an API layer between the node and the blockchain.

Here we have three async tasks: mine, broadcast, and sync_data. Mining should run as a separate process, but that would add more code, so for now it runs in the same thread, which is fine for a test. Broadcast spreads data across the known nodes. Data sync fetches the blockchain from another node on startup, or when the node receives a block that is ahead of its own chain.

Mining runs continuously, block after block. If we don't add any Txs, a block contains only the coinbase transaction. If a new block arrives before the current one is mined, we stop mining and start again on top of the new block. Duplicate Txs and blocks are dropped and not broadcast to the network.

Code repo: https://github.com/creotiv/full_node_blockchain
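The "stop mining when a new block arrives" behaviour can be illustrated with a small standalone sketch. This is not the repository's miner: the function name `mine_block`, its parameters, and the toy hash over a string payload are assumptions made for illustration. The only idea taken from the node code is that the mining loop polls a stop check (like `check_stop` above) between hash attempts.

```python
import hashlib
import threading
from typing import Callable, Optional


def mine_block(prev_hash: str, payload: str, difficulty: int,
               should_stop: Callable[[], bool]) -> Optional[dict]:
    """Toy proof of work (hypothetical helper, not the repo's API).

    Looks for a nonce such that sha256(prev_hash + payload + nonce)
    starts with `difficulty` zero hex digits. Returns None if
    `should_stop()` becomes true before a nonce is found.
    """
    prefix = "0" * difficulty
    nonce = 0
    while not should_stop():
        data = f"{prev_hash}{payload}{nonce}".encode()
        digest = hashlib.sha256(data).hexdigest()
        if digest.startswith(prefix):
            return {
                "prev_hash": prev_hash,
                "payload": payload,
                "nonce": nonce,
                "hash": digest,
            }
        nonce += 1
    return None  # interrupted, e.g. because a newer block arrived


# The event plays the role of the "new block arrived" signal.
stop = threading.Event()

# Nothing interrupts us, so a block is found.
block = mine_block("genesis", "coinbase", 2, stop.is_set)

# Simulate a block arriving from the network: mining is abandoned,
# and the caller would restart on top of the new head.
stop.set()
interrupted = mine_block(block["hash"], "coinbase", 2, stop.is_set)
```

In a real node the stop signal would be set by the handler that accepts a block from the network and cleared again before mining restarts on the new head. Running the miner in a separate process, as suggested above, would need an inter-process signal instead of `threading.Event`.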